摘要
本文主要学习的目标有两个:
- RabbitMQ中的消息可靠性投递的方式;
- 发布的性能权衡;
虽然不是所有的系统都要求像银行一样对消息可靠投递有非常严格的要求,但确保消息被接收和投递是非常重要的。RabbitMQ基于AMQP规范,后者提供消息发布中的事务以及消息持久化选项,以提供比自身普通消息发布更高级的可靠消息通信机制。
发布性能的权衡
在RabbitMQ中,创建可靠性投递的每个机制都会对性能产生一定的影响。单独使用时可能不太会注意到吞吐量的差异,但是当它们组合使用时,吞吐量就会由明显不同,只有通过执行自己的性能基准测试,才能确定性能与可靠性投递之间可以接受的平衡。
下面从左到右依次说明这些机制会产生哪些性能影响。
另外,会使用Spring提供的RabbitTemplate客户端工具(使用过RabbitTemplate,后续可能不会介绍RabbitTemplate),对每种机制进行配置,并发送消息到RabbitMQ。
代码在Github:
没有保障
在完美世界里,无须任何额外的配置或操作,RabbitMQ就可以可靠的投递消息。
不幸的是,当墨菲定律肆虐我们的程序时,完美世界并不存在。
在非核心应用中,发布的消息不必处理每个可能的故障点,例如发一些允许丢弃的消息,那么我们可以不使用任何保障机制,直接使用Basic.Publish发送消息。
使用RabbitTemplate时,可以在配置文件中设置:
spring: #消息队列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / publisher-returns: false publisher-confirms: false connection-timeout: 5000ms
将publisher-returns和publisher-confirms设置为false。
失败通知
设置mandatory后,RabbitMQ将不接受不可路由的消息。
mandatory标志是一个与Basic.Publish命令一起传递的参数,该参数会告诉RabbitMQ,如果消息不可路由,它应该通过Basic.Return命令将消息返回给发布者。设置mandatory标志可以被认为是开启故障检测模式,它只会让RabbitMQ向你通知失败,而不会通知成功。如果消息路由正确,你的发布者将不会收到通知。
/** * 定制AmqpTemplate对象。 * 可根据需要定制多个。 * * @return AmqpTemplate对象。 */ @Bean public AmqpTemplate amqpTemplate() { rabbitTemplate.setEncoding("UTF-8"); // 设置不接受不可路由的消息,需要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},原因:{},交换器: {},路由键:{}", correlationId, replyCode, replyText, exchange, routingKey); }); return rabbitTemplate; }
如上面的配置,我们设置了mandatory等于true,同时将配置文件中的publisher-returns也设置为true,这样就打开了失败通知。下面做个测试:
/** * 发送direct消息。 * 交换器存在,但队列不存在,为了测试Mandatory与ReturnCallback。 * * @param message 消息内容。 */ public void directNotExistQueue(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData); }
我们创建了交换器DIRECT_EXCHANGE,但是使用一个不存在的RoutingKey,这就等于发送消息到交换器成功,但是无法路由到某一个队列,执行测试用例,观察结果:
/** * 发送direct消息,但消息路由不存在。 * 交换器存在,但队列不存在,为了测试Mandatory与ReturnCallback。 */ @Test public void testDirectNotExistQueue() { messageProducer.directNotExistQueue("{}"); }
结果如下:
ReturnCallback -> 消息 null 发送失败,应答码:312,原因:NO_ROUTE,交换器: DIRECT_EXCHANGE,路由键:DIRECT_ROUTING_KEY_NOT_EXIST
Basic.Return调用是一个RabbitMQ的异步调用,并且在消息发布后的任何时候都可能发生。
如果代码中没有设置setReturnCallback,那么该调用将被忽略。
其实setReturnCallback就是处理Basic.Return的回调方法,RabbitTemplate接收到Basic.Return命令后,调用该方法。
发布者确认
发布者确认模式是AMQP规范的扩展功能,只能用在支持这个特定扩展的客户端,RabbitTemplate支持这个模式。
在协议层,发布任何消息之前,消息发布者必须向RabbitMQ发送Confirm.Select请求,并等待Confirm.SelectOk响应以获知投递确认已经被启动。在这一点上,对于发布者发送给RabbitMQ的每条消息,服务器会发送一个确认响应(Basic.Ack)或否定确认响应(Basic.Nack)。
在RabbitTemplate中,要使用发布者确认,需要在配置文件中配置:
publisher-confirms: true
然后在设置回调函数:
/** * 定制AmqpTemplate对象。 * 可根据需要定制多个。 * * @return AmqpTemplate对象。 */ @Bean public AmqpTemplate amqpTemplate() { // 设置消息转换器为Jackson rabbitTemplate.setEncoding("UTF-8"); // 设置不接受不可路由的消息,需要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},原因:{},交换器: {},路由键:{}", correlationId, replyCode, replyText, exchange, routingKey); }); // 设置消息发布确认功能,需要在yml中配置:publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("ConfirmCallback -> 消息发布到交换器成功,id:{}", correlationData); } else { log.warn("ConfirmCallback -> 消息发布到交换器失败,错误原因为:{}", cause); } }); // 开启事务模式,需要在yml中配置:publisher-confirms: false // rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }
调用setConfirmCallback方法,设置回调函数,每次发送消息到RabbitMQ,服务器都会返回响应,可以通过判断ack来确定是否发送成功。
当成功发送到交换器后,ConfirmCallback会接收到ack为true的响应,如果没有成功发送到交换器,则会接收到ack为false的响应。
具体测试代码如下:
/** * 发送direct消息。 * 交换器不存在,队列也不存在,为了测试ConfirmCallback。 * * @param message 消息内容。 */ public void directNotExistExchangeAndQueue(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE_NOT_EXIST", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData); }
首先向不存在的交换器发送消息,结果为:
/** * 发送direct消息,交换器和路由都不存在。 * 交换器不存在,队列也不存在,为了测试ConfirmCallback。 */ @Test public void testDirectNotExistExchangeAndQueue() { messageProducer.directNotExistExchangeAndQueue("{}"); }
ConfirmCallback -> 消息发布到交换器失败,错误原因为:channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE_NOT_EXIST' in vhost '/', class-id=60, method-id=40)
然后在使用失败通知模式的测试用例测试一下,即能发送到交换器,但是无法路由到队列:
ReturnCallback -> 消息 null 发送失败,应答码:312,原因:NO_ROUTE,交换器: DIRECT_EXCHANGE,路由键:DIRECT_ROUTING_KEY_NOT_EXISTConfirmCallback -> 消息发布到交换器成功,id:CorrelationData [id=9282dbe9-4fe9-4b85-af06-79305f4c99e1]
无论是否使用发布者确认模式,如果你发布消息到不存在的交换器,那么发布用的信道将会被RabbitMQ关闭。
发布者确认模式不能与事务模式一起工作,此外,作为对Basic.Publish请求的异步响应,它并不能保证何时会收到确认。
备用交换器
备用交换器是RabbitMQ对AMQP的另一种扩展,用于处理无法路由的消息。备用交换器在第一次声明交换器时被指定,用来提供一种预先存在的交换器,即如果交换器无法路由消息,那么消息就会被路由到这个新的备用交换器。
如果将消息发送到具有备用交换器的交换器(设置了mandatory=true)上, 那么一旦预期的交换器无法正常路由消息,Basic.Return就不会发给发布者。因为消息成功的发布到了备用交换器。
RabbitTemplate声明备用交换器的代码如下:
/** * 声明Direct交换器。 * 同时指定备用交换器。 * * @return Exchange对象。 */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE") .durable(false) .withArgument("alternate-exchange", "UN_ROUTE_EXCHANGE") .build(); }
在声明交换器时,调用withArgument函数,key为alternate-exchange,value为备用交换器的名称,这里是UN_ROUTE_EXCHANGE(备用服务器也需要创建)。
下面进行测试,发送一个无法路由的消息到DIRECT_EXCHANGE,这个消息将不能被路由,但不会回调ReturnCallback,而是会进入到UN_ROUTE_EXCHANGE交换器中:
事务提交
AMQP事务提供了一种机制,通过这种机制,消息可以批量发布到RabbitMQ,然后提交到队列或者回滚。
在RabbitTemplate中,使用事务就不能使用ReturnConfime模式,所以要把publisher-confimes设置为false,具体代码如下:
/** * 定制AmqpTemplate对象。 * 可根据需要定制多个。 * * @return AmqpTemplate对象。 */ @Bean public AmqpTemplate amqpTemplate() { // 设置消息转换器为Jackson rabbitTemplate.setEncoding("UTF-8"); // 设置不接受不可路由的消息,需要在yml中配置:publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},原因:{},交换器: {},路由键:{}", correlationId, replyCode, replyText, exchange, routingKey); }); // 开启事务模式,需要在yml中配置:publisher-confirms: false rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; }
代码中,要设置setChannelTransacted为true,然后声明RabbitMQ的事务管理器:
/** * 声明RabbitMQ事务管理器。 * * @param connectionFactory 连接工厂。 * @return PlatformTransactionManager对象。 */ @Bean public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
到这里,事务的配置准备工作就做好了,接下来,基于事务模式发送消息:
/** * 在事务模式下,发送direct消息。 ** 第一次发送,消息可以正常路由到队列。 * 第二次发送,消息不能路由到队列。 */ @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData); }
代码中,加入了@Transactional修饰方法,先后发送两条消息到交换器,第一次发送的消息会正常路由到队列,第二次发送的消息则不会发送到队列,下面是测试代码和结果:
/** * 在事务模式下,发送direct消息。 * 第一次发送,消息可以正常路由到队列。 * 第二次发送,消息不能路由到队列。 */ @Test public void testDirectOnTransaction() { messageProducer.directOnTransaction("{}"); }
org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction
由于发生了异常,执行了回滚,所以第一条消息也没有被发送到队列:
如果两条数据都会成功发送到RabbitMQ,则会成功提交两条消息。
如果不用@Transactional修饰方法,那么会有一条消息进入RabbitMQ,另一条消息丢失,具体测试如下,首先是两条消息都能发送到RabbitMQ:
/** * 在事务模式下,发送direct消息。 ** 第一次发送,消息可以正常路由到队列。 * 第二次发送,消息不能路由到队列。 */ @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); }
下面把@Transactional修饰去掉,然后一条可以发送到RabbitMQ,另一条不可以:
/** * 在事务模式下,发送direct消息。 ** 第一次发送,消息可以正常路由到队列。 * 第二次发送,消息不能路由到队列。 */ // @Transactional(rollbackFor = Exception.class) public void directOnTransaction(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData); rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData); }
执行后结果如下:
org.springframework.amqp.AmqpIOException: java.io.IOException
可以看到程序依旧抛出了异常,但第一条消息发送到了RabbitMQ中:
在协议层,当RabbitMQ由于错误而无法路由时,它将发送一个Basic.Return响应,希望终止事务的发布者应该发送TX.Rollback请求,并等待TX.RollbackOk响应,然后继续工作。
RabbitMQ只在每个发出的命令作用于单个队列时才执行原子事务。如果不只一个队列受到事务中任何命令的影响,则提交就不具备原子性。
推荐使用发布确认模式用作轻量级替代方案,因为它的速度快,可以同时提供肯定或否定的确认。
高可用队列以及高可用队列事务
高可用队列(HA队列)时RabbitMQ的一项增强功能,它允许队列在多个服务器上拥有冗余副本。
当消息发送到高可用队列是,消息会发送到集群中的每台服务器,一旦消息在集群中的任何节点都完成消费,那么消息所有副本将立即从其他节点删除。
HA队列中有一个节点是主节点,其他所有节点都是辅助节点。当主节点发生故障,会在辅助节点中选择一个接管主节点的角色。如果HA节点中的一个辅助节点故障了,其他节点将照常工作。
当一个故障节点恢复了,或者新添加进来一个辅助节点,它将不包含任何已经存在于现有节点中的消息,当现有节点的消息被消费后,故障节点或新节点则开始接收消息,并执行同步操作。
如果使用事务或消息确认机制,则消息需要在HA队列中所有活动节点确定后,RabbitMQ才会发送成功响应。
高可用队列的配置在后面会单独写一篇。
消息持久化
如果将一个消息的delivery-mode设置为1,RabbitMQ会被告知不需要将消息存储到磁盘,而消息会一直保存在内存中。
为了使消息在RabbitMQ重启后仍然存在,除了将delivery-mode设置为2,还需要在创建队列时设置durable,使队列变为持久化队列。
在发布消息时,RabbitTemplate默认采用持久化策略,如果希望持久化存储消息,需要在发送消息时做如下设置:
/** * 发送direct非持久化消息。 * RabbitTemplate默认采用消息持久化存储。 * * @param message 消息内容。 */ public void directNonPersistent(String message) { rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); msg.getMessageProperties().setCorrelationId(UUID.randomUUID().toString()); return msg; } ); }
setDeliveryMode为非持久化模式后,发送的消息将只保存在RabbitMQ的内存中。
在I/O密集型服务器中,通过操作系统在存储设备之间传输数据时,操作系统将阻塞I/O操作的进程。当RabbitMQ服务器正在尝试执行I/O操作,并等待存储设备响应时,操作系统内核发生阻塞,那么RabbitMQ能做的就只有等待。
尽管消息持久化时保障消息最终被投递的最重要的方式之一,但实现它的代价也时最大的。