Java - Otkrivanje i upravljanje visećim nitima

Alex. C. Punnen

Arhitekt - Nokia Siemens Networks

Bangalore

Viseće niti su čest izazov u razvoju softvera koji se mora povezati s vlasničkim uređajima koristeći zaštićena ili standardizirana sučelja kao što su SNMP, Q3 ili Telnet. Ovaj problem nije ograničen na upravljanje mrežom, već se javlja u širokom rasponu polja kao što su web poslužitelji, procesi koji pozivaju pozive udaljenih procedura itd.

Niti koja pokreće zahtjev za uređajem potreban je mehanizam za otkrivanje u slučaju da uređaj ne reagira ili odgovori samo djelomično. U nekim slučajevima kada se otkrije takvo vješanje, mora se poduzeti određena radnja. Konkretna radnja može biti ili ponovno suđenje ili davanje obavijesti krajnjem korisniku o neuspjehu zadatka ili neka druga opcija oporavka. U nekim slučajevima kada komponenta mora pokrenuti velik broj zadataka prema velikom broju mrežnih elemenata, otkrivanje viseće niti je važno kako ne bi postalo usko grlo za obradu drugih zadataka. Dakle, dva su aspekta upravljanja visećim nitima: izvedba i obavijesti .

Za aspekt obavijesti možemo prilagoditi obrazac Java Observer tako da se uklapa u svijet s više niti.

Prilagođavanje uzorka promatrača Java multithreading sustavima

Zbog visećih zadataka, korištenje Java ThreadPoolklase sa strategijom uklapanja prvo je rješenje koje vam padne na pamet. Međutim, korištenje Jave ThreadPoolu kontekstu nekih niti koje slučajno vise tijekom određenog vremenskog razdoblja daje neželjeno ponašanje na temelju određene korištene strategije, poput gladovanja niti u slučaju strategije fiksnog spremišta niti. To je uglavnom zbog činjenice da Java ThreadPoolnema mehanizam za otkrivanje obješenja niti.

Mogli bismo isprobati spremište predmemorirane niti, ali i to ima problema. Ako postoji velika brzina pokretanja zadataka, a neke niti vise, broj niti mogao bi se povećati, što bi na kraju moglo uzrokovati izgladnjivanje resursa i iznimke izvan memorije. Ili bismo mogli koristiti prilagođenu ThreadPoolstrategiju koja poziva a CallerRunsPolicy. I u ovom slučaju, prekidanje niti moglo bi na kraju dovesti do obješenja svih niti. (Glavna nit nikada ne smije biti pozivalac, jer postoji mogućnost da bilo koji zadatak proslijeđen glavnoj niti može objesiti, zbog čega će se sve zaustaviti.)

Pa, koje je rješenje? Pokazat ću ne tako jednostavan obrazac ThreadPool koji prilagođava veličinu bazena prema stopi zadatka i na temelju broja visećih niti. Krenimo prvo na problem otkrivanja visećih niti.

Otkrivanje visećih niti

Slika 1 prikazuje apstrakciju uzorka:

Ovdje postoje dvije važne klase: ThreadManageri ManagedThread. Obje se protežu od Threadklase Java . ThreadManagerDrži posudu u kojoj se čuva ManagedThreads. Kada se stvori novi ManagedThread, dodaje se u ovaj spremnik.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

U ThreadManageriterira kroz ovaj popis i poziva ManagedThread„s isHung()metodom. Ovo je u osnovi logika provjere vremenskog žiga.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Ako utvrdi da je nit ušla u petlju zadataka i nikada nije ažurirala svoje rezultate, potreban je mehanizam oporavka kako je predviđeno ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Da bi se novi ManagedThreadmogao stvoriti i koristiti umjesto obješenog, ne bi trebao sadržavati stanje ili bilo koji spremnik. Za to treba spremnik na kojem ManagedThreadtreba odvojiti djela. Ovdje koristimo obrazac Singleton zasnovan na ENUM-u za zadržavanje popisa zadataka. Dakle, spremnik koji sadrži zadatke neovisan je o niti koja obrađuje zadatke. Kliknite sljedeću vezu da biste preuzeli izvor za opisani obrazac: Izvor Java Thread Manager.

Viseće niti i Java ThreadPool strategije

Java ThreadPoolnema mehanizam za otkrivanje visećih niti. Korištenje strategije kao što je fiksni threadpool ( Executors.newFixedThreadPool()) neće raditi, jer ako se neki zadaci s vremenom objese, sve će niti na kraju biti u visećem stanju. Druga opcija je korištenje predmemoriranih pravila ThreadPool (Executors.newCachedThreadPool()). To bi moglo osigurati da će uvijek biti dostupne niti za obradu zadatka, ograničene samo ograničenjima VM memorije, CPU-a i niti. Međutim, s ovom politikom nema kontrole broja niti koje se stvaraju. Bez obzira na to visi li niti za obradu ili ne, korištenje ove politike dok je brzina zadatka velika dovodi do stvaranja velikog broja niti. Ako nemate dovoljno resursa za JVM vrlo brzo, dostići ćete maksimalni prag memorije ili visoki CPU. Prilično je uobičajeno vidjeti kako broj niti doseže stotine ili tisuće. Iako se izdaju nakon što se zadatak obradi, ponekad će tijekom rafalnog rukovanja velik broj niti preplaviti sistemske resurse.

