2017-12-24 32 views
1

大きなデータの問題に取り組んでおり、いくつかの並行性と非同期性の問題があります。次のような問題は次のとおりです。各ファイル行の非同期HTTP API呼び出し - Python

1)私はconcurrent.futuresモジュールからこの方法をProcessPoolExecutorを使用して処理しています複数の巨大なファイル(〜15件まで各xを4GB)を持つ:

def process(source): 
    files = os.list(source) 
    with ProcessPoolExecutor() as executor: 
     future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files} 
     for future in as_completed(future_to_url): 
      data = future.result() 

2)今、それぞれに私は行ごとに行こうと思っています。特定のjsonを作成するための行を処理し、そのような2K jsonsを一緒にグループ化し、そのリクエストでAPIを応答して応答します。ここでは、コードされています

def process_individual_file(source, input_file): 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       response = requests.post(API_URL, json=json_array) 
       #check response status here 
       limit = 2000 

3)は、今の問題は、各ファイルの行数は本当に大きいことと、そのAPIコールがブロックし、応答が少し遅いが、プログラムが完了するまでに膨大な時間を取っています。

4)私が達成したいのは、そのAPI呼び出しが行われているときに2000の次のバッチを処理し続けるように、そのAPI呼び出しをasyncにすることです。

5)私は今まで試したことがあります:asyncioを使用して実装しようとしていましたが、将来の一連のタスクを収集し、イベントループを使用して完了を待つ必要があります。このような何か:

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

ioloop = asyncio.get_event_loop() 
ioloop.run_until_complete(process_individual_file(source, input_file)) 
ioloop.close() 

6)、それはそれらを起動する前にすべてのタスクを収集するために待機し、これは間接的に前回と同じであるので、私は本当にこのことを理解していないです。誰かがこの問題の正しいアーキテクチャーでなければならないものを教えてくれますか?どのようにして、すべてのタスクを収集することなく、次のバッチを並行して処理する能力を持たずに、APIを非同期的に呼び出すことができますか?それはそれらを を起動する前にすべてのタスクを収集するために待機し、これは間接的に、以前のよう 同じであるため

答えて

1

私は本当にこのことを理解していないです。

いいえ、あなたは間違っています。 asyncio.Taskasyncio.ensure_futureで作成すると、すぐにcall_apiコルーチンを実行し始めます。これはasyncioの仕事にどのようなタスクです:

import asyncio 


async def test(i): 
    print(f'{i} started') 
    await asyncio.sleep(i) 


async def main(): 
    tasks = [ 
     asyncio.ensure_future(test(i)) 
     for i 
     in range(3) 
    ] 

    await asyncio.sleep(0) 
    print('At this moment tasks are already started') 

    await asyncio.wait(tasks) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

出力:それはあなたに制御を戻すことなく、CPU関連の仕事を大量に行います。あなたのアプローチと

0 started 
1 started 
2 started 
At this moment tasks are already started 

問題はprocess_individual_fileが実際に非同期ではないということですasyncioイベントループ。それは問題です - 機能ブロックイベントループは不可能なタスクを実行させます。

非常にシンプルですが、私はあなたが使用することができると思う効果的な解決策 - 各行を読んだ上で、例えば、process_individual_fileを実行するいくつかの量の後asyncio.sleep(0)を使用して手動でイベントループに制御を返すことです:

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      await asyncio.sleep(0) # Return control to event loop to allow it execute tasks 

      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

UPD:

で、そこで行われるリクエストの数百万人以上になりますので、私は は、それらのすべてのための将来のオブジェクトを格納するために不快に感じていますリスト

それは意味があります。何百万の並列ネットワーク要求を実行すると良いことはありません。この場合の制限を設定する通常の方法は、asyncio.Semaphoreのような同期プリミティブを使用することです。

私は、ファイルからjson_arrayを取得し、新しいタスクを追加する前にSemaphoreを取得し、それをタスク準備完了にリリースするよう助言します。多くの並列実行タスクから保護されたきれいなコードが得られます。

これは、次のようなもののようになります。私を修正するためのGerasimov @Mikhail

def get_json_array(input_file): 
    json_array = [] 
    limit = 2000 

    with open(input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 

      limit -= 1 
      if limit == 0: 
       yield json_array # generator will allow split file-reading logic from adding tasks 

       json_array = [] 
       limit = 2000 


sem = asyncio.Semaphore(50) # don't allow more than 50 parallel requests 

async def process_individual_file(input_file): 
    for json_array in get_json_array(input_file): 
     await sem.acquire() # file reading wouldn't resume until there's some place for newer tasks 
     task = asyncio.ensure_future(call_api(json_array)) 
     task.add_done_callback(lambda t: sem.release()) # on task done - free place for next tasks 
     task.add_done_callback(lambda t: print(t.result())) # print result on some call_api done 
+0

感謝。確かに私はそれを正しく理解しておらず、期待された結果が出てこなかったので止まってしまった。あなたが言っていることは理にかなっていますが、私がここに来るのはもう一つの疑問です。ファイルのサイズのために何百万という要求が行われることになり、そのために将来のオブジェクトを保存するのが不愉快ですリスト。 –

+0

私は平均時間[リンク](https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba)でやっていたもう1つの記事です。私は別のスレッドでイベントループをトリガーし、そのスレッドに私の先物を委譲し、API応答でコールバックをトリガーできるようです。今、同じPOCで作業しています。私もあなたの提案を試して、結果を得ることができます。ありがとうたくさん:) –

+0

@ShubhamPatil私は多くの並列要求を避ける方法を示す回答を更新しました。 –

関連する問題