Nouveautés de Pulsar Flink Connector 2.7.0

( Sijia-w) (24 décembre 2020)

En savoir les fonctionnalités les plus intéressantes et les plus importantes de Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector fournit une solution idéale pour un traitement par lots et streaming unifié avec Apache Pulsar et Apache Flink. Pulsar Flink Connector 2.7.0 prend en charge les fonctionnalités de Pulsar 2.7 et Flink 1.12, et est entièrement compatible avec le format de données Flink.

À propos de Pulsar Flink Connector

Pour que les entreprises puissent accéder à informations temporelles sur les données, ils ont besoin de capacités de traitement par lots et de streaming unifiées. Apache Flink unifie le traitement par lots et par flux en un seul moteur de calcul avec des «flux» comme représentation unifiée des données. Bien que les développeurs aient effectué un travail considérable au niveau des couches informatique et API, très peu de travail a été effectué au niveau des couches de données et de messagerie et de stockage. Cependant, en réalité, les données sont séparées en silos de données, créés par diverses technologies de stockage et de messagerie. En conséquence, il ny a toujours pas de source de vérité unique et le fonctionnement global des équipes de développeurs est toujours compliqué. Pour résoudre les problèmes liés aux opérations, nous devons stocker les données dans des flux. Apache Pulsar (avec Apache BookKeeper) répond parfaitement aux critères: les données sont stockées en une seule copie (source de vérité), et peuvent être consultées dans des flux (via des interfaces pub-sub) et des segments (pour le traitement par lots). Lorsque Flink et Pulsar se rejoignent, les deux technologies open source créent une architecture de données unifiée pour les entreprises basées sur les données en temps réel.

Le connecteur Pulsar Flink fournit un traitement élastique des données avec Apache Pulsar et Apache Flink , permettant à Apache Flink de lire / écrire des données depuis / vers Apache Pulsar. Le connecteur Pulsar Flink vous permet de vous concentrer sur votre logique commerciale sans vous soucier des détails de stockage.

Défis

Lorsque nous avons développé le connecteur Pulsar Flink pour la première fois, il a été largement adopté par les deux Communautés Flink et Pulsar. En exploitant le connecteur Pulsar Flink, Hewlett Packard Enterprise (HPE) a construit une plate-forme de calcul en temps réel, BIGO a construit un système de traitement des messages en temps réel, et Zhihu est en train dévaluer ladéquation du connecteur pour un système informatique en temps réel.

Alors que de plus en plus dutilisateurs adoptaient le connecteur Pulsar Flink, nous avons entendu un problème commun de la communauté: il est difficile de faire de la sérialisation et de la désérialisation. Alors que le connecteur Pulsar Flink tire parti de la sérialisation Pulsar, les versions précédentes ne prenaient pas en charge le format de données Flink. En conséquence, les utilisateurs ont dû faire beaucoup de configurations pour utiliser le connecteur pour faire du calcul en temps réel.

Pour rendre le connecteur Pulsar Flink plus facile à utiliser, nous avons décidé de développer les capacités pour prend en charge le format de données Flink, les utilisateurs nont donc pas besoin de passer du temps sur la configuration.

Quoi de neuf dans Pulsar Flink Connector 2.7.0?

Le Pulsar Flink Connector 2.7.0 prend en charge les fonctionnalités dans Apache Pulsar 2.7.0 et Apache Flink 1.12, et est entièrement compatible avec le connecteur Flink et le format de message Flink. Désormais, vous pouvez utiliser des fonctionnalités importantes dans Flink, telles que le récepteur à une seule fois, le mécanisme upsert Pulsar, les colonnes calculées en langage de définition de données (DDL), les filigranes et les métadonnées. Vous pouvez également tirer parti de labonnement Key-Shared dans Pulsar et effectuer la sérialisation et la désérialisation sans trop de configuration. De plus, vous pouvez facilement personnaliser la configuration en fonction de votre entreprise.

Ci-dessous, nous présentons les principales fonctionnalités de Pulsar Flink Connector 2.7.0 en détail.

File dattente de messages ordonnée avec une performance

Lorsque les utilisateurs avaient besoin de garantir strictement lordre des messages, un seul consommateur était autorisé à consommer des messages. Cela a eu un impact sévère sur le débit. Pour résoudre ce problème, nous avons conçu un modèle dabonnement Key\_Shared dans Pulsar. Il garantit lordre des messages et améliore le débit en ajoutant une clé à chaque message, et achemine les messages avec le même hachage de clé vers un consommateur.

Pulsar Flink Connector 2.7.0 prend en charge le modèle dabonnement Key\_Shared. Vous pouvez activer cette fonctionnalité en définissant enable-key-hash-range sur true. La plage de Key Hash traitée par chaque consommateur est décidée par le parallélisme des tâches.

Présentation de la sémantique à exactement une seule fois pour Pulsar sink (basée sur la transaction Pulsar)

