当前位置: 首页 > news >正文

自己的网站可以做淘客吗深圳网络营销怎么推广

自己的网站可以做淘客吗,深圳网络营销怎么推广,wordpress mobile app,做个什么样的网站目录 时间语义 水位线(Watermarks) 并行流中的水位线 窗口 滚动窗口—Tumbling Windows 滑动窗口—Sliding Windows 会话窗口—Session Windows 全局窗口—Global Windows 例子 时间语义 如图所示,由事件生成器(Event Pr…

目录

时间语义

水位线(Watermarks) 

并行流中的水位线

窗口

滚动窗口—Tumbling Windows

滑动窗口—Sliding Windows

会话窗口—Session Windows

 全局窗口—Global Windows

例子


时间语义

        如图所示,由事件生成器(Event Producer)生成事件,生成的事件数据被收集起来,首先进入分布式消息队列(Message Queue),然后被 Flink 系统中的 Source 算子(Data Source)读取消费,进而向下游的窗口算子(Window Operator)传递,最终由窗口算子进行计算处理。

​ 有两个非常重要的时间点:

(1)一个是数据产生的时刻,我们把它叫作“事件时间”(Event Time);

(2)另一个是数据真正被Flink处理的时刻,叫作“处理时间”(Processing Time)。

我们所定义的窗口操作,到底是以哪种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中存在网络传输延迟和时钟漂移,事件的处理时间相对发生时间会有所滞后。

事件时间(Event Time): 指每个事件在其产生设备上发生的时间。

处理时间(Processing Time): 是指对事件执行相应操作的机器的系统时间。

        当流程序(a streaming program)基于处理时间(Processing Time)运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作的机器的系统时钟。每小时处理时间窗口(An hourly processing time window)将包括在系统时钟指示整小时之间到达特定操作的所有记录。例如,如果应用程序在上午9:15开始运行,那么第一个小时处理时间窗口,也就是[9,10),将包括上午9:15到上午10:00之间处理的事件,下一个窗口,也就是[10,11)将包括上午10:00到上午11:00之间处理的事件,依此类推。无论事件是以有序的状态进入Flink事件窗口,还是无序的状态进入Flink事件窗口,窗口都将按照处理时间对已经到达的事件进行计算。

        基于处理时间来处理数据的方式是有时间标准的,这个时间标准就是运行相应操作的机器的系统时钟。到什么时间就做什么事。

Processing time 提供了最佳的性能和最低的延迟,但是不能提供确定性,即计算结果是不确定的。例如,时间窗口为5min的求和统计,应用程序在 9:00 开始运行,则第一个时间窗口处理 [9:00, 9:05) 的事件,下一个窗口处理 [9:05, 9:10) 的事件,依此类推。通信延迟、作业故障重启等问题,可能导致窗口的计算结果是不一样的。如下图所示,假设事件(事件时间, 数值) 遇到上述问题,场景一:事件B(9:03,2)有网络延迟落在[9:10, 9:15),场景二:作业故障重启导致事件B(9:03,2)和事件C(9:06,3)落在[9:10, 9:15)。都会导致求和的结果不正确。

        Processing Time是指数据被Operator处理时当前所在主机的系统时间。当用户选择使用Processing Time时,在Flink中所有和时间相关的操作都会按照当前系统时间进行处理,例如:Window窗口划分。使用这种语义时Flink中处理数据延迟较低、处理性能高,无论进入到Flink中的源头数据是否有乱序,只要被Flink应用接收的数据都会按照当前数据处理时的系统时间赋值时间语义,可见这种语义虽然处理数据性能高但不能解决数据乱序和延迟问题,从而导致数据统计不精准。Processing Time适合计算精度要求不高的计算场景。

        既然使用Processing Time时间语义,计算的结果的正确性无法得到保证,那么使用Event Time语义呢?

        Event Time是每个事件在其生成设备上发生的时间,这个时间往往是嵌入在事件记录中,例如一条数据中的时间戳记录了该事件数据的产生时间,该时间与下游Flink处理时系统时间无关。如果每个事件包含事件时间,当事件经过网络传输流转到Flink中处理时,理论上来说,先产生的事件会比后产生的事件先到达Flink系统中被处理,但实际情况往往由于网络传输延迟导致早先产生的事件后到达Flink系统被处理的情况(数据延迟到达),这就出现了数据乱序。但基于Event Time的时间概念,我们可以让Flink进行数据处理时基于事件产生的时间处理,这样就可以还原事件的先后关系,保证数据处理的准确性。Event Time 时间语义在实际生产环境中使用较多,该时间语义能保证乱序数据处理的准确性。

        当Flink应用使用Event Time作为时间的衡量标准。窗口计算什么时候触发计算呢?假设事件是有序到达窗口的,窗口每隔1个小时计算一次,当Flink应用程序接收到的第一个事件的事件时间为9:15,程序则认为现在的时间是9:15,可能机器的系统时间已经为10:00,但这并不重要,因为Flink程序已经使用事件时间语义。此时[9,10)窗口不会触发计算,Flink应用继续接收事件,当接收到事件时间为10:00或者之后的事件,则Flink应用程序认为现在时间已经到了10:00或者超过10:00点了,应该触发[9,10)窗口计算。此时就可以使用事件时间作为时间标记来触发窗口计算。

        那当事件因为延迟等原因无序到达窗口呢?比如此时10:00的事件已经到达窗口,是否要触发[9,10)窗口计算呢?还不可以,因为此时可能还有10:00之前的事件尚未到达,例如9:15的事件。那么Flink应用就需要“等一等”,但是这里我们又不能无限期的等待下去,所以这里需要有一个时间标记来决定何时触发窗口,就是要告诉Flink程序要等多久,这个时间标记就是Watermark。所以Watermark是基于Event Time语义给出的。

        其实除了上面两个时间语义,还有一个时间语义叫Ingestion Time,它指的是事件进入Flink的时间。


水位线(Watermarks) 

        Flink中测量Event Time进展的机制是水位线(Watermarks)。Watermarks作为数据流的一部分,并携带时间戳t。实际上Watermarks的本质就是一个时间戳t,它度量了Event Time到底进展到什么时候了。Watermarks(t)表明事件时间在该流中已经达到时间t,这意味着后面的流中不应该再有时间戳小于等于t的事件。

        下图显示了有序事件流。图中每个方块表示一个事件,方块中的数字表示事件时间,在本例中,事件是按顺序排列的(相对于它们的时间戳),这意味着Watermark只是流中的周期性标记。所谓周期性标记指的是Flink每隔200ms计算一次Watermark。

图中有两个Watermak,w(11)和w(20),它们分别表示事件时间已经达到11和20。事实上,对于有序流,Watermark可以使用事件的事件时间即可。在流中周期性的标记即可。

        Watermark对于乱序流( out-of-order streams)是至关重要的,如下图所示,其中事件不是按事件时间戳排序的。一般来说,Watermark(t)是一种声明,声明到流中的那个点,在时间戳t之前的所有事件都应该已经到达。例如w(11)声明到流中的这个位置,时间已经是11了。一旦Watermark到达一个算子,算子可以将其内部事件时间时钟的时间调整到Watermark的值。

Watermark是如何计算的呢?还记得之前说的“等一等”吗?“等一等”其实是事件到达Flink延迟的一个时间。例如上图中,当事件到达后,再等待4s。

w= max(事件时间)-延迟的时间

例如上图中,当事件7到达时,w=max(7)-4=3,

当事件11到达时,w=max(7,11)-4=7,

当事件15到达时,w=max(7,11,15)-4=11,

当事件9到达时,w=max(7,11,15,9)-4=11,

当事件12到达时,w=max(7,11,15,12)-4=11,

此时到了Watermarks每隔200ms计算一次,此时时间已经走到了w(11)。这是Flink认为小于等于11的事件已经全部到达。

如果是基于事件时间语义的有序流,Watermask计算时延迟的时间为0,即w=max(事件时间)。


并行流中的水位线

         水位线是在源函数(source functions)处或直接在源函数之后生成的。源函数的每个并行子任务通常独立地生成其水位线。这些水位线定义了特定并行源处的事件时间。

        当水位线在流处理项目中流动时,它们会提前到达算子处的事件时间。每当一个算子将其事件时间向前推进时,它就会在其后续算子的下游生成一个新的水位线。

        一些算子(Operation)使用多个输入流;例如,联合,或keyBy(…)或partition(…)函数后面的算子。这样,一个算子的当前事件时间是其输入流事件时间的最小值。当它的输入流更新它们的事件时间时,算子也会更新。

        下图显示了在并行流中流动的事件和水位线的示例,以及跟踪事件时间的操作。

图中绘制了并行度为2的流处理。流处理有source、map和window操作。操作右上角黄色方块中数字表示该算子的事件时间。 

例如图中window算子的事件时间为14,原因是map(1)算子的事件时间为29,同时map(2)算子的事件时间为14,两者同时向下游window算子推进,到当前window算子时,window算子取两者事件时间最小值作为当前算子的事件时间,所以window(1)和window(2)两个并行的算子的事件时间为14。


窗口

        聚合事件(例如计数、求和)在流处理中的工作方式与批处理中的不同。例如,计算流中的所有元素是不可能的,因为流通常是无限的。相反,流上的聚合(计数,总和等)是由窗口限定的,例如“过去5分钟的计数”或“最近100个元素的总和”。

        Windows可以是时间驱动的(例如:每30秒)或数据驱动的(例如:每100个元素)。通常可以区分不同类型的窗口,例如时间驱动窗口分为滚动窗口(没有重叠)、滑动窗口(有重叠)、会话窗口(被不活动的间隙打断)和全局窗口。数据驱动窗口有计数窗口

 图中黄色箭头表示事件流方向,每隔灰色方块表示一个事件。Time windows表示时间驱动的窗口,每隔多长时间统计一次,Count(3) windows表示数据驱动的窗口,每3个事件统计一次。

滚动窗口—Tumbling Windows

        滚动窗口赋值器将每个元素赋给指定窗口大小的窗口。滚动窗口有固定的大小,不重叠。例如,如果指定大小为5分钟的滚动窗口,则将评估当前窗口,并每五分钟启动一个新窗口,如下图所示。

滑动窗口—Sliding Windows

        滑动窗口赋值器将元素赋给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制滑动窗口启动的频率。因此,如果滑动块小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。

        例如,您可以设置大小为10分钟的窗口,每隔5分钟滑动一次。这样,每隔5分钟就会出现一个窗口,其中包含最近10分钟内到达的事件,如下图所示。

当window slide<window size时,滑动后的窗口与滑动前的窗口之间会有重叠部分,例如图中window 1和window 2之间有重叠;

当window slide>=window size,滑动后的窗口与滑动前的窗口之间可能还会有间隙部分。

会话窗口—Session Windows

        会话窗口分配器按活动的会话对元素进行分组。会话窗口不重叠,也没有固定的开始和结束时间,这与滚动窗口和滑动窗口不同。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口关闭。会话窗口分配器可以配置一个静态会话间隙,也可以配置一个会话间隙提取器函数,该函数定义了不活动的时间长度。当此期限到期时,当前会话关闭,并将后续元素分配给新的会话窗口。

 全局窗口—Global Windows

        全局窗口赋值器将具有相同键的所有元素赋给同一个全局窗口。此窗口方案仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有自然的结束。


例子

        这里以单词统计为例来说明水位线生成以及窗口使用,本例的需求每隔5s中,每个单词的数量,代码如下:

package com.leboop;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import javax.annotation.Nullable;
import javax.sound.midi.Soundbank;/*** Description TODO.* Date 2024/7/24 11:10** @author leb* @version 2.0*/
public class OrderStreamWatermarkDemo {public static void main(String[] args) throws Exception {// 1. 获取流执行环境.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();System.out.println("默认并行度parallelism=" + env.getParallelism());env.setParallelism(1); // 为了方便测试,这里并行度设置为1,默认并行度为8.// 2. 获取默认的时间语义. 本api对应的flink版本1.9.3,默认时间语义为:ProcessingTimeTimeCharacteristic streamTimeCharacteristic = env.getStreamTimeCharacteristic();System.out.println("默认时间语义streamTimeCharacteristic=" + streamTimeCharacteristic);long autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();System.out.println("默认水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);// 3. 设置时间语义为事件时间.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();System.out.println("默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);// 设置水位线生成时间间隔.env.getConfig().setAutoWatermarkInterval(200);// 4. 监听socket数据源,每行输入格式:单词,事件时间,例如:hello,1000.final DataStreamSource<String> sourceDS = env.socketTextStream("bigdata111", 9999);// 5. 将socket数据源流转换成Word对象流SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {public Word map(String s) throws Exception {String[] wordArr = s.split(",");return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);}});// 6. 设置watermark.SingleOutputStreamOperator<Word> watermarkDS = wordDs.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Word>() {private Long lateTime = 5 * 1000L;private Long maxEventTime = 0L;@Nullablepublic Watermark getCurrentWatermark() {// 生成Watermark.Watermark watermark = new Watermark(maxEventTime - lateTime);
//                System.out.println("当前水位线watermark=" + watermark.getTimestamp());return watermark;}public long extractTimestamp(Word word, long previousElementTimestamp) {// 抽取事件时间.long eventTime = word.getTime() * 1000;// 计算最大事件时间.maxEventTime = Math.max(maxEventTime, eventTime);// 此处代码仅仅是为了打印当前word的水位线.Watermark eventWatermark = new Watermark(maxEventTime - lateTime);word.setWatermark(eventWatermark); // 当前word的水位线.System.out.println("抽取" + word);return eventTime;}});// 7. 将word对象流转换成key流.KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {public String getKey(Word word) throws Exception {return word.getWord();}});// 设置window计算:滚动窗口每隔5s计算一次.WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));// 窗口中事件统计.SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");// 输出窗口中事件.wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {System.out.println("窗口window=" + window + ",窗口中数据input=" + input);}});// 8.打印结果并执行.wordCountResult.print();env.execute();}}

 下面是Word实体类代码:

package com.leboop;import org.apache.flink.streaming.api.watermark.Watermark;/*** Description TODO.* Date 2024/7/24 13:05** @author leb* @version 2.0*/
public class Word {/*** 单词.*/private String word;/*** 单词产生的事件时间.*/private Long time;/*** 统计单词个数.*/private Integer count;private Watermark watermark;public Word() {}public Word(String word, Long time, Integer count) {this.word = word;this.time = time;this.count = count;}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public Long getTime() {return time;}public void setTime(Long time) {this.time = time;}public Integer getCount() {return count;}public void setCount(Integer count) {this.count = count;}public Watermark getWatermark() {return watermark;}public void setWatermark(Watermark watermark) {this.watermark = watermark;}@Overridepublic String toString() {return "Word{" +"word='" + word + '\'' +", time=" + time +", count=" + count +", watermark=" + watermark.getTimestamp() +'}';}
}

代码中有几点需要注意的:

(1)代码的并行度最好设置为1,默认并行度为8,这样后面通过socket输入的单词都进去该并行度中进行计算,加快触发,否则并行度太多,需要在socket中输入更多的单词,才能触发。

env.setParallelism(1);

(2)本文使用的是flink 1.9.3的api,该版本默认时间语义是ProcessingTime,后面flink新版本默认的时间语义是EventTime,查看TimeCharacteristic源码时间语义总共有三种:

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api;import org.apache.flink.annotation.PublicEvolving;/*** The time characteristic defines how the system determines time for time-dependent* order and operations that depend on time (such as time windows).*/
@PublicEvolving
public enum TimeCharacteristic {/*** Processing time for operators means that the operator uses the system clock of the machine* to determine the current time of the data stream. Processing-time windows trigger based* on wall-clock time and include whatever elements happen to have arrived at the operator at* that point in time.** <p>Using processing time for window operations results in general in quite non-deterministic* results, because the contents of the windows depends on the speed in which elements arrive.* It is, however, the cheapest method of forming windows and the method that introduces the* least latency.*/ProcessingTime,/*** Ingestion time means that the time of each individual element in the stream is determined* when the element enters the Flink streaming data flow. Operations like windows group the* elements based on that time, meaning that processing speed within the streaming dataflow* does not affect windowing, but only the speed at which sources receive elements.** <p>Ingestion time is often a good compromise between processing time and event time.* It does not need any special manual form of watermark generation, and events are typically* not too much out-or-order when they arrive at operators; in fact, out-of-orderness can* only be introduced by streaming shuffles or split/join/union operations. The fact that* elements are not very much out-of-order means that the latency increase is moderate,* compared to event* time.*/IngestionTime,/*** Event time means that the time of each individual element in the stream (also called event)* is determined by the event's individual custom timestamp. These timestamps either exist in* the elements from before they entered the Flink streaming dataflow, or are user-assigned at* the sources. The big implication of this is that it allows for elements to arrive in the* sources and in all operators out of order, meaning that elements with earlier timestamps may* arrive after elements with later timestamps.** <p>Operators that window or order data with respect to event time must buffer data until they* can be sure that all timestamps for a certain time interval have been received. This is* handled by the so called "time watermarks".** <p>Operations based on event time are very predictable - the result of windowing operations* is typically identical no matter when the window is executed and how fast the streams* operate. At the same time, the buffering and tracking of event time is also costlier than* operating with processing time, and typically also introduces more latency. The amount of* extra cost depends mostly on how much out of order the elements arrive, i.e., how long the* time span between the arrival of early and late elements is. With respect to the* "time watermarks", this means that the cost typically depends on how early or late the* watermarks can be generated for their timestamp.** <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the* event's original time, rather than the time assigned at the data source. Practically, that* means that event time has generally more meaning, but also that it takes longer to determine* that all elements for a certain time have arrived.*/EventTime
}

本案例中使用EventTime语义。

(3) 默认情况下,EventTime时间语义的水位线生成时间间隔为200ms,可查看StreamExecutionEnvironment类中如下方法看到:

	@PublicEvolvingpublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}}

(4)输入的数据格式如下:

hello,1
spring,1
java,1
hello,6
spring,11
hello,7
java,14
java,20
hello,3
hello,5
hello,10
java,21
java,31

 格式为单词,事件时间,例如第一行表示hello这个单词在1s时刻产生的。从数据中可以看到单词是无序的,例如hello,3在后面才出现。

(5)将每行输入数据通过英文逗号切分,转换成一个Word对象,代码如下:

        SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {public Word map(String s) throws Exception {String[] wordArr = s.split(",");return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);}});

 (6)窗口设置

可通过assignTimestampsAndWatermarks为事件抽出时间戳和生成水位线。代码中设置了延迟时间为5s,代码如下:

private Long lateTime = 5 * 1000L;

(7)Word对象流转换成键流

        为了统计每个单词的数量,需要将单词按照单词分流,因此需要一单词为键来统计,代码如下:

        KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {public String getKey(Word word) throws Exception {return word.getWord();}});

(8)为键流设置窗口大小为5s的滚动窗口

WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));

此时键流转换成的窗口流。此时,划分的滚动窗口为[0,5000),[5000,10000),[10000,15000),……。每个窗口含头不含尾,窗口时间单位为ms。

(9)窗口流按照单词对象Word的count字段来统计单词数量,代码如下:

SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");

这里需要Word类必须是POJO,例如该实体类不是public修饰,在单词统计wordWS.sum("count")处会报错如下: 

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.leboop.Word>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:55)at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:1367)at com.leboop.OrderStreamWatermarkDemo.main(OrderStreamWatermarkDemo.java:101)

 (10)打印每个窗口中有哪些事件

        为了知道统计背后的原理,这里打印出每个窗口中的事件,代码如下:

wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {System.out.println("窗口window=" + window + ",窗口中数据input=" + input);}});

 (11)最后打印出统计结果并执行程序

