当前位置:Java -> Kafka 反序列化错误的及时处理

Kafka 反序列化错误的及时处理

事件驱动架构在许多组织的各种业务案例中已经成功使用了相当长的时间。它在性能、可伸缩性、可演进性和容错性方面表现出色,提供了很高的抽象度和弹性。这些优势使它们成为应用需要实时或几乎实时的反应性时的良好选择。

在实现方面,对于标准消息传递,ActiveMQ 和 RabbitMQ 是很好的候选,而对于数据流,像 Apache Kafka 和 Redpanda 这样的平台更加合适。通常,当开发人员和架构师需要选择这两个方向中的一个时,他们会从各个角度分析和衡量 - 消息负载、数据流动和使用、吞吐量和解决方案拓扑结构。由于围绕这些方面的讨论可能会变得太大和复杂,所以这不会作为本文的一部分进行精炼。

概念上,事件驱动架构至少涉及三个主要角色:消息生产者、消息代理和消息消费者。简而言之,目的是允许生产者和消费者以解耦的异步方式进行通信,这是通过前述的消息代理来实现的。在乐观的情况下,生产者创建一个消息,将其发布到代理拥有的主题上,消费者读取该消息,进行处理,然后礼貌地提供一份回应。当生产者将消息发送到主题时,消息将被生产者序列化(编组),而消费者在从主题接收消息时将其进行反序列化(解组)。

本文重点讨论了当消费者在反序列化收到的消息时遇到问题,并提供了进一步采取行动的方式。这些行动的几个示例可能包括构建默认消息或发送反馈给消息代理。开发人员有足够的创造力根据特定的实施用例决定这种行为。

设置

  • Java 21
  • Maven 3.9.2
  • Spring Boot – 版本 3.1.5
  • 在 Docker 中运行的 Redpanda 消息代理 – 镜像版本 23.2.15

Redpanda 是一个轻量级的消息代理,被选为此概念验证的工具,以便读者有机会尝试不同于广泛使用的 Kafka 的选型。由于它兼容 Kafka,如果从一个服务提供商转移到另一个服务提供商,生产者和消费者的开发和配置不需要任何更改。

根据 Redpanda 文档,Docker 支持仅应用于开发和测试。对于这个项目的目的来说,这已经足够了;因此,在 Docker 中设置了一个单独的 Redpanda 消息代理。

有关如何进行最小设置的详细信息,请参阅本文末尾的资源 1。

一旦启动并运行,就会创建一个名为 minifig 的主题,命令如下:

>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK


如果查看集群,可能会观察到已创建一个具有一个分区和一个副本的主题。

>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421
 
BROKERS
=======
ID      HOST        PORT
0*      redpanda-0  9092
 
TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1


实施

流程非常简单:生产者向配置的主题发送请求,然后消费者读取请求。

请求代表了被以下记录简单地建模的迷你人物:

public record Minifig(String id,
                      Size size,
                      String name) {
 
    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }
 
    public enum Size {
        SMALL, MEDIUM, BIG;
    }
}


id 是迷你人物的唯一标识符,具有特定的 name 和特定的 size – 小,中或大。

要配置生产者和消费者,至少需要这些属性(application.properties 文件):

# the path to the message broker
broker.url=localhost:19092
 
# the name of the broker topic
topic.minifig=minifig
 
# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0


为了发送消息,生产者需要一个KafkaTemplate 实例。

@Configuration
public class KafkaConfig {
 
    @Value("${broker.url}")
    private String brokerUrl;
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
 
        return new KafkaTemplate<>(producerFactory);
    }
}


可以看到,生产者配置中选择了StringSerializer 来对负载值进行编组。通常情况下,JsonSerializer 提供了更强大的生产者-消费者约定。然而,这里的选择是有意的,是为了增加消费者端的实验灵活性(正如后面将看到的)。只是作为提醒,此概念验证的兴趣在于对遇到的反序列化错误采取行动。

