Reactor 써보기 (2)

오늘은 저번시간에 이어서 Reactor 써보기 (2)를 준비했다.

저번시간에는 Mono를 만드는 것을 배웠다. 필자도 아직까지는 써보지 않은 것들도 많이 존재한다. 대부분 쓰는 것들만 자주 쓰기에..

오늘은 저번시간에 이어서 Mono 오퍼레이터를 알아보도록 하자. 물론 다 알아보진 못할 거 같고(워낙 많아서..) 자주 사용될만한 것들 위주로 살펴 볼 예정이니 여기에 없는 것들은 문서를 찾아보면 되겠다.

map, flatMap, flatMapMany, filter

사실 이것들은 java8 에 나온 Stream API의 map 과 flatMap 과 사용법은 동일하다. 흔히 함수형 프로그래밍에서 말하는 functor, monad 라 하는 것들 처럼 의미하는 바도 동일하다. 이 부분을 더 알고 싶다면 함수형 프로그래밍을 공부하면 되겠다. 사실 필자도 잘..

이 오퍼레이션은 대부분 다 알고 있을거라 생각되기 때문에 간단한 사용법만 보고 넘어가자!

@Test
public void mapTest() {
    Mono.just("hello")
            .map(it -> it + " world")
            .subscribe(System.out::println);
}
@Test
public void flatMapTest() {
    Mono.just("hello")
            .flatMap(it -> Mono.just(it + " world"))
            .subscribe(System.out::println);
}
@Test
void filterTest() {
    Mono.just("filter")
            .filter(it -> it.equals("filter"))
            .subscribe(System.out::println);
}

사실 flatMapMany는 java의 Stream API 에는 존재하지는 않지만 flatMap과 동일하다.

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper)

Type 이 Mono 냐 Publisher냐의 차이이며 리턴타입도 Flux로 리턴한다.
flatMapMany 일경우에는 Mono, Flux 혹은 다른 reactive streams api 타입도 가능하다.

@Test
public void flatMapManyTest() {
    Mono.just("hello")
            .flatMapMany(it -> Flux.just("$it world", "$it world!!"))
            .subscribe(System.out::println);
}

java8의 Stream API를 사용했다면 그리 어려운 내용은 아닌 듯 싶다.
사실 이것들은 굉장히 자주 사용하는 메서드 중 하나다. 이것만 알아도? 대부분 무난하게 개발도 가능할 듯 싶다.
하지만 reactor 에서는 좀 더 우아한 방법의 메서드들이 존재하니 살펴보자.

zipWhen

위에서 간단하게 맛보기로 아마? 기존의 알던 오퍼레이터를 알아봤지만 이제부터는 reactor 만의 메서드를 알아보도록 하자.

이것은 필자가 실제로 map, flatMap 만큼 자주 사용하는 메서드중 하나다. flatMap가 사용법은 동일하지만 onNext 방출이 다르다. zip 이란 메서드에 걸맞게 묶어서 결과를 받을 수 있다.

