Compare commits
2 Commits
cursor/bc-
...
cursor/sen
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a9222d9749 | ||
![]() |
e454bfd959 |
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.framework.websocket.config;
|
|||||||
|
|
||||||
import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
|
import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration;
|
||||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||||
|
import cn.iocoder.yudao.framework.websocket.core.aop.TenantWebSocketAspect;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
|
import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
|
import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
|
import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor;
|
||||||
@@ -15,6 +16,8 @@ import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMess
|
|||||||
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
|
import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
|
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
|
import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender;
|
||||||
|
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
|
||||||
|
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
|
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
|
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
|
import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl;
|
||||||
@@ -27,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.web.socket.WebSocketHandler;
|
import org.springframework.web.socket.WebSocketHandler;
|
||||||
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
import org.springframework.web.socket.config.annotation.EnableWebSocket;
|
||||||
@@ -82,10 +86,31 @@ public class YudaoWebSocketAutoConfiguration {
|
|||||||
return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);
|
return new WebSocketAuthorizeRequestsCustomizer(webSocketProperties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建租户感知的WebSocket消息发送器
|
||||||
|
* 自动包装实际的发送器,提供租户隔离功能
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
@Primary
|
||||||
|
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
|
||||||
|
public TenantWebSocketMessageSender tenantWebSocketMessageSender(WebSocketMessageSender webSocketMessageSender) {
|
||||||
|
return new TenantWebSocketMessageSender(webSocketMessageSender);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建租户WebSocket AOP切面
|
||||||
|
* 自动处理带有 @TenantAware 注解的方法
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true)
|
||||||
|
public TenantWebSocketAspect tenantWebSocketAspect() {
|
||||||
|
return new TenantWebSocketAspect();
|
||||||
|
}
|
||||||
|
|
||||||
// ==================== Sender 相关 ====================
|
// ==================== Sender 相关 ====================
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local")
|
@ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true)
|
||||||
public class LocalWebSocketMessageSenderConfiguration {
|
public class LocalWebSocketMessageSenderConfiguration {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@@ -101,7 +126,7 @@ public class YudaoWebSocketAutoConfiguration {
|
|||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager,
|
||||||
RedisMQTemplate redisMQTemplate) {
|
RedisMQTemplate redisMQTemplate) {
|
||||||
return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
|
return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -0,0 +1,30 @@
|
|||||||
|
package cn.iocoder.yudao.framework.websocket.core.annotation;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 租户感知注解
|
||||||
|
*
|
||||||
|
* 用于标记需要进行租户隔离处理的WebSocket消息发送方法
|
||||||
|
*
|
||||||
|
* @author 芋道源码
|
||||||
|
*/
|
||||||
|
@Target({ElementType.METHOD, ElementType.TYPE})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
public @interface TenantAware {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 租户ID参数名
|
||||||
|
* 当方法参数中包含租户ID时,指定参数名
|
||||||
|
*/
|
||||||
|
String tenantIdParam() default "tenantId";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否强制要求租户上下文
|
||||||
|
* 如果为true,当没有租户上下文时会抛出异常;
|
||||||
|
* 如果为false,当没有租户上下文时会忽略消息发送
|
||||||
|
*/
|
||||||
|
boolean required() default false;
|
||||||
|
|
||||||
|
}
|
@@ -0,0 +1,88 @@
|
|||||||
|
package cn.iocoder.yudao.framework.websocket.core.aop;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||||
|
import cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.aspectj.lang.ProceedingJoinPoint;
|
||||||
|
import org.aspectj.lang.annotation.Around;
|
||||||
|
import org.aspectj.lang.annotation.Aspect;
|
||||||
|
import org.springframework.core.annotation.AnnotationUtils;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Parameter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 租户感知的WebSocket AOP切面
|
||||||
|
*
|
||||||
|
* 自动处理带有 {@link TenantAware} 注解的方法的租户上下文
|
||||||
|
*
|
||||||
|
* @author 芋道源码
|
||||||
|
*/
|
||||||
|
@Aspect
|
||||||
|
@Slf4j
|
||||||
|
public class TenantWebSocketAspect {
|
||||||
|
|
||||||
|
@Around("@annotation(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware) || " +
|
||||||
|
"@within(cn.iocoder.yudao.framework.websocket.core.annotation.TenantAware)")
|
||||||
|
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||||
|
// 获取方法上的注解
|
||||||
|
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
|
||||||
|
TenantAware tenantAware = AnnotationUtils.findAnnotation(method, TenantAware.class);
|
||||||
|
|
||||||
|
// 如果方法上没有,尝试从类上获取
|
||||||
|
if (tenantAware == null) {
|
||||||
|
tenantAware = AnnotationUtils.findAnnotation(method.getDeclaringClass(), TenantAware.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tenantAware == null) {
|
||||||
|
return joinPoint.proceed();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试从方法参数中获取租户ID
|
||||||
|
Long tenantId = extractTenantIdFromArgs(joinPoint, tenantAware.tenantIdParam());
|
||||||
|
|
||||||
|
// 如果参数中没有租户ID,尝试从当前上下文获取
|
||||||
|
if (tenantId == null) {
|
||||||
|
tenantId = TenantContextHolder.getTenantId();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据配置决定如何处理
|
||||||
|
if (tenantId == null) {
|
||||||
|
if (tenantAware.required()) {
|
||||||
|
throw new IllegalStateException("租户上下文缺失,无法发送WebSocket消息");
|
||||||
|
} else {
|
||||||
|
log.debug("[TenantWebSocketAspect] 租户上下文缺失,跳过WebSocket消息发送: {}", method.getName());
|
||||||
|
return null; // 跳过执行
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 在指定租户上下文中执行
|
||||||
|
return TenantUtils.execute(tenantId, () -> {
|
||||||
|
try {
|
||||||
|
return joinPoint.proceed();
|
||||||
|
} catch (Throwable throwable) {
|
||||||
|
if (throwable instanceof RuntimeException) {
|
||||||
|
throw (RuntimeException) throwable;
|
||||||
|
}
|
||||||
|
throw new RuntimeException(throwable);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从方法参数中提取租户ID
|
||||||
|
*/
|
||||||
|
private Long extractTenantIdFromArgs(ProceedingJoinPoint joinPoint, String tenantIdParam) {
|
||||||
|
Object[] args = joinPoint.getArgs();
|
||||||
|
Method method = ((org.aspectj.lang.reflect.MethodSignature) joinPoint.getSignature()).getMethod();
|
||||||
|
Parameter[] parameters = method.getParameters();
|
||||||
|
|
||||||
|
for (int i = 0; i < parameters.length; i++) {
|
||||||
|
if (tenantIdParam.equals(parameters[i].getName()) && args[i] instanceof Long) {
|
||||||
|
return (Long) args[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,65 @@
|
|||||||
|
package cn.iocoder.yudao.framework.websocket.core.sender;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 租户感知的 WebSocket 消息发送器装饰类
|
||||||
|
*
|
||||||
|
* 自动处理租户上下文,确保消息只发送给当前租户或指定租户的用户
|
||||||
|
*
|
||||||
|
* @author 芋道源码
|
||||||
|
*/
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class TenantWebSocketMessageSender implements WebSocketMessageSender {
|
||||||
|
|
||||||
|
private final WebSocketMessageSender delegate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Integer userType, Long userId, String messageType, String messageContent) {
|
||||||
|
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
|
||||||
|
if (TenantContextHolder.getTenantId() != null) {
|
||||||
|
delegate.send(userType, userId, messageType, messageContent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Integer userType, String messageType, String messageContent) {
|
||||||
|
// 如果当前有租户上下文,直接发送;否则跳过(避免跨租户发送)
|
||||||
|
if (TenantContextHolder.getTenantId() != null) {
|
||||||
|
delegate.send(userType, messageType, messageContent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(String sessionId, String messageType, String messageContent) {
|
||||||
|
// Session级别发送不受租户限制
|
||||||
|
delegate.send(sessionId, messageType, messageContent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息给指定租户的指定用户
|
||||||
|
*/
|
||||||
|
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
|
||||||
|
TenantUtils.execute(tenantId, () -> {
|
||||||
|
delegate.send(userType, userId, messageType, messageContent);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息给指定租户的指定用户类型
|
||||||
|
*/
|
||||||
|
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
|
||||||
|
TenantUtils.execute(tenantId, () -> {
|
||||||
|
delegate.send(userType, messageType, messageContent);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取原始的委托发送器(用于特殊情况下的直接访问)
|
||||||
|
*/
|
||||||
|
public WebSocketMessageSender getDelegate() {
|
||||||
|
return delegate;
|
||||||
|
}
|
||||||
|
}
|
@@ -39,6 +39,27 @@ public interface WebSocketSenderApi {
|
|||||||
*/
|
*/
|
||||||
void send(String sessionId, String messageType, String messageContent);
|
void send(String sessionId, String messageType, String messageContent);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息给指定租户的指定用户
|
||||||
|
*
|
||||||
|
* @param tenantId 租户编号
|
||||||
|
* @param userType 用户类型
|
||||||
|
* @param userId 用户编号
|
||||||
|
* @param messageType 消息类型
|
||||||
|
* @param messageContent 消息内容,JSON 格式
|
||||||
|
*/
|
||||||
|
void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 发送消息给指定租户的指定用户类型
|
||||||
|
*
|
||||||
|
* @param tenantId 租户编号
|
||||||
|
* @param userType 用户类型
|
||||||
|
* @param messageType 消息类型
|
||||||
|
* @param messageContent 消息内容,JSON 格式
|
||||||
|
*/
|
||||||
|
void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent);
|
||||||
|
|
||||||
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
|
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
|
||||||
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
|
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
|
||||||
}
|
}
|
||||||
@@ -51,4 +72,12 @@ public interface WebSocketSenderApi {
|
|||||||
send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
|
send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default void sendObjectToTenant(Long tenantId, Integer userType, Long userId, String messageType, Object messageContent) {
|
||||||
|
sendToTenant(tenantId, userType, userId, messageType, JsonUtils.toJsonString(messageContent));
|
||||||
|
}
|
||||||
|
|
||||||
|
default void sendObjectToTenant(Long tenantId, Integer userType, String messageType, Object messageContent) {
|
||||||
|
sendToTenant(tenantId, userType, messageType, JsonUtils.toJsonString(messageContent));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,7 @@
|
|||||||
package cn.iocoder.yudao.module.infra.api.websocket;
|
package cn.iocoder.yudao.module.infra.api.websocket;
|
||||||
|
|
||||||
|
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||||
|
import cn.iocoder.yudao.framework.websocket.core.sender.TenantWebSocketMessageSender;
|
||||||
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
|
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@@ -31,4 +33,30 @@ public class WebSocketSenderApiImpl implements WebSocketSenderApi {
|
|||||||
webSocketMessageSender.send(sessionId, messageType, messageContent);
|
webSocketMessageSender.send(sessionId, messageType, messageContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendToTenant(Long tenantId, Integer userType, Long userId, String messageType, String messageContent) {
|
||||||
|
// 优先使用租户感知的发送器
|
||||||
|
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
|
||||||
|
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, userId, messageType, messageContent);
|
||||||
|
} else {
|
||||||
|
// 降级方案:使用TenantUtils
|
||||||
|
TenantUtils.execute(tenantId, () -> {
|
||||||
|
webSocketMessageSender.send(userType, userId, messageType, messageContent);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendToTenant(Long tenantId, Integer userType, String messageType, String messageContent) {
|
||||||
|
// 优先使用租户感知的发送器
|
||||||
|
if (webSocketMessageSender instanceof TenantWebSocketMessageSender) {
|
||||||
|
((TenantWebSocketMessageSender) webSocketMessageSender).sendToTenant(tenantId, userType, messageType, messageContent);
|
||||||
|
} else {
|
||||||
|
// 降级方案:使用TenantUtils
|
||||||
|
TenantUtils.execute(tenantId, () -> {
|
||||||
|
webSocketMessageSender.send(userType, messageType, messageContent);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user