zmq_proxy_steerable - Man Page

built-in 0MQ proxy with control flow

Synopsis

int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);

Description

The zmq_proxy_steerable() function starts the built-in 0MQ proxy in the current application thread, as zmq_proxy() do. Please, refer to this function for the general description and usage. We describe here only the additional control flow provided by the socket passed as the fourth argument "control".

If the control socket is not NULL, the proxy supports control flow. If PAUSE is received on this socket, the proxy suspends its activities. If RESUME is received, it goes on. If TERMINATE is received, it terminates smoothly. If STATISTICS is received, the proxy will reply on the control socket sending a multipart message with 8 frames, each with an unsigned integer 64-bit wide that provide in the following order: - number of messages received by the frontend socket - number of bytes received by the frontend socket - number of messages sent out the frontend socket - number of bytes sent out the frontend socket - number of messages received by the backend socket - number of bytes received by the backend socket - number of messages sent out the backend socket - number of bytes sent out the backend socket

At start, the proxy runs normally as if zmq_proxy was used.

If the control socket is NULL, the function behave exactly as if zmq_proxy(3) had been called.

Refer to zmq_socket(3) for a description of the available socket types. Refer to zmq_proxy(3) for a description of the zmq_proxy.

Example Usage

cf zmq_proxy

Return Value

The zmq_proxy_steerable() function returns 0 if TERMINATE is sent to its control socket. Otherwise, it returns -1 and errno set to ETERM or EINTR (the 0MQ context associated with either of the specified sockets was terminated).

Example

Creating a shared queue proxy.

//  Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);

//  Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);

// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));

//  Start the queue proxy, which runs until ETERM or "TERMINATE"
//  received on the control socket
zmq_proxy_steerable (frontend, backend, NULL, control);

Set up a controller in another node, process or whatever.

void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);

// pause the proxy
assert (zmq_send (control, "PAUSE", 5, 0) == 0);

// resume the proxy
assert (zmq_send (control, "RESUME", 6, 0) == 0);

// terminate the proxy
assert (zmq_send (control, "TERMINATE", 9, 0) == 0);

// check statistics
assert (zmq_send (control, "STATISTICS", 10, 0) == 0);
zmq_msg_t stats_msg;

while (1) {
    assert (zmq_msg_init (&stats_msg) == 0);
    assert (zmq_recvmsg (control, &stats_msg, 0) == sizeof (uint64_t));
    assert (rc == sizeof (uint64_t));
    printf ("Stat: %lu\n", *(unsigned long int *)zmq_msg_data (&stats_msg));
    if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
        break;
    assert (zmq_msg_close (&stats_msg) == 0);
}
assert (zmq_msg_close (&stats_msg) == 0);
---


SEE ALSO

zmq_proxy(3) zmq_bind(3) zmq_connect(3) zmq_socket(3) zmq(7)

Authors

This page was written by the 0MQ community. To make a change please read the 0MQ Contribution Policy at http://www.zeromq.org/docs:contributing.

Info

08/13/2020 0MQ 4.3.3 0MQ Manual