Programavimas

Kaip naudoti „Redis“ srauto apdorojimui realiuoju laiku

Roshanas Kumaras yra „Redis Labs“ vyresnysis produktų vadovas.

Duomenų perdavimas srautu realiuoju laiku yra įprastas reikalavimas daugeliui didelių duomenų naudojimo atvejų. Tokiose srityse kaip daiktų internetas, elektroninė komercija, saugumas, ryšiai, pramogos, finansai ir mažmeninė prekyba, kur tiek daug priklauso nuo laiku ir tiksliai pagrįsto sprendimų priėmimo, realiuoju laiku duomenų rinkimas ir analizė yra verslo pagrindas.

Tačiau kaupiant, saugant ir apdorojant srautinius duomenis dideliais kiekiais ir dideliu greičiu, kyla architektūrinių iššūkių. Svarbus pirmasis žingsnis atliekant duomenų analizę realiuoju laiku yra užtikrinti, kad būtų tinkamų tinklo, skaičiavimo, saugojimo ir atminties išteklių, kad būtų galima užfiksuoti greitus duomenų srautus. Tačiau įmonės programinės įrangos krūva turi atitikti jos fizinės infrastruktūros našumą. Priešingu atveju verslui teks susidurti su didžiuliu duomenų kaupimu arba, dar blogiau, trūkstamais ar neišsamiais duomenimis.

Redis tapo populiariu pasirinkimu tokiems greito duomenų įvedimo scenarijams. Lengva atmintyje esanti duomenų bazės platforma „Redis“ pasiekia milijonų operacijų per sekundę greitį, naudodama milisekundžių trukmės delsas, tuo pačiu pasitelkdama minimalius išteklius. Jis taip pat siūlo paprastus diegimus, kuriuos įgalina kelios duomenų struktūros ir funkcijos.

Šiame straipsnyje aš parodysiu, kaip „Redis Enterprise“ gali išspręsti įprastus iššūkius, susijusius su didelės apimties didelės spartos duomenų kaupimu ir apdorojimu. Peržiūrėsime tris skirtingus būdus (įskaitant kodą), kaip apdoroti „Twitter“ kanalą realiuoju laiku, naudojant „Redis Pub / Sub“, „Redis“ sąrašus ir „Redis“ rūšiuojamus rinkinius. Kaip matysime, visi trys metodai gali atlikti greitą duomenų įvedimą, atsižvelgiant į naudojimo atvejį.

Kuriant greitus duomenų perdavimo sprendimus yra iššūkiai

Didelio greičio duomenų įvedimas dažnai apima keletą skirtingų sudėtingumo tipų:

  • Didelės apimties duomenys kartais atkeliauja į serijas. „Bursty“ duomenims reikalingas sprendimas, galintis apdoroti didelius duomenų kiekius su minimalia vėlavimo trukme. Idealiu atveju ji turėtų sugebėti atlikti milijonus rašymų per sekundę naudodama milisekundžių trukmės vėlavimą, naudodama minimalius išteklius.
  • Duomenys iš kelių šaltinių. Duomenų įvedimo sprendimai turi būti pakankamai lankstūs, kad būtų galima tvarkyti duomenis įvairiais formatais, prireikus išsaugoti šaltinio tapatybę ir realiuoju laiku transformuoti ar normalizuoti.
  • Duomenys, kuriuos reikia filtruoti, analizuoti ar persiųsti. Daugelyje duomenų įvedimo sprendimų yra vienas ar daugiau abonentų, kurie duomenis naudoja. Tai dažnai yra skirtingos programos, veikiančios tose pačiose ar skirtingose ​​vietose, naudojant įvairias prielaidas. Tokiais atvejais duomenų bazėje reikia ne tik transformuoti duomenis, bet ir filtruoti ar kaupti, atsižvelgiant į vartojančių programų reikalavimus.
  • Duomenys, gaunami iš geografiškai paskirstytų šaltinių. Pagal šį scenarijų dažnai yra patogu paskirstyti duomenų rinkimo mazgus, pastatant juos arti šaltinių. Patys mazgai tampa greito duomenų įvedimo sprendimo dalimi, kad būtų galima rinkti, apdoroti, persiųsti arba nukreipti perimtus duomenis.

Greitas duomenų apdorojimas „Redis“ sistemoje

Daugybė sprendimų, palaikančių greitą duomenų įsisavinimą, yra sudėtingi, turtingi funkcijų ir per daug sukurti, kad atitiktų paprastus reikalavimus. Kita vertus, „Redis“ yra ypač lengvas, greitas ir lengvai naudojamas. Turėdami klientų daugiau nei 60 kalbų, „Redis“ galima lengvai integruoti į populiarius programinės įrangos paketus.

„Redis“ siūlo tokias duomenų struktūras kaip sąrašai, rinkiniai, rūšiuojami rinkiniai ir maišos, kurios siūlo paprastą ir universalų duomenų apdorojimą. „Redis“ atlieka daugiau nei milijoną skaitymo / rašymo operacijų per sekundę, o vidutinio dydžio prekių debesies egzempliorius vėluoja po milisekundžių, todėl tai labai efektyvu naudojant didelius duomenų kiekius. „Redis“ taip pat palaiko žinučių siuntimo paslaugas ir klientų bibliotekas visomis populiariomis programavimo kalbomis, todėl tai puikiai tinka derinti didelės spartos duomenų įvedimą ir realaus laiko analizę. „Redis Pub / Sub“ komandos leidžia atlikti pranešimų tarpininko vaidmenį tarp leidėjų ir abonentų - ši funkcija dažnai naudojama pranešimams ar pranešimams siųsti tarp paskirstytų duomenų įvedimo mazgų.

„Redis Enterprise“ patobulina „Redis“ sklandų mastelį, visada prieinamą prieinamumą, automatizuotą diegimą ir galimybę naudoti ekonomišką „flash“ atmintį kaip RAM išplėtėją, kad būtų galima ekonomiškai apdoroti didelius duomenų rinkinius.

Toliau pateiktuose skyriuose aš apibūdinsiu, kaip naudoti „Redis Enterprise“, siekiant išspręsti įprastas duomenų sugėrimo problemas.

Redis „Twitter“ greičiu

Norėdami iliustruoti „Redis“ paprastumą, ištirsime greito duomenų įvedimo sprendimo pavyzdį, kuris renka pranešimus iš „Twitter“ sklaidos kanalo. Šio sprendimo tikslas yra apdoroti tweetus realiuoju laiku ir juos apdorojant stumti žemyn.

Tada „Twitter“ duomenys, kuriuos nurodo sprendimas, sunaudojami keliems procesoriams. Kaip parodyta 1 paveiksle, šiame pavyzdyje kalbama apie du procesorius - anglų „Tweet“ procesorių ir „Influencer“ procesorių. Kiekvienas procesorius filtruoja „tweets“ ir perduoda juos atitinkamais kanalais kitiems vartotojams. Ši grandinė gali eiti tiek, kiek reikia sprendimo. Tačiau savo pavyzdžiu sustojame trečiajame lygyje, kur apibendriname populiarias diskusijas tarp anglakalbių ir geriausių influencerių.

„Redis Labs“

Atkreipkite dėmesį, kad mes naudojame „Twitter“ informacijos santraukų apdorojimo pavyzdį dėl duomenų atvykimo greičio ir paprastumo. Taip pat atkreipkite dėmesį, kad „Twitter“ duomenys pasiekia mūsų greitą duomenų per vieną kanalą. Daugeliu atvejų, pavyzdžiui, IoT, gali būti keli duomenų šaltiniai, siunčiantys duomenis į pagrindinį imtuvą.

Yra trys galimi šio sprendimo įgyvendinimo būdai naudojant „Redis“: suvartoti naudojant „Redis Pub / Sub“, suvartoti naudojant sąrašo duomenų struktūrą arba surūšiuoti naudojant duomenų rūšiavimo rūšiavimo rinkinį. Panagrinėkime kiekvieną iš šių variantų.

