MDC分布式日志追踪

Posted by Procon on December 26, 2023

MDC分布式日志追踪 - dubbo

1.公共类:

线程传递–ThreadTraceIdUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * InheritableThreadLocal 线程传递
 * @author Sjh
 */
public class ThreadTraceIdUtil {
 
    /**使用InheritableThreadLocal便于在主子线程间传递参数*/
    private static final ThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();
 
    public ThreadTraceIdUtil() {
    }
 
    /**
     * 从当前线程局部变量获取TraceId
     * 首次调用该方法会生成traceId,后续每次都从线程上下文获取
     * @return
     */
    public static String getTraceId() {
        return TRACE_ID.get();
    }
 
    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }
 
    public static void removeTraceId() {
        TRACE_ID.remove();
    }
}

生成 TraceId - TraceIdGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 
/**
 * @author Sjh
 * 
 * 生成  TraceId
 */
@Slf4j
public class TraceIdGenerator {
 
 
    /**
     * 消费端创建TraceId,并设置到线程上下文中
     * 该方法只调用一次
     *
     * @return
     */
    public static String createTraceId() {
        // 创建的同时就设置到上下文中
        String traceId = getTraceId();
        ThreadTraceIdUtil.setTraceId(traceId);
        return traceId;
    }
 
    /**
     * 生成32位traceId
     *
     * @return
     */
    private static String getTraceId() {
        String result = "";
        String ip = "";
 
        // 获取本地ipv4地址
        try {
            InetAddress address = InetAddress.getLocalHost();
            ip = address.getHostAddress();
        } catch (Exception var5) {
            return result;
        }
 
        // 根据.截取为String数组
        String[] ipAddressInArray = ip.split("\\.");
        // 拼装为字符串,将每一个元素转换为16进制
        for (int i = 3; i >= 0; --i) {
            Integer id = Integer.parseInt(ipAddressInArray[3 - i]);
            result = result + String.format("%02x", id);
        }
        // 拼装时间戳及随机数
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        result = result + simpleDateFormat.format(new Date()) + UUID.randomUUID().toString().substring(0, 7);
        return result;
    }
 
    /**
     * TraceId默认第一个为空,如果没值则分配一个
     *
     * @param traceId
     * @return
     */
    public static String validateTraceId(String traceId) {
        if (null == traceId) {
            traceId = createTraceId();
            if (log.isDebugEnabled()) {
                log.debug("[TraceInterceptor]首次请求未分配TraceId,生成首次TraceId={}", traceId);
            }
        }
        return traceId;
    }
}

RpcContext传递 - RpcTraceIdFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
 * 消费者织入  TraceId
 * 生产者取出  TraceId
 * @author Sjh
 */
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {
 
    /**
     * RpcContext 加入 traceId
     *
     * @param invoker
     * @param invocation
     * @return
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
 
        String traceId;
 
        //消费者
        if (rpcContext.isConsumerSide()) {
 
            traceId = MDC.get(Constants.TRACE_ID);
            if (traceId == null) {
                traceId = TraceIdGenerator.createTraceId();
            }
            rpcContext.setAttachment(Constants.TRACE_ID, traceId);
        }
 
        //生产者
        if (rpcContext.isProviderSide()) {
            traceId = rpcContext.getAttachment(Constants.TRACE_ID);
            MDC.put(Constants.TRACE_ID, traceId);
        }
 
        Result result = invoker.invoke(invocation);
 
        // after
        if (rpcContext.isProviderSide()) {
            // clear traceId from MDC
            MDC.remove(Constants.TRACE_ID);
        }
        return result;
 
    }
 
}

2.消费者端实现(http服务调用dubbo服务)

拦截web请求,生成TraceId ,并放入log4j的MDC和dubbo的RpcContext中, 通过 attachMent 把TraceId传递给生产者

InterceptorConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
 
/**
* MVC  Interceptor
* traceId的拦截器LogInterceptor到容器
* @author sjh
*/
@Configuration
//@EnableMvc 会引发其它重写冲突,注释掉
public class InterceptorConfig implements WebMvcConfigurer {
 
    /**
     * 注解LogInterceptor类到IOC容器中
     */
    @Bean
    public LogInterceptor logInterceptor() {
        return new LogInterceptor();
    }
 
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        //注册日志拦截器
        registry.addInterceptor(logInterceptor());
    }
 
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        /**
         * 解决 webmvc 与 swagger 页面冲突的问题
         * https://blog.csdn.net/Kerwin_luo/article/details/114266444
         * @param registry
         */
        registry.addResourceHandler("/webjars/**")
                .addResourceLocations("classpath:/META-INF/resources/webjars/");
 
        //添加静态页面资源,文件下载资源等
        registry.addResourceHandler("/**").addResourceLocations("classpath:/META-INF/resources/",
                "classpath:/resources/", "classpath:/static/", "classpath:/public/","file:/E:/","file:/");
    }
 
 
}

LogInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 
/**
 * 拦截web请求
 *
 * @author Sjh
 */
@Slf4j
public class LogInterceptor implements HandlerInterceptor {
 
    /**
     * controller方法前调用
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        //如果有上层调用就用上层的ID
        String traceId = request.getHeader(Constants.TRACE_ID);
 
        traceId = TraceIdGenerator.validateTraceId(traceId);
 
        MDC.put(Constants.TRACE_ID, traceId);
        return true;
    }
 
 
    /**
     * preHandle方法返回true之后
     * 在DispatcherServlet进行视图的渲染之后调用
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
            throws Exception {
        //controller结束之后删除对应的唯一值
        MDC.remove(Constants.TRACE_ID);
    }
}

dubbo的SPI扩展RpcTraceIdFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * dubbo 的Filter 重写 invoke,利用RpcContext传递 TraceId
 * 消费者织入  TraceId
 * 生产者取出  TraceId
 * @author Sjh
 */
@Slf4j
@Activate(group = {"provider", "consumer"}, order = -10000)
public class RpcTraceIdFilter implements Filter {
 
    /**
     * RpcContext 加入 traceId
     *
     * @param invoker
     * @param invocation
     * @return
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
 
        String traceId;
 
        //消费者
        if (rpcContext.isConsumerSide()) {
 
            traceId = MDC.get(Constants.TRACE_ID);
            if (traceId == null) {
                traceId = TraceIdGenerator.createTraceId();
            }
            rpcContext.setAttachment(Constants.TRACE_ID, traceId);
        }
 
        //生产者
        if (rpcContext.isProviderSide()) {
            traceId = rpcContext.getAttachment(Constants.TRACE_ID);
            MDC.put(Constants.TRACE_ID, traceId);
        }
 
        Result result = invoker.invoke(invocation);
 
        // after
        if (rpcContext.isProviderSide()) {
            // clear traceId from MDC
            MDC.remove(Constants.TRACE_ID);
        }
        return result;
 
    }
 
}

logback-spring.xml 加入 [TraceId:\%X{traceId}]

1
2
3
4
5
<!-- CONSOLE_LOG_PATTERN属性会在console-appender.xml文件中引用 -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(${spring_application_name}){cyan}[TraceId:%X{traceId}]|%clr{blue}|%clr(%d{ISO8601}){faint}|%clr(%p)|${server_port}|${PID}|%clr(%t){faint}|%clr(%.40logger{39}){cyan}.%clr(%method){cyan}:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

<!-- FILE_LOG_PATTERN属性会在logback-defaults.xml文件中引用 -->
<property name="FILE_LOG_PATTERN" value="${spring_application_name}[TraceId:%X{traceId}]|%d{ISO8601}|%p|${server_port}|${PID}|%t|%.40logger{39}.%method:%L|%m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

yml配置调整

1
2
3
consumer:
  ......
  filter: RpcTraceIdFilter

SPI扩展RpcTraceIdFilter 增加纯文本文件

1
2
3
4
5
6
7
8
9
10
路径 :

|-resources
	|-META-INF

        |-dubbo

            |-com.alibaba.dubbo.rpc.Filter  (注意:区分是alibaba dubbo 不是 apache dubbo)
内容:
rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter()            

3.消费者端实现(dubbo)

拦截Rpc请求,attachMent里面传递过来的TraceId放到MDC内容中

dubbo的SPI扩展RpcTraceIdFilter

logback-spring.xml 加入 [TraceId:\%X{traceId}]

yml配置调整

provider:
 ......
 filter: RpcTraceIdFilter

4.多线程实现

多线程ThreadPoolTaskExecutor实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 
/**
 * 线程池包装类
 *
 * @author hetiantian
 * @version 1.0
 * @Date 2020/03/18 15:29
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
 
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
 
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
 
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }
 
    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
 
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }
 
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
 
    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

SPI扩展 RpcTraceIdFilter

1
2
3
4
5
6
7
8
9
10
11
路径 :

|-resources
		|-META-INF

        	|-dubbo

            	|-com.alibaba.dubbo.rpc.Filter  (注意:区分是alibaba dubbo 不是 apache dubbo)
内容:

rpcTraceIdFilter=com.xxx.xxx.xxx.RpcTraceIdFilter(全限定名)            

实现中我是封装成一个jar包,给dubbo应用引用