Programavimas

Sukurtas realiuoju laiku: didelis duomenų perdavimas naudojant „Apache Kafka“, 2 dalis

Pirmoje šios „Apache Kafka“ įvado „JavaWorld“ pusėje jūs sukūrėte keletą mažų gamintojų / vartotojų programų, naudodamiesi „Kafka“. Iš šių pratimų turėtumėte žinoti pagrindinius „Apache Kafka“ pranešimų sistemos pagrindus. Šioje antroje pusėje sužinosite, kaip naudoti skaidinius, kad paskirstytumėte apkrovą ir horizontaliai išplėstumėte savo programą, tvarkydami iki milijonų pranešimų per dieną. Jūs taip pat sužinosite, kaip „Kafka“ naudoja pranešimų poslinkius, kad stebėtų ir valdytų sudėtingą pranešimų apdorojimą, ir kaip apsaugoti „Apache Kafka“ pranešimų sistemą nuo gedimo, jei vartotojas nusileis. Mes sukursime programos pavyzdį iš 1 dalies, kad galėtume naudoti „public-subscribe“ ir „point-to-point“ naudojimo atvejus.

Pertvaros Apache Kafkoje

„Kafka“ temas galima suskirstyti į skaidinius. Pvz., Kurdami temą, pavadintą „Demo“, galite sukonfigūruoti ją, kad būtų trys skaidiniai. Serveris sukūrė tris žurnalo failus, po vieną kiekvienam demonstraciniam skaidiniui. Kai gamintojas paskelbė pranešimą tema, jis priskyrė šiam pranešimui skirsnio ID. Tada serveris pridės pranešimą tik to skaidinio žurnalo faile.

Jei tada pradėjote du vartotojus, serveris gali priskirti 1 ir 2 skaidinius pirmajam vartotojui, o 3 - antram vartotojui. Kiekvienas vartotojas skaitytų tik iš jam priskirtų pertvarų. Demonstravimo temą, sukonfigūruotą trims skaidiniams, galite pamatyti 1 paveiksle.

Norėdami išplėsti scenarijų, įsivaizduokite „Kafka“ klasterį su dviem brokeriais, esančiais dviejose mašinose. Kai suskirstėte demonstracinę temą, ją sukonfigūruosite taip, kad būtų du skaidiniai ir dvi kopijos. Tokio tipo konfigūracijai „Kafka“ serveris priskirs du skaidinius dviem jūsų grupės tarpininkams. Kiekvienas brokeris būtų vienos iš pertvarų lyderis.

Kai prodiuseris paskelbė pranešimą, jis atiteko skaidinio vadovui. Vedėjas paėmė pranešimą ir pridėjo jį prie vietinio kompiuterio žurnalo failo. Antrasis brokeris pasyviai atkartotų tą įsipareigojimų žurnalą savo kompiuteryje. Jei skaidinio vadovas sumažėtų, antrasis tarpininkas taptų naujuoju vadovu ir pradėtų teikti klientų užklausas. Lygiai taip pat, kai vartotojas išsiuntė užklausą į skaidinį, ši užklausa pirmiausia atiteks skaidinio vadovui, kuris grąžins prašomus pranešimus.

Skirstymo nauda

Apsvarstykite „Kafka“ pagrindu veikiančios pranešimų sistemos skaidymo pranašumus:

  1. Mastelis: Sistemoje, kurioje yra tik vienas skaidinys, tema paskelbti pranešimai saugomi žurnalo faile, esančiame vienoje mašinoje. Temos pranešimų skaičius turi tilpti į vieną įsipareigojimų žurnalo failą, o saugomų pranešimų dydis niekada negali būti didesnis nei tos mašinos vietos diske. Padaliję temą galite išplėsti savo sistemą, saugodami pranešimus skirtingose ​​grupių mašinose. Pavyzdžiui, jei norite išsaugoti 30 gigabaitų (GB) pranešimų temai „Demo“, galite sukurti „Kafka“ grupę iš trijų mašinų, kurių kiekvienoje yra 10 GB vietos diske. Tada sukonfigūruosite temą, kad būtų trys skaidiniai.
  2. Serverio ir apkrovos balansavimas: Turėdami kelis skaidinius galite platinti pranešimų užklausas tarpininkams. Pvz., Jei turite temą, kuri apdorojo milijoną pranešimų per sekundę, galite ją padalyti į 100 skaidinių ir pridėti 100 brokerių prie savo grupės. Kiekvienas brokeris būtų vieno skaidinio lyderis, atsakingas už atsakymą tik į 10 000 klientų užklausų per sekundę.
  3. Vartotojo ir apkrovos balansavimas: Panašus į serverio ir apkrovos balansavimą, daugelio vartotojų priėmimas skirtingose ​​mašinose leidžia paskirstyti vartotojų apkrovą. Tarkime, kad norėjote suvartoti 1 milijoną pranešimų per sekundę iš temos su 100 skaidinių. Galite sukurti 100 vartotojų ir juos valdyti lygiagrečiai. „Kafka“ serveris kiekvienam vartotojui priskyrė po vieną skaidinį, o kiekvienas vartotojas lygiagrečiai apdorojo 10 000 pranešimų. Kadangi Kafka kiekvieną skaidinį priskiria tik vienam vartotojui, jame kiekvienas pranešimas bus tvarkingas.

Du skaidymo būdai

Gamintojas yra atsakingas už sprendimą, į kurią sritį pateks pranešimas. Gamintojas turi dvi galimybes kontroliuoti šią užduotį:

  • Pasirinktinis skaidiklis: Galite sukurti klasę, įgyvendinančią org.apache.kafka.clients.producer.Partitioner sąsaja. Šis paprotys Skirstytuvas įgyvendins verslo logiką, kad nuspręstų, kur siunčiami pranešimai.
  • „DefaultPartitioner“: Jei nekuriate pasirinktinio skaidinio klasės, tada pagal numatytuosius nustatymus org.apache.kafka.clients.producer.internals.DefaultPartitioner bus naudojama klasė. Numatytasis skaidytuvas yra pakankamai geras daugeliu atvejų, pateikdamas tris parinktis:
    1. Rankinis: Kai sukursite ProducerRecord, naudokite perkrautą konstruktorių naujas „ProducerRecord“ (topicName, partitionId, messageKey, message) norėdami nurodyti skaidinio ID.
    2. Maišymas (vietovei jautrus): Kai sukursite ProducerRecord, nurodykite a pranešimasKey, paskambinę naujas „ProducerRecord“ (topicName, messageKey, message). „DefaultPartitioner“ naudos rakto maišų, kad visi to paties rakto pranešimai būtų skirti tam pačiam gamintojui. Tai lengviausias ir dažniausiai pasitaikantis požiūris.
    3. Purškimas (atsitiktinis apkrovos balansavimas): Jei nenorite kontroliuoti, į kuriuos skirsnio pranešimus eikite, tiesiog paskambinkite naujas „ProducerRecord“ (topicName, pranešimas) sukurti savo ProducerRecord. Šiuo atveju skaidinys išsiųs pranešimus visiems skaidiniams apybraiška, užtikrindamas subalansuotą serverio apkrovą.

„Apache Kafka“ programos skaidymas

1 dalyje pateikiamam paprastam gamintojo / vartotojo pavyzdžiui naudojome a „DefaultPartitioner“. Dabar bandysime sukurti pasirinktinį skaidiklį. Tarkime, kad šiame pavyzdyje turime mažmeninės prekybos svetainę, kurią vartotojai gali naudoti norėdami užsisakyti produktus bet kurioje pasaulio vietoje. Pagal naudojimą mes žinome, kad dauguma vartotojų yra JAV arba Indijoje. Mes norime padalyti savo paraišką siųsti užsakymus iš JAV ar Indijos savo atitinkamiems vartotojams, o užsakymai iš bet kur kitur bus skirti trečiajam vartotojui.

