Folks, remember the Volcano Engine big data session the blogger attended a few days ago? The talk everyone found most interesting was the last one: the Flink standardized cleaning and splitting task behind Byte's one-stop tracking platform.

What everyone found most impressive is that Byte has already achieved:

  1. Bringing new splitting and cleaning rules online and offline without restarting the task; no rule change ever requires a restart.
  2. Hot loading of cleaning UDFs and RPC interfaces.

That's right: a task like this basically never stops, and it can't be allowed to stop.

You can get the Byte Volcano Engine PPT by replying 20210724 to the official account.


In this article, the blogger mainly implements the first point: dynamically changing rules, dynamically adding a sink Kafka topic, and dynamically removing a sink Kafka topic, all without restarting the task. Hopefully this can serve as a starting point and spark some ideas for you.

This article walks through the framework implementation in the following chapters:

  1. Background – why such a framework is needed
  2. Definition and goals – what this framework aims to do and the expected effects
  3. Analysis of difficulties – the hard parts of building this framework and the industry's current implementations
  4. Data construction – the framework's concrete scheme design
  5. Data assurance – the framework's guarantee scheme
  6. Summary and outlook

2. Background – Why such a framework is needed

First, let's take a look at the background behind Byte building this:

  1. Task restarts cause data delay: for a company like Byte, many new tracking points are added every day, and adding each new tracking condition to the Flink task used to require a restart. With client log traffic at tens of millions of QPS, every restart of this Flink task takes a long time, which is unacceptable for latency-sensitive services.
  2. Reduce chimney-style consumption of the raw client logs and save resources.
  3. A unified, standardized tracking platform: users can obtain correct tracking data through this platform.
  4. A unified, standardized streaming data processing platform linked with the tracking platform: users can obtain the unified, standardized data they need through this platform.
  5. Tiered guarantee capability for the dumped logs: the output of each log stream is guaranteed according to its priority (must-not-fail, best effort...), so users can consume the data with confidence.


These pain points gave birth to this framework.

3. Definition and goals – What are the goals and expected effects of this framework

There are many pain points listed above. This section tackles the most painful one, the delay caused by task restarts, and reveals how Byte's dynamically configured Flink task can be implemented.

The expected effects are as follows:

1. A dynamic rule and its sink Kafka topic can be brought online without stopping the task: the Kafka topic receiving the stream of data for one or several types of tracking points goes live dynamically.

As shown on the left, after modifying the configuration and adding a splitting rule with its corresponding topic, the rule's topic on the right starts producing data, and the corresponding console consumer starts consuming data that matches the rule. (The GIF can be slow to load.)

2. A dynamic rule and its sink Kafka topic can be taken offline without stopping the task: the Kafka topic for one or several types of tracking points goes offline dynamically.

As shown on the left, after modifying the configuration and deleting a splitting rule with its corresponding topic, the rule's topic on the right stops producing data, and the corresponding console consumer has no new data to consume. (The GIF can be slow to load.)


3. The overall effect is as follows:

4. Analysis of difficulties – The hard parts of building this framework and the industry's current implementations

First, let's analyze which basic modules this framework must contain:

  1. The Flink task itself: essentially a map task; the logic is simple.
  2. Dynamic online/offline rule configuration: there must be a dynamic configuration center that tells the Flink task which Kafka topics need to go online or offline.
  3. Dynamic rule filtering engine: after the Flink task detects a rule change, it must update the rule pool and apply the new rules, which requires a dynamic code execution engine.
  4. Dynamically bringing Kafka topics online and offline: at present, most companies use Flink's own kafka-connector, and adding a downstream means adding a Kafka producer operator; since that adds an operator to the job graph, the task must be restarted. So the ability to dynamically add and remove producers is required.

5. Data construction – The framework's concrete scheme design

Let's start with the conclusions of the scheme selection:

  1. Flink entry task: the map model uses ProcessFunction as the underlying operator.
  2. Dynamic online/offline rule configuration: there are many open-source configuration centers; to keep things lightweight and simple, zookeeper is used as the dynamic rule configuration center. Of course, if ZK pressure is a concern, broadcast configuration can be used instead.
  3. Dynamic rule engine: there are plenty of rule engines, such as JavaScript, Groovy, Jython, MVEL2, FreeMarker and so on. Considering performance and ease of use, Janino is chosen to compile the dynamic rules into classes, which then act as the dynamic rule engine. The reasons for choosing Janino are explained in more detail later.
  4. Dynamic online/offline Kafka topics: drop flink-kafka-connector, use the native kafka-clients directly to output data inside ProcessFunction, and maintain a producer pool.

The overall scheme architecture diagram is shown in the figure:


5.1.2. Expected effect

5.1.2.1. Online configuration


5.1.2.2. Offline configuration



5.2. Concrete implementation

The implementation of the entire task is actually very simple.

To run locally, you can refer to the following two articles to install zk and kafka.

  • zk:https://www.jianshu.com/p/5491d16e6abd
  • kafka:https://www.jianshu.com/p/dd2578d47ff6

5.2.1. Flink task entry logic

Let's first look at the entry logic of the entire task; the job of the ProcessFunction is simple:

    > For each log record from the data source, as long as it matches the condition of some rule, write it out to the topic corresponding to that rule.

    env.addSource(new UserDefinedSource())
            .process(new ProcessFunction<ClientLogSource, Object>() {

                // Dynamic rule configuration center
                private ZkBasedConfigCenter zkBasedConfigCenter;

                // Kafka producer management center
                private KafkaProducerCenter kafkaProducerCenter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
                    this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
                }

                @Override
                public void processElement(ClientLogSource clientLogSource, Context context, Collector<Object> collector)
                        throws Exception {
                    // Iterate over all dynamic rules
                    this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() {
                        @Override
                        public void accept(Long id, DynamicProducerRule dynamicProducerRule) {
                            // Check whether the data matches the rule
                            if (dynamicProducerRule.eval(clientLogSource)) {
                                // Send matching data to the topic of the corresponding rule
                                kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString());
                            }
                        }
                    });
                }

                @Override
                public void close() throws Exception {
                    super.close();
                    // Close the rule pool
                    this.zkBasedConfigCenter.close();
                    // Close the producer pool
                    this.kafkaProducerCenter.close();
                }
            });

    env.execute();
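The ClientLogSource model and UserDefinedSource are not shown in the post. A minimal sketch that is just enough to run the entry logic locally could look like this; the field set, constructor, and the mock source are assumptions, only getId() and getPage() are implied by the example rules:

    // Hypothetical minimal log model: only the fields referenced by the example rules.
    public class ClientLogSource {
        private final Long id;
        private final String page;

        public ClientLogSource(Long id, String page) {
            this.id = id;
            this.page = page;
        }

        public Long getId() { return id; }
        public String getPage() { return page; }

        @Override
        public String toString() {
            return "{\"id\":" + id + ",\"page\":\"" + page + "\"}";
        }
    }

    // Hypothetical mock source (org.apache.flink.streaming.api.functions.source.SourceFunction)
    // that emits one log record every 10 ms for local testing.
    public class UserDefinedSource implements SourceFunction<ClientLogSource> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<ClientLogSource> ctx) throws Exception {
            long i = 0;
            while (running) {
                ctx.collect(new ClientLogSource(i++, i % 2 == 0 ? "Home" : "personal homepage"));
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }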

5.2.2. Dynamic online and offline rule configuration

Now let's look at the first core piece of the Flink ProcessFunction: ZkBasedConfigCenter. Its functions include:

  1. When the task starts, initialize and load the zk configuration, initialize the rule pool, and compile the configured rules in the rule pool into executable class rules.
  2. Listen for zk configuration changes, add newly online configurations to the rule pool, and delete offline configurations from the rule pool.

5.2.2.1. Dynamic rule schema design

A dynamic rule contains exactly what the user cares about.

For example, suppose a user wants every client log reported from the homepage with id > 300 to be written to the Kafka topic topic_id_bigger_than_300_and_main_page.

For this Flink task, that means three user inputs:

  1. The filter condition of the dynamic rule: for each piece of upstream data, check whether it satisfies the rule. The condition in the example above is clientLogSource.getId() > 300 && clientLogSource.getPage().equals("Home"), where clientLogSource is the original log model.
  2. The target topic of the dynamic rule: which topic the data filtered by this rule is written to. In the example above, it is topic_id_bigger_than_300_and_main_page.
  3. The unique ID of the dynamic rule: the id that uniquely identifies a filter rule.

The schema designed for the dynamic rule configuration is as follows (the first entry documents the fields; the next two are concrete examples):

    {
        "id (a numeric-type string)": {
            "condition (the filter condition)": "1==1",
            "targetTopic (the target topic name)": "tuzisir1"
        },
        "1": {
            "condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\"Home\")",
            "targetTopic": "topic_id_bigger_than_300_and_main_page"
        },
        "2": {
            "condition": "clientLogSource.getPage().equals(\"personal homepage\")",
            "targetTopic": "topic_profile_page"
        }
    }

The corresponding dynamic rule model is designed as follows:

    public class DynamicProducerRule implements Evaluable {

        // The specific filter condition
        private String condition;

        // The target topic to write to
        private String targetTopic;

        // The rule filter compiled with janino
        private Evaluable evaluable;

        public void init(Long id) {
            try {
                // Compile the rule into a class with janino
                Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
                this.evaluable = clazz.newInstance();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public String getTargetTopic() {
            return targetTopic;
        }

        @Override
        public boolean eval(ClientLogSource clientLogSource) {
            return this.evaluable.eval(clientLogSource);
        }
    }

The key here is the Evaluable interface: the dynamically generated code implements it, and it is the basic interface through which filter rules are executed. The dynamic code generation is described in more detail below.

    public interface Evaluable {

        // The filtering method of a dynamic rule
        boolean eval(ClientLogSource clientLogSource);
    }
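For rule "1" in the schema above, the class generated at runtime would conceptually look like this (a sketch; the actual class naming is an assumption):

    public class DynamicRule_1 implements Evaluable {

        @Override
        public boolean eval(ClientLogSource clientLogSource) {
            // The body is exactly the condition string from the configuration
            return clientLogSource.getId() > 300 && clientLogSource.getPage().equals("Home");
        }
    }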

5.2.2.2. The zk-based dynamic configuration center

zk is used as the dynamic configuration center to listen for rule configuration changes and update the rule pool.

    public class ZkBasedConfigCenter {

        // zk config change listener
        private TreeCache treeCache;

        // zk client
        private CuratorFramework zkClient;

        // !!! The rule pool !!!
        private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();

        private ZkBasedConfigCenter() {
            try {
                open();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private void open() throws Exception {
            // Initialize the rule pool
            // Initialize the zk config listener; whenever the configuration
            // changes, invoke update(String json) to refresh the rules
        }

        public void close() {
            this.treeCache.close();
            this.zkClient.close();
        }

        private void update(String json) {
            Map<Long, DynamicProducerRule> result = getNewMap(json);
            // 1. Add newly online rules to the rule pool
            // 2. Delete offline rules from the rule pool
        }

        private Map<Long, DynamicProducerRule> getNewMap(String json) {
            // Parse the new rules and initialize them with janino
        }
    }
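The open() body is elided above; a minimal sketch of how it could be wired up with Curator's TreeCache, assuming a local zk at localhost:2181 and the fixed configuration path /kafka-config mentioned just below:

    private void open() throws Exception {
        // Assumption: local zk for the demo; in production the connection string comes from config
        this.zkClient = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        this.zkClient.start();

        // Watch the fixed config path and refresh the rule pool on every change
        this.treeCache = new TreeCache(zkClient, "/kafka-config");
        this.treeCache.getListenable().addListener((client, event) -> {
            if (event.getData() != null && event.getData().getData() != null) {
                update(new String(event.getData().getData(), StandardCharsets.UTF_8));
            }
        });
        this.treeCache.start();
    }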

A fixed configuration path can be used; for example, the blogger uses the path /kafka-config.
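Bringing a rule online or offline is then just a matter of rewriting the JSON stored at that path. A minimal sketch of pushing a rule change with Curator (the connection string is an assumption; any zk client works the same way), which the running task picks up through the listener above with no restart:

    // One-off helper (not part of the task) to push a new rule set to zk.
    CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    String rules = "{\"1\": {\"condition\": \"clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\\\"Home\\\")\", "
            + "\"targetTopic\": \"topic_id_bigger_than_300_and_main_page\"}}";

    // Create the node the first time, overwrite it for every later online/offline change
    if (client.checkExists().forPath("/kafka-config") == null) {
        client.create().forPath("/kafka-config", rules.getBytes(StandardCharsets.UTF_8));
    } else {
        client.setData().forPath("/kafka-config", rules.getBytes(StandardCharsets.UTF_8));
    }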


5.2.3. The dynamic rule engine

Byte currently uses Groovy. The blogger, however, works with flink sql a lot, and the code generation in flink sql is done with janino, so the blogger benchmarked janino against groovy: the class compiled by janino performs close to a hand-written native class, which is roughly 4x faster than Groovy. Other engines were not considered, because they are either poor in ease of use or poor in performance.

Note: performance really matters here, and a 1:4 gap is a big difference. If your scenario is also high-traffic and performance-sensitive, the recommendation is to go straight for Janino!!!

Let’s take a look at the specific benchmark case code:

    // ClientLogSource is the original log model
    boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) {
        return String.valueOf(clientLogSource.getId()).equals("1");
    }

The above logic was executed locally on the blogger's Mac, 50 million times per run, for a total of 5 runs. The results are as follows:

    java:  847 ms    janino:  745 ms    groovy: 4110 ms
    java: 1097 ms    janino: 1170 ms    groovy: 4052 ms
    java:  916 ms    janino: 1117 ms    groovy: 4311 ms
    java:  915 ms    janino: 1112 ms    groovy: 4382 ms
    java:  921 ms    janino: 1104 ms    groovy: 4321 ms

Repeating the runs many times gives roughly the same picture: plain java object : janino-compiled class : groovy is almost always about 1 : 1 : 4 in elapsed time. So janino, with the better performance, is chosen here.
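The benchmark harness itself is not shown in the post. A minimal sketch of its shape (plain-Java baseline only; the janino- and groovy-compiled rule instances would be swapped in as `rule` for the other two measurements, and ClientLogSource is the placeholder model sketched earlier):

    public class EvalBenchmark {
        public static void main(String[] args) {
            // Plain-Java baseline rule, same condition as the benchmark case above
            Evaluable rule = clientLogSource -> String.valueOf(clientLogSource.getId()).equals("1");

            ClientLogSource sample = new ClientLogSource(1L, "Home");

            for (int run = 0; run < 5; run++) {
                long start = System.currentTimeMillis();
                boolean sink = false;
                for (int i = 0; i < 50_000_000; i++) {
                    // XOR the result into a sink so the JIT cannot drop the call entirely
                    sink ^= rule.eval(sample);
                }
                System.out.println("run " + run + ": " + (System.currentTimeMillis() - start) + " ms, sink=" + sink);
            }
        }
    }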

    public class JaninoUtils {

        public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
            // Dynamically generate the source code of the rule class
            // Compile it and return the Class
        }
    }
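The body of genCodeAndGetClazz is elided above; a minimal sketch of how it could be done with Janino's SimpleCompiler (the generated class name and source template are assumptions, not the original implementation):

    public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
        String className = "DynamicRule_" + id;

        // Generate a class whose eval() body is exactly the user-configured condition.
        // Real code would reference Evaluable and ClientLogSource by their fully
        // qualified names or add the proper import statements here.
        String code = "public class " + className + " implements Evaluable {\n"
                + "    public boolean eval(ClientLogSource clientLogSource) {\n"
                + "        return " + condition + ";\n"
                + "    }\n"
                + "}";

        org.codehaus.janino.SimpleCompiler compiler = new org.codehaus.janino.SimpleCompiler();
        // Make Evaluable and ClientLogSource visible to the generated class
        compiler.setParentClassLoader(Evaluable.class.getClassLoader());
        compiler.cook(code);

        @SuppressWarnings("unchecked")
        Class<Evaluable> clazz = (Class<Evaluable>) compiler.getClassLoader().loadClass(className);
        return clazz;
    }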

5.2.4. Dynamically bringing Kafka topics online and offline

Now look at the second core piece in the entry class: KafkaProducerCenter. Its job is to maintain the Kafka producer pool and send data to the target topics:

    public class KafkaProducerCenter {

        // Kafka producer pool
        private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
                = new ConcurrentHashMap<>();

        private Producer<String, String> getProducer(String topicName) {
            // If the pool already contains a producer for this topic, return it;
            // otherwise initialize a new producer, put it into the pool and return it
        }

        public void send(String topicName, String message) {
            final ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "", message);
            try {
                RecordMetadata metadata = getProducer(topicName).send(record).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
            // Close all producer connections
        }
    }
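The getProducer body is elided above; a minimal sketch with the native kafka-clients API, assuming the local broker from the demo setup (in production the bootstrap servers would of course come from configuration):

    private Producer<String, String> getProducer(String topicName) {
        // computeIfAbsent keeps exactly one producer per topic in the pool
        return producerConcurrentMap.computeIfAbsent(topicName, topic -> {
            Properties props = new Properties();
            // Assumption: local broker for the demo
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return new KafkaProducer<>(props);
        });
    }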

That is all the code and logic. Overall, it is actually very simple.

6. Data assurance – The framework's guarantee scheme

Independent queue resources are allocated for this task, and every time the task loads the latest configuration, a copy is stored locally. If the configuration center goes down, the locally stored configuration can be loaded directly, so the task never ends up unable to output anything.

    > How to ensure that the user's configuration is correct?

    • Approval before going online: dedicated tracking-point administrators review and manage the logic.
    • Automated testing before going online: the tracking platform automatically verifies the correctness of the logic, guaranteeing that the configuration delivered to the flink task is correct.
    • AOP exception handling and alarms: rule execution is wrapped with exception handling, abnormal data is dumped into a dedicated exception topic, and the exception information is verified and alarmed automatically.
    • Data accuracy verification: a verification mechanism is needed for the final results, so users can use the data with confidence.
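A minimal sketch of the exception-dump idea from the list above; the exception topic name, the helper class, and the reuse of KafkaProducerCenter are assumptions:

    public class SafeRuleExecutor {

        // Hypothetical exception topic name for dumping abnormal data
        private static final String EXCEPTION_TOPIC = "topic_split_exception";

        public static void safeApply(DynamicProducerRule rule,
                                     ClientLogSource clientLogSource,
                                     KafkaProducerCenter kafkaProducerCenter) {
            try {
                if (rule.eval(clientLogSource)) {
                    kafkaProducerCenter.send(rule.getTargetTopic(), clientLogSource.toString());
                }
            } catch (Exception e) {
                // One bad rule must not fail the whole task: dump the record and move on.
                kafkaProducerCenter.send(EXCEPTION_TOPIC, clientLogSource.toString());
                // Alarm / metric reporting would be triggered here as well.
            }
        }
    }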

7. Summary and outlook

7.1. Summary

This article mainly demystifies and implements the real-time dynamic processing engine behind ByteDance's tracking data.

7.2. Outlook

    > This article only makes the stream splitting dynamic: the output data is exactly the same as the input data, but in many cases the downstream only needs some of the fields. So the sink message fields and format could also be personalized later, for example by adding a dynamic Map step that converts the ClientLogSource in the data source into any model the user wants, using DynamicMessage or code generation.
  1. At present, the filter conditions are pure Java syntax; they could later be extended to SQL syntax to improve readability.
  2. UDF and RPC interface hot loading.