2016-12-20

I am running Spark 2.0.2 against Kafka 0.10.1.0 and getting the error below: `java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access`.

private val spark = SparkSession.builder() 
.master("local[*]") 
.appName(job.name) 
.config("spark.cassandra.connection.host", "localhost") 
.config("spark.cassandra.connection.port","9042") 
.config("spark.streaming.receiver.maxRate", 10000) 
.config("spark.streaming.kafka.maxRatePerPartition", 10000) 
.config("spark.streaming.kafka.consumer.cache.maxCapacity", 1) 
.config("spark.streaming.kafka.consumer.cache.initialCapacity", 1) 
.getOrCreate() 

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> config.getString("kafka.hosts"), 
"key.deserializer" -> classOf[StringDeserializer], 
"value.deserializer" -> classOf[StringDeserializer], 
"group.id" -> job.name, 
"auto.offset.reset" -> config.getString("kafka.offset"), 
"enable.auto.commit" -> (false: java.lang.Boolean) 
)
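The post stops at the consumer parameters and never shows how the stream itself is created. For context, a minimal sketch of the direct-stream setup that exercises Spark's cached Kafka consumers might look like the following; the topic name and batch interval are assumptions, not from the original post:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Batch interval and topic are hypothetical; the original post does not show them.
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val topics = Seq("events")

// Each partition of this stream is read through a CachedKafkaConsumer on the
// executor; with master("local[*]") several tasks on the same JVM can end up
// touching the same cached consumer, which is the scenario the exception
// below complains about.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
```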

Exception:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I already found a mail-archive thread about the same error, but it has no resolution yet: https://www.mail-archive.com/[email protected]/msg56566.html

Answer


I looked around and could not find a real solution. As a workaround, I pass "--executor-cores 1" to spark-submit, which avoids the issue. If anyone finds a proper fix, please share it.
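As a command line, the workaround described above would look roughly like this; the jar and class names are placeholders, not from the original post:

```shell
# Limit each executor to one core so only one task at a time can touch a
# given cached KafkaConsumer. Jar and class names below are placeholders.
spark-submit \
  --executor-cores 1 \
  --class com.example.StreamingJob \
  my-streaming-job.jar
```

This trades parallelism for safety: with a single core per executor, no two tasks in the same JVM run concurrently, so the non-thread-safe consumer is never shared.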
