consumer

    0

    2答えて

    私はBlockingQueueでConsumer-Producerの問題を実装しようとしていました。何らかの目的でこれを行うために、私はファイル検索ツールを書くことにしました。 私は、検索メカニズムが再帰的に動作していると判断し、すべての新しいディレクトリに検索の速度を上げるための新しいスレッドプールが追加される予定です。 私の問題は、最終的にスレッドを検索する(消費者)を停止するメカニズムを実装

    0

    1答えて

    Spark StreamingとKafkaを使用して、Webサーバーから受信したメッセージを取り込み、処理しようとしています。 https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.mdに記載されている消費者をテストして、追加機能を利用しています。 最初のステップとして、どのように再生されているかを確認するために提供

    0

    1答えて

    私はSpringブートを介してREST APIに単一の文字列を送信しようとしていますが、私は400:Bad Requestを取得し続けています。私は、このJSONは、APIによって受け入れられていることを郵便配達で確認: { "currency": "USD" } 私は、このサービスに投稿する次のコードを書いた: public enum Currency { USD,

    0

    2答えて

    私はKafkaストリームを使用しており、Javaから最初のコンシューマオフセットをリセットしたいと考えています。 KafkaConsumer.seekToBeginning(...)は行うには正しいことのように聞こえるが、私はカフカストリームで作業: KafkaStreams streams = new KafkaStreams(builder, props); ... streams.sta

    1

    1答えて

    public class RequestConsumer : IConsumer<StartFlowCommand>, IConsumer<List<StartAndNextCommand>> { readonly IWorkFlowHandler _flowHandler; public RequestConsumer(IContainer contai

    1

    1答えて

    私は消費者グループのカフカからのポーリング・メッセージに問題があります。 マイコンシューマオブジェクトは self.ps = TopicPartition(topic, partition) で特定のパーティションに割り当て、その後、消費者はそのパーティションに割り当てる:私は でパーティション内のメッセージをカウントすることができる午前その後 self.consumer.assign([se

    0

    1答えて

    こんにちは、私はKAFKAを学び、私の遠隔ポーラー/消費者に問題を抱えています。 私はKAFKAをプライベートおよびパブリックIPを使用してAWS EC2インスタンスにセットアップしました。私のserver.propertiesは次のようになります。 リスナー= PLAINTEXT://172.31.31.58:9092 #AWSプライベートIP advertised.listeners = PL

    1

    1答えて

    activemqのキューを使用して、仮想トピックからメッセージをデキューしようとしています。私はいくつかのメッセージを送信しようとしましたが、 "メッセージがエンキューされました"の下のトピックに表示されていますが、消費することはできません。私が作成した 仮想トピック名はVirtualTopic.AAだったと消費者がConsumer.client1.VirtualTopic.AAと呼ばれています。

    0

    1答えて

    、私は、Apacheラクダにワイルドカードを使用していますし、私はこのようなルートビルダー定義されています(「:キュー:。* processQueue ActiveMQの」)から を.bean(beanOne、 "someMethod"); 私はメッセージに "{}ユニークID .processQueue" キューを送信されるメッセージを送信している間私はbeanOneのUNIQUEID内部のso

    0

    1答えて

    カフカ消費者ポップapiが低タイムアウトに記録を戻していません。 ポーリングでタイムアウト値を大きくすると、レコードが送信されます。 私はこの論理を理解することができません。次のコード、助けてください:ポーリングするタイムアウト(時間切れ)として指定された期間に発表された新しい消費されていないメッセージがない場合 public ConsumerRecords<String, Map<String,