2016-09-12 6 views
5

TL;私はPySparkアプリケーションでDStreamのように見えます。 ScalaライブラリにDStream[String]として送信したいと思います。しかし、文字列はPy4jによって変換されません。ScSparでPySpark RDDを変換する

私はSpark Streamingを使用してKafkaからデータを取得するPySparkアプリケーションを開発しています。私のメッセージは文字列で、Scalaコードでメソッドを呼び出すと、DStream[String]というインスタンスが渡されます。しかし、私はScalaコードに適切なJVM文字列を受け取ることができません。 Pythonの文字列はJava文字列に変換されず、代わりにシリアル化されているように見えます。

私の質問は、DStreamオブジェクトからJava文字列を取得する方法ですか?私はそれを私のJARへのパスを渡し、PySparkでこのコードを実行している

from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1)) 

from pyspark.streaming.kafka import KafkaUtils 
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"}) 
values = stream.map(lambda tuple: tuple[1]) 

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream) 

ssc.start() 

:オン

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar 

ここ


は私が思いついた最も簡単なPythonのコードですScala側、私は持っています:

package com.seigneurin 

import org.apache.spark.streaming.api.java.JavaDStream 

object MyPythonHelper { 
    def doSomething(jdstream: JavaDStream[String]) = { 
    val dstream = jdstream.dstream 
    dstream.foreachRDD(rdd => { 
     rdd.foreach(println) 
    }) 
    } 
} 

いいえwは、のは、私はカフカにいくつかのデータを送信するとしましょう:私は代わりにfoo barを取得することが期待

[[email protected] 

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN 

Scalaのコードでprintlnステートメントは、のようなものを出力します。私は次のようにScalaのコードに簡単なprintln文を交換する場合

今、:

rdd.foreach(v => println(v.getClass.getCanonicalName)) 

を私が取得:

java.lang.ClassCastException: [B cannot be cast to java.lang.String 

これは、文字列は、実際にバイトの配列として渡されていることを示唆しています。

私は単純に(私は、私もエンコーディングを指定していないよ知っている)文字列にバイトのこの配列を変換しようとした場合:

 def doSomething(jdstream: JavaDStream[Array[Byte]]) = { 
     val dstream = jdstream.dstream 
     dstream.foreachRDD(rdd => { 
      rdd.foreach(bytes => println(new String(bytes))) 
     }) 
     } 

私はは(特殊文字のようながあるかもしれない見える何かを得ます削除されました):

�]qXfoo barqa. 

これは、Python文字列がシリアル化された(ピクルされた)ことを示しています。代わりに、適切なJava文字列を取得する方法はありますか?

答えて

6

ロングストーリーショートは、このようなことをするためのサポートされた方法はありません。プロダクションでこれを試してはいけません。あなたは警告されています。

一般に、Sparkはドライバでの基本的なRPC呼び出し以外にPy4jを使用せず、他のマシンでPy4jゲートウェイを起動しません。必要な場合(主にMLlibとSQLの一部)、SparkはPyroliteを使用して、JVMとPythonの間で渡されるオブジェクトをシリアライズします。

このAPIの部分は、プライベート(Scala)または内部(Python)であり、一般的な使用目的ではありません。理論的にしながら、あなたはどちらかのバッチごとにとにかくそれをアクセス:

package dummy 

import org.apache.spark.api.java.JavaRDD 
import org.apache.spark.streaming.api.java.JavaDStream 
import org.apache.spark.sql.DataFrame 

object PythonRDDHelper { 
    def go(rdd: JavaRDD[Any]) = { 
    rdd.rdd.collect { 
     case s: String => s 
    }.take(5).foreach(println) 
    } 
} 

完全なストリーム:DataFrames(おそらく最も邪悪なオプション)など

object PythonDStreamHelper { 
    def go(stream: JavaDStream[Any]) = { 
    stream.dstream.transform(_.collect { 
     case s: String => s 
    }).print 
    } 
} 

または個々のバッチを暴露:

object PythonDataFrameHelper { 
    def go(df: DataFrame) = { 
    df.show 
    } 
} 

と使用をこれらのラッパーは次のとおりです。

from pyspark.streaming import StreamingContext 
from pyspark.mllib.common import _to_java_object_rdd 
from pyspark.rdd import RDD 

ssc = StreamingContext(spark.sparkContext, 10) 
spark.catalog.listTables() 

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output) 
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd) 
)) 

# Reserialize and convert to JavaDStream<Object> 
# This is the only option which allows further transformations 
# on DStream 
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD( # Reserialize but keep as Python RDD 
     _to_java_object_rdd(rdd), ssc.sparkContext 
    ))._jdstream 
) 

# Convert to DataFrame and pass to Scala sink. 
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
     rdd.map(lambda x: (x,)).toDF()._jdf 
    ) 
) 

ssc.start() 
ssc.awaitTerminationOrTimeout(30) 
ssc.stop() 

これはサポートされていないため、テストされておらず、Spark APIを使用した実験以外の目的では無用です。

+1

完全に明確で非常に役立ちます。ありがとう! –

+0

私は助けることができてうれしいです。私はおそらくここで少し誇張します。あなたの目標が言語に依存しない拡張機能を構築することであれば、内部での修正を避けることはできませんが、開発者はここで意識的な意思決定を行い、それを邪魔しません。 – zero323

+0

こんにちは@ zero323私はここで同じプロセスをやっていますが、プロセス中に大きな問題を抱えている、私はkerka kafkaと私のpythonアプリケーションを通信するオブジェクトを作成します。しかし、オブジェクトを作成すると、sparkのjvmはオブジェクト内で自分の関数を見つけることができません。クラスを作成すると、そのクラスが見つかります。しかし、エラーのためにrddオブジェクトを送ることはできません: 'pyKafka([org.apache.spark.api.java.JavaRDD、class java.lang.String])does not exist'私はその手順に従っています。何が問題なの? –

関連する問題