
I'm trying to write a Spark connector that pulls AVRO messages off a RabbitMQ message queue. When decoding the AVRO messages I get a NoSuchMethodError, and it happens only when running under Spark: the NoSuchMethodError is raised from shapeless, and only there.

I haven't been able to reproduce the Spark code exactly outside of Spark, but I believe the two examples are similar enough, and I think this is the minimal code that reproduces the same scenario.

I've removed all of the connection parameters, since that information is private and the connection does not appear to be the problem.

Spark code:

package simpleexample 

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDistributedKey 
import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting 
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.storage.StorageLevel 

// Kafka producer imports are used by the full Rabbit-to-Kafka connector; not needed in this minimal repro 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} 

import com.sksamuel.avro4s._ 

import java.io.{ByteArrayInputStream, ByteArrayOutputStream} 
import com.rabbitmq.client.QueueingConsumer.Delivery 
import java.util.HashMap 


case class AttributeTuple(attrName: String, attrValue: String) 

// AVRO Schema for Events 

case class DeviceEvent(
    tenantName: String, 
    groupName: String, 
    subgroupName: String, 
    eventType: String, 
    eventSource: String, 
    deviceTypeName: String, 
    deviceId: Int, 
    timestamp: Long, 
    attribute: AttributeTuple 
) 

object RabbitMonitor { 
    def main(args: Array[String]) { 
    println("start") 

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RabbitMonitor") 
    val ssc = new StreamingContext(sparkConf, Seconds(60)) 

    // Decode the raw bytes of one RabbitMQ delivery into AVRO-encoded DeviceEvents 
    def parseArrayEvent(delivery: Delivery): Seq[DeviceEvent] = { 
     val in = new ByteArrayInputStream(delivery.getBody()) 
     val input = AvroInputStream.binary[DeviceEvent](in) 
     input.iterator.toSeq 
    } 

    val params: Map[String, String] = Map(
     /* many rabbit connection parameters */ 
     "maxReceiveTime" -> "60000" // 60s 
    ) 

    val distributedKey = Seq(
     RabbitMQDistributedKey(
      /* queue name */, 
      new ExchangeAndRouting(/* exchange name */, /* routing key */), 
      params 
     ) 
    ) 

    val events = RabbitMQUtils.createDistributedStream[Seq[DeviceEvent]](ssc, distributedKey, params, parseArrayEvent) 
    events.print() 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

Non-Spark code:

package simpleexample 

import com.thenewmotion.akka.rabbitmq._ 
import akka.actor._ 
// avoid name collision with rabbitmq channel 
import scala.concurrent.{Channel => BasicChannel} 
import scala.concurrent.ExecutionContext.Implicits.global 

import com.sksamuel.avro4s._ 
import java.io.{ByteArrayInputStream, ByteArrayOutputStream} 

object Test extends App { 
    implicit val system = ActorSystem() 

    val factory = new ConnectionFactory() 
    /* Set connection parameters*/ 
    val exchange: String = /* exchange name */ 

    val connection: ActorRef = system.actorOf(ConnectionActor.props(factory), "rabbitmq") 

    def setupSubscriber(channel: Channel, self: ActorRef) { 
     val queue = channel.queueDeclare().getQueue 
     channel.queueBind(queue, exchange, /* routing key */) 
     val consumer = new DefaultConsumer(channel) { 
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) { 
      val in = new ByteArrayInputStream(body) 
      val input = AvroInputStream.binary[DeviceEvent](in) 
      val result = input.iterator.toSeq 
      println(result) 
      } 
     } 

     channel.basicConsume(queue, true, consumer) 
     } 

    connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("eventSubscriber")) 


    // Keep the application alive long enough for the subscriber to receive messages 
    scala.concurrent.Future { 
     def loop(n: Long) { 
      Thread.sleep(1000) 
      if (n < 30) { 
       loop(n + 1) 
      } 
     } 
     loop(0) 
    } 
} 

Non-Spark output (the last line is a successfully decoded update):

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ scala project.jar 
[INFO] [03/02/2017 14:11:06.899] [default-akka.actor.default-dispatcher-4] [akka://default/deadLetters] Message [com.thenewmotion.akka.rabbitmq.ChannelCreated] from Actor[akka://default/user/rabbitmq#-889215077] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [03/02/2017 14:11:07.337] [default-akka.actor.default-dispatcher-3] [akka://default/user/rabbitmq] akka://default/user/rabbitmq connected to amqp://<rabbit info> 
[INFO] [03/02/2017 14:11:07.509] [default-akka.actor.default-dispatcher-4] [akka://default/user/rabbitmq/eventSubscriber] akka://default/user/rabbitmq/eventSubscriber connected 
Stream(DeviceEvent(int,na,d01,deviceAttrUpdate,device,TestDeviceType,33554434,1488492704421,AttributeTuple(temperature,60)), ?) 

Spark output:

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ spark-submit ./project.jar --class RabbitMonitor 
start 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
17/03/02 14:20:15 INFO SparkContext: Running Spark version 2.1.0 
17/03/02 14:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
17/03/02 14:20:16 WARN Utils: Your hostname, drexThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlp3s0) 
17/03/02 14:20:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 
17/03/02 14:20:16 INFO SecurityManager: Changing view acls to: drex 
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls to: drex 
17/03/02 14:20:16 INFO SecurityManager: Changing view acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(drex); groups with view permissions: Set(); users with modify permissions: Set(drex); groups with modify permissions: Set() 
17/03/02 14:20:16 INFO Utils: Successfully started service 'sparkDriver' on port 34701. 
17/03/02 14:20:16 INFO SparkEnv: Registering MapOutputTracker 
17/03/02 14:20:16 INFO SparkEnv: Registering BlockManagerMaster 
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 
17/03/02 14:20:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-5cbb13bf-78fe-4227-81b3-1afea40f899a 
17/03/02 14:20:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB 
17/03/02 14:20:16 INFO SparkEnv: Registering OutputCommitCoordinator 
17/03/02 14:20:16 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
17/03/02 14:20:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.11:4040 
17/03/02 14:20:16 INFO SparkContext: Added JAR file:/home/drex/src/scala/so-repro/connector/target/scala-2.11/./project.jar at spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614 
17/03/02 14:20:16 INFO Executor: Starting executor ID driver on host localhost 
17/03/02 14:20:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33276. 
17/03/02 14:20:16 INFO NettyBlockTransferService: Server created on 192.168.1.11:33276 
17/03/02 14:20:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 
17/03/02 14:20:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None) 
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.11:33276 with 366.3 MB RAM, BlockManagerId(driver, 192.168.1.11, 33276, None) 
17/03/02 14:20:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.11, 33276, None) 
17/03/02 14:20:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.11, 33276, None) 
17/03/02 14:20:17 INFO RabbitMQDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4 
17/03/02 14:20:17 INFO RabbitMQDStream: Slide time = 60000 ms 
17/03/02 14:20:17 INFO RabbitMQDStream: Storage level = Memory Deserialized 1x Replicated 
17/03/02 14:20:17 INFO RabbitMQDStream: Checkpoint interval = null 
17/03/02 14:20:17 INFO RabbitMQDStream: Remember interval = 60000 ms 
17/03/02 14:20:17 INFO RabbitMQDStream: Initialized and validated org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4 
17/03/02 14:20:17 INFO ForEachDStream: Slide time = 60000 ms 
17/03/02 14:20:17 INFO ForEachDStream: Storage level = Serialized 1x Replicated 
17/03/02 14:20:17 INFO ForEachDStream: Checkpoint interval = null 
17/03/02 14:20:17 INFO ForEachDStream: Remember interval = 60000 ms 
17/03/02 14:20:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@… 
17/03/02 14:20:17 INFO RecurringTimer: Started timer for JobGenerator at time 1488493260000 
17/03/02 14:20:17 INFO JobGenerator: Started JobGenerator at 1488493260000 ms 
17/03/02 14:20:17 INFO JobScheduler: Started JobScheduler 
17/03/02 14:20:17 INFO StreamingContext: StreamingContext started 
17/03/02 14:21:00 INFO JobScheduler: Added jobs for time 1488493260000 ms 
17/03/02 14:21:00 INFO JobScheduler: Starting job streaming job 1488493260000 ms.0 from job set of time 1488493260000 ms 
17/03/02 14:21:00 INFO SparkContext: Starting job: print at RabbitMonitor.scala:94 
17/03/02 14:21:00 INFO DAGScheduler: Got job 0 (print at RabbitMonitor.scala:94) with 1 output partitions 
17/03/02 14:21:00 INFO DAGScheduler: Final stage: ResultStage 0 (print at RabbitMonitor.scala:94) 
17/03/02 14:21:00 INFO DAGScheduler: Parents of final stage: List() 
17/03/02 14:21:00 INFO DAGScheduler: Missing parents: List() 
17/03/02 14:21:00 INFO DAGScheduler: Submitting ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93), which has no missing parents 
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 366.3 MB) 
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1752.0 B, free 366.3 MB) 
17/03/02 14:21:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.11:33276 (size: 1752.0 B, free: 366.3 MB) 
17/03/02 14:21:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996 
17/03/02 14:21:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93) 
17/03/02 14:21:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
17/03/02 14:21:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7744 bytes) 
17/03/02 14:21:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 
17/03/02 14:21:00 INFO Executor: Fetching spark://192.168.1.11:34701/jars/project.jar with timestamp 1488493216614 
17/03/02 14:21:00 INFO TransportClientFactory: Successfully created connection to /192.168.1.11:34701 after 23 ms (0 ms spent in bootstraps) 
17/03/02 14:21:00 INFO Utils: Fetching spark://192.168.1.11:34701/jars/project.jar to /tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/fetchFileTemp2710654534934784726.tmp 
17/03/02 14:21:00 INFO Executor: Adding file:/tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/project.jar to class loader 
<removing rabbit queue connection parameters> 
17/03/02 14:21:02 INFO RabbitMQRDD: Receiving data in Partition 0 from 
</removing rabbit queue connection parameters> 
17/03/02 14:21:50 WARN BlockManager: Putting block rdd_0_0 failed due to an exception 
17/03/02 14:21:50 WARN BlockManager: Block rdd_0_0 could not be removed as it was not found on disk or in memory 
17/03/02 14:21:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy; 
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447) 
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70) 
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70) 
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93) 
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/03/02 14:21:50 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: shapeless.Lazy.map(Lscala/Function1;)Lshapeless/Lazy; 
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447) 
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70) 
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70) 
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93) 
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209) 
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

17/03/02 14:21:50 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
17/03/02 14:21:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/03/02 14:21:50 INFO TaskSchedulerImpl: Cancelling stage 0 

build.sbt:

retrieveManaged := true 

lazy val sparkVersion = "2.1.0" 

scalaVersion in ThisBuild := "2.11.8" 

lazy val rabbit = (project in file("rabbit-plugin")).settings(
    name := "Spark Streaming RabbitMQ Receiver", 
    homepage := Some(url("https://github.com/Stratio/RabbitMQ-Receiver")), 
    description := "RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark from RabbitMQ.", 
    exportJars := true, 

    assemblyJarName in assembly := "rabbit.jar", 
    test in assembly := {}, 

    moduleName := "spark-rabbitmq", 
    organization := "com.stratio.receive", 
    version := "0.6.0", 

    libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % sparkVersion % "provided", 
     "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", 
     "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", 
     "com.typesafe.akka" %% "akka-actor" % "2.4.11", 
     "com.rabbitmq" % "amqp-client" % "3.6.6", 
     "joda-time" % "joda-time" % "2.8.2", 
     "com.github.sstone" %% "amqp-client" % "1.5" % Test, 
     "org.scalatest" %% "scalatest" % "2.2.2" % Test, 
     "org.scalacheck" %% "scalacheck" % "1.11.3" % Test, 
     "junit" % "junit" % "4.12" % Test, 
     "com.typesafe.akka" %% "akka-testkit" % "2.4.11" % Test 
     ) 
) 

lazy val root = (project in file("connector")).settings(
    name := "Connector from Rabbit to Kafka queue", 
    description := "", 
    exportJars := true, 

    test in assembly := {}, 
    assemblyJarName in assembly := "project.jar", 

    libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % sparkVersion % "provided", 
     "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", 
     "com.thenewmotion" %% "akka-rabbitmq" % "3.0.0", 
     "org.apache.kafka" % "kafka_2.10" % "0.10.1.1", 
     "com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4" 
     ) 
) dependsOn rabbit 

I'm also using assembly to produce a "fat jar" for Spark (addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")), and the jar used in both examples above is generated with the sbt assembly command. I'm running Spark 2.1.0.
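
For completeness, the plugin line above lives in project/plugins.sbt (the standard sbt layout; shown here only as a sketch of the setup):

    // project/plugins.sbt 
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") 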

I'm relatively new to the Spark/Scala ecosystem, so hopefully this is just a problem with my build configuration. It doesn't make sense to me that shapeless would be unavailable under Spark.


Just a guess: Spark depends on Shapeless 2.0.0 (via Breeze), which is not compatible with current releases. – zero323


I had a similar problem, but in my case the culprit was the Apache Avro library, since Spark uses an older version compared to the one avro4s pulls in. –
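
If Avro turns out to be the conflicting library, one way to force a single version across the build (a sketch, assuming sbt 0.13; the version number is illustrative and whether to pin up or down depends on which library tolerates the other's version) is:

    // build.sbt -- pin org.apache.avro to one version for the whole dependency tree 
    // (illustrative version; verify compatibility with both Spark and avro4s before using) 
    dependencyOverrides += "org.apache.avro" % "avro" % "1.8.1" 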

Answer:


zero323 has the right answer as far as I can tell: Spark 2.1.0 has dependencies that themselves depend on Shapeless 2.0.0.
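
One way to confirm which Shapeless jar actually wins on the executor classpath is to log where the class was loaded from inside the Spark job (a quick hypothetical check, not part of the original code):

    // Prints the jar that shapeless.Lazy was loaded from on this JVM. 
    // Running this inside parseArrayEvent shows the executor's view of the classpath. 
    println(classOf[shapeless.Lazy[_]].getProtectionDomain.getCodeSource.getLocation) 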

This can be fixed in one of two ways: shade Shapeless in the dependencies that use it (so the fat jar's copy cannot collide with Spark's), or switch to a different Avro library. I went with the latter solution; a shading sketch follows for reference.
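
For anyone who prefers the shading route, a minimal sketch with sbt-assembly 0.14.x (the shaded package prefix is arbitrary and not taken from the original build):

    // build.sbt (connector project) 
    // Rename shapeless classes inside the fat jar so they cannot collide 
    // with the Shapeless 2.0.0 that Spark's transitive dependencies provide. 
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("shapeless.**" -> "shaded.shapeless.@1").inAll 
    ) 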
