kt.log

Failure pattern on UDF with PySpark

Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in PythonTransform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python の まとめ にて、並行処理に関するチャレンジに関して一言記述しました。

上記記事では成功するアプローチで執筆していますが、本記事では、うまくいかなかったアプローチについて紹介したいと思います。

前提

免責

  • 筆者の環境やアプローチではうまくいかなかったのですが、何らかの工夫をすることで、本記事のアプローチを成功に導ける可能性はあり得ます。

解説

Insert data into Azure Managed Instance for Apache Cassandra on Azure Databricks in PythonTransform and load data into Azure Managed Instance for Apache Cassandra on Azure Databricks in Python では、 Cassandra テーブルに対する CQL 実行を、 Azure Databricks の Notebook 上でのイテレーションで実現していました。以下のような感じです。
1
2
3
4
5
6
7
8
dataCollect = df.rdd.toLocalIterator()
for row in dataCollect:
try:
cassandra_session.execute(prepared_session,
(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12]))
except Exception as e:
print(e)
pass

このアプローチは成功しますが、パフォーマンスの観点でいうと全くメリットがありません。

当初は Spark のメリットを活かすために、以下のようなコードを書いていました。

1
2
3
4
5
6
7
8
from pyspark.sql.types import UserDefinedType
import pyspark.sql.functions as F

_function = lambda row: cassandra_session.execute(prepared_session,
(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12]))
_returnType = UserDefinedType
_udf = F.udf(_function, _returnType())
df.select(_udf())

このコードの実行結果はエラーで、その内容は以下の通りです。

ちなみに、上記 UDF を使うアプローチ以外にも、 df.foreach()df.rdd.map()cassandra_session.execute(...) しようとした際にも、同じエラーが発生しています。
一方、 cassandra_session.execute(...) とは異なる、ありふれたコードをこれらで並行処理させようとすると、このようなエラーは出ずに Spark の処理が成功します。

このことから、 Spark に並行処理させるコードの中に、 Cassandra のセッション情報を含めることができないということが分かりました。

これは推測ですが、 cassandra_session はシングルトンになっていて、それを手元の Notebook の Python インタプリタから Spark に渡す際の PySpark によるシリアライズに失敗しているという状況なのだと思われます。
筆者は過去、別のインタプリタ言語で似たようなリモート処理を実装しようとして、やはりシリアライズに失敗した経験があります。もしかしたら、 Python ではなく Scala で記述することで、ブレークスルーできるかもしれません。

まとめ

Cassandra へのクエリを Spark で並行処理しようとして、うまくいかなかったパターンについてご紹介しました。
もしこれを改善できるアイディアがあれば、共有いただけると幸いです。

See also