2016-11-22 1 views
0

Apache Flinkで単純なhello world型プログラムを実行しようとしています。コードはApache Kafkaからのメッセージを受け取り、 "。"各文字の後に新しい文字列をstdoutに出力します。コードはKafkaからメッセージを正しく取得しますが、マップ関数は "。"を追加します。失敗する。 REPLプロンプトでこの関数を試してみましたが、スカラーコードが正しく機能しています。 Scalaのコード:スカラとApache flinkの新機能、なぜマップ関数がREPLを正しく実行するのですか?Flinkで失敗します

scala> input = "hello" 
input: String = hello 
scala> val output = input.flatMap(value => value + ".") 
output: String = h.e.l.l.o 

FLINKプログラム: flink code オフカットラインは

val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties)) 

私は私が間違っているつもりだところ、私は無駄にApacheのマニュアルを試してみた把握カントを読み込みます。あなたが私に与えることができるどんな助けもうまく受け取ります。

+0

エラーが何であるかを私たちに語った場合、それが役立つだろう。 cmd-lineでコンパイルしようとしましたか? Scala IDEは誤ったエラーを出すことが知られています。 – pedrofurla

+0

また、flatMapではなく、messageStreamでマップを使用したいと思うかもしれません。 flatMapは 'value'がcharと' + 'であるため 'input'サンプルで動作します。' 'はs Stringに変換します。 – pedrofurla

+0

エラーメッセージにパラメータタイプがありません。私はそれがIDEかもしれないと思ったが、私はコードをデプロイするのを妨げているmavenのコードをコンパイルしようとしたときに同じエラーが出る。フラットマップは両方の環境で同じように動作しないのでしょうか?文字列を一度に1文字ずつ見る? –

答えて

0

私はそれら述べた機能のすべてがコレクションに適用マップ/ flatMap /などを減らす

のような関数型プログラミングと操作に関するいくつかの基本を勉強推薦まず第一に。 @pedrofuriaが指摘したように、あなたのScalaの例では、FLINK例messageStreamchar

のコレクションですStringからflatMap機能は、あなたがするべきで説明した動作を実行するので、文字列のコレクションとして抽象化することができます適用されますSTH好きですか:

val stream = messageStream.map(str => str.mkString(".")) 

を私はあなたの例からではなくflatMapmkStringを使用したため、元のではなく

h.e.l.l.o(あなたが書いたように)

それは

h.e.l.l.o.を生成します

しかし、もう少し実際には、機能プログラミングの基礎から始めます。

0

助けてくれてありがとう、私は問題を分かった。 flatmap関数はスカラプロンプトで動作しますが、Flinkは、FlatMapにオーバーライドのある新しいFlatMapFunctionを渡す必要があるため、適切なFlinkでは機能しません。なぜそれがflinkでscalaプロンプトで動作するのかまだ分かりませんが、コードはコンパイルされ、期待通りに実行されます。

0

フライングプログラムのflatMapとscalaプロンプトのflatMapが同じではないためです。

scalaプロンプトのflatMapは単なるスカラーの関数です。入力は以下の好きながら

FLINK flatMap

を適用することができる。

val input = benv.fromElements(
      "To be, or not to be,--that is the question:--", 
      "Whether 'tis nobler in the mind to suffer", 
      "The slings and arrows of outrageous fortune", 
      "Or to take arms against a sea of troubles,") 
val counts = input 
      .flatMap { _.toLowerCase.split("\\W+") } 
      .map { (_, 1) }.groupBy(0).sum(1) 

参照:scala-shell

関連する問題