基于RabbitMQ在Kettle中使用MQTT协议进行数据传输

背景

最近给某招采投标平台做了一个订阅推送功能,其中数据主要的ETL流程实在kettle内做的,现在有一个需求是需要把部分数据处理完成后发送给平台方的实际使用者(主要包括钉钉端和APP端),笔者面临的一个问题是如何将现阶段的处理的部分数据一旦完成后能快速的接投递给消费者。结合业务思考过后主要的数据流程如下(不含真实业务逻辑和数据落库):

从上图中可以看出如何将kettle解除耦合性传递给消息队列,是我主要面临的问题。

选择

经过调研,事实上kettle已经在streaming的模块中列出了一些支持的通信协议和支持的中间件,例如HTTP,MQTT,Kafka,RabbitMQ等。通过上述架构,结合我自己的使用情况,我本来想要直接使用rabbitMQ组件进行消息投递,该消息中间件我也比较熟悉。但是结合我们项目当前使用的kettle版本发现达不到9.X,rabbitMQ组件是没有的。因此当下面临一个问题:要么换中间件,要么换协议。因此这里选中MQTT协议进行接入rabbitMQ。因为Kafka本人不熟悉。

这里简单的说些RabbitMQ,RabbitMQ最早是面向amqp协议的,但是得益于RabbitMQ优秀的设计思路,RabbitMQ可以通过官方插件的形式对其他主流协议做兼容(核心问题就在这些插件使用过程中有一些问题需要进行避免,这也是本文的主要目的),因此这里笔者选用RabbitMQ安装MQTT插件。以下给出Kettle不通版本在上述架构中的解决方案:

  • Kettle9.X

  • Kettle8.x(本文主要将的方式)

实现

  1. 安装给RabbitMQ安装MQTT协议(该项不是本文重点,不再赘述),安装成功可以在localhost:15672 的控制面板中的Overview选项下的PortsAndContexts 下可以查看当前插件的启用情况和端口,MQTT插件一般默认的端口是1883
  2. 给消息队列创建队列,交换器绑定关系等,这里我的消费者是一个SpringBoot工程,消息队列的绑定关系我是由其设置的。读者也可以手动在控制面版上界面化设置。下面给我的SpringBean配置。原理在原理说明章节说明
@Configuration
public class RabbitMQConsumerConfiguration {

    /**
     * 公告推送 kettle使用mqtt进行推送因此只能使用该交换器
     */
    public static final String GG_TOPIC = "amq.topic";
    /**
     * rabbitmq中规定接收mqtt消息的队列名必须以mqtt-subscription-开头gg指的是消息类型,01指的是Q0S
     */
    public static final String GG_QUEUE = "mqtt-subscription-gg01";

    /**
     * 修改rabbitmq默认序列化
     *
     * @param connectionFactory 连接工厂
     * @return rabbitmq的模板
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * 修改rabbitmq默认反序列化
     *
     * @param objectMapper 对象映射
     * @return rabbitmq的模板
     */
    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    /**
     *
     * @return TopicExchange
     */
    @Bean
    public TopicExchange ggTopicExchange() {
        return new TopicExchange(GG_TOPIC);
    }

    /**
     *
     * @return Queue
     */
    @Bean
    public Queue ddGgQueue() {
        return new Queue(GG_QUEUE);
    }

    /**
     *
     * @return Binding
     */
    @Bean
    public Binding ggBindingDd() {
        return BindingBuilder.bind(ddGgQueue()).to(ggTopicExchange()).with("gg");
    }
  1. 配置Kettle的MQTT组件

原理说明

1.看完上述实现过程,小伙伴可能会觉得非常迷惑。我们分成两个部分来详细介绍:

  • 在上述步骤2中,有的小伙伴可能会问,为什么没有设置交换器只能使用amq.topic 这个RabbbitMQ默认的交换器?

    MQTT协议起始比较简单(从kettle需要填写的关键信息可见一二),只需要配置连接地址,主题名称(TOPIC),但事实上我们前文说了,rabbitMQ通过MQTT插件来进行模拟MQTT协议从而能适配到RabbitMQ的结构,而RabbitMQ中的交换器,队列,路由键等概念是在MQTT中不存在的。因此RabbitMQ对使用MQTT插件有如下限制:

    1. 所有的接入到RabbitMQ的MQTT消息都只默认的走**amq.topic** 交换器,想要接收MQTT消息的队列必须绑定该交换器。
    2. 上述队列在绑定在**amq.topic** 交换器时要想能区分普通消息队列,该队列的名称必须以:**mqtt-subscription-** 开头!是的,你没有听错,队列的名称是有限制的。
    3. MQTT生成者发送的Topic必须和上述队列和交换器绑定时的路由键名称抱持一致,上文中我使用的是gg
  • 在上述步骤3中,我们需要注意的参数有两个:
    1. 其中TopicName 对应RabbitMQ并不是队列的名称,也不是交换器的名称,而是指的是两者的绑定键名称。
    2. 其中Quality of Service 在RabbitMQ的MQTT插件说的很详细,该质量参数不能大于1,大于1 则降级为1

总结

综上在Kettle使用MQTT协议接入到RabbitMQ时我们需要注意的有RabbitMQ消费者队列的名称,路由键。

赞(1)
未经允许不得转载:潜龙云 » 基于RabbitMQ在Kettle中使用MQTT协议进行数据传输

评论 1

评论前必须登录!