2017-01-13 16 views
1

で平坦化RDDに複雑なRDDの変換私は、次のCSV(サンプル)PySpark

id  timestamp   routeid creationdate  parameters 
1000 21-11-2016 22:55  14  21-11-2016 22:55 RSRP=-102, 
1002 21-11-2016 22:55  14  21-11-2016 22:55 RA Req. SN=-146,TPC=4,RX Antennas=-8, 
1003 21-11-2016 22:55  14  21-11-2016 22:55 RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191, 

基本的に私は以下のように複数の列に1列からパラメータを分離するために必要があります。

id , timestamp, routeid, creationdate, RSRP ,RA REQ. SN, TPC,RX Antennas,MCS 

そうにそれに対してパラメータの値がない場合は、代わりにNULLのように値を設定します。

1000 21-11-2016 22:55  14  21-11-2016 22:55 -102 NULL NULL NULL NULL 

from pyspark import SparkContext 
import os 
import sys 
from pyspark.sql import SQLContext 
import itertools 
import re 

sc = SparkContext("local","Work") 
sqlContext = SQLContext(sc) 

df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///sample.csv') 

def aaa(a): 
    aa = a.split(',', 15000) 
    filtered = filter(lambda p: not re.match(r'^\s*$', p), aa)   
    listWithNoEmptyLines = [z for z in filtered if z != []] 

    for x in listWithNoEmptyLines: 
     ab = x.split("=") 
     AllList = [] 
     rsrp = "" 
     ra_req_sn = "" 
     tpc = "" 
     rx_antenas = "" 
     mcs = "" 
     if 'RSRP' in ab: 
      rsrp = ab[1] 
     else: 
      rsrp = "NULL" 
     if 'RA Req. SN' in ab: 
      ra_req_sn = ab[1] 
     else: 
      ra_req_sn = "NULL" 
     if 'TPC' in ab: 
      tpc = ab[1] 
     else: 
      tpc = "NULL" 
     if 'RX Antennas' in ab: 
      rx_antenas = ab[1] 
     else: 
      rx_antenas = "NULL" 
     if 'MCS' in ab: 
      mcs = ab[1] 
     else: 
      mcs = "NULL" 
    return rsrp,ra_req_sn,tpc,rx_antenas 
DFtoRDD = df1.rdd.map(list).map(lambda x: [str(x[1]), str(x[2]), str(x[3]), aaa(str(x[4]))]) 
print DFtoRDD.collect() 

は私に次のような結果を与える、

[['1000','21-11-2016 22:55', '14', '21-11-2016 22:55', ('-102', 'NULL', 'NULL', 'NULL')], ['1002',21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '-146', 'NULL', 'NULL')], ['1003','21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '134', 'NULL', 'NULL')]] 

期待される結果:

id  timestamp   routeid creationdate  RSRP RA Req. SN TPC RX Antennas MCS 
    1000 21-11-2016 22:55  14  21-11-2016 22:55 -102  NULL  NULL NULL  NULL 
    1002 21-11-2016 22:55  14  21-11-2016 22:55 NULL -146   4  -8  NULL 
    1003 21-11-2016 22:55  14  21-11-2016 22:55 NULL 134  -191  -91  -83 
+0

私はあなたの質問を得ることはありません。 – eliasah

+0

これはsparkよりも正規表現の問題に似ています。 –

+0

はい、関数aaaではどのように正規表現を適用できますか?データフレームを作成して変換することはできますか? @ elisash私はそれの属性をチェックして、各属性とその値の列を作成することによってパラメータの列を分離したい –

答えて

2

を値は、行を記入

が存在する場合、これは私が試したものですudfを続けて定義し、各フィールドを選択する必要があります。私はタブセパレータで行ったのと同じデータを使用しています。

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt') 
df1.show() 
# +----+----------------+-------+----------------+--------------------+ 
# | id|  timestamp|routeid| creationdate|   parameters| 
# +----+----------------+-------+----------------+--------------------+ 
# |1000|21-11-2016 22:55|  14|21-11-2016 22:55|   RSRP=-102,| 
# |1002|21-11-2016 22:55|  14|21-11-2016 22:55|RA Req. SN=-146,T...| 
# |1003|21-11-2016 22:55|  14|21-11-2016 22:55|RA Req. SN=134,RX...| 
# +----+----------------+-------+----------------+--------------------+ 

今上述したように私たちのUDFを定義してみましょう:

import re 
def f_(s): 
    pattern = re.compile("([^,=]+)=([0-9\-]+)") 
    return dict(pattern.findall(s or "")) 

私たちは、 "シンプル" のサンプル上で直接機能をテストすることができます。

f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,") 
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'} 

[OK]を、それが働いています。私たちは今、SQLで使用するために登録することができます:

spark.udf.register("f", f_, MapType(StringType(), StringType())) 

spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show() 
# +---------------------------------------------------+ 
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)| 
# +---------------------------------------------------+ 
# |        Map(RA Req. SN ->...| 
# +---------------------------------------------------+ 

しかし、あなたの場合には、私はあなたが各フィールドの実際UDFに興味があると思う:

extract = udf(f_, MapType(StringType(), StringType())) 

df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show() 
# +----+----------------+-------+----------------+--------------------+----+ 
# | id|  timestamp|routeid| creationdate|   parameters|RSRP| 
# +----+----------------+-------+----------------+--------------------+----+ 
# |1000|21-11-2016 22:55|  14|21-11-2016 22:55|   RSRP=-102,|-102| 
# |1002|21-11-2016 22:55|  14|21-11-2016 22:55|RA Req. SN=-146,T...|null| 
# |1003|21-11-2016 22:55|  14|21-11-2016 22:55|RA Req. SN=134,RX...|null| 
# +----+----------------+-------+----------------+--------------------+----+ 
+0

ありがとうeliasah、どのように私のコードであなたの解決策を追加することができます、私は取得していません。あなたは助けることができます –

+0

おかげでたくさんのeliasah、あなたは本当に私を救いました –

+0

確かに。私は単一の文で複数のgetItem()を適用できますか? –