Add notify handler
authorAndrew Smith <ansmith@redhat.com>
Mon, 15 Jan 2018 14:09:21 +0000 (09:09 -0500)
committerAndrew Smith <ansmith@redhat.com>
Mon, 15 Jan 2018 14:09:21 +0000 (09:09 -0500)
src/amqp1.c
src/collectd.conf.pod

index dcd17dd..2474664 100644 (file)
@@ -67,6 +67,7 @@ typedef struct amqp1_config_transport_t {
 typedef struct amqp1_config_instance_t {
   DEQ_LINKS(struct amqp1_config_instance_t);
   char              *name;
+  _Bool             notify;
   uint8_t           format;
   unsigned int      graphite_flags;
   _Bool             store_rates;
@@ -87,7 +88,7 @@ typedef struct cd_message_t {
 
 DEQ_DECLARE(cd_message_t, cd_message_list_t);
 
-/* 
+/*
  * Globals
  */
 pn_connection_t          *conn = NULL;
@@ -153,7 +154,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
     pn_link_advance(link);
     if (cdm->instance->pre_settle == true) {
       pn_delivery_settle(dlv);
-    }    
+    }
     event_count++;
     cd_message_free(cdm);
     cdm = DEQ_HEAD(to_send);
@@ -212,7 +213,7 @@ static bool handle(pn_event_t *event) /* {{{ */
     }
     break;
   }
-       
+
   case PN_TRANSPORT_CLOSED: {
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     break;
@@ -235,7 +236,7 @@ static bool handle(pn_event_t *event) /* {{{ */
     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
     pn_connection_close(pn_event_connection(event));
     break;
-  }    
+  }
 
   case PN_PROACTOR_INACTIVE: {
     return false;
@@ -265,21 +266,100 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
   return NULL;
 } /* }}} void event_thread */
 
+static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {{{ */
+{
+  size_t       bufsize = BUFSIZE;
+  pn_data_t    *body;
+  pn_message_t *message;
+
+  /* encode message */
+  message = pn_message();
+  pn_message_set_address(message, instance->send_to);
+  body = pn_message_body(message);
+  pn_data_clear(body);
+  pn_data_put_binary(body, cdm->mbuf);
+  pn_data_exit(body);
+
+  /* put_binary copies and stores so ok to use mbuf */
+  cdm->mbuf.size = bufsize;
+  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+
+  pthread_mutex_lock(&send_lock);
+  DEQ_INSERT_TAIL(out_messages, cdm);
+  pthread_mutex_unlock(&send_lock);
+
+  pn_message_free(message);
+
+  /* activate the sender */
+  if (conn != NULL) {
+    pn_connection_wake(conn);
+  }
+
+} /* }}} void encqueue */
+
+static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */
+{
+  amqp1_config_instance_t *instance;
+  int          status = 0;
+  size_t       bfree = BUFSIZE;
+  size_t       bfill = 0;
+  cd_message_t *cdm;
+  size_t       bufsize = BUFSIZE;
+
+  if ((n == NULL) || (user_data == NULL))
+    return EINVAL;
+
+  instance = user_data->data;
+
+  if (instance->notify != true) {
+    ERROR("amqp1 plugin: write notification failed");
+  }
+
+  cdm = NEW(cd_message_t);
+  DEQ_ITEM_INIT(cdm);
+  cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
+  cdm->instance = instance;
+
+  switch (instance->format) {
+  case AMQP1_FORMAT_JSON:
+    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
+    status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+    if (status != 0) {
+      ERROR("amqp1 plugin: formatting notification failed");
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  default:
+    ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+    return -1;
+  }
+
+  /* encode message and place on outbound queue */
+  encqueue(cdm, instance);
+
+  return 0;
+} /* }}} int amqp1_notify */
+
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
-                       user_data_t *user_data) 
+                       user_data_t *user_data)
 {
-  amqp1_config_instance_t *instance = user_data->data;
+  amqp1_config_instance_t *instance;
   int          status = 0;
   size_t       bfree = BUFSIZE;
   size_t       bfill = 0;
   cd_message_t *cdm;
   size_t       bufsize = BUFSIZE;
-  pn_data_t    *body;
-  pn_message_t *message;
 
-  if ((ds == NULL) || (vl == NULL) || (transport == NULL))
+  if ((ds == NULL) || (vl == NULL) || (transport == NULL) || (user_data == NULL))
     return EINVAL;
 
+  instance = user_data->data;
+
+  if (instance->notify != false) {
+    ERROR("amqp1 plugin: write failed");
+  }
+
   cdm = NEW(cd_message_t);
   DEQ_ITEM_INIT(cdm);
   cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
@@ -312,40 +392,20 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     cdm->mbuf.size = strlen(cdm->mbuf.start);
     break;
   default:
-    ERROR("amqp1 plugin: Invalid format (%i).", instance->format);
+    ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
     return -1;
   }
 
-  /* encode message */
-  message = pn_message();
-  pn_message_set_address(message, instance->send_to);
-  body = pn_message_body(message);
-  pn_data_clear(body);
-  pn_data_put_binary(body, cdm->mbuf);
-  pn_data_exit(body);
-
-  /* put_binary copies and stores so ok to use mbuf */
-  cdm->mbuf.size = bufsize;
-  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
-
-  pthread_mutex_lock(&send_lock);
-  DEQ_INSERT_TAIL(out_messages, cdm);
-  pthread_mutex_unlock(&send_lock);
-
-  pn_message_free(message);
-
-  /* activate the sender */
-  if (conn != NULL) {
-    pn_connection_wake(conn);
-  }
+  /* encode message and place on outboud queue */
+  encqueue(cdm, instance);
 
   return 0;
-} /* }}} int amqp_write1 */
+} /* }}} int amqp1_write */
 
 static void amqp1_config_transport_free(void *ptr) /* {{{ */
 {
   amqp1_config_transport_t *transport = ptr;
+
   if (transport == NULL)
     return;
 
@@ -361,7 +421,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */
 static void amqp1_config_instance_free(void *ptr) /* {{{ */
 {
   amqp1_config_instance_t *instance = ptr;
-  
+
   if (instance == NULL)
     return;
 
@@ -398,6 +458,8 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
     if (strcasecmp("PreSettle", child->key) == 0)
       status = cf_util_get_boolean(child, &instance->pre_settle);
+    else if (strcasecmp("Notify", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->notify);
     else if (strcasecmp("Format", child->key) == 0) {
       status = cf_util_get_string(child, &key);
       if (status != 0)
@@ -456,8 +518,14 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
              transport->address,instance->name);
-    status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
-            .data = instance, .free_func = amqp1_config_instance_free, });
+    if (instance->notify == true) {
+      status = plugin_register_notification(tpname, amqp1_notify, &(user_data_t) {
+              .data = instance, .free_func = amqp1_config_instance_free, });
+    } else {
+      status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
+              .data = instance, .free_func = amqp1_config_instance_free, });
+    }
+
     if (status != 0) {
       amqp1_config_instance_free(instance);
     }
