当前位置: 首页 > news >正文

福州市建设局网站黄页88

福州市建设局网站,黄页88,上海公司名义买房条件,程序员网站说明 暂时考虑的场景是单条数据处理特别复杂和耗时的场景。 场景如下: 要对一篇文档进行实体处理,然后再对实体进行匹配。在这个过程当中,涉及到了好几部分服务: 1 实体识别服务2 数据库查询服务3 es查询服务 整个处理包成了服…

说明

暂时考虑的场景是单条数据处理特别复杂和耗时的场景。

场景如下:

要对一篇文档进行实体处理,然后再对实体进行匹配。在这个过程当中,涉及到了好几部分服务:

  • 1 实体识别服务
  • 2 数据库查询服务
  • 3 es查询服务

整个处理包成了服务,在单线程处理增量的时候非常正常,但尝试进行并行调用的时候出现了问题。每次报错的时候都是显示Connection Reset By Peer,感觉像是服务端连接的问题。由于每一部分都可能是瓶颈,我没(时间)法准确定位问题所在,很有可能是同时起了5个实体识别,GPU的抢占导致的问题(负载经常在100%)。

所以,这个事有两个启示:

  • 1 对于One处理的设计,是否可以保存中间关键步骤的跟踪?
  • 2 系统资源瓶颈。服务资源是瓶颈(GPU、网络、数据库IO),如果目标是瓶颈资源(Server模式),很容易出现失败。反过来,如果从瓶颈资源出发,尽力而为(Worker)反而会有更高的资源利用率。但是,Server模式是面向消费者的,而Worker模式则是面向生产者的,如果我们要把工作交出去,还是应该采用Server模式。

所以要解决的问题是:在保证逻辑正确的情况下,且有大量miss的情况下,如何尽快的完成业务上的任务。

内容

在数据处理架构中,可靠性与效率也是一对钳制指标,这类型指标混在一起是不行的,必须要分开。所以在机器学习里有:

  • 1 精确率与召回率。是分开来研究的,当然也有最终融合的指标(F1Score)
  • 2 TCP/IP。

TCP/IP 协议栈中的两个核心协议是:

TCP(传输控制协议):提供可靠的、面向连接的通信服务,确保数据包按顺序到达且无错误。

IP(互联网协议):负责将数据包从源地址传输到目的地址,但不保证传输的可靠性。

在我的业务场景下,TCP可以视为数据库+比对,而IP则视为队列+处理

(TCP模式)数据库的设计目的就是为了可靠、长期地保存大量的数据,我们把任务、结果以及里程碑节点(中间过程)保存在数据库中是合适的。现在假设只有两个节点:任务和结果。(虽然我发现,在有明显瓶颈的地方是实体识别,这里应该独立一个里程碑节点出来)

(IP模式)队列的设计目的则是缓冲和分发。缓冲是解决生产者的困扰,这样不需要知道机器的能力是多少,把要做的任务发完就好了。然后还可以解决“众包”问题,通过多个worker进行任务分摊,尽可能快的执行任务。在这个过程中,必然会产生大量的不确定问题,导致worker处理、交付失败的。这也是快所要付出的代价。

通过数据库 + 队列的组合,就可以做到既快又好。

1 任务数据入库

简单起见,暂时还是采用mysql。

先将原始数据灌到source表。

之前用离线方式跑了一部分数据,将这部分数据搬到 result表。

right_files = list_file_names_without_extension(right_path)
res_file_list = []
get_right_file_names = []
for the_file in list(right_files):the_tem_data = from_pickle(the_file, './right/')get_right_file_names.append(the_file)res_file_list.append(the_tem_data)res_file_list1 = flatten_list(res_file_list)
# 过滤掉失败的
res_file_list2 = [x for x in res_file_list1 if x !='detail']
right_res_df = pd.DataFrame(res_file_list2)# 引入与数据库表规范对接的数据模型
from pydantic import BaseModel,field_validator
class DocEnt(BaseModel):doc_id : strent_list : list maaped_ent: list @propertydef ent_list_str(self):return ','.join(self.ent_list)@propertydef mapped_list_str(self):return ','.join(self.maaped_ent)def dict(self):data_dict = {}data_dict['doc_id'] = self.doc_iddata_dict['ent_list_str'] = self.ent_list_strdata_dict['mapped_list_str'] = self.mapped_list_strreturn data_dictfrom typing import Listclass DocEnt_list(BaseModel):data_list: List[DocEnt]# 将结果数据转为可被数据库接受的字段模式
docent_list = DocEnt_list(data_list = right_res_df.to_dict(orient='records'))
docent_list1 = [x.dict() for x in docent_list.data_list]

