kt.log

Failure pattern on parallelization with PySpark

Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in PythonTransform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python の まとめ にて、並行処理に関するチャレンジに関して一言記述しました。

上記記事では成功するアプローチで執筆していますが、本記事では、うまくいかなかったアプローチについて紹介したいと思います。

前提

免責

  • 筆者の環境やアプローチではうまくいかなかったのですが、何らかの工夫をすることで、本記事のアプローチを成功に導ける可能性はあり得ます。

解説

Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python では、Spark を利用したハッシュ計算の並行処理について、 UDF を利用する形で、以下のアプローチをとりました。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import hashlib

df = spark.createDataFrame(rows.all())

_udf = lambda lat, lon, yr, mo, d, hr: hashlib.sha256(
"".join([
str(lat),
str(lon),
str(yr),
str(mo),
str(d),
str(hr),
]).encode("utf-8")
).hexdigest()
udf = F.udf(_udf, StringType())

new_df = df.withColumn("sha256hash", udf("latitude", "longitude", "year", "month", "day", "forecasthour"))

筆者は当初 UDF を使わず、関数型言語的な考えで、以下のような処理を試みました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.sql import Row
df = spark.sparkContext.parallelize(rows.all()).map(lambda x: Row(x)).toDF()

import hashlib
hashes = df.rdd.map(lambda x: \
hashlib.sha256(
"".join([
str(x[0]['latitude']),
str(x[0]['longitude']),
str(x[0]['year']),
str(x[0]['month']),
str(x[0]['day']),
str(x[0]['forecasthour']),
]).encode("utf-8")
).hexdigest()
).collect()

new_df = df.withColumn("sha256hash", hashes)

結果としてこちらはうまくいきませんでした。 x[0]['year'], x[0]['month'], x[0]['day'], x[0]['forecasthour'] の値が None になってしまい、結果として hashes のリスト内にあるハッシュ値は全て同じ値となってしまいました。(なお、当該記事では latitudelongitude はある値を決め打ちで指定しています。)
色々と試行錯誤して検証しましたが、 Row(x) が float と double 以外の型を扱えないのが原因であると結論づけました。

まとめ

Spark の UDF を使わずに spark.sparkContext.parallelize を使って関数型言語的アプローチをとった結果、うまくいかなかったパターンについてご紹介しました。
もしこれを改善できるアイディアがあれば、共有いただけると幸いです。

See also