/* ARISA - queue handling functions * Copyright (C) 2003, 2004 Carl Ritson * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA */ #include "arisa.h" static const char *err_dup = "Duplicate Queue"; static const char *err_max = "At Per User Queue Slot Limit"; static const char *err_full = "All Queue Slots Full"; static const char *err_unknown = "Unknown Error"; int queue_add(queue_t *q) { int i,ret = -1; LOCK(global); LOCK(q); for(i = 0; i < global->no_queues; ++i) { LOCK(global->queues[i]); if(strcasecmp(global->queues[i]->name,q->name) == 0 && global->queues[i]->deleted == 0) { LOGP(L_ERR,"Error, attempted to add duplicate queue %s", global->queues[i]->name); UNLOCK(global->queues[i]); break; } UNLOCK(global->queues[i]); } if(i == global->no_queues) { LOGP(L_INF,"Added queue %s",q->name); PTRARR_ADD(&global->queues,&global->no_queues,q); validity_insert(q); ret = 0; } UNLOCK(q); UNLOCK(global); if(ret == 0) se_notify_queue_add(q); return ret; } void queue_purge(const time_t now) { queue_t **old = NULL; int no_old = 0; int i,j,deleted; //D("Queue Purge Begins"); LOCK(global); for(i = 0, deleted = 0; i < global->no_queues; ++i) { LOCK(global->queues[i]); if(IS_PURGE_TIME(global->queues[i]->deleted,now)) deleted++; UNLOCK(global->queues[i]); } if(deleted) { old = global->queues; no_old = global->no_queues; PTRARR_ALLOC(&(global->queues),&(global->no_queues), no_old - deleted); for(i = 0, j = 0; i < no_old; ++i) { LOCK(old[i]); if(!IS_PURGE_TIME(old[i]->deleted,now)) { global->queues[j++] = old[i]; UNLOCK(old[i]); old[i] = NULL; } else UNLOCK(old[i]); } } UNLOCK(global); if(deleted) { for(i = 0; i < no_old; ++i) { if(old[i] != NULL) free_queue(old[i]); } } if(old != NULL) xfree(old); //D("Queue Purge Ends"); } int queue_del(queue_t *q) { int i,okay = 1; LOCK(global); for(i = 0; i < global->no_packlists && okay; ++i) { LOCK(global->packlists[i]); if(global->packlists[i]->deleted == 0 && global->packlists[i]->queue == q) { LOGP(L_ERR,"Error trying to remove queue with attached packlists"); okay = 0; } UNLOCK(global->packlists[i]); } if(okay) { LOCK(q); LOGP(L_INF,"Deleted queue %s",q->name); q->deleted = xtime(); validity_delete(q); UNLOCK(q); } UNLOCK(global); return (okay - 1); // returns 0 or -1 } queue_t *queue_find(const char *name) { queue_t *ret = NULL; int i; LOCK(global); for(i = 0; i < global->no_queues && ret == NULL; ++i) { LOCK(global->queues[i]); if(strcasecmp(global->queues[i]->name,name) == 0 && global->queues[i]->deleted == 0) { ret = global->queues[i]; validity_insert(ret); } UNLOCK(global->queues[i]); } UNLOCK(global); return ret; } int queue_valid(queue_t *q) { int i,ret = 0; if(q == NULL) return 0; if(validity_test(q)) return 1; LOCK(global); for(i = 0; i < global->no_queues; ++i) { if(global->queues[i] == q) break; } if(i < global->no_queues) { // queue is in the array LOCK(q); if(q->deleted == 0) { validity_insert(q); ret = 1; } UNLOCK(q); } UNLOCK(global); return ret; } int queue_rename(queue_t *q, const char *name) { int i,ret = 0; if(q == NULL || name == NULL) return -1; LOCK(global); for(i = 0; i < global->no_queues && ret == 0; ++i) { LOCK(global->queues[i]); if(global->queues[i]->deleted == 0 && strcmp(global->queues[i]->name,name) == 0) ret = -1; UNLOCK(global->queues[i]); } if(ret == 0) { LOCK(q); if(q->name != NULL) xfree(q->name); q->name = xstrdup(name); UNLOCK(q); } UNLOCK(global); return ret; } int queue_enqueue(queue_t *q, send_t *s, int flags, char **err) { int i,ret = 0; if(err != NULL) *err = (char *)err_unknown; LOCK(q); if(q->deleted != 0) { UNLOCK(q); return -1; } LOCK(s); RLOCK(s->pack); if(s->pack->size < q->bypass_size) flags |= SEND_NO_CLIMIT; RUNLOCK(s->pack); UNLOCK(s); if(pool_add_send(q->pool,s,flags,err) == 0) { UNLOCK(q); return 0; } LOCK(s); if((q->queues_per_user > 0 || q->combined_queue_limit) && !(flags & QUEUE_NO_ULIMIT)) { int sends_per_user, no_queues = 0; pool_t *p = q->pool; LOCK(p); sends_per_user = p->sends_per_user; for(i = 0; i < p->no_csends && ret == 0; ++i) { LOCK(p->csends[i]); if(p->csends[i]->network == s->network && irc_strcasecmp(p->csends[i]->nick,s->nick) == 0) { if(p->csends[i]->pack == s->pack) ret = -1; no_queues++; } UNLOCK(p->csends[i]); } UNLOCK(p); if(q->combined_queue_limit == 0) no_queues = 0; for(i = 0; i < q->no_qsends && ret == 0; ++i) { LOCK(q->qsends[i]); if(q->qsends[i]->network == s->network && irc_strcasecmp(q->qsends[i]->nick,s->nick) == 0) { if(q->qsends[i]->pack == s->pack) ret = -1; no_queues++; } UNLOCK(q->qsends[i]); } if(ret == -1) { if(err != NULL) *err = (char *)err_dup; } else if(q->combined_queue_limit == 0 && no_queues >= q->queues_per_user) { if(err != NULL) *err = (char *)err_max; ret = -1; } else if(q->combined_queue_limit && no_queues >= (q->queues_per_user + sends_per_user)) { if(err != NULL) *err = (char *)err_max; ret = -1; } } if(ret != -1) { if(q->no_qsends < q->length || (flags & QUEUE_NO_CLIMIT)) { if(err != NULL) *err = NULL; PTRARR_ADD(&q->qsends,&q->no_qsends,s); s->state = STATE_QUEUED; s->started = xtime(); s->queue = q; ret = q->no_qsends; // position } else { if(err != NULL && q->length > 0) *err = (char *)err_full; ret = -1; } } UNLOCK(s); UNLOCK(q); if(ret > 0) se_notify_enqueue(1,q,s,ret); return ret; } static int queue_cmp_0(const void *a, const void *b) { queue_t *qa = *((queue_t **)a), *qb = *((queue_t **)b); if(qa->priority != qb->priority) return 0-(qa->priority - qb->priority); else if(qa->last_dequeue != qb->last_dequeue) return 0-(qb->last_dequeue - qa->last_dequeue); else return 0-(qa->no_qsends - qb->no_qsends); } static int queue_cmp_1(const void *a, const void *b) { queue_t *qa = *((queue_t **)a), *qb = *((queue_t **)b); time_t now = xtime(); return 0-((qa->priority * (now - qa->last_dequeue) * qa->no_qsends) - (qb->priority * (now - qb->last_dequeue) * qb->no_qsends)); } static int queue_cmp_2(const void *a, const void *b) { queue_t *qa = *((queue_t **)a), *qb = *((queue_t **)b); time_t now = xtime(); return 0-((qa->priority * (now - qa->last_dequeue) * ((qa->no_qsends*100)/qa->length)) - (qb->priority * (now - qb->last_dequeue) * ((qb->no_qsends*100)/qb->length))); } static int send_cmp(const void *a, const void *b) { send_t *s[2] = {*((send_t **)a), *((send_t **)b)}; int i,mode[2] = {0,0}; for(i = 0; i < 2; ++i) { LOCK(s[i]); LOCK(s[i]->network); if(s[i]->network->info != NULL) { LOCK(s[i]->network->info); mode[i] = nickhash_highest_mode( s[i]->network->info->nick_tracker, s[i]->nick); UNLOCK(s[i]->network->info); } UNLOCK(s[i]->network); UNLOCK(s[i]); } return 0-(mode[0] - mode[1]); } send_t *queue_dequeue(pool_t *to_pool) { send_t *ret = NULL, **sends = NULL; queue_t *q = NULL, **queues= NULL; int i,j,k,usends; int no_sends = 0, no_queues = 0; //D("entry"); LOCK(global); queues = xalloc(sizeof(queue_t *) * global->no_queues); for(i = 0; i < global->no_queues; ++i) { LOCK(global->queues[i]); if( global->queues[i]->deleted == 0 && global->queues[i]->pool == to_pool && global->queues[i]->priority > 0 && global->queues[i]->no_qsends > 0) { queues[no_queues++] = global->queues[i]; /* hold locks! */ } else UNLOCK(global->queues[i]); } if(no_queues == 0) { UNLOCK(global); if(queues != NULL) xfree(queues); return NULL; } LOCK(to_pool); switch(to_pool->dequeue_algorithm) { case 0: qsort(queues,no_queues,sizeof(queue_t *),queue_cmp_0); break; case 1: qsort(queues,no_queues,sizeof(queue_t *),queue_cmp_1); break; case 2: qsort(queues,no_queues,sizeof(queue_t *),queue_cmp_2); break; default: break; } for(i = 0; ret == NULL && i < no_queues; ++i) { q = queues[i]; sends = PTRARR_DUP(q->qsends,q->no_qsends,&no_sends); if(q->priority_queues != 0) qsort(sends,no_sends,sizeof(send_t *),send_cmp); for(j = 0; ret == NULL && j < no_sends; ++j) { if(to_pool->sends_per_user == 0) { ret = sends[j]; } else { LOCK(sends[j]); for(k = 0, usends = 0; k < to_pool->no_csends; ++k) { LOCK(to_pool->csends[k]); if(to_pool->csends[k]->network == sends[j]->network && irc_strcasecmp(to_pool->csends[k]->nick,sends[j]->nick) == 0) usends++; UNLOCK(to_pool->csends[k]); } UNLOCK(sends[j]); if(usends < to_pool->sends_per_user) ret = sends[j]; } if(ret != NULL) { /* Don't send someone the a file they are already receiving, * at least from this pool. */ LOCK(ret); for(k = 0; k < to_pool->no_csends && ret != NULL; ++k) { send_t *s = to_pool->csends[k]; LOCK(s); if(s->network == ret->network && irc_strcasecmp(s->nick,ret->nick) == 0) { if(s->pack == ret->pack) { UNLOCK(ret); ret = NULL; } } UNLOCK(s); } if(ret != NULL) UNLOCK(ret); } } if(ret != NULL) { q->last_dequeue = xtime(); PTRARR_DEL(&(q->qsends),&(q->no_qsends),ret); LOCK(ret); ret->pool = to_pool; se_notify_dequeue(0,q,ret); UNLOCK(ret); } if(sends != NULL) xfree(sends); } UNLOCK(to_pool); if(queues != NULL) { for(i = 0; i < no_queues; ++i) UNLOCK(queues[i]); xfree(queues); } UNLOCK(global); //D("ret: %p",ret); return ret; } int queue_remove_no(queue_t *q, const int no) { send_t *s = NULL; int ret = -1; LOCK(q); if(no <= q->no_qsends && no > 0) { s = q->qsends[no-1]; PTRARR_DEL(&q->qsends,&q->no_qsends,s); ret = 0; } UNLOCK(q); if(s != NULL) free_send(s); return ret; } int queue_remove(queue_t *q, const char *hostmask, network_t *net, const char *nick, const char *file, const char *label, send_t *ptr) { size_t bufsize = 0; char *buffer = NULL; send_t **to_free = NULL; int no_to_free = 0; int i,j,done,sg = 0; D("host: %s, net: %p, nick: %s, file: %s, label: %s", hostmask,net,nick,file,label); if(label != NULL) { bufsize = strlen(label)+1; buffer = xalloc(bufsize); } if(q == NULL) { sg = 1; LOCK(global); } for(j = 0 ;; ++j) { if(sg) { if(j >= global->no_queues) break; q = global->queues[j]; } LOCK(q); if(q->deleted != 0) { UNLOCK(q); continue; } do { /* A confusing and very deep loop. * Here 'done' indicates whether the code * has done a full loop of the queue. * * Everytime a queued send is deleted, the loop * must start over since its position in list is * no longer valid. */ for(i = 0, done = 1; i < q->no_qsends && done == 1; ++i) { LOCK(q->qsends[i]); if((ptr == NULL && send_matchs(q->qsends[i],hostmask,net,nick,file,label)) || q->qsends[i] == ptr) done = 0; UNLOCK(q->qsends[i]); if(done == 0) { PTRARR_ADD(&to_free,&no_to_free, q->qsends[i]); PTRARR_DEL(&(q->qsends),&(q->no_qsends), q->qsends[i]); } } } while(done == 0); UNLOCK(q); if(!sg) break; } if(sg) UNLOCK(global); if(to_free != NULL) { /* locks on sends to free already held from earlier loop */ for(i = 0; i < no_to_free; ++i) free_send(to_free[i]); xfree(to_free); } if(buffer != NULL) xfree(buffer); D("ret: %d",no_to_free); return no_to_free; } int queue_check_users(queue_t *q) { struct { network_t *network; char *nick; } _user_t, *u; pqueue_t to_check; pqueue_t to_delete; int i,del; pqueue_init(&to_check,0); pqueue_init(&to_delete,0); LOCK(q); for(i = 0; i < q->no_qsends; ++i) { u = xalloc(sizeof(_user_t)); u->nick = strdup(q->qsends[i]->nick); u->network = q->qsends[i]->network; pqueue_push_back(&to_check,u); } UNLOCK(q); while((u = pqueue_pop_front(&to_check)) != NULL) { del = 0; LOCK(u->network); if(!u->network->deleted && u->network->info != NULL && u->network->state == NETWORK_CONNECTED) { LOCK(u->network->info); if(!nickhash_present(u->network->info->nick_tracker, NULL,u->nick)) { pqueue_push_back(&to_delete,u); del = 1; } UNLOCK(u->network->info); } UNLOCK(u->network); if(!del) { xfree(u->nick); xfree(u); } } while((u = pqueue_pop_front(&to_delete)) != NULL) { LOCK(u->network); LOGP(L_XDC,"Queue check shows [%s:%s] is offline, deleting queues",u->network->name,u->nick); UNLOCK(u->network); queue_remove(NULL,NULL,u->network,u->nick,NULL,NULL,NULL); xfree(u->nick); xfree(u); } LOCK(q); q->last_checked = xtime(); UNLOCK(q); return 0; } static void generate_notifications(queue_t *q, pqueue_t *to_send) { struct { network_t *network; char *nick; char *msg; } msg_t, *m; char buffer[512]; int i; for(i = 0; i < q->no_qsends; ++i) { send_t *s = q->qsends[i]; m = xalloc(sizeof(msg_t)); LOCK(s); RLOCK(s->pack); snprintf(buffer,sizeof(buffer), "You are queued for \"%s\", present position %d of %d, \"/msg $BOT XDCC REMOVE %s\" to remove this queue.", s->pack->label,i+1,q->no_qsends,s->pack->label); m->network = s->network; m->nick = xstrdup(s->nick); m->msg = xstrdup(buffer); RUNLOCK(s->pack); UNLOCK(s); pqueue_push_back(to_send,m); } } static void send_notifications(pqueue_t *to_send) { struct { network_t *network; char *nick; char *msg; } *m; while((m = pqueue_pop_front(to_send)) != NULL) { irc_send_msg(m->network,m->nick,m->msg); xfree(m->msg); xfree(m->nick); xfree(m); } } int queue_notify_users(queue_t *q) { pqueue_t to_send; int ret; pqueue_init(&to_send,0); LOCK(q); generate_notifications(q,&to_send); q->last_notify = xtime(); UNLOCK(q); ret = pqueue_length(&to_send); send_notifications(&to_send); return ret; } void queue_notify(const time_t now) { pqueue_t to_send; int i; pqueue_init(&to_send,0); LOCK(global); for(i = 0; i < global->no_queues; ++i) { queue_t *q = global->queues[i]; LOCK(q); if(q->notify != 0 && (q->last_notify + q->notify) > now) { generate_notifications(q,&to_send); q->last_notify = now; } UNLOCK(q); } UNLOCK(global); send_notifications(&to_send); } void queue_nick_change(queue_t *q, network_t *net, const char *onick, const char *nnick) { int i; LOCK(q); for(i = 0; i < q->no_qsends; ++i) { send_t *s = q->qsends[i]; LOCK(s); if(s->network == net && irc_strcasecmp(s->nick,onick) == 0) { xfree(s->nick); s->nick = xstrdup(nnick); } UNLOCK(s); } UNLOCK(q); } int queue_requeue(send_t *s) { queue_t *q = NULL; if(s->retries-- > 0) q = s->queue; if(q != NULL && queue_valid(q)) { // Here we should have the only reference to the send LOCK(q); if(q->no_qsends < q->length || q->overflow_requeue) { char buffer[512],nick[64]; network_t *net; PTRARR_ADD(&(q->qsends),&(q->no_qsends),s); LOCK(s); s->state = STATE_QUEUED; s->pool = NULL; s->started = xtime(); s->bytes_sent = 0; s->pos = 0; D("s: %p, s->pack: %p",s,s->pack); // FIXME: is this the right place for this message ? RLOCK(s->pack); snprintf(buffer,sizeof(buffer),"You have been requeued in position %d for \"%s\", to remove this queue type \"/msg $BOT XDCC REMOVE %s\". You have %d retries remaining.", q->no_qsends, s->pack->label,s->pack->label, s->retries); RUNLOCK(s->pack); net = s->network; xstrncpy(nick,s->nick,sizeof(nick)); se_notify_enqueue(0,q,s,q->no_qsends); UNLOCK(s); UNLOCK(q); irc_send_notice(net,nick,buffer); return 0; } else { UNLOCK(q); } } return -1; } // from_pos and to_pos indexed from 0 int queue_send_move(queue_t *q, int from_pos, int to_pos) { int ret; LOCK(q); ret = PTRARR_MOVE(&(q->qsends),&(q->no_qsends),from_pos,to_pos); UNLOCK(q); return ret; } // pos1,pos2 indexed from 0 int queue_send_swap(queue_t *q, int pos1, int pos2) { int ret; LOCK(q); ret = PTRARR_SWAP(&(q->qsends),&(q->no_qsends),pos1,pos2); UNLOCK(q); return ret; } int queue_send_force_no(queue_t *q, const int no) { LOCK(q); if(no < 0 || no >= q->no_qsends) { UNLOCK(q); return -1; } LOCK(q->pool); PTRARR_ADD(&(q->pool->csends),&(q->pool->no_csends),q->qsends[no]); UNLOCK(q->pool); PTRARR_DEL(&(q->qsends),&(q->no_qsends),q->qsends[no]); UNLOCK(q); return 0; } int queue_send_force(queue_t *q, const char *hostmask, network_t *net, const char *nick, const char *file, const char *label, send_t *ptr) { pqueue_t to_clear; int i; pqueue_init(&to_clear,0); LOCK(q); LOCK(q->pool); for(i = 0; i < q->no_qsends; ++i) { LOCK(q->qsends[i]); if((ptr == NULL && send_matchs(q->qsends[i],hostmask,net,nick,file,label)) || q->qsends[i] == ptr) { pqueue_push_back(&to_clear,q->qsends[i]); PTRARR_ADD(&(q->pool->csends),&(q->pool->no_csends), q->qsends[i]); } UNLOCK(q->qsends[i]); } UNLOCK(q->pool); i = pqueue_length(&to_clear); while(pqueue_length(&to_clear) > 0) { PTRARR_DEL(&(q->qsends),&(q->no_qsends), pqueue_pop_front(&to_clear)); } UNLOCK(q); return i; } int queue_set_pool(queue_t *q, pool_t *p) { int ret = -1; LOCK(q); LOCK(p); if(p->deleted == 0) { q->pool = p; ret = 0; } UNLOCK(p); UNLOCK(q); return ret; } /** Settings Data **/ static setting_t queue_data[] = { {"bypass-size", ST_OFF, OFFSETOF(queue_t,bypass_size), 0,"0",NULL,NULL,NULL}, {"combined-queue-limit",ST_BOOL,OFFSETOF(queue_t,combined_queue_limit), 0,NULL,NULL,NULL,NULL}, {"length", ST_INT, OFFSETOF(queue_t,length), 0,"0",NULL,NULL,NULL}, {"notify", ST_INT, OFFSETOF(queue_t,notify), 0,"0",NULL,NULL,NULL}, {"overflow-requeue", ST_BOOL,OFFSETOF(queue_t,overflow_requeue), VALUE_BOOL_ONOFF,NULL,NULL,NULL,NULL}, {"priority", ST_INT, OFFSETOF(queue_t,priority), 0,"-1",NULL,NULL,NULL}, {"priority-queues", ST_BOOL,OFFSETOF(queue_t,priority_queues), VALUE_BOOL_ONOFF,NULL,NULL,NULL,NULL}, {"queues-per-user", ST_INT, OFFSETOF(queue_t,queues_per_user), 0,"0",NULL,NULL,NULL}, {NULL} }; int queue_apply_setting(queue_t *q, const char *name, const char *value) { value_t *v = value_string(name,value); int ret; LOCK(q); ret = apply_setting(queue_data,q,v); UNLOCK(q); xfree(v); return ret; } int queue_read_settings(queue_t *q, pqueue_t *out) { int ret; LOCK(q); ret = read_settings(queue_data,q,out); UNLOCK(q); return ret; }