+struct listen_thread_args_s
+{
+ population_t *population;
+ char *node;
+ char *service;
+};
+typedef struct listen_thread_args_s listen_thread_args_t;
+
+/*
+ * Private functions
+ */
+static char *population_strdup (const char *src)
+{
+ size_t s;
+ char *ret;
+
+ if (src == NULL)
+ return (NULL);
+
+ s = strlen (src) + 1;
+ ret = (char *) malloc (s);
+ if (ret == NULL)
+ return (NULL);
+
+ memcpy (ret, src, s);
+ return (ret);
+} /* char *population_strdup */
+
+static int population_send_to_peer (population_t *p, void *pi) /* {{{ */
+{
+ char buffer[NETWORK_BUFFER_SIZE];
+ size_t buffer_size;
+ size_t buffer_free;
+ char *buffer_ptr;
+
+ int fd;
+ int i;
+ int status;
+
+ if (p == NULL)
+ return (-1);
+
+ if (pi == NULL)
+ return (-1);
+
+ buffer_size = sizeof (buffer);
+ memset (buffer, 0, buffer_size);
+
+ pthread_mutex_lock (&p->lock);
+
+ if (p->serialize == NULL)
+ {
+ pthread_mutex_unlock (&p->lock);
+ fprintf (stderr, "population_send_to_peer: Cannot send to peer without "
+ "serialization function!\n");
+ return (-1);
+ }
+
+ i = (int) (((double) p->peers_num) * (rand() / (RAND_MAX + 1.0)));
+ fd = p->peers[i];
+
+ buffer_ptr = buffer;
+ buffer_free = sizeof (buffer);
+ status = p->serialize (pi, &buffer_ptr, &buffer_free);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&p->lock);
+ fprintf (stderr, "population_send_to_peer: p->serialize failed "
+ "with status %i.\n", status);
+ return (-1);
+ }
+
+ buffer_size = sizeof (buffer) - buffer_free;
+ if (buffer_size < 1)
+ {
+ pthread_mutex_unlock (&p->lock);
+ fprintf (stderr, "population_send_to_peer: p->serialize didn't put "
+ "anything into the buffer..\n");
+ return (-1);
+ }
+
+ /* write should not block - hopefully */
+ status = send (fd, buffer, buffer_size, MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (status < 0)
+ {
+ pthread_mutex_unlock (&p->lock);
+ status = errno;
+ if (status != ECONNREFUSED)
+ {
+ fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
+ "send(2) returned with error %i.\n", status);
+ }
+ return (-1);
+ }
+ else if (((size_t) status) != buffer_size)
+ {
+ pthread_mutex_unlock (&p->lock);
+ fprintf (stderr, "population_send_to_peer: Writing to socket failed: "
+ "send(2) returned %i (expected %zu).\n",
+ status, buffer_size);
+ return (-1);
+ }
+
+ pthread_mutex_unlock (&p->lock);
+
+#if 0
+ printf ("population_send_to_peer: Sent individual with rating %i to peer #%i.\n",
+ p->rate (pi), i);
+#endif
+
+ return (0);
+} /* }}} int population_send_to_peer */
+
+static void *listen_thread (void *data)
+{
+ listen_thread_args_t *args;
+ population_t *p;
+ char *node;
+ char *service;
+ int status;
+ int fd;
+
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list;
+ struct addrinfo *ai_ptr;
+
+ args = (listen_thread_args_t *) data;
+ p = args->population;
+ node = args->node;
+ service = args->service;
+
+ ai_list = NULL;
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+ ai_hints.ai_flags = AI_PASSIVE;
+#ifdef AI_ADDRCONFIG
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_DGRAM;
+ ai_hints.ai_protocol = 0;
+
+ status = getaddrinfo (node,
+ (service != NULL) ? service : POPULATION_DEFAULT_PORT,
+ &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ fprintf (stderr, "listen_thread: getaddrinfo (%s) failed: %s\n",
+ (node != NULL) ? node : "NULL", gai_strerror (status));
+ return ((void *) -1);
+ }
+
+ fd = -1;
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+ if (fd < 0)
+ continue;
+
+ status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ if (status != 0)
+ {
+ close (fd);
+ fd = -1;
+ continue;
+ }
+
+ break;
+ }
+
+ freeaddrinfo (ai_list);
+
+ if (fd < 0)
+ {
+ fprintf (stderr, "listen_thread: No socket could be opened.\n");
+ return ((void *) -1);
+ }
+
+ pthread_mutex_lock (&p->lock);
+ p->flags |= POPULATION_FLAG_LISTEN;
+ while ((p->flags & POPULATION_FLAG_SHUTDOWN) == 0)
+ {
+ /* Allocate one extra byte to null-terminate the data. */
+ char buffer[NETWORK_BUFFER_SIZE + 1];
+ void *pi;
+
+ pthread_mutex_unlock (&p->lock);
+
+ status = recvfrom (fd, buffer, sizeof (buffer) - 1, /* flags = */ 0,
+ /* from = */ NULL, /* fromlen = */ NULL);
+ if (status < 1)
+ {
+ fprintf (stderr, "listen_thread: recvfrom(2) failed: status = %i; "
+ "errno = %i;\n", status, errno);
+ pthread_mutex_lock (&p->lock);
+ continue;
+ }
+ assert (status < sizeof (buffer));
+ buffer[sizeof (buffer) - 1] = 0;
+
+ pi = p->unserialize (buffer, (size_t) status);
+ if (pi == NULL)
+ {
+ fprintf (stderr, "listen_thread: p->unserialize returned NULL.\n");
+ pthread_mutex_lock (&p->lock);
+ continue;
+ }
+
+#if 0
+ printf ("listen_thread: Received individual with rating %i.\n",
+ p->rate (pi));
+#endif
+
+ population_insert (p, pi);
+
+ p->free (pi);
+
+ pthread_mutex_lock (&p->lock);
+ } /* while (42) */
+
+ close (fd);
+ fd = -1;
+
+ /* clear the listen flag */
+ p->flags &= ~(POPULATION_FLAG_LISTEN);
+
+ pthread_mutex_unlock (&p->lock);
+ return ((void *) 0);
+} /* void *listen_thread */
+