
Version notes

Flink and Kafka versions must be compatible with each other. The versions verified to work here:

  • Flink 1.17.1
  • kafka_2.12-3.3.1

Add the Kafka connector dependency

Download the flink-connector-kafka connector jar:

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

Upload flink-connector-kafka-1.17.1.jar to Flink's lib directory:

[hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

Distribute flink-connector-kafka-1.17.1.jar to the other nodes:

xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar

Start the yarn-session

[hadoop@node2 ~]$ myhadoop.sh start
[hadoop@node2 ~]$ yarn-session.sh -d

Start the Kafka cluster

[hadoop@node2 ~]$ zk.sh start
[hadoop@node2 ~]$ kf.sh start

Create the Kafka topic

List existing topics:
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list

If topic ws1 does not exist, create it:
[hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1

Plain Kafka table

'connector' = 'kafka'

Enter the Flink SQL client

[hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
...
log output omitted
...
Flink SQL> 

Create the Kafka mapping table

CREATE TABLE t1(
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- FROM 'xxxx' can be omitted when the column name matches the metadata name; VIRTUAL marks the column read-only
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id int,
  ts bigint,
  vc int
)
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
  'properties.group.id' = 'test',
  -- startup modes: 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- 'fixed' is the Flink-implemented partitioner: each parallel subtask writes to a single Kafka partition
  'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
);

This table can be used both to read data from Kafka and to write data to Kafka.
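As a quick sketch (assuming the t1 definition above), the declared metadata columns can be selected like ordinary columns, which is handy for seeing which partition and offset each record came from:

```sql
-- Metadata columns behave like regular columns in queries.
SELECT `event_time`, `partition`, `offset`, id, ts, vc
FROM t1;
```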

Insert data into the Kafka table

If the source table does not exist yet, create it first; if it already exists, skip this step.

CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);

Insert the source table's data into t1:

insert into t1(id,ts,vc) select * from source;

If this fails with

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer

If the same error persists, the kafka-clients classes are missing from Flink's classpath. Copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory, then restart the sql-client and also restart the yarn-session (important):

cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

Check that the copy succeeded:

$ ls $FLINK_HOME/lib

Restart the sql-client and retry; a successful run looks like this:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name matches the metadata name; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' is the Flink-implemented partitioner: each parallel subtask writes to a single Kafka partition
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b1765f969c3ae637bd4c8100efbb0c4e

Query the Kafka table

select * from t1;

This fails with

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord

Restart the yarn-session and retry; a successful run looks like this:

Flink SQL> CREATE TABLE t1( 
>   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
>   -- FROM 'xxxx' can be omitted when the column name matches the metadata name; VIRTUAL marks the column read-only
>   `partition` BIGINT METADATA VIRTUAL,
>   `offset` BIGINT METADATA VIRTUAL,
> id int, 
> ts bigint , 
> vc int )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
>   'properties.group.id' = 'test',
> -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
>   'scan.startup.mode' = 'earliest-offset',
>   -- 'fixed' is the Flink-implemented partitioner: each parallel subtask writes to a single Kafka partition
> 'sink.partitioner' = 'fixed',
>   'topic' = 'ws1',
>   'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into t1(id,ts,vc) select * from source;
2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
insert into t1(id,ts,vc) select * from source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 84292f84d1fce4756ccd8ae294b6163a
Flink SQL> select * from t1;
2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
select * from t1;
[INFO] Result retrieval cancelled.

Flink SQL>

 

upsert-kafka table

'connector' = 'upsert-kafka'

If the table receives update operations, the plain Kafka connector cannot handle them; in that case, use the Upsert Kafka connector instead.
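For instance (a hypothetical sketch reusing the t1 and source tables above), an aggregate query produces an updating stream, so writing it into the plain Kafka table is rejected, because the 'kafka' connector only accepts append-only data:

```sql
-- Rejected by the plain 'kafka' connector: the GROUP BY result is an
-- updating stream, not an append-only one.
insert into t1(id, ts, vc)
select id, max(ts), sum(vc) from source group by id;
```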

Create the upsert-kafka mapping table (a primary key must be defined)

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

If Kafka has no topic named ws2, it will be created automatically.

Insert into the upsert-kafka table

insert into t2 select id,sum(vc) sumVC  from source group by id;

Query the upsert-kafka table

upsert-kafka cannot read from a specified offset; it always reads the topic from the beginning. Only that way is the full update history of the data visible, with change markers such as -U, +U and +I showing how each row evolves.
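Conceptually, replaying t2 in tableau mode shows each key's history as a sequence of change rows, something like the following (the values here are purely illustrative, not actual output):

```sql
-- op | id | sumVC
-- +I |  3 |    42     -- first aggregate value seen for key 3
-- -U |  3 |    42     -- retract the old value
-- +U |  3 |    97     -- emit the updated value
select * from t2;
```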

Set the display mode

SET sql-client.execution.result-mode=tableau;
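For reference, tableau is one of the sql-client's result modes; 'table' and 'changelog' are the other supported values, e.g.:

```sql
-- Print results as a continuously updated changelog instead.
SET sql-client.execution.result-mode=changelog;
```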

Query the t2 table

select * from t2;

If no output appears, the reason is that the earlier source table has already generated values up to its end (1000000) and stopped producing data.
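One way to avoid this (a hypothetical variant; the table name source_unbounded is made up) is to make every field random, so the datagen source has no sequence end and keeps producing rows indefinitely:

```sql
CREATE TABLE source_unbounded (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',   -- 'random' is the default kind, with no end bound
  'rows-per-second' = '1',
  'fields.id.min' = '1',
  'fields.id.max' = '10',
  'fields.vc.min' = '1',
  'fields.vc.max' = '100'
);
```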

Open the Flink Web UI, cancel all running jobs, and redo the steps; a successful run looks like this:

Drop the tables

Flink SQL> show tables;
+------------+
| table name |
+------------+
|     source |
|         t1 |
|         t2 |
+------------+
3 rows in set

Flink SQL> drop table source;
Flink SQL> drop table t1;
Flink SQL> drop table t2;

Recreate the tables

CREATE TABLE source (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second'='1',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='10',
  'fields.ts.kind'='sequence',
  'fields.ts.start'='1',
  'fields.ts.end'='1000000',
  'fields.vc.kind'='random',
  'fields.vc.min'='1',
  'fields.vc.max'='100'
);

CREATE TABLE t2(
  id int,
  sumVC int,
  primary key (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node2:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
);

Set the display mode

SET sql-client.execution.result-mode=tableau;

Query the table

select * from t2;

 

Done! Enjoy it!

