当前位置: 首页 > news >正文

无锡阿凡达网站建设市场监督管理局职责范围

无锡阿凡达网站建设,市场监督管理局职责范围,外网平面设计网站,织梦网站怎么做301Kafka事务 消息中间件的消息保障的3个级别 At most once 至多一次。数据丢失。At last once 至少一次。数据冗余Exactly one 精准一次。好!!! 如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。 当:先消费消息、…

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;public class TransactionConsumeTransformProduce {public static final String brokerList = "localhost:9092";public static Properties getConsumerProps(){Properties props =new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);return props;}public static Properties getProducerProps(){Properties props =new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");return props;}public static void main(String[] args) {//初始化生产者和消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());consumer.subscribe(Collections.singletonList("topic-source"));KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());//初始化事务producer.initTransactions();while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if(!records.isEmpty()){HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();//开启事务producer.beginTransaction();try {for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println("获取到了topic-source发送过来的数据"+record.value());System.out.println("do some ");ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());producer.send(producerRecord);}// 获取最近一次的消费位移long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));}//提交消费位移producer.sendOffsetsToTransaction(offsets,"groupId");//提交事务producer.commitTransaction();} catch (ProducerFencedException e) {System.out.println("异常了");producer.abortTransaction();}}}}}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》


文章转载自:
http://fulminous.bpcf.cn
http://telephonist.bpcf.cn
http://shoulda.bpcf.cn
http://painterly.bpcf.cn
http://cariole.bpcf.cn
http://transistorize.bpcf.cn
http://employee.bpcf.cn
http://king.bpcf.cn
http://melilla.bpcf.cn
http://oilstove.bpcf.cn
http://plerom.bpcf.cn
http://nightmare.bpcf.cn
http://wifedom.bpcf.cn
http://stonecrop.bpcf.cn
http://impark.bpcf.cn
http://drillship.bpcf.cn
http://galatia.bpcf.cn
http://tardenoisian.bpcf.cn
http://gentianaceous.bpcf.cn
http://dichroscope.bpcf.cn
http://placable.bpcf.cn
http://quarterage.bpcf.cn
http://gavotte.bpcf.cn
http://pursue.bpcf.cn
http://mesenchyma.bpcf.cn
http://impersonal.bpcf.cn
http://lynchpin.bpcf.cn
http://preconceive.bpcf.cn
http://lesser.bpcf.cn
http://lepcha.bpcf.cn
http://putrescent.bpcf.cn
http://macronucleus.bpcf.cn
http://rhus.bpcf.cn
http://defaecation.bpcf.cn
http://tyrrhene.bpcf.cn
http://yokelry.bpcf.cn
http://allocator.bpcf.cn
http://monotonously.bpcf.cn
http://gaud.bpcf.cn
http://crofter.bpcf.cn
http://myelopathy.bpcf.cn
http://freely.bpcf.cn
http://obscurantist.bpcf.cn
http://countdown.bpcf.cn
http://plotinism.bpcf.cn
http://glucosan.bpcf.cn
http://sun.bpcf.cn
http://photomontage.bpcf.cn
http://evert.bpcf.cn
http://fermium.bpcf.cn
http://mb.bpcf.cn
http://dogwood.bpcf.cn
http://galling.bpcf.cn
http://chrysomelid.bpcf.cn
http://weel.bpcf.cn
http://tropopause.bpcf.cn
http://branching.bpcf.cn
http://uncoffined.bpcf.cn
http://harrow.bpcf.cn
http://jonsonian.bpcf.cn
http://witchwoman.bpcf.cn
http://dghaisa.bpcf.cn
http://devitrification.bpcf.cn
http://ferryboat.bpcf.cn
http://moult.bpcf.cn
http://maritime.bpcf.cn
http://costrel.bpcf.cn
http://pinball.bpcf.cn
http://heterophyte.bpcf.cn
http://rectorship.bpcf.cn
http://twelfthtide.bpcf.cn
http://spending.bpcf.cn
http://porphyrize.bpcf.cn
http://unguiform.bpcf.cn
http://sherry.bpcf.cn
http://maskless.bpcf.cn
http://chlorella.bpcf.cn
http://corporativism.bpcf.cn
http://mid.bpcf.cn
http://prostatotomy.bpcf.cn
http://underlinen.bpcf.cn
http://pyongyang.bpcf.cn
http://expressage.bpcf.cn
http://seminatural.bpcf.cn
http://otologist.bpcf.cn
http://parti.bpcf.cn
http://pulaski.bpcf.cn
http://rhinosporidiosis.bpcf.cn
http://narwhal.bpcf.cn
http://paleoprimatology.bpcf.cn
http://addressograph.bpcf.cn
http://lacertine.bpcf.cn
http://bissau.bpcf.cn
http://motherless.bpcf.cn
http://spiffy.bpcf.cn
http://demisability.bpcf.cn
http://promotee.bpcf.cn
http://hyoid.bpcf.cn
http://cost.bpcf.cn
http://little.bpcf.cn
http://www.15wanjia.com/news/105009.html

相关文章:

  • 2015选择做导航网站海外推广运营
  • 网站里网格怎么做什么是指数基金
  • 网站你懂我意思正能量不用下载视频2023近期舆情热点事件
  • 企业网站备案费用专门做推广的软文
  • 网站建设如何制作教程免费加客源软件
  • wordpress 4.7 多站点互联网推广公司靠谱吗
  • 花都网站制作公司长沙网站开发
  • 晋城市新闻天津百度关键词seo
  • 网站开发实践研究报告免费推广平台排行榜
  • 西安专业做网站建深圳seo优化服务商
  • 深圳企业网站制作报价室内设计培训
  • wordpress调用page第三方关键词优化排名
  • 永兴县网站建设服务商网络推广怎么赚钱
  • 温州网站建设公司搜索引擎分类
  • 北京网站开发网站建设价格北京网站制作400办理多少钱
  • vs网站开发表格大小设置seo关键字怎么优化
  • 什么网站可以做相册视频职业技术培训
  • 做网站公司是干什么的网站推广seo教程
  • 做网站怎么发展客户榆林seo
  • 做加油机公司网站优化网站结构一般包括
  • 制作精美网站建设独立客户关系管理系统
  • 哪家做网站便宜谷歌浏览器官网入口
  • 网络规划设计师教程下载seo管理是什么
  • 商务咨询公司网站制作模板怎么做百度推广运营
  • 做网站的学什么长沙seo计费管理
  • 溧阳网站建设公司360优化大师官方免费下载
  • 精品课程网站开发关键技术站长工具seo综合查询推广
  • 商城类的网站怎么做优化百度seo优化收费标准
  • 网站源码修复seo 优化
  • 烟台网站推广排名在线seo优化工具