본문 바로가기
Java/모던 자바 인 액션

[모던 자바 인 액션] CompletableFuture : 안정적인 비동기 프로그래밍

by dvid 2023. 1. 20.

CompletableFuture: 안정적 비동기 프로그래밍

1. Future의 단순 활용

비동기 계산을 모델링하는 데 Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.
시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다.

Future는 저수준에 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다.
Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출해야 한다.

ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(FutureEx::doSomeLongComputation);

doSomethingElse();

try {
    Double aDouble = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    // 계산 중 예외 발생
} catch (InterruptedException e) {
    // 현재 스레드에서 대기 중 인터럽트 발생
} catch (TimeoutException e) {
    // Future가 완료되기 전에 타임아웃 발생
}

Future로 시간이 오래 걸리는 작업을 비동기적으로 실행하기

ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있다.
작업이 영원히 끝나지 않는다면, 최대 타임아웃 시간을 설정하는 것이 좋다.

1-1. Future 제한

여러 Future의 결과가 있을 때 이들의 의존성은 표현하기 어렵다.
다음과 같은 선언형 기능이 있다면 유용할 것이다.

  • 두 개의 비동기 계산 결과를 하나로 합친다. 독립적 || 의존
  • Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
  • Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.
  • 프로그램적으로 Future를 완료시킨다. (비동기 동작에 수동으로 결과 제공).
  • 결과를 기다리면서 블록 되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future의 결과로 원하는 추가 동작 수행

선언형으로 이용할 수 있도록 자바 8에서 새로 제공하는 CompletableFuture를 확인할 것이다.
람다 표현식과 파이프라이닝을 활용한다.

1-2. CompletableFuture로 비동기 애플리케이션 만들기

예산을 줄일 수 있도록 여러 온라인 상점 중 가장 저렴한 가격을 제시하는 상점을 찾는 애플리케이션을 완성하는 예제 이용.

배울 점

  • 고객에게 비동기 API를 제공하는 방법
  • 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법, 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법
  • 비동기 동작 완료에 대응하는 방법 (각 상점에서 가격 정보를 얻을 때마다 즉시 최저가격을 찾는 애플리케이션을 갱신하는 방법)

동기 API와 비동기 API

전통적인 동기 API에서는 블록 호출blocking call을 한다.
반면 비동기 API에서는 메서드가 즉시 반환되며 끝내지 못한 나머지 작업을 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다.
비블록 호출non-blocking call을 한다.
다른 스레드에 할당된 나머지 계산 결과는 콜백 메서드를 호출해서 전달하거나 호출자가 '계산 결과가 끝날 때까지 기다림' 메서드를 추가로 호출하면서 전달된다.

2. 비동기 API 구현

public class Shop {
    /**
     * 제품명에 해당하는 가격을 반환하는 메서드
     */
    public double getPrice(String product) {
        // 구현해야 함
    }
}

getPrice() 메서드는 상점의 데이터베이스를 이용해서 가격정보를 얻는 동시에 다른 외부 서비스에도 접근할 것이다.
오래 걸리는 작업은 delay()라는 메서드(1초를 기다림)로 대체할 것이다.

/**
 * 1초 지연 (오래 걸리는 작업)
 */
public static void delay() {
    try {
        Thread.sleep(1_000L);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
public class Shop {
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

사용자가 이 API를 호출하면 비동기 동작이 완료될 때까지 1초 동안 블록된다.
이 따라서 비동기 API를 만들어보자.

2-1. 동기 메서드를 비동기 메서드로 변환

public Future<Double> getPriceAsync(String product) { ... }

위와 같이 메서드 시그니처를 변경했다.
getPriceAsync()메서드는 즉시 반환되므로 호출자 스레드는 다른 작업을 수행할 수 있다.

public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread(() -> {
        double price = calculatePrice(product);
        futurePrice.complete(price);
    }).start();
    return futurePrice;
}

위 코드에서 오래 걸리는 계산 결과를 기다리지 않고 결과를 포함할 Future 인스턴스를 바로 반환했다.
요청한 제품의 가격 정보가 도착하면 complete() 메서드를 이용해서 CompletableFuture를 종료할 수 있다.
다음 코드처럼 클라이언트는 getPriceAsync를 활용할 수 있다.

Shop shop = new Shop("Best Shop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my product");
long invocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("Invocation returned after " + invocationTime + " msecs");

// 제품의 가격을 계산하는 동안
doSomethingElse();
// 다른 상점 검색 등 다른 작업 수행
try {
    Double price = futurePrice.get();
    System.out.printf("Price is %.2f %n", price);
} catch (Exception e) {
    throw new RuntimeException(e);
}


long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("Price returned after" + retrievalTime + " msecs");

팩토리 메서드 supplyAsyncCompletableFuture 만들기

public Future<Double> getPriceAsync(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));    
}

supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다.
CompletableFutureSupplier를 실행해서 비동기적으로 결과를 생성한다.
ForkJoinPoolExecutor 중 하나가 Supplier를 실행할 것이다.
두 번째 인수를 받는 supplyAsync 메서드를 이용해서 다른 Executor를 지정할 수 있다.
결국 모든 CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달할 수 있다.

3. 비블록 코드 만들기

상점 리스트에서 제품명을 입력하면 상점 이름과 제품 가격 문자열 정보를 포함하는 List를 반환하는 메서드를 구현해야 한다.

List<Shop> shops = List.of(
                        new Shop("Best Price"),
                        new Shop("Lets Save"),
                        new Shop("Favorite Shop"),
                        new Shop("Buy All"));       

public List<String> findPrices(String product) {
    return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice()))
                .collect(toList());
}

현재는 4초 이상이 걸린다. 개선이 필요하다.

3-1. 병렬 스트림으로 요청 병렬화하기

public List<String> findPrices(String product) {
    return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice()))
                .collect(toList());
}

병렬로 검색이 진행되므로 시간이 단축되었다. CompletableFuture 기능을 활용해서 findPrices 메서드의 동기 호출을 비동기로 바꿔보자.

3-2. CompletableFuture로 비동기 호출 구현

List<CompletavleFuture<String>> priceFutures = 
    shops.stream()
         .map(shop -> CompletableFuture.supplyAsync(
            () -> String.format("%s price is %.2f", 
                        shop.getName(), 
                        shop.getPrice(product))))
         .collect(toList());

위 코드로 CompletableFuture를 포함하는 리스트 List<CompletableFuture<String>>를 얻는다.
모든 CompletableFuture의 동작이 완료되고 결과를 추출한 다음에 리스트를 반환해야 한다.
CompletableFuturejoin 메서드는 Futureget과 같은 의미를 갖지만, 예외를 발생시키지 않는다.

public List<String> findPrices(String product) {
    List<CompletavleFuture<String>> priceFutures =
        shops.stream()
        .map(shop -> CompletableFuture.supplyAsync(
                        () -> String.format("%s price is %.2f",
                                shop.getName(),
                                shop.getPrice(product))))
        .collect(toList());
    return priceFutures.stream()
                       .map(CompletableFuture::join)
                       .collect(toList())
}

map 연산을 하나의 스트림 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리했다.
스트림 연산은 게으른 연산을 하므로 하나의 파이프라인으로 처리했다면 모든 가격 정보 요청이 순차적, 동기적으로 이루어진다.

순차적으로 처리된다면 이전 요청의 처리가 완전히 끝난 다음 새로 만든 CompletableFuture가 처리된다.
병렬로 처리한다면 CompletableFuture를 리스트로 모은 다음에 다른 작업과는 독립적으로 각자의 작업을 수행한다.

더 확장성이 좋은 해결 방법

CompletableFuture, 병렬스트림 둘 다 Runtime.getRuntime().availableProcessors()가 반환하는 스레드 수를 사용하면서 비슷한 결과가 된다.
결과적으로 비슷하지만 CompletableFuture는 병렬 스트림에 비해 작업에 이용할 수 있는 다양한 Executor를 지정할 수 있다.
따라서 Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.

3-4. 커스텀 Executor 사용하기

스레드 풀 크기 조절

Nthreads = NCPU * UCPU * (1 + W / C)

NCPU = Runtime.getRuntime().availableProcessors()
UCPUT = 0~1 사이 값을 갖는 CPU 활용 비율
W / C = 대기시간과 계산시간 비율

애플리케이션에서 상점의 응답을 대략 99%만큼 기다리므로 W / C = 100 이라고 할 수 있다.
즉 CPU 활용률이 100%라면 스레드 수는 NCPU * 100이라고 할 수 있다. 하지만 상점 수 보다 많을 필요가 없다.
스레드 수가 너무 많으면 오히려 서버 성능에 좋지 않을 수 있으므로 사용할 스레드 수는 100 이하로 설정하는 것이 좋다.

private final Executor executor = 
    Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
            public Thread newThread(Runnable r){
                Thread t = new Thread(r);
                t.setDaemon(true);  // 프로그램 종료를 방해하지 않는 데몬스레드
                return t;
            }
        }
    });