@Test
public void zipWhenTest() {
    Mono.just("hello")
            .zipWhen(it -> Mono.just(it + " world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

필자는 타입을 보여주기 위함이지 생략해도 문제 없다.
출력은 다음과 같을 것이다.

hello, hello world

쉽게 생각해서 처음 Mono를 onNext 신호를 받을 수 있다. 만약 처음 만든 Mono 를 onNext 에서 사용해야 된다면 zipWhen 메서드를 사용하면 된다.

zipWith

zipWith 메서드는 사실 저번에 배운 Mono.zip 과 동작 방식은 동일하다. 이 역시 모노들이 각각 동작한다. 일단 예제를 살펴보자.

@Test
public void zipWithTest() {
    Mono.just("hello")
            .zipWith(Mono.just("world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

zip 과 동일하게 onNext 신호는 동일하다. 실제로 내부적으로는 Mono.zip 을 사용한다. 사실 필자는 이 메서드는 잘 사용하지 않는다. 그냥 Mono.zip() 을 더욱 선호하는 편이긴 하다. 만약 aggregating 하는 Mono 들이 여러개 늘어 난다면 사실 코드 보기가 좀 지저분해 보일 수도 있을 거 같아 그냥 zip 메서드를 사용한다. 사용하고 싶다면 위 예제처럼 두개의 모노만을 묶는다면 사용해도 괜찮을 듯 싶다.

만약 Mono 가 여러개라면 위와 같은 코드가 될 것 같다. 만약 더 많은 모노가 있다면 onNext 타입이 정말 복잡할 듯 싶다.

concatWith

이것은 메서드명 그대로 연결하는 메서드이다. 연결후 Flux 타입으로 리턴한다. 파라미터는 꼭 Mono타입이 아니더라도 가능하다.

public final Flux<T> concatWith(Publisher<? extends T> other)
@Test
public void concatWithTest() {
    Mono.just("hello")
            .concatWith(Mono.just("world"))
            .subscribe(System.out::println);
}

출력은 다음과 같다

hello
world

딱히 어려운 부분은 없다.

이 메서드는 순차적으로 진행되기 때문에 만약 병렬로 진행해도 된다면 아래의 mergeWith를 사용하면 된다.

mergeWith

concatWith 와 사용법은 동일하지만 이 메서드는 순차적이지 않다. concatWith 달리 각각 실행 후 합쳐서 onNext를 방출한다. 순차적으로 실행시킬 필요가 없을 경우 이 메서드를 사용하면 된다. 그렇기 때문에 순서는 보장하지 않는다. 이 역시 합치는 기능이므로 Flux 를 리턴한다.

public final Flux<T> mergeWith(Publisher<? extends T> other)

이 역시 파라미터 타입은 꼭 Mono 일 필요는 없다.

@Test
public void mergeWithTest() {
    Mono.just("hello")
            .mergeWith(Mono.just("world"))
            .subscribe(System.out::println);
}

필자의 경우 종종 mergeWith 는 사용했다. 아쉽게도 concatWith는 사용한적이 없다. 언젠가 필요할 때가 있겠지..

then

then 오퍼레이션은 몇가지 존재하는데 대부분 비슷하다. 조금씩 다르니 참고하면 되겠다.
아무 파라미터가 없는 경우 then()은 리턴 값은 Void 타입이다. 즉, onNext는 방출하지 않는 다는 뜻과 같다. onNext만 방출하지 않지 에러와 완료신호는 방출한다.

@Test
fun `then test`() {
    Mono.just("then").then()
        .subscribe({ println(it) }, {println("error")}) { println("complete") }
}

출력 결과는 보면 complete 이외엔 아무것도 출력 되지 않는다. 만약 error 가 방출 될경우에는 error 메시지가 출력 된다.
이것은 종종 운영에서도 사용했다. 예를들어 message 를 publisher 한다거나 혹은 consumer 할 때 주로 사용했다. 혹은 spring 의 EventListener를 사용할 때도 사용했다. 종종 쓸일이 있으니 알아두면 좋을 것 같다.

then 메서드에 파라미터를 받는 메스드가 존재한다. 메서드 형태는 다음과 같다.

public final <V> Mono<V> then(Mono<V> other)

해당 모노를 완료시킨후에 다른 모노를 연결하는 오퍼레이터이다. 이는 순차적이다. 이 메서드는 onNext를 방출 할 수 있다.

@Test
public void thenTest() {
    Mono.just("then1").then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}


위의 결과는 then2complete 가 출력된다. 만약 기존 모노가 empty 일지 라도 다음 모노는 실행 시킨다.

@Test
public void thenTest() {
    Mono.empty().then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위처럼 작성해도 동일하게 동작한다. 이 메서드 역시 운영에서 종종사용했다.
더 많은 메서드들이 있긴한데 간단하게 설명만 하고 넘어가도록 하자.

thenEmpty 는 Void 타입을 받는 Publisher이다. 즉, 실행만 시키고 onNext는 방출하지 않는다는 뜻이다. 내부적으로는 then(Mono other) 메서드를 호출한다.
thenReturn은 Publisher 타입이 아닌 T 타입을 받는 메서드이다. 내부적으로는 then(Mono other)를 호출한다.
thenMany는 Publisher 타입을 받는 메서드이다. then(Mono other)랑 비슷해보이지만 리턴 타입은 Flux를 리턴한다.

handle

handle 메서드는 좀 더 우아하게 onNext, 에러 혹은 완료 신호를 줄 수 있다. 글 보다 코드를 보는게 이해가 빠를 듯 싶다.

@Test
public void handleTest() {

    Mono.just("bar")
            .handle((it, sink) -> {
                if (it.equals("foo")) {
                    sink.next(it);
                } else if (it.equals("bar")) {
                    sink.error(new NullPointerException());
                } else {
                    sink.complete();
                }
            }).subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

}

위와 같이 특정 조건에 의해 next, error, complete 신호를 보낼 수 있다. 꼭 해당 타입과 sink의 제네릭 타입이 같지 않아도 된다.
foo, bar 혹은 그외의 것들을 넣어 한번씩 테스트해보는게 이해가 더욱 빠를 듯 싶다. 이 역시 운영에서 아주 많이 사용한다.

cast, ofType

cast 혹은 ofType을 써서 형 변환을 할 수 있다. cast 와 ofType 두 메서드 모두 타입 캐스팅을 할 수 있지만 조금 다르다. cast는 타입캐스팅 할 때 형이 맞지 않다면 에러가 발생하지만 ofType 의 경우에는 에러가 나지 않고 무시된다. ofType은 내부적으로 filter를 사용해 무시한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

둘다 출력 되는건 동일하다. 형변환이 잘 되었을 땐 동일하게 동작하지만 형변환이 안될 경우에는 조금 다르게 동작한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위와 같이 형 변환이 안됐을 시에는 cast 경우 에러가 출력 되며 ofType 일 경우에는 complete 만 출력 된다.

filterWhen

filter의 비동기 버전이다. filter는 다들 알다시피 걸러내는 작업을 하는 메서드이다. filter와 동일한 동작을 하니 예제만 보고 넘어가자.

@Test
public void filterWhenTest() {
    Mono.just("filterWhen")
            .filterWhen((it) -> Mono.just(true))
            .subscribe(System.out::println);
}

filter 와 동일한 동작은 하니 그리 어렵진 않다. 사실 이건 운영에서 써본적은 없다.

repeat

메서드명 그대로 반복하여 구독한다. repeat()는 여러 메서드가 존재한다. 예를들어 repeat() 만 사용할 경우에는 무한정으로 구독하는 시스템이다. 실제로 필요한지는 잘모르겠다. 특정 조건에 만족하면 반복하는 메서드도 존재한다.

public final Flux<T> repeat(BooleanSupplier predicate)

또 한 원하는 만큼 반복도 가능하다.

@Test
public void repeatTest() {
    Mono.just("repeat")
            .repeat(2)
            .subscribe(System.out::println);
}

위의 소스는 2번을 더 반복한다는 뜻이다. 실제로 3번의 구독이 이루어진다. 만약 0으로 지정을 했다면 구독은 한번만 이루어진다.
이외에도 사실 repeat** 메서드가 많긴하지만 필자는 사용한적이 없다. 만약 사용한다면 테스트할때 사용하지 않나 싶다. 운영에서 사용할일은 극히 드물 것 같다.

doOnSubscribe, doOnRequest, doOnNext, doOnCancel, doOnTerminate, doAfterTerminate, doOnSuccess, doOnError, doFinally, doOnEach

doOn** 시리즈는 trigger에 해당한다. 로직상으로는 많이 사용하지 않겠지만 로그를 찍을 때 사용하면 적절할 듯 싶다. 필자도 대부분 로그를 찍을 때 사용했다. 물론 다른 경우에도 사용하겠지만 아직까지는 그런일은 없었다.
일단 먼저 사용법 부터 보자.

@Test
void doOnSeries() {
    Mono.just("doOnSeries")
            .doOnSubscribe(it -> System.out.println("doOnSubscribe"))
            .doOnRequest(it -> System.out.println("doOnRequest"))
            .doOnNext(it -> System.out.println("doOnNext"))
            .doOnEach(it -> System.out.println("doOnEach"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .doAfterTerminate(() -> System.out.println("doAfterTerminate"))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnSuccess(it -> System.out.println("doOnSuccess"))
            .doOnError(it -> System.out.println("doOnError"))
            .doFinally(it -> System.out.println("doFinally"))
            .subscribe();
}

사용법은 위와 같이 간단하다. 각 메서드 별로 알아보자.
doOnSubscribe 메서드는 구독이 시작 될 트리거 된다. 위 메서드 중 가장 먼저 실행 된다. 파라미터로는 Subscription 가 넘어 온다.
doOnRequest 메서드는 요청 받을 떄 트리거 된다. 기본적으로 파라미터는 Long의 Long.MAX_VALUE 값이 넘어온다.
doOnNext는 성공적으로 데이터가 방출 될 때 트리거 된다. 파라미터로는 해당 T 타입이 넘어 온다.
doOnCancel 메서드는 구독이 취소 됐을 때 넘어오는 이벤트다 파라미터로는 아무것도 넘어오지 않는다.
doOnTerminate 메서드는 완료 혹은 에러가 났을 떄 트리거 된다. 이벤트시기는 완료, 에러 이벤트 전에 트리거 된다. 이 역시 파라미터로는 아무것도 넘어오지 않는다.
doAfterTerminate 메서드는 doOnTerminate 와 동일하게 트리거 되지만 시점이 조금 다르다. 완료, 에러 이벤트 후에 트리거 된다. 역시 파라미터는 없다.
doOnSuccess 완료 되면 트리거 된다. 파라미터는 해당 T 타입이 넘어 온다.
doOnError 에러가 났을 때 트리거 된다. 파라미터로는 Throwable 가 넘어 온다.
doFinally 에러, 취소, 완료 될 때 트리거 된다. 모노가 종료되면 무슨 일이든 트리거 된다. 파라미터로는 SignalType이 넘어와 종료 이벤트 타입을 받을 수 있다. 자바의 try catch finally의 finally 과 동일한 느낌이다.
doOnEach 메서드는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트다. 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다. 위의 코드에서는 doOnEach 가 두번 출력된다. 데이터를 방출 할때 한번, 완료 되었을 때 한번.

retry

메서드명 그대로 에러가 발생하였을 때 재시도 하는 메서드이다. 이 또한 많은 메서드가 존재한다. 기본 retry() 메서드는 무한정(Long.MAX_VALUE 사실 무한은 아닌 듯)으로 재시도를 한다.
retry(long) 은 retry 횟수를 지정할 수 있다. 만약 2로 지정하였을 경우는 2번을 재시도 한다.
retry(Predicate) 은 특정 조건에 만족하면 재시도를 시도 한다.

@Test
void retryTest() {
    Mono.just("retry")
            .handle((it, sink) -> {
                System.out.println(it);
                sink.error(new NullPointerException());
            }).retry(2)
            .subscribe();
}


위의 코드는 재시도를 2번 시도하는 코드이다. 출력은 retry 가 3번 출력된다. 한번은 처음 구독했을 때와 두번은 retry를 시도했을 때 총 3번 출력 된다.

retryBackoff

retryBackoff 는 기본 retry 보다 조금 우아하게 재시도를 할 수 있다, backoff 설정를 할 수 있어 특정 시간만큼 Duration을 줄 수 있다. 이 역시 메서드는 많다.

public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor)

하지만 위의 메서드를 설명하면 대부분의 메서드는 설명된다.

위의 retry 는 사실 큰 도움은 되지 않을 수 있다. 대기시간도 없어 실패하면 바로 다시 재시도를 하기 때문이다. 만약 동시에 모든 호출이 실패하면 바로 또 모든 호출을 동시에 날리기 때문이다. 어쩌면 운영에서는 retryBackoff 메서드를 쓰는게 좀 더 많은 도움이 되지 않을까 싶다.

retryBackoff의 첫번째 파라미터인 numRetries는 retry와 동일하게 재시도 횟수를 지정하면 된다.
두 번째 파라미터인 firstBackoff는 첫 번째 재시도 할 때의 대기 시간이다.
세 번째 파라미터 maxBackoff는 최대로 해당 시간만큼 대기 할 수 있다는 뜻 이다. 그러기 때문에 firstBackoff 보다는 작으면 안된다.
jitterFactor는 위에서 말했던 것 처럼 동시에 모든 호출이 실패하면 또 다시 모든 호출은 동시에 재시도 하기 때문에 이를 분산시키기 위한 값이다. 최소값은 0.0 이며 최대값은 1.0 이다. 지정 하지 않을 경우에는 0.5가 기본값이다.

궁금해할진 모르겠지만 해당 알고리즘은 다음과 같다. i는 재시도 한 값이다.

nextBackoff = firstbackoff * 2^i
jitterOffset = (nextBackoff * 100 * jitterFactor) / 100
lowBound = max(firstbackoff - nextBackoff, -jitterOffset)
highBound = min(maxBackoff - nextBackoff, jitterOffset)
jitter = random (lowBound, highBound)
backoff = nextBackoff + jitter

사용법은 대략 다음과 같다.

@Test
void retryBackoffTest() throws InterruptedException {
    Mono.just("retry")
            .handle((it, sink) -> sink.error(new NullPointerException()))
            .retryBackoff(2, Duration.ofSeconds(3), Duration.ofSeconds(10), 0.5)
            .subscribe();
    sleep()
}


retryBackoff 는 다른 쓰레드로 돌리기 때문에 보여주기 위해 sleep를 줬다.

delayElement

사실 Mono의 이 메서드는 테스트에서 사용했지 운영에선 사용해본적이 없다. 메서드명 그대로 delay 를 줄 수 있는 메서드이다.
사용법 부터 살펴보자.

@Test
void delayElement() throws InterruptedException {
    Mono.just("delayElement")
            .delayElement(Duration.ofSeconds(3))
            .doOnNext(System.out::println)
            .subscribe();
    sleep
}

이 역시 다른 스레드로 돌아가기 때문에 sleep 을 줬다. delayElement은 해당 시간만큼 onNext 방출을 지연시킨다. 실행해보면 3초 후에 delayElement 가 출력된다.

onErrorResume, onErrorReturn, onErrorMap, onErrorContinue

오늘 마지막으로 살펴볼 아이는 onError** 시리즈이다. 이 역시 여러 종류의 메서드들이 있지만 기본적인 내용은 살펴보고 나머지는 해당 메서드를 살펴보는 게 좋다. 그리 어렵지 않은 내용이니 말이다.

onErrorResume 메서드는 에러가 발생하였을 경우 다른 Mono로 대체 할 수 있다.

@Test
void onErrorResumeTest() {
    Mono.just("onErrorResume")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorResume(it -> Mono.just("foo"))
            .subscribe(System.out::println);
}

위의 코드는 foo가 출력 되는 것을 볼 수 있다.

onErrorReturn은 에러가 발생했을 경우 다른 값으로 대체 할 수 있다.

@Test
void onErrorReturnTest() {
    Mono.just("onErrorReturn")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorReturn("foo")
            .subscribe(System.out::println);
}

이 역시 foo가 출력 된다.

onErrorMap 다른 에러로 변환할 수 있다.

@Test
void onErrorMapTest() {
    Mono.just("onErrorMap")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorMap(it -> new IllegalArgumentException())
            .subscribe(System.out::println, System.out::println);
}

위 처럼 에러를 변환하고 싶다면 사용하면 된다.

onErrorContinue는 연산 과정에서 오류가 발생하였을 때 복구하는 메서드이다. 넘어오는 파라미터는 에러와 해당 object이 넘어온다.

@Test
void onErrorContinueTest() {
    Mono.just("onErrorContinue")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorContinue((it, object) -> { })
            .subscribe(System.out::println, System.out::println);
}

위는 오류가 복구되어 기존의 있던 값 onErrorContinue가 출력 된다.

오늘은 여기까지 배워보도록 하자. 우리가 공부한 이외에도 훨씬 더 많은 메서드가 존재한다. 필자도 대부분 사용하는 것만 사용하기에 필자도 모르는 메서드가 많다. 물론 언젠가는 사용할 날이 오겠지.. 그럼 그때 다시 공부하는 걸루..

오늘은 이렇게 Mono 의 오퍼레이터에 대해서 알아봤다. 사실 위의 메서드만 알아도 어느정도는 충분한 개발을 할 수 있다고 믿는다. 물론 다 알면 좋겠지만.. 만약 여기에 나오지 않은 오퍼레이터들은 나중에 Flux 시간에 나올 수 도 있으니 참고하면 되겠다.
다음시간에는 Flux에 대해 알아보도록 하자.

Reactor 써보기 (1)

오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다.

요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자.
하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다.

위의 글들은 한번 읽어보면 좋을 것 같다. 토비느님의 방송 역시 함께 보면 개념적으로 더욱 이해가 빨리 될 듯 싶다. 꽤 많은 시리즈가 있으니 시간 날 때 짬짬이 보면 도움이 확실이 된다.

사실 Reactive Streams 구현체인 Reactor나 RxJava2나 사용법은 대부분 비슷하다. 같은 개발자가 만든건 아니지만 여러 회사들이 협업을 하면서 만든거니 아무래도 비슷할 수 밖에 없을 것도 같다. 아마도 구현체들 대부분 사용법은 비슷하지 않을까 싶다?

필자는 아무래도 거의 대부분 Spring을 사용하기에 Reactor로 먼저 접하게 되었다. 안드로이드에서는 Reactor 보다는 RxJava를 더 선호하고 많이 쓰는 것 같다. 하지만 상관없다. Spring 에선 RxJava 나 Reactor 나 혹은 다른 Reactive Streams 구현 된 어느 것을 써도 좋다. 이건 조만간 다시 한번 다뤄보기로 하겠다.

그럼 한번 어떤 메서드들이 있는지 주로 사용될만한 메서드 위주로 한번 살펴보도록 하자.

일단 모노 부터 만들어보자!

Mono

위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.)
위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다.
쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다.

Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다.

아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다.
Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다.

이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다.

그럼 이제 본격적으로 모노를 만들어 보자!

kotlin, java 모두 예제에 넣어봤다.

just


Mono<String> just = Mono.just("hello reactor");

코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions 디펜더시를 추가 해야 된다.

아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.

fromSupplier

만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.


Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");

from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다.
from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.

error

에러를 만드는 방법이다.


Mono<String> error = Mono.error(new NullPointerException()); Mono<String> errorSupplier = Mono.error(NullPointerException::new);

구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.

empty, never

빈 모노와 무기한으로 실행되는 모노를 만든다. 사실 never는 사용한 적이 단 한번도 없다.


Mono<String> empty = Mono.empty(); Mono<String> never = Mono.never();

해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다.
never 는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다.

아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..

zip

모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을 aggregating 하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다.

방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.


Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));

기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.


@Test void zipTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zip(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); } private Mono<String> testMethod1() { return Mono.just("foo") .delayElement(Duration.ofSeconds(1)) .doOnNext(System.out::println); } private Mono<String> testMethod2() { return Mono.just("bar") .doOnNext(System.out::println); }

위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 foo 가 출력된 후에 bar 출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar가 출력되고 1초후 foo가 실행 된다.

위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.


Mono.zip(testMethod1(), testMethod2()) .subscribe((Tuple2<String,String> it) -> { });

필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.

when

when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.


@Test void whenTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.when(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

이 역시 zip과 동일하게 오류를 방출하면 즉시 종료 된다.

delay

메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.


@Test void delayTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.delay(Duration.ofSeconds(1)) .doOnNext(System.out::println) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

위의 코드는 1초 후에 onNext 로 방출한다.

defer

defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.


Mono<String> defer = Mono.defer(() -> Mono.just("foo"));

음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty 에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.


Mono.just("foo") .switchIfEmpty(Mono.just("bar")) .subscribe(); }

만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just(“bar”)를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.


Mono.just("foo") .switchIfEmpty(Mono.defer(() -> Mono.just("bar"))) .subscribe(); }

위와 같이 코드를 작성한다면 Mono.just(“bar”)는 정말로 모노가 비어있을 때만 실행 된다.

from

reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.


Mono.from(Flux.just(1,2,3,4,5)) .subscribe(System.out::println);

결과는 1만 방출 되며 종료 된다.
꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다.
아래는 RxJava 를 사용한 예제이다.


Mono.from(Single.just("bar").toFlowable()) .subscribe(System.out::println);

동일하게 bar 가 방출 되며 종료 된다.

***DelayError

에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.


@Test void delayErrorTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException())) .subscribe((it) -> {}, System.out::println, countDownLatch::countDown); countDownLatch.await(); }

