2016-12-16 34 views
1

私はScalaの構造を知っている入力ファイルを読み込もうとしていますが、9番目のエントリが必要です。これまでのところ、私が使用して全体を読み取るために管理している:Scalaはファイルの特定の部分だけを読み込みます

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

問題を、これは巨大(私たちはデータの20ギガバイトを話している)である配列を私に残します。私は、RDD [Array [String]]とArray [String]の間で変換するために非常に醜いコードを書くことを余儀なくされただけでなく、コードが本質的に役に立たなくなってしまった。

私は実際に私がするそれらを必要な形式に私の収集「セル」を置くしかし、何もし

.map() 
.flatMap() and 
.reduceByKey() 

を使用していない間の異なるアプローチとミックスを試みました。

はここで起こることになっているものです。

*---------* 
| NASDAQ: | 
*---------* 
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

のみstock_symbolのホールドを維持:当社のサーバーからテキストファイルのフォルダを読み込み、コードのフォーマットはテキストの各「行」をお読みくださいそれは私が数える識別子です。今までの私の試みでは、配列全体を1つ目から9番目までのインデックスをcollect_cells varに集めるだけでした。問題は私の計算と実生活の結果に基づいて、実行するのに335日かかる(冗談なし)ということです。

は、ここに参照のための私の現在のコードです:私はそれをデバッグしようとしたとして

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object SparkNum { 


    def main(args: Array[String]) { 

    // Do some Scala voodoo 
    val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical")) 

    // Set input file as per HDFS structure + input args 
    val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
    val fields = lines.map(line => line.split(",")) 
    var collected_cells:Array[String] = new Array[String](0) 

    //println("[MESSAGE] Length of CC: " + collected_cells.length) 

    val divider:Long = 9 
    val array_length = fields.count/divider 
    val casted_length = array_length.toInt 

    val indexedFields = fields.zipWithIndex 
    val indexKey = indexedFields.map{case (k,v) => (v,k)} 

    println("[MESSAGE] Number of lines: " + array_length) 
    println("[MESSAGE] Casted lenght of: " + casted_length) 



    for(i <- 1 to casted_length) { 

     println("[URGENT DEBUG] Processin line " + i + " of " + casted_length) 

     var index = 9 * i - 8 

     println("[URGENT DEBUG] Index defined to be " + index) 

     collected_cells :+ indexKey.lookup(index) 

    } 



    println("[MESSAGE] collected_cells size: " + collected_cells.length) 



    val single_cells = collected_cells.flatMap(collected_cells => collected_cells); 
    val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y}) 
    // val result = counted_cells.reduceByKey((a,b) => (a+b)) 

    // val inmem = counted_cells.persist() 
    // 
    // // Collect driver into file to be put into user archive 
    // inmem.saveAsTextFile("path to server location") 

    // ==> Not necessary to save the result as processing time is recorded, not output 


    } 

} 

底部には、現在、コメントアウトされていますが、私は私が行っ必要なものを知ることが擬似コードとして機能します。私はScalaに精通していないので、_表記のようなものが私の人生を混乱させていることを指摘したいかもしれません。

お時間をいただきありがとうございます。

+0

を前処理のみすべての9行目を保つためにスパーク外のファイルを。その部分にSparkを使用するだけでは、人生を難しくしています。 –

+0

「ナスダック」のバナーはファイルのヘッダーですか? – maasg

+0

CSV形式の株価を含むファイルを読み込み、それに含まれる(一意の) 'stock_symbol'の数を抽出したいと思いますか? – maasg

答えて

2

問題の明確化を必要とするいくつかの概念があります:私たちは、このコードを実行

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

データの大きさの巨大な配列にはなりません。この式は、基本データの変換を表します。私たちが望む情報セットにデータを減らすまで、それはさらに変換することができます。

exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

は、私はまた、データファイルは、このようなバナーが含まれていると仮定するつもりです:

*---------* 
| NASDAQ: | 
*---------* 
この場合

、我々はレコードのstock_symbolフィールドは、CSVをエンコードしたいです

最初に行うべきことは、このバナーのようなものを削除することです。実際、最初のフィールドは、英数字で始まる証券取引所の名前であると仮定します。私たちは、その結果、任意の分割を行う前に、私たちはこれを行います。

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields = validLines.map(line => line.split(",")) 

それは我々が期待するデータ型を持っている心の平和を持っているために、変数の型を記述するのに役立ちます。それほど重要ではないかもしれないScalaのスキルが進歩するにつれて、さんがタイプして上記の式を書き直してみましょう:

val lines: RDD[String] = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields: RDD[Array[String]] = validLines.map(line => line.split(",")) 

我々は位置0ベースの配列の要素の第1位であるstock_symbolフィールド、に興味がある:

val stockSymbols:RDD[String] = fields.map(record => record(1)) 

我々はカウントしたい場合すべてのレコードに対して1つのエントリがあるので、これはあまり役に立ちません。これはあまり役に立ちません。もっと興味深い質問があります:

私たちにはいくつの異なる株式記号がありますか?

val uniqueStockSymbols = stockSymbols.distinct.count() 

シンボルごとにいくつのレコードがありますか?スパーク2.0で

val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_) 

は、データフレームおよびデータセットのためのCSVサポートは、我々のデータは、フィールド名を持つヘッダ行(大規模なデータセットで通常の何)、我々はする必要がありますを持っていないことを考えるとボックス のうち利用可能です列名を提供します。

val stockDF = sparkSession.read.csv("/tmp/quotes_clean.csv").toDF("exchange", "symbol", "date", "open", "close", "volume", "price") 

我々は現在、非常に簡単に私たちの質問に答えることができます。

val uniqueSymbols = stockDF.select("symbol").distinct().count 
val recordsPerSymbol = stockDF.groupBy($"symbol").agg(count($"symbol")) 
+0

こんにちはmaasg、お返事いただきありがとうございます!該当する場合は私のコードを変更しましたが、.reduceByKey()で_を使用するとMavenは "拡張された関数のパラメータタイプがありません"と不平を言っています。今のところ私はそれを新しい行にしようとします。代わりに.reduceByKey((a、b)=>(a + b))の古いレイアウトを使用しようとします。乾杯。 – Synaxr

+1

@Synaxrスパークノートのコードをご覧ください:https://gist.github.com/maasg/7b8a4991ba9e2c236ddd8dfd823352cc – maasg

関連する問題