RocketMQ - 如何实现顺序消息

发布一下 0 0

顺序消息的使用场景

日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。

如何发送和消费顺序消息

我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。

1. 顺序发消息

server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group#设置同步发送spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@RestControllerpublic class OrderlyController {    @Autowired    private Source source;    @GetMapping("/orderly")    public String orderly() {        List<String> types = Arrays.asList("创建订单", "支付", "退款");        types.forEach(type -> {            MessageBuilder builder = MessageBuilder.withPayload(type).setHeader(BinderHeaders.PARTITION_HEADER, 0);            Message message = builder.build();            source.output().send(message);        });        return "OK";    }}

上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在application.properties配置文件中指定producer.sync=true,默认是异步发送,此处改为同步发送。

MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。

2. 顺序收消息

server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group#设置同步发送spring.cloud.stream.rocketmq.bindings.output.producer.sync=true
@EnableBinding({Sink.class})@SpringBootApplicationpublic class App {    public static void main( String[] args )    {        SpringApplication.run(App.class);    }    @StreamListener(Sink.INPUT)    public void receive(String msg) {        System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());    }}

程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的

TopicTest receive: 创建订单, receiveTime= 1590503510075TopicTest receive: 支付, receiveTime= 1590503510076TopicTest receive: 退款, receiveTime= 1590503510077

顺序发送的技术原理

RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。

  • 局部有序:指发送同一个队列的消息有序,可以在发送消息时指定队列,在消费消息时也按顺序消费。例如同一个订单ID的消息要保证有序,不同订单的消息没有约束,相互不影响,不同订单ID之间的消息时并行的。
  • 全局有序:设置Topic只有一个队列可以实现全局有序,创建Topic时手动设置。此类场景极少,性能差,通常不推荐使用。

RocketMQ中消息发送有三种方式:同步、异步、单项。

  • 同步:发送网络请求后会同步等待Broker服务器返回结果,支持发送失败重试,适用于比较重要的消息通知场景。
  • 异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。
  • 单向:单向发送原理和异步一致,但不支持回调。适用于响应时间非常端,对可靠性要求不高的场景,例如日志收集。

顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。

RocketMQ的核心代码如下:

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {	private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();	public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {        return this.syncSendOrderly(destination, message, hashKey, (long)this.producer.getSendMsgTimeout());    }		public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {        if (!Objects.isNull(message) && !Objects.isNull(message.getPayload())) {            try {                long now = System.currentTimeMillis();                // 转成RocketMQ API中的Message对象                org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(this.objectMapper, this.charset, destination, message);                // 调用发送消息接口                SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);                long costTime = System.currentTimeMillis() - now;                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());                return sendResult;            } catch (Exception var12) {                log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);                throw new MessagingException(var12.getMessage(), var12);            }        } else {            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }    }}

选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成

public class SelectMessageQueueByHash implements MessageQueueSelector {    public SelectMessageQueueByHash() {    }    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        int value = arg.hashCode();        if (value < 0) {            value = Math.abs(value);        }        value %= mqs.size();        return (MessageQueue)mqs.get(value);    }}
  • 根据hashKey计算hash值,hashKey是我们前面例子中订单ID,因此相同订单ID的hash值相同。
  • 用hash值和队列数mqs.size()取模,得到一个索引值,结果小于队列数。
  • 根据索引值从队列列表中取出一个队列mqs.get(value),hash值相同则队列相同。

在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。

普通发送的技术原理

RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。

从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。

轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。

public class TopicPublishInfo {	public MessageQueue selectOneMessageQueue(String lastBrokerName) {		// 第一次执行时,lastBrokerName = null        if (lastBrokerName == null) {            return this.selectOneMessageQueue();        } else {            int index = this.sendWhichQueue.getAndIncrement();            for(int i = 0; i < this.messageQueueList.size(); ++i) {                int pos = Math.abs(index++) % this.messageQueueList.size();                if (pos < 0) {                    pos = 0;                }                MessageQueue mq = (MessageQueue)this.messageQueueList.get(pos);                // 当前选中的Queue所在Broker,不是上次发送的Broker                if (!mq.getBrokerName().equals(lastBrokerName)) {                    return mq;                }            }            return this.selectOneMessageQueue();        }    }	public MessageQueue selectOneMessageQueue() {        int index = this.sendWhichQueue.getAndIncrement();        int pos = Math.abs(index) % this.messageQueueList.size();        if (pos < 0) {            pos = 0;        }        return (MessageQueue)this.messageQueueList.get(pos);    }}

轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。

顺序消费的技术原理

RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。

多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。

顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。

public class ConsumeMessageOrderlyService implements ConsumeMessageService {	class ConsumeRequest implements Runnable {        private final ProcessQueue processQueue;        private final MessageQueue messageQueue;        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {            this.processQueue = processQueue;            this.messageQueue = messageQueue;        }        public ProcessQueue getProcessQueue() {            return this.processQueue;        }        public MessageQueue getMessageQueue() {            return this.messageQueue;        }        public void run() {        	// 省略        	try {                    this.processQueue.getLockConsume().lock();                     if (this.processQueue.isDropped()) {                       ConsumeMessageOrderlyService.log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                       return;                      }                      status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);                 } catch (Throwable var23) {                     ConsumeMessageOrderlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var23), ConsumeMessageOrderlyService.this.consumerGroup, msgs, this.messageQueue});                     hasException = true;                } finally {                    this.processQueue.getLockConsume().unlock();               }        }}

消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。

在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。

并发消费的原理

RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。

并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。

并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。

消息的幂等性

说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?

RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。

在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。

  • at-most-once:消息投递后不论消息是否被消费成功,不会再重复投递,有可能会导致消息未被消费,RocketMQ未使用该方式。
  • at-lease-once:消息投递后,消费完成后,向服务器返回ACK,没有消费则一定不会返回ACK消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的ACK,服务器则会再次投递,这就会导致可能重复消费,RocketMQ通过ACK来确保消息至少被消费一次。
  • exactly-only-once:必须下面两个条件都满足,才能认为消息是"Exactly Only Once"。 发送消息阶段,不允许发送重复消息;消费消息阶段,不允许消费重复的消息。在分布式系统环境下,如果要实现该模式,巨大的开销不可避免。RocketMQ没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。

版权声明:内容来源于互联网和用户投稿 如有侵权请联系删除

本文地址:http://0561fc.cn/82831.html

  • 评论列表

留言评论