모던 자바 인 액션 -6-
스트림 인터페이스를 이용하면 간단하게 요소를 병렬로 처리 할 수 있다. 컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다.
병렬스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.
순차 스트림에 parallel 메서드를 호출하면 기존의 연산이 병렬로 처리된다 반대로 sequential로 병렬 스트림을 순차 스트림을 바꿀 수 있다. 따라서 우리는 어떤 연산은 병렬로, 어떤 연산은 순차적으로 실행할지 제어할 수 있다.
- parallel 메서드 : 순차 스트림 -> 병렬 스트림
- sequential 메서드 : 병럴 스트림 -> 순차 스트림
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 마지막 호출이 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다
* 병렬스트림은 내부적으로 ForkJoinPool을 사용한다.
// N 개의 숫자를 더하는 함수들
// 전통적인 for문
public long iterativeSum() {
private static final long N = 10_000_000L;
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
// 순차 stream
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum);
}
// 병렬stream
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).parallel().reduce(0L, Long::sum);
}
stream의 문제점
- 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다
- 반복 작업은 병렬로 수행할 있는 독립 단위로 나누기가 어렵다.
위의 문제점으로 인해 stream의 결과가 기존보다 느리게 나왔다. 우리는 병렬로 수행될 수 있는 스트림 모델이 필요하지만 iterate은 이전 연산의 결과에 따라 다음 함수의 입력이 달라진다. 따라서 iterate연산은 청크로 분할하기 어렵다. 결국 병렬처리를 할 수 없고 스레드를 할당하는 오버헤드만 증가하게 되었다.
효과적으로 개선하는 방법
- 청크로 나눌 수 있게 순자 범위를 생산한다
- IntStream, LongStream등을 사용하여 박싱 오버헤드를 없앤다
// 개선된 순차 stream
public long rangedSum() {
return LongStream.rangeClosed(1, N).reduce(0L, Long::sum);
}
// 개선된 병렬 stream
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);
}
* 병렬화가 무조건 좋은 것은 아니다. 병렬화를 이용하면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.
병렬 스트림 효과적으로 사용하기
- 확신이 서지 않으면 직접 측정하라.
- 무조건 병렬 스트림으로 바꾸는 것이 능사는 아니다. (항상 병렬 스트림이 순차 스트림보다 빠른 것은 아님)
- 적절한 벤치마크로 직접 성능을 측정하는 것이 바람직함.
- 박싱을 주의하라.
- 자동 박싱과 자동 언박싱은 성능을 크게 저하시킬 수 있는 요소이다.
- 기본형 특화스트림(IntStream, LongStream, DoubleStream)을 사용.
- 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
- 특히나 limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
- 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
- 처리해야 할 요소 수가 N이고, 하나의 요소를 처리하는 데 드는 비용이 Q라 하면 전체 스트림 파이프라인 처리 비용을 N*Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미
- 소량의 데이터에서는 병렬 스트림이 도움이 되지 않는다.
- 1번과 마찬가지로, 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있는 만큼의 이득을 얻지 못하기 때문.
- 스트림을 구성하는 자료구조가 적절한지 확인.
- 예를 들어 ArrayList를 LinkedList보다 효율적으로 분할할 수 있다.
- 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
- 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
- 최종 연산의 병합 과정 비용을 살펴보라.
- 병합 과정이 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브 스트림 부분 결과를 합치는 과정에서 상쇄될 수 있다.
스트림 소스와 분해성 (병렬화 친밀도)
소스 | 분해성 |
ArrayList | 훌륭함 |
LinkedList | 나쁨 |
IntStream.range | 훌륭함 |
Stream.iterate | 나쁨 |
HashSet | 좋음 |
TreeSet | 좋음 |
포크/ 조인 프레임워크
병렬화 할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음, 서브테스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브테스크를 스레드 풀의 작업자 스레드에 분산 할당한다.
RecursiveTask 활용
스레드풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야하고, 추상메서드 compute를 구현해야 한다.
compute 메서드 구현 형식은 다음과 같은 의사코드 형식을 유지한다
if(태스크가 충분히 작거나 더이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 메시지를 재귀적으로 호출
모든 서브태스크의 연산이 왑료될때까지 대기
각 서브태스크의 결과를 합침
}
포크/조인 프레임워크를 이용해서 병렬 합계 수행
public class ForkJoinSumCalculator extends RecursiveTask<Long> { // RecursiveTask 상속
public static final long THRESHOLD = 10_000;
private final long[] numbers;
private final int start;
private final int end;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if(length<=THRESHOLD){
return computeSequentially();
}
// 배열의 절반을 더하도록 첫번째 서브테스크를 생성한다
ForkJoinSumCalculator left = new ForkJoinSumCalculator(numbers, start, start + length / 2);
// ForkJoinPool의 다른 스레드로 새로 생성한 테스크를 비동기로 실행한다 // 새로운 스레드
left.fork();
// 배열의 나머지 절만을 더하도록 두번째 서브테스크 생성한다
ForkJoinSumCalculator right = new ForkJoinSumCalculator(numbers, start + length / 2, end);
// 두번째 서브 테스크를 동기 실행한다 현재 스레드에서 실행
Long rightCompute = right.compute();
// 첫번째 서브 테스크(새로운 스레드)의 결과를 읽거나 결과가 없으면 기다린다
Long leftCompute = left.join();
return rightCompute + leftCompute;
}
// 분할이 불가능 할때 순차적으로 서브테스크 결과를 계산
private long computeSequentially() {
long sum = 0;
for(int i=start;i<end;i++){
sum += numbers[i];
}
return sum;
}
}
// 실행
public static long forkJoinSum(long n){
long [] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
포크/ 조인 프레임 워크 효과적으로 사용하는 방법
- join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출하지 않으면, 각각의 서브태스크가 다른 서브태스크를 기다리는 일이 발생할 수 있다.
- RecursiveTask 내에서는 compute나 fork 메서드를 사용하며, 순차코드에서 병렬 계산을 시작할때만 ForkJoinPool의 invoke 메서드를 사용해야 한다.
- 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다. 한쪽 작업에만 fork를 호출하고 다른쪽에는 compute를 호출하면, 한 태스크에는 같은 스레드를 재사용할 수 있으므로 불필요한 오버헤드를 피할 수 있다.
- 포크/조인 프레임워크의 병렬 계산은 디버깅하기 어렵다. fork라 불리는 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
- 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 하며, 각 서브태스크의 실행 시간은 새로운 태스크를 포킹하느데 드는 시간보다 길어야한다.
작업 훔치기
이론적으로는 CPU의 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 코어에서 태스크를 실행할 것이고, 같은 시간에 종료될 것이라고 생각할 수 있다. 하지만 다양한 이유로 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
포크/조인 프레임워크에서는 작업훔치기(work stealing)라는 기법으로 이 문제를 해결한다. 작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드에 거의 공정하게 분할된다.
각 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업 처리한다. 한 스레드가 할 일이 떨어진 경우, 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.(모든 태스크가 끝날때까지)
작업 훔치기 기법을 사용하기 때문에 실제 코어 수 보다 더 잘게 나눈다. 태스크의 크기를 작게 나누어야 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.
Spliterator 인터페이스
Spliterator는 소스의 요소 탐색 기능을 제공한다는 점에서 Iterator와 같지만 병렬 작업에 특화되어 있다
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
- tryAdvace - Spliterator의 요소를 순차적으로 소비하면서 탐색해야할 요소가 있으면 참을 반환
- trySplit - Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성
- estimateSize - 탐색해야할 요소 수
- characteristics - Spliterator의 특성을 정의
- 1단계: 첫 번째 Spliterator에 trySplit을 호출하면 두 번째 Spliterator가 생성된다.
- 2단계: 두개의 Spliterator에 trySplit을 다시 호출하면 4개의 Spliterator가 생성된다.
- 이처럼 trySplit의 결과가 null이 될 때 까지 이 과정을 반복한다.
- 3단계: trySplit이 null을 반환했다는 것은 더 이상 자료구조를 분할할 수 없음을 의미
- 4단계: Spliterator에 호출한 모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.
이러한 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향 받는다
Spliterator 특성
특성 | 의미 |
ORDERED | 리스트처럼 정해진 순서가 있으므로 요소를 탐색하고 분할할 때 순서에 유의해야 함 |
DISTINCT | x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환 |
SORTED | 탐색된 요소는 미리 정의된 정렬 순서를 따른다. |
SIZED | 크기가 알려진 소스로 생성했으므로 estimatedSize()는 정확한 값을 반환한다. |
NON-NULL | 탐새갛느 모든 요소는 null이 아니다. |
IMMUTABLE | 이 Spliterator의 소스는 불변이다. 요소를 탐색하는 동안 추가/삭제/수정할 수 없다. |
CONCURRET | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | 이 Spliterator와 분할되는 모든 spliterator의 SIZED 특성을 갖는다. |