c++ - zmq::proxy example doesn't work () -
i learning how use lib zeromq friend advise me use personnal project.
after reading documentation , planning how use lib project, began testing project code given documentation. test used this one . unfortunatly doesn't work. did minor modification test it. (i give exact code have on test, it's lot sorry without think doesn't make sense , it's impossible me :/ ).
i changed nothing test given documentation, added output test , deleted poll in client (i thought probleme came here because blocking infinite loop thought there timeout).
#include <vector> #include <thread> #include <memory> #include <functional> #include <zmq.h> #include <zmq.hpp> #include <zhelper.hpp> // our client task class. // connects server, , sends request once per second // collects responses arrive, , prints them out. // run several client tasks in parallel, each different random id. // attention! -- random work on linux. class client_task { public: client_task() : ctx_(1), client_socket_(ctx_, zmq_dealer) {} void start() { // generate random identity char identity[10] = {}; sprintf(identity, "%04x-%04x", within(0x10000), within(0x10000)); printf("-> %s\n", identity); client_socket_.setsockopt(zmq_identity, identity, strlen(identity)); client_socket_.connect("tcp://localhost:5570"); zmq_pollitem_t items; items.socket = &client_socket_; items.fd = 0; items.events = zmq_pollin; items.revents = 0; int request_nbr = 0; try { while (true) { (int = 0; < 100; ++i) { // 10 milliseconds sleep(1); std::cout << "ici" << std::endl; if (items.revents & zmq_pollin) { printf("\n%s ", identity); s_dump(client_socket_); } char request_string[16] = {}; sprintf(request_string, "request #%d", ++request_nbr); client_socket_.send(request_string, strlen(request_string)); } } } catch (std::exception &e) {} } private: zmq::context_t ctx_; zmq::socket_t client_socket_; }; // each worker task works on 1 request @ time , sends random number // of replies back, random delays between replies: class server_worker { public: server_worker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), worker_(ctx_, sock_type) {} void work() { worker_.connect("inproc://backend"); try { while (true) { zmq::message_t identity; zmq::message_t msg; zmq::message_t copied_id; zmq::message_t copied_msg; worker_.recv(&identity); worker_.recv(&msg); std::cout << "i never arrive here" << std::endl; int replies = within(5); (int reply = 0; reply < replies; ++reply) { std::cout << "la" << std::endl; s_sleep(within(1000) + 1); copied_id.copy(&identity); copied_msg.copy(&msg); worker_.send(copied_id, zmq_sndmore); worker_.send(copied_msg); } } } catch (std::exception &e) {} } private: zmq::context_t &ctx_; zmq::socket_t worker_; }; // our server task. // uses multithreaded server model deal requests out pool // of workers , route replies clients. 1 worker can handle // 1 request @ time 1 client can talk multiple workers @ // once. class server_task { public: server_task() : ctx_(1), frontend_(ctx_, zmq_router), backend_(ctx_, zmq_dealer) {} void run() { frontend_.bind("tcp://*:5570"); backend_.bind("inproc://backend"); server_worker * worker = new server_worker(ctx_, zmq_dealer); std::thread worker_thread(std::bind(&server_worker::work, worker)); worker_thread.detach(); try { zmq::proxy(&frontend_, &backend_, null); } catch (std::exception &e) {} } private: zmq::context_t ctx_; zmq::socket_t frontend_; zmq::socket_t backend_; }; // main thread starts several clients , server, , // waits server finish. int main (void) { client_task ct1; client_task ct2; client_task ct3; server_task st; std::thread t1(std::bind(&client_task::start, &ct1)); std::thread t2(std::bind(&client_task::start, &ct2)); std::thread t3(std::bind(&client_task::start, &ct3)); std::thread t4(std::bind(&server_task::run, &st)); t1.detach(); t2.detach(); t3.detach(); t4.detach(); std::cout << "ok" << std::endl; getchar(); std::cout << "ok" << std::endl; return 0; }
the output code following :
-> cc66-c879 -> 3292-e961 -> c4aa-55d1 ok ici ici ici ... (infinite ici)
i don't understand why doesn't work. poll in client send exception socket operation on non-socket. major probleme me it's test coming official documentation , can't make work. problem utilisation of socket ?
thanks help
i found out problem.
there problem in official documentation (some obvious mistake initialisation of zmq_pollitem_t array) , 1 made test not working.
for zmq::poll or zmq::proxy, need cast socket structure in void* , mustn't use pointer on socket. zmq poll not working
after modification worked. did post explain why here
here corrected code without additionnal testing output :
// asynchronous client-to-server (dealer router) // // while example runs in single process, make // easier start , stop example. each task has own // context , conceptually acts separate process. #include <vector> #include <thread> #include <memory> #include <functional> #include <zmq.h> #include <zmq.hpp> #include <zhelper.hpp> // our client task class. // connects server, , sends request once per second // collects responses arrive, , prints them out. // run several client tasks in parallel, each different random id. // attention! -- random work on linux. class client_task { public: client_task() : ctx_(1), client_socket_(ctx_, zmq_dealer) {} void start() { // generate random identity char identity[10] = {}; sprintf(identity, "%04x-%04x", within(0x10000), within(0x10000)); printf("-> %s\n", identity); client_socket_.setsockopt(zmq_identity, identity, strlen(identity)); client_socket_.connect("tcp://localhost:5555"); zmq_pollitem_t items[1]; items[0].socket = static_cast<void *> (client_socket_); items[0].fd = 0; items[0].events = zmq_pollin; items[0].revents = 0; int request_nbr = 0; try { while (true) { (int = 0 ; < 100; ++i) { zmq::poll(items, 1, 10); if (items[0].revents & zmq_pollin) { printf("\n%s =>", identity); s_dump(client_socket_); } } char request_string[16] = {}; sprintf(request_string, "request #%d", ++request_nbr); client_socket_.send(request_string, strlen(request_string)); } } catch (std::exception &e) { std::cout << "exception : " << zmq_errno() << " "<< e.what() << std::endl; if (zmq_errno() == eintr) std::cout << "lol"<< std::endl; } } private: zmq::context_t ctx_; zmq::socket_t client_socket_; }; // each worker task works on 1 request @ time , sends random number // of replies back, random delays between replies: class server_worker { public: server_worker(zmq::context_t &ctx, int sock_type) : ctx_(ctx), worker_(ctx_, sock_type) {} void work() { worker_.connect("inproc://backend"); try { while (true) { zmq::message_t identity; zmq::message_t msg; zmq::message_t copied_id; zmq::message_t copied_msg; worker_.recv(&identity); worker_.recv(&msg); int replies = within(5); (int reply = 0; reply < replies; ++reply) { s_sleep(within(1000) + 1); copied_id.copy(&identity); copied_msg.copy(&msg); worker_.send(copied_id, zmq_sndmore); worker_.send(copied_msg); } } } catch (std::exception &e) { std::cout << "error in worker : " << e.what() << std::endl; } } private: zmq::context_t &ctx_; zmq::socket_t worker_; }; // our server task. // uses multithreaded server model deal requests out pool // of workers , route replies clients. 1 worker can handle // 1 request @ time 1 client can talk multiple workers @ // once. class server_task { public: server_task() : ctx_(1), frontend_(ctx_, zmq_router), backend_(ctx_, zmq_dealer) {} void run() { frontend_.bind("tcp://*:5555"); backend_.bind("inproc://backend"); server_worker * worker = new server_worker(ctx_, zmq_dealer); std::thread worker_thread(std::bind(&server_worker::work, worker)); worker_thread.detach(); try { zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), null); } catch (std::exception &e) { std::cout << "error in server : " << e.what() << std::endl; } } private: zmq::context_t ctx_; zmq::socket_t frontend_; zmq::socket_t backend_; }; // main thread starts several clients , server, , // waits server finish. int main (void) { client_task ct1; client_task ct2; client_task ct3; server_task st; std::thread t4(std::bind(&server_task::run, &st)); t4.detach(); std::thread t1(std::bind(&client_task::start, &ct1)); std::thread t2(std::bind(&client_task::start, &ct2)); std::thread t3(std::bind(&client_task::start, &ct3)); t1.detach(); t2.detach(); t3.detach(); getchar(); return 0; }
Comments
Post a Comment