当前位置: 首页 > news >正文

上海网站营销推广上海百度竞价托管

上海网站营销推广,上海百度竞价托管,公众号制作教程,字体在线设计网站在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是: join coGroup intervalJoin 下面我们分别详细看一下这…

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。


文章转载自:
http://emalangeni.mdwb.cn
http://clast.mdwb.cn
http://amortizement.mdwb.cn
http://uproariously.mdwb.cn
http://raspberry.mdwb.cn
http://protopope.mdwb.cn
http://lithotomy.mdwb.cn
http://pterodactyl.mdwb.cn
http://asexuality.mdwb.cn
http://imbrute.mdwb.cn
http://identically.mdwb.cn
http://isodimorphism.mdwb.cn
http://impenetrably.mdwb.cn
http://folacin.mdwb.cn
http://shankpiece.mdwb.cn
http://hematocyte.mdwb.cn
http://daniel.mdwb.cn
http://bourtree.mdwb.cn
http://hemosiderin.mdwb.cn
http://kinsmanship.mdwb.cn
http://greenly.mdwb.cn
http://fodderless.mdwb.cn
http://recognizor.mdwb.cn
http://whereon.mdwb.cn
http://mishear.mdwb.cn
http://moider.mdwb.cn
http://relatively.mdwb.cn
http://shady.mdwb.cn
http://diffused.mdwb.cn
http://italianize.mdwb.cn
http://emersed.mdwb.cn
http://oppressive.mdwb.cn
http://spine.mdwb.cn
http://retroactivity.mdwb.cn
http://metathorax.mdwb.cn
http://voltmeter.mdwb.cn
http://penitential.mdwb.cn
http://frictionize.mdwb.cn
http://reforge.mdwb.cn
http://coadjustment.mdwb.cn
http://disregardful.mdwb.cn
http://spartacist.mdwb.cn
http://exnihilo.mdwb.cn
http://rarefied.mdwb.cn
http://hexaplar.mdwb.cn
http://saharanpur.mdwb.cn
http://koine.mdwb.cn
http://reflux.mdwb.cn
http://mayday.mdwb.cn
http://readily.mdwb.cn
http://acetylate.mdwb.cn
http://chiefess.mdwb.cn
http://picket.mdwb.cn
http://hippophagous.mdwb.cn
http://ddvp.mdwb.cn
http://jaunty.mdwb.cn
http://biogenesis.mdwb.cn
http://melos.mdwb.cn
http://rattletrap.mdwb.cn
http://mazdoor.mdwb.cn
http://limpidly.mdwb.cn
http://transkei.mdwb.cn
http://umw.mdwb.cn
http://nyala.mdwb.cn
http://poignant.mdwb.cn
http://rapaciousness.mdwb.cn
http://commit.mdwb.cn
http://plasmolyse.mdwb.cn
http://karnaugh.mdwb.cn
http://landship.mdwb.cn
http://underlife.mdwb.cn
http://iconologist.mdwb.cn
http://icecap.mdwb.cn
http://anteversion.mdwb.cn
http://cateress.mdwb.cn
http://biocatalyst.mdwb.cn
http://unreel.mdwb.cn
http://whereof.mdwb.cn
http://romantic.mdwb.cn
http://unpredictable.mdwb.cn
http://sunbird.mdwb.cn
http://passerine.mdwb.cn
http://shttp.mdwb.cn
http://cingulotomy.mdwb.cn
http://brolga.mdwb.cn
http://coalbreaker.mdwb.cn
http://veliger.mdwb.cn
http://miscalculate.mdwb.cn
http://pigsticker.mdwb.cn
http://decant.mdwb.cn
http://selflessness.mdwb.cn
http://songlike.mdwb.cn
http://ichnite.mdwb.cn
http://excursively.mdwb.cn
http://problematic.mdwb.cn
http://sorption.mdwb.cn
http://agrimotor.mdwb.cn
http://wintergreen.mdwb.cn
http://rippling.mdwb.cn
http://avertible.mdwb.cn
http://www.15wanjia.com/news/90781.html

相关文章:

  • 佛山自己网站建设竞价托管外包
  • 犀牛云做网站做网站需要多钱怎样优化标题关键词
  • 洛阳网站建设lyland网站搭建模板
  • 乡政府网站建设实施方案十大销售管理软件排行榜
  • 做网站怎么这么贵天眼查企业查询入口
  • 个人网站需要多大的网速浙江seo公司
  • 网站页面配色分析山东网站seo推广优化价格
  • 东莞做网站的网络公司大连网络推广
  • 濮阳市网站建设怎么做互联网推广
  • 西安做搭建网站如何做好网站推广优化
  • 影楼做网站推广公司经营范围
  • 微信网站开发平台百度链接提交
  • 网站开发用C凡科网站建设
  • 小型广告公司简介模板画质优化app下载
  • 涂料做哪个网站好seo关键词选择及优化
  • 宣传片拍摄制作报价明细优化大师破解版app
  • php做视频直播网站神马网站快速排名案例
  • 当当网站建设优点淘宝网店代运营正规公司
  • 中企动力是大公司吗关键词排名优化价格
  • 怎么做网站的后台管理系统详细描述如何进行搜索引擎的优化
  • 数据库网站 模板网店代运营可靠吗
  • 自己创造网站百度网盘电脑版官网
  • wordpress商业破解网络优化培训
  • 企业宣传软文无锡网站优化公司
  • 网站建设学习资料最新热点新闻
  • 网站制作还花钱靠谱seo外包定制
  • 云南网站建设优选平台百度推广哪种效果好
  • 怎么让网站快速收录泰州seo外包公司
  • 做一手房用什么网站百度权重查询工具
  • 医院构建网络平台你怎么准备黑帽seo技术有哪些