MDC分布式日志追踪 - dubbo
1.公共类:
线程传递–ThreadTraceIdUtil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* InheritableThreadLocal 线程传递
* @author Sjh
*/
public class ThreadTraceIdUtil {
/**使用InheritableThreadLocal便于在主子线程间传递参数*/
private static final ThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();
public ThreadTraceIdUtil() {
}
/**
* 从当前线程局部变量获取TraceId
* 首次调用该方法会生成traceId,后续每次都从线程上下文获取
* @return
*/
public static String getTraceId() {
return TRACE_ID.get();
}
public static void setTraceId(String traceId) {
TRACE_ID.set(traceId);
}
public static void removeTraceId() {
TRACE_ID.remove();
}
}
生成 TraceId - TraceIdGenerator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @author Sjh
*
* 生成 TraceId
*/
@Slf4j
public class TraceIdGenerator {
/**
* 消费端创建TraceId,并设置到线程上下文中
* 该方法只调用一次
*
* @return
*/
public static String createTraceId() {
// 创建的同时就设置到上下文中
String traceId = getTraceId();
ThreadTraceIdUtil.setTraceId(traceId);
return traceId;
}
/**
* 生成32位traceId
*
* @return
*/
private static String getTraceId() {
String result = "";
String ip = "";
// 获取本地ipv4地址
try {
InetAddress address = InetAddress.getLocalHost();
ip = address.getHostAddress();
} catch (Exception var5) {
return result;
}
// 根据.截取为String数组
String[] ipAddressInArray = ip.split("\\.");
// 拼装为字符串,将每一个元素转换为16进制
for (int i = 3; i >= 0; --i) {
Integer id = Integer.parseInt(ipAddressInArray[3 - i]);
result = result + String.format("%02x", id);
}
// 拼装时间戳及随机数
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
result = result + simpleDateFormat.format(new Date()) + UUID.randomUUID().toString().substring(0, 7);
return result;
}
/**
* TraceId默认第一个为空,如果没值则分配一个
*
* @param traceId
* @return
*/
public static String validateTraceId(String traceId) {
if (null == traceId) {
traceId = createTraceId();
if (log.isDebugEnabled()) {
log.debug("[TraceInterceptor]首次请求未分配TraceId,生成首次TraceId={}", traceId);
}
}
return traceId;
}
}
RpcContext传递 - RpcTraceIdFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
* 消费者织入 TraceId
* 生产者取出 TraceId
* @author Sjh
*/
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {
/**
* RpcContext 加入 traceId
*
* @param invoker
* @param invocation
* @return
* @throws RpcException
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String traceId;
//消费者
if (rpcContext.isConsumerSide()) {
traceId = MDC.get(Constants.TRACE_ID);
if (traceId == null) {
traceId = TraceIdGenerator.createTraceId();
}
rpcContext.setAttachment(Constants.TRACE_ID, traceId);
}
//生产者
if (rpcContext.isProviderSide()) {
traceId = rpcContext.getAttachment(Constants.TRACE_ID);
MDC.put(Constants.TRACE_ID, traceId);
}
Result result = invoker.invoke(invocation);
// after
if (rpcContext.isProviderSide()) {
// clear traceId from MDC
MDC.remove(Constants.TRACE_ID);
}
return result;
}
}
2.消费者端实现(http服务调用dubbo服务)
拦截web请求,生成TraceId ,并放入log4j的MDC和dubbo的RpcContext中, 通过 attachMent 把TraceId传递给生产者
InterceptorConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* MVC Interceptor
* traceId的拦截器LogInterceptor到容器
* @author sjh
*/
@Configuration
//@EnableMvc 会引发其它重写冲突,注释掉
public class InterceptorConfig implements WebMvcConfigurer {
/**
* 注解LogInterceptor类到IOC容器中
*/
@Bean
public LogInterceptor logInterceptor() {
return new LogInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
//注册日志拦截器
registry.addInterceptor(logInterceptor());
}
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
/**
* 解决 webmvc 与 swagger 页面冲突的问题
* https://blog.csdn.net/Kerwin_luo/article/details/114266444
* @param registry
*/
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
//添加静态页面资源,文件下载资源等
registry.addResourceHandler("/**").addResourceLocations("classpath:/META-INF/resources/",
"classpath:/resources/", "classpath:/static/", "classpath:/public/","file:/E:/","file:/");
}
}
LogInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 拦截web请求
*
* @author Sjh
*/
@Slf4j
public class LogInterceptor implements HandlerInterceptor {
/**
* controller方法前调用
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//如果有上层调用就用上层的ID
String traceId = request.getHeader(Constants.TRACE_ID);
traceId = TraceIdGenerator.validateTraceId(traceId);
MDC.put(Constants.TRACE_ID, traceId);
return true;
}
/**
* preHandle方法返回true之后
* 在DispatcherServlet进行视图的渲染之后调用
*/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {
//controller结束之后删除对应的唯一值
MDC.remove(Constants.TRACE_ID);
}
}
dubbo的SPI扩展RpcTraceIdFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
* 消费者织入 TraceId
* 生产者取出 TraceId
* @author Sjh
*/
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {
/**
* RpcContext 加入 traceId
*
* @param invoker
* @param invocation
* @return
* @throws RpcException
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String traceId;
//消费者
if (rpcContext.isConsumerSide()) {
traceId = MDC.get(Constants.TRACE_ID);
if (traceId == null) {
traceId = TraceIdGenerator.createTraceId();
}
rpcContext.setAttachment(Constants.TRACE_ID, traceId);
}
//生产者
if (rpcContext.isProviderSide()) {
traceId = rpcContext.getAttachment(Constants.TRACE_ID);
MDC.put(Constants.TRACE_ID, traceId);
}
Result result = invoker.invoke(invocation);
// after
if (rpcContext.isProviderSide()) {
// clear traceId from MDC
MDC.remove(Constants.TRACE_ID);
}
return result;
}
}
logback-spring.xml 加入 [TraceId:\%X{traceId}]
1
2
3
4
5
<!-- CONSOLE_LOG_PATTERN属性会在console-appender.xml文件中引用 -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(${spring_application_name}){cyan}[TraceId:%X{traceId}]|%clr{blue}|%clr(%d{ISO8601}){faint}|%clr(%p)|${server_port}|${PID}|%clr(%t){faint}|%clr(%.40logger{39}){cyan}.%clr(%method){cyan}:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<!-- FILE_LOG_PATTERN属性会在logback-defaults.xml文件中引用 -->
<property name="FILE_LOG_PATTERN" value="${spring_application_name}[TraceId:%X{traceId}]|%d{ISO8601}|%p|${server_port}|${PID}|%t|%.40logger{39}.%method:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
yml配置调整
1
2
3
consumer:
......
filter: RpcTraceIdFilter
SPI扩展RpcTraceIdFilter 增加纯文本文件
1
2
3
4
5
6
7
8
9
10
路径 :
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.Filter (注意:区分是alibaba dubbo 不是 apache dubbo)
内容:
rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter()
3.消费者端实现(dubbo)
拦截Rpc请求,attachMent里面传递过来的TraceId放到MDC内容中
dubbo的SPI扩展RpcTraceIdFilter
logback-spring.xml 加入 [TraceId:\%X{traceId}]
yml配置调整
provider:
......
filter: RpcTraceIdFilter
4.多线程实现
多线程ThreadPoolTaskExecutor实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 线程池包装类
*
* @author hetiantian
* @version 1.0
* @Date 2020/03/18 15:29
*/
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public void execute(Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
SPI扩展 RpcTraceIdFilter
1
2
3
4
5
6
7
8
9
10
11
路径 :
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.Filter (注意:区分是alibaba dubbo 不是 apache dubbo)
内容:
rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter(全限定名)
实现中我是封装成一个jar包,给dubbo应用引用