Commit 0745b64b authored by zhouwei's avatar zhouwei

新增超时转移逻辑

parent e8c85f2b
...@@ -12,6 +12,12 @@ import org.springframework.context.annotation.Configuration; ...@@ -12,6 +12,12 @@ import org.springframework.context.annotation.Configuration;
public class AppConfig { public class AppConfig {
@Value("${alert.host}") @Value("${alert.host}")
String alertHost; String alertHost;
@Value("${gateway.request.timeout:100000}")
private int requestTimeout;
public int getRequestTimeout() {
return requestTimeout;
}
public String getAlertHost() { public String getAlertHost() {
return alertHost; return alertHost;
......
...@@ -2,6 +2,7 @@ package com.nanyan.securitylink.filter; ...@@ -2,6 +2,7 @@ package com.nanyan.securitylink.filter;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.nanyan.securitylink.config.AppConfig;
import com.nanyan.securitylink.service.AlertService; import com.nanyan.securitylink.service.AlertService;
import com.nanyan.securitylink.service.TokenRouteMappingService; import com.nanyan.securitylink.service.TokenRouteMappingService;
import lombok.Data; import lombok.Data;
...@@ -21,7 +22,9 @@ import org.springframework.web.reactive.function.BodyInserters; ...@@ -21,7 +22,9 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -36,8 +39,11 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -36,8 +39,11 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private static final int WINDOW_SIZE_SECONDS = 100; // 1分钟窗口 private static final int WINDOW_SIZE_SECONDS = 100; // 1分钟窗口
private static final double ERROR_THRESHOLD = 0.5; // 75%错误率阈值 private static final double ERROR_THRESHOLD = 0.5; // 75%错误率阈值
private static Long lastAlarmTime = System.currentTimeMillis(); private static Long lastAlarmTime = System.currentTimeMillis();
@Autowired @Autowired
AlertService alertService; AlertService alertService;
@Autowired
AppConfig appConfig;
private final Map<String, CircularQueue> errorStatsMap = new ConcurrentHashMap<>(); private final Map<String, CircularQueue> errorStatsMap = new ConcurrentHashMap<>();
public FailoverGatewayFilterFactory(WebClient.Builder webClientBuilder, public FailoverGatewayFilterFactory(WebClient.Builder webClientBuilder,
...@@ -98,7 +104,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -98,7 +104,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private boolean shouldSkipPrimary(String primaryUrl) { private boolean shouldSkipPrimary(String primaryUrl) {
CircularQueue stats = getOrCreateErrorStats(primaryUrl); CircularQueue stats = getOrCreateErrorStats(primaryUrl);
double errorRate = stats.getErrorRate(); double errorRate = stats.getErrorRate();
log.info("rate: {}%, url: {}",errorRate * 100, primaryUrl); log.info("rate: {}%, url: {}", errorRate * 100, primaryUrl);
boolean skip = errorRate >= ERROR_THRESHOLD; boolean skip = errorRate >= ERROR_THRESHOLD;
if (skip) { if (skip) {
// 发送告警 // 发送告警
...@@ -108,13 +114,15 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -108,13 +114,15 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
} }
return skip; return skip;
} }
private final Map<String, Long> alarmSentMap = new ConcurrentHashMap<>(); private final Map<String, Long> alarmSentMap = new ConcurrentHashMap<>();
private void sendAlarmIfNeeded(String primaryUrl, double errorRate) { private void sendAlarmIfNeeded(String primaryUrl, double errorRate) {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
Long lastAlarmTime = alarmSentMap.get(primaryUrl); Long lastAlarmTime = alarmSentMap.get(primaryUrl);
// 如果从未发送过告警,或者距离上次告警已经超过冷却时间 // 如果从未发送过告警,或者距离上次告警已经超过冷却时间
if (lastAlarmTime == null || (currentTime - lastAlarmTime) > 20*60*1000) { if (lastAlarmTime == null || (currentTime - lastAlarmTime) > 20 * 60 * 1000) {
String alarmMessage = String.format( String alarmMessage = String.format(
"Service degradation detected: %s has high error rate (%.2f%%), switching to fallback service", "Service degradation detected: %s has high error rate (%.2f%%), switching to fallback service",
primaryUrl, errorRate * 100); primaryUrl, errorRate * 100);
...@@ -146,6 +154,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -146,6 +154,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri); log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri);
String primaryUrl = uriConfig.getPrimaryUrl(); String primaryUrl = uriConfig.getPrimaryUrl();
CircularQueue errorStats = getOrCreateErrorStats(primaryUrl); CircularQueue errorStats = getOrCreateErrorStats(primaryUrl);
long startTime = System.currentTimeMillis();
// 检查是否应该跳过主地址 // 检查是否应该跳过主地址
if (shouldSkipPrimary(uriConfig.getPrimaryUrl())) { if (shouldSkipPrimary(uriConfig.getPrimaryUrl())) {
//告警日志 //告警日志
...@@ -172,25 +181,29 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -172,25 +181,29 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
exchange, exchange,
true, true,
errorStats errorStats
).onErrorResume(primaryError -> { ).timeout(Duration.ofMillis(appConfig.getRequestTimeout()))
log.error("Primary endpoint failed: {}, error: {}", .onErrorResume(primaryError -> {
uriConfig.getPrimaryUrl() + sourceUri, log.error("Primary endpoint failed: {}, error: {}",
primaryError.getMessage()); uriConfig.getPrimaryUrl() + sourceUri,
// 记录错误 primaryError.getMessage());
getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false); long duration = System.currentTimeMillis() - startTime;
log.info("Source request timed out after {}ms, switching to target: {}",
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig); duration, uriConfig.getTargetUri());
return tryRequest( // 记录错误
uriConfig.getFallbackUrl(), getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false);
targetUri,
uriConfig.getFallbackHost(), String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
uriConfig.getModel(), return tryRequest(
uriConfig.getModelKey(), uriConfig.getFallbackUrl(),
exchange, targetUri,
false, uriConfig.getFallbackHost(),
null uriConfig.getModel(),
); uriConfig.getModelKey(),
}); exchange,
false,
null
);
});
}); });
}) })
.orElse(chain.filter(exchange)); .orElse(chain.filter(exchange));
...@@ -275,7 +288,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -275,7 +288,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private Mono<Void> handleResponse(ServerWebExchange exchange, private Mono<Void> handleResponse(ServerWebExchange exchange,
org.springframework.web.reactive.function.client.ClientResponse clientResponse, org.springframework.web.reactive.function.client.ClientResponse clientResponse,
boolean isSourceRequest,CircularQueue errorStats) { boolean isSourceRequest, CircularQueue errorStats) {
HttpStatus responseStatus = clientResponse.statusCode(); HttpStatus responseStatus = clientResponse.statusCode();
if (isSourceRequest) { if (isSourceRequest) {
...@@ -297,19 +310,19 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F ...@@ -297,19 +310,19 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
JSONObject jsonBody = JSON.parseObject(body); JSONObject jsonBody = JSON.parseObject(body);
boolean success = false; boolean success = false;
String bodyStatus = ""; String bodyStatus = "";
if(jsonBody.containsKey("status")){ if (jsonBody.containsKey("status")) {
Integer status = jsonBody.getInteger("status"); Integer status = jsonBody.getInteger("status");
bodyStatus = status +""; bodyStatus = status + "";
} }
if(jsonBody.containsKey("answer")){ if (jsonBody.containsKey("answer")) {
success = true; success = true;
} }
if(jsonBody.containsKey("data")){ if (jsonBody.containsKey("data")) {
JSONObject data = jsonBody.getJSONObject("data"); JSONObject data = jsonBody.getJSONObject("data");
if (data.containsKey("status")){ if (data.containsKey("status")) {
String status = data.getString("status"); String status = data.getString("status");
if("succeeded".equalsIgnoreCase(status)){ if ("succeeded".equalsIgnoreCase(status)) {
success = true; success = true;
} }
bodyStatus = status; bodyStatus = status;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment