Co nowego w Pulsar Flink Connector 2.7.0

( Sijia-w) (24 grudnia 2020 r.)

Dowiedz się najciekawsze i najważniejsze cechy Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector zapewnia idealne rozwiązanie do ujednoliconego wsadowania i przesyłania strumieniowego z Apache Pulsar i Apache Flink. Pulsar Flink Connector 2.7.0 obsługuje funkcje w Pulsar 2.7 i Flink 1.12 i jest w pełni kompatybilny z formatem danych Flink.

O Pulsar Flink Connector

Aby firmy miały dostęp do rzeczywistych wgląd w dane czasowe, potrzebują ujednoliconych możliwości wsadowych i strumieniowych. Apache Flink ujednolica przetwarzanie wsadowe i strumieniowe w jeden silnik obliczeniowy ze „strumieniami” jako ujednoliconą reprezentacją danych. Chociaż programiści wykonali rozległą pracę w warstwie obliczeniowej i warstwie API, wykonano bardzo niewiele pracy w warstwach danych, przesyłania wiadomości i pamięci masowej. Jednak w rzeczywistości dane są segregowane w silosy danych, utworzone przez różne technologie przechowywania i przesyłania wiadomości. W rezultacie nadal nie ma jednego źródła prawdy, a ogólne działania zespołów programistów są nadal chaotyczne. Aby poradzić sobie z bałaganiarskimi operacjami, musimy przechowywać dane w strumieniach. Apache Pulsar (razem z Apache BookKeeper) doskonale spełnia kryteria: dane są przechowywane jako jedna kopia (źródło prawdy) i mogą być dostępne w strumieniach (przez interfejsy pub-sub) i segmentach (do przetwarzania wsadowego). Kiedy firmy Flink i Pulsar spotykają się, obie technologie open source tworzą ujednoliconą architekturę danych dla firm opartych na danych w czasie rzeczywistym.

Pulsar Flink connector zapewnia elastyczne przetwarzanie danych za pomocą Apache Pulsar i Apache Flink , umożliwiając Apache Flink odczyt / zapis danych z / do Apache Pulsar. Pulsar Flink Connector pozwala skoncentrować się na logice biznesowej bez martwienia się o szczegóły pamięci masowej.

Wyzwania

Kiedy po raz pierwszy opracowaliśmy Pulsar Flink Connector, uzyskał szerokie zastosowanie zarówno w Społeczności Flink i Pulsar. Korzystając ze złącza Pulsar Flink, Hewlett Packard Enterprise (HPE) stworzył platformę obliczeniową czasu rzeczywistego, BIGO zbudował system przetwarzania wiadomości w czasie rzeczywistym, a Zhihu jest w trakcie oceny dopasowania łącznika do systemu przetwarzania w czasie rzeczywistym.

Ponieważ coraz więcej użytkowników przyjęło Pulsar Flink Connector, usłyszeliśmy od społeczności częsty problem: trudno jest przeprowadzić serializację i deserializację. Podczas gdy złącze Pulsar Flink wykorzystuje serializację Pulsar, poprzednie wersje nie obsługiwały formatu danych Flink. W rezultacie użytkownicy musieli wykonać wiele konfiguracji, aby używać konektora do wykonywania obliczeń w czasie rzeczywistym.

Aby ułatwić korzystanie z łącznika Pulsar Flink, zdecydowaliśmy się zbudować możliwości w pełni obsługują format danych Flink, więc użytkownicy nie muszą tracić czasu na konfigurację.

Co nowego w Pulsar Flink Connector 2.7.0?

Pulsar Flink Connector 2.7.0 obsługuje funkcje w Apache Pulsar 2.7.0 i Apache Flink 1.12 i jest w pełni kompatybilny ze złączem Flink i formatem wiadomości Flink. Teraz możesz używać ważnych funkcji w Flink, takich jak ujście dokładnie raz, mechanizm upsert Pulsar, kolumny obliczane w języku DDL (Data Definition Language), znaki wodne i metadane. Możesz również skorzystać z subskrypcji Key-Shared w Pulsar i przeprowadzić serializację i deserializację bez dużej konfiguracji. Dodatkowo możesz łatwo dostosować konfigurację w zależności od Twojej firmy.

