2016-04-19 11 views
1

データの操作を実行しようとしています。その値が、条件のいずれかと一致する場合は、そうでない場合はフォールスルー値に変換されます。Apache Sparkデータフレーム(Python)を使用してSwitchステートメントを実行する方法

これは、同等のSQLのようになります。

CASE 
      WHEN user_agent LIKE \'%CanvasAPI%\' THEN \'api\' 
      WHEN user_agent LIKE \'%candroid%\' THEN \'mobile_app_android\' 
      WHEN user_agent LIKE \'%iCanvas%\' THEN \'mobile_app_ios\' 
      WHEN user_agent LIKE \'%CanvasKit%\' THEN \'mobile_app_ios\' 
      WHEN user_agent LIKE \'%Windows NT%\' THEN \'desktop\' 
      WHEN user_agent LIKE \'%MacBook%\' THEN \'desktop\' 
      WHEN user_agent LIKE \'%iPhone%\' THEN \'mobile\' 
      WHEN user_agent LIKE \'%iPod Touch%\' THEN \'mobile\' 
      WHEN user_agent LIKE \'%iPad%\' THEN \'mobile\' 
      WHEN user_agent LIKE \'%iOS%\' THEN \'mobile\' 
      WHEN user_agent LIKE \'%CrOS%\' THEN \'desktop\' 
      WHEN user_agent LIKE \'%Android%\' THEN \'mobile\' 
      WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\' 
      WHEN user_agent LIKE \'%Mac OS%\' THEN \'desktop\' 
      WHEN user_agent LIKE \'%Macintosh%\' THEN \'desktop\' 
      ELSE \'other_unknown\' 
      END AS user_agent_type 

私がスパークすることはかなり新しいですので、このプログラムでの私の最初の試みは、ルックアップ辞書を使用していますので、同様RDDに線での値の行を調整します:

USER_AGENT_VALS = { 
    'CanvasAPI': 'api', 
    'candroid': 'mobile_app_android', 
    'iCanvas': 'mobile_app_ios', 
    'CanvasKit': 'mobile_app_ios', 
    'Windows NT': 'desktop', 
    'MacBook': 'desktop', 
    'iPhone': 'mobile', 
    'iPod Touch': 'mobile', 
    'iPad': 'mobile', 
    'iOS': 'mobile', 
    'CrOS': 'desktop', 
    'Android': 'mobile', 
    'Linux': 'desktop', 
    'Mac OS': 'desktop', 
    'Macintosh': 'desktop' 
} 

def parse_requests(line: list, 
        id_data: dict, 
        user_vals: dict = USER_AGENT_VALS): 
    """ 
    Expects an input list which maps to the following indexes: 
     0: user_id 
     1: context(course)_id 
     2: request_month 
     3: user_agent_type 

    :param line: A list of values. 
    :return: A list 
    """ 
    found = False 
    for key, value in user_vals.items(): 
     if key in line[3]: 
      found = True 
      line[3] = value 
    if not found: 
     line[3] = 'other_unknown' 
    # Retrieves the session_id count from the id_data dictionary using 
    # the user_id as the key. 
    session_count = id_data[str(line[0])] 
    line.append(session_count) 
    line.extend(config3.ETL_LIST) 
    return [str(item) for item in line] 

私の現在のコードはdataframe内のデータを持っている、と私は最も効率的に上記の操作を実行する方法を正確にはわかりません。私はそれらが不変なので、新しいデータフレームとして返す必要があることを知っていますが、私の質問はこれを行うには最高の方法です。ここに私のコードです:

from boto3 import client 
import psycopg2 as ppg2 
from pyspark import SparkConf, SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import current_date, date_format, lit, StringType 

EMR_CLIENT = client('emr') 
conf = SparkConf().setAppName('Canvas Requests Logs') 
sc = SparkContext(conf=conf) 
sql_context = SQLContext(sc) 
# for dependencies 
# sc.addPyFile() 

USER_AGENT_VALS = { 
    'CanvasAPI': 'api', 
    'candroid': 'mobile_app_android', 
    'iCanvas': 'mobile_app_ios', 
    'CanvasKit': 'mobile_app_ios', 
    'Windows NT': 'desktop', 
    'MacBook': 'desktop', 
    'iPhone': 'mobile', 
    'iPod Touch': 'mobile', 
    'iPad': 'mobile', 
    'iOS': 'mobile', 
    'CrOS': 'desktop', 
    'Android': 'mobile', 
    'Linux': 'desktop', 
    'Mac OS': 'desktop', 
    'Macintosh': 'desktop' 
} 

if __name__ == '__main__': 
    df = sql_context.read.parquet(
     r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/' 
     r'usage_data.gz.parquet') 

    course_data = df.filter(df['context_type'] == 'Course') 
    request_data = df.select(
     df['user_id'], 
     df['context_id'].alias('course_id'), 
     date_format(df['request_timestamp'], 'MM').alias('request_month'), 
     df['user_agent'] 
    ) 

    sesh_id_data = df.groupBy('user_id').count() 

    joined_data = request_data.join(
     sesh_id_data, 
     on=request_data['user_id'] == sesh_id_data['user_id'] 
    ).drop(sesh_id_data['user_id']) 

    all_fields = joined_data.withColumn(
     'etl_requests_usage', lit('DEV') 
    ).withColumn(
     'etl_datetime_local', current_date() 
    ).withColumn(
     'etl_transformation_name', lit('agg_canvas_logs_user_agent_types') 
    ).withColumn(
     'etl_pdi_version', lit(r'Apache Spark') 
    ).withColumn(
     'etl_pdi_build_version', lit(r'1.6.1') 
    ).withColumn(
     'etl_pdi_hostname', lit(r'N/A') 
    ).withColumn(
     'etl_pdi_ipaddress', lit(r'N/A') 
    ).withColumn(
     'etl_checksum_md5', lit(r'N/A') 
    ) 

PSとして、私はそれをやったやり方よりも列を追加する良い方法はありますか?

答えて

3

あなたはあなたも直接SQL式を使用することができますしたい場合:

expr = """ 
    CASE 
     WHEN user_agent LIKE \'%Android%\' THEN \'mobile\' 
     WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\' 
     ELSE \'other_unknown\' 
    END AS user_agent_type""" 

df = sc.parallelize([ 
    (1, "Android"), (2, "Linux"), (3, "Foo") 
]).toDF(["id", "user_agent"]) 

df.selectExpr("*", expr).show() 
## +---+----------+---------------+ 
## | id|user_agent|user_agent_type| 
## +---+----------+---------------+ 
## | 1| Android|   mobile| 
## | 2|  Linux|  desktop| 
## | 3|  Foo| other_unknown| 
## +---+----------+---------------+ 

がそうでなければ、あなたがwhenlikeotherwiseの組み合わせでそれを置き換えることができます。

from pyspark.sql.functions import col, when 
from functools import reduce 

c = col("user_agent") 
vs = [("Android", "mobile"), ("Linux", "desktop")] 
expr = reduce(
    lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc), 
    vs, 
    "other_unknown" 
).alias("user_agent_type") 

df.select("*", expr).show() 

## +---+----------+---------------+ 
## | id|user_agent|user_agent_type| 
## +---+----------+---------------+ 
## | 1| Android|   mobile| 
## | 2|  Linux|  desktop| 
## | 3|  Foo| other_unknown| 
## +---+----------+---------------+ 

ます。また、複数追加することができます単一の列:select

exprs = [c.alias(a) for (a, c) in [ 
    ('etl_requests_usage', lit('DEV')), 
    ('etl_datetime_local', current_date())]] 

df.select("*", *exprs) 
+0

非常に印象的ですが、私はSQLを直接使用できることを忘れていました。私は使用していたPostGRESql方言とSpark SQLの類似点がわかりません。 – flybonzai

+1

HiveQLはANSI SQLではありませんが、十分に近いです。 Postgres固有の拡張ではないものを使うときは、うまくいくはずです。私は過度に使いませんが、時には表現を構成するよりははるかに簡潔です。 – zero323

+0

あなたの 'reduce'ステートメントのようなものはどこですか?私は 'pyspark'や' functools'でドキュメントを見つけることができません。 – flybonzai

関連する問題