Skip to content

Commit 8264938

Browse files
committed
refactor(logging): use reactive Sinks for asynchronous log processin.
1 parent 566f352 commit 8264938

File tree

6 files changed

+105
-78
lines changed

6 files changed

+105
-78
lines changed

audit/src/main/java/io/github/simpleauth0/audit/AuditLogMethodInterceptor.java

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.github.simpleauth0.audit;
22

33
import io.github.simpleauth0.audit.annotation.AuditLog;
4-
import io.github.simpleauth0.audit.expression.ExpressionAttribute;
4+
import io.github.simpleauth0.audit.expression.AuditLogExpressionAttribute;
55
import io.github.simpleauth0.audit.handler.AuditLogStorageHandler;
66
import io.github.simpleauth0.core.utils.AnnotationUtils;
77
import org.aopalliance.aop.Advice;
@@ -36,7 +36,7 @@ public class AuditLogMethodInterceptor implements Ordered, MethodInterceptor, Po
3636

3737
private AuditLogStorageHandler auditLogStorageHandler;
3838

39-
public AuditLogMethodInterceptor(AuditLogStorageHandler<? extends ExpressionAttribute> auditLogStorageHandler) {
39+
public AuditLogMethodInterceptor(AuditLogStorageHandler<? extends AuditLogExpressionAttribute> auditLogStorageHandler) {
4040
Assert.notNull(auditLogStorageHandler, "auditLogStorageHandler cannot be null");
4141
this.auditLogStorageHandler = auditLogStorageHandler;
4242
this.pointcut = AuditLogMethodPointcuts.forAnnotations(AuditLog.class);
@@ -58,7 +58,7 @@ public Object invoke(MethodInvocation mi) throws Throwable {
5858
throw e;
5959
} finally {
6060
AuditLogExpressionAttribute attribute = this.registry.getAttribute(mi);
61-
if (attribute != ExpressionAttribute.NULL_ATTRIBUTE) {
61+
if (attribute != AuditLogExpressionAttribute.NULL_ATTRIBUTE) {
6262
String[] parameterNames = discoverer.getParameterNames(mi.getMethod());
6363
if (parameterNames != null && parameterNames.length > 0) {
6464
attribute.setParams(parameterNames);
@@ -130,41 +130,6 @@ private AuditLog findPostFilterAnnotation(Method method) {
130130

131131
}
132132

133-
public static final class AuditLogExpressionAttribute extends ExpressionAttribute {
134-
135-
private static final AuditLogExpressionAttribute NULL_ATTRIBUTE = new AuditLogExpressionAttribute(null, null);
136-
137-
private final AuditLog auditLog;
138-
139-
private Object result;
140-
141-
private Boolean success;
142-
143-
private AuditLogExpressionAttribute(Expression expression, AuditLog auditLog) {
144-
super(expression);
145-
this.auditLog = auditLog;
146-
}
147-
148-
public void setResult(Object result) {
149-
this.result = result;
150-
}
151-
152-
public void setSuccess(Boolean success) {
153-
this.success = success;
154-
}
155-
156-
public Object getResult() {
157-
return result;
158-
}
159-
160-
public Boolean getSuccess() {
161-
return success;
162-
}
163-
164-
public AuditLog getAuditLog() {
165-
return auditLog;
166-
}
167-
}
168133
}
169134

170135

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.github.simpleauth0.audit.expression;
2+
3+
import io.github.simpleauth0.audit.annotation.AuditLog;
4+
import lombok.Data;
5+
import org.springframework.expression.Expression;
6+
7+
/**
8+
* @author: ReLive27
9+
* @date: 2025/4/28 20:53
10+
*/
11+
@Data
12+
public class AuditLogExpressionAttribute {
13+
14+
public static final AuditLogExpressionAttribute NULL_ATTRIBUTE = new AuditLogExpressionAttribute(null, null);
15+
16+
private final Expression expression;
17+
18+
private Object[] args;
19+
20+
private String[] params;
21+
22+
private final AuditLog auditLog;
23+
24+
private Object result;
25+
26+
private Boolean success;
27+
28+
public AuditLogExpressionAttribute(Expression expression, AuditLog auditLog) {
29+
this.expression = expression;
30+
this.auditLog = auditLog;
31+
}
32+
33+
public void setResult(Object result) {
34+
this.result = result;
35+
}
36+
37+
public void setSuccess(Boolean success) {
38+
this.success = success;
39+
}
40+
41+
public Object getResult() {
42+
return result;
43+
}
44+
45+
public Boolean getSuccess() {
46+
return success;
47+
}
48+
49+
public AuditLog getAuditLog() {
50+
return auditLog;
51+
}
52+
53+
public void setArgs(Object[] args) {
54+
this.args = args;
55+
}
56+
57+
public void setParams(String[] params) {
58+
this.params = params;
59+
}
60+
}

audit/src/main/java/io/github/simpleauth0/audit/expression/ExpressionAttribute.java

Lines changed: 0 additions & 32 deletions
This file was deleted.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package io.github.simpleauth0.audit.handler;
22

3-
import io.github.simpleauth0.audit.expression.ExpressionAttribute;
3+
4+
import io.github.simpleauth0.audit.expression.AuditLogExpressionAttribute;
45

56
/**
67
* @author: ReLive27
78
* @date: 2025/5/23 22:06
89
*/
9-
public interface AuditLogStorageHandler<T extends ExpressionAttribute> {
10+
public interface AuditLogStorageHandler<T extends AuditLogExpressionAttribute> {
1011

1112
void handler(T expressionAttribute);
1213
}

audit/src/main/java/io/github/simpleauth0/audit/handler/DefaultAuditLogStorageHandler.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,54 @@
22

33
import eu.bitwalker.useragentutils.OperatingSystem;
44
import eu.bitwalker.useragentutils.UserAgent;
5-
import io.github.simpleauth0.audit.AuditLogMethodInterceptor;
65
import io.github.simpleauth0.audit.annotation.AuditLog;
6+
import io.github.simpleauth0.audit.expression.AuditLogExpressionAttribute;
77
import io.github.simpleauth0.audit.repository.AuditLogEvent;
88
import io.github.simpleauth0.audit.repository.AuditLogEventRepository;
99
import io.github.simpleauth0.audit.repository.Field;
1010
import io.github.simpleauth0.core.utils.IPUtils;
1111
import io.github.simpleauth0.core.utils.JsonUtils;
12+
import lombok.extern.slf4j.Slf4j;
1213
import org.springframework.expression.EvaluationContext;
1314
import org.springframework.expression.spel.support.StandardEvaluationContext;
1415
import org.springframework.util.Assert;
1516
import org.springframework.util.StringUtils;
1617
import org.springframework.web.context.request.RequestContextHolder;
1718
import org.springframework.web.context.request.ServletRequestAttributes;
19+
import reactor.core.publisher.Sinks;
1820

1921
import javax.servlet.http.HttpServletRequest;
22+
import java.time.Duration;
2023
import java.util.*;
2124

2225
/**
2326
* @author: ReLive27
2427
* @date: 2025/5/24 21:42
2528
*/
26-
public class DefaultAuditLogStorageHandler implements AuditLogStorageHandler<AuditLogMethodInterceptor.AuditLogExpressionAttribute> {
29+
@Slf4j
30+
public class DefaultAuditLogStorageHandler implements AuditLogStorageHandler<AuditLogExpressionAttribute> {
31+
32+
private static final Sinks.Many<AuditLogEvent> sink = Sinks.many()
33+
.unicast()
34+
.onBackpressureBuffer();
2735

2836
private final AuditLogEventRepository auditLogRepository;
2937

3038
public DefaultAuditLogStorageHandler(AuditLogEventRepository auditLogRepository) {
3139
Assert.notNull(auditLogRepository, "auditLogRepository can not be null");
3240
this.auditLogRepository = auditLogRepository;
41+
// 异步订阅处理,批量保存
42+
sink.asFlux()
43+
.bufferTimeout(10, Duration.ofSeconds(3))
44+
.flatMap(this.auditLogRepository::save) // 异步保存
45+
.doOnNext(batch -> log.debug("已保存日志数量:" + batch))
46+
.doOnError(e -> log.error("日志流异常:" + e.getMessage()))
47+
.onErrorContinue((e, o) -> log.error("单条日志保存失败: " + o))
48+
.subscribe();
3349
}
3450

3551
@Override
36-
public void handler(AuditLogMethodInterceptor.AuditLogExpressionAttribute expressionAttribute) {
52+
public void handler(AuditLogExpressionAttribute expressionAttribute) {
3753
AuditLogEvent auditEvent = new AuditLogEvent();
3854
AuditLog auditLog = expressionAttribute.getAuditLog();
3955
auditEvent.setEventType(auditLog.resourceType());
@@ -70,6 +86,21 @@ public void handler(AuditLogMethodInterceptor.AuditLogExpressionAttribute expres
7086
fieldList.add(new Field("request_param", JsonUtils.toJson(requestParams), null));
7187
fieldList.add(new Field("response_body", expressionAttribute.getResult() != null ? JsonUtils.toJson(expressionAttribute.getResult()) : "", null));
7288
auditEvent.setFields(fieldList);
73-
this.auditLogRepository.save(auditEvent);
89+
90+
save(auditEvent);
91+
}
92+
93+
public void save(AuditLogEvent logEvent) {
94+
sink.emitNext(logEvent, (signalType, emitResult) -> {
95+
if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
96+
// Reactor 自身会重试;返回 true 表示可重试
97+
return true;
98+
}
99+
if (emitResult.isFailure()) {
100+
log.warn("日志发送失败: " + emitResult);
101+
}
102+
// 返回 false 表示不重试
103+
return false;
104+
});
74105
}
75106
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package io.github.simpleauth0.audit.repository;
22

3-
import reactor.core.publisher.Mono;
3+
import reactor.core.publisher.Flux;
4+
5+
import java.util.List;
46

57
/**
68
* @author: ReLive27
79
* @date: 2025/6/30 22:56
810
*/
911
public interface AuditLogEventRepository {
1012

11-
Mono<Void> save(AuditLogEvent event);
13+
Flux<Integer> save(List<AuditLogEvent> auditLogEvents);
1214

1315
}

0 commit comments

Comments
 (0)