Consumir datos de API en Kafka

Una guía paso a paso para ingerir contenido de un Solicitud HTTP o respuesta de API REST en Kafka

(6 de diciembre de 2020)

Foto de EJ Strat en Unsplash

En el clima de código abierto actual, se ha convertido en un lugar común hacer que los datos estén fácilmente disponibles a través de API. Empresas como Spotify y Twitter exponen algunos de sus datos disponibles para su análisis a través de las API REST . Esto hace que la obtención de datos a través de una solicitud HTTP o API REST sea cada vez más popular. Para que estos datos estén disponibles en una plataforma de análisis de datos eficiente, los usuarios deberán construir algún tipo de canalización para enrutar los datos desde su fuente a la plataforma de análisis deseada. Apache Kafka es una forma resistente y eficiente de obtener estos datos.

Apache Kafka es una plataforma de transmisión de eventos distribuidos de código abierto que se utiliza para – canalizaciones de datos de rendimiento, análisis de transmisión, integración de datos y aplicaciones de misión crítica.

Este cuaderno es un tutorial para publicar el contenido de la respuesta de una solicitud HTTP o API REST en Kafka. Aquí asumiremos que el servicio de Kafka se activó en un contenedor docker. Docker es una plataforma para desarrollar, enviar y ejecutar aplicaciones. Puede leer más sobre Docker aquí .

Paso 1: Lea el contenido de la respuesta de la API REST / respuesta HTTP en un archivo JSON usando el siguiente comando.

Curl es un comando para obtener o enviar datos usando la sintaxis de URL, usando cualquier de los protocolos admitidos. Algunos de los protocolos admitidos son HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP o FILE.

Agregamos las opciones:

  • -L (válido para HTTP y HTTPS) para poder hacer que curl rehaga la solicitud en el nuevo lugar si el servidor informa que la página solicitada se ha movido a una ubicación diferente (indicada con un encabezado Ubicación: y una respuesta 3XX código). Cuando se usa la autenticación, curl solo envía sus credenciales al host inicial. Si un redireccionamiento lleva curl a un host diferente, no podrá interceptar el usuario + contraseña. Puede limitar el número de redireccionamientos a seguir usando la opción – max-redirs.
  • -o assessment-tries-nested.json para escribir la salida en este archivo en lugar de stdout
  • Luego proporcionamos https://goo.gl/ME6hjp , la URL de la que queremos recibir datos.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

El resultado se vería así:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Paso 2 : Use jq para modificar la forma en que leemos en el archivo JSON que llenamos con la salida de curl en el paso 1 y pruebe eso con cat.

El gato El comando concatena archivos e imprime en la salida estándar.

Proporcionamos los nombres de los archivos que queremos concatenar, o la entrada estándar, a la salida estándar. Sin FILE, o cuando FILE es -, lee la entrada estándar.

El | (tubería) permite que la salida estándar del comando 1 (el comando antes de |) sea la entrada estándar para el comando 2 (el comando después de |). Entonces, nuestro resultado de concatenación actúa como una entrada para jq «.»

jq es un procesador JSON de línea de comandos ligero y flexible. Le permite dividir, filtrar, mapear y transformar datos estructurados con facilidad. . [] Desenrolla la matriz y extrae el índice de la matriz como una línea / fila y la -c conserva el color del formato jq. Entonces el jq . [] -C nos permite separar cada índice en la matriz del JSON en una nueva línea y preservar el color del formato proporcionado por jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Paso 3: Para ver cuántas líneas (que serán cuántos mensajes publicaremos en Kafka ) resultado de nuestro comando en el paso 2.

Añadiendo | wc -l nos permite tomar nuestra salida estándar del comando en el Paso 2, que son las líneas jq formateadas y extraídas de la matriz JSON al siguiente comando como entrada. El siguiente comando es wc -l. wc imprime el recuento de nuevas líneas porque -l se proporciona como una opción que especifica una nueva línea.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

El resultado debe ser el número de líneas, por ejemplo:

3280

Paso 4: Aquí tomamos lo que tenemos en el Paso 2 y publíquelo en el tema de Kafka intentos de evaluación. Aquí usaremos docker-compose asumiendo que el servicio Kafka se inicia usando Docker.

docker-compose exec ejecuta un comando en el contenedor cuyo nombre se proporciona, aquí container1.

El comando que ejecutamos es bash -c “cat assessment-tries-nested.json | jq .[] ’-C | kafkacat -P -b kafka: 29092 -t assessment-tries & & echo Produjo 3280 mensajes ”.

  • bash es lanzar un shell en el contenedor
  • -c es una opción para poder leer comandos de la siguiente cadena
  • La siguiente cadena primero concatena el contenido del archivo evaluación-intentos-nested.json en la salida estándar.
  • Luego pasa la salida estándar de eso como entrada estándar al siguiente comando: jq . [] -c que obtiene todo el contenido de la salida (formateado como JSON) y extrae cada índice de la matriz en una nueva línea.
  • La salida estándar de eso se pasa como entrada estándar al siguiente comando: kafkacat – P -b kafka: 29092 -t assessment-tries & & echo Produjo 3280 mensajes ”.
  • kafkacat -P lanza la utilidad en modo de productor. En esto, kafkacat lee los mensajes de la entrada estándar (stdin).
  • -b kafka: 29092 se usa para especificar el corredor de Kafka, cuyo nombre es solo Kafka con el host, ambos configurados en Docker-compose.yml
  • -t Assessment-Attents se usa para especificar el nombre del tema en el que queremos publicar
  • & & se usa para listar un comando que queremos ejecutar después del anterior a que se haya completado con éxito la ejecución
  • echo Produced 3280 messages. es un mensaje que desea mostrar si el comando anterior de publicación en Kafka se ha ejecutado correctamente. Sabíamos 3280 del paso 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

El resultado debería verse como el siguiente:

Produced 3280 messages.

Referencias

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/