kt.log

Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python

前の記事 では Azure Managed Instance for Apache Cassandra クラスターでのスキーマ作成を、 Azure Databricks クラスターから Python で実行する方法について解説しました。

本記事では、同じ環境において、作成したテーブルに対してデータを INSERT する方法について解説します。 INSERT がエラー無く成功することをゴールとします。なお、データには引き続き Azure Open Datasets の NOAA Global Forecast System を使用します。

前提

  • Microsoft Azure に利用可能なサブスクリプションを持っている
  • Create Azure Managed Instance for Apache Cassandra schema on Azure Databricks in Python の手順に沿って Azure Managed Instance for Apache Cassandra クラスター上にテーブルが作成されている
  • Azure Managed Instance for Apache Cassandra との間で確立したセッションが cassandra_session 変数に格納されている
  • Azure Open Datasets から取得した NOAA Global Forecast System のオープンデータが Spark DataFrame 形式で gfs_df 変数に格納されている
  • gfs_df のカラムのリストが c 変数に格納されている

免責

  • 本記事では処理の厳密さを追求していないため、INSERT の際にエラーが発生した場合は無視をして INSERT を継続することとしています。

手順

フォーマット変換

gfs_dfcurrentDatetime 列について、 Spark DataFrame も Cassandra テーブルも型は timestamp です。しかし、前者と後者とでは date と time との間に T が入るかどうかに違いがあり、前者を後者にそのまま INSERT しようとするとエラーになります。そのためのフォーマット変換を以下のように行います。

1
2
3
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import unix_timestamp
df = gfs_df.withColumn("currentDatetime", unix_timestamp("currentDatetime", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").cast(TimestampType()))

変換後の DataFrame は新たに df という変数に代入しました。

型自体は timestamp のまま変わっていないこと、 T の文字が除去されていることを、以下のように確認しましょう。

1
df.printSchema()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
root
|-- currentDatetime: timestamp (nullable = true)
|-- forecastHour: integer (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- precipitableWaterEntireAtmosphere: double (nullable = true)
|-- seaLvlPressure: double (nullable = true)
|-- snowDepthSurface: double (nullable = true)
|-- temperature: double (nullable = true)
|-- windSpeedGustSurface: double (nullable = true)
|-- totalCloudCoverConvectiveCloud: double (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
1
df.show(5)
1
2
3
4
5
6
7
8
9
10
+-------------------+------------+--------+---------+---------------------------------+--------------+------------------+------------------+--------------------+------------------------------+----+-----+---+
| currentDatetime|forecastHour|latitude|longitude|precipitableWaterEntireAtmosphere|seaLvlPressure| snowDepthSurface| temperature|windSpeedGustSurface|totalCloudCoverConvectiveCloud|year|month|day|
+-------------------+------------+--------+---------+---------------------------------+--------------+------------------+------------------+--------------------+------------------------------+----+-----+---+
|2018-12-20 00:00:00| 0| -90.0| 79.0| 3.548314332962036| 71160.9765625|1.0099999904632568| 260.6067810058594| 12.820813179016113| null|2018| 12| 20|
|2018-12-20 00:00:00| 0| -90.0| 268.0| 3.548314332962036| 71160.9765625|1.0099999904632568| 260.6067810058594| 12.820813179016113| null|2018| 12| 20|
|2018-12-20 00:00:00| 0| -89.5| 36.5| 3.4483141899108887| 70757.7734375|1.0099999904632568| 258.6067810058594| 12.620813369750977| null|2018| 12| 20|
|2018-12-20 00:00:00| 0| -89.5| 43.0| 3.3483142852783203| 70597.7734375|1.0099999904632568| 258.3067932128906| 12.720812797546387| null|2018| 12| 20|
|2018-12-20 00:00:00| 0| -89.5| 144.0| 3.248314380645752| 69701.7734375|1.0099999904632568|259.50677490234375| 12.620813369750977| null|2018| 12| 20|
+-------------------+------------+--------+---------+---------------------------------+--------------+------------------+------------------+--------------------+------------------------------+----+-----+---+
only showing top 5 rows

プリペアドステートメントの作成と登録

Cassandra でもプリペアドステートメントが利用可能です。以下のように記述して実行します。

1
2
3
4
5
6
prepared_statement = "INSERT INTO {}.{} \
({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})\
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\
".format(keyspace_name, table_name,
c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7], c[8], c[9], c[10], c[11], c[12])
prepared_session = cassandra_session.prepare(prepared_statement)

一般的な話ですが、プリペアドステートメント自体は文字列で定義し、データベースに登録を行い、 ? の箇所に後から値を挿入します。

INSERT の実行

Spark DataFrame df を1行ずつ、プリペアドステートメントを使って Cassandra のテーブルに INSERT します。

1
2
3
4
5
6
7
8
dataCollect = df.rdd.toLocalIterator()
for row in dataCollect:
try:
cassandra_session.execute(prepared_session,
(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12]))
except Exception as e:
print(e)
pass

df.rdd.toLocalIterator()df.collect() と記述しても同様にイテレーション処理をすることができます。

INSERT 結果の確認

INSERT の完了には時間がかかります。今回、 gfs_df は NOAA Global Forecast System のデータ わずか1日分でしたが、概算で約1GB程の大きさがあります。 INSERT 完了まで、筆者の環境では11時間20分の時間を要しました。

結果はテーブルの行数で確認します。変数 keyspace_name table_name は、それぞれ Azure Managed Instance for Apache Cassandra に作成した Keyspace と テーブルの名前です。

1
print(session.execute("SELECT COUNT(*) FROM {}.{}".format(keyspace_name, table_name)).one())

筆者の環境では、上記のコードの実行は OperationTimedOut となってしまいましたが、実際にレコードを SELECT することで、データが INSERT されていることを確認しています。

まとめ

Azure Managed Instance for Apache Cassandra クラスター上に作成したテーブルに対して、 Azure Databricks クラスターから Python でデータを INSERT する方法について解説しました。
本当は df をイテレーションするのではなく、データセットごと Cassandra にインポートするか、 Spark にイテレーション実行の並行処理をさせたいところですが、特に後者に関してはできないようですので、別の記事で解説をしたいと思います。

See also