I am running a Kafka-Spark Streaming integration to fetch data in real time. The problem: even when the Spark Streaming job is run in the background, it shuts down automatically after 3-4 hours. Code:
/usr/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingDirectKafka")
ssc = StreamingContext(sc, 3600)  # batch interval: one hour

brokers = "*****"   # redacted: comma-separated Kafka broker list
topic = "******"    # redacted: Kafka topic name

# Start from the earliest available offset (auto.offset.reset = smallest)
kvs = KafkaUtils.createDirectStream(
    ssc, [topic],
    {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})

lines = kvs.map(lambda x: x[1])   # keep the message value, drop the key
lines.pprint()
lines.saveAsTextFiles('/tmp/')

ssc.start()
ssc.awaitTermination()
I run the job in the background with this command (continuing the spark-submit invocation above):

--master yarn get_stream.py > stream.log 2>&1 &
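For reference, a sketch of the full launch in one line, wrapped in nohup so the driver is detached from the login shell (a terminal hang-up, i.e. SIGHUP, is one common reason a backgrounded job's shutdown hook fires hours later — this is an assumption about the cause, not something the log above confirms). The paths and package version mirror the command shown above; `stream.pid` is a hypothetical file name:

```shell
# Detach the streaming job from the login shell so closing the SSH
# session cannot deliver SIGHUP to the driver and trigger its shutdown hook.
nohup /usr/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 \
  --master yarn \
  get_stream.py > stream.log 2>&1 &
echo $! > stream.pid   # save the driver PID so the job can be stopped deliberately later
```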
This is the stream.log generated by the Spark Streaming job. The job shuts down automatically after 3-4 hours. I ran it with TRACE-level logging enabled; below is the section where the shutdown happens (I cannot post the whole log, it is too large):
16/08/11 09:56:09 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/11 09:56:09 DEBUG JobScheduler: Stopping JobScheduler
16/08/11 09:56:09 INFO JobGenerator: Stopping JobGenerator immediately
16/08/11 09:56:09 INFO RecurringTimer: Stopped timer for JobGenerator after time 1470906000000
16/08/11 09:56:09 INFO JobGenerator: Stopped JobGenerator
16/08/11 09:56:09 DEBUG JobScheduler: Stopping job executor
16/08/11 09:56:09 DEBUG JobScheduler: Stopped job executor
16/08/11 09:56:09 INFO JobScheduler: Stopped JobScheduler
16/08/11 09:56:09 INFO StreamingContext: StreamingContext stopped successfully
16/08/11 09:56:09 INFO SparkContext: Invoking stop() from shutdown hook
16/08/11 09:56:09 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=29, src=/var/log/spark/apps/application_1470897979038_0002.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512
16/08/11 09:56:09 DEBUG DFSClient: DFSClient flush(): bytesCurBlock=64944 lastFlushOffset=64878 createNewBlock=false
16/08/11 09:56:09 DEBUG DFSClient: Queued packet 29
16/08/11 09:56:09 DEBUG DFSClient: Waiting for ack for: 29
16/08/11 09:56:09 TRACE Tracer: setting current span null
16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 29 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 64944
16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 29 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0
16/08/11 09:56:09 INFO SparkUI: Stopped Spark web UI at http://10.102.224.120:4040
16/08/11 09:56:09 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=30, src=/var/log/spark/apps/application_1470897979038_0002.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512
16/08/11 09:56:09 DEBUG DFSClient: Queued packet 30
16/08/11 09:56:09 DEBUG DFSClient: Queued packet 31
16/08/11 09:56:09 DEBUG DFSClient: Waiting for ack for: 31
16/08/11 09:56:09 TRACE Tracer: setting current span null
16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 30 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 64944
16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 30 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0
16/08/11 09:56:09 TRACE Tracer: setting current span null
16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 31 offsetInBlock: 64944 lastPacketInBlock: true lastByteOffsetInBlock: 64944
16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 31 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0
16/08/11 09:56:09 DEBUG DFSClient: Closing old block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: complete {src: "/var/log/spark/apps/application_1470897979038_0002.inprogress" clientName: "DFSClient_NONMAPREDUCE_258672080_15" last { poolId: "BP-730701491-10.102.224.120-1470897963878" blockId: 1073741871 generationStamp: 1047 numBytes: 64944 } fileId: 16590}
16/08/11 09:56:09 DEBUG Client: The ping interval is 60000 ms.
16/08/11 09:56:09 DEBUG Client: Connecting to ip-10-102-224-120.ec2.internal/10.102.224.120:8020
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop: starting, having connections 2
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10767
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10767
16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: complete took 3ms
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: complete {result: true}
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: getFileInfo {src: "/var/log/spark/apps/application_1470897979038_0002"}
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10768
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10768
16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: getFileInfo took 1ms
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: getFileInfo {}
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: rename {src: "/var/log/spark/apps/application_1470897979038_0002.inprogress" dst: "/var/log/spark/apps/application_1470897979038_0002"}
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10769
16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10769
16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: rename took 2ms
16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: rename {result: true}
16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Shutting down all executors
16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Interrupting monitor thread
16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down
16/08/11 09:56:09 DEBUG AbstractService: Service: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state STOPPED
16/08/11 09:56:09 DEBUG Client: stopping client from cache: [email protected]
16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Stopped
16/08/11 09:56:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/08/11 09:56:09 INFO MemoryStore: MemoryStore cleared
16/08/11 09:56:09 INFO BlockManager: BlockManager stopped
16/08/11 09:56:09 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/11 09:56:09 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/08/11 09:56:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/08/11 09:56:09 INFO SparkContext: Successfully stopped SparkContext
16/08/11 09:56:09 INFO ShutdownHookManager: Shutdown hook called
16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983/pyspark-f6c4c7f7-f6e5-4dcf-a9cd-cf03391413d9
16/08/11 09:56:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983
16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983/httpd-9104e927-bd6d-4338-bd20-fd01b7d3ce7f
16/08/11 09:56:09 DEBUG Client: stopping client from cache: [email protected]