Consumer Queue - mutex destroyed while busy
-
(in VC++11 Dev Pre)
folgender Aufbau: (Header)
class TaskQueue { private: // LOCKS std::mutex TasksAccessMutex; std::mutex WaitForCompletion; private: std::atomic_uint Running; // wird im Konstruktor 0 std::atomic_uint MaxRunning; // wird im Konstruktor 2 std::atomic_bool Terminate; // wird im Konstruktor false std::thread* Feeder; // Aufgaben -> Threads void Feed(); // Füttert Threads std::deque < std::pair <CountingEvent, PARAMETER_TYPE> > Tasks; // beinhaltet Aufgaben public: ~TaskQueue(); // Lässt alles runterfahren void AddTask(CountingEvent E, const PARAMETER_TYPE& Params); //Aufgabe hinzufügen };
Source:
void TaskQueue::AddTask(CountingEvent E, const PARAMETER_TYPE& Params) { std::lock_guard <std::mutex> TasksLock(TasksAccessMutex); // Zugriff auf "Tasks" sperren Tasks.push_back( std::pair <CountingEvent, PARAMETER_TYPE> (E,Params) ); // Aufgabe hinzufügen } //--------------------------------------------------------------------------- void TaskQueue::Feed() { std::lock_guard <std::mutex> Stopper(WaitForCompletion); while (!Terminate.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_FEED_SHORT)); std::lock_guard <std::mutex> TaskLocker (TasksAccessMutex); if (Running.load() >= MaxRunning.load() || Tasks.empty()) { std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_FEED_LONG)); continue; } std::thread AsynchronousWorker(Tasks.front().first, Tasks.front().second, this); AsynchronousWorker.detach(); Tasks.pop_front(); } } //--------------------------------------------------------------------------- TaskQueue::~TaskQueue() { // Task Queue löschen Terminate.store(true); std::unique_lock <std::mutex> Stopper(WaitForCompletion); std::lock_guard <std::mutex> TALock(TasksAccessMutex); if (Feeder->joinable()) { Feeder->join(); } delete Feeder; } //---------------------------------------------------------------------------
main:
RETURN_TYPE Test(const PARAMETER_TYPE& Params) { std::lock_guard <std::mutex> lock(CStreamLock); std::cout << Params.GetStr().c_str() << std::endl; return true; } int main() { std::cout << "Hello" << std::endl; TaskQueue* Tasks = new TaskQueue(); DynByteArray bytes; bytes.Add(std::string("str")); for (int i = 0; i < 5; i++) { Tasks->AddTask(CountingEvent(&Test),bytes); } std::cin.get(); delete Tasks; return 0; }
Problematik:
Einmal geht (immer?). Aber die anderen Aufgaben werden mehr oder weniger zufällig ausgeführt. Meistens bekomme ich ein "abort()" an den Kopf (was das heißt, weiß ich nicht). Oder einen Verweis in <mutex> und zwar in die unlock Funktion bei:
int _Mtx_unlock(_Mtx_t *mtx) { /* unlock mutex */ #if !(defined(_M_CEE)) _THREAD_ASSERT(1 <= (*mtx)->count && (*mtx)->thread_id == GetCurrentThreadId(), "unlock of unowned mutex"); #else _THREAD_ASSERT(1 <= (*mtx)->count && _Thrd_equal((*mtx)->owner, thrd_current()), "unlock of unowned mutex"); #endif if (--(*mtx)->count != 0) ; else if (((*mtx)->type & ~_Mtx_recursive) == _Mtx_plain) { #if !(defined(_M_CEE)) (*mtx)->cs.unlock(); (*mtx)->thread_id = -1; // hier #else LeaveCriticalSection((*mtx)->hnd); #endif
Sollte es das Programm doch mal bis zur Destruktion schaffen, dann bekomme ich ein "abort()" mit der Nachricht in der Console, die lautet "mutex destroyed while busy".
Was vergurke ich denn hier so beim locken?
-
Es war ein Fehler in <mutex>
Ich habe jetzt die Beta installiert und alles funktioniert wunderbar.
Sowas hatte ich noch nie, dass ich mich auf die std Bibliothek verlassen konnte. (Naja ok, die ist ja auch Alpha gewesen in dem Fall)EDIT: oben am Code noch ein / zwei Änderung vorgenommen, da in Suche nach dem Fehler unsauber gemacht.
EDIT 2: Noch eine kleine Sache geändert.