Java8 - Stream 으로 데이터 수집 (2): Partitioning, Collector 인터페이스, Custom Collector


이 포스트에서는 아래 내용에 대해 알아본다.

  • 데이터 분할
  • 커스텀 컬렉터

소스는 github 에 있습니다.


목차


1. 분할: partitioningBy()

분할 함수인 Collectors.partitioningBy() 는 Predicate<T> 를 분류 함수로 사용하는 그룹화 기능으로, boolean 을 반환하기 때문에 맵의 키 형식은 Boolean 이다.
따라서 그룹화 맵은 true, false 최대 2 개의 그룹으로 분류된다.

private static Map<Boolean, List<Dish>> partitioningByVegetarian() {
  return Dish.menu.stream().collect(Collectors.partitioningBy(Dish::isVegetarian));
}

// partitioningBy() 사용하여 분류
Map<Boolean, List<Dish>> partitionMenu = partitioningByVegetarian();
System.out.println(partitionMenu);
// {false=[pork, beef, chicken, prawns, salmon], true=[french fries, rice, season fruit, pizza]}

// 위에서 채식자만 분류
List<Dish> vegetarian = partitionMenu.get(true);
System.out.println(vegetarian);
// [french fries, rice, season fruit, pizza]

List<Dish> notVegetarian = partitionMenu.get(false);
System.out.println(notVegetarian);
// [pork, beef, chicken, prawns, salmon]

filter() 를 이용해서도 같은 결과를 얻을 수 있다.

private static List<Dish> vegetarianByFilter() {
  return Dish.menu.stream().filter(Dish::isVegetarian).collect(Collectors.toList());
}

// 필터로 채식 확인
System.out.println(vegetarianByFilter());
// [french fries, rice, season fruit, pizza]

컬렉터를 두 번째 인수로 전달하여 다수준 분할도 가능하다.

// 채식/비채식 스트림을 각각 요리 타입으로 그룹화
private static Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianByType() {
  return Dish.menu.stream().collect(
          partitioningBy(Dish::isVegetarian,  // 분할 함수
          groupingBy(Dish::getType) // 두 번째 컬렉터
          )
  );
}
{false={FISH=[prawns, salmon], MEAT=[pork, beef, chicken]}, true={OTHER=[french fries, rice, season fruit, pizza]}}

Collectors.maxBy() 를 이용하여 아래처럼 채식/비채식 중 칼로리가 가장 높은 요리를 찾을 수도 있다.

// 채식/비채식 중 가장 높은 칼로리
private static Map<Boolean, Dish> mostCaloricPartitioningBy() {
  return Dish.menu.stream().collect(
          partitioningBy(Dish::isVegetarian,
                  collectingAndThen(
                          maxBy(Comparator.comparingInt(Dish::getCalories)),
                          Optional::get
                  )
  ));
}
{false=pork, true=pizza}

채식/비채식 중 칼로리가 500 이상인 요리는 아래와 같이 찾을 수 있다.

// 채식/비채식 중 칼로리가 500 이상인 요리
private static Map<Boolean, Map<Boolean, List<Dish>>> partitioningByVegetarianAndCaloric() {
  return Dish.menu.stream().collect(
          partitioningBy(Dish::isVegetarian,
                  partitioningBy(d -> d.getCalories() > 500))
  );
}
{false={false=[chicken, prawns, salmon], true=[pork, beef]}, true={false=[rice, season fruit], true=[french fries, pizza]}}

Collectors.counting() 을 이용하여 채식/비채식 각 항목의 개수는 아래와 같이 구할 수 있다.

// 채식/비채식 각 항목의 개수
private static Map<Boolean, Long> partitioningByVegetarianCount() {
  return Dish.menu.stream().collect(
          partitioningBy(Dish::isVegetarian, counting())
  );
}
{false=5, true=4}

관련된 퀴즈는 Java8 - Stream 으로 데이터 수집 (2): Quiz 를 보세요.


1.1. Collectors 클래스의 static factory 메서드

