2015-12-10 30 views
9

私はPandas library to read BigQueryデータを使いたいです。大きな結果を許可するにはどうすればよいですか?
パンダ以外のBigQueryインタラクションでは、これはthisのように実現できます。Python BigQuery allowLargeResults with pandas.io.gbq

パンダと現在のコード:

sProjectID = "project-id" 
sQuery = ''' 
    SELECT 
     column1, column2 
    FROM [dataset_name.tablename] 
''' 
from pandas.io import gbq 
df = gbq.read_gbq(sQuery, sProjectID) 

答えて

2

:ここに私の修正(これも仕事のほとんどがネイティブパンダせずにそれを行うには)です。私の前の答えを見て、私はそれがyosemite_kが言ったように失敗することを見る。

大きな結果は、BigQuery - > Storage - > local - > dataframeパターンを実際に実行する必要があります。

BigQueryの資源:

ストレージリソース:

パンダの資源:

インストール:

pip install pandas 
pip install google-cloud-storage 
pip install google-cloud-bigquery 

完全な実装(bigquery_to_dataframe.py):

""" 
We require python 3 for the google cloud python API 
    mkvirtualenv --python `which python3` env3 
And our dependencies: 
    pip install pandas 
    pip install google-cloud-bigquery 
    pip install google-cloud-storage 
""" 
import os 
import time 
import uuid 

from google.cloud import bigquery 
from google.cloud import storage 
import pandas as pd 


def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path): 
    """Pipeline to get data from BigQuery into a local pandas dataframe. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param dataset_id: BigQuery dataset id. 
    :type dataset_id: str 
    :param table_id: BigQuery table id. 
    :type table_id: str 
    :param storage_uri: Google Storage uri where data gets dropped off. 
    :type storage_uri: str 
    :param local_data_path: Path where data should end up. 
    :type local_data_path: str 
    :return: Pandas dataframe from BigQuery table. 
    :rtype: pd.DataFrame 
    """ 
    bq_to_storage(project_id, dataset_id, table_id, storage_uri) 

    storage_to_local(project_id, storage_uri, local_data_path) 

    data_dir = os.path.join(local_data_path, "test_data") 
    df = local_to_df(data_dir) 

    return df 


def bq_to_storage(project_id, dataset_id, table_id, target_uri): 
    """Export a BigQuery table to Google Storage. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param dataset_id: BigQuery dataset name where source data resides. 
    :type dataset_id: str 
    :param table_id: BigQuery table name where source data resides. 
    :type table_id: str 
    :param target_uri: Google Storage location where table gets saved. 
    :type target_uri: str 
    :return: The random ID generated to identify the job. 
    :rtype: str 
    """ 
    client = bigquery.Client(project=project_id) 

    dataset = client.dataset(dataset_name=dataset_id) 
    table = dataset.table(name=table_id) 

    job = client.extract_table_to_storage(
     str(uuid.uuid4()), # id we assign to be the job name 
     table, 
     target_uri 
    ) 
    job.destination_format = 'CSV' 
    job.write_disposition = 'WRITE_TRUNCATE' 

    job.begin() # async execution 

    if job.errors: 
     print(job.errors) 

    while job.state != 'DONE': 
     time.sleep(5) 
     print("exporting '{}.{}' to '{}': {}".format(
      dataset_id, table_id, target_uri, job.state 
     )) 
     job.reload() 

    print(job.state) 

    return job.name 


def storage_to_local(project_id, source_uri, target_dir): 
    """Save a file or folder from google storage to a local directory. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param source_uri: Google Storage location where file comes form. 
    :type source_uri: str 
    :param target_dir: Local file location where files are to be stored. 
    :type target_dir: str 
    :return: None 
    :rtype: None 
    """ 
    client = storage.Client(project=project_id) 

    bucket_name = source_uri.split("gs://")[1].split("/")[0] 
    file_path = "/".join(source_uri.split("gs://")[1].split("/")[1::]) 
    bucket = client.lookup_bucket(bucket_name) 

    folder_name = "/".join(file_path.split("/")[0:-1]) + "/" 
    blobs = [o for o in bucket.list_blobs() if o.name.startswith(folder_name)] 

    # get files if we wanted just files 
    blob_name = file_path.split("/")[-1] 
    if blob_name != "*": 
     print("Getting just the file '{}'".format(file_path)) 
     our_blobs = [o for o in blobs if o.name.endswith(blob_name)] 
    else: 
     print("Getting all files in '{}'".format(folder_name)) 
     our_blobs = blobs 

    print([o.name for o in our_blobs]) 

    for blob in our_blobs: 
     filename = os.path.join(target_dir, blob.name) 

     # create a complex folder structure if necessary 
     if not os.path.isdir(os.path.dirname(filename)): 
      os.makedirs(os.path.dirname(filename)) 

     with open(filename, 'wb') as f: 
      blob.download_to_file(f) 


