Kafka로 API 데이터 사용

Kafka에 대한 HTTP 요청 또는 REST API 응답

(2020 년 12 월 6 일)

사진 : iv id Unsplash

<의 = "0bf8313e9b">

EJ Strat p> 오늘날의 오픈 소스 환경에서는 API를 통해 데이터를 쉽게 사용할 수 있도록하는 것이 일반화되었습니다. Spotify 및 Twitter와 같은 회사는 REST API 를 통해 분석에 사용할 수있는 일부 데이터를 공개합니다. 이로 인해 HTTP 요청 또는 REST API를 통한 데이터 소싱이 점점 더 인기를 얻고 있습니다. 이 데이터를 효율적인 데이터 분석 플랫폼에서 사용할 수 있도록하려면 사용자는 데이터를 소스에서 원하는 분석 플랫폼으로 라우팅하기위한 일종의 파이프 라인을 구축해야합니다. Apache Kafka는이 데이터를 소싱하는 탄력적이고 효율적인 방법입니다.

Apache Kafka 는 -성능 데이터 파이프 라인, 스트리밍 분석, 데이터 통합 ​​및 미션 크리티컬 애플리케이션.

이 노트북은 HTTP 요청 또는 REST API의 응답 콘텐츠를 Kafka에 게시하는 방법을 안내합니다. 여기서는 Kafka 서비스가 도커 컨테이너에서 회전되었다고 가정합니다. Docker는 애플리케이션을 개발, 제공 및 실행하기위한 플랫폼입니다. docker에 대한 자세한 내용은 여기 를 참조하세요.

1 단계 : 다음 명령을 사용하여 REST API 응답 / HTTP 응답의 내용을 JSON 파일로 읽습니다.

Curl은 URL 구문을 사용하여 데이터를 가져 오거나 보내는 명령입니다. 지원되는 프로토콜의. 지원되는 프로토콜 중 일부는 HTTP, HTTPS, FTP, IMAP, POP3, SCP, SFTP, SMTP, TFTP, TELNET, LDAP 또는 FILE입니다.

다음 옵션을 추가합니다.

  • -L (HTTP 및 HTTPS에 유효) : 요청 된 페이지가 다른 위치 (Location : 헤더 및 3XX 응답으로 표시됨)로 이동했다고 서버가보고하는 경우 새 위치에서 요청을 다시 실행할 수 있습니다. 암호). 인증이 사용되면 curl은 자격 증명을 초기 호스트에만 보냅니다. 리디렉션이 다른 호스트로 이동하는 경우 사용자 + 비밀번호를 가로 챌 수 없습니다. — max-redirs 옵션을 사용하여 따라야하는 리디렉션 수를 제한 할 수 있습니다.
  • -o evaluation-attempts-nested.json을 사용하여 stdout 대신이 파일에 출력을 기록합니다.
  • 그런 다음 데이터를 수신 할 URL 인 https://goo.gl/ME6hjp 를 제공했습니다.
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

결과는 다음과 같습니다.

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

2 단계 : jq를 사용하여 1 단계에서 curl의 출력으로 채운 JSON 파일을 읽는 방식을 수정하고 cat으로 테스트합니다.

The cat 명령은 파일을 연결하고 표준 출력에 인쇄합니다.

연결하려는 파일 이름 또는 표준 입력을 표준 출력에 제공합니다. FILE이 없거나 FILE이-이면 표준 입력을 읽습니다.

The | (파이프)는 명령 1의 표준 출력 (| 앞의 명령)이 명령 2 (| 뒤의 명령)에 대한 표준 입력이되도록합니다. 따라서 연결 결과는 jq‘.’에 대한 입력 역할을합니다.

jq는 가볍고 유연한 명령 줄 JSON 프로세서입니다. 구조화 된 데이터를 쉽게 분할 및 필터링하고 매핑 및 변환 할 수 있습니다. ‘. []’는 배열을 풀고 배열의 인덱스를 행 / 행으로 꺼내고 -c는 jq 형식의 색상을 유지합니다. 따라서 jq . []-c를 사용하면 JSON 배열의 각 인덱스를 새 줄로 분리하고 jq에서 제공하는 서식의 색상을 유지할 수 있습니다.

cat assessment-attempts-nested.json | jq ".[]" -c

3 단계 : 몇 줄 (Kafka에 게시하는 메시지 수)을 확인하려면 ) 2 단계의 명령 결과입니다.

추가 | wc -l을 사용하면 2 단계의 명령에서 jq 형식화되고 배열 JSON에서 추출 된 행의 표준 출력을 입력으로 다음 명령으로 가져올 수 있습니다. 다음 명령은 wc -l입니다. wc는 줄 바꿈을 지정하는 옵션으로 -l이 제공되었으므로 줄 바꿈 개수를 인쇄합니다.

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

결과는 다음과 같이 줄 수 여야합니다.

p>

3280

4 단계 : 여기에서 얻은 정보를 가져옵니다. 2 단계에서이를 Kafka 주제 assessment-attempts에 게시합니다. 여기서는 Kafka 서비스가 Docker를 사용하여 시작되었다고 가정하고 docker-compose를 사용합니다.

docker-compose exec는 이름이 제공된 컨테이너 (여기서는 container1)에서 명령을 실행합니다.

우리가 실행하는 명령은 bash -c“cat evaluation-attempts-nested.json | jq‘.[] -c | kafkacat -P -b kafka : 29092 -t 평가-시도 & & echo Produced 3280 messages.”

  • bash는 컨테이너에서 쉘을 실행하는 것입니다.
  • -c는 다음 문자열에서 명령을 읽을 수있는 옵션입니다.
  • 다음 문자열 먼저 evaluation-attempts-nested.json 파일의 내용을 표준 출력에 연결합니다.
  • 그런 다음 표준 출력을 표준 입력으로 다음 명령에 전달합니다. jq . []-c 출력의 모든 내용 (JSON과 같은 형식)을 가져오고 배열의 각 색인을 새 줄로 추출합니다.
  • 그런 다음 표준 출력이 다음 명령에 표준 입력으로 전달됩니다. kafkacat- P -b kafka : 29092 -t 평가-시도 & & echo Produced 3280 messages.”
  • kafkacat -P는 생산자 모드에서 유틸리티를 시작합니다. 여기서 kafkacat은 표준 입력 (stdin)에서 메시지를 읽습니다.
  • -b kafka : 29092는 Kafka 브로커를 지정하는 데 사용됩니다. 이름은 호스트가있는 Kafka입니다. docker-compose.yml
  • -t evaluation-attempts는 게시 할 주제 이름을 지정하는 데 사용됩니다.
  • & &는 실행이 성공적으로 완료되기 전에 실행할 명령을 나열하는 데 사용됩니다.
  • echo Produced 3280 messages.는 메시지입니다. Kafka에 게시하는 이전 명령이 성공적으로 실행되었는지 표시하려고합니다. 3 단계에서 3280을 알고있었습니다.
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

결과는 다음과 같습니다.

Produced 3280 messages.

참조

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https : / /stedolan.github.io/jq/