任意の型のメッセージを購読・発行できる静的なメッセージバスを設計しています。オブザーバーに明示的な購読解除を要求しなくて済むように、デリゲートそのものではなく、デリゲートを指す WeakReference オブジェクトを保持したいと考えています。その結果、Paul Stovell 氏がブログ (http://www.paulstovell.com/weakevents) で説明しているものとよく似たコードを書きました。【複数のスレッドからオブジェクトへの弱参照を追跡する】






あなたの質問は? (質問の形でそれを明記してください) – Kiril


また、ここではどの言語/プラットフォームの話をしているのですか? Java? C#? –


コメントありがとうございます。VS2010 上の .NET 4.0 で C# を使用しています。質問はこうです: スレッドの同期が適切に行われていれば、複数のスレッドから WeakReference オブジェクトへ確実にアクセスできるはずではないでしょうか? ところが私の場合はそうならず、その理由が分かりません。私は何を間違えているのでしょうか? – user1144037





using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Linq; 
using System.Linq.Expressions; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Test 
    // Console entry point for the demo: creates a Test instance, publishes one Message1
    // through MessageBus and then waits on the MessageBus.IamDone flag.
    // NOTE(review): this listing contains no braces, so block boundaries below are not
    // visible - confirm against the original source before editing.
    class Program 
     // args is not used by any visible statement.
     static void Main(string[] args) 
      Console.WriteLine("Starting the app"); 
      // The local keeps the Test instance strongly referenced while Main runs.
      Test test = new Test(); 
      // uncomment these lines to cause automatic unsubscription from Message1 
//   test = null; 
//   GC.Collect(); 
//   GC.WaitForPendingFinalizers(); 

      // publish Message1 on this thread 
      // MessageBus.Publish<Message1>(new Message1()); 

      // publish Message1 on another thread 
      // (MessageBus.Publish queues the delivery with ThreadPool.QueueUserWorkItem)
       MessageBus.Publish<Message1>(new Message1()); 

      // Poll until a handler or the dispatcher sets MessageBus.IamDone.
      // NOTE(review): no loop body is visible; as written the next statement would become
      // the body. Presumably a sleep/yield statement was lost here - confirm.
      while (!MessageBus.IamDone) 
      Console.WriteLine("Exiting the app"); 
      // NOTE(review): no statement that actually waits for <ENTER> is visible after this prompt.
      Console.WriteLine("Press <ENTER> to terminate program."); 

    // Demo observer: declares one handler method per message type.
    public class Test 
     // NOTE(review): the constructor only prints that it is subscribing; no
     // MessageBus.Subscribe call is visible in this listing. Presumably the two
     // Subscribe calls were lost - confirm against the original source.
     public Test() 
      Console.WriteLine("Subscribing to message 1."); 
      Console.WriteLine("Subscribing to message 2."); 

     // Handler for Message1: logs receipt and publishes a new Message2 on the bus.
     // The incoming message argument is not read.
     public void OnMessage1(Message1 message) 
      Console.WriteLine("Got message 1. Publishing message 2"); 
      MessageBus.Publish<Message2>(new Message2()); 

     // Handler for Message2: logs receipt and sets MessageBus.IamDone, the flag
     // that Program.Main polls. The incoming message argument is not read.
     public void OnMessage2(Message2 message) 
      Console.WriteLine("Got message 2. Closing the app"); 
      MessageBus.IamDone = true; 

    // Common base of the message types; MessageBus.Publish<TMessage> constrains
    // TMessage to this class.
    public abstract class MessageBase 
     // Text payload; never assigned or read anywhere in this listing.
     public string Message; 

    // Message type published by Program.Main; parameter type of Test.OnMessage1.
    public class Message1 : MessageBase 

    // Message type published from Test.OnMessage1; parameter type of Test.OnMessage2.
    public class Message2 : MessageBase 

    public static class MessageBus 
     // This is here purely for this test
     /// <summary>
     /// Completion flag for the demo. It is set from message handlers and from the
     /// dispatcher, which run on thread-pool threads, and polled in a loop by
     /// Program.Main. It is volatile so that the polling loop re-reads the field on
     /// every iteration instead of working from a cached value.
     /// </summary>
     public static volatile bool IamDone = false;

     /// <summary>
     /// A dictionary of lists of handlers of messages by message type.
     /// The key is the string form of the message type; each list holds weak
     /// references to the subscribed delegates.
     /// </summary>
     private static readonly ConcurrentDictionary<string, List<WeakReference>> handlersDict = new ConcurrentDictionary<string, List<WeakReference>>();

     /// <summary>
     /// Thread synchronization object to use with Publish calls.
     /// Read-only so the monitor object can never be swapped while a thread holds it.
     /// </summary>
     private static readonly object _lockPublishing = new object();

     /// <summary>
     /// Thread synchronization object to use with Subscribe calls.
     /// Read-only so the monitor object can never be swapped while a thread holds it.
     /// </summary>
     private static readonly object _lockSubscribing = new object();

     /// <summary>
     /// Wraps the message in a work item and hands it to the thread pool for
     /// dispatch; the call returns without waiting for any handler to run.
     /// </summary>
     /// <typeparam name="TMessage">Message argument type</typeparam>
     /// <param name="message">Message argument</param>
     public static void Publish<TMessage>(TMessage message)
         where TMessage : MessageBase
     {
         // Work item layout: (dictionary key for the message type, the message itself,
         // an exception slot that is always null at this point).
         var workItem = new Tuple<string, TMessage, Exception>(
             typeof(TMessage).ToString(),
             message,
             null);

         // Dispatch happens on a thread-pool thread via _PublishMessage.
         ThreadPool.QueueUserWorkItem(_PublishMessage<TMessage>, workItem);
     }

     /// <summary> 
     /// Publishes a message to the bus, causing observers to be invoked if appropriate. 
     /// Runs as the callback that Publish queues with ThreadPool.QueueUserWorkItem.
     /// </summary> 
     /// <remarks>
     /// NOTE(review): this listing has no braces and seems to be missing statements: the
     /// receiver of the ".Where(...)" chain, the "try" matching each "catch", and the "else"
     /// of the null check. Confirm against the original source before changing the logic.
     /// </remarks>
     /// <typeparam name="TArg">Message argument type</typeparam> 
     /// <param name="stateInfo">Queue work item argument; cast to Tuple&lt;string, TArg, Exception&gt; (type key, message, exception slot)</param> 
     private static void _PublishMessage<TArg>(Object stateInfo) 
      where TArg : class 
       // translate the queue work item argument to extract the message type info and 
       // any arguments 
       Tuple<string, TArg, Exception> arg = (Tuple<string, TArg, Exception>)stateInfo; 
       // call all observers that have registered to receive this message type in parallel 
       // NOTE(review): the start of this expression is not visible; the two-argument
       // lambda below suggests a Parallel.ForEach over handlersDict.Keys - confirm.
        // find the right dictionary list entry by message type identifier 
        .Where(handlerKey => handlerKey == arg.Item1) 
        // dereference the list entry by message type identifier to get a reference to the observer 
        .Select(handlerKey => handlersDict[handlerKey]), (handlerList, state) => 
         // NOTE(review): Subscribe locks _lockSubscribing, a different object, so this lock
         // does not stop Subscribe from adding to handlerList while it is enumerated here.
         lock (_lockPublishing) 
          List<int> descopedRefIndexes = new List<int>(handlerList.Count); 
          // search the list of references and invoke registered observers 
          foreach (WeakReference weakRef in handlerList) 
           // try to obtain a strong reference to the target 
           Delegate dlgRef = (weakRef.Target as Delegate); 
           // check if the underlying delegate reference is still valid 
           if (dlgRef != null) 
            // yes it is, get the delegate reference via Target property, convert it to Action and invoke the observer 
            // NOTE(review): the "as" cast result is invoked without a null check.
             (dlgRef as Action<TArg>).Invoke(arg.Item2); 
            catch (Exception e) 
             // trouble invoking the target observer's reference, mark it for deletion 
             Console.WriteLine(String.Format("Error looking up target reference: {0}", e.Message)); 
            // the target observer's reference has been descoped, mark it for deletion 
            // NOTE(review): no visible statement adds anything to descopedRefIndexes, although
            // the comments here speak of marking entries for deletion.
            Console.WriteLine(String.Format("Message type \"{0}\" has been unsubscribed from.", arg.Item1)); 
            MessageBus.IamDone = true; 
          // remove any descoped references 
          // NOTE(review): RemoveAt shifts later elements down, so removing several indexes
          // in ascending order would hit the wrong entries - verify the order used.
          descopedRefIndexes.ForEach(index => handlerList.RemoveAt(index)); 
      // catch all Exceptions 
      // NOTE(review): despite the comment above, only AggregateException is caught here.
      catch (AggregateException e) 
       Console.WriteLine(String.Format("Error dispatching messages: {0}", e.Message)); 

     /// <summary>
     /// Subscribes the specified delegate to handle messages of type TArg.
     /// </summary>
     /// <remarks>
     /// The bus stores only a <see cref="WeakReference"/> to <paramref name="action"/>,
     /// never the delegate itself, so the registration lasts only as long as something
     /// else keeps that delegate instance alive. A delegate that is already registered
     /// for the same message type (per <see cref="Delegate.Equals(object)"/>) is not
     /// added a second time.
     /// </remarks>
     /// <typeparam name="TArg">Message argument type</typeparam>
     /// <param name="action">Handler delegate to register with the bus for this message type</param>
     /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
     public static void Subscribe<TArg>(Action<TArg> action)
         where TArg : class
     {
         // validate input
         if (action == null)
         {
             // The single-string constructor interprets its argument as the parameter
             // name, so pass the parameter name and the message separately.
             throw new ArgumentNullException(
                 "action",
                 String.Format("Error subscribing to message type \"{0}\": Specified action reference is null.", typeof(TArg)));
         }

         // build the queue work item key identifier
         string key = typeof(TArg).ToString();

         // Fetch the handler list for this message type, creating it on first use.
         // One atomic call replaces the ContainsKey / TryAdd / indexer sequence.
         List<WeakReference> handlers = handlersDict.GetOrAdd(key, unused => new List<WeakReference>());

         lock (_lockSubscribing)
         {
             // Read each Target exactly once: the garbage collector may clear it at any
             // moment, so a second read after the null check could return null.
             bool alreadyRegistered = handlers.Any(existing =>
             {
                 Delegate target = existing.Target as Delegate;
                 return target != null && target.Equals(action);
             });

             // append this new observer's reference to the list, if it does not exist already
             if (!alreadyRegistered)
             {
                 handlers.Add(new WeakReference(action, true));
             }
         }
     }


public class MessageBus : Singleton<MessageBus> // Singleton<> is my library class 


public static class MessageBus 

