Programavimas

Sukurtas realiuoju laiku: didelis duomenų perdavimas naudojant „Apache Kafka“, 1 dalis

Prasidėjus didžiajam duomenų judėjimui, daugiausia dėmesio buvo skiriama paketiniam apdorojimui. Paskirstyti duomenų saugojimo ir užklausų įrankiai, pvz., „MapReduce“, „Hive“ ir „Pig“, buvo sukurti duomenims apdoroti paketais, o ne nuolat. Verslas kiekvieną vakarą vykdydavo kelis darbus, kad iš duomenų bazės gautų duomenis, tada juos analizuotų, transformuotų ir galų gale saugotų. Neseniai įmonės atrado duomenų ir įvykių analizės ir apdorojimo galimybes kaip jie nutinka, ne tik kartą per kelias valandas. Tačiau dauguma tradicinių susirašinėjimo sistemų nėra pritaikytos dideliems duomenims tvarkyti realiuoju laiku. Taigi „LinkedIn“ inžinieriai sukūrė ir atvirą šaltinį „Apache Kafka“: paskirstytą pranešimų sistemą, kuri tenkina didelių duomenų poreikius, keisdama prekių aparatinę įrangą.

Per pastaruosius kelerius metus „Apache Kafka“ pasirodė sprendžiant įvairius naudojimo atvejus. Paprasčiausiu atveju tai gali būti paprastas buferis programų žurnalams saugoti. Kartu su tokia technologija, kaip „Spark Streaming“, ji gali būti naudojama duomenų pokyčiams stebėti ir imtis veiksmų dėl tų duomenų prieš juos išsaugant galutinėje paskirties vietoje. „Kafka“ nuspėjamasis režimas daro jį galingu įrankiu aptikti sukčiavimą, pavyzdžiui, patikrinti kreditinės kortelės operacijos pagrįstumą, kai tai įvyksta, ir nelaukti, kol paketais bus apdorojamos valandos vėliau.

Ši dviejų dalių mokymo programa supažindina su „Kafka“, pradedant nuo to, kaip ją įdiegti ir paleisti savo kūrimo aplinkoje. Gausite „Kafka“ architektūros apžvalgą, po kurios bus pristatyta „Apache Kafka“ pranešimų siuntimo sistemos komplektacija. Galiausiai sukursite pasirinktinę gamintojo / vartotojo programą, kuri siunčia ir vartoja pranešimus per „Kafka“ serverį. Antroje mokymo programos pusėje sužinosite, kaip suskirstyti ir grupuoti pranešimus ir kaip valdyti, kokius pranešimus vartos „Kafka“ vartotojas.

Kas yra Apache Kafka?

„Apache Kafka“ yra pranešimų sistema, sukurta dideliems duomenims skleisti. Panašiai kaip „Apache ActiveMQ“ ar „RabbitMq“, „Kafka“ leidžia skirtingose ​​platformose sukurtoms programoms bendrauti per asinchroninį pranešimų perdavimą. Tačiau „Kafka“ skiriasi nuo šių tradicinių pranešimų sistemų pagrindiniais būdais:

  • Jis sukurtas horizontaliai keisti mastelį, pridedant daugiau prekių serverių.
  • Tai suteikia daug didesnį pralaidumą tiek gamintojo, tiek vartotojo procesams.
  • Jis gali būti naudojamas tiek partijos, tiek realaus laiko naudojimo atvejams palaikyti.
  • Jis nepalaiko JMS, „Java“ orientuoto tarpinės programinės įrangos API.

Apache Kafkos architektūra

Prieš pradėdami tyrinėti „Kafka“ architektūrą, turėtumėte žinoti pagrindinę jos terminologiją:

  • A gamintojas yra procesas, galintis paskelbti pranešimą tema.
  • a vartotojas yra procesas, kurio metu galima užsiprenumeruoti vieną ar daugiau temų ir naudoti temoms paskelbtus pranešimus.
  • A temos kategorija yra kanalo, kuriame skelbiami pranešimai, pavadinimas.
  • A brokeris yra procesas, vykdomas vienoje mašinoje.
  • A klasteris yra brokerių grupė, dirbanti kartu.

