Commit b0ef985b authored by zhouwei's avatar zhouwei

增强超时逻辑

parent 39891e5c
......@@ -12,7 +12,7 @@ import org.springframework.context.annotation.Configuration;
public class AppConfig {
@Value("${alert.host}")
String alertHost;
@Value("${gateway.request.timeout:100000}")
@Value("${gateway.request.timeout:30000}")
private int requestTimeout;
public int getRequestTimeout() {
......
......@@ -36,7 +36,7 @@ import java.util.concurrent.TimeoutException;
@Component
public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<FailoverGatewayFilterFactory.Config> {
private static final ThreadLocal<String> REQUEST_BODY_CACHE = new ThreadLocal<>();
private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
private final WebClient.Builder webClientBuilder;
private final TokenRouteMappingService tokenRouteMappingService;
// 添加错误统计
......@@ -153,101 +153,100 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
if (token != null) {
return tokenRouteMappingService.findMatchingUriConfig(token, sourceUri)
.map(uriConfig -> {
return cacheRequestBody(exchange).flatMap(cachedBody -> {
log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri);
String primaryUrl = uriConfig.getPrimaryUrl();
CircularQueue errorStats = getOrCreateErrorStats(primaryUrl);
long startTime = System.currentTimeMillis();
// 检查是否应该跳过主地址
if (shouldSkipPrimary(uriConfig.getPrimaryUrl())) {
//告警日志
log.info("Directly using fallback due to high error rate");
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null
);
}
.map(uriConfig -> cacheRequestBody(exchange).flatMap(cachedBody -> {
log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri);
String primaryUrl = uriConfig.getPrimaryUrl();
CircularQueue errorStats = getOrCreateErrorStats(primaryUrl);
long startTime = System.currentTimeMillis();
// 检查是否应该跳过主地址
if (shouldSkipPrimary(uriConfig.getPrimaryUrl())) {
//告警日志
log.info("Directly using fallback due to high error rate");
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
uriConfig.getPrimaryUrl(),
sourceUri,
uriConfig.getPrimaryHost(),
uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
true,
errorStats
).timeout(Duration.ofMillis(appConfig.getRequestTimeout()))
.onErrorResume(TimeoutException.class, ex -> {
// 记录超时错误
long duration = System.currentTimeMillis() - startTime;
log.warn("Source request timed out after {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
// 直接转发到 target
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(Exception.class, ex -> {
// 处理其他错误
log.error("Source request failed: {}", ex.getMessage());
log.info("Source request timed out after {}ms, switching to target: {}",
System.currentTimeMillis() - startTime, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(primaryError -> {
log.error("Primary endpoint failed: {}, error: {}",
uriConfig.getPrimaryUrl() + sourceUri,
primaryError.getMessage());
long duration = System.currentTimeMillis() - startTime;
log.info("Source request timed out after {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
// 记录错误
getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false);
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null
);
});
});
})
false,
null
);
}
return tryRequest(
uriConfig.getPrimaryUrl(),
sourceUri,
uriConfig.getPrimaryHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
true,
errorStats
).timeout(Duration.ofMillis(appConfig.getRequestTimeout()))
.onErrorResume(TimeoutException.class, ex -> {
// 记录超时错误
long duration = System.currentTimeMillis() - startTime;
log.info("Source request timed out after {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
// 直接转发到 target
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(Exception.class, ex -> {
// 处理其他错误
log.error("Source request failed: {}", ex.getMessage());
log.info("Source request exception after {}ms, switching to target: {}",
System.currentTimeMillis() - startTime, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(primaryError -> {
log.error("Primary endpoint failed: {}, error: {}",
uriConfig.getPrimaryUrl() + sourceUri,
primaryError.getMessage());
long duration = System.currentTimeMillis() - startTime;
log.info("Source request primaryError {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
// 记录错误
getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false);
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null
);
});
}))
.orElse(chain.filter(exchange));
}
return chain.filter(exchange);
} finally {
// 确保在请求处理完成后清理 ThreadLocal
REQUEST_BODY_CACHE.remove();
// 清理缓存的请求体
exchange.getAttributes().remove(CACHED_REQUEST_BODY_KEY);
}
};
}
......@@ -255,11 +254,17 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private Mono<String> cacheRequestBody(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
// 如果已经缓存过请求体,直接返回
if (exchange.getAttribute(CACHED_REQUEST_BODY_KEY) != null) {
return Mono.just("");
}
// 对于不需要请求体的方法,直接返回
if (!requiresRequestBody(request.getMethod())) {
return Mono.just("");
}
// 读取并缓存请求体
return DataBufferUtils.join(request.getBody())
.map(dataBuffer -> {
try {
......@@ -268,7 +273,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
String body = new String(bytes, StandardCharsets.UTF_8);
// 将请求体存储在 ThreadLocal 中
log.info("cache body:{}", body);
REQUEST_BODY_CACHE.set(body);
exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, body);
return body;
} finally {
DataBufferUtils.release(dataBuffer);
......@@ -279,7 +284,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private Mono<Void> tryRequest(String baseUrl, String uri, String host, String model, String modelKey,
ServerWebExchange exchange, boolean isSourceRequest, CircularQueue errorStats) {
String cachedBody = REQUEST_BODY_CACHE.get();
String cachedBody = exchange.getAttribute(CACHED_REQUEST_BODY_KEY);
ServerHttpRequest request = exchange.getRequest();
String fullUrl = baseUrl + (baseUrl.endsWith("/") ? uri.substring(1) : uri);
......
......@@ -3,24 +3,4 @@ spring.cloud.config.profile=qa
spring.cloud.config.label=master
spring.cloud.config.enabled=true
#spring.config.import=optional:configserver:http://sl-config-center:8080
spring.config.import=optional:configserver:http://43.199.200.152:32594
#crime.mapping.event={"Arson":"\u7EB5\u706B\u7F6A","Assault":"\u88AD\u51FB/\u653B\u51FB","Burglary":"\u5165\u5BA4\u76D7\u7A83","Disturbing the Peace":"\u6270\u4E71\u516C\u5171\u79E9\u5E8F","Drugs / Alcohol Violations":"\u6BD2\u54C1/\u9152\u7CBE\u8FDD\u89C4","DUI":"\u9189\u9A7E","Fraud":"\u8BC8\u9A97","Homicide":"\u6740\u4EBA\u7F6A","Motor Vehicle Theft":"\u673A\u52A8\u8F66\u76D7\u7A83","Robbery":"\u62A2\u52AB","Sex Crimes":"\u6027\u72AF\u7F6A","Theft / Larceny":"\u76D7\u7A83","Vandalism":"\u6076\u610F\u7834\u574F\u8D22\u7269","Vehicle Break-In / Theft":"\u8F66\u8F86\u95EF\u5165/\u76D7\u7A83","Weapons":"\u6D89\u62A2\u6D89\u68B0\u72AF\u7F6A"}
# Gateway Configuration
#spring.cloud.gateway.httpclient.connect-timeout=5000
#spring.cloud.gateway.httpclient.response-timeout=5000
#
## Gateway Failover Configuration
#gateway.failover.token-mappings[0].tokens[0]=app-KNq0O8kENP4ITqSmqHQ0IzAt1
#gateway.failover.token-mappings[0].uri-configs[0].source-uri=/v1/workflows/run
#gateway.failover.token-mappings[0].uri-configs[0].target-uri=/api/v1/tag
#gateway.failover.token-mappings[0].uri-configs[0].primary-host=18.163.46.22
#gateway.failover.token-mappings[0].uri-configs[0].primary-url=http://18.163.46.22
#gateway.failover.token-mappings[0].uri-configs[0].fallback-host=127.0.0.1
#gateway.failover.token-mappings[0].uri-configs[0].fallback-url=http://127.0.0.1:8081
#gateway.failover.token-mappings[0].uri-configs[1].source-uri=/v1/workflows/status/*
#gateway.failover.token-mappings[0].uri-configs[1].target-uri=/api/v1/status/{remaining}
#gateway.failover.token-mappings[0].uri-configs[1].primary-host=18.163.46.22
#gateway.failover.token-mappings[0].uri-configs[1].primary-url=http://18.163.46.22
#gateway.failover.token-mappings[0].uri-configs[1].fallback-host=k8s-security-ingresss-2004545575-1912502751.ap-east-1.elb.amazonaws.com
#gateway.failover.token-mappings[0].uri-configs[1].fallback-url=http://k8s-security-ingresss-2004545575-1912502751.ap-east-1.elb.amazonaws.com
\ No newline at end of file
spring.config.import=optional:configserver:http://43.199.200.152:32594
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment