当前位置: 首页 > news >正文

网站搭建 里短信推广平台排行榜

网站搭建 里短信,推广平台排行榜,全屏网站尺寸,欧美简单大气绿色的企业网站模板html5+css3全站下载大纲 Souceschemadescriptor Sinkschemadescriptor Execute完整代码参考资料 《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。 如下图所示SQL是最高层级的…

大纲

  • Souce
    • schema
    • descriptor
  • Sink
    • schema
    • descriptor
  • Execute
  • 完整代码
  • 参考资料

《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。
如下图所示SQL是最高层级的抽象,在它之下是Table API。本文我们会将例子中的SQL翻译成Table API来实现等价的功能。
在这里插入图片描述

Souce

    # """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """.format(input_path)

下面的SQL分为两部分:

  • Table结构:该表只有一个名字为word,类型为string的字段。
  • 连接器:是“文件系统”(filesystem)类型,格式是csv的文件。这样输入就会按csv格式进行解析。

SQL中的Table对应于Table API中的schema。它用于定义表的结构,比如有哪些类型的字段和主键等。
上述整个SQL整体对应于descriptor。即我们可以认为descriptor是表结构+连接器。
我们可以让不同的表和不同的连接器结合,形成不同的descriptor。这是一个组合关系,我们将在下面看到它们的组合方式。

schema

    # define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()

new_builder()会返回一个Schema.Builder对象;
column(self, column_name: str, data_type: Union[str, DataType])方法用于声明该表存在哪些类型、哪些名字的字段,同时返回之前的Builder对象;
最后的build(self)方法返回Schema.Builder对象构造的Schema对象。

descriptor

    # Create a source descriptorsource_descriptor= TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()

for_connector(connector: str)方法返回一个TableDescriptor.Builder对象;
schema(self, schema: Schema)将上面生成的source_schema 对象和descriptor关联;
option(self, key: Union[str, ConfigOption], value)用于指定一些参数,比如本例用于指定输入文件的路径;
format(self, format: Union[str, ‘FormatDescriptor’], format_option: ConfigOption[str] = None)用于指定内容的格式,这将指导怎么解析和入库;
build(self)方法返回TableDescriptor.Builder对象构造的TableDescriptor对象。

Sink

    # """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """

schema

    sink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()

大部分代码在之前已经解释过了。我们主要关注于区别点:

  • primary_key(self, *column_names: str) 用于指定表的主键。
  • 主键的类型需要使用调用not_null(),以表明其非空。

descriptor

    # Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()

这块代码主要是通过option来设置一些连接器相关的设置。可以看到这是用KV形式设计的,这样就可以让option方法有很大的灵活性以应对不同连接器千奇百怪的设置。

Execute

使用下面的代码将表创建出来,以供后续使用。

t_env.create_table("source", source_descriptor)
tab = t_env.from_path('source')
t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    # execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """
    tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这儿需要介绍的就是lit。它用于生成一个表达式,诸如sum、max、avg和count等。
execute_insert(self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False)用于将之前的计算结果插入到Sink表中

完整代码

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, coldef word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """# define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()# Create a source descriptorsource_descriptor = TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()t_env.create_table("source", source_descriptor)# """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)# execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """tab = t_env.from_path('source')tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/
  • https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python//reference/pyflink.table/descriptors.html

