2016-05-04 8 views
0

私はApache Sparkで新しく、Spark Streaming + Kafkaインテグレーションの直接アプローチの例(JavaDirectKafkaWordCount.java)を実行しようとしています。スパークとカフカの直接アプローチ

私はすべてのライブラリをダウンロードしましたが、私は実行しようとすると、私は、任意の提案

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; 
at kafka.api.RequestKeys$.<init>(RequestKeys.scala:48) 
at kafka.api.RequestKeys$.<clinit>(RequestKeys.scala) 
at kafka.api.TopicMetadataRequest.<init>(TopicMetadataRequest.scala:55) 
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122) 
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112) 
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) 
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) 
at it.unimi.di.luca.SimpleApp.main(SimpleApp.java:53) 

このエラーが出ますか?

+0

あなたはどちらのスカラーバージョンをターゲットにしていますか? –

+0

@lu_Ferra、あなたのサンプルコードを投稿すると、他の人があなたのクエストにうまく答えるのに役立つかもしれません。 – Suresh

答えて

-1

私はそれがいくつかのことになると思います。

  • プロジェクトで依存関係が正しく宣言されていない可能性があります。あなたはカフカとスパークストリーミングがあることを確認する必要があります。 mavenのようなビルダーを使用している場合は、ビルダーファイルに追加する必要がある行を見つけることができます。http://mvnrepository.com/
  • 読み込み元のトピックがまだ存在しない場合にもエラーが発生します。あなたはカフカサーバとカフカの飼育係が実行されていることを確認してください

  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
    

    のようなもので、コマンドラインでそれを作成することができます。

これで問題が解決しない場合は、メインを投稿する必要があります。

0

Scala 2.10、Kafka 0.10、Spark 1.6.2、Cassandra 3.5のコードを使用してください。

私はレシーバーレスアプローチ/ダイレクトカフカ消費を使用しています。役に立ったらいいですか

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SaveMode 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka.KafkaUtils 
import com.datastax.spark.connector._ 

import kafka.serializer.StringDecoder 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import java.util.Formatter.DateTime 

object StreamProcessor extends Serializable { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext(sparkConf) 

    val ssc = new StreamingContext(sc, Seconds(2)) 

    val sqlContext = new SQLContext(sc) 

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 

    val topics = args.toSet 

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


     stream 
    .map { 
    case (_, msg) => 
     val result = msgParseMaster(msg) 
     (result.id, result.data) 
    }.foreachRDD(rdd => if (!rdd.isEmpty)  rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data"))) 

     } 
    } 

    ssc.start() 
    ssc.awaitTermination() 

    } 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 
    case class wordCount(id: Long, data1: String, data2: String) extends serializable 
    implicit val formats = DefaultFormats 
    def msgParseMaster(msg: String): wordCount = { 
    val m = parse(msg).extract[wordCount] 
    return m 

    } 

} 
関連する問題