Consumarea datelor API în Kafka

Un ghid pas cu pas pentru a ingera conținut dintr-un Solicitare HTTP sau răspuns REST API în Kafka

(6 decembrie 2020)

Fotografie de EJ Strat pe Unsplash

În climatul open-source de astăzi, a devenit obișnuit ca datele să fie ușor disponibile prin API-uri. Companii precum Spotify și Twitter expun unele dintre datele disponibile pentru analiză prin API REST . Acest lucru face din ce în ce mai populară aprovizionarea datelor printr-o cerere HTTP sau API REST. Pentru a face aceste date disponibile într-o platformă eficientă de analiză a datelor, utilizatorii vor trebui să construiască un fel de conductă pentru rutarea datelor de la sursa sa către platforma de analiză dorită. Apache Kafka este un mod rezistent și eficient de a furniza aceste date.

Apache Kafka este o platformă de streaming de evenimente distribuite open-source folosită pentru -conducte de date de performanță, analize de streaming, integrare de date și aplicații critice pentru misiune.

Acest notebook este o prezentare generală în publicarea conținutului răspunsului de la o cerere HTTP sau API REST către Kafka. Aici vom presupune că serviciul Kafka a fost turnat într-un container de andocare. Docker este o platformă pentru dezvoltarea, livrarea și rularea aplicațiilor. Puteți citi mai multe despre docker aici .

Pasul 1: Citiți conținutul răspunsului REST API / răspuns HTTP într-un fișier JSON utilizând următoarea comandă.

Curl este o comandă pentru obținerea sau trimiterea de date folosind sintaxa URL, utilizând orice a protocoalelor acceptate. Unele dintre protocoalele acceptate sunt HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP sau FILE.

Adăugăm opțiunile:

  • -L (valabil pentru HTTP și HTTPS) pentru a putea face curl reface cererea pe noul loc dacă serverul raportează că pagina solicitată s-a mutat într-o altă locație (indicată cu un Locație: antet și un răspuns 3XX cod). Când se folosește autentificarea, curl își trimite acreditările doar către gazda inițială. Dacă o redirecționare duce curlul la o altă gazdă, nu va putea intercepta utilizatorul + parola. Puteți limita numărul de redirecționări de urmat utilizând opțiunea – max-redirs.
  • -o Assessment-încercări-nested.json pentru a scrie ieșirea în acest fișier în loc de stdout
  • Apoi am furnizat https://goo.gl/ME6hjp , adresa URL de la care dorim să primim date.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Rezultatul ar arăta ca:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Pasul 2 : Folosiți jq pentru a modifica modul în care citim în fișierul JSON pe care l-am populat cu ieșirea curlului la pasul 1 și testați-l cu cat.

Cat comanda concatenează fișiere și tipări pe ieșirea standard.

Furnizăm numele (numele) fișierelor pe care dorim să le concatenăm sau intrarea standard la ieșirea standard. Fără FILE sau când FILE este -, citește intrarea standard.

| (pipe) permite ieșirea standard a comenzii 1 (comanda dinaintea |) să fie intrarea standard pentru comanda 2 (comanda după |). Deci rezultatul nostru concatenat acționează ca o intrare în jq ‘.’

jq este un procesor JSON ușor și flexibil din linia de comandă. Vă permite să tăiați și să filtrați și să mapați și să transformați datele structurate cu ușurință. ‘. []’ Derulează matricea și scoate indexul din matrice sub formă de linie / rând, iar -c păstrează culoarea din formatarea jq. Deci jq . [] -C ne permite să separăm fiecare index din matricea JSON într-o nouă linie și să păstrăm culoarea formatării oferite de jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Pasul 3: Pentru a vedea câte linii (care vor fi câte mesaje publicăm către Kafka ) rezultat din comanda noastră din pasul 2.

Adăugare | wc -l ne permite să luăm ieșirea noastră standard din comanda din Pasul 2, care este linia formatată și extrasă jq din matricea JSON la următoarea comandă ca intrare. Următoarea comandă este wc -l. wc imprimă numărul de linie nouă, deoarece -l furnizat ca opțiune care specifică linia nouă.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Rezultatul ar trebui să fie numărul de linii, de exemplu:

3280

Pasul 4: Aici luăm ceea ce avem la Pasul 2 și publicați-l în subiectul Kafka „încercări de evaluare”. Aici vom folosi docker-compose presupunând că serviciul Kafka este lansat folosind Docker.

docker-compose exec execută o comandă în containerul al cărui nume este furnizat, aici „container1”.

Comanda pe care o executăm este bash -c „cat Assessment-încercări-imbricate.json | jq ‘.[] ’-C | kafkacat -P -b kafka: 29092 -t Assessment-încercări & & echo Mesaje produse 3280. ”

  • bash este de a lansa un shell în container
  • -c este o opțiune pentru a putea citi comenzile din următorul șir
  • Șirul următor mai întâi concatenează conținutul fișierului Assessment-Tentatives-nested.json în ieșire standard.
  • Apoi trece ieșirea standard din aceea ca intrare standard în următoarea comandă: jq . [] -c care obține tot conținutul ieșirii (formatat ca JSON) și extrage fiecare index al matricei într-o nouă linie.
  • Ieșirea standard a acesteia este apoi transmisă ca intrare standard la următoarea comandă: kafkacat – P -b kafka: 29092 -t Assessment-încercări & & echo Mesaje produse 3280. ”
  • kafkacat -P lansează utilitarul în modul producător. În acest sens, kafkacat citește mesajele din intrarea standard (stdin).
  • -b kafka: 29092 este utilizat pentru a specifica brokerul Kafka, al cărui nume este doar Kafka cu gazda – ambele fiind configurate în docker-compose.yml
  • -t Assessment-încercări este folosit pentru a specifica numele subiectului pe care dorim să-l publicăm
  • & & este utilizat pentru a afișa o comandă pe care dorim să o executăm după cea înainte de a finaliza cu succes execuția
  • echo „Mesaje produse 3280” este un mesaj pe care doresc să se afișeze dacă comanda anterioară de publicare în Kafka a fost executată cu succes. Știam 3280 de la Pasul 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Rezultatul ar trebui să arate după cum urmează:

Produced 3280 messages.

Referințe

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/