java开发RocketMQ生产者高可用示例详解
2022-08-05 19:55:25 来源:易采站长站 作者:
目录
引言1 消息1.1 topic1.2 Body1.3 tag1.4 key1.5 延迟级别2 生产者高可用2.1 客户端保证生产者高可用2.1.1 重试机制2.1.2 客户端容错2.2 Broker端保证生产者高可用引言
前边两章说了点基础的,从这章开始,我们挖挖源码。看看RocketMQ是怎么工作的。
首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了。
我们看看消息孩子们都长啥样。
1>
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
//主题名字
private String topic;
//消息扩展信息,Tag,keys,延迟级别都存在这里
private Map<String, String> properties;
//消息体,字节数组
private byte[] body;
//设置消息的key,
public void setKeys(String keys) {}
//设置topic
public void setTopic(String topic) {}
//延迟级别
public int setDelayTimeLevel(int level) {}
//消息过滤的标记
public void setTags(String tags) {}
//扩展信息存放在此
public void putUserProperty(final String name, final String value) {}
}
消息就是孩子们,这些孩子们呢,有各自的特点,也有共性。同一个家长送来的两个孩子可以是去同一个地方的,也可以是去不同的地方的。
1.1>
首先呢,每个孩子消息都有一个属性topic,这个我们上文说到了,是一个候船大厅。孩子们进来之后,走到自己指定的候船大厅的指定区域(平时出门坐火车高铁不也是指定的站台乘车么),坐到message queue座位上等,等着出行。
Broker有一个或者多个topic,消息会存放到topic内的message queue内,等待被消费。
1.2>
孩子消息,也有一个Body属性,这就是他的能力,他会画画,他会唱歌,他会干啥干啥,就记录在这个Body属性里。等走出去了,体现价值的地方也是这个Body属性。
Body就是消息体,消费者会根据消息体执行对应的操作。
1.3>
这个tag我们上节说了,就是一个标记,有的孩子背着画板,相机,有的游船就特意找到这些孩子拉走,完成他们的任务。
可以给消息设置tag属性,消费者可以选择含有特定tag属性的消息进行消费。
1.4>
key就是每个孩子消息的名字了。要找哪个孩子,喊他名就行。
对发送的消息设置好 Key,以后可以根据这个Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。
1.5>
当然,还有的孩子来就不急着走,来之前就想好了,要恰个饭,得30分钟,所以自己来了会等30分钟后被接走。
设置延迟级别可以规定多久后消息可以被消费。
2>
每个送孩子来的家长都希望能送到候船大厅里,更不希望孩子被搞丢了,这个时候这个候船大厅就需要一些保证机制了。
2.1>
2.1.1>
就是说家长送来了,孩子进到候船大厅之后,没能成功坐到message queue座位上,这个时候工作人员会安排重试,再去看是否有座位坐。重试次数默认是2次,也就是说,消息孩子共有3次找座位坐的机会。
看源码,我特意加了注解,大致可以看懂一些了。
//这里取到了重试的次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //获取消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } //发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); ... } catch (RemotingException e) { ... continue; } catch (MQClientException e) { ... continue; } catch (MQBrokerException e) { ... continue; } catch (InterruptedException e) { //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试 throw e; } } else { break; } }
重试代码如上,这个sendDefaultImpl
方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
2.1.2>
若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.
选择Broker
就是在选择message queue
,对应的代码如下:
这里会先判断延迟容错开关是否开启,这个开关默认是关闭的,若是开启的话,会优先选择延迟较低的Broker。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //判断发送延迟容错开关是否开启 if (this.sendLatencyFaultEnable) { try { //选择一个延迟上可以接受,并且和上次发送相同的Broker int index = tpInfo.getSendWhichQueue().incrementAndGet(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //若是Broker的延迟时间可以接受,则返回这个Broker if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } //若是前边都没选中一个Broker,就随机选一个Broker return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
但是当延迟容错开关为关闭状态的时候,执行的代码如下:
为了均匀分散Broker的压力,会选择与之前不同的Broker。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { //若是没有上次的Brokername做参考,就随机选一个 if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //如果有,那么就选一个其他的Broker for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); //这里判断遇上一个使用的Broker不是同一个 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectOneMessageQueue(); } }
2.2>
Broker候船大厅为了能确切的接收到消息孩子,至少会有两个厅,一个主厅一个副厅,一般来说孩子都会进入到主厅,然后一顿操作,卡该忙信那机资(影分身之术),然后让分身进入到副厅,这样当主厅停电了,不工作了,副厅的分身只要去完成了任务就ok的。一般来说都是主厅的消息孩子去坐船完成任务。
之后我们会聊到Broker的主从复制,分为同步复制和异步复制,同步复制时指当master 收到消息之后,同步到slaver才算消息发送成功。异步复制是只要master收到消息就算成功。生产中建议至少部署两台master和两台slaver。
下一篇,我们聊聊,消息的发送流程,就是说,一个消息孩子,从进码头的门到坐到message queue座位上,都经历了啥。
以上就是java开发RocketMQ生产者高可用示例详解的详细内容,更多关于java RocketMQ生产者高可用的资料请关注易采站长站其它相关文章!
如有侵权,请发邮件到 [email protected]
最新图文推荐
相关文章
-
Spring Cloud 整合Apache-SkyWalking实现链路跟踪的方法
什么是SkyWalking 查看官网https://skywalking.apache.org/ 分布式系统的应用程序性能监视工具,专为微服务、云原生架构和基于容器(Docker、K8s、Mesos)架构而设计。 安装 进入下载页面https://2020-06-18
-
成功解决IDEA2020 Plugins 连不上、打不开的方法
IntelliJ IDEA 2020.1 插件中心一直打不开,鉴于有部分同学反馈设置http proxy不能解决,所以可按以下顺序检查 一、设置 http proxy—勾上Auto-detect proxy setting,参照下图,加上地址 http://127.0.02020-06-25
-
IDEA2020 1.1中Plugins加载不出来的问题及解决方法
进入File-Setting 如图,取消勾选,点击确认后重启,点击了以后等一会就可以正常显示 ps:下面看下解决IDEA 2020.1.1 找不到程序包和符号 问题描述 IDEA 2020.1.1 maven项目build的时候报错,找2020-06-28
-
springboot + rabbitmq 如何实现消息确认机制(踩坑经验)
本文收录在个人博客:www.chengxy-nds.top,技术资源共享,一起进步 最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷2020-07-01
-
JetBrains IntelliJ IDEA 2020安装与使用教程详解
对于JetBrains IntelliJ IDEA 2020的认识 IntelliJ IDEA 2020是一款JAVA编程软件,捷克IntelliJ公司研发推出。该软件提供了一个非常强大的JAVA集成开发环境,不仅添加了对Records的完整代码洞察支持,2020-06-28