src/libpopulation.c: Added code to send individuals to peers.
authorFlorian Forster <octo@leeloo.home.verplant.org>
Sat, 12 Jul 2008 09:32:06 +0000 (11:32 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Sat, 12 Jul 2008 09:32:06 +0000 (11:32 +0200)
src/libpopulation.c
src/population.h

index 42d1cb1..c5d3c4e 100644 (file)
@@ -61,6 +61,7 @@
 #include "population.h"
 
 #include <stdlib.h>
+#include <errno.h>
 #include <stdint.h>
 #include <inttypes.h>
 #include <string.h>
 #include <limits.h>
 #include <pthread.h>
 #include <math.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
 
 /*
  * Data types
@@ -92,6 +98,9 @@ struct population_s
   pi_serialize_f serialize;
   pi_unserialize_f unserialize;
 
+  int *peers;
+  size_t peers_num;
+
   individual_t fittest;
 
   individual_t *individuals;
@@ -99,6 +108,88 @@ struct population_s
 };
 
 /*
+ * Private functions
+ */
+static int population_send_to_peer (population_t *p, void *pi) /* {{{ */
+{
+  char buffer[1450];
+  size_t buffer_size;
+  size_t buffer_free;
+  char *buffer_ptr;
+
+  int fd;
+  int i;
+  int status;
+
+  if (p == NULL)
+    return (-1);
+
+  if (pi == NULL)
+    return (-1);
+
+  buffer_size = sizeof (buffer);
+  memset (buffer, 0, buffer_size);
+
+  pthread_mutex_lock (&p->lock);
+
+  if (p->serialize == NULL)
+  {
+    pthread_mutex_unlock (&p->lock);
+    fprintf (stderr, "population_send_to_peer: Cannot send to peer without "
+        "serialization function!\n");
+    return (-1);
+  }
+
+  i = (int) (((double) p->peers_num) * (rand() / (RAND_MAX + 1.0)));
+  fd = p->peers[i];
+
+  buffer_ptr = buffer;
+  buffer_free = sizeof (buffer);
+  status = p->serialize (pi, &buffer_ptr, &buffer_free);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&p->lock);
+    fprintf (stderr, "population_send_to_peer: p->serialize failed "
+        "with status %i.\n", status);
+    return (-1);
+  }
+
+  buffer_size = sizeof (buffer) - buffer_free;
+  if (buffer_size < 1)
+  {
+    pthread_mutex_unlock (&p->lock);
+    fprintf (stderr, "population_send_to_peer: p->serialize didn't put "
+        "anything into the buffer..\n");
+    return (-1);
+  }
+
+  /* write should not block - hopefully */
+  status = send (fd, buffer, buffer_size, MSG_DONTWAIT | MSG_NOSIGNAL);
+  if (status < 0)
+  {
+    pthread_mutex_unlock (&p->lock);
+    status = errno;
+    if (status != ECONNREFUSED)
+    {
+      fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
+          "send(2) returned with error %i.\n", status);
+    }
+    return (-1);
+  }
+  else if (((size_t) status) != buffer_size)
+  {
+    pthread_mutex_unlock (&p->lock);
+    fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
+        "send(2) returned %i (expected %zu).\n",
+        status, buffer_size);
+    return (-1);
+  }
+
+  pthread_mutex_unlock (&p->lock);
+  return (0);
+} /* }}} int population_send_to_peer */
+
+/*
  * Constructor and destructor
  */
 population_t *population_create (pi_rate_f rate, pi_copy_f copy, /* {{{ */
@@ -231,6 +322,86 @@ int population_set_serialization (population_t *p,
   return (0);
 } /* }}} int population_set_serialization */
 
+int population_add_peer (population_t *p, const char *node, /* {{{ */
+    const char *port)
+{
+  struct addrinfo  ai_hints;
+  struct addrinfo *ai_list;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  if (p == NULL)
+    return (-1);
+
+  if (node == NULL)
+    return (-1);
+
+  if (port == NULL)
+    port = POPULATION_DEFAULT_PORT;
+
+  ai_list = NULL;
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_DGRAM;
+  ai_hints.ai_protocol = 0;
+
+  status = getaddrinfo (node, port, &ai_hints, &ai_list);
+  if (status != 0)
+  {
+    fprintf (stderr, "population_add_peer: getaddrinfo (%s) failed: %s\n",
+        node, gai_strerror (status));
+    return (-1);
+  }
+
+  pthread_mutex_lock (&p->lock);
+
+  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    int *temp;
+
+    temp = (int *) realloc (p->peers, sizeof (int) * (p->peers_num + 1));
+    if (temp == NULL)
+    {
+      fprintf (stderr, "population_add_peer: realloc failed.\n");
+      continue;
+    }
+    p->peers = temp;
+
+    p->peers[p->peers_num] = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
+        ai_ptr->ai_protocol);
+    if (p->peers[p->peers_num] < 0)
+      continue;
+
+    status = connect (p->peers[p->peers_num],
+        ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      fprintf (stderr, "population_add_peer: connect(2) failed.\n");
+      close (p->peers[p->peers_num]);
+      continue;
+    }
+
+    status = fcntl (p->peers[p->peers_num], F_SETFL, O_NONBLOCK);
+    if (status != 0)
+    {
+      fprintf (stderr, "population_add_peer: fcntl (F_SETFL, O_NONBLOCK) "
+          "failed. Will use the socket with blocking.\n");
+    }
+
+    p->peers_num++;
+  }
+  pthread_mutex_unlock (&p->lock);
+
+  freeaddrinfo (ai_list);
+
+  return (0);
+} /* }}} int population_add_peer */
+
 void *population_get_random (population_t *p) /* {{{ */
 {
   void *ret = NULL;
@@ -305,6 +476,19 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */
 
   pthread_mutex_lock (&p->lock);
 
+  if (p->peers_num > 0)
+  {
+    double prob;
+
+    prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0);
+    if (prob < 0.01)
+    {
+      pthread_mutex_unlock (&p->lock);
+      population_send_to_peer (p, pi);
+      pthread_mutex_lock (&p->lock);
+    }
+  }
+
   /* Keep track of the all time best. */
   if ((p->fittest.ptr == NULL) || (p->fittest.rating > pi_rating))
   {
index a512b40..64b3d8f 100644 (file)
@@ -34,6 +34,9 @@ int population_set_size (population_t *p, size_t population_size);
 int population_set_serialization (population_t *p,
     pi_serialize_f serialize, pi_unserialize_f unserialize);
 
+#define POPULATION_DEFAULT_PORT "46835"
+int population_add_peer (population_t *p, const char *node, const char *port);
+
 /*
  * Methods
  */