阿里云IoT初試
本文從概念到實戰(zhàn),以一個假想產(chǎn)品——”電子貨架標簽“(Electronic Shelf Label,以下簡稱ESL)為例,介紹基于阿里云IoT的物聯(lián)網(wǎng)應用開發(fā)。
數(shù)據(jù)交互流程
以云端下發(fā)命令到最終收到應答為例(虛線表示異步):

LoRaWAN:ESL所采用的通訊協(xié)議;LoRaWAN NS:LoRaWAN網(wǎng)絡(luò)的中樞大腦,控制通訊參數(shù)、實現(xiàn)QoS、節(jié)點入網(wǎng)和遷移、數(shù)據(jù)加解密等。MQTT:基于Pub/Sub范式的消息協(xié)議。它工作在 TCP/IP協(xié)議族上,是為硬件性能低下的遠程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計。Link WAN:阿里云物聯(lián)網(wǎng)絡(luò)管理平臺,可用它快速組建LoRaWAN網(wǎng)絡(luò);簡單地說,它主要扮演了LoRaWAN NS的角色;AliIoT:阿里云物聯(lián)網(wǎng)平臺,基于MQTT。處理設(shè)備層和業(yè)務(wù)層的數(shù)據(jù)交互;AMQP:消息隊列,設(shè)備異步應答返回的消息通過此消息隊列傳遞到云端。(廣義上說,AMQP是一個協(xié)議,RabbitMQ就是該協(xié)議的一個實現(xiàn))
ESL和LoRa網(wǎng)關(guān)是通過LoRa協(xié)議通信,LoRa可以看做是物理層面的信息調(diào)制協(xié)議或通訊協(xié)議,沒有TCP的概念。
注意,MQTT并不局限于LoRaWAN場景,阿里云也在平臺上將二者作了不同入口,前者對應AliIoT,后者對應Link WAN。初次接觸不免困惑(這也是阿里云一貫的作風),其實背后就是這個關(guān)系。我們可以設(shè)備直連AliIoT做IoT應用開發(fā)(參看10分鐘物聯(lián)網(wǎng)設(shè)備接入阿里云IoT平臺);如果是LoRaWAN系統(tǒng),也可以同時借助 Link WAN 做LoRaWAN的網(wǎng)絡(luò)管理。
網(wǎng)關(guān)要接入Link WAN,需要移植阿里云提供的SDK到網(wǎng)關(guān)與通信模組上,并且購買Link WAN密鑰安裝,并登錄阿里云物聯(lián)網(wǎng)絡(luò)管理平臺控制臺添加網(wǎng)關(guān)。云端開發(fā)人員只要關(guān)注AliIoT、AMQP及業(yè)務(wù)層即可。
AliIoT控制臺準備
-
公共實例-》創(chuàng)建產(chǎn)品。產(chǎn)品名稱“電子貨架標簽”;節(jié)點類型表示該產(chǎn)品下設(shè)備的類型,選擇
直連設(shè)備(LoRa有IP的概念?),然后連網(wǎng)方式選擇LoRaWAN;因為ESL設(shè)備收發(fā)的數(shù)據(jù)為未編碼的字節(jié)數(shù)組,數(shù)據(jù)格式選擇透傳/自定義,后續(xù)需要提供數(shù)據(jù)解析腳本,將上行的自定義格式的數(shù)據(jù)轉(zhuǎn)換為Alink JSON格式,將下行的Alink JSON格式數(shù)據(jù)解析為設(shè)備自定義格式,設(shè)備才能與云端進行通信。產(chǎn)品創(chuàng)建完畢獲得ProductKey。 -
管理產(chǎn)品-》功能定義,即定義所謂的
物模型。功能分為屬性、服務(wù)、事件三種類型(同定義一個類一樣,有屬性、方法、事件)。一個產(chǎn)品可以定義多個物模型,即一個產(chǎn)品下面可以有提供不同功能的多種設(shè)備。這里我們?yōu)镋SL定義——- 屬性:shelfNo,所屬貨架,數(shù)據(jù)類型text。示例A.05.02,A區(qū)5排2號貨架;
- 服務(wù):show,顯示貨品名稱和對應價格,入?yún)⒂衟roductName:text,price:float,調(diào)用方式選擇異步;
- 事件:heart,心跳,我們可以定義一些輸出參數(shù)如電池電量batteryLevel:int32,固件版本firmwareVersion:text,如此每次回報時這些信息也傳給云端。
這樣,云端就可以下發(fā)查詢電池電量和設(shè)置貨品名稱和對應價格的兩種命令,同時也可以被動接收設(shè)備返回的心跳消息。當然,物模型只是定義了接口,具體實現(xiàn)需要設(shè)備端和云端共同完成。
物模型中服務(wù)調(diào)用方式可設(shè)置
同步或者異步。同步方式:物聯(lián)網(wǎng)平臺直接使用RRPC同步方式下行推送請求,設(shè)備返回RRPC響應消息。RRPC使用詳情,請參見什么是RRPC。異步方式:物聯(lián)網(wǎng)平臺采用異步方式下行推送請求,設(shè)備采用異步方式返回結(jié)果。 -
管理產(chǎn)品-》數(shù)據(jù)解析。上面說到,設(shè)備和云端的交互數(shù)據(jù)需要中間的解析(序列化/反序列化)過程(發(fā)生在上圖第1步之后和第4步之前)。以JavaScript腳本為例:
var ALINK_EVENT_HEART_POST_METHOD = 'thing.event.heart.post'; //與云端綁定的topic相關(guān),下同。設(shè)備心跳包上報 var ALINK_EVENT_ACK_POST_METHOD = 'thing.event.ack.post'; //設(shè)備服務(wù)應答上報 var ALINK_PROP_REPORT_METHOD = 'thing.event.property.post'; //設(shè)備屬性上報 var ALINK_PROP_SET_METHOD = 'thing.service.property.set'; //云端下發(fā)屬性控制指令到設(shè)備端。 var ALINK_PROP_SET_REPLY_METHOD = 'thing.service.property.set'; //設(shè)備上報屬性設(shè)置的結(jié)果到云端。 var ALINK_SERVICE_SHOW_METHOD = 'thing.service.show'; //云端調(diào)用設(shè)備show服務(wù) /** * 將Alink協(xié)議的數(shù)據(jù)轉(zhuǎn)換為設(shè)備能識別的格式數(shù)據(jù),物聯(lián)網(wǎng)平臺給設(shè)備下發(fā)數(shù)據(jù)時調(diào)用 * 入?yún)ⅲ簀sonObj,對象,不能為空。 * 出參:rawData,byte[]數(shù)組,不能為空。 * * 示例數(shù)據(jù): * 云端下發(fā)屬性設(shè)置指令: * 傳入?yún)?shù): * {"method":"thing.service.property.set","id":"12345","version":"1.0","params":{"shelfNo":"A.05.02"}} * 注意:云端只下發(fā){"shelfNo":"A.05.02"},其余結(jié)構(gòu)是AliIoT封裝的。 */ function protocolToRawData(jsonObj) { var method = jsonObj['method']; var params = json['params']; //按照自定義協(xié)議格式拼接 rawData var rawdata = [0x5d, 0x64, 0x00]; if (method == ALINK_PROP_SET_METHOD) { //設(shè)置屬性 rawdata = rawdata.concat(textToByteArray(params['shelfNo'])); } else if (method == ALINK_SERVICE_SHOW_METHOD) { //調(diào)用服務(wù) var productName = params['productName']; var price = params['price']; rawdata = rawdata.concat(textToByteArray(productName)); rawdata = rawdata.concat(floatToByteArray(price)); } //other commands ... return rawdata; } /** * 將設(shè)備的自定義格式數(shù)據(jù)轉(zhuǎn)換為Alink協(xié)議的數(shù)據(jù),設(shè)備上報數(shù)據(jù)到物聯(lián)網(wǎng)平臺時調(diào)用。 * 入?yún)ⅲ簉awData,byte[]數(shù)組,不能為空。 * 出參:jsonObj,對象,不能為空。 * * 示例數(shù)據(jù): * 設(shè)備心跳上報: * 傳入?yún)?shù): * 0xFF1020010005 * 輸出結(jié)果: * {"method":"thing.event.heart.post","id":"12345678","params":{"batteryLevel":32,"firmwareVersion":"1.0.5"},"version":"1.0"} */ function rawDataToProtocol(rawData) { var uint8Array = new Uint8Array(rawData.length); for (var i = 0; i < bytes.length; i++) { uint8Array[i] = bytes[i] & 0xff; } var dataView = new DataView(uint8Array.buffer, 0); var jsonObj = new Object(); var params = {}; var head = uint8Array.slice(0, 2).join(); //自定義協(xié)議包頭 if (head[0] == 0xFF && head[1] == 0x10) { params['batteryLevel'] = dataView.getInt8(2); params['firmwareVersion'] = `${dataView.getInt8(3)}.${dataView.getInt8(4)}.${dataView.getInt8(5)}`; jsonObj['method'] = ALINK_EVENT_HEART_POST_METHOD; } else { //其它數(shù)據(jù)包轉(zhuǎn)換 } jsonObj['version'] = '1.0'; //ALink JSON格式,協(xié)議版本號固定字段。 jsonObj['id'] = '12345678' //ALink JSON格式,標示該次請求id值。 jsonObj['params'] = params; return jsonObj; } /** * 處理自定義Topic,本示例不涉及 */ function transformPayload(topic, rawData) { var jsonObj = {} return jsonObj; }數(shù)據(jù)解析的前提之一是設(shè)備收發(fā)的數(shù)據(jù)格式要確定好。
設(shè)備端發(fā)布/訂閱的topic(阿里云控制臺的產(chǎn)品Topic類列表中設(shè)置)和云端處理的topic不一樣,云端處理的topic是由
Alink協(xié)議定義的,和method值有關(guān)。詳見Alink協(xié)議上述腳本將業(yè)務(wù)數(shù)據(jù)和字節(jié)數(shù)組進行了轉(zhuǎn)換,若是擔心數(shù)據(jù)協(xié)議外泄[給阿里云?],這部分工作也可以放在云端,腳本文件只用來進行字節(jié)數(shù)組的轉(zhuǎn)發(fā)(這種情況下,物模型所有功能的出參入?yún)⒍贾恍枰粋€,數(shù)據(jù)格式為int32array)。
-
管理產(chǎn)品-》服務(wù)端訂閱。創(chuàng)建AMQP訂閱,AMQP會將消息推送給列表中的所有消費組,一個消費組可看做是一個消息隊列,云端作為客戶端連接某隊列得到設(shè)備上報消息。我們新建名稱為“電子貨架標簽-Q1”的消費組,得到一串自動生成的消費組ID。
云端開發(fā)
以Java/Kotlin為例,先引入SDK:
//下發(fā)命令依賴
implementation("com.aliyun:aliyun-java-sdk-core:4.5.22")
implementation("com.aliyun:aliyun-java-sdk-iot:7.27.0")
//獲取應答依賴
implementation("org.apache.qpid:qpid-jms-client:0.59.0")
implementation("commons-codec:commons-codec:1.15")
下發(fā)show命令:
@Service
class AliIoTDemo {
@Autowired
lateinit var config: AliIoTConfig
private lateinit var client: IAcsClient
@PostConstruct
fun init() {
val profile =
DefaultProfile.getProfile(config.regionId, config.accessKeyId, config.accessKeySecret)
client = DefaultAcsClient(profile)
}
/**
* loraId: 設(shè)備編號,對應AliIoT的DeviceName
*/
fun show(loraId: String) {
val gson = GsonInstance.get()
val jo = JsonObject()
jo.addProperty("productName", "康師傅方便面")
jo.addProperty("price", 3.50)
val request = InvokeThingServiceRequest().apply {
productKey = config.productKey //創(chuàng)建物聯(lián)網(wǎng)產(chǎn)品時得到ProductKey
deviceName = loraId
identifier = "show" //物模型定義的服務(wù)名稱
args = gson.toJson(jo) //{"productName": "康師傅方便面", "price": 3.50}
}
client.doAction(request)
}
}
代碼中的client.doAction是無法得到應答的,所以我們還要寫一個AMQP客戶端去異步獲得應答消息,具體參看官方示例Java SDK接入示例 - 阿里云物聯(lián)網(wǎng)平臺。
多條異步命令順序執(zhí)行
如果一個事務(wù)只要下發(fā)一條命令,那就等著拿結(jié)果就好了;但是有多條異步命令需要順序執(zhí)行的話,就稍微有點麻煩了,我們要考慮上下文的掛起和恢復、超時取消等機制。以下為簡單示例:
//保存各事務(wù)對應的等待發(fā)送的命令隊列,命令一旦發(fā)送則須從隊列中移除
//key為設(shè)備編號,二元組第一項表示事務(wù)開始時間,用于超時判斷
private val cmdSetMap = ConcurrentHashMap<String, Pair<Long, Queue<InvokeThingServiceRequest>>>()
internal fun putInvokeThingServiceRequest(deviceNo: String, requests: Queue<InvokeThingServiceRequest>) {
//同樣設(shè)備之前的命令不再執(zhí)行,移除
cmdSetMap.remove(deviceNo)
if (requests.size == 1) { //只有一條命令則直接發(fā)送
client.doAction(requests.poll())
} else {
val request = requests.poll() //先發(fā)送第一條
cmdSetMap[deviceNo] = Pair(System.currentTimeMillis(), requests) //其余的存入待發(fā)送列表
client.doAction(request)
}
}
//...
{
//應答消息抵達后,若應答OK則執(zhí)行下一條命令
val request = cmdSetMap[deviceName]!!.second.poll()
try {
client.doAction(request)
} catch (ex: Exception) {
logger.error(ex)
// 發(fā)生錯誤 通知客戶端
}
if (cmdSetMap[deviceName]!!.second.size == 0) cmdSetMap.remove(deviceName)
}
//每分鐘清理過時事務(wù)
@Scheduled(cron = "0 * * * * *")
fun removeTimeoutCmd() {
//...
}
在語言層面,不管是以前的回調(diào)地獄還是后來興起的async/await、suspend、Promise等,都能處理這種場景。本質(zhì)上,異步回調(diào)是指令尋址、變量出入棧的過程,有時還涉及到線程上下文的切換,各種語言/框架都幫我們考慮并且做了,我們只要按照既定語法編寫業(yè)務(wù)代碼即可。
為什么業(yè)務(wù)端不能直接訂閱對應的topic呢,這樣不就能直接拿到數(shù)據(jù)了嗎?AliIoT似乎也沒有提供業(yè)務(wù)層直接訂閱 AliIoT topic 的入口。不過MQTT協(xié)議是基于PUB/SUB的異步通信模式,就算業(yè)務(wù)端能直接接收到應答,也要處理應答消息轉(zhuǎn)發(fā)到對應的上下文、上下文掛起恢復等問題。

浙公網(wǎng)安備 33010602011771號