Programavimas

„Java“ - pakabinamų siūlų aptikimas ir tvarkymas

Autorius Aleksas. C. Punnenas

Architektas - „Nokia Siemens Networks“

Bangalore

Kabančios gijos yra dažnas iššūkis kuriant programinę įrangą, kuri turi sąsają su nuosavais įrenginiais, naudojant nuosavybės teises arba standartizuotas sąsajas, tokias kaip SNMP, Q3 ar Telnet. Ši problema neapsiriboja tinklo valdymu, bet kyla įvairiuose laukuose, pvz., Žiniatinklio serveriuose, procesuose, kuriuose iškviečiami nuotoliniai procedūrų skambučiai ir pan.

Gijai, kuri inicijuoja užklausą įrenginiui, reikia mechanizmo, kuris galėtų aptikti, jei įrenginys nereaguoja arba reaguoja tik iš dalies. Kai kuriais atvejais, kai nustatomas toks pakibimas, reikia imtis konkrečių veiksmų. Konkretus veiksmas gali būti pakartotinis bylos nagrinėjimas arba informavimas galutiniam vartotojui apie užduoties gedimą arba kita atkūrimo parinktis. Kai kuriais atvejais, kai komponentas turi atlikti daugybę užduočių daugeliui tinklo elementų, svarbu nustatyti gijų aptikimą, kad tai netaptų kliūtimi kitoms užduotims apdoroti. Taigi yra du kabančių siūlų valdymo aspektai: spektaklis ir pranešimas.

pranešimo aspektas galime pritaikyti „Java Observer“ modelį, kad jis tilptų į daugiasriegį pasaulį.

„Java Observer“ modelio pritaikymas daugiasriegėms sistemoms

Dėl pakabinamų užduočių, naudojant „Java“ „ThreadPool“ klasė su tinkama strategija yra pirmasis sprendimas, kuris ateina į galvą. Tačiau naudojant „Java“ „ThreadPool“ Kai kurių gijų, atsitiktinai pakabintų per tam tikrą laikotarpį, kontekste atsiranda nepageidaujamas elgesys, pagrįstas tam tikra naudojama strategija, pavyzdžiui, siūlų badavimas fiksuoto sriegio telkinių strategijos atveju. Tai daugiausia lemia tai, kad „Java“ „ThreadPool“ neturi mechanizmo aptikti siūlų pakibimą.

Galėtume išbandyti „Cached thread“ baseiną, tačiau jis taip pat turi problemų. Jei užduočių vykdymas yra didelis ir kai kurios gijos pakimba, gijų skaičius gali išaugti, galiausiai sukeldamas išteklių badą ir išimtis iš atminties. Arba galėtume naudoti „Custom“ „ThreadPool“ strategija, pasitelkiant a „CallerRunsPolicy“. Šiuo atveju taip pat gali būti, kad visi siūlai gali pakibti dėl siūlų pakabinimo. (Pagrindinė gija niekada neturėtų būti skambinantysis, nes yra tikimybė, kad bet kuri pagrindinei gijai perduota užduotis gali pakibti, todėl viskas sustos.)

Taigi, koks yra sprendimas? Aš pademonstruosiu nelabai paprastą „ThreadPool“ modelį, kuris koreguoja baseino dydį pagal užduoties greitį ir pagal pakabinamų siūlų skaičių. Pirmiausia pereikime prie pakabinamų siūlų aptikimo problemos.

Kabančių siūlų aptikimas

1 paveiksle parodyta modelio abstrakcija:

Čia yra dvi svarbios klasės: „ThreadManager“ ir „ManagedThread“. Abu jie tęsiasi nuo „Java“ Siūlas klasė. „ThreadManager“ talpina konteinerį, kuriame laikoma „ManagedThreads“. Kai naujas „ManagedThread“ yra sukurta, ji pati įtraukia į šį konteinerį.

 ThreadHangTester testthread = naujas ThreadHangTester ("threadhangertest", 2000, klaidingas); testthread.start (); thrdManger.manage (testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

„ThreadManager“ kartojasi per šį sąrašą ir iškviečia „ManagedThread“'s isHung () metodas. Iš esmės tai yra laiko žymės tikrinimo logika.

 if (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Siūlai pakabinti"); grįžti tiesa; } 

Jei nustatoma, kad gija pateko į užduočių ciklą ir niekada neatnaujino savo rezultatų, reikia atkūrimo mechanizmo, kaip nurodyta „ManageThread“.

 while (isRunning) {for (Iterator iterator = managedThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Aptikta gijos pakaba dėl ThreadName =" + thrddata.getManagedThread (). getName ()); jungiklis (thrddata.getManagedAction ()) {atvejis RESTART_THREAD: // Čia reikia iš naujo paleisti giją // pašalinti iš vadybininko iterator.remove (); // jei įmanoma, sustabdykite šios gijos apdorojimą thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // Žinoti, kokio tipo siūlus sukurti {ThreadHangTester newThread = new ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Sukurti naują giją newThread.start (); // pridėkite jį atgal į valdomą valdymą (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } pertrauka; ......... 

Už naują „ManagedThread“ kad būtų galima sukurti ir naudoti pakabintą, jame neturėtų būti jokių būsenų ar konteinerių. Tam reikalingas konteineris, ant kurio „ManagedThread“ aktai turėtų būti atskirti. Čia mes naudojame ENUM pagrįstą „Singleton“ modelį užduočių sąrašui laikyti. Taigi užduotis turintis konteineris yra nepriklausomas nuo gijos, apdorojančios užduotis. Spustelėkite šią nuorodą, kad atsisiųstumėte aprašyto modelio šaltinį: „Java Thread Manager Source“.

„Hanging Threads“ ir „Java ThreadPool“ strategijos

„Java“ „ThreadPool“ neturi pakabinamų siūlų aptikimo mechanizmo. Naudojant tokią strategiją kaip fiksuotas „threadpool“Executors.newFixedThreadPool ()) neveiks, nes jei kai kurios užduotys laikui bėgant pakimba, visos gijos galiausiai bus pakabintos. Kita galimybė yra naudoti talpykloje saugomą „ThreadPool“ politiką („Executors.newCachedThreadPool“ ()). Tai galėtų užtikrinti, kad užduočiai apdoroti visada bus prieinamos gijos, kurias ribos tik VM atmintis, procesorius ir gijų apribojimai. Tačiau naudojant šią politiką negalima kontroliuoti sukuriamų gijų skaičiaus. Nepaisant to, ar apdorojimo gija pakimba, ar ne, naudojant šią politiką, kai užduočių rodiklis yra didelis, sukuriama daugybė gijų. Jei neturite pakankamai išteklių JVM labai greitai, pasieksite maksimalų atminties slenkstį arba didelį procesorių. Gana dažnai galima pamatyti šimtų ar tūkstančių siūlų skaičių. Nors jie išleidžiami apdorojus užduotį, kartais sprogimo metu didelis gijų skaičius užvaldys sistemos išteklius.

Trečia galimybė yra naudoti pasirinktines strategijas ar strategijas. Viena iš tokių galimybių yra turėti siūlų grupę, kuri skalės nuo 0 iki maksimalaus skaičiaus. Taigi, net jei vienas siūlas pakabintas, naujas siūlas bus sukurtas tol, kol bus pasiektas maksimalus siūlų skaičius:

 execexec = new ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, nauja SynchronousQueue ()); 

Čia 3 yra maksimalus siūlų skaičius ir gyvavimo laikas yra nustatytas 60 sekundžių, nes tai yra daug užduočių reikalaujantis procesas. Jei pateiksime pakankamai didelį maksimalų siūlų skaičių, tai daugmaž pagrįsta politika, taikoma pakabinamų užduočių kontekste. Vienintelė problema yra ta, kad jei pakabinami siūlai galų gale nebus atleisti, yra nedidelė tikimybė, kad visi siūlai tam tikru momentu gali pakibti. Jei maksimalus siūlų kiekis yra pakankamai didelis ir darant prielaidą, kad užduočių atlikimas yra retas reiškinys, ši politika atitiktų sąskaitą.

Būtų buvę miela, jei „ThreadPool“ taip pat turėjo pakabinamą siūlų aptikimo mechanizmą. Vėliau aptarsiu vieną tokį dizainą. Žinoma, jei visos gijos yra užšaldytos, galite sukonfigūruoti ir naudoti gijų grupės atmetamų užduočių politiką. Jei nenorite išmesti užduočių, turėtumėte naudoti „CallerRunsPolicy“:

 execexec = new ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue () new ThreadPoolExecutor.CallerRunsPolicy ()); 

Tokiu atveju, jei gijos pakabinimas sukeltų užduoties atmetimą, ta užduotis būtų suteikta iškviečiamai gijai, kuri bus tvarkoma. Visada yra tikimybė, kad ta užduotis taip pat pakimba. Tokiu atveju visas procesas sustingtų. Taigi geriau nepridėti tokios politikos šiame kontekste.

 viešosios klasės „NotificationProcessor“ įgyvendina „Runnable“ {privatų galutinį „NotificationOriginator“ pranešimo organizatorių; loginė reikšmė isRunning = true; privatus galutinis „ExecutorService execexec“; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // Per daug siūlų // execexec = Executors.newFixedThreadPool (2); //, nėra pakabinimo užduočių aptikimo execexec = new ThreadPoolExor , 250, TimeUnit.MILLISECONDS, nauja SynchronousQueue (), nauja ThreadPoolExecutor.CallerRunsPolicy ()); } public void run () {while (isRunning) {try {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = new Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (new OctetString (), // Užduoties apdorojimas nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

Pasirinktinis „ThreadPool“ su pakabos aptikimu

Būtų puiku turėti „thread-pool“ biblioteką su užduočių pakabinimo aptikimo ir tvarkymo galimybėmis. Sukūriau vieną ir aš tai pademonstruosiu žemiau. Tai iš tikrųjų yra portas iš C ++ gijų telkinio, kurį suprojektavau ir panaudojau kurį laiką (žr. Nuorodas). Iš esmės šiame sprendime naudojamas „Command“ modelis ir „Atsakomybės grandinės“ modelis. Tačiau šiek tiek sunku įgyvendinti „Command“ šabloną „Java“ be „Function“ objektų palaikymo. Tam turėjau šiek tiek pakeisti įgyvendinimą, kad galėčiau naudoti „Java“ atspindį. Atkreipkite dėmesį, kad kontekstas, kuriame buvo sukurtas šis modelis, buvo toks, kad sriegių telkinys turėjo būti įmontuotas / prijungtas nekeičiant nė vienos iš esamų klasių. (Manau, kad vienas didelis objektinio programavimo pranašumas yra tas, kad jis suteikia mums galimybę kurti klases, kad galėtume efektyviai naudoti atvirojo uždaro principą. Tai ypač pasakytina apie sudėtingą seną seną kodą ir gali būti mažiau aktualus naujo produkto kūrimas.) Todėl „Command“ šablonui įgyvendinti vietoj sąsajos naudojau refleksiją. Likusią kodo dalį galima perkelti be didesnių pakeitimų, nes beveik visi siūlų sinchronizavimo ir signalizavimo primityviai yra prieinami „Java 1.5“ ir toliau.

 viešoji klasė Komanda {privatus objektas [] argParameter; ........ // Metodo su dviem argais komanda Ctor (T pObj, String methodName, long timeout, String key, int arg1, int arg2) {m_objptr = pObj; m_metodasName = mthodName; m_timeout = skirtasis laikas; m_key = raktas; argParameter = naujas objektas [2]; argParametras [0] = arg1; argParametras [1] = arg2; } // iškviečia objekto metodą void execute () {Class klass = m_objptr.getClass (); Klasė [] paramTypes = nauja klasė [] {vid.klasė, tarpt.}; pabandykite {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Found the method -> "+ methodName); if (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Šio modelio naudojimo pavyzdys:

 viešoji klasė „CTask“ {.. public int DoSomething (int a, int b) {...}} 

Komanda cmd4 = nauja komanda (4 užduotis, „DoMultiplication“, 1, „key2“, 2,5);

Dabar čia turime dar dvi svarbias klases. Vienas yra „ThreadChain“ klasė, įgyvendinanti atsakomybės grandinės modelį:

 viešoji klasė „ThreadChain“ įgyvendina „Runnable“ {public ThreadChain („ThreadChain p“, „ThreadPool pool“, „String name“) {AddRef (); deleteMe = klaidinga; užimtas = klaidingas; // -> labai svarbu kitas = p; // nustatykite gijų grandinę - atkreipkite dėmesį, kad tai panašu į susietą sąrašą impl threadpool = pool; // nustatyti gijų telkinį - gijos šaknis ........ threadId = ++ ThreadId; ...... // pradėkite giją thisThread = new Thread (this, name + inttid.toString ()); thisThread.start (); } 

Ši klasė turi du pagrindinius metodus. Vienas yra Būlio Gali susitvarkyti() kurį inicijuoja „ThreadPool“ klasę ir tada vyksta rekursiškai. Tai patikrina, ar dabartinė gija (dabartinė „ThreadChain“ egzempliorius) gali laisvai tvarkyti užduotį. Jei ji jau tvarko užduotį, ji paskambina kitai grandinėje.

 public Boolean canHandle () {if (! busy) {// Jei neužimta System.out.println ("Gali tvarkyti šį įvykį ID =" + threadId); // kad signalizuotumėte apie įvykį, pabandykite {condLock.lock (); condWait.signal (); // Signalizuokite „HandleRequest“, kuris to laukia vykdymo metodu .................................... ..... grįžti tiesa; } ......................................... /// Kitaip pažiūrėkite, ar kitas grandinėje esantis objektas yra laisvas ///, kad būtų galima tvarkyti užklausos grąžinimą. canHandle (); 

Atkreipkite dėmesį, kad „HandleRequest“ yra metodas „ThreadChain“ kad yra iškviečiamas iš Gijos eiga () metodo ir laukia signalo iš gali susitvarkyti metodas. Taip pat atkreipkite dėmesį, kaip užduotis tvarkoma naudojant „Command“ modelį.

$config[zx-auto] not found$config[zx-overlay] not found