【代码新增】IoT:实现 device 下行服务调用的逻辑
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
### 请求 /iot/device/simulation-downstream 接口 => 成功
|
||||
POST {{baseUrl}}/iot/device/simulation-downstream
|
||||
Content-Type: application/json
|
||||
tenant-id: {{adminTenentId}}
|
||||
Authorization: Bearer {{token}}
|
||||
|
||||
{
|
||||
"id": 25,
|
||||
"type": "service",
|
||||
"identifier": "temperature",
|
||||
"data": {
|
||||
"xx": "yy"
|
||||
}
|
||||
}
|
@@ -6,6 +6,7 @@ import cn.iocoder.yudao.framework.common.pojo.PageParam;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
|
||||
import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationDownstreamReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationUpstreamReqVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
@@ -171,7 +172,7 @@ public class IotDeviceController {
|
||||
@Operation(summary = "模拟设备下行")
|
||||
@PreAuthorize("@ss.hasPermission('iot:device:simulation')")
|
||||
public CommonResult<Boolean> simulationDownstreamDevice(
|
||||
@Valid @RequestBody IotDeviceSimulationUpstreamReqVO downstreamReqVO) {
|
||||
@Valid @RequestBody IotDeviceSimulationDownstreamReqVO downstreamReqVO) {
|
||||
deviceDownstreamService.simulationDeviceDownstream(downstreamReqVO);
|
||||
return success(true);
|
||||
}
|
||||
|
@@ -75,6 +75,12 @@ public class IotDeviceLogDO {
|
||||
* 存储具体的消息数据内容,通常是 JSON 格式
|
||||
*/
|
||||
private String content;
|
||||
/**
|
||||
* 响应码
|
||||
*
|
||||
* 目前只有 server 下行消息给 device 设备时,才会有响应码
|
||||
*/
|
||||
private Integer code;
|
||||
|
||||
/**
|
||||
* 上报时间戳
|
||||
|
@@ -55,12 +55,16 @@ public class IotDeviceMessage {
|
||||
* 例如说:属性上报的 properties、事件上报的 params
|
||||
*/
|
||||
private Object data;
|
||||
/**
|
||||
* 响应码
|
||||
*
|
||||
* 目前只有 server 下行消息给 device 设备时,才会有响应码
|
||||
*/
|
||||
private Integer code;
|
||||
|
||||
/**
|
||||
* 上报时间
|
||||
*/
|
||||
private LocalDateTime reportTime;
|
||||
|
||||
// TODO @芋艿 code;
|
||||
|
||||
}
|
||||
|
@@ -1,6 +1,6 @@
|
||||
package cn.iocoder.yudao.module.iot.service.device.control;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationUpstreamReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationDownstreamReqVO;
|
||||
import jakarta.validation.Valid;
|
||||
|
||||
/**
|
||||
@@ -17,6 +17,6 @@ public interface IotDeviceDownstreamService {
|
||||
*
|
||||
* @param downstreamReqVO 设备下行请求 VO
|
||||
*/
|
||||
void simulationDeviceDownstream(@Valid IotDeviceSimulationUpstreamReqVO downstreamReqVO);
|
||||
void simulationDeviceDownstream(@Valid IotDeviceSimulationDownstreamReqVO downstreamReqVO);
|
||||
|
||||
}
|
||||
|
@@ -1,17 +1,36 @@
|
||||
package cn.iocoder.yudao.module.iot.service.device.control;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationUpstreamReqVO;
|
||||
import cn.hutool.core.exceptions.ExceptionUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.framework.common.exception.ServiceException;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceDownstreamAbstractReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceServiceInvokeReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceSimulationDownstreamReqVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
|
||||
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
|
||||
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED;
|
||||
|
||||
/**
|
||||
* 设备下行 Service 实现类
|
||||
*
|
||||
@@ -24,9 +43,17 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
|
||||
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
@Resource
|
||||
private IotPluginInstanceService pluginInstanceService;
|
||||
|
||||
@Resource
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
@Resource
|
||||
private IotDeviceProducer deviceProducer;
|
||||
|
||||
@Override
|
||||
public void simulationDeviceDownstream(IotDeviceSimulationUpstreamReqVO downstreamReqVO) {
|
||||
public void simulationDeviceDownstream(IotDeviceSimulationDownstreamReqVO downstreamReqVO) {
|
||||
// 校验设备是否存在
|
||||
IotDeviceDO device = deviceService.validateDeviceExists(downstreamReqVO.getId());
|
||||
// TODO 芋艿:父设备的处理
|
||||
@@ -40,12 +67,14 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
|
||||
// 属性相关
|
||||
if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType()))
|
||||
// 属性设置
|
||||
if (Objects.equals(downstreamReqVO.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())) {
|
||||
if (Objects.equals(downstreamReqVO.getIdentifier(),
|
||||
IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())) {
|
||||
setDeviceProperty(downstreamReqVO, device, parentDevice);
|
||||
return;
|
||||
}
|
||||
// 属性设置
|
||||
if (Objects.equals(downstreamReqVO.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_GET.getIdentifier())) {
|
||||
if (Objects.equals(downstreamReqVO.getIdentifier(),
|
||||
IotDeviceMessageIdentifierEnum.PROPERTY_GET.getIdentifier())) {
|
||||
getDeviceProperty(downstreamReqVO, device, parentDevice);
|
||||
return;
|
||||
}
|
||||
@@ -53,22 +82,121 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
|
||||
// TODO 芋艿:配置下发
|
||||
}
|
||||
|
||||
private void invokeDeviceService(IotDeviceSimulationUpstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
// 校验服务是否存在
|
||||
// TODO 芋艿:这里需要校验服务是否存在
|
||||
// 调用服务
|
||||
// TODO 芋艿:这里需要调用服务
|
||||
/**
|
||||
* 调用设备服务
|
||||
*
|
||||
* @param downstreamReqVO 下行请求
|
||||
* @param device 设备
|
||||
* @param parentDevice 父设备
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void invokeDeviceService(IotDeviceSimulationDownstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
// 1. 参数校验
|
||||
if (!(downstreamReqVO.getData() instanceof Map<?, ?>)) {
|
||||
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
|
||||
}
|
||||
|
||||
// 2. 发送请求
|
||||
String url = String.format( "sys/%s/%s/thing/service/%s",
|
||||
getProductKey(device, parentDevice), getDeviceName(device, parentDevice), downstreamReqVO.getIdentifier());
|
||||
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO()
|
||||
.setParams((Map<String, Object>) downstreamReqVO.getData());
|
||||
CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
|
||||
|
||||
// 3. 发送设备消息
|
||||
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
|
||||
.setType(IotDeviceMessageTypeEnum.SERVICE.getType()).setIdentifier(reqDTO.getIdentifier())
|
||||
.setData(reqDTO.getParams());
|
||||
sendDeviceMessage(message, device, result.getCode());
|
||||
|
||||
// 4. 如果不成功,抛出异常,提示用户
|
||||
if (result.isError()) {
|
||||
log.error("[invokeDeviceService][设备({})服务调用失败,请求参数:({}),响应结果:({})]",
|
||||
device.getDeviceKey(), reqDTO, result);
|
||||
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
|
||||
}
|
||||
}
|
||||
|
||||
private void setDeviceProperty(IotDeviceSimulationUpstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
// TODO 芋艿:这里需要设置设备属性
|
||||
/**
|
||||
* 设置设备属性
|
||||
*
|
||||
* @param downstreamReqVO 下行请求
|
||||
* @param device 设备
|
||||
* @param parentDevice 父设备
|
||||
*/
|
||||
private void setDeviceProperty(IotDeviceSimulationDownstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
// TODO 1. 请求
|
||||
|
||||
// TODO 2. 发送消息
|
||||
}
|
||||
|
||||
private void getDeviceProperty(IotDeviceSimulationUpstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
private void getDeviceProperty(IotDeviceSimulationDownstreamReqVO downstreamReqVO,
|
||||
IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
// TODO 芋艿:这里需要获取设备属性
|
||||
}
|
||||
|
||||
/**
|
||||
* 请求插件
|
||||
*
|
||||
* @param url URL
|
||||
* @param reqDTO 请求参数,只需要设置子类的参数!
|
||||
* @param device 设备
|
||||
* @return 响应结果
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "HttpUrlsUsage"})
|
||||
private CommonResult<Boolean> requestPlugin(String url, IotDeviceDownstreamAbstractReqDTO reqDTO, IotDeviceDO device) {
|
||||
// 获得设备对应的插件实例
|
||||
IotPluginInstanceDO pluginInstance = pluginInstanceService.getPluginInstanceByDeviceKey(device.getDeviceKey());
|
||||
if (pluginInstance == null) {
|
||||
throw exception(DEVICE_DOWNSTREAM_FAILED, "设备找不到对应的插件实例");
|
||||
}
|
||||
|
||||
// 补充通用参数
|
||||
reqDTO.setRequestId(IdUtil.fastSimpleUUID());
|
||||
|
||||
// 执行请求
|
||||
ResponseEntity<CommonResult<Boolean>> responseEntity;
|
||||
try {
|
||||
responseEntity = restTemplate.postForEntity(
|
||||
String.format("http://%s:%d/%s", pluginInstance.getHostIp(), pluginInstance.getDownstreamPort(), url),
|
||||
reqDTO, (Class<CommonResult<Boolean>>) (Class<?>) CommonResult.class);
|
||||
Assert.isTrue(responseEntity.getStatusCode().is2xxSuccessful(),
|
||||
"HTTP 状态码不是 2xx,而是" + responseEntity.getStatusCode());
|
||||
Assert.notNull(responseEntity.getBody(), "响应结果不能为空");
|
||||
} catch (Exception ex) {
|
||||
log.error("[requestPlugin][设备({}) url({}) 下行消息失败,请求参数({})]", device.getDeviceKey(), url, reqDTO, ex);
|
||||
throw exception(DEVICE_DOWNSTREAM_FAILED, ExceptionUtil.getMessage(ex));
|
||||
}
|
||||
return responseEntity.getBody();
|
||||
}
|
||||
|
||||
private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device, Integer code) {
|
||||
// 1. 完善消息
|
||||
message.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())
|
||||
.setDeviceKey(device.getDeviceKey());
|
||||
Assert.notNull(message.getRequestId(), "requestId 不能为空");
|
||||
if (message.getReportTime() == null) {
|
||||
message.setReportTime(LocalDateTime.now());
|
||||
}
|
||||
message.setCode(code);
|
||||
|
||||
// 2. 发送消息
|
||||
try {
|
||||
deviceProducer.sendDeviceMessage(message);
|
||||
log.info("[sendDeviceMessage][message({}) 发送消息成功]", message);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendDeviceMessage][message({}) 发送消息失败]", message, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getDeviceName(IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
return parentDevice != null ? parentDevice.getDeviceName() : device.getDeviceName();
|
||||
}
|
||||
|
||||
private String getProductKey(IotDeviceDO device, IotDeviceDO parentDevice) {
|
||||
return parentDevice != null ? parentDevice.getProductKey() : device.getProductKey();
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.plugin;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -67,4 +68,12 @@ public interface IotPluginInstanceService {
|
||||
*/
|
||||
void updateDevicePluginInstanceProcessIdAsync(String deviceKey, String processId);
|
||||
|
||||
/**
|
||||
* 获得设备对应的插件实例
|
||||
*
|
||||
* @param deviceKey 设备 Key
|
||||
* @return 插件实例
|
||||
*/
|
||||
IotPluginInstanceDO getPluginInstanceByDeviceKey(String deviceKey);
|
||||
|
||||
}
|
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.plugin;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.io.FileUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInfoDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.plugin.IotPluginInstanceDO;
|
||||
@@ -203,4 +204,13 @@ public class IotPluginInstanceServiceImpl implements IotPluginInstanceService {
|
||||
devicePluginProcessIdRedisDAO.put(deviceKey, processId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotPluginInstanceDO getPluginInstanceByDeviceKey(String deviceKey) {
|
||||
String processId = devicePluginProcessIdRedisDAO.get(deviceKey);
|
||||
if (StrUtil.isEmpty(processId)) {
|
||||
return null;
|
||||
}
|
||||
return pluginInstanceMapper.selectByProcessId(processId);
|
||||
}
|
||||
|
||||
}
|
@@ -13,6 +13,7 @@
|
||||
type NCHAR(50),
|
||||
identifier NCHAR(255),
|
||||
content NCHAR(1024),
|
||||
code INT,
|
||||
report_time TIMESTAMP
|
||||
) TAGS (
|
||||
device_key NCHAR(50)
|
||||
@@ -24,7 +25,7 @@
|
||||
</select>
|
||||
|
||||
<insert id="insert">
|
||||
INSERT INTO device_log_${deviceKey} (ts, id, product_key, device_name, type, identifier, content, report_time)
|
||||
INSERT INTO device_log_${deviceKey} (ts, id, product_key, device_name, type, identifier, content, code, report_time)
|
||||
USING device_log
|
||||
TAGS ('${deviceKey}')
|
||||
VALUES (
|
||||
@@ -35,6 +36,7 @@
|
||||
#{type},
|
||||
#{identifier},
|
||||
#{content},
|
||||
#{code},
|
||||
#{reportTime}
|
||||
)
|
||||
</insert>
|
||||
|
Reference in New Issue
Block a user