当前位置:Java -> JDK 22: Gatherer

JDK 22: Gatherer

如果你看一下JDK-22,你会发现一个非常有趣的东西,名为 461: Stream Gatherers (Preview)。这些 Gatherer 是增强 JDK 中的 Stream API 的一种方式,这个 API 从 2014 年(JDK 8)便存在了。随着时间的推移,这些收集器也得到了增强(你可以通过 Javaalmanac 查找 Collectors 的差异)。JDK 中 提供的收集器 已经涵盖了很多功能,但有时仍然会遇到一些情况,这些收集器的功能可能还不够强大,或者不够灵活,或者生成的代码可能阅读起来较难(更确切地说是难以理解)。最初的想法可能是要求在 JDK 的 Collectors 上做增强,但这将意味着在已有 44 个方法的收集器上再添加更多方法(如果我数对的话)。除了有一个如此特定的问题,值得将其添加到 JDK 中之外。因此,给用户一个根据自己的需求增强 Stream API 的方式可能是更好的解决方案。这也是 JEP 461 的总结:

增强 Stream API 以支持自定义中间操作。
这将允许流水线以不易通过现有内置中间操作方法实现的方式对数据进行转换。这是一个预览 API。

让我们从一个简单的例子开始。我们想要基于字符串的长度对字符串列表进行分组。可以使用现有的 Stream API 和收集器轻松表达如下:

@Test
void usingGroupingBy() {
  var result = Stream.of(
          "123456", "foo", "bar", "baz", 
          "quux", "anton", "egon", "banton")
      .collect(Collectors.groupingBy(String::length));
  System.out.println("result = " + result);
}


上面的输出是:

result = {3=[foo, bar, baz], 4=[quux, egon], 5=[anton], 6=[123456, banton]}


这里没有什么花哨的。另一个例子是在流中使用 distinct()。这可能看起来像这样:

@Test
void exampleDistinctWithNumbers() {
  var integers = List.of(1, 10, 11, 10, 11);
  var result = integers.stream().distinct().toList();
  System.out.println("result = " + result);
}


上面的输出是:

result = [1, 10, 11]


让我们将这两个想法结合起来,创建一个基于长度的 distinct。不幸的是,没有办法改变 distinct,因为它根本没有参数,只有一个单一的实现。好吧,有方法可以实现,应该看起来像这样(是的,这段代码取自 JEP 461):

record DistinctByLength(String str) {
  @Override public boolean equals(Object obj) {
    return obj instanceof DistinctByLength(String other)
           && str.length() == other.length();
  }

  @Override public int hashCode() {
    return str == null ? 0 : Integer.hashCode(str.length());
  }
}

@Test
void example_one() {
  var result = Stream.of(
          "123456", "foo", "bar", "baz", 
          "quux", "anton", "egon", "banton")
      .map(DistinctByLength::new)
      .distinct()
      .map(DistinctByLength::str)
      .toList();
  System.out.println("result = " + result);
}


输出:

result = [123456, foo, quux, anton]


所以,为了实现目标,我们首先要做一个映射.map(DistinctByLength::new),然后.distinct(),最后再映射回字符串.map(DistinctByLength::str)。好吧,它起作用了!能够维护吗?可能可以。灵活性?结果显示,首先到达的元素被输出。如果我想要最后一个呢?或者当存在两个以上的元素时,想要第二个怎么办?如果你可以这样编写代码会不会更容易:

var streamList = List.of(
    "123456", "foo", "bar", "baz",
    "quux", "anton", "egon", "banton");

var result = streamList
    .stream()
    .distinctBy(String::length)
    .toList();


那就太棒了,但是 JEP 461 已经非常接近这样了。这意味着你必须这样写代码:

var streamList = List.of(
    "123456", "foo", "bar", "baz",
    "quux", "anton", "egon", "banton");

var result = streamList
    .stream()
    .gather(distinctBy(String::length))
    .toList();


因此,最终,我们必须在这里实现 distinctBy。那么,它将如何工作?首先看一下 .gather(..) 方法。你可以看到参数使用了接口 Gatherer<T, A, R>,通常包括四个部分。initializerintegratorcombinerfinisher

Integrator

让我们从 integrator 开始。这个部分在每次将流中的元素放入流中时被调用,这意味着 integrator 将看到流中的每个元素。基于实现的方式,它可以对元素做任何事情。为了更好地理解,我们从一个什么都不做的映射的实现开始。让我们称之为 mapNoOp(你可能已经从名称中暗示了 Map No Operation)。可以像这样使用:

@Test
void noOperation_withGathererOf() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .gather(Gatherer.of(mapNoOp))
      .toList();
  System.out.println("resultList = " + resultList);
}


基于现有的收集器,也可以实现这个:

@Test
void noOperationMapping() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .map(Function.identity())
      .toList();
  System.out.println("resultList = " + resultList);
}


现在让我们看一下基于 Gatherer 的实现:

static final Gatherer.Integrator<Void, Integer, Integer> mapNoOp =
    (state, element, downstream) -> {
      downstream.push(element);
      return true;
    };


因此,我们只需要实现一个 integrator。integrator 将通过element消耗元素,然后通过downstream.push(element)产生元素。这里的return true负责告诉流是否消耗更多元素,或者不继续消耗更多元素通过return false。目前,state 在这里根本没有被使用(我为了更好的理解保留了state。可以用下划线(_)替换,以清楚地表明它没有被使用,参见
JEP-456)。正如之前所述,要使用它,你需要使用Gatherer.of(mapNoOp)。这可以做得更方便一些:

static <T> Gatherer<T, ?, T> mapNoOp() {
  Gatherer.Integrator<Void, T, T> integrator = (_, element, downstream) -> {
    downstream.push(element);
    return true;
  };
  return Gatherer.ofSequential(integrator);
}


这也使得通用化更加容易。所以,现在你可以非常简单地使用这个:

@Test
void noOperation_Integration() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .gather(mapNoOp())
      .toList();
  System.out.println("resultList = " + resultList);
}


所以,根据这一点,使用它与其他类型是没有问题的:

@Test
void noOperation_IntegrationDifferentType() {
  var integerList = List.of("1", "2", "3", "4", "5", "6", "7", "8");

  var resultList = integerList.stream()
      .gather(mapNoOp())
      .toList();
  System.out.println("resultList = " + resultList);
}


这变得非常方便。

初始化器和完成器

那么,回到我们最初的想法。如果重新考虑在开头使用groupingBydistinct的例子。groupingBy在发出最终结果之前需要在内部存储中间结果。另外,distinct部分需要记住哪些元素已经经过流并且已经存在。这时候initializer就发挥作用了,或者说integrator中的state变得重要。所以,要牢记的事情是initializer的意图,它在创建这样的内存,而state则是访问那个内存的接口。实现看起来像这样:

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    return true;
  };
  //
  return Gatherer.ofSequential(initializer, integrator);
}


那么,在这里我们要做什么?基于initializer,我们创建一个内存(HashMap),它根据键classifier存储元素。classifier类似于String::length。因此我们有一个HashMap<Integer,List<String>>。在integrator的迭代中将会填充这个内存。首先应用classifier(其结果是长度),然后将element(String)添加到HashMap的列表中。如果你仔细研究上面的实现,你可能会发现问题。它会消耗流中的所有元素,但永远不会发布任何元素。所以,我们必须考虑在什么情况下我们希望发布元素。如果HashMap包含一个元素,其中一个条目在其中,或者用代码表示:

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    if (state.get(apply).size() == 1) {
      downstream.push(element);
    }
    return true;
  };
  //
  return Gatherer.ofSequential(initializer, integrator);
}


使用看起来像这样:

@Test
void usingDistinctByExample() {
  var streamList = List.of(
      "123456", "foo", "bar", "baz",
      "quux", "anton", "egon", "banton");

  var result = streamList
      .stream()
      .gather(distinctBy(String::length))
      .toList();

  System.out.println("result = " + result);
}


输出是这样的:

result = [123456, foo, quux, anton]


这意味着原始的需求已经得到解决。转到下一个需求。发出出现的最后一个元素。这意味着我们必须确保整个流已经完成,或者换句话说,所有元素已经被查看。这仅凭integrator是不可解决的,此时需要finisher

完成器

完成器在整个流结束前调用,可以说是在流结束之前。finisher定义为BiConsumer<A, Downstream<? super R>> finisher。这意味着,换句话说,第一部分是state,当然还有downstream,这让我们有机会在它的最终状态中发布元素。这同时也意味着所有的流元素都已经通过了integrator

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    return true;
  };
  //
  BiConsumer<HashMap<A, List<T>>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((_, value) -> downstream.push(value.getLast()));
  };
  //
  return Gatherer.ofSequential(initializer, integrator, finisher);
}


因此上述定义现在可以像以前一样使用,唯一的不同之处是它将发布最后的元素而不是第一个元素,看起来像这样:

result = [baz, egon, anton, banton]


