Pratique De Réécriture De Watermark Dans Flink
Apache Flink est un puissant cadre de traitement de flux, capable de traiter des flux de données en temps réel. Lors du traitement de données en temps réel, un élément clé est le Watermark. Les Watermarks sont un type particulier de timestamp utilisé pour traiter les flux de données en temps d’événement afin de résoudre les problèmes d’événements désordonnés et de données retardées. Cependant, il peut parfois être nécessaire de personnaliser la génération de Watermark en fonction de la logique métier spécifique. Cet article explorera comment réécrire le Watermark dans Flink et fournira quelques conseils pratiques et exemples.
1 Qu’est-ce qu’un Watermark
Dans Flink, un Watermark est un marqueur de temps d’événement qui indique que les données avant ce timestamp ont été traitées. Flink utilise le Watermark pour déterminer quand déclencher les opérations de fenêtre basées sur le temps d’événement. Si le timestamp d’un événement est antérieur au Watermark actuel, cet événement est considéré comme “en retard” et peut être rejeté ou placé dans un flux de sortie latéral spécial.
2 Pourquoi réécrire
Dans les applications pratiques, le Watermark dans le flux de données d’origine peut ne pas répondre à nos besoins. Par exemple, nous pourrions avoir besoin d’ajuster la stratégie de génération de Watermark en fonction de la logique métier ou de traiter des situations particulières telles que des retards de données, des pannes de système, etc. Dans ce cas, nous devons personnaliser la logique de génération de Watermark.
3 Comment réécrire le Watermark dans Flink
Dans Flink, nous pouvons personnaliser la génération de Watermark en implémentant l’interface WatermarkStrategy
. Généralement, il faut accomplir les 4 points suivants :
-
Définir la stratégie de Watermark : Créer une classe héritant de
WatermarkStrategy
et implémenter les méthodescreateTimestampAssigner
etcreateWatermarkGenerator
. -
Implémenter TimestampAssigner : Dans la méthode
createTimestampAssigner
, retourner une instance deTimestampAssigner
qui est responsable de l’attribution d’un timestamp à chaque événement. -
Implémenter WatermarkGenerator : Dans la méthode
createWatermarkGenerator
, retourner une instance deWatermarkGenerator
qui est responsable de la génération du Watermark. -
Appliquer la stratégie de Watermark : Lors de la création du flux de données (DataStream), utiliser
WatermarkStrategy
pour définir la stratégie de génération de Watermark.
4 Exemple : stratégie de génération de Watermark personnalisée
Supposons que nous ayons un flux de données de journaux contenant des timestamps d’activités des utilisateurs. Nous souhaitons ajuster dynamiquement le Watermark en fonction de la fréquence des activités des utilisateurs pour mieux gérer les données retardées. Voici un exemple simple :
|
|
Cet exemple crée une stratégie de Watermark personnalisée qui ajuste dynamiquement le Watermark en fonction du timestamp de l’événement. Ainsi, nous pouvons mieux gérer les données retardées sans perdre trop d’informations en raison d’une stratégie de Watermark trop stricte.
4.1 Adaptation au format de temps
Comment réécrire pour une information temporelle sous forme de chaîne comme “2022/10/22 10:34” ?
Si cette chaîne a déjà été analysée en un objet de temps Java, tel que java.util.Date ou java.time.Instant, elle peut être directement utilisée comme paramètre dans la fonction assignTimestampsAndWatermarks. Par exemple :
|
|
Si la chaîne n’a pas été analysée en un objet de temps Java, elle doit d’abord être analysée en un objet de temps, puis utilisée dans la fonction assignTimestampsAndWatermarks. Vous pouvez utiliser la classe SimpleDateFormat pour analyser la chaîne de temps :
|
|
5 Conclusion
La réécriture du Watermark dans Flink est une fonctionnalité avancée qui permet d’ajuster le comportement du traitement de flux en fonction des besoins métier spécifiques. En personnalisant la stratégie de Watermark, vous pouvez traiter les flux de données en temps réel de manière plus flexible, améliorant ainsi la précision et l’efficacité du traitement des données. Dans les applications pratiques, une utilisation judicieuse de cette opération peut améliorer la capacité de traitement des grandes données.