Merge pull request #3339 from jkohen/patch-1
[collectd.git] / contrib / collectd_network.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license or newer.
8
9 # Frank Marien (frank@apsu.be) 6 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated to python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13 # - fixed comment typo (decode_network_string decodes a string)
14
15 """
16 Collectd network protocol implementation.
17 """
18
19 import socket,struct
20 import platform
21 if platform.python_version() < '2.8.0':
22     # Python 2.7 and below io.StringIO does not like unicode
23     from StringIO import StringIO
24 else:
25     try:
26       from io import StringIO
27     except ImportError:
28       from cStringIO import StringIO
29
30 from datetime import datetime
31 from copy import deepcopy
32
33
34 DEFAULT_PORT = 25826
35 """Default port"""
36
37 DEFAULT_IPv4_GROUP = "239.192.74.66"
38 """Default IPv4 multicast group"""
39
40 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
41 """Default IPv6 multicast group"""
42
43 HR_TIME_DIV = (2.0**30)
44
45 # Message kinds
46 TYPE_HOST            = 0x0000
47 TYPE_TIME            = 0x0001
48 TYPE_TIME_HR         = 0x0008
49 TYPE_PLUGIN          = 0x0002
50 TYPE_PLUGIN_INSTANCE = 0x0003
51 TYPE_TYPE            = 0x0004
52 TYPE_TYPE_INSTANCE   = 0x0005
53 TYPE_VALUES          = 0x0006
54 TYPE_INTERVAL        = 0x0007
55 TYPE_INTERVAL_HR     = 0x0009
56
57 # For notifications
58 TYPE_MESSAGE         = 0x0100
59 TYPE_SEVERITY        = 0x0101
60
61 # DS kinds
62 DS_TYPE_COUNTER      = 0
63 DS_TYPE_GAUGE        = 1
64 DS_TYPE_DERIVE       = 2
65 DS_TYPE_ABSOLUTE     = 3
66
67 header = struct.Struct("!2H")
68 number = struct.Struct("!Q")
69 short  = struct.Struct("!H")
70 double = struct.Struct("<d")
71
72 def decode_network_values(ptype, plen, buf):
73     """Decodes a list of DS values in collectd network format
74     """
75     nvalues = short.unpack_from(buf, header.size)[0]
76     off = header.size + short.size + nvalues
77     valskip = double.size
78
79     # Check whether our expected packet size is the reported one
80     assert ((valskip + 1) * nvalues + short.size + header.size) == plen
81     assert double.size == number.size
82
83     result = []
84     for dstype in [ord(x) for x in buf[header.size+short.size:off]]:
85         if dstype == DS_TYPE_COUNTER:
86             result.append((dstype, number.unpack_from(buf, off)[0]))
87             off += valskip
88         elif dstype == DS_TYPE_GAUGE:
89             result.append((dstype, double.unpack_from(buf, off)[0]))
90             off += valskip
91         elif dstype == DS_TYPE_DERIVE:
92             result.append((dstype, number.unpack_from(buf, off)[0]))
93             off += valskip
94         elif dstype == DS_TYPE_ABSOLUTE:
95             result.append((dstype, number.unpack_from(buf, off)[0]))
96             off += valskip
97         else:
98             raise ValueError("DS type %i unsupported" % dstype)
99
100     return result
101
102
103 def decode_network_number(ptype, plen, buf):
104     """Decodes a number (64-bit unsigned) from collectd network format.
105     """
106     return number.unpack_from(buf, header.size)[0]
107
108
109 def decode_network_string(msgtype, plen, buf):
110     """Decodes a string from collectd network format.
111     """
112     return buf[header.size:plen-1]
113
114
115 # Mapping of message types to decoding functions.
116 _decoders = {
117     TYPE_VALUES         : decode_network_values,
118     TYPE_TIME           : decode_network_number,
119     TYPE_TIME_HR        : decode_network_number,
120     TYPE_INTERVAL       : decode_network_number,
121     TYPE_INTERVAL_HR    : decode_network_number,
122     TYPE_HOST           : decode_network_string,
123     TYPE_PLUGIN         : decode_network_string,
124     TYPE_PLUGIN_INSTANCE: decode_network_string,
125     TYPE_TYPE           : decode_network_string,
126     TYPE_TYPE_INSTANCE  : decode_network_string,
127     TYPE_MESSAGE        : decode_network_string,
128     TYPE_SEVERITY       : decode_network_number,
129 }
130
131
132 def decode_network_packet(buf):
133     """Decodes a network packet in collectd format.
134     """
135     off = 0
136     blen = len(buf)
137
138     while off < blen:
139         ptype, plen = header.unpack_from(buf, off)
140
141         if plen > blen - off:
142             raise ValueError("Packet longer than amount of data in buffer")
143
144         if ptype not in _decoders:
145             raise ValueError("Message type %i not recognized" % ptype)
146
147         yield ptype, _decoders[ptype](ptype, plen, buf[off:])
148         off += plen
149
150
151 class Data(object):
152     time = 0
153     host = None
154     plugin = None
155     plugininstance = None
156     type = None
157     typeinstance = None
158
159     def __init__(self, **kw):
160         [setattr(self, k, v) for k, v in kw.items()]
161
162     @property
163     def datetime(self):
164         return datetime.fromtimestamp(self.time)
165
166     @property
167     def source(self):
168         buf = StringIO()
169         if self.host:
170             buf.write(str(self.host))
171         if self.plugin:
172             buf.write("/")
173             buf.write(str(self.plugin))
174         if self.plugininstance:
175             buf.write("/")
176             buf.write(str(self.plugininstance))
177         if self.type:
178             buf.write("/")
179             buf.write(str(self.type))
180         if self.typeinstance:
181             buf.write("/")
182             buf.write(str(self.typeinstance))
183         return buf.getvalue()
184
185     def __str__(self):
186         return "[%i] %s" % (self.time, self.source)
187
188
189
190 class Notification(Data):
191     FAILURE  = 1
192     WARNING  = 2
193     OKAY     = 4
194
195     SEVERITY = {
196         FAILURE: "FAILURE",
197         WARNING: "WARNING",
198         OKAY   : "OKAY",
199     }
200
201     __severity = 0
202     message  = ""
203
204     def __set_severity(self, value):
205         if value in (self.FAILURE, self.WARNING, self.OKAY):
206             self.__severity = value
207
208     severity = property(lambda self: self.__severity, __set_severity)
209
210     @property
211     def severitystring(self):
212         return self.SEVERITY.get(self.severity, "UNKNOWN")
213
214     def __str__(self):
215         return "%s [%s] %s" % (
216                 super(Notification, self).__str__(),
217                 self.severitystring,
218                 self.message)
219
220
221
222 class Values(Data, list):
223     def __str__(self):
224         return "%s %s" % (Data.__str__(self), list.__str__(self))
225
226
227
228 def interpret_opcodes(iterable):
229     vl = Values()
230     nt = Notification()
231
232     for kind, data in iterable:
233         if kind == TYPE_TIME:
234             vl.time = nt.time = data
235         elif kind == TYPE_TIME_HR:
236             vl.time = nt.time = data / HR_TIME_DIV
237         elif kind == TYPE_INTERVAL:
238             vl.interval = data
239         elif kind == TYPE_INTERVAL_HR:
240             vl.interval = data / HR_TIME_DIV
241         elif kind == TYPE_HOST:
242             vl.host = nt.host = data
243         elif kind == TYPE_PLUGIN:
244             vl.plugin = nt.plugin = data
245         elif kind == TYPE_PLUGIN_INSTANCE:
246             vl.plugininstance = nt.plugininstance = data
247         elif kind == TYPE_TYPE:
248             vl.type = nt.type = data
249         elif kind == TYPE_TYPE_INSTANCE:
250             vl.typeinstance = nt.typeinstance = data
251         elif kind == TYPE_SEVERITY:
252             nt.severity = data
253         elif kind == TYPE_MESSAGE:
254             nt.message = data
255             yield deepcopy(nt)
256         elif kind == TYPE_VALUES:
257             vl[:] = data
258             yield deepcopy(vl)
259
260
261
262 class Reader(object):
263     """Network reader for collectd data.
264
265     Listens on the network in a given address, which can be a multicast
266     group address, and handles reading data when it arrives.
267     """
268     addr = None
269     host = None
270     port = DEFAULT_PORT
271
272     BUFFER_SIZE = 16384
273
274
275     def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
276         if host is None:
277             multicast = True
278             host = DEFAULT_IPv4_GROUP
279
280         self.host, self.port = host, port
281         self.ipv6 = ":" in self.host
282
283         family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
284                 None if multicast else self.host, self.port,
285                 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
286                 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
287
288         self._sock = socket.socket(family, socktype, proto)
289         self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
290         self._sock.bind(sockaddr)
291
292         if multicast:
293             if hasattr(socket, "SO_REUSEPORT"):
294                 self._sock.setsockopt(
295                         socket.SOL_SOCKET,
296                         socket.SO_REUSEPORT, 1)
297
298             val = None
299             if family == socket.AF_INET:
300                 assert "." in self.host
301                 val = struct.pack("4sl",
302                         socket.inet_aton(self.host), socket.INADDR_ANY)
303             elif family == socket.AF_INET6:
304                 raise NotImplementedError("IPv6 support not ready yet")
305             else:
306                 raise ValueError("Unsupported network address family")
307
308             self._sock.setsockopt(
309                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
310                     socket.IP_ADD_MEMBERSHIP, val)
311             self._sock.setsockopt(
312                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
313                     socket.IP_MULTICAST_LOOP, 0)
314
315
316     def receive(self):
317         """Receives a single raw collect network packet.
318         """
319         return self._sock.recv(self.BUFFER_SIZE)
320
321
322     def decode(self, buf=None):
323         """Decodes a given buffer or the next received packet.
324         """
325         if buf is None:
326             buf = self.receive()
327         return decode_network_packet(buf)
328
329
330     def interpret(self, iterable=None):
331         """Interprets a sequence
332         """
333         if iterable is None:
334             iterable = self.decode()
335         if isinstance(iterable, str):
336             iterable = self.decode(iterable)
337         return interpret_opcodes(iterable)