Kako izraditi aplikacije za streaming sa statusom state with Apache Flink

Fabian Hueske je obveznik i član PMC-a projekta Apache Flink i suosnivač Data Artisans.

Apache Flink je okvir za implementaciju aplikacija za obradu stream-a s državnim statusom i njihovo pokretanje u opsegu na računarskom klasteru. U prethodnom smo članku ispitali što je to obrada streama sa stanjem, koji se slučajevi upotrebe primjenjuju i zašto biste trebali implementirati i pokretati svoje streaming programe s Apache Flink.

U ovom ću članku predstaviti primjere za dva uobičajena slučaja obrade stream-a s državnim stanjem i raspraviti o tome kako ih se može implementirati s Flinkom. Prvi slučaj upotrebe su aplikacije vođene događajima, tj. Aplikacije koje unose kontinuirani tok događaja i primjenjuju neku poslovnu logiku na te događaje. Drugi je slučaj upotrebe streaming analitike, gdje ću predstaviti dva analitička upita implementirana s Flink-ovim SQL API-jem, koji agregiraju podatke o strujanju u stvarnom vremenu. Mi u Data Artisans pružamo izvorni kod svih naših primjera u javnom GitHub spremištu.

Prije nego što uđemo u detalje primjera, predstavit ću tok događaja koji se unosi u primjere aplikacija i objasniti kako možete pokrenuti kod koji pružamo.

Niz događaja vožnje taksijem

Naši se primjeri aplikacija temelje na javnom skupu podataka o vožnji taksijem koji se dogodio u New Yorku 2013. Organizatori Grand Challengea DEBS iz 2015. (ACM Međunarodna konferencija o distribuiranim sustavima temeljenim na događajima) preuredili su izvorni skup podataka i pretvorili ga u jedna CSV datoteka iz koje čitamo sljedećih devet polja.

  • Medaljon - MD5 zbroj taksija
  • Hack_license - MD5 ID broja taksi dozvole
  • Vrijeme preuzimanja - vrijeme kada su putnici pokupljeni
  • Dropoff_datetime - vrijeme kada su putnici odvoženi
  • Pickup_ Longitude - zemljopisna dužina mjesta preuzimanja
  • Pickup_latitude - zemljopisna širina mjesta preuzimanja
  • Dropoff_longitude - dužina mjesta odlaska
  • Dropoff_latitude - zemljopisna širina mjesta odlaska
  • Total_amount - ukupno plaćeno u dolarima

CSV datoteka pohranjuje zapise u rastućem redoslijedu njihovog atributa vremena ispuštanja. Stoga se datoteka može tretirati kao uređeni dnevnik događaja koji su objavljeni kad je putovanje završilo. Da biste pokrenuli primjere koje pružamo na GitHubu, morate s Google diska preuzeti skup podataka izazova DEBS.

Sve primjere aplikacija sekvencijalno čitaju CSV datoteku i unose je kao tok događaja vožnje taksijem. Od tada, aplikacije obrađuju događaje baš kao i bilo koji drugi tok, tj. Poput toka koji se unosi iz sustava za objavljivanje i pretplatu na temelju dnevnika, poput Apache Kafke ili Kinesisa. U stvari, čitanje datoteke (ili bilo koje druge vrste trajnih podataka) i tretiranje kao tok predstavlja temelj Flinkovog pristupa objedinjavanju batch i stream obrade.

Pokretanje primjera Flinka

Kao što je ranije spomenuto, izvorni kod naših primjera aplikacija objavili smo u GitHub spremištu. Savjetujemo vam da rastavite i klonirate spremište. Primjeri se mogu lako izvršiti unutar vašeg IDE-a po vašem izboru; ne trebate postavljati i konfigurirati Flink klaster za njihovo pokretanje. Prvo uvezite izvorni kod primjera kao Maven projekt. Zatim izvršite glavnu klasu aplikacije i navedite mjesto pohrane podatkovne datoteke (pogledajte vezu za preuzimanje podataka gore) kao programski parametar.

Nakon što pokrenete aplikaciju, pokrenut će lokalnu, ugrađenu instancu Flink unutar JVM postupka aplikacije i poslati je da je izvrši. Vidjet ćete hrpu izvještaja iz dnevnika dok se Flink pokreće i zadaci posla se planiraju. Jednom kada se aplikacija pokrene, njezin će se izlaz zapisati u standardni izlaz.

Izgradnja aplikacije vođene događajima u Flinku

Sada, razgovarajmo o našem prvom slučaju upotrebe, a to je aplikacija vođena događajima. Aplikacije vođene događajima unose tokove događaja, izvode proračune po primanju događaja i mogu emitirati nove događaje ili pokretati vanjske radnje. Više aplikacija vođenih događajima može se sastaviti povezivanjem zajedno putem sustava dnevnika događaja, slično onome kako veliki sustavi mogu biti sastavljeni od mikrousluga. Aplikacije vođene događajima, zapisnici događaja i snimke stanja aplikacija (poznate kao točke spremanja u Flinku) sadrže vrlo moćan obrazac dizajna jer možete resetirati njihovo stanje i ponovno reproducirati njihov unos kako bi se oporavili od kvara, ispravili pogrešku ili migrirali aplikacija na drugi klaster.

U ovom ćemo članku ispitati aplikaciju vođenu događajima koja podupire uslugu koja nadgleda radno vrijeme taksista. 2016. Komisija za taksije i limuzine u New Yorku odlučila je ograničiti radno vrijeme taksista na 12-satne smjene i zahtijevati stanku od najmanje osam sati prije nego što sljedeća smjena može započeti. Smjena započinje s početkom prve vožnje. Od tada, vozač može započeti novu vožnju u roku od 12 sati. Naša aplikacija prati vožnje vozača, označava vrijeme završetka njihovog 12-satnog prozora (tj. Vrijeme kada mogu započeti posljednju vožnju) i označava vožnje koje krše propise. Puni izvorni kod ovog primjera možete pronaći u našem GitHub spremištu.

Naša je aplikacija implementirana s Flinkovim DataStream API-jem i a KeyedProcessFunction. DataStream API je funkcionalni API i zasnovan je na konceptu tipiziranih tokova podataka. A DataStreamje logični prikaz toka događaja tipa T. Tok se obrađuje primjenom funkcije koja na njega stvara drugi tok podataka, moguće drugačijeg tipa. Flink paralelno obrađuje tokove distribuirajući događaje na stream particije i primjenjujući različite instance funkcija na svaku particiju.

Sljedeći isječak koda prikazuje protok naše aplikacije za nadzor na visokoj razini.

// unositi tok vožnje taksijem.

DataStream vožnje = TaxiRides.getRides (env, inputPath);

DataStream obavijesti = vožnje

   // tok particije prema ID-u vozačke dozvole

   .keyBy (r -> r.licenseId)

   // nadgledanje događaja vožnje i generiranje obavijesti

   .process (novi MonitorWorkTime ());

// ispis obavijesti

notifications.print ();

Aplikacija započinje unositi niz događaja vožnje taksijem. U našem primjeru događaji se čitaju iz tekstualne datoteke, raščlanjuju i pohranjuju u TaxiRidePOJO objekte. Aplikacija iz stvarnog svijeta obično unosi događaje iz reda poruka ili dnevnika događaja, poput Apache Kafke ili Pravege. Sljedeći je korak upisivanje TaxiRidedogađaja prema licenseIdvozaču. U keyByrad particije potok na deklariranu terenu, tako da su svi događaji s istim ključem obrađuje na isti paralelni primjer slijedećeg funkcije. U našem slučaju razdvajamo se na licenseIdterenu jer želimo pratiti radno vrijeme svakog pojedinog vozača.

Dalje, primjenjujemo MonitorWorkTimefunkciju na particionirane TaxiRidedogađaje. Funkcija prati vožnje po vozaču i nadgleda njihove smjene i vremena pauze. Emitira događaje tipa Tuple2, gdje svaki nabor predstavlja obavijest koja se sastoji od licence ID-a vozača i poruke. Konačno, naša aplikacija emitira poruke ispisujući ih na standardni izlaz. Aplikacija u stvarnom svijetu napisala bi obavijesti na vanjsku poruku ili sustav za pohranu, poput Apache Kafke, HDFS-a ili sustava baze podataka, ili bi pokrenula vanjski poziv da ih odmah potisne.

Sad kad smo razgovarali o ukupnom toku aplikacije, pogledajmo MonitorWorkTimefunkciju koja sadrži većinu stvarne poslovne logike aplikacije. MonitorWorkTimeFunkcija je sa stanjem transakcije KeyedProcessFunctionda ingests TaxiRidedogađanja i emitira Tuple2zapisa. KeyedProcessFunctionSučelje ima dvije metode za obradu podataka: processElement()a onTimer(). processElement()Metoda se zove za svaki događaj koji dolaze. onTimer()Metoda se zove kad već registrirani požara timera. Sljedeći isječak prikazuje kostur MonitorWorkTimefunkcije i sve što je deklarirano izvan metoda obrade.

