2017-01-28 5 views
0

私はAkka.netでCQRSとEvent Sourcingを適用する方法に興味があります。私はすでにAkka.PersistenceにES部分を配達していることに気づいた。Akka.netとAkka.PersistenceでCQRSのEventBusを適用する方法

私が理解している限り、コマンドハンドラとAggergateRootは、単一クラスのReceivePersistentActorで表すことができます。私は

public abstract class AggregateRoot<TState> : ReceivePersistentActor 
{ 
    private readonly Func<TState, bool> shouldSafeSnapShot; 

    protected AggregateRoot(TState state, Func<TState, bool> shouldSafeSnapShot) 
    { 
     if (state == null) throw new ArgumentNullException(nameof(state)); 
     if (shouldSafeSnapShot == null) throw new ArgumentNullException(nameof(shouldSafeSnapShot)); 

     var path = this.Self.Path; 
     this.PersistenceId = $"{path.Parent.Name}/{path.Name}"; 
     this.State = state; 
     this.shouldSafeSnapShot = shouldSafeSnapShot; 
    } 

    public sealed override string PersistenceId { get; } 

    protected TState State { get; } 

    protected void Emit<TEvent>(TEvent e, Action<TEvent> apply = null) 
    { 
     this.Persist(e, 
      @event => 
       { 
        // update state 
        apply?.Invoke(@event); 

        // safe snapshot or so... 
        this.SaveSnapshotIfRequired(); 

        // publish to event bus? 
        Context.System.EventStream.Publish(e); 
       }); 
    } 

    private void SaveSnapshotIfRequired() 
    { 
     var state = this.State; 

     if (this.shouldSafeSnapShot(state)) 
     { 
      this.SaveSnapshot(state); 
     } 
    } 
} 

がない私はEventBusの上にイベントを送りたい...(ここでは完了していない)そのための基本クラスを持っており、アッカは、それが行われている方法に合うかもしれないパッケージだ以内に何かを持っているようです他のCQRSフレームワーク(EventBusの抽象概念は NCqrsInceptum)またはグレゴリー・ヤングの簡単なサンプル hereと比較してください。

// publish to event bus? 
Context.System.EventStream.Publish(e); 

しかし、AggregateRootドメインオブジェクトの俳優の中eventbusの実装を知っている私には奇妙なようです。私は俳優の子供に責任を置くか、実装を切り替えるために使用されるIAbstractEventBusインターフェースのようなものを注入することができますが、永続化してすべての加入者に通知するのはAggregateRootの責任ではないと思います!? しかし、ESの部分は、俳優に結合されているようです。

Akka.netとAkka.Persistenceでこれを行う典型的な方法は何ですか?どのデザインアイデアをどのように分割するか?理想主義の世界では

私はAkka.netのIEventAdapterインターフェイスをアップつまずいたが、私はこれが正しい道(またはダークサイド)に私をもたらしわからないんだけど...

public class SomeEventAdapter : IEventAdapter 
{ 
    public object ToJournal(object evt) 
    { 
     return evt; // should I add the event emitter here? 
     // but is this already stored now? hmmm 
    } 
} 

答えて

1

あなたのコマンドハンドラはイベントを追跡してイベントバスにプッシュし、コマンドを問題なく完全に処理すると、何らかの形でsotrageに保持することができます。これにより、集計メソッドが完了する前にイベントを早期にプッシュしないことが保証されますそれはすべきイベントです。

Akka.netでは、コマンドを集計に渡すときにTell(尋ねる)を変更することができます(純粋さのために集計からハンドラーを分離していると仮定して)ハンドラはそのハンドラに保持されているので、ハンドラはそれらを(おそらく間接的に... UOWの何らかの形式で)送信して永続させることができます。ただし、コマンドハンドラが処理するように設定されている他のコマンドはありませんあなたの集約が完了するまで処理されます...他の集約はおそらく1つのハンドラによって処理されますが - 結局ロックを要求する... Tellはしません。

他のシステムでは、持続層自体がイベントバスとして機能します。 Greg YoungのEventStoreはこれらのうちの1つです...あなたはあなたのアプリケーションから懸念事項を完全に取り除きます)。

これは理想的な理論と実際の実装とフレームワークの限界の課題です。

理想主義的な世界では、集約ルートは、発生するイベントの格納には関係ありません...提供されたイベントの収集から水分を取り除き、イベントを発生させ、他のコンポーネントにブロードキャストを管理させることです、持続性とそれらのイベントの(水和のための)照会。私はAkka.netがアクターにDBアクセス/永続化レイヤーをうまくロールインしていることを知っています。これは素晴らしいことですが、純粋なSOLID CQRSの実装から逸脱しています。そういうわけで、あなたが実装されている場所、つまりどこにいても、水を泥だらけにすることができます。これはcqrs.netから取られ、その後

Receive<SayHelloWorldCommand>(command => Execute(SayHello, command)); 

経由でアップ配線することができる

protected virtual void Execute<TCommand>(Action<TCommand> action, TCommand command) 
    { 
     UnitOfWork.Add(this); 
     try 
     { 
      action(command); 

      UnitOfWork.Commit(); 

      Sender.Tell(true, Self); 
     } 
     catch(Exception exception) 
     { 
      Logger.LogError("Executing an Akka.net request failed.", exception: exception); 
      Sender.Tell(false, Self); 
      throw; 
     } 
    } 

これは理想からはほど遠い

が、1つのオプションは、集約内作業(UOW)の単位を使用していますそれをハンドラに戻すのではなく、これは、上記の懸念を直接的に集約して保持し、何らかの形でSOLIDの実装を維持するために行った妥協です。

1

@Beachwalker私はhttps://github.com/thangchung/magazine-website-akkaでCQRSアプローチを使って実装しました。私はそれがあなたを助けることができるかどうかわからない?しかし、何か役に立つものが見つかることを願っています。私はそれのための多くの機能と機能を追加し続けています。

関連する問題