지금까지 사용한 Collectors 의 static factory 메서드들은 아래와 같다.

  • toList()
    • 스트림의 모든 항목을 리스트 형태로 수집
  • toSet()
    • 스트림의 모든 항목을 중복없는 항목으로 수집
  • toCollection()
    • 스트림의 모든 항목을 공급자가 제공하는 컬렉션으로 수집
    • stream.collect(toCollection(), ArrayList::new)
  • counting()
    • 스트림의 항목 수 계산
    • stream.collect(counting())
  • summingInt()
    • 스트림의 항목에서 정수 프로퍼티값 더함
    • stream.collect(summingInt(Dish::getCalories))
  • averagingInt()
  • summarizingInt()
    • 스트림 내의 항목의 최대값, 최소값, 합계, 평균 등의 정수 정보 통계 수집
    • stream.collect(summarizingInt(Dish::getCalories))
  • joining()
    • 스트림 각 항목에 toString() 을 호출한 결과 문자열 연결
    • stream.map(Dish::getName).collect(joining(“,”))
  • maxBy()
    • 주어진 비교자를 이용해서 스트림의 최대값 요소를 Optional 로 감싼 값 반환, 스트림 요소가 없을 땐 Optional.empty() 반환
    • stream.collect(maxBy(comparingInt(Dish::getCalories)))
  • minBy()
  • reducing()
    • 누적자를 초기값으로 설정한 후 BinaryOperator<T> (T,T) -> T 로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱
    • stream.collect(reducing(0, Dish::getCalories, Integer::sum))
  • collectingAndThen()
    • 다른 컬렉터를 감싸고, 그 결과에 변환 함수를 적용
    • stream.collect(collectingAndThen(toList(), List::size))
  • groupingBy()
    • 하나의 프로퍼티값을 기준으로 스트림의 항목을 그룹화하며, 기준 프로퍼티값을 맵의 키로 사용
    • stream.collect(groupingBy(Dish::getType))
  • partitioningBy()
    • Predicate<T> 를 스트림의 각 항목에 적용한 결과로 항목을 분할
    • stream.collect(partitioningBy(Dish::isVegetarian))

2. Collector 인터페이스

위에서 설명한 모든 컬렉터들은 Collector 인터페이스를 구현한다.

Collector 인터페이스는 리듀싱 연산(=컬렉터) 를 어떻게 구현할 지 제공하는 메서드 집합으로 구성된다.
toList(), groupingBy() 등은 Collector 인터페이스를 구현하는 컬렉터 중 하나이다.

아래에서는 toList() 가 어떻게 구현되어 있는지 살펴보면서 Collector 인터페이스는 어떻게 정의되어 있고, 내부적으로 collect() 메서드는 toList() 가 반환하는 함수를 어떻게 활용하는지 알아본다.

Collector 인터페이스 시그니처와 메서드 정의

public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();

    ...
  
    enum Characteristics {
        CONCURRENT,
        UNORDERED,
        IDENTITY_FINISH
    }
}
  • T
    • 수집될 스트림 항목의 제네릭 형식 (= 스트림 요소의 형식)
  • A
    • 누적자, 수집 과정에서 중간 결과를 누적하는 객체의 정의
  • R
    • 수집 연산 결과 객체의 형식 (= collect 연산의 최종 결과 형식)

<함수 설명>

  • supplier(), accumulator(), combiner(), finisher()
    • collect() 메서드에서 실행하는 함수를 반환
  • characteristics()
    • collect() 메서드가 어떤 최적화(병렬화 같은..)를 이용해서 리듀싱 연산을 수행할 것인지 결정하도록 돕는 힌트 특성 집합 제공

각 함수형 인터페이스는 Java8 - 람다 표현식 (1): 함수형 인터페이스, 형식 검사 를 참고하세요.


2.1. Collector 인터페이스 메서드

각 메서드를 살펴보면서 ToListCollector 라는 커스텀 컬렉터를 구현해본다.


2.1.1. supplier(): 새로운 결과 컨테이터 생성

supplier() 는 빈 결과로 이루어진 Supplier<A> 를 반환한다. (= 수집 과정에서 빈 누적자 인스턴스를 만드는, 파라메터가 없는 함수)