우리가 만드는 풀은 데몬 스레드daemon thread를 포함한다. 자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다.
따라서 어떤 이벤트를 한없이 기다리면서 종료되지 않는 일반 스레드가 있으면 문제 될 수 있다.
반면 데몬스레드는 자바 프로그램이 종료될 때 강제로 실행이 종료될 수 있다.

스트림 병렬화와 CompletableFuture 병렬화

  • I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다.
  • I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며 대기 / 계산(W / C)의 비율에 적합한 스레드 수를 설정할 수 있다.

4. 비동기 작업 파이프라인 만들기

우리와 계약 맺은 모든 상점이 하나의 할인 서비스를 사용하기로 했다고 가정하자.
할인 서비스에서는 서로 다른 할인율을 제공하는 다섯 가지 코드를 제공하는데, Discount.Code로 표현할 수 있다.

public class Discount {
    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }
}

또한 Shop 클래스의 getPrice() 또한 ShopName:price:DiscountCode 형식을 반환하도록 리팩토링 한다.

public String getPrice(String product) {
    double price = calculatePrice(product);
    Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().lenth)];
    return String.format("%s:%.2f:%s", name, price, code);
}

4-1. 할인 서비스 구현

최저가격 검색 애플리케이션은 여러 상점에서 얻은 가격 정보로 할인서버에 질의를 보내야 한다.
상점에서 제공한 문자열 파싱은 Quote 클래스로 캡슐화할 수 있다.

public class Quote {
    private final String shopName;
    private final doubleprice;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, DiscountCode.Code code) {
        this.shopaName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code code = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, code);
    }

    // getters
}
public class Discount {
    public enum Code {
        // 생략
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode()); 
    }

    public static double apply(double price, Code code) {
        delay();
        return (price * 100 - code.percentage) / 100;
    }
}

4-2. 할인 서비스 사용

순차적 동기 방식으로 findPrices 메서드 구현

public List<String> findPrices(String product) {
    return shops.stream()
                .map(shop -> shop.getPrice())   // 할인 전 가격
                .map(Quote::parse)              // Quote 객체로 변환
                .map(Discount::applyDiscount)   // Discount 서비스로 Quote에 할인 적용
                .collect(toList());
}
  1. 첫 번째 연산에서 각 상점을 요청한 제품의 가격과 할인코드로 변환한다.
  2. 두 번째 연산에서는 이들 문자열을 파싱해서 Quote 객체를 만든다.
  3. 세 번째 연산에서는 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열 반환.

순차적으로 다섯 상점에 가격 정보를 요청하기 때문에 5초가 소요되고, 할인 서비스에 접근하여 5초 총 약 10초가 소요된다.
병렬 스트림이 사용하는 스레드 풀의 크기가 고정되어 있어 상점수가 늘어날 때는 유연하게 대응할 수 없다.
따라서 CompletableFuture에서 수행하는 태스크를 설정할 수 있는 커스텀 Executor를 정의함으로써 CPU 사용을 극대화할 수 있다.

4.3 동기 작업과 비동기 작업 조합

CompletableFute에서 제공하는 기능으로 findPrices 메서드를 비동기적으로 재구현 하자.

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures = 
        shop.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote -> 
                    CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)))
            .collect(toList());

    return priceFutures.stream()
                        .map(CompletableFuture::join)
                        .collect(toList());
}

동기 작업과 비동기 작업 조합하기

가격 정보 얻기

팩토리 메서드 supplyAsync에 람다 표현식을 전달해서 비동기적으로 상저에서 정보를 조회했다.
첫 번째 변환의 결과는 Stream<CompletableFuture<String>>이다. 각 CompletableFuture는 작업이 끝났을 때 해당 상점에서 반환하는 문자열 정보를 포함한다.
그리고 이전에 개발한 커스텀 ExecutorCompletableFuture를 설정한다.

Quote 파싱하기

두 번째 변환 과정에서는 첫 번째 결과 문자열을 Quote로 변환한다. 파싱 동작에서는 원격 서비스나 I/O가 없으므로 원하는 즉시 지연 없이 동작을 수행할 수 있다.
따라서 첫 번째 과정에서 생성된 CompletableFuturethenApply 메서드를 호출한 다음에 문자열을 Quote 인스턴스로 변환하는 Function으로 전달한다.

