In the previous two articles in this series, we described how to implement flexible data stream partitioning based on dynamically updated configuration (fraud detection rules), and how to use Flink’s Broadcast mechanism to distribute processing configuration among the relevant operators at runtime.

This article picks up directly where we left off in our discussion of the end-to-end solution. We describe how to use Flink’s “Swiss Army knife”, the Process Function, to build an implementation tailored to your streaming business logic. The discussion continues in the context of the fraud detection engine. We also show how to implement your own replacement for time windows when the windows that the DataStream API provides out of the box do not meet your requirements. In particular, we examine the trade-offs you can make when designing a solution that must react to individual events with low latency.

This article describes some advanced concepts that can be applied independently. However, we recommend that you read the first and second parts of this series first.

A short response time is one of the key requirements common to fraud detection systems. The sooner fraud is detected, the sooner it can be stopped and the smaller its negative impact. This requirement is especially pronounced in the financial sector, where any time the fraud detection system spends evaluating a transaction is time the user spends waiting for a response. Speed of processing often becomes a competitive advantage among payment systems, and the time limit for producing an alert can be as low as 300-500 milliseconds. This is the entire budget, measured from the moment the fraud detection system ingests a financial transaction event until an alert must be available to downstream systems.

As you may know, Flink already provides a powerful Window API that covers a wide range of use cases. However, if you go over all the available window types, you will notice that none of them exactly matches our main requirement: low-latency evaluation of each transaction. No window type in Flink can express “x minutes/hours/days back from the current event”. In the Window API, events fall into windows defined by the window assigner, but an event cannot itself control the creation and evaluation of a window. As described above, our fraud detection engine must evaluate the relevant previous data immediately when a new event arrives. It is unclear whether the Window API could be applied in this case at all. The Window API does offer options for custom triggers, evictors, and window assigners, which might get you to the desired result. However, this is usually difficult to get right. Moreover, this approach does not provide access to broadcast state, which is required to implement dynamic reconfiguration of business rules.

Let’s take a sliding window from Flink’s Window API as an example. A sliding window with a slide of S translates into an expected evaluation delay of S/2. To meet a low-latency requirement of 300~500 ms, you would therefore need a slide of 600~1000 ms, even before accounting for any actual computation time. Flink stores a separate window state for each sliding window. This results in a very large job state, which is not feasible under any moderately high load. To meet our needs, we had to create our own low-latency window implementation. Fortunately, Flink gives us all the tools required to do so. ProcessFunction is a low-level but powerful building block of Flink’s API, and it has a simple contract:
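The arithmetic behind this argument is easy to check. The following plain-Java sketch uses no Flink, and its class and method names are made up for illustration. It assumes that events arrive uniformly in time, so an event waits half a slide on average before the next evaluation. It also counts how many overlapping sliding windows each event is assigned to.

```java
// Back-of-the-envelope cost model for a sliding window (plain Java, no Flink).
// Assumption: events arrive uniformly in time, so on average an event waits
// half a slide before the next window evaluation.
final class SlidingWindowCost {

  private SlidingWindowCost() {}

  // Expected delay between an event's arrival and the next window firing.
  static double expectedDelayMillis(long slideMillis) {
    return slideMillis / 2.0;
  }

  // Number of overlapping windows every event is assigned to (size / slide).
  static long windowsPerEvent(long sizeMillis, long slideMillis) {
    return sizeMillis / slideMillis;
  }

  // Largest slide whose expected delay still fits into the given budget.
  static long maxSlideForBudget(long delayBudgetMillis) {
    return 2 * delayBudgetMillis;
  }
}
```

Take the 24-hour window of the sample rule with a 1-second slide. `windowsPerEvent(86_400_000L, 1_000L)` returns 86,400, so every transaction contributes to 86,400 separate windows per key. Meanwhile, the expected delay is still 500 ms before any computation time is added.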

public class SomeProcessFunction extends KeyedProcessFunction<KeyType, InputType, OutputType> {

  public void processElement(InputType event, Context ctx, Collector<OutputType> out) {}

  public void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputType> out) {}

  public void open(Configuration parameters) {}
}

  • processElement(): receives input events. You can react to each input by calling out.collect() to produce one or more output events for the next operator. You can also pass data to a side output or ignore a particular input entirely.
  • onTimer(): called by Flink when a previously registered timer fires. Both event-time and processing-time timers are supported.
  • open(): equivalent to a constructor. It is called inside the TaskManager’s JVM and is used for initialization, such as registering Flink-managed state. It is also the right place to initialize fields that are not serializable and therefore cannot be transferred from the JobManager’s JVM.

Most importantly, ProcessFunction also has access to the fault-tolerant state managed by Flink. Add Flink’s message-processing and delivery guarantees and its low latency, and you can build resilient event-driven applications with almost arbitrarily complex business logic. This includes creating and processing custom stateful windows.


State cleanup

To work with time windows, our program needs to keep track of the data that belongs to each window. This data must be fault-tolerant and recoverable after a failure in a distributed system, so we store it in Flink-managed state. We don’t need to keep every previous transaction forever: according to the sample fraud detection rule, all transactions older than 24 hours become irrelevant. We are therefore dealing with a window of data that moves continuously, and transactions that fall out of scope must be evicted on an ongoing basis (in other words, purged from state).

We will use MapState to store individual events for the window. To effectively clean up out-of-range events, we will use the event timestamp as the key of MapState.

In general, we have to take into account the fact that there may be different events with exactly the same timestamp, so we will be storing a collection instead of a single piece of data per key (timestamp).

MapState<Long, Set<Transaction>> windowState;

Note ⚠️:

When using any Flink-managed state in KeyedProcessFunction, the data returned by the state.value() call is automatically scoped by the key of the currently processed event – see the figure below.

If you use MapState, the same principle applies, except that a Map is returned instead of a MyObject. If you are forced to do something like mapState.value().get(inputEvent.getKey()), you should probably use ValueState instead of MapState. Because we want to store multiple values for each event key, MapState is the right choice in our case.
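The following small plain-Java simulation illustrates the key scoping described above. It is not how Flink implements keyed state. KeyScopedMapState and setCurrentKey() are hypothetical stand-ins for what the runtime does before each processElement() call.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of key-scoped state (not Flink's implementation).
// The backing store is partitioned by key, and every access implicitly goes
// through the key of the element currently being processed.
final class KeyScopedMapState<K, UK, UV> {
  private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
  private K currentKey;

  // Stands in for the runtime setting the current key before processElement().
  void setCurrentKey(K key) {
    this.currentKey = key;
  }

  UV get(UK userKey) {
    Map<UK, UV> m = perKey.get(requireKey());
    return m == null ? null : m.get(userKey);
  }

  void put(UK userKey, UV value) {
    perKey.computeIfAbsent(requireKey(), k -> new HashMap<>()).put(userKey, value);
  }

  // Number of entries visible under the current key only.
  int size() {
    Map<UK, UV> m = perKey.get(requireKey());
    return m == null ? 0 : m.size();
  }

  private K requireKey() {
    if (currentKey == null) {
      throw new IllegalStateException("No current key set");
    }
    return currentKey;
  }
}
```

Two transactions with the same timestamp but different payer/beneficiary keys end up in separate maps. A timestamp-keyed MapState therefore only has to handle timestamp collisions within a single key.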

As described in the first blog post of this series, we distribute events based on the keys specified in the active fraud detection rules. Several rules can share the same grouping key. Our alerting function may therefore receive transactions with the same key (e.g. {payerId=25; beneficiaryId=12}) that must be evaluated against different rules, and those rules may have time windows of different lengths. This raises the question of how best to store fault-tolerant window state within the KeyedProcessFunction. One approach is to create and manage a separate MapState for each rule. This approach would be wasteful, however: overlapping time windows would be stored separately, so duplicate events would be kept unnecessarily. A better approach is to always store just enough data to evaluate all currently active rules that share the same key. To achieve this, we check each newly added rule to see whether its time window is the widest so far. If it is, we store the rule in broadcast state under the specially reserved key WIDEST_RULE_KEY.
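The bookkeeping for the widest window can be sketched in plain Java. In this sketch, a HashMap stands in for the broadcast state. Rule, onRule() and the choice of Integer.MIN_VALUE as WIDEST_RULE_KEY are assumptions made for this illustration. The sketch only ever widens the tracked window; recomputing it when a rule is removed is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the "widest window" bookkeeping (no Flink).
// A HashMap stands in for the broadcast state; Rule is a hypothetical minimal type.
final class WidestRuleTracker {

  static final class Rule {
    final int ruleId;
    final long windowMillis;

    Rule(int ruleId, long windowMillis) {
      this.ruleId = ruleId;
      this.windowMillis = windowMillis;
    }
  }

  // Hypothetical reserved key; it must never collide with a real rule ID.
  static final int WIDEST_RULE_KEY = Integer.MIN_VALUE;

  private final Map<Integer, Rule> broadcastState = new HashMap<>();

  // Called for every rule update, in the role of processBroadcastElement().
  void onRule(Rule rule) {
    broadcastState.put(rule.ruleId, rule);
    Rule widest = broadcastState.get(WIDEST_RULE_KEY);
    if (widest == null || widest.windowMillis < rule.windowMillis) {
      broadcastState.put(WIDEST_RULE_KEY, rule);
    }
  }

  // Window span that cleanup must preserve; 0 if no rule has been seen yet.
  long widestWindowMillis() {
    Rule widest = broadcastState.get(WIDEST_RULE_KEY);
    return widest == null ? 0L : widest.windowMillis;
  }
}
```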

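@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
  BroadcastState<Integer, Rule> broadcastState = ctx.getBroadcastState(Descriptors.rulesDescriptor);
  ...
  updateWidestWindowRule(rule, broadcastState);
}

private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)
    throws Exception {
  Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);

  // First rule seen: it is trivially the widest one
  if (widestWindowRule == null) {
    broadcastState.put(WIDEST_RULE_KEY, rule);
    return;
  }

  // Replace the stored rule only if the new one spans a wider window
  if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {
    broadcastState.put(WIDEST_RULE_KEY, rule);
  }
}

Now let’s look in more detail at the implementation of the main method, processElement().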

In the previous blog post, we described how DynamicKeyFunction allows us to perform dynamic data partitioning based on the groupingKeyNames parameter in the rule definition. The description that follows revolves around the DynamicAlertFunction, which takes advantage of the rest of the rule settings.

As mentioned in the previous parts of this series, our alerting function receives events of type Keyed<Transaction, String, Integer>. Transaction is the main “wrapped” event, String is the key (payer #x - beneficiary #y), and Integer is the ID of the rule that caused this event to be dispatched. This rule was previously stored in broadcast state and has to be retrieved from there by its ID. Here is the implementation:

public class DynamicAlertFunction
    extends KeyedBroadcastProcessFunction<
        String, Keyed<Transaction, String, Integer>, Rule, Alert> {

  private transient MapState<Long, Set<Transaction>> windowState;

  @Override
  public void processElement(
      Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)
      throws Exception {

    // Add Transaction to state
    long currentEventTime = value.getWrapped().getEventTime();                            // <--- (1)
    addToStateValuesSet(windowState, currentEventTime, value.getWrapped());

    // Calculate the aggregate value
    Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());    // <--- (2)
    Long windowStartTimestampForEvent = rule.getWindowStartTimestampFor(currentEventTime); // <--- (3)

    SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);            // <--- (4)
    for (Long stateEventTime : windowState.keys()) {
      if (isStateValueInWindow(stateEventTime, windowStartTimestampForEvent, currentEventTime)) {
        aggregateValuesInState(stateEventTime, aggregator, rule);
      }
    }

    // Evaluate the rule and trigger an alert if violated
    BigDecimal aggregateResult = aggregator.getLocalValue();                              // <--- (5)
    boolean isRuleViolated = rule.apply(aggregateResult);
    if (isRuleViolated) {
      long decisionTime = System.currentTimeMillis();
      out.collect(new Alert<>(rule.getRuleId(),
                              rule,
                              value.getKey(),
                              decisionTime,
                              value.getWrapped(),
                              aggregateResult));
    }

    // Register timers to ensure state cleanup
    long cleanupTime = (currentEventTime / 1000) * 1000;                                  // <--- (6)
    ctx.timerService().registerEventTimeTimer(cleanupTime);
  }
  ...
}

Here are the details of the steps:


1) We start by adding each new event to our window state:

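static <K, V> Set<V> addToStateValuesSet(MapState<K, Set<V>> mapState, K key, V value)
    throws Exception {

  Set<V> valuesSet = mapState.get(key);

  if (valuesSet != null) {
    // Another event with the same timestamp is already stored
    valuesSet.add(value);
  } else {
    valuesSet = new HashSet<>();
    valuesSet.add(value);
  }
  // Write the set back so that the state backend persists the change
  mapState.put(key, valuesSet);

  return valuesSet;
}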

2) Next, we retrieve the previously broadcast rule against which the incoming transaction needs to be evaluated.

3) getWindowStartTimestampFor() uses the window span defined in the rule and the timestamp of the current transaction to calculate how far back in time the evaluation should reach.

4) The aggregate value is calculated by iterating over all window state entries and applying an aggregate function. This can be an average, a maximum, a minimum or, as in the example rule used in this series, a sum.

private boolean isStateValueInWindow(
    Long stateEventTime, Long windowStartForEvent, long currentEventTime) {
  return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;
}

private void aggregateValuesInState(
    Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule)
    throws Exception {

  Set<Transaction> inWindow = windowState.get(stateEventTime);

  for (Transaction event : inWindow) {
    // Extract the field the rule aggregates over (e.g. the payment amount)
    BigDecimal aggregatedValue =
        FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);
    aggregator.add(aggregatedValue);
  }
}
5) With the aggregate value in hand, we can compare it to the threshold specified in the rule definition and fire an alert if necessary.

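6) Finally, we register a cleanup timer using ctx.timerService().registerEventTimeTimer(). These timers drive the removal of transactions once they move out of scope. The timer timestamp is rounded down to the full second. Flink keeps at most one timer per key and timestamp, so this rounding limits the number of timers that have to be registered.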

7) The onTimer() method then triggers the cleanup of the window state.
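You can try steps 1) to 5) without a Flink cluster. The following plain-Java sketch makes several simplifications:

  • a HashMap replaces MapState;
  • the rule is fixed at construction time instead of being looked up in broadcast state (step 2);
  • the aggregation is hard-coded to a SUM that is compared with a “greater than” threshold;
  • the window start is assumed to be the current event time minus the window length.

All class and field names are hypothetical.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java walk-through of steps 1) to 5) for a single key (no Flink).
// Transaction and the SUM > limit rule are simplifying assumptions.
final class WindowEvaluator {

  static final class Transaction {
    final String id;
    final long eventTime;
    final BigDecimal amount;

    Transaction(String id, long eventTime, BigDecimal amount) {
      this.id = id;
      this.eventTime = eventTime;
      this.amount = amount;
    }
  }

  private final Map<Long, Set<Transaction>> windowState = new HashMap<>();
  private final long windowMillis;
  private final BigDecimal limit;

  WindowEvaluator(long windowMillis, BigDecimal limit) {
    this.windowMillis = windowMillis;
    this.limit = limit;
  }

  // Returns true if "sum(amount) > limit" holds for the window ending at tx.
  boolean process(Transaction tx) {
    // 1) Store the event under its timestamp; several events may share one.
    windowState.computeIfAbsent(tx.eventTime, k -> new HashSet<>()).add(tx);

    // 3) Window start relative to the current event (assumed formula).
    long windowStart = tx.eventTime - windowMillis;

    // 4) Aggregate every stored event in [windowStart, currentEventTime].
    BigDecimal sum = BigDecimal.ZERO;
    for (Map.Entry<Long, Set<Transaction>> e : windowState.entrySet()) {
      long ts = e.getKey();
      if (ts >= windowStart && ts <= tx.eventTime) {
        for (Transaction t : e.getValue()) {
          sum = sum.add(t.amount);
        }
      }
    }

    // 5) Compare the aggregate with the rule's threshold.
    return sum.compareTo(limit) > 0;
  }
}
```

Every call to process() scans the whole state. This is the quadratic cost discussed further below.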

As mentioned earlier, we always keep enough events in state to evaluate the active rule with the widest window span. During cleanup, we therefore only need to remove state that falls outside this widest window.

This is how the cleanup is implemented:

@Override
public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)
    throws Exception {

  Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);

  Optional<Long> cleanupEventTimeWindow =
      Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);
  Optional<Long> cleanupEventTimeThreshold =
      cleanupEventTimeWindow.map(window -> timestamp - window);

  // Remove events that are older than (timestamp - widestWindowSpan)ms
  cleanupEventTimeThreshold.ifPresent(this::evictOutOfScopeElementsFromWindow);
}

private void evictOutOfScopeElementsFromWindow(Long threshold) {
  try {
    Iterator<Long> keys = windowState.keys().iterator();
    while (keys.hasNext()) {
      Long stateEventTime = keys.next();
      if (stateEventTime < threshold) {
        keys.remove();
      }
    }
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
}

That concludes the implementation details. Our approach evaluates the time window as soon as a new transaction arrives, so it meets our main requirement: low latency for emitting alerts. For the complete implementation, see the project code on GitHub.
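You can isolate the eviction logic of onTimer() in the same way. This plain-Java sketch uses a HashMap instead of MapState. It also takes the widest window length as a parameter instead of reading WIDEST_RULE_KEY from broadcast state. WindowCleaner is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of the cleanup done in onTimer() (no Flink): drop every
// timestamp bucket older than (timerTimestamp - widestWindowMillis).
final class WindowCleaner {

  private WindowCleaner() {}

  static <V> void evictOlderThan(Map<Long, Set<V>> windowState, long threshold) {
    Iterator<Long> keys = windowState.keySet().iterator();
    while (keys.hasNext()) {
      if (keys.next() < threshold) {
        keys.remove(); // removing through the key-set iterator drops the whole entry
      }
    }
  }

  // widestWindowMillis is null when no rule is known yet; then nothing is evicted.
  static <V> void onTimer(Map<Long, Set<V>> windowState, long timestamp, Long widestWindowMillis) {
    if (widestWindowMillis != null) {
      evictOlderThan(windowState, timestamp - widestWindowMillis);
    }
  }
}
```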

Refinements and optimizations

What are the advantages and disadvantages of the approach described above?

Advantages:

  • Low-latency capabilities
  • A tailored solution with potential use-case-specific optimizations
  • Efficient state reuse (rules with the same key share state)

Disadvantages:

  • Cannot take advantage of potential future optimizations in the existing Window API
  • No late-event handling, which the Window API provides out of the box
  • Quadratic computation complexity and potentially large state

Now let’s look at the latter two drawbacks and see if we can fix them.

Late events

Processing late events raises a question: does it still make sense to re-evaluate the window when a late event arrives? If it does, you need to extend the widest window used for cleanup by the maximum out-of-orderness you expect. Otherwise, a late firing could be evaluated against window data that has already been partially cleaned up.

However, one can argue that such late firings are meaningless in a use case that emphasizes low-latency processing. In that case, we can track the most recent timestamp observed so far. Events that do not advance this value are simply added to the state, and the aggregation and alerting logic is skipped for them.
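The second option can be sketched as a small gate in front of the evaluation logic. In a Flink job, the latest timestamp would have to live in keyed state (for example a ValueState<Long>) rather than in a plain field. The class below is a hypothetical plain-Java stand-in. It still evaluates events whose timestamp equals the latest one; that is a design choice of this sketch, not something prescribed above.

```java
// Hypothetical gate implementing "store late events, but only evaluate
// events that do not move the observed event time backwards".
final class LatestTimestampGate {
  private long latestSeen = Long.MIN_VALUE;

  // true: store and evaluate the event; false: only store it in window state.
  boolean shouldEvaluate(long eventTime) {
    if (eventTime >= latestSeen) {
      latestSeen = eventTime;
      return true;
    }
    return false;
  }
}
```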

Redundant re-computations and state size

In the described implementation, we keep individual transactions in state. Every time a new event arrives, we iterate over all of them and compute the aggregate again. Recomputing the same sums over and over is clearly a waste of compute resources.

Why do we keep each individual transaction in state in the first place? The granularity of the stored events directly determines the precision of the time window calculation. Because we store individual transactions, we can remove each one precisely when it leaves the exact 2592000000 ms window (30 days expressed in milliseconds). It is worth asking whether we really need millisecond precision when evaluating such a long window. Perhaps occasional false positives in exceptional cases are acceptable. If your use case does not require such precision, you can implement additional optimizations based on binning and pre-aggregation. The idea can be broken down as follows:

  • Instead of storing each individual transaction, create a parent class whose instances hold either the fields of a single transaction or the value obtained by applying the aggregate function to a batch of transactions.

  • Instead of using millisecond timestamps as MapState keys, round them down to the level of granularity you are willing to accept (for example, one minute). Each MapState entry then represents a bucket.

  • Whenever the window is evaluated, add the new transaction’s data to its bucket aggregate instead of storing a separate data point per transaction.
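For a SUM rule, binning and pre-aggregation could look roughly like the following plain-Java sketch. It simplifies the idea above: each bucket keeps only a running sum, rather than a parent class that holds either a single transaction or an aggregate. The one-minute bucket size and all names are assumptions. Because whole buckets are included, the window’s lower edge is only accurate to one bucket, and this imprecision is the source of the occasional false positives.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Sketch of binning + pre-aggregation for a SUM rule (no Flink): timestamps
// are rounded down to a bucket, and each bucket keeps a running sum instead
// of the individual transactions.
final class BucketedSumWindow {
  static final long BUCKET_MILLIS = 60_000L; // assumed granularity: one minute

  private final Map<Long, BigDecimal> buckets = new HashMap<>();

  static long bucketOf(long eventTime) {
    return Math.floorDiv(eventTime, BUCKET_MILLIS) * BUCKET_MILLIS;
  }

  void add(long eventTime, BigDecimal amount) {
    buckets.merge(bucketOf(eventTime), amount, BigDecimal::add);
  }

  // Sums all buckets from the one containing windowStart up to now, so events
  // slightly older than windowStart may be counted (precision of one bucket).
  BigDecimal sum(long windowStart, long now) {
    long firstBucket = bucketOf(windowStart);
    BigDecimal total = BigDecimal.ZERO;
    for (Map.Entry<Long, BigDecimal> e : buckets.entrySet()) {
      if (e.getKey() >= firstBucket && e.getKey() <= now) {
        total = total.add(e.getValue());
      }
    }
    return total;
  }

  int bucketCount() {
    return buckets.size();
  }
}
```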

State data and serializers

To optimize the implementation further, we can also ask how likely it is to receive different events with exactly the same timestamp.

The described implementation handles this case by storing a set of transactions per timestamp in MapState<Long, Set<Transaction>>. However, this choice may affect performance more than you might expect. Flink does not currently provide a native Set serializer and falls back to the less efficient Kryo serializer instead (FLINK-16729). A reasonable alternative is to assume that, under normal circumstances, no two different events have exactly the same timestamp, and to change the window state to MapState<Long, Transaction>. You can use side outputs to collect and monitor any unexpected occurrences that contradict this assumption. For performance tuning, I generally recommend disabling the fallback to Kryo (for example via ExecutionConfig#disableGenericTypes()). This shows where your application can be optimized further by making sure that more efficient serializers are used.
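The “one event per timestamp” strategy can be sketched in plain Java. A Map<Long, V> plays the role of MapState<Long, Transaction>, and a list stands in for the side output that collects collisions. In this sketch, the first event for a timestamp is kept and any later one is only reported. Both the class name and the keep-the-first policy are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the "at most one event per timestamp" assumption (no Flink).
// Collisions are routed to a list that stands in for a side output.
final class UniqueTimestampState<V> {
  private final Map<Long, V> windowState = new HashMap<>();
  final List<V> collisions = new ArrayList<>(); // stand-in for a side output

  void add(long eventTime, V event) {
    // putIfAbsent returns the existing value if the timestamp is already taken
    V previous = windowState.putIfAbsent(eventTime, event);
    if (previous != null) {
      collisions.add(event); // assumption violated: report it, keep the first event
    }
  }

  int size() {
    return windowState.size();
  }
}
```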

You can quickly find out which serializer will be used for your classes. Set a breakpoint and check the type of the returned TypeInformation (for example, the result of TypeInformation.of(Transaction.class)).

PojoTypeInfo indicates that an efficient Flink POJO serializer will be used.

GenericTypeInfo indicates that the Kryo serializer will be used.

Event pruning: instead of storing complete events, we can reduce each event to only the fields that are actually needed, which reduces the serialization and deserialization overhead. This may require extracting the fields needed by the active rules from each event and storing them in a generic Map data structure.

This adjustment can bring significant improvements for large objects. It should not be your first choice, though, as it can easily turn into a premature optimization.
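Event pruning could be sketched as follows. A table of field extractors, keyed by field name, builds the Map<String, Object> that would be stored instead of the full event. The Transaction fields and extractor names are hypothetical. In a real job, the set of required fields would be derived from the active rules.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Sketch of event pruning (no Flink): keep only the fields the active rules
// reference instead of the full Transaction. All names are hypothetical.
final class EventPruner {

  static final class Transaction {
    final long eventTime;
    final BigDecimal paymentAmount;
    final String payerId;
    final String beneficiaryId;

    Transaction(long eventTime, BigDecimal paymentAmount, String payerId, String beneficiaryId) {
      this.eventTime = eventTime;
      this.paymentAmount = paymentAmount;
      this.payerId = payerId;
      this.beneficiaryId = beneficiaryId;
    }
  }

  // Field name -> accessor; the event time is already the MapState key.
  private static final Map<String, Function<Transaction, Object>> EXTRACTORS = Map.of(
      "paymentAmount", t -> t.paymentAmount,
      "payerId", t -> t.payerId,
      "beneficiaryId", t -> t.beneficiaryId);

  private EventPruner() {}

  static Map<String, Object> prune(Transaction tx, Set<String> requiredFields) {
    Map<String, Object> pruned = new HashMap<>();
    for (String field : requiredFields) {
      Function<Transaction, Object> extractor = EXTRACTORS.get(field);
      if (extractor == null) {
        throw new IllegalArgumentException("Unknown field: " + field);
      }
      pruned.put(field, extractor.apply(tx));
    }
    return pruned;
  }
}
```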


Summary

This article concludes the description of the fraud detection engine implementation that we began in Part 1. We showed how to use ProcessFunction to “emulate” windows with complex custom logic. We discussed the pros and cons of this approach and explained how to apply custom, use-case-specific optimizations that would not be directly possible with the Window API.

The goal of this blog post is to illustrate the power and flexibility of Apache Flink’s APIs. At their core are Flink’s pillars, which save you, as a developer, a great deal of work and generalize well to a wide range of use cases by providing:

  • Efficient data exchange in a distributed cluster

  • Horizontal scalability through data partitioning

  • Fault-tolerant state with fast local access

  • A convenient abstraction for working with this state, which is as simple as using a local variable

  • A multithreaded, parallel execution engine. ProcessFunction code runs in a single thread and requires no synchronization. Flink handles all aspects of parallel execution and correct access to shared state, so you as a developer don’t have to think about them (concurrency is hard).

Together, these aspects let you build Flink applications that go far beyond ordinary streaming ETL and implement arbitrarily complex, distributed event-driven applications. Many use cases traditionally rely on stateless parallel execution nodes and “push” the concerns of state fault tolerance to a database, an approach that often runs into scalability problems as data volumes grow. With Flink, you can rethink how to approach such use cases.

The author of this article: zhisheng
