当前位置:Java -> 使用Kafka、Flink、LangChain和OpenAI进行GenAI演示
生成式人工智能(GenAI)实现了跨行业的自动化和创新。本博文探讨了Python、LangChain与OpenAI LLM结合的架构和演示,以及事件流和数据集成的Apache Kafka,以及流处理的Apache Flink。使用案例展示了数据流和GenAI如何帮助相关数据从Salesforce CRM中搜索线索信息,搜索Google和LinkedIn等公共数据集,并为销售代表推荐打破僵局的对话。
生成式人工智能(GenAI)指的是一类生成新内容的人工智能系统和模型,通常包括图像、文本、音频或其他类型的数据。这些模型能够理解并学习训练数据中存在的潜在模式、风格和结构,然后自行生成新的类似内容。
生成式人工智能在各个领域应用广泛,包括:
生成式人工智能面临的主要挑战之一是在生产基础架构中考虑上下文、可伸缩性和数据隐私。让我们探讨一个将GenAI整合到企业架构中以支持销售和营销的示例。
本文展示了一项演示,结合了由Apache Kafka和Flink提供实时数据流的大型语言模型与LangChain。如果您想了解如何使用Kafka和Flink与生成式人工智能结合进行数据流处理,请查看以下两篇文章:
以下演示是支持销售代表或自动化工具的生成式人工智能:
向我的同事 Carsten Muetzlitz 致敬,他构建了这个演示。代码可以在 GitHub 上获取。这是演示的架构:
以下技术和基础设施用于实现和部署GenAI演示。
以下是一个15分钟的视频,带您了解演示:
这个演示未使用用于RAG(检索增强生成)、模型嵌入或通过矢量数据库(如Pinecone、Weaviate、MongoDB或Oracle)进行向量搜索的高级GenAI技术。
这个演示的原则是KISS(“尽可能保持简单”)。这些技术可以并将被集成到许多现实世界的架构中。
该演示在延迟和规模方面存在限制。Kafka和Flink作为完全托管和弹性的SaaS运行。然而,围绕LangChain的AI/ML部分可以通过使用托管的SaaS和与其他专用AI平台集成来改善延迟。特别是数据密集型应用程序将需要矢量数据库和高级检索以及语义搜索技术,如RAG。
有趣的事实:当我搜索我的名字而不是Carsten的时候,演示会崩溃。因为网络爬虫在网上找到了太多关于我的内容,导致LangChain应用程序崩溃。这对于像Pinecone或MongoDB之类的辅助技术来说是一个有说服力的事件,这些技术可以在规模上进行索引、RAG和语义搜索。这些技术与Confluent Cloud进行完全托管集成,因此演示可以很容易地进行扩展。
LangChain是一个用语言模型驱动的应用程序开发的开源框架。LangChain也是该框架背后的商业供应商的名称。该工具为数据工程师提供所需的“粘合代码”,以便使用直观的API来构建GenAI应用,其中包括了连接大型语言模型(LLM)、具有上下文的提示、驱动有状态对话的代理以及与外部接口集成的工具。
LangChain支持:
LangChain软件包的主要价值主张是:
这些产品共同简化了整个应用程序生命周期:
该演示使用了几个LangChain概念,例如提示、聊天模型、使用LangChain表达式语言(LCEL)的链路,以及使用语言模型选择一系列要采取的行动的代理。
下面是LangChain业务流程的逻辑流程:
以下截图显示了生成的内容片段。它包括基于LinkedIn资料的特定上下文的打破僵局的对话。为了更好理解,Carsten在加入Confluent之前在甲骨文工作了24年。LLM使用这个LangChain提示的上下文来生成相关内容:
Apache Kafka是用于构建实时数据流管道和流应用程序的分布式流平台。它在高效可靠地处理和管理大量数据流方面起着至关重要的作用。
生成式AI通常涉及用于创建新数据的模型和算法,例如图像、文本或其他类型的内容。Apache Kafka通过提供可伸缩和弹性的基础架构来支持生成式AI。在生成式AI的环境中,Kafka可用于:
虽然Apache Kafka本身是专为生成式AI设计的工具,但其功能和能力有助于数据基础设施的整体效率和可伸缩性。Kafka的能力在处理大型数据集和复杂的机器学习模型方面至关重要,包括生成式AI应用中使用的模型。
Kafka是连接所有不同应用程序的数据平台。确保数据一致性是Kafka的一个优势所在,无论数据源或接收端是实时的、批处理的还是请求响应的API。
在这个演示中,Kafka从Salesforce CRM消费事件作为客户数据的主要数据源。不同的应用程序(Flink、LangChain、Salesforce)在业务流程的不同步骤中消费数据。Kafka Connect提供了与数据集成的能力,无需另外使用ETL、ESB或iPaaS工具。本演示使用Confluent的Change Data Capture(CDC)连接器实时消费Salesforce数据库中的变化,以进行进一步处理。
完全托管的Confluent Cloud是本演示中整个Kafka和Flink生态系统的基础设施。开发人员的重点应始终放在构建业务逻辑上,而不是担心操作基础设施。
虽然Kafka的核心是基于事件、实时和可伸缩的,但它也能够开箱即用地实现领域驱动设计和数据网格企业架构。
Apache Flink是一个开源的分布式流处理框架,用于实时分析和事件驱动应用程序。它的主要重点是高效且可伸缩地处理连续的数据流。虽然Apache Flink本身不是生成式AI的特定工具,但它在支持生成式AI工作流的某些方面起着作用。以下是Apache Flink相关的几种方式:
Apache Flink在生成式AI中的具体角色取决于特定的用例和整体系统架构。
本演示利用Apache Flink对传入的Salesforce CRM事件进行流式ETL(丰富化、数据质量改进)。
FlinkSQL提供了一种简单直观的方法,可以使用任何Java或Python代码实现ETL。完全托管的Confluent Cloud是本演示中Kafka和Flink的基础设施。无服务器的FlinkSQL允许根据需要进行扩展,但如果没有事件被消费和处理,则可以缩减到零。
该演示仅是一个开始。使用Apache Flink可以构建许多强大的应用程序。这包括流式ETL,以及Netflix、Uber等许多科技巨头的商业应用程序。
LangChain是一个易于使用的AI/ML框架,用于连接大型语言模型到其他数据源并创建有价值的应用程序。其灵活性和开放性使得开发人员和数据工程师能够构建各种应用程序,从聊天机器人到回答问题的智能系统。
使用Apache Kafka和Flink进行数据流可以为数据管道和流处理提供可靠且可伸缩的数据结构。Kafka的事件存储确保跨实时、批量和请求-响应API的数据一致性。领域驱动设计、微服务架构以及数据产品构建在数据网格中越来越多地依赖Kafka出于这些原因。
LangChain、GenAI技术(如OpenAI)以及与Kafka和Flink的数据流结合使得在实时基于AI的特定上下文决策方面具有强大的组合。
大多数企业在AI用例中采用云优先战略。数据流基础设施可在Confluent Cloud等SaaS中使用,使开发人员能够更快地关注业务逻辑。对于使用Python构建AI应用程序,存在许多选择(Python是AI的事实标准)。例如,可以在FlinkSQL应用程序中执行Python代码并将其从Kafka中消费,或者使用类似Quix Streams或Bytewax的独立应用程序开发框架和云平台来代替LangChain框架。
如何将Python、LangChain和LLM(大型语言模型)与Kafka和Flink等数据流技术结合起来?让我们在LinkedIn上进行连接并讨论吧!
推荐阅读: 14.线程的sleep()方法和yield()方法有什么不同?
本文链接: 使用Kafka、Flink、LangChain和OpenAI进行GenAI演示