【开发篇】一、处理函数:定时器与定时服务

news/2024/7/5 21:14:49 标签: java, flink, process

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

java">stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

java">public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
java">long currentProcessingTime();
  • 获取当前的水位线(事件时间)
java">long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
java">void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
java">void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
java">void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
java">void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

java">public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node01", 9527)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取
                );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次
                     * @param value  每条数据
                     * @param ctx 上下文对象
                     * @param out 采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        // 获取定时器服务对象
                        TimerService timerService = ctx.timerService();
                        // 数据中提取出来的事件时间
                        Long currentEventTime = ctx.timestamp(); 
                        //注册定时任务,水位线被推到5s时触发
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");

                        
                    /**
                     * 时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 当前时间进展,就是定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String currentKey = ctx.getCurrentKey();
                        System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
                    }
                }
        );

        process.print();

        env.execute();
    }
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

java">public class KeyedProcessTimerDemo {

    public static void main(String[] args) throws Exception {
		
		//...重复代码略,同上
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // TODO Process:keyed
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();
                        
                        TimerService timerService = ctx.timerService();

						//当前数据的处理时间
                        long currentTs = timerService.currentProcessingTime();
						//定时器不用水位线为标杆,直接处理时间加5s
                        timerService.registerProcessingTimeTimer(currentTs + 5000L);
                        
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");

                    }


         //...重复代码略,同上
}

运行:

在这里插入图片描述

processwatermark_209">4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

java">//...重复代码略,同上

@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     
     // 获取 process的 当前watermark
     long currentWatermark = timerService.currentWatermark();
     System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
 }


此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)


http://www.niftyadmin.cn/n/5124927.html

相关文章

C语言每日一题(20)最大公因数等于 K 的子数组数目

力扣 2447 最大公因数等于 K 的子数组数目 题目描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列。 数组的最大公因数 是能整除数组中所有元素的最大整数。 …

深圳大学计软《程序设计基础》课后实验二:选择结构

A. ASCII码排序&#xff08;循环&#xff09; 题目描述 输入三个字符后&#xff0c;按各字符的ASCII码从小到大的顺序输出这三个字符。 输入 输入多组数据&#xff0c;每组占一行&#xff0c;有三个字符组成&#xff0c;之间无空格 输出 对于每组输入数据&#xff0c;输出…

php实现防止Struts2框架漏洞中的远程命令执行攻击的代码

使用PHP过滤器 PHP提供了一些内置的过滤器&#xff0c;可以过滤掉一些恶意代码。可以使用以下代码将输入数据进行过滤&#xff1a; ​$input $_POST[input]; $safe_input filter_var($input, FILTER_SANITIZE_STRING); 使用escapeshellcmd()和escapeshellarg() 可以使用PHP…

OpenCV学习(一)——图像读取

1. 图像入门 读取图像显示图像写入图像 import cv2# 读取图像 img cv2.imread(lena.jpg) print(img.shape)# 显示图像 cv2.imshow(image, img) cv2.waitKey(0) cv2.destroyAllWindows()# 写入图像 cv2.imwrite(image.jpg, img)1.1 读取图像 读取图像cv.imread(filename, fl…

解锁娜扎副驾,年轻人的第一台车就选哪吒AYA

最近“娜扎哪吒”的CP组合可谓刷屏汽车圈,一个是高颜值、有气质的当红演员,一个是有实力、有能力的低调“理工男”,哪吒汽车和娜扎的携手,让不少朋友情不自禁地嗑起了CP,也开始更关注哪吒汽车的产品。 除了刚刚发布的“大气舒适新标杆”哪吒X,入手门槛低、颜值够高、智能体验远…

六零导航页SQL注入漏洞复现(CVE-2023-45951)

0x01 产品简介 LyLme Spage&#xff08;六零导航页&#xff09;是中国六零&#xff08;LyLme&#xff09;开源的一个导航页面。致力于简洁高效无广告的上网导航和搜索入口&#xff0c;支持后台添加链接、自定义搜索引擎&#xff0c;沉淀最具价值链接&#xff0c;全站无商业推广…

程序员们平时都喜欢逛什么论坛呢?

网站不在多&#xff0c;好用就行&#xff1b;技术不求精&#xff0c;好摸鱼就行。是时候祭出我收藏夹里的这15个网站了&#xff01; 求职必备&#xff1a;牛客网 https://www.nowcoder.com/ 年少不知牛客好&#xff0c;等到要面试的时候才发现是神器。 你可以在牛客上搜索到一…

web浏览器端实现语音转文字或文字转语音

web浏览器端语音转文字demo <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content=&q…