Nurykite su Redis Pub / Sub

Tai paprasčiausias greito duomenų įvedimo būdas. Šis sprendimas naudoja „Redis“ „Pub / Sub“ funkciją, leidžiančią programoms skelbti ir užsiprenumeruoti pranešimus. Kaip parodyta 2 paveiksle, kiekvienas etapas apdoroja duomenis ir paskelbia juos kanale. Sekantis etapas prenumeruoja kanalą ir gauna pranešimus tolesniam apdorojimui ar filtravimui.

„Redis Labs“

Argumentai "už"

  • Lengva įgyvendinti.
  • Gerai veikia, kai duomenų šaltiniai ir procesoriai yra paskirstyti geografiškai.

Minusai

  • Sprendimas reikalauja, kad leidėjai ir abonentai nuolat veiktų. Abonentai praranda duomenis sustabdę arba nutrūkus ryšiui.
  • Tam reikia daugiau ryšių. Programa negali skelbti ir užsiprenumeruoti to paties ryšio, todėl kiekvienam tarpiniam duomenų procesoriui reikalingi du ryšiai - vienas norint prenumeruoti, kitas - paskelbti. Jei „Redis“ vykdote DBaaS platformoje, svarbu patikrinti, ar jūsų pakete ar paslaugų lygyje nėra jokių apribojimų jungčių skaičiui.

Pastaba apie ryšius

Jei kanalą užsisako daugiau nei vienas klientas, Redis kiekvienam klientui duomenis perduoda tiesiškai, vienas po kito. Didelės duomenų apkrovos ir daugybė jungčių gali sukelti delsą tarp leidėjo ir jo abonentų. Nors numatytoji maksimalaus jungčių skaičiaus ribinė vertė yra 10 000, turite patikrinti ir palyginti, kiek jungčių tinka jūsų naudingajai apkrovai.

Redis palaiko kliento išvesties buferį kiekvienam klientui. Numatytieji „Pub / Sub“ kliento išvesties buferio apribojimai yra nustatyti:

kliento-išvesties-buferio ribos „pubsub“ 32 MB 8 MB 60

Pasirinkus šį nustatymą, „Redis“ privers klientus atsijungti dviem sąlygomis: jei išvesties buferis išaugs virš 32 MB arba jei išvesties buferis 60 sekundžių nuosekliai talpins 8 MB duomenų.

Tai rodo, kad klientai duomenis vartoja lėčiau nei jie yra paskelbti. Iškilus tokiai situacijai, pirmiausia pabandykite optimizuoti vartotojus taip, kad vartotojai nepridėtų vėlavimo. Jei pastebėsite, kad jūsų klientai vis dar atjungiami, galite padidinti kliento-išvesties-buferio ribos pubsub nuosavybė redis.conf. Atminkite, kad bet kokie nustatymų pakeitimai gali padidinti leidėjo ir abonento vėlavimą. Visi pakeitimai turi būti kruopščiai išbandyti ir patikrinti.

„Redis Pub / Sub“ sprendimo kodo dizainas

„Redis Labs“

Tai paprasčiausias iš trijų šiame straipsnyje aprašytų sprendimų. Čia pateikiamos svarbios „Java“ klasės, įdiegtos šiam sprendimui. Atsisiųskite šaltinio kodą su visišku įgyvendinimu čia: //github.com/redislabsdemo/IngestPubSub.

Abonentas klasė yra pagrindinė šio dizaino klasė. Kiekvienas Abonentas objektas palaiko naują ryšį su Rediu.

klasės abonentas pratęsia „JedisPubSub“ padargus „Runnable“

asmeninės eilutės pavadinimas;

privatus „RedisConnection“ jungtis = nulis;

privatus Jedis jedis = null;

privati ​​eilutė abonento kanalas;

public Subscriber (String subscriberName, String channelName) išmeta išimtį {

vardas = abonento vardas;

subscriberChannel = channelName;

Siūlas t = naujas siūlas (tai);

t.startas ();

       }

@ Nepaisyti

public void run () {

bandyti{

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

while (tiesa) {

jedis.subscribe (tai, tai.subscriberChannel);

                      }

} laimikis (e išimtis) {

e.printStackTrace ();

              }

       }

@ Nepaisyti

public void onMessage (eilutės kanalas, eilutės pranešimas) {

super.onMessage (kanalas, pranešimas);

       }

}

Leidėjas klasė palaiko atskirą ryšį su Rediu, kad skelbtų pranešimus kanale.

public class Publisher {

RedisConnection jungtis = nulis;

Jedis jedis = null;

privatus stygų kanalas;

public Publisher (String channelName) išmeta išimtį {

kanalas = channelName;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (eilutės žinutė) išmeta išimtį {

jedis.publish (kanalas, msg);

       }

}

EnglishTweetFilter, „InfluencerTweetFilter“, „HashTagCollector“ir „InfluencerCollector“ filtrai tęsiasi Abonentas, kuri leidžia jiems klausytis gaunamų kanalų. Kadangi jums reikia atskirų „Redis“ jungčių, kad galėtumėte užsiprenumeruoti ir paskelbti, kiekviena filtrų klasė turi savo RedisConnection objektas. Filtrai klausosi naujų pranešimų savo kanaluose. Čia yra pavyzdinis kodas EnglishTweetFilter klasė:

public class EnglishTweetFilter prailgina abonentą

{

privatus „RedisConnection“ jungtis = nulis;

privatus Jedis jedis = null;

privati ​​eilutė publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) išmeta išimtį {

super (vardas, abonento kanalas);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@ Nepaisyti

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = naujas JsonParser ();

JsonElement jsonElement = jsonParser.parse (pranešimas);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filtruoti pranešimus: skelbkite tik „tweets“ angliškai

if (jsonObject.get („lang“)! = null &&

jsonObject.get („lang“). getAsString (). lygu („en“)) {

jedis.publish (leidėjo kanalas, pranešimas);

              }

       }

}

Leidėjas klasėje yra paskelbimo metodas, kuris skelbia pranešimus reikiamu kanalu.

public class Publisher {

.

.     

public void paskelbti (eilutės žinutė) išmeta išimtį {

jedis.publish (kanalas, msg);

       }

.

}

Pagrindinė klasė skaito duomenis iš įkeliamo srauto ir paskelbia juos „AllData“ kanalą. Pagrindinis šios klasės metodas paleidžia visus filtro objektus.

viešoji klasė „IngestPubSub“

