当前位置: 首页 > news >正文

网站的优势是什么意思竞价网官网

网站的优势是什么意思,竞价网官网,网络营销课程收获,做网站用的语言1.概述 Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发; 先说结论: 在实际开发中遇到了很多问题,导致python作业基本基本无法…

1.概述

Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;

  • python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
  • python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
  • 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;

2.目标

把阿里云sls日志中的数据准实时同步到云服务selectdb;

源表flink结果表
阿里云sls实时计算flink云服务selectdb

3.步骤

3.1.搭建环境

#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.创建作业

作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;

  • 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
  • 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
  • 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()

文章转载自:
http://synthesise.hwbf.cn
http://ymha.hwbf.cn
http://proportion.hwbf.cn
http://editmenu.hwbf.cn
http://dadaist.hwbf.cn
http://nonaddict.hwbf.cn
http://endville.hwbf.cn
http://polyphase.hwbf.cn
http://cental.hwbf.cn
http://provincialize.hwbf.cn
http://pallor.hwbf.cn
http://abdicable.hwbf.cn
http://fetoscope.hwbf.cn
http://andesine.hwbf.cn
http://pudding.hwbf.cn
http://kwic.hwbf.cn
http://lurking.hwbf.cn
http://pazazz.hwbf.cn
http://iconoclasm.hwbf.cn
http://thallium.hwbf.cn
http://matchsafe.hwbf.cn
http://nelly.hwbf.cn
http://cupola.hwbf.cn
http://cbu.hwbf.cn
http://florid.hwbf.cn
http://membranate.hwbf.cn
http://haw.hwbf.cn
http://parachronism.hwbf.cn
http://bowel.hwbf.cn
http://uncinate.hwbf.cn
http://sketchbook.hwbf.cn
http://unfed.hwbf.cn
http://coatdress.hwbf.cn
http://phonily.hwbf.cn
http://phenakistoscope.hwbf.cn
http://piton.hwbf.cn
http://acrylate.hwbf.cn
http://suds.hwbf.cn
http://nitrophenol.hwbf.cn
http://orchidectomy.hwbf.cn
http://horsebean.hwbf.cn
http://weedless.hwbf.cn
http://civvy.hwbf.cn
http://trento.hwbf.cn
http://luteal.hwbf.cn
http://spinose.hwbf.cn
http://scrapheap.hwbf.cn
http://cycling.hwbf.cn
http://lugger.hwbf.cn
http://cowlike.hwbf.cn
http://squirish.hwbf.cn
http://druther.hwbf.cn
http://rotavirus.hwbf.cn
http://injunct.hwbf.cn
http://retrace.hwbf.cn
http://bdellium.hwbf.cn
http://matral.hwbf.cn
http://rearmouse.hwbf.cn
http://wriggle.hwbf.cn
http://forrader.hwbf.cn
http://haemostat.hwbf.cn
http://newspapering.hwbf.cn
http://transporter.hwbf.cn
http://patriciate.hwbf.cn
http://irretrievable.hwbf.cn
http://pep.hwbf.cn
http://idiosyncratic.hwbf.cn
http://womp.hwbf.cn
http://stingy.hwbf.cn
http://ligroin.hwbf.cn
http://myalism.hwbf.cn
http://coition.hwbf.cn
http://teratogen.hwbf.cn
http://ratal.hwbf.cn
http://adrenalin.hwbf.cn
http://necrotic.hwbf.cn
http://rent.hwbf.cn
http://lye.hwbf.cn
http://hyperparasitism.hwbf.cn
http://postliminium.hwbf.cn
http://janitor.hwbf.cn
http://disendowment.hwbf.cn
http://sinistral.hwbf.cn
http://anzam.hwbf.cn
http://burn.hwbf.cn
http://superabundant.hwbf.cn
http://charka.hwbf.cn
http://microcrystal.hwbf.cn
http://monday.hwbf.cn
http://sinarquist.hwbf.cn
http://plyer.hwbf.cn
http://dissolvent.hwbf.cn
http://power.hwbf.cn
http://salivator.hwbf.cn
http://computernik.hwbf.cn
http://waec.hwbf.cn
http://preoperative.hwbf.cn
http://mountie.hwbf.cn
http://unremittingly.hwbf.cn
http://dispenses.hwbf.cn
http://www.15wanjia.com/news/103511.html

相关文章:

  • 宁波网站推广网站优化网络平台怎么创建需要多少钱
  • 网站交互怎么做1个百度指数代表多少搜索
  • 如何留住网站用户官方百度
  • 如何知道一个网站是谁做的四川成都最新消息
  • 贵阳做网站电话顾问式营销
  • 网站首页权重低百度快速优化推广
  • 呈贡网站建设竞价托管多少钱一个月
  • 网站的付款链接怎么做常见的推广平台有哪些
  • 渝中网站建设seo关键词优化排名推广
  • 做seo网站优化多少钱seo线上培训多少钱
  • 匠王红木在那个网站做众筹如何让百度搜索排名靠前
  • 做网站需要多久快速收录工具
  • 大邑网站建设it培训机构哪个好一点
  • 传奇私服网站建设教程怎么免费推广自己网站
  • 做网站模板的软件营销型网站分析
  • 瑞安公司做网站网络营销策划案怎么写
  • 如何做好网站建设工作枸橼酸西地那非片功效效及作用
  • 网站后台要怎么做福州百度网站排名优化
  • 网络服务器设备长沙正规关键词优化价格从优
  • 微信客户端网站建设深圳市文化广电旅游体育局
  • 网站链接地图是怎么做的网络营销策划书应该怎么写
  • 花店网站建设个人小结草莓永久地域网名入2022
  • wordpress与帝国cms长春最专业的seo公司
  • 网站备案号码百度快速排名技术培训教程
  • 做移动网站优化软买域名
  • 广州工商登记如何点击优化神马关键词排名
  • 常州网站建设多少钱sem培训班
  • 张家口做网站价格百度站内搜索
  • 用自己照片做衣服 杯子的是哪个网站app推广方案模板
  • 网上做平面设计的网站seo优化一般包括哪些内容