관리 메뉴

생각해보기

모던 자바 인 액션 -14- 본문

자바

모던 자바 인 액션 -14-

정한_s 2021. 12. 27. 13:25

자바 5에서 부터 Future 인터페이스를 제공하고 있다. 비동기 계산을 모델링 하는데 Future을 사용할 수 있다. Future는 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다. 시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다.

 

Future로 오래 걸리는 작업을 비동기적으로 실행하기

public class FutureAsync {
    public static void main(String args[]){
        // 쓰레드 풀에 태스크를 제출하려면 ExecutorService를 만들어야 한다
        ExecutorService executor  = Executors.newCachedThreadPool();

        // Callable을 ExcutorService로 제출한다
        Future<Double> future = executor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                return doSomeLongComputation();
            }
        });
        // 비동기 작업을 수행하는 동안 다른 작업 수행
        doSomethingElse();
        try {
         // 비동기 작업의 결과를 가져온다. 결과가 준비 되지 않으면 호출 스레드가 블록된다. 최대 1초를 기다린다
         Double result=  future.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // 계산 중 예외 발생
            e.printStackTrace();
        } catch (InterruptedException e) {
            // 현재 스레드에서 대기 중 인터럽트 발생
            e.printStackTrace();
        } catch (TimeoutException e) {
            // Future가 완료되기 전에 타임 아웃 발생
            e.printStackTrace();
        }
    }

    private static void doSomethingElse() {
        System.out.println("do something");
    }

    private static Double doSomeLongComputation() {
        return 0.0;
    }
}

 

그림으로 나타낸 Future 비동기 작업 흐름 

다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때 Future의 get 메서드로 결과를 가져올 수 있다 get 메서드를 호출했을 때 이미 계산이 완료되어 결과가 준비되어 있다면 즉시 결과를 반환한다. 하지만 결과가 준비되어 있지 않다면 작업이 완료 될때 까지 호출자 스레드를 블록 시킨다.

 

 

CompletableFuture 

Future 인터페이스가 제공하는 메서드만으로 간결한 동시 실행 코드를 구현하기가 어렵다. 따라서 자바8에서 Future 인터페이스를 구현한 CompletableFuture로 다음과 같은 기능을 제공한다

 

CompleatableFuture 제공 기능

  •  두 개의 비동기 계산 결과를 하나로 합친다
  • Future 집합이 실행하는 모든 태스크의 완료를 기다린다
  • Future 집합에서 가장 빨리 완료되는 테스크를 기다렸다가 결과를 얻는다
  • 프로그램적으로 Future를 완료시킨다. (비동기 동작에 수동으로 결과를 제공) 
  • Future 완료 동작에 반응한다(결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음 Future의 결과로 원하는 추가 동작을 할 수 있다)

* 동기 API 와 비동기 API

더보기

동기 API(blocking call) : 메서드를 호출한 다음에 메서드가 계산을 완료할 때 까지 기다렸다 메서드가 반환되면 호출자는 반환된 값으로 다른 동작을 수행한다.

 

비동기 API(non-blocking call) : 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행할 수 있도록 다른 스레드에 할당한다

 

동기 방식과 비동기 방식 

public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }
	
    // 동기 방법
    public double getPrice(String product) {
        return calculatePrice(product);
    }
	
    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    // 비동기 방법
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();
        return futurePrice;
    }

    public String getName() {
        return name;
    }

    public static void delay() {
        int delay = 1000;
        //int delay = 500 + RANDOM.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Runner 클래스 

public class ShopRunner {
    public static void main(String args[]){
        printSyncTime();
        System.out.println("======");
        printAsyncTime();
    }
    public static void printAsyncTime(){
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime
                + " msecs");
        // 다른 상점 질의 같은 다른 작업 수행
        doSomethingElse();
        //제품 가격을 계산하는 동안
        try {
        	// 가격정보가 있으면 Future에서 가격 정보를 읽고, 없으면 기다린다
            double price = futurePrice.get();
            System.out.printf("Price is %.2f%n", price);
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");
    }
    public static void printSyncTime(){
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        double price = shop.getPrice("my favorite product");
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime
                + " msecs");
        System.out.printf("Price is %.2f%n", price);
        doSomethingElse();
    }
    private static void doSomethingElse() {
        System.out.println("do something...");
    }
}