代码如下:

wordCountResult.print();env.execute();

         下面打开bigdata111的socket,并逐行输入数据,如下:

程序输出结果如下: 

默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200
抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}
抽取Word{word='spring', time=11, count=1, watermark=6000}
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='hello', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='spring', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='java', time=1, count=1, watermark=-4000}]
Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=7, count=1, watermark=6000}
抽取Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='java', time=20, count=1, watermark=15000}
窗口window=TimeWindow{start=5000, end=10000},窗口中数据input=[Word{word='hello', time=6, count=1, watermark=1000}, Word{word='hello', time=7, count=1, watermark=6000}]
Word{word='hello', time=6, count=2, watermark=1000}
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='spring', time=11, count=1, watermark=6000}]
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='java', time=14, count=1, watermark=9000}]
Word{word='spring', time=11, count=1, watermark=6000}
Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='hello', time=3, count=1, watermark=15000}
抽取Word{word='hello', time=5, count=1, watermark=15000}
抽取Word{word='hello', time=10, count=1, watermark=15000}
抽取Word{word='java', time=21, count=1, watermark=16000}
抽取Word{word='java', time=31, count=1, watermark=26000}
Word{word='java', time=20, count=2, watermark=15000}
窗口window=TimeWindow{start=20000, end=25000},窗口中数据input=[Word{word='java', time=20, count=1, watermark=15000}, Word{word='java', time=21, count=1, watermark=16000}]

 结果解释:

(1)程序运行后,即会输出如下结果:

默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200

(2)当输入前四行数据

hello,1
spring,1
java,1
hello,6

会调用抽取时间戳extractTimestamp方法,然后会输出

抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}

此时水位线为1000ms,这表示程序认为当前的事件时间才到1000ms,此时窗口[0,5000)还不能触发计算,接着输入下一条数据:

spring,11

此时继续抽取时间戳,输出

抽取Word{word='spring', time=11, count=1, watermark=6000}

注意当前水位线已经到达6000,程序认为事件时间已经到了6000ms,既会触发[0,5000)窗口的计算,此时窗口中的数据有

hello,1
spring,1
java,1

这里注意已经输入的如下两条数据并不在该窗口中:

hello,6
spring,11

 数据会按照输入数据的事件时间正确地分配到每个窗口。因此[0,5000)窗口统计结果为:

Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}

也即hello、spring、java各出现1次。以此类推。

http://www.15wanjia.com/news/11626.html

相关文章:

  • 织梦做的网站南京seo排名公司
  • 网页视频下载快捷指令seo关键词搜索和优化
  • 滨州做网站的北京百度推广开户
  • 如何快速做一个网站站长素材官网
  • 如何查看一个网站是什么程序做的小红书新媒体营销案例分析
  • 网站和网页北京seo公司有哪些
  • 建设网站的价格是多少舆情监测软件
  • 哈尔滨做设计和网站的公司吗百度竞价推广的优势
  • 宁波大型网站设计公司广州关键词seo
  • 网站开发是培训石家庄seo扣费
  • 自动做效果图的网站网络服务提供者知道或者应当知道
  • wordpress修改404seo搜索优化是什么
  • 网站内容优化的主要方法seo优化销售话术
  • 张家港高端网站制作seo内容优化心得
  • 华为荣耀手机最新款搜索引擎优化的定义是什么
  • 做易经类的网站营业推广是什么意思
  • 湖北企业年报网上申报入口杭州seo营销公司
  • 鄂州网警企业网站优化服务
  • 开源asp学校系统网站网络广告策划
  • 在一个网站下建设多个子网站中国搜索引擎
  • 网页版传奇合击网站免费优化
  • 网站大数据怎么做南京seo公司哪家
  • 政府网站建设多少钱关键词是指什么
  • 高新技术企业网站怎么做广告优化师培训
  • wordpress汉化免费企业主题seo外包软件
  • 在线短网址缩短工具南宁白帽seo技术
  • 网站服务类型凡科建站
  • 河南网站建设问一问公司国外直播平台tiktok
  • 赛事竞猜网站开发口碑营销的特点
  • 网站做系统叫什么名字营销型网站有哪些功能