+int relink_or_rename(char *old, char *new) {
+ int ret;
+
+ ret = link(old, new);
+ if (ret < 0) {
+ /* Same Coda hack as in write_sha1_file(sha1_file.c) */
+ ret = errno;
+ if (ret == EXDEV && !rename(old, new))
+ return 0;
+ }
+ unlink(old);
+ if (ret) {
+ if (ret != EEXIST)
+ return ret;
+ }
+
+ return 0;
+}
+
+#ifdef USE_CURL_MULTI
+void process_curl_messages();
+void process_request_queue();
+#endif
+
+struct active_request_slot *get_active_slot()
+{
+ struct active_request_slot *slot = active_queue_head;
+ struct active_request_slot *newslot;
+
+#ifdef USE_CURL_MULTI
+ int num_transfers;
+
+ /* Wait for a slot to open up if the queue is full */
+ while (active_requests >= max_requests) {
+ curl_multi_perform(curlm, &num_transfers);
+ if (num_transfers < active_requests) {
+ process_curl_messages();
+ }
+ }
+#endif
+
+ while (slot != NULL && slot->in_use) {
+ slot = slot->next;
+ }
+ if (slot == NULL) {
+ newslot = xmalloc(sizeof(*newslot));
+ newslot->curl = curl_easy_duphandle(curl_default);
+ newslot->in_use = 0;
+ newslot->next = NULL;
+
+ slot = active_queue_head;
+ if (slot == NULL) {
+ active_queue_head = newslot;
+ } else {
+ while (slot->next != NULL) {
+ slot = slot->next;
+ }
+ slot->next = newslot;
+ }
+ slot = newslot;
+ }
+
+ active_requests++;
+ slot->in_use = 1;
+ slot->done = 0;
+ slot->local = NULL;
+ curl_easy_setopt(slot->curl, CURLOPT_HTTPHEADER, no_pragma_header);
+ curl_easy_setopt(slot->curl, CURLOPT_HTTPHEADER, no_range_header);
+ curl_easy_setopt(slot->curl, CURLOPT_ERRORBUFFER, curl_errorstr);
+
+ return slot;
+}
+
+int start_active_slot(struct active_request_slot *slot)
+{
+#ifdef USE_CURL_MULTI
+ CURLMcode curlm_result = curl_multi_add_handle(curlm, slot->curl);
+
+ if (curlm_result != CURLM_OK &&
+ curlm_result != CURLM_CALL_MULTI_PERFORM) {
+ active_requests--;
+ slot->in_use = 0;
+ return 0;
+ }
+#endif
+ return 1;
+}
+
+void run_active_slot(struct active_request_slot *slot)
+{
+#ifdef USE_CURL_MULTI
+ int num_transfers;
+ long last_pos = 0;
+ long current_pos;
+ fd_set readfds;
+ fd_set writefds;
+ fd_set excfds;
+ int max_fd;
+ struct timeval select_timeout;
+ CURLMcode curlm_result;
+
+ while (!slot->done) {
+ data_received = 0;
+ do {
+ curlm_result = curl_multi_perform(curlm,
+ &num_transfers);
+ } while (curlm_result == CURLM_CALL_MULTI_PERFORM);
+ if (num_transfers < active_requests) {
+ process_curl_messages();
+ process_request_queue();
+ }
+
+ if (!data_received && slot->local != NULL) {
+ current_pos = ftell(slot->local);
+ if (current_pos > last_pos)
+ data_received++;
+ last_pos = current_pos;
+ }
+
+ if (!slot->done && !data_received) {
+ max_fd = 0;
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ FD_ZERO(&excfds);
+ select_timeout.tv_sec = 0;
+ select_timeout.tv_usec = 50000;
+ select(max_fd, &readfds, &writefds,
+ &excfds, &select_timeout);
+ }
+ }
+#else
+ slot->curl_result = curl_easy_perform(slot->curl);
+ active_requests--;
+#endif
+}
+
+void start_request(struct transfer_request *request)
+{
+ char *hex = sha1_to_hex(request->sha1);
+ char prevfile[PATH_MAX];
+ char *url;
+ char *posn;
+ int prevlocal;
+ unsigned char prev_buf[PREV_BUF_SIZE];
+ ssize_t prev_read = 0;
+ long prev_posn = 0;
+ char range[RANGE_HEADER_SIZE];
+ struct curl_slist *range_header = NULL;
+ struct active_request_slot *slot;
+
+ snprintf(prevfile, sizeof(prevfile), "%s.prev", request->filename);
+ unlink(prevfile);
+ rename(request->tmpfile, prevfile);
+ unlink(request->tmpfile);
+
+ request->local = open(request->tmpfile,
+ O_WRONLY | O_CREAT | O_EXCL, 0666);
+ if (request->local < 0) {
+ request->state = ABORTED;
+ error("Couldn't create temporary file %s for %s: %s\n",
+ request->tmpfile, request->filename, strerror(errno));
+ return;
+ }
+
+ memset(&request->stream, 0, sizeof(request->stream));
+
+ inflateInit(&request->stream);
+
+ SHA1_Init(&request->c);
+
+ url = xmalloc(strlen(request->repo->base) + 50);
+ request->url = xmalloc(strlen(request->repo->base) + 50);
+ strcpy(url, request->repo->base);
+ posn = url + strlen(request->repo->base);
+ strcpy(posn, "objects/");
+ posn += 8;
+ memcpy(posn, hex, 2);
+ posn += 2;
+ *(posn++) = '/';
+ strcpy(posn, hex + 2);
+ strcpy(request->url, url);
+
+ /* If a previous temp file is present, process what was already
+ fetched. */
+ prevlocal = open(prevfile, O_RDONLY);
+ if (prevlocal != -1) {
+ do {
+ prev_read = read(prevlocal, prev_buf, PREV_BUF_SIZE);
+ if (prev_read>0) {
+ if (fwrite_sha1_file(prev_buf,
+ 1,
+ prev_read,
+ request) == prev_read) {
+ prev_posn += prev_read;
+ } else {
+ prev_read = -1;
+ }
+ }
+ } while (prev_read > 0);
+ close(prevlocal);
+ }
+ unlink(prevfile);
+
+ /* Reset inflate/SHA1 if there was an error reading the previous temp
+ file; also rewind to the beginning of the local file. */
+ if (prev_read == -1) {
+ memset(&request->stream, 0, sizeof(request->stream));
+ inflateInit(&request->stream);
+ SHA1_Init(&request->c);
+ if (prev_posn>0) {
+ prev_posn = 0;
+ lseek(request->local, SEEK_SET, 0);
+ ftruncate(request->local, 0);
+ }
+ }
+
+ slot = get_active_slot();
+ curl_easy_setopt(slot->curl, CURLOPT_FILE, request);
+ curl_easy_setopt(slot->curl, CURLOPT_WRITEFUNCTION, fwrite_sha1_file);
+ curl_easy_setopt(slot->curl, CURLOPT_ERRORBUFFER, request->errorstr);
+ curl_easy_setopt(slot->curl, CURLOPT_URL, url);
+
+ /* If we have successfully processed data from a previous fetch
+ attempt, only fetch the data we don't already have. */
+ if (prev_posn>0) {
+ if (get_verbosely)
+ fprintf(stderr,
+ "Resuming fetch of object %s at byte %ld\n",
+ hex, prev_posn);
+ sprintf(range, "Range: bytes=%ld-", prev_posn);
+ range_header = curl_slist_append(range_header, range);
+ curl_easy_setopt(slot->curl,
+ CURLOPT_HTTPHEADER, range_header);
+ }
+
+ /* Try to get the request started, abort the request on error */
+ if (!start_active_slot(slot)) {
+ request->state = ABORTED;
+ close(request->local);
+ free(request->url);
+ return;
+ }
+
+ request->slot = slot;
+ request->state = ACTIVE;
+}
+
+void finish_request(struct transfer_request *request)
+{
+ fchmod(request->local, 0444);
+ close(request->local);
+
+ if (request->http_code == 416) {
+ fprintf(stderr, "Warning: requested range invalid; we may already have all the data.\n");
+ } else if (request->curl_result != CURLE_OK) {
+ return;
+ }
+
+ inflateEnd(&request->stream);
+ SHA1_Final(request->real_sha1, &request->c);
+ if (request->zret != Z_STREAM_END) {
+ unlink(request->tmpfile);
+ return;
+ }
+ if (memcmp(request->sha1, request->real_sha1, 20)) {
+ unlink(request->tmpfile);
+ return;
+ }
+ request->rename =
+ relink_or_rename(request->tmpfile, request->filename);
+
+ if (request->rename == 0)
+ pull_say("got %s\n", sha1_to_hex(request->sha1));
+}
+
+void release_request(struct transfer_request *request)
+{
+ struct transfer_request *entry = request_queue_head;
+
+ if (request == request_queue_head) {
+ request_queue_head = request->next;
+ } else {
+ while (entry->next != NULL && entry->next != request)
+ entry = entry->next;
+ if (entry->next == request)
+ entry->next = entry->next->next;
+ }
+
+ free(request->url);
+ free(request);
+}
+
+#ifdef USE_CURL_MULTI
+void process_curl_messages()
+{
+ int num_messages;
+ struct active_request_slot *slot;
+ struct transfer_request *request = NULL;
+ CURLMsg *curl_message = curl_multi_info_read(curlm, &num_messages);
+
+ while (curl_message != NULL) {
+ if (curl_message->msg == CURLMSG_DONE) {
+ slot = active_queue_head;
+ while (slot != NULL &&
+ slot->curl != curl_message->easy_handle)
+ slot = slot->next;
+ if (slot != NULL) {
+ curl_multi_remove_handle(curlm, slot->curl);
+ active_requests--;
+ slot->done = 1;
+ slot->in_use = 0;
+ slot->curl_result = curl_message->data.result;
+ request = request_queue_head;
+ while (request != NULL &&
+ request->slot != slot)
+ request = request->next;
+ } else {
+ fprintf(stderr, "Received DONE message for unknown request!\n");
+ }
+ if (request != NULL) {
+ request->curl_result =
+ curl_message->data.result;
+ curl_easy_getinfo(slot->curl,
+ CURLINFO_HTTP_CODE,
+ &request->http_code);
+ request->slot = NULL;
+
+ /* Use alternates if necessary */
+ if (request->http_code == 404 &&
+ request->repo->next != NULL) {
+ request->repo = request->repo->next;
+ start_request(request);
+ } else {
+ finish_request(request);
+ request->state = COMPLETE;
+ }
+ }
+ } else {
+ fprintf(stderr, "Unknown CURL message received: %d\n",
+ (int)curl_message->msg);
+ }
+ curl_message = curl_multi_info_read(curlm, &num_messages);
+ }
+}
+
+void process_request_queue()
+{
+ struct transfer_request *request = request_queue_head;
+ int num_transfers;
+
+ while (active_requests < max_requests && request != NULL) {
+ if (request->state == WAITING) {
+ if (has_sha1_file(request->sha1))
+ release_request(request);
+ else
+ start_request(request);
+ curl_multi_perform(curlm, &num_transfers);
+ }
+ request = request->next;
+ }
+}
+#endif
+