Speicherauslastung - Emails via Threads einlesen



  • ...ThreadPool verwenden....



  • shaun1981 schrieb:

    Dem ganzen spendiere ich pro Email einen eigenen Thread - der Performance wegen.

    beachte an der Stelle den Scheduler der hier mehr arbeit als nötig hat ... http://de.wikipedia.org/wiki/Scheduling ... wenn ein Thread angehalten wird, damit auf einem Kern ein anderer Thread weiter arbeiten kann hat die CPU Rechenaufwand um den Contextwechsel durchzuführen -> Performance ... bei Deinem Speicherverbrauch müssen da riesige Mengen an Mails auflaufen

    //Prüfen ob alle Threads fertig sind
    	while (threads->Count > 0)
    		{
    		for each (System::Threading::Thread^ t in threads)
    			{
    			if (t->IsAlive == false)
    				{
    				t->Abort ();
    				threads->Remove (t);
    				break;
    				}
    			}
    		}
    
    	//Alle Threads sind fertig
    

    aktives Warten verbraucht immer 100% CPU ... entweder eine Sleep einbauen oder gleich die Join() Methode verwenden - dafür ist sie gedacht

    für jede e-Mail baust Du eine neue Socket-Verbindung auf (Sinn der Sache) ... zum einen sollte da eigentlich die Firewall des e-Mail-providers rumzicken ... zum Anderem kann es Dir passieren das 2 Threads die gleiche e-Mail bearbeiten ... Du hast die zu bearbeitende e-Mail global gespeichert und übergibst sie nicht dem Thread ... damit kann jeder andere Thread die Nummer verfälschen ... vgl. Context-Wechsel (Scheduling) und Race-Conditions -> http://de.wikipedia.org/wiki/Race-Condition

    im Übrigen ist das langsamste Deine Internet-Verbindung ... wenn Du nur 2MBit hast, wirst Du auch nicht mehr als 2MBit ziehen können ... selbst wenn Du rechnerisch mit den Threads 100 MBit kannst ... 2 oder 3 Threads bringen etwas, wenn Du anschließend die e-Mails noch ernsthaft bearbeitest ... ansonsten macht das keinen Sinn

    Hier gibt es aber das Problem, das nach dem Rücksprung mein Arbeitsspeicher total überfüllt ist. Vor der ersten Funktion sind es etwa 729MB Auslastung - hinterher in etwa 1,8GB. Auch nach beenden der beiden Funktionen ändert sich das nicht mehr - bis ich das Programm beende.

    das Problem löst sich von alleine 😉

    hand, mogel



  • für jede e-Mail baust Du eine neue Socket-Verbindung auf (Sinn der Sache) ... zum einen sollte da eigentlich die Firewall des e-Mail-providers rumzicken ... zum Anderem kann es Dir passieren das 2 Threads die gleiche e-Mail bearbeiten ... Du hast die zu bearbeitende e-Mail global gespeichert und übergibst sie nicht dem Thread ... damit kann jeder andere Thread die Nummer verfälschen ... vgl. Context-Wechsel (Scheduling) und Race-Conditions -> http://de.wikipedia.org/wiki/Race-Condition

    Zu 1: Eine Socket Verbindung pro Email brachte mir hier ein erhebliches Zeitguthaben. Daran, dass ein Server mehrere Verbindungen nicht akzeptiert - habe ich nicht gedacht.

    Zu 2: Warum sollte es passieren, dass zwei Threads die gleiche Email bearbeiten? Ich übergebe hier doch via Paramter die EmailNr:

    thread->Start (durchgang);
    

    Zu 3: welche Email soll ich denn dem Thread übergeben ? Sie wird doch aus dem Thread heraus abgerufen und in eine globale LIST gespeichert.

    Die Sache mit dem ThreadPool werde ich mir gleich mal zu Herzen nehmen. Deine INFO Links natürlich auch.

    Eigentlich glaube ich, dass ich nicht weit genug nach vorn geplant habe. Die Struktur, in welche die Daten der Emails geparst werden - scheint mit das Problem zu sein. Diese sieht so aus ...

    public ref struct EmailInfoStrukt
    	{
    	String^ quellcode;
    	String^ header;
    	String^ body;
    	String^ hauptboundaryID;
    
    	String^ messageid;
    	String^ absender;
    	String^ datum;
    	String^ betreff;
    	String^ charset;	
    	String^ transferencoding;
    	String^ emailtext;
    
    	bool HTMLAvaible;
    	bool HTMLOnly;
    	bool isDecoded;
    	bool qpDecoding;
    	String^ format;
    
    	System::Collections::Generic::List<String^>^ HTMLboundarys;
    	System::Collections::Generic::List<String^>^ boundarys;
    	System::Collections::Generic::List<Object^>^ anhänge;
    	};
    

    Wobei die LIST Anhänge später ein array von Streams enthällt. Da hab ich meinen Speicherfresser - denke ich. Das ganze landet nämlich momentan so wie es ist im RAM.



  • shaun1981 schrieb:

    Zu 2: Warum sollte es passieren, dass zwei Threads die gleiche Email bearbeiten? Ich übergebe hier doch via Paramter die EmailNr:

    da habe ich mich verlesen - passt ja soweit

    Zu 3: welche Email soll ich denn dem Thread übergeben ? Sie wird doch aus dem Thread heraus abgerufen und in eine globale LIST gespeichert.

    wenn Du Dich auf das Ende des Wartens beziehst

    for each (System::Threading::Thread^ t in threads) t->Join();
    

    Eigentlich glaube ich, dass ich nicht weit genug nach vorn geplant habe. Die Struktur, in welche die Daten der Emails geparst werden - scheint mit das Problem zu sein. Diese sieht so aus ...

    da würde ich "generischer" ran gehen ... die Header würde ich alle in einer HashTable speichern ... gerade mit den "X-" Headern gibt es keine Definitionen ... wenn Du es mit einer HashTable machst kannst Du immer schön den Header auf den Wert abbilden ... "From:" -> "hein.blöd@blaubär.org"

    dann kannst Du noch entsprechende Methoden/RO Properties erzeugen die Dir die Standard-Header leifern ... From/To/Subject etc. ... ähnliches Verfahren natürlich für den Body bzw. die Anhänge



  • da würde ich "generischer" ran gehen ... die Header würde ich alle in einer HashTable speichern ... gerade mit den "X-" Headern gibt es keine Definitionen ... wenn Du es mit einer HashTable machst kannst Du immer schön den Header auf den Wert abbilden ... "From:" -> "hein.blöd@blaubär.org"

    Könntest du mir hierzu ein grobes Beispiel nennen ? Leider fällt mir das theoretische verstehen über WIKI u. Co. recht schwer. Und was genau meinst du damit, das es mit den X Headern keine Definition gibt ?



  • shaun1981 schrieb:

    Könntest du mir hierzu ein grobes Beispiel nennen ? Leider fällt mir das theoretische verstehen über WIKI u. Co. recht schwer.

    na generell ist ein Header wie folgt aufgebaut

    Header1: value1
    Header2: value2
    

    da jede Zeile parsen und in eine HashTable packen

    Key1: value1
    Key2: value2
    
    public String getHeader(String header)
    {
        return headers[header]; // ungetestet
    }
    // bzw. für die üblichen Verdächtigen
    public String getFrom()
    {
        return headers["From"];
    }
    

    Und was genau meinst du damit, das es mit den X Headern keine Definition gibt ?

    weil ein X-Header nur aussagt das er da ist ... diese Header werden meist von Server bzw. Mailern benutzt

    X-Server-Info: alles klar?
    X-Server-Date: vorgestern
    

    Du kannst solche Header selber setzen und beim Empfänger (sofern der Mailer die versteht) auswerten lassen



  • Jochen Kalmbach schrieb:

    ...ThreadPool verwenden....

    Nochmal dazu:

    Warum steigt die Systemauslastung hierbei ins bodelose ?

    System::Threading::ThreadPool^ tp;
    		tp->SetMaxThreads (6, 6); 
    
    		while (durchgang < neueemails + 1)
    				{			
    				tp->QueueUserWorkItem (gcnew System::Threading::WaitCallback (this, &email::TgetEmailsOverPOP3), durchgang);			
    				}
    
    		return emailbuffer;
    

    Oder habe ich hier etwas falsch verstanden ?



  • OK ... wieder mal mein fehler.

    durchgang++;
    

    Da hab ich dann wohl wieder eines meiner häufigen Problemchen. Der Emailbuffer bleibt beim verlassen leer. Die Threads scheinen also noch nicht ausgeführt worden zu sein. Wenn ich es mir Recht überlege ... verständlich.



  • Ok ... nun bin ich soweit:

    protected:	array<System::Threading::WaitHandle^>^ handles;
    
    ...
    ...
    //Bei benutzung eines ThreadPools
    	else if (useThreadPool == true)
    		{
    		System::Threading::ThreadPool^ tp;
    
    		handles = gcnew array<System::Threading::WaitHandle^> (neueemails);
    
    		tp->SetMaxThreads (5, 5);		
    
    		while (durchgang < neueemails + 1)
    				{
    				handles[durchgang-1] = gcnew System::Threading::AutoResetEvent (false);
    				tp->QueueUserWorkItem (gcnew System::Threading::WaitCallback (this, &email::TgetEmailsOverPOP3), gcnew array<Object^,1> {handles[durchgang-1], durchgang });	
    				durchgang++;
    				}
    
    		//Auf alle Handles warten
    		System::Threading::WaitHandle::WaitAll(handles);
    
    		return emailbuffer;
    
    void emailclass::email::TgetEmailsOverPOP3 (Object^ objarray)
    	{
    	int emailnr;
    	String^ bigbufferstring = "";
    	int bytepos = 0;
    	int paket	= 0;
    
    	//ThreadPool test
    	System::Threading::AutoResetEvent^ autoReset;
    
    	if (useThreadPool == true)
    		{
    		autoReset = (System::Threading::AutoResetEvent^) ( safe_cast<array<Object^,1>>(objarray) )[0];
    		emailnr = (int)( safe_cast<array<Object^,1>>(objarray) )[1];
    		}
    
    	else emailnr = (int) objarray;	
    
    	//neuen BigBuffer erstellen
    	array<Byte>^ bigbuffer	= getNewBuffer ();
    
    	//neuen Socket erstellen
    	System::Net::Sockets::Socket^ socket = getNewSocket ();
    
    	//neuen Socket verbinden - und einloggen
    	if ( connectSocket (socket) == false) return;
    	if (POP3login (socket) == false) return;
    
    	//Login OK - Befehl zum abholen der n-ten Email senden			
    	array<Byte>^ retr  = Encoding::ASCII->GetBytes	("RETR"+" "+safe_cast<int>(emailnr)+"\r\n");			
    	socket->Send ( retr, retr->Length, SocketFlags::None);
    
    	//BytePos und Paketgröße aktualisieren
    	bytepos = 0;
    	paket = socket->Available;
    
    	//Antwort in Bigbuffer holen
    	do
    		{
    		do
    			{
    			bytepos += socket->Receive (bigbuffer, bytepos, paket, SocketFlags::None);
    			paket = socket->Available;
    			}
    			while (socket->Available); 
    		}				
    		while (socket->Poll (socketPollTime, SelectMode::SelectRead) && socket->Available);
    
    	//Keine Daten zum Lesen mehr verfügbar		
    	if (socket->Available == 0) emailbuffer->Add ( Encoding::ASCII->GetString (bigbuffer, 0, bigbuffer->Length) );
    
    	//Socket wird geschlossen
    	socket->Close ();	
    
    	//ThreadPool Signal
    	if (useThreadPool == true) autoReset->Set ();
    	}
    

    Irgendwelche Vorschläge zum Optimieren ? (Die Auslastung ist dennoch sehr hoch ... obwohl ich - die Anhänge an sich - nicht weiterverarbeite. Diese werden bei bedarf von der Platte eingelesen.



  • Warum benutzt Du nicht einfach das APM (BeginXXX(..), EndXXX(..)) ??
    Simon


Anmelden zum Antworten