Consumer Queue - mutex destroyed while busy



  • (in VC++11 Dev Pre)

    folgender Aufbau: (Header)

    class TaskQueue {
    private:	// LOCKS
    	std::mutex TasksAccessMutex;
    	std::mutex WaitForCompletion;
    private:
    	std::atomic_uint Running;  // wird im Konstruktor 0
    	std::atomic_uint MaxRunning;  // wird im Konstruktor 2
    	std::atomic_bool Terminate;  // wird im Konstruktor false
    	std::thread* Feeder;  // Aufgaben -> Threads
    	void Feed();  // Füttert Threads
    
    	std::deque < std::pair <CountingEvent, PARAMETER_TYPE> > Tasks;  // beinhaltet Aufgaben
    public:
    	~TaskQueue();  // Lässt alles runterfahren
    
    	void AddTask(CountingEvent E, const PARAMETER_TYPE& Params);  //Aufgabe hinzufügen
    };
    

    Source:

    void TaskQueue::AddTask(CountingEvent E, const PARAMETER_TYPE& Params) {
    	std::lock_guard <std::mutex> TasksLock(TasksAccessMutex); // Zugriff auf "Tasks" sperren
    	Tasks.push_back( std::pair <CountingEvent, PARAMETER_TYPE> (E,Params) ); // Aufgabe hinzufügen
    
    }
    //--------------------------------------------------------------------------- 
    void TaskQueue::Feed() {
    	std::lock_guard  <std::mutex> Stopper(WaitForCompletion);
    	while (!Terminate.load()) {
    		std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_FEED_SHORT));
    		std::lock_guard <std::mutex> TaskLocker (TasksAccessMutex);
    		if (Running.load() >= MaxRunning.load() || Tasks.empty()) {
    			std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_FEED_LONG));
    			continue;
    		}
    		std::thread AsynchronousWorker(Tasks.front().first, Tasks.front().second, this);
    		AsynchronousWorker.detach();
    		Tasks.pop_front();
    	}
    }
    //--------------------------------------------------------------------------- 
    TaskQueue::~TaskQueue() {  // Task Queue löschen
    	Terminate.store(true);
    	std::unique_lock <std::mutex> Stopper(WaitForCompletion);
    	std::lock_guard <std::mutex> TALock(TasksAccessMutex);
    	if (Feeder->joinable()) {
    		Feeder->join();
    	}
    	delete Feeder;
    }
    //---------------------------------------------------------------------------
    

    main:

    RETURN_TYPE Test(const PARAMETER_TYPE& Params) {
    	std::lock_guard <std::mutex> lock(CStreamLock);
    	std::cout << Params.GetStr().c_str() << std::endl;
    	return true;
    }
    
    int main() {
    	std::cout << "Hello" << std::endl;
    	TaskQueue* Tasks = new TaskQueue();
    	DynByteArray bytes;
    	bytes.Add(std::string("str"));
    
    	for (int i = 0; i < 5; i++)
    	{
    		Tasks->AddTask(CountingEvent(&Test),bytes);
    
    	}
    
    	std::cin.get();
    	delete Tasks;
    
    	return 0;
    }
    

    Problematik:

    Einmal geht (immer?). Aber die anderen Aufgaben werden mehr oder weniger zufällig ausgeführt. Meistens bekomme ich ein "abort()" an den Kopf (was das heißt, weiß ich nicht). Oder einen Verweis in <mutex> und zwar in die unlock Funktion bei:

    int _Mtx_unlock(_Mtx_t *mtx)
    	{	/* unlock mutex */
    #if !(defined(_M_CEE))
    	_THREAD_ASSERT(1 <= (*mtx)->count
    		&& (*mtx)->thread_id == GetCurrentThreadId(),
    		"unlock of unowned mutex");
    #else
    	_THREAD_ASSERT(1 <= (*mtx)->count
    		&& _Thrd_equal((*mtx)->owner, thrd_current()),
    		"unlock of unowned mutex");
    #endif
    	if (--(*mtx)->count != 0)
    		;
    	else if (((*mtx)->type & ~_Mtx_recursive) == _Mtx_plain)
    		{
    #if !(defined(_M_CEE))
    		(*mtx)->cs.unlock();
    		(*mtx)->thread_id = -1; // hier
    #else
    		LeaveCriticalSection((*mtx)->hnd);
    #endif
    

    Sollte es das Programm doch mal bis zur Destruktion schaffen, dann bekomme ich ein "abort()" mit der Nachricht in der Console, die lautet "mutex destroyed while busy".
    Was vergurke ich denn hier so beim locken?



  • Es war ein Fehler in <mutex>
    Ich habe jetzt die Beta installiert und alles funktioniert wunderbar.
    Sowas hatte ich noch nie, dass ich mich auf die std Bibliothek verlassen konnte. (Naja ok, die ist ja auch Alpha gewesen in dem Fall)

    EDIT: oben am Code noch ein / zwei Änderung vorgenommen, da in Suche nach dem Fehler unsauber gemacht.
    EDIT 2: Noch eine kleine Sache geändert.


Anmelden zum Antworten