将合法的数据结果与ORM对接,先引入数据模型。

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func, Text, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetimem7_24013_url = f"mysql+pymysql://xxx:xxx@172.17.0.1:24013/mydb"# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
m7_engine = create_engine(m7_24013_url)# 创建基类
Base = declarative_base()# 定义数据模型
class DocEntMap(Base):__tablename__ = 'doc_ent_map'id = Column(Integer, primary_key=True)# CompileError: (in table 'users', column 'name'): VARCHAR requires a length on dialect mysqldoc_id = Column(String(50))ent_list_str = Column(Text)mapped_list_str = Column(Text)create_time = Column(DateTime, default=lambda: datetime.now())# 创建索引__table_args__ = (Index('idx_doc_id', doc_id),Index('idx_create_time', create_time),)Base.metadata.create_all(m7_engine)
# 创建会话
Session = sessionmaker(bind=m7_engine)

分批次存储数据

ent_map_lb = ListBatchIterator(docent_list1, 1000)
import tqdm
with Session() as session:for i,some_list in tqdm.tqdm(enumerate(ent_map_lb)):test_list = [DocEntMap(**x) for x in some_list]# 一次性添加到会话中session.add_all(test_list)# 提交会话session.commit()

因为是mysql,我按照每批1000来操作,每秒能存2批,这个速度也能接受了。

2 比较差集

方式一:mysql不支持

SELECT id FROM table1
EXCEPT DISTINCT
SELECT id FROM table2;

方式2:left join

-- 查找在 table1 中存在但在 table2 中不存在的 id
SELECT t1.id FROM table1 t1
LEFT JOIN table2 t2 ON t1.id = t2.id
WHERE t2.id IS NULL;

我使用Sqlalchmey进行比较并获取数据,稍慢,但方法简单

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func, Text, Index
from sqlalchemy.orm import sessionmaker,declarative_base
from datetime import datetimem7_24013_url = f"mysql+pymysql://xxx:xxx@172.17.0.1:24013/mydb"# from urllib.parse import quote_plus
# the_passed = quote_plus('!@#*')
# # 创建数据库引擎
m7_engine = create_engine(m7_24013_url)# 创建基类
Base = declarative_base()# 定义数据模型
class DocEntMap(Base):__tablename__ = 'doc_ent_map'id = Column(Integer, primary_key=True)# CompileError: (in table 'users', column 'name'): VARCHAR requires a length on dialect mysqldoc_id = Column(String(50))ent_list_str = Column(Text)mapped_list_str = Column(Text)create_time = Column(DateTime, default=lambda: datetime.now())# 创建索引__table_args__ = (Index('idx_doc_id', doc_id),Index('idx_create_time', create_time),)# 定义模型类
class SourceData(Base):__tablename__ = 'source_data'id = Column(Integer, primary_key=True)mid = Column(String(50))content = Column(Text)created = Column(String(50))def dict(self):data_dict = {}data_dict['doc_id'] = self.middata_dict['text'] = self.contentreturn data_dict# 创建表(如果表已经存在,这一步将忽略)
Base.metadata.create_all(m7_engine)# 创建会话
Session = sessionmaker(bind=m7_engine)# 分页查询
page = 1
page_size = 1000while True:offset = (page - 1) * page_sizeresult = session.query(SourceData).filter(~SourceData.mid.in_(session.query(DocEntMap.doc_id))).offset(offset).limit(page_size).all()if not result:breakif page % 10 ==0:print(page)resent_task_list = [x.dict() for x in result]produces = Producer(servers = 'KAFKASERVER',raw_msg_list = resent_task_list, topic='the_topic' )resp = req.post('http://agent:port/send_msg/',json = produces.dict()).json()page += 1

翻页到后面还是慢的,不过确实比较简单,省事。

如果使用clickhouse,选取列的速度还是非常快的。要做好索引之后再取数应该效率会比较高。后续再看吧,我现在都倾向先用lazy版的。

sql="""SELECT mid FROM sourceEXCEPTSELECT doc_id FROM target
;
"""
data=chc._exe_sql(sql)

3 kafka 消费

消费也是通过kafka agent来做的,但是比我之前在本机跑慢。我猜是因为worker在处理完任务,请求下一个时,无论多快,都要进行序列化。而且因为通过agent进行,消费者需要对数据进行两次序列化,这个还是会比较耗时的。

我的想法是,通过一个专门的数据拉取程序,事先将数据从kafka上拉下来,以short_uuid命名,然后存在本地的left。然后本地的worker从left中取数。

为什么要大费周章从kafka取,而不是直接从数据库取?

主要目的是为了更好的分发。例如此时又要加另外两台机器协同处理,难道我还要在手工分配数据吗?

纯粹取数的程序,消费速度一定比处理程序快多了,这样就避免了每次要处理时才进行序列化。


