当前位置:Java -> 改进我的OpenTelemetry跟踪演示

改进我的OpenTelemetry跟踪演示

去年,我写了一篇关于OpenTelemetry跟踪的文章,以更深入了解这个主题。我还围绕此创建了一个演示,其中包括以下组件:

  • Apache APISIX API 网关
  • Kotlin/Spring Boot 服务
  • Python/Flask 服务
  • Rust/Axum 服务

最近,我改进了演示以加深我的理解,并想分享我的学习。

使用常规数据库

在最初的演示中,我没有使用常规数据库。取而代之的是:

  • Kotlin服务使用嵌入式Java H2数据库
  • Python服务使用嵌入式SQLite
  • Rust服务在哈希映射中使用硬编码数据

我用常规的PostgreSQL数据库替换了所有这些,每个都有专用的架构。

OpenTelemetry代理在连接到JVM和Python数据库时添加了新的span。对于JVM,当使用Java代理时,它是自动的。在Python中,需要安装相关软件包 - 见下一节。

Python库中的OpenTelemetry集成

Python要求您显式添加用于OpenTelemetry的特定库的软件包。例如,演示中使用了Flask,因此我们应该添加Flask集成包。但是,这可能变得相当繁琐。

然而,一旦安装了opentelemetry-distro,您可以“嗅探”已安装的软件包并安装相关集成。

pip install opentelemetry-distro

opentelemetry-bootstrap -a install


对于演示,它安装了以下软件包:

opentelemetry_instrumentation-0.41b0.dist-info
opentelemetry_instrumentation_aws_lambda-0.41b0.dist-info
opentelemetry_instrumentation_dbapi-0.41b0.dist-info
opentelemetry_instrumentation_flask-0.41b0.dist-info
opentelemetry_instrumentation_grpc-0.41b0.dist-info
opentelemetry_instrumentation_jinja2-0.41b0.dist-info
opentelemetry_instrumentation_logging-0.41b0.dist-info
opentelemetry_instrumentation_requests-0.41b0.dist-info
opentelemetry_instrumentation_sqlalchemy-0.41b0.dist-info
opentelemetry_instrumentation_sqlite3-0.41b0.dist-info
opentelemetry_instrumentation_urllib-0.41b0.dist-info
opentelemetry_instrumentation_urllib3-0.41b0.dist-info
opentelemetry_instrumentation_wsgi-0.41b0.dist-info


上述设置为连接添加了一个新的自动化跟踪。

connect

在Flask上使用Gunicorn

每次启动Flask服务时,都会显示一个红色警告,指出不应在生产中使用。虽然这与OpenTelemetry无关,虽然没有人抱怨,但我不太喜欢。因此,我添加了一个“真正”的HTTP服务器。我选择了Gunicorn,只是因为我对Python生态系统的了解仍然较浅。

此服务器是运行时关注点。我们只需略微更改Dockerfile

RUN pip install gunicorn

ENTRYPOINT ["opentelemetry-instrument", "gunicorn", "-b", "0.0.0.0", "-w", "4", "app:app"]


  • -b选项指的是绑定;您可以绑定到特定IP。由于我在运行Docker,不知道IP,因此我绑定到任意IP。
  • -w选项指定工作进程的数量
  • 最后,app:app参数设置模块和应用程序,由冒号分隔

使用Heredocs取胜

如果您经常编写Dockerfile,则可能会从中受益。

每个Docker层都有存储成本。因此,在Dockerfile内部,人们往往会避免不必要的层。例如,以下两个代码片段产生相同的结果。

RUN pip install pip-tools 
RUN pip-compile
RUN pip install -r requirements.txt
RUN pip install gunicorn
RUN opentelemetry-bootstrap -a install

RUN pip install pip-tools \
  && pip-compile \
  && pip install -r requirements.txt \
  && pip install gunicorn \
  && opentelemetry-bootstrap -a install


第一个代码片段创建了五个层,而第二个只有一个;但是第一个比第二个更可读。使用heredocs,我们可以访问一个更可读的语法,以创建单个层:

RUN <<EOF

  pip install pip-tools 
  pip-compile
  pip install -r requirements.txt
  pip install gunicorn
  opentelemetry-bootstrap -a install

EOF


Heredocs是使Dockerfile更可读和更优化的好方法。试一试吧!

JVM上的显式API调用

在最初的演示中,我展示了两种方法:

  • 第一种使用自动调谐,不需要额外操作
  • 第二种使用Spring注解进行手动调谐

我想在改进后的版本中展示一个显式调用API的示例。用例是分析和使用消息队列:我从HTTP调用中获取跟踪数据,并创建一个包含这些数据的消息,以便订阅者可以将其用作父项。

首先,我们需要向项目添加OpenTelemetry API依赖项。我们继承于Spring Boot Starter父POM的版本:

<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
</dependency>


到了这一步,我们就可以访问API了。OpenTelemetry提供了一个获取实例的静态方法:

val otel = GlobalOpenTelemetry.get()


转而,流程是这样的:

val otel = GlobalOpenTelemetry.get()                                   //1
val tracer = otel.tracerBuilder("ch.frankel.catalog").build()          //2
val span = tracer.spanBuilder("AnalyticsFilter.filter")                //3
                 .setParent(Context.current())                         //4
                 .startSpan()                                          //5
// Do something here
span.end()                                                             //6


  1. 获取底层的OpenTelemetry
  2. 获取跟踪器构建器并"构建"跟踪器
  3. 获取跨度构建器
  4. 将跨度添加到整个链中
  5. 启动跨度
  6. 结束跨度;在此步骤之后,将数据发送到配置的OpenTelemetry端点

添加消息队列

当我基于这篇文章做了演讲时,与会者经常问是否OpenTelemetry能够与消息队列(如MQ或Kafka)一起工作。虽然我理论上认为是可以的,但我想要确保:我在演示中添加了一个消息队列,假装是分析功能。

Kotlin服务将在每个请求上向MQTT主题发布一条消息。NodeJS服务将订阅该主题。

将OpenTelemetry数据附加到消息

到目前为止,OpenTelemetry会自动读取上下文以查找跟踪ID和父跨度ID。无论采用何种方式,自动检测还是手动,基于注解还是显式,库都会自动处理。我没有发现任何现有类似的消息自动化;我们需要编写代码来实现。OpenTelemetry的核心是traceparent HTTP头。我们需要读取它并与消息一起发送。

首先,让我们将MQTT API添加到项目中。

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>


traceparent。但是,我们可以通过SpanContext类重新构建它。

我正在使用MQTT v5作为我的消息代理。请注意,v5允许将元数据附加到消息;使用v3时,消息本身需要包装它们。

val spanContext = span.spanContext                                                //1
val message = MqttMessage().apply {

  properties = MqttProperties().apply {
    val traceparent = "00-${spanContext.traceId}-${spanContext.spanId}-${spanContext.traceFlags}" //2
    userProperties = listOf(UserProperty("traceparent", traceparent))             //3
  }
  qos = options.qos
  isRetained = options.retained

  val hostAddress = req.remoteAddress().map { it.address.hostAddress }.getOrNull()
  payload = Json.encodeToString(Payload(req.path(), hostAddress)).toByteArray()   //4
}
val client = MqttClient(mqtt.serverUri, mqtt.clientId)                            //5
client.publish(mqtt.options, message)                                             //6


  1. 获取跨度上下文
  2. 根据W3C追踪上下文规范从跨度上下文构建traceparent
  3. 设置消息元数据
  4. 设置消息正文
  5. 创建客户端
  6. 发布消息

从消息中获取OpenTelemetry数据

订阅者是基于NodeJS的一个新组件。

首先,我们配置应用程序使用OpenTelemetry跟踪导出器:

const sdk = new NodeSDK({
  resource: new Resource({[SemanticResourceAttributes.SERVICE_NAME]: 'analytics'}),
  traceExporter: new OTLPTraceExporter({
    url: `${collectorUri}/v1/traces`
  })
})

sdk.start()


下一步是读取元数据,根据traceparent重新创建上下文,并创建一个跨度。

client.on('message', (aTopic, payload, packet) => {
  if (aTopic === topic) {

    console.log('Received new message')

    const data = JSON.parse(payload.toString())

    const userProperties = {}
    if (packet.properties['userProperties']) {                                  //1
      const props = packet.properties['userProperties']
      for (const key of Object.keys(props)) {
        userProperties[key] = props[key]
      }
    }

    const activeContext = propagation.extract(context.active(), userProperties) //2
    const tracer = trace.getTracer('analytics')
    const span = tracer.startSpan(                                              //3
      'Read message',
      {attributes: {path: data['path'], clientIp: data['clientIp']}},
      activeContext,
    )
    span.end()                                                                  //4
  }
})


  1. 读取元数据
  2. 根据traceparent重新创建上下文
  3. 创建跨度
  4. 结束跨度

用于消息传递的Apache APISIX

apisix:
  proxy_mode: http&stream                                                       #1
  stream_proxy:
    tcp:
      - addr: 9100                                                              #2
        tls: false


  1. 为两种模式配置APISIX
  2. 设置TCP端口
upstreams:
  - id: 4
    nodes:
      "mosquitto:1883": 1                                                       #1


stream_routes:                                                                  #2
  - id: 1
    upstream_id: 4
    plugins:
      mqtt-proxy:                                                               #3
        protocol_name: MQTT
        protocol_level: 5                                                       #4


  1. 将MQTT队列定义为上游
  2. 定义“流式”路由。APISIX将一切都不是HTTP的定义为流式
  3. 使用MQTT代理。请注意APISIX提供了基于Kafka的代理
  4. 指定MQTT版本。对于大于3的版本,应该是5

结论

GitHub上找到。

推荐阅读: 8.路由器和交换机的区别?

本文链接: 改进我的OpenTelemetry跟踪演示