当前位置:Java -> 实时从 Kafka 3.7.0 流式传输数据到 Flink 1.18.1 进行处理

实时从 Kafka 3.7.0 流式传输数据到 Flink 1.18.1 进行处理

在过去的几年中,Apache Kafka已经成为流数据领域的领先标准。快进到今天:Kafka已经普及,至少有80%的财富100强公司采用了它。这种广泛的采用归因于Kafka的架构,它远远超越了基本的消息传递。Kafka的架构多样性使得它在“互联网”规模下流数据处理非常适用,确保了容错性和数据一致性,对支持关键任务的应用至关重要。

Flink是一个高吞吐量的统一批处理和流处理引擎,以其处理大规模连续数据流的能力而闻名。它可以与Kafka无缝集成,并且对精确一次语义提供了强大的支持,确保每个事件都能在系统故障中精确处理一次。Flink成为Kafka的流处理器的自然选择。虽然Apache Flink作为实时数据处理工具取得了巨大成功和受欢迎,但要获得足够的资源和当前的例子来学习Flink可能会有挑战。

在本文中,我将逐步指导您将Kafka 2.13-3.7.0与Flink 1.18.1集成,以便在单节点集群中从一个主题中消费数据并在Flink中进行处理。Ubuntu-22.04 LTS是集群中使用的操作系统。

假设

  • 系统至少拥有8GB RAM和250GB SSD,以及Ubuntu-22.04.2 amd64作为操作系统。
  • 已安装了OpenJDK 11,并配置了JAVA_HOME环境变量。
  • 系统上有Python 3或Python 2以及Perl 5。
  • 已经建立并运行了单节点的Apache Kafka-3.7.0集群,附带了Apache Zookeeper-3.5.6。(请阅读这里如何建立Kafka集群)。

安装和启动Flink 1.18.1

  • 可以从这里下载Flink-1.18.1的二进制分发包。
  • 在终端上使用$ tar -xzf flink-1.18.1-bin-scala_2.12.tgz解压缩 flink-1.18.1-bin-scala_2.12.tgz文件。成功解压后,将会创建一个 flink-1.18.1目录,请确保在其中有bin/conf/examples/目录。
  • 通过终端进入bin目录,并执行$ ./bin/start-cluster.sh以启动单节点Flink集群。
  • 此外,我们可以使用Flink的web UI来监视集群的状态和运行的作业,通过在端口8081打开浏览器访问。

Flink's web UI: monitoring the status of the cluster and running jobs

  • 可以通过执行$ ./bin/stop-cluster.sh来停止Flink集群。

依赖的JAR包列表

以下.jar包应包含在类路径/构建文件中:

.jars to be included in the classpath/build file

我使用Eclipse IDE 23-12创建了一个基本的Java程序,用于在Flink中持续消费Kafka主题中的消息。使用Kafka内置的kafka-console-publisher脚本发布了虚拟字符串消息到该主题。当消息到达Flink引擎时,对每条消息不会进行数据转换,而是简单地附加了一个额外的字符串并打印以进行验证,以确保消息不断地流入Flink。

package com.dataview.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.dataview.flink.util.IKafkaConstants;


public class readFromKafkaTopic {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
		KafkaSource<String> source = KafkaSource.<String>builder()
			    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
			    .setTopics(IKafkaConstants.FIRST_TOPIC_NAME)
			    .setGroupId(IKafkaConstants.GROUP_ID_CONFIG)
			    .setStartingOffsets(OffsetsInitializer.earliest())
			    .setValueOnlyDeserializer(new SimpleStringSchema())
			    .build();
		DataStream<String> messageStream = see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
		messageStream.rebalance().map(new MapFunction<String, String>() {
			private static final long serialVersionUID = -6867736771747690202L;

			@Override
			public String map(String value) throws Exception {
				return "Kafka and Flink says: " + value;
			}
		}).print();

		see.execute();
	}

}


整个执行过程都被录制了屏幕。如果感兴趣,可以在下面观看:

我希望您喜欢阅读本文。请继续关注接下来的文章,我将解释如何将消息/数据从Flink流向Kafka主题。

推荐阅读: 阿里巴巴面经(3)

本文链接: 实时从 Kafka 3.7.0 流式传输数据到 Flink 1.18.1 进行处理