当前位置:Java -> 使用Spring Boot探索Apache Ignite

使用Spring Boot探索Apache Ignite

在这里我要描述的使用情况中,我有2个服务:

  1. courses-service基本上提供了用于处理课程和讲师的CRUD操作
  2. reviews-service是另一个用于处理与courses-service中的课程完全无关的课程评价的CRUD操作提供者。

这两个应用都是用KotlinSpring Boot等库编写的。有了这两个服务,我们将讨论如何在Apache Ignite和Spring Boot中进行分布式缓存,并看看如何使用代码部署来通过Apache Ignite在服务上调用远程代码执行。

剧透警告:这里呈现的示例/用例纯粹是为了演示与一些Apache Ignite的能力集成;这里讨论的问题可以用各种方式解决,甚至可能有更好的解决方法,所以不要花太多时间去思考“为什么”。所以,话不多说,让我们开始编码吧。

:这里是源代码,如果您想一起学习的话。

简单分布式缓存

我们现在将把重点放在courses-service上,拥有这个实体:

@Entity
@Table(name = "courses")
class Course(
    var name: String,
    @Column(name = "programming_language")
    var programmingLanguage: String,
    @Column(name = "programming_language_description", length = 3000, nullable = true)
    var programmingLanguageDescription: String? = null,
    @Enumerated(EnumType.STRING)
    var category: Category,

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "instructor_id")
    var instructor: Instructor? = null
) : AbstractEntity() {

    override fun toString(): String {
        return "Course(id=$id, name='$name', category=$category)"
    }
}


以及CourseServiceImpl中的这个方法:

@Transactional
override fun save(course: Course): Course {
    return courseRepository.save(course)
}


我想增强保存的每个课程,添加一个由用户发送的编程语言描述。为此,我创建了一个维基百科API客户端,将在每次添加新课程时进行以下请求。

GET https://en.wikipedia.org/api/rest_v1/page/summary/java_(programming_language)


@Transactional
override fun save(course: Course): Course {
    enhanceWithProgrammingLanguageDescription(course)
    return courseRepository.save(course)
}

private fun enhanceWithProgrammingLanguageDescription(course: Course) {
    wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let { course.programmingLanguageDescription = it.summary }
}


Java、Kotlin、C#等其他热门编程语言。我们不希望为几乎相同的语言每次查询来降低保存的性能。此外,这也可以作为一种保护,以防API服务器宕机。

Apache Ignite是一个用于高性能计算的分布式数据库,具有内存速度。在Ignite中,数据存储在内存和/或磁盘上,并在多个节点的集群中进行分区或复制。这提供了可伸缩性、性能和弹性。

FAQ页面。 Spring Data时,就有其特殊情况需要讨论。有几种配置Apache Ignite的方法,可以通过XML或编程方式。我选择了编程方式配置Apache Ignite。
implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-spring-boot-autoconfigure-ext:1.0.0")


courses-service中的配置:
@Configuration
@Profile("!test")
@EnableConfigurationProperties(value = [IgniteProperties::class])
class IgniteConfig(val igniteProperties: IgniteProperties) {

    @Bean(name = ["igniteInstance"])
    fun igniteInstance(ignite: Ignite): Ignite {
        return ignite
    }

    @Bean
    fun configurer(): IgniteConfigurer {
        return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
            igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
            igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
        }
    }

    private fun configureDiscovery(): TcpDiscoverySpi {
        val spi = TcpDiscoverySpi()
        var ipFinder: TcpDiscoveryIpFinder? = null;
        if (igniteProperties.discovery.tcp.enabled) {
            ipFinder = TcpDiscoveryMulticastIpFinder()
            ipFinder.setMulticastGroup(DFLT_MCAST_GROUP)
        } else if (igniteProperties.discovery.kubernetes.enabled) {
            ipFinder = TcpDiscoveryKubernetesIpFinder()
            ipFinder.setNamespace(igniteProperties.discovery.kubernetes.namespace)
            ipFinder.setServiceName(igniteProperties.discovery.kubernetes.serviceName)
        }
        spi.setIpFinder(ipFinder)
        return spi
    }
}


IgniteProperties类,以便根据配置进行灵活配置。在我的情况下,本地将是多播发现,而在生产环境中,将是Kubernetes发现,但这个类并不是强制的。
@ConstructorBinding
@ConfigurationProperties(prefix = "ignite")
data class IgniteProperties(
    val instanceName: String,
    val discovery: DiscoveryProperties = DiscoveryProperties()
)

@ConstructorBinding
data class DiscoveryProperties(
    val tcp: TcpProperties = TcpProperties(),
    val kubernetes: KubernetesProperties = KubernetesProperties()
)

@ConstructorBinding
data class TcpProperties(
    val enabled: Boolean = false,
    val host: String = "localhost"
)

data class KubernetesProperties(
    val enabled: Boolean = false,
    val namespace: String = "evil-inc",
    val serviceName: String = "course-service"
)


application.yaml中的相应值:
ignite:
  instanceName: ${spring.application.name}-server-${random.uuid}
  discovery:
    tcp:
      enabled: true
      host: localhost
    kubernetes:
      enabled: false
      namespace: evil-inc
      service-name: course-service


igniteInstance的bean,这将是我们所有Ignite API的主要入口点。通过提供的ignite-spring-boot-autoconfigure-ext:1.0.0提供的IgniteConfigurer,我们开始配置我们的igniteInstance,并提供一个从属性获取的名称。然后我们通过TcpDiscoverySpi配置发现服务提供者接口。正如我之前提到的,根据提供的属性,我将使用TcpDiscoveryMulticastIpFinderTcpDiscoveryKubernetesIpFinder。有了这些,我们的基本配置就完成了,现在可以开始了! Apache Ignite由一个H2内存数据库支持,在Spring Boot领域中,您会自动获取它。这既是一种祝福又是一种诅咒,因为仅支持特定版本的H2,并且我们需要在build.gradle中明确声明它,就像这样:
ext["h2.version"] = "1.4.197"


Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.ignite.IgniteJdbcThinDriver


--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED
--add-opens=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED


现在我们可以开始了!

INFO 11116 --- [4-6ceb9d7d547b%] o.a.i.i.m.d.GridDiscoveryManager         : Topology snapshot [ver=2, locNode=9087c6ef, servers=1, clients=0, state=ACTIVE, CPUs=16, offheap=6.3GB, heap=4.0GB …
INFO 11116 --- [4-6ceb9d7d547b%] o.a.i.i.m.d.GridDiscoveryManager         :   ^-- Baseline [id=0, size=1, online=1, offline=0]
INFO 32076 --- [           main] o.a.i.s.c.tcp.TcpCommunicationSpi        : Successfully bound communication NIO server to TCP port [port=47100, locHost=0.0.0.0/0.0.0.0, selectorsCnt=8, selectorSpins=0, pairedConn=false]
INFO 32076 --- [           main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi  : Successfully bound to TCP port [port=47500, localHost=0.0.0.0/0.0.0.0, locNodeId=84e5553d-a7a9-46d9-a98c-81f34bf84673]


一旦你看到这个日志,Ignite就已经启动并运行了。拓扑快照表明有一个运行的服务器,没有客户端,并且我们可以看到发现/通信是通过绑定到端口47100/47500进行的。

此外,在日志中,您可能会注意到一些警告信息。让我们看看如何摆脱它们:

1.

^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)


添加以下VM参数:-XX:MaxDirectMemorySize=256m

2.

^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)


添加以下VM参数:

  • -Xms512m
  • -Xmx2g

3.

Metrics for local node (to disable set 'metricsLogFrequency' to 0)


这个并不是一个真正的问题,在开发过程中可能非常方便,但目前它会持续向日志中发送不必要的信息,我们将通过在配置中添加以下一行来禁用它:

  • igniteConfiguration.setMetricsLogFrequency(0)

4.

Message queue limit is set to 0 which may lead to potential OOMEs


这个在抱怨负责限制传入和传出消息的参数,默认值为0,也就是无限制。因此,我们将通过配置TcpCommunicationSpi来设置限制:

igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi())

    private fun configureTcpCommunicationSpi(): TcpCommunicationSpi {
        val tcpCommunicationSpi = TcpCommunicationSpi()
        tcpCommunicationSpi.setMessageQueueLimit(1024)
        return tcpCommunicationSpi
    }


好了,现在一切都设置好了,我们可以继续了。让我们在IgniteConfig类中配置一个缓存,看看如何解决维基百科响应缓存的问题。在Apache Ignite中,我们可以在配置级别或运行时配置缓存(在运行时,你也可以使用模板)。在这个演示中,我将展示如何在配置中配置它。

@Bean
fun configurer(): IgniteConfigurer {
    return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
        igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
        igniteConfiguration.setDiscoverySpi(configureDiscovery())
        igniteConfiguration.setMetricsLogFrequency(0) 
        igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi())        
        igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
    }
}


配置Ignite的入口点仍然是IgniteConfiguration-igniteConfiguration.setCacheConfiguration。这一行接受各种CacheConfiguration

private fun wikipediaSummaryCacheConfiguration(): CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary> {
    val wikipediaCache = CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary>(WIKIPEDIA_SUMMARIES)
    wikipediaCache.setIndexedTypes(String::class.java, WikipediaApiClientImpl.WikipediaSummary::class.java)
    wikipediaCache.setEagerTtl(true)
    wikipediaCache.setCacheMode(CacheMode.REPLICATED)
    wikipediaCache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
    wikipediaCache.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
    wikipediaCache.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration(TimeUnit.MINUTES, 60)))
    return wikipediaCache
}


wikipediaSummaryCacheConfiguration返回一个CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary>:根据我们的需求,每种编程语言一个维基百科摘要。这个类定义了网格缓存配置。它定义了启动网格实例内缓存所需的所有配置参数。现在让我们看看如何配置它:

  • setIndexedTypes():此函数用于指定键和值类型的索引数组。
  • setEagerTtl():将其设置为true,Ignite将主动删除已过期的缓存条目。
  • setExpiryPolicyFactory():此配置设置在60分钟后使缓存过期。
  • setCacheMode():当您选择REPLICATED模式时,所有键都会分发到每个参与节点。默认模式是PARTITIONED,其中键被划分为分区并分布在节点之间。您可以使用setBackups()控制副本数,并指定分区丢失策略。
  • setWriteSynchronizationMode():此标志确定Ignite是否会等待来自其他节点的写入或提交响应。默认为PRIMARY_SYNC,Ignite等待主节点完成写入或提交,而不是等待备份节点更新。
  • setAtomicityMode():将此设置为TRANSACTIONAL可以为键值操作启用完全符合ACID标准的事务。相反,ATOMIC模式禁用了分布式事务和锁定功能,提供了更高的性能但牺牲了事务性功能。

有了这个配置,剩下的就是调整我们的enhanceWithProgrammingLanguageDescription方法来缓存获取的维基百科摘要:

private fun enhanceWithProgrammingLanguageDescription(course: Course) {
    val summaries = igniteInstance.cache<String, WikipediaApiClientImpl.WikipediaSummary>(WIKIPEDIA_SUMMARIES)
    log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
    summaries[course.programmingLanguage]?.let {
        log.debug("Cache value found, using cache's response $it to update $course programming language description")
        course.programmingLanguageDescription = it.summary
    } ?: wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
        log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
        summaries.putIfAbsent(course.programmingLanguage, it)
        it
    }?.let { course.programmingLanguageDescription = it.summary }
}


基本上,我们在Ignite实例上使用bean来检索我们配置的缓存。每个实例都是Apache Ignite集群中的一个成员和/或客户端。获取复制缓存后,只需要进行一些简单的检查:如果在我们的映射中针对编程语言键有摘要,那么我们使用它。如果没有,则从维基百科API获取它,添加到映射中并使用它。

现在让我们看看它的实际效果。如果我们执行以下HTTP请求:

###
POST http://localhost:8080/api/v1/courses
Content-Type: application/json

{
  "name": "C++ Development",
  "category": "TUTORIAL",
  "programmingLanguage" : "C++",
  "instructor": {
    "name": "Bjarne Stroustrup"
  }
}


我们在日志中可以看到:

DEBUG 32076 --- [nio-8080-exec-1] i.e.c.s.i.CourseServiceImpl$Companion     : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(0)]
DEBUG 32076 --- [nio-8080-exec-1] i.e.c.s.i.CourseServiceImpl$Companion     : No cache value found, using wikipedia's response


我们检索到之前配置的维基百科摘要缓存,但其大小为0。因此,更新是通过维基百科的API进行的。现在,如果我们再次执行相同的请求,我们将注意到不同的行为:

DEBUG 32076 --- [nio-8080-exec-2] i.e.c.s.i.CourseServiceImpl$Companion     : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(1)]
DEBUG 32076 --- [nio-8080-exec-2] i.e.c.s.i.CourseServiceImpl$Companion     : Cache value found, using cache's response…


现在缓存的大小为1,并且由于它是由我们之前的请求填充的,因此没有观察到对维基百科API的请求。然而,真正突出显示Apache Ignite集成的优雅和简易之处是当我们使用-Dserver.port=8060选项在不同端口启动我们应用的另一个实例时。这时我们可以看到复制缓存机制的运作。

INFO 37600 --- [           main] o.a.i.s.c.tcp.TcpCommunicationSpi         : Successfully bound communication NIO server to TCP port [port=47101, locHost=0.0.0.0/0.0.0.0, selectorsCnt=8, selectorSpins=0, pairedConn=false]
INFO 37600 --- [           main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi   : Successfully bound to TCP port [port=47501, localHost=0.0.0.0/0.0.0.0, locNodeId=4770d2ff-2979-4b4b-8d0e-30565aeff75e]
INFO 37600 --- [1-d0db3c4f0d78%] a.i.i.p.c.d.d.p.GridDhtPartitionDemander : Starting rebalance routine [WIKIPEDIA_SUMMARIES]
INFO 37600 --- [           main] o.a.i.i.m.d.GridDiscoveryManager          : Topology snapshot [ver=6, locNode=4770d2ff, servers=2, clients=0, state=ACTIVE, CPUs=16, offheap=13.0GB, heap=4.0GB...
INFO 37600 --- [           main] o.a.i.i.m.d.GridDiscoveryManager         :    ^-- Baseline [id=0, size=2, online=2, offline=0]


我们看到我们的TcpDiscoveryMulticastIpFinder在端口47100/47500发现了已经运行的一个Apache Ignite节点,与我们第一个运行在端口8080上的courses-service实例一起运行。因此,此外,还在端口47101/47501上建立了一个新的集群连接。这触发了我们缓存的再平衡例程。最终我们在拓扑日志行中观察到现在服务器数量是2。现在如果我们在8060实例上发出新的HTTP请求来创建相同的课程,我们将看到以下情况:

DEBUG 37600 --- [nio-8060-exec-2] i.e.c.s.i.CourseServiceImpl$Companion     : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(1)]
DEBUG 37600 --- [nio-8060-exec-2] i.e.c.s.i.CourseServiceImpl$Companion     : Cache value found, using cache's response


因此,我们使用的是大小为1的相同缓存,没有对维基百科API的请求。正如您所想,如果我们在8060上为另一种语言进行一些请求,填充的缓存也将在为该语言请求的8080上看到。

Spring数据支持

Apache Ignite一起提供的一个相当令人惊讶的特性是Spring数据支持,它允许我们以更优雅/熟悉的方式与缓存进行交互。Spring数据框架提供了一个被广泛采用的API,从应用程序层面抽象出底层数据存储。Apache Ignite通过实现Spring数据CrudRepository接口与Spring数据无缝集成。该集成进一步增强了应用程序数据层的灵活性和适应性。

让我们通过添加以下依赖来进行配置:

implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")


通过扩展IgniteRepository来声明我们的存储库。

@Repository
@RepositoryConfig(cacheName = WIKIPEDIA_SUMMARIES)
interface WikipediaSummaryRepository : IgniteRepository<WikipediaApiClientImpl.WikipediaSummary, String>


同时拥有Ignite的Spring数据支持和Spring数据JPA在类路径上可能会产生一些bean扫描问题,我们可以通过明确指示JPAIgnite从哪里查找它们的bean来解决这些问题,就像这样:

@EnableIgniteRepositories(basePackages = ["inc.evil.coursecatalog.ignite"])
@EnableJpaRepositories(
    basePackages = ["inc.evil.coursecatalog.repo"],
    excludeFilters = [ComponentScan.Filter(type = FilterType.ANNOTATION, value = [RepositoryConfig::class])]
)


有了这样的配置,我们确保Ignite仅会在Ignite包中扫描其存储库,JPA将只在repo包中扫描其存储库,并排除任何带有@RepositoryConfig的类。

现在让我们重构我们的CourseServiceImpl,使其使用新创建的WikipediaSummaryRepository

private fun enhanceWithProgrammingLanguageDescription(course: Course) {
    val summaries = wikipediaSummaryRepository.cache()
    log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
    wikipediaSummaryRepository.findById(course.programmingLanguage).orElseGet {
        wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
            log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
            wikipediaSummaryRepository.save(course.programmingLanguage, it)
            it
        }
    }?.let { course.programmingLanguageDescription = it.summary }
}


我们不再直接与低级别缓存/映射交互,而是转向将我们的请求指向一个称为WikipediaSummaryRepository的新高级类。这种方法不仅在实现/使用上更加优雅,而且与Spring粉丝更为契合,不是吗?此外,您可能已经注意到,我们不再需要igniteInstance来访问缓存。存储库可以通过.cache()方法提供给我们,因此即使我们使用IgniteRepository,我们也不会失去访问我们的缓存及其低级操作的权限。如果我们像使用缓存一样与之交互,我们会发现行为没有发生变化。

但等等,还有更多!与Spring数据集成会带来大量优势:查询抽象/查询生成、手动查询、分页/排序、投影、具有Cache.Entry返回类型或类似实体类型的查询 - 您以及IgniteRepository都将拥有。出于这个目的,我将尝试使用CommandLineRunner,因为我不提供任何API直接与WikipediaSummaryRepository集成。

首先,让我们编写一些查询:

@Repository
@RepositoryConfig(cacheName = WIKIPEDIA_SUMMARIES)
interface WikipediaSummaryRepository : IgniteRepository<WikipediaSummary, String> {

    fun findByTitle(title: String): List<WikipediaSummary>

    fun findByDescriptionContains(keyword: String): List<Cache.Entry<String, WikipediaSummary>>

    @Query(value = "select description, count(description) as \"count\" from WIKIPEDIA_SUMMARIES.WIKIPEDIASUMMARY group by description")
    fun countPerDescription(): List<CountPerProgrammingLanguageType>

    interface CountPerProgrammingLanguageType {
        fun getDescription(): String
        fun getCount(): Int
    }
}


这是CommandLineRunner

@Bean
fun init(client: WikipediaApiClient, repo: WikipediaSummaryRepository): CommandLineRunner = CommandLineRunner {
    run {
        client.fetchSummaryFor("Java programming language")?.let { repo.save("Java", it) }
        client.fetchSummaryFor("Kotlin programming language")?.let { repo.save("Kotlin", it) }
        client.fetchSummaryFor("C++")?.let { repo.save("C++", it) }
        client.fetchSummaryFor("Python programming language")?.let { repo.save("C#", it) }
        client.fetchSummaryFor("Javascript")?.let { repo.save("Javascript", it) }

        repo.findAll().forEach { log.info("Fetched {}", it) }
        repo.findByTitle("Kotlin").forEach { log.info("Fetched by title {}", it) }
        repo.findByDescriptionContains("programming language").forEach { log.info(" Fetched by description {}", it) }
        repo.countPerDescription().forEach { log.info("Count per description {}", it) }
    }
}


在我们运行之前,我们将必须稍微调整我们的缓存实体,如下所示:

@JsonIgnoreProperties(ignoreUnknown = true)
data class WikipediaSummary(
    @JsonProperty("title")
    @QuerySqlField(name = "title", index = true)
    val title: String,
    @JsonProperty("description")
    @QuerySqlField(name = "description", index = false)
    val description: String,
    @JsonProperty("extract")
    @QuerySqlField(name = "summary", index = false)
    val summary: String
)


您可能已经注意到每个字段上的@QuerySqlField,所有将涉及到SQL子句的字段都必须具有此注解。需要这个注解是为了指示Ignite为我们的每个字段创建一个列;否则,它将创建一个包含我们负载的单个大列。这有点侵入式,但这是为了换取我们所获得的各种可能性而付出的小代价。现在一旦运行,我们将得到以下日志行:

INFO 3252 --- [            main] i.e.c.CourseCatalogApplication$Companion : Fetched WikipediaSummary(title=Python (programming language)…
…
INFO 3252 --- [            main] i.e.c.CourseCatalogApplication$Companion :  Fetched by description Entry [key=C#, val=WikipediaSummary(title=Python (programming language)…
…
INFO 3252 --- [            main] i.e.c.CourseCatalogApplication$Companion : Count per description {count=1, description=General-purpose programming language derived from Java}
…


这证明了我们的实现符合预期。

注意: 如果您想要在研究过程中连接到Ignite的内存数据库,可能会遇到这个VM参数:-DIGNITE_H2_DEBUG_CONSOLE=true。我想提一下,Ignite团队在2.8版本中弃用了IGNITE_H2_DEBUG_CONSOLE,转而支持他们的轻量级JDBC驱动。因此,如果要连接到数据库,请参考更新的文档,简而言之:JDBC URL是jdbc:ignite:thin://127.0.0.1/,默认端口10800,而IntelliJ在其数据库数据源中提供了一流的支持。

分布式锁

另一个随Apache Ignite提供的有用功能是分布式锁的API。假设我们的enhanceWithProgrammingLanguageDescription方法是一个与缓存和其他资源打交道的耗时操作,我们不希望同一实例上的其他线程甚至来自不同实例的其他请求在操作完成之前干扰或更改某些内容。这时就需要IgniteLock登场:这个接口提供了一个用于管理分布式可重入锁的全面API,类似于java.util.concurrent.ReentrantLock。您可以使用IgnitereentrantLock()方法创建这些锁的实例。当将failoverSafe标志设置为true时,IgniteLock提供了对节点故障的保护:锁将自动恢复。如果拥有锁的节点失败,确保整个集群中的锁管理不中断。另一方面,如果failoverSafe设置为false,节点失败将导致IgniteException,使锁无法使用。因此在这种情况下,让我们尝试保护我们所谓的“临界区”。

private fun enhanceWithProgrammingLanguageDescription(course: Course) {
    val lock = igniteInstance.reentrantLock(SUMMARIES_LOCK, true, true, true)
    if (!lock.tryLock()) throw LockAcquisitionException(SUMMARIES_LOCK, "enhanceWithProgrammingLanguageDescription")
    log.debug("Acquired lock {}", lock)
    Thread.sleep(2000)
    val summaries = wikipediaSummaryRepository.cache()
    log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
    wikipediaSummaryRepository.findById(course.programmingLanguage).orElseGet {
        wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
            log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
            wikipediaSummaryRepository.save(course.programmingLanguage, it)
            it
        }
    }?.let { course.programmingLanguageDescription = it.summary }
    lock.unlock()
}


如您所见,这个实现相当简单:我们通过igniteInstancereentrantLock方法获取锁,然后尝试使用tryLock()锁定它。如果获取的锁可用或已被当前线程持有,则锁定将成功,它将立即返回true。否则,它将返回false并抛出一个LockAcquisitionException。然后我们通过使用Thread.sleep(2000)来模拟一些耗时工作,最后使用unlock()释放已获取的锁。

现在,如果我们在端口8080运行应用的单个实例并尝试进行2个连续请求,其中一个会成功,另一个会失败:

ERROR 36580 --- [nio-8080-exec-2] e.c.w.r.e.RESTExceptionHandler$Companion : Exception while handling request [summaries-lock] could not be acquired for [enhanceWithProgrammingLanguageDescription] operation. Please try again.
inc.evil.coursecatalog.common.exceptions.LockAcquisitionException: [summaries-lock] could not be acquired for [enhanceWithProgrammingLanguageDescription] operation. Please try again.


如果我们将一个请求发送到我们应用的8080实例,然后在2秒的时间范围内发送到8060实例,第一个请求将成功,而第二个请求将失败。

代码部署

现在让我们将注意力转向reviews-service,记住 - 这个服务完全不知道课程:它只是为某个course_id添加评论的一种方式。考虑到这一点,我们有这个实体:

@Table("reviews")
data class Review(
    @Id
    var id: Int? = null,
    var text: String,
    var author: String,
    @Column("created_at")
    @CreatedDate
    var createdAt: LocalDateTime? = null,
    @LastModifiedDate
    @Column("last_modified_at")
    var lastModifiedAt: LocalDateTime? = null,
    @Column("course_id")
    var courseId: Int? = null
)


而且我们在ReviewServiceImpl中有这个方法。

因此,我们新的“愚蠢”功能请求将是以某种方式检查写评论的课程是否存在。我们该怎么做?最明显的选择可能是在courses-service上调用一个REST端点,检查我们是否有评论的该课程course_id,但这不是本文讨论的内容。我们有Apache Ignite,对吧?我们将通过Ignite的集群从course-service调用reviews-service中的代码。

为此,我们需要创建某种API或Gateway模块,我们将其发布为一个构件,这样courses-service就可以实现它,reviews-service可以依赖并使用它来调用代码。

好的 - 首先,让我们将新模块设计为一个courses-api模块:

plugins {
    id("org.springframework.boot") version "2.7.3"
    id("io.spring.dependency-management") version "1.0.13.RELEASE"
    kotlin("jvm") version "1.6.21"
    kotlin("plugin.spring") version "1.6.21"
    kotlin("plugin.jpa") version "1.3.72"
    `maven-publish`
}

group = "inc.evil"
version = "0.0.1-SNAPSHOT"

repositories {
    mavenCentral()
}

publishing {
    publications {
        create<MavenPublication>("maven") {
            groupId = "inc.evil"
            artifactId = "courses-api"
            version = "1.1"

            from(components["java"])
        }
    }
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.6.4")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
    implementation("org.apache.commons:commons-lang3:3.12.0")

    implementation("org.apache.ignite:ignite-core:2.15.0")


    testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.8.1")
}

tasks.getByName<Test>("test") {
    useJUnitPlatform()
}


这里没有什么花里胡哨的,除了我们将使用maven-publish插件将构件发布到本地Maven仓库。

这是courses-service将实现的接口,reviews-service将使用:

interface CourseApiFacade: Service {
    companion object {
        const val COURSE_API_FACADE_SERVICE_NAME = "CourseApiFacade"
    }
    fun findById(id: Int): CourseApiResponse
}


data class InstructorApiResponse(
    val id: Int?,
    val name: String?,
    val summary: String?,
    val description: String?
)

data class CourseApiResponse(
    val id: Int?,
    val name: String,
    val category: String,
    val programmingLanguage: String,
    val programmingLanguageDescription: String?,
    val createdAt: String,
    val updatedAt: String,
    val instructor: InstructorApiResponse
)


您可能已经注意到CourseApiFacade扩展了org.apache.ignite.services.Service接口 - 网格管理服务的实例,我们的服务可能部署在这里。

courses-service中:
implementation(project(":courses-api"))


@Component
class CourseApiFacadeImpl : CourseApiFacade {

    @Transient
    @SpringResource(resourceName = "courseService")
    lateinit var courseService: CourseServiceImpl

    @Transient
    @IgniteInstanceResource //spring constructor injection won't work since ignite is not ready
    lateinit var igniteInstance: Ignite

    companion object {
        private val log: Logger = LoggerFactory.getLogger(this::class.java)
    }

    override fun findById(id: Int): CourseApiResponse = courseService.findById(id).let {
        CourseApiResponse(
            id = it.id,
            name = it.name,
            category = it.category.toString(),
            programmingLanguage = it.programmingLanguage,
            programmingLanguageDescription = it.programmingLanguageDescription,
            createdAt = it.createdAt.toString(),
            updatedAt = it.updatedAt.toString(),
            instructor = InstructorApiResponse(it.instructor?.id, it.instructor?.name, it.instructor?.summary, it.instructor?.description)
        )
    }

    override fun cancel() {
        log.info("Canceling service")
    }

    override fun init() {
        log.info("Before deployment :: Pre-initializing service before execution on node {}", igniteInstance.cluster().forLocal().node())
    }

    override fun execute() {
        log.info("Deployment :: The service is deployed on grid node {}", igniteInstance.cluster().forLocal().node())
    }
}


你可以看到,CourseFacadeImpl实现了CourseFacade方法findById并且重写了Service接口的一些方法,用于调试。当一个服务被部署在集群节点上,Ignite将调用该服务的execute()方法。类似地,当一个部署的服务被取消时,Ignite将自动调用该服务的cancel()方法。init()保证在execute()之前调用。另外,还有一些新的注释:

  1. @SpringResource(resourceName = "courseService") - 用于注释从Spring ApplicationContext注入资源的字段或setter方法。由于现在是IgniteService,我们需要让Ignite处理bean的注入。resourceName是一个强制字段,等于Spring applicationContext中的bean名称。
  2. @IgniteInstanceResource - 再次提醒,由于这将被部署,我们不能再依赖Spring进行自动装配,因此Ignite提供了这个注释,可以将igniteInstance注入到网格任务和网格作业中。
  3. @Transient/transient in java - 这个注释/关键字确保在集群中不序列化不必要的对象层次结构。

为了使上述一切工作,我们必须稍微修改我们的build.gradle依赖项以适配Ignite

implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-spring:2.15.0")
implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")


我们放弃了ignite-spring-boot-autoconfigure,转而使用ignite-spring,因为我无法让Ignite意识到Spring的应用程序上下文与自动配置。正如你可能猜到的,由于我们不再有IgniteAutoConfiguration,我们现在必须手动编写Igniteconfiguration,但别担心:它们是相似的。这是courses-service中更新的IgniteConfig

@Configuration
@Profile("!test")
@EnableConfigurationProperties(value = [IgniteProperties::class])
@EnableIgniteRepositories(basePackages = ["inc.evil.coursecatalog.ignite"])
class IgniteConfig(
    val igniteProperties: IgniteProperties,
    val applicationContext: ApplicationContext
) {
    companion object {
        const val WIKIPEDIA_SUMMARIES = "WIKIPEDIA_SUMMARIES"
    }

    @Bean(name = ["igniteInstance"])
    fun igniteInstance(igniteConfiguration: IgniteConfiguration): Ignite {
        return IgniteSpring.start(igniteConfiguration, applicationContext)
    }

    @Bean
    fun igniteConfiguration(): IgniteConfiguration {
        val igniteConfiguration = IgniteConfiguration()
        igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
        igniteConfiguration.setMetricsLogFrequency(0) // no spam
        igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
        igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
        igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
        return igniteConfiguration
    }
}


改变不是很大,对吧?我们声明了一个名为IgniteConfiguration的bean,而不是IgniteConfigurer,它负责我们的配置。我们在配置中注入了applicationContext,以便在重写的igniteInstance bean中传递它,现在这个bean是Spring感知的IgniteSpring

现在我们已经更新了我们的配置,我们必须告诉Ignite关于我们的新的IgniteService - CourseApiFacade

    @Bean
    fun igniteConfiguration(): IgniteConfiguration {
        val igniteConfiguration = IgniteConfiguration()
        igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
        igniteConfiguration.setPeerClassLoadingEnabled(true)
        igniteConfiguration.setMetricsLogFrequency(0) // no spam
        igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
        igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
        igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
        igniteConfiguration.setServiceConfiguration(courseApiFacadeConfiguration()) //vararg
        return igniteConfiguration
    }

    private fun courseApiFacadeConfiguration(): ServiceConfiguration {
        val serviceConfiguration = ServiceConfiguration()
        serviceConfiguration.service = courseApiFacade
        serviceConfiguration.name = CourseApiFacade.COURSE_API_FACADE_SERVICE_NAME
        serviceConfiguration.maxPerNodeCount = 1
        return serviceConfiguration
    }


我们创建了一个ServiceConfiguration,它绑定到courseApiFacade,其名称来自于courses-api中公开接口的名称,并且设置每个节点一个服务,最后我们在IgniteConfiguration中设置了courseApiFacadeConfiguration

现在回到 reviews-service。首先,我们想要添加所需的依赖项为Apache Ignite,由于reviews-service更简单,不需要Spring感知的Ignite。我们将在这里使用ignite-spring-boot-autoconfigure

implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-spring-boot-autoconfigure-ext:1.0.0")
implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")


此外,我先前提到我们将使用courses-api的接口。我们可以在courses-api上运行publishMavenPublicationToMavenLocal gradle任务,将我们的artifact发布,然后我们可以将以下依赖项添加到reviews-service中。

implementation("inc.evil:courses-api:1.1")


现在我们还需要像之前在courses-service中一样配置Ignite

@Configuration
@EnableConfigurationProperties(value = [IgniteProperties::class])
@EnableIgniteRepositories(basePackages = ["inc.evil.reviews.ignite"])
class IgniteConfig(val igniteProperties: IgniteProperties) {

    @Bean(name = ["igniteInstance"])
    fun igniteInstance(ignite: Ignite): Ignite {
        return ignite
    }

    @Bean
    fun configurer(): IgniteConfigurer {
        return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
            igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
            igniteConfiguration.setClientMode(true)
           
            igniteConfiguration.setMetricsLogFrequency(0) // no spam
            igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
            igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
        }
    }

    private fun configureTcpCommunicationSpi(): TcpCommunicationSpi {
        val tcpCommunicationSpi = TcpCommunicationSpi()
        tcpCommunicationSpi.setMessageQueueLimit(1024)
        return tcpCommunicationSpi
    }

    private fun configureDiscovery(): TcpDiscoverySpi {
        val spi = TcpDiscoverySpi()
        var ipFinder: TcpDiscoveryIpFinder? = null;
        if (igniteProperties.discovery.tcp.enabled) {
            ipFinder = TcpDiscoveryMulticastIpFinder()
            ipFinder.setMulticastGroup(DFLT_MCAST_GROUP)
        } else if (igniteProperties.discovery.kubernetes.enabled) {
            ipFinder = TcpDiscoveryKubernetesIpFinder()
            ipFinder.setNamespace(igniteProperties.discovery.kubernetes.namespace)
            ipFinder.setServiceName(igniteProperties.discovery.kubernetes.serviceName)
        }
        spi.setIpFinder(ipFinder)
        return spi
    }
}


courses-service的唯一区别是reviews-service将在客户端模式下运行。除此之外,一切都是一样的。好的,Ignite配置好了,现在是时候在reviews-service中利用我们的IgniteService - courses-service了。为此,我创建了这个类:

@Component
class IgniteCoursesGateway(private val igniteInstance: Ignite) {

    fun findCourseById(id: Int) = courseApiFacade().findById(id)

    private fun courseApiFacade(): CourseApiFacade {
        return igniteInstance.services(igniteInstance.cluster().forServers())
            .serviceProxy(CourseApiFacade.COURSE_API_FACADE_SERVICE_NAME, CourseApiFacade::class.java, false)
    }
}


IgniteCoursesGateway是通过Ignite集群进入课程域的入口点。通过自动装配的igniteInstance,我们获取了名称为COURSE_API_FACADE_SERVICE_NAMECourseApiFacade类型的serviceProxy。我们还告诉Ignite始终尝试在服务之间进行负载平衡,将sticky flag设置为false。然后在findCourseById()中,我们简单地使用获取到的serviceProxy通过id查询所需的课程。

IgniteCoursesGateway在ReviewServiceImpl中使用,完成了特性的要求。
override suspend fun save(review: Review): Review {
    runCatching {
        igniteCoursesGateway.findCourseById(review.courseId!!).also { log.info("Call to ignite ended with $it") }
    }.onFailure { log.error("Oops, ignite remote execution failed due to ${it.message}", it) }
        .getOrNull() ?: throw NotFoundException(CourseApiResponse::class, "course_id", review.courseId.toString())
    return reviewRepository.save(review).awaitFirst()
}


