Do you remember the Volcano Engine big data session the blogger attended a few days ago? The talk everyone found most interesting was the last one: the Flink standardized cleaning and splitting task behind ByteDance's one-stop tracking (burying point) platform.

Among its features, the two people found most impressive were:

  1. New splitting and cleaning rules can go online and offline without restarting the task; no rule change ever requires a restart.
  2. Hot loading of cleaning UDFs and RPC interfaces.


In other words, a task that essentially never needs to stop.

For the Volcano Engine PPT, reply 20210724 in the official account.


In this article the blogger mainly covers the first point: changing rules dynamically, that is, adding or deleting a sink kafka topic without restarting the task. Hopefully it serves as a starting point and gives you some inspiration.


This article introduces the framework implementation in the following chapters:

  1. Background – why such a framework is needed
  2. Definition and goals – the goals of this framework and its expected effects
  3. Analysis of difficulties – the difficulties of building this framework, and the industry's current implementations
  4. Data construction – the framework's concrete scheme design
  5. Assurance – the framework's guarantee scheme
  6. Summary and outlook

2. Background – why such a framework is needed

First, let's take a look at the background of Byte building this:

  1. Task restarts cause data delays: for an enterprise like Byte, many new tracking points are added every day, and adding each new tracking condition to the flink task means restarting it. Byte's client log traffic is tens of millions of QPS, so each restart of this flink task takes a long time, which is unacceptable for latency-sensitive services.
  2. Reduce chimney-style consumption of the raw client logs and save resources.
  3. A unified, standardized tracking platform: by connecting with the tracking platform, users can consume correct data.
  4. A unified, standardized streaming data processing platform: through this platform, users can obtain the unified, standardized data they want.
  5. Hierarchical guarantee capability: the dumped output of the logs is guaranteed by priority (never-lose, best-effort, ...), so users can rely on the data with confidence.

This background gave birth to the framework.

3. Definition and goals – what are the goals and expected effects of this framework

There are many pain points above. This section attacks the most painful one, task restart delay, and shows how a flink task with Byte-style dynamic configuration can be implemented.


The expected effects are as follows:

1. A dynamic rule with its sink kafka topic can go online without stopping the task; that is, the kafka topic carrying the split stream for a certain type of tracking point can be brought online dynamically.

As shown below: on the left, the configuration is modified to add a split rule and its corresponding topic; on the right, the rule's topic starts producing data, and the corresponding console consumer consumes the data matching the rule. (GIFs can be slow to load.)

2. A dynamic rule with its sink kafka topic can go offline without stopping the task; the kafka topic for a certain type of tracking point can be taken offline dynamically.

As shown below: on the left, the configuration is modified to delete a split rule and its corresponding topic; on the right, the rule's topic stops producing data, and the corresponding console consumer has no new data to consume. (GIFs can be slow to load.)


3. The overall effect is as follows:

4. Analysis of difficulties – the difficulties of building this framework and the industry's current implementations

First, let's analyze the most basic modules needed to implement this framework:

  1. The flink task itself: just a map task, the logic is simple.
  2. Dynamic online and offline rule configuration: there must be a dynamic configuration center to tell the flink task which Kafka topics need to go online or offline.
  3. Dynamic rule filtering engine: after the flink task detects a rule change, it must load and apply the new rule, which requires a dynamic code execution engine.
  4. Dynamically bringing Kafka topics up and down: at present most companies use flink's own kafka-connector; once a new downstream is involved, a new Kafka producer operator must be added, and adding an operator means restarting the task. The ability to dynamically add and remove producers is required.

5. Data construction – the framework's concrete scheme design

Let's start with the conclusions of the scheme selection:

  1. flink entry task: the map model uses a ProcessFunction as the underlying operator.
  2. Dynamic online and offline rule configuration: there are many open-source configuration centers; to keep the implementation lightweight and simple, zookeeper is used as the dynamic rule configuration center. Of course, if the pressure on zk is too high, a broadcast-based configuration can be used instead.
  3. Dynamic rule engine: there are many rule engines, such as JavaScript, Groovy, jython, mvel2, freemarker and so on. Considering performance and ease of use, Janino is chosen to dynamically compile rules into classes, which then serve as the rule engine. The reasons for choosing Janino are explained in detail later.
  4. Dynamic online and offline Kafka topics: remove the flink-kafka-connector, use the native kafka-clients directly to output data in the ProcessFunction, and maintain a producer pool.

The overall scheme architecture diagram is shown in the figure:


5.1.2. Expected effect

The online-configuration and offline-configuration effects are as shown in the demos earlier.


5.2. Concrete implementation

The entire task is actually very simple. To run it locally, you can refer to the following two articles to install zk and kafka:

  • zk:
  • kafka:

5.2.1. flink task entry logic

Let's first look at the entry logic of the whole task. The ProcessFunction's job is simple: for each log entry from the data source, as long as the data meets the condition of a rule, write the log out to the topic corresponding to that rule.

```java
env.addSource(new UserDefinedSource())
    .process(new ProcessFunction<ClientLogSource, ClientLogSource>() {

        // Dynamic rule configuration center
        private ZkBasedConfigCenter zkBasedConfigCenter;
        // kafka producer management center
        private KafkaProducerCenter kafkaProducerCenter;

        @Override
        public void open(Configuration parameters) throws Exception {
            this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
            this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
        }

        @Override
        public void processElement(ClientLogSource clientLogSource, Context context,
                                   Collector<ClientLogSource> collector) throws Exception {
            // Iterate through all dynamic rules
            this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() {
                @Override
                public void accept(Long id, DynamicProducerRule dynamicProducerRule) {
                    // Verify that the data conforms to the rule
                    if (dynamicProducerRule.eval(clientLogSource)) {
                        // Send the matching data to the topic of the corresponding rule
                        kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(),
                                clientLogSource.toString());
                    }
                }
            });
        }

        @Override
        public void close() throws Exception {
            super.close();
            // Close the rule pool
            this.zkBasedConfigCenter.close();
            // Close the producer pool
            this.kafkaProducerCenter.close();
        }
    });

env.execute();
```
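To make the dispatch idea concrete outside of flink, here is a minimal framework-free sketch (all names here are the blogger's stand-ins for illustration, not Byte's code): a rule pool maps rule ids to a filter condition plus a target topic, and each record is forwarded to every topic whose rule it matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

public class DispatchDemo {

    // Stand-in for a compiled rule: a filter condition plus the target topic.
    static final class Rule {
        final Predicate<Integer> condition; // the "log" is reduced to an int id here
        final String targetTopic;

        Rule(Predicate<Integer> condition, String targetTopic) {
            this.condition = condition;
            this.targetTopic = targetTopic;
        }
    }

    // Rule pool, analogous to ZkBasedConfigCenter#getMap()
    static final ConcurrentMap<Long, Rule> RULE_POOL = new ConcurrentHashMap<>();

    static {
        RULE_POOL.put(1L, new Rule(id -> id > 300, "topic_id_bigger_than_300"));
        RULE_POOL.put(2L, new Rule(id -> id % 2 == 0, "topic_even_id"));
    }

    // Analogous to processElement: collect the topics this record should be sent to.
    static List<String> dispatch(int id) {
        List<String> topics = new ArrayList<>();
        RULE_POOL.forEach((ruleId, rule) -> {
            if (rule.condition.test(id)) {
                topics.add(rule.targetTopic);
            }
        });
        return topics;
    }
}
```

A record with id 400 would be sent to both topics; a record with id 7 matches neither rule.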

5.2.2. Dynamic online and offline rule configuration

Now look at the first core point of the flink ProcessFunction: ZkBasedConfigCenter. Its functions include:

  1. When the task starts, initialize and load the zk configuration, initialize the rule pool, and compile the configured rules in the rule pool into executable class rules.
  2. Listen for zk configuration changes, add newly online configurations to the rule pool, and delete offline configurations from the rule pool.

Dynamic rule schema design

A dynamic rule contains the content closely tied to user needs. For example, a user needs to write the client logs reported by users with id > 300 on the homepage into the Kafka topic topic_id_bigger_than_300_and_main_page.

For this flink task, there are then three user inputs:

  1. The filter condition of the dynamic rule: after each piece of upstream data arrives, check whether it meets the rule's condition. The condition of the above example is clientLogSource.getId() > 300 && clientLogSource.getPage().equals("Home"), where clientLogSource is the original log model.
  2. The topic name of the dynamic rule: which topic the data filtered by this rule is written to. In the above example it is topic_id_bigger_than_300_and_main_page.
  3. The unique ID of the dynamic rule: the id that uniquely identifies a filter.

The schema designed for the dynamic rule configuration of the above requirements is as follows:

```json
{
    "id (numeric string)": {
        "condition (filter)": "1==1",
        "targetTopic (target topic name)": "tuzisir1"
    }
}
```

An example configuration:

```json
{
    "1": {
        "condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\"Home\")",
        "targetTopic": "topic_id_bigger_than_300_and_main_page"
    },
    "2": {
        "condition": "clientLogSource.getPage().equals(\"personal homepage\")",
        "targetTopic": "topic_profile_page"
    }
}
```

The corresponding dynamic rule model is designed as follows:

```java
public class DynamicProducerRule implements Evaluable {

    // Specific filter rule
    private String condition;

    // Topic to write to
    private String targetTopic;

    // Rule filter compiled with janino
    private Evaluable evaluable;

    public void init(Long id) {
        try {
            // Initialize the rule class with janino
            Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
            this.evaluable = clazz.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public boolean eval(ClientLogSource clientLogSource) {
        return this.evaluable.eval(clientLogSource);
    }
}
```
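For intuition about what janino produces, here is a hand-written equivalent of the class it would generate for rule "1" above (the ClientLogSource stand-in, the Evaluable copy, and the class name are the blogger's assumptions for illustration):

```java
// Minimal stand-ins for the article's model classes, so this sketch is self-contained.
interface Evaluable {
    boolean eval(ClientLogSource clientLogSource);
}

class ClientLogSource {
    private final long id;
    private final String page;

    ClientLogSource(long id, String page) {
        this.id = id;
        this.page = page;
    }

    public long getId() { return id; }
    public String getPage() { return page; }
}

// Roughly what janino would generate for rule "1":
// the configured condition string is pasted verbatim into the eval body.
public class Rule1Evaluable implements Evaluable {
    @Override
    public boolean eval(ClientLogSource clientLogSource) {
        return clientLogSource.getId() > 300 && clientLogSource.getPage().equals("Home");
    }
}
```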

The focus here is the Evaluable interface, which the dynamically generated code implements; it is the basic interface for executing filter rules. Code generation is described in more detail below.

```java
public interface Evaluable {

    // Filtering method of the dynamic rule interface
    boolean eval(ClientLogSource clientLogSource);
}
```

The zk-based dynamic configuration center uses zk to dynamically listen for rule configuration changes and update the rule pool.

```java
public class ZkBasedConfigCenter {

    // zk config change listener
    private TreeCache treeCache;

    // zk client
    private CuratorFramework zkClient;

    // !!! The rule pool !!!
    private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();

    private ZkBasedConfigCenter() {
        try {
            open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void open() throws Exception {
        // Initialize the rules and the zk config listener;
        // whenever the configuration changes, update(String json) is invoked to refresh the rules
    }

    public void close() {
        this.zkClient.close();
    }

    private void update(String json) {
        Map<Long, DynamicProducerRule> result = getNewMap(json);
        // 1. Add the new rules to the rule pool
        // 2. Delete the offline rules from the rule pool
    }

    private Map<Long, DynamicProducerRule> getNewMap(String json) {
        // Parse the new rules and initialize them with janino
    }
}
```

You can use a fixed zk path for the configuration; for example, the blogger uses the path /kafka-config.
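The add/remove diff that update() performs against the rule pool can be sketched with plain collections (a simplified stand-in where a rule is just its target topic string; the names are the blogger's illustration, not Byte's code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RulePoolUpdateDemo {

    // The live rule pool; the value is simplified to the target topic string.
    static final ConcurrentMap<Long, String> POOL = new ConcurrentHashMap<>();

    // Analogous to update(String json) after getNewMap() has parsed the config.
    // (In the real code only genuinely new or changed rules would be recompiled with janino.)
    static void update(Map<Long, String> newMap) {
        // 1. Add or refresh rules present in the new config
        POOL.putAll(newMap);
        // 2. Delete rules that have gone offline (absent from the new config)
        POOL.keySet().removeIf(id -> !newMap.containsKey(id));
    }
}
```

After an update, the pool holds exactly the rules of the new configuration, so a deleted rule stops matching on the very next record.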


5.2.3. The dynamic rule engine

Byte currently uses Groovy. The blogger, however, often uses flink sql, where code generation is done with janino, so the blogger compared the performance of janino and groovy: a class compiled by janino runs close to a hand-written native class, roughly 4 times faster than Groovy. Other engines were not considered, being either poor in ease of use or poor in performance.

Note: performance really matters here, and a 1:4 gap is a big difference. If your scenario is also high-traffic and performance-sensitive, it is recommended to go with Janino directly!

Let's take a look at the specific benchmark case code:

```java
// ClientLogSource is the original log model
boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) {
    return String.valueOf(clientLogSource.getId()).equals("1");
}
```


The above code, executed locally on the blogger's mac, was run 50 million times per round for a total of 5 rounds. The results are as follows:

```
round 1 - java: 847 ms   janino: 745 ms   groovy: 4110 ms
round 2 - java: 1097 ms  janino: 1170 ms  groovy: 4052 ms
round 3 - java: 916 ms   janino: 1117 ms  groovy: 4311 ms
round 4 - java: 915 ms   janino: 1112 ms  groovy: 4382 ms
round 5 - java: 921 ms   janino: 1104 ms  groovy: 4321 ms
```

Repeated many times, the time ratio of native java object : janino-compiled class : groovy stays at almost exactly 1:1:4. So the better-performing janino is chosen here.
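If you want to reproduce this kind of measurement, the timing loop can be sketched as follows (a simplified harness that times only a hand-written eval; wiring in janino- or groovy-compiled classes would require those libraries on the classpath):

```java
public class EvalBenchmarkDemo {

    // Hand-written equivalent of the benchmarked rule.
    static boolean eval(long id) {
        return String.valueOf(id).equals("1");
    }

    // Run the rule `iterations` times and return the elapsed milliseconds.
    static long timeMillis(long iterations) {
        long matched = 0;
        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            if (eval(i % 2)) { // alternate matching (id=1) and non-matching (id=0) records
                matched++;
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Use `matched` so the JIT cannot eliminate the loop entirely.
        if (matched != iterations / 2) {
            throw new IllegalStateException("unexpected match count: " + matched);
        }
        return elapsedMs;
    }

    public static void main(String[] args) {
        // 5 rounds, mirroring the article's setup (iteration count reduced here)
        for (int round = 1; round <= 5; round++) {
            System.out.println("round " + round + ": " + timeMillis(1_000_000) + " ms");
        }
    }
}
```

For publication-grade numbers a JMH benchmark with proper warmup would be more trustworthy than a hand-rolled loop like this.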

```java
public class JaninoUtils {

    public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
        // Dynamically generate the rule's source code,
        // compile it with janino, and return the resulting class
    }
}
```

5.2.4. Dynamically bringing Kafka topics online and offline

Now look at the second core point in the entry class: KafkaProducerCenter. Its features include maintaining the producer pool, fetching (or lazily creating) the producer for a topic, sending data, and closing all producers:

```java
public class KafkaProducerCenter {

    // kafka producer pool
    private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
            = new ConcurrentHashMap<>();

    private Producer<String, String> getProducer(String topicName) {
        // If the producer pool already holds a producer for this topic, return it;
        // if not, initialize a new producer, put it into the pool and return it
    }

    public void send(String topicName, String message) {
        final ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "", message);
        try {
            RecordMetadata metadata = getProducer(topicName).send(record).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        // Close all producer connections
    }
}
```
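The get-or-create lookup inside getProducer maps naturally onto ConcurrentHashMap.computeIfAbsent; a self-contained sketch with a fake producer type (a stand-in for illustration, not the kafka-clients API) looks like:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerPoolDemo {

    // Stand-in for org.apache.kafka.clients.producer.KafkaProducer
    static final class FakeProducer {
        final String topic;
        FakeProducer(String topic) { this.topic = topic; }
    }

    // Counts constructions, to show each topic's producer is created only once.
    static final AtomicInteger CREATED = new AtomicInteger();

    static final ConcurrentMap<String, FakeProducer> POOL = new ConcurrentHashMap<>();

    // Get-or-create: only the first caller for a topic pays the construction cost.
    static FakeProducer getProducer(String topicName) {
        return POOL.computeIfAbsent(topicName, topic -> {
            CREATED.incrementAndGet();
            return new FakeProducer(topic);
        });
    }

    // Taking a topic offline: drop its producer from the pool
    // (the real code would also close the producer connection here).
    static void removeProducer(String topicName) {
        POOL.remove(topicName);
    }
}
```

computeIfAbsent is atomic per key, so two subtasks racing on the same new topic still end up sharing one producer.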

That is all the code and logic. Overall, the implementation is actually very simple.

6. Data assurance – the framework's guarantee scheme

  1. Independent queue resources are allocated to this task, and every time the task loads the latest configuration, a copy is also stored locally. If the configuration center goes down, the locally stored configuration can be loaded directly, so the task never reaches a state where nothing can be output.
  2. How is the correctness of the user's configuration ensured?
    • Approval before going online: dedicated tracking-point administrators verify and manage the logic.
    • Automatic testing before going online: the correctness of the logic is automatically verified on the tracking management platform, ensuring the configuration is correct all the way from going online to the flink task.
    • AOP exception handling and alarms: exceptions in the task are handled via AOP, abnormal data is dumped into a dedicated exception topic, and the alarm information is verified automatically.
    • A data accuracy verification mechanism is required for the final results.
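The "keep a local copy of the latest configuration" fallback can be sketched with java.nio.file (the class and method names are the blogger's illustration, not Byte's code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalConfigSnapshotDemo {

    private final Path snapshotFile;

    public LocalConfigSnapshotDemo(Path snapshotFile) {
        this.snapshotFile = snapshotFile;
    }

    // Convenience factory for a snapshot file location.
    static Path newTempFile() {
        try {
            return Files.createTempFile("rule-config", ".json");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called every time the latest config is successfully loaded from the config center.
    public void save(String configJson) {
        try {
            Files.write(snapshotFile, configJson.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Fallback when the config center is unreachable: load the last known config.
    public String loadLastKnown() {
        try {
            return new String(Files.readAllBytes(snapshotFile), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On startup the task would try the config center first and fall back to loadLastKnown() only when zk is down, accepting that the local copy may be slightly stale.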

7. Summary and outlook

7.1. Summary

This article mainly dissected and implemented the style of ByteDance's real-time dynamic processing engine for tracking-point data.

7.2. Outlook

  1. This article only makes the stream splitting dynamic: the output data is exactly the same as the input data, while in many cases the downstream only needs some of the fields. So the sink message and its fields could later be personalized as well; for example, a dynamic map step could be added to convert the ClientLogSource in the data source into any model the user wants, implemented with DynamicMessage or code generation.
  2. At present the filter conditions are pure Java syntax; they could be extended to SQL syntax to improve readability.
  3. Hot loading of functions and RPC interfaces.
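As a taste of the first outlook item, a dynamic field-projection step can be sketched with plain functions (a heavily simplified stand-in: the "model the user wants" is just a projected map, and all names are the blogger's illustration; the real thing would use DynamicMessage or code generation):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DynamicMapDemo {

    // A record with all fields, standing in for ClientLogSource.
    static final Map<String, Object> FULL_RECORD = Map.of(
            "id", 301L,
            "page", "Home",
            "params", "a=b");

    // Build a projection from a per-rule field list: the shape a dynamic map step could take,
    // configured alongside condition and targetTopic in the rule schema.
    static Function<Map<String, Object>, Map<String, Object>> projection(List<String> fields) {
        return record -> {
            Map<String, Object> out = new LinkedHashMap<>();
            for (String field : fields) {
                if (record.containsKey(field)) {
                    out.put(field, record.get(field));
                }
            }
            return out;
        };
    }
}
```

A rule configured with fields ["id", "page"] would then emit only those two fields downstream instead of the full log.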