Flinkにおけるウォーターマーク書き換えの実践

Apache Flink は強力なストリーム処理フレームワークで、リアルタイムデータストリームを処理する能力を持っています。リアルタイムデータを処理する際に重要なのが Watermark です。Watermarks はイベント時間(event time)のストリームデータを処理するための特別なタイムスタンプで、乱序イベントや遅延データの問題を解決します。しかし、特定のビジネスロジックに基づいてWatermarkの生成をカスタマイズする必要がある場合があります。この記事では、FlinkでWatermarkをどのように再定義するかを探り、いくつかの実用的なテクニックと例を提供します。

1 Watermarkとは

FlinkにおけるWatermarkは、イベント時間のマーカーで、このタイムスタンプ以前のデータが処理済みであることを示します。FlinkはWatermarkを使用して、イベント時間に基づくウィンドウ操作をいつトリガーできるかを決定します。あるイベントのタイムスタンプが現在のWatermarkよりも早い場合、そのイベントは「遅延」と見なされ、破棄されるか、特別なサイド出力ストリームに入れられる可能性があります。

2 なぜ再定義が必要か

実際のアプリケーションでは、元のデータストリームのWatermarkが要求を満たさない場合があります。たとえば、ビジネスロジックに基づいてWatermarkの生成戦略を調整する必要がある場合や、データの遅延、システム障害などの特殊な状況を処理する必要がある場合です。このような場合、Watermarkの生成ロジックをカスタマイズする必要があります。

3 FlinkでWatermarkを再定義する方法

Flinkでは、WatermarkStrategy インターフェースを実装することでWatermarkの生成をカスタマイズできます。一般的に以下の4点を行う必要があります:

  1. Watermark戦略の定義WatermarkStrategy を継承したクラスを作成し、createTimestampAssignercreateWatermarkGenerator メソッドを実装します。

  2. TimestampAssignerの実装createTimestampAssigner メソッドで、各イベントにタイムスタンプを割り当てる TimestampAssigner インスタンスを返します。

  3. WatermarkGeneratorの実装createWatermarkGenerator メソッドで、Watermarkを生成する WatermarkGenerator インスタンスを返します。

  4. Watermark戦略の適用:データストリーム(DataStream)を作成する際に、WatermarkStrategy を使用してWatermarkの生成戦略を設定します。

4 例:カスタムWatermark生成戦略

ユーザー活動のタイムスタンプを含むログデータストリームがあると仮定します。ユーザー活動の頻度に基づいてWatermarkを動的に調整し、遅延データをより適切に処理したいとします。以下は簡単な例です:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CustomWatermarkStrategy implements WatermarkStrategy<LogEvent> {
    @Override
    public TimestampAssigner<LogEvent> createTimestampAssigner(WatermarkGenerator<?> generator) {
        return new TimestampAssigner<LogEvent>() {
            @Override
            public long extractTimestamp(LogEvent event) {
                // イベントからタイムスタンプを抽出
                return event.getTimestamp();
            }
        };
    }

    @Override
    public WatermarkGenerator<LogEvent> createWatermarkGenerator(TimestampAssigner<LogEvent> timestampAssigner) {
        return new WatermarkGenerator<LogEvent>() {
            private long lastTimestamp = Long.MIN_VALUE;

            @Override
            public void onEvent(LogEvent event, long eventTimestamp, WatermarkOutput output) {
                // イベントのタイムスタンプに基づいてWatermarkを動的に調整
                long watermark = Math.max(lastTimestamp, eventTimestamp) - 10000; // 10秒遅延
                output.emitWatermark(new Watermark(watermark));
                lastTimestamp = eventTimestamp;
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // 定期的にWatermarkを発射
                output.emitWatermark(new Watermark(lastTimestamp - 10000));
            }
        };
    }
}

この例では、イベントのタイムスタンプに基づいてWatermarkを動的に調整するカスタムWatermark戦略を作成しています。これにより、遅延データをより適切に処理し、厳しすぎるWatermark戦略によって情報を失うことを防ぎます。

4.1 時間フォーマットに対する改造

時間情報が"2022/10/22 10:34"のような文字列の場合、どのようにしてこの時間情報に対して再定義を行うのでしょうか?

この文字列がすでにJavaの時間オブジェクト(例えばjava.util.Dateやjava.time.Instant)に解析されている場合、assignTimestampsAndWatermarks関数内でそれをパラメータとして渡すことができます。例えば:

1
2
3
4
5
6
7
Copy codeDataStream<YourType> stream = ...;
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return element.getEventTime().getTime();
    }
});

文字列がJavaの時間オブジェクトに解析されていない場合、まずそれを時間オブジェクトに解析し、それからassignTimestampsAndWatermarks関数内で使用します。SimpleDateFormatクラスを使用して時間文字列を解析できます:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Copy codeDataStream<YourType> stream = ...;
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm");
final Date eventTime = format.parse(timeString);

stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return eventTime.getTime();
    }
});

5 まとめ

FlinkでWatermarkを再定義することは高度な機能であり、具体的なビジネスニーズに応じてストリーム処理の動作を調整できます。カスタムWatermark戦略を通じて、リアルタイムデータストリームをより柔軟に処理し、データ処理の精度と効率を向上させることができます。実際のアプリケーションでは、この操作を合理的に使用することで、大規模データの処理能力を向上させることができます。

Buy me a coffee~
Tim 支付宝支付宝
Tim 贝宝贝宝
Tim 微信微信
0%