TCP Socket - C Language
pool.c
Vai alla documentazione di questo file.
00001 
00012 #include "pool.h"
00013 #include "errore.h"
00014 
00016 void *thWorker(void * arg);
00017 
00018 th_pool * poolInit(int dim, int job_max, sigset_t * set){
00019         /* Dichiarazione della struttura che conterra' questo pool di threads */
00020         th_pool *pool = NULL;
00021         int n=0;
00022 
00023         if (dim<1){
00024                 errno = EINVAL;
00025                 sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata");
00026                 return NULL;
00027         }
00028         if (job_max<1){
00029                 errno = EINVAL;
00030                 sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato");
00031                 return NULL;
00032         }
00033 
00034         /* Allocazione memoria per la struttura del pool */
00035         pool = (th_pool*) calloc(1, sizeof(th_pool));
00036         /* Setto i parametri operativi del pool */
00037         pool->pool_size = dim;
00038         pool->signalmask = set;
00039         pool->job_max = job_max;
00040         /*pool->firstjob = NULL;
00041         pool->lastjob = NULL;
00042         pool->job_size = 0;*/
00043         /* Creazione array dei tids dei threads */
00044         pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t));
00045 
00046         /* Inizializzo il mutex e le cond 'notempty' e 'empty' per la lista dei job */
00047         if (pthread_mutex_init(&(pool->lock), NULL))
00048                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init");
00049 
00050         if (pthread_cond_init(&(pool->notempty),NULL))
00051                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty");
00052 
00053         if (pthread_cond_init(&(pool->empty),NULL))
00054                 sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty");
00055 
00056         /* Creazione dei threads */
00057         for (n = 0; n < pool->pool_size; n++)
00058                 if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){
00059                         sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create");
00060                         /* Manca una reale gestione degli errori
00061                          * in caso la pthread_create fallisca */
00062                 }
00063 
00064         return pool;
00065 }
00066 
00067 /* Aggiungo un nuovo job alla lista dei job */
00068 int poolDispatcher(th_pool *pool, void *(*start_routine)(void*), void *arg){
00069         joblist * newjob = NULL;
00070 
00071         if (!pool){
00072                 errno = EINVAL;
00073                 sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo");
00074                 return -1;
00075         }
00076         if (!start_routine){
00077                 errno = EINVAL;
00078                 sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla");
00079                 return -2;
00080         }
00081         if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){
00082                 sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)");
00083                 return -3;
00084         }
00085         newjob->next = NULL;
00086         newjob->arg = arg;
00087         newjob->start_routine = start_routine;
00088 
00089         /* Accesso in muta esclusione alla lista dei job */
00090         pthread_mutex_lock(&(pool->lock));
00091 
00092         /* Se il pool di threads e' in fase di terminazione rifiuto il job */
00093         if (pool->quitflag){
00094                 errno = ESHUTDOWN;
00095                 sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura");
00096                 free(newjob);
00097                 pthread_mutex_unlock(&(pool->lock));
00098                 return -4;
00099         }
00100 
00101         /* Se la coda di job e' piena rifiuto il nuovo job*/
00102         if (pool->job_size==pool->job_max){
00103                 errno = EAGAIN;
00104                 sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool");
00105                 free(newjob);
00106                 pthread_mutex_unlock(&(pool->lock));
00107                 return -5;
00108         }
00109 
00110         /* lista dei jobs vuota -> inserimento in testa */
00111         if (pool->job_size == 0){
00112                 pool->firstjob = newjob;
00113                 pool->lastjob = newjob;
00114                 /* segnalo che la coda non e' piu' vuota */
00115                 pthread_cond_signal(&(pool->notempty));
00116         }
00117         else{   /* inserimento in coda */
00118                 pool->lastjob->next = newjob;
00119                 pool->lastjob = newjob;
00120         }
00121         pool->job_size++;
00122 
00123         pthread_mutex_unlock(&(pool->lock));
00124         return 0;
00125 }
00126 
00127 int poolDestroy(th_pool *pool){
00128         int n=0;
00129 
00130         if (!pool){
00131                 errno = EINVAL;
00132                 sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo");
00133                 return -1;
00134         }
00135 
00136         pthread_mutex_lock(&(pool->lock));
00137         /* da questo momento in poi il pool rifiutera' nuovi job */
00138         pool->quitflag = 1;
00139         /* aspetto la terminazione dei job attivi e in coda */
00140         while (pool->job_size != 0)
00141                 pthread_cond_wait(&(pool->empty), &(pool->lock));
00142 
00143         /* termino definitivamente i threads del pool */
00144         pool->shutdown = 1;
00145         pthread_cond_broadcast(&(pool->notempty));
00146         pthread_mutex_unlock(&(pool->lock));
00147 
00148         for (n=0; n<pool->pool_size; n++){
00149                 pthread_cond_broadcast(&(pool->notempty));
00150                 if (pthread_join(pool->tids[n], NULL) != 0)
00151                         sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join");
00152         }
00153 
00154         /* rilascio le risorse */
00155         pthread_mutex_destroy(&(pool->lock));
00156         pthread_cond_destroy(&(pool->empty));
00157         pthread_cond_destroy(&(pool->notempty));
00158         free(pool->tids);
00159         free(pool);
00160         pool = NULL;
00161         return 0;
00162 }
00163 
00164 void *thWorker(void *arg){
00165         th_pool *pool = NULL;
00166         joblist * job = NULL;
00167 
00168         pool = (th_pool *) arg;
00169 
00170         /* Gestine dei segnali, se voluta */
00171         if (pool->signalmask && (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0))
00172                 sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'");
00173 
00174         for (;;){
00175                 /* Accesso in muta esclusione alla lista dei jobs */
00176                 pthread_mutex_lock(&(pool->lock));
00177 
00178                 while (pool->job_size == 0){    /* coda vuota -> attesa */
00179                         /* rilascio il lock e attendo */
00180                         pthread_mutex_unlock(&(pool->lock));
00181 
00182                         /* controllo che il pool non sia in fase di chiusura */
00183                         if (pool->shutdown)
00184                                 pthread_exit(NULL);
00185 
00186                         pthread_cond_wait(&(pool->notempty), &(pool->lock));
00187 
00188                         /* controllo che il pool non sia in fase di chiusura */
00189                         if (pool->shutdown) {
00190                                 pthread_mutex_unlock(&(pool->lock));
00191                                 pthread_exit(NULL);
00192                         }
00193                 }
00194 
00195                 job = pool->firstjob;
00196                 pool->firstjob = job->next;
00197                 pool->job_size--;
00198                 /* se la coda dei job e' vuota */
00199                 if (pool->job_size==0)
00200                         pool->lastjob = NULL;
00201 
00202                 /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */
00203                 if (pool->job_size == 0 && pool->quitflag)
00204                         pthread_cond_signal(&(pool->empty));
00205 
00206                 pthread_mutex_unlock(&(pool->lock));
00207 
00208                 /* Eseguo il job */
00209                 job->start_routine(job->arg);
00210 
00211                 free(job);
00212                 job = NULL;
00213         }
00214         pthread_exit(NULL);
00215         return NULL;
00216 }