私のフォルダに3つのログファイルがあります。 sparkのdiffファイル名のために別のロジックを呼び出す方法
foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog
と同じように、私はそれらのそれぞれからデータを抽出するための3差分Scalaのプログラムファイルを持っています。
employee.scala
department.scala
companylog.scala
それぞれ以下のようにコードされています。
これらのファイルをすべて組み合わせて、並列に実行したいと考えています。
package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}
object logparser {
def main(args: Array[String]) = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
//Start the Spark context
val conf = new SparkConf()
.setAppName("Parser")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext= new SQLContext(sc)
val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
.map{l =>
if(l._1.endsWith("emplog.txt")){
empparser(l._2,sc,sqlContext)
}
l
}
.foreach{println}
}
def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = {
val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r
import sqlContext.implicits._
val indrecs = emppattern.findAllIn(record)
.map{ line =>
val emppattern(eid,ename) = line
(eid,ename)
}
.toSeq
.toDF("eid","ename")
.show()
}
}
私は同じオブジェクト内の各メソッドをアタッチする際に自分のコードを試しました。
今質問が2つ発生します Q1。私がコンパイルすると、私は得る
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: [email protected])
- field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class com.sample.logparser$$anonfun$1, <function1>)
私が知っている限り(初心者のみ)スパークコンテキストは直列化できません。パラメータとしてscを渡さないと、Nullpointer例外が発生します。これをどうすれば解決できますか?
Q2:DFに変換した後empparserメソッド内でハイブテーブルコードに挿入します。それが終わると、私はメインの中で何もしたくない。しかし、私の地図コードは、それ以降のアクションがなければ実行されません。それ以来、私はなぜforeacch printlnを持っているのです。この問題を克服する方法はありますか?
ちょうど3つある場合は、それぞれをRDDに読み込み、そのRDDをファイル名のキーにマップしてから、「結合」を1つにします。 –
"スカラファイル"ではなく、スカラロジックをクラス/関数として考える必要があります。次に、データファイルをいくつかのロジックにマップするための識別器が必要になります。たぶんファイル名、たぶんファイルの内容ですか?私は質問が十分に具体的ではないので、わからないでしょう。処理ロジックとデータサンプルを質問に追加してみてください。また、投稿する前にさらに研究をすることをお勧めします。この種の問題をカバーする多くのリソースがあります。 – maasg
はロジックで質問を更新しました。 – user7264473