当前位置:Java -> 实时从 Kafka 3.7.0 流式传输数据到 Flink 1.18.1 进行处理
在过去的几年中,Apache Kafka已经成为流数据领域的领先标准。快进到今天:Kafka已经普及,至少有80%的财富100强公司采用了它。这种广泛的采用归因于Kafka的架构,它远远超越了基本的消息传递。Kafka的架构多样性使得它在“互联网”规模下流数据处理非常适用,确保了容错性和数据一致性,对支持关键任务的应用至关重要。
Flink是一个高吞吐量的统一批处理和流处理引擎,以其处理大规模连续数据流的能力而闻名。它可以与Kafka无缝集成,并且对精确一次语义提供了强大的支持,确保每个事件都能在系统故障中精确处理一次。Flink成为Kafka的流处理器的自然选择。虽然Apache Flink作为实时数据处理工具取得了巨大成功和受欢迎,但要获得足够的资源和当前的例子来学习Flink可能会有挑战。
在本文中,我将逐步指导您将Kafka 2.13-3.7.0与Flink 1.18.1集成,以便在单节点集群中从一个主题中消费数据并在Flink中进行处理。Ubuntu-22.04 LTS是集群中使用的操作系统。
JAVA_HOME
环境变量。$ tar -xzf flink-1.18.1-bin-scala_2.12.tgz
解压缩 flink-1.18.1-bin-scala_2.12.tgz文件。成功解压后,将会创建一个 flink-1.18.1目录,请确保在其中有bin/
,conf/
和examples/
目录。bin
目录,并执行$ ./bin/start-cluster.sh
以启动单节点Flink集群。$ ./bin/stop-cluster.sh
来停止Flink集群。以下.jar包应包含在类路径/构建文件中:
我使用Eclipse IDE 23-12创建了一个基本的Java程序,用于在Flink中持续消费Kafka主题中的消息。使用Kafka内置的kafka-console-publisher
脚本发布了虚拟字符串消息到该主题。当消息到达Flink引擎时,对每条消息不会进行数据转换,而是简单地附加了一个额外的字符串并打印以进行验证,以确保消息不断地流入Flink。
package com.dataview.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.dataview.flink.util.IKafkaConstants;
public class readFromKafkaTopic {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
.setTopics(IKafkaConstants.FIRST_TOPIC_NAME)
.setGroupId(IKafkaConstants.GROUP_ID_CONFIG)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> messageStream = see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();
see.execute();
}
}
整个执行过程都被录制了屏幕。如果感兴趣,可以在下面观看:
我希望您喜欢阅读本文。请继续关注接下来的文章,我将解释如何将消息/数据从Flink流向Kafka主题。
推荐阅读: 阿里巴巴面经(3)
本文链接: 实时从 Kafka 3.7.0 流式传输数据到 Flink 1.18.1 进行处理