Merge branch 'import/ss/graphite' into ss/graphite
[collectd.git] / contrib / collectd_network.py
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
4 #
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
6 #
7 # Distributed under terms of the GPLv2 license.
8
9 """
10 Collectd network protocol implementation.
11 """
12
13 import socket
14 import struct
15
16 try:
17     from cStringIO import StringIO
18 except ImportError:
19     from StringIO import StringIO
20
21 from datetime import datetime
22 from copy import deepcopy
23
24
25 DEFAULT_PORT = 25826
26 """Default port"""
27
28 DEFAULT_IPv4_GROUP = "239.192.74.66"
29 """Default IPv4 multicast group"""
30
31 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
32 """Default IPv6 multicast group"""
33
34
35
36 # Message kinds
37 TYPE_HOST            = 0x0000
38 TYPE_TIME            = 0x0001
39 TYPE_PLUGIN          = 0x0002
40 TYPE_PLUGIN_INSTANCE = 0x0003
41 TYPE_TYPE            = 0x0004
42 TYPE_TYPE_INSTANCE   = 0x0005
43 TYPE_VALUES          = 0x0006
44 TYPE_INTERVAL        = 0x0007
45
46 # For notifications
47 TYPE_MESSAGE         = 0x0100
48 TYPE_SEVERITY        = 0x0101
49
50 # DS kinds
51 DS_TYPE_COUNTER      = 0
52 DS_TYPE_GAUGE        = 1
53
54
55 header = struct.Struct("!2H")
56 number = struct.Struct("!Q")
57 short  = struct.Struct("!H")
58 double = struct.Struct("<d")
59
60
61 def decode_network_values(ptype, plen, buf):
62     """Decodes a list of DS values in collectd network format
63     """
64     nvalues = short.unpack_from(buf, header.size)[0]
65     off = header.size + short.size + nvalues
66     valskip = double.size
67
68     # Check whether our expected packet size is the reported one
69     assert ((valskip + 1) * nvalues + short.size + header.size) == plen
70     assert double.size == number.size
71
72     result = []
73     for dstype in map(ord, buf[header.size+short.size:off]):
74         if dstype == DS_TYPE_COUNTER:
75             result.append((dstype, number.unpack_from(buf, off)[0]))
76             off += valskip
77         elif dstype == DS_TYPE_GAUGE:
78             result.append((dstype, double.unpack_from(buf, off)[0]))
79             off += valskip
80         else:
81             raise ValueError("DS type %i unsupported" % dstype)
82
83     return result
84
85
86 def decode_network_number(ptype, plen, buf):
87     """Decodes a number (64-bit unsigned) in collectd network format.
88     """
89     return number.unpack_from(buf, header.size)[0]
90
91
92 def decode_network_string(msgtype, plen, buf):
93     """Decodes a floating point number (64-bit) in collectd network format.
94     """
95     return buf[header.size:plen-1]
96
97
98 # Mapping of message types to decoding functions.
99 _decoders = {
100     TYPE_VALUES         : decode_network_values,
101     TYPE_TIME           : decode_network_number,
102     TYPE_INTERVAL       : decode_network_number,
103     TYPE_HOST           : decode_network_string,
104     TYPE_PLUGIN         : decode_network_string,
105     TYPE_PLUGIN_INSTANCE: decode_network_string,
106     TYPE_TYPE           : decode_network_string,
107     TYPE_TYPE_INSTANCE  : decode_network_string,
108     TYPE_MESSAGE        : decode_network_string,
109     TYPE_SEVERITY       : decode_network_number,
110 }
111
112
113 def decode_network_packet(buf):
114     """Decodes a network packet in collectd format.
115     """
116     off = 0
117     blen = len(buf)
118     while off < blen:
119         ptype, plen = header.unpack_from(buf, off)
120
121         if plen > blen - off:
122             raise ValueError("Packet longer than amount of data in buffer")
123
124         if ptype not in _decoders:
125             raise ValueError("Message type %i not recognized" % ptype)
126
127         yield ptype, _decoders[ptype](ptype, plen, buf[off:])
128         off += plen
129
130
131
132
133
134 class Data(object):
135     time = 0
136     host = None
137     plugin = None
138     plugininstance = None
139     type = None
140     typeinstance = None
141
142     def __init__(self, **kw):
143         [setattr(self, k, v) for k, v in kw.iteritems()]
144
145     @property
146     def datetime(self):
147         return datetime.fromtimestamp(self.time)
148
149     @property
150     def source(self):
151         buf = StringIO()
152         if self.host:
153             buf.write(self.host)
154         if self.plugin:
155             buf.write("/")
156             buf.write(self.plugin)
157         if self.plugininstance:
158             buf.write("/")
159             buf.write(self.plugininstance)
160         if self.type:
161             buf.write("/")
162             buf.write(self.type)
163         if self.typeinstance:
164             buf.write("/")
165             buf.write(self.typeinstance)
166         return buf.getvalue()
167
168     def __str__(self):
169         return "[%i] %s" % (self.time, self.source)
170
171
172
173 class Notification(Data):
174     FAILURE  = 1
175     WARNING  = 2
176     OKAY     = 4
177
178     SEVERITY = {
179         FAILURE: "FAILURE",
180         WARNING: "WARNING",
181         OKAY   : "OKAY",
182     }
183
184     __severity = 0
185     message  = ""
186
187     def __set_severity(self, value):
188         if value in (self.FAILURE, self.WARNING, self.OKAY):
189             self.__severity = value
190
191     severity = property(lambda self: self.__severity, __set_severity)
192
193     @property
194     def severitystring(self):
195         return self.SEVERITY.get(self.severity, "UNKNOWN")
196
197     def __str__(self):
198         return "%s [%s] %s" % (
199                 super(Notification, self).__str__(),
200                 self.severitystring,
201                 self.message)
202
203
204
205 class Values(Data, list):
206     def __str__(self):
207         return "%s %s" % (Data.__str__(self), list.__str__(self))
208
209
210
211 def interpret_opcodes(iterable):
212     vl = Values()
213     nt = Notification()
214
215     for kind, data in iterable:
216         if kind == TYPE_TIME:
217             vl.time = nt.time = data
218         elif kind == TYPE_INTERVAL:
219             vl.interval = data
220         elif kind == TYPE_HOST:
221             vl.host = nt.host = data
222         elif kind == TYPE_PLUGIN:
223             vl.plugin = nt.plugin = data
224         elif kind == TYPE_PLUGIN_INSTANCE:
225             vl.plugininstance = nt.plugininstance = data
226         elif kind == TYPE_TYPE:
227             vl.type = nt.type = data
228         elif kind == TYPE_TYPE_INSTANCE:
229             vl.typeinstance = nt.typeinstance = data
230         elif kind == TYPE_SEVERITY:
231             nt.severity = data
232         elif kind == TYPE_MESSAGE:
233             nt.message = data
234             yield deepcopy(nt)
235         elif kind == TYPE_VALUES:
236             vl[:] = data
237             yield deepcopy(vl)
238
239
240
241 class Reader(object):
242     """Network reader for collectd data.
243
244     Listens on the network in a given address, which can be a multicast
245     group address, and handles reading data when it arrives.
246     """
247     addr = None
248     host = None
249     port = DEFAULT_PORT
250
251     BUFFER_SIZE = 1024
252
253
254     def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
255         if host is None:
256             multicast = True
257             host = DEFAULT_IPv4_GROUP
258
259         self.host, self.port = host, port
260         self.ipv6 = ":" in self.host
261
262         family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
263                 None if multicast else self.host, self.port,
264                 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
265                 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
266
267         self._sock = socket.socket(family, socktype, proto)
268         self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
269         self._sock.bind(sockaddr)
270
271         if multicast:
272             if hasattr(socket, "SO_REUSEPORT"):
273                 self._sock.setsockopt(
274                         socket.SOL_SOCKET,
275                         socket.SO_REUSEPORT, 1)
276
277             val = None
278             if family == socket.AF_INET:
279                 assert "." in self.host
280                 val = struct.pack("4sl",
281                         socket.inet_aton(self.host), socket.INADDR_ANY)
282             elif family == socket.AF_INET6:
283                 raise NotImplementedError("IPv6 support not ready yet")
284             else:
285                 raise ValueError("Unsupported network address family")
286
287             self._sock.setsockopt(
288                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
289                     socket.IP_ADD_MEMBERSHIP, val)
290             self._sock.setsockopt(
291                     socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
292                     socket.IP_MULTICAST_LOOP, 0)
293
294
295     def receive(self):
296         """Receives a single raw collect network packet.
297         """
298         return self._sock.recv(self.BUFFER_SIZE)
299
300
301     def decode(self, buf=None):
302         """Decodes a given buffer or the next received packet.
303         """
304         if buf is None:
305             buf = self.receive()
306         return decode_network_packet(buf)
307
308
309     def interpret(self, iterable=None):
310         """Interprets a sequence
311         """
312         if iterable is None:
313             iterable = self.decode()
314         if isinstance(iterable, basestring):
315             iterable = self.decode(iterable)
316         return interpret_opcodes(iterable)
317
318