
I am running a Kafka-Spark Streaming integration to fetch data in real time, but the Spark Streaming job terminates on its own after 3-4 hours, even when run in the background. Code:

/usr/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# set auto.offset.reset = smallest 

sc = SparkContext(appName="PythonStreamingDirectKafka") 
ssc = StreamingContext(sc, 3600)  # batch interval: 3600 s (one hour)

brokers = ***** 
topic = ****** 

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
lines = kvs.map(lambda x: x[1])  # keep only the message value from each (key, value) pair

lines.pprint() 

lines.saveAsTextFiles('/tmp/') 

ssc.start() 
ssc.awaitTermination() 

The job is run in the background using this command: spark-submit --master yarn get_stream.py > stream.log 2>&1 &

This is the stream.log generated by the Spark Streaming job, which shuts down on its own after 3-4 hours. I am logging at TRACE level; the error I get is shown below (I cannot post the whole log, it is too large):

    16/08/11 09:56:09 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 
    16/08/11 09:56:09 DEBUG JobScheduler: Stopping JobScheduler 
    16/08/11 09:56:09 INFO JobGenerator: Stopping JobGenerator immediately 
    16/08/11 09:56:09 INFO RecurringTimer: Stopped timer for JobGenerator after time 1470906000000 
    16/08/11 09:56:09 INFO JobGenerator: Stopped JobGenerator 
    16/08/11 09:56:09 DEBUG JobScheduler: Stopping job executor 
    16/08/11 09:56:09 DEBUG JobScheduler: Stopped job executor 
    16/08/11 09:56:09 INFO JobScheduler: Stopped JobScheduler 
    16/08/11 09:56:09 INFO StreamingContext: StreamingContext stopped successfully 
    16/08/11 09:56:09 INFO SparkContext: Invoking stop() from shutdown hook 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=29, src=/var/log/spark/apps/application_1470897979038_0002.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient flush(): bytesCurBlock=64944 lastFlushOffset=64878 createNewBlock=false 
    16/08/11 09:56:09 DEBUG DFSClient: Queued packet 29 
    16/08/11 09:56:09 DEBUG DFSClient: Waiting for ack for: 29 
    16/08/11 09:56:09 TRACE Tracer: setting current span null 
    16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 29 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 64944 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 29 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0 
    16/08/11 09:56:09 INFO SparkUI: Stopped Spark web UI at http://10.102.224.120:4040 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient writeChunk allocating new packet seqno=30, src=/var/log/spark/apps/application_1470897979038_0002.inprogress, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512 
    16/08/11 09:56:09 DEBUG DFSClient: Queued packet 30 
    16/08/11 09:56:09 DEBUG DFSClient: Queued packet 31 
    16/08/11 09:56:09 DEBUG DFSClient: Waiting for ack for: 31 
    16/08/11 09:56:09 TRACE Tracer: setting current span null 
    16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 30 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 64944 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 30 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0 
    16/08/11 09:56:09 TRACE Tracer: setting current span null 
    16/08/11 09:56:09 DEBUG DFSClient: DataStreamer block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 sending packet packet seqno: 31 offsetInBlock: 64944 lastPacketInBlock: true lastByteOffsetInBlock: 64944 
    16/08/11 09:56:09 DEBUG DFSClient: DFSClient seqno: 31 reply: SUCCESS downstreamAckTimeNanos: 0 flag: 0 
    16/08/11 09:56:09 DEBUG DFSClient: Closing old block BP-730701491-10.102.224.120-1470897963878:blk_1073741871_1047 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: complete {src: "/var/log/spark/apps/application_1470897979038_0002.inprogress" clientName: "DFSClient_NONMAPREDUCE_258672080_15" last { poolId: "BP-730701491-10.102.224.120-1470897963878" blockId: 1073741871 generationStamp: 1047 numBytes: 64944 } fileId: 16590} 
    16/08/11 09:56:09 DEBUG Client: The ping interval is 60000 ms. 
    16/08/11 09:56:09 DEBUG Client: Connecting to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop: starting, having connections 2 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10767 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10767 
    16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: complete took 3ms 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: complete {result: true} 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: getFileInfo {src: "/var/log/spark/apps/application_1470897979038_0002"} 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10768 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10768 
    16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: getFileInfo took 1ms 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: getFileInfo {} 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Call -> ip-10-102-224-120.ec2.internal/10.102.224.120:8020: rename {src: "/var/log/spark/apps/application_1470897979038_0002.inprogress" dst: "/var/log/spark/apps/application_1470897979038_0002"} 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop sending #10769 
    16/08/11 09:56:09 DEBUG Client: IPC Client (461299828) connection to ip-10-102-224-120.ec2.internal/10.102.224.120:8020 from hadoop got value #10769 
    16/08/11 09:56:09 DEBUG ProtobufRpcEngine: Call: rename took 2ms 
    16/08/11 09:56:09 TRACE ProtobufRpcEngine: 46: Response <- ip-10-102-224-120.ec2.internal/10.102.224.120:8020: rename {result: true} 
    16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Shutting down all executors 
    16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Interrupting monitor thread 
    16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down 
    16/08/11 09:56:09 DEBUG AbstractService: Service: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state STOPPED 
    16/08/11 09:56:09 DEBUG Client: stopping client from cache: [email protected] 
    16/08/11 09:56:09 INFO YarnClientSchedulerBackend: Stopped 
    16/08/11 09:56:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
    16/08/11 09:56:09 INFO MemoryStore: MemoryStore cleared 
    16/08/11 09:56:09 INFO BlockManager: BlockManager stopped 
    16/08/11 09:56:09 INFO BlockManagerMaster: BlockManagerMaster stopped 
    16/08/11 09:56:09 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
    16/08/11 09:56:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 
    16/08/11 09:56:09 INFO SparkContext: Successfully stopped SparkContext 
    16/08/11 09:56:09 INFO ShutdownHookManager: Shutdown hook called 
    16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983/pyspark-f6c4c7f7-f6e5-4dcf-a9cd-cf03391413d9 
    16/08/11 09:56:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 
    16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983 
    16/08/11 09:56:09 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-fb5326d4-f089-4aff-b394-bc126f12a983/httpd-9104e927-bd6d-4338-bd20-fd01b7d3ce7f 
    16/08/11 09:56:09 DEBUG Client: stopping client from cache: [email protected] 

Answers


You are probably running the program in yarn-client mode, i.e. the driver runs on the submit host.

Looking at the log file, you can see that the client receives a shutdown:

Invoking stop() from shutdown hook 

This is most likely invoked by the outer shell because your session has terminated. Submitting the job in the background does not prevent it, because the process is still attached to its parent process, i.e. your session.
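
A quick way to see this (a hypothetical check, assuming a Linux host with procps):

# In yarn-client mode the SparkSubmit driver JVM belongs to your login
# shell's session, so it is torn down when that session ends.
ps -o pid,ppid,sess,args -C java | grep SparkSubmit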

Apart from that, you use console output (lines.pprint()) to collect your results, which you should not do, especially since you already collect the same records in HDFS:

lines.saveAsTextFiles('/tmp/') 

Fix that. Beyond it, I suggest the following:

a) Run in cluster mode: add --deploy-mode cluster to your parameters.
b) If you want to collect the output, prepend nohup to spark-submit; nohup detaches the process from its parent, so it survives the end of your session. A sketch of both variants follows.
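
As a rough sketch, reusing the file paths and the --packages coordinate from the question (treat these as outlines, not verified commands):

# a) cluster mode: the driver runs inside YARN and no longer depends on
#    your login session (its console output goes to the YARN container logs)
/usr/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 \
    --master yarn --deploy-mode cluster get_stream.py

# b) client mode, but detached from the parent process with nohup, so the
#    shutdown hook no longer fires when you log out
nohup /usr/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 \
    --master yarn get_stream.py > stream.log 2>&1 &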