def local_to_df(data_path): 
    """Import local data files into a single pandas dataframe. 

    :param data_path: File or folder path where csv data are located. 
    :type data_path: str 
    :return: Pandas dataframe containing data from data_path. 
    :rtype: pd.DataFrame 
    """ 
    # if data_dir is a file, then just load it into pandas 
    if os.path.isfile(data_path): 
     print("Loading '{}' into a dataframe".format(data_path)) 
     df = pd.read_csv(data_path, header=1) 
    elif os.path.isdir(data_path): 
     files = [os.path.join(data_path, fi) for fi in os.listdir(data_path)] 
     print("Loading {} into a single dataframe".format(files)) 
     df = pd.concat((pd.read_csv(s) for s in files)) 
    else: 
     raise ValueError(
      "Please enter a valid path. {} does not exist.".format(data_path) 
     ) 

    return df 


if __name__ == '__main__': 
    PROJECT_ID = "my-project" 
    DATASET_ID = "bq_dataset" 
    TABLE_ID = "bq_table" 
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*" 
    LOCAL_DATA_PATH = "/path/to/save/" 

    bq_to_df(PROJECT_ID, DATASET_ID, TABLE_ID, STORAGE_URI, LOCAL_DATA_PATH) 
+0

これはpython 2.7ではできませんか? – Keith

+0

Pythonで可能です2.7 – Flair

+0

@Flair Apologies。私はそれがPython 3のみであるという印象の下で走ってきましたが、私はPython 2 virtualenvでそれをテストしました。それはうまく動作します。 – Roman

5

編集:私は私の他の回答ででこれを行うための適切な方法を掲載しました。 Googleのストレージに最初にデータを落としてください。この方法では、データが大きすぎることはありません。


[OK]を、私はパンダでそれを行うための直接的な方法を見つけられませんでしたので、私は通常のAPIで少し余分を書かなければなりませんでした。 のpython3 google.cloud API経由でこれを行うための適切な方法を投稿することを決めた

sProjectID = "project-id" 
sQuery = ''' 
    SELECT 
     column1, column2 
    FROM [dataset_name.tablename] 
''' 

df = create_dataframe(sQuery, sProjectID, bLargeResults=True) 


#*******Functions to make above work********* 



def create_dataframe(sQuery, sProjectID, bLargeResults=False): 
    "takes a BigQuery sql query and returns a Pandas dataframe" 

    if bLargeResults: 
     oService = create_service() 
     dDestinationTable = run_query(sQuery, oService, sProjectID) 
     df = pandas_get_table(dDestinationTable) 
    else: 
     df = pandas_query(sQuery, sProjectID) 

    return df 



def pandas_query(sQuery, sProjectID): 
    "go into bigquery and get the table with sql query and return dataframe" 
    from pandas.io import gbq 
    df = gbq.read_gbq(sQuery, sProjectID) 

    return df 



def pandas_get_table(dTable): 
    "fetch a table and return dataframe" 
    from pandas.io import gbq 

    sProjectID = dTable['projectId'] 
    sDatasetID = dTable['datasetId'] 
    sTableID = dTable['tableId'] 
    sQuery = "SELECT * FROM [{}.{}]".format(sDatasetID, sTableID) 

    df = gbq.read_gbq(sQuery, sProjectID) 

    return df 




def create_service(): 
    "create google service" 
    from oauth2client.client import GoogleCredentials 
    from apiclient.discovery import build 
    credentials = GoogleCredentials.get_application_default() 
    oService = build('bigquery', 'v2', credentials=credentials) 
    return oService 



def run_query(sQuery, oService, sProjectID): 
    "runs the bigquery query" 

    dQuery = { 
     'configuration': { 
      'query': { 
       'writeDisposition': 'OVERWRITE', 
       'useQueryCache': False, 
       'allowLargeResults': True, 
       'query': sQuery, 
       'destinationTable': { 
        'projectId': sProjectID, 
        'datasetId': 'sandbox', 
        'tableId': 'api_large_result_dropoff', 
       }, 
      } 
     } 
    } 

    job = oService.jobs().insert(projectId=sProjectID, body=dQuery).execute() 


    return job['configuration']['query']['destinationTable'] 
+0

これは有望です。しかし、私はエラーが発生する理由:notFound、メッセージ:見つからない:テーブルmy_project_id:sandbox.api_large_result_dropoff。このコードを実行する前に、前もって措置を講じる必要がありますか? – marcopah

+1

BigQueryプロジェクト(これは 'my_project_id'と呼ばれるようです)とそのプロジェクトのデータセット(上記の' sandbox'と呼ばれます)が必要です。 – Roman

+1

中間テーブルに保存した元のクエリの結果が依然として大きい場合、これは機能しません。 –

2

あなたはレガシーに標準pd.read_gbq内の関数からデフォルトの方言を変更することでそれを行うことができます。

pd.read_gbq(query, 'my-super-project', dialect='standard') 

確かに、あなたはパラメータAllowLargeResultsのためのビッグクエリドキュメントで読むことができます:

AllowLargeResults:標準のSQLクエリの場合、このフラグは 無視され、大きな結果が常に許可されています。

+0

特定のサイズのしきい値を超える宛先テーブルを指定する必要があります。 –