thenApply 메서드는 CompletableFuture가 끝날 때까지 블록하지 않는다는 점을 주의해야 한다.
즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 전달된 람다 표현식을 적용할 수 있다.
따라서 CompletableFuture <String>을 CompletableFuture<Quote>로 변환할 것이다.
이는 CompletableFuture의 결과물로 무엇을 하지 지정하는 것과 같다.

CompletableFuture를 조합해서 할인된 가격 계산하기

세 번째 map 연산에서는 상점에서 받은 할인 전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용해야 한다.
이번에는 원격 실행이 포함되므로 이전 두 변환과 다르며 동기적으로 작업을 수행해야 한다.

람다 표현식으로 이 동작을 팩토리 메서드 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환된다.
결국 두 가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두 개의 비동기 동작을 만들 수 있다.

  • 상점에서 가격 정보를 얻어와서 Quote로 변환
  • 변환된 QuoteDiscount 서비스로 전달해서 할인된 최종가격 획

자바 8의 CompletableFuture API는 이와 같이 두 비동기 연산을 파이프라인으로 만들 수 있도록 thenCompose 메서드를 제공한다.
thenCompose 메서드는 첫 번째 연산의 결과를 두 번째 연산으로 전달한다.

세 개의 map 연산 결과 스트림의 요소를 리스트로 수집하면 List<CompletableFuture<String>> 형식의 자료를 얻을 수 있다.
마지막으로 CompletableFuture가 완료되기를 기다렸다가 join으로 값을 추출할 수 있다.
thenComposeAsync 또한 존재하지만, 첫 번째 CompletableFuture에 의존하므로 두 CompletableFuture를 하나로 조합하든 Async 버전의 메서드를 사용하든 최종 결과나 개괄적인 실행시간에는 영향을 미치지 않는다.
따라서 스레드 전환 오버헤드가 적게 발생하면서 효율성이 좀 더 좋은 thenCompose를 사용한다.

독립 CompletableFuture와 비독립 CompletableFuture 합치기

두 개의 CompletableFuture를 합쳐야 할 때 첫 번째 CompletableFuture의 동작완료와 상관없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.
이런 상황에서는 thenCombine 메서드를 사용한다. thenCombine 메서드는 BiFunction을 두 번째 인수로 받는다.
thenCombineAsync 버전이 존재한다. thenCombineAsync 메서드에서는 BiFunction이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행된다.

예를 들자면 한 온라인 상점에서 유로(EUR) 가격 정보를 제공하는데, 고객에게는 달러(USD) 가격을 보여줘야 한다.
주어진 상품의 가격을 요청하면서 환율 서비스를 비동기적으로 요청해야 한다.

Future<Double> futurePriceInUSD = 
    CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                     .thenCombine(
                         CompletableFuture.supplyAsync(
                            () -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate
                     );

독립적인 두 개의 비동기 태스크 합치기

4-5. Future의 리플렉션과 CompletableFuture의 리플렉션

CompletableFuture는 람다 표현식을 사용할 수 있기 때문에 다양한 동기, 비동기 태스크를 활용하여 복잡한 연산 수행 방법을 효과적으로 쉽게 정의할 수 있는 선언형 API를 만들 수 있다.
바로 위의 코드와 아래의 코드를 비교한다면 확연한 차이를 볼 수 있다.

ExcutorService executor = Executors.newCachedThreaPool();
final Future<Double> futureRate = executor.submit(new Callable<Double> () {
    public Double call() {
        return exchangeService.getRate(Money.EUR, Money,USD);
    }
});

Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
    public Double call() {
        double priceInEUR = shop.getPrice(product);
        return priceInEUR * futureRate.get();
    }
});

4-6. 타임아웃 효과적으로 사용하기

Future의 계산 결과를 읽을 때는 무한정 기다리는 상황이 발생할 수 있으므로 블록을 하지 않는 것이 좋다.
CompletableFuture에서 제공하는 기능을 통해 이런 문제를 해결할 수 있다.
orTimeout 메서드는 지정된 시간이 지난 후에 CompletableFutureTimeoutException으로 완료하면서 또 다른 CompletableFuture를 반환할 수 있도록 내부적으로 ScheduledThreadExecutor를 활용한다.

Future<Double> futurePriceInUSD = 
    CompletableFuture.supplyAsync(() -> shop.getPrice(product))
    .thenCombine(
        CompletableFuture.supplyAsync(
            () -> exchangeService.getRate(Money.EUR, Money.USD)),
        (price, rate) -> price * rate)
    .orTimeout(3, TimeUnit.SECONDS);

환전 서비스가 1초 안에 실행이 되어야 하지만 그렇지 못했을 때는 미리 정해둔 환율을 활용하도록 할 수 있다.

