当前位置:Java -> 嵌入式模式下的Debezium
在之前的一篇博客中,我们设置了一个Debezium服务器,从 PostgreSQL 数据库中读取事件。然后我们通过 Redis 流将这些更改流式传输到一个 Redis 实例中。
我们可能会有这样的印象,要运行 Debezium 我们需要在基础设施中运行两个额外的组件:
这并不总是这样,因为 Debezium 可以运行在嵌入式模式中。通过运行在嵌入式模式中,您可以使用Debezium直接从数据库的事务日志中读取。您可以自行决定如何处理检索到的条目。从事务日志中读取条目的过程可以驻留在任何 Java 应用程序上,因此无需独立部署。
除了减少组件的数量之外,另一个好处是我们可以在从数据库中读取条目时进行修改,并在我们的应用程序中采取行动。有时我们可能只需要一部分提供的功能。
让我们使用之前使用过的相同的 PostgreSQL 配置。
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3
此外,我们将为我们想要专注的表创建一个初始化脚本。
#!/bin/bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
create schema test_schema;
create table test_schema.employee(
id SERIAL PRIMARY KEY,
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
email TEXT not null,
age INT NOT NULL,
salary real,
unique(email)
);
EOSQL
我们的 Docker Compose 文件将如下所示。
version: '3.1'
services:
postgres:
image: postgres
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- ./postgresql.conf:/etc/postgresql/postgresql.conf
- ./init:/docker-entrypoint-initdb.d
command:
- "-c"
- "config_file=/etc/postgresql/postgresql.conf"
ports:
- 5432:5432
我们创建的配置文件被挂载到了 PostgreSQL Docker 容器上。Docker Compose V2 现在具备了许多优秀的功能。只要我们运行 docker compose up
,就会启动一个具有模式和表的 Postgresql 服务器。并且该服务器将启用逻辑解码,Debezium 将能够通过事务日志跟踪该表的更改。我们有了继续构建我们的应用所需的一切。
首先,让我们添加所需的依赖项:
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.3.1.Final</version.debezium>
<logback-core.version>1.4.12</logback-core.version>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-jdbc</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-core.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback-core.version}</version>
</dependency>
</dependencies>
我们还需要创建 Debezium 嵌入式属性:
name=embedded-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.flush.interval.ms=60000
database.hostname=127.0.0.1
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
database.server.name==embedded-debezium
debezium.source.plugin.name=pgoutput
plugin.name=pgoutput
database.server.id=1234
topic.prefix=embedded-debezium
schema.include.list=test_schema
table.include.list=test_schema.employee
除了建立连接到 PostgreSQL 数据库外,我们还决定将偏移保存在文件中。通过使用 Debezium 中的偏移,我们可以跟踪我们在处理事件时所取得的进度。
在每次发生在表test_schema.employee
上的更改时,我们都将接收一个事件。一旦收到该事件,我们的代码库就应该处理它。为了处理这些事件,我们需要创建一个DebeziumEngine.ChangeConsumer
。ChangeConsumer 将消费所发出的事件。
package com.egkatzioura;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
@Override
public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
for(RecordChangeEvent<SourceRecord> record: records) {
System.out.println(record.record().toString());
}
}
}
每个传入的事件都将打印在控制台上。现在我们可以添加我们的主类,设置引擎。
package com.egkatzioura;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.ChangeEventFormat;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class Application {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
try(final InputStream stream = Application.class.getClassLoader().getResourceAsStream("embedded_debezium.properties")) {
properties.load(stream);
}
properties.put("offset.storage.file.filename",new File("offset.dat").getAbsolutePath());
var engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(properties)
.notifying(new CustomChangeConsumer())
.build();
engine.run();
}
}
只要我们的应用程序和之前配置的 PostgreSQL 数据库都在运行,我们就可以开始插入数据。
docker exec -it debezium-embedded-postgres-1 psql postgres postgres
psql (15.3 (Debian 15.3-1.pgdg120+1))
Type "help" for help.
postgres=# insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
此外,我们可以在控制台中看到更改。
SourceRecord{sourcePartition={server=embedded-debezium}, sourceOffset={last_snapshot_record=true, lsn=22518160, txId=743, ts_usec=1705916606794160, snapshot=true}} ConnectRecord{topic='embedded-debezium.test_schema.employee', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{embedded-debezium.test_schema.employee.Key:STRUCT}, value=Struct{after=Struct{id=1,firstname=John,lastname=Doe 1,email=john1@doe.com,age=18,salary=1234.23},source=Struct{version=2.3.1.Final,connector=postgresql,name=embedded-debezium,ts_ms=1705916606794,snapshot=last,db=postgres,sequence=[null,"22518160"],schema=test_schema,table=employee,txId=743,lsn=22518160},op=r,ts_ms=1705916606890}, valueSchema=Schema{embedded-debezium.test_schema.employee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
我们做到了。我们成功地通过一个 Java 应用程序运行了 Debezium,而不需要运行独立的 Debezium 服务器或流式传输组件。您可以在GitHub上找到代码。
推荐阅读: 被裁员后如何维护自己的权益
本文链接: 嵌入式模式下的Debezium