„Apache Kafka“ architektūra yra labai paprasta, dėl to kai kuriose sistemose gali būti geresnis našumas ir pralaidumas. Kiekviena „Kafka“ tema yra kaip paprastas žurnalo failas. Kai gamintojas paskelbia pranešimą, „Kafka“ serveris prideda jį prie nurodytos temos žurnalo failo pabaigos. Serveris taip pat priskiria kompensuoti, kuris yra numeris, naudojamas visam pranešimui visam laikui identifikuoti. Didėjant pranešimų skaičiui, kiekvieno poslinkio vertė didėja; pavyzdžiui, jei gamintojas paskelbia tris pranešimus, pirmasis gali gauti 1, antrasis - 2 ir trečiasis - 3 poslinkį.

Kai „Kafka“ vartotojas pirmą kartą paleidžiamas, jis išsiųs užklausą serveriui, prašydamas gauti bet kokius pranešimus apie tam tikrą temą, kurios poslinkio vertė yra didesnė nei 0. Serveris patikrins tos temos žurnalo failą ir grąžins tris naujus pranešimus. . Vartotojas apdoros pranešimus, tada išsiųs prašymą išsiųsti pranešimus didesnis nei 3 ir t.

„Kafka“ klientas yra atsakingas už atskaitos atsiskaitymą ir pranešimų gavimą. „Kafka“ serveris nestebi ir nevaldo pranešimų vartojimo. Pagal numatytuosius nustatymus „Kafka“ serveris išsaugos pranešimą septynias dienas. Fono gija serveryje tikrina ir ištrina septynių dienų ar vyresnius pranešimus. Vartotojas gali pasiekti pranešimus tol, kol jie yra serveryje. Jis gali skaityti pranešimą kelis kartus ir netgi skaityti pranešimus atvirkštine gavimo tvarka. Bet jei vartotojui nepavyksta gauti pranešimo nepasibaigus septynioms dienoms, jis praleis tą pranešimą.

Kafkos etalonai

„LinkedIn“ ir kitų įmonių naudojama produkcija parodė, kad tinkamai sukonfigūravusi „Apache Kafka“ gali kasdien apdoroti šimtus gigabaitų duomenų. 2011 m. Trys „LinkedIn“ inžinieriai naudojo etalonų testavimą, kad įrodytų, jog „Kafka“ gali pasiekti daug didesnį pralaidumą nei „ActiveMQ“ ir „RabbitMQ“.

„Apache Kafka“ greita sąranka ir demonstracinė versija

Šioje pamokoje sukursime pasirinktinę programą, tačiau pradėkime nuo „Kafka“ egzemplioriaus įdiegimo ir testavimo kartu su gamintoju ir vartotoju.

  1. Apsilankykite „Kafka“ atsisiuntimo puslapyje ir įdiekite naujausią versiją (šio rašymo metu - 0,9).
  2. Ištraukite dvejetainius failus į a programinė įranga / kafka aplanką. Dabartinei versijai tai programinė įranga / kafka_2.11-0.9.0.0.
  3. Pakeiskite savo dabartinį katalogą į naują aplanką.
  4. Paleiskite „Zookeeper“ serverį vykdydami komandą: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Paleiskite „Kafka“ serverį vykdydami: bin / kafka-server-start.sh config / server.properties.
  6. Sukurkite testo temą, kurią galėsite naudoti testavimui: bin / kafka-topics.sh --create --zookeeper localhost: 2181 - replikacijos koeficientas 1 - pertvaros 1 - tema javaworld.
  7. Pradėkite paprastą pulto vartotoją, kuris galėtų vartoti pranešimus, paskelbtus pagal tam tikrą temą, pvz javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 - tema javaworld - nuo pradžios.
  8. Paleiskite paprastą gamintojo konsolę, kuri gali paskelbti pranešimus apie bandomąją temą: bin / kafka-console-producer.sh - broker-list localhost: 9092 - tema javaworld.
  9. Pabandykite įvesti vieną ar du pranešimus į gamintojo pultą. Jūsų pranešimai turėtų būti rodomi vartotojų konsolėje.

