Practice of Rewriting Watermarks in Flink

Apache Flink is a stream processing framework for handling real-time data. A key concept in event-time processing is the Watermark: a special timestamp carried in the stream that tells operators how far event time has progressed, which lets Flink cope with out-of-order and delayed events. The built-in strategies do not always fit the business logic, so sometimes Watermark generation has to be customized. This article explores how to rewrite Watermarks in Flink and provides some practical tips and examples.

1 What is a Watermark

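In Flink, a Watermark is a marker that flows through the stream together with the events. A Watermark with timestamp t declares that event time has reached t: no more events with a timestamp less than or equal to t are expected. Flink uses Watermarks to decide when event-time operations, such as window computations and event-time timers, can be triggered. An event whose timestamp is at or before the current Watermark is considered “late”. A window operator drops a late event if the window it belongs to has already fired; allowedLateness keeps such windows around longer, and sideOutputLateData routes the dropped events to a special side output stream instead.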
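To make the rule concrete, here is a minimal plain-Java model. It is a sketch, not Flink code: the class EventTimeClock and its methods are hypothetical names. It captures two properties only: Watermarks never move backwards, and a Watermark t means no more events with a timestamp less than or equal to t are expected, so such an event counts as late.

```java
// Hypothetical model of event-time progress in plain Java (not the Flink API).
final class EventTimeClock {
    // Before any Watermark arrives, event time is at the very beginning.
    private long currentWatermark = Long.MIN_VALUE;

    // Watermarks only move forward; a smaller value is ignored.
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Watermark t promises no more events with timestamp <= t, so such an event is late.
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeClock clock = new EventTimeClock();
        clock.onWatermark(10_000L);
        clock.onWatermark(5_000L); // ignored: Watermarks never go backwards
        for (long ts : new long[] {9_000L, 10_000L, 12_000L}) {
            System.out.println(ts + (clock.isLate(ts) ? " late" : " on time"));
        }
        // prints "9000 late", "10000 late", "12000 on time" (one per line)
    }
}
```

Note that an event exactly at the Watermark is already late, which is why bounded-delay generators subtract an extra millisecond.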

2 Why Rewriting is Needed

In practical applications, the Watermark in the original data stream may not meet our needs. For example, we may need to adjust the Watermark generation strategy based on business logic or handle special cases such as data delays or system failures. In such cases, we need to customize the logic for generating Watermarks.

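3 How to Rewrite Watermarks

In Flink, you can customize Watermark generation by implementing the WatermarkStrategy interface. Generally, you need to do the following four things:

  1. Define a Watermark Strategy: Create a class that implements WatermarkStrategy. The createWatermarkGenerator method must be implemented; createTimestampAssigner has a default implementation (it keeps the timestamp already attached to the record) and is usually overridden to read the timestamp from the event.

  2. Implement TimestampAssigner: In the createTimestampAssigner method, return a TimestampAssigner whose extractTimestamp(element, recordTimestamp) method returns each event's timestamp in epoch milliseconds.

  3. Implement WatermarkGenerator: In the createWatermarkGenerator method, return a WatermarkGenerator. Flink calls its onEvent method for every event and its onPeriodicEmit method periodically (every pipeline.auto-watermark-interval, 200 ms by default in recent versions); either method can emit a Watermark through the WatermarkOutput it receives.

  4. Apply the Watermark Strategy: Pass the strategy to DataStream.assignTimestampsAndWatermarks(...), or to StreamExecutionEnvironment.fromSource(...) when using the unified Source API, and continue processing on the stream that is returned.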
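The division of work between the timestamp assigner and the Watermark generator can be illustrated without a cluster. The sketch below is plain Java and deliberately does not use Flink: MiniTimestampAssigner, MiniWatermarkGenerator, MiniBoundedDelayGenerator and MiniRuntime are hypothetical stand-ins that only mirror the shape of TimestampAssigner.extractTimestamp, WatermarkGenerator.onEvent and WatermarkGenerator.onPeriodicEmit. The driver is simplified: the real runtime calls onPeriodicEmit on a timer, not after a fixed number of events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical stand-ins mirroring the shape of Flink's interfaces (NOT the real Flink API).
interface MiniTimestampAssigner<T> {
    // Like TimestampAssigner.extractTimestamp: returns epoch milliseconds.
    long extractTimestamp(T element, long recordTimestamp);
}

interface MiniWatermarkGenerator<T> {
    // Like WatermarkGenerator.onEvent: called once per event.
    void onEvent(T event, long eventTimestamp, LongConsumer output);

    // Like WatermarkGenerator.onPeriodicEmit: called by the runtime from time to time.
    void onPeriodicEmit(LongConsumer output);
}

// Bounded-delay generator: Watermark = largest timestamp seen - delay - 1.
final class MiniBoundedDelayGenerator<T> implements MiniWatermarkGenerator<T> {
    private final long delayMs;
    private long maxTimestamp;

    MiniBoundedDelayGenerator(long delayMs) {
        this.delayMs = delayMs;
        // Chosen so that maxTimestamp - delayMs - 1 cannot underflow before the first event.
        this.maxTimestamp = Long.MIN_VALUE + delayMs + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, LongConsumer output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp); // only track; emit periodically
    }

    @Override
    public void onPeriodicEmit(LongConsumer output) {
        output.accept(maxTimestamp - delayMs - 1);
    }
}

final class MiniRuntime {
    // Simplified driver: Flink emits on a timer, here after every `emitEvery` events.
    static <T> List<Long> run(List<T> events, MiniTimestampAssigner<T> assigner,
                              MiniWatermarkGenerator<T> generator, int emitEvery) {
        List<Long> watermarks = new ArrayList<>();
        LongConsumer output = wm -> watermarks.add(wm);
        for (int i = 0; i < events.size(); i++) {
            T event = events.get(i);
            // Long.MIN_VALUE stands for "no previous timestamp" in this model.
            long ts = assigner.extractTimestamp(event, Long.MIN_VALUE);
            generator.onEvent(event, ts, output);
            if ((i + 1) % emitEvery == 0) {
                generator.onPeriodicEmit(output);
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Events are represented directly by their timestamps; 3000 arrives out of order.
        List<Long> events = Arrays.asList(1_000L, 5_000L, 3_000L, 12_000L);
        List<Long> watermarks = run(events, (e, rec) -> e, new MiniBoundedDelayGenerator<Long>(2_000L), 2);
        System.out.println(watermarks); // [2999, 9999]
    }
}
```

The generator never sees the raw element's time format, only the long returned by the assigner, which is why the two responsibilities can be swapped independently.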

4 Example: Custom Watermark Generation Strategy

Suppose we have a log data stream containing the timestamps of user activities, and we want to derive the Watermark from those timestamps so that late data is handled better. Here is a simple example that keeps the Watermark 10 seconds behind the largest timestamp seen so far:

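import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// LogEvent is the application's event type; getTimestamp() returns epoch milliseconds
public class CustomWatermarkStrategy implements WatermarkStrategy<LogEvent> {
    // Maximum expected out-of-orderness: 10 seconds
    private static final long MAX_DELAY_MS = 10_000L;

    @Override
    public TimestampAssigner<LogEvent> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        // Extract the timestamp from the event itself; the record's previous timestamp is ignored
        return (event, recordTimestamp) -> event.getTimestamp();
    }

    @Override
    public WatermarkGenerator<LogEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<LogEvent>() {
            // Start value chosen so that maxTimestamp - MAX_DELAY_MS - 1 cannot underflow
            private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY_MS + 1;

            @Override
            public void onEvent(LogEvent event, long eventTimestamp, WatermarkOutput output) {
                // Track the largest timestamp seen so far; it must never move backwards
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Called every auto-watermark interval: trail the largest timestamp by 10 seconds
                output.emitWatermark(new Watermark(maxTimestamp - MAX_DELAY_MS - 1));
            }
        };
    }
}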

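In this example, the Watermark follows the largest event timestamp seen so far, minus 10 seconds. Events that arrive up to 10 seconds out of order are therefore not treated as late, so fewer of them are lost than with a strategy that places the Watermark right at the latest timestamp. This fixed-delay behavior is also available out of the box as WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); a custom generator is worth writing when the logic goes beyond a fixed bound.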
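The example keeps the delay fixed. One way to let it adapt to the data, sketched here as an assumption rather than a recommended algorithm, is to grow the delay to the largest out-of-orderness observed so far, clamped between a minimum and a maximum. Note that this uses observed out-of-orderness as its signal, not user activity frequency. AdaptiveDelayTracker is a hypothetical plain-Java class with no Flink dependency; its onEvent and currentWatermark methods correspond to what a real WatermarkGenerator would do in onEvent and onPeriodicEmit.

```java
// Hypothetical adaptive-delay logic in plain Java (not the Flink API).
// Assumed signal: the largest out-of-orderness observed so far, clamped to [min, max].
final class AdaptiveDelayTracker {
    private final long minDelayMs;
    private final long maxDelayMs;
    private long maxTimestamp = Long.MIN_VALUE;  // largest event timestamp seen
    private long observedDelayMs = 0L;           // largest lag behind maxTimestamp seen
    private long lastWatermark = Long.MIN_VALUE; // last Watermark handed out

    AdaptiveDelayTracker(long minDelayMs, long maxDelayMs) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Corresponds to WatermarkGenerator.onEvent.
    void onEvent(long eventTimestamp) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            // Out-of-order event: remember how far behind the newest event it arrived.
            observedDelayMs = Math.max(observedDelayMs, maxTimestamp - eventTimestamp);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentDelayMs() {
        return Math.min(maxDelayMs, Math.max(minDelayMs, observedDelayMs));
    }

    // Corresponds to WatermarkGenerator.onPeriodicEmit.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastWatermark; // no events yet
        }
        long candidate = maxTimestamp - currentDelayMs() - 1;
        // A growing delay must never move the Watermark backwards.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        AdaptiveDelayTracker tracker = new AdaptiveDelayTracker(1_000L, 10_000L);
        tracker.onEvent(20_000L);
        System.out.println(tracker.currentWatermark()); // 18999: delay is still the 1 s minimum
        tracker.onEvent(15_000L);                        // arrives 5 s out of order
        System.out.println(tracker.currentWatermark()); // 18999: candidate 14999 would go backwards
        tracker.onEvent(30_000L);
        System.out.println(tracker.currentWatermark()); // 24999: 30000 - 5000 - 1
    }
}
```

The lastWatermark field matters: when the delay grows, the computed value can drop, and Watermarks must never move backwards. Widening the bound only helps events that arrive afterwards; the event that triggered the increase was judged against the old Watermark. The maximum delay caps how long a single badly delayed event can hold back results.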

4.1 Modification for Time Formats

How should the timestamp be extracted when the event time arrives as a string such as “2022/10/22 10:34”?

If the string has already been parsed into a Java time object, such as java.util.Date or java.time.Instant, the timestamp assigner passed to assignTimestampsAndWatermarks only needs to convert it to epoch milliseconds. For example:

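import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<YourType> stream = ...;
DataStream<YourType> withTimestamps = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<YourType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        // getEventTime() returns a java.util.Date here; for a java.time.Instant, call toEpochMilli() instead
        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime().getTime()));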
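For reference, here is the conversion for both Java time types in isolation. This is a plain-Java sketch with a hypothetical helper name (EpochMillis); it only shows that Date.getTime() and Instant.toEpochMilli() yield the same epoch-millisecond value, which is the unit a timestamp assigner must return.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helpers: both return epoch milliseconds, the unit Flink timestamps use.
final class EpochMillis {
    static long of(Date date) {
        return date.getTime();
    }

    static long of(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2022-10-22T10:34:00Z");
        Date date = Date.from(instant);
        System.out.println(of(instant));               // 1666434840000
        System.out.println(of(instant) == of(date));   // true
    }
}
```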

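If the string has not been parsed yet, parse it inside the timestamp assigner so that each element gets its own timestamp. java.time.format.DateTimeFormatter is a better fit than SimpleDateFormat here: it is immutable and thread-safe, and its parse failures are unchecked, so the call can sit directly in the assigner lambda.

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Static field, so the serializable lambda below does not capture it (DateTimeFormatter is not Serializable)
private static final DateTimeFormatter EVENT_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm");

DataStream<YourType> stream = ...;
DataStream<YourType> withTimestamps = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<YourType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((element, recordTimestamp) ->
            // Parse this element's own string (getTimeString() is a hypothetical accessor), read as UTC
            LocalDateTime.parse(element.getTimeString(), EVENT_TIME_FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli()));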
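Because the parsing logic is plain Java, it can be pulled into a small helper and unit-tested without starting a Flink job. The sketch below assumes the pattern yyyy/MM/dd HH:mm and that the strings represent UTC times; EventTimeParser is a hypothetical name. Malformed input surfaces as java.time.format.DateTimeParseException, an unchecked exception, so a real pipeline has to decide whether to let the job fail or to filter out bad records before timestamps are assigned.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts "yyyy/MM/dd HH:mm" strings to epoch milliseconds.
final class EventTimeParser {
    // Immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm");

    private EventTimeParser() {}

    // Assumption: the strings are UTC wall-clock times with no zone of their own.
    static long toEpochMillis(String timeString) {
        return LocalDateTime.parse(timeString, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022/10/22 10:34")); // 1666434840000
    }
}
```

Inside the Flink job, the timestamp assigner would then reduce to (element, recordTimestamp) -> EventTimeParser.toEpochMillis(element.getTimeString()), with getTimeString() again standing in for your own accessor.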

5 Conclusion

Rewriting Watermarks lets you adapt Flink's notion of event-time progress to specific business needs. The central trade-off is the delay: the further the Watermark trails the event timestamps, the fewer events are treated as late, but the longer windows and timers wait before producing results. Choosing a strategy that matches the actual out-of-orderness of your data improves both the completeness and the latency of real-time processing.
