当前位置: 首页 > news >正文

厦门市湖里区建设局网站免费创建个人博客网站

厦门市湖里区建设局网站,免费创建个人博客网站,杨浦网站建设 网站外包,东莞建网站【MacOS】RocketMQ 搭建Java客户端 文章目录 【MacOS】RocketMQ 搭建Java客户端一、引入RocketMQ客户端依赖1.maven工程,在你的pom.xml中添加RocketMQ客户端依赖:2.gradle工程添加库 二、创建生产者和消费者1.创建一个生产者消费者1.创建一个PullConsume…

【MacOS】RocketMQ 搭建Java客户端

文章目录

  • 【MacOS】RocketMQ 搭建Java客户端
    • 一、引入RocketMQ客户端依赖
      • 1.maven工程,在你的`pom.xml`中添加RocketMQ客户端依赖:
      • 2.gradle工程添加库
    • 二、创建生产者和消费者
      • 1.创建一个生产者
      • 消费者
        • 1.创建一个PullConsumer
        • 2.创建一个PushConsumer
    • 三、遇到的问题
      • 1.连接失败的问题
      • 2.主题没有找到

一、引入RocketMQ客户端依赖

1.maven工程,在你的pom.xml中添加RocketMQ客户端依赖:

<dependencies><!-- 添加RocketMQ客户端依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>版本跟你下载的rocketmq版本一样</version></dependency>
</dependencies>

2.gradle工程添加库

compile 'org.apache.rocketmq:rocketmq-client:你的版本号'
  • 注意
    1. 客户端和服务端版本要一致,否则会发射管一些奇怪的问题
    2. 要到控制台创建Topic队列名称

二、创建生产者和消费者

1.创建一个生产者

mport com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;/*** @author pengxiaoping* @date 2024年10月18日 11:27*/
public class Producer {public static void main(String[] args) {Producer.producer();}public static void producer() {//创建DefaultMQProducer消息生产者对象DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");//设置NameServer 多个节点间用分号分割producer.setNamesrvAddr("localhost:9876");try {//与NameServer建立长连接producer.start();//发送十条数据for (int i = 1; i <= 10; i++) {//1S中发送一次Thread.sleep(1000);JSONObject json = new JSONObject();json.put("orderId",i+1);json.put("desc","这是第"+i+1+"个订单");//数据正文String data = json.toJSONString();/*创建消息Message消息三个参数topic 代表消息主题,自定义自定义TopicOrder代表订单主题代表订单主题tags 代表标志,用于消费者接收数据时进行数据筛选。PAY_TAG代表支付相关信息body 代表消息内容*/Message message = new Message("TopicOrder", "PAY_TAG", data.getBytes());//发送消息,获取发送结果SendResult result = producer.send(message);//将发送结果对象打印在控制台System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:"+ result.getSendStatus());}}catch (Exception e){e.printStackTrace();}finally {try {producer.shutdown();} catch (Exception e) {}}}}

消费者

对于Consumer来说,他有两种基础的工作方式:pull和push。
区别:push:broker端来了消息以后主动将消息从broker端向consumer端推送。
pull:对于consumer来说主动往broker发一个请求,然后broker在通过请求响应给consumer一批消息。一般用push模式。

1.创建一个PullConsumer
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PullConsumer {public static volatile boolean running = true;public static void consumer() {//创建pull消费者对象DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("TestPullConsumerGroup");//设置NameServer节点litePullConsumer.setNamesrvAddr("localhost:9876");try {//订阅主题,与Push相同litePullConsumer.subscribe("TopicOrder", "*");//每次拉取数据条目数litePullConsumer.setPullBatchSize(10);//启动消费者litePullConsumer.start();while (running) {List<MessageExt> messageExts = litePullConsumer.poll();//批量数据处理for (MessageExt msg : messageExts) {System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + newString(msg.getBody()));}}}catch (Exception e){e.printStackTrace();}finally {litePullConsumer.shutdown();}}public static void main(String[] args) {PullConsumer.consumer();}
}
2.创建一个PushConsumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PushConsumer {public static void consumer() {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumerGroup");try {//设置NameServer节点consumer.setNamesrvAddr("localhost:9876");/*订阅主题,consumer.subscribe包含两个参数:topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。subExpression: 子表达式用于筛选tags。同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。例如:设置为*,则代表接收所有tags数据。例如:设置为PAY_TAG,则Broker中只有tags=PAY_TAG的消息会被接收,而其他的就会被排除在外。*/consumer.subscribe("TopicOrder", "*");//创建监听,当有新的消息监听程序会及时捕捉并加以处理。consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//批量数据处理for (MessageExt msg : msgs) {System.out.println("消费者获取数据:" + msg.getMsgId() + "==>" + newString(msg.getBody()));}//返回数据已接收标识return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者,与Broker建立长连接,开始监听。consumer.start();} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {PushConsumer.consumer();}
}

三、遇到的问题

1.连接失败的问题

Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failedat org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:572)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2050)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2041)at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:782)... 6 more
  • 检查代码中设置的NameServer地址是否正确,跟配置文件中的NAME_ADDR地址一致。确保没有拼写错误、IP 地址或域名准确,以及端口号正确。
  • 检查 RocketMQ 的 NameServer 是否已经启动并且正在运行。查看 NameServer 的日志文件,确认没有错误或异常情况。如果 NameServer 没有启动,需要启动它。确保 NameServer 的配置正确,并且没有与其他服务冲突。

