当前位置:Java -> 嵌入式模式下的Debezium

嵌入式模式下的Debezium

在之前的一篇博客中,我们设置了一个Debezium服务器,从 PostgreSQL 数据库中读取事件。然后我们通过 Redis 流将这些更改流式传输到一个 Redis 实例中。

我们可能会有这样的印象,要运行 Debezium 我们需要在基础设施中运行两个额外的组件:

  • 独立的 Debezium 服务器实例。
  • 具有流式传输能力和各种集成功能的软件组件,如 Redis 或 Kafka。

这并不总是这样,因为 Debezium 可以运行在嵌入式模式中。通过运行在嵌入式模式中,您可以使用Debezium直接从数据库的事务日志中读取。您可以自行决定如何处理检索到的条目。从事务日志中读取条目的过程可以驻留在任何 Java 应用程序上,因此无需独立部署。

除了减少组件的数量之外,另一个好处是我们可以在从数据库中读取条目时进行修改,并在我们的应用程序中采取行动。有时我们可能只需要一部分提供的功能。

让我们使用之前使用过的相同的 PostgreSQL 配置。

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3


此外,我们将为我们想要专注的表创建一个初始化脚本。

#!/bin/bash

set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL

create schema test_schema;
  create table test_schema.employee(
          id  SERIAL PRIMARY KEY,
          firstname   TEXT    NOT NULL,
          lastname    TEXT    NOT NULL,
          email       TEXT    not null,
          age         INT     NOT NULL,
          salary         real,
          unique(email)
      );

EOSQL


我们的 Docker Compose 文件将如下所示。

version: '3.1'
  
services:
 
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432


我们创建的配置文件被挂载到了 PostgreSQL Docker 容器上。Docker Compose V2 现在具备了许多优秀的功能。只要我们运行 docker compose up,就会启动一个具有模式和表的 Postgresql 服务器。并且该服务器将启用逻辑解码,Debezium 将能够通过事务日志跟踪该表的更改。我们有了继续构建我们的应用所需的一切。

首先,让我们添加所需的依赖项:

<properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <version.debezium>2.3.1.Final</version.debezium>
    <logback-core.version>1.4.12</logback-core.version>
</properties>
 
<dependencies>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-storage-jdbc</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback-core.version}</version>
    </dependency>
</dependencies>


我们还需要创建 Debezium 嵌入式属性:

name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=127.0.0.1
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
database.server.name==embedded-debezium
debezium.source.plugin.name=pgoutput
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=test_schema
table.include.list=test_schema.employee


除了建立连接到 PostgreSQL 数据库外,我们还决定将偏移保存在文件中。通过使用 Debezium 中的偏移,我们可以跟踪我们在处理事件时所取得的进度。

在每次发生在表test_schema.employee上的更改时,我们都将接收一个事件。一旦收到该事件,我们的代码库就应该处理它。为了处理这些事件,我们需要创建一个DebeziumEngine.ChangeConsumer。ChangeConsumer 将消费所发出的事件。

package com.egkatzioura;
 
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
 
import java.util.List;
 
public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
 
    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        for(RecordChangeEvent<SourceRecord> record: records) {
            System.out.println(record.record().toString());
        }
    }
 
}


每个传入的事件都将打印在控制台上。现在我们可以添加我们的主类,设置引擎。

package com.egkatzioura;
 
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;
 
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
 
public class Application {
 
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
 
        try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
            properties.load(stream);
        }
        properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath());
 
        var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(properties)
                .notifying(new CustomChangeConsumer())
                .build();
        engine.run();
 
    }
 
}


只要我们的应用程序和之前配置的 PostgreSQL 数据库都在运行,我们就可以开始插入数据。

docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.
 
postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);


此外,我们可以在控制台中看到更改。

SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}


我们做到了。我们成功地通过一个 Java 应用程序运行了 Debezium,而不需要运行独立的 Debezium 服务器或流式传输组件。您可以在GitHub上找到代码。

推荐阅读: 被裁员后如何维护自己的权益

本文链接: 嵌入式模式下的Debezium