
Zipkin, a Distributed Tracing System for Java (4): Brave Source Code Analysis - HttpTracing

2017-12-05 16:55
All posts are first published on my personal blog, http://blog.mozhu.org. Feel free to visit!

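In the previous post we analyzed the source code of Tracing. In this one we look at how Brave is used in a web project.

We start with how Brave is used in a plain servlet project, which will help later when we analyze how Brave integrates with frameworks such as Spring MVC.

First, the Chapter1/servlet25 project configures FrontendServlet, BackendServlet, and a TracingFilter.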

web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">

    <display-name>Servlet2.5 Application</display-name>

    <filter>
        <filter-name>TracingFilter</filter-name>
        <filter-class>org.mozhu.zipkin.filter.BraveTracingFilter</filter-class>
    </filter>
    <filter-mapping>
        <filter-name>TracingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet>
        <servlet-name>BackendServlet</servlet-name>
        <servlet-class>org.mozhu.zipkin.servlet.BackendServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>BackendServlet</servlet-name>
        <url-pattern>/api</url-pattern>
    </servlet-mapping>

    <servlet>
        <servlet-name>FrontendServlet</servlet-name>
        <servlet-class>org.mozhu.zipkin.servlet.FrontendServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>FrontendServlet</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>


TracingFilter

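We use a custom BraveTracingFilter as the entry point. Its init method initializes Tracing, creates an HttpTracing object, and finally calls TracingFilter.create(httpTracing) to create tracingFilter. It also stores the HttpTracing object in the ServletContext under the attribute name "TRACING" so that the servlets can retrieve it later.

In doFilter, every request is delegated to tracingFilter.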

BraveTracingFilter

package org.mozhu.zipkin.filter;

import brave.Tracing;
import brave.context.log4j2.ThreadContextCurrentTraceContext;
import brave.http.HttpTracing;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import brave.servlet.TracingFilter;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import javax.servlet.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class BraveTracingFilter implements Filter {
    Filter tracingFilter;

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        AsyncReporter asyncReporter = AsyncReporter.builder(sender)
                .closeTimeout(500, TimeUnit.MILLISECONDS)
                .build(SpanBytesEncoder.JSON_V2);

        Tracing tracing = Tracing.newBuilder()
                .localServiceName(System.getProperty("zipkin.service", "servlet25-demo"))
                .spanReporter(asyncReporter)
                .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
                .currentTraceContext(ThreadContextCurrentTraceContext.create())
                .build();

        HttpTracing httpTracing = HttpTracing.create(tracing);
        filterConfig.getServletContext().setAttribute("TRACING", httpTracing);
        tracingFilter = TracingFilter.create(httpTracing);
        tracingFilter.init(filterConfig);
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        tracingFilter.doFilter(servletRequest, servletResponse, filterChain);
    }

    @Override
    public void destroy() {
        tracingFilter.destroy();
    }

}


TracingFilter

TracingFilter lives in the brave-instrumentation-servlet module.

public final class TracingFilter implements Filter {
    public static Filter create(Tracing tracing) {
        return new TracingFilter(HttpTracing.create(tracing));
    }

    public static Filter create(HttpTracing httpTracing) {
        return new TracingFilter(httpTracing);
    }

    final ServletRuntime servlet = ServletRuntime.get();
    final Tracer tracer;
    final HttpServerHandler<HttpServletRequest, HttpServletResponse> handler;
    final TraceContext.Extractor<HttpServletRequest> extractor;

    TracingFilter(HttpTracing httpTracing) {
        tracer = httpTracing.tracing().tracer();
        handler = HttpServerHandler.create(httpTracing, new HttpServletAdapter());
        extractor = httpTracing.tracing().propagation().extractor(HttpServletRequest::getHeader);
    }
}


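Several important classes appear in TracingFilter:

- HttpTracing - holds the components for HTTP handling: clientParser, serverParser, clientSampler, and serverSampler

- ServletRuntime - a servlet runtime helper with environment-dependent methods, such as checking whether Servlet 3 asynchronous processing is supported

- HttpServerHandler - the core component for server-side HTTP handling; nearly all trace-related operations are performed in this class

- HttpServletAdapter - the adapter for HttpServlet requests and responses. Because HttpServerHandler only talks to a generic adapter type, it does not depend on any concrete request or response class and can be integrated with more frameworks

- TraceContext.Extractor - extracts TraceContext data from a carrier such as the request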
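
To make the role of the adapter concrete, here is a minimal sketch of the idea rather than Brave's actual classes. RequestAdapter, describe, and the toy String[] request type are hypothetical names invented for this illustration; the point is only that handler logic written against a generic adapter never needs to know the concrete request class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AdapterSketch {
    // Hypothetical, simplified adapter: the only place that knows the concrete request type.
    interface RequestAdapter<Req> {
        String method(Req request);
        String path(Req request);
    }

    // Handler logic written once against the adapter, usable with any request type.
    static <Req> Map<String, String> describe(RequestAdapter<Req> adapter, Req request) {
        Map<String, String> data = new LinkedHashMap<>();
        data.put("name", adapter.method(request));    // span name = HTTP method
        data.put("http.path", adapter.path(request)); // request path recorded as a tag
        return data;
    }

    // Toy request type for the demo: a two-element array {method, path}.
    static final RequestAdapter<String[]> ARRAY_ADAPTER = new RequestAdapter<String[]>() {
        @Override public String method(String[] request) { return request[0]; }
        @Override public String path(String[] request) { return request[1]; }
    };

    public static void main(String[] args) {
        System.out.println(describe(ARRAY_ADAPTER, new String[] {"GET", "/api"}));
    }
}
```

Supporting another framework would then mean writing one more adapter, while describe stays unchanged.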

The doFilter method

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest httpRequest = (HttpServletRequest) request;
    HttpServletResponse httpResponse = servlet.httpResponse(response);

    Span span = handler.handleReceive(extractor, httpRequest);
    Throwable error = null;
    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
        chain.doFilter(httpRequest, httpResponse); // any downstream filters see Tracer.currentSpan
    } catch (IOException | ServletException | RuntimeException | Error e) {
        error = e;
        throw e;
    } finally {
        if (servlet.isAsync(httpRequest)) { // we don't have the actual response, handle later
            servlet.handleAsync(handler, httpRequest, span);
        } else { // we have a synchronous response, so we can finish the span
            handler.handleSend(httpResponse, error, span);
        }
    }
}


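First, handler.handleReceive(extractor, httpRequest) extracts span information from the request.

Then tracer.withSpanInScope(span) wraps the span in a Tracer.SpanInScope. Tracer.SpanInScope resembles the CurrentTraceContext.Scope analyzed in the previous post: both implement Closeable so that they can be used with try-with-resources (available since JDK 7), which calls close automatically when the block exits and cleans up thread-bound state. The difference is that the latter is an SPI (Service Provider Interface) and is not meant to be exposed to end users.

This lets the code that runs inside chain.doFilter(httpRequest, httpResponse) obtain the span extracted from the request through Tracer.currentSpan.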
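
The scoping pattern described here can be sketched in plain Java. This is only an illustration of the technique, not Brave's code: ScopeSketch, its Scope interface, and the use of a String in place of a real span are assumptions. The sketch keeps the "current span" in a ThreadLocal and restores the previous value when the scope is closed.

```java
final class ScopeSketch {
    // Thread-local holder for the "current span"; a String stands in for a real span.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // A scope whose close() declares no checked exception, so try-with-resources stays clean.
    interface Scope extends AutoCloseable {
        @Override void close();
    }

    // Makes the given span current and returns a scope that restores the previous one.
    static Scope withSpanInScope(String span) {
        final String previous = CURRENT.get();
        CURRENT.set(span);
        return () -> CURRENT.set(previous);
    }

    static String currentSpan() {
        return CURRENT.get();
    }

    public static void main(String[] args) {
        try (Scope outer = withSpanInScope("server-span")) {
            System.out.println(currentSpan()); // server-span
            try (Scope inner = withSpanInScope("client-span")) {
                System.out.println(currentSpan()); // client-span
            }
            System.out.println(currentSpan()); // server-span again
        }
        System.out.println(currentSpan()); // null: nothing is in scope
    }
}
```

Restoring the previous value, instead of simply clearing the ThreadLocal, is what allows scopes to nest.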

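Finally, for a synchronous request, handler.handleSend(httpResponse, error, span) is called. For an asynchronous request the response is not yet available, so servlet.handleAsync defers this step.

Let's look closely at the handler's handleReceive and handleSend methods.

The handleReceive method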

public Span handleReceive(TraceContext.Extractor<Req> extractor, Req request) {
    return handleReceive(extractor, request, request);
}

public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    if (span.isNoop()) return span;

    // all of the parsing here occur before a timestamp is recorded on the span
    span.kind(Span.Kind.SERVER);

    // Ensure user-code can read the current trace context
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.request(adapter, request, span);
    } finally {
        ws.close();
    }

    boolean parsedEndpoint = false;
    if (Platform.get().zipkinV1Present()) {
        zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder().serviceName("");
        if ((parsedEndpoint = adapter.parseClientAddress(request, deprecatedEndpoint))) {
            span.remoteEndpoint(deprecatedEndpoint.build());
        }
    }
    if (!parsedEndpoint) {
        Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
        if (adapter.parseClientAddress(request, remoteEndpoint)) {
            span.remoteEndpoint(remoteEndpoint.build());
        }
    }
    return span.start();
}


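First, nextSpan(extractor.extract(carrier), request) extracts a TraceContextOrSamplingFlags from the request and creates a Span, whose kind is then set to SERVER.

Then parser.request(adapter, request, span) sets the span name to the request method (GET, POST, and so on) and writes the request path to the span as the tag http.path, so that the Zipkin UI shows clearly which request a span represents.

Finally, it sets the remote Endpoint on the span and calls start to record the start timestamp.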
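
The extraction step at the start of handleReceive can be pictured with a small, self-contained sketch. It is not Brave's implementation: the Getter interface, the extract method, and the use of a Map as the carrier are assumptions made for illustration. Only the header names (X-B3-TraceId, X-B3-SpanId, X-B3-ParentSpanId, X-B3-Sampled) come from the B3 propagation format.

```java
import java.util.HashMap;
import java.util.Map;

final class ExtractSketch {
    // Hypothetical getter abstraction: reads a header value from any carrier type.
    interface Getter<C> {
        String get(C carrier, String key);
    }

    // Getter for a plain Map carrier. Unlike real HTTP headers, Map keys are case-sensitive.
    static final Getter<Map<String, String>> MAP_GETTER = Map::get;

    // Reads the B3 headers; returns null when no trace identifiers are present.
    static <C> Map<String, String> extract(Getter<C> getter, C carrier) {
        String traceId = getter.get(carrier, "X-B3-TraceId");
        String spanId = getter.get(carrier, "X-B3-SpanId");
        if (traceId == null || spanId == null) return null;

        Map<String, String> context = new HashMap<>();
        context.put("traceId", traceId);
        context.put("spanId", spanId);
        String parentId = getter.get(carrier, "X-B3-ParentSpanId");
        if (parentId != null) context.put("parentId", parentId);
        String sampled = getter.get(carrier, "X-B3-Sampled");
        if (sampled != null) context.put("sampled", sampled);
        return context;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("X-B3-TraceId", "463ac35c9f6413ad");
        headers.put("X-B3-SpanId", "a2fb4a1d1a96d312");
        headers.put("X-B3-Sampled", "1");
        System.out.println(extract(MAP_GETTER, headers));
        System.out.println(extract(MAP_GETTER, new HashMap<>())); // null: no trace headers
    }
}
```

The same extract method would work with a servlet request if it were given a getter such as HttpServletRequest::getHeader, which is how TracingFilter builds its extractor.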

The handleSend method

public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;

    // Ensure user-code can read the current trace context
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.response(adapter, response, error, span);
    } finally {
        ws.close();
        span.finish();
    }
}


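handleSend is simple: it calls parser.response(adapter, response, error, span), which writes the HTTP status code to the span as the tag http.status_code and, if an error occurred, writes the error message to the tag error.

Finally it calls the span's finish method, which in turn calls the Reporter's report method to send the span to Zipkin.

Next, the handler's nextSpan method: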

Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
    if (extracted.sampled() == null) { // Otherwise, try to make a new decision
        extracted = extracted.sampled(sampler.trySample(adapter, request));
    }
    return extracted.context() != null
            ? tracer.joinSpan(extracted.context())
            : tracer.nextSpan(extracted);
}


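If extracted (the TraceContextOrSamplingFlags taken from the request) carries no sampled decision, HttpSampler's trySample method decides whether to sample.

If extracted contains a TraceContext, the tracer calls joinSpan to join the existing trace. This is the usual case when client code has put trace information into the request headers: the server receives the request and joins the trace the client started. So when the backend's request reaches this code, joinSpan is called.

If extracted contains no TraceContext, the tracer calls nextSpan. This is the usual case for a request initiated by a user, for example from a browser, whose headers normally carry no trace information. So when the frontend's request reaches this code, a new span is created.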
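
The two decisions made in this method (whether to consult the HTTP sampler, and whether to join or start a span) can be isolated in a tiny sketch. Decision and decide are hypothetical names, and the inputs are reduced to plain flags; this mirrors the branching only and is not Brave's code.

```java
final class NextSpanSketch {
    // Outcome of the decision: which tracer method is used and the sampled flag passed on.
    static final class Decision {
        final String tracerMethod; // "joinSpan" or "nextSpan"
        final Boolean sampled;     // null means still undecided at this point

        Decision(String tracerMethod, Boolean sampled) {
            this.tracerMethod = tracerMethod;
            this.sampled = sampled;
        }
    }

    static Decision decide(boolean hasTraceContext, Boolean extractedSampled, Boolean httpSamplerResult) {
        // The HTTP sampler is only consulted when the caller sent no sampling decision.
        Boolean sampled = extractedSampled != null ? extractedSampled : httpSamplerResult;
        // Incoming trace identifiers mean "join the caller's trace"; otherwise start a span.
        return new Decision(hasTraceContext ? "joinSpan" : "nextSpan", sampled);
    }

    public static void main(String[] args) {
        Decision backend = decide(true, Boolean.TRUE, Boolean.FALSE); // headers set by the frontend
        Decision frontend = decide(false, null, Boolean.TRUE);        // browser request, no headers
        System.out.println(backend.tracerMethod + " sampled=" + backend.sampled);
        System.out.println(frontend.tracerMethod + " sampled=" + frontend.sampled);
    }
}
```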

The joinSpan method

public final Span joinSpan(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    if (!supportsJoin) return newChild(context);
    // If we are joining a trace, we are sharing IDs with the caller
    // If the sampled flag was left unset, we need to make the decision here
    TraceContext.Builder builder = context.toBuilder();
    if (context.sampled() == null) {
        builder.sampled(sampler.isSampled(context.traceId()));
    } else {
        builder.shared(true);
    }
    return toSpan(builder.build());
}

public Span newChild(TraceContext parent) {
    if (parent == null) throw new NullPointerException("parent == null");
    return nextSpan(TraceContextOrSamplingFlags.create(parent));
}


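joinSpan shares the caller's IDs, as the comment in the code notes. If the caller did not pass a sampled decision, the server decides for itself through sampler.isSampled(context.traceId()); otherwise the span is marked as shared. If the tracer does not support join, newChild creates a child span instead.

The nextSpan method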

public Span nextSpan(TraceContextOrSamplingFlags extracted) {
    TraceContext parent = extracted.context();
    if (extracted.samplingFlags() != null) {
        TraceContext implicitParent = currentTraceContext.get();
        if (implicitParent == null) {
            return toSpan(newRootContext(extracted.samplingFlags(), extracted.extra()));
        }
        // fall through, with an implicit parent, not an extracted one
        parent = appendExtra(implicitParent, extracted.extra());
    }
    long nextId = Platform.get().randomLong();
    if (parent != null) {
        return toSpan(parent.toBuilder() // copies "extra" from the parent
                .spanId(nextId)
                .parentId(parent.spanId())
                .shared(false)
                .build());
    }
    TraceIdContext traceIdContext = extracted.traceIdContext();
    if (extracted.traceIdContext() != null) {
        Boolean sampled = traceIdContext.sampled();
        if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
        return toSpan(TraceContext.newBuilder()
                .sampled(sampled)
                .debug(traceIdContext.debug())
                .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
                .spanId(nextId)
                .extra(extracted.extra()).build());
    }
    // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
    throw new AssertionError("should not reach here");
}


nextSpan first works out a suitable parent. If a parent exists, it creates a child span; otherwise it returns a new span.
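
The difference between joining a span and creating a child comes down to which IDs are reused. The sketch below shows that relationship with a simplified context type. Ctx, join, child, and root are hypothetical names; join is simplified to always set the shared flag; and reusing one random value as both trace ID and span ID for a root span is an assumption of this sketch, not something the code above shows.

```java
final class SpanIdSketch {
    // Simplified trace context: just the identifiers and the shared flag.
    static final class Ctx {
        final long traceId;
        final Long parentId; // null for a root span
        final long spanId;
        final boolean shared;

        Ctx(long traceId, Long parentId, long spanId, boolean shared) {
            this.traceId = traceId;
            this.parentId = parentId;
            this.spanId = spanId;
            this.shared = shared;
        }
    }

    // Join: reuse every ID sent by the caller and mark the span as shared.
    static Ctx join(Ctx incoming) {
        return new Ctx(incoming.traceId, incoming.parentId, incoming.spanId, true);
    }

    // Child: same trace, fresh span ID, and the parent's span ID becomes the parent ID.
    static Ctx child(Ctx parent, long nextId) {
        return new Ctx(parent.traceId, parent.spanId, nextId, false);
    }

    // Root: a new trace with no parent (assumption: one value serves as trace and span ID).
    static Ctx root(long nextId) {
        return new Ctx(nextId, null, nextId, false);
    }

    public static void main(String[] args) {
        Ctx root = root(1L);
        Ctx child = child(root, 2L);
        Ctx joined = join(child);
        System.out.println(child.traceId + " " + child.parentId + " " + child.spanId);
        System.out.println(joined.spanId + " shared=" + joined.shared);
    }
}
```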

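That completes the analysis of how the server records span information after receiving a request. Next, we look at how the client side reports span information.

FrontendServlet

In FrontendServlet's init method we initialize an OkHttpClient and add a TracingInterceptor to its network interceptors. We also use CurrentTraceContext's executorService wrapper to wrap the Dispatcher's ExecutorService before setting it on the OkHttpClient.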
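
Why wrap the ExecutorService at all? A thread-bound "current span" is not visible on a pool thread unless something carries it across. The sketch below shows the general technique in plain Java; it is not Brave's implementation, and wrapTask, wrapExecutor, seenByWorker, and the String context are hypothetical. Each task captures the caller's context at submit time and re-installs it around the task on the worker thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class ContextExecutorSketch {
    // Thread-local "current trace context"; a String stands in for a real context.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Captures the caller's context now and re-installs it around the task later.
    static Runnable wrapTask(Runnable task) {
        final String captured = CURRENT.get();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                task.run();
            } finally {
                CURRENT.set(previous); // leave the pool thread as we found it
            }
        };
    }

    // An executor that wraps every submitted task.
    static Executor wrapExecutor(Executor delegate) {
        return task -> delegate.execute(wrapTask(task));
    }

    // Demo helper: returns the context a pool thread observes for a task submitted by the caller.
    static String seenByWorker(String callerContext) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CURRENT.set(callerContext);
            AtomicReference<String> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            wrapExecutor(pool).execute(() -> {
                seen.set(CURRENT.get());
                done.countDown();
            });
            done.await();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByWorker("trace-1")); // the worker sees the caller's context
    }
}
```

In OkHttp the Dispatcher's executor runs asynchronous calls (enqueue); the synchronous execute() used in this servlet runs on the calling thread.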

package org.mozhu.zipkin.servlet;

import brave.http.HttpTracing;
import brave.okhttp3.TracingInterceptor;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

public class FrontendServlet extends HttpServlet {

    private final static Logger LOGGER = LoggerFactory.getLogger(FrontendServlet.class);

    private OkHttpClient client;

    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        HttpTracing httpTracing = (HttpTracing) config.getServletContext().getAttribute("TRACING");
        client = new OkHttpClient.Builder()
                .dispatcher(new Dispatcher(
                        httpTracing.tracing().currentTraceContext()
                                .executorService(new Dispatcher().executorService())
                ))
                .addNetworkInterceptor(TracingInterceptor.create(httpTracing))
                .build();
    }

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        LOGGER.info("frontend receive request");
        Request request = new Request.Builder()
                .url("http://localhost:9000/api")
                .build();

        Response response = client.newCall(request).execute();
        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

        PrintWriter writer = resp.getWriter();
        writer.write(response.body().string());
        writer.flush();
        writer.close();
    }

}


public final class TracingInterceptor implements Interceptor {
    // ...

    final Tracer tracer;
    final String remoteServiceName;
    final HttpClientHandler<Request, Response> handler;
    final TraceContext.Injector<Request.Builder> injector;

    TracingInterceptor(HttpTracing httpTracing) {
        if (httpTracing == null) throw new NullPointerException("HttpTracing == null");
        tracer = httpTracing.tracing().tracer();
        remoteServiceName = httpTracing.serverName();
        handler = HttpClientHandler.create(httpTracing, new HttpAdapter());
        injector = httpTracing.tracing().propagation().injector(SETTER);
    }
}


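TracingInterceptor depends on Tracer, TraceContext.Injector, HttpClientHandler, and HttpAdapter.

- TraceContext.Injector - injects trace information into the HTTP request, that is, into the HTTP headers

- HttpClientHandler - the counterpart of HttpServerHandler and the core component for client-side HTTP handling; nearly all trace-related operations are performed in this class

- HttpAdapter - reads data from the HTTP request, such as the method, the request path, and header values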

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    Request.Builder requestBuilder = request.newBuilder();

    Span span = handler.handleSend(injector, requestBuilder, request);
    parseServerAddress(chain.connection(), span);
    Response response = null;
    Throwable error = null;
    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
        return response = chain.proceed(requestBuilder.build());
    } catch (IOException | RuntimeException | Error e) {
        error = e;
        throw e;
    } finally {
        handler.handleReceive(response, error, span);
    }
}


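This code closely mirrors doFilter in TracingFilter, as the reverse process:

- First, a Span is created and the trace information is injected into the request

- Then chain.proceed(requestBuilder.build()) sends the HTTP request

- Finally, handler.handleReceive(response, error, span) is called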
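
Injection, the first of the steps above, is the mirror image of extraction on the server side. The following sketch writes B3 headers into a Map carrier. It is an illustration only: Setter, inject, and the Map carrier are assumptions, while the header names are those of the B3 propagation format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class InjectSketch {
    // Hypothetical setter abstraction: writes a header into any carrier type.
    interface Setter<C> {
        void put(C carrier, String key, String value);
    }

    // Setter for a plain Map carrier; the value returned by Map.put is ignored.
    static final Setter<Map<String, String>> MAP_SETTER = Map::put;

    // Writes the trace identifiers as B3 headers; optional fields are skipped when null.
    static <C> void inject(Setter<C> setter, C carrier,
                           String traceId, String parentId, String spanId, Boolean sampled) {
        setter.put(carrier, "X-B3-TraceId", traceId);
        setter.put(carrier, "X-B3-SpanId", spanId);
        if (parentId != null) setter.put(carrier, "X-B3-ParentSpanId", parentId);
        if (sampled != null) setter.put(carrier, "X-B3-Sampled", sampled ? "1" : "0");
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        inject(MAP_SETTER, headers, "463ac35c9f6413ad", "463ac35c9f6413ad", "a2fb4a1d1a96d312", true);
        headers.forEach((name, value) -> System.out.println(name + ": " + value));
    }
}
```

In TracingInterceptor the carrier is an okhttp3 Request.Builder rather than a Map, which is why the interceptor builds the outgoing request from requestBuilder after handleSend returns.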

Next, HttpClientHandler's handleSend and handleReceive methods.

The handleSend method

public Span handleSend(TraceContext.Injector<Req> injector, Req request, Span span) {
    return handleSend(injector, request, request, span);
}

public <C> Span handleSend(TraceContext.Injector<C> injector, C carrier, Req request, Span span) {
    injector.inject(span.context(), carrier);
    if (span.isNoop()) return span;

    // all of the parsing here occur before a timestamp is recorded on the span
    span.kind(Span.Kind.CLIENT);

    // Ensure user-code can read the current trace context
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.request(adapter, request, span);
    } finally {
        ws.close();
    }

    boolean parsedEndpoint = false;
    if (Platform.get().zipkinV1Present()) {
        zipkin.Endpoint.Builder deprecatedEndpoint = zipkin.Endpoint.builder()
                .serviceName(serverNameSet ? serverName : "");
        if ((parsedEndpoint = adapter.parseServerAddress(request, deprecatedEndpoint))) {
            span.remoteEndpoint(deprecatedEndpoint.serviceName(serverName).build());
        }
    }
    if (!parsedEndpoint) {
        Endpoint.Builder remoteEndpoint = Endpoint.newBuilder().serviceName(serverName);
        if (adapter.parseServerAddress(request, remoteEndpoint) || serverNameSet) {
            span.remoteEndpoint(remoteEndpoint.build());
        }
    }
    return span.start();
}


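First, injector.inject(span.context(), carrier) injects the trace information into the request, and the span's kind is set to CLIENT. Note that intercept calls an overload that takes no Span argument, while the overloads shown here receive the Span explicitly, so the span has presumably been created before this code runs.

Then parser.request(adapter, request, span) sets the span name to the request method (GET, POST, and so on) and writes the request path to the span as the tag http.path, so that the Zipkin UI shows clearly which request a span represents.

Finally, it sets the remote Endpoint on the span and calls start to record the start timestamp.

The handleReceive method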

public void handleReceive(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;
    Tracer.SpanInScope ws = tracer.withSpanInScope(span);
    try {
        parser.response(adapter, response, error, span);
    } finally {
        ws.close();
        span.finish();
    }
}


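handleReceive is simple. It is called once the client has received the server's response, and it calls parser.response(adapter, response, error, span), which writes the HTTP status code to the span as the tag http.status_code and, if an error occurred, writes the error message to the tag error.

Finally it calls the span's finish method, which in turn calls the Reporter's report method to send the span to Zipkin.

BackendServlet

Finally, let's look at BackendServlet. On receiving a request it reads the user-name header, appends its value to a timestamp string, and returns the result.

In the previous post we saw that if the request sent to the frontend carries a user-name header, the frontend passes the value on to the backend, and the backend includes it in the response string to show that it received the header.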
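
The forwarding of user-name works because BraveTracingFilter configures the propagation with an extra field of that name. The idea can be sketched as copying a configured list of header names from the inbound request to the outbound one. This is a simplified illustration, not how ExtraFieldPropagation is implemented; extractExtra, injectExtra, and the Map-based headers are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ExtraFieldSketch {
    // Picks up only the configured field names from the inbound headers.
    static Map<String, String> extractExtra(List<String> fieldNames, Map<String, String> inboundHeaders) {
        Map<String, String> extra = new LinkedHashMap<>();
        for (String name : fieldNames) {
            String value = inboundHeaders.get(name);
            if (value != null) extra.put(name, value);
        }
        return extra;
    }

    // Copies the extracted fields onto the outbound request headers.
    static void injectExtra(Map<String, String> extra, Map<String, String> outboundHeaders) {
        outboundHeaders.putAll(extra);
    }

    public static void main(String[] args) {
        Map<String, String> inbound = new LinkedHashMap<>();
        inbound.put("user-name", "mozhu");
        inbound.put("accept", "text/plain"); // not configured, so not forwarded

        Map<String, String> outbound = new LinkedHashMap<>();
        injectExtra(extractExtra(Arrays.asList("user-name"), inbound), outbound);
        System.out.println(outbound);
    }
}
```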

package org.mozhu.zipkin.servlet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;

public class BackendServlet extends HttpServlet {

    private final static Logger LOGGER = LoggerFactory.getLogger(BackendServlet.class);

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        LOGGER.info("backend receive request");
        String username = req.getHeader("user-name");
        String result;
        if (username != null) {
            result = new Date().toString() + " " + username;
        } else {
            result = new Date().toString();
        }
        PrintWriter writer = resp.getWriter();
        writer.write(result);
        writer.flush();
        writer.close();
    }

}


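That completes our analysis of how Brave is used in a plain web project: we covered how TracingFilter intercepts and handles requests, and how OkHttpClient puts trace information into outgoing requests.

In later posts we will continue with how Brave integrates with Spring web projects.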