TL;私はPySparkアプリケーションでDStreamのように見えます。 ScalaライブラリにDStream[String]
として送信したいと思います。しかし、文字列はPy4jによって変換されません。ScSparでPySpark RDDを変換する
私はSpark Streamingを使用してKafkaからデータを取得するPySparkアプリケーションを開発しています。私のメッセージは文字列で、Scalaコードでメソッドを呼び出すと、DStream[String]
というインスタンスが渡されます。しかし、私はScalaコードに適切なJVM文字列を受け取ることができません。 Pythonの文字列はJava文字列に変換されず、代わりにシリアル化されているように見えます。
私の質問は、DStream
オブジェクトからJava文字列を取得する方法ですか?私はそれを私のJARへのパスを渡し、PySparkでこのコードを実行している
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
:オン
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
ここ
は私が思いついた最も簡単なPythonのコードですScala側、私は持っています:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
いいえwは、のは、私はカフカにいくつかのデータを送信するとしましょう:私は代わりにfoo bar
を取得することが期待
[[email protected]
:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Scalaのコードでprintln
ステートメントは、のようなものを出力します。私は次のようにScalaのコードに簡単なprintln
文を交換する場合
今、:
rdd.foreach(v => println(v.getClass.getCanonicalName))
を私が取得:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
これは、文字列は、実際にバイトの配列として渡されていることを示唆しています。
私は単純に(私は、私もエンコーディングを指定していないよ知っている)文字列にバイトのこの配列を変換しようとした場合:
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
私はは(特殊文字のようながあるかもしれない見える何かを得ます削除されました):
�]qXfoo barqa.
これは、Python文字列がシリアル化された(ピクルされた)ことを示しています。代わりに、適切なJava文字列を取得する方法はありますか?
完全に明確で非常に役立ちます。ありがとう! –
私は助けることができてうれしいです。私はおそらくここで少し誇張します。あなたの目標が言語に依存しない拡張機能を構築することであれば、内部での修正を避けることはできませんが、開発者はここで意識的な意思決定を行い、それを邪魔しません。 – zero323
こんにちは@ zero323私はここで同じプロセスをやっていますが、プロセス中に大きな問題を抱えている、私はkerka kafkaと私のpythonアプリケーションを通信するオブジェクトを作成します。しかし、オブジェクトを作成すると、sparkのjvmはオブジェクト内で自分の関数を見つけることができません。クラスを作成すると、そのクラスが見つかります。しかし、エラーのためにrddオブジェクトを送ることはできません: 'pyKafka([org.apache.spark.api.java.JavaRDD、class java.lang.String])does not exist'私はその手順に従っています。何が問題なの? –