Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python
本記事では、 Azure Managed Instance for Apache Cassandra から Extract したデータを Azure Databricks で Transform し、 Cassandra に Load する方法について解説します。 Transform したデータを Cassandra の別テーブルに INSERT することをゴールとします。
前提
- Microsoft Azure に利用可能なサブスクリプションを持っている
- その他、 Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python における前提と同様
免責
- 本記事では処理の厳密さを追求していないため、INSERT の際にエラーが発生した場合は無視をして INSERT を継続することとしています。
手順
本記事で行う Transform 処理についてですが、今回は単純に、既存レコードからハッシュ値を計算して、各レコードに付加する処理とします。 先の記事 で扱ったデータにはユニークなキーが無いため、ハッシュ値をもってキーとするのがコンセプトです。
Extract
Azure Managed Instance for Apache Cassandra に INSERT 済みの NOAA Global Forecast System データを SELECT してきます。ここでは特定のレコード5件に絞って実行します。(当該データの場合、ある latitude と longitude の組み合わせにかかる1日分のデータは93件になります。)
CQL の作成
1 | keyspace_name = "_".join([os.environ['CASSANDRA_KEYSPACE_NAME'], |
SELECT の実行
1 | from cassandra.query import ordered_dict_factory |
Transform
Spark DataFrame への変換
1 | df = spark.createDataFrame(rows.all()) |
DataFrame の中身は以下の通りです。
UDF を使用した Transform
Spark では UDF (User Defined Functions) が使用できます。今回はデータ加工処理を Spark 側で UDF として定義し、それを使って処理を実行することで、 Spark の並行処理を活かしたいと思います。
UDF の定義
当該データは latitude
, longitude
, year
, month
, day
, forecasthour
の組み合わせでユニークになりますので、これらを元に sha256 ハッシュを生成します。
1 | from pyspark.sql.types import StringType |
定義した UDF を使った処理の実行
UDF を使って処理した結果は、元の df
に新しいカラム sha256hash
として連結します。そしてそれを new_df
という新しい DataFrame として扱います。
1 | new_df = df.withColumn("sha256hash", udf("latitude", "longitude", "year", "month", "day", "forecasthour")) |
new_df
のスキーマは以下の通りになります。
1 | sha256hash:string |
また、 new_df
のデータは以下のようになりました。
Load
new_df
を Azure Managed Instance for Apache Cassandra クラスターの新しいテーブルに INSERT します。
テーブルの作成
この段階ではまだ新しいテーブルが存在しないため、先の記事 と同じ要領で new_df
用の新しいテーブルを作成します。
新しいテーブル名は予め環境変数で以下のように定義しています。
1 | CASSANDRA_TABLE_NAME_FOR_DESTINATION_DATA=noaa_gfs_weather_with_sha256hash |
Keyspace 名 keyspace_name
およびその他環境変数は、先の記事と同じものを使い回します。
1 | table_name = os.environ['CASSANDRA_TABLE_NAME_FOR_DESTINATION_DATA'] |
1 | cassandra_session.execute(query_create_table) |
データの INSERT
こちらも先の記事と同じ要領で、新しいテーブルに対して INSERT をします。
プリペアドステートメントの作成
1 | prepared_statement = "INSERT INTO {}.{} \ |
INSERT の実行
1 | dataCollect = new_df.rdd.toLocalIterator() |
結果の確認
まとめ
Azure Managed Instance for Apache Cassandra から Extract したデータを Azure Databricks で Transform し、 Cassandra に Load する方法について解説しました。特に Transform にあたっては、 Spark の UDF は強力な存在と言えると思います。
一方で、 Load の部分で Spark のメリットが活かせていないのですが、これは先の記事でも触れた通りです。この話は別の記事で触れたいと思います。