2017-08-02 3 views
1

ストリーミング環境で、FlinkのテーブルAPIおよび/またはFlinkのSQLサポート(Flink 1.3.1、Scala 2.11)を使用しています。私はDataStream[Person]を始めている、とPersonはのように見えるケースクラスである:私は絵にattributesを持って開始するまで、すべてが期待どおりに動作しているFlickテーブルAPIとSQLとマップタイプ(Scala)

Person(name: String, age: Int, attributes: Map[String, String]) 

。例えば

val result = streamTableEnvironment.sql(
""" 
|SELECT 
|name, 
|attributes['foo'], 
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE) 
|FROM myTable 
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo'] 
|""".stripMargin) 

...につながる:

org.apache.flink.table.api.TableExceptionスレッドの例外 "メイン":タイプがサポートされていません。ANY at org.apache.flink.table.api.TableException $ .apply(exceptions.scala:53) at org.apache.flink.table.calcite.FlinkTypeFactory $ .toTypeInfo(FlinkTypeFactory.scala:341) at org。 apache.flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:531) at org.apache.flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:530) at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234) at scala.collection.Iterator $ class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach( Iterator.scala:1336) scala.collection.IterableLike $ class.foreach(IterableLike.scala:72) scala.collection.AbstractIterable.foreach(Iterable.scala:54) scala.collection.TraversableLike $ classにあります。 map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apa che.flink.table.plan.logical.LogicalRelNode。(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com.nordstrom.mdt.Job $の.main(Job.scala:112)com.nordstrom.mdt.Job.main(Job.scala)で

注:このエラーは、特定のマップキーが存在するか否かを生じます。また、私がでなければ、はマップキーを全く指定していないので、私は違うエラーが発生します。そのシナリオはここでプレイされていません。

このPR は、のように見えますが、転送先はhttps://github.com/apache/flink/pull/3767です。 test caseを具体的に見ると、DataSetsで型情報が可能であることが示唆されています。関連する方法のいずれも、fromDataStreamおよびregisterDataStreamは、型情報を提供する方法を提供していません。

はこれが可能ですか?言い換えれば、StreamsのサポートマップでSQLをFlinkできますか?編集の明確化

... がマップキー(GROUP BY ... attributesではなくattributes['foo'])を省略する場合、私は以下のエラーを取得します。これは、ランタイムがこれらが文字列であることを認識していることを示します。

この型(scala.collection.immutable.Map [scala。Tuple2(_1:String、_2:String)])をキーとして使用することはできません。

答えて

1

現在、Flink SQLはJava java.util.Mapのみをサポートしています。スカラマップはFlink GenericTypeInfo/SQL ANYデータ型のブラックボックスとして扱われます。したがって、これらのブラックボックスを転送してスカラー関数内で使用することはできますが、['key']演算子でアクセスすることはサポートされていません。

Javaマップを使用するか、UDFでアクセス操作を自分で実装します。

問題の問題を作成しました:https://issues.apache.org/jira/browse/FLINK-7360

+0

は理にかなっています。ありがとうございました。 読者には、これを行う方法についてhttps://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html#scalar-functionsに詳しく説明しています。 – StephenWithPH

+0

['key']はjava.util.mapで動作しますか? Map イベントを使用しています。 ordereventからイベント['abc']を選択します。それでもエラーの例外が表示されます。タイプはサポートされていません:ANY。私はflink 1.3.1を使用しています。私は何かが欠けていますか?あなたはそれが動作する例に私を指摘できますか?ありがとう。 – thisisananth

+0

@thisisananth再現するいくつかの例を表示できますか?それがバグであれば、私はそれを修正しようとします。 – twalthr