Compare commits
2 Commits
cursor/bc-
...
cursor/sen
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a9222d9749 | ||
![]() |
e454bfd959 |
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.websocket.config;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.websocket.core.aop.TenantWebSocketAspect;
|
||||
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
|
||||
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
|
||||
import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
|
||||
@@ -15,6 +16,8 @@ import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMess
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
|
||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
|
||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
|
||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
|
||||
@@ -27,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.web.socket.WebSocketHandler;
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||
@@ -82,10 +86,31 @@ public class YudaoWebSocketAutoConfiguration {
|
||||
return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建租户感知的WebSocket消息发送器
|
||||
* 自动包装实际的发送器,提供租户隔离功能
|
||||
*/
|
||||
@Bean
|
||||
@Primary
|
||||
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
|
||||
public TenantWebSocketMessageSender tenantWebSocketMessageSender(WebSocketMessageSender webSocketMessageSender) {
|
||||
return new TenantWebSocketMessageSender(webSocketMessageSender);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建租户WebSocket AOP切面
|
||||
* 自动处理带有 @TenantAware 注解的方法
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
|
||||
public TenantWebSocketAspect tenantWebSocketAspect() {
|
||||
return new TenantWebSocketAspect();
|
||||
}
|
||||
|
||||
// ==================== Sender 相关 ====================
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local")
|
||||
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true)
|
||||
public class LocalWebSocketMessageSenderConfiguration {
|
||||
|
||||
@Bean
|
||||
|
@@ -0,0 +1,30 @@
|
||||
package cn.iocoder.yudao.framework.websocket.core.annotation;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 租户感知注解
|
||||
*
|
||||
* 用于标记需要进行租户隔离处理的WebSocket消息发送方法
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Target({ElementType.METHOD, ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface TenantAware {
|
||||
|
||||
/**
|
||||
* 租户ID参数名
|
||||
* 当方法参数中包含租户ID时,指定参数名
|
||||
*/
|
||||
String tenantIdParam() default "tenantId";
|
||||
|
||||
/**
|
||||
* 是否强制要求租户上下文
|
||||
* 如果为true,当没有租户上下文时会抛出异常;
|
||||
* 如果为false,当没有租户上下文时会忽略消息发送
|
||||
*/
|
||||
boolean required() default false;
|
||||
|
||||
}
|
@@ -0,0 +1,88 @@
|
||||
package cn.iocoder.yudao.framework.websocket.core.aop;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Parameter;
|
||||
|
||||
/**
|
||||
* 租户感知的WebSocket AOP切面
|
||||
*
|
||||
* 自动处理带有 {@link TenantAware} 注解的方法的租户上下文
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Aspect
|
||||
@Slf4j
|
||||
public class TenantWebSocketAspect {
|
||||
|
||||
@Around("@annotation(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware) || " +
|
||||
"@within(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware)")
|
||||
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||
// 获取方法上的注解
|
||||
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
|
||||
TenantAware tenantAware = AnnotationUtils.findAnnotation(method, TenantAware.class);
|
||||
|
||||
// 如果方法上没有,尝试从类上获取
|
||||
if (tenantAware == null) {
|
||||
tenantAware = AnnotationUtils.findAnnotation(method.getDeclaringClass(), TenantAware.class);
|
||||
}
|
||||
|
||||
if (tenantAware == null) {
|
||||
return joinPoint.proceed();
|
||||
}
|
||||
|
||||
// 尝试从方法参数中获取租户ID
|
||||
Long tenantId = extractTenantIdFromArgs(joinPoint, tenantAware.tenantIdParam());
|
||||
|
||||
// 如果参数中没有租户ID,尝试从当前上下文获取
|
||||
if (tenantId == null) {
|
||||
tenantId = TenantContextHolder.getTenantId();
|
||||
}
|
||||
|
||||
// 根据配置决定如何处理
|
||||
if (tenantId == null) {
|
||||
if (tenantAware.required()) {
|
||||
throw new IllegalStateException("租户上下文缺失,无法发送WebSocket消息");
|
||||
} else {
|
||||
log.debug("[TenantWebSocketAspect] 租户上下文缺失,跳过WebSocket消息发送: {}", method.getName());
|
||||
return null; // 跳过执行
|
||||
}
|
||||
}
|
||||
|
||||
// 在指定租户上下文中执行
|
||||
return TenantUtils.execute(tenantId, () -> {
|
||||
try {
|
||||
return joinPoint.proceed();
|
||||
} catch (Throwable throwable) {
|
||||
if (throwable instanceof RuntimeException) {
|
||||
throw (RuntimeException) throwable;
|
||||
}
|
||||
throw new RuntimeException(throwable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 从方法参数中提取租户ID
|
||||
*/
|
||||
private Long extractTenantIdFromArgs(ProceedingJoinPoint joinPoint, String tenantIdParam) {
|
||||
Object[] args = joinPoint.getArgs();
|
||||
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
|
||||
Parameter[] parameters = method.getParameters();
|
||||
|
||||
for (int i = 0; i < parameters.length; i++) {
|
||||
if (tenantIdParam.equals(parameters[i].getName()) && args[i] instanceof Long) {
|
||||
return (Long) args[i];
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@@ -0,0 +1,65 @@
|
||||
package cn.iocoder.yudao.framework.websocket.core.sender;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
/**
|
||||
* 租户感知的 WebSocket 消息发送器装饰类
|
||||
*
|
||||
* 自动处理租户上下文,确保消息只发送给当前租户或指定租户的用户
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
public class TenantWebSocketMessageSender implements WebSocketMessageSender {
|
||||
|
||||
private final WebSocketMessageSender delegate;
|
||||
|
||||
@Override
|
||||
public void send(Integer userType, Long userId, String messageType, String messageContent) {
|
||||
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
|
||||
if (TenantContextHolder.getTenantId() != null) {
|
||||
delegate.send(userType, userId, messageType, messageContent);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Integer userType, String messageType, String messageContent) {
|
||||
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
|
||||
if (TenantContextHolder.getTenantId() != null) {
|
||||
delegate.send(userType, messageType, messageContent);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(String sessionId, String messageType, String messageContent) {
|
||||
// Session级别发送不受租户限制
|
||||
delegate.send(sessionId, messageType, messageContent);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息给指定租户的指定用户
|
||||
*/
|
||||
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
|
||||
TenantUtils.execute(tenantId, () -> {
|
||||
delegate.send(userType, userId, messageType, messageContent);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息给指定租户的指定用户类型
|
||||
*/
|
||||
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
|
||||
TenantUtils.execute(tenantId, () -> {
|
||||
delegate.send(userType, messageType, messageContent);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取原始的委托发送器(用于特殊情况下的直接访问)
|
||||
*/
|
||||
public WebSocketMessageSender getDelegate() {
|
||||
return delegate;
|
||||
}
|
||||
}
|
@@ -39,6 +39,27 @@ public interface WebSocketSenderApi {
|
||||
*/
|
||||
void send(String sessionId, String messageType, String messageContent);
|
||||
|
||||
/**
|
||||
* 发送消息给指定租户的指定用户
|
||||
*
|
||||
* @param tenantId 租户编号
|
||||
* @param userType 用户类型
|
||||
* @param userId 用户编号
|
||||
* @param messageType 消息类型
|
||||
* @param messageContent 消息内容,JSON 格式
|
||||
*/
|
||||
void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent);
|
||||
|
||||
/**
|
||||
* 发送消息给指定租户的指定用户类型
|
||||
*
|
||||
* @param tenantId 租户编号
|
||||
* @param userType 用户类型
|
||||
* @param messageType 消息类型
|
||||
* @param messageContent 消息内容,JSON 格式
|
||||
*/
|
||||
void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent);
|
||||
|
||||
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
|
||||
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
|
||||
}
|
||||
@@ -51,4 +72,12 @@ public interface WebSocketSenderApi {
|
||||
send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
|
||||
}
|
||||
|
||||
default void sendObjectToTenant(Long tenantId, Integer userType, Long userId, String messageType, Object messageContent) {
|
||||
sendToTenant(tenantId, userType, userId, messageType, JsonUtils.toJsonString(messageContent));
|
||||
}
|
||||
|
||||
default void sendObjectToTenant(Long tenantId, Integer userType, String messageType, Object messageContent) {
|
||||
sendToTenant(tenantId, userType, messageType, JsonUtils.toJsonString(messageContent));
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -1,5 +1,7 @@
|
||||
package cn.iocoder.yudao.module.infra.api.websocket;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
|
||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -31,4 +33,30 @@ public class WebSocketSenderApiImpl implements WebSocketSenderApi {
|
||||
webSocketMessageSender.send(sessionId, messageType, messageContent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
|
||||
// 优先使用租户感知的发送器
|
||||
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
|
||||
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, userId, messageType, messageContent);
|
||||
} else {
|
||||
// 降级方案:使用TenantUtils
|
||||
TenantUtils.execute(tenantId, () -> {
|
||||
webSocketMessageSender.send(userType, userId, messageType, messageContent);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
|
||||
// 优先使用租户感知的发送器
|
||||
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
|
||||
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, messageType, messageContent);
|
||||
} else {
|
||||
// 降级方案:使用TenantUtils
|
||||
TenantUtils.execute(tenantId, () -> {
|
||||
webSocketMessageSender.send(userType, messageType, messageContent);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user