Programos su „Apache Kafka“ pavyzdys

Jūs matėte, kaip Apache Kafka veikia iš dėžutės. Tada sukurkime pritaikytą gamintojo / vartotojo programą. Gamintojas atkurs vartotojo įvestį iš konsolės ir išsiųs kiekvieną naują eilutę kaip pranešimą į „Kafka“ serverį. Vartotojas gaus nurodytos temos pranešimus ir atsispausdins juos į konsolę. Šiuo atveju gamintojo ir vartotojo komponentai yra jūsų pačių įgyvendinta kafka- console-producer.sh ir kafka-console-consumer.sh.

Pradėkime nuo a Prodiuseris.java klasė. Šioje klientų klasėje yra logika skaityti vartotojo įvestį iš konsolės ir nusiųsti tą įvestį kaip pranešimą į „Kafka“ serverį.

Mes sukonfigūruojame gamintoją sukurdami objektą iš java.util.Nuosavybės klasė ir jos savybių nustatymas. „ProducerConfig“ klasė apibrėžia visas skirtingas turimas savybes, tačiau daugumai paskirčių pakanka numatytųjų „Kafka“ verčių. Numatytajai konfigūracijai turime nustatyti tik tris privalomas ypatybes:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG („bootstrap.servers“) nustato pagrindinio kompiuterio: uosto porų, naudojamų užmegzti pradinius ryšius su "Kakfa" grupe, sąrašą host1: port1, host2: port2, ... formatas. Net jei mūsų „Kafka“ klasteryje yra daugiau nei vienas brokeris, mums reikia nurodyti tik pirmojo brokerio vertę šeimininkas: uostas. „Kafka“ klientas naudos šią vertę norėdamas paskambinti brokeriui, kuris pateiks visų klasterio brokerių sąrašą. Patartina programoje nurodyti daugiau nei vieną brokerį BOOTSTRAP_SERVERS_CONFIG, todėl jei tas pirmasis brokeris nedirbs, klientas galės išbandyti kitus brokerius.

„Kafka“ serveris tikisi pranešimų baito [] raktas, baito [] reikšmė formatas. Užuot konvertuoję kiekvieną raktą ir vertę, „Kafka“ kliento biblioteka leidžia mums naudoti draugiškesnius tipus Stygos ir tarpt pranešimams siųsti. Biblioteka konvertuos juos į tinkamą tipą. Pavyzdžiui, programos pavyzdyje nėra konkretaus pranešimo rakto, todėl mes jį naudosime niekinis už raktą. Vertei naudosime a Stygos, kuris yra vartotojo įvestas į konsolę duomenys.

Norėdami sukonfigūruoti pranešimo raktą, nustatėme reikšmę KEY_SERIALIZER_CLASS_CONFIG ant org.apache.kafka.common.serialization.ByteArraySerializer. Tai veikia todėl, kad niekinis nereikia konvertuoti į baitas []. Už pranešimo vertė, mes nustatėme VALUE_SERIALIZER_CLASS_CONFIG ant org.apache.kafka.common.serialization.StringSerializer, nes ta klasė moka konvertuoti a Stygos į a baitas [].

Pasirinkto rakto / vertės objektai

Panašus į „StringSerializer“, Kafka pateikia serijinius kitų primityvų, tokių kaip tarpt ir ilgas. Norėdami naudoti pasirinktinį objektą savo raktui ar vertei, turėtume sukurti klasę org.apache.kafka.common.serialization.Serializer. Tada galėtume pridėti logiką, kad klasė būtų nuosekli baitas []. Mes taip pat turėtume naudoti atitinkamą deserializer mūsų vartotojo kodą.

„Kafka“ gamintojas

Užpildę Savybės klasę su būtinomis konfigūracijos savybėmis, galime ją naudoti kurdami objektą KafkaGamintojas. Kai tik norime išsiųsti pranešimą „Kafka“ serveriui, sukursime objektą ProducerRecord ir paskambinkite KafkaGamintojas's siųsti () metodas su tuo įrašu išsiųsti pranešimą. ProducerRecord turi du parametrus: temos, į kurią turėtų būti paskelbtas pranešimas, pavadinimą ir tikrąjį pranešimą. Nepamirškite paskambinti Producer.close () metodas, kai baigsite naudoti gamintoją:

Sąrašas 1. „KafkaProducer“

 public class Producer {private static Scanner in; public static void main (String [] argv) išmeta išimtį {if (argv.length! = 1) {System.err.println ("Nurodykite 1 parametrą"); System.exit (-1); } Styginių temaName = argv [0]; in = naujas skaitytuvas (System.in); System.out.println ("Įveskite pranešimą (įveskite exit, jei norite išeiti)"); // Konfigūruokite gamintojo ypatybes configProperties = new Properties (); configProperties.put („ProducerConfig.BOOTSTRAP_SERVERS_CONFIG“, „localhost: 9092“); configProperties.put („ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG“, „org.apache.kafka.common.serialization.ByteArraySerializer“); configProperties.put („ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG“, „org.apache.kafka.common.serialization.StringSerializer“); org.apache.kafka.clients.producer.Producer producer = naujas „KafkaProducer“ (configProperties); Eilučių eilutė = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = naujas ProducerRecord (topicName, eilutė); gamintojas.siųsti (rec); eilutė = in.nextLine (); } in.close (); gamintojas.uždaryti (); }} 

Konfigūruoti pranešimo vartotoją

Tada sukursime paprastą vartotoją, kuris užsiprenumeruos temą. Kai tik tema bus paskelbtas naujas pranešimas, jis tą pranešimą perskaitys ir atspausdins į konsolę. Vartotojo kodas yra gana panašus į gamintojo kodą. Mes pradedame kurti objektą java.util.Nuosavybės, nustatant specifines vartotojui savybes ir naudojant ją kuriant naują objektą „Kafka“ vartotojas. „ConsumerConfig“ klasė apibrėžia visas ypatybes, kurias galime nustatyti. Yra tik keturios privalomos savybės:

  • BOOTSTRAP_SERVERS_CONFIG („bootstrap.servers“)
  • KEY_DESERIALIZER_CLASS_CONFIG (raktas. Deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Kaip ir gamintojų klasei, taip ir naudosime BOOTSTRAP_SERVERS_CONFIG konfigūruoti pagrindinio kompiuterio / prievado poras vartotojų klasei. Ši konfigūracija leidžia mums užmegzti pradinius ryšius su „Kakfa“ grupe host1: port1, host2: port2, ... formatas.

Kaip jau minėjau anksčiau, „Kafka“ serveris tikisi pranešimų baitas [] klavišą ir baitas [] vertės formatus ir turi savo realizavimo būdą, leidžiantį serijuoti įvairius tipus į baitas []. Lygiai taip pat, kaip tai darėme su gamintoju, vartotojai, norėdami konvertuoti, turėsime naudoti pasirinktinį deserializatorių baitas [] atgal į atitinkamą tipą.

Programos pavyzdžio atveju mes žinome, kad gamintojas naudoja „ByteArraySerializer“ už raktą ir „StringSerializer“ už vertę. Todėl kliento pusėje turime naudoti org.apache.kafka.common.serialization.ByteArrayDeserializer už raktą ir org.apache.kafka.common.serialization.StringDeserializer už vertę. Tų klasių nustatymas kaip reikšmes KEY_DESERIALIZER_CLASS_CONFIG ir VALUE_DESERIALIZER_CLASS_CONFIG leis vartotojui deserializuotis baitas [] koduoti tipai, kuriuos siunčia gamintojas.

Galiausiai turime nustatyti GROUP_ID_CONFIG. Tai turėtų būti grupės pavadinimas eilutės formatu. Per minutę paaiškinsiu daugiau apie šią konfigūraciją. Kol kas tiesiog pažvelkite į „Kafka“ vartotoją su keturiomis privalomomis savybėmis:

$config[zx-auto] not found$config[zx-overlay] not found