Supplier<A> supplier();

Supplier<A>

  • 매개값은 없고, 반환값은 있음
  • 데이터를 공급해주는 역할
  • A get() 추상 메서드 가짐
  • 함수 디스크립터: () -> A

ToListCollector 커스텀 컬렉터에서는 아래와 같이 구현한다.

public Supplier<List<T>> supplier() {
  //return () -> new ArrayList<>();
  return ArrayList::new;  // 생성자 레퍼런스 전달 방식
}

2.1.2. accumulator(): 결과 컨테이너에 요소 추가

accumulator() 메서드는 리듀싱 연산을 수행하는 함수를 반환한다.
스트림에서 n 번째 요소를 탐색할 때 두 개의 인수인 누적자와 n 번째 요소를 함수에 적용한다.
함수의 반환값은 void 이다. (= 요소를 탐색하면서 적용하는 함수에 의해 누적자 내부 상태가 변경됨)

BiConsumer<A, T> accumulator();

BiConsumer<A, T>

  • 매개값은 있고, 반환값은 없음
  • 리턴이 되지 않고 함수 내에서 사용 후 끝
  • void accept(A a, T t) 추상 메서드 가짐
  • 함수 디스크립터: (A,T) -> void

ToListCollector 커스텀 컬렉터에서는 이미 탐색한 항목을 포함하는 리스트에 현재 항목을 추가하도록 구현한다.

public BiConsumer<List<T>, T> accumulator() {
  //return (list, item) -> list.add(item);
  return List::add; // 메서드 레퍼런스 방식
}

메서드 레퍼런스는 Java8 - 람다 표현식 (2): 메서드 레퍼런스, 람다 표현식과 메서드의 조합1. 메서드 레퍼런스 를 참고하세요.


2.1.3. finisher(): 최종 변환값을 결과 컨테이너로 적용

finisher() 메서드는 스트림 탐색을 끝내고 누적자 객체를 최종 결과로 변환하면서 누적 과정을 끝낼 때 호출할 함수를 반환한다.
때로는 아래 ToListCollector 커스텀 컬렉터처럼 누적자가 객체가 이미 최종 결과인 상황도 있는데 이 때는 변환 과정이 필요하지 않으므로 finisher() 메서드는 항등 함수를 반환한다.

Function<A, R> finisher();

Function<A, R>

  • 매개값과 리턴값 있음
  • 주로 매개값을 반환값으로 매핑할 때 (=타입 변환이 목적일 때) 사용
  • R apply(A a) 추상 메서드 가짐
  • 함수 디스크립터: A -> R

ToListCollector 커스텀 컬렉터에서는 아래와 같이 구현한다.

public Function<List<T>, List<T>> finisher() {
  //return i -> i;
  return Function.identity(); // 항등 함수 반환
}

2.1.4. combiner(): 두 결과 컨테이너 병합

combiner() 메서드는 리듀싱 연산에서 사용할 함수를 반환한다.
스트림의 서로 다른 서브 파트를 병렬로 처리할 때 누적자가 이 결과를 어떻게 처리할 지 정의한다.
toList() 의 combiner() 는 스트림의 두 번째 서브파트에서 수집한 항목을 리스트의 첫 번째 서브파트 결과 뒤에 추가하면 된다.

BinaryOperator<A> combiner();

BinaryOperator<A>

  • 매개값과 리턴값 있음
  • 주로 매개값을 연산하여 동일한 타입의 결과를 반환할 때 사용
  • 입력을 연산하여 동일 타입의 출력으로 리턴
  • T apply(A a1, A a2) 추상 메서드 가짐
  • 함수 디스크립터: (A,A) -> A

combiner() 메서드를 이용하면 스트림의 리듀싱을 병렬로 수행할 수 있다.
(스트림의 리듀싱을 병렬 수행할 때는 Java7 의 포크/조인 프레임워크와 Spliterator 를 사용함)

