2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
7 # Distributed under terms of the GPLv2 license.
9 # Frank Marien (frank@apsu.be) 4 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated for python 3
12 # - fixed for larger packet sizes (possible on lo interface)
15 Collectd network protocol implementation.
21 from io import StringIO
23 from cStringIO import StringIO
25 from datetime import datetime
26 from copy import deepcopy
32 DEFAULT_IPv4_GROUP = "239.192.74.66"
33 """Default IPv4 multicast group"""
35 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
36 """Default IPv6 multicast group"""
45 TYPE_PLUGIN_INSTANCE = 0x0003
47 TYPE_TYPE_INSTANCE = 0x0005
49 TYPE_INTERVAL = 0x0007
50 TYPE_INTERVAL_HR = 0x0009
54 TYPE_SEVERITY = 0x0101
63 header = struct.Struct("!2H")
64 number = struct.Struct("!Q")
65 short = struct.Struct("!H")
66 double = struct.Struct("<d")
69 def decode_network_values(ptype, plen, buf):
70 """Decodes a list of DS values in collectd network format
72 nvalues = short.unpack_from(buf, header.size)[0]
73 off = header.size + short.size + nvalues
76 # Check whether our expected packet size is the reported one
77 assert ((valskip + 1) * nvalues + short.size + header.size) == plen
78 assert double.size == number.size
81 for dstype in buf[header.size+short.size:off]:
82 if dstype == DS_TYPE_COUNTER:
83 result.append((dstype, number.unpack_from(buf, off)[0]))
85 elif dstype == DS_TYPE_GAUGE:
86 result.append((dstype, double.unpack_from(buf, off)[0]))
88 elif dstype == DS_TYPE_DERIVE:
89 result.append((dstype, number.unpack_from(buf, off)[0]))
91 elif dstype == DS_TYPE_ABSOLUTE:
92 result.append((dstype, number.unpack_from(buf, off)[0]))
95 raise ValueError("DS type %i unsupported" % dstype)
100 def decode_network_number(ptype, plen, buf):
101 """Decodes a number (64-bit unsigned) in collectd network format.
103 return number.unpack_from(buf, header.size)[0]
106 def decode_network_string(msgtype, plen, buf):
107 """Decodes a floating point number (64-bit) in collectd network format.
109 return buf[header.size:plen-1]
112 # Mapping of message types to decoding functions.
114 TYPE_VALUES : decode_network_values,
115 TYPE_TIME : decode_network_number,
116 TYPE_TIME_HR : decode_network_number,
117 TYPE_INTERVAL : decode_network_number,
118 TYPE_INTERVAL_HR : decode_network_number,
119 TYPE_HOST : decode_network_string,
120 TYPE_PLUGIN : decode_network_string,
121 TYPE_PLUGIN_INSTANCE: decode_network_string,
122 TYPE_TYPE : decode_network_string,
123 TYPE_TYPE_INSTANCE : decode_network_string,
124 TYPE_MESSAGE : decode_network_string,
125 TYPE_SEVERITY : decode_network_number,
129 def decode_network_packet(buf):
130 """Decodes a network packet in collectd format.
136 ptype, plen = header.unpack_from(buf, off)
138 if plen > blen - off:
139 raise ValueError("Packet longer than amount of data in buffer")
141 if ptype not in _decoders:
142 raise ValueError("Message type %i not recognized" % ptype)
144 yield ptype, _decoders[ptype](ptype, plen, buf[off:])
152 plugininstance = None
156 def __init__(self, **kw):
157 [setattr(self, k, v) for k, v in kw.items()]
161 return datetime.fromtimestamp(self.time)
167 buf.write(str(self.host))
170 buf.write(str(self.plugin))
171 if self.plugininstance:
173 buf.write(str(self.plugininstance))
176 buf.write(str(self.type))
177 if self.typeinstance:
179 buf.write(str(self.typeinstance))
180 return buf.getvalue()
183 return "[%i] %s" % (self.time, self.source)
187 class Notification(Data):
201 def __set_severity(self, value):
202 if value in (self.FAILURE, self.WARNING, self.OKAY):
203 self.__severity = value
205 severity = property(lambda self: self.__severity, __set_severity)
208 def severitystring(self):
209 return self.SEVERITY.get(self.severity, "UNKNOWN")
212 return "%s [%s] %s" % (
213 super(Notification, self).__str__(),
219 class Values(Data, list):
221 return "%s %s" % (Data.__str__(self), list.__str__(self))
225 def interpret_opcodes(iterable):
229 for kind, data in iterable:
230 if kind == TYPE_TIME:
231 vl.time = nt.time = data
232 elif kind == TYPE_TIME_HR:
233 vl.time = nt.time = data
234 elif kind == TYPE_INTERVAL:
236 elif kind == TYPE_INTERVAL_HR:
238 elif kind == TYPE_HOST:
239 vl.host = nt.host = data
240 elif kind == TYPE_PLUGIN:
241 vl.plugin = nt.plugin = data
242 elif kind == TYPE_PLUGIN_INSTANCE:
243 vl.plugininstance = nt.plugininstance = data
244 elif kind == TYPE_TYPE:
245 vl.type = nt.type = data
246 elif kind == TYPE_TYPE_INSTANCE:
247 vl.typeinstance = nt.typeinstance = data
248 elif kind == TYPE_SEVERITY:
250 elif kind == TYPE_MESSAGE:
253 elif kind == TYPE_VALUES:
259 class Reader(object):
260 """Network reader for collectd data.
262 Listens on the network in a given address, which can be a multicast
263 group address, and handles reading data when it arrives.
272 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
275 host = DEFAULT_IPv4_GROUP
277 self.host, self.port = host, port
278 self.ipv6 = ":" in self.host
280 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
281 None if multicast else self.host, self.port,
282 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
283 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
285 self._sock = socket.socket(family, socktype, proto)
286 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
287 self._sock.bind(sockaddr)
290 if hasattr(socket, "SO_REUSEPORT"):
291 self._sock.setsockopt(
293 socket.SO_REUSEPORT, 1)
296 if family == socket.AF_INET:
297 assert "." in self.host
298 val = struct.pack("4sl",
299 socket.inet_aton(self.host), socket.INADDR_ANY)
300 elif family == socket.AF_INET6:
301 raise NotImplementedError("IPv6 support not ready yet")
303 raise ValueError("Unsupported network address family")
305 self._sock.setsockopt(
306 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
307 socket.IP_ADD_MEMBERSHIP, val)
308 self._sock.setsockopt(
309 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
310 socket.IP_MULTICAST_LOOP, 0)
314 """Receives a single raw collect network packet.
316 return self._sock.recv(self.BUFFER_SIZE)
319 def decode(self, buf=None):
320 """Decodes a given buffer or the next received packet.
324 return decode_network_packet(buf)
327 def interpret(self, iterable=None):
328 """Interprets a sequence
331 iterable = self.decode()
332 if isinstance(iterable, str):
333 iterable = self.decode(iterable)
334 return interpret_opcodes(iterable)