kt.log

Create Azure Managed Instance for Apache Cassandra schema on Azure Databricks in Python

前の記事 で、 Azure Databricks から Azure Managed Instance for Apache Cassandra に接続する方法について解説しました。

本記事では、当該 Cassandra クラスターに対して、 Azure Databricks の Notebook 上で Python を使ってスキーマを作成する方法について解説します。スキーマの作成に成功することをゴールとします。

スキーマ情報は Azure Open Datasets で提供されている NOAA Global Forecast System のものを利用します。

前提

免責

  • 本記事ではスキーマの作成をゴールとしているため、 PRIMARY KEY の持たせ方等のスキーマの妥当性については無視しています。

手順

  • コードは Azure Databricks クラスターの Notebook で実行します。
  • 以下、全般的にパラメタを環境変数で定義して利用しています。 Azure Databricks ではクラスターに対して環境変数を設定することができ、その操作はクラスターの管理画面から行います。今回設定した環境変数は以下の通りです。
1
2
3
4
5
6
7
8
9
10
11
12
CASSANDRA_KEYSPACE_REPLICASION_FACTOR=1
CASSANDRA_KEYSPACE_REPLICASION_CLASS=SimpleStrategy
CASSANDRA_KEYSPACE_NAME_PREFIX=experiment
CASSANDRA_QUERY_SELECT_RELEASE_VERSION="SELECT release_version FROM system.local;"
CASSANDRA_TABLE_NAME_FOR_SOURCE_DATA=noaa_gfs_weather
CASSANDRA_KEYSPACE_NAME=noaa_gfs_weather
OPENDATASETS_NOAAGFSWEATHER_START_DATE="2018-12-20"
OPENDATASETS_NOAAGFSWEATHER_END_DATE="2018-12-20"
CASSANDRA_AUTH_USERNAME=cassandra_user
CASSANDRA_CONTACT_POINTS=10.0.0.5,10.0.0.6,10.0.0.7
CASSANDRA_AUTH_PASSWORD=my_secret_password
CASSANDRA_TABLE_GC_GRACE_SECONDS=360

ライブラリをインポートする

1
2
3
4
import os
import ssl
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

Azure Managed Instance for Apache Cassandra との間でセッションを張る

1
2
3
4
5
6
7
contact_points = os.environ['CASSANDRA_CONTACT_POINTS'].split(',')
auth_provider = PlainTextAuthProvider(username=os.environ['CASSANDRA_AUTH_USERNAME'],
password=os.environ['CASSANDRA_AUTH_PASSWORD'])
cassandra_cluster = Cluster(contact_points,
auth_provider=auth_provider,
ssl_context=ssl.SSLContext())
cassandra_session = cassandra_cluster.connect()

CREATE TABLE の CQL クエリを作成する

データを取得しスキーマ情報を得る

1
2
3
4
5
6
7
8
9
from azureml.opendatasets import NoaaGfsWeather
from dateutil import parser

start_date = parser.parse(os.environ['OPENDATASETS_NOAAGFSWEATHER_START_DATE'])
end_date = parser.parse(os.environ['OPENDATASETS_NOAAGFSWEATHER_END_DATE'])
gfs = NoaaGfsWeather(start_date, end_date)
gfs_df = gfs.to_spark_dataframe()

c = gfs_df.columns

Keyspace と Table の名前を定義する

1
2
3
keyspace_name = "_".join([os.environ['CASSANDRA_KEYSPACE_NAME'],
os.environ['CASSANDRA_KEYSPACE_NAME_PREFIX']])
table_name = os.environ['CASSANDRA_TABLE_NAME_FOR_SOURCE_DATA']

Keyspace を作成する CQL クエリを作成する

1
2
3
4
5
6
7
8
query_create_keyspace = "".join([
"CREATE KEYSPACE IF NOT EXISTS ",
keyspace_name,
" WITH REPLICATION = {",
"'class':'{}', 'replication_factor':{}".format(os.environ['CASSANDRA_KEYSPACE_REPLICASION_CLASS'],
os.environ['CASSANDRA_KEYSPACE_REPLICASION_FACTOR']),
"};",
])

Table を作成する CQL クエリを作成する

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
query_create_table = "CREATE TABLE IF NOT EXISTS {}.{} (\
{} timestamp,\
{} smallint,\
{} float,\
{} float,\
{} double,\
{} double,\
{} double,\
{} double,\
{} double,\
{} double,\
{} smallint,\
{} tinyint,\
{} tinyint,\
PRIMARY KEY(({}, {}, {}), {}, {}, {})\
) WITH CLUSTERING ORDER BY ({} DESC, {} DESC, {} DESC)\
AND comment = '{}'\
AND gc_grace_seconds = {};\
".format(keyspace_name, table_name,
c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7], c[8], c[9], c[10], c[11], c[12],
c[10], c[11], c[12], c[1], c[2], c[3],
c[1], c[2], c[3],
"This table is for NOAA Global Forecast System (GFS) data from Azure Open Datasets.",
os.environ['CASSANDRA_TABLE_GC_GRACE_SECONDS'])

参考までに、リスト c の要素と番号との対応関係は以下の通りになります。

1
2
3
4
5
6
7
8
9
10
11
12
13
c[0]: currentDatetime:timestamp
c[1]: forecastHour:integer
c[2]: latitude:double
c[3]: longitude:double
c[4]: precipitableWaterEntireAtmosphere:double
c[5]: seaLvlPressure:double
c[6]: snowDepthSurface:double
c[7]: temperature:double
c[8]: windSpeedGustSurface:double
c[9]: totalCloudCoverConvectiveCloud:double
c[10]: year:integer
c[11]: month:integer
c[12]: day:integer

CREATE TABLE を実行する

Keyspace の作成を実行する

1
cassandra_session.execute(query_create_keyspace)

Table の作成を実行する

1
cassandra_session.execute(query_create_table)

それぞれ成功すればOKです。

まとめ

Azure Managed Instance for Apache Cassandra クラスターに対して、 Azure Databricks の Notebook 上で Python を使ってスキーマを作成する方法について解説しました。
今回実行した CQL は文字列で記述したものですが、実際にプロダクトとして実装していく際にはセキュリティと生産性の観点から、O/Rマッパーや Query ビルダーを利用することをお勧めします。 Python の CQL Query ビルダーとしては こちら のようなものがあります。
本記事ではスキーマ作成までしかしておりません。より実践的な使い方については追って記事にしたいと思います。