From: Florian Forster Date: Tue, 2 May 2017 08:49:12 +0000 (+0200) Subject: libcollectdclient: Introduce a "parser" callback. X-Git-Tag: collectd-5.8.0~102^2~14 X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=8059545db364a1aa722535da2c1939529ea28a68 libcollectdclient: Introduce a "parser" callback. This will allow the network plugin to enqueue received network packets and parse them using a thread pool. --- diff --git a/src/libcollectdclient/collectd/server.h b/src/libcollectdclient/collectd/server.h index 90c1c0b7..48b58744 100644 --- a/src/libcollectdclient/collectd/server.h +++ b/src/libcollectdclient/collectd/server.h @@ -28,13 +28,20 @@ #include "collectd/lcc_features.h" +#include "collectd/network.h" /* for lcc_security_level_t */ +#include "collectd/network_parse.h" /* for lcc_network_parse_options_t */ #include "collectd/types.h" -#include "collectd/network.h" /* for lcc_security_level_t */ #include LCC_BEGIN_DECLS +/* lcc_network_parser_t is a callback that parses received network packets. It + * is expected to call lcc_network_parse_options_t.writer with each + * lcc_value_list_t it parses that has the required security level. */ +typedef int (*lcc_network_parser_t)(void *payload, size_t payload_size, + lcc_network_parse_options_t opts); + /* lcc_listener_t holds parameters for running a collectd server. */ typedef struct { /* conn is a UDP socket for the server to listen on. */ @@ -48,6 +55,10 @@ typedef struct { * LCC_DEFAULT_PORT. */ char *service; + /* parser is the callback used to parse incoming network packets. Defaults to + * lcc_network_parse() if set to NULL. */ + lcc_network_parser_t parser; + /* writer is the callback used to send incoming lcc_value_list_t to. */ lcc_value_list_writer_t writer; diff --git a/src/libcollectdclient/server.c b/src/libcollectdclient/server.c index 4a414bfa..68e4a9ea 100644 --- a/src/libcollectdclient/server.c +++ b/src/libcollectdclient/server.c @@ -189,6 +189,9 @@ int lcc_listen_and_write(lcc_listener_t srv) { /* TODO(octo): this should be a define. */ srv.buffer_size = 1452; + if (srv.parser == NULL) + srv.parser = lcc_network_parse; + int ret = 0; while (42) { char buffer[srv.buffer_size]; @@ -200,13 +203,12 @@ int lcc_listen_and_write(lcc_listener_t srv) { break; } - /* TODO(octo): implement parse(). */ - (void)lcc_network_parse(buffer, (size_t)len, - (lcc_network_parse_options_t){ - .writer = srv.writer, - .password_lookup = srv.password_lookup, - .security_level = srv.security_level, - }); + (void)srv.parser(buffer, (size_t)len, + (lcc_network_parse_options_t){ + .writer = srv.writer, + .password_lookup = srv.password_lookup, + .security_level = srv.security_level, + }); } if (close_socket) {