Practice of Rewriting Watermarks in Flink

Apache Flink is a powerful stream processing framework for handling real-time data streams. A key concept in event-time processing is the Watermark: a special timestamp that tells Flink how far event time has progressed, which lets it cope with out-of-order events and late data. Sometimes, however, Watermarks need to be generated according to specific business logic. This article explores how to customize ("rewrite") Watermark generation in Flink and gives some practical tips and examples.

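What Is a Watermark

In Flink, a Watermark is a marker in the stream that carries an event-time timestamp t. It declares that event time has reached t, meaning that no more elements with a timestamp at or below t are expected. Flink uses Watermarks to decide when event-time operations such as windows can be triggered: a window fires once the Watermark reaches the last timestamp the window covers. An event whose timestamp is at or below the current Watermark is considered "late". Window operators drop late events by default, unless allowed lateness is configured or the late data is routed to a side output.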
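The two rules in the paragraph above can be written down as plain comparisons. The sketch below is a plain-Java illustration with no Flink dependency; the class and method names are hypothetical. It assumes a tumbling window [start, end), and it uses end - 1 as the window's last timestamp, which is how Flink's TimeWindow defines its maximum timestamp.

```java
/**
 * Plain-Java sketch (no Flink dependency) of two watermark rules:
 * when an element counts as late, and when a tumbling event-time window is complete.
 */
public class WatermarkSemanticsDemo {

    /** An element is late once event time, as signaled by the watermark, has reached its timestamp. */
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    /** A window [start, end) is complete once the watermark reaches its last millisecond, end - 1. */
    static boolean canFire(long windowEnd, long currentWatermark) {
        return currentWatermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long watermark = 9_999L; // event time has advanced to 9.999 s
        System.out.println(isLate(9_500L, watermark));   // true: 9.5 s is already behind the watermark
        System.out.println(isLate(10_200L, watermark));  // false: 10.2 s is still ahead of it
        System.out.println(canFire(10_000L, watermark)); // true: window [0 s, 10 s) can be evaluated
    }
}
```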

Why Rewrite

In practical applications, the Watermarks produced by a standard strategy may not meet our needs. For instance, we might need to adjust how Watermarks are generated according to business logic, or handle special circumstances such as delayed data or system failures. In such cases, we need custom logic for generating Watermarks.

In Flink, we can customize the generation of Watermarks by implementing the WatermarkStrategy interface. Generally, this takes four steps:

  1. Define the Watermark strategy: create a class that implements WatermarkStrategy. createWatermarkGenerator is its only abstract method; createTimestampAssigner has a default implementation, which you override to read the timestamp from your events.

  2. Implement the TimestampAssigner: createTimestampAssigner returns a TimestampAssigner whose extractTimestamp(element, recordTimestamp) method returns each event's timestamp in epoch milliseconds.

  3. Implement the WatermarkGenerator: createWatermarkGenerator returns a WatermarkGenerator. Flink calls its onEvent method for every event and its onPeriodicEmit method at a fixed interval; either method can emit Watermarks through the WatermarkOutput it receives.

  4. Apply the Watermark strategy: pass the strategy to DataStream.assignTimestampsAndWatermarks(), or to StreamExecutionEnvironment.fromSource() when reading from a source.
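The division of labor in steps 2 and 3 can be modeled in plain Java. This is a toy model, not Flink's API: MaxSeenGenerator, run() and the every-N-records trigger are hypothetical stand-ins. In a real job, Flink calls onPeriodicEmit on a timer (the pipeline.auto-watermark-interval setting) rather than after a fixed number of records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/**
 * Toy model (not Flink's API) of how a strategy's two parts cooperate:
 * the assigner reads a timestamp from every record, the generator sees each
 * timestamp in onEvent and publishes a watermark when onPeriodicEmit runs.
 */
public class StrategyToyModel {

    /** Hypothetical generator: remembers the largest timestamp seen so far. */
    static final class MaxSeenGenerator {
        private long maxTimestamp = Long.MIN_VALUE;

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long onPeriodicEmit() {
            return maxTimestamp;
        }
    }

    /** Feeds records through the assigner and generator, emitting a watermark after every `interval` records. */
    static <E> List<Long> run(List<E> records, ToLongFunction<E> assigner, int interval) {
        MaxSeenGenerator generator = new MaxSeenGenerator();
        List<Long> watermarks = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            generator.onEvent(assigner.applyAsLong(records.get(i)));
            if ((i + 1) % interval == 0) {
                // Stand-in for Flink's timer-driven periodic emission
                watermarks.add(generator.onPeriodicEmit());
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Records are "name@timestampMillis" strings; the assigner parses the part after '@'
        List<String> records = List.of("a@1000", "b@3000", "c@2000", "d@5000");
        System.out.println(run(records, r -> Long.parseLong(r.substring(r.indexOf('@') + 1)), 2)); // [3000, 5000]
    }
}
```

Note that the out-of-order record "c@2000" does not pull the second watermark back, because the generator only tracks the maximum.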

Example: Custom Watermark Generation Strategy

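Suppose we have a log data stream containing timestamps of user activity, and we want to control how Watermarks follow these timestamps so that late data is handled better. Here is a simple example that keeps the Watermark a fixed 10 seconds behind the latest activity seen; a strategy that adapts to the frequency of user activity would build on the same two methods: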

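```java
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// LogEvent is the article's event type; getTimestamp() is assumed to return epoch milliseconds
public class CustomWatermarkStrategy implements WatermarkStrategy<LogEvent> {

    // Maximum out-of-orderness tolerated: 10 seconds
    private static final long MAX_DELAY_MS = 10_000L;

    @Override
    public TimestampAssigner<LogEvent> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        // Extract the event-time timestamp from each event
        return (event, recordTimestamp) -> event.getTimestamp();
    }

    @Override
    public WatermarkGenerator<LogEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<LogEvent>() {
            // Start at MIN_VALUE + delay so that "maxTimestamp - MAX_DELAY_MS" cannot overflow
            private long maxTimestamp = Long.MIN_VALUE + MAX_DELAY_MS;

            @Override
            public void onEvent(LogEvent event, long eventTimestamp, WatermarkOutput output) {
                // Remember the largest timestamp seen; out-of-order events never move it backwards
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Called periodically by Flink: the Watermark trails the largest timestamp by 10 seconds
                output.emitWatermark(new Watermark(maxTimestamp - MAX_DELAY_MS));
            }
        };
    }
}
```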

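This custom strategy keeps the Watermark 10 seconds behind the largest event timestamp seen so far, so events that arrive up to 10 seconds out of order are still processed on time instead of being treated as late. It is applied with stream.assignTimestampsAndWatermarks(new CustomWatermarkStrategy()). For a purely fixed delay, Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) combined with withTimestampAssigner(...) behaves essentially the same; a custom class pays off when the logic must go beyond a fixed delay.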
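The generator's arithmetic is easy to get wrong at the boundaries. The plain-Java replica below (no Flink dependency; the class and method names are illustrative) shows that starting from Long.MIN_VALUE and subtracting the delay overflows to a huge positive value, which would make every event look late, while starting from Long.MIN_VALUE plus the delay avoids the overflow. Taking the maximum keeps out-of-order events from moving the Watermark backwards.

```java
/**
 * Plain-Java replica (no Flink dependency) of bounded-delay watermark arithmetic,
 * showing why the initial value matters.
 */
public class BoundedDelayArithmetic {

    static final long MAX_DELAY_MS = 10_000L;

    /** Naive start value: subtracting the delay from Long.MIN_VALUE overflows. */
    static long naiveInitialWatermark() {
        long lastTimestamp = Long.MIN_VALUE;
        return lastTimestamp - MAX_DELAY_MS; // wraps around to a huge positive number
    }

    /** Watermark after a sequence of (possibly out-of-order) timestamps, using a safe start value. */
    static long watermarkAfter(long... timestamps) {
        long maxTimestamp = Long.MIN_VALUE + MAX_DELAY_MS; // minus the delay gives exactly Long.MIN_VALUE
        for (long ts : timestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return maxTimestamp - MAX_DELAY_MS;
    }

    public static void main(String[] args) {
        System.out.println(naiveInitialWatermark() > 0);             // true: the naive version overflows
        System.out.println(watermarkAfter());                         // -9223372036854775808 (Long.MIN_VALUE)
        System.out.println(watermarkAfter(1_000L, 15_000L, 12_000L)); // 5000: the out-of-order 12 s event does not pull it back
    }
}
```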

Transforming Time Format

For time information in a format such as "2022/10/22 10:34", how do we turn it into event-time timestamps for Watermarks?

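If the string has already been parsed into a Java time object, such as java.util.Date or java.time.Instant, the timestamp assigner used in assignTimestampsAndWatermarks only has to convert it to epoch milliseconds: getTime() for Date, toEpochMilli() for Instant. BoundedOutOfOrdernessTimestampExtractor belongs to the older AssignerWithPeriodicWatermarks API, whose assignTimestampsAndWatermarks overload has been deprecated since Flink 1.11 in favor of WatermarkStrategy, so the example uses WatermarkStrategy instead. For example:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<YourType> stream = ...;
// Tolerate up to 10 seconds of out-of-orderness; getEventTime() is assumed to return java.util.Date
DataStream<YourType> withTimestamps = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<YourType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime().getTime()));
```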
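Whichever time type the events carry, the assigner must end up returning epoch milliseconds, so for java.util.Date and java.time.Instant only the accessor differs. A quick plain-Java check with no Flink dependency (the class and method names are illustrative):

```java
import java.time.Instant;
import java.util.Date;

/** Both java.util.Date and java.time.Instant reduce to the epoch milliseconds a timestamp assigner returns. */
public class EpochMillisDemo {

    static long fromDate(Date date) {
        return date.getTime();
    }

    static long fromInstant(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2022-10-22T10:34:00Z");
        Date date = Date.from(instant);
        System.out.println(fromInstant(instant));                   // 1666434840000
        System.out.println(fromDate(date) == fromInstant(instant)); // true
    }
}
```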

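If the string has not been parsed yet, each element's own time string has to be parsed inside the timestamp assigner. Parsing a single string outside the stream into a fixed Date would give every element the same timestamp. DateTimeFormatter is preferable to SimpleDateFormat here because it is immutable and thread-safe; it is not Serializable, though, so it is kept in a static field rather than captured by the lambda. The string carries no time zone, so one has to be chosen explicitly (UTC below is an assumption):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Field of the job class: static, so the lambda below does not have to serialize it
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm");

DataStream<YourType> stream = ...;
DataStream<YourType> withTimestamps = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<YourType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                // getTimeString() is a placeholder for the accessor returning e.g. "2022/10/22 10:34"
                .withTimestampAssigner((element, recordTimestamp) ->
                        LocalDateTime.parse(element.getTimeString(), FORMATTER)
                                .toInstant(ZoneOffset.UTC)
                                .toEpochMilli()));
```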
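Because "2022/10/22 10:34" carries no time zone, the resulting timestamp depends on the zone you assume. The parsing step can be checked on its own in plain Java with no Flink dependency; the class name and the two offsets (UTC and UTC+8) are illustrative choices.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Parses strings like "2022/10/22 10:34" into epoch milliseconds for a chosen UTC offset. */
public class TimeStringParsing {

    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm");

    static long toEpochMillis(String text, ZoneOffset offset) {
        return LocalDateTime.parse(text, FORMATTER).toInstant(offset).toEpochMilli();
    }

    public static void main(String[] args) {
        // The same text maps to different instants depending on the assumed offset
        System.out.println(toEpochMillis("2022/10/22 10:34", ZoneOffset.UTC));        // 1666434840000
        System.out.println(toEpochMillis("2022/10/22 10:34", ZoneOffset.ofHours(8))); // 1666406040000
    }
}
```

The two results differ by exactly eight hours (28,800,000 ms), so picking the wrong zone shifts every event, and every window, by that amount.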

Summary

Rewriting Watermarks in Flink is an advanced feature that lets stream processing behavior follow specific business requirements. A custom Watermark strategy controls the trade-off between latency and completeness: a Watermark that trails further behind the latest event time waits longer before firing windows but treats fewer events as late, while a tighter one produces results sooner at the cost of more late data. Choosing that balance deliberately is what makes custom Watermarks worthwhile in practice.
