오늘은 저번시간에 이어서 Reactor 써보기 (2)를 준비했다.

저번시간에는 Mono를 만드는 것을 배웠다. 필자도 아직까지는 써보지 않은 것들도 많이 존재한다. 대부분 쓰는 것들만 자주 쓰기에..

오늘은 저번시간에 이어서 Mono 오퍼레이터를 알아보도록 하자. 물론 다 알아보진 못할 거 같고(워낙 많아서..) 자주 사용될만한 것들 위주로 살펴 볼 예정이니 여기에 없는 것들은 문서를 찾아보면 되겠다.

map, flatMap, flatMapMany, filter

사실 이것들은 java8 에 나온 Stream API의 map 과 flatMap 과 사용법은 동일하다. 흔히 함수형 프로그래밍에서 말하는 functor, monad 라 하는 것들 처럼 의미하는 바도 동일하다. 이 부분을 더 알고 싶다면 함수형 프로그래밍을 공부하면 되겠다. 사실 필자도 잘..

이 오퍼레이션은 대부분 다 알고 있을거라 생각되기 때문에 간단한 사용법만 보고 넘어가자!

@Test
public void mapTest() {
    Mono.just("hello")
            .map(it -> it + " world")
            .subscribe(System.out::println);
}
@Test
public void flatMapTest() {
    Mono.just("hello")
            .flatMap(it -> Mono.just(it + " world"))
            .subscribe(System.out::println);
}
@Test
void filterTest() {
    Mono.just("filter")
            .filter(it -> it.equals("filter"))
            .subscribe(System.out::println);
}

사실 flatMapMany는 java의 Stream API 에는 존재하지는 않지만 flatMap과 동일하다.

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper)

Type 이 Mono 냐 Publisher냐의 차이이며 리턴타입도 Flux로 리턴한다.
flatMapMany 일경우에는 Mono, Flux 혹은 다른 reactive streams api 타입도 가능하다.

@Test
public void flatMapManyTest() {
    Mono.just("hello")
            .flatMapMany(it -> Flux.just("$it world", "$it world!!"))
            .subscribe(System.out::println);
}

java8의 Stream API를 사용했다면 그리 어려운 내용은 아닌 듯 싶다.
사실 이것들은 굉장히 자주 사용하는 메서드 중 하나다. 이것만 알아도? 대부분 무난하게 개발도 가능할 듯 싶다.
하지만 reactor 에서는 좀 더 우아한 방법의 메서드들이 존재하니 살펴보자.

zipWhen

위에서 간단하게 맛보기로 아마? 기존의 알던 오퍼레이터를 알아봤지만 이제부터는 reactor 만의 메서드를 알아보도록 하자.

이것은 필자가 실제로 map, flatMap 만큼 자주 사용하는 메서드중 하나다. flatMap가 사용법은 동일하지만 onNext 방출이 다르다. zip 이란 메서드에 걸맞게 묶어서 결과를 받을 수 있다.

