Novedades de Pulsar Flink Connector 2.7.0

( Sijia-w) (24 de diciembre de 2020)

Más información las características más interesantes y principales de Pulsar Flink Connector 2.7.0.

Pulsar Flink Connector proporciona una solución ideal para lotes unificados y streaming con Apache Pulsar y Apache Flink. Pulsar Flink Connector 2.7.0 admite funciones en Pulsar 2.7 y Flink 1.12, y es totalmente compatible con el formato de datos Flink.

Acerca de Pulsar Flink Connector

Para que las empresas accedan a datos reales tiempo, necesitan capacidades unificadas por lotes y de transmisión. Apache Flink unifica el procesamiento por lotes y de flujo en un solo motor informático con «flujos» como representación unificada de datos. Aunque los desarrolladores han realizado un trabajo extenso en las capas de computación y API, se ha realizado muy poco trabajo en las capas de datos, mensajería y almacenamiento. Sin embargo, en realidad, los datos se segregan en silos de datos, creados por diversas tecnologías de almacenamiento y mensajería. Como resultado, todavía no existe una única fuente de la verdad y la operación general para los equipos de desarrolladores sigue siendo complicada. Para abordar las operaciones desordenadas, necesitamos almacenar datos en secuencias. Apache Pulsar (junto con Apache BookKeeper) cumple perfectamente con los criterios: los datos se almacenan como una copia (fuente de la verdad) y se puede acceder a ellos en secuencias (a través de interfaces pub-sub) y segmentos (para procesamiento por lotes). Cuando Flink y Pulsar se unen, las dos tecnologías de código abierto crean una arquitectura de datos unificada para empresas basadas en datos en tiempo real.

El conector Pulsar Flink proporciona procesamiento de datos elástico con Apache Pulsar y Apache Flink , lo que permite que Apache Flink leer / escribir datos desde / hacia Apache Pulsar. Pulsar Flink Connector le permite concentrarse en la lógica de su negocio sin preocuparse por los detalles de almacenamiento.

Desafíos

Cuando desarrollamos por primera vez el Pulsar Flink Connector, recibió una amplia adopción tanto de los Comunidades Flink y Pulsar. Aprovechando el conector Pulsar Flink, Hewlett Packard Enterprise (HPE) construyó una plataforma informática en tiempo real, BIGO construyó un sistema de procesamiento de mensajes en tiempo real, y Zhihu está en el proceso de evaluar el ajuste del conector para un sistema informático en tiempo real.

A medida que más usuarios adoptaron Pulsar Flink Connector, escuchamos un problema común de la comunidad: es difícil realizar la serialización y deserialización. Si bien el conector Pulsar Flink aprovecha la serialización Pulsar, las versiones anteriores no admitían el formato de datos Flink. Como resultado, los usuarios tuvieron que hacer muchas configuraciones para poder usar el conector para hacer computación en tiempo real.

Para hacer que el conector Pulsar Flink sea más fácil de usar, decidimos desarrollar las capacidades para admite el formato de datos Flink, por lo que los usuarios no necesitan perder tiempo en la configuración.

¿Qué hay de nuevo en Pulsar Flink Connector 2.7.0?

El Pulsar Flink Connector 2.7.0 admite funciones en Apache Pulsar 2.7.0 y Apache Flink 1.12, y es totalmente compatible con el conector Flink y el formato de mensaje Flink. Ahora, puede utilizar funciones importantes en Flink, como el sumidero de una sola vez, el mecanismo Pulsar de inserción, las columnas calculadas del lenguaje de definición de datos (DDL), las marcas de agua y los metadatos. También puede aprovechar la suscripción de clave compartida en Pulsar y realizar la serialización y deserialización sin mucha configuración. Además, puede personalizar fácilmente la configuración en función de su negocio.

A continuación, presentamos en detalle las características clave de Pulsar Flink Connector 2.7.0.

Cola de mensajes ordenados con alto rendimiento

