【功能完善】IoT: 集成 Vert.x 支持,重构 HTTP 插件为 Vert.x 插件

This commit is contained in:
安浩浩
2025-01-07 23:13:57 +08:00
parent cde6ebf921
commit 77b89aad77
12 changed files with 355 additions and 288 deletions

View File

@@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>yudao-module-iot-plugin</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>2.2.0-snapshot</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-http-plugin</artifactId>
<name>${project.artifactId}</name>
<version>2.2.0-snapshot</version>
<description>物联网 插件模块 - http 插件</description>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer>
<mainClass>cn.iocoder.yudao.module.iot.HttpPluginSpringbootApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.id>http-plugin</plugin.id>
<plugin.description>http-plugin-0.0.1</plugin.description>
<plugin.provider>ahh</plugin.provider>
</properties>
</project>

View File

@@ -1,5 +1,5 @@
plugin.id=http-plugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.HttpPlugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin
plugin.version=0.0.1
plugin.provider=ahh
plugin.dependencies=

View File

@@ -21,7 +21,7 @@
<properties>
<!-- 插件相关 -->
<plugin.id>http-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpPlugin</plugin.class>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>http-plugin-0.0.1</plugin.description>
@@ -30,27 +30,6 @@
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@@ -118,6 +97,29 @@
<skip>true</skip>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>3.4.1</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <shadedArtifactAttached>true</shadedArtifactAttached>-->
<!-- <shadedClassifierName>shaded</shadedClassifierName>-->
<!-- <transformers>-->
<!-- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
<!-- <mainClass>cn.iocoder.yudao.module.iot.HttpPluginSpringbootApplication</mainClass>-->
<!-- </transformer>-->
<!-- </transformers>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
</plugins>
</build>
@@ -145,10 +147,15 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version> <!-- 版本可根据需要调整 -->
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<!-- Vert.x Web 模块 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT -->
<dependency>

View File

@@ -1,153 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
* 基于 Netty 的 HTTP 处理器,用于接收设备上报的数据并调用主程序的 DeviceDataApi 接口进行处理。
* <p>
* 1. 请求格式JSON 格式,地址为 POST /sys/{productKey}/{deviceName}/thing/event/property/post
* 2. 返回结果JSON 格式,包含统一的 code、data、id、message、method、version 字段
*/
public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final DeviceDataApi deviceDataApi;
public HttpHandler(DeviceDataApi deviceDataApi) {
this.deviceDataApi = deviceDataApi;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
// 期望的路径格式: /sys/{productKey}/{deviceName}/thing/event/property/post
// 使用 "/" 拆分路径
String uri = request.uri();
String[] parts = uri.split("/");
/*
拆分结果示例:
parts[0] = ""
parts[1] = "sys"
parts[2] = productKey
parts[3] = deviceName
parts[4] = "thing"
parts[5] = "event"
parts[6] = "property"
parts[7] = "post"
*/
boolean isCorrectPath = parts.length == 8
&& "sys".equals(parts[1])
&& "thing".equals(parts[4])
&& "event".equals(parts[5])
&& "property".equals(parts[6])
&& "post".equals(parts[7]);
if (!isCorrectPath) {
writeResponse(ctx, HttpResponseStatus.NOT_FOUND, "Not Found");
return;
}
String productKey = parts[2];
String deviceName = parts[3];
// 从请求中获取原始数据,尝试解析请求数据为 JSON 对象
String requestBody = request.content().toString(CharsetUtil.UTF_8);
JSONObject jsonData;
try {
jsonData = JSONUtil.parseObj(requestBody);
} catch (Exception e) {
JSONObject res = createResponseJson(
400,
new JSONObject(),
null,
"请求数据不是合法的 JSON 格式: " + e.getMessage(),
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.BAD_REQUEST, res.toString());
return;
}
String id = jsonData.getStr("id", null);
try {
// 调用主程序的接口保存数据
DeviceDataCreateReqDTO createDTO = DeviceDataCreateReqDTO.builder()
.productKey(productKey)
.deviceName(deviceName)
.message(jsonData.toString())
.build();
deviceDataApi.saveDeviceData(createDTO);
// 构造成功响应内容
JSONObject successRes = createResponseJson(
200,
new JSONObject(),
id,
"success",
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.OK, successRes.toString());
} catch (Exception e) {
JSONObject errorRes = createResponseJson(
500,
new JSONObject(),
id,
"The format of result is error!",
"thing.event.property.post",
"1.0"
);
writeResponse(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, errorRes.toString());
}
}
/**
* 创建标准化的响应 JSON 对象
*
* @param code 响应状态码(业务层面的)
* @param data 返回的数据对象JSON
* @param id 请求的 id可选
* @param message 返回的提示信息
* @param method 返回的 method 标识
* @param version 返回的版本号
* @return 构造好的 JSON 对象
*/
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method, String version) {
JSONObject res = new JSONObject();
res.set("code", code);
res.set("data", data != null ? data : new JSONObject());
res.set("id", id);
res.set("message", message);
res.set("method", method);
res.set("version", version);
return res;
}
/**
* 向客户端返回 HTTP 响应的辅助方法
*
* @param ctx 通道上下文
* @param status HTTP 响应状态码(网络层面的)
* @param content 响应内容JSON 字符串或其他文本)
*/
private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, String content) {
// 设置响应头为 JSON 类型和正确的编码
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
status,
Unpooled.copiedBuffer(content, CharsetUtil.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// 发送响应并在发送完成后关闭连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}

View File

@@ -1,94 +0,0 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginWrapper;
import org.pf4j.Plugin;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class HttpPlugin extends Plugin {
private static final int PORT = 8092;
private ExecutorService executorService;
private DeviceDataApi deviceDataApi;
public HttpPlugin(PluginWrapper wrapper) {
super(wrapper);
// 初始化线程池
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void start() {
log.info("HttpPlugin.start()");
// 重新初始化线程池,确保它是活跃的
if (executorService.isShutdown() || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
}
// 从 ServiceRegistry 中获取主程序暴露的 DeviceDataApi 接口实例
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
// 异步启动 Netty 服务器
executorService.submit(this::startHttpServer);
}
@Override
public void stop() {
log.info("HttpPlugin.stop()");
// 停止线程池
executorService.shutdownNow();
}
/**
* 启动 HTTP 服务
*/
private void startHttpServer() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) {
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(new HttpObjectAggregator(65536));
// 将从 ServiceRegistry 获取的 deviceDataApi 传入处理器
channel.pipeline().addLast(new HttpHandler(deviceDataApi));
}
});
// 绑定端口并启动服务器
ChannelFuture future = bootstrap.bind(PORT).sync();
log.info("HTTP 服务器启动成功,端口为: {}", PORT);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("HTTP 服务启动被中断", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@@ -0,0 +1,105 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO;
import io.vertx.core.Handler;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpVertxHandler implements Handler<RoutingContext> {
private final DeviceDataApi deviceDataApi;
public HttpVertxHandler(DeviceDataApi deviceDataApi) {
this.deviceDataApi = deviceDataApi;
}
@Override
public void handle(RoutingContext ctx) {
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
RequestBody requestBody = ctx.body();
JSONObject jsonData;
try {
jsonData = JSONUtil.parseObj(requestBody.asJsonObject());
} catch (Exception e) {
JSONObject res = createResponseJson(
400,
new JSONObject(),
null,
"请求数据不是合法的 JSON 格式: " + e.getMessage(),
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(400)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(res.toString());
return;
}
String id = jsonData.getStr("id", null);
try {
// 调用主程序的接口保存数据
DeviceDataCreateReqDTO createDTO = DeviceDataCreateReqDTO.builder()
.productKey(productKey)
.deviceName(deviceName)
.message(jsonData.toString())
.build();
deviceDataApi.saveDeviceData(createDTO);
// 构造成功响应内容
JSONObject successRes = createResponseJson(
200,
new JSONObject(),
id,
"success",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(200)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(successRes.toString());
} catch (Exception e) {
JSONObject errorRes = createResponseJson(
500,
new JSONObject(),
id,
"The format of result is error!",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(errorRes.toString());
}
}
/**
* 创建标准化的响应 JSON 对象
*
* @param code 响应状态码(业务层面的)
* @param data 返回的数据对象JSON
* @param id 请求的 id可选
* @param message 返回的提示信息
* @param method 返回的 method 标识
* @param version 返回的版本号
* @return 构造好的 JSON 对象
*/
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method,
String version) {
JSONObject res = new JSONObject();
res.set("code", code);
res.set("data", data != null ? data : new JSONObject());
res.set("id", id);
res.set("message", message);
res.set("method", method);
res.set("version", version);
return res;
}
}

View File

@@ -0,0 +1,70 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.iocoder.yudao.module.iot.api.ServiceRegistry;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpVertxPlugin extends Plugin {
private static final int PORT = 8092;
private Vertx vertx;
private DeviceDataApi deviceDataApi;
public HttpVertxPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() {
log.info("HttpVertxPlugin.start()");
// 获取 DeviceDataApi 实例
deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
// 初始化 Vert.x
vertx = Vertx.vertx();
Router router = Router.router(vertx);
// 处理 Body
router.route().handler(BodyHandler.create());
// 设置路由
router.post("/sys/:productKey/:deviceName/thing/event/property/post")
.handler(new HttpVertxHandler(deviceDataApi));
// 启动 HTTP 服务器
vertx.createHttpServer()
.requestHandler(router)
.listen(PORT, http -> {
if (http.succeeded()) {
log.info("HTTP 服务器启动成功,端口为: {}", PORT);
} else {
log.error("HTTP 服务器启动失败", http.cause());
}
});
}
@Override
public void stop() {
log.info("HttpVertxPlugin.stop()");
if (vertx != null) {
vertx.close(ar -> {
if (ar.succeeded()) {
log.info("Vert.x 关闭成功");
} else {
log.error("Vert.x 关闭失败", ar.cause());
}
});
}
}
}