Norėdami pradėti, sukursime „CountryPartitioner“ kad įgyvendina org.apache.kafka.clients.producer.Partitioner sąsaja. Turime įgyvendinti šiuos metodus:

  1. Kafka paskambins konfigūruoti () kai inicijuosime Skirstytuvas klasė, su a Žemėlapis konfigūracijos savybių. Šis metodas inicijuoja funkcijas, susijusias su programos verslo logika, pvz., Prisijungimą prie duomenų bazės. Šiuo atveju mes norime gana bendro skirsnio, kuris užima valstybės pavadinimas kaip nuosavybė. Tada galime naudoti configProperties.put ("pertvaros.0", "JAV") suskirstyti pranešimų srautą į skaidinius. Ateityje šį formatą galime naudoti norėdami pakeisti, kurios šalys gauna savo skaidinį.
  2. Prodiuseris API skambučiai skaidinys () po vieną kartą už kiekvieną pranešimą. Tokiu atveju mes jį naudosime skaitydami pranešimą ir analizuodami šalies pavadinimą iš pranešimo. Jei šalies pavadinimas yra countryToPartitionMap, jis grįš partitionId saugomi Žemėlapis. Jei ne, ji maišo šalies vertę ir naudos ją apskaičiuodama, į kurį skaidinį ji turėtų patekti.
  3. Mes skambiname Uždaryti() uždaryti skaidinį. Naudojant šį metodą užtikrinama, kad visi inicializacijos metu gauti ištekliai bus išvalomi išjungimo metu.

Atkreipkite dėmesį, kad kai Kafka skambina konfigūruoti (), „Kafka“ gamintojas perduos visas savybes, kurias gamintojui sukonfigūravome Skirstytuvas klasė. Būtina perskaityti tik tas savybes, kurios prasideda pertvaros., juos išanalizuokite, kad gautumėte partitionIdir išsaugokite ID countryToPartitionMap.

Žemiau yra mūsų pritaikytas Skirstytuvas sąsaja.