Treća opcija je korištenje prilagođenih strategija ili pravila. Jedna od takvih mogućnosti je imati spremište niti koje se skalira od 0 do nekog maksimalnog broja. Dakle, čak i da jedna nit visi, nova nit će se stvoriti sve dok je postignut maksimalni broj niti:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Ovdje je 3 maksimalni broj niti, a vrijeme održavanja uživo postavljeno je na 60 sekundi, jer je to proces koji zahtijeva puno zadataka. Ako damo dovoljno visok maksimalan broj niti, ovo je više-manje razumno pravilo koje se koristi u kontekstu vješanja. Jedini je problem u tome što ako se na kraju ne puste viseće niti, postoji mala vjerojatnost da bi svi niti mogli u nekom trenutku visjeti. Ako je maksimalan broj niti dovoljno visok i ako se pretpostavi da je obustavljanje zadatka rijedak fenomen, tada bi ova politika odgovarala računu.

Bilo bi slatko da je ThreadPoolimao i priključni mehanizam za otkrivanje visećih niti. Kasnije ću razgovarati o jednom takvom dizajnu. Ako su sve niti zamrznute, možete konfigurirati i koristiti politiku odbijenih zadataka spremišta niti. Ako ne želite odbaciti zadatke, morali biste koristiti CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

U ovom slučaju, ako je prekidanje niti uzrokovalo odbacivanje zadatka, taj će zadatak biti dodijeljen pozivajućoj niti kojom treba rukovati. Uvijek postoji šansa da taj zadatak previše visi. U ovom bi se slučaju cijeli proces zamrznuo. Stoga je bolje ne dodavati takvu politiku u ovom kontekstu.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Prilagođeni ThreadPool s otkrivanjem obješenja

Biblioteka spremišta niti s mogućnošću otkrivanja i rukovanja zadatcima bila bi sjajna. Razvio sam ga i demonstrirat ću ga u nastavku. Ovo je zapravo priključak iz C ++ spremišta niti koji sam dizajnirao i koristio neko vrijeme (vidi reference). U osnovi, ovo rješenje koristi obrazac naredbe i obrazac lanca odgovornosti. Međutim, implementirati uzorak naredbe u Javi bez pomoći podrške objekta Function pomalo je teško. Zbog toga sam morao malo promijeniti implementaciju kako bih koristio Java refleksiju. Imajte na umu da je kontekst u kojem je ovaj uzorak dizajniran bio gdje je trebalo ugraditi / priključiti spremište niti bez mijenjanja bilo koje postojeće klase.(Vjerujem da je jedna velika prednost objektno orijentiranog programiranja to što nam daje način da dizajniramo klase kako bismo učinkovito iskoristili otvoreni zatvoreni princip. To se posebno odnosi na složeni stari naslijeđeni kôd i možda je manje važan za razvoj novog proizvoda.) Stoga sam upotrijebio refleksiju umjesto sučelja za implementaciju naredbenog uzorka. Ostatak koda mogao bi se prenijeti bez većih promjena jer su gotovo svi primitivi za sinkronizaciju niti i signalizaciju dostupni u Javi 1.5 nadalje.Ostatak koda mogao bi se prenijeti bez većih promjena jer su gotovo svi primitivi za sinkronizaciju niti i signalizaciju dostupni u Javi 1.5 nadalje.Ostatak koda mogao bi se prenijeti bez većih promjena jer su gotovo svi primitivi za sinkronizaciju niti i signalizaciju dostupni u Javi 1.5 nadalje.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Primjer upotrebe ovog uzorka:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Sad ovdje imamo još dvije važne satove. Jedna je ThreadChainklasa koja provodi obrazac Lanac odgovornosti:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Ova klasa ima dvije glavne metode. Jedan je Booleov CanHandle()koji pokreće ThreadPoolklasa, a zatim nastavlja rekurzivno. Ovo provjerava je li trenutna nit (trenutna ThreadChaininstanca) slobodna za obradu zadatka. Ako već obrađuje zadatak, poziva sljedećeg u lancu.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Imajte na umu da HandleRequestje metoda metoda ThreadChainkoja se poziva iz Thread run()metode i čeka signal iz canHandlemetode. Također imajte na umu kako se sa zadatkom rješava naredba Command.