現在ブロック方法を実行しているアクターに対してもアクター状態を読み取ることができます。アクターはIActorStateManager
を使用して状態を保存し、次にIActorStateProvider
を使用します。 IActorStateProvider
はActorService
ごとに1回インスタンス化されます。各パーティションは、アクターのホスティングと実行を担当するActorService
をインスタンス化します。アクターサービスは、コアであるStatefulService
(または、通常のステートフルサービスで使用される基本クラスのStatefulServiceBase
)です。これを念頭に置いて、通常のサービス、つまりIService
に基づくサービスインターフェイスで作業するのと同じ方法で、私たちの俳優に対応するActorService
で作業できます。
IActorStateProvider
(あなたが永続状態を使用している場合KvsActorStateProvider
によって実装)私たちが使用できる2つの方法があります。これらのメソッドへ
Task<T> LoadStateAsync<T>(ActorId actorId, string stateName, CancellationToken cancellationToken = null);
Task<PagedResult<ActorId>> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken);
を呼び出しますが、これらがあるので、理にかなっている、俳優のロックの影響を受けませんパーティション上のすべてのアクターをサポートするように設計されています。
例:
public interface IManyfoldActorService : IService
{
Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken);
}
public class ManyfoldActorService : ActorService, IManyfoldActorService
{
...
}
がProgram.Main
に新しいActorServiceを登録します:
カスタムActorService
を作成し、俳優をホストするために1つを使用し
ActorRuntime.RegisterActorAsync<ManyfoldActor>(
(context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult();
を私たちは持っていると仮定すると、次のメソッドを持つシンプルなActor:
Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken)
{
Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult();
var task = this.StateManager.SetStateAsync("count", count, cancellationToken);
ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}");
return task;
}
それは30秒間待機し(、メソッド呼び出しをブロック、長時間実行をシミュレートするために)、次いでint
に状態値"count"
を設定します。
私たちは今、いくつかの状態データを生成するために俳優のためのSetCountAsync
を呼び出すことができます別のサービスで:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
var actorProxyFactory = new ActorProxyFactory();
long iterations = 0;
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
iterations += 1;
var actorId = iterations % 10;
var count = Environment.TickCount % 100;
var manyfoldActor = actorProxyFactory.CreateActorProxy<IManyfoldActor>(new ActorId(actorId));
manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false);
ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}");
await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken);
}
}
この方法は、単純に延々と俳優の値を変更するループ。 (合計10人のアクターと、3秒間の遅延と30秒間のアクター間の相関関係に注意してください。ロックを待っているアクターコールの無限の蓄積を防ぐためにこの方法を単に設計しています)。各呼び出しはまた、火災と忘れて実行されるので、次の俳優の状態が更新される前にそれを更新することができます。その愚かなコードは、理論を証明するためにこのように設計されています。
今の俳優サービスで、我々は方法このようなGetCountsAsync
実装できます
public async Task<IDictionary<long, int>> GetCountsAsync(CancellationToken cancellationToken)
{
ContinuationToken continuationToken = null;
var actors = new Dictionary<long, int>();
do
{
var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken);
foreach (var actor in page.Items)
{
var count = await this.StateProvider.LoadStateAsync<int>(actor, "count", cancellationToken);
actors.Add(actor.GetLongId(), count);
}
continuationToken = page.ContinuationToken;
}
while (continuationToken != null);
return actors;
}
をこれは(そのパーティションの)すべての既知のアクターを照会するために基礎となるActorStateProvider
を使用して、直接、このために状態を読み込み、アクタのメソッド実行によってブロックされないようにします。
最後のピース、私たちのActorServiceを呼び出し、すべてのパーティション間でGetCountsAsync
を呼び出すことができ、いくつかの方法:D秒それは状態が更新され、ここで各だ取得します。今、私たちはすべての33の10人の俳優を与える
public IDictionary<long, int> Get()
{
var applicationName = FabricRuntime.GetActivationContext().ApplicationName;
var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}";
var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}");
var fabricClient = new FabricClient();
var partitions = new List<long>();
var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult();
foreach (var servicePartition in servicePartitionList)
{
var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation;
partitions.Add(partitionInformation.LowKey);
}
var serviceProxyFactory = new ServiceProxyFactory();
var actors = new Dictionary<long, int>();
foreach (var partition in partitions)
{
var actorService = serviceProxyFactory.CreateServiceProxy<IManyfoldActorService>(actorServiceUri, new ServicePartitionKey(partition));
var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult();
foreach (var count in counts)
{
actors.Add(count.Key, count.Value);
}
}
return actors;
}
このコードを実行します俳優は毎回30秒忙しいです。アクターサービスは、各アクターメソッドが返されると直ちに更新された状態を認識します。
このサンプルでは省略されているものがいくつかあります。たとえば、アクターサービスで状態をロードしたときなど、タイムアウトを防ぐ必要があるかもしれません。
アクターで長時間実行されるアクションを実行しないでください。すぐに応答できるはずです。 - 長時間実行するアクションが必要な場合は、実行したときに親に通知する子アクターにスピンオフします。 - そのようにして、親の俳優は反応し続けることができます。 – BrainSlugs83