kt.log

Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python

Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python では、 Azure Managed Instance for Apache Cassandra クラスターのテーブルに対して、 Azure Databricks クラスターから Python でデータを INSERT する方法を解説しました。

本記事では、 Azure Managed Instance for Apache Cassandra から Extract したデータを Azure Databricks で Transform し、 Cassandra に Load する方法について解説します。 Transform したデータを Cassandra の別テーブルに INSERT することをゴールとします。

前提

免責

  • 本記事では処理の厳密さを追求していないため、INSERT の際にエラーが発生した場合は無視をして INSERT を継続することとしています。

手順

本記事で行う Transform 処理についてですが、今回は単純に、既存レコードからハッシュ値を計算して、各レコードに付加する処理とします。 先の記事 で扱ったデータにはユニークなキーが無いため、ハッシュ値をもってキーとするのがコンセプトです。

Extract

Azure Managed Instance for Apache Cassandra に INSERT 済みの NOAA Global Forecast System データを SELECT してきます。ここでは特定のレコード5件に絞って実行します。(当該データの場合、ある latitude と longitude の組み合わせにかかる1日分のデータは93件になります。)

CQL の作成

1
2
3
4
5
6
7
8
9
10
keyspace_name = "_".join([os.environ['CASSANDRA_KEYSPACE_NAME'],
os.environ['CASSANDRA_KEYSPACE_NAME_PREFIX']])
table_name = os.environ['CASSANDRA_TABLE_NAME_FOR_SOURCE_DATA']

target_latitude = -13
target_longitude = 269.5
select_limit = 5

query_select_data = "SELECT * FROM {}.{} WHERE latitude = {} AND longitude = {} LIMIT {};"\
.format(keyspace_name, table_name, target_latitude, target_longitude, select_limit)

SELECT の実行

1
2
3
4
from cassandra.query import ordered_dict_factory

cassandra_session.row_factory = ordered_dict_factory
rows = cassandra_session.execute(query_select_data)

Transform

Spark DataFrame への変換

1
df = spark.createDataFrame(rows.all())

DataFrame の中身は以下の通りです。

UDF を使用した Transform

Spark では UDF (User Defined Functions) が使用できます。今回はデータ加工処理を Spark 側で UDF として定義し、それを使って処理を実行することで、 Spark の並行処理を活かしたいと思います。

UDF の定義

当該データは latitude, longitude, year, month, day, forecasthour の組み合わせでユニークになりますので、これらを元に sha256 ハッシュを生成します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import hashlib

_udf = lambda lat, lon, yr, mo, d, hr: hashlib.sha256(
"".join([
str(lat),
str(lon),
str(yr),
str(mo),
str(d),
str(hr),
]).encode("utf-8")
).hexdigest()
udf = F.udf(_udf, StringType())

定義した UDF を使った処理の実行

UDF を使って処理した結果は、元の df に新しいカラム sha256hash として連結します。そしてそれを new_df という新しい DataFrame として扱います。

1
2
3
new_df = df.withColumn("sha256hash", udf("latitude", "longitude", "year", "month", "day", "forecasthour"))
new_df = new_df.select("sha256hash", "latitude", "longitude", "year", "month", "day", "forecasthour", "currentdatetime",
"precipitablewaterentireatmosphere", "sealvlpressure", "temperature", "totalcloudcoverconvectivecloud", "windspeedgustsurface")

new_df のスキーマは以下の通りになります。

1
2
3
4
5
6
7
8
9
10
11
12
13
sha256hash:string
latitude:double
longitude:double
year:long
month:long
day:long
forecasthour:long
currentdatetime:timestamp
precipitablewaterentireatmosphere:double
sealvlpressure:double
temperature:double
totalcloudcoverconvectivecloud:double
windspeedgustsurface:double

また、 new_df のデータは以下のようになりました。

Load

new_df を Azure Managed Instance for Apache Cassandra クラスターの新しいテーブルに INSERT します。

テーブルの作成

この段階ではまだ新しいテーブルが存在しないため、先の記事 と同じ要領で new_df 用の新しいテーブルを作成します。

新しいテーブル名は予め環境変数で以下のように定義しています。

1
CASSANDRA_TABLE_NAME_FOR_DESTINATION_DATA=noaa_gfs_weather_with_sha256hash

Keyspace 名 keyspace_name およびその他環境変数は、先の記事と同じものを使い回します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
table_name = os.environ['CASSANDRA_TABLE_NAME_FOR_DESTINATION_DATA']
c = new_df.columns

query_create_table = "CREATE TABLE IF NOT EXISTS {}.{} (\
{} ascii,\
{} float,\
{} float,\
{} smallint,\
{} tinyint,\
{} tinyint,\
{} smallint,\
{} timestamp,\
{} double,\
{} double,\
{} double,\
{} double,\
{} double,\
PRIMARY KEY(({}), {}, {}, {}, {}, {}, {})\
) WITH CLUSTERING ORDER BY ({} DESC, {} DESC, {} DESC, {} DESC, {} DESC, {} DESC)\
AND comment = '{}'\
AND gc_grace_seconds = {};\
".format(keyspace_name, table_name,
c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7], c[8], c[9], c[10], c[11], c[12],
c[0], c[1], c[2], c[3], c[4], c[5], c[6],
c[1], c[2], c[3], c[4], c[5], c[6],
"This table is for NOAA Global Forecast System (GFS) data from Azure Open Datasets.",
os.environ['CASSANDRA_TABLE_GC_GRACE_SECONDS'])
1
cassandra_session.execute(query_create_table)

データの INSERT

こちらも先の記事と同じ要領で、新しいテーブルに対して INSERT をします。

プリペアドステートメントの作成

1
2
3
4
5
6
prepared_statement = "INSERT INTO {}.{} \
({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {})\
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\
".format(keyspace_name, table_name,
c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7], c[8], c[9], c[10], c[11], c[12])
prepared_session = cassandra_session.prepare(prepared_statement)

INSERT の実行

1
2
3
4
5
6
7
dataCollect = new_df.rdd.toLocalIterator()
for row in dataCollect:
try:
cassandra_session.execute(prepared_session, (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12]))
except Exception as e:
print(e)
pass

結果の確認

まとめ

Azure Managed Instance for Apache Cassandra から Extract したデータを Azure Databricks で Transform し、 Cassandra に Load する方法について解説しました。特に Transform にあたっては、 Spark の UDF は強力な存在と言えると思います。
一方で、 Load の部分で Spark のメリットが活かせていないのですが、これは先の記事でも触れた通りです。この話は別の記事で触れたいと思います。

See also