X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Funixsock.c;h=8524beadd6fed8a30349179555ecd95e52f104e8;hb=fc52d7f0d23b06d3439e8dd0fbc36260d5618f19;hp=17956a177fa7486ec329a156fa4dd181387cd354;hpb=838af4cdc6c8674ed3e14a95fea172118c707a85;p=collectd.git diff --git a/src/unixsock.c b/src/unixsock.c index 17956a17..8524bead 100644 --- a/src/unixsock.c +++ b/src/unixsock.c @@ -67,6 +67,8 @@ static const char *config_keys[] = }; static int config_keys_num = 3; +static int loop = 0; + /* socket configuration */ static int sock_fd = -1; static char *sock_file = NULL; @@ -83,6 +85,52 @@ static unsigned int cache_oldest = UINT_MAX; /* * Functions */ +static int parse_identifier (char *str, char **ret_host, + char **ret_plugin, char **ret_plugin_instance, + char **ret_type, char **ret_type_instance) +{ + char *hostname = NULL; + char *plugin = NULL; + char *plugin_instance = NULL; + char *type = NULL; + char *type_instance = NULL; + + hostname = str; + if (hostname == NULL) + return (-1); + + plugin = strchr (hostname, '/'); + if (plugin == NULL) + return (-1); + *plugin = '\0'; plugin++; + + type = strchr (plugin, '/'); + if (type == NULL) + return (-1); + *type = '\0'; type++; + + plugin_instance = strchr (plugin, '-'); + if (plugin_instance != NULL) + { + *plugin_instance = '\0'; + plugin_instance++; + } + + type_instance = strchr (type, '-'); + if (type_instance != NULL) + { + *type_instance = '\0'; + type_instance++; + } + + *ret_host = hostname; + *ret_plugin = plugin; + *ret_plugin_instance = plugin_instance; + *ret_type = type; + *ret_type_instance = type_instance; + return (0); +} /* int parse_identifier */ + static value_cache_t *cache_search (const char *name) { value_cache_t *vc; @@ -96,65 +144,34 @@ static value_cache_t *cache_search (const char *name) return (vc); } /* value_cache_t *cache_search */ -static int cache_alloc_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance) -{ - int status; - - assert (plugin != NULL); - assert (type != NULL); - - if ((plugin_instance == NULL) || (strlen (plugin_instance) == 0)) - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s/%s", - hostname, plugin, type); - else - status = snprintf (ret, ret_len, "%s/%s/%s-%s", - hostname, plugin, type, type_instance); - } - else - { - if ((type_instance == NULL) || (strlen (type_instance) == 0)) - status = snprintf (ret, ret_len, "%s/%s-%s/%s", - hostname, plugin, plugin_instance, type); - else - status = snprintf (ret, ret_len, "%s/%s-%s/%s-%s", - hostname, plugin, plugin_instance, type, type_instance); - } - - if ((status < 1) || (status >= ret_len)) - return (-1); - return (0); -} /* int cache_alloc_name */ - static int cache_insert (const data_set_t *ds, const value_list_t *vl) { /* We're called from `cache_update' so we don't need to lock the mutex */ value_cache_t *vc; int i; - DEBUG ("ds->ds_num = %i; vl->values_len = %i;", + DEBUG ("unixsock plugin: cache_insert: ds->ds_num = %i;" + " vl->values_len = %i;", ds->ds_num, vl->values_len); assert (ds->ds_num == vl->values_len); vc = (value_cache_t *) malloc (sizeof (value_cache_t)); if (vc == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); ERROR ("unixsock plugin: malloc failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } vc->gauge = (gauge_t *) malloc (sizeof (gauge_t) * vl->values_len); if (vc->gauge == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); ERROR ("unixsock plugin: malloc failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc); return (-1); } @@ -162,20 +179,19 @@ static int cache_insert (const data_set_t *ds, const value_list_t *vl) vc->counter = (counter_t *) malloc (sizeof (counter_t) * vl->values_len); if (vc->counter == NULL) { + char errbuf[1024]; pthread_mutex_unlock (&cache_lock); ERROR ("unixsock plugin: malloc failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); free (vc->gauge); free (vc); return (-1); } - if (cache_alloc_name (vc->name, sizeof (vc->name), - vl->host, vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (vc->name, sizeof (vc->name), vl, ds)) { pthread_mutex_unlock (&cache_lock); - ERROR ("unixsock plugin: cache_alloc_name failed."); + ERROR ("unixsock plugin: FORMAT_VL failed."); free (vc->counter); free (vc->gauge); free (vc); @@ -220,10 +236,7 @@ static int cache_update (const data_set_t *ds, const value_list_t *vl) value_cache_t *vc; int i; - if (cache_alloc_name (name, sizeof (name), - vl->host, - vl->plugin, vl->plugin_instance, - ds->type, vl->type_instance) != 0) + if (FORMAT_VL (name, sizeof (name), vl, ds) != 0) return (-1); pthread_mutex_lock (&cache_lock); @@ -351,8 +364,9 @@ static int us_open_socket (void) sock_fd = socket (PF_UNIX, SOCK_STREAM, 0); if (sock_fd < 0) { + char errbuf[1024]; ERROR ("unixsock plugin: socket failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -365,10 +379,10 @@ static int us_open_socket (void) status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { - DEBUG ("bind failed: %s; sa.sun_path = %s", - strerror (errno), sa.sun_path); - ERROR ("unixsock plugin: bind failed: %s", - strerror (errno)); + char errbuf[1024]; + sstrerror (errno, errbuf, sizeof (errbuf)); + DEBUG ("bind failed: %s; sa.sun_path = %s", errbuf, sa.sun_path); + ERROR ("unixsock plugin: bind failed: %s", errbuf); close (sock_fd); sock_fd = -1; return (-1); @@ -377,8 +391,9 @@ static int us_open_socket (void) status = listen (sock_fd, 8); if (status != 0) { + char errbuf[1024]; ERROR ("unixsock plugin: listen failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; return (-1); @@ -397,8 +412,9 @@ static int us_open_socket (void) status = getgrnam_r (grpname, &sg, grbuf, sizeof (grbuf), &g); if (status != 0) { - WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", - grpname, strerror (status)); + char errbuf[1024]; + WARNING ("unixsock plugin: getgrnam_r (%s) failed: %s", grpname, + sstrerror (errno, errbuf, sizeof (errbuf))); break; } if (g == NULL) @@ -411,10 +427,11 @@ static int us_open_socket (void) if (chown ((sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (uid_t) -1, g->gr_gid) != 0) { + char errbuf[1024]; WARNING ("unixsock plugin: chown (%s, -1, %i) failed: %s", (sock_file != NULL) ? sock_file : US_DEFAULT_PATH, (int) g->gr_gid, - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); } } while (0); @@ -423,7 +440,7 @@ static int us_open_socket (void) static int us_handle_getval (FILE *fh, char **fields, int fields_num) { - char *hostname = fields[1]; + char *hostname; char *plugin; char *plugin_instance; char *type; @@ -434,34 +451,29 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) int i; if (fields_num != 2) - return (-1); - - plugin = strchr (hostname, '/'); - if (plugin == NULL) - return (-1); - *plugin = '\0'; plugin++; - - type = strchr (plugin, '/'); - if (type == NULL) - return (-1); - *type = '\0'; type++; - - plugin_instance = strchr (plugin, '-'); - if (plugin_instance != NULL) { - *plugin_instance = '\0'; - plugin_instance++; + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 2.\n", + fields_num); + fflush (fh); + return (-1); } + DEBUG ("unixsock plugin: Got query for `%s'", fields[1]); - type_instance = strchr (type, '-'); - if (type_instance != NULL) + status = parse_identifier (fields[1], &hostname, + &plugin, &plugin_instance, + &type, &type_instance); + if (status != 0) { - *type_instance = '\0'; - type_instance++; + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); + return (-1); } - status = cache_alloc_name (name, sizeof (name), + status = format_name (name, sizeof (name), hostname, plugin, plugin_instance, type, type_instance); + /* FIXME: Send some response */ if (status != 0) return (-1); @@ -498,6 +510,141 @@ static int us_handle_getval (FILE *fh, char **fields, int fields_num) return (0); } /* int us_handle_getval */ +static int us_handle_putval (FILE *fh, char **fields, int fields_num) +{ + char *hostname; + char *plugin; + char *plugin_instance; + char *type; + char *type_instance; + int status; + int i; + + const data_set_t *ds; + value_list_t vl = VALUE_LIST_INIT; + + char **value_ptr; + + if (fields_num != 3) + { + DEBUG ("unixsock plugin: Wrong number of fields: %i", fields_num); + fprintf (fh, "-1 Wrong number of fields: Got %i, expected 3.\n", + fields_num); + fflush (fh); + return (-1); + } + + status = parse_identifier (fields[1], &hostname, + &plugin, &plugin_instance, + &type, &type_instance); + if (status != 0) + { + DEBUG ("unixsock plugin: Cannot parse `%s'", fields[1]); + fprintf (fh, "-1 Cannot parse identifier.\n"); + fflush (fh); + return (-1); + } + + /* FIXME: Send some response */ + if ((strlen (hostname) > sizeof (vl.host)) + || (strlen (plugin) > sizeof (vl.plugin)) + || ((plugin_instance != NULL) + && (strlen (plugin_instance) > sizeof (vl.plugin_instance))) + || ((type_instance != NULL) + && (strlen (type_instance) > sizeof (vl.type_instance)))) + return (-1); + + strcpy (vl.host, hostname); + strcpy (vl.plugin, plugin); + if (plugin_instance != NULL) + strcpy (vl.plugin_instance, plugin_instance); + if (type_instance != NULL) + strcpy (vl.type_instance, type_instance); + + { /* parse the time */ + char *t = fields[2]; + char *v = strchr (t, ':'); + if (v == NULL) + return (-1); + *v = '\0'; v++; + + vl.time = (time_t) atoi (t); + if (vl.time == 0) + vl.time = time (NULL); + + fields[2] = v; + } + + ds = plugin_get_ds (type); + if (ds == NULL) + return (-1); + + value_ptr = (char **) calloc (ds->ds_num, sizeof (char *)); + /* FIXME: Send some response */ + if (value_ptr == NULL) + return (-1); + + + { /* parse the value-list. It's colon-separated. */ + char *dummy; + char *ptr; + char *saveptr; + + i = 0; + dummy = fields[2]; + saveptr = NULL; + while ((ptr = strtok_r (dummy, ":", &saveptr)) != NULL) + { + dummy = NULL; + if (i >= ds->ds_num) + { + i = ds->ds_num + 1; + break; + } + value_ptr[i] = ptr; + i++; + } + + if (i != ds->ds_num) + { + sfree (value_ptr); + /* FIXME: Send some response */ + return (-1); + } + } /* done parsing the value-list */ + + vl.values_len = ds->ds_num; + vl.values = (value_t *) malloc (vl.values_len * sizeof (value_t)); + if (vl.values == NULL) + { + sfree (value_ptr); + return (-1); + } + DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); + + for (i = 0; i < ds->ds_num; i++) + { + if (strcmp (value_ptr[i], "U") == 0) + vl.values[i].gauge = NAN; + else if (ds->ds[i].type == DS_TYPE_COUNTER) + vl.values[i].counter = atoll (value_ptr[i]); + else if (ds->ds[i].type == DS_TYPE_GAUGE) + vl.values[i].gauge = atof (value_ptr[i]); + } /* for (i = 2 .. fields_num) */ + + plugin_dispatch_values (type, &vl); + + DEBUG ("value_ptr = 0x%p; vl.values = 0x%p;", (void *) value_ptr, (void *) vl.values); + + sfree (value_ptr); + sfree (vl.values); + + fprintf (fh, "0 Success\n"); + fflush (fh); + + return (0); +} /* int us_handle_putval */ + static void *us_handle_client (void *arg) { int fd; @@ -515,8 +662,9 @@ static void *us_handle_client (void *arg) fh = fdopen (fd, "r+"); if (fh == NULL) { + char errbuf[1024]; ERROR ("unixsock plugin: fdopen failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); close (fd); pthread_exit ((void *) 1); } @@ -548,6 +696,10 @@ static void *us_handle_client (void *arg) { us_handle_getval (fh, fields, fields_num); } + else if (strcasecmp (fields[0], "putval") == 0) + { + us_handle_putval (fh, fields, fields_num); + } else { fprintf (fh, "Unknown command: %s\n", fields[0]); @@ -571,17 +723,19 @@ static void *us_server_thread (void *arg) if (us_open_socket () != 0) pthread_exit ((void *) 1); - while (42) + while (loop != 0) { DEBUG ("Calling accept.."); status = accept (sock_fd, NULL, NULL); if (status < 0) { + char errbuf[1024]; + if (errno == EINTR) continue; ERROR ("unixsock plugin: accept failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); close (sock_fd); sock_fd = -1; pthread_exit ((void *) 1); @@ -590,8 +744,9 @@ static void *us_server_thread (void *arg) remote_fd = (int *) malloc (sizeof (int)); if (remote_fd == NULL) { + char errbuf[1024]; WARNING ("unixsock plugin: malloc failed: %s", - strerror (errno)); + sstrerror (errno, errbuf, sizeof (errbuf))); close (status); continue; } @@ -605,13 +760,17 @@ static void *us_server_thread (void *arg) status = pthread_create (&th, &th_attr, us_handle_client, (void *) remote_fd); if (status != 0) { + char errbuf[1024]; WARNING ("unixsock plugin: pthread_create failed: %s", - strerror (status)); + sstrerror (errno, errbuf, sizeof (errbuf))); close (*remote_fd); free (remote_fd); continue; } - } /* while (42) */ + } /* while (loop) */ + + close (sock_fd); + sock_fd = -1; return ((void *) 0); } /* void *us_server_thread */ @@ -644,11 +803,14 @@ static int us_init (void) { int status; + loop = 1; + status = pthread_create (&listen_thread, NULL, us_server_thread, NULL); if (status != 0) { + char errbuf[1024]; ERROR ("unixsock plugin: pthread_create failed: %s", - strerror (status)); + sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -659,6 +821,8 @@ static int us_shutdown (void) { void *ret; + loop = 0; + if (listen_thread != (pthread_t) 0) { pthread_kill (listen_thread, SIGTERM); @@ -676,12 +840,12 @@ static int us_shutdown (void) static int us_write (const data_set_t *ds, const value_list_t *vl) { cache_update (ds, vl); - cache_flush (2 * atoi (COLLECTD_STEP)); + cache_flush (2 * interval_g); return (0); } -void module_register (void) +void module_register (modreg_e load) { plugin_register_config ("unixsock", us_config, config_keys, config_keys_num);