大きなデータの問題に取り組んでおり、いくつかの並行性と非同期性の問題があります。次のような問題は次のとおりです。各ファイル行の非同期HTTP API呼び出し - Python
1)私はconcurrent.futuresモジュールからこの方法をProcessPoolExecutorを使用して処理しています複数の巨大なファイル(〜15件まで各xを4GB)を持つ:
def process(source):
files = os.list(source)
with ProcessPoolExecutor() as executor:
future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files}
for future in as_completed(future_to_url):
data = future.result()
2)今、それぞれに私は行ごとに行こうと思っています。特定のjsonを作成するための行を処理し、そのような2K jsonsを一緒にグループ化し、そのリクエストでAPIを応答して応答します。ここでは、コードされています
def process_individual_file(source, input_file):
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
response = requests.post(API_URL, json=json_array)
#check response status here
limit = 2000
3)は、今の問題は、各ファイルの行数は本当に大きいことと、そのAPIコールがブロックし、応答が少し遅いが、プログラムが完了するまでに膨大な時間を取っています。
4)私が達成したいのは、そのAPI呼び出しが行われているときに2000の次のバッチを処理し続けるように、そのAPI呼び出しをasyncにすることです。
5)私は今まで試したことがあります:asyncioを使用して実装しようとしていましたが、将来の一連のタスクを収集し、イベントループを使用して完了を待つ必要があります。このような何か:
async def process_individual_file(source, input_file):
tasks = []
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
tasks.append(asyncio.ensure_future(call_api(json_array)))
limit = 2000
await asyncio.wait(tasks)
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(process_individual_file(source, input_file))
ioloop.close()
6)、それはそれらを起動する前にすべてのタスクを収集するために待機し、これは間接的に前回と同じであるので、私は本当にこのことを理解していないです。誰かがこの問題の正しいアーキテクチャーでなければならないものを教えてくれますか?どのようにして、すべてのタスクを収集することなく、次のバッチを並行して処理する能力を持たずに、APIを非同期的に呼び出すことができますか?それはそれらを を起動する前にすべてのタスクを収集するために待機し、これは間接的に、以前のよう 同じであるため
感謝。確かに私はそれを正しく理解しておらず、期待された結果が出てこなかったので止まってしまった。あなたが言っていることは理にかなっていますが、私がここに来るのはもう一つの疑問です。ファイルのサイズのために何百万という要求が行われることになり、そのために将来のオブジェクトを保存するのが不愉快ですリスト。 –
私は平均時間[リンク](https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba)でやっていたもう1つの記事です。私は別のスレッドでイベントループをトリガーし、そのスレッドに私の先物を委譲し、API応答でコールバックをトリガーできるようです。今、同じPOCで作業しています。私もあなたの提案を試して、結果を得ることができます。ありがとうたくさん:) –
@ShubhamPatil私は多くの並列要求を避ける方法を示す回答を更新しました。 –