2017-02-28 4 views
0

ローカルディレクトリに多数の販売テキストファイルを含むフォルダがありました。さんが2つのテキストファイルの例を見てみましょう:構造化されていないファイルのスパーク抽出と変換

テキストファイル1:

Sales Details 
20161120 


Sales Person: John 

Code Product  Quantity Price 
A0001 Product1 20   15.90 
A0003 Product3 13   23.80 

テキストファイル2:

Sales Details 
20161130 


Sales Person: Alicia 

Code Product  Quantity Price 
A0007 Product7 342   79.50 
A0008 Product8 55   432.80 
A0009 Product9 100   134.30 

を私はHDFSにファイルをストリーミングするために水路を使用していました。これらの小さなファイルはすべてHDFSの1つの大きなファイルにまとめられています。 Sparkを使ってこれらのファイルを抽出して変換するとき、私はここで皆さんからアドバイスを求める必要があるいくつかの問題に直面しました。

上記2つのファイルに基づいて、HDFSの1つのファイルに結合されます。私は次のようHDFSからテキストファイルを読むためにスパークを使用します。

私は2つの販売詳細に ラインを分割し、各営業担当者のための情報を抽出するにはどうすればよい
lines = spark.read.text('/user/tester/sales') 

Date  SalesPerson  Code  Product  Quantity Price 

感謝を:私の最終目標は、情報を抽出し、以下のstrucutureでハイブテーブルにそれを置くことです。

答えて

0

あなたのファイル構造は扱いが簡単ではありませんが、sparkのwholeTextFilesで正規表現を使用するといつでも表形式に書き直すことができます。例としてこのpysparkコードを参照してください:

import re 

def extract_sales(file): 
    for line in file[1].split("\n"): 
     if re.match('\d{8}', line.strip()): 
      date = line.strip() 
     if re.search('^Sales Person', line): 
      person = re.match("^Sales Person: (.*)", line).group(1) 
     if re.search('^A00', line): 
      yield [date, person] + re.split('\s+', line) 

raw_data = spark.sparkContext.wholeTextFiles('sales/') 
raw_data.flatMap(extract_sales) \ 
    .toDF(['Date', 'SalesPerson', 'Code', 'Product', 'Quantity', 'Price']).show() 

+--------+-----------+-----+--------+--------+------+ 
| Date|SalesPerson| Code| Product|Quantity| Price| 
+--------+-----------+-----+--------+--------+------+ 
|20161120|  John|A0001|Product1|  20| 15.90| 
|20161120|  John|A0003|Product3|  13| 23.80| 
|20161130|  Alicia|A0007|Product7|  342| 79.50| 
|20161130|  Alicia|A0008|Product8|  55|432.80| 
|20161130|  Alicia|A0009|Product9|  100|134.30| 
+--------+-----------+-----+--------+--------+------+ 
+0

ありがとうMariusz !!!それを処理する方法のアイデアを得るのに役立ちます。ユーザー定義関数を使用することで、sparkアプリケーションを実行しているときでも、ローカルのpython処理の代わりにsparkエンジンを活用しますか? – kcyea

+0

はい、確かです。この機能はクラスタノード上で実行されますが、並列処理を実現するには、入力ディレクトリに複数の大きなファイルが存在する必要があります。 – Mariusz

+0

Ok ...ところで、 "text"と比べて "wholeTextFiles"を使う方が良いでしょうか? – kcyea

関連する問題