Future<Double> futurePriceInUSD = 
    CompletableFuture.supplyAsync(() -> shop.getPrice(product))
    .thenCombine(
        CompletableFuture.supplyAsync(
            () -> exchangeService.getRate(Money.EUR, Money.USD))
                        .completeOnTimeout(DEFAULT_RATE, 1 , TimeUnit.SECONDS), // 정해둔 환율로 계산
        (price, rate) -> price * rate)
    .orTimeout(3, TimeUnit.SECONDS);

completeOnTimeout 메서드는 CompletableFuture를 반환하므로 이 결과를 다른 CompletableFuture 메서드와 연결할 수 있다.

5. ComletableFuture의 종료에 대응하는 방

실제 서비스는 얼마만큼의 지연이 발생할지 모르기 때문에 임의의 지연으로 시뮬레이션한다.

private static final Random ramdom = new Random();

public static void randomDelay() {
    int delay = 500 + random.nextInd(2_000);
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

상점에서 가격 정보를 제공할 때까지 기다리지 않고 각 상점에서 가격 정보를 제공할 때마다 즉시 보여줄 수 있는 최저가격 검색 애플리케이션을 만들자.

일부 상점은 타임아웃이 발생할 수 있다.

5-1. 최저가격 검색 애플리케이션 리팩터

모든 가격 정보를 포함할 때까지 리스트 생성을 기다리지 않도록 프로그램을 고쳐야 한다.
그러려면 상점에 필요한 일련의 연산 실행 정보를 포함하는 CompletableFuture의 스트림을 직접 제어해야 한다.

public Stream<CompletableFuture<String>> findPricesStream(String product) {
    return shop.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
                                () -> Discount.applyDiscount(quote), executor)));
}

CompletableFuturethenAccept라는 메서드로 CompletableFuture에 동작을 등록한다.
등록된 동작은 CompletableFuture의 계산이 끝나면 값은 소비한다.

findPriceStream("sample").map(f -> f.thenAccept(System.out::println));

thenAcceptAsync 메서드는 새로운 스레드를 만들어서 Consumer를 실행한다.

thenAccept 메서드는 CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 지정했으므로 CompletableFuture<Void>를 반환한다.

CompletableFuture[] futures = findPriceStream("myPhone")
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]);

CompletableFuture.allOf(futures).join();

팩토리 메서드 allOfCompletableFuture 배열을 입력으로 받아 CompletableFuture<Void>를 반환한다.
따라서 allOf 메서드가 반환하는 CompletableFuturejoin을 호출하면 원래 스트림의 모든 CompletableFutrue의 실행 완료를 기다릴 수 있다.
이를 이용해서 애플리케이션은 '모든 상점이 결과를 반환했거나 타임아웃되었음' 같은 메세지를 사용자에게 보여줄 수 있다.

반면 배열의 CompletableFuture 중 하나의 작업이 끝나길 기다리는 상황도 있을 수 있다.
이때는 팩토리 메서드 anyOf를 사용한다. anyOf 메서드는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Object>를 반환한다.
CompletableFuture<Object>는 처음으로 완료한 CompletableFuture의 값으로 동작을 완료한다.

7. 마치며

  • 한 개 이산의 원격 외부 서비스를 사용하는 긴 동작을 실행할 때는 비동기 방식으로 애플리케이션의 성능과 반응성을 향상할 수 있다.
  • 우리 고객에게 비동기 API를 제공하는 것을 고려해야 한다. CompletableFuture의 기능을 이용하면 쉽게 비동기 API를 구현할 수 있다.
  • CompletableFuture를 이용할 때 비동기 태스크에서 발생한 에러를 관리하고 전달할 수 있다.
  • 동기 API를 CompletableFuture로 감싸서 비동기적으로 소비할 수 있다.
  • 서로 독립적인 비동기 동작이든 아니면 하나의 비동기 동작이 다른 비동기 동작의 결과에 의존하는 상황이든 여러 비등기 동작을 조립하고 조합할 수 있다.
  • CompletableFuture에 콜백을 등록해서 Future가 동작을 끝내고 결과를 생산했을 때 어떤 코드를 실행하도록 지정할 수 있다.
  • CompletableFuture 리스트의 모든 값이 완료될 때까지 기다릴지 아니면 첫 값만 완료되길 기다릴지 선택할 수 있다.
  • 자바 9에서는 orTimeout, completeOnTimeout 메서드로 CompletableFuture에 비동기 타임아웃 기능을 추가했다.

댓글