Consumo di dati API in Kafka

Una guida passo passo per importare contenuti da un Richiesta HTTP o risposta dellAPI REST in Kafka

(6 dicembre 2020)

Foto di EJ Strat su Unsplash

Nellodierno clima open source, è diventato un luogo comune rendere i dati prontamente disponibili tramite le API. Aziende come Spotify e Twitter espongono alcuni dei loro dati disponibili per lanalisi tramite le REST API . Ciò rende sempre più popolare il reperimento dei dati tramite una richiesta HTTP o unAPI REST. Per rendere disponibili questi dati in unefficiente piattaforma di analisi dei dati, gli utenti dovranno creare una sorta di pipeline per linstradamento dei dati dalla sua fonte alla piattaforma di analisi desiderata. Apache Kafka è un modo resiliente ed efficiente per acquisire questi dati.

Apache Kafka è una piattaforma di streaming di eventi distribuita open source utilizzata per -performance data pipeline, streaming analytics, data integration e mission-critical applications.

Questo notebook è una procedura dettagliata per la pubblicazione dei contenuti della risposta da una richiesta HTTP o API REST a Kafka. Qui supporremo che il servizio Kafka sia stato avviato in un container Docker. Docker è una piattaforma per lo sviluppo, la spedizione e lesecuzione di applicazioni. Puoi leggere ulteriori informazioni su docker qui .

Passaggio 1: Leggi il contenuto della risposta API REST / risposta HTTP in un file JSON utilizzando il seguente comando.

Curl è un comando per ottenere o inviare dati utilizzando la sintassi URL, utilizzando qualsiasi dei protocolli supportati. Alcuni dei protocolli supportati sono HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP o FILE.

Aggiungiamo le opzioni:

  • -L (valido per HTTP e HTTPS) per poter far ripetere a curl la richiesta nella nuova posizione se il server segnala che la pagina richiesta è stata spostata in una posizione diversa (indicata con unintestazione Posizione: e una risposta 3XX codice). Quando viene utilizzata lautenticazione, curl invia solo le proprie credenziali allhost iniziale. Se un reindirizzamento porta curl a un host diverso, non sarà in grado di intercettare lutente e la password. Puoi limitare il numero di reindirizzamenti da seguire utilizzando lopzione – max-redirs.
  • -o valutazione-tentativi-annidati.json per scrivere loutput su questo file invece che su stdout
  • Quindi abbiamo fornito https://goo.gl/ME6hjp , lURL da cui vogliamo ricevere i dati.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Il risultato sarebbe:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Passaggio 2 : Usa jq per modificare il modo in cui leggiamo nel file JSON che abbiamo popolato con loutput di curl nel passaggio 1 e testalo con cat.

Il gatto Il comando concatena i file e li stampa sullo standard output.

Forniamo i nomi dei file che vogliamo concatenare, o lo standard input, allo standard output. Senza FILE, o quando FILE è -, legge lo standard input.

Il | (pipe) consente allo standard output del comando 1 (il comando prima di |) di essere lo standard input per il comando 2 (il comando dopo |). Quindi il nostro risultato concatenato funge da input per jq “.”

jq è un processore JSON a riga di comando leggero e flessibile. Ti consente di suddividere, filtrare, mappare e trasformare i dati strutturati con facilità. “. []” Srotola larray ed estrae lindice nellarray come una riga / riga e -c preserva il colore dalla formattazione jq. Quindi jq . [] -C ci permette di separare ogni indice nellarray del JSON in una nuova riga e preservare il colore della formattazione fornita da jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Passaggio 3: Per vedere quante righe (quale sarà il numero di messaggi che pubblichiamo su Kafka ) risultato dal nostro comando nel passaggio 2.

Laggiunta di | wc -l ci consente di prendere il nostro output standard dal comando nel passaggio 2 che è le linee jq formattate ed estratte dallarray JSON al comando successivo come input. Il comando successivo è wc -l. wc stampa il conteggio delle nuove righe perché -l fornito come opzione che specifica la nuova riga.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Il risultato dovrebbe essere il numero di righe, ad esempio:

3280

Passaggio 4: Qui prendiamo quello che abbiamo nel passaggio 2 e pubblicalo nellargomento Kafka “tentativi di valutazione”. Qui useremo docker-compose assumendo che il servizio Kafka venga avviato utilizzando Docker.

docker-compose exec esegue un comando nel contenitore il cui nome è fornito, qui “container1”.

Il comando che eseguiamo è bash -c “cat valutazione-tentativi-annidati.json | jq .[] ’-C | kafkacat -P -b kafka: 29092 -t valutazione-tentativi & & echo “Ha prodotto 3280 messaggi.” “

  • bash è per lanciare una shell nel contenitore
  • -c è unopzione per poter leggere i comandi dalla seguente stringa
  • La stringa che segue prima concatena il contenuto del file assessment -tempt-nested.json nello standard output.
  • Quindi passa loutput standard da quello come input standard al comando successivo: jq . [] -c which ottiene tutto il contenuto delloutput (formattato come JSON) ed estrae ogni indice dellarray in una nuova riga.
  • Lo standard output di questo viene quindi passato come standard input al comando successivo: kafkacat – P -b kafka: 29092 -t valutazione-tentativi & & echo “Ha prodotto 3280 messaggi.” “
  • kafkacat -P avvia lutilità in modalità produttore. In questo, kafkacat legge i messaggi dallo standard input (stdin).
  • -b kafka: 29092 viene utilizzato per specificare il broker Kafka, il cui nome è solo Kafka con lhost, entrambi configurati in il docker-compose.yml
  • -t assessment -tempt viene utilizzato per specificare il nome dellargomento che vogliamo pubblicare
  • & & viene utilizzato per elencare un comando che vogliamo eseguire dopo quello prima che abbia completato con successo lesecuzione
  • echo “Ha prodotto 3280 messaggi.” è un messaggio desidera visualizzare se il precedente comando di pubblicazione su Kafka è stato eseguito con successo. Conoscevamo 3280 dal passaggio 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Il risultato dovrebbe essere il seguente:

Produced 3280 messages.

Riferimenti

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/