Because business requirements are different, it is usually divided into two scenarios


  • offline computing scenario: T+1 is the main scenario to calculate historical data
  • Real-time computing scenario: real-time calculation of daily new data, deduplication of user tags

For offline computing scenarios, Hologres is based on RoaringBitmap, providing ultra-high cardinality UV calculation, only need to perform the most fine-grained pre-aggregation calculation, and only generate the most fine-grained pre-aggregation result table, which can achieve sub-second query. For details, see previous articles >> How Hologres Supports Ultra-High Cardinality UV Computation (based on RoaringBitmap)

For real-time computing scenarios, you can use the Flink+Hologres method and deduplicate user tags in real time based on RoaringBitmap. In this way, the user UV and PV data can be obtained in real time at a fine-grained level, and it is convenient to adjust the minimum statistical window (such as the last 5 minutes of UV) according to the needs to achieve a similar real-time monitoring effect and better BI display on the large screen. Compared with deduplication in days, weeks, months, etc., it is more suitable for fine-grained statistics on event dates, and through simple aggregation, statistical results of larger time units can also be obtained.

The main idea

of Flink converts streaming data into tables and dimension tables for JOIN operations, and then into streaming data. This can take advantage of the insertIfNotExists feature of the Hologres dimension table combined with auto-incrementing fields to achieve efficient uid mapping.

Flink processes the associated result data according to the time window, aggregates it according to the query dimension using RoaringBitmap, and stores the query dimension and the aggregated uid in the

aggregate result table, where the aggregated uid result is placed in Hologres’ RoaringBitmap type field.

When querying, similar to the offline mode, you can directly query the aggregate result table according to the query conditions, perform or operations on the key RoaringBitmap fields and count the cardinality to obtain the corresponding number of users.

The processing flow is shown in the following figure

Solution best practice


related basic tables 1

) Create a table uid_mapping as a uid mapping table to map uid to 32-bit int types.

The RoaringBitmap type requires that the user ID must be of type 32-bit int and the denser the better (i.e. the user ID is preferably consecutive). Many user IDs in common business systems or tracking points are string type or Long type, so you need to build a mapping table with uid_mapping types. The mapping table uses Hologres’ SERIAL type (self-increasing 32-bit int) to achieve automatic management and stable mapping of user mappings.

Since it is real-time data, set the table as a row-saving table to improve the QPS of Flink dimension table real-time JOIN.

BEGIN; CREATE TABLE public.uid_mapping (uid text NOT NULL,uid_int32 serial,PRIMARY KEY (uid)); --Set uid to clustering_key and distribution_key facilitate quick search of its corresponding int32 

value CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');

CALL set_table_property('public.uid_mapping''distribution_key''uid');
CALL set_table_property('public.uid_mapping''orientation''row'); COMMIT;

2) Create a table dws_app the basic aggregation table to store the aggregated results on the underlying dimension.

Before using RoaringBitmap, you need to

create a RoaringBitmap extention, and you also need the Hologres instance to be version 0.10


For better performance, we recommend that you set the number of Shards

based on the amount of data in the underlying aggregation table, but we recommend that the number of Shards of the basic aggregation table not exceed the number of cores of the computing resource. It is recommended to use the following method

to set the number of shards through Table Group –

create a Table Group with a number of 16,-- because the amount of test data is millions, of which the back-end computing resource is 100core, and the number of shards is set to 16BEGIN; CREATE TABLE tg16 (a int);                             --Table Group Sentinel 

Call set_table_property('tg16', 'shard_count', '16'); 


Compared to the offline result table, this result table adds a timestamp field to implement statistics in Flink window periods. The resulting table DDL is as follows:

BEGIN; create table dws_app( country text, prov text, city text, ymd text NOT NULL, --date field timetz TIMESTAMPTZ, --statistical timestamp, can achieve statistics in Flink window periods uid32_bitmap roaringbitmap, -- using roaringbitmap to record UV  primary key(country, prov, city, ymd, timetz)--query dimension and time as primary keys to prevent duplicate insertion of data); 

CALL set_table_property('public.dws_app''orientation''column');

--Date fields are set to clustering_key and event_time_column to facilitate filtering

CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');

CALL set_table_property('public.dws_app''event_time_column''ymd'); --equivalent to placing a table in a table group call set_table_property with shard number 16

('public.dws_app', 'colocate_with', 'tg16');

--group by field is set to distribution_key

CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');


2.Flink reads data in real time and updates dws_app basic aggregate table

complete example source code, see alibabacloud-hologres-connectors examples

1) Flink streams data sources (DataStream) and converts them into source tables

  Here the csv file is used as the data source, or it can be a DataStreamSource odsStream such as kafka = env.createInput(csvInput, typeInfo);// To join with the dimension table, you need to add the proctime field, See odsTable = tableEnv.fromDataStream( odsStream, $("


"), $("
$( "prov"), $("city"), $("ymd"), $("proctime

proctime());// Register to the catalog environment

tableEnv.createTemporaryView( "odsTable", odsTable);

2) Associate the source table with the Hologres dimension table (uid_mapping),

where the dimension table uses the insertIfNotExists parameter, that is, it is inserted by itself when the data cannot be queried, and the uid_int32 field can be created by using the serial type of Hologres.

 Create a Hologres dimension table, where nsertIfNotExists means that if the query is not available, it will insert itself into the String createUidMappingTable = String.format( "

create table uid_mapping_dim("

    + "  uid string,"
    + "  uid_int32 INT"
    + ") with ("
    + "  'connector'='hologres',"
    + "  'dbname' = '%s',"  Hologres DB name + " 'tablename' = '%s',"Hologres table name + " 'username' = '%s'," //current account access
id + " 'password' = '
s'," //current account access key
    + "  'endpoint' = '%s'," //Hologres endpoint
    + "  'insertifnotexists'='true'"
    + ")" ,    database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable);// source table and dimension table joinString odsJoinDim =

"SELECT, ods.prov,, ods.ymd, dim.uid_int32"

+ "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
    + "  ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3) Convert the association result into DataStream, process it through Flink time window, and aggregate

DataStream > in combination with RoaringBitmap  processedSource =
source // Filter the dimensions to be statistical(country, prov, city, ymd) .keyBy(0, 1, 2, 3) // Rolling time window; Here, because the input stream is simulated by reading csv, ProcessingTime is used, and EventTime .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) can be used in actual use) // trigger, and the aggregate result can be obtained before the window is not ended .trigger( ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( // aggregate function, based on the dimension filtered by key by, new aggregatefunction< tuple5 ,        RoaringBitmap,        RoaringBitmap>() {            @Override

            public RoaringBitmap createAccumulator() {

                return  new RoaringBitmap();            }            @Override            public RoaringBitmap add(

                Tuple5 in,

                RoaringBitmap acc) { // Add a 32-bit uid to RoaringBitmap for deduplication acc.add(in.f4);

                return acc;

            }            @Override            public RoaringBitmap getResult(RoaringBitmap acc) {

                return acc;

            }            @Override            public RoaringBitmap merge(                RoaringBitmap acc1, RoaringBitmap acc2) {

                return RoaringBitmap.or(acc1, acc2);

} }, //window function, output aggregate result new WindowFunction< RoaringBitmap, Tuple6, Tuple, TimeWindow> () {            @Override            public void apply(                Tuple keys,                TimeWindow timeWindow,                Iterable  iterable,                Collector<                Tuple6> out)                throws Exception {                RoaringBitmap result = iterable.iterator(). next();                optimizing RoaringBitmap result.runOptimize();                Convert RoaringBitmap to byte array to store in Holo byte[] byteArray = new byte[result.serializedSizeInBytes()];                result.serialize(ByteBuffer.wrap(byteArray));                where the Tuple6.f4(Timestamp) field represents the statistics in seconds out.collect( new Tuple6<>( keys.getField(0), keys.getField(1),                        keys.getField(2),                        keys.getField(3),                        new Timestamp(                            timeWindow.getEnd() / 1000 * 1000),                        byteArray));        }    });

4) Writing

to the result table

, it should be noted that the RoaringBitmap type in Hologres corresponds to the Byte array type in Flink

. The

calculation result is converted to table Table resTable = tableEnv.fromDataStream( processedSource, 


        $( "uid32_bitmap"));// Create a Hologres result table, where Hologres' RoaringBitmap type is stored in a String createHologresTable = String.format via a Byte array( "

create table sink("

+ "  country string,"
        + "  prov string,"
        + "  city string,"
        + "  ymd string,"
        + "  timetz timestamp,"
        + "  uid32_bitmap BYTES"
        + ") with ("
        + "  'connector'='hologres',"
        + "  'dbname' = '%s',"
        +  "  'tablename' = '%s',"
        + "  'username' = '%s',"
        + "  'password' = '%s',"
        + "  'endpoint' = '%s',"
        +  "  'connectionSize' = '%s',"
        + "  'mutatetype' = 'insertOrReplace'"
        + ")" ,    database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable);// Write the calculation result to the dws table

tableEnv.executeSql("insert into sink select * from" + resTable);

3. When querying

data, do aggregation

calculations according to the query dimension from the basic aggregation table (dws_app), query the bitmap cardinality, and obtain the number of users under group by conditions to


the uv

 of each city within a certain day - run the following RB_AGG operation query, The executable parameter first turns off the three-phase aggregation switch (off by default), and the performance 
is better hg_experimental_enable_force_three_stage_agg=off SELECT country,prov,city,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM    dws_app

WHERE   ymd = '20210329'

GROUP BY country         ,prov         ,city;

Query the uv

of each province within a certain period of time –

run the following RB_AGG operation query, and the executable parameter first turns off the three-stage aggregation switch (disabled by default), which has better performance
 hg_experimental_enable_force_three_stage_agg=off SELECT  country        ,prov        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uvFROM    dws_app

WHERE   time > '2021-04-19 18:00:00+08'  and time < '2021-04-19 19:00:00+08'

GROUP BY country         ,prov;

Link to this article: