mqtt, gps: add name parameter to plugin_thread_create()
[collectd.git] / src / gps.c
1 /**
2  * collectd - src/gps.c
3  * Copyright (C) 2015  Nicolas JOURDEN
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Nicolas JOURDEN <nicolas.jourden at laposte.net>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Marc Fournier <marc.fournier at camptocamp.com>
27  **/
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_time.h"
33
34 #define CGPS_TRUE                  1
35 #define CGPS_FALSE                 0
36 #define CGPS_DEFAULT_HOST          "localhost"
37 #define CGPS_DEFAULT_PORT          "2947" /* DEFAULT_GPSD_PORT */
38 #define CGPS_DEFAULT_TIMEOUT       MS_TO_CDTIME_T (15)
39 #define CGPS_DEFAULT_PAUSE_CONNECT TIME_T_TO_CDTIME_T (5)
40 #define CGPS_MAX_ERROR             100
41 #define CGPS_CONFIG                "?WATCH={\"enable\":true,\"json\":true,\"nmea\":false}\r\n"
42
43 #include <gps.h>
44 #include <pthread.h>
45
46 typedef struct {
47   char *host;
48   char *port;
49   cdtime_t timeout;
50   cdtime_t pause_connect;
51 } cgps_config_t;
52
53 typedef struct {
54   gauge_t sats_used;
55   gauge_t sats_visible;
56   gauge_t hdop;
57   gauge_t vdop;
58 } cgps_data_t;
59
60 static cgps_config_t cgps_config_data;
61
62 static cgps_data_t cgps_data = {NAN, NAN, NAN, NAN};
63
64 static pthread_t cgps_thread_id;
65 static pthread_mutex_t  cgps_data_lock = PTHREAD_MUTEX_INITIALIZER;
66 static pthread_mutex_t  cgps_thread_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_cond_t   cgps_thread_cond = PTHREAD_COND_INITIALIZER;
68 static int cgps_thread_shutdown = CGPS_FALSE;
69 static int cgps_thread_running = CGPS_FALSE;
70
71 /**
72  * Non blocking pause for the thread.
73  */
74 static int cgps_thread_pause(cdtime_t pTime)
75 {
76   cdtime_t until = cdtime() + pTime;
77
78   pthread_mutex_lock (&cgps_thread_lock);
79   pthread_cond_timedwait (&cgps_thread_cond, &cgps_thread_lock,
80       &CDTIME_T_TO_TIMESPEC (until));
81
82   int ret = !cgps_thread_shutdown;
83
84   pthread_mutex_lock (&cgps_thread_lock);
85   return ret;
86 }
87
88 /**
89  * Thread reading from gpsd.
90  */
91 static void * cgps_thread (void * pData)
92 {
93   struct gps_data_t gpsd_conn;
94   unsigned int err_count;
95   cgps_thread_running = CGPS_TRUE;
96
97   while (CGPS_TRUE)
98   {
99     pthread_mutex_lock (&cgps_thread_lock);
100     if (cgps_thread_shutdown == CGPS_TRUE)
101     {
102       goto quit;
103     }
104     pthread_mutex_unlock (&cgps_thread_lock);
105
106     err_count = 0;
107
108 #if GPSD_API_MAJOR_VERSION > 4
109     int status = gps_open (cgps_config_data.host, cgps_config_data.port, &gpsd_conn);
110 #else
111     int status = gps_open_r (cgps_config_data.host, cgps_config_data.port, &gpsd_conn);
112 #endif
113     if (status < 0)
114     {
115       WARNING ("gps plugin: connecting to %s:%s failed: %s",
116                cgps_config_data.host, cgps_config_data.port, gps_errstr (status));
117
118       // Here we make a pause until a new tentative to connect, we check also if
119       // the thread does not need to stop.
120       if (cgps_thread_pause(cgps_config_data.pause_connect) == CGPS_FALSE)
121       {
122         goto quit;
123       }
124
125       continue;
126     }
127
128     gps_stream (&gpsd_conn, WATCH_ENABLE | WATCH_JSON | WATCH_NEWSTYLE, NULL);
129     gps_send (&gpsd_conn, CGPS_CONFIG);
130
131     while (CGPS_TRUE)
132     {
133       pthread_mutex_lock (&cgps_thread_lock);
134       if (cgps_thread_shutdown == CGPS_TRUE)
135       {
136         goto stop;
137       }
138       pthread_mutex_unlock (&cgps_thread_lock);
139
140 #if GPSD_API_MAJOR_VERSION > 4
141       long timeout_us = CDTIME_T_TO_US (cgps_config_data.timeout);
142       if (!gps_waiting (&gpsd_conn, (int) timeout_us ))
143 #else
144       if (!gps_waiting (&gpsd_conn))
145 #endif
146       {
147         continue;
148       }
149
150       if (gps_read (&gpsd_conn) == -1)
151       {
152         WARNING ("gps plugin: incorrect data! (err_count: %d)", err_count);
153         err_count++;
154
155         if (err_count > CGPS_MAX_ERROR)
156         {
157           // Server is not responding ...
158           if (gps_send (&gpsd_conn, CGPS_CONFIG) == -1)
159           {
160             WARNING ("gps plugin: gpsd seems to be down, reconnecting");
161             gps_close (&gpsd_conn);
162             break;
163           }
164           // Server is responding ...
165           else
166           {
167             err_count = 0;
168           }
169         }
170
171         continue;
172       }
173
174       pthread_mutex_lock (&cgps_data_lock);
175
176       // Number of sats in view:
177       cgps_data.sats_used = (gauge_t) gpsd_conn.satellites_used;
178       cgps_data.sats_visible = (gauge_t) gpsd_conn.satellites_visible;
179
180       // dilution of precision:
181       cgps_data.vdop = NAN;
182       cgps_data.hdop = NAN;
183       if (cgps_data.sats_used > 0)
184       {
185         cgps_data.hdop = gpsd_conn.dop.hdop;
186         cgps_data.vdop = gpsd_conn.dop.vdop;
187       }
188
189       DEBUG ("gps plugin: %.0f sats used (of %.0f visible), hdop = %.3f, vdop = %.3f",
190              cgps_data.sats_used, cgps_data.sats_visible, cgps_data.hdop, cgps_data.vdop);
191
192       pthread_mutex_unlock (&cgps_data_lock);
193     }
194   }
195
196 stop:
197   DEBUG ("gps plugin: thread closing gpsd connection ... ");
198   gps_stream (&gpsd_conn, WATCH_DISABLE, NULL);
199   gps_close (&gpsd_conn);
200 quit:
201   DEBUG ("gps plugin: thread shutting down ... ");
202   cgps_thread_running = CGPS_FALSE;
203   pthread_mutex_unlock (&cgps_thread_lock);
204   pthread_exit (NULL);
205 }
206
207
208 /**
209  * Submit a piece of the data.
210  */
211 static void cgps_submit (const char *type, gauge_t value, const char *type_instance)
212 {
213   value_list_t vl = VALUE_LIST_INIT;
214
215   vl.values = &(value_t) { .gauge = value };
216   vl.values_len = 1;
217   sstrncpy (vl.plugin, "gps", sizeof (vl.plugin));
218   sstrncpy (vl.type, type, sizeof (vl.type));
219   sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
220
221   plugin_dispatch_values (&vl);
222 }
223
224 /**
225  * Read the data and submit by piece.
226  */
227 static int cgps_read (void)
228 {
229   cgps_data_t data_copy;
230
231   pthread_mutex_lock (&cgps_data_lock);
232   data_copy = cgps_data;
233   pthread_mutex_unlock (&cgps_data_lock);
234
235   cgps_submit ("dilution_of_precision", data_copy.hdop, "horizontal");
236   cgps_submit ("dilution_of_precision", data_copy.vdop, "vertical");
237   cgps_submit ("satellites", data_copy.sats_used, "used");
238   cgps_submit ("satellites", data_copy.sats_visible, "visible");
239
240   return (0);
241 }
242
243 /**
244  * Read configuration.
245  */
246 static int cgps_config (oconfig_item_t *ci)
247 {
248   int i;
249
250   for (i = 0; i < ci->children_num; i++)
251   {
252     oconfig_item_t *child = ci->children + i;
253
254     if (strcasecmp ("Host", child->key) == 0)
255       cf_util_get_string (child, &cgps_config_data.host);
256     else if (strcasecmp ("Port", child->key) == 0)
257       cf_util_get_service (child, &cgps_config_data.port);
258     else if (strcasecmp ("Timeout", child->key) == 0)
259       cf_util_get_cdtime (child, &cgps_config_data.timeout);
260     else if (strcasecmp ("PauseConnect", child->key) == 0)
261       cf_util_get_cdtime (child, &cgps_config_data.pause_connect);
262     else
263       WARNING ("gps plugin: Ignoring unknown config option \"%s\".", child->key);
264   }
265
266   // Controlling the value for timeout:
267   // If set too high it blocks the reading (> 5 s), too low it gets not reading (< 500 us).
268   // To avoid any issues we replace "out of range" value by the default value.
269   if (
270     cgps_config_data.timeout > TIME_T_TO_CDTIME_T(5)
271     ||
272     cgps_config_data.timeout < US_TO_CDTIME_T(500)
273   )
274   {
275     WARNING ("gps plugin: timeout set to %.6f sec. setting to default (%.6f).",
276       CDTIME_T_TO_DOUBLE(cgps_config_data.timeout),
277       CDTIME_T_TO_DOUBLE(CGPS_DEFAULT_TIMEOUT)
278     );
279     cgps_config_data.timeout = CGPS_DEFAULT_TIMEOUT;
280   }
281
282   return (0);
283 }
284
285 /**
286  * Init.
287  */
288 static int cgps_init (void)
289 {
290   int status;
291
292   if (cgps_thread_running == CGPS_TRUE)
293   {
294     DEBUG ("gps plugin: error gps thread already running ... ");
295     return 0;
296   }
297
298   DEBUG ("gps plugin: config{host: \"%s\", port: \"%s\", timeout: %.6f sec., pause connect: %.3f sec.}",
299          cgps_config_data.host, cgps_config_data.port,
300          CDTIME_T_TO_DOUBLE (cgps_config_data.timeout),
301          CDTIME_T_TO_DOUBLE (cgps_config_data.pause_connect));
302
303   status = plugin_thread_create (&cgps_thread_id, NULL, cgps_thread, NULL, "gps");
304   if (status != 0)
305   {
306     ERROR ("gps plugin: pthread_create() failed.");
307     return (-1);
308   }
309
310   return (0);
311 }
312
313 /**
314  * Shutdown.
315  */
316 static int cgps_shutdown (void)
317 {
318   void * res;
319
320   pthread_mutex_lock (&cgps_thread_lock);
321   cgps_thread_shutdown = CGPS_TRUE;
322   pthread_cond_broadcast (&cgps_thread_cond);
323   pthread_mutex_unlock (&cgps_thread_lock);
324
325   pthread_join(cgps_thread_id, &res);
326   free(res);
327
328   // Clean mutex:
329   pthread_mutex_unlock(&cgps_thread_lock);
330   pthread_mutex_destroy(&cgps_thread_lock);
331   pthread_mutex_unlock(&cgps_data_lock);
332   pthread_mutex_destroy(&cgps_data_lock);
333
334   sfree (cgps_config_data.port);
335   sfree (cgps_config_data.host);
336
337   return (0);
338 }
339
340 /**
341  * Register the module.
342  */
343 void module_register (void)
344 {
345   cgps_config_data.host = sstrdup (CGPS_DEFAULT_HOST);
346   cgps_config_data.port = sstrdup (CGPS_DEFAULT_PORT);
347   cgps_config_data.timeout = CGPS_DEFAULT_TIMEOUT;
348   cgps_config_data.pause_connect = CGPS_DEFAULT_PAUSE_CONNECT;
349
350   plugin_register_complex_config ("gps", cgps_config);
351   plugin_register_init ("gps", cgps_init);
352   plugin_register_read ("gps", cgps_read);
353   plugin_register_shutdown ("gps", cgps_shutdown);
354 }