2016-08-29 86 views
2

私はIDEからflinkを実行しています。照会可能で、データを格納することで、作業 何とか、私はそれを照会するとき、それは例外Flinkクエリ可能状態が動作しない

Exeception

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)]) 

を投げる私のコードされている:

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost") 
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123") 

@throws[Throwable] 
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure) 
else { 
    // At startup some failures are expected 
    // due to races. Make sure that they don't 
    // fail this test. 
    return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() { 
    @throws[Exception] 
    def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey) 
    }) 
} 
} 

    @SuppressWarnings(Array("unchecked")) 
    private def getKvStateWithRetries(queryName: String, 
           keyHash: Int, 
           serializedKey: Array[Byte]): Future[Array[Byte]] = { 

val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey) 
kvState.recoverWith(recover(queryName, keyHash, serializedKey)) 
    } 

def onSuccess = new OnSuccess[Array[Byte]]() { 
@throws(classOf[Throwable]) 
override def onSuccess(result: Array[Byte]): Unit = { 
    println("found record ") 
    val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer) 
    println(value) 
} 
} 


override def invoke(query: QueryMetaData): Unit = { 
println("getting inside querystore"+query.record) 
val serializedResult = flinkQuery.getResult(query.record, queryName) 
serializedResult.onSuccess(onSuccess) 

私は産卵おりません新しいミニクラスタまたはクラスタ のようにhttps://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java のように私は同じクラスタでこれをsamにしたいenv.executeで実行されるメインアプリケーションとしてのenvrionment。そのステップは必要ですか? deaultのFLINKによってドキュメントから

は、ローカルホストで実行されます:6123 は、接続に問題があります。別のクラスターで仕事を提出する必要がありますか?

+0

もjobmanagerが実行されている場所を知るための方法があります。私はapiを見つけることができません –

+0

どのようにあなたの仕事を提出していますか?仕事の提出のログを共有できますか? –

+1

私はIDEから仕事をしていますが、私はあなたがIDE.I tworks with yarn/clusterモードで稼働しているときにジョブマネージャに接続する方法がないと思っています –

答えて

1

多くのグーグルで私は解決策を見つけました。

LocalStreamEnvironmentを使用していて、同じエラーが発生しました。このスレッドが見つかるまでRemoteEnv connect failed。説明されているエラーは、別のセットアップ(ローカルではありません)ですが、テストに使用されるトピックに含まれているgistの例では、パラメータ "useSingleActorSystem"をfalseに設定してLocalFlinkMiniClusterを作成しています。

LocalStreamEnvironmentの実装を見ると、 "useSingleActorSystem"を真のに設定してMiniClusterが作成されます。

は、私は単にミニクラスターがに設定し、「useSingleActorSystem」で作成されLocalStreamEnvironmentを拡張するクラスLocalQueryableStreamEnvironmentを作成し、すべてがIDEから取り組んでいます。

は今、私のコードは次の通りである:

構成:

Configuration config = new Configuration(); 
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6); 
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); 
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue()); 
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); 
config.setString(JobManagerOptions.ADDRESS, "localhost"); 
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue()); 
**config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);** 

注:QueryableStateだけもっとして1を値に設定し、この設定LOCAL_NUMBER_TASK_MANAGERで動作します!

インスタンス化/環境を実行します。

LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config); 
... 
env.addSource(anySource) 
    .keyby(anyAtribute) 
    .flatmap(new UpdateMyStateToBeQueriedLaterMapper()) 
    .addSink(..); //etc 
... 
env.execute("JobNameHere"); 

をし、クライアントを作成する:詳細情報へのアクセスのために

final Configuration config = new Configuration(); 
config.setString(JobManagerOptions.ADDRESS, "localhost"); 
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue()); 

HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils 
    .createHighAvailabilityServices(
        config, 
        Executors.newSingleThreadScheduledExecutor(), 
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION 
    ); 
return new QueryableStateClient(config,highAvailabilityServices); 

Queryable States in ApacheFlink - Implementation

Queryable State Client with 1.3.0-rc0

私の依存関係:

compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1' 
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1' 
関連する問題