X-Git-Url: https://git.octo.it/?p=libpopulation.git;a=blobdiff_plain;f=src%2Flibpopulation.c;fp=src%2Flibpopulation.c;h=c5d3c4e8eaa8cdcc95ebe95bd98b8fdefedafbfb;hp=42d1cb1871372ccc4b79b751931791dcb631054b;hb=75002085c2d5746270796f0d0332752e52702331;hpb=00bb95ea0c77d776efebda521c57b8d272e2d908 diff --git a/src/libpopulation.c b/src/libpopulation.c index 42d1cb1..c5d3c4e 100644 --- a/src/libpopulation.c +++ b/src/libpopulation.c @@ -61,6 +61,7 @@ #include "population.h" #include +#include #include #include #include @@ -68,6 +69,11 @@ #include #include #include +#include +#include +#include +#include +#include /* * Data types @@ -92,6 +98,9 @@ struct population_s pi_serialize_f serialize; pi_unserialize_f unserialize; + int *peers; + size_t peers_num; + individual_t fittest; individual_t *individuals; @@ -99,6 +108,88 @@ struct population_s }; /* + * Private functions + */ +static int population_send_to_peer (population_t *p, void *pi) /* {{{ */ +{ + char buffer[1450]; + size_t buffer_size; + size_t buffer_free; + char *buffer_ptr; + + int fd; + int i; + int status; + + if (p == NULL) + return (-1); + + if (pi == NULL) + return (-1); + + buffer_size = sizeof (buffer); + memset (buffer, 0, buffer_size); + + pthread_mutex_lock (&p->lock); + + if (p->serialize == NULL) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: Cannot send to peer without " + "serialization function!\n"); + return (-1); + } + + i = (int) (((double) p->peers_num) * (rand() / (RAND_MAX + 1.0))); + fd = p->peers[i]; + + buffer_ptr = buffer; + buffer_free = sizeof (buffer); + status = p->serialize (pi, &buffer_ptr, &buffer_free); + if (status != 0) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: p->serialize failed " + "with status %i.\n", status); + return (-1); + } + + buffer_size = sizeof (buffer) - buffer_free; + if (buffer_size < 1) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: p->serialize didn't put " + "anything into the buffer..\n"); + return (-1); + } + + /* write should not block - hopefully */ + status = send (fd, buffer, buffer_size, MSG_DONTWAIT | MSG_NOSIGNAL); + if (status < 0) + { + pthread_mutex_unlock (&p->lock); + status = errno; + if (status != ECONNREFUSED) + { + fprintf (stderr, "population_send_to_peer: Writing to socket failed: " + "send(2) returned with error %i.\n", status); + } + return (-1); + } + else if (((size_t) status) != buffer_size) + { + pthread_mutex_unlock (&p->lock); + fprintf (stderr, "population_send_to_peer: Writing to socket failed: " + "send(2) returned %i (expected %zu).\n", + status, buffer_size); + return (-1); + } + + pthread_mutex_unlock (&p->lock); + return (0); +} /* }}} int population_send_to_peer */ + +/* * Constructor and destructor */ population_t *population_create (pi_rate_f rate, pi_copy_f copy, /* {{{ */ @@ -231,6 +322,86 @@ int population_set_serialization (population_t *p, return (0); } /* }}} int population_set_serialization */ +int population_add_peer (population_t *p, const char *node, /* {{{ */ + const char *port) +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + int status; + + if (p == NULL) + return (-1); + + if (node == NULL) + return (-1); + + if (port == NULL) + port = POPULATION_DEFAULT_PORT; + + ai_list = NULL; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = 0; + + status = getaddrinfo (node, port, &ai_hints, &ai_list); + if (status != 0) + { + fprintf (stderr, "population_add_peer: getaddrinfo (%s) failed: %s\n", + node, gai_strerror (status)); + return (-1); + } + + pthread_mutex_lock (&p->lock); + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + int *temp; + + temp = (int *) realloc (p->peers, sizeof (int) * (p->peers_num + 1)); + if (temp == NULL) + { + fprintf (stderr, "population_add_peer: realloc failed.\n"); + continue; + } + p->peers = temp; + + p->peers[p->peers_num] = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (p->peers[p->peers_num] < 0) + continue; + + status = connect (p->peers[p->peers_num], + ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + fprintf (stderr, "population_add_peer: connect(2) failed.\n"); + close (p->peers[p->peers_num]); + continue; + } + + status = fcntl (p->peers[p->peers_num], F_SETFL, O_NONBLOCK); + if (status != 0) + { + fprintf (stderr, "population_add_peer: fcntl (F_SETFL, O_NONBLOCK) " + "failed. Will use the socket with blocking.\n"); + } + + p->peers_num++; + } + pthread_mutex_unlock (&p->lock); + + freeaddrinfo (ai_list); + + return (0); +} /* }}} int population_add_peer */ + void *population_get_random (population_t *p) /* {{{ */ { void *ret = NULL; @@ -305,6 +476,19 @@ int population_insert (population_t *p, void *pi_orig) /* {{{ */ pthread_mutex_lock (&p->lock); + if (p->peers_num > 0) + { + double prob; + + prob = ((double) rand ()) / (((double) RAND_MAX) + 1.0); + if (prob < 0.01) + { + pthread_mutex_unlock (&p->lock); + population_send_to_peer (p, pi); + pthread_mutex_lock (&p->lock); + } + } + /* Keep track of the all time best. */ if ((p->fittest.ptr == NULL) || (p->fittest.rating > pi_rating)) {