KafkaへのAPIデータの消費

からコンテンツを取り込むためのステップバイステップガイドKafkaへのHTTPリクエストまたはRESTAPIレスポンス

(2020年12月6日)

写真提供:iv id スプラッシュ解除

<の= "0bf8313e9b">

EJ Strat p>今日のオープンソース環境では、APIを介してデータをすぐに利用できるようにすることが一般的になっています。 SpotifyやTwitterなどの企業は、 REST API を介して分析に利用できるデータの一部を公開しています。これにより、HTTPリクエストまたはRESTAPIを介したデータのソーシングがますます一般的になっています。このデータを効率的なデータ分析プラットフォームで利用できるようにするには、ユーザーは、ソースから目的の分析プラットフォームにデータをルーティングするための何らかのパイプラインを構築する必要があります。 Apache Kafkaは、このデータを調達するための回復力のある効率的な方法です。

Apache Kafka は、高レベルで使用されるオープンソースの分散イベントストリーミングプラットフォームです。 -パフォーマンスデータパイプライン、ストリーミング分析、データ統合、ミッションクリティカルなアプリケーション。

このノートブックは、HTTPリクエストまたはRESTAPIからの応答のコンテンツをKafkaに公開するためのウォークスルーです。ここでは、KafkaサービスがDockerコンテナーでスピンアップされたと想定します。 Dockerは、アプリケーションを開発、出荷、実行するためのプラットフォームです。 docker の詳細についてはこちらをご覧ください。

ステップ1:次のコマンドを使用して、RESTAPI応答/ HTTP応答の内容をJSONファイルに読み込みます。

Curlは、URL構文を使用してデータを取得または送信するためのコマンドです。サポートされているプロトコルのサポートされているプロトコルには、HTTP、HTTPS、FTP、IMAP、POP3、SCP、SFTP、SMTP、TFTP、TELNET、LDAP、またはFILEがあります。

オプションを追加します:

  • -L(HTTPおよびHTTPSで有効)は、要求されたページが別の場所(Location:ヘッダーと3XX応答で示される)に移動したことをサーバーが報告した場合に、新しい場所でcurlREDOを実行できるようにします。コード)。認証が使用される場合、curlはその資格情報を最初のホストにのみ送信します。リダイレクトによって別のホストにカールが発生した場合、ユーザーとパスワードを傍受することはできません。 — max-redirsオプションを使用して、フォローするリダイレクトの数を制限できます。
  • -oassessment-attempts-nested.jsonを使用して、stdoutの代わりにこのファイルに出力を書き込みます
  • 次に、データを受信するURLである https://goo.gl/ME6hjp を提供しました。
curl -L -o assessment-attempts-nested.json 
https://goo.gl/ME6hjp

結果は次のようになります:

\% Total \% Received \% Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0100 9096k 100 9096k 0 0 14.6M 0 --:--:-- --:--:-- --:--:-- 14.6M

ステップ2 : jqを使用して、手順1でcurlの出力を入力したJSONファイルの読み取り方法を変更し、catでテストします。

catコマンドはファイルを連結し、標準出力に出力します。

連結するファイル名、つまり標準入力を標準出力に提供します。 FILEがない場合、またはFILEが-の場合、標準入力を読み取ります。

| (パイプ)を使用すると、コマンド1(|の前のコマンド)の標準出力をコマンド2(|の後のコマンド)の標準入力にすることができます。したがって、連結結果はjq ‘。’への入力として機能します。

jqは、軽量で柔軟なコマンドラインJSONプロセッサです。構造化データを簡単にスライス、フィルタリング、マッピング、変換できます。 ‘。[]’は配列を展開し、配列内のインデックスを行/行として引き出します。-cはjqフォーマットからの色を保持します。したがって、jq 。[]-cを使用すると、JSONの配列内の各インデックスを新しい行に分離し、jqによって提供されるフォーマットの色を保持できます。

cat assessment-attempts-nested.json | jq ".[]" -c

ステップ3:行数(Kafkaに公開するメッセージの数)を確認する)ステップ2のコマンドの結果。

追加| wc -lを使用すると、ステップ2のコマンドから標準出力を取得できます。これは、配列JSONからjq形式で抽出された行であり、入力として次のコマンドになります。次のコマンドはwc-lです。 -lは改行を指定するオプションとして提供されるため、wcは改行カウントを出力します。

cat assessment-attempts-nested.json | jq ".[]" -c | wc -l

結果は行数になります。例:

3280

ステップ4:ここで取得したものを取得しますステップ2で、それをKafkaトピック「assessment-attempts」に公開します。ここでは、KafkaサービスがDockerを使用して起動されると仮定してdocker-composeを使用します。

docker-composeexecは、名前が指定されたコンテナー(ここでは「container1」)でコマンドを実行します。

実行するコマンドはbash-c“ catassessment-attempts-nested.json | jq ‘。[] ’-c | kafkacat -P -b kafka:29092 -tassessment-attempts & & echo3280個のメッセージを生成しました。 / blockquote>

  • bashはコンテナ内でシェルを起動することです
  • -cは、次の文字列からコマンドを読み取ることができるオプションです
  • 次の文字列最初に、ファイルassessment-attempts-nested.jsonの内容を標準出力に連結します。
  • 次に、そこからの標準出力を標準入力として次のコマンドに渡します。jq 。[]-c which出力のすべての内容(JSONのようにフォーマットされた)を取得し、配列の各インデックスを新しい行に抽出します。
  • 次に、その標準出力が標準入力として次のコマンドに渡されます。kafkacat- P -b kafka:29092 -tassessment-attempts & & echo 生成された3280メッセージ。”
  • kafkacat -Pは、ユーティリティをプロデューサーモードで起動します。この場合、kafkacatは標準入力(stdin)からメッセージを読み取ります。
  • -bkafka:29092は、Kafkaブローカーを指定するために使用されます。その名前は、ホストを持つKafkaだけです。 docker-compose.yml
  • -tassessment-attemptsは、公開するトピック名を指定するために使用されます
  • & &は、実行が正常に完了する前のコマンドの後に実行するコマンドを一覧表示するために使用されます
  • echo 生成された3280メッセージ。はメッセージです。 Kafkaに公開する前のコマンドが正常に実行されたかどうかを表示したい。手順3から3280がわかりました。
docker-compose exec container1 bash -c "cat assessment-attempts-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessment-attempts && echo "Produced 3280 messages.""

結果は次のようになります。

Produced 3280 messages.

参照

  1. https://kafka.apache.org/
  2. https://docs.docker.com/get-started/overview/
  3. https://www.geeksforgeeks.org/curl-command-in-linux-with-examples/
  4. https:/ /stedolan.github.io/jq/