Stvoreno za realno vrijeme: Razmjena velikih podataka s Apacheom Kafkom, 1. dio

Kada je krenulo kretanje velikih podataka, uglavnom je bilo usmjereno na serijsku obradu. Alati za distribuiranu pohranu podataka i upite poput MapReduce, Hive i Pig dizajnirani su za obradu podataka u skupinama, a ne kontinuirano. Tvrtke bi izvodile više poslova svake noći kako bi izvukle podatke iz baze podataka, zatim analizirale, transformirale i na kraju pohranile podatke. U novije vrijeme poduzeća su otkrila moć analize i obrade podataka i događaja kako se događaju , a ne samo jednom u nekoliko sati. Međutim, većina tradicionalnih sustava za razmjenu poruka ne može se prilagoditi velikom broju podataka u stvarnom vremenu. Tako su inženjeri u LinkedInu izgradili Apache Kafka otvorenog koda: okvir distribuiranih poruka koji udovoljava zahtjevima velikih podataka skaliranjem na robnom hardveru.

Tijekom posljednjih nekoliko godina pojavio se Apache Kafka koji je rješavao razne slučajeve upotrebe. U najjednostavnijem slučaju to bi mogao biti jednostavni međuspremnik za pohranu dnevnika aplikacija. U kombinaciji s tehnologijom poput Spark Streaming-a, može se koristiti za praćenje promjena podataka i poduzimanje radnji na tim podacima prije spremanja na konačno odredište. Kafkin način predviđanja čini ga snažnim alatom za otkrivanje prijevara, kao što je provjera valjanosti transakcije kreditnom karticom kada se dogodi, a ne čekanje serijske obrade satima kasnije.

Ovaj dvodijelni vodič predstavlja Kafku, počevši od toga kako ga instalirati i pokrenuti u vašem razvojnom okruženju. Dobit ćete pregled Kafkine arhitekture, nakon čega slijedi uvod u razvijanje gotovog Apache Kafka sustava za razmjenu poruka. Napokon, izradit ćete prilagođenu aplikaciju proizvođača / potrošača koja šalje i troši poruke putem Kafka poslužitelja. U drugoj polovici ovog vodiča naučit ćete kako dijeliti i grupirati poruke te kako kontrolirati koje će poruke konzumirati potrošači Kafke.

Što je Apache Kafka?

Apache Kafka je sustav za razmjenu poruka izgrađen u mjeri za velike podatke. Slično Apache ActiveMQ ili RabbitMq, Kafka omogućuje aplikacijama izgrađenim na različitim platformama komunikaciju putem asinkronog prosljeđivanja poruka. Ali Kafka se razlikuje od ovih tradicionalnijih sustava za razmjenu poruka na ključne načine:

  • Dizajniran je za horizontalno skaliranje, dodavanjem dodatnih robnih poslužitelja.
  • Pruža mnogo veću propusnost i za proizvođačke i za potrošačke procese.
  • Može se koristiti za podršku serijskim i stvarnim slučajevima.
  • Ne podržava JMS, Java-ov API orijentiran na poruke međuopreme.

Arhitektura Apachea Kafke

Prije nego što istražimo Kafkinu arhitekturu, trebali biste znati njezinu osnovnu terminologiju:

  • Proizvođač je proces koji se može objaviti poruku na temu.
  • potrošača je proces koji se može pretplatiti na jednoj ili više tema i konzumirati poruka objavljenih na teme.
  • Kategorija tema je naziv hrane na koju su objavljene poruke.
  • Posrednik je proces pokrenut na jednom stroju.
  • Klaster je skupina posrednika rade zajedno.

Arhitektura Apache Kafke vrlo je jednostavna, što može rezultirati boljim performansama i propusnošću u nekim sustavima. Svaka je tema u Kafki poput jednostavne datoteke dnevnika. Kad proizvođač objavi poruku, Kafka poslužitelj dodaje je na kraj datoteke dnevnika za zadanu temu. Poslužitelj također dodjeljuje pomak , što je broj koji se koristi za trajnu identifikaciju svake poruke. Kako raste broj poruka, vrijednost svakog odstupanja raste; na primjer, ako producent objavi tri poruke, prva će dobiti odmak od 1, druga odmak od 2, a treća odmak od 3.

Kada se Kafka potrošač prvi put pokrene, poslužitelju će poslati zahtjev za povlačenjem tražeći da dohvati bilo koju poruku za određenu temu s pomakom većim od 0. Poslužitelj će provjeriti datoteku dnevnika za tu temu i vratiti tri nove poruke . Potrošač će obraditi poruke, zatim poslati zahtjev za poruke s pomakom većim od 3 i tako dalje.

U Kafki je klijent odgovoran za pamćenje broja odstupanja i dohvaćanje poruka. Poslužitelj Kafka ne prati niti upravlja potrošnjom poruka. Prema zadanim postavkama, Kafka poslužitelj čuvat će poruku sedam dana. Pozadinska nit na poslužitelju provjerava i briše poruke starije od sedam dana. Potrošač može pristupiti porukama sve dok su na poslužitelju. Može pročitati poruku više puta, pa čak i čitati poruke obrnutim redoslijedom primanja. Ali ako potrošač ne uspije dohvatiti poruku prije isteka sedam dana, propustit će je.

Mjerila Kafke

Proizvodna upotreba LinkedIna i drugih poduzeća pokazala je da je s pravilnom konfiguracijom Apache Kafka sposoban obrađivati ​​stotine gigabajta podataka dnevno. U 2011. godini tri inženjera LinkedIn-a koristila su benchmark testiranje kako bi pokazala kako Kafka može postići puno veću propusnost od ActiveMQ-a i RabbitMQ-a.

Apache Kafka brzo postavljanje i demonstracija

U ovom ćemo uputstvu izraditi prilagođenu aplikaciju, ali krenimo s instaliranjem i testiranjem Kafkine instance s izvanrednim proizvođačem i potrošačem.

  1. Posjetite stranicu za preuzimanje Kafke da biste instalirali najnoviju verziju (0,9 od ovog pisanja).
  2. Izdvojite binarne datoteke u software/kafkamapu. Za trenutnu verziju je software/kafka_2.11-0.9.0.0.
  3. Promijenite svoj trenutni direktorij tako da usmjerava na novu mapu.
  4. Pokrenite Zookeeper poslužitelj izvršavajući naredbu: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Pokrenite Kafka poslužitelj izvršenja: bin/kafka-server-start.sh config/server.properties.
  6. Napravite test temu koji možete koristiti za testiranje: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Početak jednostavan konzole potrošača koji se mogu konzumirati poruke objavljene na zadanu temu, kao što su javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Pokrenite jednostavan producent konzolu koja se može objaviti poruke na test temi: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Pokušajte utipkati jednu ili dvije poruke u konzolu proizvođača. Vaše bi se poruke trebale prikazivati ​​na potrošačkoj konzoli.

Primjer primjene s Apacheom Kafkom

Vidjeli ste kako Apache Kafka djeluje izvan okvira. Dalje, razvijemo prilagođenu aplikaciju proizvođača / potrošača. Proizvođač će dohvatiti korisničke podatke s konzole i poslati svaki novi redak kao poruku na Kafka poslužitelj. Potrošač će dohvatiti poruke za određenu temu i ispisati ih na konzoli. Komponente proizvođača i potrošača u ovom su slučaju vaše vlastite implementacije kafka-console-producer.shi kafka-console-consumer.sh.

Krenimo s izradom Producer.javaklase. Ova klasa klijenta sadrži logiku za čitanje korisničkog unosa s konzole i slanje tog unosa kao poruke na Kafka poslužitelj.

Proizvođača konfiguriramo stvaranjem objekta iz java.util.Propertiesklase i postavljanjem njegovih svojstava. Klasa ProducerConfig definira sva različita dostupna svojstva, ali zadane vrijednosti Kafke dovoljne su za većinu upotreba. Za zadanu konfiguraciju trebamo postaviti samo tri obavezna svojstva:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

U slučaju primjera aplikacije, znamo da proizvođač koristi ByteArraySerializerza ključ i StringSerializerza vrijednost. Na klijentskoj strani stoga moramo koristiti org.apache.kafka.common.serialization.ByteArrayDeserializerza ključ i org.apache.kafka.common.serialization.StringDeserializerza vrijednost. Postavljanje tih klasa kao vrijednosti za KEY_DESERIALIZER_CLASS_CONFIGi VALUE_DESERIALIZER_CLASS_CONFIGomogućit će potrošaču deserializaciju byte[]kodiranih tipova koje je poslao proizvođač.

Konačno, moramo postaviti vrijednost GROUP_ID_CONFIG. Ovo bi trebalo biti ime grupe u formatu niza. Objasnit ću vam više o ovoj konfiguraciji za minutu. Za sada pogledajte samo potrošača Kafke sa postavljena četiri obavezna svojstva: