2016-12-18 9 views
5

私のフォルダに3つのログファイルがあります。 sparkのdiffファイル名のために別のロジックを呼び出す方法

foldera = emplog,deptlog,companylog 
folderb = emplog,deptlog,companylog 
folderc = emplog,deptlog,companylog 

と同じように、私はそれらのそれぞれからデータを抽出するための3差分Scalaのプログラムファイルを持っています。

employee.scala 
department.scala 
companylog.scala 

それぞれ以下のようにコードされています。

これらのファイルをすべて組み合わせて、並列に実行したいと考えています。

package com.sample 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions 
import org.apache.spark.sql.SQLContext 
import org.apache.log4j.{Level, Logger} 

object logparser { 
    def main(args: Array[String]) = { 

     Logger.getLogger("org").setLevel(Level.OFF)  
     Logger.getLogger("akka").setLevel(Level.OFF) 
    //Start the Spark context 
    val conf = new SparkConf() 
     .setAppName("Parser") 
     .setMaster("local") 

     val sc = new SparkContext(conf) 
     val sqlContext= new SQLContext(sc) 

     val test = sc.wholeTextFiles("C:\\mkdir\\*\\*") 
     .map{l => 
      if(l._1.endsWith("emplog.txt")){ 
      empparser(l._2,sc,sqlContext) 
       } 

      l 
     } 
     .foreach{println} 
    } 

    def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = { 
    val emppattern="""[(](\d+)[)]\s([\w\s._]{30})\s+""".r 

     import sqlContext.implicits._ 
    val indrecs = emppattern.findAllIn(record) 
    .map{ line => 
     val emppattern(eid,ename) = line 

    (eid,ename) 
    } 
    .toSeq 
    .toDF("eid","ename") 

    .show() 


    } 
} 

私は同じオブジェクト内の各メソッドをアタッチする際に自分のコードを試しました。

今質問が2つ発生します Q1。私がコンパイルすると、私は得る

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.SparkContext, value: [email protected]) 
    - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext) 
    - object (class com.sample.logparser$$anonfun$1, <function1>) 

私が知っている限り(初心者のみ)スパークコンテキストは直列化できません。パラメータとしてscを渡さないと、Nullpointer例外が発生します。これをどうすれば解決できますか?

Q2:DFに変換した後empparserメソッド内でハイブテーブルコードに挿入します。それが終わると、私はメインの中で何もしたくない。しかし、私の地図コードは、それ以降のアクションがなければ実行されません。それ以来、私はなぜforeacch printlnを持っているのです。この問題を克服する方法はありますか?

+0

ちょうど3つある場合は、それぞれをRDDに読み込み、そのRDDをファイル名のキーにマップしてから、「結合」を1つにします。 –

+1

"スカラファイル"ではなく、スカラロジックをクラス/関数として考える必要があります。次に、データファイルをいくつかのロジックにマップするための識別器が必要になります。たぶんファイル名、たぶんファイルの内容ですか?私は質問が十分に具体的ではないので、わからないでしょう。処理ロジックとデータサンプルを質問に追加してみてください。また、投稿する前にさらに研究をすることをお勧めします。この種の問題をカバーする多くのリソースがあります。 – maasg

+0

はロジックで質問を更新しました。 – user7264473

答えて

1

質問に答えるために、従業員または部門を処理した結果が同じ種類のレコードのになると仮定します。これはデータの種類ごとに異なると思いますので、この「現実的な調整」を可能にするために別々のレコードの処理を別々に保っています。

最初に、異なる種類またはレコードタイプのレコードcase classとパーサーを定義します。

val dataPath = "/.../data/wholefiles/*/*" 
val logFiles = sc.wholeTextFiles(dataPath) 

をそして、我々はファイルをフィルタリングすることで、レコードの異なる種類を処理:

case class Record(id:String, name: String) 

val empParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

val deptParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

val companyParser: String => Option[Record] = { record => 
    val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r 
    record match { 
    case pattern(eid,ename) => Some(Record(eid, ename)) 
    case _ => None 
    } 
} 

我々はwholeFilesを使用してデータをロードする(ここでは、簡単のため同じIMPLをコピーしています)私たちが必要とする種類のファイルを取得し、上で定義したパーサーを適用します。実際に同じプロセスをどのように繰り返すのかに注意してください。これは抽象化することができます。

val empLogs = logFiles.filter{case (filename, content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> empParser(line))} 
val deptLogs = logFiles.filter{case (filename, content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> deptParser(line))} 
val compLogs = logFiles.filter{case (filename, content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("\n").flatMap(line=> companyParser(line))} 

現在DATAFRAME

val empDF = empLogs.toDF 

に変換し、我々としても、他のレコードタイプのために同じことを行うことができます。

さまざまなデータタイプのプロセスで共通性を見つけることができるかどうかによって、このプロセスでコードの重複を減らす余地が十分にあります。

+0

詳細な説明はありません – user7264473

関連する問題