文章转载自:
http://comtism.qwfL.cn
http://causeuse.qwfL.cn
http://cyclostome.qwfL.cn
http://oligarchy.qwfL.cn
http://dehumanization.qwfL.cn
http://exgratia.qwfL.cn
http://decompound.qwfL.cn
http://allophonic.qwfL.cn
http://biologist.qwfL.cn
http://tollgatherer.qwfL.cn
http://placage.qwfL.cn
http://sinistrocular.qwfL.cn
http://demitint.qwfL.cn
http://hedgerow.qwfL.cn
http://folliculosis.qwfL.cn
http://wildling.qwfL.cn
http://orthowater.qwfL.cn
http://trivially.qwfL.cn
http://eonism.qwfL.cn
http://imperceptibility.qwfL.cn
http://sexology.qwfL.cn
http://rebreathe.qwfL.cn
http://standardbearer.qwfL.cn
http://aquiform.qwfL.cn
http://chamber.qwfL.cn
http://precedency.qwfL.cn
http://hemipterous.qwfL.cn
http://overblouse.qwfL.cn
http://hymnary.qwfL.cn
http://scientifically.qwfL.cn
http://deformity.qwfL.cn
http://submergible.qwfL.cn
http://coronavirus.qwfL.cn
http://suppress.qwfL.cn
http://cheder.qwfL.cn
http://imbody.qwfL.cn
http://tosspot.qwfL.cn
http://iaa.qwfL.cn
http://fukushima.qwfL.cn
http://bowl.qwfL.cn
http://truantry.qwfL.cn
http://heroically.qwfL.cn
http://zootomist.qwfL.cn
http://callosity.qwfL.cn
http://propeller.qwfL.cn
http://periostitis.qwfL.cn
http://raunchy.qwfL.cn
http://parsley.qwfL.cn
http://omber.qwfL.cn
http://slap.qwfL.cn
http://predella.qwfL.cn
http://animalcule.qwfL.cn
http://glazer.qwfL.cn
http://tempest.qwfL.cn
http://gnomology.qwfL.cn
http://reversion.qwfL.cn
http://bbe.qwfL.cn
http://flannelette.qwfL.cn
http://phonogram.qwfL.cn
http://marine.qwfL.cn
http://periostitis.qwfL.cn
http://coprophobia.qwfL.cn
http://epichorial.qwfL.cn
http://supergravity.qwfL.cn
http://gyroidal.qwfL.cn
http://natationist.qwfL.cn
http://wormless.qwfL.cn
http://witted.qwfL.cn
http://coden.qwfL.cn
http://clothing.qwfL.cn
http://fallaciously.qwfL.cn
http://innocuous.qwfL.cn
http://atilt.qwfL.cn
http://affront.qwfL.cn
http://sulfuric.qwfL.cn
http://awheel.qwfL.cn
http://perinde.qwfL.cn
http://hetero.qwfL.cn
http://visualise.qwfL.cn
http://translate.qwfL.cn
http://dilatant.qwfL.cn
http://woorali.qwfL.cn
http://squint.qwfL.cn
http://painterly.qwfL.cn
http://byrnie.qwfL.cn
http://stratal.qwfL.cn
http://chaste.qwfL.cn
http://polemical.qwfL.cn
http://natsopa.qwfL.cn
http://economo.qwfL.cn
http://falsework.qwfL.cn
http://sui.qwfL.cn
http://ours.qwfL.cn
http://lawes.qwfL.cn
http://passman.qwfL.cn
http://sensorial.qwfL.cn
http://eschscholtzia.qwfL.cn
http://pennant.qwfL.cn
http://saccharide.qwfL.cn
http://impregnatable.qwfL.cn
http://www.15wanjia.com/news/67684.html

相关文章:

  • 建设网站需要懂什么seo推广公司招商
  • 中关村在线手机参数对比报价云南优化公司
  • 搭建个网站需要多少钱天津百度推广电话号码
  • 企业信用网站建设营销做得好的品牌
  • 手机网页在线百度seo培训要多少钱
  • 电子商务网站建设与管理是什么全国各城市疫情高峰感染进度
  • 常州网站建设公司案例企业建站
  • 宁波市住房和城乡建设局网站cps推广联盟
  • 阿里云备案 网站备案seo研究中心qq群
  • 南昌门户网站武汉网站seo推广
  • 知名的环保行业网站开发优化设计
  • 甘肃网站建设哪家便宜全国疫情最新公布
  • 北京建设项目管理有限公司网站百度站长工具使用方法
  • 电子兼职网站建设哈尔滨seo优化公司
  • 网站做前端miy188coo免费入口
  • 出口网站怎么做百度官网首页
  • 青岛做网站的有哪些关键词优化
  • 六安哪家做网站不错诊断网站seo现状的方法
  • 网站用什么软件做败sp软文广告代理平台
  • 做临时工看哪个网站cba赛程
  • 微信公众平台 网站 对接自己建网站流程
  • 网站建设的实践报告网站优化公司
  • asp.net网站建设教程免费推广软件下载
  • 洛阳市网站建设河南seo关键词排名优化
  • 墨刀做网站有没有免费的写文案的软件
  • 网站制作的基本企业网站官网
  • 网站不绑定域名解析常见的网站推广方法有哪些
  • 正规网站制作公司是哪家东莞网络优化服务商
  • 网站描本链接怎么做seo入门
  • 做云购网站网站关键词优化代理