3
メモリ集中型の同時プロセスを抑制するために、clojure core.asyncチャネルを使用しようとしています。各プロセスは、画像をメモリにロードし、透かしを適用する。多すぎる画像を同時に処理しようとすると、OOMエラーが発生します。Clojure core.asyncを使用した調整プロセス
以下のパターンはうまくいくようですが、少し控えめな感じです。私の質問は、core.asyncでこれを行うより良い方法はありますか?または、代わりにこれを行うためにJavaの並行処理を使用するか(つまり、固定サイズのスレッドプールを作成するなど)
次のコードでの基本的な考え方は、基本的にtchan
のサイズへの同時プロセスの数を制限し、in-chan
に入るものを絞るために使用されるグローバル固定サイズのチャネル、tchan
を使用することです。
以下のコードでは、process-images
がエントリポイントです。
(def tbuff (buffer 20))
(def tchan
"tchan is used to throttle the number of processes
tbuff is a fixed size buffer"
(chan tbuff))
(defn accum-results
"Accumulates the images in results-chan"
[n result-chan]
(let [chans [result-chan (timeout timeout-ms)]]
(loop [imgs-out []
remaining n]
(if (zero? remaining)
imgs-out
(let [[img-result _] (alts!! chans)]
(if (nil? img-result)
(do
(log/warn "Image processing timed out")
(go (dotimes [_ remaining] (<! tchan)))
imgs-out)
(do
(go (<! tchan))
(recur (conj imgs-out img-result) (dec remaining)))))))))
(defn process-images
"Concurrently watermarks a list of images
Images is a sequence of maps representing image info
Concurrently fetches each actual image and applies the watermark
Returns a map of image info map -> image input stream"
[images]
(let [num-imgs (count images)
in-chan (chan num-imgs)
out-chan (chan num-imgs)]
;; set up the image-map consumer
;; asynchronously process things found on in-chan
(go
(dotimes [_ num-imgs]
; block here on input images
(let [img-in (<! in-chan)]
(thread
(let [img-out (watermark/watermarked-image-is img-in)]
(>!! out-chan [img-in img-out]))))))
;; put images on in-chan
(go
(doseq [img images]
(>! tchan :x)
(>! in-chan img)))
;; accum results
(let [results (accum-results num-imgs out-chan)]
(log/info (format "Processed %s of %s images and tbuff is %s"
(count results) num-imgs (count tbuff)))
(into {} results))))
私はここで編集した例では、それがマップするための呼び出しは、トランスデューサとない怠惰なシーケンスであることは注目に値します、それはちょうど1のように見えます。 –