2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
7 # Distributed under terms of the GPLv2 license or newer.
9 # Frank Marien (frank@apsu.be) 6 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated to python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13 # - fixed comment typo (decode_network_string decodes a string)
16 Collectd network protocol implementation.
19 import socket,struct,sys
21 if platform.python_version() < '2.8.0':
22 # Python 2.7 and below io.StringIO does not like unicode
23 from StringIO import StringIO
26 from io import StringIO
28 from cStringIO import StringIO
30 from datetime import datetime
31 from copy import deepcopy
37 DEFAULT_IPv4_GROUP = "239.192.74.66"
38 """Default IPv4 multicast group"""
40 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
41 """Default IPv6 multicast group"""
43 HR_TIME_DIV = (2.0**30)
50 TYPE_PLUGIN_INSTANCE = 0x0003
52 TYPE_TYPE_INSTANCE = 0x0005
54 TYPE_INTERVAL = 0x0007
55 TYPE_INTERVAL_HR = 0x0009
59 TYPE_SEVERITY = 0x0101
67 header = struct.Struct("!2H")
68 number = struct.Struct("!Q")
69 short = struct.Struct("!H")
70 double = struct.Struct("<d")
def decode_network_values(ptype, plen, buf):
    """Decodes a list of DS values in collectd network format.

    The part is laid out as: part header, a 16-bit value count, one
    data-source type byte per value, then one 8-byte value per type byte.

    :param ptype: Part type code (unused; kept so every decoder shares
        the same signature).
    :param plen: Part length as declared in the part header.
    :param buf: Buffer that starts at the part header.
    :return: List of ``(dstype, value)`` tuples in wire order.
    :raises ValueError: If the declared length does not match the value
        count, or if a data-source type is not supported.
    """
    nvalues = short.unpack_from(buf, header.size)[0]
    types_start = header.size + short.size
    off = types_start + nvalues
    valskip = double.size

    # Internal invariant: every value occupies the same number of bytes,
    # whichever struct is used to decode it.
    assert double.size == number.size

    # Check whether our expected packet size is the reported one. This is
    # input validation, so it must not be an assert (stripped under -O).
    expected = (valskip + 1) * nvalues + short.size + header.size
    if expected != plen:
        raise ValueError(
            "Values part length %i does not match the expected %i"
            % (plen, expected))

    result = []
    # bytearray() yields integers on both Python 2 and Python 3, whereas
    # calling ord() on the items of a Python 3 bytes object raises TypeError.
    for dstype in bytearray(buf[types_start:off]):
        if dstype == DS_TYPE_GAUGE:
            value = double.unpack_from(buf, off)[0]
        elif dstype == DS_TYPE_DERIVE:
            # DERIVE is a *signed* 64-bit big-endian integer on the wire.
            value = struct.unpack_from("!q", buf, off)[0]
        elif dstype in (DS_TYPE_COUNTER, DS_TYPE_ABSOLUTE):
            value = number.unpack_from(buf, off)[0]
        else:
            raise ValueError("DS type %i unsupported" % dstype)
        result.append((dstype, value))
        off += valskip

    return result
def decode_network_number(ptype, plen, buf):
    """Decodes a number (64-bit unsigned) from collectd network format.

    :param ptype: Part type code (unused).
    :param plen: Declared part length (unused).
    :param buf: Buffer that starts at the part header.
    :return: The integer stored immediately after the part header.
    """
    payload_offset = header.size
    (value,) = number.unpack_from(buf, payload_offset)
    return value
def decode_network_string(msgtype, plen, buf):
    """Decodes a string from collectd network format.

    :param msgtype: Part type code (unused).
    :param plen: Part length as declared in the part header.
    :param buf: Buffer that starts at the part header.
    :return: The slice of ``buf`` between the part header and the last
        byte of the part.
    """
    first = header.size
    # The final byte of the part is left out (presumably the string
    # terminator -- confirm against the protocol description).
    last = plen - 1
    return buf[first:last]
# Mapping of message types to decoding functions, grouped by the decoder
# that handles each kind of part.
_decoders = {TYPE_VALUES: decode_network_values}

# Parts whose payload is a single 64-bit number.
_decoders.update(dict.fromkeys(
    (
        TYPE_TIME,
        TYPE_TIME_HR,
        TYPE_INTERVAL,
        TYPE_INTERVAL_HR,
        TYPE_SEVERITY,
    ),
    decode_network_number))

# Parts whose payload is a string.
_decoders.update(dict.fromkeys(
    (
        TYPE_HOST,
        TYPE_PLUGIN,
        TYPE_PLUGIN_INSTANCE,
        TYPE_TYPE,
        TYPE_TYPE_INSTANCE,
        TYPE_MESSAGE,
    ),
    decode_network_string))
def decode_network_packet(buf):
    """Decodes a network packet in collectd format.

    A packet is a sequence of parts, each starting with a header that
    carries the part type and the total part length (header included).

    :param buf: Raw packet contents.
    :return: Generator yielding ``(part_type, decoded_value)`` tuples in
        the order the parts appear in the packet.
    :raises ValueError: If a part declares a length that exceeds the
        remaining data or is shorter than its own header, or if the part
        type has no registered decoder.
    """
    off = 0
    blen = len(buf)
    while off < blen:
        ptype, plen = header.unpack_from(buf, off)

        if plen > blen - off:
            raise ValueError("Packet longer than amount of data in buffer")

        # A length smaller than the header (notably 0) would keep ``off``
        # from advancing, so the loop would never terminate.
        if plen < header.size:
            raise ValueError("Part length %i shorter than part header" % plen)

        if ptype not in _decoders:
            raise ValueError("Message type %i not recognized" % ptype)

        yield ptype, _decoders[ptype](ptype, plen, buf[off:])
        off += plen
155 plugininstance = None
def __init__(self, **kw):
    """Initialises the instance from keyword arguments.

    Every keyword argument is stored as an attribute of the same name.

    :param kw: Attribute names and the values to assign to them.
    """
    # A plain loop: the assignments are side effects, so there is no
    # point in building a throwaway list of ``None`` values.
    for name, value in kw.items():
        setattr(self, name, value)
164 return datetime.fromtimestamp(self.time)
170 buf.write(str(self.host))
173 buf.write(str(self.plugin))
174 if self.plugininstance:
176 buf.write(str(self.plugininstance))
179 buf.write(str(self.type))
180 if self.typeinstance:
182 buf.write(str(self.typeinstance))
183 return buf.getvalue()
186 return "[%i] %s" % (self.time, self.source)
190 class Notification(Data):
def __set_severity(self, value):
    """Stores ``value`` as the severity if it is a known severity code.

    Values other than ``FAILURE``, ``WARNING`` or ``OKAY`` are ignored
    and the previously stored severity is kept.

    :param value: Candidate severity code.
    """
    known_codes = (self.FAILURE, self.WARNING, self.OKAY)
    if value not in known_codes:
        return
    self.__severity = value
208 severity = property(lambda self: self.__severity, __set_severity)
def severitystring(self):
    """Returns the textual label for the current severity.

    The label is looked up in ``self.SEVERITY``; ``"UNKNOWN"`` is
    returned when the current severity has no entry there.
    """
    code = self.severity
    label = self.SEVERITY.get(code, "UNKNOWN")
    return label
215 return "%s [%s] %s" % (
216 super(Notification, self).__str__(),
class Values(Data, list):
    """A ``Data`` record that is also the list of its values."""

    def __str__(self):
        # Render the Data description first, then the list contents.
        described = Data.__str__(self)
        contents = list.__str__(self)
        return "%s %s" % (described, contents)
228 def interpret_opcodes(iterable):
232 for kind, data in iterable:
233 if kind == TYPE_TIME:
234 vl.time = nt.time = data
235 elif kind == TYPE_TIME_HR:
236 vl.time = nt.time = data / HR_TIME_DIV
237 elif kind == TYPE_INTERVAL:
239 elif kind == TYPE_INTERVAL_HR:
240 vl.interval = data / HR_TIME_DIV
241 elif kind == TYPE_HOST:
242 vl.host = nt.host = data
243 elif kind == TYPE_PLUGIN:
244 vl.plugin = nt.plugin = data
245 elif kind == TYPE_PLUGIN_INSTANCE:
246 vl.plugininstance = nt.plugininstance = data
247 elif kind == TYPE_TYPE:
248 vl.type = nt.type = data
249 elif kind == TYPE_TYPE_INSTANCE:
250 vl.typeinstance = nt.typeinstance = data
251 elif kind == TYPE_SEVERITY:
253 elif kind == TYPE_MESSAGE:
256 elif kind == TYPE_VALUES:
262 class Reader(object):
263 """Network reader for collectd data.
265 Listens on the network in a given address, which can be a multicast
266 group address, and handles reading data when it arrives.
275 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
278 host = DEFAULT_IPv4_GROUP
280 self.host, self.port = host, port
281 self.ipv6 = ":" in self.host
283 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
284 None if multicast else self.host, self.port,
285 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
286 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
288 self._sock = socket.socket(family, socktype, proto)
289 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
290 self._sock.bind(sockaddr)
293 if hasattr(socket, "SO_REUSEPORT"):
294 self._sock.setsockopt(
296 socket.SO_REUSEPORT, 1)
299 if family == socket.AF_INET:
300 assert "." in self.host
301 val = struct.pack("4sl",
302 socket.inet_aton(self.host), socket.INADDR_ANY)
303 elif family == socket.AF_INET6:
304 raise NotImplementedError("IPv6 support not ready yet")
306 raise ValueError("Unsupported network address family")
308 self._sock.setsockopt(
309 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
310 socket.IP_ADD_MEMBERSHIP, val)
311 self._sock.setsockopt(
312 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
313 socket.IP_MULTICAST_LOOP, 0)
317 """Receives a single raw collectd network packet.
319 return self._sock.recv(self.BUFFER_SIZE)
322 def decode(self, buf=None):
323 """Decodes a given buffer or the next received packet.
327 return decode_network_packet(buf)
def interpret(self, iterable=None):
    """Interprets a sequence of decoded parts, a raw buffer, or the next packet.

    :param iterable: One of:

        * ``None`` -- the next packet is obtained through
          ``self.decode()``;
        * a raw buffer (``bytes``, ``bytearray`` or ``str``) -- it is
          first passed through ``self.decode()``;
        * any other iterable -- it is handed to ``interpret_opcodes``
          unchanged.
    :return: Whatever ``interpret_opcodes`` returns for the decoded parts.
    """
    if iterable is None:
        iterable = self.decode()
    # On Python 3 raw packet data is ``bytes``, not ``str``; checking only
    # for ``str`` would hand the undecoded buffer to interpret_opcodes.
    if isinstance(iterable, (bytes, bytearray, str)):
        iterable = self.decode(iterable)
    return interpret_opcodes(iterable)