API-Daten in Kafka verarbeiten

Eine Schritt-für-Schritt-Anleitung zum Aufnehmen von Inhalten aus einem HTTP-Anforderung oder REST-API-Antwort in Kafka

(6. Dezember 2020)

Foto von EJ Strat auf Unsplash

Im heutigen Open-Source-Klima ist es üblich geworden, Daten über APIs leicht verfügbar zu machen. Unternehmen wie Spotify und Twitter stellen einige ihrer Daten zur Analyse über die REST-APIs zur Verfügung. Dies macht die Beschaffung von Daten über eine HTTP-Anfrage oder eine REST-API immer beliebter. Um diese Daten auf einer effizienten Datenanalyseplattform verfügbar zu machen, müssen Benutzer eine Art Pipeline erstellen, um Daten von ihrer Quelle zur gewünschten Analyseplattform weiterzuleiten. Apache Kafka ist eine zuverlässige und effiziente Möglichkeit, diese Daten zu beschaffen.

Apache Kafka ist eine Open-Source-Plattform für verteiltes Ereignis-Streaming, die für hohe Daten verwendet wird – Leistungsdaten-Pipelines, Streaming-Analysen, Datenintegration und geschäftskritische Anwendungen.

Dieses Notizbuch ist eine exemplarische Vorgehensweise zum Veröffentlichen des Inhalts der Antwort von einer HTTP-Anforderung oder REST-API an Kafka. Hier nehmen wir an, dass der Kafka-Dienst in einem Docker-Container hochgefahren wurde. Docker ist eine Plattform zum Entwickeln, Versenden und Ausführen von Anwendungen. Weitere Informationen zu Docker finden Sie hier .

Schritt 1: Liest den Inhalt der REST-API-Antwort / HTTP-Antwort mit dem folgenden Befehl in eine JSON-Datei.

Curl ist ein Befehl zum Abrufen oder Senden von Daten mithilfe der URL-Syntax unter Verwendung eines beliebigen Befehls der unterstützten Protokolle. Einige der unterstützten Protokolle sind HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP oder DATEI.

Wir fügen die folgenden Optionen hinzu:

  • -L (gültig für HTTP und HTTPS), damit Curl die Anforderung an der neuen Stelle wiederholen kann, wenn der Server meldet, dass die angeforderte Seite an einen anderen Ort verschoben wurde (angezeigt durch einen Ort: Header und eine 3XX-Antwort Code). Wenn die Authentifizierung verwendet wird, sendet curl seine Anmeldeinformationen nur an den ursprünglichen Host. Wenn eine Umleitung Curl auf einen anderen Host ausführt, kann sie den Benutzer + das Kennwort nicht abfangen. Sie können die Anzahl der folgenden Weiterleitungen begrenzen, indem Sie die Option – max-redirs verwenden.
  • -o Assessment-Versuche-nested.json, um die Ausgabe in diese Datei anstelle von stdout
  • zu schreiben Dann haben wir https://goo.gl/ME6hjp angegeben, die URL, von der wir Daten empfangen möchten.

curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Das Ergebnis würde folgendermaßen aussehen:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Schritt 2 : Verwenden Sie jq, um die Art und Weise zu ändern, in der wir die JSON-Datei gelesen haben, die wir in Schritt 1 mit der Ausgabe von curl gefüllt haben, und testen Sie dies mit cat.

Die cat Der Befehl verkettet Dateien und druckt auf der Standardausgabe.

Wir geben die Dateinamen an, die wir mit der Standardausgabe verketten möchten. Ohne DATEI oder wenn DATEI – ist, wird die Standardeingabe gelesen.

Das | (Pipe) ermöglicht, dass die Standardausgabe von Befehl 1 (der Befehl vor dem |) die Standardeingabe für Befehl 2 (der Befehl nach dem |) ist. Unser verkettetes Ergebnis fungiert also als Eingabe für jq ‘.’

jq ist ein leichter und flexibler Befehlszeilen-JSON-Prozessor. Sie können strukturierte Daten mühelos in Scheiben schneiden, filtern, zuordnen und transformieren. . [] Entrollt das Array und zieht den Index im Array als Zeile / Zeile heraus. Das -c bewahrt die Farbe vor der jq-Formatierung. Mit jq . [] -C können wir also jeden Index im Array des JSON in eine neue Zeile aufteilen und die Farbe der von jq bereitgestellten Formatierung beibehalten.

cat assessment-attempts-nested.json | jq ".[]" -c

Schritt 3: Um zu sehen, wie viele Zeilen (wie viele Nachrichten wir an Kafka veröffentlichen) ) resultieren aus unserem Befehl in Schritt 2.

Hinzufügen von | Mit wc -l können wir unsere Standardausgabe aus dem Befehl in Schritt 2 übernehmen, bei dem es sich um die jq-formatierten und extrahierten Zeilen vom Array JSON zum nächsten Befehl als Eingabe handelt. Der nächste Befehl ist wc -l. wc gibt die Anzahl der Zeilenumbrüche aus, da -l als Option zur Angabe der Zeilenumbrüche angegeben wird.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Das Ergebnis sollte die Anzahl der Zeilen sein, zum Beispiel:

3280

Schritt 4: Hier nehmen wir, was wir haben in Schritt 2 und veröffentlichen Sie dies im Kafka-Thema Bewertungsversuche. Hier verwenden wir Docker-Compose unter der Annahme, dass der Kafka-Dienst mit Docker gestartet wird.

Docker-Compose Exec führt einen Befehl in dem Container aus, dessen Name angegeben ist, hier container1.

Der Befehl, den wir ausführen, lautet bash -c “cat Assessment-Versuche-verschachtelt.json | jq ‘.[] ’-C | kafkacat -P -b kafka: 29092 -t Bewertungsversuche & & echo 3280 Nachrichten erzeugt. „

  • bash ist das Starten einer Shell im Container.
  • -c ist eine Option, um Befehle aus der folgenden Zeichenfolge lesen zu können.
  • Die folgende Zeichenfolge verkettet zuerst den Inhalt der Datei Assessment-Versuche-verschachtelt.json in Standardausgabe.
  • Anschließend wird die Standardausgabe von dieser als Standardeingabe an den nächsten Befehl übergeben: jq . [] -c welche Ruft den gesamten Inhalt der Ausgabe ab (formatiert wie JSON) und extrahiert jeden Index des Arrays in eine neue Zeile.
  • Die Standardausgabe davon wird dann als Standardeingabe an den nächsten Befehl übergeben: kafkacat – P -b kafka: 29092 -t Bewertungsversuche & & echo 3280 Nachrichten erzeugt. ”
  • kafkacat -P startet das Dienstprogramm im Produzentenmodus. In diesem Fall liest kafkacat Nachrichten von der Standardeingabe (stdin).
  • -b kafka: 29092 wird verwendet, um den Kafka-Broker anzugeben, dessen Name nur Kafka mit dem Host ist – beide sind in konfiguriert Mit den Bewertungsversuchen docker-compose.yml
  • -t wird der Themenname angegeben, den wir in
  • & veröffentlichen möchten & wird verwendet, um einen Befehl aufzulisten, den wir nach dem Befehl ausführen möchten, bevor die Ausführung erfolgreich abgeschlossen wurde.
  • echo Produzierte 3280 Nachrichten. ist eine Nachricht, die wir ausführen möchten anzeigen, ob der vorherige Befehl zum Veröffentlichen in Kafka erfolgreich ausgeführt wurde. Wir kannten 3280 aus Schritt 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Das Ergebnis sollte wie folgt aussehen:

Produced 3280 messages.

Referenzen

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/