Руководство по Stream.reduce ()

1. Обзор

Stream API предоставляет богатый набор промежуточных, редукционных и оконечных функций, которые также поддерживают распараллеливание.

В частности, операции с потоком сокращения позволяют нам получить один единственный результат из последовательности элементов , многократно применяя операцию объединения к элементам в последовательности.

В этом уроке мы рассмотрим общее назначение Stream.reduce () операция и увидеть его в некоторых случаях применения бетона.

2. Ключевые концепции: идентичность, аккумулятор и объединитель.

Прежде чем мы углубимся в использование операции Stream.reduce () , давайте разберем элементы-участники операции на отдельные блоки. Таким образом, нам будет легче понять роль, которую играет каждый:

  • Идентичность - элемент, являющийся начальным значением операции сокращения и результатом по умолчанию, если поток пуст
  • Аккумулятор - функция, которая принимает два параметра: частичный результат операции сокращения и следующий элемент потока
  • Combiner - функция, используемая для объединения частичного результата операции сокращения, когда сокращение распараллеливается, или когда существует несоответствие между типами аргументов аккумулятора и типами реализации аккумулятора.

3. Использование Stream.reduce ()

Чтобы лучше понять функциональность элементов идентификатора, аккумулятора и объединителя, давайте рассмотрим несколько основных примеров:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

В этом случае, целое значение 0 тождественно. Он сохраняет начальное значение операции сокращения, а также результат по умолчанию, когда поток целочисленных значений пуст.

Точно так же лямбда-выражение :

subtotal, element -> subtotal + element

- это аккумулятор , поскольку он принимает частичную сумму целочисленных значений и следующего элемента в потоке.

Чтобы сделать код еще более кратким, мы можем использовать ссылку на метод вместо лямбда-выражения:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Конечно, мы можем использовать операцию reduce () для потоков, содержащих другие типы элементов.

Например, мы можем использовать reduce () для массива элементов String и объединить их в один результат:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

Точно так же мы можем переключиться на версию, которая использует ссылку на метод:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Воспользуемся операцией reduce () для соединения элементов массива букв в верхнем регистре :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

Кроме того, мы можем использовать reduce () в параллельном потоке (подробнее об этом позже):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Когда поток выполняется параллельно, среда выполнения Java разделяет поток на несколько подпотоков. В таких случаях нам нужно использовать функцию для объединения результатов подпотоков в один . Это роль объединителя - в приведенном выше фрагменте это ссылка на метод Integer :: sum .

Как ни странно, этот код не компилируется:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

В этом случае у нас есть поток объектов User , а типы аргументов аккумулятора - Integer и User. Однако реализация аккумулятора представляет собой сумму целых чисел, поэтому компилятор просто не может определить тип параметра пользователя .

Мы можем исправить эту проблему, используя комбайнер:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Проще говоря, если мы используем последовательные потоки и типы аргументов аккумулятора и типы его реализации совпадают, нам не нужно использовать объединитель .

4. Параллельное сокращение

Как мы узнали ранее, мы можем использовать reduce () для распараллеленных потоков.

Когда мы используем распараллеленные потоки, мы должны убедиться, что reduce () или любые другие агрегированные операции, выполняемые с потоками:

  • ассоциативный : на результат не влияет порядок операндов
  • не мешает : операция не влияет на источник данных
  • без состояния и детерминированный : операция не имеет состояния и производит тот же вывод для данного ввода

Мы должны выполнить все эти условия, чтобы не допустить непредсказуемых результатов.

Как и ожидалось, операции, выполняемые с распараллеленными потоками, включая reduce (), выполняются параллельно, что позволяет использовать преимущества многоядерных аппаратных архитектур.

По очевидным причинам распараллеленные потоки намного более производительны, чем последовательные аналоги . Даже в этом случае их может быть слишком много, если операции, применяемые к потоку, не являются дорогостоящими или количество элементов в потоке невелико.

Конечно, параллельные потоки - правильный путь, когда нам нужно работать с большими потоками и выполнять дорогостоящие агрегированные операции.

Давайте создадим простой тест производительности JMH (Java Microbenchmark Harness) и сравним соответствующее время выполнения при использовании операции reduce () в последовательном и параллельном потоках:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).

These are our benchmark results:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Теперь, когда учтены Джон и Джули, давайте воспользуемся Stream.reduce () для вычисления средней оценки для обоих пользователей. В качестве идентификатора давайте вернем новый рейтинг, если наш список ввода пуст :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Если мы посчитаем, то получим средний балл 3,6:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Заключение

В этом руководстве мы узнали, как использовать операцию Stream.reduce () . Кроме того, мы узнали, как выполнять сокращения для последовательных и распараллеленных потоков и как обрабатывать исключения при сокращении .

Как обычно, все примеры кода, показанные в этом руководстве, доступны на GitHub.