2016-01-22 9 views
7

私は、Kafka 0.8から読み込んだカスタムDataFlow無制限データソースを作成しています。私はDirectPipelineRunnerを使ってローカルで実行したいと思います。私はいつでも私のカスタムソースのための評価者を登録していないとして、いくつかの理にかなってDirectPipelineRunnerでカスタムDataFlow無制限ソースを使用する

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374) 
     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87) 
     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174) 

:しかし、私は、次のstackstraceを取得しています。

https://github.com/GoogleCloudPlatform/DataflowJavaSDKを読むと、の限定版の評価版のようです。ソースが登録されています。カスタムの無制限ソースの評価者を定義して登録するには、どのような方法が推奨されますか?

答えて

3

は現在、限定された入力のみで実行されます。私たちはこの制限を取り除くことに積極的に取り組んでおり、まもなくリリースする予定です。ところで

、あなたは自明次の例のように、withMaxNumRecordsを使用することにより、テストの目的のために、BoundedSourceに任意のUnboundedSourceを回すことができます。

UnboundedSource<String> unboundedSource = ...; // make a Kafka source 
PCollection<String> boundedKafkaCollection = 
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10)); 

は詳細についてthis issue on GitHubを参照してください。


また、カフカコネクタにはいくつかの努力があります。 our GitHub repositoryを使用して、私たちや他の貢献者と関わりたいと思うかもしれません。

+0

基本的に古いコンシューマAPIを使用している旧式のkafkaを使用しているため、コネクタに対する現在の努力はあなたにとっては役に立たないものです:-)カフカのバージョンを更新し、間もなくより標準的なコネクタです。 – bfabry

関連する問題