当前位置: 首页 > news >正文

网站网络推广排名前50名免费的网站

网站网络推广,排名前50名免费的网站,wordpress h5制作插件,东莞公司网站制作目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…

目录

1、核心概念

消息和批次

Topic和Partition

Replicas

Offset

broker和集群

生产者和消费者

2、开发实战

2.1、消息发送

介绍

代码实现

2.2、消息消费

介绍

代码实现

2.3、SpringBoot Kafka

pom

application.yaml

KafkaConfig

producer

consumer


1、核心概念

消息和批次

        kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。

        批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。

Topic和Partition

        topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。

        partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)

        主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。

        主题的分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

Replicas

  1. 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
  2. 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
  3. 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
  4. 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。

Offset

        偏移量。

        生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。

        消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。

broker和集群

broker为一个独立的kafka服务器;一个kafka集群里有多个broker。

        broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。

        集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。

        每个集群都有一个broker是集群控制器,作用如下:

  1. 将分区分配给首领分区的broker;
  2. 监控broker,首领分区切换

生产者和消费者

        生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

        消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。

2、开发实战

2.1、消息发送

介绍

  • 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
  • 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。        

代码实现

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {Map<String, Object> configs = new HashMap<>();// 设置连接Kafka的初始连接⽤到的服务器地址// 如果是集群,则可以通过此初始连接发现集群中的其他brokerconfigs.put("bootstrap.servers", "node1:9092");// 设置key和value的序列化器configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于封装Producer的消息ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1", // 主题名称0, // 分区编号,现在只有⼀个分区,所以是00, // 数字作为key"message 0" // 字符串作为value);// 发送消息,同步等待消息的确认// producer.send(record).get(3_000, TimeUnit.MILLISECONDS);// 使用回调异步等待消息的确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("主题:" + metadata.topic() + "\n"+ "分区:" + metadata.partition() + "\n"+ "偏移量:" + metadata.offset() + "\n"+ "序列化的key字节:" + metadata.serializedKeySize() + "\n"+ "序列化的value字节:" + metadata.serializedValueSize() + "\n"+ "时间戳:" + metadata.timestamp());} else {System.out.println("有异常:" + exception.getMessage());}}});// 关闭连接producer.close();
}

2.2、消息消费

介绍

        消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

代码实现

public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。configs.put("bootstrap.servers", "node1:9092");// key和value的反序列化器configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "consumer.demo");// 创建消费者对象KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);final Pattern pattern = Pattern.compile("topic_[0-9]");// 消费者订阅主题或分区// consumer.subscribe(pattern);// consumer.subscribe(pattern, new ConsumerRebalanceListener() {final List<String> topics = Arrays.asList("topic_1");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println("剥夺的分区:" + tp.partition());});	}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {partitions.forEach(tp -> {System.out.println(tp.partition());});}});// 拉取订阅主题的消息final ConsumerRecords<Integer, String> records = consumer.poll(3_000);// 获取topic_1主题的消息final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");// 遍历topic_1主题的消息topic1Iterable.forEach(record -> {System.out.println("========================================");System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的值:" + record.value());System.out.println("消息的主题:" + record.topic());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的偏移量:" + record.offset());});// 关闭消费者consumer.close();
}

2.3、SpringBoot Kafka

pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

application.yaml

spring:kafka:bootstrap-servers: node1:9092       # 用于建立初始连接的broker地址producer:key-serializer: org.apache.kafka.common.serialization.IntegerSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 16384                 # 默认的批处理记录数buffer-memory: 33554432           # 32MB的总发送缓存consumer:key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: spring-kafka-02-consumer    # consumer的消费组idenable-auto-commit: true              # 是否自动提交消费者偏移量auto-commit-interval: 100             # 每隔100ms向broker提交一次偏移量auto-offset-reset: earliest           # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量

KafkaConfig

@Configuration
public class KafkaConfig {@Beanpublic NewTopic topic1() {return new NewTopic("ntp-01", 5, (short) 1);}@Beanpublic NewTopic topic2() {return new NewTopic("ntp-02", 3, (short) 1);}
}

producer

@RestController
public class KafkaSyncProducerController {@Autowiredprivate KafkaTemplate template;@RequestMapping("send/sync/{message}")public String sendSync(@PathVariable String message) {ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));try {// 同步等待broker的响应Object o = future.get();SendResult<Integer, String> result = (SendResult<Integer, String>) o;System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}
}@RestController
public class KafkaAsyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/async/{message}")public String asyncSend(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);ListenableFuture<SendResult<Integer, String>> future = template.send(record);// 添加回调,异步等待响应future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送失败: " + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());}});return "success";}
}

consumer

@Component
public class MyConsumer {@KafkaListener(topics = "topic-spring-02")public void onMessage(ConsumerRecord<Integer, String> record) {Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);if (optional.isPresent()) {System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());}}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

http://www.15wanjia.com/news/33226.html

相关文章:

  • 网站如何做抖音推广志鸿优化设计
  • 有规范上海关键词优化报价
  • 广州网络公司建站seo是指
  • wordpress主题去谷歌字体淄博seo
  • 网站建设公司源码网络推广最好的网站有哪些
  • 杭州建站供应商什么是网站推广策略
  • java企业门库网站开发互联网项目推广
  • 网站制作费用入什么科目查关键词的排名工具
  • cms wordpress 国内湖南网站建设seo
  • 国家反诈中心app下载怎么注册简述网站内容如何优化
  • 在韩国用什么地图导航安卓优化大师老版本
  • 网站建设网站开发seo快速排名优化
  • 做旅游销售网站平台ppt品牌运营中心
  • wordpress 4.5 多站点不同数据百度客户端下载安装
  • 南昌市网站建设推广武汉企业seo推广
  • 大连建网站需要多少钱福州百度网站排名优化
  • 做美食哪些类型网站网络推广主要是做什么工作
  • 个人网站开发用什么语言微信小程序开发教程
  • 顶顶呱网站建设定制开发公司
  • web网站设计的重庆网站seo多少钱
  • 网站更新和维护怎么做广告推广方案怎么写
  • 网站结构合理免费外链网站seo发布
  • 做蛋糕的网站网站建设制作模板
  • 网页制作培训教学seo最好的工具
  • 深圳网站建设公司服务流程快速收录工具
  • iis配置网站开发环境佛山网站建设十年乐云seo
  • wordpress 模版定制seo关键词优化排名推广
  • 帝国cms做动态网站性能如何在线生成个人网站app
  • 公司怎么在网上推广优化大师安卓版
  • 服装商店的网站建设要求总推荐榜总点击榜总排行榜