当前位置: 首页 > news >正文

网站首页建设方案武鸣住房和城乡规划建设局网站

网站首页建设方案,武鸣住房和城乡规划建设局网站,雨花区网站建设,app网站制作要多少费用一、前言 本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成,比如动态新增 RabbitMQ 交换机、队列等操作。二、默认RabbitMQ中的exchange、queue动态新增及监听 1、新增RabbitMQ配置 RabbitMQConfig.java import org.springframework.amqp.rabbit.a…

一、前言

本篇主要是围绕着 Spring Boot 3.x 与 RabbitMQ 的动态配置集成,比如动态新增 RabbitMQ 交换机、队列等操作。

二、默认RabbitMQ中的exchange、queue动态新增及监听

1、新增RabbitMQ配置

RabbitMQConfig.java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @className: RabbitConfig* @program: chain* @description: RabbitMQ 配置类* @author: kenny* @create: 2024-10-03 21:59* @version: 1.0.0*/
@Configuration
@EnableRabbit
public class RabbitMQConfig {/*** 创建 RabbitTemplate, 用于发送消息** @return RabbitTemplate*/@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate();}/*** 创建 RabbitAdmin, 用于创建 Exchange 和 Queue** @param rabbitTemplate RabbitTemplate* @return RabbitAdmin*/@Beanpublic RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {return new RabbitAdmin(rabbitTemplate);}
}

2、新增RabbitMQ动态操作组件

RabbitDynamicConfigService.java
RabbitDynamicConfigService.java 中包含了不同类型Exchange的创建、删除,Queue的创建和删除、绑定Exchange
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Map;/*** @className: RabbitDynamicConfigService* @program: chain* @description: 动态创建队列和交换机* @author: kenny* @create: 2024-10-03 23:49* @version: 1.0.0*/
@Slf4j
@Service
public class RabbitDynamicConfigService {/*** 为了解决循环依赖问题*/private final RabbitAdmin rabbitAdmin;private final RabbitListenerService rabbitListenerService;@Autowiredpublic RabbitDynamicConfigService(RabbitAdmin rabbitAdmin,RabbitListenerService rabbitListenerService) {this.rabbitAdmin = rabbitAdmin;this.rabbitListenerService = rabbitListenerService;}/*** 动态创建队列,并持久化** @param queueName 队列名称*/public void createQueue(String queueName) {// 队列持久化Queue queue = new Queue(queueName, true);// 创建队列rabbitAdmin.declareQueue(queue);System.out.println("队列创建成功: " + queueName);}/*** 动态创建队列,并持久化** @param queueName 队列名称*/public void createQueue(String queueName, Boolean isListener) {// 队列持久化Queue queue = new Queue(queueName, true);// 创建队列rabbitAdmin.declareQueue(queue);System.out.println("队列创建成功: " + queueName);if (!isListener) {return;}rabbitListenerService.createListener(queueName);}/*** 动态创建交换机,并持久化** @param exchangeName 交换机名称*/public void createExchange(String exchangeName) {// 交换机持久化DirectExchange exchange = new DirectExchange(exchangeName, true, false);rabbitAdmin.declareExchange(exchange);log.info("交换机创建成功: {}", exchangeName);}// 动态创建 Fanout 交换机public void createDirectExchange(String exchangeName) {DirectExchange fanoutExchange = new DirectExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(fanoutExchange);log.info("Direct 交换机创建成功: {}", exchangeName);}// 动态创建 Fanout 交换机public void createFanoutExchange(String exchangeName) {FanoutExchange fanoutExchange = new FanoutExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(fanoutExchange);log.info("Fanout 交换机创建成功: {}", exchangeName);}// 动态创建 Topic 交换机public void createTopicExchange(String exchangeName) {TopicExchange topicExchange = new TopicExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(topicExchange);log.info("Topic 交换机创建成功: {}", exchangeName);}// 动态创建 Headers 交换机public void createHeadersExchange(String exchangeName) {HeadersExchange headersExchange = new HeadersExchange(exchangeName, true, false); // 持久化rabbitAdmin.declareExchange(headersExchange);log.info("Headers 交换机创建成功: {}", exchangeName);}/*** 动态绑定队列到交换机,并指定路由键** @param queueName    队列名称* @param exchangeName 交换机名称* @param routingKey   路由键*/public void bindQueueToExchange(String queueName, String exchangeName, String routingKey) {Queue queue = new Queue(queueName);DirectExchange exchange = new DirectExchange(exchangeName);Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);}/*** 动态绑定队列到交换机,并指定路由键** @param queueName    队列名称* @param exchangeName 交换机名称* @param routingKey   路由键*/public void moreExchangeTypeBindQueueToExchange(String queueName, String exchangeType, String exchangeName, String routingKey, Map<String, Object> headers) {switch (exchangeType) {case "fanout" -> bindQueueToExchange(queueName, exchangeName, routingKey);case "direct" -> bindQueueToDirectExchange(queueName, exchangeName, routingKey);case "topic" -> bindQueueToTopicExchange(queueName, exchangeName, routingKey);case "headers" -> bindQueueToHeadersExchange(queueName, exchangeName, headers);default -> throw new IllegalArgumentException("不支持的交换机类型: " + exchangeType);}}/*** 动态绑定队列到交换机,并指定路由键(exchange: direct)** @param queueName    队列名称* @param exchangeName 交换机名称*/public void bindQueueToFanoutExchange(String queueName, String exchangeName) {Queue queue = new Queue(queueName);FanoutExchange exchange = new FanoutExchange(exchangeName);Binding binding = BindingBuilder.bind(queue).to(exchange);rabbitAdmin.declareBinding(binding);log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName);}/*** 动态绑定队列到交换机,并指定路由键(exchange: direct)** @param queueName    队列名称* @param exchangeName 交换机名称* @param routingKey   路由键*/public void bindQueueToDirectExchange(String queueName, String exchangeName, String routingKey) {Queue queue = new Queue(queueName);DirectExchange exchange = new DirectExchange(exchangeName);Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);}/*** 动态绑定队列到交换机,并指定路由键(exchange: topic)** @param queueName    队列名称* @param exchangeName 交换机名称* @param routingKey   路由键*/public void bindQueueToTopicExchange(String queueName, String exchangeName, String routingKey) {Queue queue = new Queue(queueName);TopicExchange exchange = new TopicExchange(exchangeName);Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);rabbitAdmin.declareBinding(binding);log.info("绑定创建成功: {}", queueName + " -> {}", exchangeName + " 使用路由键: {}", routingKey);}/*** 动态绑定队列到交换机,并指定路由键(exchange: headers)** @param queueName    队列名称* @param exchangeName 交换机名称* @param headers      路由键*/public void bindQueueToHeadersExchange(String queueName, String exchangeName, Map<String, Object> headers) {Queue queue = new Queue(queueName);HeadersExchange exchange = new HeadersExchange(exchangeName);Binding binding = BindingBuilder.bind(queue).to(exchange).whereAll(headers).match();rabbitAdmin.declareBinding(binding);log.info("队列 {}", queueName + " 已绑定到 Headers 交换机 {}", exchangeName + ",使用头部匹配规则: {}", headers);}/*** 动态删除队列** @param queueName 队列名称*/public void deleteQueue(String queueName) {rabbitAdmin.deleteQueue(queueName);log.info("队列删除成功: {}", queueName);}/*** 动态删除交换机** @param exchangeName 交换机名称*/public void deleteExchange(String exchangeName) {rabbitAdmin.deleteExchange(exchangeName);log.info("交换机删除成功: {}", exchangeName);}
}

3、RabbitMQ中队列的动态监听

RabbitListenerService.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @className: RabbitListenerService* @program: chain* @description: RabbitMQ监听器Service组件* @author: kenny* @create: 2024-10-04 01:40* @version: 1.0.0*/
@Slf4j
@Service
public class RabbitListenerService {// 为了解决循环依赖问题private final SimpleRabbitListenerContainerFactory listenerContainerFactory;private final ConnectionFactory connectionFactory;@Autowiredpublic RabbitListenerService(SimpleRabbitListenerContainerFactory listenerContainerFactory,ConnectionFactory connectionFactory) {this.listenerContainerFactory = listenerContainerFactory;this.connectionFactory = connectionFactory;}/*** 创建监听器容器并启动监听** @param queueName 队列名称*/public void createListener(String queueName) {// 创建并启动监听器容器SimpleMessageListenerContainer container = listenerContainerFactory.createListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(queueName);// 监听逻辑处理container.setMessageListener(new MessageListenerAdapter(new Object() {public void handleMessage(String message) {System.out.println("收到来自RabbitMQ中队列:" + queueName + " 队列的消息:" + message);}}));// 启动监听器容器container.start();System.out.println("RabbitMQ队列监听器已启动:" + queueName);}
}

4、RabbitMQ中的Exchange、Queue动态操作接口

RabbitDynamicChannelController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;/*** @className: RabbitDynamicController* @program: chain* @description: RabbitMQ 动态创建队列、交换机,绑定等操作* @author: kenny* @create: 2024-10-04 00:22* @version: 1.0.0*/
@RestController
@RequestMapping("/rabbit/dynamic/channel")
public class RabbitDynamicChannelController {/*** 动态创建队列和交换机*/@Resourceprivate RabbitDynamicConfigService rabbitDynamicConfigService;/*** 动态创建队列** @param queueName 队列名称* @return 处理结果*/@GetMapping("/createQueue")public String createQueue(@RequestParam("queueName") String queueName) {rabbitDynamicConfigService.createQueue(queueName);return "队列已创建: " + queueName;}/*** 动态创建交换机** @param exchangeName 交换机名称* @return 处理结果*/@GetMapping("/createExchange")public String createExchange(@RequestParam("exchangeName") String exchangeName) {rabbitDynamicConfigService.createExchange(exchangeName);return "交换机已创建: " + exchangeName;}/*** 动态绑定队列和交换机** @param queueName    队列名称* @param exchangeName 交换机名称* @param routingKey   路由键* @return 处理结果*/@GetMapping("/bindQueue")public String bindQueueToExchange(@RequestParam("queueName") String queueName,@RequestParam("exchangeName") String exchangeName,@RequestParam("routingKey") String routingKey) {rabbitDynamicConfigService.bindQueueToExchange(queueName, exchangeName, routingKey);return "队列和交换机已绑定: " + queueName + " -> " + exchangeName;}/*** 动态删除队列** @param queueName 队列名称* @return 处理结果*/@GetMapping("/deleteQueue")public String deleteQueue(@RequestParam("queueName") String queueName) {rabbitDynamicConfigService.deleteQueue(queueName);return "队列已删除: " + queueName;}/*** 动态删除交换机** @param exchangeName 交换机名称* @return 处理结果*/@GetMapping("/deleteExchange")public String deleteExchange(@RequestParam("exchangeName") String exchangeName) {rabbitDynamicConfigService.deleteExchange(exchangeName);return "交换机已删除: " + exchangeName;}// 创建并绑定 Fanout 交换机@GetMapping("/createDirectExchange")public String createDirectExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {rabbitDynamicConfigService.createDirectExchange(exchangeName);rabbitDynamicConfigService.bindQueueToDirectExchange(queueName, exchangeName, routingKey);return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;}// 创建并绑定 Fanout 交换机@GetMapping("/createFanoutExchange")public String createFanoutExchange(@RequestParam String exchangeName, @RequestParam String queueName) {rabbitDynamicConfigService.createFanoutExchange(exchangeName);rabbitDynamicConfigService.bindQueueToFanoutExchange(queueName, exchangeName);return "Fanout Exchange and Queue Binding created: " + exchangeName + " -> " + queueName;}// 创建并绑定 Topic 交换机@GetMapping("/createTopicExchange")public String createTopicExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam String routingKey) {rabbitDynamicConfigService.createTopicExchange(exchangeName);rabbitDynamicConfigService.bindQueueToTopicExchange(queueName, exchangeName, routingKey);return "Topic Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with routing key: " + routingKey;}// 创建并绑定 Headers 交换机@GetMapping("/createHeadersExchange")public String createHeadersExchange(@RequestParam String exchangeName, @RequestParam String queueName, @RequestParam Map<String, String> headersMap) {Map<String, Object> headers = new HashMap<>(headersMap);rabbitDynamicConfigService.createHeadersExchange(exchangeName);rabbitDynamicConfigService.bindQueueToHeadersExchange(queueName, exchangeName, headers);return "Headers Exchange and Queue Binding created: " + exchangeName + " -> " + queueName + " with headers: " + headers;}
}

5、RabbitMQ中的Queue消息监听动态操作接口

RabbitChannelListenerController.java
import com.chain.air.rpp.exchange.rabbit.RabbitDynamicConfigService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @className: RabbitListenerController* @program: chain* @description: RabbitMQ 监听器 Controller 组件* @author: kenny* @create: 2024-10-04 01:30* @version: 1.0.0*/
@RestController
@RequestMapping("/rabbit/channel/listener")
public class RabbitChannelListenerController {@Resourceprivate RabbitDynamicConfigService rabbitDynamicConfigService;/*** 创建监听器,监听指定队列** @param queueName 队列名称* @return 处理结果*/@GetMapping("/queue")public String listenQueue(@RequestParam("queueName") String queueName) {rabbitDynamicConfigService.createQueue(queueName, true);return "开始监听队列:" + queueName;}
}

三、动态exchange、queue的测试

1、测试Exchange、Queue的动态创建和删除

2、测试Exchange和Queue的动态绑定

3、发送、接收消息测试动态创建Exchange、Queue

4、测试Queue的动态监听接口

下一篇:7、Spring Boot 3.x集成RabbitMQ动态实例等操作

http://www.15wanjia.com/news/172013.html

相关文章:

  • 公司网站维护更新流程视频网站怎么做的反爬虫
  • 北京哪家做网站优化网站建设需求怎么写
  • 开平市住房和城乡建设局网站网站后端建设
  • 网站工程是干啥的广元市剑阁县建设局网站
  • 网站建设文本居中代码免费行情软件app网站mnu
  • 梧州网站建设定制免费的网站域名查询
  • 云南哪里有给做网站的下城网站建设
  • 网站建设相关小论文郑州建设网站哪家好
  • 芜湖网站建设求职简历展厅装饰公司
  • 企业建立网站北京工厂网站建设
  • 东莞网站推广推广软件百度关键词挖掘查询工具
  • 外发加工网订货会网站建设优化哪家公司好
  • 快三彩票网站开发登录wordpress后台的管理
  • 有没有房建设计的网站重庆app定制
  • 响应式科技公司网站模板下载wordpress商品
  • 有个做h手游的网站国家高新技术企业标志
  • 用jsp做的网站在不同浏览器显示效果差异很大如何解决社交平台运营是做什么的
  • 百度推广手机网站检测wordpress 获取分类链接
  • 专业微网站珠宝钻石网站建站
  • 太原网站建设搭建下载企业微信最新版
  • wordpress 插件系统seo是做什么工作内容
  • 找公司做网站wordpress栏目列表页
  • 湖北网站设计制作多少钱北京网站建设seo公司哪家好
  • 怎样用前端知识制作企业网站网站还在建设中英文
  • 福田建网站单仁营销网站的建设
  • 网站推广营销的意义气象网站建设管理总结
  • 网站开发前景如何推广外贸网站
  • 炫酷网站推荐河北省建设部网站
  • 网站后台模板关联自己做的网站网站开发和维护费用
  • 挂马网站 名单和建设银行类似的网站