위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다.
출력 결과는 다음과 같다.

foo
foo
reactor.core.Exceptions$CompositeException: Multiple exceptions

만약 zip을 사용했다면 NullPointerException 에러만 방출 한다.

여기에선 zip 만 설명했지만 when 도 동일하다.

create

Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다.
말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.


@Test void createTest() { Mono.create(sink -> { client.async(request, new Listener() { @Override public void onFailure(Exception e) { sink.error(e); } @Override public void onResponse(Response response) { sink.success(response); } }); }); }

위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다.
만약 리스너 해제 및 자원 해제는 onDispose 메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel 메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.

using

이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다.
이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다.
사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.


@Test void usingTest() { Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ), it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { sink.success(attachment); } @Override public void failed(Throwable exc, ByteBuffer attachment) { sink.error(exc); } })), it -> { try { it.close(); } catch (IOException e) { } }); }

조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 using를 사용하는 코드를 참고 하면 되겠다.

usingWhen

이 것 역시 using과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher 타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.

@Test
void usingWhenTest() {
    Mono<String> data = Mono.just("foo");
    Mono.usingWhen(data, it ->
            Mono.just(it),
            it -> transition.commit(),
            (it, error) ->  transition.rollback(),
            it-> transition.rollback())
}

예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다.

오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다.

다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.