Failure pattern on parallelization with PySpark
Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python や Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python の まとめ にて、並行処理に関するチャレンジに関して一言記述しました。
上記記事では成功するアプローチで執筆していますが、本記事では、うまくいかなかったアプローチについて紹介したいと思います。
前提
- Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python における UDF 関連のコードの意図を理解している
免責
- 筆者の環境やアプローチではうまくいかなかったのですが、何らかの工夫をすることで、本記事のアプローチを成功に導ける可能性はあり得ます。
解説
Transform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python では、Spark を利用したハッシュ計算の並行処理について、 UDF を利用する形で、以下のアプローチをとりました。1 | from pyspark.sql.types import StringType |
筆者は当初 UDF を使わず、関数型言語的な考えで、以下のような処理を試みました。
1 | from pyspark.sql import Row |
結果としてこちらはうまくいきませんでした。 x[0]['year']
, x[0]['month']
, x[0]['day']
, x[0]['forecasthour']
の値が None
になってしまい、結果として hashes
のリスト内にあるハッシュ値は全て同じ値となってしまいました。(なお、当該記事では latitude
と longitude
はある値を決め打ちで指定しています。)
色々と試行錯誤して検証しましたが、 Row(x)
が float と double 以外の型を扱えないのが原因であると結論づけました。
まとめ
Spark の UDF を使わずに spark.sparkContext.parallelize
を使って関数型言語的アプローチをとった結果、うまくいかなかったパターンについてご紹介しました。
もしこれを改善できるアイディアがあれば、共有いただけると幸いです。