zeromq plugin: Added support for High Water Mark socket option
authorJulien Ammous <j.ammous@gmail.com>
Sat, 13 Nov 2010 17:31:06 +0000 (18:31 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 16 Nov 2010 10:05:03 +0000 (11:05 +0100)
src/zeromq.c

index bc672dc..6b75a9d 100644 (file)
@@ -220,13 +220,14 @@ static int write_value (const data_set_t *ds, /* {{{ */
   }
 
   // try to send the message
-  if( zmq_send(cmq_socket, &msg, /* flags = */ 0) != 0 ) {
+  if( zmq_send(cmq_socket, &msg, ZMQ_NOBLOCK) != 0 ) {
     if( errno == EAGAIN ) {
-      WARNING("ZeroMQ: Cannot send message, queue is full");
+      WARNING("ZeroMQ: Unable to queue message, queue may be full");
+      return -1;
     }
     else {
       ERROR("zmq_send : %s", zmq_strerror(errno));
-      return 1;
+      return -1;
     }
   }
 
@@ -352,6 +353,26 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
       endpoints_num++;
       continue;
     } /* Endpoint */
+    else if( strcasecmp("HWM", child->key) == 0 )
+    {
+      int tmp;
+      uint64_t hwm;
+      
+      status = cf_util_get_int(child, &tmp);
+      if( status != 0 )
+        continue;
+      
+      hwm = (uint64_t) tmp;
+      
+      status = zmq_setsockopt (cmq_socket, ZMQ_HWM, &hwm, sizeof(hwm));
+      if (status != 0) {
+        ERROR ("zeromq plugin: zmq_setsockopt (ZMQ_HWM) failed: %s", zmq_strerror (errno));
+        (void) zmq_close (cmq_socket);
+        return (-1);
+      }
+      
+      continue;
+    } /* HWM */
     else
     {
       ERROR ("zeromq plugin: The \"%s\" config option is now allowed here.",
@@ -424,7 +445,10 @@ static int cmq_config_socket (oconfig_item_t *ci) /* {{{ */
  * Config schema:
  *
  * <Plugin "zeromq">
+ *   Threads 2
+ *
  *   <Socket Publish>
+ *     HWM 300
  *     Endpoint "tcp://localhost:6666"
  *   </Socket>
  *   <Socket Subscribe>