跳至主要内容

MQTT QoS

網路不穩?QoS 網路服務品質


回顧 上上次的MQTT

通訊協定 MQTT之淺入


物聯網 Internet Of Thing


當訊息是很重要的不可遺失的呢?

例如前途的瓦斯爐訊號


遺失1 Pub -> Broker


遺失2 Broker -> Sub


額外補充 遺失3 Broker -> Storage(AMQP協議中才有)

補充資料AMQP Transaction機制


解決方案 QoS

Quality of Service(網路服務品質)

在MQTT中是用來設置訊息傳輸的規則


QoS 0

最多一次(不管有沒有送到就是送一次)



QoS 1

最少一次(類似郵件掛號,必須被確認packetId封包ID) 過一段時間都沒收到確認會補送訊息



QoS 2

確認一次(避免重複丟出訊息)



實作環節


新增帶有QoS等級的發送訊息方法

import express from 'express';
import mqtt, { QoS } from 'mqtt';

const app = express();

const mqttClient = mqtt.connect('mqtt://localhost');

app.get('/publish', (req, res) => {
const message = req.query.message as string;
const qosLevel = parseInt(req.query.qos as string);

if (![0, 1, 2].includes(qosLevel)) {
res.status(400).send('Invalid QoS level');
return;
}

mqttClient.publish('topic', message, { qos: qosLevel as QoS }, (err) => {
if (err) {
console.error(`Error publishing message: ${err}`);
res.status(500).send('Error publishing message');
return;
}

console.log(`Published message "${message}" with QoS ${qosLevel}`);
res.status(200).send('Message published');
});
});

app.listen(3000, () => {
console.log('Express server started on port 3000');
});

去處理QoS 訊息

import express from 'express';
import mqtt, { QoS } from 'mqtt';

const app = express();

const mqttClient = mqtt.connect('mqtt://localhost');

mqttClient.on('connect', () => {
mqttClient.subscribe('topic', { qos: 2 }, (err) => {
if (err) {
console.error(`Error subscribing to topic: ${err}`);
} else {
console.log('Subscribed to topic with QoS 2');
}
});
});

mqttClient.on('message', (topic, message, packet) => {
console.log(`Received message "${message.toString()}" on topic "${topic}" with QoS ${packet.qos}`);
});

app.listen(3000, () => {
console.log('Express server started on port 3000');
});

參考文章