Pulsar Flink Connector 2.7.0의 새로운 기능

( Sijia-w) (2020 년 12 월 24 일)

자세히 알아보기 Pulsar Flink Connector 2.7.0의 가장 흥미롭고 주요 기능입니다.

Pulsar Flink Connector는 Apache Pulsar 및 Apache Flink를 사용한 통합 배치 및 스트리밍을위한 이상적인 솔루션을 제공합니다. Pulsar Flink Connector 2.7.0은 Pulsar 2.7 및 Flink 1.12의 기능을 지원하며 Flink 데이터 형식과 완벽하게 호환됩니다.

Pulsar Flink 커넥터 정보

기업이 실제 데이터에 액세스하기 위해 시간 데이터 통찰력을 얻으려면 통합 배치 및 스트리밍 기능이 필요합니다. Apache Flink는 통합 데이터 표현으로 “스트림”을 사용하여 배치 및 스트림 처리를 하나의 단일 컴퓨팅 엔진으로 통합합니다. 개발자는 컴퓨팅 및 API 계층에서 광범위한 작업을 수행했지만 데이터 및 메시징 및 스토리지 계층에서 수행 된 작업은 거의 없습니다. 그러나 실제로 데이터는 다양한 스토리지 및 메시징 기술에 의해 생성 된 데이터 사일로로 분리됩니다. 결과적으로 여전히 단일 진실 소스가 없으며 개발자 팀의 전반적인 운영은 여전히 ​​지저분합니다. 복잡한 작업을 처리하려면 데이터를 스트림에 저장해야합니다. Apache Pulsar (Apache BookKeeper와 함께)는 기준을 완벽하게 충족합니다. 데이터는 하나의 사본 (진실 소스)으로 저장되고 스트림 (pub-sub 인터페이스를 통해) 및 세그먼트 (일괄 처리 용)에서 액세스 할 수 있습니다. Flink와 Pulsar가 결합되면 두 개의 오픈 소스 기술이 실시간 데이터 기반 비즈니스를위한 통합 데이터 아키텍처를 만듭니다.

Pulsar Flink 커넥터 Apache Pulsar Apache Flink 를 통해 탄력적 인 데이터 처리를 제공하여 Apache Flink를 Apache Pulsar에서 데이터 읽기 / 쓰기. Pulsar Flink 커넥터를 사용하면 저장소 세부 정보에 대해 걱정하지 않고 비즈니스 로직에 집중할 수 있습니다.

과제

Pulsar Flink 커넥터를 처음 개발했을 때 두 제품 모두에서 광범위하게 채택되었습니다. Flink 및 Pulsar 커뮤니티. Pulsar Flink 커넥터를 활용하여 Hewlett Packard Enterprise (HPE) 는 실시간 컴퓨팅 플랫폼 인 BIGO 는 실시간 메시지 처리 시스템을 구축했으며 Zhihu 는 실시간 컴퓨팅 시스템에 대한 커넥터의 적합성을 평가하는 중입니다.

더 많은 사용자가 Pulsar Flink 커넥터를 채택함에 따라 커뮤니티에서 공통적 인 문제를 들었습니다. 직렬화 및 역 직렬화를 수행하기가 어렵습니다. Pulsar Flink 커넥터는 Pulsar 직렬화를 활용하지만 이전 버전은 Flink 데이터 형식을 지원하지 않았습니다. 결과적으로 사용자는 커넥터를 사용하여 실시간 컴퓨팅을 수행하기 위해 많은 구성을 수행해야했습니다.

Pulsar Flink 커넥터를 사용하기 쉽게 만들기 위해 우리는 완전한 기능을 구축하기로 결정했습니다. Flink 데이터 형식을 지원하므로 사용자가 구성에 시간을 할애 할 필요가 없습니다.

Pulsar Flink Connector 2.7.0의 새로운 기능

Pulsar Flink Connector 2.7.0은 기능을 지원합니다. Apache Pulsar 2.7.0 및 Apache Flink 1.12에서 지원되며 Flink 커넥터 및 Flink 메시지 형식과 완벽하게 호환됩니다. 이제 정확히 1 회 싱크, upsert Pulsar 메커니즘, DDL (데이터 정의 언어) 계산 열, 워터 마크 및 메타 데이터와 같은 Flink의 중요한 기능을 사용할 수 있습니다. 또한 Pulsar에서 키 공유 구독을 활용하고 많은 구성없이 직렬화 및 직렬화 해제를 수행 할 수 있습니다. 또한 비즈니스에 따라 쉽게 구성을 사용자 지정할 수 있습니다.