포크/조인 프레임워크와 Spliterator 는 각각 Java8 - Stream 으로 병렬 데이터 처리 (1): 병렬 스트림, 포크/조인 프레임워크Java8 - Stream 으로 병렬 데이터 처리 (2): Spliterator 인터페이스 를 참고하세요.

  • 스트림 병렬 리듀싱 수행 과정
    • 스트림을 분할해야 하는지 정의하는 조건이 false 가 되기 전까지 원래 스트림을 재귀적으로 분할
      (분산된 작업의 크기가 너무 작아지면 병렬 수행 속도는 순차 수행 속도보다 느려진다, 일반적으로 프로세싱 코어의 개수를 초과하는 병렬 작업은 효율적이지 않음)
    • 모든 서브 스트림의 각 요소에 리듀싱 연산을 순차적으로 적용해서 서브 스트림을 병렬로 처리
    • 마지막에 컬렉터의 combiner() 메서드가 반환하는 함수로 모든 부분 결과를 쌍으로 합침 (= 분할된 모든 서브 스트림의 결과를 합치면서 연산이 완료됨)

ToListCollector 커스텀 컬렉터에서는 아래와 같이 구현한다.

public BinaryOperator<List<T>> combiner() {
  return (list1, list2) -> {
    list1.addAll(list2);
    return list1;
  };
}

2.1.5. characteristics()

characteristics() 메서드는 컬렉터의 연산을 정의하는 Characteristics 형식의 불변 집합을 반환한다.

Set<Characteristics> characteristics();

enum Characteristics {
    CONCURRENT,
    UNORDERED,
    IDENTITY_FINISH
}

Characteristics 은 스트림을 병렬로 리듀스할 것인지, 병렬로 리듀스한다면 어떤 최적화를 선택해야 할 지 힌트를 제공한다.

  • CONCURRENT
    • 다중 스레드에서 accumulator() 함수를 동시에 호출할 수 있으며 이 컬렉터는 스트림의 병렬 리듀싱 수행 가능
    • 컬렉터의 플래그에 UNORDERED 를 함께 설정하지 않으면 데이터 소스가 정렬되어 있지 않은 상황(집합처럼 요소의 순서가 무의미한) 에서만 병렬 리듀싱 수행 가능
  • UNORDERED
    • 리듀싱 결과는 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않음
  • IDENTITY_FINISH
    • finisher() 메서드가 반환하는 함수가 단순히 identity() 기능 (=항등 기능) 이며 생략될 수 있음을 나타냄
    • 따라서 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용 가능
    • 만일 IDENTITY_FINISH 가 설정되면 누적자 A 는 결과 R 로 안전하게 형변환되어야 한다.

피니셔 기능이 항등 기능이며 생략될 수 있음을 나타냅니다. 설정된 경우 A에서 R로의 확인되지 않은 캐스트가 성공하는 경우여야 합니다.

ToListCollector 커스텀 컬렉터에서는 스트림의 요소를 누적하는데 사용한 리스트가 최종 결과 형식이므로 추가 변환이 필요없다. 따라서 ToListCollector 는 IDENTITY_FINISH 이고, 순서도 상관이 없으므로 UNORDERED 이다. 또한 CONCURRENT 이다.

요소의 순서가 무의미한 데이터 소스이어야 병렬 실행 가능

ToListCollector 커스텀 컬렉터에서는 아래와 같이 구현한다.

public Set<Characteristics> characteristics() {
  return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
}

위에서 본 ToListCollector 커스텀 컬렉터의 전체 소스는 아래와 같다.

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
  // 새로운 결과 컨테이터 생성
  @Override
  public Supplier<List<T>> supplier() {
    //return () -> new ArrayList<>();
    return ArrayList::new;  // 생성자 레퍼런스 전달 방식
  }

  // 결과 컨테이너에 요소 추가
  @Override
  public BiConsumer<List<T>, T> accumulator() {
    //return (list, item) -> list.add(item);
    return List::add; // 메서드 레퍼런스 방식
  }

  // 두 결과 컨테이너 병합
  @Override
  public BinaryOperator<List<T>> combiner() {
    return (list1, list2) -> {
      list1.addAll(list2);
      return list1;
    };
  }

  // 최종 변환값을 결과 컨테이너로 적용
  @Override
  public Function<List<T>, List<T>> finisher() {
    //return i -> i;
    return Function.identity(); // 항등 함수 반환
  }

  // 컬렉터의 플래그를 IDENTITY_FINISH, CONCURRENT 로 설정
  @Override
  public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
  }
}

이제 toList() 대신 ToListCollector() 사용이 가능하다.
toList() 는 factory 이고, ToListCollector 는 new 로 인스턴스화한다는 점이 다르다.

List<Dish> dishes = Dish.menu.stream().collect(new ToListCollector<Dish>());
List<Dish> dishes2 = Dish.menu.stream().collect(Collectors.toList());

// [pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon]
System.out.println(dishes);
System.out.println(dishes2);

바로 위에선 ToListCollector 커스텀 컬렉터를 구현하여 커스텀하게 데이터를 수집했지만, 커스텀 컬렉터를 구현하지 않고도 커스텀하게 데이터를 수집할 수 있다.

// 커스텀 컬렉터 구현하지 않고 커스텀하게 데이터 수집
List<Dish> dishes3 = Dish.menu.stream().collect(
        ArrayList::new, // supplier
        List::add,  // accumulator
        List::addAll  // combiner
);

// [pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon]
System.out.println(dishes3);

커스텀 컬렉터를 구현하는 것보다 코드가 간결하지만 가독성이 떨어지고, 적절한 클래스로 커스텀 컬렉터를 구현하는 것이 중복을 피하고 재사용성을 높이는 데 도움이 된다.

또한 이런 방식으로는 collect() 메서드에 Characteristics 를 전달할 수 없기 때문에 위와 같이 사용하게 되면 collect() 메서드는 IDENTITY_FINISH 와 CONCURRENT 이지만 UNORDERED 는 아닌 컬렉터로만 동작한다.


3. 커스텀 컬렉터

Java8 - Stream 으로 데이터 수집 (2): Quiz 에서 Collectors 클래스가 제공하는 static factory 메서드를 이용하여 정수 n 을 입력받았을 때 2~n 까지의 자연수를 소수와 비소수로 나누었다.

System.out.println("Primes and NonPrime: " + partitionPrimes(10));

// 주어진 수가 소수인지 판단
private static boolean isPrime(int candidate) {
  // 소수의 대상은 주어진 수의 제곱근 이하로 제한
  int candidateRoot = (int) Math.sqrt(candidate);
  return IntStream.rangeClosed(2, candidateRoot)
          .noneMatch(i -> candidate % i == 0);
}

private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
  return IntStream.rangeClosed(2, n)  // IntStream 반환
    .boxed()  // Stream<Integer> 반환
    .collect(
            partitioningBy(i -> isPrime(i))
  );
}

이제 커스텀 컬렉터를 이용하여 성능을 개선해보도록 한다.


3.1. 소수로만 나누기

소수로 나누어떨어지는지 확인해서 대상의 범위를 좁힐 수 있다.
devisor 가 소수가 아니면 소용이 없기 때문에 devisor 를 현재 숫자 이하에서 발견한 소수로 제한할 수 있다.
또한 현재 숫자가 소수인지 판단해야 하는데 그러기 위해선 지금까지 발견한 소수 리스트에 접근이 가능해야 한다. 하지만 Java8 - Stream 으로 데이터 수집 (2): Quiz 의 방식으로는 컬렉터 수집 과정에서 부분 결과에 접근할 수 없다.

isPrime() 메서드로 중간 결과 리스트가 있을 경우 그 결과를 전달하도록 수정한다.

// 소수인지 확인하는 Predicate, 중간 결과 리스트를 전달받을 수 있음
public static boolean isPrime(List<Integer> primes, int candidate) {
  return primes.stream().noneMatch(i -> candidate % i == 0);
}

