src/libpopulation.c: Add the possibility to receive individuals from peers.
[libpopulation.git] / src / libpopulation.c
1 /**
2  * libevolve - src/evolve.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #if 0
41 #ifdef _XOPEN_SOURCE
42 # undef _XOPEN_SOURCE
43 #endif 
44 #define _XOPEN_SOURCE 500
45 #endif
46   
47 #ifndef _REENTRANT
48 # define _REENTRANT
49 #endif
50
51 #ifndef _THREAD_SAFE
52 # define _THREAD_SAFE
53 #endif 
54
55 #ifdef _GNU_SOURCE
56 # undef _GNU_SOURCE
57 #endif
58 /* }}} */
59
60 #include "config.h"
61 #include "population.h"
62
63 #include <stdlib.h>
64 #include <assert.h>
65 #include <errno.h>
66 #include <stdint.h>
67 #include <inttypes.h>
68 #include <string.h>
69 #include <stdio.h>
70 #include <limits.h>
71 #include <pthread.h>
72 #include <math.h>
73 #include <unistd.h>
74 #include <fcntl.h>
75 #include <sys/types.h>
76 #include <sys/socket.h>
77 #include <netdb.h>
78 #include <signal.h>
79
80 #define NETWORK_BUFFER_SIZE 1450
81
82 /*
83  * Data types
84  */
85 struct individual_s
86 {
87   void *ptr;
88   int rating;
89 };
90 typedef struct individual_s individual_t;
91
92 struct population_s
93 {
94   pthread_mutex_t lock;
95
96   /* Callback functions */
97   pi_rate_f rate;
98   pi_free_f free;
99   pi_copy_f copy;
100
101   /* Optional serialization */
102   pi_serialize_f serialize;
103   pi_unserialize_f unserialize;
104
105   int *peers;
106   size_t peers_num;
107
108 #define POPULATION_FLAG_LISTEN   0x01
109 #define POPULATION_FLAG_SHUTDOWN 0x02
110   int flags;
111   pthread_t listen_thread_id;
112
113   individual_t fittest;
114
115   individual_t *individuals;
116   size_t individuals_num;
117 };
118
119 struct listen_thread_args_s
120 {
121   population_t *population;
122   char *node;
123   char *service;
124 };
125 typedef struct listen_thread_args_s listen_thread_args_t;
126
127 /*
128  * Private functions
129  */
130 static char *population_strdup (const char *src)
131 {
132   size_t s;
133   char *ret;
134
135   if (src == NULL)
136     return (NULL);
137
138   s = strlen (src) + 1;
139   ret = (char *) malloc (s);
140   if (ret == NULL)
141     return (NULL);
142
143   memcpy (ret, src, s);
144   return (ret);
145 } /* char *population_strdup */
146
147 static int population_send_to_peer (population_t *p, void *pi) /* {{{ */
148 {
149   char buffer[NETWORK_BUFFER_SIZE];
150   size_t buffer_size;
151   size_t buffer_free;
152   char *buffer_ptr;
153
154   int fd;
155   int i;
156   int status;
157
158   if (p == NULL)
159     return (-1);
160
161   if (pi == NULL)
162     return (-1);
163
164   buffer_size = sizeof (buffer);
165   memset (buffer, 0, buffer_size);
166
167   pthread_mutex_lock (&p->lock);
168
169   if (p->serialize == NULL)
170   {
171     pthread_mutex_unlock (&p->lock);
172     fprintf (stderr, "population_send_to_peer: Cannot send to peer without "
173         "serialization function!\n");
174     return (-1);
175   }
176
177   i = (int) (((double) p->peers_num) * (rand() / (RAND_MAX + 1.0)));
178   fd = p->peers[i];
179
180   buffer_ptr = buffer;
181   buffer_free = sizeof (buffer);
182   status = p->serialize (pi, &buffer_ptr, &buffer_free);
183   if (status != 0)
184   {
185     pthread_mutex_unlock (&p->lock);
186     fprintf (stderr, "population_send_to_peer: p->serialize failed "
187         "with status %i.\n", status);
188     return (-1);
189   }
190
191   buffer_size = sizeof (buffer) - buffer_free;
192   if (buffer_size < 1)
193   {
194     pthread_mutex_unlock (&p->lock);
195     fprintf (stderr, "population_send_to_peer: p->serialize didn't put "
196         "anything into the buffer..\n");
197     return (-1);
198   }
199
200   /* write should not block - hopefully */
201   status = send (fd, buffer, buffer_size, MSG_DONTWAIT | MSG_NOSIGNAL);
202   if (status < 0)
203   {
204     pthread_mutex_unlock (&p->lock);
205     status = errno;
206     if (status != ECONNREFUSED)
207     {
208       fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
209           "send(2) returned with error %i.\n", status);
210     }
211     return (-1);
212   }
213   else if (((size_t) status) != buffer_size)
214   {
215     pthread_mutex_unlock (&p->lock);
216     fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
217         "send(2) returned %i (expected %zu).\n",
218         status, buffer_size);
219     return (-1);
220   }
221
222   pthread_mutex_unlock (&p->lock);
223
224 #if 0
225   printf ("population_send_to_peer: Sent individual with rating %i to peer #%i.\n",
226       p->rate (pi), i);
227 #endif
228
229   return (0);
230 } /* }}} int population_send_to_peer */
231
232 static void *listen_thread (void *data)
233 {
234   listen_thread_args_t *args;
235   population_t *p;
236   char *node;
237   char *service;
238   int status;
239   int fd;
240
241   struct addrinfo  ai_hints;
242   struct addrinfo *ai_list;
243   struct addrinfo *ai_ptr;
244
245   args    = (listen_thread_args_t *) data;
246   p       = args->population;
247   node    = args->node;
248   service = args->service;
249
250   ai_list = NULL;
251
252   memset (&ai_hints, 0, sizeof (ai_hints));
253   ai_hints.ai_flags = AI_PASSIVE;
254 #ifdef AI_ADDRCONFIG
255   ai_hints.ai_flags |= AI_ADDRCONFIG;
256 #endif
257   ai_hints.ai_family = AF_UNSPEC;
258   ai_hints.ai_socktype = SOCK_DGRAM;
259   ai_hints.ai_protocol = 0;
260
261   status = getaddrinfo (node, 
262       (service != NULL) ? service : POPULATION_DEFAULT_PORT,
263       &ai_hints, &ai_list);
264   if (status != 0)
265   {
266     fprintf (stderr, "listen_thread: getaddrinfo (%s) failed: %s\n",
267         (node != NULL) ? node : "NULL", gai_strerror (status));
268     return ((void *) -1);
269   }
270
271   fd = -1;
272   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
273   {
274     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
275     if (fd < 0)
276       continue;
277
278     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
279     if (status != 0)
280     {
281       close (fd);
282       fd = -1;
283       continue;
284     }
285
286     break;
287   }
288
289   freeaddrinfo (ai_list);
290
291   if (fd < 0)
292   {
293     fprintf (stderr, "listen_thread: No socket could be opened.\n");
294     return ((void *) -1);
295   }
296
297   pthread_mutex_lock (&p->lock);
298   p->flags |= POPULATION_FLAG_LISTEN;
299   while ((p->flags & POPULATION_FLAG_SHUTDOWN) == 0)
300   {
301     /* Allocate one extra byte to null-terminate the data. */
302     char buffer[NETWORK_BUFFER_SIZE + 1];
303     void *pi;
304
305     pthread_mutex_unlock (&p->lock);
306
307     status = recvfrom (fd, buffer, sizeof (buffer) - 1, /* flags = */ 0,
308         /* from = */ NULL, /* fromlen = */ NULL);
309     if (status < 1)
310     {
311       fprintf (stderr, "listen_thread: recvfrom(2) failed: status = %i; "
312           "errno = %i;\n", status, errno);
313       pthread_mutex_lock (&p->lock);
314       continue;
315     }
316     assert (status < sizeof (buffer));
317     buffer[sizeof (buffer) - 1] = 0;
318
319     pi = p->unserialize (buffer, (size_t) status);
320     if (pi == NULL)
321     {
322       fprintf (stderr, "listen_thread: p->unserialize returned NULL.\n");
323       pthread_mutex_lock (&p->lock);
324       continue;
325     }
326
327 #if 0
328     printf ("listen_thread: Received individual with rating %i.\n",
329         p->rate (pi));
330 #endif
331
332     population_insert (p, pi);
333
334     p->free (pi);
335
336     pthread_mutex_lock (&p->lock);
337   } /* while (42) */
338
339   close (fd);
340   fd = -1;
341
342   /* clear the listen flag */
343   p->flags &= ~(POPULATION_FLAG_LISTEN);
344
345   pthread_mutex_unlock (&p->lock);
346   return ((void *) 0);
347 } /* void *listen_thread */
348
349 /*
350  * Constructor and destructor
351  */
352 population_t *population_create (pi_rate_f rate, pi_copy_f copy, /* {{{ */
353     pi_free_f f)
354 {
355   population_t *p;
356   size_t i;
357
358   p = (population_t *) malloc (sizeof (population_t));
359   if (p == NULL)
360     return (NULL);
361
362   memset (p, 0, sizeof (*p));
363   pthread_mutex_init (&p->lock, /* attr = */ NULL);
364
365   p->rate = rate;
366   p->copy = copy;
367   p->free = f;
368
369   p->fittest.ptr = NULL;
370   p->fittest.rating = -1;
371
372   p->individuals = malloc (32 * sizeof (p->individuals[0]));
373   if (p->individuals == NULL)
374   {
375     free (p);
376     return (NULL);
377   }
378   memset (p->individuals, 0, 32 * sizeof (p->individuals[0]));
379   p->individuals_num = 32;
380
381   for (i = 0; i < p->individuals_num; i++)
382   {
383     p->individuals[i].ptr = NULL;
384     p->individuals[i].rating = -1;
385   }
386
387   return (p);
388 } /* }}} population_t *population_create */
389
390 void population_destroy (population_t *p) /* {{{ */
391 {
392   if (p == NULL)
393     return;
394
395   pthread_mutex_lock (&p->lock);
396   p->flags |= POPULATION_FLAG_SHUTDOWN;
397   if ((p->flags & POPULATION_FLAG_LISTEN) != 0)
398   {
399     pthread_kill (p->listen_thread_id, SIGTERM);
400     pthread_mutex_unlock (&p->lock);
401     pthread_join (p->listen_thread_id, /* return = */ NULL);
402     pthread_mutex_lock (&p->lock);
403   }
404
405   if (p->fittest.ptr != NULL)
406     p->free (p->fittest.ptr);
407   p->fittest.ptr = NULL;
408   p->fittest.rating = -1;
409
410   if (p->individuals_num > 0)
411   {
412     size_t i;
413
414     for (i = 0; i < p->individuals_num; i++)
415     {
416       if (p->individuals[i].ptr != NULL)
417         p->free (p->individuals[i].ptr);
418       p->individuals[i].ptr = NULL;
419       p->individuals[i].rating = -1;
420     }
421
422     free (p->individuals);
423     p->individuals = NULL;
424     p->individuals_num = 0;
425   }
426
427   memset (p, 0, sizeof (*p));
428   free (p);
429 } /* }}} void population_destroy */
430
431 int population_set_size (population_t *p, /* {{{ */
432     size_t population_size)
433 {
434   size_t i;
435   individual_t *temp;
436
437   if (p == NULL)
438     return (-1);
439
440   pthread_mutex_lock (&p->lock);
441
442   if (p->individuals_num == population_size)
443   {
444     pthread_mutex_unlock (&p->lock);
445     return (0);
446   }
447
448   for (i = population_size; i < p->individuals_num; i++)
449   {
450     p->free (p->individuals[i].ptr);
451     p->individuals[i].ptr = NULL;
452     p->individuals[i].rating = -1;
453   }
454
455   temp = (individual_t *) realloc (p->individuals,
456       population_size * sizeof (p->individuals[0]));
457   if (temp == NULL)
458   {
459     pthread_mutex_unlock (&p->lock);
460     return (-1);
461   }
462   p->individuals = temp;
463
464   for (i = p->individuals_num; i < population_size; i++)
465   {
466     p->individuals[i].ptr = NULL;
467     p->individuals[i].rating = -1;
468   }
469
470   p->individuals_num = population_size;
471
472   pthread_mutex_unlock (&p->lock);
473
474   return (0);
475 } /* }}} */
476
477 int population_set_serialization (population_t *p, /* {{{ */
478     pi_serialize_f serialize, pi_unserialize_f unserialize)
479 {
480   if (p == NULL)
481     return (-1);
482
483   pthread_mutex_lock (&p->lock);
484
485   p->serialize = serialize;
486   p->unserialize = unserialize;
487
488   pthread_mutex_unlock (&p->lock);
489   return (0);
490 } /* }}} int population_set_serialization */
491
492 int population_add_peer (population_t *p, const char *node, /* {{{ */
493     const char *port)
494 {
495   struct addrinfo  ai_hints;
496   struct addrinfo *ai_list;
497   struct addrinfo *ai_ptr;
498   int status;
499
500   if (p == NULL)
501     return (-1);
502
503   if (node == NULL)
504     return (-1);
505
506   if (port == NULL)
507     port = POPULATION_DEFAULT_PORT;
508
509   ai_list = NULL;
510
511   memset (&ai_hints, 0, sizeof (ai_hints));
512   ai_hints.ai_flags = 0;
513 #ifdef AI_ADDRCONFIG
514   ai_hints.ai_flags |= AI_ADDRCONFIG;
515 #endif
516   ai_hints.ai_family = AF_UNSPEC;
517   ai_hints.ai_socktype = SOCK_DGRAM;
518   ai_hints.ai_protocol = 0;
519
520   status = getaddrinfo (node, port, &ai_hints, &ai_list);
521   if (status != 0)
522   {
523     fprintf (stderr, "population_add_peer: getaddrinfo (%s) failed: %s\n",
524         node, gai_strerror (status));
525     return (-1);
526   }
527
528   pthread_mutex_lock (&p->lock);
529
530   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
531   {
532     int *temp;
533
534     temp = (int *) realloc (p->peers, sizeof (int) * (p->peers_num + 1));
535     if (temp == NULL)
536     {
537       fprintf (stderr, "population_add_peer: realloc failed.\n");
538       continue;
539     }
540     p->peers = temp;
541
542     p->peers[p->peers_num] = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
543         ai_ptr->ai_protocol);
544     if (p->peers[p->peers_num] < 0)
545       continue;
546
547     status = connect (p->peers[p->peers_num],
548         ai_ptr->ai_addr, ai_ptr->ai_addrlen);
549     if (status != 0)
550     {
551       fprintf (stderr, "population_add_peer: connect(2) failed.\n");
552       close (p->peers[p->peers_num]);
553       continue;
554     }
555
556     status = fcntl (p->peers[p->peers_num], F_SETFL, O_NONBLOCK);
557     if (status != 0)
558     {
559       fprintf (stderr, "population_add_peer: fcntl (F_SETFL, O_NONBLOCK) "
560           "failed. Will use the socket with blocking.\n");
561     }
562
563     p->peers_num++;
564
565     printf ("population_add_peer: Successfully added peer #%i.\n",
566         p->peers_num - 1);
567   }
568   pthread_mutex_unlock (&p->lock);
569
570   freeaddrinfo (ai_list);
571
572   return (0);
573 } /* }}} int population_add_peer */
574
575 int population_start_listen_thread (population_t *p, /* {{{ */
576     const char *node, const char *service)
577 {
578   listen_thread_args_t *args;
579
580   pthread_mutex_lock (&p->lock);
581   if ((p->flags & POPULATION_FLAG_LISTEN) != 0)
582   {
583     pthread_mutex_unlock (&p->lock);
584     fprintf (stderr, "population_start_listen_thread: "
585         "Listen thread already started.\n");
586     return (-EALREADY);
587   }
588
589   args = (listen_thread_args_t *) malloc (sizeof (listen_thread_args_t));
590   if (args == NULL)
591   {
592     fprintf (stderr, "population_start_listen_thread: malloc failed.\n");
593     return (-1);
594   }
595
596   memset (args, 0, sizeof (listen_thread_args_t));
597   args->population = p;
598   args->node = population_strdup (node);
599   args->service = population_strdup (service);
600
601   pthread_create (&p->listen_thread_id, /* attr = */ NULL,
602       listen_thread, (void *) args);
603
604   pthread_mutex_unlock (&p->lock);
605   return (0);
606 } /* }}} int population_start_listen_thread */
607
608 void *population_get_random (population_t *p) /* {{{ */
609 {
610   void *ret = NULL;
611   size_t i;
612
613   pthread_mutex_lock (&p->lock);
614
615   if (p->individuals_num < 1)
616   {
617     pthread_mutex_unlock (&p->lock);
618     return (NULL);
619   }
620
621   while (ret == NULL)
622   {
623     i = (size_t) (((double) p->individuals_num)
624         * (rand() / (RAND_MAX + 1.0)));
625     if (p->individuals[i].ptr == NULL)
626       continue;
627
628     ret = p->copy (p->individuals[i].ptr);
629   }
630
631   pthread_mutex_unlock (&p->lock);
632
633   return (ret);
634 } /* }}} void *population_pick_random */
635
636 void *population_get_fittest (population_t *p) /* {{{ */
637 {
638   void *ret = NULL;
639
640   if (p == NULL)
641     return (NULL);
642
643   pthread_mutex_lock (&p->lock);
644
645   if (p->fittest.ptr == NULL)
646   {
647     pthread_mutex_unlock (&p->lock);
648     return (NULL);
649   }
650
651   ret = p->copy (p->fittest.ptr);
652
653   pthread_mutex_unlock (&p->lock);
654
655   return (ret);
656 } /* }}} void *population_get_fittest */
657
658 int population_insert (population_t *p, void *pi_orig) /* {{{ */
659 {
660   void *pi;
661   int pi_rating;
662   int num_tries;
663   int i;
664
665   if (p == NULL)
666     return (-1);
667
668   if (pi_orig == NULL)
669     return (-1);
670
671   pi = p->copy (pi_orig);
672   if (pi == NULL)
673   {
674     fprintf (stderr, "population_insert: p->copy failed.\n");
675     return (-1);
676   }
677
678   pi_rating = p->rate (pi);
679
680   pthread_mutex_lock (&p->lock);
681
682   /* Keep track of the all time best. */
683   if ((p->fittest.ptr == NULL) || (p->fittest.rating >= pi_rating))
684   {
685     void *temp;
686
687     temp = p->copy (pi);
688     if (temp != NULL)
689     {
690       if (p->fittest.ptr != NULL)
691         p->free (p->fittest.ptr);
692       p->fittest.ptr = temp;
693       p->fittest.rating = pi_rating;
694     }
695   }
696
697   if (p->individuals_num <= 0)
698   {
699     pthread_mutex_unlock (&p->lock);
700     p->free (pi);
701     return (-1);
702   }
703
704   do
705   {
706     size_t j;
707
708     int chance_j;
709     int chance_pi;
710     int chance;
711
712     j = (size_t) (((double) p->individuals_num) * (rand() / (RAND_MAX + 1.0)));
713
714     if (p->individuals[j].ptr == NULL)
715     {
716       p->individuals[j].ptr = pi;
717       p->individuals[j].rating = pi_rating;
718       pi = NULL;
719       break;
720     }
721
722     /* large distance from fittest => high probability of losing. */
723     chance_j = 1 + p->individuals[j].rating - p->fittest.rating;
724     chance_pi = 1 + pi_rating - p->fittest.rating;
725
726     chance_j = chance_j * chance_j;
727     chance_pi = chance_pi * chance_pi;
728
729     chance = (int) (((double) (chance_j + chance_pi))
730         * (rand() / (RAND_MAX + 1.0)));
731     if (chance < chance_j) /* j looses ;) */
732     {
733       void *temp0;
734       int temp1;
735
736       temp0 = p->individuals[j].ptr;
737       p->individuals[j].ptr = pi;
738       pi = temp0;
739
740       temp1 = p->individuals[j].rating;
741       p->individuals[j].rating = pi_rating;
742       pi_rating = temp1;
743     }
744   } while (0);
745
746   pthread_mutex_unlock (&p->lock);
747
748   if (pi != NULL)
749   {
750     p->free (pi);
751     pi = NULL;
752   }
753
754   while (p->peers_num > 0)
755   {
756     double prob;
757     size_t j;
758     void *pi;
759
760     prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0);
761     if (prob < 0.999)
762       break;
763
764     pi = population_get_random (p);
765     if (pi == NULL)
766     {
767       fprintf (stderr, "population_insert: population_get_random failed.\n");
768       break;
769     }
770
771     population_send_to_peer (p, pi);
772     p->free (pi);
773
774     break;
775   }
776
777   return (0);
778 } /* }}} int population_insert */
779
780 /* vim: set sw=2 sts=2 et fdm=marker : */
781