2017-07-03 1 views
0

プロデューサ独自のSynchronizedBlockingQueueを持つ異なるスレッド。 各プロデューサはメッセージをそれ自身のキューに入れます。キューのいずれかのいずれかからのメッセージを取得し、プロセスを開始しますブローカなしでコンシューマにメッセージを渡す方法

消費者

別のスレッド。

プロデューサーとコンシューマーのコミュニケーションには、ブローカーが必要です。ボトルネックになる可能性があります。消費者がプロデューサーと開始プロセスから1つのメッセージを得る他の方法はありますか?

+0

それぞれのプロデューサのキューを公開し、それぞれのプロデューサにすべてのプロデューサを最も重大なシナリオでポーリングする必要があります。私はむしろアーキテクチャがよりエレガントで(使い易い)、ボトルネックがあれば、ほとんどのブローカー(rabbitmq、activemq ...など)が分散アーキテクチャをサポートするため、ブローカーを使用したいと思います。それがなければ、あなたはそれを自分でやる必要があります。 – Adonis

答えて

0

言語を指定していないので、私はAdaプログラミング言語を使用した一般的な例を提供すると思いました。この例では、コンシューマは単にプロデューサからメッセージを出力しますが、ここで説明したプロデューサ - コンシューマアーキテクチャが提供されます。

with Ada.Task_Identification; use Ada.Task_Identification; 

package Multiple_Producer is 
    type Producer_Message is private; 

    protected type Buffer is 
     entry Set_Message (Item : in Producer_Message); 
     entry Get_Message (Item : out Producer_Message); 
    private 
     Msg : Producer_Message; 
     Is_New : Boolean := False; 
    end Buffer; 
    type Buf_Alias is access all Buffer; 
    type Buf_Array is array (Positive range <>) of aliased Buffer; 
    type Buf_Access is access all Buf_Array; 

    task type Producer is 
     entry Set_Buffer (Item : Buf_Alias); 
     entry Stop; 
    end Producer; 


    task Consumer is 
     entry Set_Buffers (Item : Buf_Access); 
     entry Stop; 
    end Consumer; 

private 
    type Producer_Message is record 
     the_Task : Task_Id; 
     Value : Integer; 
    end record; 
end Multiple_Producer; 

with Ada.Text_IO; use Ada.Text_IO; 

package body Multiple_Producer is 

    -------------- 
    -- Producer -- 
    -------------- 

    task body Producer is 
     Message : Producer_Message := (Current_Task, 0); 
     The_Buf : Buf_Alias; 
    begin 
     accept Set_Buffer(Item : in Buf_Alias) do 
     The_Buf := Item; 
     end Set_Buffer; 

     loop 
     select 
      accept Stop; 
      exit; 
     else 
      delay 0.01; 
      The_Buf.Set_Message(Message); 
      Message.Value := Message.Value + 1; 
     end select; 
     end loop; 
    end Producer; 

    -------------- 
    -- Consumer -- 
    -------------- 

    task body Consumer is 
     Message : Producer_Message; 
     Buffers : Buf_Access; 
    begin 
     accept Set_Buffers(Item : Buf_Access) do 
     Buffers := Item; 
     end Set_Buffers; 

     loop 
     select 
      accept Stop; 
      exit; 
     else 
      -- Poll the buffers 
      for I in Buffers'Range loop 
       select 
        Buffers(I).Get_Message(Message); 
        Put_Line(Image(Message.The_Task) & ": " & 
          Integer'Image(Message.Value)); 
       or 
        delay 0.001; 
       end select; 
      end loop; 
     end select; 
     end loop; 
    end Consumer; 

    ------------ 
    -- Buffer -- 
    ------------ 

    protected body Buffer is 

     ----------------- 
     -- Set_Message -- 
     ----------------- 

     entry Set_Message (Item : in Producer_Message) when not Is_New is 
     begin 
     Msg := Item; 
     Is_New := True; 
     end Set_Message; 

     ----------------- 
     -- Get_Message -- 
     ----------------- 

     entry Get_Message (Item : out Producer_Message) when Is_New is 
     begin 
     Item := Msg; 
     Is_New := False; 
     end Get_Message; 

    end Buffer; 

end Multiple_Producer; 

with Multiple_Producer; use Multiple_Producer; 

procedure Main is 
    subtype Producer_Range is Positive range 1..5; 
    The_Producers : array(Producer_Range) of Producer; 
    The_Buffers : Buf_Access := new Buf_Array(Producer_Range); 
begin 
    for I in Producer_Range loop 
     The_Producers(I).Set_Buffer(The_Buffers(I)'Access); 
    end loop; 
    Consumer.Set_Buffers(The_Buffers); 
    delay 4.0; 
    for P of The_Producers loop 
     P.Stop; 
    end loop; 
    Consumer.Stop; 
end Main; 
+0

言語 - Java – user3805189

関連する問題