Wat is er nieuw in Pulsar Flink Connector 2.7.0

( Sijia-w) (24 dec.2020)

Leren de meest interessante en belangrijkste kenmerken van Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector biedt een ideale oplossing voor verenigde batches en streaming met Apache Pulsar en Apache Flink. Pulsar Flink Connector 2.7.0 ondersteunt functies in Pulsar 2.7 en Flink 1.12, en is volledig compatibel met het Flink-gegevensformaat.

Over Pulsar Flink Connector

Om bedrijven toegang te geven tot real- tijdgegevensinzichten, hebben ze uniforme batch- en streamingmogelijkheden nodig. Apache Flink verenigt batch- en streamverwerking in één enkele computer met “streams” als de uniforme dataweergave. Hoewel ontwikkelaars uitgebreid werk hebben verricht aan de computer- en API-lagen, is er zeer weinig werk verricht aan de gegevens- en berichten- en opslaglagen. In werkelijkheid worden gegevens echter gescheiden in gegevenssilos, gecreëerd door verschillende opslag- en berichttechnologieën. Als gevolg hiervan is er nog steeds geen enkele bron van waarheid en is de algehele werking voor de ontwikkelteams nog steeds rommelig. Om de rommelige bewerkingen aan te pakken, moeten we gegevens in streams opslaan. Apache Pulsar (samen met Apache BookKeeper) voldoet perfect aan de criteria: gegevens worden opgeslagen als één kopie (bron van de waarheid), en zijn toegankelijk in streams (via pub-sub-interfaces) en segmenten (voor batchverwerking). Wanneer Flink en Pulsar samenkomen, creëren de twee open source-technologieën een uniforme gegevensarchitectuur voor real-time gegevensgestuurde bedrijven.

De Pulsar Flink-connector biedt elastische gegevensverwerking met Apache Pulsar en Apache Flink , waardoor Apache Flink lees / schrijf gegevens van / naar Apache Pulsar. Met de Pulsar Flink Connector kunt u zich concentreren op uw bedrijfslogica zonder dat u zich zorgen hoeft te maken over de opslagdetails.

Uitdagingen

Toen we de Pulsar Flink Connector voor het eerst ontwikkelden, kreeg deze een brede acceptatie van zowel de Flink- en Pulsar-gemeenschappen. Door gebruik te maken van de Pulsar Flink-connector, heeft Hewlett Packard Enterprise (HPE) een realtime computerplatform gebouwd, BIGO heeft een real-time berichtverwerkingssysteem gebouwd en Zhihu is bezig met het beoordelen van de geschiktheid van de Connector voor een real-time computersysteem.

Naarmate meer gebruikers de Pulsar Flink Connector adopteerden, hoorden we een veelvoorkomend probleem van de gemeenschap: het is moeilijk om serialisatie en deserialisatie uit te voeren. Hoewel de Pulsar Flink-connector gebruikmaakt van Pulsar-serialisering, ondersteunden de vorige versies het Flink-gegevensformaat niet. Als gevolg hiervan moesten gebruikers veel configuraties uitvoeren om de connector te gebruiken voor real-time computing.

Om de Pulsar Flink-connector gebruiksvriendelijker te maken, hebben we besloten om de mogelijkheden volledig ondersteunen het Flink-gegevensformaat, zodat gebruikers geen tijd hoeven te besteden aan configuratie.

Wat is er nieuw in Pulsar Flink Connector 2.7.0?

De Pulsar Flink Connector 2.7.0 ondersteunt functies in Apache Pulsar 2.7.0 en Apache Flink 1.12, en is volledig compatibel met de Flink-connector en Flink-berichtformaat. Nu kunt u belangrijke functies in Flink gebruiken, zoals exact-once-sink, upsert Pulsar-mechanisme, Data Definition Language (DDL) berekende kolommen, watermerken en metagegevens. U kunt ook gebruikmaken van het Key-Shared-abonnement in Pulsar en serialisering en deserialisatie uitvoeren zonder veel configuratie. Bovendien kunt u de configuratie eenvoudig aanpassen op basis van uw bedrijf.

Hieronder introduceren we de belangrijkste functies van Pulsar Flink Connector 2.7.0 in detail.

Geordende berichtenwachtrij met hoge- prestaties

Wanneer gebruikers de volgorde van berichten strikt moesten garanderen, mocht slechts één consument berichten consumeren. Dit had een grote impact op de doorvoer. Om dit aan te pakken hebben we een Key\_Shared-abonnementsmodel ontworpen in Pulsar. Het garandeert de volgorde van berichten en verbetert de doorvoer door een sleutel aan elk bericht toe te voegen, en routeert berichten met dezelfde sleutelhash naar één consument.

Pulsar Flink Connector 2.7.0 ondersteunt het Key\_Shared-abonnementsmodel. U kunt deze functie inschakelen door enable-key-hash-range in te stellen op true. Het Key Hash-bereik dat door elke consument wordt verwerkt, wordt bepaald door het parallellisme van taken.

Introductie van exact één keer semantiek voor Pulsar-sink (gebaseerd op de Pulsar-transactie)

In eerdere versies, sink operators ondersteunden slechts ten minste één keer semantiek, die niet volledig kon voldoen aan de vereisten voor end-to-end-consistentie. Om berichten te ontdubbelen, moesten gebruikers wat vies werk doen, wat niet gebruiksvriendelijk was.

Transacties worden ondersteund in Pulsar 2.7.0, wat de fouttolerantie van Flink sink aanzienlijk zal verbeteren. In Pulsar Flink Connector 2.7.0 hebben we exact één keer semantiek ontworpen voor sink-operators op basis van Pulsar-transacties. Flink gebruikt het tweefasige commit-protocol om TwoPhaseCommitSinkFunction te implementeren. De belangrijkste methoden voor de levenscyclus zijn beginTransaction (), preCommit (), commit (), abort (), RecoverAndCommit (), RecoverAndAbort ().

U kunt semantiek flexibel selecteren bij het maken van een sink-operator, en de interne logische veranderingen zijn transparant. Pulsar-transacties zijn vergelijkbaar met het tweefasige commit-protocol in Flink, wat de betrouwbaarheid van Connector Sink aanzienlijk zal verbeteren.

Het is eenvoudig om beginTransaction en preCommit te implementeren. U hoeft alleen een Pulsar-transactie te starten en de TID van de transactie na het controlepunt vast te houden. In de preCommit-fase moet je ervoor zorgen dat alle berichten naar Pulsar worden doorgespoeld en dat berichten die vooraf zijn vastgelegd, uiteindelijk zullen worden vastgelegd.

We concentreren ons op RecoverAndCommit en RecoverAndAbort tijdens de implementatie. Beperkt door Kafka-functies, gebruikt de Kafka-connector hackstijlen voor recoverAndCommit. Pulsar-transacties zijn niet afhankelijk van de specifieke Producer, dus het is gemakkelijk voor u om transacties vast te leggen en af ​​te breken op basis van TID.

Pulsar-transacties zijn zeer efficiënt en flexibel. Door gebruik te maken van de voordelen van Pulsar en Flink, is de Pulsar Flink-connector nog krachtiger. We zullen doorgaan met het verbeteren van de transactionele sink in de Pulsar Flink-connector.

Introductie van upsert-pulsar-connector

Gebruikers in de Flink-gemeenschap spraken hun behoeften uit voor de upsert Pulsar. Na het doornemen van mailinglijsten en problemen, hebben we de volgende drie redenen samengevat.

  • Interpreteer het Pulsar-onderwerp als een changelog-stream die records met sleutels interpreteert als upsert-gebeurtenissen (ook wel insert / update genoemd).
  • Als onderdeel van de real-time pijplijn, voeg je meerdere streams samen voor verrijking en sla je resultaten op in een Pulsar-onderwerp om later verder te berekenen. Het resultaat kan echter updategebeurtenissen bevatten.
  • Als onderdeel van de real-time pijplijn, aggregeren op gegevensstromen en resultaten opslaan in een Pulsar-onderwerp voor verdere berekening later. Het resultaat kan echter update-evenementen bevatten.

Op basis van de vereisten voegen we ondersteuning toe voor Upsert Pulsar. De upsert-pulsar-connector maakt het mogelijk om gegevens te lezen van en te schrijven naar Pulsar-onderwerpen op de upsert-manier.

  • Als bron produceert de upsert-pulsar-connector een changelog-stream, waarin elk gegevensrecord vertegenwoordigt een update of verwijder evenement. Nauwkeuriger gezegd, de waarde in een gegevensrecord wordt geïnterpreteerd als een UPDATE van de laatste waarde voor dezelfde sleutel, indien aanwezig (als er nog geen corresponderende sleutel bestaat, wordt de update beschouwd als een INSERT). Met behulp van de tabelanalogie wordt een gegevensrecord in een changelog-stroom geïnterpreteerd als een UPSERT (ook bekend als INSERT / UPDATE) omdat elke bestaande rij met dezelfde sleutel wordt overschreven. Ook worden null-waarden op een speciale manier geïnterpreteerd: een record met een null-waarde vertegenwoordigt een “DELETE”.
  • Als sink kan de upsert-pulsar-connector een changelog-stroom verbruiken. Het zal INSERT / UPDATE\_AFTER-gegevens schrijven als normale Pulsar-berichtenwaarde, en DELETE-gegevens schrijven als Pulsar-berichten met nulwaarden (duidt grafsteen aan voor de sleutel). Flink garandeert de berichtvolgorde op de primaire sleutel door partitiegegevens over de waarden van de primaire sleutelkolommen, zodat de update / verwijderingsberichten op dezelfde sleutel in dezelfde partitie vallen.

Ondersteuning voor nieuwe broninterface en tabel-API geïntroduceerd in FLIP-27 en FLIP-95

Deze functie verenigt de bron van de batchstroom en optimaliseert het mechanisme voor taakdetectie en gegevenslezing. Het is ook de hoeksteen van onze implementatie van Pulsar batch- en streaming-unificatie. De nieuwe Table API ondersteunt door DDL berekende kolommen, watermerken en metadata.

Ondersteuning van SQL-metadata voor lezen en schrijven zoals beschreven in FLIP-107

FLIP-107 stelt gebruikers in staat om metagegevens van de connector te openen als een metagegevenskolom in tabeldefinities. Bij real-time computing hebben gebruikers meestal aanvullende informatie nodig, zoals eventTime, aangepaste velden. Pulsar Flink-connector ondersteunt SQL-metadata voor lezen en schrijven, dus het is flexibel en gemakkelijk voor gebruikers om metadata van Pulsar-berichten te beheren in Pulsar Flink Connector 2.7.0. Voor details over de configuratie, refereer je naar Pulsar Message metadata manipulatie .

Flink format type toevoegen atomic ter ondersteuning van Pulsar-primitieve typen

In Pulsar Flink Connector 2.7.0 voegen we Flink-indelingstype atomic toe om Pulsar-primitieve typen te ondersteunen. Als Flink-verwerking een Pulsar-primitief type vereist, kunt u atomic gebruiken als het connectorformaat. Zie https: //pulsar.apache voor meer informatie over primitieve Pulsar-typen.org / docs / en / schema-begrijp / .

Migratie

Als u de vorige Pulsar Flink Connector-versie gebruikt, moet u SQL- en API-parameters aanpassen overeenkomstig. Hieronder geven we details over elk.

SQL

In SQL hebben we de Pulsar-configuratieparameters in DDL-declaratie gewijzigd. De naam van sommige parameters is gewijzigd, maar de waarden zijn niet veranderd.

  • Verwijder het voorvoegsel connector. uit de parameternamen.
  • Wijzig de naam van de parameter connector.type in connector.
  • Wijzig de parameternaam van de opstartmodus van connector.startup-mode in scan.startup.mode.
  • Pas Pulsar-eigenschappen aan als properties.pulsar.reader.readername=testReaderName.

Als u SQL gebruikt in Pulsar Flink Connector, moet u uw SQL-configuratie dienovereenkomstig aanpassen bij het migreren naar Pulsar Flink Connector 2.7.0. Het volgende voorbeeld toont de verschillen tussen eerdere versies en de 2.7.0-versie voor SQL.

SQL in eerdere versies :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL in Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Vanuit een API-perspectief hebben we enkele klassen aangepast en gemakkelijker maatwerk mogelijk gemaakt.

  • Om serialiseringsproblemen op te lossen, hebben we de handtekening van de constructiemethode FlinkPulsarSink gewijzigd en PulsarSerializationSchema toegevoegd.
  • We hebben ongepaste klassen verwijderd die betrekking hebben op rij, zoals FlinkPulsarRowSink, FlinkPulsarRowSource. Als u te maken heeft met het rijformaat, kunt u Flink Row-gerelateerde serialisatiecomponenten gebruiken.

U kunt PulsarSerializationSchema bouwen met PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor is verplaatst naar PulsarSerializationSchemaWrapper. Wanneer u uw API aanpast, kunt u het volgende voorbeeld als referentie nemen.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Toekomstplan

Vandaag ontwerpen we een batch en stream-oplossing geïntegreerd met Pulsar Source, gebaseerd op de nieuwe Flink Source API (FLIP-27). De nieuwe oplossing zal de beperkingen van de huidige streaming-broninterface (SourceFunction) ontsluiten en tegelijkertijd de broninterfaces tussen de batch- en streaming-APIs verenigen.

Pulsar biedt een hiërarchische architectuur waarin gegevens worden onderverdeeld in streaming, batch, en koude gegevens, waardoor Pulsar een oneindige capaciteit kan bieden. Dit maakt Pulsar een ideale oplossing voor unified batch en streaming.

De batch- en stream-oplossing op basis van de nieuwe Flink Source API is opgedeeld in twee eenvoudige delen: SplitEnumerator en Reader. SplitEnumerator ontdekt partities en wijst ze toe, en Reader leest gegevens van de partitie.

Pulsar slaat berichten op in het grootboekblok, en u kunt de grootboeken lokaliseren via Pulsar admin, en dan de brokerpartitie, BookKeeper-partitie, Offloader-partitie en andere informatie verstrekken via verschillende partitioneringsbeleidslijnen. Voor meer details, refereer je naar https://github.com/streamnative/pulsar-flink/issues/187 .

Conclusie

Pulsar Flink Connector 2.7.0 is vrijgegeven en we raden iedereen sterk aan om Pulsar Flink Connector 2.7.0 te gebruiken. De nieuwe versie is gebruiksvriendelijker en heeft verschillende functies in Pulsar 2.7 en Flink 1.12. We dragen Pulsar Flink Connector 2.7.0 bij aan Flink-repository . Als u zich zorgen maakt over Pulsar Flink Connector, kunt u problemen openen in https://github.com/streamnative/pulsar-flink/issues .

Over de auteur

Jianyun Zhao is een software-engineer bij StreamNative. Daarvoor was hij verantwoordelijk voor de ontwikkeling van een real-time computersysteem bij Zhaopin.com. Je kunt hem volgen op twitter .

Jennifer Huang is een Apache Pulsar-committer. Ze werkt als senior contentstrateeg bij StreamNative, verantwoordelijk voor de documentatie van Apache Pulsar en de groei van de community. Je kunt haar volgen op twitter .

Dit bericht is oorspronkelijk gepubliceerd op StreamNative-blog .

Vind je dit bericht leuk? Gelieve aan te bevelen en / of te delen.

Wilt u meer weten? Zie https://streamnative.io/blog . Volg ons op Medium en bekijk onze GitHub .