@@ -465,11 +533,11 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
   return status;
 } /* }}} int amqp1_config_instance */
-  
+
 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 {
   int status=0;
-  
+
   transport = calloc(1, sizeof(*transport));
   if (transport == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
@@ -484,7 +552,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
     sfree(transport);
     return status;
   }
+
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
 
@@ -504,7 +572,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
       WARNING("amqp1 plugin: Ignoring unknown "
               "transport configuration option "
               "\%s\".", child->key);
-   
+
     if (status != 0)
       break;
   }
@@ -566,8 +634,7 @@ static int amqp1_init(void) /* {{{ */
   return 0;
 } /* }}} int amqp1_init */
 
-static int amqp1_shutdown
-(void) /* {{{ */
+static int amqp1_shutdown(void) /* {{{ */
 {
   cd_message_t *cdm;
 
index 2dca2f5..4d6c38e 100644 (file)
@@ -742,8 +742,9 @@ is preserved, i.e. passed through.
 
 The I<AMQP1 plugin> can be used to communicate with other instances of
 I<collectd> or third party applications using an AMQP 1.0 message
-intermediary. Values are sent to the messaging intermediary which
-may handle direct messaging or queue based transfer.
+intermediary. Metric values or notifications are sent to the
+messaging intermediary which may handle direct messaging or
+queue based transfer.
 
 B<Synopsis:>
 
@@ -758,6 +759,7 @@ B<Synopsis:>
     <Instance "some_name">
         Format "command"
         PreSettle false
+        Notify false
  #      StoreRates false
  #      GraphitePrefix "collectd."
  #      GraphiteEscapeChar "_"
@@ -770,8 +772,9 @@ B<Synopsis:>
 
 The plugin's configuration consists of a I<Transport> that configures
 communications to the AMQP 1.0 messaging bus and one or more I<Instance>
-corresponding to publishers to the messaging system. The address in
-the I<Transport> block concatenated with the name given int the
+corresponding to metric or event publishers to the messaging system.
+
+The address in the I<Transport> block concatenated with the name given in the
 I<Instance> block starting tag will be used as the send-to address for
 communications over the messaging link.
 
@@ -836,6 +839,13 @@ system. If set to B<true>, the plugin will not wait for a message
 acknowledgement and the message may be dropped prior to transfer of
 ownership.
 
+=item B<Notify> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will service the
+instance write call back as a value list. If set to B<true> the
+plugin will service the instance as a write notification callback
+for alert formatting.
+
 =item B<StoreRates> B<true>|B<false>
 
 Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources