- for (i = 0; i < ci->children_num; i++) {
- /*
- * The code here could be simplified but makes room
- * for easy adding of new options later on.
- */
- child = &ci->children[i];
- status = 0;
-
- if (strcasecmp ("Property", child->key) == 0) {
- if (child->values_num != 2) {
- WARNING("kafka properties need both a key and a value.");
- goto errout;
- }
- if (child->values[0].type != OCONFIG_TYPE_STRING ||
- child->values[1].type != OCONFIG_TYPE_STRING) {
- WARNING("kafka properties needs string arguments.");
- goto errout;
- }
- key = child->values[0].value.string;
- val = child->values[1].value.string;
- ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
- errbuf, sizeof(errbuf));
- if (ret != RD_KAFKA_CONF_OK) {
- WARNING("cannot set kafka topic property %s to %s: %s.",
- key, val, errbuf);
- goto errout;
- }
-
- } else if (strcasecmp ("Key", child->key) == 0) {
- char *tmp_buf = NULL;
- status = cf_util_get_string(child, &tmp_buf);
- if (status != 0) {
- WARNING("write_kafka plugin: invalid key supplied");
- break;
- }
-
- if (strcasecmp(tmp_buf, "Random") != 0) {
- tctx->has_key = 1;
- tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
- }
- sfree(tmp_buf);
-
- } else if (strcasecmp ("Format", child->key) == 0) {
- status = cf_util_get_string(child, &key);
- if (status != 0)
- goto errout;
-
- assert(key != NULL);
-
- if (strcasecmp(key, "Command") == 0) {
- tctx->format = KAFKA_FORMAT_COMMAND;
-
- } else if (strcasecmp(key, "Graphite") == 0) {
- tctx->format = KAFKA_FORMAT_GRAPHITE;
-
- } else if (strcasecmp(key, "Json") == 0) {
- tctx->format = KAFKA_FORMAT_JSON;
-
- } else {
- WARNING ("write_kafka plugin: Invalid format string: %s",
- key);
- }
-
- sfree(key);
-
- } else if (strcasecmp ("StoreRates", child->key) == 0) {
- status = cf_util_get_boolean (child, &tctx->store_rates);
- (void) cf_util_get_flag (child, &tctx->graphite_flags,
- GRAPHITE_STORE_RATES);
-
- } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) {
- status = cf_util_get_flag (child, &tctx->graphite_flags,
- GRAPHITE_SEPARATE_INSTANCES);
-
- } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) {
- status = cf_util_get_flag (child, &tctx->graphite_flags,
- GRAPHITE_ALWAYS_APPEND_DS);
-
- } else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
- status = cf_util_get_string (child, &tctx->prefix);
- } else if (strcasecmp ("GraphitePostfix", child->key) == 0) {
- status = cf_util_get_string (child, &tctx->postfix);
- } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) {
- char *tmp_buff = NULL;
- status = cf_util_get_string (child, &tmp_buff);
- if (strlen (tmp_buff) > 1)
- WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles "
- "only one character. Others will be ignored.");
- tctx->escape_char = tmp_buff[0];
- sfree (tmp_buff);
- } else {
- WARNING ("write_kafka plugin: Invalid directive: %s.", child->key);
- }
-
- if (status != 0)
- break;
- }