2017-12-09 1 views
1

データをCSVファイルで送信できます。まず、乱数をCSVファイルに書き込んで送信しますが、直接送信することは可能ですか? 私のソケットコード:Python - 整数または文字列をスパークストリーミングに送信

import socket 
host = 'localhost' 
port = 8080 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
s.bind((host, port)) 
s.listen(1) 
while True: 
    print('\nListening for a client at',host , port) 
    conn, addr = s.accept() 
    print('\nConnected by', addr) 
    try: 
     print('\nReading file...\n') 
     while 1: 
      out = "test01" 
      print('Sending line', line) 
      conn.send(out) 
    except socket.error: 
     print ('Error Occured.\n\nClient disconnected.\n') 
conn.close() 

スパークストリーミングコード:

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]","deneme") 
ssc = StreamingContext(sc, 10) 
socket_stream = ssc.socketTextStream("localhost",8080) 

random_integers = socket_stream.window(30) 

digits = random_integers.flatMap(lambda line: line.split(" ")).map(lambda digit: (digit, 1)) 

digit_count = digits.reduceByKey(lambda x,y:x+y) 
digit_count.pprint() 

ssc.start() 

答えて

0

これは、ソケットブロックがデータを送信しているためで、決してが上に移動します。最も基本的なソリューションは、データのいくつかの量を送信し、接続を閉じることです。

import socket 
import time 

host = 'localhost' 
port = 50007 

i = 0 

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

s.bind((host, port)) 
s.listen(1) 

try: 
    while True: 
     conn, addr = s.accept() 
     try: 
      for j in range(10): 
       conn.send(bytes("{}\n".format(i), "utf-8")) 
       i += 1 
       time.sleep(1) 
      conn.close() 
     except socket.error: pass 
finally: 
    s.close() 

タイムアウトで何かもっと面白いチェック非ブロックモードを取得するには。

+0

これを行う方法はありますか?接続の開閉は毎回多くの時間を浪費するためです。 –

関連する問題