Neue Funktionen in Pulsar Flink Connector 2.7.0

( Sijia-w) (24. Dezember 2020)

Lernen Die interessantesten und wichtigsten Funktionen von Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector bietet eine ideale Lösung für einheitliches Batch und Streaming mit Apache Pulsar und Apache Flink. Pulsar Flink Connector 2.7.0 unterstützt Funktionen in Pulsar 2.7 und Flink 1.12 und ist vollständig kompatibel mit dem Flink-Datenformat.

Informationen zu Pulsar Flink Connector

Damit Unternehmen auf real zugreifen können Zeitdatenerkenntnisse benötigen einheitliche Batch- und Streaming-Funktionen. Apache Flink vereint die Stapel- und Stream-Verarbeitung in einer einzigen Computer-Engine mit „Streams“ als einheitlicher Datendarstellung. Obwohl Entwickler umfangreiche Arbeiten auf der Computer- und API-Ebene durchgeführt haben, wurde auf den Daten-, Messaging- und Speicherebenen nur sehr wenig Arbeit geleistet. In der Realität werden Daten jedoch in Datensilos aufgeteilt, die durch verschiedene Speicher- und Messaging-Technologien erstellt werden. Infolgedessen gibt es immer noch keine einzige Quelle der Wahrheit und der Gesamtbetrieb für die Entwicklerteams ist immer noch chaotisch. Um die chaotischen Vorgänge zu beheben, müssen wir Daten in Streams speichern. Apache Pulsar (zusammen mit Apache BookKeeper) erfüllt die Kriterien perfekt: Daten werden als eine Kopie (Quelle der Wahrheit) gespeichert und können in Streams (über Pub-Sub-Schnittstellen) und Segmenten (für die Stapelverarbeitung) abgerufen werden. Wenn Flink und Pulsar zusammenkommen, erstellen die beiden Open Source-Technologien eine einheitliche Datenarchitektur für datengesteuerte Echtzeitunternehmen.

Der Pulsar Flink-Connector bietet elastische Datenverarbeitung mit Apache Pulsar und Apache Flink , wodurch Apache Flink dies ermöglicht Daten von / zu Apache Pulsar lesen / schreiben. Mit dem Pulsar Flink Connector können Sie sich auf Ihre Geschäftslogik konzentrieren, ohne sich um die Speicherdetails kümmern zu müssen.

Herausforderungen

Als wir den Pulsar Flink Connector zum ersten Mal entwickelten, fand er breite Akzeptanz bei beiden Flink- und Pulsar-Gemeinschaften. Unter Verwendung des Pulsar Flink-Anschlusses baute Hewlett Packard Enterprise (HPE) eine Echtzeit-Computerplattform, BIGO hat ein Echtzeit-Nachrichtenverarbeitungssystem erstellt, und Zhihu prüft derzeit die Eignung des Connectors für ein Echtzeit-Computersystem.

Als immer mehr Benutzer den Pulsar Flink Connector verwendeten, hörten wir ein häufiges Problem in der Community: Es ist schwierig, Serialisierung und Deserialisierung durchzuführen. Während der Pulsar Flink-Anschluss die Pulsar-Serialisierung nutzt, haben die vorherigen Versionen das Flink-Datenformat nicht unterstützt. Infolgedessen mussten Benutzer viele Konfigurationen vornehmen, um den Connector für Echtzeit-Computing verwenden zu können.

Um die Verwendung des Pulsar Flink-Connectors zu vereinfachen, haben wir uns entschlossen, die Funktionen vollständig zu erweitern unterstützt das Flink-Datenformat, sodass Benutzer keine Zeit für die Konfiguration aufwenden müssen.

Was ist neu in Pulsar Flink Connector 2.7.0?

Der Pulsar Flink Connector 2.7.0 unterstützt Funktionen in Apache Pulsar 2.7.0 und Apache Flink 1.12 und ist vollständig kompatibel mit dem Flink-Connector und dem Flink-Nachrichtenformat. Jetzt können Sie wichtige Funktionen in Flink verwenden, z. B. genau einmalige Senke, Upsert-Pulsar-Mechanismus, DDL-berechnete Spalten (Data Definition Language), Wasserzeichen und Metadaten. Sie können auch das Key-Shared-Abonnement in Pulsar nutzen und Serialisierung und Deserialisierung ohne viel Konfiguration durchführen. Darüber hinaus können Sie die Konfiguration einfach an Ihr Unternehmen anpassen.

Im Folgenden werden die wichtigsten Funktionen von Pulsar Flink Connector 2.7.0 ausführlich vorgestellt.

Bestellte Nachrichtenwarteschlange mit hoher- Leistung

Wenn Benutzer die Reihenfolge der Nachrichten streng garantieren mussten, durfte nur ein Verbraucher Nachrichten konsumieren. Dies hatte schwerwiegende Auswirkungen auf den Durchsatz. Um dies zu beheben, haben wir in Pulsar ein Key\_Shared-Abonnementmodell entwickelt. Es garantiert die Reihenfolge der Nachrichten und verbessert den Durchsatz, indem jeder Nachricht ein Schlüssel hinzugefügt wird, und leitet Nachrichten mit demselben Schlüssel-Hash an einen Verbraucher weiter.

Pulsar Flink Connector 2.7.0 unterstützt das Abonnementmodell Key\_Shared. Sie können diese Funktion aktivieren, indem Sie enable-key-hash-range auf true setzen. Der von jedem Verbraucher verarbeitete Key Hash-Bereich wird durch die Parallelität der Aufgaben bestimmt.

Einführung einer genau einmaligen Semantik für die Pulsar-Senke (basierend auf der Pulsar-Transaktion)

In früheren Versionen sinkt Die Betreiber unterstützten nur mindestens einmal die Semantik, die die Anforderungen an die End-to-End-Konsistenz nicht vollständig erfüllen konnte. Um Nachrichten zu deduplizieren, mussten Benutzer einige schmutzige Arbeiten ausführen, die nicht benutzerfreundlich waren.

Transaktionen werden in Pulsar 2.7.0 unterstützt, wodurch die Fehlertoleranzfähigkeit der Flink-Senke erheblich verbessert wird. In Pulsar Flink Connector 2.7.0 haben wir eine genau einmalige Semantik für Senkenbetreiber basierend auf Pulsar-Transaktionen entwickelt. Flink verwendet das Zwei-Phasen-Festschreibungsprotokoll, um TwoPhaseCommitSinkFunction zu implementieren. Die wichtigsten Lebenszyklusmethoden sind beginTransaction (), preCommit (), commit (), abort (), recoveryAndCommit (), recoveryAndAbort ().

Sie können die Semantik beim Erstellen eines Senkenoperators flexibel auswählen Interne Logikänderungen sind transparent. Pulsar-Transaktionen ähneln dem Zwei-Phasen-Festschreibungsprotokoll in Flink, wodurch die Zuverlässigkeit von Connector Sink erheblich verbessert wird.

Es ist einfach, beginTransaction und preCommit zu implementieren. Sie müssen nur eine Pulsar-Transaktion starten und die TID der Transaktion nach dem Prüfpunkt beibehalten. In der PreCommit-Phase müssen Sie sicherstellen, dass alle Nachrichten an Pulsar gesendet werden und dass vorab festgeschriebene Nachrichten schließlich festgeschrieben werden.

Wir konzentrieren uns bei der Implementierung auf recoveryAndCommit und recoveryAndAbort. Der Kafka-Connector ist durch die Kafka-Funktionen eingeschränkt und verwendet Hack-Stile für recoveryAndCommit. Pulsar-Transaktionen hängen nicht vom jeweiligen Hersteller ab, daher können Sie Transaktionen basierend auf der TID leicht festschreiben und abbrechen.

Pulsar-Transaktionen sind hocheffizient und flexibel. Der Pulsar Flink-Anschluss nutzt die Vorteile von Pulsar und Flink und ist noch leistungsstärker. Wir werden die Transaktionssenke im Pulsar Flink-Konnektor weiter verbessern.

Einführung des Upsert-Pulsar-Konnektors

Benutzer in der Flink-Community haben ihre Bedürfnisse nach dem Upsert Pulsar zum Ausdruck gebracht. Nach dem Durchsuchen von Mailinglisten und Problemen haben wir die folgenden drei Gründe zusammengefasst.

  • Interpretieren Sie das Pulsar-Thema als Änderungsprotokoll-Stream, der Datensätze mit Schlüsseln als Upsert-Ereignisse (auch als Einfügen / Aktualisieren bezeichnet) interpretiert.
  • Verbinden Sie als Teil der Echtzeit-Pipeline mehrere Streams zur Anreicherung und speichern Sie die Ergebnisse in einem Pulsar-Thema, um sie später weiter zu berechnen. Das Ergebnis kann jedoch Aktualisierungsereignisse enthalten.
  • Als Teil der Echtzeit-Pipeline können Sie Datenströme aggregieren und die Ergebnisse zur späteren Berechnung in einem Pulsar-Thema speichern. Das Ergebnis kann jedoch Aktualisierungsereignisse enthalten.

Basierend auf den Anforderungen fügen wir Unterstützung für Upsert Pulsar hinzu. Der Upsert-Pulsar-Konnektor ermöglicht das Lesen und Schreiben von Daten aus Pulsar-Themen auf Upsert-Weise.

  • Als Quelle erzeugt der Upsert-Pulsar-Konnektor einen Änderungsprotokollstrom, den jeder Datensatz darstellt ein Aktualisierungs- oder Löschereignis. Genauer gesagt wird der Wert in einem Datensatz als UPDATE des letzten Werts für denselben Schlüssel interpretiert, sofern vorhanden (falls noch kein entsprechender Schlüssel vorhanden ist, wird die Aktualisierung als EINFÜGUNG betrachtet). Unter Verwendung der Tabellenanalogie wird ein Datensatz in einem Änderungsprotokollstrom als UPSERT (auch bekannt als INSERT / UPDATE) interpretiert, da jede vorhandene Zeile mit demselben Schlüssel überschrieben wird. Außerdem werden Nullwerte auf besondere Weise interpretiert: Ein Datensatz mit einem Nullwert stellt ein „LÖSCHEN“ dar.
  • Als Senke kann der Upsert-Pulsar-Konnektor einen Changelog-Stream verbrauchen. Es schreibt INSERT / UPDATE\_AFTER-Daten als normalen Pulsar-Nachrichtenwert und DELETE-Daten als Pulsar-Nachrichten mit Nullwerten (geben Sie den Tombstone für den Schlüssel an). Flink garantiert die Nachrichtenreihenfolge auf dem Primärschlüssel anhand von Partitionsdaten für die Werte der Primärschlüsselspalten, sodass die Aktualisierungs- / Löschnachrichten auf demselben Schlüssel in dieselbe Partition fallen.

Unterstützung der neuen Quellschnittstelle und Tabellen-API, die in FLIP-27 und FLIP-95 eingeführt wurden h2>

Diese Funktion vereinheitlicht die Quelle des Batch-Streams und optimiert den Mechanismus für die Aufgabenerkennung und das Lesen von Daten. Es ist auch der Eckpfeiler unserer Implementierung von Pulsar Batch und Streaming Unification. Die neue Tabellen-API unterstützt DDL-berechnete Spalten, Wasserzeichen und Metadaten.

Unterstützt SQL-Lese- und Schreibmetadaten, wie in FLIP-107 beschrieben. Mit h2>

FLIP-107 können Benutzer auf Connector-Metadaten als Metadatenspalte in Tabellendefinitionen zugreifen. Beim Echtzeit-Computing benötigen Benutzer normalerweise zusätzliche Informationen, z. B. eventTime und benutzerdefinierte Felder. Pulsar Flink Connector unterstützt SQL-Lese- und Schreibmetadaten, sodass Benutzer Metadaten von Pulsar-Nachrichten in Pulsar Flink Connector 2.7.0 flexibel und einfach verwalten können. Ausführliche Informationen zur Konfiguration finden Sie unter Manipulation von Pulsar Message-Metadaten .

Flink-Formattyp hinzufügen atomic zur Unterstützung von Pulsar-Primitivtypen

In Pulsar Flink Connector 2.7.0 fügen wir den Flink-Formattyp atomic zur Unterstützung von Pulsar-Primitivtypen hinzu. Wenn für die Flink-Verarbeitung ein primitiver Pulsar-Typ erforderlich ist, können Sie atomic als Konnektorformat verwenden. Weitere Informationen zu primitiven Pulsar-Typen finden Sie unter https: //pulsar.apache.org / docs / de / schema-verstehe / .

Migration

Wenn Sie die vorherige Version von Pulsar Flink Connector verwenden, müssen Sie die SQL- und API-Parameter anpassen entsprechend. Im Folgenden finden Sie Details zu den einzelnen Elementen.

SQL

In SQL haben wir die Pulsar-Konfigurationsparameter in der DDL-Deklaration geändert. Der Name einiger Parameter wird geändert, die Werte jedoch nicht.

  • Entfernen Sie das Präfix connector. aus den Parameternamen.
  • Ändern Sie den Namen des Parameters connector.type in connector.
  • Ändern Sie den Namen des Startmodusparameters von connector.startup-mode in scan.startup.mode.
  • Passen Sie die Pulsareigenschaften als properties.pulsar.reader.readername=testReaderName an.

Wenn Sie SQL in Pulsar Flink Connector verwenden, müssen Sie Ihre SQL-Konfiguration bei der Migration auf Pulsar Flink Connector 2.7.0 entsprechend anpassen. Das folgende Beispiel zeigt die Unterschiede zwischen früheren Versionen und der Version 2.7.0 für SQL.

SQL in früheren Versionen:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL In Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Aus API-Sicht haben wir einige Klassen angepasst und eine einfachere Anpassung ermöglicht.

  • Um Serialisierungsprobleme zu lösen, haben wir die Signatur der Konstruktionsmethode FlinkPulsarSink geändert und PulsarSerializationSchema hinzugefügt.
  • Wir haben unangemessene Klassen im Zusammenhang mit Zeilen entfernt, z. B. FlinkPulsarRowSink, FlinkPulsarRowSource. Wenn Sie sich mit dem Zeilenformat befassen müssen, können Sie Flink Row-bezogene Serialisierungskomponenten verwenden.

Sie können PulsarSerializationSchema mithilfe von PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor wird in PulsarSerializationSchemaWrapper verschoben. Wenn Sie Ihre API anpassen, können Sie das folgende Beispiel als Referenz verwenden.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Zukunftsplan

Heute entwerfen wir einen Stapel und in Pulsar Source integrierte Stream-Lösung, basierend auf der neuen Flink Source API (FLIP-27). Die neue Lösung wird Einschränkungen der aktuellen Streaming-Quellschnittstelle (SourceFunction) aufheben und gleichzeitig die Quellschnittstellen zwischen der Batch- und der Streaming-API vereinheitlichen.

Pulsar bietet eine hierarchische Architektur, bei der Daten in Streaming, Batch, unterteilt werden. und kalte Daten, die es Pulsar ermöglichen, unendliche Kapazität bereitzustellen. Dies macht Pulsar zu einer idealen Lösung für einheitliches Batch und Streaming.

Die Batch- und Stream-Lösung, die auf der neuen Flink Source-API basiert, ist in zwei einfache Teile unterteilt: SplitEnumerator und Reader. SplitEnumerator erkennt und weist Partitionen zu und Reader liest Daten von der Partition.

Pulsar speichert Nachrichten im Ledger-Block. Sie können die Ledger über den Pulsar-Administrator suchen und anschließend über verschiedene Partitionierungsrichtlinien Brokerpartition, BookKeeper-Partition, Offloader-Partition und andere Informationen bereitstellen. Weitere Informationen finden Sie unter https://github.com/streamnative/pulsar-flink/issues/187 .

Schlussfolgerung

Pulsar Flink Connector 2.7.0 ist freigegeben und wir empfehlen allen dringend, Pulsar Flink Connector 2.7.0 zu verwenden. Die neue Version ist benutzerfreundlicher und verfügt über verschiedene Funktionen in Pulsar 2.7 und Flink 1.12. Wir werden Pulsar Flink Connector 2.7.0 zum Flink-Repository beitragen. Wenn Sie Bedenken bezüglich Pulsar Flink Connector haben, können Sie Probleme unter https://github.com/streamnative/pulsar-flink/issues öffnen.

Über den Autor

Jianyun Zhao ist Softwareentwickler bei StreamNative. Zuvor war er bei Zhaopin.com für die Entwicklung eines Echtzeit-Computersystems verantwortlich. Sie können ihm auf Twitter folgen.

Jennifer Huang ist ein Apache Pulsar Committer. Sie arbeitet als Senior Content Strategistin bei StreamNative und ist verantwortlich für die Dokumentation von Apache Pulsar und das Wachstum der Community. Sie können ihr auf twitter folgen.

Dieser Beitrag wurde ursprünglich auf StreamNative-Blog veröffentlicht .

Gefällt dir dieser Beitrag? Bitte empfehlen und / oder teilen.

Möchten Sie mehr erfahren? Siehe https://streamnative.io/blog . Folgen Sie uns auf Medium und sehen Sie sich unsere GitHub an.