아래에서는 Pulsar Flink Connector 2.7.0의 주요 기능을 자세히 소개합니다.

성능

사용자가 메시지 순서를 엄격하게 보장해야 할 때 한 명의 소비자 만 메시지를 사용할 수있었습니다. 이것은 처리량에 심각한 영향을 미쳤습니다. 이를 해결하기 위해 Pulsar에서 Key\_Shared 구독 모델을 설계했습니다. 각 메시지에 키를 추가하여 메시지 순서를 보장하고 처리량을 향상 시키며 동일한 키 해시가있는 메시지를 한 명의 소비자에게 라우팅합니다.

Pulsar Flink Connector 2.7.0은 Key\_Shared 구독 모델을 지원합니다. enable-key-hash-rangetrue로 설정하여이 기능을 사용할 수 있습니다. 각 소비자가 처리하는 키 해시 범위는 작업의 병렬 처리에 의해 결정됩니다.

Pulsar 싱크에 대한 정확히 한 번 의미론 도입 (Pulsar 트랜잭션 기반)

이전 버전에서는 싱크 연산자는 최소 1 회 시맨틱 만 지원했으며 이는 종단 간 일관성 요구 사항을 완전히 충족 할 수 없었습니다. 메시지를 중복 제거하기 위해 사용자는 사용자 친화적이지 않은 더러운 작업을 수행해야했습니다.

트랜잭션은 Pulsar 2.7.0에서 지원되므로 Flink 싱크의 내결함성 기능이 크게 향상됩니다. Pulsar Flink Connector 2.7.0에서 우리는 Pulsar 트랜잭션을 기반으로하는 싱크 연산자를위한 정확히 한 번 의미 체계를 설계했습니다. Flink는 2 단계 커밋 프로토콜을 사용하여 TwoPhaseCommitSinkFunction을 구현합니다. 주요 라이프 사이클 메소드는 beginTransaction (), preCommit (), commit (), abort (), recoverAndCommit (), recoverAndAbort ()입니다.

싱크 연산자를 만들 때 의미 체계를 유연하게 선택할 수 있습니다. 내부 논리 변경은 투명합니다. Pulsar 트랜잭션은 Flink의 2 단계 커밋 프로토콜과 유사하므로 Connector Sink의 안정성이 크게 향상됩니다.

beginTransaction 및 preCommit을 쉽게 구현할 수 있습니다. Pulsar 트랜잭션을 시작하고 체크 포인트 이후에 트랜잭션의 TID를 유지하기 만하면됩니다. 사전 커밋 단계에서는 모든 메시지가 Pulsar로 플러시되고 사전 커밋 된 메시지가 결국 커밋되는지 확인해야합니다.

우리는 구현에서 recoverAndCommit 및 recoverAndAbort에 중점을 둡니다. Kafka 기능에 의해 제한되는 Kafka 커넥터는 recoverAndCommit에 대한 해킹 스타일을 채택합니다. Pulsar 트랜잭션은 특정 Producer에 의존하지 않으므로 TID를 기반으로 트랜잭션을 쉽게 커밋하고 중단 할 수 있습니다.

Pulsar 트랜잭션은 매우 효율적이고 유연합니다. Pulsar 및 Flink의 장점을 활용하는 Pulsar Flink 커넥터는 훨씬 더 강력합니다. Pulsar Flink 커넥터의 트랜잭션 싱크를 계속 개선 할 것입니다.

upsert-pulsar 커넥터 소개

Flink 커뮤니티의 사용자는 upsert Pulsar에 대한 요구를 표현했습니다. 메일 링리스트와 문제를 살펴본 후 다음 세 가지 이유를 요약했습니다.

  • Pulsar 주제를 키가있는 레코드를 upsert (일명 삽입 / 업데이트) 이벤트로 해석하는 변경 로그 스트림으로 해석합니다.
  • 실시간 파이프 라인의 일부로 보강을 위해 여러 스트림을 결합하고 나중에 추가 계산을 위해 결과를 Pulsar 주제에 저장합니다. 그러나 결과에 업데이트 이벤트가 포함될 수 있습니다.
  • 실시간 파이프 라인의 일부로 데이터 스트림을 집계하고 나중에 추가 계산을 위해 결과를 Pulsar 주제에 저장합니다. 그러나 결과에 업데이트 이벤트가 포함될 수 있습니다.

요구 사항에 따라 Upsert Pulsar에 대한 지원을 추가합니다. upsert-pulsar 커넥터를 사용하면 upsert 방식으로 Pulsar 주제에서 데이터를 읽고 쓸 수 있습니다.

  • 소스로서 upsert-pulsar 커넥터는 각 데이터 레코드가 나타내는 변경 로그 스트림을 생성합니다. 업데이트 또는 삭제 이벤트. 보다 정확하게는 데이터 레코드의 값이 동일한 키에 대한 마지막 값의 UPDATE (있는 경우)로 해석됩니다 (해당 키가 아직없는 경우 업데이트는 INSERT로 간주 됨). 테이블 비유를 사용하면 변경 로그 스트림의 데이터 레코드는 동일한 키를 가진 기존 행을 덮어 쓰므로 UPSERT (일명 INSERT / UPDATE)로 해석됩니다. 또한 null 값은 특별한 방식으로 해석됩니다. null 값이있는 레코드는 “DELETE”를 나타냅니다.
  • 싱크로서 upsert-pulsar 커넥터는 변경 로그 스트림을 사용할 수 있습니다. INSERT / UPDATE\_AFTER 데이터를 일반 Pulsar 메시지 값으로 쓰고 DELETE 데이터를 Null 값이있는 Pulsar 메시지로 기록합니다 (키의 삭제 표시를 나타냄). Flink는 기본 키 열의 값에 대한 파티션 데이터별로 기본 키의 메시지 순서를 보장하므로 동일한 키의 업데이트 / 삭제 메시지가 동일한 파티션에 속합니다.

FLIP-27 FLIP-95 에 도입 된 새로운 소스 인터페이스 및 Table API 지원 h2>

이 기능은 배치 스트림의 소스를 통합하고 작업 검색 및 데이터 읽기를위한 메커니즘을 최적화합니다. 또한 Pulsar 배치 및 스트리밍 통합 구현의 초석이기도합니다. 새로운 Table API는 DDL 계산 열, 워터 마크 및 메타 데이터를 지원합니다.

FLIP-107 에 설명 된대로 SQL 읽기 및 쓰기 메타 데이터를 지원합니다. h2>

FLIP-107을 사용하면 사용자가 테이블 정의의 메타 데이터 열로 커넥터 메타 데이터에 액세스 할 수 있습니다. 실시간 컴퓨팅에서 사용자는 일반적으로 eventTime, 사용자 정의 필드와 같은 추가 정보가 필요합니다. Pulsar Flink 커넥터는 SQL 읽기 및 쓰기 메타 데이터를 지원하므로 사용자가 Pulsar Flink Connector 2.7.0에서 Pulsar 메시지의 메타 데이터를 유연하고 쉽게 관리 할 수 ​​있습니다. 구성에 대한 자세한 내용은 Pulsar 메시지 메타 데이터 조작 을 참조하세요.

Flink 형식 유형 추가 atomic를 사용하여 Pulsar 기본 유형 지원

Pulsar Flink 커넥터 2.7.0에서 Flink 형식 유형 atomic를 추가하여 Pulsar 기본 유형을 지원합니다. Flink 처리에 Pulsar 기본 형식이 필요한 경우 atomic를 커넥터 형식으로 사용할 수 있습니다. Pulsar 기본 유형에 대한 자세한 내용은 https : //pulsar.apache를 참조하세요.org / docs / en / schema-understand / .

마이그레이션

이전 Pulsar Flink Connector 버전을 사용하는 경우 SQL 및 API 매개 변수를 조정해야합니다. 따라서. 아래에서 각각에 대한 세부 정보를 제공합니다.

SQL

SQL에서 DDL 선언의 Pulsar 구성 매개 변수를 변경했습니다. 일부 매개 변수의 이름은 변경되지만 값은 변경되지 않습니다.

  • 매개 변수 이름에서 connector. 접두사를 제거합니다.
  • connector.type 매개 변수의 이름을 connector로 변경합니다.
  • connector.startup-modescan.startup.mode로 변환합니다.
  • Pulsar 속성을 properties.pulsar.reader.readername=testReaderName로 조정합니다. li>

Pulsar Flink Connector에서 SQL을 사용하는 경우 Pulsar Flink Connector 2.7.0으로 마이그레이션 할 때 그에 따라 SQL 구성을 조정해야합니다. 다음 샘플은 이전 버전과 SQL 용 2.7.0 버전 간의 차이점을 보여줍니다.

이전 버전의 SQL :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL Pulsar Flink Connector 2.7.0 :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

API 관점에서 일부 클래스를 조정하고 더 쉬운 사용자 지정을 가능하게했습니다.

  • 직렬화 문제를 해결하기 위해 생성 방법 FlinkPulsarSink의 서명을 변경하고 PulsarSerializationSchema를 추가했습니다.
  • FlinkPulsarRowSink, FlinkPulsarRowSource와 같이 행과 관련된 부적절한 클래스를 제거했습니다. Row 형식을 처리해야하는 경우 Flink Row 관련 직렬화 구성 요소를 사용할 수 있습니다.

iv id =를 사용하여 PulsarSerializationSchema를 빌드 할 수 있습니다. “3564c1ecec”> . TopicKeyExtractorPulsarSerializationSchemaWrapper로 이동되었습니다. API를 조정할 때 다음 샘플을 참조 할 수 있습니다.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

미래 계획

오늘 우리는 배치를 설계하고 있습니다. 새로운 Flink Source API (FLIP-27)를 기반으로 Pulsar Source와 통합 된 스트림 솔루션입니다. 새로운 솔루션은 현재 스트리밍 소스 인터페이스 (SourceFunction)의 제한을 해제하고 동시에 배치 API와 스트리밍 API 간의 소스 인터페이스를 통합합니다.

Pulsar는 데이터가 스트리밍, 배치, Pulsar가 무한한 용량을 제공 할 수 있도록하는 콜드 데이터. 따라서 Pulsar는 통합 배치 및 스트리밍에 이상적인 솔루션입니다.

새로운 Flink Source API를 기반으로하는 배치 및 스트림 솔루션은 SplitEnumerator와 Reader의 두 부분으로 나뉩니다. SplitEnumerator는 파티션을 검색 및 할당하고 Reader는 파티션에서 데이터를 읽습니다.

Pulsar는 원장 블록에 메시지를 저장하고 Pulsar 관리자를 통해 원장을 찾은 다음 브로커 파티션, BookKeeper 파티션, 오프로더 파티션 및 다른 파티션 정책을 통해 기타 정보를 제공 할 수 있습니다. 자세한 내용은 https://github.com/streamnative/pulsar-flink/issues/187 을 참조하세요.

결론

결론

Pulsar Flink Connector 2.7.0이 출시되었으며 모든 사용자가 Pulsar Flink Connector 2.7.0을 사용하는 것이 좋습니다. 새 버전은보다 사용자 친화적이며 Pulsar 2.7 및 Flink 1.12의 다양한 기능을 사용할 수 있습니다. Pulsar Flink Connector 2.7.0을 Flink 저장소 에 제공합니다. Pulsar Flink 커넥터에 대해 우려 사항이있는 경우 https://github.com/streamnative/pulsar-flink/issues 에서 문제를 열어 주시기 바랍니다.

저자 정보

Jianyun Zhao 는 StreamNative의 소프트웨어 엔지니어입니다. 그 전에는 Zhaopin.com에서 실시간 컴퓨팅 시스템 개발을 담당했습니다. twitter 에서 그를 팔로우 할 수 있습니다.

Jennifer Huang 는 Apache Pulsar 커미터입니다. 그녀는 StreamNative의 수석 콘텐츠 전략가로 일하며 Apache Pulsar 문서화 및 커뮤니티 성장을 담당합니다. twitter 에서 팔로우 할 수 있습니다.

이 게시물은 원래 StreamNative 블로그에 게시되었습니다. .

이 게시물이 마음에 드십니까? 추천 및 / 또는 공유해주세요.

자세히 알아 보시겠습니까? https://streamnative.io/blog 를 참조하세요. 팔로우하기 Medium에서 GitHub 를 확인하세요.