ども、@kimihom です。
今回、大量のレコードを一つずつ処理する実装をしたので、その実装方法をまとめておく。
コードの大枠
以下は全ユーザー(User)に紐づいているレコード(Record) に対して処理をするコードとなっている。
User.all.order("id").each do |u| r_all = u.records r_all.find_in_batches do |records| Parallel.each(records, in_threads: 50) do |r| begin # 処理 ActiveRecord::Base.connection_pool.with_connection do # ActiveRecord を使った処理 end rescue => e puts "err #{e}" end end end r_all = nil end
find_in_batches
find_in_batches を使うことで、u.records
を一気に処理するのではなく、デフォルトでは1,000件ごとに分けて処理するようになる。これによって、サーバーのメモリ負荷を軽減することができる。
ドキュメントには
To be yielded each record one by one, use #find_each instead.
と書かれている。一つずつレコードを生成するには、#find_each を使うとのこと。
Parallel
大量のレコードを一つ一つ処理していては、日が暮れてしまう。ということでマルチスレッドでコードを実行するには Parallel という Gem が便利に使える。デフォルト Ruby の提供している Threads は、実際にコードを書いてみると複雑になりがちだ。
ActiveRecord で取ってきたデータをスレッドで each させる。この時指定するパラメータ in_threads
の数は、実行する環境によって左右される。例えば外部のデータにアクセスする際や、書くコードの重さなどによって低くしないといけないケースが出てくる。まずは少なめの数から実行してみて、最適な数を見つけていく形になるだろう。
ActiveRecord::Base.connection_pool.with_connection
マルチスレッドで処理をすると、ActiveRecord の DB アクセスがスレッドごとに作られてしまい、コネクションの作成に失敗してしまう。
could not obtain a connection from the pool within 5.000 seconds (waited 5.000 seconds);
DB コネクションを使い回すようにするために、この with_connection
のブロック内で ActiveRecord の処理を書く必要がある。
終わりに
普段 Rails でコードを書いているだけだと、このような大量の処理というケースはあまり出くわさないんだけど、大量のデータを一括で更新したいといったような運用のケースで並列処理は必要になってくるだろう。
私自身、Web コードばかり書いていた影響で並列処理を熟知しているわけではないんだけど、最終的に今回書いたコードでうまく大量の処理を実行できたので良かった。
こうした大量の処理をする前には、DB のバックアップは取っておいた方が身のためだね。より安全なバッチ処理についても考えていかなければならないと思った。