Noutăți în Pulsar Flink Connector 2.7.0

( Sijia-w) (24 dec. 2020)

Aflați cele mai interesante și majore caracteristici despre Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector oferă o soluție ideală pentru loturi unificate și streaming cu Apache Pulsar și Apache Flink. Pulsar Flink Connector 2.7.0 acceptă funcții în Pulsar 2.7 și Flink 1.12 și este pe deplin compatibil cu formatul de date Flink.

Despre Pulsar Flink Connector

Pentru ca companiile să acceseze real- pentru a obține informații despre timp, au nevoie de capabilități unificate și de streaming. Apache Flink unifică procesarea batch și stream într-un singur motor de calcul cu „fluxuri” ca reprezentare de date unificată. Deși dezvoltatorii au făcut o muncă extinsă în straturile de calcul și API, s-a lucrat foarte puțin la straturile de date și mesagerie și stocare. Cu toate acestea, în realitate, datele sunt separate în silozuri de date, create de diverse tehnologii de stocare și mesagerie. Ca urmare, nu există încă o singură sursă de adevăr și operațiunea generală pentru echipele de dezvoltatori este încă dezordonată. Pentru a aborda operațiunile dezordonate, trebuie să stocăm date în fluxuri. Apache Pulsar (împreună cu Apache BookKeeper) îndeplinește perfect criteriile: datele sunt stocate ca o singură copie (sursă de adevăr) și pot fi accesate în fluxuri (prin interfețe pub-sub) și segmente (pentru procesarea batch). Când Flink și Pulsar se reunesc, cele două tehnologii open source creează o arhitectură de date unificată pentru întreprinderile bazate pe date în timp real.

Conectorul Pulsar Flink oferă prelucrare elastică a datelor cu Apache Pulsar și Apache Flink , permițând Apache Flink să citiți / scrieți date de la / către Apache Pulsar. Conectorul Pulsar Flink vă permite să vă concentrați asupra logicii afacerii dvs. fără să vă faceți griji cu privire la detaliile de stocare.

Provocări

Când am dezvoltat pentru prima dată Pulsar Flink Connector, acesta a primit o adoptare largă atât din partea Comunitățile Flink și Pulsar. Folosind conectorul Pulsar Flink, Hewlett Packard Enterprise (HPE) a construit o platformă de calcul în timp real, BIGO a construit un sistem de procesare a mesajelor în timp real și Zhihu este în curs de evaluare a potrivirii conectorului pentru un sistem de calcul în timp real.

Pe măsură ce mai mulți utilizatori au adoptat Pulsar Flink Connector, am auzit o problemă comună din partea comunității: este greu să faci serializare și deserializare. În timp ce conectorul Pulsar Flink utilizează serializarea Pulsar, versiunile anterioare nu au acceptat formatul de date Flink. Drept urmare, utilizatorii au trebuit să facă o mulțime de configurații pentru a utiliza conectorul pentru a face calcule în timp real.

Pentru a face conectorul Pulsar Flink mai ușor de utilizat, am decis să dezvoltăm capacitățile acceptă formatul de date Flink, astfel încât utilizatorii nu trebuie să petreacă timp la configurare.

Ce este nou în Pulsar Flink Connector 2.7.0?

Pulsar Flink Connector 2.7.0 acceptă caracteristici în Apache Pulsar 2.7.0 și Apache Flink 1.12 și este pe deplin compatibil cu conectorul Flink și formatul mesajului Flink. Acum, puteți utiliza caracteristici importante în Flink, cum ar fi scufundarea exactă, mecanismul Pulsar de ridicare, coloanele calculate Data Definition Language (DDL), filigranele și metadatele. De asemenea, puteți utiliza abonamentul Key-Shared în Pulsar și puteți efectua serializarea și deserializarea fără prea multe configurații. În plus, puteți personaliza cu ușurință configurația pe baza afacerii dvs.

Mai jos, vă prezentăm în detaliu caracteristicile cheie din Pulsar Flink Connector 2.7.0 în detaliu.

Coadă de mesaje comandate cu high- performanță

Atunci când utilizatorii aveau nevoie să garanteze ordonarea strictă a mesajelor, doar un singur consumator avea voie să consume mesaje. Acest lucru a avut un impact sever asupra debitului. Pentru a aborda acest lucru, am proiectat un model de abonament Key\_Shared în Pulsar. Garantează ordonarea mesajelor și îmbunătățește randamentul prin adăugarea unei chei la fiecare mesaj și direcționează mesajele cu același Hash cheie către un consumator.

Conectorul Pulsar Flink 2.7.0 acceptă modelul de abonament Key\_Shared. Puteți activa această caracteristică setând enable-key-hash-range la true. Gama Key Hash procesată de fiecare consumator este decisă de paralelismul sarcinilor.

Introducerea semanticii exact o dată pentru Pulsar sink (pe baza tranzacției Pulsar)

