当前位置: 首页 > news >正文

b2c网站类型著名的网络营销案例

b2c网站类型,著名的网络营销案例,自己怎么做免费网站空间,做网站找哪家好?聚禄鼎科技是一家给企业做网站的公司一 、 AllowedLateness API 延时关闭窗口 AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s&…

一 、 AllowedLateness API 延时关闭窗口

AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。

二 、 OutputTag API 侧输出流

使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。

       context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);

如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。

完全与预期一致。

在这里插入图片描述

完整代码:

public class WindowOutputTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {@Overridepublic void run(SourceContext<Event> context) throws Exception {context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);}@Overridepublic void cancel() {}});//operatorSingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {};WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).allowedLateness(Time.of(2, TimeUnit.SECONDS)).sideOutputLateData(eventOutputTag);SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {@Overridepublic void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());}});windowAgg.print("窗口数据 ");//获取测输出流中的延时数据DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);sideOutput.print("测输出流:-> ");env.execute();}}
http://www.15wanjia.com/news/29007.html

相关文章:

  • 蛋糕店的网站建设咋写关键词优化推广排名多少钱
  • 莘县做网站推广网络媒体推广产品
  • 做齐鲁油官方网站微信营销的方法和技巧
  • 怎做不下网站刷枪青岛seo网站关键词优化
  • 湖南网站建设营销网站建设专家
  • 做汽车配件出口用什么网站好些seo咨询岳阳
  • 蒙自市建设局网站东莞疫情最新消息今天中高风险区
  • 做徽标哪个网站素材多免费b2b信息发布网站
  • iis 网站模板下载seo点击优化
  • 七星彩的网站怎么做的最新腾讯新闻
  • 怎么仿制一个网站长沙官网seo收费标准
  • wordpress 日期调用安徽新站优化
  • 小说网站怎么建设找谁做百度关键词排名
  • 做网站的公司找客户关键词快速排名seo怎么优化
  • 广告联盟没有网站怎么做品牌广告投放
  • 恶搞网站怎么做优化公司哪家好
  • 百度网站安全在线检测手机百度app免费下载
  • 热e国产-网站正在建设中-手机版seo网站搜索优化
  • 个人建网站需要多少钱新闻 最新消息
  • 阜新网站开发公司新站seo外包
  • 网络推广客服是做什么的站长工具seo综合查询推广
  • 美国政府网站建设淘宝关键词热度查询工具
  • 唯美网站建设百度快速收录入口
  • 怎么自己做模板网站站长工具排名查询
  • 重庆网站建设找重庆万为百度云网页版入口
  • 珠海建设工程交易中心网站站长网站查询
  • 做图模板网站有哪些发布广告的平台免费
  • 做网站树立品牌形象seo与sem的区别
  • 主视觉设计网站全国疫情最新公布
  • 雄县做网站seo营销名词解释