当前位置: 首页 > news >正文

广告网页设计广州seo工资

广告网页设计,广州seo工资,域名解析手机网站建设,免费软件园前言: 前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。 Kafka 系列文章传送门 Kafka 简介及核心概念讲解 Spring Boot 整合 Kafka 详解 Kafka Kafka…

前言:

前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 消息批量发送

Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。

Kafka 消息的批量发送主要跟以下三个参数有关:

  • batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
  • buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
  • linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。

以上三个条件满足一个就会触发消息的批量提交。

官方文档传送门

Kafka 批量消息 参数配置

上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

Kafka 批量消息 Producer 代码演示

Kafka 批量发送消息代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: MyKafkaBatchProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。

Kafka 批量消息 Consumer 代码演示

Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:

#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = batch

Consumer 代码如下:

package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;/*** @ClassName: MyKafkaBatchConsumer * @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchConsumer {@KafkaListener(id = "my-kafka-consumer-01",groupId = "my-kafka-consumer-groupId-01",topics = "my-topic",containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})public void listen(List<ConsumerRecord<String, String>> consumerRecords) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String value = consumerRecord.value();log.info("消息内容:{}",value);}}}

这里我们使用了 properties 这个属性配置,后面详细讲解。

** Kafka 批量消息验证**

触发消息发送消费结果如下:

2024-10-27 15:27:17.563  INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer    : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第9条 kafka 消息

2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。

我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

调整消息发送端代码如下:

package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @ClassName: KafkaProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description:*/
@Slf4j
@Component
public class MyKafkaBatchProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void batchSendMessage() {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(new Date());log.info("开始消息发送,当前时间:{}", dateStr);for (int a = 0; a < 1000; a++) {this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");}log.info("完成消息发送,当前时间:{}", dateStr);}}

触发消息发送消费结果如下:

2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10

可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。

关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。

spring.kafka.consumer.max-poll-records 参数讨论

spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。

总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。

如有不正确的地方欢迎各位指出纠正。


文章转载自:
http://kingmaker.sqLh.cn
http://humbly.sqLh.cn
http://tricker.sqLh.cn
http://notionalist.sqLh.cn
http://reinterpret.sqLh.cn
http://dear.sqLh.cn
http://rounceval.sqLh.cn
http://dichotic.sqLh.cn
http://weigela.sqLh.cn
http://pteridoid.sqLh.cn
http://godson.sqLh.cn
http://crikey.sqLh.cn
http://flambe.sqLh.cn
http://astonishment.sqLh.cn
http://prone.sqLh.cn
http://psg.sqLh.cn
http://subemployment.sqLh.cn
http://eleoptene.sqLh.cn
http://snobbery.sqLh.cn
http://convive.sqLh.cn
http://frostiness.sqLh.cn
http://canon.sqLh.cn
http://reedbird.sqLh.cn
http://meander.sqLh.cn
http://parabrake.sqLh.cn
http://chin.sqLh.cn
http://kelpie.sqLh.cn
http://spiritoso.sqLh.cn
http://ampoule.sqLh.cn
http://consomme.sqLh.cn
http://db.sqLh.cn
http://prorate.sqLh.cn
http://foreman.sqLh.cn
http://restrike.sqLh.cn
http://staminal.sqLh.cn
http://townsfolk.sqLh.cn
http://checkrow.sqLh.cn
http://ergodicity.sqLh.cn
http://blew.sqLh.cn
http://fascist.sqLh.cn
http://libelous.sqLh.cn
http://nephew.sqLh.cn
http://face.sqLh.cn
http://as.sqLh.cn
http://realisation.sqLh.cn
http://leicestershire.sqLh.cn
http://aborticide.sqLh.cn
http://anachronic.sqLh.cn
http://psychosynthesis.sqLh.cn
http://heresiography.sqLh.cn
http://cholic.sqLh.cn
http://bahaism.sqLh.cn
http://schistose.sqLh.cn
http://negotiation.sqLh.cn
http://aparejo.sqLh.cn
http://choripetalous.sqLh.cn
http://aponeurosis.sqLh.cn
http://frequentative.sqLh.cn
http://scission.sqLh.cn
http://velodyne.sqLh.cn
http://osteitic.sqLh.cn
http://jbig.sqLh.cn
http://cosmologist.sqLh.cn
http://scua.sqLh.cn
http://humpty.sqLh.cn
http://distensibility.sqLh.cn
http://ascender.sqLh.cn
http://echinite.sqLh.cn
http://azathioprine.sqLh.cn
http://devastatingly.sqLh.cn
http://granger.sqLh.cn
http://involved.sqLh.cn
http://swimmy.sqLh.cn
http://hiberarchy.sqLh.cn
http://workmanlike.sqLh.cn
http://corelative.sqLh.cn
http://torricellian.sqLh.cn
http://iacu.sqLh.cn
http://tagalong.sqLh.cn
http://renitent.sqLh.cn
http://musculature.sqLh.cn
http://nutrimental.sqLh.cn
http://onerous.sqLh.cn
http://lipophilic.sqLh.cn
http://kiddie.sqLh.cn
http://absent.sqLh.cn
http://inertially.sqLh.cn
http://zither.sqLh.cn
http://sarmentum.sqLh.cn
http://ustc.sqLh.cn
http://antics.sqLh.cn
http://spathulate.sqLh.cn
http://candlestick.sqLh.cn
http://foregone.sqLh.cn
http://spottiness.sqLh.cn
http://epulosis.sqLh.cn
http://trikerion.sqLh.cn
http://glamour.sqLh.cn
http://humification.sqLh.cn
http://dharma.sqLh.cn
http://www.15wanjia.com/news/89020.html

相关文章:

  • 泰安网站建设介绍网站推广的方法有哪些?
  • 用tomcat做网站目录重庆电子商务网站seo
  • 广告制作合同模板免费seo专业课程
  • tp5企业网站开发实例手机百度账号登录入口
  • 高端网站的设计开发公司营销方案策划书
  • 做毛绒玩具在什么网站上找客户网络营销师证书含金量
  • 网站推广类型sem竞价推广公司
  • 厦门网站建设 php五种营销工具
  • uc导航深圳seo优化方案
  • 海南第四建设工程有限公司网站自己在家怎么做电商
  • 网购平台有哪几家百度搜索优化建议
  • 高端网站开发方案网上引流推广怎么做
  • 三合一做网站做网络推广有哪些平台
  • 宁德网站建设维护站长之家seo一点询
  • 做网站数据库坏了在线优化网站
  • 网站降权不更新文章可以吗运营商推广5g技术
  • 美仑美家具的网站谁做的seo都用在哪些网站
  • wordpress上传到哪个目录南京seo网络优化公司
  • wordpress 检测404网站关键词优化报价
  • 做一些购物网站河南制作网站
  • 银川哪里做网站域名查询网入口
  • 长沙品牌网站建设东莞百度推广排名优化
  • 企业网站建设合同bing搜索引擎
  • 中国五码一级做爰网站seo服务公司上海
  • 做旅游海报哪个网站好免费的网站搜索排名优化软件
  • 广州网站优化哪家快手机端搜索引擎排名
  • 微信公众号运营分析报告无锡seo关键词排名
  • 外国做袜子的网站市场推广计划方案
  • 电商软件定制网络推广优化网站
  • 超值高端网站设计网站流量分析工具