17. RabbitMQ Basics: The RabbitOperations Interface in Detail

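With principle but without technique, technique can still be learned; with technique but without principle, you stop at technique.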

Contents

  • Preface
  • send
  • convertAndSend
  • convertSendAndReceive
  • execute
  • invoke
  • waitForConfirms
  • getConnectionFactory
  • Other

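Preface

In the previous article we covered every method that RabbitTemplate implements from the AmqpTemplate interface. This article looks at the other interface it implements, RabbitOperations.

AmqpTemplate supports the AMQP protocol and covers basic sending and receiving of messages, whereas RabbitOperations integrates directly with RabbitMQ and offers finer-grained operations.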

send

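The send methods mainly add a CorrelationData parameter.

CorrelationData wraps the data used by publisher confirms and returns. Its future reports whether the broker acknowledged (ack) or rejected (nack) the message, together with a reason; when returns are enabled, it also carries the returned message.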

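public class CorrelationData implements Correlation {

	// Result of the asynchronous publish; Confirm is the type of the result
	private final SettableListenableFuture<Confirm> future = new SettableListenableFuture<>();
	// Unique ID; if none is supplied, it is set to a unique value automatically
	private volatile String id;
	// The returned message, populated when the message is returned
	private volatile ReturnedMessage returnedMessage;
}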

	// Send a message with the given routing key, passing a CorrelationData object
	default void send(String routingKey, Message message, CorrelationData correlationData)
			throws AmqpException {

		throw new UnsupportedOperationException("This implementation does not support this method");
	}

	// Send to a specific exchange and routing key, passing a CorrelationData object
	void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
			throws AmqpException;
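To make the role of CorrelationData more concrete, here is a minimal plain-JDK sketch of the underlying idea: each publish is registered under an ID, and a future completes when the confirm for that ID arrives. It is a simplified model rather than Spring AMQP's implementation; the names ConfirmCorrelationSketch, Correlation, Confirm, publish and onConfirm are hypothetical, and CompletableFuture stands in for the future type shown above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the confirm-correlation idea; not Spring AMQP code. */
final class ConfirmCorrelationSketch {

    /** Outcome of a publish: acknowledged or not, plus an optional reason. */
    static final class Confirm {
        final boolean ack;
        final String reason;

        Confirm(boolean ack, String reason) {
            this.ack = ack;
            this.reason = reason;
        }
    }

    /** Carries an ID and a future that completes when the broker answers. */
    static final class Correlation {
        final String id;
        final CompletableFuture<Confirm> future = new CompletableFuture<>();

        Correlation(String id) {
            // Fall back to a generated unique value when no ID is supplied.
            this.id = (id != null) ? id : UUID.randomUUID().toString();
        }
    }

    private final Map<String, Correlation> pending = new ConcurrentHashMap<>();

    /** Registers the correlation before the (imaginary) publish happens. */
    Correlation publish(String id) {
        Correlation correlation = new Correlation(id);
        pending.put(correlation.id, correlation);
        return correlation;
    }

    /** Called when the (imaginary) broker confirms or rejects the message. */
    void onConfirm(String id, boolean ack, String reason) {
        Correlation correlation = pending.remove(id);
        if (correlation != null) {
            correlation.future.complete(new Confirm(ack, reason));
        }
    }
}
```

The caller keeps the returned Correlation and inspects its future later, which is roughly how a sender can find out which message a given ack or nack belongs to.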

convertAndSend

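convertAndSend builds on send: it accepts a plain Java object and converts it to a Message, and it can also take a MessagePostProcessor that modifies the message before it is sent.

	// Send to the default exchange with a custom routing key, carrying a CorrelationData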
	void convertAndSend(String routingKey, Object message, CorrelationData correlationData) throws AmqpException;

	void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)
			throws AmqpException;

	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
			throws AmqpException;

	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,
			CorrelationData correlationData) throws AmqpException;
			
	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,
			CorrelationData correlationData) throws AmqpException;
	
	void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;
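The order of the steps (convert first, then post-process, then send) can be illustrated with a small plain-JDK model. This is only a sketch under assumptions: Msg and ConvertAndSendSketch are hypothetical names, the conversion is reduced to the payload's string form, and a UnaryOperator plays the part of the message post-processor.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Simplified model of "convert, post-process, send"; not Spring AMQP code. */
final class ConvertAndSendSketch {

    /** Hypothetical message type: a byte body plus mutable headers. */
    static final class Msg {
        final byte[] body;
        final Map<String, Object> headers = new HashMap<>();

        Msg(byte[] body) {
            this.body = body;
        }
    }

    /** Converts the payload, applies the optional post-processor, returns what would be sent. */
    static Msg convertAndSend(Object payload, UnaryOperator<Msg> postProcessor) {
        // Step 1: convert the Java object to a message (here simply its string form).
        Msg converted = new Msg(String.valueOf(payload).getBytes(StandardCharsets.UTF_8));
        // Step 2: let the post-processor adjust the converted message, if one was given.
        Msg processed = (postProcessor != null) ? postProcessor.apply(converted) : converted;
        // Step 3: a real template would publish here; the sketch just returns the message.
        return processed;
    }
}
```

A typical use of the post-processing step is to set a header or property that the converter does not know about, for example `convertAndSend(42, m -> { m.headers.put("tenant", "demo"); return m; })` in this model.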

convertSendAndReceive

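convertSendAndReceive follows the same RPC (request/reply) pattern as the send-and-receive methods in the AmqpTemplate interface; the only difference is the additional CorrelationData parameter.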

	@Nullable
	Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)
			throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String exchange, String routingKey, Object message,
			CorrelationData correlationData) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,
			CorrelationData correlationData) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String routingKey, Object message,
			MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;

	@Nullable
	Object convertSendAndReceive(String exchange, String routingKey, Object message,
			MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
			throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	default <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
			@Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType)
					throws AmqpException {

		return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);
	}

	@Nullable
	<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
			CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
			MessagePostProcessor messagePostProcessor, CorrelationData correlationData,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

	@Nullable
	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
			@Nullable MessagePostProcessor messagePostProcessor,
			@Nullable CorrelationData correlationData,
			ParameterizedTypeReference<T> responseType) throws AmqpException;

execute

The execute method gives access to the native Channel so that operations can be run on it directly; it takes a ChannelCallback argument.

	@Nullable
	<T> T execute(ChannelCallback<T> action) throws AmqpException;

ChannelCallback is a functional interface. It receives the RabbitMQ Channel, can perform arbitrary operations on it, and returns a result.

@FunctionalInterface
public interface ChannelCallback<T> {

	/**
	 * @param channel the channel
	 * @return the result
	 */
	@Nullable
	T doInRabbit(Channel channel) throws Exception;
}

Example:

        ChannelCallback<Boolean> channelCallback = new ChannelCallback<Boolean>() {

            @Override
            public Boolean doInRabbit(Channel channel) throws Exception {
                // Publish a message directly through the Channel
                channel.basicPublish(MqBizConfig.BIZ_EXCHANGE, MqBizConfig.BIZ_ROUTE_KEY, null, "message".getBytes());
                System.out.println("doInRabbit");
                return true;
            }
        };
        Boolean result = rabbitTemplate.execute(channelCallback);
        System.out.println("Result: " + result);
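The callback style used by execute can be pictured as "borrow a channel, run the callback, hand the channel back". The following plain-JDK sketch models that flow under assumptions: FakeChannel, Callback and ExecuteSketch are hypothetical names, and the simple deque is only a stand-in for a channel cache, not the caching logic of any real connection factory.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model of a callback that borrows a channel; not Spring AMQP code. */
final class ExecuteSketch {

    /** Stand-in for a broker channel; it only counts published messages. */
    static final class FakeChannel {
        int published;

        void basicPublish(String exchange, String routingKey, byte[] body) {
            published++;
        }
    }

    /** Hypothetical counterpart of a channel callback. */
    @FunctionalInterface
    interface Callback<T> {
        T doInRabbit(FakeChannel channel) throws Exception;
    }

    private final Deque<FakeChannel> cache = new ArrayDeque<>();

    /** Borrows a channel, runs the callback, and always hands the channel back. */
    <T> T execute(Callback<T> action) {
        FakeChannel channel = cache.isEmpty() ? new FakeChannel() : cache.pop();
        try {
            return action.doInRabbit(channel);
        } catch (Exception e) {
            // Wrap checked exceptions so callers deal with a single unchecked type.
            throw new IllegalStateException("callback failed", e);
        } finally {
            cache.push(channel); // the channel goes back to the cache for reuse
        }
    }

    int cachedChannels() {
        return cache.size();
    }
}
```

Because the channel is handed back after every call, two consecutive execute calls in this model may reuse the same channel, but nothing guarantees it.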

invoke

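The invoke method takes an OperationsCallback argument. Inside that callback's doInRabbit() method, every operation uses the same dedicated channel, which is closed at the end (it is not returned to the cache). This style of use is called scoped operations.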

	@Nullable
	default <T> T invoke(OperationsCallback<T> action) throws AmqpException {

		return invoke(action, null, null);
	}

	@Nullable
	<T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
			@Nullable com.rabbitmq.client.ConfirmCallback nacks);

OperationsCallback is the operations callback: it receives a RabbitOperations, performs operations on it, and returns a result.

	@FunctionalInterface
	interface OperationsCallback<T> {

		/**
		 * @param operations the RabbitOperations.
		 * @return the result.
		 */
		@Nullable
		T doInRabbit(RabbitOperations operations);
	}
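The scoping behaviour described for invoke can be modelled in a few lines of plain Java: while the callback runs, every operation on the same thread sees one dedicated channel, and that channel is closed when the scope ends. This is a sketch under assumptions, not the library's code; InvokeSketch, FakeChannel, send and lastDedicated are hypothetical names, and send simply reports which channel it would have used.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Simplified model of scoped operations on a dedicated channel; not Spring AMQP code. */
final class InvokeSketch {

    /** Stand-in for a broker channel with an identity and an open flag. */
    static final class FakeChannel {
        final int id;
        boolean open = true;

        FakeChannel(int id) {
            this.id = id;
        }
    }

    private final ThreadLocal<FakeChannel> dedicated = new ThreadLocal<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Kept only so the example can inspect the channel after the scope ends. */
    FakeChannel lastDedicated;

    /** Any operation: uses the dedicated channel inside a scope, otherwise a fresh one. */
    int send() {
        FakeChannel scoped = dedicated.get();
        FakeChannel channel = (scoped != null) ? scoped : new FakeChannel(created.incrementAndGet());
        return channel.id; // report which channel was used
    }

    /** Runs the action with one dedicated channel bound to the current thread. */
    <T> T invoke(Function<InvokeSketch, T> action) {
        FakeChannel channel = new FakeChannel(created.incrementAndGet());
        dedicated.set(channel);
        lastDedicated = channel;
        try {
            return action.apply(this);
        } finally {
            dedicated.remove();
            channel.open = false; // closed at the end of the scope, not cached
        }
    }
}
```

In this model, `invoke(ops -> ops.send() == ops.send())` yields true because both calls share the dedicated channel, while two send calls outside a scope use different channels.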

waitForConfirms

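waitForConfirms and waitForConfirmsOrDie both wait for publisher confirms, but they must be called inside an invoke scope.

	// Wait for confirms
	boolean waitForConfirms(long timeout) throws AmqpException;

	// Wait for confirms; on failure the channel is closed and the producer can no longer publish on it
	void waitForConfirmsOrDie(long timeout) throws AmqpException;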
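What "waiting for confirms" means can be sketched with plain JDK primitives: the publisher tracks the sequence numbers it has sent, the confirm handler removes them, and the wait blocks until nothing is outstanding or the timeout expires. The class below is a simplified model, not the RabbitMQ client or Spring AMQP; all names are hypothetical, and as a simplification a timeout is reported by returning false rather than by an exception.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of waiting for outstanding confirms; not RabbitMQ client code. */
final class WaitForConfirmsSketch {

    private final Set<Long> unconfirmed = new HashSet<>();
    private boolean nackSeen;
    private long nextSeq = 1;

    /** Records a publish and returns its sequence number. */
    synchronized long publish() {
        long seq = nextSeq++;
        unconfirmed.add(seq);
        return seq;
    }

    /** Called when the (imaginary) broker acks or nacks a sequence number. */
    synchronized void handleConfirm(long seq, boolean ack) {
        unconfirmed.remove(seq);
        if (!ack) {
            nackSeen = true;
        }
        notifyAll(); // wake up any thread blocked in waitForConfirms
    }

    /** Returns true only if everything was acked before the timeout elapsed. */
    synchronized boolean waitForConfirms(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (!unconfirmed.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // timed out with confirms still outstanding
                }
                wait(remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        boolean allAcked = !nackSeen;
        nackSeen = false; // reset for the next batch
        return allAcked;
    }

    /** Same wait, but a failure is turned into an exception. */
    void waitForConfirmsOrDie(long timeoutMillis) {
        if (!waitForConfirms(timeoutMillis)) {
            throw new IllegalStateException("messages were nacked or not confirmed in time");
        }
    }
}
```

The two methods differ only in how failure is surfaced: one returns a boolean the caller must check, the other makes it impossible to ignore.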

getConnectionFactory

Returns the connection factory used by these operations.

	ConnectionFactory getConnectionFactory();

Other

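start and stop have no real implementation (they are no-ops) and exist only for backward compatibility; the default isRunning returns false.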

	@Override
	default void start() {
		// No-op - implemented for backward compatibility
	}

	@Override
	default void stop() {
		// No-op - implemented for backward compatibility
	}

	@Override
	default boolean isRunning() {
		return false;
	}
