2016-03-10 10 views
7

私は入力ストリームを持っており、結果をプログラムの別の部分に渡す前に2 HTTPSそれぞれのネットワーク要求を作りたいと思います。一般的なスループットは50 /秒です。Clojure(/ Java)で多くの同時HTTPS要求を強く行うには

for each input: 
    HTTP request A 
    HTTP request B 
    pass event on with (A.body and B.body) 

私は、デフォルトでは非同期ですhttp-kitクライアントを、使用しています。それは約束を返し、またコールバックを取ることができます。 Http-kitはJava NIOを使用します(herehere

要求の到着速度と要求を行う時間は、非同期で行う必要があるほど十分です。イベントが来ると

  1. は、チャネルの上に置く:

    私は3つのアプローチを試みました。多数のgoルーチンがチャネルを抜け出しています。各メイクは、HTTPリクエストから約束を受けてゴブロックをブロックするように要求しています。私は約束がスレッドでうまくいくとは思わないので、これはうまくいかない。

  2. イベントが到着したらすぐにfutureを開始します。これは非同期の約束を「ブロック」します。これにより、が非常にとCPU使用率が高くなります。何とかネットワーク資源の枯渇。
  3. イベントが到着したら、リクエストAを即座に起動し、リクエストBをコールバックしてイベントを渡すコールバックを渡します。これにより、数時間後にメモリ不足エラーが発生します。

これらはすべて機能し、しばらくの間容量を処理します。彼らはすべて最終的にクラッシュする。最新の約12時間後のクラッシュ:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending 
tasks! 
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run 
WARNING: com[email protected]1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status: 
     Managed Threads: 3 
     Active Threads: 1 
     Active Tasks: 
       com[email protected]65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0) 
     Pending Tasks: 
       [email protected]0d 
Pool thread stack traces: 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main] 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 
     Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main] 
       java.lang.Object.wait(Native Method) 
       com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534) 


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen 
java.lang.OutOfMemoryError: Java heap space 
     at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) 
     at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76) 
     at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65) 
     at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63) 
     at sun.security.ssl.Handshaker.activate(Handshaker.java:514) 
     at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717) 
     at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743) 
     at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310) 
     at org.httpkit.client.HttpClient.run(HttpClient.java:375) 
     at java.lang.Thread.run(Thread.java:745) 
Mar 10, 2016 4:56:34 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 5:00:43 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 
Mar 10, 2016 4:58:25 AM baleen.events invoke 
SEVERE: Thread error: Java heap space 
java.lang.OutOfMemoryError: Java heap space 

私は障害の原因がわかりません。閉鎖があまりにも多く、または徐々にリソースが漏れているか、スレッドが枯渇している可能性があります。

質問

  1. 過度の負担のように、任意の時点で飛行中の100個の要求があるかもしれないと音を意味し、それぞれが200msのを取るかもしれませんが、毎秒50件のHTTPリクエストを作成していますか?

  2. スループットを処理して堅牢な方法でこれを行うにはどうすればよいですか?

EDIT

YourKitプロファイラは、私は古いハンドラ(つまり要求)への参照が何らかの形で保持されていることを示唆しているjava.util.concurrent.FutureTask Sを経由してorg.httpkit.client.Handler秒経由char[]の約2GBの持っていることを私に伝えます。コールバックを使用しようとする全理由は、これを避けることでした(閉鎖時に何らかの形で巻き込まれる可能性がありますが)。

+1

OutOfMemoryErrorは、メモリの保持に問題があることを示していますが、コードを表示したり、完全な解決策を最初から作成したりすることはできません。私は、無限のシーケンスの先頭をつかむか、コネクションなどのリソースをクリーンアップしないかを探します。 –

+0

バッファが保持されているかどうかは分かりましたが、ガベージコレクションで解放メモリ/外部バッファを処理する必要があることがわかります。 NIOは割り当てていた。ダウンストリームで起こるのは、ちょうどデータベース挿入とチャンネルへの挿入です。 – Joe

+0

私はコードを投稿することを考えましたが、それはかなり複雑で、単純化したバージョンで問題を再現するかどうかを知るのに1日ほどかかります。 – Joe

答えて

0

メソッドの代わりに、ゴーブロック内のHTTP-kit返された先物があります。コアをブロックしないようにしてください。あなたはhttpkitのコールバックとcore.asyncを組み合わせることにより行うことができ、将来の非同期ハンドラスレッド、:

(defn handle-event 
"Return a core.async channel that will contain the result of making both HTTP call A and B." 
    [event-data] 
    (let [event-a-chan (clojure.core.async/chan) 
     event-b-chan (clojure.core.async/chan) 
     return-chan (clojure.core.async/chan)] 
    (org.httpkit.client/request "https://event-a-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-a-chan resp))) 
    (org.httpkit.client/request "https://event-b-call" 
           {:method :get :params {"param1-k" "param1-v"}} 
           (fn [resp] 
            (clojure.core.async/put! event-b-chan resp))) 
    (clojure.core.async/go 
     (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan) 
              :event-b-response (clojure.core.async/<! event-b-chan)})) 
    return-chan)) 
0
  1. がある可能性があることを意味し、200ミリ秒かかることがあり、それぞれが毎秒50件のHTTPリクエストを、作るん任意の時間に飛行中に100回のリクエストをすると、過度の負荷のように聞こえるでしょうか?

これは間違いない過度の近代的なハードウェア上のです。

  • スループットを処理して堅牢な方法でこれを行うにはどうすればよいですか?
  • これを実現するには、core.asyncパイプラインとhttp-kitのコールバックを組み合わせることができます。 http-kitコールバックからasync put!を使用することができるので、各リクエストに対してgoルーチンを作成する必要はありません(害はないはずです)。

    アクティブな接続の数を制限するために、パイプラインの各ステップに制限付きバッファを使用します。これは(少なくとも)システムで使用可能な一時的なTCPポートの数によって制限されます。

    ここでは、説明したのと同様の機能を果たす小さなプログラムの例を示します。この場合、チャネル—から「イベント」が読み取られ、各イベントはID「1」—であり、HTTPサービス上でこれらのIDを検索します。 JSONキー"next"を検索し、ステップ2のURLとしてエンキューします。最後に、このルックアップが完了すると、goルーチンが統計を報告するために監視するoutチャネルにイベントを追加します。

    (ns concur-req.core 
        (require [clojure.core.async :as async] 
          [cheshire.core :refer [decode]] 
          [org.httpkit.client :as http])) 
    
    (defn url-of 
        [id] 
        ;; this service responds within 100-200ms 
        (str "http://localhost:28080/" id ".json")) 
    
    (defn retrieve-json-async 
        [url c] 
        (http/get url nil 
          (fn [{body :body status :status :as resp}] 
           (if (= 200 status) 
           (async/put! c (decode body true)) 
           (println "ERROR:" resp)) 
           (async/close! c)))) 
    
    (defn run [parallelism stop-chan] 
        (let [;; allocate half of the parallelism to each step 
         step1-n (int (max (/ parallelism 2) 1)) 
         step2-n step1-n 
         ;; buffer to take ids, transform them into urls 
         step1-chan (async/chan step1-n (map url-of)) 
         ;; buffer for result of pulling urls from step1, xform by extracting :next url 
         step2-chan (async/chan step2-n (map :next)) 
         ;; buffer to count completed results 
         out-chan (async/chan 1 (map (constantly 1))) 
         ;; for delivering the final result 
         final-chan (async/chan) 
         start-time (System/currentTimeMillis)] 
    
        ;; process URLs from step1 and put the result in step2 
        (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan) 
        ;; process URLs from step2 and put the result in out 
        (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan) 
    
        ;; keep the input channel full until stop-chan is closed. 
        (async/go-loop [] 
         (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])] 
         (if (= c stop-chan) 
          (async/close! step1-chan) 
          (recur)))) 
    
        ;; count messages on out-chan until the pipeline is closed, printing 
        ;; status message every second 
        (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0] 
         (let [[v c] (async/alts! [status-timer out-chan])] 
         (cond (= c status-timer) 
           (do (println subt "records...") 
            (recur (async/timeout 1000) 0 (+ subt accu))) 
    
           (nil? v) 
           (async/>! final-chan (+ subt accu)) 
    
           :else 
           (recur status-timer (+ v subt) accu)))) 
    
        ;; block until done, then emit final report. 
        (let [final-total (async/<!! final-chan) 
          elapsed-ms (- (System/currentTimeMillis) start-time) 
          elapsed-s (/ elapsed-ms 1000.0)] 
         (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n" 
            final-total parallelism elapsed-s 
            (int (/ final-total elapsed-s))))))) 
    
    (defn run-for 
        [seconds parallelism] 
        (let [stop-chan (async/chan)] 
        (future 
         (Thread/sleep (* seconds 1000)) 
         (async/close! stop-chan)) 
        (run parallelism stop-chan))) 
    
    (do 
        ;; Warm up the connection pool, avoid somaxconn problems... 
        (doseq [p (map #(* 20 (inc %)) (range 25))] 
        (run-for 1 p)) 
        (run-for (* 60 60 6) 500)) 
    

    これをテストするには、100〜200msの間のランダムな時間をスリープした後に応答するHTTPサービスを設定します。その後、私はMacBook Proで6時間このプログラムを実行しました。

    並列性を500に設定すると、平均1155件のトランザクションが完了しました(1秒あたり2310件の完了済みHTTPリクエスト)。チューニングを行うと(そして、特にHTTPサービスを別のマシンに移動させることで)これがはるかに高い可能性があります。 JVMメモリは、最初の30分以内に1.5GBまでクリープされ、そのサイズを維持しました。私はオラクルの64ビット1.8 JVMを使用しています。

    関連する問題