javna statička klasa MonitorWorkTime

    proširuje KeyedProcessFunction {

  // vremenske konstante u milisekundama

  privatni statički konačni long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 sati

  privatni statički konačni long REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 sati

  privatni statički konačni long CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 sata

 privatni prijelazni formatter DateTimeFormatter;

  // držač stanja za pohranu vremena početka smjene

  ValueState shiftStart;

  @Preuzmi

  javna praznina otvorena (konfiguracija konfiguracije) {

    // registar stanja registra

    shiftStart = getRuntimeContext (). getState (

      novi ValueStateDescriptor (“shiftStart”, Types.LONG));

    // inicijalizira formatter vremena

    this.formatter = DateTimeFormat.forPattern (“yyyy-MM-dd HH: mm: ss”);

  }

  // processElement () i onTimer () detaljno su razmotreni u nastavku.

}

Funkcija deklarira nekoliko konstanti vremenskih intervala u milisekundama, oblikovač vremena i obrađivač stanja za ključeno stanje kojim upravlja Flink. Upravljano stanje povremeno se provjerava i automatski obnavlja u slučaju kvara. Stanje ključa organizirano je po ključu, što znači da će funkcija održavati jednu vrijednost po upravljaču i ključu. U našem slučaju, MonitorWorkTimefunkcija održava Longvrijednost za svaki ključ, tj. Za svaki licenseId. shiftStartDržava pohranjuje polazeći od vremena vozača smjeni. Ručka stanja inicijalizira se u open()metodi koja se poziva jednom prije obrade prvog događaja.

Pogledajmo sada processElement()metodu.

@Preuzmi

postupak javne praznineElement (

    Vožnja TaxiRideom,

    Ctx konteksta,

    Kolektor out) baca iznimku {

  // potražite vrijeme početka zadnje smjene

  Dugi startTs = shiftStart.value ();

  if (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // ovo je prva vožnja u novoj smjeni.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      "Dozvoljeno je prihvatiti nove putnike do" + formatter.print (endTs)));

    // registriraj tajmer za čišćenje stanja za 24 sata

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // ova je vožnja započela nakon završetka dozvoljenog radnog vremena.

    // to je kršenje propisa!

    out.collect (Tuple2.of (ride.licenseId,

      "Ova vožnja kršila je propise o radnom vremenu."));

  }

}

processElement()Metoda se zove za svaki TaxiRidedogađaj. Prvo, metoda dohvaća vrijeme početka vozačeve smjene iz ručice stanja. Ako stanje ne sadrži vrijeme početka ( startTs == null) ili ako je zadnja smjena započela više od 20 sati ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) ranije od trenutne vožnje, trenutna vožnja je prva vožnja nove smjene. U oba slučaja, funkcija započinje novu smjenu ažuriranjem vremena početka smjene na vrijeme početka trenutne vožnje, emitira poruku vozaču s vremenom završetka nove smjene i registrira tajmer za čišćenje stanje za 24 sata.

Ako trenutna vožnja nije prva vožnja u novoj smjeni, funkcija provjerava krši li regulaciju radnog vremena, odnosno je li započela više od 12 sati od početka trenutne smjene vozača. Ako je to slučaj, funkcija odašilje poruku da vozača obavijesti o kršenju.

processElement()Metoda MonitorWorkTimefunkcije bilježi timer za počistiti stanje 24 sata nakon početka smjene. Uklanjanje stanja koje više nije potrebno važno je kako bi se spriječilo povećanje veličine države zbog stanja curenja. Tajmer se aktivira kada vrijeme aplikacije pređe vremensku oznaku tajmera. U tom se trenutku naziva onTimer()metoda. Slično stanju, odbrojivači se održavaju po ključu i funkcija se stavlja u kontekst pridruženog ključa prije nego što se onTimer()metoda pozove. Stoga je sav državni pristup usmjeren na ključ koji je bio aktivan kad je tajmer registriran.

Pogledajmo onTimer()metodu MonitorWorkTime.

@Preuzmi

javna praznina onTimer (

    long timeri,

    OnTimerContext ctx,

    Kolektor out) baca iznimku {

  // uklanjanje stanja pomaka ako već nije započet novi pomak.

  Dugi startTs = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

The processElement()metoda registrira brojila za 24 sata nakon smjene počeo očistiti državu koja više nije potrebna. Čišćenje stanja jedina je logika koju onTimer()metoda provodi. Kad se tajmer aktivira, provjeravamo je li vozač u međuvremenu započeo novu smjenu, tj. Je li se vrijeme početka smjene promijenilo. Ako to nije slučaj, vozaču očistimo stanje smjene.