FANCOMI Ad-Tech Blog

株式会社ファンコミュニケーションズ nend・新規事業のエンジニア・技術ブログ

【後編】Apache Sparkを使って、メモリ使用量が大きいバッチ処理をスケールアウト

こんにちは、弊社で1ヶ月半ほどインターンとして働いているt_sakaiです。

インターンシップの課題として、メモリ使用量が大きくて将来問題になりそうなバッチ処理をスケールアウトできるように書き換えるという課題に取り組みました。 解決手段として流行りのApache Sparkを使ってみたので、本記事ではこれについて書こうと思います。

本記事で使っている言語はScalaです。

前編ではSparkを選んだ理由とSparkの基礎について説明したので、後編では既存プログラムをSpark用に書き換える際の問題と解決方法について説明します。

fancs.hatenablog.com

本記事で紹介する内容

前編

  • 今回解決したかった課題
  • なぜSparkを選んだのか
  • RDDについて
  • 簡単なプログラムをSparkで書き換える

後編(本記事)

  • 本番プログラムをSpark用に書き換える
  • はまりどころ

本番プログラムをSpark用に書き換える

モデル(DB)まわり

既存のプログラムを書き換えるときにもっとも大変な作業は、データの入出力まわりです。特に、RDBMSやNoSQLなどのビッグデータ用でないデータソースからのデータ読み込みは大変です。

普通のプログラムにおいてはORMや独自モデルが使われていて、コントローラにデータが渡ってきた段階で必要なデータ整形処理などが完了しています。ここで注意しないといけないのは、モデルからデータを受け取る段階で1台のサーバのメモリ上にデータをまるまる置いていることです。

Sparkで扱いたいデータというのは基本的に1台のサーバのメモリにおける大きさではありません。そのため、ORMや既存のモデルを使うことは基本的にはできなくなります。 (そもそもSparkでは、入力データはHDFSやS3にある程度整形された状態で置いてあることを想定されているような気がします。)

同様に出力データが膨大な場合にも、1台のサーバのメモリにデータを集約してからDBやファイルに書き出すことはできません。

いずれの場合も、Spark用のデータコネクタや関数を使って入出力を行うことになります。

RDDへの読み込み

S3などに保存されたCSV形式のファイル

この場合は以下のように読み込めるためかなり簡単です。

val input = sc.textFile('s3n://bucketname/Filename')
MySQL

JdbcRDDというDBコネクタがあり、それを使って読み込むのが簡単なようです。普通にクエリを書くこともできます。 こちらのQiitaの記事で具体的な使い方が説明されているのでご覧ください。

MongoDB

MongoDBにはSpark専用のDBコネクタはコネクタはないようで、代わりにMongoDB Hadoop Connectorというものを使います。 ここに書いてあるパラメータを指定すれば、クエリも投げられるようです。

RDDからの書き込み

ファイルへの書き出し

ファイルへの書き出しは非常に簡単で、以下のメソッドを呼び出すだけです。

rdd.saveAsTextFile('s3n://bucketname/Filename')
DBへの書き出し(データが小さい場合)

もし出力データがそれほど大きくなく、1台のマシンのメモリに乗り切るくらいなのであれば、collectメソッドなどを用いて一度マスターサーバにデータを集約してから書き出すと簡単です。

val output = rdd.collect // 変数outputは普通のList型
yourORM.save(output) // outputは分散されていない普通のデータなので、ORMなどで自由に扱える。
DBへの書き出し(データが大きい場合)

もし出力データが1台のマシンのメモリに乗り切らない場合は少々大変です。この場合公式のドキュメントでも触れられているように、RDDのforEachPartition関数というものを使ってそれぞれのスレーブサーバでDBコネクションを取るのが定石のようです。

rdd.foreachPartition { iteration =>
  //SparkスレーブからDBやKVSに書き込むときには、それぞれのパーティションでコネクションを取らなければならない
  slaveClient = getYourOwnDBClient
  for (element <- iteration)
     slaveClient.save(element)
  }
}

はまりどころ

ここからは、実際に業務でSparkを使う人向けのかなりディープな話になります。

Serializableでないクラスをスレーブで使うと、実行時エラーになる

既存のクラスを再利用してSparkスレーブ内で使おうとすると、以下のようなエラーに出くわすことがあります。

Task not serializable: java.io.NotSerializableException : yourClass

Sparkスレーブでコードが実行されるとき、オブジェクトやクラスはマスターノードでシリアライズされてスレーブノードへ送られます。そして、スレーブノードで受け取ったデータをデシリアライズして実際のインスタンスを作ります。

この一連の流れを実行するには、 Sparkスレーブ上で利用される全てのクラスはSerializableである必要があります。Stack Overflowに投稿されたQ&A

しかし、普通に実装したクラスだとSerializableになっていないケースがあります。そのような場合、以下のようにしてSerializableにできる可能性があります。参考サイト

@SerialVersionUID(75L)
class YourUnserializableClass extends Serializable {
  ...
}

このエラーはSparkをローカルモードで動かしたときには起こらないので、結構気付きにくいです。(私はこれに丸3日くらいはまってました。) 上で紹介したような分かりやすいエラーメッセージが出ないこともありますので、Sparkスレーブでクラスをインスタンス化する際は気をつけてください。

スレーブで設定ファイルなどを読み込めない

以下のコードのようにローカルファイル読み込みをスレーブ上で行う場合は、そのファイルをスレーブにも渡してやる必要があります。

rdd.map { each =>
  val redisIp = File.read('/path/to/your/redisIpFile')
  val con = getRedisConnection(redisIp)
  val con.save(each)
}

これはマスターにだけコードをデプロイしてるときに起こりがちなミスです。jarファイルはマスターにだけデプロイしておけば事足りるので気づくのが遅れがちになります。

基本的に、マスターとスレーブでフォルダ構成は全く同じにしておくことをお勧めします。

エラーメッセージが分かりにくい

既に少し触れましたが、Sparkがランタイムエラーで落ちたときのエラーメッセージは非常に不親切です。

java.io.IOException: PARSING_ERROR(2)

とか

java.io.IOException: FAILED_TO_UNCOMPRESS(5)

とかいうググりにくいエラーが出たりします。

この問題に関してはSparkのJIRA上でも度々議論に上がりますが、なかなか解決されていません。私もこれという解決策は見出せませんでした。経験則としてはメモリを増やすと解決することが多かったです。

強制終了するとゴミファイルが残る

開発時はSparkプログラムをCtrl+Cで強制終了したりすることが多いのですが、そうするとゴミファイルがスレーブ上に残ってしまう場合が結構あります。私はこれのせいでスレーブサーバのディスクスペースを使い切ってしまって、謎のエラーに出くわしたりしました。この時は要らないファイルを手動で消して対応しました。

たいていの場合、/tmp/か$SPARK_HOME/work/にゴミファイルが残っていますので、適宜そのファイルを消すようにすればよいと思われます。

デバッグがしにくい

Sparkのメソッドは基本的にlazy evaluation(遅延評価)なので、デバッグがやりにくく、メモリ消費量を予測するのも難しいです。

また、並列処理の中でprintデバッグとかをしようとしてもコンソールには出力されず、それぞれのスレーブ上の指定のstdoutファイルに出力されてしまいます。

この件についてはまだこれといったベストプラクティスを見つけられていません。詳しい方がいらしゃったら、コメントしていただけると幸いです。

-- 追記 --

一応、デバッグや性能チューニングについて書かれた公式のスライドもありました。