当前位置:Java -> 从Apache Flink实时传输数据到Kafka再到Druid进行分析和决策

从Apache Flink实时传输数据到Kafka再到Druid进行分析和决策

通过使用实时分析,企业可以快速有效地对用户行为模式做出反应。这使它们能够抓住可能会错过的机会,并防止问题进一步恶化。

流行的事件流平台Apache Kafka可以用于从各种来源(如物联网、金融交易、库存等)实时摄取数据/事件。然后,这些数据可以流入多个下游应用程序或引擎进行进一步处理,最终进行分析以支持决策。

Apache Flink作为一个强大的引擎,通过在到达Kafka主题时修改、丰富或重构流数据,对其进行精细化或增强处理。本质上,Flink充当一个下游应用程序,持续消耗来自Kafka主题的数据流进行处理,然后将处理后的数据摄入到各种Kafka主题中。最终,可以集成Apache Druid来消费来自Kafka主题的处理后的流数据进行分析、查询和实时业务决策。

从Apache Flink到Kafka到Druid的实时数据传输进行分析/决策

点击此处查看放大视图

我之前的文章中,我解释了如何将Flink 1.18与Kafka 3.7.0集成。在本文中,我将概述将处理后的数据从Flink 1.18.1传输到Kafka 2.13-3.7.0主题的步骤。几个月前,我们发表了一篇详细介绍从Kafka主题摄取流数据到Apache Druid进行分析和查询的文章。您可以在这里阅读。

执行环境

  • 我们配置了一个多节点集群(三个节点),每个节点的最低配置为8 GB RAM和250 GB SSD,操作系统为Ubuntu-22.04.2 amd64。
  • 每个节点上都安装了OpenJDK 11,并配置了JAVA_HOME环境变量。
  • 每个节点上都提供了Python 3或Python 2以及Perl 5。
  • 一个三节点的Apache Kafka-3.7.0集群正在运行,其中两个节点上安装了Apache Zookeeper-3.5.6。
  • Apache Druid 29.0.0已经安装并配置在集群中的一个节点上,该节点未安装Kafka代理的Zookeeper。另外两个节点上安装并配置了Zookeeper。Leader代理在运行Druid的节点上。
  • 使用Datafaker库开发了一个模拟器,以每10秒的间隔产生实时的假金融交易JSON记录,并将其发布到创建的Kafka主题上。以下是模拟器生成的JSON数据源。
    {"timestamp":"2024-03-14T04:31:09Z ","upiID":"9972342663@ybl","name":"Kiran Marar","note":" ","amount":"14582.00","currency":"INR","geoLocation":"Latitude: 54.1841745 Longitude: 13.1060775","deviceOS":"IOS","targetApp":"PhonePe","merchantTransactionId":"ebd03de9176201455419cce11bbfed157a","merchantUserId":"65107454076524@ybl"}


  • 在未运行Druid和Kafka领导代理的节点上提取Apache Flink-1.18.1-bin-scala_2.12.tgz的存档。

在Flink中运行流作业

我们将深入研究从Kafka主题提取数据的过程,其中来自模拟器的传入消息正在发布,对其执行处理任务,然后将处理后的数据重新集成到多节点Kafka集群的不同主题中。

我们开发了一个Java程序(StreamingToFlinkJob.java),将其作为作业提交给Flink,执行上述步骤,考虑到一个窗口为2分钟,并计算从模拟UPI交易数据流的相同手机号码(upi id)中交易的平均金额。项目构建或类路径中包含了以下jar文件列表。 

Jar文件目录

使用下面的代码,我们可以在开发的Java类中获取Flink执行环境。 

Configuration conf = new Configuration();
  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);


现在我们应该在Java程序内部读取消息/流,这些消息/流已经由模拟器发布到Kafka主题。以下是代码块。 

KafkaSource kafkaSource = KafkaSource.<UPITransaction>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)// IP Address with port 9092 where leader broker is running in cluster
    .setTopics(IKafkaConstants.INPUT_UPITransaction_TOPIC_NAME)
    .setGroupId("upigroup")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new KafkaUPISchema())
    .build();


为了从Kafka中检索信息,在Flink中设置一个反序列化模式对于处理JSON格式的事件非常重要,将原始数据转换为结构化形式。重要的是,需要将setParallelism设置为Kafka主题分区的数量,否则水印对于源不起作用,数据不会释放到汇流处。

DataStream<UPITransaction> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)), "Kafka Source").setParallelism(1);


成功从Kafka检索事件后,我们可以通过加入处理步骤来增强流作业。下面的代码段读取Kafka数据,按手机号码(upiID)组织数据,并计算每个手机号码的平均价格。为了实现这一点,我们开发了一个自定义的窗口函数来计算平均值,并实现了水印处理以有效处理事件时间语义。以下是代码片段:

SerializableTimestampAssigner<UPITransaction> sz = new SerializableTimestampAssigner<UPITransaction>() {
   @Override
   public long extractTimestamp(UPITransaction transaction, long l) {
    try {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
     Date date = sdf.parse(transaction.eventTime);
     return date.getTime();
    } catch (Exception e) {
     return 0;
    }
   }
  };
 
WatermarkStrategy<UPITransaction> watermarkStrategy = WatermarkStrategy.<UPITransaction>forBoundedOutOfOrderness(Duration.ofMillis(100)).withTimestampAssigner(sz);
DataStream<UPITransaction> watermarkDataStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);
 
//Instead of event time, we can use window based on processing time. Using TumblingProcessingTimeWindows  
DataStream<TransactionAgg> groupedData = watermarkDataStream.keyBy("upiId").window(TumblingEventTimeWindows.of(Time.milliseconds(2500),
       Time.milliseconds(500))).sum("amount"); 
          .apply(new TransactionAgg());


最终,基于交易流的连续流,处理逻辑(根据移动号码为2分钟窗口内相同的UPI ID计算平均价格)在Flink内执行。以下是用于在每个UPI ID或移动号码上计算平均金额的窗口函数的代码块。

public class TransactionAgg
   implements WindowFunction<UPITransaction, TransactionAgg, Tuple, TimeWindow> {

  @Override
  public void apply(Tuple key, TimeWindow window, Iterable<UPITransaction> values, Collector<TransactionAgg> out) {
   Integer sum = 0; //Consider whole number
   int count = 0;
   String upiID = null ;
   for (UPITransaction value : values) {
    sum += value.amount;
    upiID = value.upiID;
    count++;
   }

   TransactionAgg output = new TransactionAgg();
   output.upiID = upiID;
   output.eventTime = window.getEnd();
   output.avgAmount = (sum / count);
   out.collect( output);
  }

 }


我们已经处理了数据。下一步是将对象序列化并将其发送到不同的Kafka主题。在开发的Java代码(StreamingToFlinkJob.java)中添加一个KafkaSink,将从Flink引擎发送处理过的数据到在多节点Kafka集群上创建的不同Kafka主题。以下是在将对象发送/发布到Kafka主题之前对对象进行序列化的代码片段:

public class KafkaTrasactionSinkSchema implements KafkaRecordSerializationSchema<TransactionAgg> {

   
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            TransactionAgg aggTransaction, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                    topic,
                    null, // not specified  partition so setting null
                    aggTransaction.eventTime,
                    aggTransaction.upiID.getBytes(),
                    objectMapper.writeValueAsBytes(aggTransaction));
        } catch (Exception e) {
            throw new IllegalArgumentException(
                    "Exception on serialize record: " + aggTransaction, e);
        }
       
    }
}


以下是将处理过的数据发送回不同的Kafka主题的代码块。

KafkaSink<TransactionAgg> sink = KafkaSink.<TransactionAgg>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
    .setRecordSerializer(new KafkaTrasactionSinkSchema(IKafkaConstants.OUTPUT_UPITRANSACTION_TOPIC_NAME))
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
 groupedData.sinkTo(sink); // DataStream that created above for TransactionAgg
  env.execute();


将Druid与Kafka主题连接

在这最后一步中,我们需要将Druid与Kafka主题集成,以消耗由Flink持续发布的处理数据流。借助Apache Druid,我们可以直接连接Apache Kafka,以便实时数据可以持续摄入,并随后进行查询,即可实时做出业务决策,而无需干预任何第三方系统或应用程序。Apache Druid的另一个亮点是我们无需配置或安装任何第三方UI应用程序来查看着陆到Kafka主题的数据或已发布到Kafka主题的数据。为了精简这篇文章,我省略了将Druid与Apache Kafka集成的步骤。不过,几个月前,我在本文中提到了同样的主题(在本文的早些链接里)。您可以阅读它并遵循相同的方法。

最终说明

上面提供的代码片段仅供理解之用。它说明了从Kafka主题获取消息/数据流、处理消耗的数据以及最终将修改后的数据发送/推送到另一个Kafka主题的顺序步骤。这使得Druid可以获取修改后的数据流进行查询、分析作为最终步骤。稍后,如果您有兴趣在自己的基础架构上执行此代码库,我们将在GitHub上上传整个代码库。

希望您喜欢阅读本文。如果您觉得这篇文章有价值,请考虑点赞和分享。

推荐阅读: 剑指offer 15.二进制中1的个数

本文链接: 从Apache Flink实时传输数据到Kafka再到Druid进行分析和决策