From a9222d974903c9cd7f1bb5141ab489a6905fe211 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 16 Jun 2025 07:15:51 +0000 Subject: [PATCH] Add tenant-aware WebSocket message sender and AOP support --- .../YudaoWebSocketAutoConfiguration.java | 29 +++++- .../core/annotation/TenantAware.java | 30 +++++++ .../core/aop/TenantWebSocketAspect.java | 88 +++++++++++++++++++ .../sender/TenantWebSocketMessageSender.java | 65 ++++++++++++++ .../api/websocket/WebSocketSenderApiImpl.java | 25 ++++-- 5 files changed, 229 insertions(+), 8 deletions(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/annotation/TenantAware.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/aop/TenantWebSocketAspect.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/TenantWebSocketMessageSender.java diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java index 3aded88738..4ec4930881 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.websocket.config; import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.websocket.core.aop.TenantWebSocketAspect; import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; @@ -15,6 +16,8 @@ import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMess import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; @@ -27,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; @@ -82,10 +86,31 @@ public class YudaoWebSocketAutoConfiguration { return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties); } + /** + * 创建租户感知的WebSocket消息发送器 + * 自动包装实际的发送器,提供租户隔离功能 + */ + @Bean + @Primary + @ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) + public TenantWebSocketMessageSender tenantWebSocketMessageSender(WebSocketMessageSender webSocketMessageSender) { + return new TenantWebSocketMessageSender(webSocketMessageSender); + } + + /** + * 创建租户WebSocket AOP切面 + * 自动处理带有 @TenantAware 注解的方法 + */ + @Bean + @ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) + public TenantWebSocketAspect tenantWebSocketAspect() { + return new TenantWebSocketAspect(); + } + // ==================== Sender 相关 ==================== @Configuration - @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local") + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true) public class LocalWebSocketMessageSenderConfiguration { @Bean @@ -101,7 +126,7 @@ public class YudaoWebSocketAutoConfiguration { @Bean public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, - RedisMQTemplate redisMQTemplate) { + RedisMQTemplate redisMQTemplate) { return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/annotation/TenantAware.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/annotation/TenantAware.java new file mode 100644 index 0000000000..e8933b6c4f --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/annotation/TenantAware.java @@ -0,0 +1,30 @@ +package cn.iocoder.yudao.framework.websocket.core.annotation; + +import java.lang.annotation.*; + +/** + * 租户感知注解 + * + * 用于标记需要进行租户隔离处理的WebSocket消息发送方法 + * + * @author 芋道源码 + */ +@Target({ElementType.METHOD, ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface TenantAware { + + /** + * 租户ID参数名 + * 当方法参数中包含租户ID时,指定参数名 + */ + String tenantIdParam() default "tenantId"; + + /** + * 是否强制要求租户上下文 + * 如果为true,当没有租户上下文时会抛出异常; + * 如果为false,当没有租户上下文时会忽略消息发送 + */ + boolean required() default false; + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/aop/TenantWebSocketAspect.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/aop/TenantWebSocketAspect.java new file mode 100644 index 0000000000..8023708133 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/aop/TenantWebSocketAspect.java @@ -0,0 +1,88 @@ +package cn.iocoder.yudao.framework.websocket.core.aop; + +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.core.annotation.AnnotationUtils; + +import java.lang.reflect.Method; +import java.lang.reflect.Parameter; + +/** + * 租户感知的WebSocket AOP切面 + * + * 自动处理带有 {@link TenantAware} 注解的方法的租户上下文 + * + * @author 芋道源码 + */ +@Aspect +@Slf4j +public class TenantWebSocketAspect { + + @Around("@annotation(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware) || " + + "@within(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware)") + public Object around(ProceedingJoinPoint joinPoint) throws Throwable { + // 获取方法上的注解 + Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod(); + TenantAware tenantAware = AnnotationUtils.findAnnotation(method, TenantAware.class); + + // 如果方法上没有,尝试从类上获取 + if (tenantAware == null) { + tenantAware = AnnotationUtils.findAnnotation(method.getDeclaringClass(), TenantAware.class); + } + + if (tenantAware == null) { + return joinPoint.proceed(); + } + + // 尝试从方法参数中获取租户ID + Long tenantId = extractTenantIdFromArgs(joinPoint, tenantAware.tenantIdParam()); + + // 如果参数中没有租户ID,尝试从当前上下文获取 + if (tenantId == null) { + tenantId = TenantContextHolder.getTenantId(); + } + + // 根据配置决定如何处理 + if (tenantId == null) { + if (tenantAware.required()) { + throw new IllegalStateException("租户上下文缺失,无法发送WebSocket消息"); + } else { + log.debug("[TenantWebSocketAspect] 租户上下文缺失,跳过WebSocket消息发送: {}", method.getName()); + return null; // 跳过执行 + } + } + + // 在指定租户上下文中执行 + return TenantUtils.execute(tenantId, () -> { + try { + return joinPoint.proceed(); + } catch (Throwable throwable) { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + throw new RuntimeException(throwable); + } + }); + } + + /** + * 从方法参数中提取租户ID + */ + private Long extractTenantIdFromArgs(ProceedingJoinPoint joinPoint, String tenantIdParam) { + Object[] args = joinPoint.getArgs(); + Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod(); + Parameter[] parameters = method.getParameters(); + + for (int i = 0; i < parameters.length; i++) { + if (tenantIdParam.equals(parameters[i].getName()) && args[i] instanceof Long) { + return (Long) args[i]; + } + } + return null; + } +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/TenantWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/TenantWebSocketMessageSender.java new file mode 100644 index 0000000000..1651b35073 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/TenantWebSocketMessageSender.java @@ -0,0 +1,65 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import lombok.RequiredArgsConstructor; + +/** + * 租户感知的 WebSocket 消息发送器装饰类 + * + * 自动处理租户上下文,确保消息只发送给当前租户或指定租户的用户 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +public class TenantWebSocketMessageSender implements WebSocketMessageSender { + + private final WebSocketMessageSender delegate; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + // 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送) + if (TenantContextHolder.getTenantId() != null) { + delegate.send(userType, userId, messageType, messageContent); + } + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + // 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送) + if (TenantContextHolder.getTenantId() != null) { + delegate.send(userType, messageType, messageContent); + } + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + // Session级别发送不受租户限制 + delegate.send(sessionId, messageType, messageContent); + } + + /** + * 发送消息给指定租户的指定用户 + */ + public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) { + TenantUtils.execute(tenantId, () -> { + delegate.send(userType, userId, messageType, messageContent); + }); + } + + /** + * 发送消息给指定租户的指定用户类型 + */ + public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) { + TenantUtils.execute(tenantId, () -> { + delegate.send(userType, messageType, messageContent); + }); + } + + /** + * 获取原始的委托发送器(用于特殊情况下的直接访问) + */ + public WebSocketMessageSender getDelegate() { + return delegate; + } +} \ No newline at end of file diff --git a/yudao-module-infra/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java b/yudao-module-infra/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java index a13d79a6f9..3fbe2e0bf3 100644 --- a/yudao-module-infra/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java +++ b/yudao-module-infra/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.infra.api.websocket; import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; import org.springframework.stereotype.Component; @@ -34,16 +35,28 @@ public class WebSocketSenderApiImpl implements WebSocketSenderApi { @Override public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) { - TenantUtils.execute(tenantId, () -> { - webSocketMessageSender.send(userType, userId, messageType, messageContent); - }); + // 优先使用租户感知的发送器 + if (webSocketMessageSender instanceof TenantWebSocketMessageSender) { + ((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, userId, messageType, messageContent); + } else { + // 降级方案:使用TenantUtils + TenantUtils.execute(tenantId, () -> { + webSocketMessageSender.send(userType, userId, messageType, messageContent); + }); + } } @Override public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) { - TenantUtils.execute(tenantId, () -> { - webSocketMessageSender.send(userType, messageType, messageContent); - }); + // 优先使用租户感知的发送器 + if (webSocketMessageSender instanceof TenantWebSocketMessageSender) { + ((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, messageType, messageContent); + } else { + // 降级方案:使用TenantUtils + TenantUtils.execute(tenantId, () -> { + webSocketMessageSender.send(userType, messageType, messageContent); + }); + } } }