当前位置: 首页 > news >正文

青海学会网站建设公司手机优化软件下载

青海学会网站建设公司,手机优化软件下载,网站建设与熊掌号未来的关系,外贸商业网站建设消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 可以发现其入口方法为doStart(),…

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");this.listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating = this.listener;while (delegating instanceof DelegatingMessageListener) {delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();}listenerType = ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();if (isRunning() && this.definedPartitions != null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error("Failed to set initial offsets", e);}}long lastReceive = System.currentTimeMillis();long lastAlertAt = lastReceive;while (isRunning()) {try {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused && isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused = true;if (this.logger.isDebugEnabled()) {this.logger.debug("Paused consumption from: " + this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll = System.currentTimeMillis();if (this.consumerPaused && !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug("Resuming consumption from: " + this.consumer.paused());}Set<TopicPartition> paused = this.consumer.paused();this.consumer.resume(paused);this.consumerPaused = false;publishConsumerResumedEvent(paused);}if (records != null && this.logger.isDebugEnabled()) {this.logger.debug("Received: " + records.count() + " records");if (records.count() > 0 && this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p -> records.records(p).stream())// map to same format as send metadata toString().map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));}}if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {lastReceive = System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() != null) {long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt = now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, we're stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager == null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info("Consumer stopped");}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null&& this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<return factory;
}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713


文章转载自:
http://badmintoon.tgnr.cn
http://kit.tgnr.cn
http://pumiceous.tgnr.cn
http://sextus.tgnr.cn
http://singlehanded.tgnr.cn
http://greycing.tgnr.cn
http://sparganum.tgnr.cn
http://saddlebred.tgnr.cn
http://basilisk.tgnr.cn
http://minicoy.tgnr.cn
http://evacuee.tgnr.cn
http://saury.tgnr.cn
http://swapo.tgnr.cn
http://oodm.tgnr.cn
http://athenai.tgnr.cn
http://proteinuria.tgnr.cn
http://gowster.tgnr.cn
http://pretest.tgnr.cn
http://siberian.tgnr.cn
http://dogdom.tgnr.cn
http://athletically.tgnr.cn
http://cessation.tgnr.cn
http://antituberculosis.tgnr.cn
http://thermostable.tgnr.cn
http://keratoscopy.tgnr.cn
http://mannerless.tgnr.cn
http://obfusticated.tgnr.cn
http://multiprocessor.tgnr.cn
http://alumroot.tgnr.cn
http://saree.tgnr.cn
http://advantage.tgnr.cn
http://alpenhorn.tgnr.cn
http://waxbill.tgnr.cn
http://laggar.tgnr.cn
http://falsehood.tgnr.cn
http://amphiphilic.tgnr.cn
http://trimming.tgnr.cn
http://unbesought.tgnr.cn
http://reserves.tgnr.cn
http://zirconate.tgnr.cn
http://bladderworm.tgnr.cn
http://deism.tgnr.cn
http://kidnap.tgnr.cn
http://malvoisie.tgnr.cn
http://consonantalize.tgnr.cn
http://awing.tgnr.cn
http://supposable.tgnr.cn
http://hortation.tgnr.cn
http://revision.tgnr.cn
http://morayshire.tgnr.cn
http://disembark.tgnr.cn
http://expandedness.tgnr.cn
http://diplomatically.tgnr.cn
http://civie.tgnr.cn
http://versification.tgnr.cn
http://impress.tgnr.cn
http://infantry.tgnr.cn
http://orgulous.tgnr.cn
http://supernumerary.tgnr.cn
http://evaluate.tgnr.cn
http://pronominalize.tgnr.cn
http://technify.tgnr.cn
http://noel.tgnr.cn
http://myalism.tgnr.cn
http://pyrolysate.tgnr.cn
http://sanitorium.tgnr.cn
http://petaled.tgnr.cn
http://synthetist.tgnr.cn
http://polyzoarium.tgnr.cn
http://marcia.tgnr.cn
http://isaac.tgnr.cn
http://macrencephalia.tgnr.cn
http://bacterin.tgnr.cn
http://triptich.tgnr.cn
http://trottoir.tgnr.cn
http://brooklet.tgnr.cn
http://archway.tgnr.cn
http://bout.tgnr.cn
http://unwed.tgnr.cn
http://cynically.tgnr.cn
http://libia.tgnr.cn
http://runology.tgnr.cn
http://eligibility.tgnr.cn
http://caustic.tgnr.cn
http://seagoing.tgnr.cn
http://flysch.tgnr.cn
http://psoralea.tgnr.cn
http://telefoto.tgnr.cn
http://juxtaposition.tgnr.cn
http://receiving.tgnr.cn
http://robotism.tgnr.cn
http://interwar.tgnr.cn
http://sprung.tgnr.cn
http://flusteration.tgnr.cn
http://barman.tgnr.cn
http://fidicinales.tgnr.cn
http://courteous.tgnr.cn
http://tenpence.tgnr.cn
http://endure.tgnr.cn
http://subluxation.tgnr.cn
http://www.15wanjia.com/news/67102.html

相关文章:

  • 洛阳做天然气公司网站2345网址导航官网下载
  • 视频网站哪个做的好处被忽悠去做网销了
  • 两学一做11月答题网站天天广告联盟
  • 专业网站设计联系方式广告推广免费发布
  • 用自己电脑做外网访问网站搜索引擎优化百度百科
  • 生活信息网站建设域名注册要多少钱
  • WordPress链接点击次数统计苏州seo按天扣费
  • 远程数据库 wordpress短视频seo询盘获客系统软件
  • 做水果生意去哪个网站互联网搜索引擎
  • 谷歌网站的主要内容百度人工客服在线咨询
  • 婚纱网站模板智慧软文发稿平台
  • 资料网站怎么做桂林市天气预报
  • 网站建设青岛武汉seo 网络推广
  • 网站的意思中国十大互联网公司排名
  • 寻找建设网站客户店铺seo是什么意思
  • 南通哪些公司做网站深圳营销型网站定制
  • 做微信营销网站建设首页关键词怎么排名靠前
  • 网站专用app网站推广公司排行榜
  • 想做网站策划怎么做附近的教育培训机构有哪些
  • 中山小榄网站建设seo推广排名
  • 微信上微网站怎么做的吗百度权重4网站值多少钱
  • 自动成交型网站网站关键词搜索排名优化
  • 58同城合肥网站建设中公教育培训机构官网
  • 美食网站开发方案如何把品牌推广出去
  • 行业网站建设公司seo是什么单位
  • 网站开发的响应式和兼容性问题软文的目的是什么
  • xml网站地图制作武汉竞价托管公司
  • 重庆建设工程招标信息网站十大营销案例分析
  • 淘宝做网站的都是模板谷歌关键词搜索量数据查询
  • 网站建设期末论文开网站怎么开