当前位置: 首页 > news >正文

建站公司技术服务费seo网站推广方法

建站公司技术服务费,seo网站推广方法,php做网站导购,开发公司自己买自己的商品房背景 在处理窗口函数时,ProcessWindowFunction处理函数可以定义三个状态: 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState,那么这几个状态之间有什么关系呢? …

背景

在处理窗口函数时,ProcessWindowFunction处理函数可以定义三个状态: 富函数getRuntimeContext.getState,
每个key+每个窗口的状态context.windowState(),每个key的状态context.globalState,那么这几个状态之间有什么关系呢?

ProcessWindowFunction处理函数三种状态之间的关系:

1.getRuntimeContext.getState这个定义的状态是每个key维度的,也就是可以跨时间窗口并维持状态的
2.context.windowState()这个定义的状态是和每个key以及窗口相关的,也就是虽然key相同,但是时间窗口不同,他们的值也不一样.
3.context.globalState这个定义的状态是和每个key相关的,也就是和getRuntimeContext.getState的定义一样,可以跨窗口维护状态
验证代码如下所示:

package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";//更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n",name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context){ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);KeyCount globalValue = contextGlobalValueState.value();if (globalValue == null) {globalValue = new KeyCount();globalValue.key = s;globalValue.count = 0;}globalValue.count += count;contextGlobalValueState.update(globalValue);ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d,    total : %d, windowStateCount :%s, globalStateCount :%s\n",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));}}

输出结果:

window, XXX, 08:34:45 - 08:34:50, 3,    total : 22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window, YYY, 08:34:45 - 08:34:50, 2,    total : 22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}

从结果可以验证以上的结论,此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉,因为一旦时间窗口结束,就再也没有机会清理了
从这个例子中还发现一个比较有趣的现象:

ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));

在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到,并且他们指向的都是同一个变量,可以参见代码的输出:

System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

结果如下:

contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true

参考文献:
https://cloud.tencent.com/developer/article/1815079

http://www.15wanjia.com/news/31869.html

相关文章:

  • 中国域名查询5g网络优化
  • 深圳富通做网站西安关键词seo公司
  • 常州网站建设外包产品推广文案
  • 深圳小程序网站开发2021近期时事新闻热点事件
  • 做类似电影天堂的网站违法吗seo综合查询平台
  • 做品牌文化的网站百度推广怎么联系
  • 网站个人备案材料千锋教育培训机构可靠吗
  • 个人网站素材图片免费的app推广平台
  • 企业网站首页开发百度论坛首页
  • 建网站和做微信哪个好一份完整的营销策划方案
  • wordpress用户名或密码错误前端性能优化
  • 家具建设企业网站网络推广运营途径
  • 自己做网站怎么赢利谷歌搜索引擎入口363
  • 网站制作软件大全青岛关键词优化seo
  • 博客乐云seo
  • 设计装饰公司排名郑州seo网站管理
  • 直播网站如何做友情链接在线观看
  • 利用php做直播网站百度怎么注册公司网站
  • 经典网站建设网站优化排名方法有哪些
  • 门户网站建设成本百度一下网页首页
  • 要做网站优秀软文范例800字
  • 泸州北京网站建设关键词推广是什么
  • 手机建站cms系统营销型网站外包
  • 制作公司网站关键词挖掘爱站网
  • 阿里做外贸的网站上海seo关键词优化
  • 做英文网站要会什么重庆森林在线观看
  • 零基础培训网页设计seo工具优化软件
  • 集约化政府门户网站建设的优点软文写作的十大技巧
  • 上海网站建设永灿14年品牌最近发生的新闻
  • 怎么从网站上看出做网站的日期衡阳seo