更新时间:2023年10月13日15时28分 来源:乐鱼电竞 浏览次数:
Paho Java客户端是用Java编写的MQTT客户端库，用于开发在JVM或其他Java兼容平台(例如Android)上运行的应用程序。
Paho不仅可以对接EMQ X Broker，还可以对接符合MQTT协议规范的其他消息代理服务端，目前Paho可以支持到MQTT5.0以下版本。MQTT3.1.1协议版本基本能满足百分之九十多的接入场景。
Paho Java客户端提供了两个API：
1：MqttAsyncClient提供了一个完全异步的API，其中操作的完成是通过注册的回调来通知的。
2：MqttClient是对MqttAsyncClient的同步封装，在这里，各项功能对应用程序而言表现为同步调用。
(1)找到项目：emq-demo，添加坐标依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
(2)编写客户端封装类的代码：com.itheima.mqtt.client.EmqClient
/**
* Created by ÀÖÓã²¥¿Í*ºÚÂí³ÌÐòÔ±.
*/
@Component
public class EmqClient {
private Logger log = LoggerFactory.getLogger(EmqClient.class);
private IMqttClient mqttClient;
@Autowired
private MqttProperties mqttProperties;
@Autowired
private MqttCallback mqttCallback;
@PostConstruct
private void init(){
//MqttClientPersistenceÊÇ½Ó¿Ú ÊµÏÖÀàÓУºMqttDefaultFilePersistence£»MemoryPersistence
MqttClientPersistence memoryPersistence = new MemoryPersistence();
try {
mqttClient = new
MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),memoryPersistence);
} catch (MqttException e) {
log.error("MqttClient³õʼ»¯Ê§°Ü,brokerurl={},clientId=
{}",mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
}
}
/**
* Á¬½Óbroker
* @param username
* @param password
*/
public void connect(String username,String password){
//´´½¨MQTTÁ¬½ÓÑ¡Ïî¶ÔÏó--¿ÉÅäÖÃmqttÁ¬½ÓÏà¹ØÑ¡Ïî
MqttConnectOptions connectOptions = new MqttConnectOptions();
//×Ô¶¯ÖØÁ¬
connectOptions.setAutomaticReconnect(true);
/**
* ÉèÖÃΪtrueºóÒâζ×Å£º¿Í»§¶Ë¶Ï¿ªÁ¬½Óºóemq²»±£Áô»á»°±£Áô»á»°£¬·ñÔò»á²úÉú¶©ÔĹ²Ïí¶ÓÁеĴæ»î
¿Í»§¶ËÊÕ²»µ½ÏûÏ¢µÄÇé¿ö
* ÒòΪ¶Ï¿ªµÄÁ¬½Ó»¹±»±£ÁôµÄ»°£¬emq»á½«¶ÓÁÐÖеÄÏûÏ¢¸ºÔص½¶Ï¿ªµ«»¹±£ÁôµÄ¿Í»§¶Ë£¬µ¼Ö´æ»îµÄ¿Í»§
¶ËÊÕ²»µ½ÏûÏ¢
* ½â¾ö¸ÃÎÊÌâÓÐÁ½ÖÖ·½°¸:1.Á¬½Ó¶Ï¿ªºó²»Òª±£³Ö£»2.±£Ö¤Ã¿¸ö¿Í»§¶ËÓй̶¨µÄclientId
*/
connectOptions.setCleanSession(true);
connectOptions.setUserName(username);
connectOptions.setPassword(password.toCharArray());
//ÉèÖÃmqttÏûÏ¢»Øµ÷
mqttClient.setCallback(mqttCallback);
//Á¬½Óbroker
try {
mqttClient.connect(connectOptions);
} catch (MqttException e) {
log.error("Á¬½Ómqtt brokerʧ°Ü,ʧ°ÜÔÒò:{}",e.getMessage());
}
}
/**
* ·¢²¼
* @param topic
* @param msg
*/
public void publish(String topic, String msg, QosEnum qos, boolean retain){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos.value());
mqttMessage.setRetained(retain);
mqttMessage.setPayload(msg.getBytes());
if(mqttClient.isConnected()){
try {
mqttClient.publish(topic,mqttMessage);
} catch (MqttException e) {
log.error("mqttÏûÏ¢·¢²¼Ê§°Ü,topic={},msg={},qos={},retain={},errormsg=
{}",topic,msg,qos,retain,e.getMessage());
}
}
}
/**
* ¶©ÔÄ
* @param topicFilter
* @return
*/
public void subscribe(String topicFilter,QosEnum qos){
try {
mqttClient.subscribe(topicFilter,qos.value());
} catch (MqttException e) {
log.error("¶©ÔÄʧ°Ü,topicfilter={},qos={},errormsg=
{}",topicFilter,qos,e.getMessage());
}
}
/**
* ¶Ï¿ªÁ¬½Ó
*/
@PreDestroy
public void disConnect(){
try {
mqttClient.disconnect();
} catch (MqttException e) {
log.error("¶Ï¿ªÁ¬½Ó³öÏÖÒì³£,errormsg={}",e.getMessage());
}
}
}
需要在application.yml中添加自定义的配置：
mqtt:
  broker-url: tcp://192.168.200.129:1883
  client-id: demo-client
  username: user
  password: 123456
同时需要创建属性配置类来加载该配置数据，创建：com.itheima.mqtt.properties.MqttProperties
/**
 * MQTT connection settings bound from configuration keys under the
 * {@code mqtt} prefix.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {

    private String brokerUrl;
    private String clientId;
    private String username;
    private String password;

    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Returns a printable summary of the settings. The password is masked so
     * that logging this object does not leak the credential.
     */
    @Override
    public String toString() {
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + (password == null ? null : "******") + '\'' +
                '}';
    }
}
还需创建QoS服务质量枚举：com.itheima.mqtt.enums.QosEnum
/**
 * MQTT quality-of-service levels, each carrying its numeric value.
 */
public enum QosEnum {
    QoS0(0),
    QoS1(1),
    QoS2(2);

    /** Numeric level associated with this constant. */
    private final int value;

    QosEnum(int level) {
        value = level;
    }

    /**
     * @return the numeric level of this constant
     */
    public int value() {
        return value;
    }
}
(3)在连接接收到消息之后，我们需要将消息传入消息回调：com.itheima.mqtt.client.MessageCallback
/**
 * MQTT callback that logs connection loss, incoming messages and completed
 * deliveries.
 */
@Component
public class MessageCallback implements MqttCallback {

    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * Invoked when the connection to the broker is lost. Special handling,
     * such as reconnecting, could be added here.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Pass the cause along so the reason for the drop shows up in the log.
        log.info("丢失了对broker的连接", cause);
    }

    /**
     * Invoked when a subscribed message arrives.
     *
     * <p>Original author's note: the MQTT client calls this method
     * synchronously, and no acknowledgement is sent to the broker until it
     * returns normally. If it throws, the client shuts down abnormally; on the
     * next connection the broker resends every QoS1/QoS2 message the client
     * has not acknowledged.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown deliberately here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
                topic, message.getId(), message.getQos(), payload);
    }

    /**
     * Invoked when publishing has completed and the acknowledgement has been received.
     *
     * <p>Original author's note:
     * QoS0 - fired once after the message has been handed to the network;
     * QoS1 - fired when the broker's PUBACK is received;
     * QoS2 - fired when the broker's PUBCOMP is received.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        // Render the array explicitly so it is never mistaken for logger varargs.
        log.info("消息发送完成,messageId={},topics={}", messageId, java.util.Arrays.toString(topics));
    }
}
(4)编写消息发布和订阅的测试，在启动类中添加如下代码。
@Autowired
private EmqClient emqClient;
@Autowired
private MqttProperties mqttProperties;

/**
 * Connects to the broker with the configured credentials, subscribes to
 * {@code testtopic/#}, and starts a background thread that publishes a
 * timestamped message to {@code testtopic/123} every five seconds until the
 * thread is interrupted.
 */
@PostConstruct
public void init() {
    emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
    // Subscribe to a topic filter.
    emqClient.subscribe("testtopic/#", QosEnum.QoS2);
    // Start a new thread that publishes messages to that topic.
    new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            emqClient.publish("testtopic/123",
                    "mqtt msg:" + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME),
                    QosEnum.QoS2, false);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition sees it and the thread exits.
                Thread.currentThread().interrupt();
            }
        }
    }, "mqtt-demo-publisher").start();
}
(5)测试：在Dashboard中开启使用username进行认证的组件，其他组件停止即可，然后启动项目，查看控制台输出即可。
北京校区