b2b网站大全 网址大全网络营销方案范文
本文介绍Redis pipeline相关的知识点及代码示例,包括Redis客户端-服务端的一次完整的网络请求、pipeline与client执行多命令的区别、pipeline与Redis"事务"、pipeline的使用代码示例;
pipeline与client执行多命令的区别
Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务;Redis客户端-服务端的一次完整的网络请求来回如下图;
简化一下,一次Redis请求和响应,会经历如下的步骤:
- 客户端发起一个(查询/插入)请求,并监听socket返回,通常情况都是阻塞模式等待Redis服务器的响应;
- 服务端处理命令,并且返回处理结果给客户端;
- 客户端接收到服务的返回结果,程序从阻塞代码处返回;
Redis客户端和服务端之间通过网络连接进行数据传输,这个连接可以很快(loopback接口)或很慢(建立了一个多次跳转的网络连接);但无论网络延如何延时,数据包总是能从客户端到达服务器,并从服务器返回数据回复客户端,这个时间被称之为RTT(Round Trip Time - 往返时间);
我们可以很容易就意识到,Redis在连续请求服务端时,即使Redis每秒能处理100k请求,但也会因为网络传输花费大量时间,导致整体性能的下降;
因此如果遇到大量的批处理,我们可以考虑使用Redis的pipeline(管道);
对于pipeline技术而言,对于N个命令,就相当于将N个上图中的步骤,合并成1个,其他多余的时间开销仅作用于命令的执行,这样服务请求响应的总体时间将会大大的减少;
关于pipeline与client单命令的压测结果可参考Redis精通系列——Pipeline(管道);
值得注意的是,管道技术并不是Redis特有的技术,管道技术往往需要客户端-服务器的共同配合,大部分工作任务其实是在客户端完成;Redis在较早的版本就已经支持管道技术;
如下图,多个连续的incr指令,使用pipeline(管道)后,多个连续的incr指令只会花费一次网络来回开销;这个开销会随着N数值的增大,大幅减少网络IO开销,从而提升整体服务的性能;
Redis pipeline的使用注意事项
1. pipeline一次执行的命令不宜过多
结合上面redis命令完整执行流程图,有个值得注意的点——可能出现我们经常说到的IO阻塞:
- 当write操作发生,并且发送缓冲区(send buffer)满时,就会导致write操作阻塞;
- 当read操作发生,并且接收缓冲区(recv buffer)满时,就会导致read操作阻塞;
上述的这两个阻塞如果出现,将会导致整个请求时间变长;
因此我们操作大批量指令的时候,比如10k个指令,我们可以合理的对指令分多次批量发送,这样可以减少出现阻塞的情况,也可以避免服务器响应一个过大的response包,导致客户端内存负载过重;
即使不发生IO阻塞,pipeline每批打包的命令不能过多还有一个原因:因为 pipeline 方式打包命令再发送,那么 redis server 必须在处理完所有命令前,先缓存起所有命令的处理结果,这样就有一个缓存结果的内存的消耗;
2. pipeline不保证命令执行的原子性
官方文档的一句话——
Redis::PIPELINE block is simply transmitted faster to the server, but without any guarantee of atomicity.
其实Redis的高性能设计本就不支持包含多命令的严格事务,哪怕是multi/exec操作还是Lua脚本;Redis只是提供了一些命令来一定程度实现"事务";
multi/exec操作针对命令语法错误和执行时参数错误,处理是不一样的,详情见我之前的文章《Redis——“事务“/Lua脚本》;
Lua脚本也只能一定程度保证逻辑处理和Redis命令打包的原子性,例如库存扣减;
pipeline的使用代码示例
代码示例的背景是:根据appId批量查询本地缓存的App信息,未命中本地缓存的,需要从redis中拿这多个key的value,然后刷入本地缓存;在"从redis中拿这多个key的value",在key数量较多时,做个优化,使用Redis的pipeline;
private List<AppSimpleInfoDTO> querySimpleAppWithCache(List<String> payAppIds) {if (CollectionUtils.isEmpty(payAppIds)) {return Lists.newArrayList();}List<AppSimpleInfoDTO> appList = Lists.newArrayList();try {// 从本地缓存读取APPfinal List<String> tobeQry = Lists.newArrayList();for (String payAppId : payAppIds) {AppSimpleInfoDTO appSimpleInfoDTO = null;appSimpleInfoDTO = appSimpleInfoCache.getIfPresent(payAppId);if (appSimpleInfoDTO != null) {log.info("get_appSimpleInfoDTO_fr_appSimpleInfoCache_suc. [appSimpleInfoDTO={}]", JSON.toJSONString(appSimpleInfoDTO));appList.add(appSimpleInfoDTO);} else {tobeQry.add(payAppId);}}// 未命中本地缓存则去redis查询if (CollectionUtils.isNotEmpty(tobeQry)) {final List<AppSimpleInfoDTO> cacheAppSimpleInfoDTOs = getAndCacheAppSimpleInfoDTOs(tobeQry);if (CollectionUtils.isNotEmpty(cacheAppSimpleInfoDTOs)) {appList.addAll(cacheAppSimpleInfoDTOs);}}return appList;} catch (Exception e) {log.error("querySimpleAppWithCache error.", e);// 异常时刷全量缓存return getAndCacheAppSimpleInfoDTOs(payAppIds);}}
使用pipeline做多个key的get命令:
private List<AppSimpleInfoDTO> getAndCacheAppSimpleInfoDTOs(List<String> payAppIds) {final Set<String> payAppIds2Qry = new HashSet<>(payAppIds);final List<AppSimpleInfoDTO> result = Lists.newArrayList();// 先尝试从redis获取 pipeline模式JedisClusterPipeLine pipeline = null;try {pipeline = jedisCluster.pipelined();for (String payAppId : payAppIds) {final String appSimpleInfoKey = CacheKeyUtils.getAppSimpleInfoKey(payAppId);// pipeline添加get命令pipeline.get(appSimpleInfoKey);}// pipeline执行并获取结果final List<Object> allVal = pipeline.syncAndReturnAll();if (CollectionUtils.isNotEmpty(allVal)) {allVal.forEach(val -> {if (val != null) {final String jsonStr = String.valueOf(val);if (StringUtils.isNotBlank(jsonStr)) {Optional.ofNullable(JSON.parseObject(jsonStr, AppSimpleInfoDTO.class)).ifPresent(appSimpleInfoDTO -> {result.add(appSimpleInfoDTO);appSimpleInfoCache.put(appSimpleInfoDTO.getPayAppId(), appSimpleInfoDTO);log.info("localCache_AppSimpleInfoDTO_fr_redis_suc. [simpleApp={}]", JSON.toJSONString(appSimpleInfoDTO));payAppIds2Qry.remove(appSimpleInfoDTO.getPayAppId());});}}});}} catch (Exception e) {log.error("localCache_AppSimpleInfoDTOs_fr_redis_error. [payAppIds={}]", JSON.toJSONString(payAppIds), e);} finally {if (pipeline != null) {pipeline.close();}}// redis未查到的数据走RPC查询 并异步加载到redis和localCacheif (CollectionUtils.isNotEmpty(payAppIds2Qry)) {final List<String> payAppIds2QryList = new ArrayList<>(payAppIds2Qry);List<App> apps = Lists.newArrayList();int index = 0;while (index < payAppIds2QryList.size()) {int toIndex = Math.min(index + max_batch, payAppIds2QryList.size());List<String> payAppIds2QryTemp = payAppIds2QryList.subList(index, toIndex);// 实时查APP信息接口List<App> appsTemp = queryAppByPayAppIds(new ArrayList<>(payAppIds2QryTemp));apps.addAll(appsTemp);index += max_batch;}// 异步加载到redis和localCacheif (CollectionUtils.isNotEmpty(apps)) {for (App app : apps) {AppSimpleInfoDTO simpleApp = new AppSimpleInfoDTO(app.getName(), app.getPackname(), app.getCode(), app.getBigType());CompletableFuture.runAsync(() -> {// redis缓存1小时jedisClusterTemplate.setex(CacheKeyUtils.getAppSimpleInfoKey(simpleApp.getPayAppId()), VivoConfigManager.getInteger(JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL, JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL_DEFAULT), JSON.toJSONString(simpleApp));log.info("redisCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));// local cache 缓存5分钟appSimpleInfoCache.put(simpleApp.getPayAppId(), simpleApp);log.info("localCache_AppSimpleInfoDTO_suc. [simpleApp={}]", JSON.toJSONString(simpleApp));});result.add(simpleApp);}}}// 重新排序final Map<String, AppSimpleInfoDTO> payAppIdMap = result.stream().collect(Collectors.toMap(AppSimpleInfoDTO::getPayAppId, Function.identity(), (old, newly) -> newly));result.clear();payAppIds.forEach(payAppId -> Optional.ofNullable(payAppIdMap.get(payAppId)).ifPresent(result::add));return result;}
本文参考:
Redis精通系列——Pipeline(管道)