2017-11-30 1 views
0

私は単一のStringフィールドを抽出するAvroオブジェクトのDataStreamを持つ簡単なプログラムを持っています。私はDataStreamTableに変換し、簡単な投影法でクエリを実行します。Apache Flink:InvalidProgramException:テーブルプログラムをコンパイルできません

val kinesisConsumer = new FlinkKinesisConsumer(streamName, new UnifiedEventDeserializationSchema, consumerConfig) 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
implicit val typeInfo = TypeInformation.of(classOf[UnifiedEvent]) 
val kinesisStream = env.addSource(kinesisConsumer) 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
tableEnv.registerDataStream("table1", kinesisStream); 
val query = "SELECT nd_key FROM table1" 
val result = tableEnv.sql(query) 
tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print() 
env.execute() 

私はプログラムを実行すると、私は次の例外を取得:

2017年11月29日16時07分36秒ソース:カスタムソースを - から>: (accepted_cohort_id、ADMIN_ID、after_submission 、によってAmount_Paid、 anonymous_id、APPLICATION_ID、atom_key、bd_group_key、biz_geo、 braavos_purchase_id、カテゴリ、cohort_id、concept_key、c​​oncept_rank、 コンテキスト、context_campaign、context_experiment、COUPON_CODE、 course_key、c​​ourse_rank、cta_destination、cta_location、cta_message、 cta_type、通貨、decision_group_id、device_browser、device_os、 device_os_version、DEVICE_TYPE、持続時間、EVALUATION_ID、EVENT_TYPE、 fin_geo、in_collaboration_with、lab_id、lab_rank、ラベル、lesson_key、 lesson_rank、ロケール、max_pause_duration、メッセージ、MESSAGE_ID、 module_key、module_rank 、nd_key、nd_unit_id、nd_unit_rank、 new_cohort_id、notification_id、num_concepts_completed、num_lessons_completed num_interactions、old_cohort_id、PART_KEY、 part_rank、pause_duration、pause_reason、payment_plan、 payment_provider、points_earned、points_possible、価格、price_sheet、 PRODUCT_KEY、PRODUCT_TYPE、provider_charge_id、 provider_refund_id、 quiz_type、referrer、ref und_amount、requested_cohort_id、その結果、 scholarship_group_key、SEARCH_TERM、skill_level、subscription_id、 suspension_length、suspension_reason、技術、タイムスタンプ、 total_concepts、total_lessons、total_time_sec、タイプ、unenroll_reason、 のuser_id、user_locale、USER_RESPONSE、バリアント、バージョン、WORKSPACE_ID、 workspace_session 、workspace_type) - > select:(nd_key) - > to:Utf8 - > シンク:無名(5/8)がFAILEDに切り替わりました org.apache.flink.api.common.InvalidProgramException:テーブルプログラム はコンパイルできません。これはバグです。問題を提出してください。 でorg.apache.flink.table.codegen.Compiler $ class.compile(Compiler.scala:36) でorg.apache.flink.table.runtime.CRowOutputMapRunner.compile(CRowOutputMapRunner.scala:33) で で org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)で :org.apache.flink.table.runtime.CRowOutputMapRunner.open(48 CRowOutputMapRunner.scala) org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) で でorg.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) org.apache.flink.s treaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) (org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)at )java.lang.Thread.run(Thread。 java:748)原因: org.codehaus.commons.compiler.CompileException:行790、列15: "java.lang.CharSequence"タイプから割り当てを変換できません "org.apache.avro.util"と入力してください。 org.codehaus.janino.UnitCompiler.assignmentConversionで org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) (UnitCompiler.java:10528)ORGで でUTF8" 。codehaus.janino.UnitCompiler.compile2 org.codehaus.janino.UnitCompiler $ 6.visitLocalVariableDeclarationStatementで(UnitCompiler.java:2534) org.codehaus.janino.UnitCompiler.access $ 2600(UnitCompiler.java:212) で(UnitCompiler。 Javaの:1459) org.codehaus.janino.UnitCompiler $ 6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443) でorg.codehaus.janino.Java $ LocalVariableDeclarationStatement.accept(Java.java:3348) でorg.codehausでorg.codehaus.janino.UnitCompiler.compile でorg.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)で.janino.UnitCompiler.compile(UnitCompiler.java:1443) (UnitCompiler.java:3052) org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)でorg.codehausで org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)で .janino.UnitCompiler.compile2(UnitCompiler.java:785)org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)で $ 400から org.codehaus.janino.UnitCompiler.access(UnitCompiler.java:212)で org.codehaus.janino.UnitCompiler $ 2.visitPackageMemberClassDeclaration(UnitCompiler.java:385) で org.codehaus.janino.UnitCompiler $ 2.visitPackageMemberClassDeclaration(UnitCompiler.java:390) で org.codehaus.janino.UnitCompiler.compileUnitでorg.codehaus.janino.UnitCompiler.compileでorg.codehaus.janino.Java $ PackageMemberClassDeclaration.accept(Java.java:1405) (UnitCompiler.java:385) ( org.codehaus.janino でorg.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)でorg.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)でUnitCompiler.java:357) .SimpleCompiler.cook org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)で org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)で(SimpleCompiler.java:213)でに org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)org.apache.flink.table.codegen.Compiler class.compile $(Compiler.scala:33) ... 8は

答えて

0

複数のオブジェクトのnd_keyアブロタイプのフィールドが処理されると、このようなjava.lang.CharSequence SQLクエリによって。クエリの結果がDataStream[Utf8]に変換されるはずのQUE toAppendStream[org.apache.avro.util.Utf8]を呼び出すことによって

は、あなたが要求します。しかし、FLINKは自動的にCharSequenceUtf8に変換することはできません。

toAppendStream[java.lang.CharSequence]toAppendStream[org.apache.avro.util.Utf8]を変更するようにしてください。使用しているバージョン

FLINK?

関連する問題