一旦消息到达minifig主题,就会配置一个消费者来获取它们。

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Value("${broker.url}")
    private String brokerUrl;
 
    @Value("${topic.minifig.group.id}")
    private String groupId;
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());
 
        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);
 
        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}


KafkaListenerContainerFactory 接口负责为特定端点创建监听容器。配置类上的@EnableKafka 注解启用容器中任何 Spring 管理的 bean 上的@KafkaListener 注解的检测。因此,接下来要开发的是实际的监听器(即消息消费者)。 

@Component
public class MinifigListener {
 
    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);
 
    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}


它的功能很简单。它只记录从主题minifig读取的消息,这些消息是为配置的消费者组而设定的。

如果应用程序已启动,且消息代理已经运行,监听器已经准备好接收消息。

INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-0: partitions assigned: [minifig-0]


为了检查集成,使用了以下简单的测试。由于将由监听器接收一个Minifig,则为方便起见创建了一个符合模版。

@SpringBootTest
class AppTest {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Value("${topic.minifig}")
    private String topic;
 
    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";
 
    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");
 
        kafkaTemplate.send(topic, minifig);
    }
}


运行测试时,会发送一个“符合”消息到代理,预期会被本地消费者成功接收。

INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man].


Redpanda 控制台可以帮助观察经纪人级别发生的情况,特别是minifig主题中发生的情况。 

Redpanda控制台:代理级别

在上述场景中,消息通过消息代理从生产者发送到消费者,如计划的那样。

在反序列化失败时恢复

在这个概念验证的特殊情况下,假设小型图形的类型可以是SMALL、MEDIUM或BIG,符合定义的Type enum。假如生产者发送了一个未知类型的小型图形,即与约定不完全相符的类型,那么这些消息基本上将被监听器拒绝,因为负载无法进行反序列化。

为了模拟这种情况,运行以下测试。

@SpringBootTest
class AppTest {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    @Value("${topic.minifig}")
    private String topic;
 
    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";
     
    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");
 
        kafkaTemplate.send(topic, minifig);
    }
}


消息到达主题,但未到达MinifigListener#onReceive()方法。正如预期的那样,当正在对负载进行解组时出现了错误。可以通过深入查看堆栈跟踪来揭示出错原因。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data  from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])


另一个方面是消息在消费者端持续尝试读取。这对于消费者来说至少是不幸的,因为日志不断积累。

为了解决这种情况,用于解组负载值的JsonDeserializer被装饰成为其实际委托的ErrorHandlingDeserializer。此外,ErrorHandlingDeserializer具有failedDeserializationFunction成员,根据其JavaDoc,在反序列化失败时提供了一种替代机制。

新的消费者配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());
 
    JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);
 
    ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());
 
    DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);
 
    ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}


在此处使用的failedDeserializationFunction 是简单的,但原因是为了证明其实用性。

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {
 
    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);
 
    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());
        return new Minifig("Default");
    }
}


FailedDeserializationInfo实体(Function#apply()输入)是在从反序列化异常中进行恢复时构造的,它封装了各种信息(这里异常就是其中的一种)。由于apply()方法的输出是实际的反序列化结果,因此可以根据期望的行为返回null或其他适当的值。

如果再次运行send_non_compliant()测试,将处理反序列化异常并返回默认值。此外,将调用MinifigListener并有机会处理它。

INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].


结论

配置Kafka生产者和消费者,并对它们进行微调,以便根据使用的消息代理实现所需的性能并不总是直接了当的。对通信的每一步进行控制无疑是一件值得期望的事情,而且在未知情况下迅速行动有助于提供健壮且易于维护的解决方案。本文重点介绍了在Kafka消费者级别可能出现的反序列化问题,并提供了在处理不符合的负载时有备选方案。

示例代码

资源

  1. Redpanda快速入门
  2. Spring for Apache Kafka
  3. 图片来源:摄于罗马尼亚布拉索夫动物园

推荐阅读: 9.什么情况会发生栈内存溢出

本文链接: Kafka 反序列化错误的及时处理