În versiunile anterioare, sink operatorii au acceptat doar cel puțin o dată semantica, care nu a putut îndeplini pe deplin cerințele de consistență end-to-end. Pentru a deduplica mesajele, utilizatorii au trebuit să facă niște lucruri murdare, care nu erau ușor de utilizat.

Tranzacțiile sunt acceptate în Pulsar 2.7.0, ceea ce va îmbunătăți foarte mult capacitatea de toleranță la erori a Flink sink. În Pulsar Flink Connector 2.7.0, am proiectat semantica exact o dată pentru operatorii de chiuvete pe baza tranzacțiilor Pulsar. Flink folosește protocolul de comitere în două faze pentru a implementa TwoPhaseCommitSinkFunction. Principalele metode ale ciclului de viață sunt beginTransaction (), preCommit (), commit (), abort (), recoverAndCommit (), recoverAndAbort ().

Puteți selecta semantica în mod flexibil atunci când creați un operator de chiuvetă și modificările logice interne sunt transparente. Tranzacțiile Pulsar sunt similare protocolului de confirmare în două faze din Flink, ceea ce va îmbunătăți foarte mult fiabilitatea Conector Sink.

Este ușor să implementați beginTransaction și preCommit. Trebuie doar să porniți o tranzacție Pulsar și să persistați TID-ul tranzacției după punctul de control. În faza preCommit, trebuie să vă asigurați că toate mesajele sunt trimise la Pulsar, iar mesajele pre-comise vor fi trimise în cele din urmă.

Ne concentrăm pe recoveryAndCommit și recoverAndAbort în implementare. Limitat de caracteristicile Kafka, conectorul Kafka adoptă stiluri de hack pentru recoveryAndCommit. Tranzacțiile Pulsar nu se bazează pe producătorul specific, astfel încât este ușor să comiteți și să anulați tranzacțiile pe baza TID.

Tranzacțiile Pulsar sunt extrem de eficiente și flexibile. Având avantajele Pulsar și Flink, conectorul Pulsar Flink este și mai puternic. Vom continua să îmbunătățim chiuveta tranzacțională în conectorul Pulsar Flink.

Introducerea conectorului upsert-pulsar

Utilizatorii din comunitatea Flink și-au exprimat nevoile pentru Pulsar upsert. După ce am analizat listele de corespondență și problemele, am rezumat următoarele trei motive.

  • Interpretează subiectul Pulsar ca un flux de jurnal de schimbări care interpretează înregistrările cu chei ca evenimente upert (aka insert / update).
  • Ca parte a conductei în timp real, alăturați mai multor fluxuri pentru îmbogățire și stocați rezultatele într-un subiect Pulsar pentru calcul ulterior ulterior. Cu toate acestea, rezultatul poate conține evenimente de actualizare.
  • Ca parte a conductei în timp real, agregați fluxurile de date și stocați rezultatele într-un subiect Pulsar pentru calcul ulterior. Cu toate acestea, rezultatul poate conține evenimente de actualizare.

Pe baza cerințelor, adăugăm suport pentru Upsert Pulsar. Conectorul upsert-pulsar permite citirea și scrierea datelor în subiectele Pulsar în mod upsert.

  • Ca sursă, conectorul upsert-pulsar produce un flux de registre de schimbări, unde fiecare înregistrare de date reprezintă o actualizare sau ștergere eveniment. Mai exact, valoarea dintr-o înregistrare de date este interpretată ca un ACTUALIZARE a ultimei valori pentru aceeași cheie, dacă există (dacă o cheie corespunzătoare nu există încă, actualizarea va fi considerată INSERT). Folosind analogia tabelului, o înregistrare de date într-un flux de registre de schimbări este interpretată ca un UPSERT (aka INSERT / UPDATE) deoarece orice rând existent cu aceeași cheie este suprascris. De asemenea, valorile nule sunt interpretate într-un mod special: o înregistrare cu o valoare nulă reprezintă o „ȘTERGERE”.
  • Ca o chiuvetă, conectorul UPSert-pulsar poate consuma un flux de registre de schimbări. Va scrie datele INSERT / UPDATE\_AFTER ca valoare normală a mesajelor Pulsar și va scrie datele ȘTERGERE ca mesaje Pulsar cu valori nule (indicați piatra funerară pentru cheie). Flink va garanta ordonarea mesajelor pe cheia primară prin datele partiției pe valorile coloanelor cheii primare, astfel încât mesajele de actualizare / ștergere de pe aceeași cheie vor intra în aceeași partiție.

Suportă o nouă interfață sursă și API de tabel introduse în FLIP-27 și FLIP-95

Această caracteristică unifică sursa fluxului batch și optimizează mecanismul pentru descoperirea sarcinilor și citirea datelor. Este, de asemenea, piatra de temelie a implementării noastre a Pulsar batch și a unificării în flux. Noua API Table acceptă coloane, filigran și metadate DDL calculate.

Sprijină metadatele SQL citite și scrise așa cum este descris în FLIP-107

