Threads Pool - C Language
Riferimenti per il file thpool.c
#include "thpool.h"

Vai al codice sorgente di questo file.

Funzioni

void * thWorker (void *arg)
th_poolpoolInit (int dim, int job_max, sigset_t *set)
int poolDispatcher (th_pool *pool, void *(*start_routine)(void *), void *arg)
int poolDestroy (th_pool *pool)

Descrizione dettagliata

Implementazione della libreria di creazione e gestione di un pool di threads.

Autore:
Tranchida Giulio, No Matricola 241732
Si dichiara che il contenuto di questo file e', in ogni sua parte, opera originale dell'autore.
Versione:
2.0

Definizione nel file thpool.c.


Documentazione delle funzioni

void * thWorker ( void *  arg)

A solo uso interno

Definizione alla linea 196 del file thpool.c.

                         {
        th_pool *pool = NULL;
        joblist * job = NULL;

        pool = (th_pool *) arg;

        /* Gestine dei segnali, se voluta */
        if (pool->signalmask)
                if (pthread_sigmask(SIG_SETMASK, pool->signalmask, NULL) != 0)
                        sys_err(__FILE__,__LINE__,"thWorker: pthread_sigmask error 'SIG_SETMASK'");

        for (;;){
                /* Accesso in muta esclusione alla lista dei jobs */
                pthread_mutex_lock(&(pool->lock));

                while (pool->job_size == 0){    /* coda vuota -> attesa */
                        /* rilascio il lock e attendo */
                        pthread_mutex_unlock(&(pool->lock));

                        /* controllo che il pool non sia in fase di chiusura */
                        if (pool->shutdown){
                                pthread_mutex_unlock(&(pool->lock));
                                return NULL; /* pthread_exit(NULL); NO THANKS!! */
                        }

                        pthread_cond_wait(&(pool->notempty), &(pool->lock));
                }

                job = pool->firstjob;
                pool->firstjob = job->next;
                pool->job_size--;

                /* se la coda dei job e' vuota */
                if (pool->job_size==0)
                        pool->lastjob = NULL;

                /* segnalo che la coda e' nuovamente vuota (vedi poolDestroy) */
                if (pool->job_size == 0 && pool->quitflag)
                        pthread_cond_signal(&(pool->empty));

                pthread_mutex_unlock(&(pool->lock));

                /* Eseguo il job */
                job->start_routine(job->arg);

                free(job);
                job = NULL;
        }
        pthread_exit((void*)0);
}
th_pool* poolInit ( int  dim,
int  job_max,
sigset_t *  set 
)

poolInit

Funzione che inizializza e ritorna un Pool di Threads

Parametri:
dimDimensione del pool di threads
job_maxDimensione massima della coda dei jobs
*setOpzionale. Maschera dei segnali che il pool usera' durante la creazione dei threads. Se NULL viene ignorata.
Restituisce:
th_pool * Una nuovo pool di threads
NULL e setta errno se dim < 1

Definizione alla linea 27 del file thpool.c.

                                                        {
        /* Dichiarazione della struttura che conterra' questo pool di threads */
        th_pool *pool = NULL;
        int n=0;

        if (dim<1){
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"PoolInit: dimensione del pool di thread errata");
                return NULL;
        }
        if (job_max<1){
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"PoolInit: numero massimo di job errato");
                return NULL;
        }

        /* Allocazione memoria per la struttura del pool */
        if (!(pool = (th_pool*) malloc(sizeof(th_pool)))){
                sys_err(__FILE__,__LINE__,"PoolInit: errore calloc");
                return NULL;
        }

        /* Setto i parametri operativi del pool */
        pool->pool_size = dim;
        pool->signalmask = set;
        pool->job_max = job_max;
        pool->firstjob = NULL;
        pool->lastjob = NULL;
        pool->quitflag = 0;
        pool->shutdown = 0;
        pool->job_size = 0;

        /* Creazione array dei tids dei threads */
        if(!(pool->tids = (pthread_t*) calloc(pool->pool_size, sizeof(pthread_t)))){
                sys_err(__FILE__,__LINE__,"PoolInit: errore calloc",errno);
                free(pool);
                return NULL;
        }

        /* Inizializzo il mutex e le cond 'full' e 'empty' per la lista dei job */
        if (pthread_mutex_init(&(pool->lock), NULL)){
                sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_mutex_init");
                free(pool->tids); free(pool);
                return NULL;
        }

        if (pthread_cond_init(&(pool->notempty),NULL)){
                sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init notempty");
                free(pool->tids); free(pool);
                return NULL;
        }

        if (pthread_cond_init(&(pool->empty),NULL)){
                sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_cond_init empty");
                free(pool->tids); free(pool);
                return NULL;
        }

        /* Creazione dei threads */
        for (n = 0; n < pool->pool_size; n++)
                if (pthread_create(&(pool->tids[n]), NULL, &thWorker, pool) != 0){
                        sys_err(__FILE__,__LINE__,"PoolInit: errore pthread_create",errno);
                        free(pool->tids); free(pool);
                        return NULL;
                }

        return pool;
}
int poolDispatcher ( th_pool pool,
void *(*)(void *)  start_routine,
void *  arg 
)

poolDispatcher

Funzione che sottopone un nuovo job al pool di threads

Parametri:
*poolIl pool di thread
*(*start_routine)(void*)Il job da fare eseguire al pool
*argargomento passato al job
Restituisce:
pthread_t Il thread id a cui e' stato assegnato il job
-1 se *pool e' NULL e setta errno
-2 se *start_routine e' NULL e setta errno
-3 se fallisce l'allocazione di un nuovo job
-4 se il pool di thread e' in fase di terminazione (rifiuta il job). -5 se il numero di job attualmente pendenti nella coda e' uguale al numero massimo possibile di job in attesa.

Definizione alla linea 97 del file thpool.c.

                                                                           {
        joblist * newjob = NULL;

        if (!pool){
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"Dispatch: errore pool e' nullo");
                return -1;
        }
        if (!start_routine){
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"Dispatch: errore start_routine e' nulla");
                return -2;
        }

        if (!(newjob = (struct joblist *) malloc(sizeof(struct joblist)))){
                sys_err(__FILE__,__LINE__,"Dispatch: errore malloc(sizeof(struct joblist)");
                return -3;
        }
        newjob->next = NULL;
        newjob->arg = arg;
        newjob->start_routine = start_routine;

        /* Accesso in muta esclusione alla lista dei job */
        pthread_mutex_lock(&(pool->lock));

        /* Se il pool di threads e' in fase di terminazione rifiuto il job */
        if (pool->quitflag){
                /*errore(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura",ESHUTDOWN);*/
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"Dispatch: errore pool di thread in fase di chiusura");
                free(newjob);
                pthread_mutex_unlock(&(pool->lock));
                return -4;
        }

        if (pool->job_size==pool->job_max){
                errno = EAGAIN;
                sys_err(__FILE__,__LINE__,"Dispatch: errore troppi job in attesa nella coda del pool di threads");
                free(newjob);
                pthread_mutex_unlock(&(pool->lock));
                return -5;
        }

        switch(pool->job_size){
                case 0:/* lista dei jobs vuota -> inserimento in testa */
                        pool->firstjob = newjob;
                        pool->lastjob = newjob;
                        /* segnalo che la coda non e' piu' vuota */
                        pthread_cond_signal(&(pool->notempty));
                break;
                default:/* inserimento in coda */
                        pool->lastjob->next = newjob;
                        pool->lastjob = newjob;
                break;
        }
        pool->job_size++;

        pthread_mutex_unlock(&(pool->lock));
        return 0;
}
int poolDestroy ( th_pool pool)

poolDestroy

Distrugge un Pool di Threads: uccide i threads e libera le risorse
In particolare:

  • blocca la possibilita' di sottomettere un nuovo job al pool,
  • aspetta che tutti i job pendenti vengano eseguiti,
  • esegue la join di tutti i threads,
  • libera le risorse e esce.
Parametri:
*poolIl pool di threads da distruggere
Restituisce:
0 se tutto e' andato bene
-1 se *pool e' NULL e setta errno

Definizione alla linea 158 del file thpool.c.

                              {
        int n=0;

        if (!pool){
                errno = EINVAL;
                sys_err(__FILE__,__LINE__,"PoolDestroy: errore pool e' nullo");
                return -1;
        }

        pthread_mutex_lock(&(pool->lock));
        /* da questo momento in poi il pool rifiutera' nuovi job */
        pool->quitflag = 1;
        /* aspetto la terminazione dei job attivi e in coda */
        while (pool->job_size != 0)
                pthread_cond_wait(&(pool->empty), &(pool->lock));

        /* termino definitivamente i threads del pool */
        pool->shutdown = 1;
        pthread_cond_broadcast(&(pool->notempty));
        pthread_mutex_unlock(&(pool->lock));

        for (n=0; n<pool->pool_size; n++){
                pthread_cond_broadcast(&(pool->notempty));
                if (pthread_join(pool->tids[n], NULL) != 0)
                        sys_err(__FILE__,__LINE__,"PoolDestroy: errore pthred_join");
        }

        /* rilascio le risorse */
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->empty));
        pthread_cond_destroy(&(pool->notempty));

        free(pool->tids);
        free(pool);
        pool = NULL;
        return 0;
}