私はScalaの構造を知っている入力ファイルを読み込もうとしていますが、9番目のエントリが必要です。これまでのところ、私が使用して全体を読み取るために管理している:Scalaはファイルの特定の部分だけを読み込みます
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
問題を、これは巨大(私たちはデータの20ギガバイトを話している)である配列を私に残します。私は、RDD [Array [String]]とArray [String]の間で変換するために非常に醜いコードを書くことを余儀なくされただけでなく、コードが本質的に役に立たなくなってしまった。
私は実際に私がするそれらを必要な形式に私の収集「セル」を置くしかし、何もし
.map()
.flatMap() and
.reduceByKey()
を使用していない間の異なるアプローチとミックスを試みました。
はここで起こることになっているものです。
*---------*
| NASDAQ: |
*---------*
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close
のみstock_symbolのホールドを維持:当社のサーバーからテキストファイルのフォルダを読み込み、コードのフォーマットはテキストの各「行」をお読みくださいそれは私が数える識別子です。今までの私の試みでは、配列全体を1つ目から9番目までのインデックスをcollect_cells varに集めるだけでした。問題は私の計算と実生活の結果に基づいて、実行するのに335日かかる(冗談なし)ということです。
は、ここに参照のための私の現在のコードです:私はそれをデバッグしようとしたとして
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkNum {
def main(args: Array[String]) {
// Do some Scala voodoo
val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical"))
// Set input file as per HDFS structure + input args
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
var collected_cells:Array[String] = new Array[String](0)
//println("[MESSAGE] Length of CC: " + collected_cells.length)
val divider:Long = 9
val array_length = fields.count/divider
val casted_length = array_length.toInt
val indexedFields = fields.zipWithIndex
val indexKey = indexedFields.map{case (k,v) => (v,k)}
println("[MESSAGE] Number of lines: " + array_length)
println("[MESSAGE] Casted lenght of: " + casted_length)
for(i <- 1 to casted_length) {
println("[URGENT DEBUG] Processin line " + i + " of " + casted_length)
var index = 9 * i - 8
println("[URGENT DEBUG] Index defined to be " + index)
collected_cells :+ indexKey.lookup(index)
}
println("[MESSAGE] collected_cells size: " + collected_cells.length)
val single_cells = collected_cells.flatMap(collected_cells => collected_cells);
val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y})
// val result = counted_cells.reduceByKey((a,b) => (a+b))
// val inmem = counted_cells.persist()
//
// // Collect driver into file to be put into user archive
// inmem.saveAsTextFile("path to server location")
// ==> Not necessary to save the result as processing time is recorded, not output
}
}
底部には、現在、コメントアウトされていますが、私は私が行っ必要なものを知ることが擬似コードとして機能します。私はScalaに精通していないので、_表記のようなものが私の人生を混乱させていることを指摘したいかもしれません。
お時間をいただきありがとうございます。
を前処理のみすべての9行目を保つためにスパーク外のファイルを。その部分にSparkを使用するだけでは、人生を難しくしています。 –
「ナスダック」のバナーはファイルのヘッダーですか? – maasg
CSV形式の株価を含むファイルを読み込み、それに含まれる(一意の) 'stock_symbol'の数を抽出したいと思いますか? – maasg