当前位置:Java -> 使用Kafka、Flink、LangChain和OpenAI进行GenAI演示

使用Kafka、Flink、LangChain和OpenAI进行GenAI演示

生成式人工智能(GenAI)实现了跨行业的自动化和创新。本博文探讨了Python、LangChain与OpenAI LLM结合的架构和演示,以及事件流和数据集成的Apache Kafka,以及流处理的Apache Flink。使用案例展示了数据流和GenAI如何帮助相关数据从Salesforce CRM中搜索线索信息,搜索Google和LinkedIn等公共数据集,并为销售代表推荐打破僵局的对话。

GenAI Demo with Kafka, Flink, LangChain and OpenAI

生成式人工智能的出现

生成式人工智能(GenAI)指的是一类生成新内容的人工智能系统和模型,通常包括图像、文本、音频或其他类型的数据。这些模型能够理解并学习训练数据中存在的潜在模式、风格和结构,然后自行生成新的类似内容。

生成式人工智能在各个领域应用广泛,包括:

  • 图像生成:生成逼真的图像、艺术作品或图形
  • 文本生成:创建类似人类的文本,包括自然语言生成
  • 音乐作曲:生成新的音乐作曲或风格
  • 视频合成:创建逼真的视频内容
  • 数据增强:为机器学习模型生成额外的训练数据
  • 药物发现:为新药生成分子结构

生成式人工智能面临的主要挑战之一是在生产基础架构中考虑上下文、可伸缩性和数据隐私。让我们探讨一个将GenAI整合到企业架构中以支持销售和营销的示例。

演示:LangChain + Kafka + Flink = 自动化冷呼销售线索与Salesforce CRM和LinkedIn数据

本文展示了一项演示,结合了由Apache Kafka和Flink提供实时数据流的大型语言模型与LangChain。如果您想了解如何使用Kafka和Flink与生成式人工智能结合进行数据流处理,请查看以下两篇文章:

以下演示是支持销售代表或自动化工具的生成式人工智能:

  • Salesforce CRM通过其他接口或人工方式创建新的潜在客户。
  • 销售代表/SDR实时接收潜在客户信息以致电潜在客户。
  • 特殊的GenAI服务利用潜在客户信息(姓名和公司)搜索网络(主要是LinkedIn)为潜在客户的冷呼提供有用内容,包括摘要、两个有趣的事实、一个感兴趣的话题以及两个创意的打招呼方式以启动对话。

向我的同事 Carsten Muetzlitz 致敬,他构建了这个演示。代码可以在 GitHub 上获取。这是演示的架构:

GenAI Demo with Kafka, Flink, LangChain, OpenAI

演示中使用的技术和基础设施

以下技术和基础设施用于实现和部署GenAI演示。

  • Python:几乎每个数据工程师和数据科学家使用的编程语言
  • LangChain:Python框架实现了支持销售对话的应用
  • OpenAI:语言模型和API帮助构建简单但功能强大的GenAI应用
  • Salesforce:云CRM工具存储潜在客户信息和其他销售和营销数据
  • Apache Kafka:可扩展的实时数据中心,解耦数据源(CRM)和数据汇(GenAI应用和其他服务)
  • Kafka Connect:通过变更数据捕获(CDC)从Salesforce CRM进行数据集成
  • Apache Flink:用于对CRM数据进行丰富和数据质量改进的流处理
  • Confluent Cloud:完全托管的Kafka(流和存储)、Flink(处理)和Salesforce连接器(集成)
  • SerpAPI:利用潜在客户信息从Google和其他搜索引擎上抓取数据
  • proxyCurl:从LinkedIn获取关于潜在客户的丰富数据,无需担心扩展网络抓取和数据科学团队

Python + LangChain + OpenAI + Kafka + Flink 的现场演示

以下是一个15分钟的视频,带您了解演示:

  • 使用案例
  • 技术架构
  • 使用Kafka和LangChain的Python代码GitHub项目
  • 在Confluent Cloud UI中完全托管的Kafka和Flink
  • 通过Kafka Connect从Salesforce CRM实时推送新潜在客户
  • 使用Apache Flink进行流式ETL
  • Python、LangChain和OpenAI的生成式人工智能

缺失:LangChain演示中缺少矢量数据库和RAG与模型嵌入

这个演示未使用用于RAG(检索增强生成)、模型嵌入或通过矢量数据库(如Pinecone、Weaviate、MongoDB或Oracle)进行向量搜索的高级GenAI技术。

这个演示的原则是KISS(“尽可能保持简单”)。这些技术可以并将被集成到许多现实世界的架构中。

该演示在延迟和规模方面存在限制。Kafka和Flink作为完全托管和弹性的SaaS运行。然而,围绕LangChain的AI/ML部分可以通过使用托管的SaaS和与其他专用AI平台集成来改善延迟。特别是数据密集型应用程序将需要矢量数据库和高级检索以及语义搜索技术,如RAG。

有趣的事实:当我搜索我的名字而不是Carsten的时候,演示会崩溃。因为网络爬虫在网上找到了太多关于我的内容,导致LangChain应用程序崩溃。这对于像Pinecone或MongoDB之类的辅助技术来说是一个有说服力的事件,这些技术可以在规模上进行索引、RAG和语义搜索。这些技术与Confluent Cloud进行完全托管集成,因此演示可以很容易地进行扩展。

LangChain在GenAI中的角色

LangChain是一个用语言模型驱动的应用程序开发的开源框架。LangChain也是该框架背后的商业供应商的名称。该工具为数据工程师提供所需的“粘合代码”,以便使用直观的API来构建GenAI应用,其中包括了连接大型语言模型(LLM)、具有上下文的提示、驱动有状态对话的代理以及与外部接口集成的工具。

LangChain支持:

  • 上下文感知:将语言模型连接到上下文来源(提示说明、少量示例、内容以在其响应中进行基础、等等)
  • 推理:依赖于语言模型进行推理(根据所提供的上下文来回答问题、采取行动等等)

LangChain软件包的主要价值主张是:

  1. 组件:用于处理语言模型的组合工具和集成;组件是模块化且易于使用的,无论您是否使用LangChain框架的其余部分。
  2. 现成的链路:用于完成更高级任务的内置组件组合

LangChain软件包

这些产品共同简化了整个应用程序生命周期:

  • 开发:使用LangChain/LangChain.js编写您的应用程序。使用模板作为参考,快速上手。
  • 生产化:使用LangSmith来检查、测试和监控您的链路,以便您可以不断改进并带着信心部署。
  • 部署:使用LangServe将任何链路转换为API。

演示中的LangChain

该演示使用了几个LangChain概念,例如提示、聊天模型、使用LangChain表达式语言(LCEL)的链路,以及使用语言模型选择一系列要采取的行动的代理。

下面是LangChain业务流程的逻辑流程:

  1. 获取新的潜在客户:从Salesforce CRM实时收集潜在客户的全名和公司名称,并存储在Kafka主题中。
  2. 查找LinkedIn个人资料:使用Google Search API“SerpAPI”搜索潜在客户的LinkedIn个人资料的URL。
  3. 收集潜在客户信息:使用Proxycurl从LinkedIn收集潜在客户所需的信息。
  4. 为销售代表或自动脚本创建冷调建议:通过OpenAI API将所有信息引入ChatGPT LLM,并将生成的文本发送到Kafka主题。

以下截图显示了生成的内容片段。它包括基于LinkedIn资料的特定上下文的打破僵局的对话。为了更好理解,Carsten在加入Confluent之前在甲骨文工作了24年。LLM使用这个LangChain提示的上下文来生成相关内容:

Carsten的两个有趣的事实

Apache Kafka在GenAI中的角色

Apache Kafka是用于构建实时数据流管道和流应用程序的分布式流平台。它在高效可靠地处理和管理大量数据流方面起着至关重要的作用。

生成式AI通常涉及用于创建新数据的模型和算法,例如图像、文本或其他类型的内容。Apache Kafka通过提供可伸缩和弹性的基础架构来支持生成式AI。在生成式AI的环境中,Kafka可用于:

  • 数据摄入:Kafka可以处理大型数据集的摄入,包括用于训练生成式AI模型的多样化且可能容量大的数据。
  • 实时数据处理:Kafka的实时数据处理能力对于数据不断变化的情况很有用,可以用于快速更新和训练生成式AI模型。
  • 事件溯源:Kafka的事件溯源捕获并存储随时间发生的事件,提供数据变化的历史纪录。这些历史数据对于生成式AI模型的训练和改进是有价值的。
  • 与其他工具的集成:Kafka可以集成到较大的数据处理和机器学习管道中,促进数据在生成式AI工作流中的不同组件和工具之间的流动。

虽然Apache Kafka本身是专为生成式AI设计的工具,但其功能和能力有助于数据基础设施的整体效率和可伸缩性。Kafka的能力在处理大型数据集和复杂的机器学习模型方面至关重要,包括生成式AI应用中使用的模型。

演示中的Apache Kafka

Kafka是连接所有不同应用程序的数据平台。确保数据一致性是Kafka的一个优势所在,无论数据源或接收端是实时的、批处理的还是请求响应的API。

在这个演示中,Kafka从Salesforce CRM消费事件作为客户数据的主要数据源。不同的应用程序(Flink、LangChain、Salesforce)在业务流程的不同步骤中消费数据。Kafka Connect提供了与数据集成的能力,无需另外使用ETL、ESB或iPaaS工具。本演示使用Confluent的Change Data Capture(CDC)连接器实时消费Salesforce数据库中的变化,以进行进一步处理。

完全托管的Confluent Cloud是本演示中整个Kafka和Flink生态系统的基础设施。开发人员的重点应始终放在构建业务逻辑上,而不是担心操作基础设施。

虽然Kafka的核心是基于事件、实时和可伸缩的,但它也能够开箱即用地实现领域驱动设计和数据网格企业架构。

Apache Flink在GenAI中的角色

Apache Flink是一个开源的分布式流处理框架,用于实时分析和事件驱动应用程序。它的主要重点是高效且可伸缩地处理连续的数据流。虽然Apache Flink本身不是生成式AI的特定工具,但它在支持生成式AI工作流的某些方面起着作用。以下是Apache Flink相关的几种方式:

  1. 实时数据处理:Apache Flink可以实时处理和分析数据,这对于生成式AI模型需要在流式数据上操作、适应变化并实时生成响应的情景非常有用。
  2. 事件时间处理: Flink具有内置的事件时间处理支持,允许按照事件发生的顺序处理事件,即使它们是无序到达的。这在对时间顺序至关重要的情况下非常有益,比如用于训练或应用生成式AI模型的数据序列。
  3. 有状态处理:Flink支持有状态处理,可以在事件之间维护状态。这在生成式AI业务流程需要维护过去事件的上下文或记忆以生成连贯和上下文感知的输出时非常有用。
  4. 与机器学习库的集成:虽然Flink本身不是一个机器学习框架,但它可以与其他用于机器学习的工具和库集成,包括与生成式AI相关的工具和库。这种集成可以促进在基于Flink的流应用程序内部部署和执行机器学习模型。

Apache Flink在生成式AI中的具体角色取决于特定的用例和整体系统架构。

演示中的Apache Flink

本演示利用Apache Flink对传入的Salesforce CRM事件进行流式ETL(丰富化、数据质量改进)。

FlinkSQL提供了一种简单直观的方法,可以使用任何Java或Python代码实现ETL。完全托管的Confluent Cloud是本演示中Kafka和Flink的基础设施。无服务器的FlinkSQL允许根据需要进行扩展,但如果没有事件被消费和处理,则可以缩减到零。

该演示仅是一个开始。使用Apache Flink可以构建许多强大的应用程序。这包括流式ETL,以及Netflix、Uber等许多科技巨头的商业应用程序。

LangChain + 完全托管的Kafka和Flink = 简单、强大的实时GenAI

LangChain是一个易于使用的AI/ML框架,用于连接大型语言模型到其他数据源并创建有价值的应用程序。其灵活性和开放性使得开发人员和数据工程师能够构建各种应用程序,从聊天机器人到回答问题的智能系统。

使用Apache Kafka和Flink进行数据流可以为数据管道和流处理提供可靠且可伸缩的数据结构。Kafka的事件存储确保跨实时、批量和请求-响应API的数据一致性。领域驱动设计、微服务架构以及数据产品构建在数据网格中越来越多地依赖Kafka出于这些原因。

LangChain、GenAI技术(如OpenAI)以及与Kafka和Flink的数据流结合使得在实时基于AI的特定上下文决策方面具有强大的组合。

大多数企业在AI用例中采用云优先战略。数据流基础设施可在Confluent Cloud等SaaS中使用,使开发人员能够更快地关注业务逻辑。对于使用Python构建AI应用程序,存在许多选择(Python是AI的事实标准)。例如,可以在FlinkSQL应用程序中执行Python代码并将其从Kafka中消费,或者使用类似Quix Streams或Bytewax的独立应用程序开发框架和云平台来代替LangChain框架。

如何将Python、LangChain和LLM(大型语言模型)与Kafka和Flink等数据流技术结合起来?让我们在LinkedIn上进行连接并讨论吧!

推荐阅读: 14.线程的sleep()方法和yield()方法有什么不同?

本文链接: 使用Kafka、Flink、LangChain和OpenAI进行GenAI演示