package com.gxl.core.tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import redis.clients.jedis.JedisCluster;
/**
* 拦截RPC请求进行调用追踪和收集数据
*
* @author gaoxianglong
*/
@SuppressWarnings("resource")
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER })
public class TraceFilter implements Filter {
private TraceHandler traceHandler;
private static TraceDao traceDao;
public static JedisCluster jedisCluster;
/* 采样率计数器 */
private Integer samplingCounter = 1;
/* 采样率 */
public static Integer samplingNum;
private static Logger logger = LoggerFactory.getLogger(TraceFilter.class);
public TraceFilter() {
traceHandler = new TraceHandler();
}
static {
try {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:tracing-context.xml");
traceDao = (TraceDao) context.getBean("traceDao");
jedisCluster = (JedisCluster) context.getBean("jedisCluster");
} catch (Exception e) {
logger.error("error", e);
}
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
logger.info("hashCode-->" + this.hashCode() + "\tthreadID-->" + Thread.currentThread().getId());
final long BEFORE_TIME = System.currentTimeMillis();
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
/* 获取当前Span过程的状态记录信息 */
RpcContext context = RpcContext.getContext();
final boolean IS_CONSUMER = context.isConsumerSide();
final boolean IS_PROVIDER = context.isProviderSide();
final String HOST = context.getLocalHost();
final Integer PORT = context.getLocalPort();
final String SERVICE_NAME = context.getUrl().getServiceInterface();
String METHOD_NAME = context.getMethodName();
/* 从ThreadLocal中获取当前线程的Trace上下文信息 */
TraceBean traceBean = traceHandler.getTrace().get();
/* 判断是服务提供方还是调用方 */
if (IS_CONSUMER) {
/* 如果ThreadLocal中不包含当前线程的Trace上下文信息则意味着是根调用,需要创建TraceID */
if (null == traceBean) {
synchronized (samplingCounter) {
if (samplingCounter != samplingNum) {
logger.info("serviceName-->" + SERVICE_NAME + "不需要采样");
rpcInvocation.setAttachment("isSampling", "N");
/* 根据请求数递增采样率计数器 */
samplingCounter++;
return invoker.invoke(rpcInvocation);
}
/* 重置采样率计数器 */
samplingCounter = 1;
logger.info("serviceName-->" + SERVICE_NAME + "需要采样");
rpcInvocation.setAttachment("isSampling", "Y");
traceBean = traceHandler.createTracer(HOST, PORT, SERVICE_NAME, METHOD_NAME);
}
} else {
/* 检测是否需要采样 */
if (!consumerIsSampling(traceBean, rpcInvocation))
return invoker.invoke(rpcInvocation);
final Long TRACE_ID = traceBean.getTraceId();
/**
* 重设当前线程的调用链上下文信息
*
* 一次请求的Trace通过TraceID串联起来,服务之间的依赖关系、调用顺序
* 通过parentSpanID和SpanID保证,
* 服务提供方才会将Trace上下文信息存储在当前线程的ThreadLocal中,SpanID每次都是递增1的,
* 而parentSpanID则是取它的上一个Span过程的SpanID,这样就可以明确服务调用的调用顺序和依赖关系
*/
traceBean = traceHandler.createTracer(TRACE_ID, traceHandler.getSpanID(TRACE_ID, jedisCluster),
traceBean.getSpanId(), HOST, PORT, SERVICE_NAME, METHOD_NAME);
}
traceBean.setEventType(0);
/* 将Trace上下文信息设置到Invocation中 */
setAnnotation(rpcInvocation, traceBean);
}
if (IS_PROVIDER) {
/* 检测是否需要采样 */
if (!providerIsSampling(rpcInvocation))
return invoker.invoke(rpcInvocation);
/* 从Invocation中获取调用方传递过来的调用链上下文信息 */
final Long TRACE_ID = Long.parseLong(rpcInvocation.getAttachment("traceID"));
final Integer SPAN_ID = Integer.parseInt(rpcInvocation.getAttachment("spanID"));
final Integer PARENT_SPAN_ID = Integer.parseInt(rpcInvocation.getAttachment("parentSpanID"));
traceBean = traceHandler.createTracer(TRACE_ID, SPAN_ID, PARENT_SPAN_ID, HOST, PORT, SERVICE_NAME,
METHOD_NAME);
traceBean.setEventType(1);
}
/* 前置数据收集 */
beforeDataCollect(traceBean, BEFORE_TIME);
Result result = invoker.invoke(rpcInvocation);
/* 后置数据收集 */
afterDataCollect(traceBean, System.currentTimeMillis());
return result;
}
/**
* 设置需要传递给服务提供方的调用链上下文信息
*
* @author gaoxianglong
*/
public void setAnnotation(RpcInvocation invocation, TraceBean trace) {
invocation.setAttachment("traceID", String.valueOf(trace.getTraceId()));
invocation.setAttachment("spanID", String.valueOf(trace.getSpanId()));
invocation.setAttachment("parentSpanID", String.valueOf(trace.getParentSpanId()));
}
/**
* 前置数据收集
*
* @author gaoxianglong
*/
public void beforeDataCollect(TraceBean trace, long time) {
/* 事件类型,0为调用方,1为提供方 */
if (0 == trace.getEventType()) {
traceDao.insertClientSendTime(trace, time);
} else {
traceDao.insertServerReceiveTime(trace, time);
/* 将Trace上下文信息设置在ThreadLocal中 */
traceHandler.getTrace().set(trace);
}
}
/**
* 后置数据收集
*
* @author gaoxianglong
*/
public void afterDataCollect(TraceBean trace, long time) {
if (0 == trace.getEventType()) {
traceDao.insertClientReceiveTime(trace, time);
} else {
traceDao.insertServerSendTime(trace, time);
traceHandler.getTrace().remove();
}
/* 如果是根服务调用则删除存放在Redis中的一次请求的Span记录 */
if (trace.isRootSpan()) {
jedisCluster.del(String.valueOf(trace.getTraceId()));
}
}
/**
* 服务调用方是否采样检测
*
* @author gaoxianglong
*/
public boolean consumerIsSampling(TraceBean traceBean, RpcInvocation rpcInvocation) {
boolean result = false;
if ("N".equals(traceBean.getIsSampling())) {
rpcInvocation.setAttachment("isSampling", "N");
} else {
rpcInvocation.setAttachment("isSampling", "Y");
result = true;
}
return result;
}
/**
* 服务提供方是否采样检测
*
* @author gaoxianglong
*/
public boolean providerIsSampling(RpcInvocation rpcInvocation) {
boolean result = false;
TraceBean traceBean = traceHandler.createTracer();
if ("N".equals(rpcInvocation.getAttachment("isSampling"))) {
traceBean.setIsSampling("N");
} else {
traceBean.setIsSampling("Y");
result = true;
}
traceHandler.getTrace().set(traceBean);
return result;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
基于Dubbo埋点的分布式调用跟踪系统.zip
共18个文件
java:8个
xml:3个
properties:3个
需积分: 1 0 下载量 155 浏览量
2023-08-20
20:43:22
上传
评论
收藏 18KB ZIP 举报
温馨提示
springboot、Dubbo、MySQL,源码web系统,框架,代码均经过严格测试,可直接运行,有需要可自取
资源推荐
资源详情
资源评论
收起资源包目录
基于Dubbo埋点的分布式调用跟踪系统.zip (18个子文件)
content_code
pom.xml 7KB
src
test
resources
log4j.xml 952B
tracing-context.xml 4KB
properties
zk.properties 131B
create.sql 659B
druid-jdbc.properties 490B
redis.properties 184B
META-INF
dubbo
com.alibaba.dubbo.rpc.Filter 41B
main
java
com
gxl
zk
SamplingRateWatcher.java 1KB
ZKConnectionManager.java 4KB
utils
CreateTraceID.java 549B
core
tracing
TraceBean.java 2KB
TraceDaoImpl.java 2KB
TraceFilter.java 7KB
TraceHandler.java 2KB
TraceDao.java 653B
LICENSE 11KB
.gitignore 189B
共 18 条
- 1
资源评论
白话机器学习
- 粉丝: 8191
- 资源: 7687
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- JSP-JTBC-CMS(SQLITE).rar
- MC3362和MC145151调频无线接收器的设计.pdf
- MiniRenamer-v100.0一款简单易用的批量文件重命名工具(已注册PRO版本).rar
- 小狐狸Ai系统 小狐狸ai付费创作系统V2.8.0 ChatGPT智能机器人
- 公孙离-内衣-肚兜.zipgsl
- 快慢指针判断链表是否有环-go 语言实现
- 学生成绩管理系统的设计与实现-收藏备用.pdf
- JSP+SQL网站流量统计管理系统(源代码+论文).rar
- IBM-PC-XT微机过程...道中模拟量数据的采集和处理.pdf
- JSP+SQL网上选课系统(源代码+论文+答辩PPT).rar
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功