Consumindo dados de API em Kafka

Um guia passo a passo para ingerir conteúdo de um Solicitação HTTP ou resposta da API REST em Kafka

(6 de dezembro de 2020)

Foto por EJ Strat em Unsplash

No clima de código aberto de hoje, tornou-se comum disponibilizar dados prontamente por meio de APIs. Empresas como Spotify e Twitter expõem alguns de seus dados disponíveis para análise por meio da API REST s. Isso torna o fornecimento de dados por meio de uma solicitação HTTP ou API REST cada vez mais popular. Para disponibilizar esses dados em uma plataforma de análise de dados eficiente, os usuários precisarão construir algum tipo de pipeline para rotear os dados de sua fonte para a plataforma de análise desejada. Apache Kafka é uma forma resiliente e eficiente de obter esses dados.

Apache Kafka é uma plataforma de streaming de eventos distribuída de código aberto usada para alta – pipelines de dados de desempenho, análise de streaming, integração de dados e aplicativos de missão crítica.

Este bloco de notas é um passo a passo na publicação de conteúdo da resposta de uma solicitação HTTP ou API REST para Kafka. Aqui, assumiremos que o serviço Kafka foi criado em um contêiner docker. Docker é uma plataforma para desenvolver, enviar e executar aplicativos. Você pode ler mais sobre docker aqui .

Etapa 1: Leia o conteúdo da resposta da API REST / resposta HTTP em um arquivo JSON usando o seguinte comando.

Curl é um comando para obter ou enviar dados usando a sintaxe de URL, usando qualquer dos protocolos suportados. Alguns dos protocolos suportados são HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP ou FILE.

Adicionamos as opções:

  • -L (válido para HTTP e HTTPS) para ser capaz de fazer curl refazer a solicitação no novo local se o servidor relatar que a página solicitada foi movida para um local diferente (indicado com um cabeçalho Location: e uma resposta 3XX código). Quando a autenticação é usada, curl apenas envia suas credenciais para o host inicial. Se um redirecionamento leva curl para um host diferente, ele não será capaz de interceptar o usuário + senha. Você pode limitar o número de redirecionamentos a serem seguidos usando a opção – max-redirs.
  • -o assessment-tentativas-nested.json para gravar a saída neste arquivo em vez de stdout
  • Em seguida, fornecemos https://goo.gl/ME6hjp , o URL do qual desejamos receber dados.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

O resultado ficaria assim:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Etapa 2 : Use jq para modificar a maneira como lemos no arquivo JSON que preenchemos com a saída de curl na etapa 1 e teste isso com cat.

O gato O comando concatena arquivos e imprime na saída padrão.

Fornecemos o (s) nome (s) do (s) arquivo (s) que desejamos concatenar, ou entrada padrão, na saída padrão. Sem FILE, ou quando FILE é -, ele lê a entrada padrão.

O | (tubo) permite que a saída padrão do comando 1 (o comando antes de |) seja a entrada padrão para o comando 2 (o comando após |). Portanto, nosso resultado concatenado atua como uma entrada para jq ‘.’

jq é um processador JSON de linha de comando leve e flexível. Ele permite dividir, filtrar, mapear e transformar dados estruturados com facilidade. ‘. []’ Desenrola a matriz e puxa o índice na matriz como uma linha / linha e o -c preserva a cor da formatação jq. Portanto, o jq . [] -C nos permite separar cada índice na matriz do JSON em uma nova linha e preservar a cor da formatação fornecida por jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Etapa 3: Para ver quantas linhas (que serão quantas mensagens publicamos no Kafka ) resultado de nosso comando na etapa 2.

Adicionando | wc -l nos permite pegar nossa saída padrão do comando na Etapa 2, que é as linhas jq formatadas e extraídas do array JSON para o próximo comando como entrada. O próximo comando é wc -l. wc imprime a contagem de nova linha porque o -l fornecido como uma opção que especifica nova linha.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

O resultado deve ser o número de linhas, por exemplo:

3280

Etapa 4: Aqui pegamos o que temos na Etapa 2 e publique-o no tópico Kafka tentativas de avaliação. Aqui, usaremos docker-compose assumindo que o serviço Kafka seja iniciado usando Docker.

docker-compose exec executa um comando no contêiner cujo nome é fornecido, aqui container1.

O comando que executamos é bash -c “cat assessment-tentativas-nested.json | jq ‘.[] ’-C | kafkacat -P -b kafka: 29092 -t tentativas de avaliação & & echo Produziu 3280 mensagens. ”

  • bash é lançar um shell no contêiner
  • -c é uma opção para ser capaz de ler comandos da seguinte string
  • A string seguinte primeiro concatena o conteúdo do arquivo assessment-tentativas-nested.json na saída padrão.
  • Em seguida, passa a saída padrão dessa como entrada padrão para o próximo comando: jq . [] -c que obtém todo o conteúdo da saída (formatado como JSON) e extrai cada índice da matriz em uma nova linha.
  • A saída padrão disso é então passada como entrada padrão para o próximo comando: kafkacat – P -b kafka: 29092 -t assessment-tentativas & & echo Produziu 3280 mensagens. ”
  • kafkacat -P inicia o utilitário no modo produtor. Neste, kafkacat lê mensagens da entrada padrão (stdin).
  • -b kafka: 29092 é usado para especificar o corretor Kafka, cujo nome é apenas Kafka com o host – ambos configurados em o docker-compose.yml
  • -t assessment-tentativas é usado para especificar o nome do tópico que queremos publicar
  • & & é usado para listar um comando que desejamos executar após o anterior ter concluído a execução com sucesso
  • echo Produziu 3280 mensagens. é uma mensagem que nós deseja exibir se o comando anterior de publicação no Kafka foi executado com sucesso. Sabíamos 3280 da Etapa 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

O resultado deve ser semelhante ao seguinte:

Produced 3280 messages.

Referências

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/