Merge branch 'collectd-4.10' into collectd-5.0
[collectd.git] / contrib / collectd_network.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license or newer.
8
9 # Frank Marien (frank@apsu.be) 6 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated to python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13 # - fixed comment typo (decode_network_string decodes a string)
14
15 """
16 Collectd network protocol implementation.
17 """
18
19 import socket,struct,sys
20 try:
21   from io import StringIO
22 except ImportError:
23   from cStringIO import StringIO
24
25 from datetime import datetime
26 from copy import deepcopy
27
28
29 DEFAULT_PORT = 25826
30 """Default port"""
31
32 DEFAULT_IPv4_GROUP = "239.192.74.66"
33 """Default IPv4 multicast group"""
34
35 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
36 """Default IPv6 multicast group"""
37
38 HR_TIME_DIV = (2.0**30)
39
40 # Message kinds
41 TYPE_HOST            = 0x0000
42 TYPE_TIME            = 0x0001
43 TYPE_TIME_HR         = 0x0008
44 TYPE_PLUGIN          = 0x0002
45 TYPE_PLUGIN_INSTANCE = 0x0003
46 TYPE_TYPE            = 0x0004
47 TYPE_TYPE_INSTANCE   = 0x0005
48 TYPE_VALUES          = 0x0006
49 TYPE_INTERVAL        = 0x0007
50 TYPE_INTERVAL_HR     = 0x0009
51
52 # For notifications
53 TYPE_MESSAGE         = 0x0100
54 TYPE_SEVERITY        = 0x0101
55
56 # DS kinds
57 DS_TYPE_COUNTER      = 0
58 DS_TYPE_GAUGE        = 1
59 DS_TYPE_DERIVE       = 2
60 DS_TYPE_ABSOLUTE     = 3
61
62 header = struct.Struct("!2H")
63 number = struct.Struct("!Q")
64 short  = struct.Struct("!H")
65 double = struct.Struct("<d")
66
67 def decode_network_values(ptype, plen, buf):
68     """Decodes a list of DS values in collectd network format
69     """
70     nvalues = short.unpack_from(buf, header.size)[0]
71     off = header.size + short.size + nvalues
72     valskip = double.size
73
74     # Check whether our expected packet size is the reported one
75     assert ((valskip + 1) * nvalues + short.size + header.size) == plen
76     assert double.size == number.size
77
78     result = []
79     for dstype in buf[header.size+short.size:off]:
80         if dstype == DS_TYPE_COUNTER:
81             result.append((dstype, number.unpack_from(buf, off)[0]))
82             off += valskip
83         elif dstype == DS_TYPE_GAUGE:
84             result.append((dstype, double.unpack_from(buf, off)[0]))
85             off += valskip
86         elif dstype == DS_TYPE_DERIVE:
87             result.append((dstype, number.unpack_from(buf, off)[0]))
88             off += valskip
89         elif dstype == DS_TYPE_ABSOLUTE:
90             result.append((dstype, number.unpack_from(buf, off)[0]))
91             off += valskip
92         else:
93             raise ValueError("DS type %i unsupported" % dstype)
94
95     return result
96
97
98 def decode_network_number(ptype, plen, buf):
99     """Decodes a number (64-bit unsigned) from collectd network format.
100     """
101     return number.unpack_from(buf, header.size)[0]
102
103
104 def decode_network_string(msgtype, plen, buf):
105     """Decodes a string from collectd network format.
106     """
107     return buf[header.size:plen-1]
108
109
110 # Mapping of message types to decoding functions.
111 _decoders = {
112     TYPE_VALUES         : decode_network_values,
113     TYPE_TIME           : decode_network_number,
114     TYPE_TIME_HR        : decode_network_number,
115     TYPE_INTERVAL       : decode_network_number,
116     TYPE_INTERVAL_HR    : decode_network_number,
117     TYPE_HOST           : decode_network_string,
118     TYPE_PLUGIN         : decode_network_string,
119     TYPE_PLUGIN_INSTANCE: decode_network_string,
120     TYPE_TYPE           : decode_network_string,
121     TYPE_TYPE_INSTANCE  : decode_network_string,
122     TYPE_MESSAGE        : decode_network_string,
123     TYPE_SEVERITY       : decode_network_number,
124 }
125
126
127 def decode_network_packet(buf):
128     """Decodes a network packet in collectd format.
129     """
130     off = 0
131     blen = len(buf)
132
133     while off < blen:
134         ptype, plen = header.unpack_from(buf, off)
135
136         if plen > blen - off:
137             raise ValueError("Packet longer than amount of data in buffer")
138
139         if ptype not in _decoders:
140             raise ValueError("Message type %i not recognized" % ptype)
141
142         yield ptype, _decoders[ptype](ptype, plen, buf[off:])
143         off += plen
144
145
146 class Data(object):
147     time = 0
148     host = None
149     plugin = None
150     plugininstance = None
151     type = None
152     typeinstance = None
153
154     def __init__(self, **kw):
155         [setattr(self, k, v) for k, v in kw.items()]
156
157     @property
158     def datetime(self):
159         return datetime.fromtimestamp(self.time)
160
161     @property
162     def source(self):
163         buf = StringIO()
164         if self.host:
165             buf.write(str(self.host))
166         if self.plugin:
167             buf.write("/")
168             buf.write(str(self.plugin))
169         if self.plugininstance:
170             buf.write("/")
171             buf.write(str(self.plugininstance))
172         if self.type:
173             buf.write("/")
174             buf.write(str(self.type))
175         if self.typeinstance:
176             buf.write("/")
177             buf.write(str(self.typeinstance))
178         return buf.getvalue()
179
180     def __str__(self):
181         return "[%i] %s" % (self.time, self.source)
182
183
184
185 class Notification(Data):
186     FAILURE  = 1
187     WARNING  = 2
188     OKAY     = 4
189
190     SEVERITY = {
191         FAILURE: "FAILURE",
192         WARNING: "WARNING",
193         OKAY   : "OKAY",
194     }
195
196     __severity = 0
197     message  = ""
198
199     def __set_severity(self, value):
200         if value in (self.FAILURE, self.WARNING, self.OKAY):
201             self.__severity = value
202
203     severity = property(lambda self: self.__severity, __set_severity)
204
205     @property
206     def severitystring(self):
207         return self.SEVERITY.get(self.severity, "UNKNOWN")
208
209     def __str__(self):
210         return "%s [%s] %s" % (
211                 super(Notification, self).__str__(),
212                 self.severitystring,
213                 self.message)
214
215
216
217 class Values(Data, list):
218     def __str__(self):
219         return "%s %s" % (Data.__str__(self), list.__str__(self))
220
221
222
223 def interpret_opcodes(iterable):
224     vl = Values()
225     nt = Notification()
226
227     for kind, data in iterable:
228         if kind == TYPE_TIME:
229             vl.time = nt.time = data
230         elif kind == TYPE_TIME_HR:
231             vl.time = nt.time = data / HR_TIME_DIV
232         elif kind == TYPE_INTERVAL:
233             vl.interval = data
234         elif kind == TYPE_INTERVAL_HR:
235             vl.interval = data / HR_TIME_DIV
236         elif kind == TYPE_HOST:
237             vl.host = nt.host = data
238         elif kind == TYPE_PLUGIN:
239             vl.plugin = nt.plugin = data
240         elif kind == TYPE_PLUGIN_INSTANCE:
241             vl.plugininstance = nt.plugininstance = data
242         elif kind == TYPE_TYPE:
243             vl.type = nt.type = data
244         elif kind == TYPE_TYPE_INSTANCE:
245             vl.typeinstance = nt.typeinstance = data
246         elif kind == TYPE_SEVERITY:
247             nt.severity = data
248         elif kind == TYPE_MESSAGE:
249             nt.message = data
250             yield deepcopy(nt)
251         elif kind == TYPE_VALUES:
252             vl[:] = data
253             yield deepcopy(vl)
254
255
256
257 class Reader(object):
258     """Network reader for collectd data.
259
260     Listens on the network in a given address, which can be a multicast
261     group address, and handles reading data when it arrives.
262     """
263     addr = None
264     host = None
265     port = DEFAULT_PORT
266
267     BUFFER_SIZE = 16384
268
269
270     def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
271         if host is None:
272             multicast = True
273             host = DEFAULT_IPv4_GROUP
274
275         self.host, self.port = host, port
276         self.ipv6 = ":" in self.host
277
278         family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
279                 None if multicast else self.host, self.port,
280                 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
281                 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
282
283         self._sock = socket.socket(family, socktype, proto)
284         self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
285         self._sock.bind(sockaddr)
286
287         if multicast:
288             if hasattr(socket, "SO_REUSEPORT"):
289                 self._sock.setsockopt(
290                         socket.SOL_SOCKET,
291                         socket.SO_REUSEPORT, 1)
292
293             val = None
294             if family == socket.AF_INET:
295                 assert "." in self.host
296                 val = struct.pack("4sl",
297                         socket.inet_aton(self.host), socket.INADDR_ANY)
298             elif family == socket.AF_INET6:
299                 raise NotImplementedError("IPv6 support not ready yet")
300             else:
301                 raise ValueError("Unsupported network address family")
302
303             self._sock.setsockopt(
304                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
305                     socket.IP_ADD_MEMBERSHIP, val)
306             self._sock.setsockopt(
307                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
308                     socket.IP_MULTICAST_LOOP, 0)
309
310
311     def receive(self):
312         """Receives a single raw collect network packet.
313         """
314         return self._sock.recv(self.BUFFER_SIZE)
315
316
317     def decode(self, buf=None):
318         """Decodes a given buffer or the next received packet.
319         """
320         if buf is None:
321             buf = self.receive()
322         return decode_network_packet(buf)
323
324
325     def interpret(self, iterable=None):
326         """Interprets a sequence
327         """
328         if iterable is None:
329             iterable = self.decode()
330         if isinstance(iterable, str):
331             iterable = self.decode(iterable)
332         return interpret_opcodes(iterable)