/* ARISA - data hashing functions
 * Copyright (C) 2003, 2004 Carl Ritson
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
 */

#include "arisa.h"

/* NOTE(review): the names of the four system headers included here were
 * lost when this file was extracted (only bare "#include" tokens
 * survived).  The list below is reconstructed from the calls made in this
 * file (open/fstat, read/close/usleep, gettimeofday, errno) -- confirm it
 * against the original source.
 */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#include "crc32.h"
#include "md5.h"

/* Number of read/sleep ticks hash_file performs between two updates of
 * its disk speed estimate. */
#define TICKS 8
/* Maximum number of packs allowed in the hashing request queue. */
#define MAX_REQS_LEN 5

/** diskstat_t
 * This structure is used to store performance information on disks.
 */
typedef struct diskstat_t diskstat_t;
struct diskstat_t {
	dev_t		dev;	/* st_dev of the files this entry covers */
	unsigned long	speed;	/* estimated read rate, in bytes per second */
};

/** md5_asstr
 * Stores a hex digest string into str, from an MD5 digest.
 * digest must hold 16 bytes; str receives 32 lower-case hex digits and a
 * terminating NUL, truncated by snprintf if len is smaller than 33.
 */
static void md5_asstr(char *str, size_t len, const uint8_t *digest)
{
	snprintf(str,len,
		"%02x%02x%02x%02x%02x%02x%02x%02x"
		"%02x%02x%02x%02x%02x%02x%02x%02x",
		digest[0], digest[1], digest[2], digest[3],
		digest[4], digest[5], digest[6], digest[7],
		digest[8], digest[9], digest[10],digest[11],
		digest[12],digest[13],digest[14],digest[15]
	);
}

/** crc_asstr
 * Stores a hex digest string into str, from a CRC32 digest.
 * digest must hold 4 bytes; str receives 8 upper-case hex digits and a
 * terminating NUL, truncated by snprintf if len is smaller than 9.
 */
static void crc_asstr(char *str, size_t len, const U8 *digest)
{
	snprintf(str,len,"%02X%02X%02X%02X",
		digest[0],digest[1],digest[2],digest[3]);
}