Poniżej szczegółowo przedstawiamy najważniejsze funkcje Pulsar Flink Connector 2.7.0.

Uporządkowana kolejka wiadomości z wysoką wydajność

Gdy użytkownicy musieli ściśle zagwarantować kolejność wiadomości, tylko jeden konsument mógł korzystać z wiadomości. Miało to poważny wpływ na przepustowość. Aby rozwiązać ten problem, zaprojektowaliśmy model subskrypcji Key\_Shared w programie Pulsar. Gwarantuje kolejność wiadomości i poprawia przepustowość, dodając klucz do każdej wiadomości i kieruje wiadomości z tym samym skrótem klucza do jednego odbiorcy.

Pulsar Flink Connector 2.7.0 obsługuje model subskrypcji Key\_Shared. Możesz włączyć tę funkcję, ustawiając enable-key-hash-range na true. O zakresie Key Hash przetwarzanym przez każdego konsumenta decyduje równoległość zadań.

Wprowadzenie semantyki dokładnie jednokrotnie dla ujścia Pulsara (na podstawie transakcji Pulsar)

W poprzednich wersjach sink operatorzy obsługiwali tylko semantykę co najmniej raz, która nie mogła w pełni spełnić wymagań dotyczących spójności typu end-to-end. Aby usunąć duplikaty wiadomości, użytkownicy musieli wykonać brudną robotę, która nie była przyjazna dla użytkownika.

Transakcje są obsługiwane w Pulsar 2.7.0, co znacznie poprawi odporność na błędy w Flink sink. W Pulsar Flink Connector 2.7.0 zaprojektowaliśmy dokładnie jednokrotnie semantykę dla operatorów zlewu w oparciu o transakcje Pulsar. Flink używa protokołu zatwierdzania dwufazowego do implementacji TwoPhaseCommitSinkFunction. Główne metody cyklu życia to beginTransaction (), preCommit (), commit (), abort (), recoveryAndCommit (), recoveryAndAbort ().

Możesz elastycznie wybierać semantykę podczas tworzenia operatora ujścia, a wewnętrzne zmiany logiczne są przejrzyste. Transakcje Pulsar są podobne do protokołu zatwierdzania dwufazowego we Flink, co znacznie poprawi niezawodność Connector Sink.

Implementacja beginTransaction i preCommit jest łatwa. Musisz tylko rozpocząć transakcję Pulsar i zachować TID transakcji po punkcie kontrolnym. W fazie preCommit musisz upewnić się, że wszystkie wiadomości są przesyłane do Pulsara, a wiadomości wstępnie zatwierdzone zostaną ostatecznie zatwierdzone.

Skupiamy się na implementacji recoveryAndCommit i recoveryAndAbort. Ograniczone przez funkcje platformy Kafka, łącznik Kafka przyjmuje style hakerskie do odzyskiwania i zatwierdzania. Transakcje Pulsar nie zależą od konkretnego Producenta, więc możesz łatwo zatwierdzać i przerywać transakcje na podstawie TID.

Transakcje Pulsar są bardzo wydajne i elastyczne. Wykorzystując zalety Pulsar i Flink, złącze Pulsar Flink jest jeszcze mocniejsze. Będziemy nadal ulepszać transakcyjne ujście w złączu Pulsar Flink.

Przedstawiamy złącze upsert-pulsar

Użytkownicy społeczności Flink wyrazili swoje zapotrzebowanie na najwyższy Pulsar. Po przejrzeniu list mailingowych i problemów podsumowaliśmy trzy następujące powody.

  • Interpretuj temat Pulsara jako strumień dziennika zmian, który interpretuje rekordy z kluczami jako zdarzenia upsert (aka wstawianie / aktualizowanie).
  • Jako część potoku czasu rzeczywistego, dołącz wiele strumieni w celu wzbogacenia i zapisz wyniki w temacie Pulsar w celu późniejszego obliczenia. Jednak wynik może zawierać zdarzenia aktualizacji.
  • W ramach potoku czasu rzeczywistego agreguj strumienie danych i przechowuj wyniki w temacie Pulsar w celu późniejszego obliczenia. Jednak wynik może zawierać zdarzenia aktualizacji.

