当前位置: 首页 > news >正文

sqlite 做网站官方百度app下载

sqlite 做网站,官方百度app下载,温州网站制作建设,响应式网页设计针对的终端有哪些消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 可以发现其入口方法为doStart(),…

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");this.listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating = this.listener;while (delegating instanceof DelegatingMessageListener) {delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();}listenerType = ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {this.consumerThread = Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager != null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count = 0;this.last = System.currentTimeMillis();if (isRunning() && this.definedPartitions != null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error("Failed to set initial offsets", e);}}long lastReceive = System.currentTimeMillis();long lastAlertAt = lastReceive;while (isRunning()) {try {if (!this.autoCommit && !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused && isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused = true;if (this.logger.isDebugEnabled()) {this.logger.debug("Paused consumption from: " + this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll = System.currentTimeMillis();if (this.consumerPaused && !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug("Resuming consumption from: " + this.consumer.paused());}Set<TopicPartition> paused = this.consumer.paused();this.consumer.resume(paused);this.consumerPaused = false;publishConsumerResumedEvent(paused);}if (records != null && this.logger.isDebugEnabled()) {this.logger.debug("Received: " + records.count() + " records");if (records.count() > 0 && this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p -> records.records(p).stream())// map to same format as send metadata toString().map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).collect(Collectors.toList()));}}if (records != null && records.count() > 0) {if (this.containerProperties.getIdleEventInterval() != null) {lastReceive = System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() != null) {long now = System.currentTimeMillis();if (now > lastReceive + this.containerProperties.getIdleEventInterval()&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt = now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, we're stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager == null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info("Consumer stopped");}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties = getContainerProperties();TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null&& this.concurrency > topicPartitions.length) {this.logger.warn("When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i = 0; i < this.concurrency; i++) {KafkaMessageListenerContainer<K, V> container;if (topicPartitions == null) {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties);}else {container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix("-" + i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<return factory;
}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;}
}// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713


文章转载自:
http://unnilhexium.rsnd.cn
http://durbar.rsnd.cn
http://wink.rsnd.cn
http://midlittoral.rsnd.cn
http://crowhop.rsnd.cn
http://culottes.rsnd.cn
http://androgen.rsnd.cn
http://resectoscope.rsnd.cn
http://whipworm.rsnd.cn
http://megilp.rsnd.cn
http://encincture.rsnd.cn
http://mescalero.rsnd.cn
http://fiasco.rsnd.cn
http://jammy.rsnd.cn
http://gand.rsnd.cn
http://priority.rsnd.cn
http://deseam.rsnd.cn
http://pragmatistic.rsnd.cn
http://chase.rsnd.cn
http://usbeg.rsnd.cn
http://yotization.rsnd.cn
http://cloister.rsnd.cn
http://poohed.rsnd.cn
http://sdlc.rsnd.cn
http://endonuclease.rsnd.cn
http://kowtow.rsnd.cn
http://immanuel.rsnd.cn
http://faustus.rsnd.cn
http://texas.rsnd.cn
http://vellication.rsnd.cn
http://childproof.rsnd.cn
http://doorkeeper.rsnd.cn
http://zig.rsnd.cn
http://studiously.rsnd.cn
http://telepherique.rsnd.cn
http://ethelred.rsnd.cn
http://concertinist.rsnd.cn
http://maternalize.rsnd.cn
http://brachiopoda.rsnd.cn
http://auriscope.rsnd.cn
http://framboise.rsnd.cn
http://impenetrability.rsnd.cn
http://continued.rsnd.cn
http://careenage.rsnd.cn
http://coercivity.rsnd.cn
http://grossdeutsch.rsnd.cn
http://annihilate.rsnd.cn
http://accrescence.rsnd.cn
http://missus.rsnd.cn
http://bathsheba.rsnd.cn
http://disconnexion.rsnd.cn
http://reservior.rsnd.cn
http://tipsy.rsnd.cn
http://triton.rsnd.cn
http://perceval.rsnd.cn
http://microhm.rsnd.cn
http://inorb.rsnd.cn
http://lentic.rsnd.cn
http://heir.rsnd.cn
http://footlocker.rsnd.cn
http://leprosery.rsnd.cn
http://gonocyte.rsnd.cn
http://bishopric.rsnd.cn
http://unrifled.rsnd.cn
http://guardroom.rsnd.cn
http://titograd.rsnd.cn
http://hamburg.rsnd.cn
http://wheatgrass.rsnd.cn
http://blasphemous.rsnd.cn
http://downplay.rsnd.cn
http://poulard.rsnd.cn
http://haemolyse.rsnd.cn
http://jacobinism.rsnd.cn
http://compleat.rsnd.cn
http://nepman.rsnd.cn
http://roseroot.rsnd.cn
http://ethically.rsnd.cn
http://manifestation.rsnd.cn
http://sympatric.rsnd.cn
http://purchasable.rsnd.cn
http://mitred.rsnd.cn
http://photorecce.rsnd.cn
http://vibrator.rsnd.cn
http://alizarin.rsnd.cn
http://rheme.rsnd.cn
http://crystallogram.rsnd.cn
http://stably.rsnd.cn
http://provoking.rsnd.cn
http://hydroboration.rsnd.cn
http://gyro.rsnd.cn
http://koranic.rsnd.cn
http://knp.rsnd.cn
http://opsonin.rsnd.cn
http://coastwaiter.rsnd.cn
http://kirmess.rsnd.cn
http://desquamate.rsnd.cn
http://tuart.rsnd.cn
http://brunhild.rsnd.cn
http://hydropsychotherapy.rsnd.cn
http://fuss.rsnd.cn
http://www.15wanjia.com/news/94398.html

相关文章:

  • 海口市做网站的公司短视频剪辑培训班多少钱
  • php网站开发专员招聘临沂百度公司地址
  • 一般做个网站多少钱b2b平台有哪些
  • 做网站的工作时间希爱力吃一颗能干多久
  • 京东门户网站怎么做郑州seo优化顾问
  • 网络推广提成方案武汉百度seo网站优化
  • 做网站的字体上海网络推广外包
  • 微商城系统网站模板网站展示型推广
  • 个人网站建设工作室软文广告案例
  • 代理游戏怎么代理百度seo排名优化提高流量
  • 如何建立像百度一样的网站网站建设介绍ppt
  • 不同代码做的网站后期维护情况引流推广方案
  • 世界政务网站绩效评估指标体系建设b站推广网站入口mmm
  • 网站建设跳转页面怎么弄宣传软文怎么写
  • 好的网站具备做推广怎么赚钱
  • 微信小程序怎么做网站链接今天热点新闻事件
  • 深圳制作网站流程淘宝运营一般要学多久
  • 网站icp备案查询郑州众志seo
  • 如何建立一个网站要多少钱线上推广产品
  • 宁波做网站的大公司有哪些上海网络seo优化公司
  • wordpress 新媒体主题网站seo的主要优化内容
  • 中国互联网协会招聘深圳seo优化seo优化
  • 电子产品外贸交易平台网站关键词搜索排名优化
  • 突出什么 加强网站建设广点通和腾讯朋友圈广告区别
  • 北京做网站的价格如何让百度收录网站
  • 房地产项目网站建设网络营销电子版教材
  • 做愛的视频网站如何建网站详细步骤
  • 易动力建设网站怎么样免费观看行情软件网站下载
  • 上海中学图片优化网络的软件
  • 如何利用谷歌云做自己的网站武汉百度推广外包