Updated Adrian Perez' 2009 Python network proto classes
[collectd.git] / contrib / collectd_network.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license.
8
9 # Frank Marien (frank@apsu.be) 4 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated for python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13
14 """
15 Collectd network protocol implementation.
16 """
17
18 import socket
19 import struct
20 try:
21   from io import StringIO
22 except ImportError:
23   from cStringIO import StringIO
24
25 from datetime import datetime
26 from copy import deepcopy
27
28
29 DEFAULT_PORT = 25826
30 """Default port"""
31
32 DEFAULT_IPv4_GROUP = "239.192.74.66"
33 """Default IPv4 multicast group"""
34
35 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
36 """Default IPv6 multicast group"""
37
38
39
40 # Message kinds
41 TYPE_HOST            = 0x0000
42 TYPE_TIME            = 0x0001
43 TYPE_TIME_HR         = 0x0008
44 TYPE_PLUGIN          = 0x0002
45 TYPE_PLUGIN_INSTANCE = 0x0003
46 TYPE_TYPE            = 0x0004
47 TYPE_TYPE_INSTANCE   = 0x0005
48 TYPE_VALUES          = 0x0006
49 TYPE_INTERVAL        = 0x0007
50 TYPE_INTERVAL_HR     = 0x0009
51
52 # For notifications
53 TYPE_MESSAGE         = 0x0100
54 TYPE_SEVERITY        = 0x0101
55
56 # DS kinds
57 DS_TYPE_COUNTER      = 0
58 DS_TYPE_GAUGE        = 1
59 DS_TYPE_DERIVE       = 2
60 DS_TYPE_ABSOLUTE     = 3
61
62
63 header = struct.Struct("!2H")
64 number = struct.Struct("!Q")
65 short  = struct.Struct("!H")
66 double = struct.Struct("<d")
67
68
69 def decode_network_values(ptype, plen, buf):
70     """Decodes a list of DS values in collectd network format
71     """
72     nvalues = short.unpack_from(buf, header.size)[0]
73     off = header.size + short.size + nvalues
74     valskip = double.size
75
76     # Check whether our expected packet size is the reported one
77     assert ((valskip + 1) * nvalues + short.size + header.size) == plen
78     assert double.size == number.size
79
80     result = []
81     for dstype in buf[header.size+short.size:off]:
82         if dstype == DS_TYPE_COUNTER:
83             result.append((dstype, number.unpack_from(buf, off)[0]))
84             off += valskip
85         elif dstype == DS_TYPE_GAUGE:
86             result.append((dstype, double.unpack_from(buf, off)[0]))
87             off += valskip
88         elif dstype == DS_TYPE_DERIVE:
89             result.append((dstype, number.unpack_from(buf, off)[0]))
90             off += valskip
91         elif dstype == DS_TYPE_ABSOLUTE:
92             result.append((dstype, number.unpack_from(buf, off)[0]))
93             off += valskip
94         else:
95             raise ValueError("DS type %i unsupported" % dstype)
96
97     return result
98
99
100 def decode_network_number(ptype, plen, buf):
101     """Decodes a number (64-bit unsigned) in collectd network format.
102     """
103     return number.unpack_from(buf, header.size)[0]
104
105
106 def decode_network_string(msgtype, plen, buf):
107     """Decodes a floating point number (64-bit) in collectd network format.
108     """
109     return buf[header.size:plen-1]
110
111
112 # Mapping of message types to decoding functions.
113 _decoders = {
114     TYPE_VALUES         : decode_network_values,
115     TYPE_TIME           : decode_network_number,
116     TYPE_TIME_HR        : decode_network_number,
117     TYPE_INTERVAL       : decode_network_number,
118     TYPE_INTERVAL_HR    : decode_network_number,
119     TYPE_HOST           : decode_network_string,
120     TYPE_PLUGIN         : decode_network_string,
121     TYPE_PLUGIN_INSTANCE: decode_network_string,
122     TYPE_TYPE           : decode_network_string,
123     TYPE_TYPE_INSTANCE  : decode_network_string,
124     TYPE_MESSAGE        : decode_network_string,
125     TYPE_SEVERITY       : decode_network_number,
126 }
127
128
129 def decode_network_packet(buf):
130     """Decodes a network packet in collectd format.
131     """
132     off = 0
133     blen = len(buf)
134
135     while off < blen:
136         ptype, plen = header.unpack_from(buf, off)
137
138         if plen > blen - off:
139             raise ValueError("Packet longer than amount of data in buffer")
140
141         if ptype not in _decoders:
142             raise ValueError("Message type %i not recognized" % ptype)
143
144         yield ptype, _decoders[ptype](ptype, plen, buf[off:])
145         off += plen
146
147
148 class Data(object):
149     time = 0
150     host = None
151     plugin = None
152     plugininstance = None
153     type = None
154     typeinstance = None
155
156     def __init__(self, **kw):
157         [setattr(self, k, v) for k, v in kw.items()]
158
159     @property
160     def datetime(self):
161         return datetime.fromtimestamp(self.time)
162
163     @property
164     def source(self):
165         buf = StringIO()
166         if self.host:
167             buf.write(str(self.host))
168         if self.plugin:
169             buf.write("/")
170             buf.write(str(self.plugin))
171         if self.plugininstance:
172             buf.write("/")
173             buf.write(str(self.plugininstance))
174         if self.type:
175             buf.write("/")
176             buf.write(str(self.type))
177         if self.typeinstance:
178             buf.write("/")
179             buf.write(str(self.typeinstance))
180         return buf.getvalue()
181
182     def __str__(self):
183         return "[%i] %s" % (self.time, self.source)
184
185
186
187 class Notification(Data):
188     FAILURE  = 1
189     WARNING  = 2
190     OKAY     = 4
191
192     SEVERITY = {
193         FAILURE: "FAILURE",
194         WARNING: "WARNING",
195         OKAY   : "OKAY",
196     }
197
198     __severity = 0
199     message  = ""
200
201     def __set_severity(self, value):
202         if value in (self.FAILURE, self.WARNING, self.OKAY):
203             self.__severity = value
204
205     severity = property(lambda self: self.__severity, __set_severity)
206
207     @property
208     def severitystring(self):
209         return self.SEVERITY.get(self.severity, "UNKNOWN")
210
211     def __str__(self):
212         return "%s [%s] %s" % (
213                 super(Notification, self).__str__(),
214                 self.severitystring,
215                 self.message)
216
217
218
219 class Values(Data, list):
220     def __str__(self):
221         return "%s %s" % (Data.__str__(self), list.__str__(self))
222
223
224
225 def interpret_opcodes(iterable):
226     vl = Values()
227     nt = Notification()
228
229     for kind, data in iterable:
230         if kind == TYPE_TIME:
231             vl.time = nt.time = data
232         elif kind == TYPE_TIME_HR:
233             vl.time = nt.time = data
234         elif kind == TYPE_INTERVAL:
235             vl.interval = data
236         elif kind == TYPE_INTERVAL_HR:
237             vl.interval = data
238         elif kind == TYPE_HOST:
239             vl.host = nt.host = data
240         elif kind == TYPE_PLUGIN:
241             vl.plugin = nt.plugin = data
242         elif kind == TYPE_PLUGIN_INSTANCE:
243             vl.plugininstance = nt.plugininstance = data
244         elif kind == TYPE_TYPE:
245             vl.type = nt.type = data
246         elif kind == TYPE_TYPE_INSTANCE:
247             vl.typeinstance = nt.typeinstance = data
248         elif kind == TYPE_SEVERITY:
249             nt.severity = data
250         elif kind == TYPE_MESSAGE:
251             nt.message = data
252             yield deepcopy(nt)
253         elif kind == TYPE_VALUES:
254             vl[:] = data
255             yield deepcopy(vl)
256
257
258
259 class Reader(object):
260     """Network reader for collectd data.
261
262     Listens on the network in a given address, which can be a multicast
263     group address, and handles reading data when it arrives.
264     """
265     addr = None
266     host = None
267     port = DEFAULT_PORT
268
269     BUFFER_SIZE = 16384
270
271
272     def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
273         if host is None:
274             multicast = True
275             host = DEFAULT_IPv4_GROUP
276
277         self.host, self.port = host, port
278         self.ipv6 = ":" in self.host
279
280         family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
281                 None if multicast else self.host, self.port,
282                 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
283                 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
284
285         self._sock = socket.socket(family, socktype, proto)
286         self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
287         self._sock.bind(sockaddr)
288
289         if multicast:
290             if hasattr(socket, "SO_REUSEPORT"):
291                 self._sock.setsockopt(
292                         socket.SOL_SOCKET,
293                         socket.SO_REUSEPORT, 1)
294
295             val = None
296             if family == socket.AF_INET:
297                 assert "." in self.host
298                 val = struct.pack("4sl",
299                         socket.inet_aton(self.host), socket.INADDR_ANY)
300             elif family == socket.AF_INET6:
301                 raise NotImplementedError("IPv6 support not ready yet")
302             else:
303                 raise ValueError("Unsupported network address family")
304
305             self._sock.setsockopt(
306                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
307                     socket.IP_ADD_MEMBERSHIP, val)
308             self._sock.setsockopt(
309                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
310                     socket.IP_MULTICAST_LOOP, 0)
311
312
313     def receive(self):
314         """Receives a single raw collect network packet.
315         """
316         return self._sock.recv(self.BUFFER_SIZE)
317
318
319     def decode(self, buf=None):
320         """Decodes a given buffer or the next received packet.
321         """
322         if buf is None:
323             buf = self.receive()
324         return decode_network_packet(buf)
325
326
327     def interpret(self, iterable=None):
328         """Interprets a sequence
329         """
330         if iterable is None:
331             iterable = self.decode()
332         if isinstance(iterable, str):
333             iterable = self.decode(iterable)
334         return interpret_opcodes(iterable)