주어진 수의 제곱근보다 작은 소수만 사용하도록 하기 위해 다음 소수가 주어진 수의 루트보다 커지면 소수로 나누는 검사를 멈춰야 하는데 Stream API 에는 중간에 멈출 수 있는 기능이 없다.
filter(p -> p <= candidateRoot) 를 이용해서 주어진 수의 루트보다 작은 소수를 필터링할 수는 있지만 filter 는 전체 스트림을 처리한 후 결과를 반환하기 때문에 소스 리스트와 대상 숫자 범위가 커지면 성능 이슈가 발생할 수 있다.

그럼 이제 주어진 수의 제곱근보다 큰 소수를 찾으면 검사를 중단하도록 하자.

// 리스트와 Predicate 를 인수로 받아서 리스트의 첫 요소에서 시작해서 Predicate 를 만족하는 가장 긴 요소로 이루어진 리스트 반환
public static <A> List<A> customTakeWhile(List<A> list, Predicate<A> p) {
  int i = 0;
  for (A item: list) {
    if (!p.test(item)) {
      // 리스트의 현재 요소가 Predicate 를 만족하지 않으면 검사한 항목의 앞쪽에 위치한 서브리스트 반환
      return list.subList(0, i);
    }
    i++;
  }
  return list;
}

위 메서드를 이용해서 isPrime() 이 자신의 제곱근보다 작은 소수만 찾도록 최적화한다.

// 소수인지 확인하는 Predicate, 중간 결과 리스트를 전달받을 수 있음
public static boolean isPrime(List<Integer> primes, int candidate) {
  double candidateRoot = (int) Math.sqrt(candidate);
  return customTakeWhile(primes, i -> i <= candidateRoot) // List<Integer> 반환
          .stream() // Stream<Integer> 반환
          .noneMatch(p -> candidate % p == 0);
}

Stream 의 takeWhile(Predicate p) 를 이용하여 위 코드를 다시 구현해보자.

// 소수인지 확인하는 Predicate, 중간 결과 리스트를 전달받을 수 있음
public static boolean isPrime(List<Integer> primes, int candidate) {
  double candidateRoot = (int) Math.sqrt(candidate);
  return primes.stream()
          .takeWhile(i -> i <= candidateRoot)
          .noneMatch(i -> candidate % i == 0);
}

Stream.takeWhile(Predicate p) 과 Stream.filter(Predicate p) 차이
filter 는 조건에 대해 모두 검사하면서 true 를 반환
takeWhile 은 조건에 대해 true 가 아닐 경우 바로 결과 반환

// 2,4,6,8
Stream.of(2,3,4,5,6,7,8)
      .filter(n -> n % 2 == 0)
      .forEach(System.out::println);

// 2
Stream.of(2,3,4,5,6,7,8)
        .takeWhile(n -> n % 2 == 0)
        .forEach(System.out::println);

이제 Collector 인터페이스를 구현하는 클래스 선언 후 Collector 인터페이스의 5개 메서드를 구현하여 커스텀 컬렉터를 구현한다.

정수로 이루어진 스트림에서 누적자와 최종 결과의 형식이 Java8 - Stream 으로 데이터 수집 (2): Quiz 의 partitionPrimes(int n) 의 리턴값과 동일하도록 Map<Boolean, List> 인 컬렉터를 구현한다.

커스텀 컬렉터 클래스 형식

public class PrimeNumbersCustomCollector
                    implements Collector<Integer  // 스트림 요소의 형식
                                        , Map<Boolean, List<Integer>> // 누적자 형식
                                        , Map<Boolean, List<Integer>>> {  // 수집 연산의 결과 형식
  ...
}

이제 리듀싱 연산을 구현한다.

변경 가능한 새로운 결과 컨테이너를 생성 후 반환하는 supplier() 는 누적자를 만드는 함수를 반환하도록 한다.

/**
 * A function that creates and returns a new mutable result container.
 * 2 개의 빈 리스트를 포함하는 맵으로 수집 동작 시작
 * 
 * @return a function which returns a new, mutable result container
 *        새롭고 변경 가능한 결과 컨테이너를 반환하는 함수 반환
 *        누적자를 만드는 함수를 반환
 */
