18、RabbitMQ基础:对象序列化机制

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
  • 发送对象
  • 接收对象
  • 使用Jackson 序列化

前言

使用RabbitMQ原生API,发送消息时,发送的是二进制byte[]数据。

    void basicPublish(String var1, String var2, byte[] var4) throws IOException;

使用RabbitTemplate.send方法发送Message对象,也是二进制byte[]数据。

    public Message(byte[] body) {
   
     
        this(body, new MessageProperties());
    }

在接收时,需要将二进制数据转为你想要的数据格式。在JAVA编程中都是基于对象操作,一般消息都是对象,比如订单、日志。

所以RabbitTemplate提供了convertAndSend方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。

发送对象

首先我们看下RabbitTemplate.convertAndSend是如何工作及序列化对象的。

发送一个用户User对象,该对象需要实现Serializable序列化接口。

        User user = new User();
        user.setName("张三");
        rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);

convertAndSend也是调用send方法,只是多了一个convertMessageIfNecessary,将对象转为二进制数组,并封装到Message对象中。

   public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
   
     
   		// this.convertMessageIfNecessary(object) 将JAVA 消息对象转为Message
        this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
    }

convertMessageIfNecessary会判断当前消息是否是Message类型,如果是直接返回,不是则调用消息转换器进行转换。

   protected Message convertMessageIfNecessary(Object object) {
   
     
        return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

获取消息转换器,直接通过RabbitTemplate.getMessageConverter获取其成员属性,也就是SimpleMessageConverter,这是默认值。

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
   
     
    	//  private MessageConverter messageConverter = new SimpleMessageConverter();
        MessageConverter converter = this.getMessageConverter();
        if (converter == null) {
   
     
            throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
        } else {
   
     
            return converter;
        }
    }

接着调用消息转换器的toMessage方法,

    public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException {
   
     
    	// 1. 创建消息属性对象
        MessageProperties messageProperties = messagePropertiesArg;
        if (messagePropertiesArg == null) {
   
     
            messageProperties = new MessageProperties();
        }
		// 2. 创建Message对象
        Message message = this.createMessage(object, messageProperties, genericType);
        messageProperties = message.getMessageProperties();
        if (this.createMessageIds && messageProperties.getMessageId() == null) {
   
     
            messageProperties.setMessageId(UUID.randomUUID().toString());
        }

        return message;
    }

createMessage创建Message对象并返回。如果不是byte[]String类型,最后会查看消息对象是否实现了Serializable接口,如果是,则进行序列化,并设置ContentType:application/x-java-serialized-object,以上都是不是则会抛出IllegalArgumentException异常。

    protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
   
     
        byte[] bytes = null;
        // 1. byte[] 类型
        if (object instanceof byte[]) {
   
     
            bytes = (byte[])object;
            // 设置消息属性 ContentType:application/octet-stream
            messageProperties.setContentType("application/octet-stream");
        } else if (object instanceof String) {
   
     
        	// 2. String 类型
            try {
   
     
            	// 转为字节
                bytes = ((String)object).getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException var6) {
   
     
                throw new MessageConversionException("failed to convert to Message content", var6);
            }
			// 设置消息属性 ContentType:text/plain
            messageProperties.setContentType("text/plain");
            // 设置消息属性 ContentEncoding:UTF-8
            messageProperties.setContentEncoding(this.defaultCharset);
        } else if (object instanceof Serializable) {
   
     
        	// 3. 实现了 Serializable接口
            try {
   
     
            	// 转为byte[] 
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
   
     
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }
			// 设置消息属性 ContentType:application/x-java-serialized-object
            messageProperties.setContentType("application/x-java-serialized-object");
        }

        if (bytes != null) {
   
     
        	// 4. 设置长度
            messageProperties.setContentLength((long)bytes.length);
            // 5. 返回Message对象 
            return new Message(bytes, messageProperties);
        } else {
   
     
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
        }
    }

Message创建成功后,调用原生的channel.basicPublish方法,发送消息对象、属性。

    protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException {
   
     
        AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    }

查看控制台,可以看到对象消息的相关信息:
*

接收对象

在消费者接收消息时,可以直接接收业务对象。

    @RabbitListener(queues = {
   
     "dsfsf"})
    public void receive003(User user) {
   
     
        System.out.println("收到消息" + user);
    }

容器监听消息,调用消息转换器SimpleMessageConverter将二进制数据转为相应的对象。

调用的是SimpleMessageConverter.fromMessage方法。

    public Object fromMessage(Message message) throws MessageConversionException {
   
     
        Object content = null;
        // 1. 处理消息属性
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
   
     
        	// 获取contentType ,这里为:application/x-java-serialized-object
            String contentType = properties.getContentType();
            // 2. contentType 以text 开头(字符串),二进制转为字符串返回
            if (contentType != null && contentType.startsWith("text")) {
   
     
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
   
     
                    encoding = this.defaultCharset;
                }

                try {
   
     
                    content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException var8) {
   
     
                    throw new MessageConversionException("failed to convert text-based Message content", var8);
                }
            // 3. contentType为 application/x-java-serialized-object(序列化对象),
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
   
     
                try {
   
     
                	// 反序列化为对象
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
   
     
                    throw new MessageConversionException("failed to convert serialized Message content", var7);
                }
            }
        }
		// 4. 以上都不是,直接返回二进制
        if (content == null) {
   
     
            content = message.getBody();
        }
        return content;
    }

使用Jackson 序列化

可是使用其他序列化方式,比如Jackson

只需要在RabbitTemplate、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
   
     
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
   
     
        RabbitTemplate template = new RabbitTemplate();
        configurer.configure(template, connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: