2017-02-11 1 views
1

私は、ActorProxyを使って状態を尋ねるXアクターを反復しているサービスを実行しています。他のメソッドが完了するのを待つことなく、サービスファブリックアクターから状態を取得する方法は?

このサービスは、覚え書きのコールバックから俳優の他の長時間実行中のメソッドを待ってブロックされていません。

以下の簡単な例のGetState()を呼び出す方法があります。これは、リマインダが実行されていることをブロックせずに、メソッドが正しい方法で完了できるようにします。

class Actor : IMyActor{ 

public Task<MyState> GetState() => StateManager.GetAsync("key") 
} 

alterntive。

サービスを呼び出すための適切な方法は何ですか.5秒以内に返信がない場合は、ただと思います。

var proxy = ActorProxy.Create<IMyActor(); 
var state = await proxy.GetState(); // this will wait until the actor is ready to send back the state. 
+0

アクターで長時間実行されるアクションを実行しないでください。すぐに応答できるはずです。 - 長時間実行するアクションが必要な場合は、実行したときに親に通知する子アクターにスピンオフします。 - そのようにして、親の俳優は反応し続けることができます。 – BrainSlugs83

答えて

3

現在ブロック方法を実行しているアクターに対してもアクター状態を読み取ることができます。アクターはIActorStateManagerを使用して状態を保存し、次にIActorStateProviderを使用します。 IActorStateProviderActorServiceごとに1回インスタンス化されます。各パーティションは、アクターのホスティングと実行を担当するActorServiceをインスタンス化します。アクターサービスは、コアであるStatefulService(または、通常のステートフルサービスで使用される基本クラスのStatefulServiceBase)です。これを念頭に置いて、通常のサービス、つまりIServiceに基づくサービスインターフェイスで作業するのと同じ方法で、私たちの俳優に対応するActorServiceで作業できます。

IActorStateProvider(あなたが永続状態を使用している場合KvsActorStateProviderによって実装)私たちが使用できる2つの方法があります。これらのメソッドへ

Task<T> LoadStateAsync<T>(ActorId actorId, string stateName, CancellationToken cancellationToken = null); 
Task<PagedResult<ActorId>> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken); 

を呼び出しますが、これらがあるので、理にかなっている、俳優のロックの影響を受けませんパーティション上のすべてのアクターをサポートするように設計されています。

例:

public interface IManyfoldActorService : IService 
{ 
    Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken); 
} 

public class ManyfoldActorService : ActorService, IManyfoldActorService 
{ 
    ... 
} 

Program.Mainに新しいActorServiceを登録します:

カスタムActorServiceを作成し、俳優をホストするために1つを使用し

ActorRuntime.RegisterActorAsync<ManyfoldActor>(
    (context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult(); 

を私たちは持っていると仮定すると、次のメソッドを持つシンプルなActor:

Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken) 
    { 
     Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult(); 
     var task = this.StateManager.SetStateAsync("count", count, cancellationToken); 
     ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}"); 
     return task; 
    } 

それは30秒間待機し(、メソッド呼び出しをブロック、長時間実行をシミュレートするために)、次いでintに状態値"count"を設定します。

私たちは今、いくつかの状態データを生成するために俳優のためのSetCountAsyncを呼び出すことができます別のサービスで:

protected override async Task RunAsync(CancellationToken cancellationToken) 
    { 
     var actorProxyFactory = new ActorProxyFactory(); 
     long iterations = 0; 
     while (true) 
     { 
      cancellationToken.ThrowIfCancellationRequested(); 
      iterations += 1; 
      var actorId = iterations % 10; 
      var count = Environment.TickCount % 100; 
      var manyfoldActor = actorProxyFactory.CreateActorProxy<IManyfoldActor>(new ActorId(actorId)); 
      manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false); 
      ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}"); 
      await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken); 
     } 
    } 

この方法は、単純に延々と俳優の値を変更するループ。 (合計10人のアクターと、3秒間の遅延と30秒間のアクター間の相関関係に注意してください。ロックを待っているアクターコールの無限の蓄積を防ぐためにこの方法を単に設計しています)。各呼び出しはまた、火災と忘れて実行されるので、次の俳優の状態が更新される前にそれを更新することができます。その愚かなコードは、理論を証明するためにこのように設計されています。

今の俳優サービスで、我々は方法このようなGetCountsAsync実装できます

public async Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken) 
    { 
     ContinuationToken continuationToken = null; 
     var actors = new Dictionary<long, int>(); 

     do 
     { 
      var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken); 

      foreach (var actor in page.Items) 
      { 
       var count = await this.StateProvider.LoadStateAsync<int>(actor, "count", cancellationToken); 
       actors.Add(actor.GetLongId(), count); 
      } 

      continuationToken = page.ContinuationToken; 
     } 
     while (continuationToken != null); 

     return actors; 
    } 

をこれは(そのパーティションの)すべての既知のアクターを照会するために基礎となるActorStateProviderを使用して、直接、このために状態を読み込み、アクタのメソッド実行によってブロックされないようにします。

最後のピース、私たちのActorServiceを呼び出し、すべてのパーティション間でGetCountsAsyncを呼び出すことができ、いくつかの方法:D秒それは状態が更新され、ここで各だ取得します。今、私たちはすべての33の10人の俳優を与える

public IDictionary<long, int> Get() 
    { 
     var applicationName = FabricRuntime.GetActivationContext().ApplicationName; 
     var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}"; 
     var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}"); 

     var fabricClient = new FabricClient(); 
     var partitions = new List<long>(); 
     var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult(); 
     foreach (var servicePartition in servicePartitionList) 
     { 
      var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation; 
      partitions.Add(partitionInformation.LowKey); 
     } 

     var serviceProxyFactory = new ServiceProxyFactory(); 

     var actors = new Dictionary<long, int>(); 
     foreach (var partition in partitions) 
     { 
      var actorService = serviceProxyFactory.CreateServiceProxy<IManyfoldActorService>(actorServiceUri, new ServicePartitionKey(partition)); 

      var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult(); 
      foreach (var count in counts) 
      { 
       actors.Add(count.Key, count.Value); 
      } 
     } 
     return actors; 
    } 

このコードを実行します俳優は毎回30秒忙しいです。アクターサービスは、各アクターメソッドが返されると直ちに更新された状態を認識します。

このサンプルでは省略されているものがいくつかあります。たとえば、アクターサービスで状態をロードしたときなど、タイムアウトを防ぐ必要があるかもしれません。

+0

恐ろしい答え:) –

+0

うれしい私はあなたを助けることができる、それがあなたのためにうまくいくことを願って! – yoape

0

これを行う方法はありません。アクターはシングルスレッドです。彼らが任意のアクターメソッドの中で完了するのを待っている、長い実行中の作業を行っている場合、外部からのメソッドを含む他のメソッドは、待たなければなりません。

+0

以前のsdkバージョンではReadOnly属性がいくつかありませんでしたか?与えられた俳優のためにstatemanagerを取得し、待たずに状態を調べる方法はありますか?私は私が欲しいものを解決する必要があると確信していますか? –

+0

Readonlyは以前のリリースでは存在しましたが、メソッドが終了した後に状態を複製する必要があり、単一のスレッドロックセマンティクスに影響を与えないかどうかのヒントでした。 – masnider

関連する問題