❝
Each article in this series starts from practical cases, analyzes problems often encountered in production environments, and offers some modest ideas as a starting point to help readers solve real problems. This article is the third in the “live real-time data construction” series and introduces the construction process of production-side indicators for live rooms. If it is helpful to you, you are welcome to like and share it.
❞
The focus here is the “construction of production-side indicators”, such as the number of live rooms currently on air or the number of anchors currently broadcasting. Before introducing the construction process of production-side indicators, let’s review the “Architecture” diagram from the previous section.
The data link of the “production-side indicators” introduced in this article mainly corresponds to the following modules.
- Data source: read Kafka logs from live production, such as stream start and stream end events;
- Data processing: use production-side data sources + real-time portrait dimension tables + Flink to build production-side real-time indicators;
- Data sink: write the processed metric data to Kafka.
The modules marked in red in the figure are the ones involved in the data link of the production-side indicators; they are labeled in a separate figure.
The real-time portrait dimension table in the production-side architecture of the live room was introduced in the previous section. If you are interested, you can go back and read it:

Production Practices | Flink + Live (II) | How do I build a real-time public portrait table?
This subsection does not cover in detail the construction of every indicator under “construction of production-side indicators”. Instead, it takes “the number of live rooms on air in the current minute” as a representative case and walks through the entire construction process of this indicator, in order to reconstruct the business process and technical solution behind production-side indicators.
Questions
As before, we start with a few questions to introduce the construction process of “the number of live rooms on air in the current minute”.
- How is this indicator defined?
- How to build this indicator? What is the overall metric calculation process?
1. What is the definition?
The indicator “number of live rooms on air in the current minute” consists of two parts: the total number of live rooms on air in the current minute, plus that number drilled down along a single dimension.
For example, at 2020-11-11 12:42 the number of live rooms on air in the current minute is 3000 (by the platform dimension: 1500 on IOS and 1500 on Android).
At 12:43, 200 live rooms go off air (100 on IOS and 100 on Android) and 100 live rooms go on air (all on IOS), so the number of live rooms on air in the current minute is 2900 (by the platform dimension: 1500 on IOS and 1400 on Android).
The values 3000 at 2020-11-11 12:42 and 2900 at 2020-11-11 12:43, together with their per-platform breakdowns, are the numbers of live rooms on air at those times, and they are the final output.
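The arithmetic in this example can be checked with a small sketch. It is plain Java rather than Flink, and the class and method names (`DrillDownExample`, `applyMinute`, `total`) are hypothetical helpers introduced only for illustration; the numbers are the ones from the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DrillDownExample {

    /** Applies one minute of start/end counts for a platform to the per-platform totals. */
    static void applyMinute(Map<String, Integer> onAir, String platform, int started, int ended) {
        onAir.merge(platform, started - ended, Integer::sum);
    }

    /** Total number of live rooms on air across all platforms. */
    static int total(Map<String, Integer> onAir) {
        return onAir.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Reproduces the 12:42 -> 12:43 transition from the text. */
    static Map<String, Integer> exampleAt1243() {
        Map<String, Integer> onAir = new LinkedHashMap<>();
        onAir.put("IOS", 1500);      // state at 12:42
        onAir.put("Android", 1500);  // state at 12:42
        applyMinute(onAir, "IOS", 100, 100);    // 100 started, 100 ended
        applyMinute(onAir, "Android", 0, 100);  // none started, 100 ended
        return onAir;
    }

    public static void main(String[] args) {
        Map<String, Integer> onAir = exampleAt1243();
        // prints "total=2900 byPlatform={IOS=1500, Android=1400}"
        System.out.println("total=" + total(onAir) + " byPlatform=" + onAir);
    }
}
```

The total and the per-platform values are derived from the same events, which is why the drill-down always sums back to the total.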
Based on the above definition and analysis, the schema information of the data sources and data sinks involved in the whole process can be clarified.
Data Source Schema
The data source schema is as follows.
Field | Notes |
---|---|
live_stream_id | Live room ID |
author_id | Anchor ID |
start_or_end | Stream start or stream end |
timestamp | Timestamp |
… | … |
Based on the overall processing flow and the final result, the data sink schema is determined as follows.
Field | Notes |
---|---|
timestamp | Timestamp, aggregated to minute granularity |
metric_name | Metric name, for example: the number of live rooms on air |
metric_value | Metric value, for example: 3000 (number of live rooms) |
dim_name | Dimension name, for example: platform, version |
dim_value | Dimension value, for example: IOS, 8.1 |
… | … |
❝
Notes:
“metric_name and metric_value”:
These two fields are designed so that more indicators can be added later. For example, if you need to add indicators such as the number of anchors or the broadcast duration, you do not need to modify the data sink schema. You only need to add a new metric_name and can keep using the original schema for output.
“dim_name and dim_value”:
The indicators built so far only support drill-down along a single dimension, so the two fields dim_name and dim_value are enough to let users view, for example, the current number of live rooms on the IOS platform, or the current number of live rooms using version 8.1 of the broadcasting software. If later business scenarios require multi-dimensional drill-down, you can extend these fields. Alternatively, you can provide detailed data and do the multi-dimensional drill-down in OLAP.
❞
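To make the schema notes concrete, here is a minimal sketch of what output rows could look like. It is not the author's implementation: the `MetricRow` class, the `toMinute` helper, the metric names `live_stream_on_air_count` and `author_on_air_count`, and the value 1450 are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class MetricRowExample {

    /** One output row; the five fields mirror the sink schema described above. */
    static final class MetricRow {
        final long timestamp;      // aggregated to minute granularity
        final String metricName;
        final double metricValue;
        final String dimName;
        final String dimValue;

        MetricRow(long timestamp, String metricName, double metricValue,
                  String dimName, String dimValue) {
            this.timestamp = timestamp;
            this.metricName = metricName;
            this.metricValue = metricValue;
            this.dimName = dimName;
            this.dimValue = dimValue;
        }

        @Override
        public String toString() {
            return timestamp + " | " + metricName + " | " + metricValue
                    + " | " + dimName + " | " + dimValue;
        }
    }

    /** Truncates an epoch-millisecond timestamp to the start of its minute. */
    static long toMinute(long epochMillis) {
        return epochMillis - (epochMillis % 60_000L);
    }

    /** Builds illustrative rows for one minute; all names and values are made up. */
    static List<MetricRow> rowsForMinute(long epochMillis) {
        long minute = toMinute(epochMillis);
        List<MetricRow> rows = new ArrayList<>();
        rows.add(new MetricRow(minute, "live_stream_on_air_count", 1500, "platform", "IOS"));
        rows.add(new MetricRow(minute, "live_stream_on_air_count", 1400, "platform", "Android"));
        // A new indicator reuses the same row type: only metric_name changes.
        rows.add(new MetricRow(minute, "author_on_air_count", 1450, "platform", "IOS"));
        return rows;
    }

    public static void main(String[] args) {
        rowsForMinute(System.currentTimeMillis()).forEach(System.out::println);
    }
}
```

Because every indicator is just another (metric_name, metric_value) pair, downstream consumers can filter by metric_name without any schema migration.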
2. How to build it?
The calculation for the number of live rooms on air in the current minute is very simple. It is the following formula:
“Number of live rooms on air in the current minute” = “Number of live rooms on air in the previous minute” + “Number of live rooms that went on air in the current minute” - “Number of live rooms that went off air in the current minute”
With the formula in hand, we can analyze the calculation and processing logic of the indicator in detail. The formula also tells us something else: calculating the number of live rooms on air in the current minute depends on context, namely “the number of live rooms on air in the previous minute”, which is the “state”.
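The formula can be read as a recurrence that is applied once per minute. The following plain-Java sketch illustrates this; the class `OnAirRecurrence` and its methods are hypothetical, and only the first minute (100 started, 200 ended, starting from 3000) comes from the earlier example, while the later minutes are made-up inputs.

```java
import java.util.Arrays;

public class OnAirRecurrence {

    /** current = previous + started in this minute - ended in this minute. */
    static long next(long previousOnAir, long startedThisMinute, long endedThisMinute) {
        return previousOnAir + startedThisMinute - endedThisMinute;
    }

    /** Applies the recurrence minute by minute; each row is {started, ended}. */
    static long[] run(long initialOnAir, long[][] perMinute) {
        long[] results = new long[perMinute.length];
        long state = initialOnAir; // the only context carried between minutes
        for (int i = 0; i < perMinute.length; i++) {
            state = next(state, perMinute[i][0], perMinute[i][1]);
            results[i] = state;
        }
        return results;
    }

    public static void main(String[] args) {
        long[][] perMinute = {{100, 200}, {50, 0}, {0, 30}};
        // prints "[2900, 2950, 2920]"
        System.out.println(Arrays.toString(run(3000, perMinute)));
    }
}
```

Only one number has to be carried from one minute to the next, which is what makes it practical to keep as state inside a streaming task.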
First, the indicator processing logic.
The overall processing logic, from reading the data source to outputting the indicator, is shown in the following figure. I will not repeat it in words here.
The module marked in pink in the data flow is the “state” of the task, that is, the number of live rooms on air as of the current minute that the task has stored.
Since state is involved, here is my understanding of “state”. If there are errors, please point them out in the discussion at the end of the article and I will discuss them with you.
A state is simply something that records contextual information: if the current calculation depends on the result of the previous calculation, then that previous result is the state. A few examples 🌰:
- “Stream processing”: As described in this section, calculating “the number of live rooms on air in the current minute” depends on the number of live rooms on air in the previous minute (the “state”). Some readers may ask: can we avoid relying on the previous minute and calculate from scratch instead? Yes, but calculating from scratch requires storing all the historical data, which is itself state. We have simply optimized it down to the number of live rooms on air in the previous minute.
- “Batch processing”: today’s full table = yesterday’s full table (“state”) + today’s delta table.
- “Database storage”: the most common examples are MySQL auto-increment primary keys and unique keys. Why does the primary key of a new row increment? Because MySQL stores the last primary key value (“state”). Why does inserting duplicate data fail with a unique key error? Because MySQL stores the values (“state”) of all unique key fields.
- “Life”: current phone battery = phone battery one minute ago (“state”) + (charging / power consumption). Why do you like your significant other more and more? Because how you feel about her now = how you felt about her a second ago (“state”) + the second she kissed you.
“State” can be seen everywhere in life. Even if you are not a programmer, I believe you can understand the concept of “state”.
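The stream-processing point above, that recomputing from the full history and updating from the previous result give the same answer, can be sketched in plain Java. The class `StateVersusReplay` and the event encoding (+1 for a stream start, -1 for a stream end) are assumptions chosen for illustration, not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateVersusReplay {

    /** Recomputes the on-air count from scratch: +1 per start event, -1 per end event. */
    static long replay(List<Integer> allEvents) {
        long count = 0;
        for (int event : allEvents) {
            count += event;
        }
        return count;
    }

    /** Updates the count from the previous result (the state) plus only the new events. */
    static long incremental(long previousState, List<Integer> newEvents) {
        return previousState + replay(newEvents);
    }

    public static void main(String[] args) {
        List<Integer> history = Arrays.asList(1, 1, 1, -1);   // all earlier minutes
        List<Integer> currentMinute = Arrays.asList(1, -1, -1);

        List<Integer> everything = new ArrayList<>(history);
        everything.addAll(currentMinute);

        long fromScratch = replay(everything);                          // needs the full history
        long fromState = incremental(replay(history), currentMinute);   // needs one number
        System.out.println(fromScratch + " " + fromState); // prints "1 1"
    }
}
```

Both approaches keep state; the difference is whether the state is the entire event history or a single counter.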
An example of how to implement the metric calculation is as follows.
    import lombok.Builder;
    import lombok.Data;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class LiveStreamRealtimeMetricProdProcessorJob {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<SourceModel> source = SourceFactory.getSourceDataStream(...);

            DataStream<SinkModel> result = source
                // Spread live rooms over 1000 buckets; each bucket keeps its own count
                .keyBy(new KeySelector<SourceModel, Long>() {
                    @Override
                    public Long getKey(SourceModel commonModel) throws Exception {
                        return commonModel.getLiveStreamId() % 1000;
                    }
                })
                // One-minute window
                .timeWindow(Time.seconds(60))
                .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {

                    // State: number of live rooms on air in this bucket after the previous window
                    private ValueState<Long> playingLiveStreamNumberValueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.playingLiveStreamNumberValueState = getRuntimeContext().getState(...);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<SourceModel> iterable,
                                        Collector<SinkModel> collector) throws Exception {
                        Long playingLiveStreamNumber = this.playingLiveStreamNumberValueState.value();
                        if (null == playingLiveStreamNumber) {
                            playingLiveStreamNumber = 0L;
                        }
                        // Iterate directly: the Iterable is not guaranteed to be a List
                        for (SourceModel sourceModel : iterable) {
                            if (BizType.I == sourceModel.getBizType()) {
                                playingLiveStreamNumber++;
                            } else {
                                playingLiveStreamNumber--;
                            }
                        }
                        this.playingLiveStreamNumberValueState.update(playingLiveStreamNumber);
                        collector.collect(SinkModel.builder().build());
                    }
                });

            SinkFactory.setSinkDataStream(...);
            env.execute();
        }

        @Data
        @Builder
        static class SourceModel {
            // Live room id
            private Long liveStreamId;
            private Long time;
            // Anchor id
            private Long authorId;
            // Binlog timestamp
            private long binlogTimestamp;
            // Stream start or stream end
            private BizType bizType;
        }

        enum BizType {
            I, // stream start
            D; // stream end
        }

        @Data
        @Builder
        static class SinkModel {
            // Timestamp, aggregated to minute granularity
            private Long timestamp;
            // Indicator name
            private String metricName;
            // Indicator value
            private double metricValue;
            // Dimension name
            private String dimName;
            // Dimension value
            private String dimValue;
        }
    }
Summary
Continuing from the previous articles, this article introduces the “construction of production-side indicators” for live rooms, using “the number of live rooms on air in the current minute” as the example. It raises two questions, one about the definition and one about the construction process, and the sections are organized around them.
The first section briefly describes the definition of the number of live rooms on air in the current minute.
The second section introduces the construction logic and process of this indicator, and expands on the concept of “state”.
The last section summarizes this article.