当前位置:Java -> 在线程间保留上下文

在线程间保留上下文

在构建一个大型的生产级无状态的微服务架构时,我们总是会遇到一个常见的挑战,即跨服务和线程保留请求上下文,包括传播上下文到子线程。

什么是上下文传播?

上下文传播意味着在分布式系统中传递跨不同组件或服务的上下文信息或状态,其中应用程序通常由运行在不同机器或容器上的多个服务组成。这些服务需要进行通信和协作,以完成用户请求或执行业务流程。

在这样的分布式系统中,上下文传播变得至关重要,以确保有关特定交易或操作的相关信息在它穿过不同服务时被携带。这个上下文可能包括数据,例如:

  • 用户身份验证详情
  • 请求标识符
  • 分布式跟踪信息
  • 其他元数据(有助于理解请求的状态和来源)

上下文传播的关键方面包括:

  • 请求上下文:当用户发起请求时,通常会触发跨多个服务的一系列交互。初始请求的上下文,包括用户身份、请求时间戳和唯一标识等相关信息,需要传播以确保一致的行为和跟踪。
  • 分布式跟踪和日志:上下文传播与分布式跟踪和日志机制密切相关。通过传播上下文信息,更容易追踪请求在各个服务之间的流动,有助于调试、性能分析和监控。
  • 一致性:在服务之间保持一致的上下文对于确保每个参与处理请求的服务都拥有执行其任务所需的必要信息至关重要。这有助于避免不一致性,并确保分布式系统的一致行为。
  • 中间件和框架支持:许多中间件和框架都提供了对上下文传播的内置支持。例如,在微服务架构中,像Spring Cloud、Istio或Zipkin等框架提供了管理和无缝传播上下文的工具。
  • 无状态性:在无状态架构中,上下文传播尤其重要,其中每个服务应独立运行,不依赖共享状态。上下文有助于提供处理请求所需的必要信息,而无需存储持久状态。

有效的上下文传播通过提供交易状态在不同服务中移动时的统一视图,有助于分布式系统的整体可靠性、可观察性和可维护性。它也有助于减少代码。

用例

假设您正在构建基于Springboot Webflux的微服务/应用程序,并且您需要确保用户状态(会话标识符、请求标识符、登录状态等)和客户端(设备类型、客户端IP等)在发起请求时传递到服务之间。

挑战

  • 服务间调用:对于内部的服务间调用,上下文传播不会自动发生。
  • 在类内传播上下文:在服务和/或辅助类内引用上下文时,您需要通过方法参数显式传递。可以通过创建一个类和一个存储上下文的静态方法解决这个问题。
  • Java Stream操作:由于Java流函数在独立的执行线程中运行,需要通过ThreadLocal明确将上下文传播到子线程。
  • Webflux:类似于Java流函数,Webflux中的上下文传播需要通过反应式钩子来处理。

这里的想法是如何确保上下文传播在子线程和通过反应式web客户端调用的内部服务中自动发生。类似的模式也可以在非响应式代码中实现。

解决方案

核心Java提供了两个类,ThreadLocal和InheritableThreadLocal,用于存储线程范围的值。

  • ThreadLocal允许创建对线程局部的变量,确保每个线程都有自己的变量副本。
  • ThreadLocal的一个限制是,如果在另一个线程的范围内生成了一个新的线程,子线程不会从其父线程继承ThreadLocal变量的值。


public class ExampleThreadLocal {

    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();



    public static void main(String[] args) {

        threadLocal.set("Main Thread Value");



        new Thread(() -> {

            System.out.println("Child Thread: " + threadLocal.get()); // Outputs: Child Thread: null

        }).start();



        System.out.println("Main Thread: " + threadLocal.get()); // Outputs: Main Thread: Main Thread Value

    }

}


另一方面;

  • InheritableThreadLocal扩展了ThreadLocal,并提供了子线程从父线程继承值的能力。
public class ExampleInheritableThreadLocal {

    private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();



    public static void main(String[] args) {

        inheritableThreadLocal.set("Main Thread Value");



        new Thread(() -> {

            System.out.println("Child Thread: " + inheritableThreadLocal.get()); // Outputs: Child Thread: Main Thread Value

        }).start();



        System.out.println("Main Thread: " + inheritableThreadLocal.get()); // Outputs: Main Thread: Main Thread Value

    }

}


因此,在需要确保上下文在父线程和子线程之间传播的场景中,我们可以使用应用程序范围的静态InheritableThreadLocal变量来保存上下文,并在需要的地方获取它。

@Getter
@ToString
@Builder
public class RequestContext {

  private String sessionId;
  private String correlationId;
  private String userStatus;
  private String channel;
}


public class ContextAdapter {

  final ThreadLocal<RequestContext> threadLocal = new InheritableThreadLocal<>();

  public RequestContext getCurrentContext() {
    return threadLocal.get();
  }

  public void setContext(tRequestContext requestContext) {
    threadLocal.set(requestContext);
  }

  public void clear() {
    threadLocal.remove();
  }
}



public final class Context {
  static ContextAdapter contextAdapter;

  private Context() {}

  static {
    contextAdapter = new ContextAdapter();
  }

  public static void clear() {
    if (contextAdapter == null) {
      throw new IllegalStateException();
    }
    contextAdapter.clear();
  }

  public static RequestContext getContext() {
    if (contextAdapter == null) {
      throw new IllegalStateException();
    }
    return contextAdapter.getCurrentContext();
  }

  public static void setContext(RequestContext requestContext) {
    if (cContextAdapter == null) {
      throw new IllegalStateException();
    }
    contextAdapter.setContext(requestContext);
  }

  public static ContextAdapter getContextAdapter() {
    return contextAdapter;
  }
}


然后,我们可以通过调用静态方法在代码中任何需要的地方引用上下文。

Context.getContext()


这解决了:

  • 在类内传播上下文。
  • Java Stream操作
  • Webflux

为了确保上下文自动传播到通过webclient的外部调用,我们可以创建一个自定义的ExchangeFilterFunction来从Context.getContext() 读取上下文,然后根据需要将上下文添加到标头或查询参数。

public class HeaderExchange implements ExchangeFilterFunction {

  @Override
  public Mono<ClientResponse> filter(
      ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
      return Mono.deferContextual(Mono::just)
        .flatMap(
            context -> {
              RequestContext  currentContext = Context.getContext();
              ClientRequest newRequest = ClientRequest.from(clientRequest)
                        .headers(httpHeaders ->{
                          httpHeaders.add("context-session-id",currentContext.getSessionId() );
                          httpHeaders.add("context-correlation-id",currentContext.getCorrelationId() );
                        }).build();

              return exchangeFunction.exchange(newRequest);
            });
  }
}


作为WebFilter的一部分初始化上下文。


@Slf4j
@Component
public class RequestContextFilter implements WebFilter {


  @Override
  public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

        String sessionId =  exchange.getRequest().getHeaders().getFirst("context-session-id");
        String correlationId =  exchange.getRequest().getHeaders().getFirst("context-correlation-id");


        RequestContext requestContext = RequestContext.builder().sessionId(sessionId).correlationId(correlationId).build()

        Context.setContext(requestContext);


        return chain.filter(exchange);
  }
}


推荐阅读: 62.什么是线程池?为什么使用线程池?

本文链接: 在线程间保留上下文