-2

djangoクエリーセットの各エントリについてPDFレポートを生成する必要があります。 30kから40kの間のエントリーがあります。python - 非同期にHTTPリクエストを処理する

PDFは外部APIを介して生成されます。現在はオンデマンドで生成されているので、これはHTTPリクエスト/レスポンスを介して同期的に処理されます。 これは、私がdjango管理コマンドを使用してクエリーセットをループしてPDF生成を実行すると思うので、この作業では違います。

このタスクでは、どのアプローチをとるべきですか?

1)Celery:ワーカーにタスク(別のペイロードを含むhttp要求)を割り当てて、それが完了したら取得します。これは2つの可能な解決策について考えました。

2)request-futures:リクエストを非ブロッキングで使用する。

目標は、同時に(例えばAPIを扱うことができますどのように多くの同時要求に応じて、同時に10件のまたは100のHTTPリクエストを送信)APIを使用することです。

誰もここで同様のタスクを処理し、これを進める方法についてアドバイスを与えることができますか?

次は、multiprocessing(注:私はこのプロジェクトの所有権を取ったように、コードのほとんどは、再利用され、自分で書かれていません。):で作られた最初の試みであり、残念ながら

class Checker(object): 

    def __init__(self, *args, **kwargs): 
     # ... various setup 

    # other methods 
    # ..... 

    def run_single(self, uuid, verbose=False): 
     """ 
     run a single PDF generation and local download 
     """ 
     start = timer() 
     headers = self.headers 

     data, obj = self.get_review_data(uuid) 
     if verbose: 
      print("** Report: {} **".format(obj)) 
     response = requests.post(
      url=self.endpoint_url, 
      headers=headers, 
      data=json.dumps(data) 
     ) 
     if verbose: 
      print('POST - Response: {} \n {} \n {} secs'.format(
       response.status_code, 
       response.content, 
       response.elapsed.total_seconds()) 
      ) 
     run_url = self.check_progress(post_response=response, verbose=True) 
     if run_url: 
      self.get_file(run_url, obj, verbose=True) 
     print("*** Download {}in {} secs".format("(verbose) " if verbose else "", timer()-start)) 


    def run_all(self, uuids, verbose=True): 
     start = timer() 
     for obj_uuid in review_uuids: 
      self.run_single(obj_uuid, verbose=verbose) 
     print("\n\n### Downloaded {}{} reviews in {} secs".format(
      "(verbose) " if verbose else "", 
      len(uuids), 
      timer() - start) 
     ) 

    def run_all_multi(self, uuids, workers=4, verbose=True): 
     pool = Pool(processes=workers) 
     pool.map(self.run_single, uuids) 


    def check_progress(self, post_response, attempts_limit=10000, verbose=False): 
     """ 
     check the progress of PDF generation querying periodically the API endpoint 
     """ 
     if post_response.status_code != 200: 
      if verbose: print("POST response status code != 200 - exit") 
      return None 
     url = 'https://apidomain.com/{path}'.format(
      domain=self.domain, 
      path=post_response.json().get('links', {}).get('self', {}).get('href'), 
      headers = self.headers 
     ) 
     job_id = post_response.json().get('jobId', '') 
     status = 'Running' 
     attempt_counter = 0 
     start = timer() 
     if verbose: 
      print("GET - url: {}".format(url)) 
     while status == 'Running': 
      attempt_counter += 1 
      job_response = requests.get(
       url=url, 
       headers=self.headers, 
      ) 
      job_data = job_response.json() 
      status = job_data['status'] 
      message = job_data['message'] 
      progress = job_data['progress'] 
      if status == 'Error': 
       if verbose: 
        end = timer() 
        print(
         '{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
          sc=job_response.status_code, 
          job_id=job_id, 
          error_id=job_data['errorId'], 
          message=message 
         ), '{} secs'.format(end - start) 
        ) 
        print('Attempts: {} \n {}% progress'.format(attempt_counter, progress)) 
       return None 
      if status == 'Complete': 
       if verbose: 
        end = timer() 
        print('run_id: {run_id} - Complete - {secs} secs'.format(
         run_id=run_id, 
         secs=end - start) 
        ) 
        print('Attempts: {}'.format(attempt_counter)) 
        print('{url}/files/'.format(url=url)) 
       return '{url}/files/'.format(url=url) 
      if attempt_counter >= attempts_limit: 
       if verbose: 
        end = timer() 
        print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)' \ 
          ' - {message}'.format(
           att_limit=attempts_limit, 
           progress=int(progress * 100), 
           message=message 
         ), '{} secs'.format(end-start)) 
       return None 
      if verbose: 
       print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r') 
       sys.stdout.flush() 
      time.sleep(1) 
     if verbose: 
      end = timer() 
      print(status, 'message: {} - attempts: {} - {} secs'.format(message, attempt_counter, end - start)) 
     return None 

    def get_review_data(self, uuid, host=None, protocol=None): 
     review = get_object_or_404(MyModel, uuid) 
     internal_api_headers = { 
      'Authorization': 'Token {}'.format(
       review.employee.csod_profile.csod_user_token 
      ) 
     } 

     data = requests.get(
      url=a_local_url, 
      params={'format': 'json', 'indirect': 'true'}, 
      headers=internal_api_headers, 
     ).json() 
     return (data, review) 

    def get_file(self, runs_url, obj, verbose=False): 

     runs_files_response = requests.get(
      url=runs_url, 
      headers=self.headers, 
      stream=True, 
     ) 

     runs_files_data = runs_files_response.json() 


     file_path = runs_files_data['files'][0]['links']['file']['href'] # remote generated file URI 
     file_response_url = 'https://apidomain.com/{path}'.format(path=file_path) 
     file_response = requests.get(
      url=file_response_url, 
      headers=self.headers, 
      params={'userId': settings.CREDENTIALS['userId']}, 
      stream=True, 
     ) 
     if file_response.status_code != 200: 
      if verbose: 
       print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
        r_id=obj.uuid, url=file_response_url) 
       ) 
     local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
      temp_dir=self.local_temp_dir, 
      uuid=obj.uuid, 
      employee=slugify(obj.employee.get_full_name()), 
      filename=slugify(obj.task.name) 
     ) 
     with open(local_file_path, 'wb') as f: 
      for block in file_response.iter_content(1024): 
       f.write(block) 
      if verbose: 
       print('\n --> {r} [{uuid}]'.format(r=review, uuid=obj.uuid)) 
       print('\n --> File downloaded: {path}'.format(path=local_file_path)) 

    @classmethod 
    def get_temp_directory(self): 
     """ 
     generate a local unique temporary directory 
     """ 
     return '{temp_dir}/'.format(
      temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX), 
     ) 

if __name__ == "__main__": 
    uuids = #list or generator of objs uuids 
    checker = Checker() 
    checker.run_all_multi(uuids=uuids) 

、実行していますchecker.run_all_multiには次のような効果があります。

  • python shell freeze;
  • 出力は出力されません。
  • ファイルは生成されません。私は、コマンドラインからコンソールを殺さなければならない
  • 、通常のキーボード割り込みがchecker.run_allを実行すると、通常の仕事(一つずつ)しながら

を動作するように停止します。

このコードがなぜ機能しないのか(そしてマルチプロセッシングではなく私が何を使用できるか)に関する提案はありますか?

ありがとうございました。

+0

これらのレポートを生成する頻度はどのくらいですか?世代は手動または自動でトリガーされますか? –

+0

- 1年に一度 - 手動で – Luke

+0

その頻度では、私は要望を使って見て、rabbitmqなどの設定を避けることに傾いているだろう – Anentropic

答えて

1

頻度は、年に一度&です。セロリやリクエスト先物は必要ありません。

次に(multiprocessing.Poolを使用して)コードで管理コマンドを作成

def record_to_pdf(record): 
    # create pdf from record 

のようなメソッドを作成します

from multiprocessing import Pool 
pool = Pool(processes=NUMBER_OF_CORES) 
pool.map(record_to_pdf, YOUR_QUERYSET) 

管理コマンドは、しかし、非同期ではありません。非同期にするには、バックグラウンドで実行できます。

また、あなたのプロセスがCPUバインディングでない(いくつかのAPIを呼び出すような)場合、@Anentropicはプールを作成する際に、より多くのプロセスを試すことを提案しています。

+0

CPUリソースではないタスクについても試してみることができます> NUMBER_OF_CORES> NUMBER_OF_CORES – Anentropic

+0

@Anentropicあなたは正しいです、メソッド 'record_to_pdf'はいくつかのAPIを呼び出すだけです。かなりの数(ネットワーク速度とAPIレート制限による制限)。 –

+0

試しました。それはstdoutに何も出力しません。また、ファイルを保存先のディレクトリに保存しません。また、シェルをフリーズします(kill -9でkillする必要があります)。 同じコードがマルチプロセッシングなしで動作し、すべてのアイテムを順番に処理します。 コードを貼り付けることができます。何か案は? – Luke

関連する問題