2016-08-22 16 views
0

Spark Streaming JobでSpark SQLを使用して、Hiveテーブルを検索しています。 カフカストリーミングは問題なく正常に動作します。 hiveContext.runSqlHive(sqlQuery);directKafkaStream.foreachRDDの外に実行すると問題なく正常に動作します。しかし、ストリーミングジョブの中でハイブテーブルルックアップが必要です。 JDBC(jdbc:hive2://)を使用すると動作しますが、Spark SQLを使いたいと思います。次のようにSpark SQLがSpark Streaming(KafkaStream)で失敗しました

私のソースコードの重要な場所が見えます:

// set context 
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true"); 
SparkContext sparkSqlContext = new SparkContext(sparkConf); 
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration)); 
HiveContext hiveContext = new HiveContext(sparkSqlContext); 

// Initialize Direct Spark Kafka Stream. Starts from top 
JavaPairInputDStream<String, String> directKafkaStream = 
       KafkaUtils.createDirectStream(streamingContext, 
         String.class, 
         String.class, 
         StringDecoder.class, 
         StringDecoder.class, 
         kafkaParams, 
         topicsSet); 

// work on stream     
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> { 
    rdd.foreachPartition(tuple2Iterator -> { 
     // get message 
     Tuple2<String, String> item = tuple2Iterator.next(); 

     // lookup 
     String sqlQuery = "SELECT something FROM somewhere"; 
     Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery); 
     List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence); 

     }); 
    return null; 
}); 

// Start the computation 
streamingContext.start(); 
streamingContext.awaitTermination();    

私はのtry-catchで囲む場合でも、意味のあるエラーを取得していません。

誰かが助けてくれることを祈っています。ありがとうございました。

//編集:あなたはそれを可能にしませんスパークSQLを使用したいという理由だけで

// work on stream     
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> { 
    // driver 
    Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery); 
    rdd.foreachPartition(tuple2Iterator -> { 
     // worker 
     while (tuple2Iterator != null && tuple2Iterator.hasNext()) { 
      // get message 
      Tuple2<String, String> item = tuple2Iterator.next(); 
      // lookup 
      String result = lookupMap.get(item._2()); 
     } 
    }); 
    return null; 
}); 

答えて

1

: は、ソリューションは次のようになります。 Sparkのルール番号1は、ネストされたアクション、変換、または分散データ構造ではありません。

あなたはforeachRDDに1つのレベルにプッシュ使用することができ、これはかなりここでスパークSQLを使用するようにオプションを使い果たし参加するよう、あなたがたとえば、あなたのクエリを表現することができる場合:

directKafkaStream.foreachRDD(rdd -> 
    hiveContext.runSqlHive(sqlQuery) 
    rdd.foreachPartition(...) 
) 

そうでない場合は、直接JDBC接続することができます有効なオプションです。

+0

メッセージを処理するためにカフカメッセージから特定の値でテーブルルックアップが必要な場合は、どちらの方が良いでしょうか? – cSteusloff

+0

jdbcを介してjoin vs queryを意味しますか? – zero323

+0

私はあなたを誤解したと思います。私の計画はすでに間違いだと思った。私が悪用されるように。 – cSteusloff

関連する問題