Commit 1c983936 authored by zhouwei's avatar zhouwei

增强超时处理逻辑

parent b0ef985b
......@@ -12,7 +12,7 @@ import org.springframework.context.annotation.Configuration;
public class AppConfig {
@Value("${alert.host}")
String alertHost;
@Value("${gateway.request.timeout:30000}")
@Value("${gateway.request.timeout:50000}")
private int requestTimeout;
public int getRequestTimeout() {
......
......@@ -29,6 +29,7 @@ import reactor.netty.http.client.HttpClient;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
......@@ -37,6 +38,7 @@ import java.util.concurrent.TimeoutException;
public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<FailoverGatewayFilterFactory.Config> {
private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
private static final String REQUEST_ID = "request_id";
private final WebClient.Builder webClientBuilder;
private final TokenRouteMappingService tokenRouteMappingService;
// 添加错误统计
......@@ -154,7 +156,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
if (token != null) {
return tokenRouteMappingService.findMatchingUriConfig(token, sourceUri)
.map(uriConfig -> cacheRequestBody(exchange).flatMap(cachedBody -> {
log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri);
log.info("Found matching URI config for token: {}, sourceUri: {}, request id:{}", token, sourceUri, exchange.getAttributes().get(REQUEST_ID));
String primaryUrl = uriConfig.getPrimaryUrl();
CircularQueue errorStats = getOrCreateErrorStats(primaryUrl);
long startTime = System.currentTimeMillis();
......@@ -185,44 +187,13 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
true,
errorStats
).timeout(Duration.ofMillis(appConfig.getRequestTimeout()))
.onErrorResume(TimeoutException.class, ex -> {
// 记录超时错误
long duration = System.currentTimeMillis() - startTime;
log.info("Source request timed out after {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
// 直接转发到 target
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(Exception.class, ex -> {
// 处理其他错误
log.error("Source request failed: {}", ex.getMessage());
log.info("Source request exception after {}ms, switching to target: {}",
System.currentTimeMillis() - startTime, uriConfig.getTargetUri());
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null);
})
.onErrorResume(primaryError -> {
.onErrorResume(throwable -> {
log.error("Primary endpoint failed: {}, error: {}",
uriConfig.getPrimaryUrl() + sourceUri,
primaryError.getMessage());
throwable.getMessage());
long duration = System.currentTimeMillis() - startTime;
log.info("Source request primaryError {}ms, switching to target: {}",
duration, uriConfig.getTargetUri());
log.info("Source request error:{} took:{}ms, switching to target: {} request id:{}",
throwable.getMessage(),duration, uriConfig.getTargetUri(), exchange.getAttributes().get(REQUEST_ID));
// 记录错误
getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false);
......@@ -260,6 +231,10 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
return Mono.just("");
}
if (exchange.getAttribute(REQUEST_ID) == null) {
exchange.getAttributes().put(REQUEST_ID, UUID.randomUUID().toString());
}
// 对于不需要请求体的方法,直接返回
if (!requiresRequestBody(request.getMethod())) {
return Mono.just("");
......@@ -288,7 +263,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
ServerHttpRequest request = exchange.getRequest();
String fullUrl = baseUrl + (baseUrl.endsWith("/") ? uri.substring(1) : uri);
log.info("Starting {} request to: {}", isSourceRequest ? "source" : "target", fullUrl);
log.info("request id:{} Starting {} request to: {}",exchange.getAttributes().get(REQUEST_ID), isSourceRequest ? "source" : "target", fullUrl);
long startTime = System.currentTimeMillis();
// 创建 WebClient 请求
WebClient client = webClientBuilder.clone()
......@@ -325,11 +300,11 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
.doFinally(signalType -> {
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
log.info("{} request completed: {} - took {}ms, signal: {}",
log.info("{} request completed: {} - took {}ms, signal: {} request id:{}",
isSourceRequest ? "Source" : "Target",
fullUrl,
duration,
signalType);
signalType,exchange.getAttributes().get(REQUEST_ID));
});
}
......@@ -341,7 +316,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
if (isSourceRequest) {
return clientResponse.bodyToMono(String.class)
.flatMap(body -> {
log.info("Source request response: status={}, body={}", responseStatus, body);
log.info("Source request response: status={}, body={} request id:{}", responseStatus, body, exchange.getAttributes().get(REQUEST_ID));
if (responseStatus != HttpStatus.OK) {
// 只在 source 请求时记录错误
......@@ -349,8 +324,8 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
errorStats.add(false);
}
return Mono.error(new RuntimeException(
String.format("Source request HTTP status not 200: %s, body: %s",
responseStatus.value(), body)));
String.format("Source request HTTP status not 200: %s, body: %s ,request id:%s",
responseStatus.value(), body, exchange.getAttributes().get(REQUEST_ID))));
}
try {
......@@ -381,8 +356,8 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
errorStats.add(false);
}
return Mono.error(new RuntimeException(
String.format("Source request body status not 200: %s, body: %s",
bodyStatus, body)));
String.format("Source request body status not 200: %s, body: %s, request id:%s",
bodyStatus, body, exchange.getAttributes().get(REQUEST_ID))));
}
// 只在 source 请求时记录错误
if (errorStats != null) {
......@@ -396,9 +371,9 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
return exchange.getResponse().writeWith(Mono.just(buffer));
} catch (Exception e) {
log.error("Error parsing response body: {}", e.getMessage());
log.error("Error parsing response body: {} request id:{}", e.getMessage(), exchange.getAttributes().get(REQUEST_ID));
return Mono.error(new RuntimeException(
String.format("Failed to parse response body as JSON: %s", body)));
String.format("Failed to parse response body as JSON: %s request id:{}", body, exchange.getAttributes().get(REQUEST_ID))));
}
});
}
......@@ -408,12 +383,12 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
return clientResponse.bodyToMono(DataBuffer.class)
.flatMap(body -> {
log.info("Target request response: status={}, body={}", responseStatus, body);
log.info("Target request response: status={}, body={},request id={}", responseStatus, body, exchange.getAttributes().get(REQUEST_ID));
DataBuffer body1 = body;
// 检查 HTTP 状态码
if (!responseStatus.is2xxSuccessful()) {
String errorMessage = String.format("Target request failed with status: %s, body: %s",
responseStatus.value(), body);
String errorMessage = String.format("Target request failed with status: %s, body: %s,request id={}",
responseStatus.value(), body, exchange.getAttributes().get(REQUEST_ID));
log.error(errorMessage);
return writeErrorResponse(exchange, HttpStatus.BAD_GATEWAY, errorMessage);
}
......@@ -421,14 +396,14 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
try {
return writeResponse(exchange, responseStatus, body);
} catch (Exception e) {
String errorMessage = String.format("Failed to process target response: %s, body: %s",
e.getMessage(), body);
String errorMessage = String.format("Failed to process target response: %s, body: %s,request id={}",
e.getMessage(), body, exchange.getAttributes().get(REQUEST_ID));
log.error(errorMessage);
return writeErrorResponse(exchange, HttpStatus.BAD_GATEWAY, errorMessage);
}
})
.onErrorResume(throwable -> {
log.info("Error handling response: {}", throwable.getMessage());
log.info("Error handling response: {},request id={}", throwable.getMessage(), exchange.getAttributes().get(REQUEST_ID));
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
String errorMessage = throwable.getMessage();
DataBuffer buffer = exchange.getResponse().bufferFactory()
......
......@@ -19,7 +19,7 @@ public class ScheduledTasksService {
this.taskScheduler = taskScheduler;
}
@PostConstruct
// @PostConstruct
public void scheduleTask() {
//定时刷新配置
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment