pthread_cond_wait / pthread_cond_broadcast : Was mit dem Mutex machen?



  • Hallo,
    ich hab ein kleines Problem: Ich habe ein Hauptthread und für eine bestimmte berechnung N worker-threads. Immer wenn ich den Punkt mit der Berechnung erreiche mache ich ein pthread_cond_broadcast um alle Threads aufzuwecken, am ende der Berechnung machen die Threads pthread_cond_wait. Der Hauptthread soll auf die worker-threads warten bis alle pthread_cond_wait erreicht haben:

    void *thread_function(void *arg)
    {
    	while(arg->run)
    	{
    		process_data(arg->data, arg->start, arg->len);
    		pthread_cond_wait(arg->cond, arg->mutex);
    	}
    	pthread_exit(NULL);
    }
    
    void do_something(threaddata_t *threads)
    {
    	if(!threads->created) {
    		for(int i=0;i<threads->N;i++) {
    			threads->threads_arg[i].cond = &threads->cond;
    			threads->threads_arg[i].mutex = &threads->mutex;
    			threads->threads_arg[i].data = threads->data;
    			threads->threads_arg[i].start = (threads->data_len/N)*i;
    			threads->threads_arg[i].len = (threads->data_len/N);
    			threads->threads_arg[i].run = 1;
    			pthread_create(&threads->thread[i], &threads->attr, thread_function, &threads->threads_arg[i]);
    		}
    		threads->created=1;
    	}
    	else {
    		pthread_mutex_unlock(&threads->mutex);
    		pthread_cond_broadcast(&threads->cond);
    	}
    
    	/* soll hier warten bis alle Threads wieder pthread_cond_wait erreicht haben */
    	pthread_mutex_lock(&threads->mutex);
    }
    

    Das ist der stark vereinfachte Code, grundsätzlich funktioniert es (die Threads berechnen was sie sollen), was nicht klappt ist die synchronisation, ich habe noch nicht verstanden welche von funktion wo den mutex locken/unlocken muss und was pthread_cond_wait mit dem mutex macht. Oder brauche ich für jeden Thread ein eigenen Mutex?

    Vielen Dank





  • Das zweite Beispiel scheint ungefähr das zu sein was ich will, aber ich blicke da nicht durch, da gibts unterschiedliche Signale mit verschiedenen Mutexen, die Threads holen ihre Aufgaben aus einer Liste auf die nur ein Thread zugreifen kann, das ist alles viel komplizierter als ich das brauche. Ich habe auch schon nach einfachen Beispielen gesucht, aber nichts gefunden.
    Alles was ich will ist alle Worker-Threads vom Hauptthread starten und der Hauptthread soll dann warten bis alle Worker-Threads wieder bei pthread_cond_wait sind (die Worker-Threads greifen schreibend nur auf getrennte Bereiche zurück, die müssen keine Rücksicht aufeinander nehmen), das sollte doch viel einfacher sein als das was dort ist. Gibt es irgendwo eine bessere Erklärung als die man-page was dieser Mutex genau macht bei pthread_cond_wait und wo den welcher Thread locken/unlocken soll.

    Vielen Dank



  • Wenn die Threads auf getrennte Bereiche zugreifen, dann gibt es auch keinen kritischen Bereich der synchronisiert werden muss.
    Ich denke, was du suchst, ist pthread_join, was auch in meinen Links erklärt ist:

    http://www.gentoo.org/doc/en/articles/l-posix1.xml



  • Theoretisch würde das gehen, aber dann müsste ich die Threads immer wieder neu erzeugen und ich denke das kostet zu viel Zeit (die Berechnung ("process_data" in meinem Code) dauert nur ein paar Millisekunden, aber es gibt halt immer neue Daten). Angenommen ein Thread erstellen dauert 1ms, dann frisst der Overhead durch Thread-Erstellung den Performace-Gewinn schon wieder auf.

    Nachtrag: Habe gerade gelesen das man mit 10ms für die Threaderstellung rechnen kann (das war ein Beitrag von 2008, selbst wenns jetzt schneller ist, 1ms wäre ja noch zu lang) und deshalb nicht immer neue Threads erstellen/beenden soll.



  • Wieso lässt du deine Threads nicht gleich mehrere Daten am Stück berechnen?

    Angenommen du hast 100 Zeilen Daten die berechnet werden müssen und 10 Threads, dann kann jeder Thread 10 Zeilen berechnen. Du musst nur noch die Zuteilung richtig machen. Also Thread 1 --> Zeile 1-10, Thread 2 --> Zeile 11 -20 etc.



  • Ich habe es ganz genauso gemacht, bei der Zuteilung klappt ja auch alles, jeder Thread macht genau das was er soll, nur verstehe ich nicht wie ich diesen Mutex verwenden muss, damit die Synchronisation vergleichbar mit pthread_create/pthread_join ist.

    Ich kann nicht mehr Daten am Stück verarbeiten, da das in Echtzeit erfolgen muss, dass heißt so wenig Datenpakete sammeln wie irgendwie möglich, sonst bekomme ich eine unerträglich hohe Latenzzeit (und ich müsste den ganzen Rest umschreiben, der ist auf ein Paket rein -> ein Paket raus ausgelegt, das wäre sehr viel Arbeit)



  • Ich glaube, dir ist einfach nicht klar, wozu man Mutex und Condition-Variablen benutzt. Les dir nochmals die Links durch.

    Vielleicht können dir Barriers in irgendeiner Form weiterhelfen...

    #define _XOPEN_SOURCE 600
    
    #include <pthread.h>
    #include <stdlib.h>
    #include <stdio.h>
    
    #define THREADS 100
    
    //Barrier variable
    pthread_barrier_t barr;
    int bcount = 0;
    int acount = 0;
    
    void *entry_point(void *arg)
    {
            int rank = (int) arg;
    
            printf("Thread: %d at sync point\n", rank);
            bcount++;
            //Synchronization point
    
            int rc = pthread_barrier_wait(&barr);
            if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {
                    printf("Could not wait on barrier\n");
                    exit(-1);
            }
    
            printf("Thread: %d finished\n", rank);
            acount++;
            return (0);
    }
    
    int main()
    {
            pthread_t thr[THREADS];
    
            //Barrier initialization
            if (pthread_barrier_init(&barr, NULL, THREADS)) {
                    printf("Could not create a barrier\n");
                    return -1;
            }
    
            for (int i = 0; i < THREADS; ++i) {
                    if (pthread_create(&thr[i], NULL, &entry_point, (void *) i)) {
                            printf("Could not create thread %d\n", i);
                            return -1;
                    }
            }
    
            for (int i = 0; i < THREADS; ++i) {
                    if (pthread_join(thr[i], NULL)) {
                            printf("Could not join thread %d\n", i);
                            return -1;
                    }
            }
    
            printf("Exit reached with 'before count':%d and 'after count':%d \n", bcount, acount);
            return 0;
    }
    

    L. G.
    Steffo



  • Was mir nicht klar ist wie dieser Mutex für pthread_cond_wait funktioniert. Hätte ich nur ein Worker-Thread könnte ich das alles mit einem Mutex lösen. Ich hab aber N Threads. Barriers scheinen mir nicht sinnvoll, da ich N voneinander unabhängige Threads habe und einer der auf alle anderen wartet (bzw. andersherum während nichts berechnet wird).

    Ich habe mir das schon alles durchgelesen, aber das hilft mir nicht weiter.
    1.) Wie genau funktioniert der Mutex für pthread_cond_wait? Brauch ich einen insgesamt oder für jeden Thread einen eigenen? Wann hält welcher Thread den Mutex?
    2.) Ich denke pthread_cond_wait ist hier genau das richtige, ich habe auch schon an ein rw-lock gedacht, das geht aber nicht, weil wenn alle Worker-Threads während der Berechnung ein readlock haben und der Hauptthread auf ein writelock wartet, bekommen die Threads immer wieder ein readlock, da der Thread ja ein neuen readlock bekommt wenn die anderen Threads noch nicht fertig sind, aber der Hauptthread würde nur ein writelock bekommen wenn alle Threads genaue gleichzeitig fertig sind.

    Gibt es kein einfaches Beispiel mit pthread_cond_wait und pthread_cond_broadcast (nicht Signal!) das ohne irgendwelche Tricks für verkettete Listen auskommt.



  • Hallo
    also grundsaetzliche musst du das cond_wait locken, soll heissen du machst ein
    lock vor dem cond_wait und ein unlock danach. Das musst du machen, weil du ja auf eine exklusive Ressource zu greifst, die von allen Threads genutzt wird.
    Ich wuerde allerdings die Variante mit der Barrier bevorzugen und wuerde auch den Hauptthread rechnen lassen oder soll der was anderes machen ?

    Daher meine Empfehlung fuer dich synchronisiere das erstmal mit Barrier, das kostet zwar mehr aber man versteht den Mechanismus dahinter.

    Eine Frage haette ich noch die Berechnungen werden die auf eine globale Ressource ausgefuehrt?



  • Hallo,
    jetzt glaub ich hab ichs verstanden: Der Mutex ist nur dazu da den Zugriff auf die Condition-Variable abzusichern, das ist alles. Das Warten hab ich jetzt mit barrier realisiert (aber ohne den hauptthread mitrechnen zu lassen):

    void *thread_function(void *arg)
    {
        while(arg->run)
        {
            process_data(arg->data, arg->start, arg->len);
            pthread_barrier_wait(arg->barrier);
            pthread_mutex_lock(arg->mutex);
            pthread_cond_wait(arg->cond, arg->mutex);
            pthread_mutex_unlock(arg->mutex);
        }
        pthread_exit(NULL);
    }
    
    void do_something(threaddata_t *threads)
    {
        if(!threads->created) {
        	pthread_barrier_init(&threads->barrier,NULL,threads->N+1);
        	pthread_cond_init(&threads->cond,NULL);
        	pthread_mutex_init(&threads->mutex,NULL);
            for(int i=0;i<threads->N;i++) {
                threads->threads_arg[i].cond = &threads->cond;
                threads->threads_arg[i].mutex = &threads->mutex;
                threads->threads_arg[i].barrier = &threads->barrier;
                threads->threads_arg[i].data = threads->data;
                threads->threads_arg[i].start = (threads->data_len/threads->N)*i;
                threads->threads_arg[i].len = (threads->data_len/threads->N);
                threads->threads_arg[i].run = 1;
                pthread_create(&threads->thread[i], &threads->attr, thread_function, &threads->threads_arg[i]);
            }
            threads->created=1;
        }
        else {
            pthread_cond_broadcast(&threads->cond);
        }
    
        /* soll hier warten bis alle Threads wieder pthread_cond_wait erreicht haben */
        pthread_barrier_wait(&threads->barrier);
    }
    

    Es funktioniert bis jetzt problemlos und die Geschwindigkeit hat sich bei 4 threads auf einem dual-core mit Hyperthreading mehr als verdoppelt, ich denke damit scheint die Umsetzung auch ganz gut zu sein.
    Die Berechnung führen alle Threads auf genau dem gleichen Objekt zu, schreiben tun sie aber in unterschiedliche Bereiche (beim Lesen gibt es z.T überschneidungen, aber das ist ja kein Problem). Grundsätzlich ist der Lese- und Schreibbereich getrennt, das heißt in einem Teil wird nur gelesen, in einem anderen nur geschrieben.
    Vielen Dank!


Anmelden zum Antworten