X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Flibpopulation.c;h=c27fd9bf7adf0306296605b4f1fa9e8fc27ec365;hb=HEAD;hp=f5379a069d25613d786cba145c7c223005231791;hpb=fb5eeea19c692eff4feb261a63f23e115a196d2b;p=libpopulation.git diff --git a/src/libpopulation.c b/src/libpopulation.c index f5379a0..c27fd9b 100644 --- a/src/libpopulation.c +++ b/src/libpopulation.c @@ -1,6 +1,6 @@ /** - * libevolve - src/evolve.c - * Copyright (C) 2008 Florian octo Forster + * libpopulation - src/evolve.c + * Copyright (C) 2008,2009 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -61,6 +61,8 @@ #include "population.h" #include +#include +#include #include #include #include @@ -68,6 +70,14 @@ #include #include #include +#include +#include +#include +#include +#include +#include + +#define NETWORK_BUFFER_SIZE 1450 /* * Data types @@ -88,12 +98,255 @@ struct population_s pi_free_f free; pi_copy_f copy; + /* Optional serialization */ + pi_serialize_f serialize; + pi_unserialize_f unserialize; + + int *peers; + size_t peers_num; + +#define POPULATION_FLAG_LISTEN 0x01 +#define POPULATION_FLAG_SHUTDOWN 0x02 +#define POPULATION_FLAG_EXPLORE 0x10 + int flags; + pthread_t listen_thread_id; + individual_t fittest; individual_t *individuals; size_t individuals_num; }; +struct listen_thread_args_s +{ + population_t *population; + char *node; + char *service; +}; +typedef struct listen_thread_args_s listen_thread_args_t; + +/* + * Private functions + */ +static char *population_strdup (const char *src) +{ + size_t s; + char *ret; + + if (src == NULL) + return (NULL); + + s = strlen (src) + 1; + ret = (char *) malloc (s); + if (ret == NULL) + return (NULL); + + memcpy (ret, src, s); + return (ret); +} /* char *population_strdup */ + +static int population_send_to_peer (population_t *p, void *pi) /* {{{ */ +{ + char buffer[NETWORK_BUFFER_SIZE]; + size_t buffer_size; + size_t buffer_free; + char *buffer_ptr; + + int fd; + int i; + int status; + + if (p == NULL) + return (-1); + + if (pi == NULL) + return (-1); + + buffer_size = sizeof (buffer); + memset (buffer, 0, buffer_size); + + pthread_mutex_lock (&p->lock); + + if (p->serialize == NULL) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: Cannot send to peer without " + "serialization function!\n"); + return (-1); + } + + i = (int) (((double) p->peers_num) * (rand() / (RAND_MAX + 1.0))); + fd = p->peers[i]; + + buffer_ptr = buffer; + buffer_free = sizeof (buffer); + status = p->serialize (pi, &buffer_ptr, &buffer_free); + if (status != 0) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: p->serialize failed " + "with status %i.\n", status); + return (-1); + } + + buffer_size = sizeof (buffer) - buffer_free; + if (buffer_size < 1) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: p->serialize didn't put " + "anything into the buffer..\n"); + return (-1); + } + + /* write should not block - hopefully */ + status = send (fd, buffer, buffer_size, MSG_DONTWAIT | MSG_NOSIGNAL); + if (status < 0) + { + pthread_mutex_unlock (&p->lock); + status = errno; + if (status != ECONNREFUSED) + { + fprintf (stderr, "population_send_to_peer: Writing to socket failed: " + "send(2) returned with error %i.\n", status); + } + return (-1); + } + else if (((size_t) status) != buffer_size) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: Writing to socket failed: " + "send(2) returned %i (expected %zu).\n", + status, buffer_size); + return (-1); + } + + pthread_mutex_unlock (&p->lock); + +#if 0 + printf ("population_send_to_peer: Sent individual with rating %i to peer #%i.\n", + p->rate (pi), i); +#endif + + return (0); +} /* }}} int population_send_to_peer */ + +static void *listen_thread (void *data) +{ + listen_thread_args_t *args; + population_t *p; + char *node; + char *service; + int status; + int fd; + + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + + args = (listen_thread_args_t *) data; + p = args->population; + node = args->node; + service = args->service; + + ai_list = NULL; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = AI_PASSIVE; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = 0; + + status = getaddrinfo (node, + (service != NULL) ? service : POPULATION_DEFAULT_PORT, + &ai_hints, &ai_list); + if (status != 0) + { + fprintf (stderr, "listen_thread: getaddrinfo (%s) failed: %s\n", + (node != NULL) ? node : "NULL", gai_strerror (status)); + return ((void *) -1); + } + + fd = -1; + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + continue; + + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + close (fd); + fd = -1; + continue; + } + + break; + } + + freeaddrinfo (ai_list); + + if (fd < 0) + { + fprintf (stderr, "listen_thread: No socket could be opened.\n"); + return ((void *) -1); + } + + pthread_mutex_lock (&p->lock); + p->flags |= POPULATION_FLAG_LISTEN; + while ((p->flags & POPULATION_FLAG_SHUTDOWN) == 0) + { + /* Allocate one extra byte to null-terminate the data. */ + char buffer[NETWORK_BUFFER_SIZE + 1]; + void *pi; + + pthread_mutex_unlock (&p->lock); + + status = recvfrom (fd, buffer, sizeof (buffer) - 1, /* flags = */ 0, + /* from = */ NULL, /* fromlen = */ NULL); + if (status < 1) + { + fprintf (stderr, "listen_thread: recvfrom(2) failed: status = %i; " + "errno = %i;\n", status, errno); + pthread_mutex_lock (&p->lock); + continue; + } + assert (status < sizeof (buffer)); + buffer[sizeof (buffer) - 1] = 0; + + pi = p->unserialize (buffer, (size_t) status); + if (pi == NULL) + { + fprintf (stderr, "listen_thread: p->unserialize returned NULL.\n"); + pthread_mutex_lock (&p->lock); + continue; + } + +#if 0 + printf ("listen_thread: Received individual with rating %i.\n", + p->rate (pi)); +#endif + + population_insert (p, pi); + + p->free (pi); + + pthread_mutex_lock (&p->lock); + } /* while (42) */ + + close (fd); + fd = -1; + + /* clear the listen flag */ + p->flags &= ~(POPULATION_FLAG_LISTEN); + + pthread_mutex_unlock (&p->lock); + return ((void *) 0); +} /* void *listen_thread */ + /* * Constructor and destructor */ @@ -140,6 +393,16 @@ void population_destroy (population_t *p) /* {{{ */ if (p == NULL) return; + pthread_mutex_lock (&p->lock); + p->flags |= POPULATION_FLAG_SHUTDOWN; + if ((p->flags & POPULATION_FLAG_LISTEN) != 0) + { + pthread_kill (p->listen_thread_id, SIGTERM); + pthread_mutex_unlock (&p->lock); + pthread_join (p->listen_thread_id, /* return = */ NULL); + pthread_mutex_lock (&p->lock); + } + if (p->fittest.ptr != NULL) p->free (p->fittest.ptr); p->fittest.ptr = NULL; @@ -212,6 +475,158 @@ int population_set_size (population_t *p, /* {{{ */ return (0); } /* }}} */ +int population_set_serialization (population_t *p, /* {{{ */ + pi_serialize_f serialize, pi_unserialize_f unserialize) +{ + if (p == NULL) + return (-1); + + pthread_mutex_lock (&p->lock); + + p->serialize = serialize; + p->unserialize = unserialize; + + pthread_mutex_unlock (&p->lock); + return (0); +} /* }}} int population_set_serialization */ + +int population_set_replacement_method (population_t *p, int method) /* {{{ */ +{ + int status = 0; + + if (p == NULL) + return (EINVAL); + + pthread_mutex_lock (&p->lock); + + if (method == POPULATION_REPLACEMENT_EXPLOIT) + p->flags &= ~POPULATION_FLAG_EXPLORE; + else if (method == POPULATION_REPLACEMENT_EXPLORE) + p->flags |= POPULATION_FLAG_EXPLORE; + else + status = EINVAL; + + pthread_mutex_unlock (&p->lock); + + return (0); +} /* }}} int population_set_replacement_method */ + +int population_add_peer (population_t *p, const char *node, /* {{{ */ + const char *port) +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + int status; + + if (p == NULL) + return (-1); + + if (node == NULL) + return (-1); + + if (port == NULL) + port = POPULATION_DEFAULT_PORT; + + ai_list = NULL; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = 0; + + status = getaddrinfo (node, port, &ai_hints, &ai_list); + if (status != 0) + { + fprintf (stderr, "population_add_peer: getaddrinfo (%s) failed: %s\n", + node, gai_strerror (status)); + return (-1); + } + + pthread_mutex_lock (&p->lock); + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + int *temp; + + temp = (int *) realloc (p->peers, sizeof (int) * (p->peers_num + 1)); + if (temp == NULL) + { + fprintf (stderr, "population_add_peer: realloc failed.\n"); + continue; + } + p->peers = temp; + + p->peers[p->peers_num] = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (p->peers[p->peers_num] < 0) + continue; + + status = connect (p->peers[p->peers_num], + ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + fprintf (stderr, "population_add_peer: connect(2) failed.\n"); + close (p->peers[p->peers_num]); + continue; + } + + status = fcntl (p->peers[p->peers_num], F_SETFL, O_NONBLOCK); + if (status != 0) + { + fprintf (stderr, "population_add_peer: fcntl (F_SETFL, O_NONBLOCK) " + "failed. Will use the socket with blocking.\n"); + } + + p->peers_num++; + + printf ("population_add_peer: Successfully added peer #%i.\n", + p->peers_num - 1); + } + pthread_mutex_unlock (&p->lock); + + freeaddrinfo (ai_list); + + return (0); +} /* }}} int population_add_peer */ + +int population_start_listen_thread (population_t *p, /* {{{ */ + const char *node, const char *service) +{ + listen_thread_args_t *args; + + pthread_mutex_lock (&p->lock); + if ((p->flags & POPULATION_FLAG_LISTEN) != 0) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_start_listen_thread: " + "Listen thread already started.\n"); + return (-EALREADY); + } + + args = (listen_thread_args_t *) malloc (sizeof (listen_thread_args_t)); + if (args == NULL) + { + fprintf (stderr, "population_start_listen_thread: malloc failed.\n"); + return (-1); + } + + memset (args, 0, sizeof (listen_thread_args_t)); + args->population = p; + args->node = population_strdup (node); + args->service = population_strdup (service); + + pthread_create (&p->listen_thread_id, /* attr = */ NULL, + listen_thread, (void *) args); + + pthread_mutex_unlock (&p->lock); + return (0); +} /* }}} int population_start_listen_thread */ + void *population_get_random (population_t *p) /* {{{ */ { void *ret = NULL; @@ -266,7 +681,7 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ { void *pi; int pi_rating; - int num_tries; + int sent_to_peer; int i; if (p == NULL) @@ -282,12 +697,31 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ return (-1); } + /* + * With a small chance, send this individual to somewhere else. + * `sent_to_peer = -1' is used to signal the following code that this + * individual has been sent to somewhere else and doesn't go into the local + * population. + */ + sent_to_peer = 0; + if (p->peers_num > 0) + { + double prob; + + prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0); + if (prob <= 0.001) + { + population_send_to_peer (p, pi); + sent_to_peer = 1; + } + } + pi_rating = p->rate (pi); pthread_mutex_lock (&p->lock); /* Keep track of the all time best. */ - if ((p->fittest.ptr == NULL) || (p->fittest.rating > pi_rating)) + if ((p->fittest.ptr == NULL) || (p->fittest.rating >= pi_rating)) { void *temp; @@ -301,18 +735,21 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ } } - if (p->individuals_num <= 0) + if ((sent_to_peer != 0) || (p->individuals_num <= 0)) { pthread_mutex_unlock (&p->lock); p->free (pi); - return (-1); + return (0); } - num_tries = (int) ceil (log (p->individuals_num) / log (2.0)); - for (i = 0; i < num_tries; i++) + do { size_t j; + int chance_j; + int chance_pi; + int chance; + j = (size_t) (((double) p->individuals_num) * (rand() / (RAND_MAX + 1.0))); if (p->individuals[j].ptr == NULL) @@ -323,7 +760,20 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ break; } - if (pi_rating < p->individuals[j].rating) + /* large distance from fittest => high probability of losing. */ + chance_j = 1 + p->individuals[j].rating - p->fittest.rating; + chance_pi = 1 + pi_rating - p->fittest.rating; + + chance_j = chance_j * chance_j; + chance_pi = chance_pi * chance_pi; + + chance = (int) (((double) (chance_j + chance_pi)) + * (rand() / (RAND_MAX + 1.0))); + + if (p->flags & POPULATION_FLAG_EXPLORE) + chance *= .5; + + if (chance < chance_j) /* j looses ;) */ { void *temp0; int temp1; @@ -336,15 +786,12 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ p->individuals[j].rating = pi_rating; pi_rating = temp1; } - } + } while (0); pthread_mutex_unlock (&p->lock); if (pi != NULL) - { p->free (pi); - pi = NULL; - } return (0); } /* }}} int population_insert */