Consommation de données API dans Kafka

Un guide étape par étape pour ingérer le contenu dun Requête HTTP ou réponse de lAPI REST dans Kafka

(6 décembre 2020)

Photo de EJ Strat sur Unsplash

Dans le climat open source daujourdhui, il est devenu courant de rendre les données facilement disponibles via les API. Des entreprises comme Spotify et Twitter exposent certaines de leurs données disponibles pour analyse via les API REST . Cela rend lapprovisionnement de données via une requête HTTP ou une API REST de plus en plus populaire. Afin de rendre ces données disponibles dans une plate-forme danalyse de données efficace, les utilisateurs devront créer une sorte de pipeline pour acheminer les données de leur source vers la plate-forme danalyse souhaitée. Apache Kafka est un moyen résilient et efficace de générer ces données.

Apache Kafka est une plateforme de diffusion dévénements distribuée open source utilisée pour – pipelines de données de performance, analyse en continu, intégration de données et applications critiques.

Ce bloc-notes est une procédure pas à pas pour publier le contenu de la réponse dune requête HTTP ou dune API REST vers Kafka. Ici, nous supposerons que le service Kafka a été créé dans un conteneur de docker. Docker est une plate-forme pour le développement, lexpédition et lexécution dapplications. Vous pouvez en savoir plus sur docker ici .

Étape 1: Lisez le contenu de la réponse de lAPI REST / de la réponse HTTP dans un fichier JSON à laide de la commande suivante.

Curl est une commande permettant dobtenir ou denvoyer des données à laide de la syntaxe URL, en utilisant nimporte quel des protocoles pris en charge. Certains des protocoles pris en charge sont HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP ou FILE.

Nous ajoutons les options:

  • -L (valable pour HTTP et HTTPS) pour pouvoir faire curl refaire la requête sur le nouveau lieu si le serveur signale que la page demandée a été déplacée vers un autre emplacement (indiqué par un en-tête Location: et une réponse 3XX code). Lorsque lauthentification est utilisée, curl envoie uniquement ses informations didentification à lhôte initial. Si une redirection dirige curl vers un autre hôte, elle ne pourra pas intercepter lutilisateur + le mot de passe. Vous pouvez limiter le nombre de redirections à suivre en utilisant loption – max-redirs.
  • -o assessment-tests-nested.json pour écrire la sortie dans ce fichier au lieu de stdout
  • Ensuite, nous avons fourni https://goo.gl/ME6hjp , lURL dont nous voulons recevoir les données.

curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Le résultat ressemblerait à:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Étape 2 : Utilisez jq pour modifier la façon dont nous lisons dans le fichier JSON que nous avons rempli avec la sortie de curl à létape 1 et testez cela avec cat.

Le chat La commande concatène les fichiers et les imprime sur la sortie standard.

Nous fournissons le (s) nom (s) de fichier (s) que nous voulons concaténer, ou entrée standard, à la sortie standard. Sans FILE, ou lorsque FILE est -, il lit lentrée standard.

Le | (pipe) permet à la sortie standard de la commande 1 (la commande avant le |) dêtre lentrée standard pour la commande 2 (la commande après le |). Ainsi, notre résultat de concaténation agit comme une entrée pour jq .

jq est un processeur JSON de ligne de commande léger et flexible. Il vous permet de découper, filtrer et mapper et transforme facilement les données structurées. ‘. []’ Déroule le tableau et extrait l’index du tableau sous forme de ligne / ligne et le -c préserve la couleur du formatage jq. Ainsi, le jq . [] -C nous permet de séparer chaque index du tableau du JSON dans une nouvelle ligne et de conserver la couleur du formatage fourni par jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Étape 3: Pour voir combien de lignes (ce sera le nombre de messages que nous publierons sur Kafka ) résultent de notre commande à létape 2.

Ajout de | wc -l nous permet de prendre notre sortie standard de la commande à létape 2 qui est le format jq et les lignes extraites du tableau JSON à la commande suivante en entrée. La commande suivante est wc -l. wc imprime le nombre de sauts de ligne car loption -l est fournie comme option qui spécifie la nouvelle ligne.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Le résultat doit être le nombre de lignes, par exemple:

3280

Étape 4: Ici, nous prenons ce que nous avons à létape 2 et publiez-le dans le sujet Kafka «tentatives dévaluation». Ici, nous utiliserons docker-compose en supposant que le service Kafka est lancé à laide de Docker.

docker-compose exec exécute une commande dans le conteneur dont le nom est fourni, ici container1.

La commande que nous exécutons est bash -c “cat evaluation-tests-nested.json | jq «.[] ’-C | kafkacat -P -b kafka: 29092 -t Assessment-Tentatives & & echo Produit 3280 messages. ”

  • bash consiste à lancer un shell dans le conteneur
  • -c est une option pour pouvoir lire les commandes de la chaîne suivante
  • La chaîne suivante concatène dabord le contenu du fichier assessment-tests-nested.json dans la sortie standard.
  • Il passe ensuite la sortie standard de celui-ci comme entrée standard dans la commande suivante: jq . [] -c qui récupère tout le contenu de la sortie (formaté comme JSON) et extrait chaque index du tableau dans une nouvelle ligne.
  • La sortie standard est ensuite transmise comme entrée standard à la commande suivante: kafkacat – P -b kafka: 29092 -t Assessment-Tentatives & & echo A produit 3280 messages. ”
  • kafkacat -P lance lutilitaire en mode producteur. En cela, kafkacat lit les messages à partir de lentrée standard (stdin).
  • -b kafka: 29092 est utilisé pour spécifier le courtier Kafka, dont le nom est juste Kafka avec lhôte – les deux sont configurés dans le docker-compose.yml
  • -t Assessment-Trips est utilisé pour spécifier le nom du sujet dans lequel nous voulons publier
  • & & est utilisé pour lister une commande que nous voulons exécuter après celle avant quelle ne soit terminée avec succès
  • echo Produit 3280 messages. est un message que nous souhaitez afficher si la commande précédente de publication sur Kafka a été exécutée avec succès. Nous connaissions 3280 à létape 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Le résultat devrait ressembler à ce qui suit:

Produced 3280 messages.

Références

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/