Pratique De Réécriture De Watermark Dans Flink

Apache Flink est un puissant cadre de traitement de flux, capable de traiter des flux de données en temps réel. Lors du traitement de données en temps réel, un élément clé est le Watermark. Les Watermarks sont un type particulier de timestamp utilisé pour traiter les flux de données en temps d’événement afin de résoudre les problèmes d’événements désordonnés et de données retardées. Cependant, il peut parfois être nécessaire de personnaliser la génération de Watermark en fonction de la logique métier spécifique. Cet article explorera comment réécrire le Watermark dans Flink et fournira quelques conseils pratiques et exemples.

1 Qu’est-ce qu’un Watermark

Dans Flink, un Watermark est un marqueur de temps d’événement qui indique que les données avant ce timestamp ont été traitées. Flink utilise le Watermark pour déterminer quand déclencher les opérations de fenêtre basées sur le temps d’événement. Si le timestamp d’un événement est antérieur au Watermark actuel, cet événement est considéré comme “en retard” et peut être rejeté ou placé dans un flux de sortie latéral spécial.

2 Pourquoi réécrire

Dans les applications pratiques, le Watermark dans le flux de données d’origine peut ne pas répondre à nos besoins. Par exemple, nous pourrions avoir besoin d’ajuster la stratégie de génération de Watermark en fonction de la logique métier ou de traiter des situations particulières telles que des retards de données, des pannes de système, etc. Dans ce cas, nous devons personnaliser la logique de génération de Watermark.

Dans Flink, nous pouvons personnaliser la génération de Watermark en implémentant l’interface WatermarkStrategy. Généralement, il faut accomplir les 4 points suivants :

  1. Définir la stratégie de Watermark : Créer une classe héritant de WatermarkStrategy et implémenter les méthodes createTimestampAssigner et createWatermarkGenerator.

  2. Implémenter TimestampAssigner : Dans la méthode createTimestampAssigner, retourner une instance de TimestampAssigner qui est responsable de l’attribution d’un timestamp à chaque événement.

  3. Implémenter WatermarkGenerator : Dans la méthode createWatermarkGenerator, retourner une instance de WatermarkGenerator qui est responsable de la génération du Watermark.

  4. Appliquer la stratégie de Watermark : Lors de la création du flux de données (DataStream), utiliser WatermarkStrategy pour définir la stratégie de génération de Watermark.

4 Exemple : stratégie de génération de Watermark personnalisée

Supposons que nous ayons un flux de données de journaux contenant des timestamps d’activités des utilisateurs. Nous souhaitons ajuster dynamiquement le Watermark en fonction de la fréquence des activités des utilisateurs pour mieux gérer les données retardées. Voici un exemple simple :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class CustomWatermarkStrategy implements WatermarkStrategy<LogEvent> {
    @Override
    public TimestampAssigner<LogEvent> createTimestampAssigner(WatermarkGenerator<?> generator) {
        return new TimestampAssigner<LogEvent>() {
            @Override
            public long extractTimestamp(LogEvent event) {
                // Extraire le timestamp de l'événement
                return event.getTimestamp();
            }
        };
    }

    @Override
    public WatermarkGenerator<LogEvent> createWatermarkGenerator(TimestampAssigner<LogEvent> timestampAssigner) {
        return new WatermarkGenerator<LogEvent>() {
            private long lastTimestamp = Long.MIN_VALUE;

            @Override
            public void onEvent(LogEvent event, long eventTimestamp, WatermarkOutput output) {
                // Ajuster dynamiquement le Watermark en fonction du timestamp de l'événement
                long watermark = Math.max(lastTimestamp, eventTimestamp) - 10000; // Retard de 10 secondes
                output.emitWatermark(new Watermark(watermark));
                lastTimestamp = eventTimestamp;
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Émettre périodiquement le Watermark
                output.emitWatermark(new Watermark(lastTimestamp - 10000));
            }
        };
    }
}

Cet exemple crée une stratégie de Watermark personnalisée qui ajuste dynamiquement le Watermark en fonction du timestamp de l’événement. Ainsi, nous pouvons mieux gérer les données retardées sans perdre trop d’informations en raison d’une stratégie de Watermark trop stricte.

4.1 Adaptation au format de temps

Comment réécrire pour une information temporelle sous forme de chaîne comme “2022/10/22 10:34” ?

Si cette chaîne a déjà été analysée en un objet de temps Java, tel que java.util.Date ou java.time.Instant, elle peut être directement utilisée comme paramètre dans la fonction assignTimestampsAndWatermarks. Par exemple :

1
2
3
4
5
6
7
Copy codeDataStream<YourType> stream = ...;
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return element.getEventTime().getTime();
    }
});

Si la chaîne n’a pas été analysée en un objet de temps Java, elle doit d’abord être analysée en un objet de temps, puis utilisée dans la fonction assignTimestampsAndWatermarks. Vous pouvez utiliser la classe SimpleDateFormat pour analyser la chaîne de temps :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Copy codeDataStream<YourType> stream = ...;
SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm");
final Date eventTime = format.parse(timeString);

stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<YourType>(Time.seconds(10)) {
    @Override
    public long extractTimestamp(YourType element) {
        return eventTime.getTime();
    }
});

5 Conclusion

La réécriture du Watermark dans Flink est une fonctionnalité avancée qui permet d’ajuster le comportement du traitement de flux en fonction des besoins métier spécifiques. En personnalisant la stratégie de Watermark, vous pouvez traiter les flux de données en temps réel de manière plus flexible, améliorant ainsi la précision et l’efficacité du traitement des données. Dans les applications pratiques, une utilisation judicieuse de cette opération peut améliorer la capacité de traitement des grandes données.

Buy me a coffee~
Tim AlipayAlipay
Tim PayPalPayPal
Tim WeChat PayWeChat Pay
0%