W oparciu o wymagania dodajemy obsługę Upsert Pulsar. Złącze upsert-pulsar umożliwia odczytywanie i zapisywanie danych w tematach Pulsar w odwrotny sposób.

  • Jako źródło, złącze upsert-pulsar tworzy strumień zmian, w którym każdy rekord danych reprezentuje aktualizację lub usunięcie wydarzenia. Dokładniej, wartość w rekordzie danych jest interpretowana jako AKTUALIZACJA ostatniej wartości tego samego klucza, jeśli taki istnieje (jeśli odpowiadający klucz jeszcze nie istnieje, aktualizacja będzie traktowana jako WSTAW). Stosując analogię do tabeli, rekord danych w strumieniu dziennika zmian jest interpretowany jako UPSERT (inaczej INSERT / UPDATE), ponieważ każdy istniejący wiersz z tym samym kluczem jest nadpisywany. Ponadto wartości null są interpretowane w specjalny sposób: rekord z wartością zerową reprezentuje „DELETE”.
  • Jako ujście, złącze upsert-pulsar może zużywać strumień dziennika zmian. Zapisuje dane INSERT / UPDATE\_AFTER jako normalną wartość wiadomości Pulsar i zapisuje dane DELETE jako wiadomości Pulsar z wartościami zerowymi (wskazuje tombstone dla klucza). Flink zagwarantuje kolejność komunikatów na kluczu podstawowym według danych partycji w wartościach kolumn klucza podstawowego, więc komunikaty o aktualizacji / usunięciu z tego samego klucza będą znajdować się w tej samej partycji.

Obsługa nowego interfejsu źródłowego i interfejsu Table API wprowadzonego w FLIP-27 i FLIP-95

Ta funkcja ujednolica źródło strumienia wsadowego i optymalizuje mechanizm wykrywania zadań i odczytu danych. Jest to również kamień węgielny naszego wdrożenia ujednolicenia wsadowego i strumieniowego Pulsara. Nowy interfejs Table API obsługuje kolumny obliczeniowe DDL, znaki wodne i metadane.

Obsługa metadanych odczytu i zapisu SQL zgodnie z opisem w FLIP-107

FLIP-107 umożliwia użytkownikom dostęp do metadanych konektora jako kolumny metadanych w definicjach tabel. W przypadku przetwarzania w czasie rzeczywistym użytkownicy zwykle potrzebują dodatkowych informacji, takich jak czas zdarzenia, pola niestandardowe. Złącze Pulsar Flink obsługuje metadane odczytu i zapisu SQL, więc jest elastyczne i łatwe dla użytkowników w zarządzaniu metadanymi wiadomości Pulsar w Pulsar Flink Connector 2.7.0. Aby uzyskać szczegółowe informacje na temat konfiguracji, zobacz Manipulowanie metadanymi wiadomości Pulsar .

Dodaj typ formatu Flink atomic do obsługi typów pierwotnych Pulsara

W łączniku Pulsar Flink 2.7.0 dodajemy typ formatu Flink atomic, aby obsługiwać typy prymitywne Pulsar. Gdy przetwarzanie Flink wymaga typu prymitywnego Pulsar, możesz użyć atomic jako formatu łącznika. Aby uzyskać więcej informacji na temat typów prymitywnych Pulsara, zobacz https: //pulsar.apache.org / docs / pl / schema-zrozumieć / .

Migracja

Jeśli używasz poprzedniej wersji Pulsar Flink Connector, musisz dostosować parametry SQL i API odpowiednio. Poniżej podajemy szczegółowe informacje na temat każdego z nich.

SQL

W języku SQL zmieniliśmy parametry konfiguracyjne Pulsara w deklaracji DDL. Nazwy niektórych parametrów są zmieniane, ale wartości nie ulegają zmianie.

  • Usuń prefiks connector. z nazw parametrów.
  • Zmień nazwę parametru connector.type na connector.
  • Zmień nazwę parametru trybu uruchamiania z connector.startup-mode do scan.startup.mode.
  • Dostosuj właściwości Pulsara jako properties.pulsar.reader.readername=testReaderName.

Jeśli używasz SQL w Pulsar Flink Connector, musisz odpowiednio dostosować konfigurację SQL podczas migracji do Pulsar Flink Connector 2.7.0. Poniższy przykład przedstawia różnice między poprzednimi wersjami a wersją 2.7.0 dla SQL.

SQL w poprzednich wersjach :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL w Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Z punktu widzenia API dostosowaliśmy niektóre klasy i włączyliśmy łatwiejsze dostosowywanie.

  • Aby rozwiązać problemy z serializacją, zmieniliśmy sygnaturę metody konstrukcji FlinkPulsarSink i dodaliśmy PulsarSerializationSchema.
  • Usunęliśmy nieodpowiednie klasy związane z wierszem, takie jak FlinkPulsarRowSink, FlinkPulsarRowSource. Jeśli potrzebujesz poradzić sobie z formatem Row, możesz użyć komponentów serializacji związanych z Flink Row.

Możesz zbudować PulsarSerializationSchema używając . TopicKeyExtractor jest przeniesiony do PulsarSerializationSchemaWrapper. Podczas dostosowywania interfejsu API możesz wziąć następującą próbkę jako odniesienie.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Plan na przyszłość

Dzisiaj projektujemy pakiet i rozwiązanie strumieniowe zintegrowane z Pulsar Source, oparte na nowym Flink Source API (FLIP-27). Nowe rozwiązanie odblokuje ograniczenia obecnego interfejsu źródła strumieniowego (SourceFunction) i jednocześnie ujednolici interfejsy źródłowe między interfejsami API wsadowymi i strumieniowymi.

Pulsar oferuje hierarchiczną architekturę, w której dane są podzielone na strumieniowanie, wsad i zimne dane, które umożliwiają Pulsarowi zapewnienie nieskończonej wydajności. To sprawia, że ​​Pulsar jest idealnym rozwiązaniem do ujednolicenia wsadowego i strumieniowego.

Rozwiązanie wsadowe i strumieniowe oparte na nowym Flink Source API jest podzielone na dwie proste części: SplitEnumerator i Reader. SplitEnumerator wykrywa i przypisuje partycje, a program Reader odczytuje dane z partycji.

Pulsar przechowuje wiadomości w bloku księgi i możesz zlokalizować księgi przez administratora Pulsara, a następnie udostępnić partycję brokera, partycję BookKeeper, partycję Offloader i inne informacje za pomocą różnych zasad partycjonowania. Aby uzyskać więcej informacji, zobacz https://github.com/streamnative/pulsar-flink/issues/187 .

Wniosek

Pulsar Flink Connector 2.7.0 został wydany i gorąco zachęcamy wszystkich do korzystania z Pulsar Flink Connector 2.7.0. Nowa wersja jest bardziej przyjazna dla użytkownika i obsługuje różne funkcje w programach Pulsar 2.7 i Flink 1.12. Przekażemy Pulsar Flink Connector 2.7.0 do repozytorium Flink . Jeśli masz jakiekolwiek obawy dotyczące Pulsar Flink Connector, możesz otworzyć problemy w https://github.com/streamnative/pulsar-flink/issues .

O autorze

Jianyun Zhao jest inżynierem oprogramowania w StreamNative. Wcześniej był odpowiedzialny za rozwój systemu przetwarzania w czasie rzeczywistym w Zhaopin.com. Możesz śledzić go na twitterze .

Jennifer Huang jest programistą Apache Pulsar. Pracuje jako starszy strateg treści w StreamNative, odpowiedzialna za dokumentację Apache Pulsar i rozwój społeczności. Możesz ją śledzić na twitterze .

Ten post został pierwotnie opublikowany na blogu StreamNative .

Podoba Ci się ten post? Proszę polecać i / lub udostępniać.

Chcesz dowiedzieć się więcej? Zobacz https://streamnative.io/blog . Obserwuj nas na Medium i odwiedź nasz GitHub .