
Abstract: This article is compiled from the talk “Pravega Flink Connector Table API Advanced Feature Exploration” given by Dell Technologies software engineer Yumin Zhou at Flink Forward Asia 2021. Contents:

  1. Introduction to the Pravega Schema Registry project
  2. Catalog API integration
  3. Debezium Support
  4. Community Joint White Paper


At Flink Forward Asia 2020, our team shared the topic …

As shown in the figure above, when a writer writes an event, the reader must deserialize it with the same schema to recover accurate data from the binary. Some sensitive information may even be further protected by encryption on top of this encoding.

We need centralized schema metadata for Pravega streams, including how the data is encoded. Writers define and register this metadata, and all readers can then read it to learn the structure of the data and how to deserialize it, so that both sides agree on the schema.

We also want this metadata storage not to depend on an additional component. Ideally it should be built on Pravega itself, which not only saves operational cost but also takes advantage of Pravega’s own efficient and durable storage.

Further, a very common business scenario is schema evolution. Within the same stream, as the business grows, the semi-structured data being written may add new fields or change existing ones to support more business processes and jobs. Many formats, such as Avro, support this kind of compatible change. Concretely, when another writer starts writing data with the new schema, we must ensure not only that existing readers keep working, but also that new readers that need the new fields can read them. Therefore, the Pravega Schema Registry should not wait until a reader actually tries to deserialize the data to discover the change. Instead, format compatibility is configured on the write side and checked when a new schema is registered, so that read and write client applications can be managed more promptly.
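To make the registration-time check concrete, here is a toy sketch of the kind of rule a registry can enforce before accepting a new schema. It is not the Schema Registry’s API: FieldSpec and CompatibilityCheck are hypothetical names, and the rules are a simplified subset of Avro schema resolution (no type promotion, aliases, or nested records).

```java
import java.util.Map;

// Hypothetical, simplified field model: a type name plus whether the field declares a default.
final class FieldSpec {
    final String type;
    final boolean hasDefault;

    FieldSpec(String type, boolean hasDefault) {
        this.type = type;
        this.hasDefault = hasDefault;
    }
}

// Simplified Avro-style compatibility rules (no type promotion, aliases, or nested records).
final class CompatibilityCheck {
    private CompatibilityCheck() {}

    // True if data written with writerSchema can be read with readerSchema.
    // Fields that only the writer knows are skipped by the reader; fields that only
    // the reader knows must have a default; shared fields must keep their type.
    static boolean canRead(Map<String, FieldSpec> readerSchema, Map<String, FieldSpec> writerSchema) {
        for (Map.Entry<String, FieldSpec> field : readerSchema.entrySet()) {
            FieldSpec written = writerSchema.get(field.getKey());
            if (written == null) {
                if (!field.getValue().hasDefault) return false;
            } else if (!written.type.equals(field.getValue().type)) {
                return false;
            }
        }
        return true;
    }

    // Backward: readers on the new schema can read data written with the old one.
    static boolean isBackward(Map<String, FieldSpec> newSchema, Map<String, FieldSpec> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // Forward: existing readers on the old schema can read data written with the new one.
    static boolean isForward(Map<String, FieldSpec> newSchema, Map<String, FieldSpec> oldSchema) {
        return canRead(oldSchema, newSchema);
    }

    // Full: both directions hold.
    static boolean isFull(Map<String, FieldSpec> newSchema, Map<String, FieldSpec> oldSchema) {
        return isBackward(newSchema, oldSchema) && isForward(newSchema, oldSchema);
    }
}
```

A "full" policy matches the scenario described above: existing readers keep working and new readers can use the new field. In this sketch that means any added field must carry a default.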

2. Project Introduction

Based on these motivations, we developed the Pravega Schema Registry project.

It stores and manages the schemas of semi-structured data in Pravega, and provides a RESTful interface to manage the stored schemas, the data encoding formats, and the compatibility policies. The interface abstraction is very open: besides built-in common serialization formats such as Avro, Protobuf, and JSON, and common compression algorithms such as LZ4 and Snappy, it also supports custom serialization methods. This abstraction is more general than some similar projects in the industry. For every serialization method, users can configure the corresponding encoding format and compatibility policy, so they are free to process data with any serialization method. The project stores its data using Pravega Key Value Tables, which form the underlying implementation of the Schema Registry. The Pravega Key Value Table was originally only one of Pravega’s internal metadata storage components; we gradually developed a public API for it, which became public in version 0.8 and reached a stable beta in version 0.10.
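A minimal sketch of what such an open codec abstraction might look like, assuming a hypothetical Codec interface (not the project’s actual interface). The JDK’s gzip stands in for compression codecs such as LZ4 or Snappy, which would need external libraries; a custom codec would simply be another implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical pluggable codec: the name is what a registry could record next to the schema
// so that readers know how to undo the encoding.
interface Codec {
    String name();
    byte[] encode(byte[] data);
    byte[] decode(byte[] data);
}

// No compression: returns copies of the input.
final class IdentityCodec implements Codec {
    public String name() { return "none"; }
    public byte[] encode(byte[] data) { return data.clone(); }
    public byte[] decode(byte[] data) { return data.clone(); }
}

// JDK gzip, standing in for LZ4/Snappy in this sketch.
final class GzipCodec implements Codec {
    public String name() { return "gzip"; }

    public byte[] encode(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the gzip stream writes the trailer before we read the bytes.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public byte[] decode(byte[] data) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```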

The Pravega Schema Registry project is also relatively independent. Apart from the underlying implementation on Pravega Key Value Tables, all upper-layer abstractions are independent and not tied to Pravega, and the project is not open-sourced inside Pravega but as a separate project in the ecosystem. This allows it to serve as a schema management solution for more general storage systems, including common file and object storage.

3. System Architecture

The system architecture of the whole project is shown in the figure.

The Schema Registry can interact with clients through RESTful APIs as well as the gRPC protocol. A group is the unit of schema management and corresponds to a stream in Pravega; it stores the default serialization format, the compatibility configuration, and multiple versions of serialization and encoding information. As mentioned earlier, this information is stored as key-value pairs on Pravega segments.
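The per-group state described above can be pictured with a small in-memory model. This is a hypothetical sketch, not the Schema Registry’s code: SchemaGroup, the string-typed schemas, the "version/codec" keys, and the "check against the latest version only" policy are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical in-memory model of one group (one per Pravega stream).
// The real project persists this kind of state as key-value pairs.
final class SchemaGroup {
    private final String serializationFormat;               // group default, e.g. "Avro"
    private final BiPredicate<String, String> isCompatible; // (candidate, latest) -> allowed?
    private final List<String> versions = new ArrayList<>(); // version number = list index
    private final Map<String, Integer> encodingIds = new HashMap<>(); // "version/codec" -> id

    SchemaGroup(String serializationFormat, BiPredicate<String, String> isCompatible) {
        this.serializationFormat = serializationFormat;
        this.isCompatible = isCompatible;
    }

    String serializationFormat() {
        return serializationFormat;
    }

    // Registers a schema and returns its version; re-registering an existing schema is a no-op.
    int addSchema(String schema) {
        int existing = versions.indexOf(schema);
        if (existing >= 0) return existing;
        if (!versions.isEmpty() && !isCompatible.test(schema, versions.get(versions.size() - 1))) {
            throw new IllegalArgumentException("schema rejected by the group's compatibility policy");
        }
        versions.add(schema);
        return versions.size() - 1;
    }

    // Returns a stable encoding id for a (schema version, codec) pair, allocating one on first use.
    int encodingId(int version, String codec) {
        if (version < 0 || version >= versions.size()) {
            throw new IllegalArgumentException("unknown schema version " + version);
        }
        String key = version + "/" + codec;
        Integer id = encodingIds.get(key);
        if (id == null) {
            id = encodingIds.size();
            encodingIds.put(key, id);
        }
        return id;
    }
}
```

The encoding id is what ties a single event back to both a schema version and a codec, which is why it is the value carried in each event’s header.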

On the data path, the writer must use a special event serializer that prepends header bytes containing a protocol version and an encoding id. This lets the Schema Registry step in to register or validate the data’s schema against the encoding and compatibility requirements before the data is allowed into the Pravega stream’s storage. Likewise, the reader must use the matching special deserializer.
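The header is described later in this article as five bytes. The sketch below assumes it is one protocol-version byte followed by a four-byte big-endian encoding id; RegistryHeader and its methods are hypothetical names, not the Schema Registry’s serializer API.

```java
import java.nio.ByteBuffer;

// Sketch of the assumed 5-byte header: 1-byte protocol version + 4-byte big-endian encoding id.
final class RegistryHeader {
    static final int HEADER_SIZE = 5;

    // Parsed view of an event: header fields plus the serialized payload.
    static final class Decoded {
        final byte protocolVersion;
        final int encodingId;
        final byte[] payload;

        Decoded(byte protocolVersion, int encodingId, byte[] payload) {
            this.protocolVersion = protocolVersion;
            this.encodingId = encodingId;
            this.payload = payload;
        }
    }

    private RegistryHeader() {}

    // Writer side: prepend the header to an already-serialized payload.
    static byte[] encode(byte protocolVersion, int encodingId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(protocolVersion)
                .putInt(encodingId) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    // Reader side: split the header from the payload so the encoding id can be resolved.
    static Decoded decode(byte[] event) {
        if (event.length < HEADER_SIZE) {
            throw new IllegalArgumentException("event is shorter than the header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(event);
        byte protocolVersion = buffer.get();
        int encodingId = buffer.getInt();
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return new Decoded(protocolVersion, encodingId, payload);
    }
}
```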

Second, Catalog API integration

With the Schema Registry, the mapping between Flink Catalog and Pravega becomes clear.

As shown in the figure, the data in a stream, combined with the schema stored in the Schema Registry, lets us deserialize it and assemble a table in the Flink Catalog according to the table structure; likewise, the table-to-stream conversion can be done in the opposite direction. Therefore, traditional catalog operations such as creating tables, dropping tables, and dropping databases can be abstracted as metadata changes to Pravega and the Schema Registry.

Based on this idea, we implemented a first version of the catalog interface, so that users can create a catalog with the following DDL and use SQL to manipulate Pravega’s metadata.

 CREATE CATALOG pravega_catalog WITH (
   'type' = 'pravega',
   'default-database' = 'scope1',
   'controller-uri' = 'tcp://localhost:9090',
   'schema-registry-uri' = 'http://localhost:9092'
 );
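The mapping above (database to scope, table to stream, table schema to a Schema Registry group) can be sketched as an in-memory model in which each catalog operation becomes a metadata change. Everything here is hypothetical: PravegaCatalogSketch, the "scope/stream" group naming, and the refusal to drop a non-empty database are illustrative choices, not the connector’s Catalog implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: database <-> scope, table <-> stream, table schema <-> registry group.
final class PravegaCatalogSketch {
    private final Map<String, Set<String>> streamsByScope = new HashMap<>();
    private final Map<String, String> schemaByGroup = new HashMap<>(); // group -> schema

    private static String group(String scope, String stream) {
        return scope + "/" + stream; // hypothetical group naming
    }

    private Set<String> streamsOf(String scope) {
        Set<String> streams = streamsByScope.get(scope);
        if (streams == null) throw new IllegalArgumentException("no such database (scope): " + scope);
        return streams;
    }

    // CREATE DATABASE -> create a scope.
    void createDatabase(String scope) {
        streamsByScope.putIfAbsent(scope, new TreeSet<>());
    }

    // CREATE TABLE -> create a stream and register its schema in a group.
    void createTable(String scope, String stream, String schema) {
        if (!streamsOf(scope).add(stream)) throw new IllegalStateException("table already exists: " + stream);
        schemaByGroup.put(group(scope, stream), schema);
    }

    // DROP TABLE -> delete the stream and its schema group.
    void dropTable(String scope, String stream) {
        if (!streamsOf(scope).remove(stream)) throw new IllegalArgumentException("no such table: " + stream);
        schemaByGroup.remove(group(scope, stream));
    }

    // DROP DATABASE -> delete the scope; this sketch refuses while it still holds streams.
    void dropDatabase(String scope) {
        if (!streamsOf(scope).isEmpty()) throw new IllegalStateException("database not empty: " + scope);
        streamsByScope.remove(scope);
    }

    Set<String> listTables(String scope) {
        return new TreeSet<>(streamsOf(scope));
    }

    String schemaOf(String scope, String stream) {
        return schemaByGroup.get(group(scope, stream));
    }
}
```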

However, as we further polished the first prototype, we ran into three difficulties in the implementation details.

  • The first is the handling of schemas and serialization. Before version 1.12, the code Flink used to convert JSON and Avro data to and from RowData, the internal row representation of Flink tables, consisted of internal, private classes in the format modules. So to reuse this code and keep our schema and serialization conversion consistent with Flink’s behavior, we would have had to copy the entire class library. We therefore asked the community whether this logic could be abstracted into public classes. After discussion with the community, we opened JIRA FLINK-19098[1] and contributed code to fix the problem, which opened up the data serialization path for catalog tables.
  • The second is supporting both the Avro and JSON formats. In existing catalog implementations, the serialization method is fairly fixed, which conflicts with the generality at the heart of the Pravega Schema Registry’s design. Some users are used to Avro and others to JSON; how do we meet both needs so that everyone can enjoy the convenience of the catalog? We introduced a serialization format option in the catalog, which not only allows specifying Avro or JSON but also exposes all additional serialization options officially supported by Flink, such as the timestamp format, to further customize serialization for the whole catalog.
  • The third difficulty is that events handled through the Schema Registry start with a five-byte header, so Flink’s ready-made JSON and Avro formats cannot be used for serialization directly. To use the serializer API provided by the Schema Registry, we had to develop our own format factory. Concretely, we pass the relevant catalog table and serialization parameters, that is, the Schema Registry settings including namespace, group, and other information, into the format factory options, which is equivalent to the following table DDL.
 CREATE TABLE clicks (
   ...
 ) WITH (
   'connector' = 'pravega',
   ...
   'format' = 'pravega-registry',
   'pravega-registry.uri' = 'tcp://localhost:9092',
   'pravega-registry.namespace' = 'scope1',
   '' = 'stream1',
   'pravega-registry.format' = 'Avro'
 );

The implementation then reads these options to call the Schema Registry API. Combined with the fixes for the problems described above, this completes interoperability between the binary data and Flink RowData, and the whole path works end to end.
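As a rough sketch of the reader half of such a format, assuming the five-byte header is one protocol-version byte followed by a four-byte big-endian encoding id: the wrapper strips the header, looks the encoding id up in a map of plain decoders, and hands the remaining bytes to the matching one. RegistryAwareDecoder and the map lookup are hypothetical; in the real format the encoding id would be resolved through the Schema Registry API, and the plain decoders would be Flink’s JSON or Avro deserialization logic.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Function;

// Hypothetical wrapper: strips the registry header, then delegates to a plain format decoder.
final class RegistryAwareDecoder<T> {
    private static final int HEADER_SIZE = 5; // assumed: 1-byte version + 4-byte encoding id
    private final Map<Integer, Function<byte[], T>> decodersByEncodingId;

    RegistryAwareDecoder(Map<Integer, Function<byte[], T>> decodersByEncodingId) {
        this.decodersByEncodingId = Map.copyOf(decodersByEncodingId);
    }

    T decode(byte[] event) {
        if (event.length < HEADER_SIZE) {
            throw new IllegalArgumentException("missing registry header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(event);
        buffer.get(); // protocol version; not validated in this sketch
        int encodingId = buffer.getInt();
        Function<byte[], T> decoder = decodersByEncodingId.get(encodingId);
        if (decoder == null) {
            throw new IllegalStateException("unknown encoding id " + encodingId);
        }
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        // A stock JSON/Avro decoder only ever sees the payload, never the header.
        return decoder.apply(payload);
    }
}
```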


Third, Debezium support

First, let’s introduce the broader concept of CDC. CDC, short for Change Data Capture, is a methodology for identifying and tracking data changes and then acting on them. This is a broad concept, but in current industry practice CDC more often refers to a narrower technique: parsing a database’s logs and converting them into a data stream in a specific format. Common implementations include Debezium and Canal, which is widely used in China.

Debezium, the industry’s most widely used CDC technology, is built on Kafka Connect and is a distributed platform for turning database row-level changes into event streams. CDC is now widely applied in industry, including data synchronization for backup and disaster recovery, distributing data to multiple downstream systems, and ETL integration into data lakes.

Debezium currently has three deployment methods:


  • The first, and the most commonly used in the industry, is deployment with Kafka Connect. As mentioned earlier, Debezium has supported this mode since its inception: it parses the logs of traditional databases such as MySQL (the binlog) or PostgreSQL and imports them into Apache Kafka through Debezium’s Kafka Connect source implementation. Thanks to Kafka’s powerful ecosystem, the data can then be connected to a rich set of downstream engines for further aggregation and data lake applications.
  • Second, the Debezium community has also noticed the changes in the messaging and streaming storage space and has gradually decoupled from Kafka. It now supports running a standalone Debezium Server that uses the source connector and sends the changes to a downstream messaging system. Integrations now exist for systems such as Amazon Kinesis and Google Cloud Pub/Sub. In the first half of 2021, the Debezium community released version 1.6, which officially accepted the contribution from our Pravega community: the Pravega sink became one of Debezium Server’s sink implementations, and the two communities now work hand in hand.
  • The last option is to embed Debezium as a library in a Java program and invoke it directly. The best-known example is the very popular Flink CDC connector in the community, led by Yun Xie and Xuejian. In scenarios where CDC data does not need long-term storage or reuse, this lightweight approach removes the complexity of deploying and operating a message queue while still ensuring reliable, fault-tolerant computation.

2. Write mode

When integrating Debezium with Pravega, we kept the implementation complete and provide both normal and transactional writes.

Inside Debezium Server, the source connector actually pulls changes periodically in batches, so the sink interface receives the Debezium upsert stream batch by batch. In the figure on the right, each pair of parentheses encloses one pulled batch; yellow represents updates and white represents inserts.

With normal writes, every event, whether an insert or an update, is written in sequence as a separate event. Ordering in a distributed environment is not a concern, because every Debezium event carries the key of its database table row. When writing to Pravega, that key is used as the routing key, so events with the same key land in the same segment, and Pravega guarantees ordering for data with the same routing key.
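The ordering argument can be illustrated with a toy stream whose segments split the key space [0, 1) into equal ranges. This is a sketch under stated assumptions: RoutedStream is hypothetical, String.hashCode stands in for Pravega’s own routing-key hash, and segment scaling is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stream with a fixed set of segments that split [0, 1) into equal key ranges.
final class RoutedStream {
    private final List<List<String>> segments = new ArrayList<>();

    RoutedStream(int segmentCount) {
        for (int i = 0; i < segmentCount; i++) segments.add(new ArrayList<>());
    }

    // Maps a routing key deterministically to a point in [0, 1).
    static double keyPosition(String routingKey) {
        return (routingKey.hashCode() & 0x7fffffff) / 2147483648.0;
    }

    // The segment whose key range contains the key's position.
    int segmentFor(String routingKey) {
        int index = (int) (keyPosition(routingKey) * segments.size());
        return Math.min(index, segments.size() - 1);
    }

    // Appends to a single segment, so events sharing a routing key keep their write order.
    void write(String routingKey, String event) {
        segments.get(segmentFor(routingKey)).add(event);
    }

    List<String> readSegment(int index) {
        return List.copyOf(segments.get(index));
    }
}
```

Events for different keys may interleave within a segment or spread across segments, but all events for one primary key stay in one append-only segment, in the order Debezium emitted them.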

With transactional writes, Pravega wraps each Debezium batch in a transaction and commits it when the batch completes. If Debezium Server fails over, the atomicity and idempotence of Pravega transactions allow all events to be replayed without producing duplicates, which guarantees exactly-once semantics.
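A minimal in-memory sketch of the batch-per-transaction idea, assuming hypothetical TxnStream and BatchWriter classes rather than the Pravega client API: events written inside a transaction stay invisible until commit, so a failure mid-batch leaves nothing behind and replaying the batch yields exactly one copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream with transactions; not the Pravega client API.
final class TxnStream {
    private final List<String> committed = new ArrayList<>();

    final class Txn {
        private final List<String> pending = new ArrayList<>();
        private boolean open = true;

        void write(String event) { ensureOpen(); pending.add(event); }
        void commit() { ensureOpen(); committed.addAll(pending); open = false; } // all or nothing
        void abort() { ensureOpen(); pending.clear(); open = false; }

        private void ensureOpen() {
            if (!open) throw new IllegalStateException("transaction already closed");
        }
    }

    Txn begin() { return new Txn(); }

    // Readers only ever see committed events.
    List<String> readAll() { return List.copyOf(committed); }
}

final class BatchWriter {
    private BatchWriter() {}

    // Writes one Debezium batch in one transaction; failAfter >= 0 simulates a crash at that index.
    static void writeBatch(TxnStream stream, List<String> batch, int failAfter) {
        TxnStream.Txn txn = stream.begin();
        try {
            for (int i = 0; i < batch.size(); i++) {
                if (i == failAfter) throw new RuntimeException("simulated failover");
                txn.write(batch.get(i));
            }
            txn.commit();
        } catch (RuntimeException e) {
            txn.abort();
            throw e;
        }
    }
}
```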

Since version 1.6, users can configure Debezium Server with the parameters shown in the table: fill in the Pravega connection parameters and specify the scope name and the transactional-write switch, and the changes to all tables in a MySQL (or other) database are synchronized in real time to Pravega streams named after the tables, in the Debezium message format.

3. In addition to Pravega’s contribution to Debezium, to consume this data with Flink’s stream processing capabilities we also needed the corresponding integration on the compute side, in the Pravega Flink connector.

On top of the existing FLIP-95 Table API, we implemented a table factory that supports basic reads and writes. Readers familiar with the Flink Table API may ask: the community already provides format factories such as debezium-json, so it seems enough to simply specify the corresponding format. What is the difficulty?

At first we thought so too, but it was far from simple. We encountered two main difficulties.

The first is the need to support deserializing one message into multiple events.

Debezium provides an upsert stream. When converting it to the RowData abstraction of Flink tables, an insert event is an ordinary one-to-one deserialization. For an update, however, a single event must be converted into two rows representing the states before and after the update. This directly conflicts with our connector’s default assumption that deserialization is one-to-one.

Pravega’s own serializer interface is also a one-to-one mapping, and we had already written a lot of supporting code for interoperability between Pravega serializers and Flink’s DeserializationSchema interface, to make things convenient for users. To support this new requirement, we had to refactor the existing deserialization path and move deserialization from the Pravega client into the Pravega connector, so that we could use the following collector-based deserialize method of Flink’s DeserializationSchema.

 default void deserialize(byte[] message, Collector<T> out) throws IOException {
     T record = deserialize(message); if (record != null) out.collect(record); }

The whole change also had to be made very carefully, keeping the existing conversion API between Pravega serializers and Flink’s API compatible so as not to break upgrades for users in production.
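The one-to-many conversion can be sketched with a collector. To keep the example self-contained, Collector and RowKind below are minimal stand-ins mirroring Flink’s org.apache.flink.util.Collector and org.apache.flink.types.RowKind, and the Debezium envelope is modeled as a plain object (op plus before/after maps) instead of parsed JSON. The op-to-RowKind mapping (c/r to INSERT, u to UPDATE_BEFORE plus UPDATE_AFTER, d to DELETE) follows how Flink’s debezium-json format represents changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for Flink's Collector and RowKind.
interface Collector<T> {
    void collect(T record);
}

enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// One changelog row: its kind plus the column values.
final class ChangeRow {
    final RowKind kind;
    final Map<String, Object> values;

    ChangeRow(RowKind kind, Map<String, Object> values) {
        this.kind = kind;
        this.values = values;
    }
}

// Simplified Debezium change event envelope; op is "c", "r", "u" or "d".
final class DebeziumEnvelope {
    final String op;
    final Map<String, Object> before;
    final Map<String, Object> after;

    DebeziumEnvelope(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }
}

final class DebeziumToChangelog {
    private DebeziumToChangelog() {}

    // One input event may produce one or two rows, which a one-to-one deserialize() cannot express.
    static void emit(DebeziumEnvelope event, Collector<ChangeRow> out) {
        switch (event.op) {
            case "c": // insert
            case "r": // snapshot read
                out.collect(new ChangeRow(RowKind.INSERT, event.after));
                break;
            case "u": // update: emit the old state, then the new state
                out.collect(new ChangeRow(RowKind.UPDATE_BEFORE, event.before));
                out.collect(new ChangeRow(RowKind.UPDATE_AFTER, event.after));
                break;
            case "d": // delete: the removed row is in "before"
                out.collect(new ChangeRow(RowKind.DELETE, event.before));
                break;
            default:
                throw new IllegalArgumentException("unsupported op: " + event.op);
        }
    }
}
```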

The second is FLIP-107: supporting metadata on the table source side.

The Pravega table source implements Flink’s SupportsReadingMetadata interface to provide this support. Users can use the from_format prefix to select metadata provided by the format itself, such as Debezium’s ingestion timestamp and source table name, to complete and enrich the table. We also support metadata from Pravega itself: the EventPointer, which records the position of the current event in the stream. Storing this information helps users with later random reads or even indexing. The complete CREATE TABLE DDL with metadata columns is shown below; the new columns appear in the Flink table after the original physical columns.

 CREATE TABLE debezium_source (
   -- Raw schema
   id INT NOT NULL,
   -- Format metadata
   origin_ts TIMESTAMP(3) METADATA FROM 'from_format.ingestion-timestamp' VIRTUAL,
   origin_table STRING METADATA FROM 'from_format.source.table' VIRTUAL,
   -- Connector metadata
   event_pointer BYTES METADATA VIRTUAL
 );
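How the produced row is laid out can be sketched as follows: physical columns first, then one value per requested metadata key, in the order the planner requested them. MetadataAppender is hypothetical, and routing keys by stripping the "from_format." prefix is an assumption for illustration, not the connector’s actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: append metadata values after the physical columns.
final class MetadataAppender {
    static final String FORMAT_PREFIX = "from_format.";

    private MetadataAppender() {}

    static List<Object> produceRow(List<Object> physical, List<String> requestedKeys,
                                   Map<String, Object> formatMetadata,
                                   Map<String, Object> connectorMetadata) {
        List<Object> row = new ArrayList<>(physical);
        for (String key : requestedKeys) {
            if (key.startsWith(FORMAT_PREFIX)) {
                // Format metadata, e.g. Debezium's source table; may be null if absent.
                row.add(formatMetadata.get(key.substring(FORMAT_PREFIX.length())));
            } else if (connectorMetadata.containsKey(key)) {
                // Connector metadata, e.g. the serialized EventPointer.
                row.add(connectorMetadata.get(key));
            } else {
                throw new IllegalArgumentException("unknown metadata key: " + key);
            }
        }
        return row;
    }
}
```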
This completes our Debezium support, from Pravega to the connector.

Fourth, community joint white paper released

Left: Teng Yu, founder of the Pravega China community and director of OSA software development at Dell Technologies. Right: Jiangjie Qin, Apache Flink PMC member and head of ecosystem technology for Alibaba’s open-source big data team.

Real-time CDC processing of databases is a very important application scenario in the big data industry, for example data synchronization, data distribution, and real-time data warehousing. With the Debezium integration, Pravega can serve as a unified intermediate messaging storage layer that, together with Apache Flink and Debezium, meets users’ database synchronization needs. Using Pravega as the message middleware makes full use of its real-time, durable storage: on top of millisecond-level end-to-end latency and data consistency, it provides reliable storage so that the data can be reused by other applications. At the same time, Apache Flink’s rich downstream ecosystem makes it easy to connect to more systems to meet various business needs. The entire integration was accomplished by the Pravega community in collaboration with the Flink community.

The Pravega community is also excited to publish, together with the Flink community, our joint white paper “Pravega × Flink Open Source Solution for Building Real-Time Database Synchronization”. Interested readers can follow the link for details.