@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
  // 누적자로 사용할 맵을 만들면서 true, false 키와 빈 리스트로 초기화 진행
  // (수집 과정에서 빈 리스트에 각각 소수와 비소수 추가)
  return () -> new HashMap<Boolean, List<Integer>>() {
    {
      put(true, new ArrayList<Integer>());
      put(false, new ArrayList<Integer>());
    }
  };
}

이제 스트림 요소를 어떻게 수집할 지 경정하는 accumulator() 를 구현한다.
이 메서드 구현에 의해 수집 과정의 중간 결과, 즉 지금까지 발견한 소수를 포함하는 누적자에 접근할 수 있다.

/**
 * A function that folds a value into a mutable result container.
 *
 * @return a function which folds a value into a mutable result container
 *        값을 변경 가능한 결과 컨테이너로 넣는 함수 반환
 */
@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
  return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
                  acc.get(CustomCollector.isPrime(acc.get(true) // 지금까지 발견한 소수 리스트를 isPrime() 에 전달
                          , candidate))  // isPrime() 결과에 따라 소수 리스트와 비소수 리스트 만듬
                     .add(candidate); // candidate 를 알맞은 리스트에 추가
  };
}

병렬 수집 과정에서 두 부분의 누적자를 합치는 combiner() 를 구현해야 하는데 지금 하는 예시의 알고리즘은 순차적이어서 컬렉터를 실제로 병렬 사용할 수 없으니 빈 구현으로 남겨둔다.


최종 변환값을 결과 컨테이너로 적용하는 finisher() 를 구현한다.
accumulator() 의 형식은 컬렉터 결과 형식과 같으므로 변환 과정이 필요없기 때문에 항등 함수인 identity 를 반환하도록 구현한다.

/**
 * Perform the final transformation from the intermediate accumulation type
 * {@code A} to the final result type {@code R}.
 *
 * <p>If the characteristic {@code IDENTITY_FINISH} is
 * set, this function may be presumed to be an identity transform with an
 * unchecked cast from {@code A} to {@code R}.
 *
 * @return a function which transforms the intermediate result to the final result
 *        중간 결과를 최종 결과로 변환하는 함수 반환
 */
@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
  // 최종 수집 과정에서 데이터 변환이 필요하지 않으므로 항등 함수 반환
  return Function.identity();
}

이제 컬렉터의 연산을 정의하는 characteristics() 를 구현한다.
이 커스텀 컬렉터는 CONCURRENT 도 아니고, UNORDERED 도 아니지만, IDENTITY_FINISH 이므로 아래처럼 구현한다.

/**
 * Returns a {@code Set} of {@code Collector.Characteristics} indicating
 * the characteristics of this Collector.  This set should be immutable.
 *
 * @return an immutable set of collector characteristics
 */
@Override
public Set<Characteristics> characteristics() {
  return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
}

이제 partitioningBy() 를 사용하던 기존의 partitionPrimes() 함수를 아래와 같이 partitioningBy() 대신 커스텀 컬렉터를 사용하도록 변경할 수 있다.

// 소수와 비소수 구분
private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
  return IntStream.rangeClosed(2, n)  // IntStream 반환
          .boxed()  // Stream<Integer> 반환
          .collect(new PrimeNumbersCustomCollector());
}

최종 구현 코드
CustomCollector.java

import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

public class CustomCollector {
  public static void main(String[] args) {
    System.out.println("Primes and NonPrime: " + partitionPrimes(10));
  }

  // 소수인지 확인하는 Predicate, 중간 결과 리스트를 전달받을 수 있음
  public static boolean isPrime(List<Integer> primes, int candidate) {
    double candidateRoot = (int) Math.sqrt(candidate);
    return primes.stream()
            .takeWhile(i -> i <= candidateRoot)
            .noneMatch(i -> candidate % i == 0);
  }

  // 소수와 비소수 구분
  private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
    return IntStream.rangeClosed(2, n)  // IntStream 반환
            .boxed()  // Stream<Integer> 반환
            .collect(new PrimeNumbersCustomCollector());
  }
}

PrimeNumbersCustomCollector.java

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;

