SpringBoot集成MQTT服务 springboot集成netty

SpringBoot集成MQTT服务 springboot集成netty

精选文章moguli202024-12-18 13:38:2621A+A-

最近实现一个物联网小型应用,需要实现MQTT服务,在小型系统中尤其是通用产品中不宜依赖过多的独立服务,这样使得产品的分发与部署会变得复杂,这里采用内部集成MQTT服务方案,只需部署一个应用就可以实现应用的所有功能。

1、依赖包引入

在SpringBoot项目中集成MQTT服务,这里选择的是基于ActiveMQ提供的MQTT服务,ActiveMQ可以独立部署也可以嵌入到自有应用中,在SpringBoot中集成ActiveMQ比较容易,首先需要引入要用到的jar依赖包:

<!-- ActiveMQ 不推荐直接引入ActiveMQ all,会引入不必要的依赖 -->
<dependency>
  <!-- spring boot连接ActiveMQ的集成依赖 -->
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
   <!-- spring boot 集成ActiveMQ 核心依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-spring</artifactId>
</dependency>
<dependency>
   <!-- activemq stomp协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
   <!-- activemq amqp协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-amqp</artifactId>
</dependency>
<dependency>
   <!-- activemq mqtt协议依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-mqtt</artifactId>
</dependency>
<dependency>
   <!-- activemq kahadb持久化依赖 -->
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-kahadb-store</artifactId>
</dependency>
<!-- end ActiveMQ -->

2、ActiveMQ服务配置

依赖引入完成以后,需要建立ActiveMQ的配置文件,这里可以直接在原ActiveMQ配置文件基础上进行修改,由于是嵌入式的,所以就不需要原独立部署的web管理界面了,配置文件放在SpringBoot项目中的resources目录下,名为activemq.xml,配置内容如下:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="activemq-data">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <persistenceAdapter>
            <kahaDB directory="activemq-data/kahadb"/><!-- kahabd持久化目录 -->
        </persistenceAdapter>
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <!-- 如果需要实现其它协议,需要引入对应的协议依赖jar包 -->
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
    </broker>
</beans>

3、在SpringBoot应用加入ActiveMQ服务

在SpringBoot加入ActiveMQ工厂服务实例,以下代码加在SpringBoot启动类中。

/**
 * ActiveMQ服务初始化
 * @return ActiveMQ Broker工厂实例
 * @throws Exception
 */
 @Bean
 public BrokerFactoryBean brokerService() throws Exception {
      BrokerFactoryBean brokerFactoryBean = new BrokerFactoryBean();
      ClassPathResource res = new ClassPathResource("activemq.xml");//加载配置信息
      brokerFactoryBean.setConfig(res);//加载配置信息
      brokerFactoryBean.setStart(true);//启动服务
      return brokerFactoryBean;
 }

4、连接本地ActiveMQ服务

经过以上配置以后,就可以随应一起启动ActiveMQ服务了,这里实现了AMQP、STOMP、MQTT三种协议,由于是在本地连接同一JVM虚拟机内的服务,就不需要像连接外部ActiveMQ服务那样指定IP和端口,这里只需要指定其服务名就可以了,服务名是在ActiveMQ中的brokerName指定的,在application.properties的文加入以下配置:

#这里指向的连接是本应用内的localhost服务,这里采用vm连接
spring.activemq.broker-url=vm://localhost
#开启订阅模式,MQTT协议是以订阅模式存在的
spring.jms.pub-sub-domain=true

5、在应用中使用MQTT服务,进行业务研发

经过以上配置以后,就可以在应用中像使用普通JMS一样来与MQTT设备进行数据交互,MQTT订阅是可以使用通配符进行主题订阅的,通常来讲某一个设备上传数据会以编号+主题的方式实现,如topic:/run/0001/data,其中topic为固定协议头,run为数据分类,0001为设备的编号,data为具体的功能块数据,如果要订阅这一数据,显然不可能分每一个设备单独订阅一个主题,这个时候就可以订阅topic:/run/*/data这个主题,这样所有设备的这个主题都能订阅到。在java中分隔符"/"要用"."代替。

 //-----------订阅run下的data主题--------------------
/**
 * 订阅.ru.*.data主题
 * @param message 消息内容
 */
 @JmsListener(destination = ".ru.*.data")
 public void onRun(ActiveMQMessage message) throws Exception{
    String body = null;//消息内容
    String hid = null;//设备编号
    String type = message.getJMSDestination().toString();//主题类型,形如:topic:/run/0001/data
    //--------解析消息内容--------------------------
    if(null!=message.getContent()){
       body = new String(message.getContent().getData(),"UTF-8");//byte[]类型的消息内容
    }else if(message instanceof ActiveMQTextMessage) {
       ActiveMQTextMessage sg = (ActiveMQTextMessage) message;//String类型的消息内容
       body=sg.getText();
    }
    //-------从主题类型中解析出设备编号---------------
    hid = StringUtils.splitByWholeSeparator(type,".")[2];//设备编号
    //------业务处理代码----------------------------
 }

6、向设备端发布主题

这里采用JSM的方式向设备端发布对应的主题及内容,具体代码如下:

private final JmsTemplate jmsTemplate;//JMS发送实例

/**
 * 发布主题
 * @param topic 主题,形如 .run.0001.data,这里的.相当于/
 * @param body 消息内容
 */
 public void sendTopic(String topic,String body){
     jmsTemplate.convertAndSend(topic,body.getBytes());//这里以byte[]的内容发送
 }

点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2