Novità di Pulsar Flink Connector 2.7.0

( Sijia-w) (24 dicembre 2020)

Impara le caratteristiche più interessanti e principali di Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector fornisce una soluzione ideale per batch e streaming unificati con Apache Pulsar e Apache Flink. Pulsar Flink Connector 2.7.0 supporta le funzionalità di Pulsar 2.7 e Flink 1.12 ed è completamente compatibile con il formato dati Flink.

Informazioni su Pulsar Flink Connector

Per consentire alle aziende di accedere a dati reali informazioni sui dati temporali, necessitano di funzionalità di streaming e batch unificate. Apache Flink unifica lelaborazione in batch e in streaming in un unico motore di elaborazione con “flussi” come rappresentazione unificata dei dati. Sebbene gli sviluppatori abbiano svolto un lavoro approfondito a livello di elaborazione e API, è stato fatto pochissimo lavoro a livello di dati, messaggistica e archiviazione. Tuttavia, in realtà, i dati sono separati in silos di dati, creati da varie tecnologie di archiviazione e messaggistica. Di conseguenza, non esiste ancora ununica fonte di verità e loperazione complessiva per i team di sviluppatori è ancora complicata. Per affrontare le operazioni disordinate, dobbiamo memorizzare i dati nei flussi. Apache Pulsar (insieme ad Apache BookKeeper) soddisfa perfettamente i criteri: i dati vengono archiviati come una copia (fonte della verità) ed è possibile accedervi in ​​flussi (tramite interfacce pub-sub) e segmenti (per lelaborazione batch). Quando Flink e Pulsar si uniscono, le due tecnologie open source creano unarchitettura dati unificata per le aziende basate sui dati in tempo reale.

Il connettore Pulsar Flink fornisce lelaborazione dati elastica con Apache Pulsar e Apache Flink , consentendo ad Apache Flink di leggere / scrivere dati da / verso Apache Pulsar. Il connettore Pulsar Flink ti consente di concentrarti sulla logica aziendale senza preoccuparti dei dettagli di archiviazione.

Sfide

Quando abbiamo sviluppato per la prima volta il connettore Pulsar Flink, ha ricevuto unampia adozione da entrambi i Comunità Flink e Pulsar. Sfruttando il connettore Pulsar Flink, Hewlett Packard Enterprise (HPE) ha creato una piattaforma di elaborazione in tempo reale, BIGO ha creato un sistema di elaborazione dei messaggi in tempo reale e Zhihu sta valutando lidoneità del connettore per un sistema di elaborazione in tempo reale.

Man mano che sempre più utenti hanno adottato Pulsar Flink Connector, abbiamo sentito un problema comune dalla comunità: è difficile eseguire la serializzazione e la deserializzazione. Sebbene il connettore Pulsar Flink sfrutti la serializzazione Pulsar, le versioni precedenti non supportavano il formato dati Flink. Di conseguenza, gli utenti hanno dovuto eseguire molte configurazioni per utilizzare il connettore per lelaborazione in tempo reale.

Per rendere il connettore Pulsar Flink più facile da usare, abbiamo deciso di sviluppare le funzionalità per supporta il formato dati Flink, quindi gli utenti non devono perdere tempo nella configurazione.

Novità in Pulsar Flink Connector 2.7.0?

Pulsar Flink Connector 2.7.0 supporta le funzionalità in Apache Pulsar 2.7.0 e Apache Flink 1.12 ed è completamente compatibile con il connettore Flink e il formato dei messaggi Flink. Ora puoi utilizzare funzionalità importanti in Flink, come il sink esattamente una volta, il meccanismo Pulsar upsert, colonne calcolate DDL (Data Definition Language), filigrane e metadati. Puoi anche sfruttare labbonamento con chiave condivisa in Pulsar e condurre la serializzazione e la deserializzazione senza troppa configurazione. Inoltre, puoi personalizzare facilmente la configurazione in base alla tua attività.

Di seguito, introduciamo in dettaglio le funzionalità chiave di Pulsar Flink Connector 2.7.0.

Coda di messaggi ordinata con prestazioni

Quando gli utenti avevano bisogno di garantire rigorosamente lordine dei messaggi, solo un consumatore poteva consumare i messaggi. Ciò ha avuto un grave impatto sul rendimento. Per risolvere questo problema, abbiamo progettato un modello di abbonamento Key\_Shared in Pulsar. Garantisce lordine dei messaggi e migliora la velocità di trasmissione aggiungendo una chiave a ogni messaggio e instrada i messaggi con lo stesso hash della chiave a un consumatore.

Pulsar Flink Connector 2.7.0 supporta il modello di abbonamento Key\_Shared. Puoi abilitare questa funzione impostando enable-key-hash-range su true. Lintervallo di key hash elaborato da ciascun consumatore è deciso dal parallelismo delle attività.

Introduzione della semantica di una volta sola per il sink Pulsar (basato sulla transazione Pulsar)

Nelle versioni precedenti, sink gli operatori supportavano solo la semantica almeno una volta, che non poteva soddisfare completamente i requisiti per la coerenza end-to-end. Per deduplicare i messaggi, gli utenti dovevano fare del lavoro sporco, che non era facile da usare.

Le transazioni sono supportate in Pulsar 2.7.0, che migliorerà notevolmente la capacità di tolleranza ai guasti del sink Flink. In Pulsar Flink Connector 2.7.0, abbiamo progettato la semantica esattamente una volta per gli operatori sink basati su transazioni Pulsar. Flink utilizza il protocollo di commit a due fasi per implementare TwoPhaseCommitSinkFunction. I metodi principali del ciclo di vita sono beginTransaction (), preCommit (), commit (), abort (), recoverAndCommit (), recoverAndAbort ().

È possibile selezionare la semantica in modo flessibile quando si crea un operatore sink e il le modifiche alla logica interna sono trasparenti. Le transazioni Pulsar sono simili al protocollo di commit a due fasi in Flink, che migliorerà notevolmente laffidabilità di Connector Sink.

È facile implementare beginTransaction e preCommit. Hai solo bisogno di avviare una transazione Pulsar e mantenere il TID della transazione dopo il checkpoint. Nella fase di preCommit, devi assicurarti che tutti i messaggi vengano scaricati su Pulsar e che i messaggi preimpegnati verranno eventualmente salvati.

Ci concentriamo su recoverAndCommit e recoverAndAbort nellimplementazione. Limitato dalle funzionalità di Kafka, il connettore Kafka adotta stili di hacking per recoverAndCommit. Le transazioni Pulsar non si basano sul produttore specifico, quindi è facile per te eseguire il commit e interrompere le transazioni basate su TID.

Le transazioni Pulsar sono altamente efficienti e flessibili. Sfruttando i vantaggi di Pulsar e Flink, il connettore Pulsar Flink è ancora più potente. Continueremo a migliorare il sink transazionale nel connettore Pulsar Flink.

Presentazione del connettore upsert-pulsar

Gli utenti della comunità Flink hanno espresso le loro esigenze per lupsert Pulsar. Dopo aver esaminato le mailing list e i problemi, abbiamo riassunto i seguenti tre motivi.

  • Interpreta largomento Pulsar come un flusso di changelog che interpreta i record con le chiavi come eventi upsert (aka inserimento / aggiornamento).
  • Come parte della pipeline in tempo reale, unisci più flussi per larricchimento e archivia i risultati in un argomento Pulsar per ulteriori calcoli in seguito. Tuttavia, il risultato potrebbe contenere eventi di aggiornamento.
  • Come parte della pipeline in tempo reale, aggrega i flussi di dati e archivia i risultati in un argomento Pulsar per ulteriori calcoli in seguito. Tuttavia, il risultato potrebbe contenere eventi di aggiornamento.

In base ai requisiti, aggiungiamo il supporto per Upsert Pulsar. Il connettore upsert-pulsar consente di leggere e scrivere dati in argomenti Pulsar in modo upsert.

  • Come sorgente, il connettore upsert-pulsar produce un flusso di changelog, in cui ogni record di dati rappresenta un aggiornamento o eliminare un evento. Più precisamente, il valore in un record di dati viene interpretato come un AGGIORNAMENTO dellultimo valore per la stessa chiave, se presente (se una chiave corrispondente non esiste ancora, laggiornamento sarà considerato un INSERT). Utilizzando lanalogia con la tabella, un record di dati in un flusso di log delle modifiche viene interpretato come UPSERT (noto anche come INSERT / UPDATE) perché qualsiasi riga esistente con la stessa chiave viene sovrascritta. Inoltre, i valori null vengono interpretati in un modo speciale: un record con un valore null rappresenta un “DELETE”.
  • Come sink, il connettore upsert-pulsar può consumare un flusso di changelog. Scriverà i dati INSERT / UPDATE\_AFTER come normali valori dei messaggi Pulsar e scriverà i dati DELETE come messaggi Pulsar con valori nulli (indicare la lapide per la chiave). Flink garantirà lordinamento dei messaggi sulla chiave primaria partizionando i dati sui valori delle colonne della chiave primaria, quindi i messaggi di aggiornamento / cancellazione sulla stessa chiave cadranno nella stessa partizione.

Supporta la nuova interfaccia sorgente e lAPI della tabella introdotte in FLIP-27 e FLIP-95

Questa caratteristica unifica lorigine del flusso batch e ottimizza il meccanismo per lindividuazione delle attività e la lettura dei dati. È anche la pietra angolare della nostra implementazione del batch Pulsar e dellunificazione streaming. La nuova API Table supporta colonne, filigrane e metadati calcolati DDL.

Supporta metadati di lettura e scrittura SQL come descritto in FLIP-107

FLIP-107 consente agli utenti di accedere ai metadati del connettore come una colonna di metadati nelle definizioni di tabella. Nellelaborazione in tempo reale, gli utenti di solito necessitano di informazioni aggiuntive, come eventTime, campi personalizzati. Il connettore Pulsar Flink supporta la lettura e la scrittura di metadati SQL, quindi è flessibile e facile per gli utenti gestire i metadati dei messaggi Pulsar in Pulsar Flink Connector 2.7.0. Per i dettagli sulla configurazione, fare riferimento a Manipolazione dei metadati dei messaggi Pulsar .

Aggiungi il tipo di formato Flink atomic per supportare i tipi primitivi Pulsar

In Pulsar Flink Connector 2.7.0, aggiungiamo il tipo di formato Flink atomic per supportare i tipi primitivi Pulsar. Quando lelaborazione Flink richiede un tipo primitivo Pulsar, puoi utilizzare atomic come formato del connettore. Per ulteriori informazioni sui tipi primitivi Pulsar, vedere https: //pulsar.apache.org / docs / en / schema-capire / .

Migrazione

Se stai utilizzando la versione precedente del connettore Pulsar Flink, devi regolare i parametri SQL e API di conseguenza. Di seguito forniamo i dettagli su ciascuno.

SQL

In SQL, abbiamo modificato i parametri di configurazione Pulsar nella dichiarazione DDL. Il nome di alcuni parametri viene modificato, ma i valori non vengono modificati.

  • Rimuovi il prefisso connector. dai nomi dei parametri.
  • Cambia il nome del parametro connector.type in connector.
  • Cambia il nome del parametro della modalità di avvio da connector.startup-mode in scan.startup.mode.
  • Modifica le proprietà Pulsar come properties.pulsar.reader.readername=testReaderName.

Se si utilizza SQL in Pulsar Flink Connector, è necessario regolare la configurazione SQL di conseguenza durante la migrazione a Pulsar Flink Connector 2.7.0. Il seguente esempio mostra le differenze tra le versioni precedenti e la versione 2.7.0 per SQL.

SQL nelle versioni precedenti :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL in Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Dal punto di vista dellAPI, abbiamo modificato alcune classi e reso possibile una personalizzazione più semplice.

  • Per risolvere i problemi di serializzazione, abbiamo modificato la firma del metodo di costruzione FlinkPulsarSink e aggiunto PulsarSerializationSchema.
  • Abbiamo rimosso le classi inappropriate relative alla riga, come FlinkPulsarRowSink, FlinkPulsarRowSource. Se devi gestire il formato Row, puoi utilizzare i componenti di serializzazione relativi a Flink Row.

Puoi creare PulsarSerializationSchema utilizzando PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor è stato spostato in PulsarSerializationSchemaWrapper. Quando modifichi la tua API, puoi prendere il seguente esempio come riferimento.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Piano futuro

Oggi stiamo progettando un batch e una soluzione di streaming integrata con Pulsar Source, basata sulla nuova Flink Source API (FLIP-27). La nuova soluzione sbloccherà i limiti dellattuale interfaccia sorgente di streaming (SourceFunction) e simultaneamente unificherà le interfacce sorgente tra le API batch e streaming.

Pulsar offre unarchitettura gerarchica in cui i dati sono suddivisi in streaming, batch, e dati freddi, che consentono a Pulsar di fornire una capacità infinita. Questo rende Pulsar una soluzione ideale per batch e streaming unificati.

La soluzione batch e streaming basata sulla nuova Flink Source API è divisa in due semplici parti: SplitEnumerator e Reader. SplitEnumerator rileva e assegna le partizioni e Reader legge i dati dalla partizione.

Pulsar memorizza i messaggi nel blocco del libro mastro e puoi individuare i registri tramite lamministratore di Pulsar, quindi fornire la partizione del broker, la partizione di BookKeeper, la partizione di Offloader e altre informazioni attraverso diverse politiche di partizionamento. Per ulteriori dettagli, fare riferimento a https://github.com/streamnative/pulsar-flink/issues/187 .

Conclusione

Viene rilasciato Pulsar Flink Connector 2.7.0 e incoraggiamo vivamente tutti a utilizzare Pulsar Flink Connector 2.7.0. La nuova versione è più user-friendly ed è abilitata con varie funzionalità in Pulsar 2.7 e Flink 1.12. Contribuiremo con il Pulsar Flink Connector 2.7.0 al repository Flink . In caso di dubbi su Pulsar Flink Connector, non esitare ad aprire problemi in https://github.com/streamnative/pulsar-flink/issues .

Informazioni sullautore

Jianyun Zhao è un ingegnere del software presso StreamNative. Prima di allora, era responsabile dello sviluppo di un sistema di calcolo in tempo reale su Zhaopin.com. Puoi seguirlo su twitter .

Jennifer Huang è un committer di Apache Pulsar. Lavora come stratega dei contenuti senior presso StreamNative, responsabile della documentazione di Apache Pulsar e della crescita della comunità. Puoi seguirla su twitter .

Questo post è stato originariamente pubblicato sul blog StreamNative .

Ti piace questo post? Consiglia e / o condividi.

Vuoi saperne di più? Vedi https://streamnative.io/blog . Seguici su Medium e controlla il nostro GitHub .