public class PrimeNumbersCustomCollector
                    implements Collector<Integer  // 스트림 요소의 형식
                                        , Map<Boolean, List<Integer>> // 누적자 형식
                                        , Map<Boolean, List<Integer>>> {
  /**
   * A function that creates and returns a new mutable result container.
   * 2 개의 빈 리스트를 포함하는 맵으로 수집 동작 시작
   *
   * @return a function which returns a new, mutable result container
   *        새롭고 변경 가능한 결과 컨테이너를 반환하는 함수 반환
   *        누적자를 만드는 함수를 반환
   */
  @Override
  public Supplier<Map<Boolean, List<Integer>>> supplier() {
    // 누적자로 사용할 맵을 만들면서 true, false 키와 빈 리스트로 초기화 진행
    // (수집 과정에서 빈 리스트에 각각 소수와 비소수 추가)
    return () -> new HashMap<Boolean, List<Integer>>() {
      {
        put(true, new ArrayList<Integer>());
        put(false, new ArrayList<Integer>());
      }
    };
  }

  /**
   * A function that folds a value into a mutable result container.
   *
   * @return a function which folds a value into a mutable result container
   *        값을 변경 가능한 결과 컨테이너로 넣는 함수 반환
   */
  @Override
  public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
    return (Map<Boolean, List<Integer>> acc, Integer candidate) -> {
                    acc.get(CustomCollector.isPrime(acc.get(true) // 지금까지 발견한 소수 리스트를 isPrime() 에 전달
                           , candidate))  // isPrime() 결과에 따라 소수 리스트와 비소수 리스트 만듬
                       .add(candidate); // candidate 를 알맞은 리스트에 추가
    };
  }

  /**
   * A function that accepts two partial results and merges them.  The
   * combiner function may fold state from one argument into the other and
   * return that, or may return a new result container.
   *
   * @return a function which combines two partial results into a combined
   * result
   */
  @Override
  public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
    return null;
  }

  /**
   * Perform the final transformation from the intermediate accumulation type
   * {@code A} to the final result type {@code R}.
   *
   * <p>If the characteristic {@code IDENTITY_FINISH} is
   * set, this function may be presumed to be an identity transform with an
   * unchecked cast from {@code A} to {@code R}.
   *
   * @return a function which transforms the intermediate result to the final result
   *        중간 결과를 최종 결과로 변환하는 함수 반환
   */
  @Override
  public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
    // 최종 수집 과정에서 데이터 변환이 필요하지 않으므로 항등 함수 반환
    return Function.identity();
  }

  /**
   * Returns a {@code Set} of {@code Collector.Characteristics} indicating
   * the characteristics of this Collector.  This set should be immutable.
   *
   * @return an immutable set of collector characteristics
   */
  @Override
  public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
  }
}

3.2. Collectors 성능 비교

이제 기존의 partitioningBy() 로 구현한 코드와 커스텀 컬렉터로 만든 코드의 성능을 확인해보도록 한다.

public class CollectorHarness {
  public static void main(String[] args) {
    // 425ms
    //System.out.println(execute(QuizPrimeNumber::partitionPrimes) + "ms");

    // 122ms
    System.out.println(execute(CustomCollector::partitionPrimes) + "ms");
  }

  private static long execute(Consumer<Integer> primePartitioner) {
    long fastest = Long.MAX_VALUE;
    // 테스트 10번 반복
    for (int i=0; i<10; i++) {
      long start = System.nanoTime();
      // 백만 개의 숫자를 소수와 비소수로 분할
      primePartitioner.accept(1_000_000);
      long duration = (System.nanoTime() - start) / 1_000_000;

      // 가장 빨리 실행된 값을 저장
      if (duration < fastest) {
        fastest = duration;
      }
    }
    return fastest;
  }
}

커스텀 컬렉터가 122ms 로 훨씬 좋은 성능을 보이는 것을 알 수 있다.


참고 사이트 & 함께 보면 좋은 사이트

본 포스트는 라울-게이브리얼 우르마, 마리오 푸스코, 앨런 마이크로프트 저자의 Java 8 in Action을 기반으로 스터디하며 정리한 내용들입니다.






© 2020.08. by assu10

Powered by assu10