Compare commits

...

2 Commits

Author SHA1 Message Date
Cursor Agent
a9222d9749 Add tenant-aware WebSocket message sender and AOP support 2025-06-16 07:15:51 +00:00
Cursor Agent
e454bfd959 Add WebSocket methods for sending messages to specific tenants 2025-06-16 06:24:54 +00:00
6 changed files with 267 additions and 2 deletions

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.websocket.config;
import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.websocket.core.aop.TenantWebSocketAspect;
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
@@ -15,6 +16,8 @@ import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMess
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
@@ -27,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
@@ -82,10 +86,31 @@ public class YudaoWebSocketAutoConfiguration {
return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);
}
/**
* 创建租户感知的WebSocket消息发送器
* 自动包装实际的发送器,提供租户隔离功能
*/
@Bean
@Primary
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
public TenantWebSocketMessageSender tenantWebSocketMessageSender(WebSocketMessageSender webSocketMessageSender) {
return new TenantWebSocketMessageSender(webSocketMessageSender);
}
/**
* 创建租户WebSocket AOP切面
* 自动处理带有 @TenantAware 注解的方法
*/
@Bean
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
public TenantWebSocketAspect tenantWebSocketAspect() {
return new TenantWebSocketAspect();
}
// ==================== Sender 相关 ====================
@Configuration
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local")
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true)
public class LocalWebSocketMessageSenderConfiguration {
@Bean

View File

@@ -0,0 +1,30 @@
package cn.iocoder.yudao.framework.websocket.core.annotation;
import java.lang.annotation.*;
/**
* 租户感知注解
*
* 用于标记需要进行租户隔离处理的WebSocket消息发送方法
*
* @author 芋道源码
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TenantAware {
/**
* 租户ID参数名
* 当方法参数中包含租户ID时指定参数名
*/
String tenantIdParam() default "tenantId";
/**
* 是否强制要求租户上下文
* 如果为true当没有租户上下文时会抛出异常
* 如果为false当没有租户上下文时会忽略消息发送
*/
boolean required() default false;
}

View File

@@ -0,0 +1,88 @@
package cn.iocoder.yudao.framework.websocket.core.aop;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.AnnotationUtils;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
/**
* 租户感知的WebSocket AOP切面
*
* 自动处理带有 {@link TenantAware} 注解的方法的租户上下文
*
* @author 芋道源码
*/
@Aspect
@Slf4j
public class TenantWebSocketAspect {
@Around("@annotation(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware) || " +
"@within(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取方法上的注解
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
TenantAware tenantAware = AnnotationUtils.findAnnotation(method, TenantAware.class);
// 如果方法上没有,尝试从类上获取
if (tenantAware == null) {
tenantAware = AnnotationUtils.findAnnotation(method.getDeclaringClass(), TenantAware.class);
}
if (tenantAware == null) {
return joinPoint.proceed();
}
// 尝试从方法参数中获取租户ID
Long tenantId = extractTenantIdFromArgs(joinPoint, tenantAware.tenantIdParam());
// 如果参数中没有租户ID尝试从当前上下文获取
if (tenantId == null) {
tenantId = TenantContextHolder.getTenantId();
}
// 根据配置决定如何处理
if (tenantId == null) {
if (tenantAware.required()) {
throw new IllegalStateException("租户上下文缺失无法发送WebSocket消息");
} else {
log.debug("[TenantWebSocketAspect] 租户上下文缺失跳过WebSocket消息发送: {}", method.getName());
return null; // 跳过执行
}
}
// 在指定租户上下文中执行
return TenantUtils.execute(tenantId, () -> {
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
throw new RuntimeException(throwable);
}
});
}
/**
* 从方法参数中提取租户ID
*/
private Long extractTenantIdFromArgs(ProceedingJoinPoint joinPoint, String tenantIdParam) {
Object[] args = joinPoint.getArgs();
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
if (tenantIdParam.equals(parameters[i].getName()) && args[i] instanceof Long) {
return (Long) args[i];
}
}
return null;
}
}

View File

@@ -0,0 +1,65 @@
package cn.iocoder.yudao.framework.websocket.core.sender;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import lombok.RequiredArgsConstructor;
/**
* 租户感知的 WebSocket 消息发送器装饰类
*
* 自动处理租户上下文,确保消息只发送给当前租户或指定租户的用户
*
* @author 芋道源码
*/
@RequiredArgsConstructor
public class TenantWebSocketMessageSender implements WebSocketMessageSender {
private final WebSocketMessageSender delegate;
@Override
public void send(Integer userType, Long userId, String messageType, String messageContent) {
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
if (TenantContextHolder.getTenantId() != null) {
delegate.send(userType, userId, messageType, messageContent);
}
}
@Override
public void send(Integer userType, String messageType, String messageContent) {
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
if (TenantContextHolder.getTenantId() != null) {
delegate.send(userType, messageType, messageContent);
}
}
@Override
public void send(String sessionId, String messageType, String messageContent) {
// Session级别发送不受租户限制
delegate.send(sessionId, messageType, messageContent);
}
/**
* 发送消息给指定租户的指定用户
*/
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
TenantUtils.execute(tenantId, () -> {
delegate.send(userType, userId, messageType, messageContent);
});
}
/**
* 发送消息给指定租户的指定用户类型
*/
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
TenantUtils.execute(tenantId, () -> {
delegate.send(userType, messageType, messageContent);
});
}
/**
* 获取原始的委托发送器(用于特殊情况下的直接访问)
*/
public WebSocketMessageSender getDelegate() {
return delegate;
}
}

View File

@@ -39,6 +39,27 @@ public interface WebSocketSenderApi {
*/
void send(String sessionId, String messageType, String messageContent);
/**
* 发送消息给指定租户的指定用户
*
* @param tenantId 租户编号
* @param userType 用户类型
* @param userId 用户编号
* @param messageType 消息类型
* @param messageContent 消息内容JSON 格式
*/
void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent);
/**
* 发送消息给指定租户的指定用户类型
*
* @param tenantId 租户编号
* @param userType 用户类型
* @param messageType 消息类型
* @param messageContent 消息内容JSON 格式
*/
void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent);
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
}
@@ -51,4 +72,12 @@ public interface WebSocketSenderApi {
send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
}
default void sendObjectToTenant(Long tenantId, Integer userType, Long userId, String messageType, Object messageContent) {
sendToTenant(tenantId, userType, userId, messageType, JsonUtils.toJsonString(messageContent));
}
default void sendObjectToTenant(Long tenantId, Integer userType, String messageType, Object messageContent) {
sendToTenant(tenantId, userType, messageType, JsonUtils.toJsonString(messageContent));
}
}

View File

@@ -1,5 +1,7 @@
package cn.iocoder.yudao.module.infra.api.websocket;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
import org.springframework.stereotype.Component;
@@ -31,4 +33,30 @@ public class WebSocketSenderApiImpl implements WebSocketSenderApi {
webSocketMessageSender.send(sessionId, messageType, messageContent);
}
@Override
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
// 优先使用租户感知的发送器
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, userId, messageType, messageContent);
} else {
// 降级方案使用TenantUtils
TenantUtils.execute(tenantId, () -> {
webSocketMessageSender.send(userType, userId, messageType, messageContent);
});
}
}
@Override
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
// 优先使用租户感知的发送器
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, messageType, messageContent);
} else {
// 降级方案使用TenantUtils
TenantUtils.execute(tenantId, () -> {
webSocketMessageSender.send(userType, messageType, messageContent);
});
}
}
}