CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
동시성을 구현하는 자바 지원의 진화
- 처음
Runnable
,Thread
를 동기화된 클래스와 메서드를 이용해 잠갔다.
- Java5
ExecutorService
인터페이스,Callable<T>
,Future<T>
- Java7
- 분할 정복, 포크/조인을 지원하는
java.util.concurrent.RecursiveTask
- 분할 정복, 포크/조인을 지원하는
- Java8
- 스트림, 람다에 기반한 병렬 프로세싱
- Java9
- 분산 비동기 프로그래밍을 명시적으로 지원
- 발행-구독 프로토콜(
Flow
API)
Executor와 스레드 풀
자바5는 Executor
프레임워크와 스레드 풀을 통해 스레드의 성능을 고도화 함.
스레드의 문제
스레드를 만들고 종료하는 비용은 비싸다.
따라서 기존 스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나서는 안된다.
스레드 풀이 좋은 이유
ExecutorService
는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다.newFixedThreadPool
같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다.
ExecutorService newFixedThreadPool(int nThreads)
- 워커 스레드라고 하는 nThreads를 포함하는
ExecutorService
를 만들고 이들을 스레드 풀에 저장한다. - 스레드 풀에서 사용하지 않은 스레드로 제출된 테스크를 먼저 온 순서대로 실행 후 종료되면 스레드를 스레드 풀에 반납한다.
- 장점은 하드웨어 수에 맞는 테스크 수를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다는 점이다.
스레드 풀이 나쁜 이유
주의해야 할 두 가지 사항
- 블록할 수 있는 태스크는 스레드 풀에 제출하지 말아야 한다.
- 프로그램을 종료하기 전에 스레드 풀을 종료하는 슴관을 갖는 것이 중요하다.
동기 API, 비동기 API
int f(int x)
int g(int x)
위의 두 메서드의 결과를 합하는 예제
int a = f(x);
int b = g(x);
System.out.println(a + b);
f, g를 실행하는데 오랜 시간이 걸린다고 가정한다면 병렬처리할 수 있지만 코드가 복잡해진다.
public class ThreadExample {
public static void main(String[] args) throws InterruptedException {
int x = 1337;
Result result = new Result();
Thread t1 = new Thread(() -> result.left = f(x));
Thread t2 = new Thread(() -> result.right = g(x));
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(result.left + result.right);
}
private static int f(int x) {
return x;
}
private static int g(int x) {
return x;
}
static class Result {
private int left;
private int right;
}
}
Runnable
대신 Future
인터페이스를 이용해 더 단순화 할 수 있다.
public class ExecutorServiceExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int x = 1337;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> y = executorService.submit(() -> f(x));
Future<Integer> z = executorService.submit(() -> g(x));
System.out.println(y.get() + z.get());
executorService.shutdown();
}
private static int f(int x) {
return x;
}
private static int g(int x) {
return x;
}
}
하지만 submit()
호출 등 불필요한 코드로 오염되었다.
비동기 API로 해결 할 수 있다.
- 자바5
Future
-> 자바8CompletableFuture
- 발행-구독 프로토콜에 기반한 자바9
java.util.concurrent.Flow
인터페이스
Future 형식 API
Future<Integer> f(int x);
Future<Integer> g(int x);
Future<Integer> y = f(x);
Future<Integer> z = g(x);
System.out.println(y.get() + z.get());
메서드 f, g는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future
반환하고 두 Future
가 완료되어 결과가 합쳐지길 기다린다.
또는 g는 유지하고 f에만 Future를 적용할 수도 있다.
하지만 더 큰 프로그램에서는 두 가지 이유로 이런 방식을 사용하지 않는다.
- API 형식을 통일하는 것이 바람직하다.
- 병렬 하드웨어로 프로그램 실행속도를 극대화하기 위해서는 여러 작고 합리적인 크기의 태스크로 나누는 것이 좋다.
리액티브 형식 API
f, g의 시그니처를 바꿔 콜백 형식의 프로그래밍을 이용하는 것이다.
void f(int x, IntConsumer dealWithResult);
결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것이다.
public class CallbackStyleExample {
public static void main(String[] args) {
int x = 1337;
Result result = new Result();
f(x, y -> {
result.left = y;
System.out.println(result.left + result.right);
});
g(x, y -> {
result.left = y;
System.out.println(result.left + result.right);
});
}
private static void f(int x, IntConsumer dealWithResult) {
...
}
private static void g(int x, IntConsumer dealWithResult) {
...
}
private static class Result {
int left;
int right;
}
}
f와 g의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력한다. 락을 사용하지 않으므로 값을 두 번 출력하거나 println호출 전에 값이 변할 수 있다.
두 가지 방법으로 문제를 보완할 수 있다.
- if-else를 이용해 적절한 락을 이용하여 콜백 호출 확인
- Future 이용
잠자기는 해로운 것으로 간주
스레드는 잠들어도(sleep()
) 여전히 시스템 자원을 점유한다.
스레드 풀에서 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다.
블록 동작도 마찬가지이다. 다른 태스크가 어떤 동작을 완료하기를 기다리는 것 & 외부 상호작용(네트워크, DB, 키보드 입력 등)
이 상황에서는 태스크에서 기다리는 일을 만들지 않거나 코드에서 예외를 발생하여 이를 처리
work1();
Thread.sleep(10_000); // 10초
work2();
public class ScheduledExecutorServiceExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
work1();
// work1이 끝난 10초 뒤 work2를 개별 태스크로 스케줄 함.
scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
}
private static void work1() {
System.out.println("work1");
}
private static void work2() {
System.out.println("work2");
}
}
두 코드 다 같은 동작을 수행하지만 1번 코드는 sleep 시 자원을 점유하지만 2번 코드는 스레드가 사용되지 않고 메모리만 조금 더 사용한다.
태스크를 블록하는 것 보다 다음 작업을 태스크로 제출하고 현재 태스크는 종료하는 것이 좋다.
비동기 API에서 예외 처리
Future
를 구현한 CompletableFuture
에서는 런타임 get()
메서드에 예외를 처리할 수 있는 기능을 제공하며 예외에서 회복할 수 있도록 exceptionally()
같은 메서드도 제공한다.
리액티브 형식의 비동기 API 에서는 return 대신 기존 콜백이 호출되므로 예외가 발생했을 때 실행될 추가 콜백을 만들어 인터페이스를 바꿔야 한다.
void f(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);
콜백이 여러 개면 한 객체로 메서드를 감싼다. Flow
API는 여러 콜백을 한 객체(Subscriber<T>
)로 감싼다.
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
박스와 채널 모델
동시성 모델을 가장 잘 설계하고 개념화하려면 그림이 필요하다. 이 기법을 박스와 채널 모델box-and-channel model이라고 한다.
// 병렬성 X
int t = p(x);
println(r(q1(t), q2(t)));
// 박스와 채널 다이어그램의 모양상 p와 r을 Future로 감싸지 않았다.
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
println(r(a1.get(), a2.get()));
자바8에서는 CompletableFuture
, 콤비네이터를 이용해 문제를 해결한다.
CompletableFuture와 콤비네이터를 이용한 동시성
CompletableFuture
는 실행할 코드 없이 Future
를 만들 수 있고, complete()
메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고, get()
으로 값을 얻을 수 있도록 허용한다.
public class CFComplete {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> a.complete(f(x)));
int b = g(x);
System.out.println(a.get() + b);
executorService.shutdown();
}
private static int f(int x) {
return 0;
}
private static int g(int x) {
return 0;
}
}
f가 끝나지 않는다면 get()
을 가다려야 하므로 프로세싱 자원 낭비 가능성이 있다.CompletableFuture
를 사용하면 해결할 수 있다. thenCombine()
메서드를 사용함으로써 두 연산 결과를 효과적으로 사용할 수 있다.
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T, U, V> fn)
이 메서드는 두 개의 CompletableFuture
ㄱ밧을 받아 한개의 새 값을 만든다.
처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록하지 않은 상태로 결과 Future
를 반환한다.
public class CFCombine {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
CompletableFuture<Integer> c = a.thenCombine(b, Integer::sum);
executorService.submit(() -> a.complete(f(x)));
executorService.submit(() -> b.complete(g(x)));
System.out.println(c.get());
executorService.shutdown();
}
private static Integer f(int x) {
return 0;
}
private static Integer g(int x) {
return 0;
}
}
Future a
, Future b
의 결과를 알지 못한 상태에서 thenCombine()
은 두 연산이 끝났을 때 스레드 풀에서 실행된 연산을 만든다.
결과를 추가하는 세 번째 연산 c
는 두 작이 끝날 때까지는 스레드에서 실행되지 않는다.
발행-구독 그리고 리액티브 프로그래밍
Future
는 한 번만 실행해 결과를 제공한다.
반만 리액티브 프로그래밍은 시간이 흐르면서 Future
같은 객체를 통해 여러 결과를 제공한다.
자바 9에서는 java.util.concurrent.Flow
인터페이스에 발행-구독 모델(pub-sub protocol)을 적용해 리액티브 프로그래밍을 제공한다.
다음 세 가지로 Flow API
를 정리할 수 있다.
- 구독자가 구독할 수 있는 발행자
- 이 연결을 구독(subscription)이라고 한다.
- 이 연결을 이용해 메세지(또는 이벤트)를 전송한다.
두 Flow를 합치는 예제
스프레드 시트 셀을 통해 발행-구독의 특징을 간단하게 확인할 수 있다.
public class SimpleCell {
private int value;
private String name;
public SimpleCell(String name) {
value = 0;
this.name = name;
}
}
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1이나 c2의 값이 바뀌었을 때 c3가 두 값을 더하도록 하려면 c1과 c2에 이벤트가 발생했을 때 c3를 구독하도록 만들어야 한다.
다음과 같은 Publisher<T>
가 필요하다,
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Subscriber<T>
인터페이스는 onNext(T t)
라는 정보를 전달하는 단순한 메서드를 포함한다.
public interface Subscriber<T> {
void onNext(T t);
}
Cell
은 Publisher
이며 동시에 Subscriber
이다.
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
private int value;
private String name;
private List<Subscriber> subscribers;
public SimpleCell(String name) {
value = 0;
this.name = name;
subscribers = new ArrayList<>();
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscribers.add(subscriber);
}
private void notifyAllSubscribers() {
subscribers.forEach(subscriber -> subscriber.onNext(this.value));
}
@Override
public void onNext(Integer newValue) {
this.value = newValue;
System.out.println(this.name + ": " + this.value);
notifyAllSubscribers();
}
}
SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C1");
SimpleCell c1 = new SimpleCell("C2");
c1.subscribe(c3);
c1.onNext(10);
c2.onNext(20);
C1: 10
C3: 10
C2: 20
C3=C1+C2
는 왼쪽 오른쪽의 연산 결과를 저장할 수 있는 별도의 클래스가 필요하다.
public class ArithmeticCell extends SimpleCell {
private int left;
private int right;
public ArithmeticCell(String name) {
super(name);
}
public void setLeft(int left) {
this.left = left;
super.onNext(left + this.right);
}
public void setRight(int right) {
this.right = right;
super.onNext(this.left + right);
}
}
SimpleCell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3);
c1.onNext(10);
c2.onNext(20);
C1: 10
C3: 10
C2: 20
C3: 30
C1: 15
C3: 35
C1의 값이 15로 갱신되었을 때 C3이 즉시 반응해 자신의 값을 갱신한다.
발행자-구독자 상호작용의 또 한 가지 특징은 발행자 그래프를 설정할 수 있다.C5=C4+C3
처럼 C3과 C4에 의존하는 C5를 만들 수 있다.
ArithmeticCell c5 = new ArithmeticCell("C5");
ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c4 = new SimpleCell("C4");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);
c3.subscribe(c5::setLeft);
c4.subscribe(c5::setRight);
c1.onNext(10);
c2.onNext(20);
c1.onNext(15);
c4.onNext(1);
c4.onNext(3);
C1: 10
C3: 10
C5: 10
C2: 20
C3: 30
C5: 30
C1: 15
C3: 35
C5: 35
C4: 1
C5: 36
C4: 3
C5: 38
데이터가 발행자에서 구독자로 흐름에 착안해 개발자는 이를 **업스트림** 또는 **다운스트림**이라고 부른다. 위 예제에서 데이터 `newValue`는 업스트림 `onNext()` 메서드로 전달되고 `notifyAllSubscribers()` 호출을 통해 다운스트림 `onNext()` 호출로 전달된다.
역압력의 간단한 형태
Subscriber
가onSubsribe
로 전달된Subscription
객체를subscription
같은 필드에 로컬로 저장한다.Subscriber
가 수 많은 이벤트를 받지 않도록onSubscribe
,onNext
,onError
의 마지막 동작에channel.request(1)
을 추가해 오직 한 이벤트만 요청한다.- 요청을 보낸 채널에만
onNext
,onError
이벤트를 보내도록Publisher
의notifyAllScribers
코드를 바꾼다.
구현이 간단해 보일 수 있지만, 역압력을 구현하려면 여러가지 장단점을 생각해야 한다.
- 여러
Subscriber
가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각Subscriber
에게 보내지 않는 데이터를 저장할 별도의 큐를 가질 것인가? - 큐가 너무 커진다면?
Subscriber
가 준비 안되었다면 큐의 데이터를 폐기할 것 인가?
위의 답변은 데이터의 성격에 따라 달라진다.
은행 시스템에서 거래내역은 잃어버리면 안되지만, 온도 데이터는 크게 중요하지 않을 것이다.
리액티브 시스템 vs 리액티브 프로그래밍
- 리액티브 시스템
- 런타임 환경이 변화에 대응하도록 전체 아키텍쳐가 설계된 프로그램
- 리액티브 시스템이 가져야 할 세 가지 속성
- 반응성: 큰 작업을 처리하느라 간단한 질의응답을 지연하지 않고 실시간으로 입력에 반응하는 것.
- 회복성: 한 컴포넌트의 실패로 전체 시스템이 실패하지 않음
- 탄력성: 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함.
Flow
인터페이스 같은 리액티브 프로그래밍 형식을 이용해 이런 속성을 구현할 수 있다.
마치며
- 스레드 풀은 보통 유용하지만, 블록되는 태스크가 많아지면 문제가 생긴다.
- 메서드를 비동기로 만들면 병렬성을 추가할 수 있으며 부수적으로 루프를 최적화한다.
- 박스와 채널 모델을 이용해 비동기 시스템을 시각화 할 수 있다.
- 자바 8
CompletableFuture
클래스와 자바 9Flow API
모두 박스 채널 다이어그램으로 표현할 수 있다. CompletableFuture
클래스는 한 번의 비동기 연산을 표현한다. 콤비네이터로 비동기 연산을 조합함으로Future
를 이용할 때 발생했던 기존의 블로킹 문제를 해결할 수 있다.Flow API
는 발행-구독 프로토콜, 역압력을 이용하면 자바의 리액티브 프로그래밍의 기초를 제공한다.- 리액티브 프로그래밍을 이용해 리액티브 시스템을 구현할 수 있다.
참고
'Java > 모던 자바 인 액션' 카테고리의 다른 글
Java Optional (0) | 2023.06.14 |
---|---|
[모던 자바 인 액션] 리액티브 프로그래밍 (0) | 2023.02.04 |
[모던 자바 인 액션] CompletableFuture : 안정적인 비동기 프로그래밍 (0) | 2023.01.20 |
댓글