2017-11-12 1 views
0

私はgoogle pubsubを使用して、サブスクライブされたトピックからメッセージを非同期にプルしています。Google pubsub:プルするJavaサンプルクライアントを掛ける

私は以下のサンプルコードを使用しています:それは数分間(メッセージを摂取)正常に動作し、メッセージのこれ以上の摂取で立ち往生

public void startStreaming() { 
     Subscriber subscriber = null; 
     try { 
      SubscriptionName subscription = SubscriptionName.of("topic-test", 
        "subscription-id-1234"); 
      ExecutorProvider executorProvider = 
        InstantiatingExecutorProvider.newBuilder() 
          .setExecutorThreadCount(1) 
          .build(); 

      MessageReceiver receiver = 
        new MessageReceiver() { 
         @Override 
         public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { 
          try { 
           String msg = message.getData().toStringUtf8(); 
           System.out.println(msg); 
          } catch (Exception e) { 
           throw new RuntimeException("Failed to pull messages from topic"); 
          } 
          consumer.ack(); 
         } 
        }; 
      subscriber = Subscriber 
        .newBuilder(subscription, receiver) 
        .setExecutorProvider(executorProvider) 
        .build(); 
      subscriber.addListener(
        new Subscriber.Listener() { 
         @Override 
         public void failed(Subscriber.State from, Throwable failure) { 
          // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down. 
          logger.error("Subscriber is not able to hook on google pubsub topic", failure); 
         } 
        }, 
        MoreExecutors.directExecutor()); 
      subscriber.startAsync().awaitRunning(); 
      // why should I put sleep here?? 
      Thread.sleep(60000); 
     } catch (Exception e) { 
     } finally { 
      if (subscriber != null) { 
       subscriber.stopAsync().awaitTerminated(); 
      } 
     } 
    } 

、私は再起動かつてのようにメッセージがあることを確信しています引っ張るクライアントは、それは正常にデータを読み取ります。

Nov 12, 2017 7:33:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection sendAckOperations 
WARNING: failed to send acks 
java.lang.IllegalStateException: call was cancelled 
    at com.google.common.base.Preconditions.checkState(Preconditions.java:444) 
    at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:405) 
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52) 
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52) 
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:286) 
    at com.google.cloud.pubsub.v1.StreamingSubscriberConnection.sendAckOperations(StreamingSubscriberConnection.java:274) 
    at com.google.cloud.pubsub.v1.MessageDispatcher.processOutstandingAckOperations(MessageDispatcher.java:602) 
    at com.google.cloud.pubsub.v1.MessageDispatcher.access$2100(MessageDispatcher.java:56) 
    at com.google.cloud.pubsub.v1.MessageDispatcher$AckDeadlineAlarm.run(MessageDispatcher.java:529) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

そしてExecutorProvider executorProviderを使用していないとき、私は以下のログ

2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND WINDOW_UPDATE: streamId=3 windowSizeIncrement=1048576 
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-4] DEBUG io.grpc.netty.NettyClientHandler [id: 0x330498ce, L:/10.8.17.4:47346 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes= 
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes= 
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 3 thread-local buffer(s) from thread: threadDeathWatcher-1-1 
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 1 thread-local buffer(s) from thread: threadDeathWatcher-1-1 
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-3] DEBUG io.netty.buffer.PoolThreadCache Freed 153 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-3 
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-4] DEBUG io.netty.buffer.PoolThreadCache Freed 131 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-4 

あなたが助けてもらえを得ましたか。それが今で正常に動作している

答えて

0

は、

lines, 

//   if (subscriber != null) { 
//    subscriber.stopAsync().awaitTerminated(); 
//   } 

説明の下に削除することでした、ここでこのgithubのthread 1827

であり、それは本当に意味をなす、何不足していると、Google、クラウドでより多くのドキュメントがありますdocsは、githubスレッドで提供されているコメントについての詳細を提供します。

編集: Githubのチケットは、より多くのドキュメントを追加するために作成し、以下閉じている: 2616

関連する問題