Commit 2691b976 authored by zhouwei's avatar zhouwei

add alarm service

parent 18300239
package com.nanyan.securitylink.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
@Slf4j
@RefreshScope
@Configuration
public class AppConfig {
@Value("${alert.host}")
String alertHost;
public String getAlertHost() {
return alertHost;
}
}
package com.nanyan.securitylink.entity;
import com.nanyan.securitylink.common.MsgCode;
public class Response<T> {
String msg;
int code;
T data;
public static Response getResponse(MsgCode msgCode){
Response response = new Response();
response.setCode(msgCode.getCode());
response.setMsg(msgCode.getMsg());
return response;
}
public static Response getResponse(int code, String msg){
Response response = new Response();
response.setCode(code);
response.setMsg(msg);
return response;
}
public static<T> Response<T> SUCCESS(T data){
Response response = new Response<T>();
response.setCode(MsgCode.SUCCESS.getCode());
response.setMsg(MsgCode.SUCCESS.getMsg());
response.setData(data);
return response;
}
public static<T> Response<T> FAILED(T data){
Response response = new Response();
response.setCode(MsgCode.FAILED.getCode());
response.setMsg(MsgCode.FAILED.getMsg());
response.setData(data);
return response;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
package com.nanyan.securitylink.entity;
import lombok.Data;
@Data
public class WeChatMsg {
String msgtype;
MsgText text;
MsgText markdown;
public static class MsgText{
public String content;
}
}
......@@ -2,9 +2,11 @@ package com.nanyan.securitylink.filter;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nanyan.securitylink.service.AlertService;
import com.nanyan.securitylink.service.TokenRouteMappingService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.io.buffer.DataBuffer;
......@@ -20,6 +22,8 @@ import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
......@@ -28,6 +32,12 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
private static final ThreadLocal<String> REQUEST_BODY_CACHE = new ThreadLocal<>();
private final WebClient.Builder webClientBuilder;
private final TokenRouteMappingService tokenRouteMappingService;
// 添加错误统计
private static final int WINDOW_SIZE_SECONDS = 10*60; // 1分钟窗口
private static final double ERROR_THRESHOLD = 0.5; // 75%错误率阈值
@Autowired
AlertService alertService;
private final Map<String, CircularQueue> errorStatsMap = new ConcurrentHashMap<>();
public FailoverGatewayFilterFactory(WebClient.Builder webClientBuilder,
TokenRouteMappingService tokenRouteMappingService) {
......@@ -36,6 +46,88 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
this.tokenRouteMappingService = tokenRouteMappingService;
}
// 内部类:环形队列,用于记录请求结果
private static class CircularQueue {
private final long[] timestamps;
private final boolean[] results; // true表示成功,false表示失败
private int currentIndex = 0;
private final int capacity;
private final Object lock = new Object();
public CircularQueue(int capacity) {
this.capacity = capacity;
this.timestamps = new long[capacity];
this.results = new boolean[capacity];
}
public void add(boolean success) {
synchronized (lock) {
timestamps[currentIndex] = System.currentTimeMillis();
results[currentIndex] = success;
currentIndex = (currentIndex + 1) % capacity;
}
}
public double getErrorRate() {
synchronized (lock) {
long currentTime = System.currentTimeMillis();
long oneMinuteAgo = currentTime - WINDOW_SIZE_SECONDS * 1000;
int totalRequests = 0;
int failedRequests = 0;
for (int i = 0; i < capacity; i++) {
if (timestamps[i] > oneMinuteAgo) {
totalRequests++;
if (!results[i]) {
failedRequests++;
}
}
}
return totalRequests == 0 ? 0 : (double) failedRequests / totalRequests;
}
}
}
private CircularQueue getOrCreateErrorStats(String primaryUrl) {
return errorStatsMap.computeIfAbsent(primaryUrl,
k -> new CircularQueue(WINDOW_SIZE_SECONDS * 10)); // 存储10次/秒的采样
}
private boolean shouldSkipPrimary(String primaryUrl) {
CircularQueue stats = getOrCreateErrorStats(primaryUrl);
double errorRate = stats.getErrorRate();
log.info("rate: {}%, url: {}",errorRate * 100, primaryUrl);
boolean skip = errorRate >= ERROR_THRESHOLD;
if (skip) {
// 发送告警
sendAlarmIfNeeded(primaryUrl, errorRate);
log.warn("Skipping primary endpoint due to high error rate: {}%, url: {}",
errorRate * 100, primaryUrl);
}
return skip;
}
private final Map<String, Long> alarmSentMap = new ConcurrentHashMap<>();
private void sendAlarmIfNeeded(String primaryUrl, double errorRate) {
long currentTime = System.currentTimeMillis();
Long lastAlarmTime = alarmSentMap.get(primaryUrl);
// 如果从未发送过告警,或者距离上次告警已经超过冷却时间
if (lastAlarmTime == null || (currentTime - lastAlarmTime) > WINDOW_SIZE_SECONDS*1000) {
String alarmMessage = String.format(
"Service degradation detected: %s has high error rate (%.2f%%), switching to fallback service",
primaryUrl, errorRate * 100);
try {
alertService.sendAlertMsg("dify 异常,已经切换到JAVA服务");
alarmSentMap.put(primaryUrl, currentTime);
log.warn("Alarm sent: {}", alarmMessage);
} catch (Exception e) {
log.error("Failed to send alarm: {}", e.getMessage());
}
}
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
......@@ -51,6 +143,25 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
.map(uriConfig -> {
return cacheRequestBody(exchange).flatMap(cachedBody -> {
log.info("Found matching URI config for token: {}, sourceUri: {}", token, sourceUri);
String primaryUrl = uriConfig.getPrimaryUrl();
CircularQueue errorStats = getOrCreateErrorStats(primaryUrl);
// 检查是否应该跳过主地址
if (shouldSkipPrimary(uriConfig.getPrimaryUrl())) {
//告警日志
log.info("Directly using fallback due to high error rate");
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
uriConfig.getFallbackUrl(),
targetUri,
uriConfig.getFallbackHost(),
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false,
null
);
}
return tryRequest(
uriConfig.getPrimaryUrl(),
sourceUri,
......@@ -58,11 +169,14 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
true
true,
errorStats
).onErrorResume(primaryError -> {
log.error("Primary endpoint failed: {}, error: {}",
uriConfig.getPrimaryUrl() + sourceUri,
primaryError.getMessage());
// 记录错误
getOrCreateErrorStats(uriConfig.getPrimaryUrl()).add(false);
String targetUri = tokenRouteMappingService.resolveTargetUri(sourceUri, uriConfig);
return tryRequest(
......@@ -72,7 +186,8 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
uriConfig.getModel(),
uriConfig.getModelKey(),
exchange,
false
false,
null
);
});
});
......@@ -114,7 +229,7 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
}
private Mono<Void> tryRequest(String baseUrl, String uri, String host, String model, String modelKey,
ServerWebExchange exchange, boolean isSourceRequest) {
ServerWebExchange exchange, boolean isSourceRequest, CircularQueue errorStats) {
String cachedBody = REQUEST_BODY_CACHE.get();
ServerHttpRequest request = exchange.getRequest();
String fullUrl = baseUrl + (baseUrl.endsWith("/") ? uri.substring(1) : uri);
......@@ -146,12 +261,12 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
return requestBodySpec
.exchange()
.flatMap(clientResponse -> handleResponse(exchange, clientResponse, isSourceRequest));
.flatMap(clientResponse -> handleResponse(exchange, clientResponse, isSourceRequest, errorStats));
}
private Mono<Void> handleResponse(ServerWebExchange exchange,
org.springframework.web.reactive.function.client.ClientResponse clientResponse,
boolean isSourceRequest) {
boolean isSourceRequest,CircularQueue errorStats) {
HttpStatus responseStatus = clientResponse.statusCode();
if (isSourceRequest) {
......@@ -160,6 +275,10 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
log.info("Source request response: status={}, body={}", responseStatus, body);
if (responseStatus != HttpStatus.OK) {
// 只在 source 请求时记录错误
if (errorStats != null) {
errorStats.add(false);
}
return Mono.error(new RuntimeException(
String.format("Source request HTTP status not 200: %s, body: %s",
responseStatus.value(), body)));
......@@ -169,10 +288,18 @@ public class FailoverGatewayFilterFactory extends AbstractGatewayFilterFactory<F
JSONObject jsonBody = JSON.parseObject(body);
Integer bodyStatus = jsonBody.getInteger("status");
if (bodyStatus == null || bodyStatus != 200) {
// 只在 source 请求时记录错误
if (errorStats != null) {
errorStats.add(false);
}
return Mono.error(new RuntimeException(
String.format("Source request body status not 200: %s, body: %s",
bodyStatus, body)));
}
// 只在 source 请求时记录错误
if (errorStats != null) {
errorStats.add(true);
}
exchange.getResponse().setStatusCode(responseStatus);
exchange.getResponse().getHeaders().putAll(clientResponse.headers().asHttpHeaders());
......
package com.nanyan.securitylink.service;
import com.alibaba.fastjson.JSONObject;
import com.nanyan.securitylink.config.AppConfig;
import com.nanyan.securitylink.entity.WeChatMsg;
import com.nanyan.securitylink.vo.Response;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.StandardCharsets;
@Slf4j
@Service
public class AlertService {
@Autowired
RestTemplate restTemplate;
@Autowired
AppConfig appConfig;
public boolean sendAlertMsg(String msg){
WeChatMsg weChatMsg = buildAlertMsg(msg);
log.info("发送企业微信msg:{}", JSONObject.toJSONString(weChatMsg));
HttpEntity<String> entity = new HttpEntity<>(JSONObject.toJSONString(weChatMsg),getHeader().getHeaders());
ResponseEntity<Response> response = restTemplate.postForEntity(String.format("%s/api/v1/msg", appConfig.getAlertHost()), entity, Response.class);
log.info("发送企业微信返回:{}", JSONObject.toJSONString(response));
return true;
}
private WeChatMsg buildAlertMsg(String content) {
if(content.length()>4000){
content = content.substring(0,4000);
}
WeChatMsg weChatMsg = new WeChatMsg();
weChatMsg.setMsgtype("markdown");
WeChatMsg.MsgText msgText = new WeChatMsg.MsgText();
String builder = "## dify异常\n" +
">内容:<font color=\"comment\"> " + content + "</font>\n";
msgText.content = builder;
weChatMsg.setMarkdown(msgText);
return weChatMsg;
}
private HttpEntity getHeader(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set("Content-Type","application/json");
return new HttpEntity<>(httpHeaders);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment