With the rapid development of the companion fish business, offline data is increasingly unable to meet the needs of operation students, and the real-time requirements of data are getting higher and higher. The previous real-time task was to synchronize data to TiDB in real time and use TiDB for micro-batch calculation. As more and more real-time scenarios emerge, TiDB can no longer meet the real-time data computing scenarios, and the calculations and queries are in one cluster, resulting in excessive cluster pressure and may affect normal business usage. It is already necessary to build a real-time data warehouse according to the business form. The real-time data warehouse mainly uses Flink as the computing engine, with distributed data storage media such as Redis and Kafka, and multi-dimensional analysis engines such as ClickHouse.

Companion Fish real-time operation application scenario

provides a stable environment based on the platform (unified scheduling mode, unified management, unified monitoring, etc.). We have built some real-time services to support various business parties through a service-based approach.

    >real-time data warehouse: data synchronization, business data cleaning and deduplication, related topic business data association splicing, and data aggregation and refining, etc., gradually build a multi-dimensional, multi-coverage real-time data warehouse system.

  • Real-time feature platform: real-time data extraction, calculation, and feature write-back.

Brief introduction: the current flow architecture diagram of data in companion fish:

The following mainly introduces the construction system of the real-time data warehouse of companion fish:


  • ODS layer data platform uniformly performs data analysis and processing and writes to Kafka.

  • DWD is more critical, and associates multiple data streams corresponding to the

  • same business domain data table into a complete log at the most granular level, and associates the corresponding dimensions to describe a complete fact.

  • DWS aggregates the data of each small business region according to the same dimensions and writes it to TiDB and ClickHouse. In TiDB, ClickHouse is associated again to form cross-business region aggregate data. For use by business and analysts.



layer complex scene data processing scheme After the data

is collected from the ODS layer, the data processing and processing are mainly concentrated in the DWD layer, and we face a lot of complex processing logic in the scene, this chapter focuses on the DWD layer data processing scheme in detail.

1. Data deduplication

Because MongoDB is used extensively for its internal business, MongoDB itself stores semi-structured data and does not have a fixed schema. When synchronizing Mongo’s oplog, the DWD layer of the real-time data warehouse does not require all fields to participate, and we will only extract fields with relatively high daily usage for table construction. This may change due to irrelevant data, and we will receive a record of the same data. For example, when classifying and analyzing the amount of a user’s order, if the address of the user’s order changes, we will also receive a business log, because we do not pay attention to the address dimension, so this log is useless duplicate data. This unprocessed data is inconvenient for BI engineers to use directly, and even directly affects the accuracy of calculation results. So for this non-Append-only data, we have customized the log format. In the binlog or oplog parsed by the platform, we still customize and add some metadata information to allow BI engineers to better understand what happened at the corresponding point in time when this data enters the real-time computing engine, and whether this fact participates in the calculation. Therefore, we have added fields such as metadata_table (original table name), metadata_changes (modify field name), metadata_op_type (DML type), metadata_commit_ts (modification timestamp) and other fields to help us better filter the duplicate data that we think is considered by the business.


  • two real-time data streams are associated.

  • Live streams are correlated with fact data that happened in the past.

  • Two real-time data streams are associated

    Using Redis based on memory, supporting a large number of QPS per unit time, fast access characteristics:

      > first we should observe the data within a certain range, Observe the out-of-order of the data in the time dimension. Set the time for data delay and the data cache time.

    • The services of companion fish are relatively stable, the data is out of order at most is the second-level difference, we usually choose a relatively large amount of data flow to do the mainstream, add window waiting for the main data stream (the window time does not have to be too long, such as 10s), the right data stream writes data to the Redis cache (minutes), when the mainstream window expires, ensure that the right stream data and cache are in Redis. Implement memory sharing between multiple operators within a Flink job. The advantages of this way are: simple enough and versatile; Flink job does not need to maintain business status, and the job is lightweight and stable. The disadvantage is that as the amount of data increases and the number of jobs increases, it will put a lot of pressure on the Redis cluster.


    Inside the Flink job, a complete user-state state management is provided, including state initialization, state update, state snapshot, and state recovery:

      > Put different tags on the data leftStream and rightStream, and combine leftStream and rightStream with contect operators. Perform group by operations on the join condition, and the data of the same group is cached and output in the state of the data in the precess operator. What is obtained downstream is the data that can be correlated.

    • For example, we can set the timer in the early morning of each day, clear the state, and trigger the strategy with the specific timer to see the business scenario.

    • Advantage: All processing logic for the entire job does not depend on other external storage systems and is calculated internally by Flink.

    • Disadvantages: If multiple data streams are associated, the overall amount of job code is large and the development cost is relatively high. The data is maintained by Flink, and the entire job memory load is high and the data volume is large, which has an impact on the overall stable operation of the job.

    The Flink community has recognized the pain points of multi-stream joins and provides a special join method that is different from offline SQL:

    • register watermarks for leftStream and rightStream respectively (preferably with event time).

    • Make an interval join leftStream with rightStream. (In a stream-to-stream join, a window join can only associate messages in the corresponding window in two streams, and cross-window messages cannot be associated, so they are discarded.) Interval Join does not have the concept of window, and directly uses timestamps as the condition of association, which is more expressive. The basic logic of the implementation of Interval join is relatively simple, mainly relying on TimeBoundedStreamJoin to complete the association of messages, and its core logic mainly includes message caching, processing of different association types, and message cleaning, but it is not simple to implement. Messages in one stream may need to be associated with multiple messages from another stream, so when a stream flow is associated, it usually needs to be associated with conditions similar to the following:).

    • Pros: Simple coding; The modification access of the entire job state is automatically done by the Flink source code, and the overall state load is relatively small with the user’s manual coding.

    • Disadvantages: The special join method is limited by the scene.

    Figure: <

    img class=”

    rich_pages wxw-img” src=”https://mmbiz.qpic.cn/mmbiz_jpg/YriaiaJPb26VMZ9RWnPMnSFD9vRy9j8x4diahqmckju0hMeOticNYhycvTgT1EQG5e2u65ut3WcEugrJEVPMhYIafg/640?wx_fmt=jpeg” >

    Flink Table & SQL temporal table:

      > In Flink, starting with 1.7, the concept of temporal tables (i.e., temporal tables) was introduced. Temporal Table simplifies and speeds up such queries and reduces the use of state Temporal Table is to append rows in an Append-Only table, interpret them as Chanlogs based on the set primary key and time (e.g. productID, updatedAt above), and provide a version of the data at a specific time.

    • When working with temporal tables, be aware of the following issues.

     Temporal Table provides data at a point in history. Temporal >Table tracks versions based on time. Temporal Table requires a time attribute and primary key. Temporal Table General and the keyword LATERAL TABLE Combine in combination. Temporal Table is based on ProcessingTime When processing time attributes, each primary key holds only the latest version of the data. Temporal Table is based on EventTime When the time attribute is processed, each primary key holds all versions from the previous watermark to the current system time.   table Join the right Temporal Table, which is essentially a left-table driven join, that is, get the key from the left table, and go to the right according to the key and time (possibly historical time). Temporal Table table query. Temporal >Table Join currently only supports Inner joins.  temporal  Table Join, Temporal on the right The Table table returns the latest version of the data. 

    For example:

    class=”rich_pages wxw-img” src=”https://mmbiz.qpic.cn/mmbiz_jpg/YriaiaJPb26VMZ9RWnPMnSFD9vRy9j8x4dH1NIukATuewdNEsKlFs44ibQv4Iqu2iadNB9EztpBNicGvIw51fibGJtBA/640?wx_fmt=jpeg”>


    the associated historical data

      > we first need to analyze the expiration of historical data, for example, in the business scenario of companion fish, the user’s booking behavior and the data associated with the user’s online class may differ by a few days (the user will make an appointment for the next week’s course in advance). At this time, the expiration time of the data needs our special relationship and processing, we can accurately calculate the first occurrence of the event, its accurate expiration time, for example: for example, the official class time is three days later, so we can put them in the Redis cache (3+1)*24 h to ensure that when the user is in class, their appointment record is still warmed up in our memory.
    • If the expiration of historical data cannot be determined. For example, in the business scenario of companion fish, when a user is an important behavior (order), the user level used, and the bound teacher and other detailed information, we can only permanently cache them in Redis for factual data to access the association.

    3. From the perspective of

    data form JOIN


    there are three situations: one-to-one, many-to-one, and many-to-many.

    • For one-to-one, many-to-one, we only need to use Redis or state to cache the data stream of a single party.
    • For many-to-many

    • join cases: many-to-many joins. We can only connect leftStream and rightStream first, and cache the data in Redis or job momery at the day level, respectively. Whether left Steam or right stream, the data is cached first, to iterate through all the data that has arrived on the other side, and output it to the downstream.
    • For many-to-many left join

    • scenarios: The many-to-many left join scenario is more complicated, we can only connect leftStream and rightStream first, cache it in job momery or Redis, leftSteam or rightStream data will be cached first, Then go through all the data that has arrived on the other side and output it downstream. It’s just that at this time, for the data that is not on the downstream join, it is not a good judgment whether the data is really not joined, or because of the difference in the time of the data entering the operator, there is no join. At this time, we will write the data to TiDB or ClickHouse, and filter the data that does not have a join due to the difference in the time of entering the Operator operator in this OLAP engine that can be quickly calculated based on the amount of data at the day level.
    • Note that if you use Flink Operator State, you need to set the timer, or use Flink TTL to clean up the state regularly, otherwise the program will OOM. If you use Redis, you need to set invalid data settings or call offline scripts to delete data regularly.
    DWS Data Layer Data Processing Scheme

    What we usually store in the offline data warehouse is the coarse-grained number of cross-business domains. We also store it in this way inside the real-time data warehouse of the companion fish. It’s just that the correlation between cross-business domain data is not calculated in Flink real-time processing engine. Instead, put them in TiDB or ClickHouse for calculations. In Flink memory, we only calculate the aggregate metrics of the current business domain, and tag the data to mark the dimensions by which the data is aggregated and what the aggregation granularity is. (For example, in terms of time granularity, we usually aggregate data in small units of 5min or 10min), if you want to query the cross-business federation data of the day, the view will be predefined based on TiDB or ClickHouse, and the data in the topic of a single business domain on the day will be aggregated in the view first, and then the number of different business domains will be associated with the dimension tag marked in advance in the data to obtain cross-business aggregation indicators.

    Future and Outlook
      > In the future, we will continue to compare the advantages and disadvantages of Storm, Spark Streaming, Flink and other technology stack products in terms of use and performance. Looking forward to the richness of the Flink ecosystem, we will try to let some features such as Flink CDC, Flink ML, and Flink CEP play a role in our data warehouse construction.
    • Recent iterations of Flink SQL have also been fairly frequent. Due to Alibaba’s support for Flink planner, Flink’s concept of batch and stream integration is closer to reality, and we will try to use Flink as the processing engine of offline data warehouses to push Flink SQL in the company’s data set.
    • Continue to improve the real-time platform monitoring of Flink tasks and the optimization of resource management.


    https://ci.apache.org/projects/Flink/Flink-docs-release-1.13/ https://blog.csdn.net/wangpei1949/article/details/103541939

    author: Li Zhen Source: Companion Fish Technology Blog

    Original text: https://tech.ipalfish.com/blog/2021/06/29/flink_practice/



    public number (zhisheng ) reply to Face, ClickHouse, ES, Flink, Spring, Java, Kafka, Monitor keywords such as to view more articles corresponding to keywords.

    like + Looking, less bugs 👇