djangoクエリーセットの各エントリについてPDFレポートを生成する必要があります。 30kから40kの間のエントリーがあります。python - 非同期にHTTPリクエストを処理する
PDFは外部APIを介して生成されます。現在はオンデマンドで生成されているので、これはHTTPリクエスト/レスポンスを介して同期的に処理されます。 これは、私がdjango管理コマンドを使用してクエリーセットをループしてPDF生成を実行すると思うので、この作業では違います。
このタスクでは、どのアプローチをとるべきですか?
1)Celery:ワーカーにタスク(別のペイロードを含むhttp要求)を割り当てて、それが完了したら取得します。これは2つの可能な解決策について考えました。
2)request-futures:リクエストを非ブロッキングで使用する。
目標は、同時に(例えばAPIを扱うことができますどのように多くの同時要求に応じて、同時に10件のまたは100のHTTPリクエストを送信)APIを使用することです。
誰もここで同様のタスクを処理し、これを進める方法についてアドバイスを与えることができますか?
次は、multiprocessing(注:私はこのプロジェクトの所有権を取ったように、コードのほとんどは、再利用され、自分で書かれていません。):で作られた最初の試みであり、残念ながら
class Checker(object):
def __init__(self, *args, **kwargs):
# ... various setup
# other methods
# .....
def run_single(self, uuid, verbose=False):
"""
run a single PDF generation and local download
"""
start = timer()
headers = self.headers
data, obj = self.get_review_data(uuid)
if verbose:
print("** Report: {} **".format(obj))
response = requests.post(
url=self.endpoint_url,
headers=headers,
data=json.dumps(data)
)
if verbose:
print('POST - Response: {} \n {} \n {} secs'.format(
response.status_code,
response.content,
response.elapsed.total_seconds())
)
run_url = self.check_progress(post_response=response, verbose=True)
if run_url:
self.get_file(run_url, obj, verbose=True)
print("*** Download {}in {} secs".format("(verbose) " if verbose else "", timer()-start))
def run_all(self, uuids, verbose=True):
start = timer()
for obj_uuid in review_uuids:
self.run_single(obj_uuid, verbose=verbose)
print("\n\n### Downloaded {}{} reviews in {} secs".format(
"(verbose) " if verbose else "",
len(uuids),
timer() - start)
)
def run_all_multi(self, uuids, workers=4, verbose=True):
pool = Pool(processes=workers)
pool.map(self.run_single, uuids)
def check_progress(self, post_response, attempts_limit=10000, verbose=False):
"""
check the progress of PDF generation querying periodically the API endpoint
"""
if post_response.status_code != 200:
if verbose: print("POST response status code != 200 - exit")
return None
url = 'https://apidomain.com/{path}'.format(
domain=self.domain,
path=post_response.json().get('links', {}).get('self', {}).get('href'),
headers = self.headers
)
job_id = post_response.json().get('jobId', '')
status = 'Running'
attempt_counter = 0
start = timer()
if verbose:
print("GET - url: {}".format(url))
while status == 'Running':
attempt_counter += 1
job_response = requests.get(
url=url,
headers=self.headers,
)
job_data = job_response.json()
status = job_data['status']
message = job_data['message']
progress = job_data['progress']
if status == 'Error':
if verbose:
end = timer()
print(
'{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
sc=job_response.status_code,
job_id=job_id,
error_id=job_data['errorId'],
message=message
), '{} secs'.format(end - start)
)
print('Attempts: {} \n {}% progress'.format(attempt_counter, progress))
return None
if status == 'Complete':
if verbose:
end = timer()
print('run_id: {run_id} - Complete - {secs} secs'.format(
run_id=run_id,
secs=end - start)
)
print('Attempts: {}'.format(attempt_counter))
print('{url}/files/'.format(url=url))
return '{url}/files/'.format(url=url)
if attempt_counter >= attempts_limit:
if verbose:
end = timer()
print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)' \
' - {message}'.format(
att_limit=attempts_limit,
progress=int(progress * 100),
message=message
), '{} secs'.format(end-start))
return None
if verbose:
print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r')
sys.stdout.flush()
time.sleep(1)
if verbose:
end = timer()
print(status, 'message: {} - attempts: {} - {} secs'.format(message, attempt_counter, end - start))
return None
def get_review_data(self, uuid, host=None, protocol=None):
review = get_object_or_404(MyModel, uuid)
internal_api_headers = {
'Authorization': 'Token {}'.format(
review.employee.csod_profile.csod_user_token
)
}
data = requests.get(
url=a_local_url,
params={'format': 'json', 'indirect': 'true'},
headers=internal_api_headers,
).json()
return (data, review)
def get_file(self, runs_url, obj, verbose=False):
runs_files_response = requests.get(
url=runs_url,
headers=self.headers,
stream=True,
)
runs_files_data = runs_files_response.json()
file_path = runs_files_data['files'][0]['links']['file']['href'] # remote generated file URI
file_response_url = 'https://apidomain.com/{path}'.format(path=file_path)
file_response = requests.get(
url=file_response_url,
headers=self.headers,
params={'userId': settings.CREDENTIALS['userId']},
stream=True,
)
if file_response.status_code != 200:
if verbose:
print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
r_id=obj.uuid, url=file_response_url)
)
local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
temp_dir=self.local_temp_dir,
uuid=obj.uuid,
employee=slugify(obj.employee.get_full_name()),
filename=slugify(obj.task.name)
)
with open(local_file_path, 'wb') as f:
for block in file_response.iter_content(1024):
f.write(block)
if verbose:
print('\n --> {r} [{uuid}]'.format(r=review, uuid=obj.uuid))
print('\n --> File downloaded: {path}'.format(path=local_file_path))
@classmethod
def get_temp_directory(self):
"""
generate a local unique temporary directory
"""
return '{temp_dir}/'.format(
temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX),
)
if __name__ == "__main__":
uuids = #list or generator of objs uuids
checker = Checker()
checker.run_all_multi(uuids=uuids)
、実行していますchecker.run_all_multi
には次のような効果があります。
- python shell freeze;
- 出力は出力されません。
- ファイルは生成されません。私は、コマンドラインからコンソールを殺さなければならない
- 、通常のキーボード割り込みが
checker.run_all
を実行すると、通常の仕事(一つずつ)しながら
を動作するように停止します。
このコードがなぜ機能しないのか(そしてマルチプロセッシングではなく私が何を使用できるか)に関する提案はありますか?
ありがとうございました。
これらのレポートを生成する頻度はどのくらいですか?世代は手動または自動でトリガーされますか? –
- 1年に一度 - 手動で – Luke
その頻度では、私は要望を使って見て、rabbitmqなどの設定を避けることに傾いているだろう – Anentropic