逻辑如下:在保存之前,我们尝试通过在我们的Ignite 集群中调用findCourseById来查找评论的course_id所对应的课程。如果出现异常(如果请求的课程未找到,CourseApiFacadeImpl将抛出NotFoundException),我们将忽略该异常,并抛出一个NotFoundException,说明课程无法被检索。如果我们的方法返回了一个课程,我们就继续保存它 - 就是这样。

现在让我们重新启动course-service并观察日志:

INFO 23372 --- [a-67c579c6ea47%] i.e.c.f.i.CourseApiFacadeImpl$Companion   : Before deployment :: Pre-initializing service before execution on node TcpDiscoveryNode …
INFO 23372 --- [a-67c579c6ea47%] o.a.i.i.p.s.IgniteServiceProcessor        : Starting service instance [name=CourseApiFacade, execId=52de6edc-ac6f-49d4-8c9e-17d6a6ebc8d5]
INFO 23372 --- [a-67c579c6ea47%] i.e.c.f.i.CourseApiFacadeImpl$Companion   : Deployment :: The service is deployed on grid node TcpDiscoveryNode …


根据我们在Service接口中重写的方法,我们可以看到CourseApiFacade已成功部署。现在我们有了正在运行的courses-service,如果我们启动reviews-service,我们会看到如下日志:

INFO 13708 --- [           main] o.a.i.i.m.d.GridDiscoveryManager          : Topology snapshot [ver=2, locNode=cb90109d, servers=1, clients=1, state=ACTIVE, CPUs=16, offheap=6.3GB, heap=4.0GB...
INFO 13708 --- [           main] o.a.i.i.m.d.GridDiscoveryManager         :    ^-- Baseline [id=0, size=1, online=1, offline=0]


您可能注意到我们有1个服务器正在运行以及1个客户端。现在让我们尝试为现有课程添加评论的请求(reviews-service正在使用GraphQL)。

GRAPHQL http://localhost:8070/graphql
Content-Type: application/graphql

mutation { createReview(request: {text: "Amazing, loved it!" courseId: 39 author: "Mike Scott"}) {
    id
    text
    author
    courseId
    createdAt
    lastModifiedAt
}
}


在日志中,我们将注意到:

INFO 13708 --- [actor-tcp-nio-1] i.e.r.s.i.ReviewServiceImpl$Companion    : Call to ignite ended with CourseApiResponse(id=39, name=C++ Development, category=TUTORIAL …


而在courses-service的日志中,我们将注意到以下代码执行:

DEBUG 29316 --- [2-64cc57b09c89%] i.e.c.c.aop.LoggingAspect$Companion       : before :: execution(public inc.evil.coursecatalog.model.Course inc.evil.coursecatalog.service.impl.CourseServiceImpl.findById(int))


这意味着请求成功执行。如果我们尝试为不存在的课程发出相同的请求 - 比如ID为999,我们将在reviews-service中观察到NotFoundException

WARN 33188 --- [actor-tcp-nio-1] .w.g.e.GraphQLExceptionHandler$Companion : Exception while handling request: CourseApiResponse with course_id equal to [999] could not be found!

结论

好了,大家,就到这里吧!相信你现在对Apache Ignite 有了很好的了解。我们深入探讨了使用Ignite 和Spring Boot设计简单的分布式缓存,探讨了Ignite的Spring Data支持、用于保护代码关键部分的分布式锁,最后看到了Apache Ignite的代码部署是如何在集群中执行代码的。
再次强调,如果你错过了,你可以在本文开头的链接中访问我们讨论过的所有代码。

愉快的编码!

推荐阅读: 阿里巴巴面经(4)

本文链接: 使用Spring Boot探索Apache Ignite