Dans les versions précédentes, sink Les opérateurs ne prenaient en charge que la sémantique au moins une fois, ce qui ne pouvait pas répondre pleinement aux exigences de cohérence de bout en bout. Pour dédupliquer les messages, les utilisateurs devaient faire du sale boulot, ce qui nétait pas convivial.

Les transactions sont prises en charge dans Pulsar 2.7.0, ce qui améliorera considérablement la capacité de tolérance aux pannes du puits Flink. Dans Pulsar Flink Connector 2.7.0, nous avons conçu une sémantique exactement unique pour les opérateurs de puits basés sur les transactions Pulsar. Flink utilise le protocole de validation en deux phases pour implémenter TwoPhaseCommitSinkFunction. Les principales méthodes de cycle de vie sont beginTransaction (), preCommit (), commit (), abort (), recoverAndCommit (), recoverAndAbort ().

Vous pouvez sélectionner la sémantique de manière flexible lors de la création dun opérateur récepteur, et le les changements de logique interne sont transparents. Les transactions Pulsar sont similaires au protocole de validation en deux phases de Flink, ce qui améliorera considérablement la fiabilité de Connector Sink.

Il est facile dimplémenter beginTransaction et preCommit. Il vous suffit de démarrer une transaction Pulsar et de conserver le TID de la transaction après le point de contrôle. Dans la phase de pré-validation, vous devez vous assurer que tous les messages sont vidés vers Pulsar et que les messages pré-validés seront finalement validés.

Nous nous concentrons sur recoverAndCommit et recoverAndAbort dans limplémentation. Limité par les fonctionnalités de Kafka, le connecteur Kafka adopte des styles de hack pour recoverAndCommit. Les transactions Pulsar ne dépendent pas du producteur spécifique, il est donc facile pour vous de valider et dannuler des transactions basées sur TID.

Les transactions Pulsar sont très efficaces et flexibles. Profitant des avantages de Pulsar et Flink, le connecteur Pulsar Flink est encore plus puissant. Nous continuerons à améliorer le puits transactionnel dans le connecteur Pulsar Flink.

Présentation du connecteur upsert-pulsar

Les utilisateurs de la communauté Flink ont ​​exprimé leurs besoins pour le Pulsar upsert. Après avoir parcouru les listes de diffusion et les problèmes, nous avons résumé les trois raisons suivantes.

  • Interpréter le sujet Pulsar comme un flux de journal des modifications qui interprète les enregistrements avec des clés comme des événements upsert (aka insert / update).
  • Dans le cadre du pipeline en temps réel, joignez plusieurs flux pour lenrichissement et stockez les résultats dans une rubrique Pulsar pour un calcul ultérieur. Cependant, le résultat peut contenir des événements de mise à jour.
  • Dans le cadre du pipeline en temps réel, agréger sur des flux de données et stocker les résultats dans une rubrique Pulsar pour un calcul ultérieur. Cependant, le résultat peut contenir des événements de mise à jour.

En fonction des exigences, nous ajoutons le support dUpsert Pulsar. Le connecteur upsert-pulsar permet de lire et décrire des données dans des sujets Pulsar de manière ascendante.

  • En tant que source, le connecteur upsert-pulsar produit un flux de journal des modifications, où chaque enregistrement de données représente un événement de mise à jour ou de suppression. Plus précisément, la valeur dun enregistrement de données est interprétée comme une MISE À JOUR de la dernière valeur de la même clé, le cas échéant (si une clé correspondante nexiste pas encore, la mise à jour sera considérée comme un INSERT). En utilisant lanalogie de la table, un enregistrement de données dans un flux de journal des modifications est interprété comme un UPSERT (aka INSERT / UPDATE) car toute ligne existante avec la même clé est écrasée. De plus, les valeurs nulles sont interprétées dune manière spéciale: un enregistrement avec une valeur nulle représente un «DELETE».
  • En tant que récepteur, le connecteur upsert-pulsar peut consommer un flux de journal des modifications. Il écrira les données INSERT / UPDATE\_AFTER en tant que valeur de messages Pulsar normale, et les données DELETE en tant que messages Pulsar avec des valeurs nulles (indiquer la pierre tombale pour la clé). Flink garantira lordre des messages sur la clé primaire en partitionnant les données sur les valeurs des colonnes de clé primaire, de sorte que les messages de mise à jour / suppression sur la même clé tomberont dans la même partition.

Prise en charge de la nouvelle interface source et de lAPI Table introduites dans FLIP-27 et FLIP-95

Cette fonctionnalité unifie la source du flux batch et optimise le mécanisme de découverte des tâches et de lecture des données. Cest également la pierre angulaire de notre implémentation de lunification par lots et streaming Pulsar. La nouvelle API Table prend en charge les colonnes, les filigranes et les métadonnées calculés DDL.

Prise en charge des métadonnées de lecture et décriture SQL comme décrit dans FLIP-107

FLIP-107 permet aux utilisateurs daccéder aux métadonnées du connecteur en tant que colonne de métadonnées dans les définitions de table. Dans le calcul en temps réel, les utilisateurs ont généralement besoin dinformations supplémentaires, telles que eventTime, des champs personnalisés. Le connecteur Pulsar Flink prend en charge les métadonnées de lecture et décriture SQL, il est donc flexible et facile pour les utilisateurs de gérer les métadonnées des messages Pulsar dans Pulsar Flink Connector 2.7.0. Pour plus de détails sur la configuration, reportez-vous à Manipulation des métadonnées des messages Pulsar .

Ajouter le type de format Flink atomic pour prendre en charge les types primitifs Pulsar

Dans Pulsar Flink Connector 2.7.0, nous ajoutons le type de format Flink atomic pour prendre en charge les types primitifs Pulsar. Lorsque le traitement Flink nécessite un type primitif Pulsar, vous pouvez utiliser atomic comme format de connecteur. Pour plus dinformations sur les types primitifs Pulsar, consultez https: //pulsar.apache.org / docs / fr / schema-understand / .

Migration

Si vous utilisez la version précédente de Pulsar Flink Connector, vous devez ajuster les paramètres SQL et API en conséquence. Ci-dessous, nous fournissons des détails sur chacun deux.

SQL

En SQL, nous avons modifié les paramètres de configuration de Pulsar dans la déclaration DDL. Le nom de certains paramètres est modifié, mais les valeurs ne le sont pas.

  • Supprimez le préfixe connector. des noms de paramètres.
  • Modifiez le nom du paramètre connector.type en connector.
  • Modifiez le nom du paramètre de mode de démarrage de connector.startup-mode dans scan.startup.mode.
  • Ajustez les propriétés de Pulsar comme properties.pulsar.reader.readername=testReaderName.

Si vous utilisez SQL dans Pulsar Flink Connector, vous devez ajuster votre configuration SQL en conséquence lors de la migration vers Pulsar Flink Connector 2.7.0. Lexemple suivant montre les différences entre les versions précédentes et la version 2.7.0 pour SQL.

SQL dans les versions précédentes :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL dans Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Du point de vue de lAPI, nous avons ajusté certaines classes et activé une personnalisation plus facile.

  • Pour résoudre les problèmes de sérialisation, nous avons modifié la signature de la méthode de construction FlinkPulsarSink et ajouté PulsarSerializationSchema.
  • Nous avons supprimé les classes inappropriées liées à la ligne, telles que FlinkPulsarRowSink, FlinkPulsarRowSource. Si vous devez gérer le format Row, vous pouvez utiliser les composants de sérialisation liés à Flink Row.

Vous pouvez créer PulsarSerializationSchema en utilisant PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor est déplacé vers PulsarSerializationSchemaWrapper. Lorsque vous ajustez votre API, vous pouvez prendre lexemple suivant comme référence.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Plan futur

Aujourdhui, nous concevons un lot et une solution de flux intégrée à Pulsar Source, basée sur la nouvelle API Flink Source (FLIP-27). La nouvelle solution débloquera les limites de linterface source de streaming actuelle (SourceFunction) et simultanément pour unifier les interfaces source entre les API de traitement par lots et de streaming.

Pulsar propose une architecture hiérarchique où les données sont divisées en streaming, batch, et des données froides, qui permettent à Pulsar de fournir une capacité infinie. Cela fait de Pulsar une solution idéale pour le batch et le streaming unifiés.

La solution batch and stream basée sur la nouvelle API Flink Source est divisée en deux parties simples: SplitEnumerator et Reader. SplitEnumerator découvre et attribue des partitions, et Reader lit les données de la partition.

Pulsar stocke les messages dans le bloc du grand livre, et vous pouvez localiser les registres via Pulsar admin, puis fournir une partition de courtier, une partition BookKeeper, une partition Offloader et dautres informations via différentes politiques de partitionnement. Pour plus de détails, reportez-vous à https://github.com/streamnative/pulsar-flink/issues/187 .

Conclusion

Pulsar Flink Connector 2.7.0 est disponible et nous encourageons vivement tout le monde à utiliser Pulsar Flink Connector 2.7.0. La nouvelle version est plus conviviale et est activée avec diverses fonctionnalités dans Pulsar 2.7 et Flink 1.12. Nous allons contribuer à Pulsar Flink Connector 2.7.0 au référentiel Flink . Si vous avez des inquiétudes concernant Pulsar Flink Connector, nhésitez pas à ouvrir les problèmes dans https://github.com/streamnative/pulsar-flink/issues .

À propos de lauteur

Jianyun Zhao est ingénieur logiciel chez StreamNative. Auparavant, il était responsable du développement dun système informatique en temps réel chez Zhaopin.com. Vous pouvez le suivre sur twitter .

Jennifer Huang est un committer Apache Pulsar. Elle travaille comme stratège de contenu senior chez StreamNative, responsable de la documentation Apache Pulsar et de la croissance de la communauté. Vous pouvez la suivre sur twitter .

Ce message a été initialement publié sur blog StreamNative .

Vous aimez ce message? Veuillez recommander et / ou partager.

Vous voulez en savoir plus? Voir https://streamnative.io/blog . Suivez-nous sur Medium et consultez notre GitHub .