Spotřeba dat API do Kafky

Podrobný průvodce po požití obsahu z HTTP požadavek nebo odpověď REST API do Kafky

(6. prosince 2020)

Foto od EJ Strat na Unsplash

V dnešním prostředí s otevřeným zdrojovým kódem se stalo běžným zvykem snadno zpřístupnit data prostřednictvím API. Společnosti jako Spotify a Twitter zveřejňují některá svá data dostupná k analýze prostřednictvím REST API s. Díky tomu je získávání dat prostřednictvím požadavku HTTP nebo REST API stále oblíbenější. Aby bylo možné tato data zpřístupnit na efektivní platformě pro analýzu dat, uživatelé budou muset vytvořit nějaký druh kanálu pro směrování dat ze svého zdroje na požadovanou platformu pro analýzu. Apache Kafka je odolný a efektivní způsob získávání těchto dat.

Apache Kafka je open-source distribuovaná platforma pro streamování událostí používaná pro vysoké -výkonové datové kanály, analytika streamování, integrace dat a kritické aplikace.

Tento notebook je návodem k publikování obsahu odpovědi z požadavku HTTP nebo REST API na Kafku. Zde budeme předpokládat, že služba Kafka byla roztočena v kontejneru ukotvitelného prostoru. Docker je platforma pro vývoj, přepravu a spouštění aplikací. Více o ukotvitelném panelu si můžete přečíst zde .

Krok 1: Načtěte obsah odpovědi REST API / odpovědi HTTP do souboru JSON pomocí následujícího příkazu.

Curl je příkaz pro získávání nebo odesílání dat pomocí syntaxe URL pomocí libovolného podporovaných protokolů. Některé z podporovaných protokolů jsou HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP nebo FILE.

Přidáváme možnosti:

  • -L (platí pro HTTP a HTTPS), aby bylo možné provést zvlnění předělat požadavek na nové místo, pokud server hlásí, že se požadovaná stránka přesunula na jiné místo (označeno záhlaví Location: a odpovědí 3XX kód). Když se použije autentizace, curl odešle svá pověření pouze počátečnímu hostiteli. Pokud přesměrování převezme zvlnění na jiného hostitele, nebude moci zachytit heslo uživatele +. Počet přesměrování, která je třeba sledovat, můžete omezit pomocí možnosti – max-redirs.
  • -o assessment-pokusy-nested.json k zápisu výstupu do tohoto souboru namísto standardního výstupu
  • Poté jsme zadali https://goo.gl/ME6hjp , adresu URL, ze které chceme přijímat data.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Výsledek bude vypadat takto:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Krok 2 : Pomocí jq upravte způsob čtení v souboru JSON, který jsme naplnili výstupem zvlnění v kroku 1, a otestujte to pomocí kočky.

Kočka příkaz zřetězuje soubory a tiskne na standardní výstup.

Poskytujeme názvy souborů, které chceme zřetězit, nebo standardní vstup, na standardní výstup. Bez SOUBORU, nebo pokud je SOUBOR -, přečte standardní vstup.

| (pipe) umožňuje, aby standardní výstup příkazu 1 (příkaz před |) byl standardním vstupem pro příkaz 2 (příkaz za |). Náš zřetězený výsledek tedy funguje jako vstup do jq ‘.’

jq je lehký a flexibilní procesor JSON příkazového řádku. Umožní vám to snadno rozdělit, filtrovat a mapovat a transformovat strukturovaná data. „. []“ Rozbalí pole a vytáhne index v poli jako řádek / řádek a znak -c zachová barvu před formátováním jq. Takže jq . [] -C nám umožňuje oddělit každý index v poli JSON do nového řádku a zachovat barvu formátování poskytovaného jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Krok 3: Chcete-li zjistit, kolik řádků (což bude počet zpráv, které zveřejníme na Kafce) ) výsledek z našeho příkazu v kroku 2.

Přidání | wc -l nám umožňuje převzít náš standardní výstup z příkazu v kroku 2, kterým jsou jq formátované a extrahované řádky z pole JSON do dalšího příkazu jako vstup. Další příkaz je wc -l. wc vytiskne počet nových řádků, protože -l poskytované jako volba, která určuje nový řádek.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Výsledkem by měl být počet řádků, například:

3280

Krok 4: Zde si vezmeme, co máme v kroku 2 a zveřejnit to do tématu Kafka „pokusy o posouzení“. Zde použijeme docker-compose za předpokladu, že je služba Kafka spuštěna pomocí Docker.

docker-compose exec spustí příkaz v kontejneru, jehož název je uveden, zde container1.

Příkaz, který spustíme, je bash -c „cat assessment-pokusy-nested.json | jq .[] “-C | kafkacat -P -b kafka: 29092 -t pokusů o posouzení & & echo Produkoval 3280 zpráv. ”

  • bash je spuštění shellu v kontejneru
  • -c je možnost, aby bylo možné číst příkazy z následujícího řetězce
  • Řetězec následující nejprve zřetězí obsah souboru assessment-pokusy-nested.json na standardní výstup.
  • Poté z toho předá standardní výstup jako standardní vstup do dalšího příkazu: jq . [] -c který získá veškerý obsah výstupu (formátovaný jako JSON) a extrahuje každý index pole do nového řádku.
  • Jeho standardní výstup je poté předán jako standardní vstup dalšímu příkazu: kafkacat – P -b kafka: 29092 -t pokusů o posouzení & & echo Produkoval 3280 zpráv. ”
  • kafkacat -P spouští nástroj v režimu producenta. V tomto kafkacat čte zprávy ze standardního vstupu (stdin).
  • -b kafka: 29092 se používá k určení makléře Kafka, jehož název je pouze Kafka s hostitelem – oba jsou nakonfigurovány v docker-compose.yml
  • -t assessment-pokusy se používá k určení názvu tématu, na které chceme publikovat
  • & & se používá k vypsání příkazu, který chceme provést poté, co byl úspěšně dokončen
  • echo Produced 3280 messages. is a message we chcete zobrazit, pokud byl předchozí příkaz publikování na Kafce úspěšně proveden. 3280 jsme znali z kroku 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Výsledek by měl vypadat takto:

Produced 3280 messages.

Reference

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/