设备在线/离线状态的缓存方案——实践类

设备在线/离线状态缓存

很多场景中,我们都需要查询设备是否在线,但POP API的访问频次受限,需要我们自己系统缓存设备状态


技术方案

  1. 配置规则引擎监听设备状态变更消息,流转到函数计算FC
  2. 由于消息乱序,在函数计算对比表格存储/Redis中设备状态和当前状态的lastTime,存储最新的数据
  3. 业务系统从表格存储/Redis中快速查询设备当前在线/离线状态

image.png


1.设备在线/离线状态变更消息

当设备连接到IoT物联网平台,设备离线,在线状态变更会生成特定topic的消息,我们服务端可以通过订阅这个topic获得设备状态变更信息。
**

设备的在线/离线状态流转的Topic格式:

/as/mqtt/status/{productKey}/{deviceName}


payload数据格式:

{
    "status":"online|offline",
    "productKey":"pk13543",
    "deviceName":"dn1234",
    "time":"2018-08-31 15:32:28.205",
    "utcTime":"2018-08-31T07:32:28.205Z",
    "lastTime":"2018-08-31 15:32:28.195",
    "utcLastTime":"2018-08-31T07:32:28.195Z",
    "clientIp":"123.123.123.123"
}

参数说明:

2.通过规则引擎流转设备状态


2.1 配置SQL

SELECT productKey,deviceName,
timestamp() as timestamp ,
status,
time as currentTime ,lastTime,clientIp
FROM "/as/mqtt/status/a1Xr8ofpSst/+" WHERE

这样我们就可以从消息体获取到设备的status,currentTime和lastTime了。

规则引擎数据处理操作界面

image.png

2.2 配置数据流转函数计算FC

规则引擎数据流转操作界面

image.png


2.3 函数计算FC实现

nodejs实现代码参考:
由于消息乱序,需要对比表格存储/Redis中已存储的设备状态和当前状态的lastTime,找出最新状态存储到缓存中。

var TableStore = require('tablestore');
var options = {
    accessKeyId: '你的accessKeyId',
    accessKeySecret: '你的accessKeySecret',
}
var otsClient = new TableStore.Client({
    accessKeyId: options.accessKeyId,
    secretAccessKey: options.accessKeySecret,
    endpoint: '你的endpoint',
    instancename: '你的instancename',
    maxRetries: 20 //默认20次重试,可以省略这个参数。
});
var response = {
    isBase64Encoded: false,
    statusCode: 200
};
module.exports.handler = function(event, context, callback) {
    var eventJson = JSON.parse(event.toString());
    var deviceId = eventJson.deviceName;
    var productKey = eventJson.productKey;
    var lastTime = new Date(eventJson.lastTime);
    var params = {
        tableName: "device_status_table",
        primaryKey: [{ 'deviceId': deviceId },{'productKey': productKey}],
        maxVersions: 1
    };
    try {
        otsClient.getRow(params, function(err, data) {
            if (err) {
                response.body = { msg: 'error', code: 404 };
                callback(null, response);
                return;
            }
            //有数据,拿出来比较lastTime
            if (data.row.primaryKey) {
                var attributes = data.row.attributes;
                var dbTime = '';
                attributes.forEach(function(item) {
                    if (item.columnName == 'lastTime') {
                        dbTime = new Date(item.columnValue);
                    }
                })
                //转换成毫秒进行比较
                if (lastTime.getTime() < dbTime.getTime()) {
                    return;
                }
            }
            var iot_data = {
                tableName: "device_status_table",
                condition: new TableStore.Condition(TableStore.RowExistenceExpectation.IGNORE, null),
                primaryKey: [{ "deviceId": deviceId },{'productKey': productKey}],
                attributeColumns: [
                    { 'lastTime': eventJson.lastTime },
                    { 'clientIp': eventJson.clientIp },
                    { 'status': eventJson.status }
                ],
                returnContent: { returnType: TableStore.ReturnType.Primarykey }
            }
            otsClient.putRow(iot_data, function(err, data) {
                if (err) {
                    response.body = { msg: 'error', code: 404 };
                    callback(null, response);
                    return;
                }
                response.body = { msg: 'ok', code: 200 };
                callback(null, response);
                return;
            });
        });
    } catch (err) {
        response.body = { msg: 'error', code: 404 };
        callback(null, response);
    }
};


2.4 表格存储OTS

在device_status_table表中,查看 设备上下线情况

image.png

【往期回顾】

1、39张IoT传感器工作原理GIF图汇总

2、IoT 设备发送 MQTT 请求的曲折经历

3、20元体 Arduino 环境监测仪开发

4、智能手持测温枪开发实践

5、JMeter 压测 MQTT 服务性能实战


物联网平台产品介绍详情:

阿里云物联网平台客户交流群


请使用浏览器的分享功能分享到微信等