2016-04-04 13 views
0

私はvirtualboxでsparkを勉強しています。私は./bin/spark-shellを使ってsparkを開き、scalaを使います。今では、スカラを使ったキー値フォーマットについて混乱しています。私は、このtxtファイルを取得するためにsc.textFileを使用sparkでscalaを使用してキー値形式を生成する方法

panda 0 
pink 3 
pirate 3 
panda 1 
pink 4 

は、私は次のようになり、家庭/風水/火花/データ内のtxtファイルを、持っています。私は

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7") 

をすればそれから私は、画面上のRDDを表示する)(rdd.collectを使用することができます。これなし」.txtの

scala> rdd.collect() 
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4) 

しかし、私がしなければ

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt") 

を" ここに。 rdd.collect()を使用すると、間違いがあります。

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 
...... 

しかし、私は他の例を見ました。それらのすべては最後に ".txt"を持っています。私のコードやシステムにsthが間違っていますか?

もう一つは、私がしようとしたときです:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x)) 
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29 
scala> rddd.collect() 
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4)) 

私はデータの最初の列を選択し、キーとして使用するためのもの。しかし、rddd.collect()は単語が2回出現するようには見えませんが、これは正しくありません。私はmapbykey、reducebykeyなどの残りの操作を続けることができません。どこが間違っていたのですか?

本当にありがとうございます。

+0

あなたの質問は「.txt」の使用と少し矛盾しているようです。テキストとコードの挿入を確認して、すべて正しいことを確認できますか?そうであれば、あなたのシステムは本当に台無しに見えます。 – Phasmid

答えて

1

ちょうど例えば私はラインでレコードを分割し、この後、あなたのデータセットStringを作成し、RDDを作成するためにSparkContextさんparallelizeメソッドを使用します。 RDDを作成した後、mapメソッドを使用して、各レコードに格納されているStringを分割し、に変換します。

import org.apache.spark.sql.Row 
val text = "panda 0\npink 3\npirate 3\npanda 1\npink 4" 

val rdd = sc.parallelize(text.split("\n")).map(x => Row(x.split(" "):_*)) 
rdd.take(3) 

take方法からの出力は次のとおりです。あなたの最初の質問について

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3]) 

、ファイルは任意の拡張子を持つようにする必要はありません。この場合、ファイルはプレーンテキストと見なされるからです。

関連する問題