Merge pull request #670 from pyr/feature-write-kafka
authorPierre-Yves Ritschard <pyr@spootnik.org>
Fri, 25 Jul 2014 13:34:16 +0000 (15:34 +0200)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Fri, 25 Jul 2014 13:34:16 +0000 (15:34 +0200)
Add a write_kafka output with similar properties to the amqp one.

1  2 
configure.ac
src/Makefile.am
src/collectd.conf.pod

diff --combined configure.ac
@@@ -981,7 -981,7 +981,7 @@@ if test "x$fp_layout_type" = "xunknown"
        uint8_t c[8];
        double d;
  
-       d = 8.642135e130; 
+       d = 8.642135e130;
        memcpy ((void *) &i0, (void *) &d, 8);
  
        i1 = i0;
@@@ -1036,7 -1036,7 +1036,7 @@@ if test "x$fp_layout_type" = "xunknown"
        uint8_t c[8];
        double d;
  
-       d = 8.642135e130; 
+       d = 8.642135e130;
        memcpy ((void *) &i0, (void *) &d, 8);
  
        i1 = endianflip (i0);
@@@ -1085,7 -1085,7 +1085,7 @@@ if test "x$fp_layout_type" = "xunknown"
        uint8_t c[8];
        double d;
  
-       d = 8.642135e130; 
+       d = 8.642135e130;
        memcpy ((void *) &i0, (void *) &d, 8);
  
        i1 = intswap (i0);
@@@ -1237,7 -1237,7 +1237,7 @@@ AC_MSG_CHECKING([if have htonll defined
        have_htonll="yes"
        AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.])
      ])
-  
  AC_MSG_RESULT([$have_htonll])
  
  # Check for structures
@@@ -1380,7 -1380,7 +1380,7 @@@ collectd additional packages:]
  
  AM_CONDITIONAL([BUILD_FREEBSD],[test "x$x$ac_system" = "xFreeBSD"])
  
- AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) 
+ AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"])
  
  if test "x$ac_system" = "xAIX"
  then
@@@ -2794,7 -2794,7 +2794,7 @@@ the
        else
                SAVE_CPPFLAGS="$CPPFLAGS"
                CPPFLAGS="$CPPFLAGS $with_snmp_cflags"
-               
                AC_CHECK_HEADERS(net-snmp/net-snmp-config.h, [], [with_libnetsnmp="no (net-snmp/net-snmp-config.h not found)"])
  
                CPPFLAGS="$SAVE_CPPFLAGS"
@@@ -3021,7 -3021,7 +3021,7 @@@ if test "x$with_libowcapi" = "xyes
  then
        SAVE_CPPFLAGS="$CPPFLAGS"
        CPPFLAGS="$with_libowcapi_cppflags"
-       
        AC_CHECK_HEADERS(owcapi.h, [with_libowcapi="yes"], [with_libowcapi="no (owcapi.h not found)"])
  
        CPPFLAGS="$SAVE_CPPFLAGS"
@@@ -3032,7 -3032,7 +3032,7 @@@ the
        SAVE_CPPFLAGS="$CPPFLAGS"
        LDFLAGS="$with_libowcapi_libs"
        CPPFLAGS="$with_libowcapi_cppflags"
-       
        AC_CHECK_LIB(owcapi, OW_get, [with_libowcapi="yes"], [with_libowcapi="no (libowcapi not found)"])
  
        LDFLAGS="$SAVE_LDFLAGS"
@@@ -3597,6 -3597,49 +3597,49 @@@ LDFLAGS="$SAVE_LDFLAGS
  AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
  # }}}
  
+ # --with-librdkafka {{{
+ AC_ARG_WITH(librdkafka, [AS_HELP_STRING([--with-librdkafka@<:@=PREFIX@:>@], [Path to librdkafka.])],
+ [
+   if test "x$withval" = "xno" && test "x$withval" != "xyes"
+   then
+     with_librdkafka_cppflags="-I$withval/include"
+     with_librdkafka_ldflags="-L$withval/lib"
+     with_librdkafka="yes"
+   else
+     with_librdkafka="$withval"
+   fi
+ ],
+ [
+   with_librdkafka="yes"
+ ])
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ if test "x$with_librdkafka" = "xyes"
+ then
+       AC_CHECK_HEADERS(librdkafka/rdkafka.h, [with_librdkafka="yes"], [with_librdkafka="no (librdkafka/rdkafka.h not found)"])
+ fi
+ if test "x$with_librdkafka" = "xyes"
+ then
+       AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"])
+ fi
+ if test "x$with_librdkafka" = "xyes"
+ then
+       BUILD_WITH_LIBRDKAFKA_CPPFLAGS="$with_librdkafka_cppflags"
+       BUILD_WITH_LIBRDKAFKA_LDFLAGS="$with_librdkafka_ldflags"
+       BUILD_WITH_LIBRDKAFKA_LIBS="-lrdkafka"
+       AC_SUBST(BUILD_WITH_LIBRDKAFKA_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS)
+       AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.])
+ fi
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+ AM_CONDITIONAL(BUILD_WITH_LIBRDKAFKA, test "x$with_librdkafka" = "xyes")
+ # }}}
  # --with-librouteros {{{
  AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])],
  [
@@@ -4039,7 -4082,7 +4082,7 @@@ CPPFLAGS="$SAVE_CPPFLAGS
  LDFLAGS="$SAVE_LDFLAGS"
  
  if test "x$with_libtokyotyrant" = "xyes"
- then 
+ then
    BUILD_WITH_LIBTOKYOTYRANT_CPPFLAGS="$with_libtokyotyrant_cppflags"
    BUILD_WITH_LIBTOKYOTYRANT_LDFLAGS="$with_libtokyotyrant_ldflags"
    BUILD_WITH_LIBTOKYOTYRANT_LIBS="$with_libtokyotyrant_libs"
  AM_CONDITIONAL(BUILD_WITH_LIBTOKYOTYRANT, test "x$with_libtokyotyrant" = "xyes")
  # }}}
  
 +# --with-libudev {{{
 +with_libudev_cflags=""
 +with_libudev_ldflags=""
 +AC_ARG_WITH(libudev, [AS_HELP_STRING([--with-libudev@<:@=PREFIX@:>@], [Path to libudev.])],
 +[
 +      if test "x$withval" = "xno"
 +      then
 +              with_libudev="no"
 +      else
 +              with_libudev="yes"
 +              if test "x$withval" != "xyes"
 +              then
 +                      with_libudev_cflags="-I$withval/include"
 +                      with_libudev_ldflags="-L$withval/lib"
 +                      with_libudev="yes"
 +              fi
 +      fi
 +],
 +[
 +      if test "x$ac_system" = "xLinux"
 +      then
 +              with_libudev="yes"
 +      else
 +              with_libudev="no (Linux only library)"
 +      fi
 +])
 +if test "x$with_libudev" = "xyes"
 +then
 +      SAVE_CPPFLAGS="$CPPFLAGS"
 +      CPPFLAGS="$CPPFLAGS $with_libudev_cflags"
 +
 +      AC_CHECK_HEADERS(libudev.h, [], [with_libudev="no (libudev.h not found)"])
 +
 +      CPPFLAGS="$SAVE_CPPFLAGS"
 +fi
 +if test "x$with_libudev" = "xyes"
 +then
 +      SAVE_CPPFLAGS="$CPPFLAGS"
 +      SAVE_LDFLAGS="$LDFLAGS"
 +      CPPFLAGS="$CPPFLAGS $with_libudev_cflags"
 +      LDFLAGS="$LDFLAGS $with_libudev_ldflags"
 +
 +      AC_CHECK_LIB(udev, udev_new,
 +      [
 +              AC_DEFINE(HAVE_LIBUDEV, 1, [Define to 1 if you have the udev library (-ludev).])
 +      ],
 +      [with_libudev="no (libudev not found)"])
 +
 +      CPPFLAGS="$SAVE_CPPFLAGS"
 +      LDFLAGS="$SAVE_LDFLAGS"
 +fi
 +if test "x$with_libudev" = "xyes"
 +then
 +      BUILD_WITH_LIBUDEV_CFLAGS="$with_libudev_cflags"
 +      BUILD_WITH_LIBUDEV_LDFLAGS="$with_libudev_ldflags"
 +      AC_SUBST(BUILD_WITH_LIBUDEV_CFLAGS)
 +      AC_SUBST(BUILD_WITH_LIBUDEV_LDFLAGS)
 +fi
 +AM_CONDITIONAL(BUILD_WITH_LIBUDEV, test "x$with_libudev" = "xyes")
 +# }}}
 +
  # --with-libupsclient {{{
  with_libupsclient_config=""
  with_libupsclient_cflags=""
@@@ -5284,6 -5266,7 +5327,7 @@@ AC_PLUGIN([vserver],     [$plugin_vserv
  AC_PLUGIN([wireless],    [$plugin_wireless],   [Wireless statistics])
  AC_PLUGIN([write_graphite], [yes],             [Graphite / Carbon output plugin])
  AC_PLUGIN([write_http],  [$with_libcurl],      [HTTP output plugin])
+ AC_PLUGIN([write_kafka],  [$with_librdkafka],  [Kafka output plugin])
  AC_PLUGIN([write_mongodb], [$with_libmongoc],  [MongoDB output plugin])
  AC_PLUGIN([write_redis], [$with_libcredis],    [Redis output plugin])
  AC_PLUGIN([write_riemann], [$have_protoc_c],   [Riemann output plugin])
@@@ -5488,13 -5471,13 +5532,14 @@@ Configuration
      libpq . . . . . . . . $with_libpq
      libpthread  . . . . . $with_libpthread
      librabbitmq . . . . . $with_librabbitmq
+     librdkafka  . . . . . $with_librdkafka
      librouteros . . . . . $with_librouteros
      librrd  . . . . . . . $with_librrd
      libsensors  . . . . . $with_libsensors
      libsigrok   . . . . . $with_libsigrok
      libstatgrab . . . . . $with_libstatgrab
      libtokyotyrant  . . . $with_libtokyotyrant
 +    libudev . . . . . . . $with_libudev
      libupsclient  . . . . $with_libupsclient
      libvarnish  . . . . . $with_libvarnish
      libvirt . . . . . . . $with_libvirt
      wireless  . . . . . . $enable_wireless
      write_graphite  . . . $enable_write_graphite
      write_http  . . . . . $enable_write_http
+     write_kafka . . . . . $enable_write_kafka
      write_mongodb . . . . $enable_write_mongodb
      write_redis . . . . . $enable_write_redis
      write_riemann . . . . $enable_write_riemann
diff --combined src/Makefile.am
@@@ -39,9 -39,7 +39,9 @@@ collectd_SOURCES = collectd.c collectd.
                   utils_subst.c utils_subst.h \
                   utils_tail.c utils_tail.h \
                   utils_time.c utils_time.h \
 -                 types_list.c types_list.h
 +                 types_list.c types_list.h \
 +                 utils_threshold.c utils_threshold.h
 +
  
  collectd_CPPFLAGS =  $(AM_CPPFLAGS) $(LTDLINCL)
  collectd_CFLAGS = $(AM_CFLAGS)
@@@ -387,9 -385,6 +387,9 @@@ if BUILD_WITH_LIBSTATGRA
  disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS)
  disk_la_LIBADD += $(BUILD_WITH_LIBSTATGRAB_LDFLAGS)
  endif
 +if BUILD_WITH_LIBUDEV
 +disk_la_LIBADD += -ludev
 +endif
  if BUILD_WITH_PERFSTAT
  disk_la_LIBADD += -lperfstat
  endif
@@@ -1392,6 -1387,19 +1392,19 @@@ endi
  collectd_DEPENDENCIES += write_http.la
  endif
  
+ if BUILD_PLUGIN_WRITE_KAFKA
+ pkglib_LTLIBRARIES += write_kafka.la
+ write_kafka_la_SOURCES = write_kafka.c \
+                         utils_format_graphite.c utils_format_graphite.h \
+                         utils_format_json.c utils_format_json.h \
+                         utils_cmd_putval.c utils_cmd_putval.h \
+                         utils_crc32.c utils_crc32.h
+ write_kafka_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
+ write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS)
+ collectd_LDADD += "-dlopen" write_kafka.la
+ collectd_DEPENDENCIES += write_kafka.la
+ endif
  if BUILD_PLUGIN_WRITE_MONGODB
  pkglib_LTLIBRARIES += write_mongodb.la
  write_mongodb_la_SOURCES = write_mongodb.c
@@@ -1414,7 -1422,7 +1427,7 @@@ endi
  
  if BUILD_PLUGIN_WRITE_RIEMANN
  pkglib_LTLIBRARIES += write_riemann.la
 -write_riemann_la_SOURCES = write_riemann.c
 +write_riemann_la_SOURCES = write_riemann.c write_riemann_threshold.c
  nodist_write_riemann_la_SOURCES = riemann.pb-c.c riemann.pb-c.h
  write_riemann_la_LDFLAGS = -module -avoid-version
  write_riemann_la_LIBADD = -lprotobuf-c
diff --combined src/collectd.conf.pod
@@@ -1688,20 -1688,6 +1688,20 @@@ collected. If at least one B<Disk> opti
  set to B<false>, B<only> matching disks will be collected. If B<IgnoreSelected>
  is set to B<true>, all disks are collected B<except> the ones matched.
  
 +=item B<UseBSDName> B<true>|B<false>
 +
 +Whether to use the device's "BSD Name", on MacE<nbsp>OSE<nbsp>X, instead of the
 +default major/minor numbers. Requires collectd to be built with Apple's
 +IOKitLib support.
 +
 +=item B<UdevNameAttr> I<Attribute>
 +
 +Attempt to override disk instance name with the value of a specified udev
 +attribute when built with B<libudev>.  If the attribute is not defined for the
 +given device, the default name is used. Example:
 +
 +  UdevNameAttr "DM_NAME"
 +
  =back
  
  =head2 Plugin C<dns>
@@@ -6380,6 -6366,111 +6380,111 @@@ number
  
  =back
  
+ =head2 Plugin C<write_kafka>
+ The I<write_kafka plugin> will send values to a I<Kafka> topic, a distributed
+ queue.
+ Synopsis:
+  <Plugin "write_kafka">
+    Property "metadata.broker.list" "broker1:9092,broker2:9092"
+    <Topic "collectd">
+      Format JSON
+    </Topic>
+  </Plugin>
+ The following options are understood by the I<write_kafka plugin>:
+ =over 4
+ =item E<lt>B<Topic> I<Name>E<gt>
+ The plugin's configuration consists of one or more B<Topic> blocks. Each block
+ is given a unique I<Name> and specifies one kafka producer.
+ Inside the B<Topic> block, the following per-topic options are
+ understood:
+ =over 4
+ =item B<Property> I<String> I<String>
+ Configure the named property for the current topic. Properties are
+ forwarded to the kafka producer library B<librdkafka>.
+ =item B<Key> I<String>
+ Use the specified string as a partioning key for the topic. Kafka breaks
+ topic into partitions and guarantees that for a given topology, the same
+ consumer will be used for a specific key. The special (case insensitive)
+ string B<Random> can be used to specify that an arbitrary partition should
+ be used.
+ =item B<Format> B<Command>|B<JSON>|B<Graphite>
+ Selects the format in which messages are sent to the broker. If set to
+ B<Command> (the default), values are sent as C<PUTVAL> commands which are
+ identical to the syntax used by the I<Exec> and I<UnixSock plugins>.
+ If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+ an easy and straight forward exchange format.
+ If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+ "<metric> <value> <timestamp>\n".
+ =item B<StoreRates> B<true>|B<false>
+ Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+ are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+ default), no conversion is performed. Otherwise the conversion is performed
+ using the internal value cache.
+ Please note that currently this option is only used if the B<Format> option has
+ been set to B<JSON>.
+ =item B<GraphitePrefix> (B<Format>=I<Graphite> only)
+ A prefix can be added in the metric name when outputting in the I<Graphite> format.
+ It's added before the I<Host> name.
+ Metric name will be "<prefix><host><postfix><plugin><type><name>"
+ =item B<GraphitePostfix> (B<Format>=I<Graphite> only)
+ A postfix can be added in the metric name when outputting in the I<Graphite> format.
+ It's added after the I<Host> name.
+ Metric name will be "<prefix><host><postfix><plugin><type><name>"
+ =item B<GraphiteEscapeChar> (B<Format>=I<Graphite> only)
+ Specify a character to replace dots (.) in the host part of the metric name.
+ In I<Graphite> metric name, dots are used as separators between different
+ metric parts (host, plugin, type).
+ Default is "_" (I<Underscore>).
+ =item B<GraphiteSeparateInstances> B<false>|B<true>
+ If set to B<true>, the plugin instance and type instance will be in their own
+ path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+ default), the plugin and plugin instance (and likewise the type and type
+ instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+ =item B<StoreRates> B<true>|B<false>
+ If set to B<true> (the default), convert counter values to rates. If set to
+ B<false> counter values are stored as is, i.e. as an increasing integer number.
+ This will be reflected in the C<ds_type> tag: If B<StoreRates> is enabled,
+ converted values will have "rate" appended to the data source type, e.g.
+ C<ds_type:derive:rate>.
+ =back
+ =item B<Property> I<String> I<String>
+ Configure the kafka producer through properties, you almost always will
+ want to set B<metadata.broker.list> to your Kafka broker list.
+ =back
  =head2 Plugin C<write_riemann>
  
  The I<write_riemann plugin> will send values to I<Riemann>, a powerfull stream
@@@ -6452,17 -6543,6 +6557,17 @@@ interval is multiplied to set the TTL. 
  know exactly what you're doing, you should only increase this setting from its
  default value.
  
 +=item B<Notifications> B<false>|B<true>
 +
 +If set to B<true>, create riemann events for notifications. This is B<true>
 +by default. When processing thresholds from write_riemann, it might prove
 +useful to avoid getting notification events.
 +
 +=item B<CheckThresholds> B<false>|B<true>
 +
 +If set to B<true>, attach state to events based on thresholds defined
 +in the B<Threshold> plugin. Defaults to B<false>.
 +
  =back
  
  =item B<Tag> I<String>