1

私は任意のタイプのメッセージを購読して公開することができる静的メッセージバスを設計しています。オブザーバーに明示的に退会を要求するのを避けるために、代理人自身を追跡するのではなく、代理人を指すWeakReferenceオブジェクトを追跡したいと思います。私はPaul Stovellが彼のブログhttp://www.paulstovell.com/weakeventsで説明したのと同様のものをコーディングしました。複数のスレッドからのオブジェクトへの弱参照を追跡する

私の問題はこれです:Paulのコードとは異なり、私のオブザーバーはあるスレッドのメッセージを購読しますが、メッセージは別のスレッドに公開される可能性があります。この場合、オブザーバーに通知する必要がある時間までに、WeakReference.Targetの値は、ターゲットが収集されたことを示すnullであることがわかります。この問題は、短い弱参照と長い弱参照の両方で持続します。

逆に、同じスレッドから購読と発行を行うと、コードは正常に動作します。後者は、私が実際にメッセージを購読する同じスレッドからの要求である限り、実際にはThreadPoolの新しいスレッドのターゲットを列挙することになります。

私はこれが非常に特殊なケースであることを理解していますので、助けていただければ幸いです。

私の質問は:適切なスレッド同期が適切であれば、複数のスレッドからWeakReferenceオブジェクトに確実にアクセスできないのでしょうか?私には分かりませんが、私には分かりません。それで、私は何を正しくしていないのですか?

+0

あなたの質問は? (質問の形でそれを明記してください) – Kiril

+0

また、私たちはここでどの言語/プラットフォームを話していますか? Java? C#? –

+0

ご意見ありがとうございます。私はVS2010で.NET 4.0でC#を使用しています。問題はこれです:適切なスレッドの同期が適切であれば、複数のスレッドからWeakReferenceオブジェクトに確実にアクセスできないはずですか?私には分かりませんが、私には分かりません。それで、私は何を正しくしていないのですか? – user1144037

答えて

0

シンプルなフォームにコードを縮小したような感じです(下記参照)。これは、弱い参照ターゲットをあまりにも早く収集させる原因となった問題は、自分のコードのどこかに存在しなければならないということです。だから、私自身の質問に答えるために、弱参照は複数のスレッドから安全にアクセスできるように見えます。ここで

は私のテストコードです:

using System; 
using System.Collections.Concurrent; 
using System.Collections.Generic; 
using System.Linq; 
using System.Linq.Expressions; 
using System.Threading; 
using System.Threading.Tasks; 


namespace Test 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Starting the app"); 
      Test test = new Test(); 
      // uncomment these lines to cause automatic unsubscription from Message1 
//   test = null; 
//   GC.Collect(); 
//   GC.WaitForPendingFinalizers(); 

      // publish Message1 on this thread 
      // MessageBus.Publish<Message1>(new Message1()); 

      // publish Message1 on another thread 
      ThreadPool.QueueUserWorkItem(delegate 
      { 
       MessageBus.Publish<Message1>(new Message1()); 
      }); 


      while (!MessageBus.IamDone) 
      { 
       Thread.Sleep(100); 
      } 
      Console.WriteLine("Exiting the app"); 
      Console.WriteLine("Press <ENTER> to terminate program."); 
      Console.WriteLine(); 
      Console.ReadLine(); 
     } 
    } 

    public class Test 
    { 
     public Test() 
     { 
      Console.WriteLine("Subscribing to message 1."); 
      MessageBus.Subscribe<Message1>(OnMessage1); 
      Console.WriteLine("Subscribing to message 2."); 
      MessageBus.Subscribe<Message2>(OnMessage2); 
     } 

     public void OnMessage1(Message1 message) 
     { 
      Console.WriteLine("Got message 1. Publishing message 2"); 
      MessageBus.Publish<Message2>(new Message2()); 
     } 

     public void OnMessage2(Message2 message) 
     { 
      Console.WriteLine("Got message 2. Closing the app"); 
      MessageBus.IamDone = true; 
     } 
    } 

    public abstract class MessageBase 
    { 
     public string Message; 
    } 

    public class Message1 : MessageBase 
    { 
    } 

    public class Message2 : MessageBase 
    { 
    } 

    public static class MessageBus 
    { 
     // This is here purely for this test 
     public static bool IamDone = false; 
     ///////////////////////////////////// 

     /// <summary> 
     /// A dictionary of lists of handlers of messages by message type 
     /// </summary> 
     private static ConcurrentDictionary<string, List<WeakReference>> handlersDict = new ConcurrentDictionary<string, List<WeakReference>>(); 

     /// <summary> 
     /// Thread synchronization object to use with Publish calls 
     /// </summary> 
     private static object _lockPublishing = new object(); 

     /// <summary> 
     /// Thread synchronization object to use with Subscribe calls 
     /// </summary> 
     private static object _lockSubscribing = new object(); 

     /// <summary> 
     /// Creates a work queue item that encapsulates the provided parameterized message 
     /// and dispatches it. 
     /// </summary> 
     /// <typeparam name="TMessage">Message argument type</typeparam> 
     /// <param name="message">Message argument</param> 
     public static void Publish<TMessage>(TMessage message) 
      where TMessage : MessageBase 
     { 
      // create the dictionary key 
      string key = String.Empty; 
      key = typeof(TMessage).ToString(); 
      // initialize a queue work item argument as a tuple of the dictionary type key and the message argument 
      Tuple<string, TMessage, Exception> argument = new Tuple<string, TMessage, Exception>(key, message, null); 
      // push the message on the worker queue 
      ThreadPool.QueueUserWorkItem(new WaitCallback(_PublishMessage<TMessage>), argument); 
     } 

     /// <summary> 
     /// Publishes a message to the bus, causing observers to be invoked if appropriate. 
     /// </summary> 
     /// <typeparam name="TArg">Message argument type</typeparam> 
     /// <param name="stateInfo">Queue work item argument</param> 
     private static void _PublishMessage<TArg>(Object stateInfo) 
      where TArg : class 
     { 
      try 
      { 
       // translate the queue work item argument to extract the message type info and 
       // any arguments 
       Tuple<string, TArg, Exception> arg = (Tuple<string, TArg, Exception>)stateInfo; 
       // call all observers that have registered to receive this message type in parallel 
       Parallel.ForEach(handlersDict.Keys 
        // find the right dictionary list entry by message type identifier 
        .Where(handlerKey => handlerKey == arg.Item1) 
        // dereference the list entry by message type identifier to get a reference to the observer 
        .Select(handlerKey => handlersDict[handlerKey]), (handlerList, state) => 
        { 
         lock (_lockPublishing) 
         { 
          List<int> descopedRefIndexes = new List<int>(handlerList.Count); 
          // search the list of references and invoke registered observers 
          foreach (WeakReference weakRef in handlerList) 
          { 
           // try to obtain a strong reference to the target 
           Delegate dlgRef = (weakRef.Target as Delegate); 
           // check if the underlying delegate reference is still valid 
           if (dlgRef != null) 
           { 
            // yes it is, get the delegate reference via Target property, convert it to Action and invoke the observer 
            try 
            { 
             (dlgRef as Action<TArg>).Invoke(arg.Item2); 
            } 
            catch (Exception e) 
            { 
             // trouble invoking the target observer's reference, mark it for deletion 
             descopedRefIndexes.Add(handlerList.IndexOf(weakRef)); 
             Console.WriteLine(String.Format("Error looking up target reference: {0}", e.Message)); 
            } 
           } 
           else 
           { 
            // the target observer's reference has been descoped, mark it for deletion 
            descopedRefIndexes.Add(handlerList.IndexOf(weakRef)); 
            Console.WriteLine(String.Format("Message type \"{0}\" has been unsubscribed from.", arg.Item1)); 
            MessageBus.IamDone = true; 
           } 
          } 
          // remove any descoped references 
          descopedRefIndexes.ForEach(index => handlerList.RemoveAt(index)); 
         } 
        }); 
      } 
      // catch all Exceptions 
      catch (AggregateException e) 
      { 
       Console.WriteLine(String.Format("Error dispatching messages: {0}", e.Message)); 
      } 
     } 

     /// <summary> 
     /// Subscribes the specified delegate to handle messages of type TMessage 
     /// </summary> 
     /// <typeparam name="TArg">Message argument type</typeparam> 
     /// <param name="action">WeakReference that represents the handler for this message type to be registered with the bus</param> 
     public static void Subscribe<TArg>(Action<TArg> action) 
      where TArg : class 
     { 
      // validate input 
      if (action == null) 
       throw new ArgumentNullException(String.Format("Error subscribing to message type \"{0}\": Specified action reference is null.", typeof(TArg))); 
      // build the queue work item key identifier 
      string key = typeof(TArg).ToString(); 
      // check if a message of this type was already added to the bus 
      if (!handlersDict.ContainsKey(key)) 
      { 
       // no, it was not, create a new dictionary entry and add the new observer's reference to it 
       List<WeakReference> newHandlerList = new List<WeakReference>(); 
       handlersDict.TryAdd(key, newHandlerList); 
      } 
      lock (_lockSubscribing) 
      { 
       // append this new observer's reference to the list, if it does not exist already 
       if (!handlersDict[key].Any(existing => (existing.Target as Delegate) != null && (existing.Target as Delegate).Equals(action))) 
       { 
        // append the new reference 
        handlersDict[key].Add(new WeakReference(action, true)); 
       } 
      } 
     } 
    } 
} 
0

これが私の前の回答の改正です。私は元のコードがうまくいかなかった理由を発見しました。この情報は他の人にとって役に立ちます。私の元のコードMessageBusでシングルトンとしてインスタンス化された:上記の例で

public class MessageBus : Singleton<MessageBus> // Singleton<> is my library class 

、それはstaticとして宣言されました:私は、静的を使用するように私のコードを変換すると

public static class MessageBus 

、物事が仕事を始めました。それを言っても、シングルトンがなぜ機能しなかったのかまだ分かりませんでした。

関連する問題