Practice of Rewriting Watermarks in Flink
Apache Flink is a powerful stream processing framework with the capability of handling real-time data streams. When processing real-time data, a key element is the Watermark. Watermarks are a special type of timestamp used for processing event-time stream data to address issues with out-of-order events and delayed data. However, sometimes we may need to customize the generation of Watermarks based on specific business logic. This article will explore how to rewrite Watermarks in Flink and provide some practical tips and examples.
1 What is a Watermark
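In Flink, a Watermark is a marker that carries an event-time timestamp t and declares that no more events with a timestamp at or before t are expected in the stream. Flink uses Watermarks to decide when event-time operations can proceed. For example, an event-time window is triggered once the Watermark reaches the end of the window. If an event’s timestamp is at or before the current Watermark, the event is considered “late”: window operators drop late events by default (after any configured allowed lateness) or can route them to a special side output stream.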
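The following small plain-Java simulation (no Flink dependency) makes the lateness rule concrete. It uses a bounded-out-of-orderness Watermark, the same rule applied by Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness: the Watermark trails the largest timestamp seen so far by a fixed bound, minus one millisecond. The class name and sample timestamps are hypothetical. For simplicity, the Watermark is recomputed after every event, whereas Flink emits it periodically.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (no Flink dependency) of a bounded-out-of-orderness
 * Watermark: watermark = largest timestamp seen - bound - 1.
 */
class WatermarkSimulation {

    /** Returns, for each event timestamp in arrival order, whether the event is late. */
    static List<Boolean> lateFlags(long[] eventTimestamps, long maxOutOfOrdernessMillis) {
        List<Boolean> flags = new ArrayList<>();
        // Start so that the first computed Watermark cannot underflow below Long.MIN_VALUE.
        long maxSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            // An event is late if the Watermark has already reached its timestamp.
            flags.add(ts <= watermark);
            maxSeen = Math.max(maxSeen, ts);
            // Simplification: recompute after every event (Flink emits periodically).
            watermark = maxSeen - maxOutOfOrdernessMillis - 1;
        }
        return flags;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 4_000, 2_500, 9_000, 3_000};
        // prints [false, false, false, false, true]
        System.out.println(lateFlags(events, 2_000));
    }
}
```

With a 2-second bound, the event at 2,500 ms is still on time because the Watermark is only at 1,999 ms. The event at 3,000 ms is late, because the event at 9,000 ms has already pushed the Watermark to 6,999 ms.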
2 Why Rewriting is Needed
In practical applications, the Watermark in the original data stream may not meet our needs. For example, we may need to adjust the Watermark generation strategy based on business logic or handle special cases such as data delays or system failures. In such cases, we need to customize the logic for generating Watermarks.
3 How to Rewrite Watermarks in Flink
In Flink, you can customize Watermark generation by implementing the WatermarkStrategy interface. Generally, you need to do the following four things:
- Define a Watermark Strategy: create a class that implements the WatermarkStrategy interface and its createTimestampAssigner and createWatermarkGenerator methods. (createTimestampAssigner has a default implementation that keeps the record's existing timestamp, so override it when the timestamp must be extracted from the event itself.)
- Implement TimestampAssigner: in the createTimestampAssigner method, return a TimestampAssigner instance that extracts the event-time timestamp, in epoch milliseconds, from each event.
- Implement WatermarkGenerator: in the createWatermarkGenerator method, return a WatermarkGenerator instance that decides when Watermarks are emitted and with which timestamp, through its onEvent and onPeriodicEmit callbacks.
- Apply the Watermark Strategy: pass the strategy to assignTimestampsAndWatermarks on a DataStream, or to StreamExecutionEnvironment.fromSource when the stream is created from a source.
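The four steps can be sketched as a single class, assuming a hypothetical UserEvent type that already carries its event time in epoch milliseconds. The generator reproduces the bounded-out-of-orderness rule (Watermark = largest timestamp seen minus the bound minus 1 ms). The class names are illustrative and are not part of Flink.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

/** Hypothetical event type used only for illustration. */
class UserEvent {
    public String userId;
    public long timestampMillis; // event time in epoch milliseconds

    public UserEvent() {}

    public UserEvent(String userId, long timestampMillis) {
        this.userId = userId;
        this.timestampMillis = timestampMillis;
    }
}

/** Step 1: a strategy class implementing the WatermarkStrategy interface. */
class UserEventWatermarkStrategy implements WatermarkStrategy<UserEvent> {
    private final long maxOutOfOrdernessMillis;

    UserEventWatermarkStrategy(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMillis = maxOutOfOrderness.toMillis();
    }

    // Step 2: tell Flink where each event's timestamp comes from.
    @Override
    public TimestampAssigner<UserEvent> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (event, recordTimestamp) -> event.timestampMillis;
    }

    // Step 3: decide how far the Watermark advances and when it is emitted.
    @Override
    public WatermarkGenerator<UserEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<UserEvent>() {
            // Initial value chosen so the first emitted Watermark does not underflow.
            private long maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;

            @Override
            public void onEvent(UserEvent event, long eventTimestamp, WatermarkOutput output) {
                // Only track progress here; emission happens periodically.
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrdernessMillis - 1));
            }
        };
    }
}
```

Step 4 then amounts to calling events.assignTimestampsAndWatermarks(new UserEventWatermarkStrategy(Duration.ofSeconds(5))) on a DataStream<UserEvent>. Flink calls onPeriodicEmit at the interval configured with ExecutionConfig.setAutoWatermarkInterval, which defaults to 200 ms.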
4 Example: Custom Watermark Generation Strategy
Suppose we have a log data stream containing timestamps of user activities. We want to dynamically adjust the Watermark based on user activity frequency to better handle late data. Here is a simple example:
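The following minimal sketch shows one way to adapt the Watermark to activity frequency. The heuristic itself is an assumption and not a Flink feature. The generator keeps a moving average of the gap between successive new maximum timestamps and sets the delay to a multiple of that gap, clamped to a fixed range. The UserActivity type and all parameter values are hypothetical.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

/** Hypothetical log record: one user activity with its event time in epoch milliseconds. */
class UserActivity {
    public String userId;
    public long timestampMillis;

    public UserActivity() {}

    public UserActivity(String userId, long timestampMillis) {
        this.userId = userId;
        this.timestampMillis = timestampMillis;
    }
}

/** Hypothetical heuristic: delay = gapMultiplier * average gap, clamped to [min, max]. */
class ActivityAwareWatermarkGenerator implements WatermarkGenerator<UserActivity> {
    private final long minDelayMillis;
    private final long maxDelayMillis;
    private final double gapMultiplier;
    private final double smoothing; // weight of the newest gap in the moving average, in (0, 1]

    private long maxTimestamp = Long.MIN_VALUE; // largest event time seen so far
    private double avgGapMillis = -1.0;         // negative until the first gap is observed
    private long lastEmitted = Long.MIN_VALUE;  // Watermarks must never move backwards

    ActivityAwareWatermarkGenerator(long minDelayMillis, long maxDelayMillis,
                                    double gapMultiplier, double smoothing) {
        if (minDelayMillis > maxDelayMillis || smoothing <= 0 || smoothing > 1) {
            throw new IllegalArgumentException("invalid generator parameters");
        }
        this.minDelayMillis = minDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.gapMultiplier = gapMultiplier;
        this.smoothing = smoothing;
    }

    @Override
    public void onEvent(UserActivity event, long eventTimestamp, WatermarkOutput output) {
        // Only events that advance event time contribute a gap; out-of-order ones do not.
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp > maxTimestamp) {
            long gap = eventTimestamp - maxTimestamp;
            avgGapMillis = avgGapMillis < 0 ? gap : smoothing * gap + (1 - smoothing) * avgGapMillis;
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long watermark = currentWatermark();
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            output.emitWatermark(new Watermark(watermark));
        }
    }

    long currentDelayMillis() {
        if (avgGapMillis < 0) {
            return maxDelayMillis; // no evidence about activity yet: be conservative
        }
        long delay = Math.round(gapMultiplier * avgGapMillis);
        return Math.max(minDelayMillis, Math.min(maxDelayMillis, delay));
    }

    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no events seen yet
        }
        return Math.max(lastEmitted, maxTimestamp - currentDelayMillis() - 1);
    }

    /** Wires the generator into a strategy; the parameter values are illustrative only. */
    static WatermarkStrategy<UserActivity> strategy() {
        return WatermarkStrategy
                .<UserActivity>forGenerator(ctx -> new ActivityAwareWatermarkGenerator(1_000L, 10_000L, 2.0, 0.5))
                .withTimestampAssigner((activity, recordTimestamp) -> activity.timestampMillis);
    }
}
```

Frequent activity therefore yields a short delay and earlier window results. Sparse activity widens the delay, so stragglers are less likely to be dropped. A growing delay could otherwise move the Watermark backwards, so the generator never emits a value below the last one it emitted.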
In this example, a custom Watermark strategy is created that dynamically adjusts the Watermark based on the event’s timestamp. This way, we can better handle late data without losing too much information due to overly strict Watermark strategies.
4.1 Modification for Time Formats
How should timestamps be assigned when the time information is a string such as “2022/10/22 10:34”?
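If the string has already been parsed into a Java time object, such as java.util.Date or java.time.Instant, the timestamp assigner of the WatermarkStrategy passed to assignTimestampsAndWatermarks only needs to convert it to epoch milliseconds, the long value Flink expects, using Date.getTime() or Instant.toEpochMilli(). For example: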
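A minimal sketch, assuming two hypothetical record types whose event time is already an Instant or a Date. Each assigner only converts the object to epoch milliseconds. The 5-second out-of-orderness bound is an illustrative value.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

/** Hypothetical record whose event time is already a java.time.Instant. */
class InstantLog {
    public Instant eventTime;

    public InstantLog() {}

    public InstantLog(Instant eventTime) { this.eventTime = eventTime; }
}

/** Hypothetical record whose event time is already a java.util.Date. */
class DateLog {
    public Date eventTime;

    public DateLog() {}

    public DateLog(Date eventTime) { this.eventTime = eventTime; }
}

class TimeObjectStrategies {
    // Flink expects event time as epoch milliseconds, so the assigners only convert.
    static final SerializableTimestampAssigner<InstantLog> INSTANT_ASSIGNER =
            (log, recordTimestamp) -> log.eventTime.toEpochMilli();

    static final SerializableTimestampAssigner<DateLog> DATE_ASSIGNER =
            (log, recordTimestamp) -> log.eventTime.getTime();

    static WatermarkStrategy<InstantLog> forInstantLogs() {
        return WatermarkStrategy.<InstantLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(INSTANT_ASSIGNER);
    }

    static WatermarkStrategy<DateLog> forDateLogs() {
        return WatermarkStrategy.<DateLog>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(DATE_ASSIGNER);
    }
}
```

On a DataStream<InstantLog> named logs, the strategy would be applied with logs.assignTimestampsAndWatermarks(TimeObjectStrategies.forInstantLogs()).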
If the string has not been parsed yet, parse it into a time object first and then convert it to epoch milliseconds inside the timestamp assigner. The SimpleDateFormat class, with the pattern “yyyy/MM/dd HH:mm”, can parse the time string:
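A minimal sketch of the parsing step as a plain-Java helper. The class name is hypothetical, and so is the time zone argument: a string such as “2022/10/22 10:34” carries no zone, so the zone the source writes in must be supplied. A new SimpleDateFormat is created on every call because SimpleDateFormat is not thread-safe. java.time.format.DateTimeFormatter is a thread-safe alternative.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class TimeStringParser {
    private static final String PATTERN = "yyyy/MM/dd HH:mm";

    /** Parses strings such as "2022/10/22 10:34" into epoch milliseconds in the given zone. */
    static long toEpochMillis(String text, TimeZone zone) {
        // New instance per call: SimpleDateFormat keeps mutable state and is not thread-safe.
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(zone);
        format.setLenient(false); // reject malformed values such as "2022/13/40 10:34"
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable event time: " + text, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022/10/22 10:34", TimeZone.getTimeZone("UTC")));
    }
}
```

In a Flink job, a timestamp assigner such as (log, recordTimestamp) -> TimeStringParser.toEpochMillis(log.time, TimeZone.getTimeZone("UTC")) would call this helper. Here log.time is a hypothetical string field, and UTC stands in for whatever zone the data is actually written in.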
5 Conclusion
Rewriting Watermarks in Flink is an advanced feature that lets you adapt event-time processing to specific business needs. A custom Watermark strategy controls the trade-off between latency and completeness. A Watermark that advances aggressively triggers windows sooner but classifies more events as late, while a more conservative one waits longer and loses fewer late events. Choosing this balance deliberately for each data source helps keep real-time results timely without sacrificing accuracy.