{

.

public void start () meta išimtį {

       .

       .

leidėjas = naujas leidėjas („AllData“);

englishFilter = new EnglishTweetFilter („English Filter“, „AllData“,

„EnglishTweets“);

influencerFilter = naujas „InfluencerTweetFilter“ („Influencerio filtras“,

„AllData“, „InfluencerTweets“);

hashtagCollector = new HashTagCollector („Hashtag Collector“,

„EnglishTweets“);

influencerCollector = naujas „InfluencerCollector“ („Influencer Collector“,

„InfluencerTweets“);

       .

       .

}

Nurykite su „Redis“ sąrašais

„Redis“ sąrašo duomenų struktūra palengvina eiliškumo sprendimo įgyvendinimą. Šiuo sprendimu gamintojas nustumia kiekvieną pranešimą į eilės galą, o abonentas apklausia eilę ir ištraukia naujus pranešimus iš kito galo.

„Redis Labs“

Argumentai "už"

  • Šis metodas yra patikimas ryšio praradimo atvejais. Kai duomenys įterpiami į sąrašus, jie ten saugomi, kol abonentai juos perskaitys. Tai tiesa, net jei abonentai yra sustabdyti arba praranda ryšį su „Redis“ serveriu.
  • Gamintojai ir vartotojai nereikalauja jokio ryšio.

Minusai

  • Ištraukus duomenis iš sąrašo, jie pašalinami ir jų negalima vėl gauti. Jei vartotojai neišlaiko duomenų, jie prarandami, kai tik suvartojami.
  • Kiekvienam vartotojui reikalinga atskira eilė, kuriai reikia saugoti kelias duomenų kopijas.

„Redis Lists“ sprendimo kodo dizainas

„Redis Labs“

Redis sąrašų sprendimo šaltinio kodą galite atsisiųsti čia: //github.com/redislabsdemo/IngestList. Pagrindinės šio sprendimo klasės yra paaiškintos toliau.

„MessageList“ įterpiama „Redis List“ duomenų struktūra. stumti () metodas nustumia naują pranešimą į kairę nuo eilės ir pop () laukia naujo pranešimo iš dešinės, jei eilė tuščia.

viešosios klasės „MessageList“ {

apsaugotas eilutės pavadinimas = „MyList“; // Vardas

.

.     

public void push (String msg) meta išimtį {

jedis.lpush (vardas, msg); // Kairysis stumdymas

       }

viešasis styginių popsas () meta išimtį {

grąžinti jedis.brpop (0, vardas) .toString ();

       }

.

.

}

„MessageListener“ yra abstrakti klasė, įgyvendinanti klausytojų ir leidėjų logiką. A „MessageListener“ objektas klausosi tik vieno sąrašo, bet gali paskelbti keliais kanalais („MessageFilter“ objektai). Šis sprendimas reikalauja atskiro „MessageFilter“ objektas kiekvienam abonentui.

klasės „MessageListener“ įgyvendina „Runnable“

privačios eilutės pavadinimas = null;

privatus žinučių sąrašas inboundList = null;

Žemėlapis outBoundMsgFilters = naujas HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

if (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@ Nepaisyti

public void run () {

.

while (tiesa) {

String msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

apsaugotas tuštumas „pushMessage“ (String msg) išmeta išimtį {

Set outBoundMsgNames = outBoundMsgFilters.keySet ();

skirta (eilutės pavadinimas: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (vardas);

msgList.filterAndPush (msg);

              }

       }

}

„MessageFilter“ yra tėvų klasė, padedanti filterAndPush () metodas. Duomenims tekant per įsiurbimo sistemą, jie dažnai filtruojami arba transformuojami prieš siunčiant į kitą etapą. Klasės, pratęsiančios „MessageFilter“ klasė nepaiso filterAndPush () metodą ir įgyvendinti savo logiką, kad filtruotas pranešimas būtų perkeltas į kitą sąrašą.

viešoji klasė „MessageFilter“ {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) išmeta išimtį {

messageList.push (msg);

       }

.

.     

}

„AllTweetsListener“ yra pavyzdinis a „MessageListener“ klasė. Čia išklausomi visi „Twitter“ tweetai „AllData“ kanalą ir skelbia duomenis EnglishTweetsFilter ir „InfluencerFilter“.

viešoji klasė „AllTweetsListener“ pratęsia „MessageListener“ {

.

.     

public static void main (String [] args) meta išimtį {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (naujas

„EnglishTweetsFilter“ („EnglishTweetsFilter“, „EnglishTweets“));

allTweetsProcessor.registerOutBoundMessageList (naujas

„InfluencerFilter“ („InfluencerFilter“, „Influencers“));

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter tęsiasi „MessageFilter“. Ši klasė įgyvendina logiką, kad pasirinktų tik tuos tweetus, kurie pažymėti kaip angliški tweetai. Filtras atmeta ne angliškus tweetus ir angliškus tweets perkelia į kitą sąrašą.

public class EnglishTweetsFilter pratęsia MessageFilter {

public EnglishTweetsFilter (String name, String listName) išmeta išimtį {

super (vardas, sąrašasName);

       }

@ Nepaisyti

public void filterAndPush (eilutės pranešimas) išmeta išimtį {

JsonParser jsonParser = naujas JsonParser ();

JsonElement jsonElement = jsonParser.parse (žinutė);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

if (jsonObject.get („lang“)! = null &&

jsonObject.get („lang“). getAsString (). lygu („en“)) {

Jedis jedis = super.getJedisInstance ();

jei (jedis! = null) {

jedis.lpush (super.pavadinimas, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found