FLIP-107 permite utilizatorilor să acceseze metadatele conectorului ca coloană de metadate în definițiile tabelului. În calculul în timp real, utilizatorii au nevoie de obicei de informații suplimentare, cum ar fi eventTime, câmpuri personalizate. Conectorul Pulsar Flink acceptă metadatele de citire și scriere SQL, deci este flexibil și ușor pentru utilizatori să gestioneze metadatele mesajelor Pulsar în Pulsar Flink Connector 2.7.0. Pentru detalii despre configurație, consultați Manipularea metadatelor Pulsar Message .

Adăugați tip de format Flink atomic pentru a suporta tipurile primitive Pulsar

În Pulsar Flink Connector 2.7.0, adăugăm tipul formatului Flink atomic pentru a susține tipurile primitive Pulsar. Când procesarea Flink necesită un tip primitiv Pulsar, puteți utiliza atomic ca format de conector. Pentru mai multe informații despre tipurile primitive Pulsar, consultați https: //pulsar.apache.org / docs / ro / schema-understand / .

Migrare

Dacă utilizați versiunea anterioară Pulsar Flink Connector, trebuie să ajustați parametrii SQL și API în consecinţă. Mai jos oferim detalii despre fiecare.

SQL

În SQL, am modificat parametrii de configurare Pulsar în declarația DDL. Numele unor parametri sunt modificate, dar valorile nu sunt modificate.

  • Eliminați prefixul connector. din numele parametrilor.
  • Schimbați numele parametrului connector.type în connector.
  • Schimbați numele parametrului modului de pornire din connector.startup-mode în scan.startup.mode.
  • Ajustați proprietățile Pulsar ca properties.pulsar.reader.readername=testReaderName.

Dacă utilizați SQL în Pulsar Flink Connector, trebuie să vă ajustați configurația SQL în consecință atunci când migrați la Pulsar Flink Connector 2.7.0. Următorul exemplu prezintă diferențele dintre versiunile anterioare și versiunea 2.7.0 pentru SQL.

SQL în versiunile anterioare :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL în Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Din perspectiva API, am ajustat unele clase și am permis o personalizare mai ușoară.

  • Pentru a rezolva problemele de serializare, am schimbat semnătura metodei de construcție FlinkPulsarSink și am adăugat PulsarSerializationSchema.
  • Am eliminat clasele neadecvate legate de rând, cum ar fi FlinkPulsarRowSink, FlinkPulsarRowSource. Dacă trebuie să aveți de-a face cu formatul Row, puteți utiliza componentele de serializare legate de Flink Row.

Puteți crea PulsarSerializationSchema utilizând PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor este mutat în PulsarSerializationSchemaWrapper. Când vă ajustați API-ul, puteți lua ca referință următorul exemplu.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Plan de viitor

Astăzi proiectăm un lot și soluție de flux integrată cu Pulsar Source, bazată pe noua API Flink Source (FLIP-27). Noua soluție va debloca limitările interfeței curente a sursei de streaming (SourceFunction) și simultan pentru a unifica interfețele sursă între API-urile lot și streaming.

Pulsar oferă o arhitectură ierarhică în care datele sunt împărțite în streaming, și date reci, care permit Pulsar să ofere o capacitate infinită. Acest lucru face ca Pulsar să fie o soluție ideală pentru loturi unificate și streaming.

Soluția batch și stream bazată pe noua API Flink Source este împărțită în două părți simple: SplitEnumerator și Reader. SplitEnumerator descoperă și atribuie partiții, iar Reader citește date din partiție.

Pulsar stochează mesaje în blocul de registru și puteți localiza registrele prin administratorul Pulsar și apoi furnizați partiția broker, partiția BookKeeper, partiția Offloader și alte informații prin diferite politici de partiționare. Pentru mai multe detalii, consultați https://github.com/streamnative/pulsar-flink/issues/187 .

Concluzie

Pulsar Flink Connector 2.7.0 este lansat și îi încurajăm pe toți să folosească Pulsar Flink Connector 2.7.0. Noua versiune este mai ușor de utilizat și este activată cu diverse funcții în Pulsar 2.7 și Flink 1.12. Vom contribui Pulsar Flink Connector 2.7.0 la depozit Flink . Dacă aveți vreo îngrijorare cu privire la Pulsar Flink Connector, nu ezitați să deschideți probleme în https://github.com/streamnative/pulsar-flink/issues .

Despre autor

Jianyun Zhao este inginer software la StreamNative. Înainte de aceasta, el a fost responsabil pentru dezvoltarea unui sistem de calcul în timp real la Zhaopin.com. Îl poți urmări pe twitter .

Jennifer Huang este un comitator Apache Pulsar. Lucrează ca strateg senior de conținut la StreamNative, responsabilă pentru documentarea Apache Pulsar și creșterea comunității. O puteți urmări pe twitter .

Această postare a fost publicată inițial pe blogul StreamNative .

Îți place această postare? Vă rugăm să recomandați și / sau să partajați.

Doriți să aflați mai multe? Vedeți https://streamnative.io/blog . Urmați-ne pe Medium și verificați GitHub . . / p>