Cuando los usuarios necesitaban garantizar estrictamente el orden de los mensajes, solo un consumidor podía consumir mensajes. Esto tuvo un impacto severo en el rendimiento. Para abordar esto, diseñamos un modelo de suscripción Key\_Shared en Pulsar. Garantiza el orden de los mensajes y mejora el rendimiento al agregar una clave a cada mensaje y enruta los mensajes con el mismo Key Hash a un consumidor.

Pulsar Flink Connector 2.7.0 admite el modelo de suscripción Key\_Shared. Puede habilitar esta función configurando enable-key-hash-range en true. El rango de Key Hash procesado por cada consumidor se decide por el paralelismo de tareas.

Presentamos la semántica de exactamente una vez para Pulsar sink (basada en la transacción Pulsar)

En versiones anteriores, sink Los operadores solo admitían la semántica al menos una vez, que no podía cumplir completamente los requisitos de coherencia de un extremo a otro. Para deduplicar los mensajes, los usuarios tenían que hacer un trabajo sucio, que no era fácil de usar.

Las transacciones son compatibles con Pulsar 2.7.0, lo que mejorará en gran medida la capacidad de tolerancia a fallas de Flink sink. En Pulsar Flink Connector 2.7.0, diseñamos semántica de exactamente una vez para operadores de sumideros basados ​​en transacciones de Pulsar. Flink utiliza el protocolo de confirmación de dos fases para implementar TwoPhaseCommitSinkFunction. Los métodos principales del ciclo de vida son beginTransaction (), preCommit (), commit (), abort (), recoveryAndCommit (), recoveryAndAbort ().

Puede seleccionar la semántica de forma flexible al crear un operador Los cambios lógicos internos son transparentes. Las transacciones Pulsar son similares al protocolo de confirmación de dos fases en Flink, que mejorará en gran medida la confiabilidad de Connector Sink.

Es fácil implementar beginTransaction y preCommit. Solo necesita iniciar una transacción Pulsar y conservar el TID de la transacción después del punto de control. En la fase de confirmación previa, debe asegurarse de que todos los mensajes se descarguen en Pulsar, y los mensajes confirmados previamente se confirmarán eventualmente.

Nos centramos en recoveryAndCommit y recoveryAndAbort en la implementación. Limitado por las características de Kafka, el conector Kafka adopta estilos de pirateo para recoveryAndCommit. Las transacciones de Pulsar no dependen del Productor específico, por lo que es fácil para usted confirmar y cancelar transacciones basadas en TID.

Las transacciones de Pulsar son altamente eficientes y flexibles. Aprovechando las ventajas de Pulsar y Flink, el conector Pulsar Flink es aún más potente. Continuaremos mejorando el receptor transaccional en el conector Pulsar Flink.

Presentamos el conector upsert-pulsar

Los usuarios de la comunidad Flink expresaron sus necesidades para el upsert Pulsar. Después de revisar las listas de correo y los problemas, hemos resumido las siguientes tres razones.

  • Interprete el tema Pulsar como un flujo de registro de cambios que interpreta los registros con claves como eventos upsert (también conocidos como insertar / actualizar).
  • Como parte de la canalización en tiempo real, únase a varios flujos para enriquecerlos y almacene los resultados en un tema Pulsar para realizar cálculos posteriores. Sin embargo, el resultado puede contener eventos de actualización.
  • Como parte del proceso en tiempo real, agregue los flujos de datos y almacene los resultados en un tema de Pulsar para realizar cálculos posteriores. Sin embargo, el resultado puede contener eventos de actualización.

Según los requisitos, agregamos soporte para Upsert Pulsar. El conector upsert-pulsar permite leer y escribir datos en temas de Pulsar de la manera upsert.

  • Como fuente, el conector upsert-pulsar produce un flujo de registro de cambios, donde cada registro de datos representa un evento de actualización o eliminación. Más precisamente, el valor en un registro de datos se interpreta como una ACTUALIZACIÓN del último valor para la misma clave, si existe (si una clave correspondiente aún no existe, la actualización se considerará INSERT). Usando la analogía de la tabla, un registro de datos en un flujo de registro de cambios se interpreta como un UPSERT (también conocido como INSERT / UPDATE) porque se sobrescribe cualquier fila existente con la misma clave. Además, los valores nulos se interpretan de una manera especial: un registro con un valor nulo representa un “BORRAR”.
  • Como receptor, el conector upsert-pulsar puede consumir un flujo de registro de cambios. Escribirá datos INSERT / UPDATE\_AFTER como el valor normal de los mensajes Pulsar y escribirá los datos DELETE como mensajes Pulsar con valores nulos (indique la piedra de desecho para la clave). Flink garantizará el orden de los mensajes en la clave principal mediante la partición de datos en los valores de las columnas de la clave principal, por lo que los mensajes de actualización / eliminación de la misma clave caerán en la misma partición.

Admite la nueva interfaz de origen y la API de tabla introducidas en FLIP-27 y FLIP-95

Esta función unifica la fuente del flujo por lotes y optimiza el mecanismo para el descubrimiento de tareas y la lectura de datos. También es la piedra angular de nuestra implementación de la unificación de streaming y por lotes de Pulsar. La nueva Table API admite columnas calculadas DDL, marcas de agua y metadatos.

Admite metadatos de lectura y escritura de SQL como se describe en FLIP-107

FLIP-107 permite a los usuarios acceder a los metadatos del conector como una columna de metadatos en las definiciones de la tabla. En la informática en tiempo real, los usuarios suelen necesitar información adicional, como eventTime, campos personalizados. El conector Pulsar Flink admite metadatos de lectura y escritura SQL, por lo que es flexible y fácil para los usuarios administrar los metadatos de los mensajes Pulsar en Pulsar Flink Connector 2.7.0. Para obtener más detalles sobre la configuración, consulte Manipulación de metadatos de mensajes de Pulsar .

Agregar tipo de formato Flink atomic para admitir tipos primitivos Pulsar

En Pulsar Flink Connector 2.7.0, agregamos el tipo de formato Flink atomic para admitir tipos primitivos Pulsar. Cuando el procesamiento de Flink requiere un tipo primitivo Pulsar, puede usar atomic como formato de conector. Para obtener más información sobre los tipos primitivos de Pulsar, consulte https: //pulsar.apache.org / docs / en / schema-understand / .

Migración

Si está utilizando la versión anterior de Pulsar Flink Connector, necesita ajustar los parámetros de SQL y API en consecuencia. A continuación proporcionamos detalles sobre cada uno.

SQL

En SQL, hemos cambiado los parámetros de configuración de Pulsar en la declaración DDL. El nombre de algunos parámetros se cambia, pero los valores no se cambian.

  • Elimine el prefijo connector. de los nombres de los parámetros.
  • Cambie el nombre del parámetro connector.type a connector.
  • Cambie el nombre del parámetro del modo de inicio de connector.startup-mode en scan.startup.mode.
  • Ajuste las propiedades de Pulsar como properties.pulsar.reader.readername=testReaderName.

Si usa SQL en Pulsar Flink Connector, necesita ajustar su configuración SQL en consecuencia cuando migre a Pulsar Flink Connector 2.7.0. El siguiente ejemplo muestra las diferencias entre las versiones anteriores y la versión 2.7.0 para SQL.

SQL en versiones anteriores :

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector.type" ="pulsar",
"connector.version" = "1",
"connector.topic" ="persistent://public/default/test\_flink\_sql",
"connector.service-url" ="pulsar://xxx",
"connector.admin-url" ="http://xxx",
"connector.startup-mode" ="earliest",
"connector.properties.0.key" ="pulsar.reader.readerName",
"connector.properties.0.value" ="testReaderName",
"format.type" ="json",
"update-mode" ="append"
);

SQL en Pulsar Flink Connector 2.7.0:

create table topic1(
`rip` VARCHAR,
`rtime` VARCHAR,
`uid` bigint,
`client\_ip` VARCHAR,
`day` as TO\_DATE(rtime),
`hour` as date\_format(rtime,"HH")
) with (
"connector" ="pulsar",
"topic" ="persistent://public/default/test\_flink\_sql",
"service-url" ="pulsar://xxx",
"admin-url" ="http://xxx",
"scan.startup.mode" ="earliest",
"properties.pulsar.reader.readername" = "testReaderName",
"format" ="json");

API

Desde una perspectiva de API, ajustamos algunas clases y permitimos una personalización más sencilla.

  • Para resolver problemas de serialización, cambiamos la firma del método de construcción FlinkPulsarSink, y agregamos PulsarSerializationSchema.
  • Eliminamos las clases inapropiadas relacionadas con la fila, como FlinkPulsarRowSink, FlinkPulsarRowSource. Si necesita lidiar con el formato Row, puede usar componentes de serialización relacionados con Flink Row.

Puede construir PulsarSerializationSchema usando PulsarSerializationSchemaWrapper.Builder. TopicKeyExtractor se mueve a PulsarSerializationSchemaWrapper. Cuando ajuste su API, puede tomar la siguiente muestra como referencia.

new PulsarSerializationSchemaWrapper.Builder(new SimpleStringSchema())
.setTopicExtractor(str -> getTopic(str))
.build();

Plan futuro

Hoy, estamos diseñando un lote y solución de flujo integrada con Pulsar Source, basada en la nueva API de Flink Source (FLIP-27). La nueva solución desbloqueará las limitaciones de la interfaz de fuente de transmisión actual (SourceFunction) y, simultáneamente, unificará las interfaces de origen entre las API de transmisión por lotes y las de transmisión.

Pulsar ofrece una arquitectura jerárquica donde los datos se dividen en transmisión, lote, y datos fríos, que permiten que Pulsar proporcione una capacidad infinita. Esto hace que Pulsar sea una solución ideal para lotes y streaming unificados.

La solución por lotes y streaming basada en la nueva API Flink Source se divide en dos partes simples: SplitEnumerator y Reader. SplitEnumerator descubre y asigna particiones, y Reader lee los datos de la partición.

Pulsar almacena mensajes en el bloque del libro mayor, y usted puede ubicar los libros mayores a través del administrador de Pulsar y luego proporcionar la partición del corredor, la partición BookKeeper, la partición del descargador y otra información a través de diferentes políticas de particionamiento. Para obtener más detalles, consulte https://github.com/streamnative/pulsar-flink/issues/187 .

Conclusión

Lanzamiento del Pulsar Flink Connector 2.7.0 y recomendamos encarecidamente a todos que utilicen Pulsar Flink Connector 2.7.0. La nueva versión es más fácil de usar y está habilitada con varias funciones en Pulsar 2.7 y Flink 1.12. Contribuiremos con Pulsar Flink Connector 2.7.0 al repositorio de Flink . Si tiene alguna inquietud sobre Pulsar Flink Connector, no dude en abrir problemas en https://github.com/streamnative/pulsar-flink/issues .

Acerca del autor

Jianyun Zhao es ingeniero de software en StreamNative. Antes de eso, fue responsable del desarrollo de un sistema informático en tiempo real en Zhaopin.com. Puedes seguirlo en twitter .

Jennifer Huang es un confirmador de Apache Pulsar. Trabaja como estratega de contenido senior en StreamNative, responsable de la documentación de Apache Pulsar y el crecimiento de la comunidad. Puedes seguirla en twitter .

Esta publicación se publicó originalmente en el blog StreamNative .

¿Te gusta esta publicación? Recomiende y / o comparta.

¿Desea obtener más información? Consulte https://streamnative.io/blog . Síganos en Medium y echa un vistazo a nuestro GitHub .