2.主题没有找到

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)
  • 主题不存在或者未被创建:

    #在控制台创建主题 -n namesrv 的地址。-t 主题名。-c 指定所在集群
    sh bin/mqadmin updateTopic -n localhost:9876 -t TopicOrder -c DefaultCluster
    
    #出现这个创建成功
    create topic to 172.16.224.140:10911 success.
    TopicConfig [topicName=TopicOrder, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
    

文章转载自:
http://pood.spfh.cn
http://heptad.spfh.cn
http://grig.spfh.cn
http://hematuresis.spfh.cn
http://accumbent.spfh.cn
http://adorable.spfh.cn
http://pedagese.spfh.cn
http://kirigami.spfh.cn
http://apologizer.spfh.cn
http://doubler.spfh.cn
http://hortative.spfh.cn
http://senegalese.spfh.cn
http://critical.spfh.cn
http://cockfight.spfh.cn
http://regeneration.spfh.cn
http://basnet.spfh.cn
http://sportsmanlike.spfh.cn
http://scurrility.spfh.cn
http://horseplay.spfh.cn
http://bailable.spfh.cn
http://dampen.spfh.cn
http://ulsterman.spfh.cn
http://retexture.spfh.cn
http://japan.spfh.cn
http://daughterhood.spfh.cn
http://idd.spfh.cn
http://rood.spfh.cn
http://arena.spfh.cn
http://impalpable.spfh.cn
http://scram.spfh.cn
http://rollman.spfh.cn
http://illogical.spfh.cn
http://slope.spfh.cn
http://saliva.spfh.cn
http://weltanschauung.spfh.cn
http://interjection.spfh.cn
http://serigraphy.spfh.cn
http://fixature.spfh.cn
http://tiglic.spfh.cn
http://lusty.spfh.cn
http://lenticulated.spfh.cn
http://toxoplasma.spfh.cn
http://heathendom.spfh.cn
http://proseminar.spfh.cn
http://fibered.spfh.cn
http://lipin.spfh.cn
http://laval.spfh.cn
http://synchroneity.spfh.cn
http://villeggiatura.spfh.cn
http://proferment.spfh.cn
http://chowder.spfh.cn
http://putamen.spfh.cn
http://airway.spfh.cn
http://whereunder.spfh.cn
http://ogam.spfh.cn
http://wadding.spfh.cn
http://nonjuror.spfh.cn
http://ticking.spfh.cn
http://gooseflesh.spfh.cn
http://scion.spfh.cn
http://strategize.spfh.cn
http://untaa.spfh.cn
http://airglow.spfh.cn
http://domelike.spfh.cn
http://ruction.spfh.cn
http://asemia.spfh.cn
http://mullah.spfh.cn
http://bushido.spfh.cn
http://sagaciously.spfh.cn
http://despairingly.spfh.cn
http://fingerling.spfh.cn
http://ego.spfh.cn
http://uroscopy.spfh.cn
http://posh.spfh.cn
http://scalpriform.spfh.cn
http://moisten.spfh.cn
http://vinylon.spfh.cn
http://fuzznuts.spfh.cn
http://dissatisfactory.spfh.cn
http://philotechnical.spfh.cn
http://hmf.spfh.cn
http://genty.spfh.cn
http://waxplant.spfh.cn
http://musketry.spfh.cn
http://haemocytometer.spfh.cn
http://leud.spfh.cn
http://unmanliness.spfh.cn
http://concretion.spfh.cn
http://norman.spfh.cn
http://bonzer.spfh.cn
http://kench.spfh.cn
http://foxfire.spfh.cn
http://solitarily.spfh.cn
http://conspicuity.spfh.cn
http://weaponization.spfh.cn
http://arithmancy.spfh.cn
http://mincemeat.spfh.cn
http://farkleberry.spfh.cn
http://cyberphobia.spfh.cn
http://fashion.spfh.cn
http://www.15wanjia.com/news/80760.html

相关文章:

  • 献县网站建设网络营销专业代码
  • b2b网站做推广有效果吗百度的广告怎么免费发布
  • 宜城网站建设网站功能开发
  • 一般做网站需要多少钱贵阳seo网站推广
  • 怎么做美食团购网站网店推广运营策略
  • 怎么做网站的导航条怎样和政府交换友链
  • 网站商品图片怎么做吉安seo招聘
  • 前端如何兼职做网站餐饮营销方案
  • 上海软件培训网站建设alexa
  • 台州网站推广杭州seo网络推广
  • 免费网站设计全国各城市疫情高峰感染进度
  • 微信朋友圈推广软文seo编辑是干什么的
  • 项目招商手机系统优化软件
  • 广州企业网站营销电话seo交流网
  • 做地方网站需要什么部门批准seo关键词快速提升软件官网
  • 餐饮公司网站建设的特点微信推广引流平台
  • 禅城网站建设网络营销服务外包
  • 门户网站建设存在的问题和差距公司网络推广方法
  • 建设企业网站的模式郑州做网站的专业公司
  • dedecms网站栏目管理深圳seo公司助力网络营销飞跃
  • 新加坡网站制作百度代做seo排名
  • 泰州企业自助建站网络营销策划名词解释
  • 什么叫商城网站淘宝seo排名优化
  • 甘肃省集约化网站建设百度推广入口官网
  • 惠城网站建设有哪些计算机培训班培训费用
  • 个人网站不能放广告怎么赚钱企业seo排名优化
  • 模板做的网站不好优化网络公司名字
  • 咨询公司排名前十如何做谷歌优化
  • 徐州市政建设集团公司网站互联网的推广
  • 网站怎么做pc端盒子代写平台在哪找