2017-12-18 8 views
2

カフカからS3にメッセージを読み込もうとしているときに、Kafka接続クラスパスにjarを追加する際に問題があります。Kafkaカスタムtimestamp.extractorに接続

目標は、カフカメッセージのキーの一部であるタイムスタンプに基づいてパーティションにメッセージを書き込むことです。

ストーリーを短くするには、カスタムタイムスタンプ抽出器を提供する必要があります。ドキュメントhereの後にTimestampExtractorインターフェイスを実装し、plugin.pathプロパティにJARの場所を追加したクラスを作成しました。

問題は、接続を開始するときにクラスが見つからないということです。どういうわけか、jarファイルはクラスパスにはないと私は

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor 

は、追加データ取得しています:

バージョン:接続スタンドアロン

:コンフルエント4.0.0

を接続します

開始コマンド:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \ /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \ /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

4月補助が必要です。

答えて

1

あなたのS3のコネクタに使用可能なカスタムタイムスタンプ抽出クラスを作成するには、次のものが必要です。

  • は、他のコネクタの依存関係と一緒にカスタムクラスを持つjarファイルを追加します。例:./share/java/kafka-connect-s3

    保存あなたは、これが すべてのストレージシンクコネクタ(現在はS3とHDFSのコネクタ)が利用できるようにするだけS3のコネクタで、または ./share/java/kafka-connect-storage-commonで利用可能 ようにしたい場合。

  • カスタムクラスがio.confluent.connect.storage.partitioner.TimestampExtractorインターフェイスを実装していることを確認してください。
  • timestamp.extractorプロパティをコネクタの構成に設定したときに、完全に修飾されたクラス名を使用します。もちろん、定義しパッケージ化したパッケージと一致するようにしてください。例えば:

    timestamp.extractor=me.connectors.MyTimestampExtractor

最後に、ご使用のコネクターへのカスタムパーティショナーを利用できるように同様のプロセスをたどるだろう。

+0

魅力として働いています –

+0

これはおかげで、 'plugin.path'がカスタムエクストラクタやパーティショナーを読み込むのがなぜ機能しないのかを明確にすることはできますか? –

+0

あなたはそれが_働いていないと言うと、それは 'plugin.path'の別のディレクトリからロードされないことを意味しますか?なぜなら、パーティショナークラスもタイムスタンプ・エクストラクタークラスも(少なくとも現時点では)Connectプラグインとはみなされないからです。接続プラグインは、コネクタ、トランスフォーム、コンバーターです。 –

関連する問題