Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python
本記事では、同じ環境において、作成したテーブルに対してデータを INSERT する方法について解説します。 INSERT がエラー無く成功することをゴールとします。なお、データには引き続き Azure Open Datasets の NOAA Global Forecast System を使用します。
前提
- Microsoft Azure に利用可能なサブスクリプションを持っている
- Create Azure Managed Instance for Apache Cassandra schema on Azure Databricks in Python の手順に沿って Azure Managed Instance for Apache Cassandra クラスター上にテーブルが作成されている
- Azure Managed Instance for Apache Cassandra との間で確立したセッションが
cassandra_session
変数に格納されている - Azure Open Datasets から取得した NOAA Global Forecast System のオープンデータが Spark DataFrame 形式で
gfs_df
変数に格納されている gfs_df
のカラムのリストがc
変数に格納されている
免責
- 本記事では処理の厳密さを追求していないため、INSERT の際にエラーが発生した場合は無視をして INSERT を継続することとしています。
手順
フォーマット変換
gfs_df
の currentDatetime
列について、 Spark DataFrame も Cassandra テーブルも型は timestamp
です。しかし、前者と後者とでは date と time との間に T
が入るかどうかに違いがあり、前者を後者にそのまま INSERT しようとするとエラーになります。そのためのフォーマット変換を以下のように行います。
1 | from pyspark.sql.types import TimestampType |
変換後の DataFrame は新たに df
という変数に代入しました。
型自体は timestamp
のまま変わっていないこと、 T
の文字が除去されていることを、以下のように確認しましょう。
1 | df.printSchema() |
1 | root |
1 | df.show(5) |
1 | +-------------------+------------+--------+---------+---------------------------------+--------------+------------------+------------------+--------------------+------------------------------+----+-----+---+ |
プリペアドステートメントの作成と登録
Cassandra でもプリペアドステートメントが利用可能です。以下のように記述して実行します。
1 | prepared_statement = "INSERT INTO {}.{} \ |
一般的な話ですが、プリペアドステートメント自体は文字列で定義し、データベースに登録を行い、 ?
の箇所に後から値を挿入します。
INSERT の実行
Spark DataFrame df
を1行ずつ、プリペアドステートメントを使って Cassandra のテーブルに INSERT します。
1 | dataCollect = df.rdd.toLocalIterator() |
df.rdd.toLocalIterator()
は df.collect()
と記述しても同様にイテレーション処理をすることができます。
INSERT 結果の確認
INSERT の完了には時間がかかります。今回、 gfs_df
は NOAA Global Forecast System のデータ わずか1日分でしたが、概算で約1GB程の大きさがあります。 INSERT 完了まで、筆者の環境では11時間20分の時間を要しました。
結果はテーブルの行数で確認します。変数 keyspace_name
table_name
は、それぞれ Azure Managed Instance for Apache Cassandra に作成した Keyspace と テーブルの名前です。
1 | print(session.execute("SELECT COUNT(*) FROM {}.{}".format(keyspace_name, table_name)).one()) |
筆者の環境では、上記のコードの実行は OperationTimedOut
となってしまいましたが、実際にレコードを SELECT することで、データが INSERT されていることを確認しています。
まとめ
Azure Managed Instance for Apache Cassandra クラスター上に作成したテーブルに対して、 Azure Databricks クラスターから Python でデータを INSERT する方法について解説しました。
本当は df
をイテレーションするのではなく、データセットごと Cassandra にインポートするか、 Spark にイテレーション実行の並行処理をさせたいところですが、特に後者に関してはできないようですので、別の記事で解説をしたいと思います。