同样,我表达的另一个需求(第二个元素等)也可以很容易地通过finisher解决。我把这留作温柔读者的练习。

组合器

让我们来看最后一个,combiner。这负责组合(你猜对了;-)不同状态的状态,这只能在并行流中运行收集器时发生。正如你可能已经意识到的,我使用了return Gatherer.ofSequential(initializer, integrator, finisher);这意味着顺序处理(那不是显而易见的吗?)。我选择了一个不同的需求来演示组合器的使用。我们应该尝试在列表或流中选择重复的元素。好吧,让我们看一个简单的解决方案(我打赌还有其他或者更好的解决方案):

@Test
void exampleFindDuplicates() {
  var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
  var duplicates = findDuplicates(integers);
  System.out.println("duplicates = " + duplicates);
}

List<Integer> findDuplicates(List<Integer> givenList) {
  long count = givenList.stream().distinct().count();
  if (count < givenList.size()) {
    return givenList.stream().filter(i -> Collections.frequency(givenList, i) > 1)
        .distinct().toList();
  } else {
    return List.of();
  }
}


这可以通过Gathers以更一般和更方便的方式来在流中使用:

static <T> Gatherer<? super T, ?, T> duplicatesWithoutCombiner() {
  Supplier<HashMap<T, Integer>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<T, Integer>, T, T> integrator = (state, element, _) -> {
    var orDefault = state.getOrDefault(element, 0);
    state.put(element, orDefault + 1);
    return true;
  };
  //
  BiConsumer<HashMap<T, Integer>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((k, v) -> {
      if (v >= 2) {
        downstream.push(k);
      }
    });
  };
  //
  return Gatherer.ofSequential(initializer, integrator, finisher);
}


这个解决方案使用初始化器创建内部状态,它是一个HashMap<T, Integer>。类型T代表流使用的类型。这个Integer只是为了计算出现次数。集成器不发布任何值,因为集成器不能确定是否会有更多的元素到来。这意味着结果的最终发布只能在完成器中完成。当然,只有当出现次数大于或等于两次时才有用。如果你在定义的数字集上运行它,你将得到:

resultList = [100, 5, 10, 11]


现在让我们来看一下使用combiner的解决方案:

static <T> Gatherer<? super T, ?, T> duplicates() {
  Supplier<HashMap<T, Integer>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<T, Integer>, T, T> integrator = (state, element, _) -> {
    var orDefault = state.getOrDefault(element, 0);
    state.put(element, orDefault + 1);
    return true;
  };
  //
  BiConsumer<HashMap<T, Integer>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((k, v) -> {
      if (v >= 2) {
        downstream.push(k);
      }
    });
  };
  //
  BinaryOperator<HashMap<T, Integer>> combiner = (s1, s2) -> {
    s1.forEach((k, v) -> {
      var s1def = s2.getOrDefault(k, 0);
      s2.put(k, v + s1def);
    });
    return s2;
  };
  //
  return Gatherer.of(initializer, integrator, combiner, finisher);
}


有两个不同之处。首先,最后一行使用return Gatherer.of(initializer, integrator, combiner, finisher);而不是之前提到的... ofSequential(..),当然,还有combiner本身的实现。Combiner的意思就是一个合并器,在这里如果你返回s2或者s1都不会有任何区别。当然,你应该相应地做出更改实现如下:

BinaryOperator<HashMap<T, Integer>> combiner = (s1, s2) -> {
  s2.forEach((k, v) -> {
    var s1def = s1.getOrDefault(k, 0);
    s1.put(k, v + s1def);
  });
  return s1;
};


你要确保这两个状态的合并得到正确的结果。最后,你可以这样使用gatherer:

@Test
void exampleFindDuplicatesWithGathererCombiner() {
  var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
  var resultList = integers.parallelStream().gather(duplicates()).toList();

  assertThat(resultList).containsExactlyInAnyOrder(100, 10, 11, 5);
}


如果你想看到combiner的工作原理,你可以简单地在那里放置一个打印语句(或者最好使用你的IDE调试器)并看到它被调用了几次。

我认为这是JDK的一个很好的增强,使其更方便根据Stream API定制所需的功能。不幸的是,我已经意识到IntStreamLongStreamDoubleStream并没有包含这样的增强。目前看来,这三个接口只是缺少一个附加方法gather而已。或者我可能误解了添加这样简单的新增功能的复杂性。这里的示例代码可以在Gatherer找到。

祝你编码愉快。

推荐阅读: 为什么应届生的身份这么值钱?

本文链接: JDK 22: Gatherer