2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
7 # Distributed under terms of the GPLv2 license or newer.
9 # Frank Marien (frank@apsu.be) 6 Sep 2012
10 # - quick fixes for 5.1 binary protocol
11 # - updated to python 3
12 # - fixed for larger packet sizes (possible on lo interface)
13 # - fixed comment typo (decode_network_string decodes a string)
16 Collectd network protocol implementation.
19 import socket,struct,sys
21 from io import StringIO
23 from cStringIO import StringIO
25 from datetime import datetime
26 from copy import deepcopy
32 DEFAULT_IPv4_GROUP = "239.192.74.66"
33 """Default IPv4 multicast group"""
35 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
36 """Default IPv6 multicast group"""
38 HR_TIME_DIV = (2.0**30)
45 TYPE_PLUGIN_INSTANCE = 0x0003
47 TYPE_TYPE_INSTANCE = 0x0005
49 TYPE_INTERVAL = 0x0007
50 TYPE_INTERVAL_HR = 0x0009
54 TYPE_SEVERITY = 0x0101
62 header = struct.Struct("!2H")
63 number = struct.Struct("!Q")
64 short = struct.Struct("!H")
65 double = struct.Struct("<d")
67 def decode_network_values(ptype, plen, buf):
68 """Decodes a list of DS values in collectd network format
70 nvalues = short.unpack_from(buf, header.size)[0]
71 off = header.size + short.size + nvalues
74 # Check whether our expected packet size is the reported one
75 assert ((valskip + 1) * nvalues + short.size + header.size) == plen
76 assert double.size == number.size
79 for dstype in buf[header.size+short.size:off]:
80 if dstype == DS_TYPE_COUNTER:
81 result.append((dstype, number.unpack_from(buf, off)[0]))
83 elif dstype == DS_TYPE_GAUGE:
84 result.append((dstype, double.unpack_from(buf, off)[0]))
86 elif dstype == DS_TYPE_DERIVE:
87 result.append((dstype, number.unpack_from(buf, off)[0]))
89 elif dstype == DS_TYPE_ABSOLUTE:
90 result.append((dstype, number.unpack_from(buf, off)[0]))
93 raise ValueError("DS type %i unsupported" % dstype)
98 def decode_network_number(ptype, plen, buf):
99 """Decodes a number (64-bit unsigned) from collectd network format.
101 return number.unpack_from(buf, header.size)[0]
104 def decode_network_string(msgtype, plen, buf):
105 """Decodes a string from collectd network format.
107 return buf[header.size:plen-1]
110 # Mapping of message types to decoding functions.
112 TYPE_VALUES : decode_network_values,
113 TYPE_TIME : decode_network_number,
114 TYPE_TIME_HR : decode_network_number,
115 TYPE_INTERVAL : decode_network_number,
116 TYPE_INTERVAL_HR : decode_network_number,
117 TYPE_HOST : decode_network_string,
118 TYPE_PLUGIN : decode_network_string,
119 TYPE_PLUGIN_INSTANCE: decode_network_string,
120 TYPE_TYPE : decode_network_string,
121 TYPE_TYPE_INSTANCE : decode_network_string,
122 TYPE_MESSAGE : decode_network_string,
123 TYPE_SEVERITY : decode_network_number,
127 def decode_network_packet(buf):
128 """Decodes a network packet in collectd format.
134 ptype, plen = header.unpack_from(buf, off)
136 if plen > blen - off:
137 raise ValueError("Packet longer than amount of data in buffer")
139 if ptype not in _decoders:
140 raise ValueError("Message type %i not recognized" % ptype)
142 yield ptype, _decoders[ptype](ptype, plen, buf[off:])
150 plugininstance = None
154 def __init__(self, **kw):
155 [setattr(self, k, v) for k, v in kw.items()]
159 return datetime.fromtimestamp(self.time)
165 buf.write(str(self.host))
168 buf.write(str(self.plugin))
169 if self.plugininstance:
171 buf.write(str(self.plugininstance))
174 buf.write(str(self.type))
175 if self.typeinstance:
177 buf.write(str(self.typeinstance))
178 return buf.getvalue()
181 return "[%i] %s" % (self.time, self.source)
185 class Notification(Data):
199 def __set_severity(self, value):
200 if value in (self.FAILURE, self.WARNING, self.OKAY):
201 self.__severity = value
203 severity = property(lambda self: self.__severity, __set_severity)
206 def severitystring(self):
207 return self.SEVERITY.get(self.severity, "UNKNOWN")
210 return "%s [%s] %s" % (
211 super(Notification, self).__str__(),
217 class Values(Data, list):
219 return "%s %s" % (Data.__str__(self), list.__str__(self))
223 def interpret_opcodes(iterable):
227 for kind, data in iterable:
228 if kind == TYPE_TIME:
229 vl.time = nt.time = data
230 elif kind == TYPE_TIME_HR:
231 vl.time = nt.time = data / HR_TIME_DIV
232 elif kind == TYPE_INTERVAL:
234 elif kind == TYPE_INTERVAL_HR:
235 vl.interval = data / HR_TIME_DIV
236 elif kind == TYPE_HOST:
237 vl.host = nt.host = data
238 elif kind == TYPE_PLUGIN:
239 vl.plugin = nt.plugin = data
240 elif kind == TYPE_PLUGIN_INSTANCE:
241 vl.plugininstance = nt.plugininstance = data
242 elif kind == TYPE_TYPE:
243 vl.type = nt.type = data
244 elif kind == TYPE_TYPE_INSTANCE:
245 vl.typeinstance = nt.typeinstance = data
246 elif kind == TYPE_SEVERITY:
248 elif kind == TYPE_MESSAGE:
251 elif kind == TYPE_VALUES:
257 class Reader(object):
258 """Network reader for collectd data.
260 Listens on the network in a given address, which can be a multicast
261 group address, and handles reading data when it arrives.
270 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
273 host = DEFAULT_IPv4_GROUP
275 self.host, self.port = host, port
276 self.ipv6 = ":" in self.host
278 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
279 None if multicast else self.host, self.port,
280 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
281 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
283 self._sock = socket.socket(family, socktype, proto)
284 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
285 self._sock.bind(sockaddr)
288 if hasattr(socket, "SO_REUSEPORT"):
289 self._sock.setsockopt(
291 socket.SO_REUSEPORT, 1)
294 if family == socket.AF_INET:
295 assert "." in self.host
296 val = struct.pack("4sl",
297 socket.inet_aton(self.host), socket.INADDR_ANY)
298 elif family == socket.AF_INET6:
299 raise NotImplementedError("IPv6 support not ready yet")
301 raise ValueError("Unsupported network address family")
303 self._sock.setsockopt(
304 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
305 socket.IP_ADD_MEMBERSHIP, val)
306 self._sock.setsockopt(
307 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
308 socket.IP_MULTICAST_LOOP, 0)
312 """Receives a single raw collect network packet.
314 return self._sock.recv(self.BUFFER_SIZE)
317 def decode(self, buf=None):
318 """Decodes a given buffer or the next received packet.
322 return decode_network_packet(buf)
325 def interpret(self, iterable=None):
326 """Interprets a sequence
329 iterable = self.decode()
330 if isinstance(iterable, str):
331 iterable = self.decode(iterable)
332 return interpret_opcodes(iterable)