Przetwarzanie danych API do platformy Kafka

Przewodnik krok po kroku dotyczący pozyskiwania treści z Żądanie HTTP lub odpowiedź REST API do Kafka

(6 grudnia 2020 r.)

Zdjęcie: EJ Strat na Unsplash

W dzisiejszym środowisku open source powszechnym stało się udostępnianie danych za pośrednictwem interfejsów API. Firmy takie jak Spotify i Twitter udostępniają część swoich danych dostępnych do analizy za pośrednictwem REST API . To sprawia, że ​​pozyskiwanie danych za pośrednictwem żądania HTTP lub REST API staje się coraz bardziej popularne. Aby udostępnić te dane w wydajnej platformie analizy danych, użytkownicy będą musieli zbudować pewnego rodzaju potok do kierowania danych z ich źródła do żądanej platformy analitycznej. Apache Kafka to odporny i wydajny sposób pozyskiwania tych danych.

Apache Kafka to rozproszona platforma przesyłania strumieniowego zdarzeń typu open source używana do – potoki danych wydajnościowych, analizy strumieniowe, integracja danych i aplikacje o znaczeniu krytycznym.

Ten notatnik to przewodnik po publikowaniu treści odpowiedzi z żądania HTTP lub interfejsu API REST do platformy Kafka. Tutaj przyjmiemy, że usługa Kafka została uruchomiona w kontenerze docker. Docker to platforma do tworzenia, wysyłania i uruchamiania aplikacji. Możesz przeczytać więcej o dockerze tutaj .

Krok 1: Odczytaj zawartość odpowiedzi REST API / odpowiedzi HTTP do pliku JSON za pomocą następującego polecenia.

Curl to polecenie do pobierania lub wysyłania danych przy użyciu składni adresu URL, przy użyciu dowolnego obsługiwanych protokołów. Niektóre z obsługiwanych protokołów to HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP lub FILE.

Dodajemy opcje:

  • -L (ważne dla HTTP i HTTPS), aby umożliwić curl ponowienie żądania w nowym miejscu, jeśli serwer zgłosi, że żądana strona została przeniesiona do innej lokalizacji (oznaczonej nagłówkiem Location: i odpowiedzią 3XX kod). Gdy używane jest uwierzytelnianie, curl wysyła swoje dane uwierzytelniające tylko do początkowego hosta. Jeśli przekierowanie przenosi curl do innego hosta, nie będzie w stanie przechwycić użytkownika i hasła. Możesz ograniczyć liczbę przekierowań do naśladowania, używając opcji – max-redirs.
  • -o assessment-próbuje-nested.json zapisuje dane wyjściowe do tego pliku zamiast na standardowe wyjście.
  • Następnie podaliśmy https://goo.gl/ME6hjp , adres URL, z którego chcemy otrzymywać dane.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

Wynik będzie wyglądał następująco:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

Krok 2 : Użyj jq, aby zmodyfikować sposób, w jaki czytamy w pliku JSON, który wypełniliśmy danymi wyjściowymi curl w kroku 1, i przetestuj to za pomocą cat.

Kot polecenie łączy pliki i drukuje na standardowym wyjściu.

Podajemy nazwy plików, które chcemy połączyć, lub standardowe wejście, na standardowe wyjście. Bez PLIKU lub gdy PLIK to -, czyta standardowe wejście.

| (potok) pozwala, aby standardowe wyjście polecenia 1 (polecenie przed |) było standardowym wejściem dla polecenia 2 (polecenie po |). Tak więc nasz wynik konkatenacji działa jako dane wejściowe do jq „.”

jq to lekki i elastyczny procesor JSON wiersza poleceń. Pozwala z łatwością wycinać, filtrować, mapować i przekształcać uporządkowane dane. „. []” Rozwija tablicę i wyciąga indeks z tablicy jako wiersz / wiersz, a -c zachowuje kolor z formatowania jq. Zatem jq . [] -C pozwala nam oddzielić każdy indeks w tablicy JSON do nowej linii i zachować kolor formatowania zapewniany przez jq.

cat assessment-attempts-nested.json | jq ".[]" -c

Krok 3: Aby zobaczyć, ile wierszy (czyli ile wiadomości opublikujemy w serwisie Kafka) ) wynikają z naszego polecenia w kroku 2.

Dodawanie | wc -l pozwala nam pobrać nasze standardowe dane wyjściowe z polecenia w kroku 2, czyli sformatowane w jq i wyodrębnione wiersze z tablicy JSON do następnego polecenia jako dane wejściowe. Następnym poleceniem jest wc -l. wc wypisuje liczbę nowych linii, ponieważ -l podane jako opcja określająca nową linię.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

Wynik powinna być liczbą wierszy, na przykład:

3280

Krok 4: Tutaj bierzemy to, co mamy w kroku 2 i opublikuj to w temacie Kafki „próby oceny”. Tutaj użyjemy docker-compose, zakładając, że usługa Kafka jest uruchamiana za pomocą Dockera.

docker-compose exec uruchamia polecenie w kontenerze, którego nazwa jest podana, tutaj „container1”.

Polecenie, które uruchamiamy to bash -c “cat assessment-samples-nested.json | jq ”.[] ’-C | kafkacat -P -b kafka: 29092 -t assessment-tests & & echo Wytworzono 3280 wiadomości. ”

  • bash uruchamia powłokę w kontenerze
  • -c to opcja umożliwiająca odczytanie poleceń z następującego ciągu
  • Następujący ciąg najpierw łączy zawartość pliku assessment-tours-nested.json na standardowe wyjście.
  • Następnie przekazuje standardowe wyjście z tego jako standardowe wejście do następnego polecenia: jq . [] -c który pobiera całą zawartość wyjścia (sformatowaną jak JSON) i wyodrębnia każdy indeks tablicy do nowego wiersza.
  • Standardowe wyjście jest następnie przekazywane jako standardowe wejście do następnego polecenia: kafkacat – P -b kafka: 29092 -t assessment-tests & & echo Wytworzono 3280 wiadomości. ”
  • kafkacat -P uruchamia narzędzie w trybie producenta. W tym przypadku kafkacat czyta komunikaty ze standardowego wejścia (stdin).
  • -b kafka: 29092 służy do określenia brokera Kafka, którego nazwa to po prostu Kafka z hostem – oba są skonfigurowane w plik docker-compose.yml
  • -t assessment-tests służy do określenia nazwy tematu, w którym chcemy opublikować
  • & & służy do wylistowania polecenia, które chcemy wykonać po poleceniu przed jego pomyślnym zakończeniem
  • echo Wytworzono komunikaty 3280. to wiadomość, którą chcesz wyświetlić, czy poprzednie polecenie publikowania w Kafce zostało pomyślnie wykonane. Znaliśmy 3280 z kroku 3.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

Wynik powinien wyglądać następująco:

Produced 3280 messages.

Odniesienia

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https: / /stedolan.github.io/jq/