+ if (instance->notify != true) {
+ ERROR("amqp1 plugin: write notification failed");
+ }
+
+ cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL ){
+ ERROR("amqp1 plugin: notify failed");
+ return -1;
+ }
+
+ DEQ_ITEM_INIT(cdm);
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
+ cdm->instance = instance;
+
+ switch (instance->format) {
+ case AMQP1_FORMAT_JSON:
+ format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+ status = format_json_notification(cdm->mbuf.start, bufsize, n);
+ if (status != 0) {
+ ERROR("amqp1 plugin: formatting notification failed");
+ cd_message_free(cdm);
+ return status;
+ }
+ cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: notify format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
+ break;
+ default:
+ ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+ cd_message_free(cdm);
+ return -1;
+ }
+
+ /* encode message and place on outbound queue */
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: notify enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
+
+} /* }}} int amqp1_notify */
+
+static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ user_data_t *user_data) {
+ int status = 0;
+ size_t bfree = BUFSIZE;
+ size_t bfill = 0;
+ size_t bufsize = BUFSIZE;
+
+ if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)