자바

모던 자바 인 액션 -13-

정한_s 2021. 12. 21. 16:26

개선된 자바 동시성

동시성 : 단일 코어 머신에서 발생할 수 있는 프로그래밍 속성으로 실행이 서로 겹칠 수 있는 것. 여러 작업이 동시에 실행되는 것처럼 보이는 것

 

병렬성 : 병렬 실행을 하드웨어에서 지원하는 것. 실제로 여러 작업 동시에 실행 될 수 있는 것

 

 

자바의 동시성 지원의 진화

처음에 자바는  Runnable 과 Thread를 동기화된 클래스와 메서드를 이용해 잠갔다.

 

자바 5에서는 좀 더 표현력 있는 동시성을 지원하는 ExecutorService(실행과 테스크 제출을 분리 한 인터페이스), Runnable(리턴 값없음), Thead의 변형을 반환하는 Callable <T> (리턴 값있음), Future <T> (비동기적 연산 결과 표현), 제네릭 등을 지원하였다. 

 

자바 7에서는 분할과 정복 알고리즘의 포크/조인 구현을 지원하는  RecursiveTask가 추가 되었다

 

자바 8에서는 스트림과 람다 지원에 기반한 병렬 프로세싱이 추가되었다. 

 

Executor와 스레드 풀

자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드를 만들고 종료하려면 비싼 비용을 치러야 하며(스레드마다 최소 16kb 메모리 재할당) 운영체제 스레드 숫자는 제한되어 있다. 

 

자바 ExecutorService는 테스트를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다.

 

프로그램은 newFixedThreadPool  같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다. 이 메서드는 워크 스레드라 불리는 ExecutorService를 만들고 스레드 풀에 저장한다. 스레드 풀에서 사용하지 않은 스레드로 제출된 태스크(Runnable이나  Callable)를 먼저 온 순서대로 실행한다. 실행이 종료되면 스레드를 풀로 반환한다. 

 

 

이러한 장점은 하드웨어에 맞는 수의 태스크를 유지함과 동시에 태스크를 스레드 풀에 오버헤드 없이 제출할 수 있다. 

 

스레드 풀 주의 사항

거의 모든 관점에서 스레드를 직접 사용하는 것보다 스레드 풀을 이용한 것이 좋지만 주의해야 하는 점이 있다.

 

  • k 스레드를 가진 스레드 풀은 오직 k 만큼의 스레드를 동시에 실행 가능하다.
    • 초과로 제출된 테스트는 큐에 저장되어 이전 테스트가 종료되기 전까지 대기하게 된다.
    • 일반적인 상황에서는 불필요한 스레드를 생성하지 않기 때문에 문제 되지 않지만 실행 중인 테스크에서 Sleep이 걸리거나 I/O를 기다리게 되면 성능이 급 혁하게 저하된다.
    • 만약 5 개의 스레드를 갖는 스레드 풀에 20개의 테스크가 할당되고 3개의 스레드가 I/O를 기다리게 되면 결론적으로 2개의 스레드가 15개의 테스크를 처리해야 한다.
    • 처음 제출한 테스크가 나중의 테스크의 제출을 기다리를 상황에 빠진다면 데드락에 걸릴수도 있다.
  • 프로그램을 종료하기전 모든 스레드 풀을 종료하는 습관을 가져야한다.
    • 풀의 워커 스레드가 만들어진 다음 다른 테스크의 제출을 기다리며 종료되지 않은 상태로 남을수도 있다.
    • 보통 장기간 실행하는 인터넷 서비스를 관리하며 오래 실행되는 ExecutorService를 갖는건 흔한일이며 자바는 이런상황을 다루도록 Thread.setDaemon 메서드를 지원한다.

스레드

태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다리는 것. 즉, 스새드 생성과 join()이 한쌍 처럼 이루어지는 메서드 호출을 엄격한 포크/조인이라 부른다.

엄격한 포크/조인

시작된 태스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 것을 여유로운 포크/조인이라 부른다.

여유로운 포크/조인

여유로운 포크/조인 즉, 메서드 호출자에 기능을 제공하도록 메서드가 반환된 후에도 테스크 실행이 계속되는 메서드를 비동기 메서드라고 한다.

 

동기 API와 비동기 API

// 메소드 시그니처
// 각각의 f와 g는 실행하는 데 오래걸린다
// 서로 상호작용하지 않음
int f(int x); 
int g(int x); 

int y = f(x); 
int z = g(x); 
System.out.println(y + z);

별도의 CPU 코어로 실행해서 합계를 구하는 시간을 단축할 수 있다. 

 

Thread로 구현

public class ThreadExample {

    public static void main(String args[]) throws InterruptedException {
        int x = 0;
        Result result = new Result();

        Thread thread1 = new Thread(()->result.left = f(x));
        Thread thread2 = new Thread(()->result.right = g(x));
        // * 시작 이전에 join 호출시 즉시 join에서 나옴
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(result.left+ result.right);
    }

    private static int g(int x) {
        return 2;
    }
    private static int f(int x) {
        return 1;
    }

    private static class Result{
        private int left;
        private int right;
    }
}

 

ExecutorService로 구현

public class ExecutorServiceExample {
    public static void main(String args[]) throws ExecutionException, InterruptedException {
        int x = 0;

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> f_r = executorService.submit(() -> f(x));
        Future<Integer> g_r = executorService.submit(() -> g(x));
        System.out.println(f_r.get() + g_r.get());
    }
    private static int g(int x) {
        return 2;
    }

    private static int f(int x) {
        return 1;
    }
}

 

비동기 리액티브 형식 으로 구현

함수에 추가 인수로 람다를 전달해서 결과가 준비되면 이를 람다로 호출하는 방법

public class CallbackStyleExample {

    public static void main(String args[]){
        int x =0;
        Result result = new Result();
        f(x, (int y) -> {
            result.left = y;
            System.out.println((result.left + result.right));
        });

        g(x, (int z) -> {
            result.right = z;
            System.out.println((result.left + result.right));
        });
    }

    private static class Result{
        private int left;
        private int right;
    }
    private static void f(int x, IntConsumer dealWithResult){
        dealWithResult.accept(Functions.f(x));
    }
    private static void g(int x, IntConsumer dealWithResult){
        dealWithResult.accept(Functions.g(x));
    }
}

 but 락을 사용하지 않기 때문에 위의 코드는 완전하지 않다. 

두가지 방법으로 보완할 수 있다

  • if-then-else를 이용해 적절한 락을 이용해 두 결과가 모두 호출되었는지 확인한 다음 원하는 기능을 수행한다.
  • 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.

리엑티브 형식의 비동기 API는 콜백으로 이어지는 일련의 값을 Future 형식의 API는 일회성 값을 처리하는 데 적합하다.

 

 

비효율적 작업 방식

work1(); 
Thread.sleep(10000); 
work2();

스레드 풀에서 sleep 중인 테스크는 다른 테스크가 시작되지 못하게 막으므로 자원울 소비한다

 

스케쥴링 작업 방식 

public class ScheduledExecutorServiceExample {

    public static void main(String args[]){
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // work1 실행 후 종료
        work1();
        // work1 종료 후 10초 뒤에 실행될 수 있도록 큐에 추가한다
        scheduledExecutorService.schedule(()->work2(),10, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }

    private static void work1() {
        System.out.println("work1");
    }
    private static void work2() {
        System.out.println("work2");
    }

}

sleep 방식은 자는 동안 스레드 자원을 점유하는 반면에 이 방식은 다른 작업이 실행 될 수 있도록 허용하기 때문에 효율적이다. 

 

비동기 API에서 예외 처리

비동기 API에서 호출된 메서드의 실제 바디는 별도의 스레드에서 호출되며 이때 발생한 어떤 에러는 이미 호출자의 실행 범위와 관계가 없는 상황이 된다. 따라서 예외가 발생했을 때의 실행될 콜백 메서드를 추가해야한다. 이러한 콜백 메서드를 호출하는 것을 메시지 또는 이벤트라고 부른다.

 

박스와 채널 모델

박스와 채널 모델은 동시성을 설계하고 계념화하기 위한 모델이다.

박스와 채널 모델을 이용하면 생각과 코드를 구조화할 수 있으며, 시스템 구현의 추상화 수준을 높일 수 있다.

박스로 원하는 연산을 표현하면 계산을 손으로 코딩한 결과보다 더 효율적일 것이다. 

또한 병렬성을 직접 프로그래밍하는 관점을 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔준다.

간단한 박스와 채널 다이어그램

CompletableFuture와 콤비네이터를 이용한 동시성

자바8에서는 Future 인터페이스 구현인 CompletableFuture를 이용해 Future를 조합할 수 있는 기능을 추가했다.

일반적으로 Future 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어 진다. 

하지만 CompletableFuture은 실행할 코드 없이 Future를 만들 수 있도록 허용하며, complete() 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get() 으로 값을 얻을 수 있도록 허용한다

 

* 새로운 CompletableFuture 인스턴스 만들 때 thenAppy, thenAccept등을 사용하면 해당 stage의  CompletableFuture가 생긴다.  나중에 complete은 stage 이후의 완료되지 않은 stage에 적용되므로 조심할 것

 

 CompletableFuture 방식

public class CFComplete {
    public static void main(String args[]) throws ExecutionException, InterruptedException {

        ExecutorService executorService= Executors.newFixedThreadPool(10);

        int x =0;
        CompletableFuture<Integer> a = new CompletableFuture<>();
        executorService.submit(()->a.complete(f(x)));
        int b = g(x);
        System.out.println(a.get()+b);
        executorService.shutdown();
    }

    public static int f(int x) {
        return x+1;
    }

    public static int g(int x) {
        return x+2;
    }
}

 

CompletableFuture 방식 - 효과적인 방법 

public class CFCombine {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        int x = 0;
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        
        // thenCombine 메서드 사용 // 두 작업이 끝났을 때 함수 적용
        CompletableFuture<Integer> c = a.thenCombine(b,(num1,num2)->num1+num2);

        executorService.submit(()->a.complete(f(x)));
        executorService.submit(()->a.complete(f(x)));

        System.out.println(c.get());
        executorService.shutdown();
    }
    public static int f(int x) {
        return x+1;
    }

    public static int g(int x) {
        return x+2;
    }
}

결과를 더해주는 세 번째 연산은 다른 두 작업이 끝날 때 까지 스레드에서 실행되지 않는다.(먼저 시작해서 블록되지 않는다) 따라서 기존의 발생했던 블록 문제가 발생하지 않는다. 

 

리액티브 프로그래밍

Future은 독립적 실행과 병렬성에 기반하므로, 한번만 실행해 결과를 제공한다.

반면 리액티브 프로그래밍은 여러 Future 같은 객체를 통해 여러 결과를 제공한다. 

 

자바9에서는 java.util.concurrent.Flow 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다.

플로 API를 간단히 정리하면 다음과 같다.

  • 구독자가 구독할 수 있는 발행자
  • 이 연결을 구독(subscription)이라 한다.
  • 이 연결을 이용해 메시지(또는 이벤트)를 전송한다.

두 플로 합치는 예제

// 조건 : c1이나 c2의 값이 갱신되면 c3에 새로운 값 반영
/*
c1이나 c2의 값이 바뀌었을 때 c3이 새로운 값 반영하기 위해서는,
c1과 c2에 이벤트가 발생했을 때 c3 구독해야 한다. Publisher<T> 필요하다.

Subscriber<T> 는 구독자를 받아서, 구현자 필요한 대로 메서드를 구현한다
 */
// Publisher 이며 동시에 Subscriber 일  수 있다
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
    private int value = 0;
    private String name;
    private List<Subscriber> subscribers = new ArrayList<>();
    public SimpleCell(String name) {
        this.name = name;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }


    @Override
    public void onSubscribe(Subscription subscription) {

    }
    private void notifyAllSubscribers(){
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }

    @Override
    public void onNext(Integer item) {
        this.value = item;
        System.out.println(this.name + " : " + this.value);
        notifyAllSubscribers();
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {

    }
}

데이터가 발행자(생산자)에서 구독자(소비자)로 흐름에 착안해 이를 업스트림 또는 다운스트림이라 부른다. 위 예제에서 newValue는 업스트림 onNext() 메서드로 전달되고 notifyAllSubscribers() 호출을 통해 다운스트림 onNext() 호출로 전달된다.

 

압력과 역압력

매 초마다 수천개의 메시지가 onNext로 전달된다면 빠르게 전달되는 이벤트를 아무 문제 없이 처리할 수 있을까? 이러한 상황을 압력(pressure)이라 부른다.

이럴때는 정보의 흐름 속도를 제어하는 역압력 기법이 필요하다. 역압력은 Subscriber가 Publisher로 정보를 요청할 때만 전달할수 있도록 한다.

void onSubscribe(Subscription subscription);

위 메서드는 Subscribe 인터페이스에서 제공하며, Subscriber와 Publisher 사이에 채널이 연결되면 첫 이벤트로 이 메서드가 호출된다.

 

Subscription 객체는 Subscriber와 Publisher가 통신할 수 있는 메서드를 포함한다.

interface subscription { 
	void cancle (); 
	void request (long n); 
}

Publisher는 Subscription 객체를 만들어 Subscriber로 전달하고 Subscriber는 이를 통해 Publisher로 정보를 보낼 수 있다.

한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하기 위해 다음과 같은 작업이 필요하다.

  • Subscriber가 onSubscribe로 전달된 Subscription 객체를 subscription  같은 필드에 로컬로 저장한다.
  • Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel, request(1)을 추가해 오직 한 이벤트만 요청한다.
  • 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다.

위처럼 역압력을 구현하려면 장단점도 생각해야 한다.

  • 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도 큐를 가질 것인가?
  • 큐가 너무 커지면 어떻게 할까?
  • Subscriber가 준비가 안되었다면 큐의 데이터를 폐기할 것인가?

소실 가능 여부 등 데이터의 성격에 따라 구현도 바뀔 것이다.

 

리액티브 프로그래밍

리액티브 시스템은 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램을 가리킨다. 리액티브 시스템이 가져야할 속성은 반응성(responsive), 회복성(resilient), 탄력성(elastic)의 세가지 속성으로 요약할 수 있다.

이러한 속성을 구현하는 방법 중 하나로 리액티브 프로그래밍을 이용할 수 있다.