私はIDEからflinkを実行しています。照会可能で、データを格納することで、作業 何とか、私はそれを照会するとき、それは例外Flinkクエリ可能状態が動作しない
Exeception
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)])
を投げる私のコードされている:
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")
@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
else {
// At startup some failures are expected
// due to races. Make sure that they don't
// fail this test.
return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
@throws[Exception]
def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
})
}
}
@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
keyHash: Int,
serializedKey: Array[Byte]): Future[Array[Byte]] = {
val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
kvState.recoverWith(recover(queryName, keyHash, serializedKey))
}
def onSuccess = new OnSuccess[Array[Byte]]() {
@throws(classOf[Throwable])
override def onSuccess(result: Array[Byte]): Unit = {
println("found record ")
val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
println(value)
}
}
override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess)
は、ローカルホストで実行されます:6123 は、接続に問題があります。別のクラスターで仕事を提出する必要がありますか?私は産卵おりません新しいミニクラスタまたはクラスタ のようにhttps://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java のように私は同じクラスタでこれをsamにしたいenv.executeで実行されるメインアプリケーションとしてのenvrionment。そのステップは必要ですか? deaultのFLINKによってドキュメントから
もjobmanagerが実行されている場所を知るための方法があります。私はapiを見つけることができません –
どのようにあなたの仕事を提出していますか?仕事の提出のログを共有できますか? –
私はIDEから仕事をしていますが、私はあなたがIDE.I tworks with yarn/clusterモードで稼働しているときにジョブマネージャに接続する方法がないと思っています –