/** strtomd5
 * Fills md5 with the MD5 digest of the string in str.
*/ void strtomd5(char *md5, size_t md5len, char *str) { uint8_t digest[16]; MD5_CTX ctx; assert(md5 != NULL && str != NULL); MD5Init(&ctx); MD5Update(&ctx,str,strlen(str)); MD5Final(digest,&ctx); md5_asstr(md5,md5len,digest); } /** assign_hash * Assigns a pair of MD5 and CRC values to a pack. */ static void assign_hash(pack_t *p, const char *md5, const char *crc) { WLOCK(p); if(p->md5 != NULL) xfree(p->md5); p->md5 = xstrdup(md5); if(p->crc != NULL) xfree(p->crc); p->crc = xstrdup(crc); p->hashed = xtime(); WUNLOCK(p); se_notify_pack_hashed(0,p); } static int hash_file(int fd, char *md5, char *crc, diskstat_t *dstat, int *err) { struct timeval begin,end; unsigned long elapsed,bytes_per_tick,sleep_per_tick; char buffer[4096]; MD5_CTX md5ctx; CRC32_CTX crc32ctx; int tick,tmp; ssize_t ret = -1,rem; double rate; MD5Init(&md5ctx); CRC32Init(&crc32ctx); for(;;) { if(thread_should_end()) return 1; LOCK(global->settings); tmp = global->settings->hashing_io_usage > 0 ? global->settings->hashing_io_usage : 1; UNLOCK(global->settings); rate = (double) dstat->speed * ((double)tmp / 100.0); bytes_per_tick = (unsigned long) (rate / (double)TICKS); bytes_per_tick -= bytes_per_tick % sizeof(buffer); sleep_per_tick = (unsigned long) (1000000.0 / (double) TICKS) * (tmp == 100 ? 
0.0 : (double)(100-tmp) / 100.0); if(sleep_per_tick > 1000000) sleep_per_tick = 1000000; ret = 1; tick = 0; elapsed = 0; while(tick < TICKS && ret > 0) { rem = bytes_per_tick; gettimeofday(&begin,NULL); while(rem > 0 && ret > 0) { ret = read(fd,buffer,sizeof(buffer)); if(ret > 0) { MD5Update(&md5ctx,buffer,(size_t)ret); CRC32Update(&crc32ctx,buffer,(size_t)ret); rem -= ret; } else *err = errno; } gettimeofday(&end,NULL); elapsed += diff_tv_usec(&begin,&end); if(sleep_per_tick > 0) usleep(sleep_per_tick); tick++; } if(ret > 0 || (ret >= 0 && tick > 1)) { if(ret == 0) tick--; rem = bytes_per_tick*tick - rem; rate = ((double)rem / (double)elapsed)*1000000.0; dstat->speed += (unsigned long) ((rate - (double)dstat->speed) / 8.0); dstat->speed -= dstat->speed % 4096; if(dstat->speed < 256*1024) dstat->speed = 256*1024; if(dstat->speed > 50*1024*1024) dstat->speed = 50*1024*1024; } if(ret <= 0) break; } if(ret == 0) { MD5Final(buffer,&md5ctx); md5_asstr(md5,64,buffer); CRC32Final(buffer,&crc32ctx); crc_asstr(crc,64,buffer); } else if(ret == -1 && *err == EINTR) ret = 1; return (int) ret; } static void remove_pack(pack_t *pack) { packlist_t **lists; int i,no_lists; LOCK(global); lists = xalloc(sizeof(packlist_t *)*(global->no_packlists+1)); for(i = 0, no_lists = 0; i < global->no_packlists; ++i) { LOCK(global->packlists[i]); if(global->packlists[i]->deleted == 0) lists[no_lists++] = global->packlists[i]; UNLOCK(global->packlists[i]); } UNLOCK(global); for(i = 0; i < no_lists; ++i) pack_del(lists[i], pack); xfree(lists); } static diskstat_t *get_stats(diskstat_t ***stats, int *no_stats, struct stat *sdata) { diskstat_t *dstat = NULL; int i; for(i = 0; i < *no_stats && dstat == NULL; ++i) { if(((*stats)[i])->dev == sdata->st_dev) dstat = (*stats)[i]; } if(dstat == NULL) { dstat = xalloc(sizeof(diskstat_t)); dstat->dev = sdata->st_dev; dstat->speed = 1024 * 1024; PTRARR_ADD(stats,no_stats,dstat); } return dstat; } static int hash_open(const char *fn, struct stat *sdata, int 
*err) { int fd; fd = open(fn,O_RDONLY | O_LARGEFILE); if(fd == -1) { *err = errno; return -1; } if(fstat(fd,sdata) == -1) { *err = errno; close(fd); return -1; } return fd; } static void hash_close(int fd) { close(fd); } static void find_unhashed(pqueue_t *reqs) { int i,j; LOCK(global); for(i = 0; i < global->no_packlists && pqueue_length(reqs) < MAX_REQS_LEN; ++i) { packlist_t *list = global->packlists[i]; LOCK(list); if(list->deleted != 0) { UNLOCK(list); continue; } for(j = 0; j < list->no_packs && pqueue_length(reqs) < MAX_REQS_LEN; ++j) { RLOCK(list->packs[j]); if(!list->packs[j]->hashed) { RUNLOCK(list->packs[j]); pack_ref(list->packs[j]); pqueue_push_back(reqs,list->packs[j]); } else { RUNLOCK(list->packs[j]); } } UNLOCK(list); } UNLOCK(global); } int hash_request(pack_t *p) { int ret = -1; LOCKR(&(global->hashing.lock)); if(global->hashing.reqs != NULL) { if(pqueue_length(global->hashing.reqs) < MAX_REQS_LEN) { pack_ref(p); pqueue_push_back(global->hashing.reqs,p); thread_wake_up(&(global->hash_thread)); ret = 0; } } UNLOCKR(&(global->hashing.lock)); return ret; } /** hash_thread * Main loop of the hashing thread, searches for unhashed packs every * 30 seconds, hashing them when found. 
*/ void *hash_thread(void *arg) { struct stat sdata; char fn[2048],md5[64],crc[64]; pack_t *pack; int no_stats = 0; diskstat_t **stats = NULL,*dstat; pqueue_t *reqs; thread_signal_started(&(global->hash_thread)); LOG_TITLE("Hashing Thread"); LOGTP(L_INF,"Started"); reqs = pqueue_init(NULL,1); LOCKR(&(global->hashing.lock)); global->hashing.reqs = reqs; UNLOCKR(&(global->hashing.lock)); for(;;) { if(pqueue_length(reqs) == 0) find_unhashed(reqs); if(pqueue_length(reqs) == 0 && !thread_should_end()) thread_sleep(60); if(thread_should_end()) break; while((pack = pqueue_pop_front(reqs)) != NULL) { char errbuf[128]; int fd = -1, ret = -1, err = 0; RLOCK(pack); if(pack->hashed != 0) { RUNLOCK(pack); free_pack(pack); continue; } else { xstrncpy(fn,pack->file,sizeof(fn)); } RUNLOCK(pack); LOGP(L_INF,"Hashing of \"%s\", started.",fn); fd = hash_open(fn,&sdata,&err); if(fd != -1) { dstat = get_stats(&stats,&no_stats,&sdata); ret = hash_file(fd,md5,crc,dstat,&err); } if(ret == 0) { LOGP(L_INF,"Hashing of \"%s\", finished [MD5:%s] [CRC:%s].",fn,md5,crc); assign_hash(pack,md5,crc); } else if(ret < 0) { LOGP(L_INF,"Hashing of \"%s\", failed (%s), deleting associated pack.",fn,lstrerror_r(err,errbuf,sizeof(errbuf))); remove_pack(pack); } else { LOGP(L_INF,"Hashing of \"%s\", interrupted.",fn); } hash_close(fd); free_pack(pack); } } LOCKR(&(global->hashing.lock)); global->hashing.reqs = NULL; UNLOCKR(&(global->hashing.lock)); while((pack = pqueue_pop_front(reqs)) != NULL) free_pack(pack); LOGTP(L_INF,"Shutdown"); thread_signal_finished(); return NULL; }