乐鱼电竞

教育行业A股IPO第一股（股票代码 003032）

全国咨询/投诉热线：400-618-4000

Paho是什么？Paho实现消息收发的操作流程

更新时间:2023年10月13日15时28分 来源:乐鱼电竞 浏览次数:

Paho Java客户端是用Java编写的MQTT客户端库，用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。

Paho不仅可以对接EMQ X Broker，还可以对接满足符合MQTT协议规范的消息代理服务端，目前Paho可以支持到MQTT5.0以下版本。MQTT3.1.1协议版本基本能满足百分之九十多的接入场景。

Paho Java客户端提供了两个API：

1：MqttAsyncClient提供了一个完全异步的API，其中活动的完成是通过注册的回调通知的。

2：MqttClient是MqttAsyncClient周围的同步包装器，在这里，功能似乎与应用程序同步。

Paho实现消息收发

(1)找到项目：emq-demo，添加坐标依赖

<dependency>
     <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.2</version>
</dependency>

(2)编写客户端封装类的代码：com.itheima.mqtt.client.EmqClient

/**
* Created by ÀÖÓã²¥¿Í*ºÚÂí³ÌÐòÔ±.
*/
@Component
public class EmqClient {

    private Logger log = LoggerFactory.getLogger(EmqClient.class);
    
  private IMqttClient mqttClient;
  
  @Autowired
  private MqttProperties mqttProperties;
  
  @Autowired
  private MqttCallback mqttCallback;
  
  @PostConstruct
  private void init(){
      //MqttClientPersistenceÊÇ½Ó¿Ú ÊµÏÖÀàÓУºMqttDefaultFilePersistence£»MemoryPersistence
      MqttClientPersistence memoryPersistence = new MemoryPersistence();
      try {
          mqttClient = new
MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence);
      } catch (MqttException e) {
           log.error("MqttClient³õʼ»¯Ê§°Ü,brokerurl={},clientId=
{}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
      }
}

   /**
    * Á¬½Óbroker
    * @param username
    * @param password
    */
public void connect(String username,String password){
    //´´½¨MQTTÁ¬½ÓÑ¡Ïî¶ÔÏó--¿ÉÅäÖÃmqttÁ¬½ÓÏà¹ØÑ¡Ïî
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    
    //×Ô¶¯ÖØÁ¬
    connectOptions.setAutomaticReconnect(true);
    /**
     * ÉèÖÃΪtrueºóÒâζ×Å£º¿Í»§¶Ë¶Ï¿ªÁ¬½Óºóemq²»±£Áô»á»°±£Áô»á»°£¬·ñÔò»á²úÉú¶©ÔĹ²Ïí¶ÓÁеĴæ»î
¿Í»§¶ËÊÕ²»µ½ÏûÏ¢µÄÇé¿ö
     * ÒòΪ¶Ï¿ªµÄÁ¬½Ó»¹±»±£ÁôµÄ»°£¬emq»á½«¶ÓÁÐÖеÄÏûÏ¢¸ºÔص½¶Ï¿ªµ«»¹±£ÁôµÄ¿Í»§¶Ë£¬µ¼Ö´æ»îµÄ¿Í»§
¶ËÊÕ²»µ½ÏûÏ¢
     * ½â¾ö¸ÃÎÊÌâÓÐÁ½ÖÖ·½°¸:1.Á¬½Ó¶Ï¿ªºó²»Òª±£³Ö£»2.±£Ö¤Ã¿¸ö¿Í»§¶ËÓй̶¨µÄclientId
     */
    connectOptions.setCleanSession(true);
    connectOptions.setUserName(username);
    connectOptions.setPassword(password.toCharArray());
    //ÉèÖÃmqttÏûÏ¢»Øµ÷
    mqttClient.setCallback(mqttCallback);
    //Á¬½Óbroker
    try {
         mqttClient.connect(connectOptions);
    } catch (MqttException e) {
        log.error("Á¬½Ómqtt brokerʧ°Ü,ʧ°ÜÔ­Òò:{}",e.getMessage());
    }
    
  }

   /**
    * ·¢²¼
    * @param topic
    * @param msg
    */
   public void publish(String topic, String msg, QosEnum qos, boolean retain){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        mqttMessage.setPayload(msg.getBytes());
        if(mqttClient.isConnected()){
                try {
                    mqttClient.publish(topic,mqttMessage);
                } catch (MqttException e) {
                    log.error("mqttÏûÏ¢·¢²¼Ê§°Ü,topic={},msg={},qos={},retain={},errormsg=
{}",topic,msg,qos,retain,e.getMessage());
                }
        }
   }
   
   /**
    * ¶©ÔÄ
    * @param topicFilter
    * @return
    */
   public void subscribe(String topicFilter,QosEnum qos){
      try {
          mqttClient.subscribe(topicFilter,qos.value());
      } catch (MqttException e) {
      
          log.error("¶©ÔÄʧ°Ü,topicfilter={},qos={},errormsg=
{}",topicFilter,qos,e.getMessage());
        }
    }
    
    /**
     * ¶Ï¿ªÁ¬½Ó
     */
    @PreDestroy
    public void disConnect(){
        try {
             mqttClient.disconnect();
        } catch (MqttException e) {
             log.error("¶Ï¿ªÁ¬½Ó³öÏÖÒì³£,errormsg={}",e.getMessage());
        }
    }
}

需要在application.yml中添加自定义的配置：

mqtt:
  broker-url: tcp://192.168.200.129:1883
  client-id: demo-client
  username: user
  password: 123456

同时需要创建属性配置类来加载该配置数据，创建：com.itheima.mqtt.properties.MqttProperties

/**
 * Holds the settings bound from configuration keys under the {@code mqtt} prefix:
 * broker URL, client id, user name and password.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;


    /** @return the broker URL */
    public String getBrokerUrl() {
        return brokerUrl;
    }

    /** @param brokerUrl the broker URL */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    /** @return the client id */
    public String getClientId() {
        return clientId;
    }

    /** @param clientId the client id */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the user name */
    public String getUsername() {
        return username;
    }

    /** @param username the user name */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password */
    public String getPassword() {
        return password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders the properties for diagnostics. The password value is masked so that
     * logging this object never leaks the credential.
     */
    @Override
    public String toString() {
        // Keep the original layout; only the password value is replaced.
        String maskedPassword = (password == null) ? "null" : "******";
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + maskedPassword + '\'' +
                '}';
    }
}

还需创建QoS（服务质量）枚举类：com.itheima.mqtt.enums.QosEnum

/**
 * QoS levels, each constant carrying its numeric level (0, 1 or 2).
 */
public enum QosEnum {

    QoS0(0),
    QoS1(1),
    QoS2(2);

    /** Numeric level associated with this constant. */
    private final int value;

    QosEnum(int level) {
        value = level;
    }

    /**
     * @return the numeric level of this constant
     */
    public int value() {
        return value;
    }
}

(3)在连接接收到消息之后，我们需要将消息传入消息回调：com.itheima.mqtt.client.MessageCallback

/**
 * {@link MqttCallback} implementation that logs connection loss, incoming messages
 * and completed deliveries.
 *
 * <p>NOTE(review): the non-ASCII log message literals in this class appear to be
 * mis-encoded text; they are kept unchanged here — confirm the source file encoding.
 */
@Component
public class MessageCallback implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * Called after the connection to the server is lost. Special handling, such as
     * reconnecting, could be done here.
     *
     * @param cause the reason the connection was lost; logged with the message
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Attach the cause so the reason for the drop is not thrown away.
        log.info("¶ªÊ§Á˶ÔbrokerµÄÁ¬½Ó", cause);
    }

    /**
     * Callback for a message received on a subscription.
     *
     * <p>Per the original author's notes: the MQTT client calls this method
     * synchronously, and no acknowledgement is sent to the broker until it returns
     * normally. If it throws, the client is shut down abnormally; on the next
     * connection the broker resends every QoS1/QoS2 message the client has not
     * acknowledged.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode with an explicit charset rather than the platform default.
        String payloadText = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("¶©Ôĵ½ÁËÏûÏ¢;topic={},messageid={},qos={},msg={}",
                topic, message.getId(), message.getQos(), payloadText);
    }

    /**
     * Callback fired when a publish has completed and been acknowledged.
     *
     * <p>Per the original author's notes: QoS0 fires once the message has been sent
     * over the network; QoS1 fires when the broker's PUBACK is received; QoS2 fires
     * when the broker's PUBCOMP is received.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("ÏûÏ¢·¢ËÍÍê³É,messageId={},topics={}", messageId, topics);
    }
}

(4)编写消息发布和订阅的测试，在启动类中添加如下代码。

@Autowired
private EmqClient emqClient;

@Autowired
private MqttProperties mqttProperties;

/**
 * Connects with the configured credentials, subscribes to {@code testtopic/#} at QoS2,
 * then starts a background thread that publishes a timestamped message to
 * {@code testtopic/123} every five seconds until the thread is interrupted.
 */
@PostConstruct
public void init(){
    emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
    // Subscribe to a topic.
    emqClient.subscribe("testtopic/#", QosEnum.QoS2);
    // Start a new thread that sends messages to that topic.
    Thread publisher = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            emqClient.publish("testtopic/123",
                    "mqtt msg:" + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),
                    QosEnum.QoS2, false);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop, so the thread can actually be shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }, "mqtt-demo-publisher");
    publisher.start();
}

(5)测试：在Dashboard中开启使用username进行认证的组件，其他组件停止即可，然后启动项目，查看控制台输出即可。

0 分享到：
和我们在线交谈！
【网站地图】【sitemap】