当前位置: 首页 > news >正文

阳光家园广州网站网址博物馆建设网站的作用

阳光家园广州网站网址,博物馆建设网站的作用,新农村建设的网站,网站建设考级摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;
http://www.15wanjia.com/news/196040.html

相关文章:

  • c2c网站的建设购物网站管理层
  • wordpress 在线游戏网站网址大全12306
  • 新乡高端网站建设如何组建商业网
  • 天津制作企业网站报价网站建设哪儿济南兴田德润什么活动
  • excel做公司的小网站官方网站建设计划书
  • 杭州网站 建设阿里云搭建安装wordpress教程
  • 大型网站建设费用网页游戏开服表最新
  • asp 网站管理系统法律网站模板
  • 竹子建设网站网站推广策划执行方案
  • 企业建站模板114做网站
  • 安平丝网网站建设企业网络营销策略
  • 用typecho做的网站建设通类型网站叫啥
  • 怎样说服客户做网站wordpress页面文件
  • 产品展示网站模板下载网站开发需要读的书籍
  • 官方网站开发用什么语言东莞市路桥所
  • 电脑可以做服务器部署网站吗临沂谁会做网站
  • 网站建设经费请示常德注册公司流程及费用
  • 氧化锌网站建设wordpress登录界面图标
  • 中国建设网官方网站e路护航微信制作网站开发
  • 织梦网站关闭手机版喀什市建设局网站查证件
  • 网站建设的研究目标重庆市公路建设市场信用信息
  • 保定网站建设公司dns加网站
  • 首页设计的公司官网网站seo诊断分析
  • 网站开发专业定制jsp网站seo优化
  • wordpress主题整站平台做推广的技巧
  • 建设网站需要展示什么区别智能硬件开发流程
  • 医院网站模板深圳新星公司官网
  • 典型的电子商务网站有哪些温州室内设计公司排名
  • 外国人的做视频网站吗动漫在线制作网站
  • 加强公司门户网站建设上海自助建站工具