Sąrašas 1. „CountryPartitioner“

 viešoji klasė „CountryPartitioner“ įgyvendina „Partitioner“ {privatų statinį žemėlapį countryToPartitionMap; public void configure (Žemėlapio konfigūracijos) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = naujas HashMap (); už (Map.Entry įrašas: configs.entrySet ()) {if (entry.getKey (). startsWith ("skaidiniai.")) {String keyName = entry.getKey (); Stygos reikšmė = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Sveikasis skaičius.parseInt (raktoVardas.pavedimas (11)); countryToPartitionMap.put (reikšmė, paritionId); }}} public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) {Sąrašas pertvarų = cluster.availablePartitionsForTopic (tema); String valueStr = (String) reikšmė; String countryName = ((String) reikšmė) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Jei šalis susieta su konkrečiu skaidiniu, grąžinkite ją countryToPartitionMap.get (countryName); } else {// Jei nė viena šalis nėra susieta su tam tikru skaidiniu, paskirstykite likusius skaidinius int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} public void close () {}} 

Prodiuseris 2 sąrašo klasė (žemiau) yra labai panaši į mūsų paprastą gamintoją iš 1 dalies, du pakeitimai paryškinti:

  1. Mes nustatome konfigūracijos ypatybę, kurios raktas yra lygus reikšmei „ProducerConfig.PARTITIONER_CLASS_CONFIG“, kuris atitinka visiškai kvalifikuotą mūsų vardą „CountryPartitioner“ klasė. Mes taip pat nustatėme valstybės pavadinimas į partitionId, taip atvaizduodami savybes, kurioms norime perduoti „CountryPartitioner“.
  2. Mes praleidome klasės diegimo pavyzdį org.apache.kafka.clients.producer.Callback sąsaja kaip antrasis argumentas į producer.send () metodas. „Kafka“ klientas paskambins jai onCompletion () metodas, kai pranešimas bus sėkmingai paskelbtas, pridedant a „RecordMetadata“ objektas. Galėsime naudoti šį objektą norėdami sužinoti, kuriam skaidiniui buvo išsiųstas pranešimas, taip pat paskelbtam pranešimui priskirtą poslinkį.

Sąrašas 2. Padalytas gamintojas

 public class Producer {private static Scanner in; public static void main (String [] argv) išmeta išimtį {if (argv.length! = 1) {System.err.println ("Nurodykite 1 parametrą"); System.exit (-1); } Styginių temaName = argv [0]; in = naujas skaitytuvas (System.in); System.out.println ("Įveskite pranešimą (įveskite exit, jei norite išeiti)"); // Konfigūruokite gamintojo ypatybes configProperties = new Properties (); configProperties.put („ProducerConfig.BOOTSTRAP_SERVERS_CONFIG“, „localhost: 9092“); configProperties.put („ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG“, „org.apache.kafka.common.serialization.ByteArraySerializer“); configProperties.put („ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG“, „org.apache.kafka.common.serialization.StringSerializer“);  configProperties.put („ProducerConfig.PARTITIONER_CLASS_CONFIG“, „CountryPartitioner.class.getCanonicalName“); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India");  org.apache.kafka.clients.producer.Producer producer = naujas „KafkaProducer“ (configProperties); Eilučių eilutė = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); producer.send (rec, naujas skambutis () {public void onCompletion („RecordMetadata“ metaduomenys, išimties išimtis) {System.out.println („Pranešimas išsiųstas į temą ->„ + metadata.topic () + “, parition->„ + metadata.partition () + "saugomas poslinkyje->" + metaduomenys.offsetas ()); ; }}); eilutė = in.nextLine (); } in.close (); gamintojas.uždaryti (); }} 

Priskirti pertvaras vartotojams

„Kafka“ serveris garantuoja, kad skaidinys priskiriamas tik vienam vartotojui, taip garantuodamas pranešimų vartojimo tvarką. Galite rankiniu būdu priskirti skaidinį arba automatiškai priskirti jį.

Jei jūsų verslo logika reikalauja didesnio valdymo, turėsite rankiniu būdu priskirti skaidinius. Šiuo atveju jūs naudotumėte KafkaConsumer.assign () perduoti kiekvienam vartotojui įdomių skaidinių sąrašą „Kakfa“ serveriui.

Numatytasis ir labiausiai paplitęs pasirinkimas yra automatiškai priskirti skaidinius. Tokiu atveju „Kafka“ serveris kiekvienam vartotojui priskirs skaidinį ir skirs skaidinius pagal naujus vartotojus.

Tarkime, kad kuriate naują temą su trimis pertvaromis. Kai pradėsite pirmąjį naujos temos vartotoją, „Kafka“ priskirs visas tris skaidinius tam pačiam vartotojui. Jei tada pradėsite antrą vartotoją, „Kafka“ iš naujo priskirs visas skaidinius, priskirdama vieną skaidinį pirmajam, o likusius du - antram vartotojui. Jei pridėsite trečią vartotoją, „Kafka“ vėl priskirs pertvaras, kad kiekvienam vartotojui būtų priskirtas vienas skaidinys. Galiausiai, jei pradėsite ketvirtą ir penktą vartotojus, trys vartotojai turės priskirtą skaidinį, bet kiti negaus jokių pranešimų. Jei viena iš trijų pradinių skaidinių neveikia, „Kafka“ naudos tą pačią skaidymo logiką, kad priskirtų to vartotojo skaidinį vienam iš papildomų vartotojų.

$config[zx-auto] not found$config[zx-overlay] not found