ストリーミング環境で、FlinkのテーブルAPIおよび/またはFlinkのSQLサポート(Flink 1.3.1、Scala 2.11)を使用しています。私はDataStream[Person]
を始めている、とPerson
はのように見えるケースクラスである:私は絵にattributes
を持って開始するまで、すべてが期待どおりに動作しているFlickテーブルAPIとSQLとマップタイプ(Scala)
Person(name: String, age: Int, attributes: Map[String, String])
。例えば
:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
...につながる:
org.apache.flink.table.api.TableExceptionスレッドの例外 "メイン":タイプがサポートされていません。ANY at org.apache.flink.table.api.TableException $ .apply(exceptions.scala:53) at org.apache.flink.table.calcite.FlinkTypeFactory $ .toTypeInfo(FlinkTypeFactory.scala:341) at org。 apache.flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:531) at org.apache.flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:530) at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234) at scala.collection.Iterator $ class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach( Iterator.scala:1336) scala.collection.IterableLike $ class.foreach(IterableLike.scala:72) scala.collection.AbstractIterable.foreach(Iterable.scala:54) scala.collection.TraversableLike $ classにあります。 map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apa che.flink.table.plan.logical.LogicalRelNode。(operators.scala:530) at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503) at com.nordstrom.mdt.Job $の.main(Job.scala:112)com.nordstrom.mdt.Job.main(Job.scala)で
注:このエラーは、特定のマップキーが存在するか否かを生じます。また、私がでなければ、はマップキーを全く指定していないので、私は違うエラーが発生します。そのシナリオはここでプレイされていません。
このPR は、のように見えますが、転送先はhttps://github.com/apache/flink/pull/3767です。 test caseを具体的に見ると、DataSetsで型情報が可能であることが示唆されています。関連する方法のいずれも、fromDataStream
およびregisterDataStream
は、型情報を提供する方法を提供していません。
はこれが可能ですか?言い換えれば、StreamsのサポートマップでSQLをFlinkできますか?編集の明確化
... がマップキー(GROUP BY ... attributes
ではなくattributes['foo']
)を省略する場合、私は以下のエラーを取得します。これは、ランタイムがこれらが文字列であることを認識していることを示します。
この型(scala.collection.immutable.Map [scala。Tuple2(_1:String、_2:String)])をキーとして使用することはできません。
は理にかなっています。ありがとうございました。 読者には、これを行う方法についてhttps://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html#scalar-functionsに詳しく説明しています。 – StephenWithPH
['key']はjava.util.mapで動作しますか? Mapイベントを使用しています。 ordereventからイベント['abc']を選択します。それでもエラーの例外が表示されます。タイプはサポートされていません:ANY。私はflink 1.3.1を使用しています。私は何かが欠けていますか?あなたはそれが動作する例に私を指摘できますか?ありがとう。 –
thisisananth
@thisisananth再現するいくつかの例を表示できますか?それがバグであれば、私はそれを修正しようとします。 – twalthr