결과

Invocation returned after 1000 msecs
Price is 123.26
do something...
======
Invocation returned after 4 msecs
do something...
Price is 123.26
Price returned after 1005 msecs

비동기 에러 전파

// 비동기 방법 에러 전파 
public Future<Double> getPriceAsyncHandleException(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        try{
            double price = calculatePrice(product);
            // 계산이 정상적으로 종료되면 Future에 가격정보를 저장채로 Future 종료
            futurePrice.complete(price);
            
        }catch (Exception ex){
        	// 문제가 생길경우 발생한 에러를 포함하여 Future 종료
            futurePrice.completeExceptionally(ex);               
        }
    }).start();
    return futurePrice;
}

completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생하는 예외를 클라이언트로 전달할 수 있다. 

 

팩토리 메서드 supplyAsync 이용

public Future<Double> getPriceAsyncSimple(String product) {
    return CompletableFuture.supplyAsync(()->calculatePrice(product));
}

간단하게 supplyAsync로 completableFuture 만들 수 있다. 이 메서드는 Suppiler를 인수로 받아 CompletableFuture를 반환한다. CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성한다. 기본은 ForkJoinPool의 commonPool을 통해 Supplier를 실행한다. 

 

비블록 코드 만들기

상점 리스트

조건 : 제품명을 입력하면 상점 이름과 가격을 포함하는 List를 반환한다

private final List<Shop> shops = Arrays.asList(
        new Shop("BestPrice"),
        new Shop("LetsSaveBig"),
        new Shop("MyFavoriteShop"),
        new Shop("BuyItAll"));

순차적으로 정보 요청하는 코드

public List<String> findPrices(String product){
    return shops.stream()
            .map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
            .collect(toList());
}

병렬 스트림 사용

public List<String> findPricesParallel(String product){
    return shops.parallelStream()
            .map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
            .collect(toList());
}

CompletableFuture 비동기 호출

public List<String> findPricesFuture(String product) {
    List<CompletableFuture<String>> pricesFutures =
            shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(
                            ()->String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))
                    .collect(toList());
    return pricesFutures.stream().map(CompletableFuture::join).collect(toList());
}

두 map 연산을 하나의 스트림 처리 파이프라인으로 처리 하지 않고 두개의 스트림 파이프라인으로 처리했다.

join은 Future 메서드의 get 메서드와 같은 의미를 가지며 아무 예외도 발생시키지 않는다(get은 예외 발생시킨다).

스트림 연산은 게으른 특성이 있으므로 하나의 파이프 라인으로 연산을 처리했다면 모든 가격 정보 요청 동작이 동기적, 순차적으로 이루어지게 된다. 따라서 계산과 반환으로 두개의 파이프 라인으로 계산을 비동기적으로 처리하였다 

 

결과

순차 done in 4013 msecs
병렬 done in 1005 msecs
CompeletableFuture done in 1005 msecs

병렬 스트림과 CompletableFuture 버전이 순차 스트림 버전보다 빠른 것을 알 수 있다. 또한 병렬 스트림과 CompletableFuture 버전은 내부에서 Runtime.getRuntime().availableProcessors() 가 반환하는 스레드 수* 를 사용하면서 비슷한 결과를 내었다. CompletableFuture의 장점은 병렬 스트림 버전에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다. 따라서 Executor로 스레드 풀을 조정하여 애플리케이션에 맞는 최적화 설정을 얻을 수 있다.

 

* ForkjoinPool의 commonPool 사용, 스레드 수는 processor -1 이다

 

* 스레드 풀 크기 조절

 

자바 병렬 프로그래밍에서는 스레드 풀의 최적값을 찾는 방법을 제안한다. 스레드 풀이 너무크면 CPU와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다. 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을수도 있다. 게츠는 다음 공식으로 대략적인 CPU 활용 비율을 계산할 수 있다


 

N(threads)는 스레드풀의 크기
 Runtime.getRuntime().availableProcessors()가 반환하는 코어의수
는 0과 1사이의 값을 갖는 CPU 활용 비율
는 대기시간과 계산시간의 비율

N(threads)=N(cpu)∗U(cpu)(1+W/C)

N(threads)는 스레드풀의 크기
N(cpu)는 Runtime.getRuntime().availableProcessors()가 반환하는 코어의수
U(cpu)는 0과 1사이의 값을 갖는 CPU 활용 비율
W/C는 대기시간과 계산시간의 비율

 

커스텀 Executor

private final Executor executor = Executors.newFixedThreadPool(shops.size(),(Runnable r)->{
    Thread t= new Thread(r);
    t.setDaemon(true);
    return t;
});

 

CompletableFuture 기능

* Async로 끝나는 버전과 끝나지 않는 버전의 차이

Async로 끝나는 메서드는 다음 작업이 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다.

Async로 끝나지 않는 메서드는 이전 작업이 수행한 스레드와 같은 스레드에서 작업을 실행한다.

 

비동기 작업 파이프라인을 만들기

  • 첫번째 연산 getPrice() : 각 상점에서 요청한 제품의 가격을 얻는다(문자열)
  • 두번째 연산 parse() : 반환한 문자열을 Quote 객체로 변환
  • 세번째 연산 applyDiscount() : Discount 서비스를 이용해서 각 Quote에 할인을 적용 

동기 작업과 비동기 작업 조합하기

thenApply 메서드

CompletableFuture 동작을 완전히 완료한 다음 thenApply 메서드에 람다식이 적용된다. 

따라서 CompletableFuture<String>에서 CompletableFuture<Quote>로 변경된다.

 

thenCompose 메서드

두 가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두개의 비동기 동작을 만들 수 있다. 

첫번째 CompletableFuture의 계산 결과를 두번째 CompletableFuture의 계산 입력으로 사용한다.

 

독립적인 두 개의 비동기 태스크 합치기

 

thenCombine 메서드

독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 할 때 사용된다.

독립적이란 첫 번째 CompletableFuture의 결과와는 상관없이 두 번째 CompletableFuture를 실행할 수 있는 것을 말한다

thenCombine 메서드는 BiFunction을 두번 째 인수로 받는다. BiFunction은 두 개의 CompletableFuture 결과를 어떻게 합칠 지를 결정한다.

 

타임아웃 사용(자바 9) - orTimeout, completeOnTimeout

두 메서드 모두 CompletableFuture를 반환하므로 이 결과를 다른 CompletableFuture 메서드와 연결할 수 있다

 

orTimeout

지정된 시간이 지난 후에 CompletableFuture을 TimeoutExecption으로 완료한다.

내부적으로 ScheduledThreadExecutor를 활용한다.

 

completeOnTimeout

지정된 시간이 지났을 때 Execption으로 처리하는 것이 아닌 임의의 값으로 완료한다.

 

CompletableFuture 종료에 대응

 

thenApply 메서드

CompletableFuture 계산이 끝나면 값을 소비한다. 연산 결과를 소비하는 Consumer를 인수로 받는다.

 

allOf 메서드 

CompletableFuture 배열을 입력으로 받는다 ( input : CompletableFuture[ ] )

전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>를 반환한다. 

 

anyOf 메서드

CompletableFuture 배열을 입력으로 받는다 ( input : CompletableFuture[ ] )

처음으로 완료한 CompletableFuture의 값으로 동작을 완료한다. (output : CompletableFuture<Object>)

 

'자바' 카테고리의 다른 글

모던 자바 인 액션 -16-  (0) 2021.12.28
모던 자바 인 액션 -15-  (0) 2021.12.27
모던 자바 인 액션 -13-  (0) 2021.12.21
모던 자바 인 액션 -12-  (0) 2021.12.20
모던 자바 인 액션 -11-  (0) 2021.12.20
Comments