2016-07-14 5 views
1

は、次の2つのプロセスを考えてみましょう:ZeroMQ REQ/REP性能

sender.cpp:

#include <zhelpers.h> 
... 
zmq::context_t ctx(1); 
... 
void foo(int i) 
{ 
    zmq::socket_t sender(ctx, ZMQ_REQ); 
    sender.connect("tcp://hostname:5000"); 

    std::stringstream ss; 
    ss <<"bar_" <<i; 
    std::string bar_i(std::move(ss.str()); 

    s_sendmore(sender, "foo "); 
    (i != N) ? s_send(sender, bar, 0) : s_send(sender, "done", 0); 
    s_recv(sender); 
} 

int main() 
{ 
    for(int i=0; i<=100000; ++i) 
     foo(i); 
    return 0; 
} 

receiver.cpp

#include <zhelpers.h> 
... 
int main() 
{ 
    zmq::context_t ctx(1); 
    zmq::socket_t rcv(ctx, ZMQ_REP); 
    rcv.bind("tcp://*:5000"); 

    std::string s1(""); 
    std::string s2(""); 

    while(s2 != "done") 
    { 
     s1 = std::move(s_recv(rcv)); 
     s2 = std::move(s_recv(rcv)); 
     std::cout <<"received: " <<s1 <<" " <<s2 <<"\n"; 
     s_send(rcv, "ACK"); 
    } 

    return 0; 
} 

は、の2つのプロセスを始めましょう。まで、その上

foo bar_1 
foo bar_2 
... 

と::

... 
foo bar_100000 

そして私私は何を期待することは、受信プロセスは送信者がそれに送信し、それがプリントアウトしますすべてのメッセージを受信することですどんなブロッキングもせずにこれをやることを期待しています。

私の問題は、レシーバが常に28215回の繰り返し(常にその数の周りに!!!)し、1分ほどブロックすることです。それからそれはさらに100000まで進みますが、時にはそれは再び固執します。私の質問はもちろんです:なぜこれが起こっているのですか?どうすれば修正できますか?

グローバルスコープでfoo(。)内に '送付者'を入れようとしましたが、それがうまくいった:その場合、すべてのプリントアウトは1から100000までスムーズかつ超高速で、もちろんその場合、foo(。)が呼び出されるたびにソケットは作成されませんでした。しかし、残念ながら私のコードでは私はそれをすることはできません。

なぜこのブロックが起こっているのか理解したいと思います。

+0

最大ソケットはサーバー側で制限されるかもしれません。それを増やして解決してください。 tcpがデッドソケットをきれいにするのに時間がかかり、ソケットの最大数に達しているものがたくさんあるからです。 – somdoron

答えて

0

まず、あなたの例はコンパイルされていないため、あまりよくありません。だからここにあなたの意図に近いことと、実際にリンケ

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include sender.cpp -lzmq -o sender 

receiver.cpp

#include <zmq.hpp> 
#include <string> 
#include <cstring> 
#include <iostream> 

int main() { 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REP); 
    socket.bind ("tcp://*:5555"); 

    char buf[100] = {0}; 
    while (std::string(buf).compare("done")) { 
     zmq::message_t request; 

     // Wait for next request from client 
     socket.recv (&request); 
     std::memcpy(buf, request.data(), 100); 
     std::cout << "Received message " << buf << std::endl; 

     // Send reply back to client 
     zmq::message_t reply (5); 
     memcpy (reply.data(), "Hello", 5); 
     socket.send (reply); 
    } 
    return 0; 
} 

使用

sender.cpp

#include <zmq.hpp> 
#include <string> 
#include <iostream> 
#include <string> 

void send(const std::string& msg) 
{ 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REQ); 

    std::cout << "Connecting to receiver ..." << std::endl; 
    socket.connect ("tcp://localhost:5555"); 

    zmq::message_t request (100); 
    memcpy (request.data(), msg.c_str(), 100); 
    std::cout << "Sending message " << msg << "..." << std::endl; 
    socket.send (request); 
} 

int main() 
{ 
    for(int i = 0; i < 100000; ++i) 
    { 
     send(std::to_string(i)); 
    } 
    send("done"); 
} 

使用の何かをコンパイルする必要があり、いくつかのexapmlesです

プロセスの起動時に

、すべてが正常に動作するよう、ブレークのない予想通り、受信機の出力は次のようになります。

Received message 99996 
Received message 99997 
Received message 99998 
Received message 99999 
Received message done 

しかし、私は何を期待します。netstatを見て:

netstat 
Active Internet connections (w/o servers) 
Proto Recv-Q Send-Q Local Address   Foreign Address   State  
tcp  0  0 localhost:38345   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46228   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60309   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46916   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:47600   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:54454   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46409   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51142   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40355   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40005   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45614   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48974   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41427   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58740   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58754   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60044   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57478   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:50419   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:44361   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:37284   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:38662   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45968   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57407   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:59200   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41292   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:55243   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51489   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48865   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:35491   localhost:5555   TIME_WAIT 
... 

私は1回実行した後、TIME_WAIT状態で20k(!)のソケットを持っています。これはsocketの可変スコープがvoid send(...)sender.cppにあるためです。範囲外になったときにソケットを破壊するときにzmqが何をするのか正確にはわかりませんが、このTIME_WAIT状態でソケットを持ってくるソケットのfdのどこかでclose()を呼ぶと確信しています。送信者と受信者が円滑に動作しても、あなたのシステムがこのような多くのソケットをどのように処理するのか分かりません。また、あなたのzhelpers.hファイルが何をしているのか分かりません。しかし、ソケットをグローバルスコープに置くと、1つのソケットで送信側でclose()コールが1回だけ発生することがわかります。もっと調査するためにここから始めます。多分、how-to-forcibly-close-a-socket-in-time-waitを見てください...

+0

ありがとうございます。チェックします。私のコードはコンパイルできませんでした。私はちょうどあなた自身に問題そのものを示すことを意味しました(つまり、私は '...'を使用しています)。私はあらゆる細部に行きたくはありませんでした。たとえば、zhelpers.hは次のようになります。https://github.com/imatix/zguide2/tree/master/examples/C%2B%2B – gybacsi

+1

もう1つのことは、おそらく宣言したくないということですスコープをグローバルスコープで送信していますが、スコープを少し大きくして、つまり送信ループの外側に宣言して少なくとも100000個のメッセージを送信するために再使用する場合は、TIME_WAIT指定ソケットの何千もの問題を解決してください。 – yussuf