"); //-->
本文分享自天翼云开发者社区《Flink 与Flink可视化平台StreamPark教程(时间相关 1)》,作者:l****n
但在分布式系统中,这种驱动方式又会有一些问题。因为数据本身在处理转换的过程中会变化,如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果,那么下游的数据就会变少,时间进度的控制就不够精细了。
所以我们应该把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
水位线设置package cn.ctyun.demo.api.watermark;import cn.ctyun.demo.api.utils.TransformUtil;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;/** * @classname: ViewContentStreamWithWaterMark * @description: 拥有水位线 * @author: Liu Xinyuan * @create: 2023-04-14 09:50 **/public class ViewContentStreamWithWaterMark { public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){ // 1.创建Flink-MySQL-CDC的Source MySqlSource<String> viewContentSouce = MySqlSource.<String>builder() .hostname("***") .port(3306) .username("***") .password("***") .databaseList("test_cdc_source") .tableList("test_cdc_source.user_view") .startupOptions(StartupOptions.initial()) .deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("Asia/Shanghai") .build(); // 2.使用CDC Source从MySQL读取数据 DataStreamSource<String> mysqlDataStreamSource = env.fromSource( viewContentSouce, WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(1L)).withTimestampAssigner( new SerializableTimestampAssigner<String>() { @Override public long extractTimestamp(String extractData, long l) { return JSONObject.parseObject(extractData).getLong("ts_ms"); } } ), "ViewContentStreamWithWatermark Source" ); // 3.转换为指定格式 return mysqlDataStreamSource.map(TransformUtil::formatResult); }}
我们在cdc传来的数据中获取他的日志自带更新时间戳字段ts_ms时间戳作为我们的事件时间,并生成水位线,此后此数据流将包含水位线进行后续地传递。
在窗口中,有着不同的设置,可以面对不同的场景。我们按照数据不同的分配规则,将窗口的具体实现分为了以下四种,如下所示:
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)
窗口函数MapReduce在这里,我们首先定义一个MapReduce过程,用来统计目前十秒内的访问统计数量,这里的水位线设定请参考代码ViewContentStreamWithWaterMark(上文中提供的代码)
package cn.ctyun.demo.api;import cn.ctyun.demo.api.watermark.ViewContentStreamWithWaterMark;import com.alibaba.fastjson.JSONObject;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;/** * @classname: ApiTimeWindow * @description: 时间窗的使用 * @author: Liu Xinyuan * @create: 2023-04-17 20:39 **/public class ApiTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<JSONObject> viewContentDataStream = ViewContentStreamWithWaterMark.getViewContentDataStream(env); viewContentDataStream.filter(new FilterFunction<JSONObject>() { @Override public boolean filter(JSONObject value) throws Exception { // 不将删除的数据考虑在内 return !value.getString("op").equals("d"); } }).map(new MapFunction<JSONObject, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(JSONObject value) throws Exception { return Tuple2.of(value.getString("user_name"), 1L); } }).keyBy(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { // 设定一个累加规则 return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }).print(); env.execute(); }}
这里设定了一个时间窗口为10秒,最终的结果为每十秒钟将统计一个登录统计,并输出到控制台。使用时间窗口后和不加的唯一区别是计算的范围变为了时间窗内计算。
*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。