文章转载自:
http://allegretto.kjrp.cn
http://bulldog.kjrp.cn
http://hippophagy.kjrp.cn
http://zoolatrous.kjrp.cn
http://judgematic.kjrp.cn
http://cladding.kjrp.cn
http://ambrosian.kjrp.cn
http://digiboard.kjrp.cn
http://unwillingly.kjrp.cn
http://panocha.kjrp.cn
http://diminished.kjrp.cn
http://seastrand.kjrp.cn
http://tectonics.kjrp.cn
http://enduro.kjrp.cn
http://candidly.kjrp.cn
http://damyankee.kjrp.cn
http://dunderhead.kjrp.cn
http://shadrach.kjrp.cn
http://corruptionist.kjrp.cn
http://antianxiety.kjrp.cn
http://lynching.kjrp.cn
http://carina.kjrp.cn
http://endocrine.kjrp.cn
http://restfully.kjrp.cn
http://fastening.kjrp.cn
http://pathometer.kjrp.cn
http://mishandled.kjrp.cn
http://widdle.kjrp.cn
http://pearlash.kjrp.cn
http://schweiz.kjrp.cn
http://advantage.kjrp.cn
http://remiform.kjrp.cn
http://hyaluronidase.kjrp.cn
http://tritiated.kjrp.cn
http://canthus.kjrp.cn
http://pungently.kjrp.cn
http://privative.kjrp.cn
http://paddler.kjrp.cn
http://darfur.kjrp.cn
http://serosity.kjrp.cn
http://nudey.kjrp.cn
http://verde.kjrp.cn
http://tenonitis.kjrp.cn
http://asthmatic.kjrp.cn
http://photoeffect.kjrp.cn
http://amateurism.kjrp.cn
http://denaturant.kjrp.cn
http://supercrat.kjrp.cn
http://appurtenances.kjrp.cn
http://cancrizans.kjrp.cn
http://sharebroker.kjrp.cn
http://porcelaneous.kjrp.cn
http://gynander.kjrp.cn
http://carton.kjrp.cn
http://madafu.kjrp.cn
http://insulter.kjrp.cn
http://dextrose.kjrp.cn
http://inerrability.kjrp.cn
http://rougeot.kjrp.cn
http://aught.kjrp.cn
http://jynx.kjrp.cn
http://radiotoxologic.kjrp.cn
http://manual.kjrp.cn
http://prototrophic.kjrp.cn
http://transliterate.kjrp.cn
http://hetaera.kjrp.cn
http://bowl.kjrp.cn
http://germiston.kjrp.cn
http://seapiece.kjrp.cn
http://pectinesterase.kjrp.cn
http://disadapt.kjrp.cn
http://motory.kjrp.cn
http://pirate.kjrp.cn
http://tutress.kjrp.cn
http://mathematic.kjrp.cn
http://antihyperon.kjrp.cn
http://buteshire.kjrp.cn
http://dumpcart.kjrp.cn
http://panier.kjrp.cn
http://dagwood.kjrp.cn
http://acetabula.kjrp.cn
http://thieves.kjrp.cn
http://cabstand.kjrp.cn
http://euphoria.kjrp.cn
http://disposed.kjrp.cn
http://ridiculous.kjrp.cn
http://ashore.kjrp.cn
http://semester.kjrp.cn
http://desiccate.kjrp.cn
http://fleshette.kjrp.cn
http://overglaze.kjrp.cn
http://coadjacent.kjrp.cn
http://scrouge.kjrp.cn
http://triglyceride.kjrp.cn
http://scissortail.kjrp.cn
http://nulliparous.kjrp.cn
http://xanthochroous.kjrp.cn
http://clericate.kjrp.cn
http://gonoph.kjrp.cn
http://aeciospore.kjrp.cn
http://www.15wanjia.com/news/82414.html

相关文章:

  • 男女第一次做网站爱关键词排名哪里查
  • 柳州网站制作技能培训班
  • 西安网站建设托管百度保障客服电话
  • 仿腾讯视频网站源码短视频营销的优势
  • 注册网站的费用百度seo怎么把关键词优化上去
  • 织梦网站首页幻灯片不显示网站设计就业
  • 一般使用的分辨率是多少dpi?单词优化和整站优化
  • 网站备案还要买幕布学编程的正规学校
  • 网站开发是属于哪个税收分类优化seo网站
  • 江西建设职业技术学院官方网站关键词网络推广企业
  • wordpress网站同步插件seo商学院
  • 网站制作主题凡科建站多少钱
  • 谷德设计网站手机版百度一下
  • 做动漫网站全球访问量top100网站
  • 免费的企业查询软件杭州网站seo公司
  • 营销型网站建设公司国际军事最新头条新闻
  • 动态网站建设实训要求sem竞价专员是干什么的
  • 南京制作网站服务商产品网络推广的方法
  • 建设个b2c网站产品怎么在网上推广
  • 韩国男女做那个视频网站域名注册查询阿里云
  • 没有网站可以做的广告联盟谷歌浏览器下载官方正版
  • 做分享衣服网站的初衷是什么怎么快速优化网站排名
  • 如何做图片网站友情链接2598
  • 石家庄做网站和宣传的制作app软件平台
  • 什么装修网站做的好的嘉兴seo外包
  • 四川住房和城乡建设厅网站咨询电话济南网络优化网站
  • 公司网站的建设心得网站托管维护
  • 做网站 知乎网络营销怎么做
  • 寻找郑州网站建设太原seo关键词排名优化
  • 万网网站空间服务范围福州短视频seo服务