Witam!
Wątek A przygotowywuje dane, po przygotowaniu wykonuje funkcję przekazując dane do wątku B.
Dane są kopiowane i wątek B jest wybudzany w celu dalszej obróbki danych.
Dane nie mogą być kolejkowane, jeśli wątek B nie dokończył przetwarzania i nie jest gotowy na przyjęcie nowych danych, wątek A czeka.
Czemu wątki? Ponieważ podczas obróbki danych przez wątek B, wątek A może przygotowywać kolejne dane.
Czyli kolejność działań wątków(zachodzą na siebie) powinien być taki:
Wątek A
Wątek B
Wątek A
Wątek B
....
Przygotowałem prosty kod, aby upewnić się że kolejność działań jest prawidłowa:
#include <stdio.h>
#include <pthread.h>
struct worker_t{
int terminate;
pthread_t thread;
pthread_cond_t cond;
pthread_mutex_t mutex;
int tg;
};
// thread B
static void *worker_run(void *arg){
int cnt = 0;
struct worker_t * worker = (struct worker_t*)arg;
pthread_mutex_lock(&worker->mutex);
while(!worker->terminate){
pthread_cond_wait(&worker->cond, &worker->mutex);
// tutaj przyszłościowo: obrobienie danych przekazanych z wątku A
printf("Thread B: %d\n", cnt);
++cnt;
}
pthread_mutex_unlock(&worker->mutex);
}
void worker_create(struct worker_t * worker){
worker->terminate = 0;
pthread_cond_init(&worker->cond, NULL);
pthread_mutex_init(&worker->mutex, NULL);
pthread_create(&worker->thread, NULL, worker_run, (void *)(worker));
}
void worker_destroy(struct worker_t * worker){
printf("%s\n",__FUNCTION__);
worker->terminate = 1;
worker->tg = 0;
pthread_join(worker->thread, NULL);
pthread_cond_destroy(&worker->cond);
pthread_mutex_destroy(&worker->mutex);
}
void worker_wakeup(struct worker_t * worker){
pthread_mutex_lock(&worker->mutex);
// tutaj przyszłościowo: przekopiowanie danych do wątku B z wątku A
pthread_cond_signal(&worker->cond);
pthread_mutex_unlock(&worker->mutex);
}
// thread A
int main(){
struct worker_t worker;
int i;
worker_create(&worker);
usleep(10 * 1000);
for(i=0; i<3; ++i){
printf("Thread B: %d\n", i); fflush(stdout);
worker_wakeup(&worker);
}
worker_destroy(&worker);
return 0;
}
Moje rozumowanie podczas tworzenia kodu było następujące:
- W wątku B jest blokowany mutex, który jest zwalniany tylko podczas oczekiwania na sygnał.
- Po otrzymaniu sygnału, atomicznie mutex jest blokowany.
- Nie ma możliwości, aby kolejny raz funkcja pthread_cond_signal została wywołana, ponieważ blokuje go mutex
Wynik jednak jest inny:
Mainer: 0
Mainer: 1
Mainer: 2
worker_destroy
Worker: 0 // nieistotne
Wygląda na to, że po wysłaniu sygnału, wątek B nie wybudza się i nie blokuje atomicznie mutex'a, stąd "signal" wywołuje się kilkukrotnie.
Jest to wg. mnie nie logiczne, po co w takim razie "wait" odblokowuje mutexy i je blokuje? (może ma to jakiś sens kiedy jest kilka wątków czekających)
Powyższy kod w QT:
#include <QDebug>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
class Worker: public QThread
{
public:
Worker() : QThread(){
mTerminate = false;
start();
msleep(100);
}
~Worker(){
mTerminate = true;
this->wait();
}
void wakeup();
private:
QMutex mMutex;
QWaitCondition mWaitCondition;
bool mTerminate;
protected:
void run();
};
void Worker::run(){
int cnt = 0;
QMutexLocker ml(&mMutex);
while(!mTerminate){
// here: mutex is lock
mWaitCondition.wait(&mMutex);
/* wait - how it's work:
*
* unlock + wait (atomic)
* awake + lock (atomic)
*
*/
// here: mutex is lock
qDebug() << "Thread B:" << cnt;
cnt++;
}
}
void Worker::wakeup(){
QMutexLocker ml(&mMutex);
mWaitCondition.wakeAll();
}
int main(){
Worker * worker = new Worker();
for(int i=0; i<3; ++i){
qDebug() << "Thread A:"<<i;
worker->wakeup();
}
delete worker;
return 0;
}
Oczywiście można to zrobić za pomocą 2 pthread_cond_t/QWaitCondition czy też za pomocą semaforów(2x), ale interesuje mnie dobre rozwiązanie.
Jak to najlepiej rozwiązać?