@Test
public void zipWhenTest() {
    Mono.just("hello")
            .zipWhen(it -> Mono.just(it + " world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

필자는 타입을 보여주기 위함이지 생략해도 문제 없다.
출력은 다음과 같을 것이다.

hello, hello world

쉽게 생각해서 처음 Mono를 onNext 신호를 받을 수 있다. 만약 처음 만든 Mono 를 onNext 에서 사용해야 된다면 zipWhen 메서드를 사용하면 된다.

zipWith

zipWith 메서드는 사실 저번에 배운 Mono.zip 과 동작 방식은 동일하다. 이 역시 모노들이 각각 동작한다. 일단 예제를 살펴보자.

@Test
public void zipWithTest() {
    Mono.just("hello")
            .zipWith(Mono.just("world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

zip 과 동일하게 onNext 신호는 동일하다. 실제로 내부적으로는 Mono.zip 을 사용한다. 사실 필자는 이 메서드는 잘 사용하지 않는다. 그냥 Mono.zip() 을 더욱 선호하는 편이긴 하다. 만약 aggregating 하는 Mono 들이 여러개 늘어 난다면 사실 코드 보기가 좀 지저분해 보일 수도 있을 거 같아 그냥 zip 메서드를 사용한다. 사용하고 싶다면 위 예제처럼 두개의 모노만을 묶는다면 사용해도 괜찮을 듯 싶다.

만약 Mono 가 여러개라면 위와 같은 코드가 될 것 같다. 만약 더 많은 모노가 있다면 onNext 타입이 정말 복잡할 듯 싶다.

concatWith

이것은 메서드명 그대로 연결하는 메서드이다. 연결후 Flux 타입으로 리턴한다. 파라미터는 꼭 Mono타입이 아니더라도 가능하다.

public final Flux<T> concatWith(Publisher<? extends T> other)
@Test
public void concatWithTest() {
    Mono.just("hello")
            .concatWith(Mono.just("world"))
            .subscribe(System.out::println);
}

출력은 다음과 같다

hello
world

딱히 어려운 부분은 없다.

이 메서드는 순차적으로 진행되기 때문에 만약 병렬로 진행해도 된다면 아래의 mergeWith를 사용하면 된다.

mergeWith

concatWith 와 사용법은 동일하지만 이 메서드는 순차적이지 않다. concatWith 달리 각각 실행 후 합쳐서 onNext를 방출한다. 순차적으로 실행시킬 필요가 없을 경우 이 메서드를 사용하면 된다. 그렇기 때문에 순서는 보장하지 않는다. 이 역시 합치는 기능이므로 Flux 를 리턴한다.

public final Flux<T> mergeWith(Publisher<? extends T> other)

이 역시 파라미터 타입은 꼭 Mono 일 필요는 없다.

@Test
public void mergeWithTest() {
    Mono.just("hello")
            .mergeWith(Mono.just("world"))
            .subscribe(System.out::println);
}

필자의 경우 종종 mergeWith 는 사용했다. 아쉽게도 concatWith는 사용한적이 없다. 언젠가 필요할 때가 있겠지..

then

then 오퍼레이션은 몇가지 존재하는데 대부분 비슷하다. 조금씩 다르니 참고하면 되겠다.
아무 파라미터가 없는 경우 then()은 리턴 값은 Void 타입이다. 즉, onNext는 방출하지 않는 다는 뜻과 같다. onNext만 방출하지 않지 에러와 완료신호는 방출한다.

@Test
fun `then test`() {
    Mono.just("then").then()
        .subscribe({ println(it) }, {println("error")}) { println("complete") }
}

출력 결과는 보면 complete 이외엔 아무것도 출력 되지 않는다. 만약 error 가 방출 될경우에는 error 메시지가 출력 된다.
이것은 종종 운영에서도 사용했다. 예를들어 message 를 publisher 한다거나 혹은 consumer 할 때 주로 사용했다. 혹은 spring 의 EventListener를 사용할 때도 사용했다. 종종 쓸일이 있으니 알아두면 좋을 것 같다.

then 메서드에 파라미터를 받는 메스드가 존재한다. 메서드 형태는 다음과 같다.

public final <V> Mono<V> then(Mono<V> other)

해당 모노를 완료시킨후에 다른 모노를 연결하는 오퍼레이터이다. 이는 순차적이다. 이 메서드는 onNext를 방출 할 수 있다.

@Test
public void thenTest() {
    Mono.just("then1").then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}


위의 결과는 then2complete 가 출력된다. 만약 기존 모노가 empty 일지 라도 다음 모노는 실행 시킨다.

@Test
public void thenTest() {
    Mono.empty().then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위처럼 작성해도 동일하게 동작한다. 이 메서드 역시 운영에서 종종사용했다.
더 많은 메서드들이 있긴한데 간단하게 설명만 하고 넘어가도록 하자.

thenEmpty 는 Void 타입을 받는 Publisher이다. 즉, 실행만 시키고 onNext는 방출하지 않는다는 뜻이다. 내부적으로는 then(Mono other) 메서드를 호출한다.
thenReturn은 Publisher 타입이 아닌 T 타입을 받는 메서드이다. 내부적으로는 then(Mono other)를 호출한다.
thenMany는 Publisher 타입을 받는 메서드이다. then(Mono other)랑 비슷해보이지만 리턴 타입은 Flux를 리턴한다.

handle

handle 메서드는 좀 더 우아하게 onNext, 에러 혹은 완료 신호를 줄 수 있다. 글 보다 코드를 보는게 이해가 빠를 듯 싶다.

@Test
public void handleTest() {

    Mono.just("bar")
            .handle((it, sink) -> {
                if (it.equals("foo")) {
                    sink.next(it);
                } else if (it.equals("bar")) {
                    sink.error(new NullPointerException());
                } else {
                    sink.complete();
                }
            }).subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

}

위와 같이 특정 조건에 의해 next, error, complete 신호를 보낼 수 있다. 꼭 해당 타입과 sink의 제네릭 타입이 같지 않아도 된다.
foo, bar 혹은 그외의 것들을 넣어 한번씩 테스트해보는게 이해가 더욱 빠를 듯 싶다. 이 역시 운영에서 아주 많이 사용한다.

cast, ofType

cast 혹은 ofType을 써서 형 변환을 할 수 있다. cast 와 ofType 두 메서드 모두 타입 캐스팅을 할 수 있지만 조금 다르다. cast는 타입캐스팅 할 때 형이 맞지 않다면 에러가 발생하지만 ofType 의 경우에는 에러가 나지 않고 무시된다. ofType은 내부적으로 filter를 사용해 무시한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

둘다 출력 되는건 동일하다. 형변환이 잘 되었을 땐 동일하게 동작하지만 형변환이 안될 경우에는 조금 다르게 동작한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위와 같이 형 변환이 안됐을 시에는 cast 경우 에러가 출력 되며 ofType 일 경우에는 complete 만 출력 된다.

filterWhen

filter의 비동기 버전이다. filter는 다들 알다시피 걸러내는 작업을 하는 메서드이다. filter와 동일한 동작을 하니 예제만 보고 넘어가자.

@Test
public void filterWhenTest() {
    Mono.just("filterWhen")
            .filterWhen((it) -> Mono.just(true))
            .subscribe(System.out::println);
}

filter 와 동일한 동작은 하니 그리 어렵진 않다. 사실 이건 운영에서 써본적은 없다.

repeat

메서드명 그대로 반복하여 구독한다. repeat()는 여러 메서드가 존재한다. 예를들어 repeat() 만 사용할 경우에는 무한정으로 구독하는 시스템이다. 실제로 필요한지는 잘모르겠다. 특정 조건에 만족하면 반복하는 메서드도 존재한다.

public final Flux<T> repeat(BooleanSupplier predicate)

또 한 원하는 만큼 반복도 가능하다.

@Test
public void repeatTest() {
    Mono.just("repeat")
            .repeat(2)
            .subscribe(System.out::println);
}

위의 소스는 2번을 더 반복한다는 뜻이다. 실제로 3번의 구독이 이루어진다. 만약 0으로 지정을 했다면 구독은 한번만 이루어진다.
이외에도 사실 repeat** 메서드가 많긴하지만 필자는 사용한적이 없다. 만약 사용한다면 테스트할때 사용하지 않나 싶다. 운영에서 사용할일은 극히 드물 것 같다.

doOnSubscribe, doOnRequest, doOnNext, doOnCancel, doOnTerminate, doAfterTerminate, doOnSuccess, doOnError, doFinally, doOnEach

doOn** 시리즈는 trigger에 해당한다. 로직상으로는 많이 사용하지 않겠지만 로그를 찍을 때 사용하면 적절할 듯 싶다. 필자도 대부분 로그를 찍을 때 사용했다. 물론 다른 경우에도 사용하겠지만 아직까지는 그런일은 없었다.
일단 먼저 사용법 부터 보자.

@Test
void doOnSeries() {
    Mono.just("doOnSeries")
            .doOnSubscribe(it -> System.out.println("doOnSubscribe"))
            .doOnRequest(it -> System.out.println("doOnRequest"))
            .doOnNext(it -> System.out.println("doOnNext"))
            .doOnEach(it -> System.out.println("doOnEach"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .doAfterTerminate(() -> System.out.println("doAfterTerminate"))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnSuccess(it -> System.out.println("doOnSuccess"))
            .doOnError(it -> System.out.println("doOnError"))
            .doFinally(it -> System.out.println("doFinally"))
            .subscribe();
}

사용법은 위와 같이 간단하다. 각 메서드 별로 알아보자.
doOnSubscribe 메서드는 구독이 시작 될 트리거 된다. 위 메서드 중 가장 먼저 실행 된다. 파라미터로는 Subscription 가 넘어 온다.
doOnRequest 메서드는 요청 받을 떄 트리거 된다. 기본적으로 파라미터는 Long의 Long.MAX_VALUE 값이 넘어온다.
doOnNext는 성공적으로 데이터가 방출 될 때 트리거 된다. 파라미터로는 해당 T 타입이 넘어 온다.
doOnCancel 메서드는 구독이 취소 됐을 때 넘어오는 이벤트다 파라미터로는 아무것도 넘어오지 않는다.
doOnTerminate 메서드는 완료 혹은 에러가 났을 떄 트리거 된다. 이벤트시기는 완료, 에러 이벤트 전에 트리거 된다. 이 역시 파라미터로는 아무것도 넘어오지 않는다.
doAfterTerminate 메서드는 doOnTerminate 와 동일하게 트리거 되지만 시점이 조금 다르다. 완료, 에러 이벤트 후에 트리거 된다. 역시 파라미터는 없다.
doOnSuccess 완료 되면 트리거 된다. 파라미터는 해당 T 타입이 넘어 온다.
doOnError 에러가 났을 때 트리거 된다. 파라미터로는 Throwable 가 넘어 온다.
doFinally 에러, 취소, 완료 될 때 트리거 된다. 모노가 종료되면 무슨 일이든 트리거 된다. 파라미터로는 SignalType이 넘어와 종료 이벤트 타입을 받을 수 있다. 자바의 try catch finally의 finally 과 동일한 느낌이다.
doOnEach 메서드는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트다. 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다. 위의 코드에서는 doOnEach 가 두번 출력된다. 데이터를 방출 할때 한번, 완료 되었을 때 한번.

retry

메서드명 그대로 에러가 발생하였을 때 재시도 하는 메서드이다. 이 또한 많은 메서드가 존재한다. 기본 retry() 메서드는 무한정(Long.MAX_VALUE 사실 무한은 아닌 듯)으로 재시도를 한다.
retry(long) 은 retry 횟수를 지정할 수 있다. 만약 2로 지정하였을 경우는 2번을 재시도 한다.
retry(Predicate) 은 특정 조건에 만족하면 재시도를 시도 한다.

@Test
void retryTest() {
    Mono.just("retry")
            .handle((it, sink) -> {
                System.out.println(it);
                sink.error(new NullPointerException());
            }).retry(2)
            .subscribe();
}


위의 코드는 재시도를 2번 시도하는 코드이다. 출력은 retry 가 3번 출력된다. 한번은 처음 구독했을 때와 두번은 retry를 시도했을 때 총 3번 출력 된다.

retryBackoff

retryBackoff 는 기본 retry 보다 조금 우아하게 재시도를 할 수 있다, backoff 설정를 할 수 있어 특정 시간만큼 Duration을 줄 수 있다. 이 역시 메서드는 많다.

public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor)

하지만 위의 메서드를 설명하면 대부분의 메서드는 설명된다.

위의 retry 는 사실 큰 도움은 되지 않을 수 있다. 대기시간도 없어 실패하면 바로 다시 재시도를 하기 때문이다. 만약 동시에 모든 호출이 실패하면 바로 또 모든 호출을 동시에 날리기 때문이다. 어쩌면 운영에서는 retryBackoff 메서드를 쓰는게 좀 더 많은 도움이 되지 않을까 싶다.

retryBackoff의 첫번째 파라미터인 numRetries는 retry와 동일하게 재시도 횟수를 지정하면 된다.
두 번째 파라미터인 firstBackoff는 첫 번째 재시도 할 때의 대기 시간이다.
세 번째 파라미터 maxBackoff는 최대로 해당 시간만큼 대기 할 수 있다는 뜻 이다. 그러기 때문에 firstBackoff 보다는 작으면 안된다.
jitterFactor는 위에서 말했던 것 처럼 동시에 모든 호출이 실패하면 또 다시 모든 호출은 동시에 재시도 하기 때문에 이를 분산시키기 위한 값이다. 최소값은 0.0 이며 최대값은 1.0 이다. 지정 하지 않을 경우에는 0.5가 기본값이다.

궁금해할진 모르겠지만 해당 알고리즘은 다음과 같다. i는 재시도 한 값이다.

nextBackoff = firstbackoff * 2^i
jitterOffset = (nextBackoff * 100 * jitterFactor) / 100
lowBound = max(firstbackoff - nextBackoff, -jitterOffset)
highBound = min(maxBackoff - nextBackoff, jitterOffset)
jitter = random (lowBound, highBound)
backoff = nextBackoff + jitter

사용법은 대략 다음과 같다.

@Test
void retryBackoffTest() throws InterruptedException {
    Mono.just("retry")
            .handle((it, sink) -> sink.error(new NullPointerException()))
            .retryBackoff(2, Duration.ofSeconds(3), Duration.ofSeconds(10), 0.5)
            .subscribe();
    sleep()
}


retryBackoff 는 다른 쓰레드로 돌리기 때문에 보여주기 위해 sleep를 줬다.

delayElement

사실 Mono의 이 메서드는 테스트에서 사용했지 운영에선 사용해본적이 없다. 메서드명 그대로 delay 를 줄 수 있는 메서드이다.
사용법 부터 살펴보자.

@Test
void delayElement() throws InterruptedException {
    Mono.just("delayElement")
            .delayElement(Duration.ofSeconds(3))
            .doOnNext(System.out::println)
            .subscribe();
    sleep
}

이 역시 다른 스레드로 돌아가기 때문에 sleep 을 줬다. delayElement은 해당 시간만큼 onNext 방출을 지연시킨다. 실행해보면 3초 후에 delayElement 가 출력된다.

onErrorResume, onErrorReturn, onErrorMap, onErrorContinue

오늘 마지막으로 살펴볼 아이는 onError** 시리즈이다. 이 역시 여러 종류의 메서드들이 있지만 기본적인 내용은 살펴보고 나머지는 해당 메서드를 살펴보는 게 좋다. 그리 어렵지 않은 내용이니 말이다.

onErrorResume 메서드는 에러가 발생하였을 경우 다른 Mono로 대체 할 수 있다.

@Test
void onErrorResumeTest() {
    Mono.just("onErrorResume")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorResume(it -> Mono.just("foo"))
            .subscribe(System.out::println);
}

위의 코드는 foo가 출력 되는 것을 볼 수 있다.

onErrorReturn은 에러가 발생했을 경우 다른 값으로 대체 할 수 있다.

@Test
void onErrorReturnTest() {
    Mono.just("onErrorReturn")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorReturn("foo")
            .subscribe(System.out::println);
}

이 역시 foo가 출력 된다.

onErrorMap 다른 에러로 변환할 수 있다.

@Test
void onErrorMapTest() {
    Mono.just("onErrorMap")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorMap(it -> new IllegalArgumentException())
            .subscribe(System.out::println, System.out::println);
}

위 처럼 에러를 변환하고 싶다면 사용하면 된다.

onErrorContinue는 연산 과정에서 오류가 발생하였을 때 복구하는 메서드이다. 넘어오는 파라미터는 에러와 해당 object이 넘어온다.

@Test
void onErrorContinueTest() {
    Mono.just("onErrorContinue")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorContinue((it, object) -> { })
            .subscribe(System.out::println, System.out::println);
}

위는 오류가 복구되어 기존의 있던 값 onErrorContinue가 출력 된다.

오늘은 여기까지 배워보도록 하자. 우리가 공부한 이외에도 훨씬 더 많은 메서드가 존재한다. 필자도 대부분 사용하는 것만 사용하기에 필자도 모르는 메서드가 많다. 물론 언젠가는 사용할 날이 오겠지.. 그럼 그때 다시 공부하는 걸루..

오늘은 이렇게 Mono 의 오퍼레이터에 대해서 알아봤다. 사실 위의 메서드만 알아도 어느정도는 충분한 개발을 할 수 있다고 믿는다. 물론 다 알면 좋겠지만.. 만약 여기에 나오지 않은 오퍼레이터들은 나중에 Flux 시간에 나올 수 도 있으니 참고하면 되겠다.
다음시간에는 Flux에 대해 알아보도록 하자.