Reactor 써보기 (2)

오늘은 저번시간에 이어서 Reactor 써보기 (2)를 준비했다.

저번시간에는 Mono를 만드는 것을 배웠다. 필자도 아직까지는 써보지 않은 것들도 많이 존재한다. 대부분 쓰는 것들만 자주 쓰기에..

오늘은 저번시간에 이어서 Mono 오퍼레이터를 알아보도록 하자. 물론 다 알아보진 못할 거 같고(워낙 많아서..) 자주 사용될만한 것들 위주로 살펴 볼 예정이니 여기에 없는 것들은 문서를 찾아보면 되겠다.

map, flatMap, flatMapMany, filter

사실 이것들은 java8 에 나온 Stream API의 map 과 flatMap 과 사용법은 동일하다. 흔히 함수형 프로그래밍에서 말하는 functor, monad 라 하는 것들 처럼 의미하는 바도 동일하다. 이 부분을 더 알고 싶다면 함수형 프로그래밍을 공부하면 되겠다. 사실 필자도 잘..

이 오퍼레이션은 대부분 다 알고 있을거라 생각되기 때문에 간단한 사용법만 보고 넘어가자!

@Test
public void mapTest() {
    Mono.just("hello")
            .map(it -> it + " world")
            .subscribe(System.out::println);
}
@Test
public void flatMapTest() {
    Mono.just("hello")
            .flatMap(it -> Mono.just(it + " world"))
            .subscribe(System.out::println);
}
@Test
void filterTest() {
    Mono.just("filter")
            .filter(it -> it.equals("filter"))
            .subscribe(System.out::println);
}

사실 flatMapMany는 java의 Stream API 에는 존재하지는 않지만 flatMap과 동일하다.

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper)

Type 이 Mono 냐 Publisher냐의 차이이며 리턴타입도 Flux로 리턴한다.
flatMapMany 일경우에는 Mono, Flux 혹은 다른 reactive streams api 타입도 가능하다.

@Test
public void flatMapManyTest() {
    Mono.just("hello")
            .flatMapMany(it -> Flux.just("$it world", "$it world!!"))
            .subscribe(System.out::println);
}

java8의 Stream API를 사용했다면 그리 어려운 내용은 아닌 듯 싶다.
사실 이것들은 굉장히 자주 사용하는 메서드 중 하나다. 이것만 알아도? 대부분 무난하게 개발도 가능할 듯 싶다.
하지만 reactor 에서는 좀 더 우아한 방법의 메서드들이 존재하니 살펴보자.

zipWhen

위에서 간단하게 맛보기로 아마? 기존의 알던 오퍼레이터를 알아봤지만 이제부터는 reactor 만의 메서드를 알아보도록 하자.

이것은 필자가 실제로 map, flatMap 만큼 자주 사용하는 메서드중 하나다. flatMap가 사용법은 동일하지만 onNext 방출이 다르다. zip 이란 메서드에 걸맞게 묶어서 결과를 받을 수 있다.

@Test
public void zipWhenTest() {
    Mono.just("hello")
            .zipWhen(it -> Mono.just(it + " world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

필자는 타입을 보여주기 위함이지 생략해도 문제 없다.
출력은 다음과 같을 것이다.

hello, hello world

쉽게 생각해서 처음 Mono를 onNext 신호를 받을 수 있다. 만약 처음 만든 Mono 를 onNext 에서 사용해야 된다면 zipWhen 메서드를 사용하면 된다.

zipWith

zipWith 메서드는 사실 저번에 배운 Mono.zip 과 동작 방식은 동일하다. 이 역시 모노들이 각각 동작한다. 일단 예제를 살펴보자.

@Test
public void zipWithTest() {
    Mono.just("hello")
            .zipWith(Mono.just("world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

zip 과 동일하게 onNext 신호는 동일하다. 실제로 내부적으로는 Mono.zip 을 사용한다. 사실 필자는 이 메서드는 잘 사용하지 않는다. 그냥 Mono.zip() 을 더욱 선호하는 편이긴 하다. 만약 aggregating 하는 Mono 들이 여러개 늘어 난다면 사실 코드 보기가 좀 지저분해 보일 수도 있을 거 같아 그냥 zip 메서드를 사용한다. 사용하고 싶다면 위 예제처럼 두개의 모노만을 묶는다면 사용해도 괜찮을 듯 싶다.

만약 Mono 가 여러개라면 위와 같은 코드가 될 것 같다. 만약 더 많은 모노가 있다면 onNext 타입이 정말 복잡할 듯 싶다.

concatWith

이것은 메서드명 그대로 연결하는 메서드이다. 연결후 Flux 타입으로 리턴한다. 파라미터는 꼭 Mono타입이 아니더라도 가능하다.

public final Flux<T> concatWith(Publisher<? extends T> other)
@Test
public void concatWithTest() {
    Mono.just("hello")
            .concatWith(Mono.just("world"))
            .subscribe(System.out::println);
}

출력은 다음과 같다

hello
world

딱히 어려운 부분은 없다.

이 메서드는 순차적으로 진행되기 때문에 만약 병렬로 진행해도 된다면 아래의 mergeWith를 사용하면 된다.

mergeWith

concatWith 와 사용법은 동일하지만 이 메서드는 순차적이지 않다. concatWith 달리 각각 실행 후 합쳐서 onNext를 방출한다. 순차적으로 실행시킬 필요가 없을 경우 이 메서드를 사용하면 된다. 그렇기 때문에 순서는 보장하지 않는다. 이 역시 합치는 기능이므로 Flux 를 리턴한다.

public final Flux<T> mergeWith(Publisher<? extends T> other)

이 역시 파라미터 타입은 꼭 Mono 일 필요는 없다.

@Test
public void mergeWithTest() {
    Mono.just("hello")
            .mergeWith(Mono.just("world"))
            .subscribe(System.out::println);
}

필자의 경우 종종 mergeWith 는 사용했다. 아쉽게도 concatWith는 사용한적이 없다. 언젠가 필요할 때가 있겠지..

then

then 오퍼레이션은 몇가지 존재하는데 대부분 비슷하다. 조금씩 다르니 참고하면 되겠다.
아무 파라미터가 없는 경우 then()은 리턴 값은 Void 타입이다. 즉, onNext는 방출하지 않는 다는 뜻과 같다. onNext만 방출하지 않지 에러와 완료신호는 방출한다.

@Test
fun `then test`() {
    Mono.just("then").then()
        .subscribe({ println(it) }, {println("error")}) { println("complete") }
}

출력 결과는 보면 complete 이외엔 아무것도 출력 되지 않는다. 만약 error 가 방출 될경우에는 error 메시지가 출력 된다.
이것은 종종 운영에서도 사용했다. 예를들어 message 를 publisher 한다거나 혹은 consumer 할 때 주로 사용했다. 혹은 spring 의 EventListener를 사용할 때도 사용했다. 종종 쓸일이 있으니 알아두면 좋을 것 같다.

then 메서드에 파라미터를 받는 메스드가 존재한다. 메서드 형태는 다음과 같다.

public final <V> Mono<V> then(Mono<V> other)

해당 모노를 완료시킨후에 다른 모노를 연결하는 오퍼레이터이다. 이는 순차적이다. 이 메서드는 onNext를 방출 할 수 있다.

@Test
public void thenTest() {
    Mono.just("then1").then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}


위의 결과는 then2complete 가 출력된다. 만약 기존 모노가 empty 일지 라도 다음 모노는 실행 시킨다.

@Test
public void thenTest() {
    Mono.empty().then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위처럼 작성해도 동일하게 동작한다. 이 메서드 역시 운영에서 종종사용했다.
더 많은 메서드들이 있긴한데 간단하게 설명만 하고 넘어가도록 하자.

thenEmpty 는 Void 타입을 받는 Publisher이다. 즉, 실행만 시키고 onNext는 방출하지 않는다는 뜻이다. 내부적으로는 then(Mono other) 메서드를 호출한다.
thenReturn은 Publisher 타입이 아닌 T 타입을 받는 메서드이다. 내부적으로는 then(Mono other)를 호출한다.
thenMany는 Publisher 타입을 받는 메서드이다. then(Mono other)랑 비슷해보이지만 리턴 타입은 Flux를 리턴한다.

handle

handle 메서드는 좀 더 우아하게 onNext, 에러 혹은 완료 신호를 줄 수 있다. 글 보다 코드를 보는게 이해가 빠를 듯 싶다.

@Test
public void handleTest() {

    Mono.just("bar")
            .handle((it, sink) -> {
                if (it.equals("foo")) {
                    sink.next(it);
                } else if (it.equals("bar")) {
                    sink.error(new NullPointerException());
                } else {
                    sink.complete();
                }
            }).subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

}

위와 같이 특정 조건에 의해 next, error, complete 신호를 보낼 수 있다. 꼭 해당 타입과 sink의 제네릭 타입이 같지 않아도 된다.
foo, bar 혹은 그외의 것들을 넣어 한번씩 테스트해보는게 이해가 더욱 빠를 듯 싶다. 이 역시 운영에서 아주 많이 사용한다.

cast, ofType

cast 혹은 ofType을 써서 형 변환을 할 수 있다. cast 와 ofType 두 메서드 모두 타입 캐스팅을 할 수 있지만 조금 다르다. cast는 타입캐스팅 할 때 형이 맞지 않다면 에러가 발생하지만 ofType 의 경우에는 에러가 나지 않고 무시된다. ofType은 내부적으로 filter를 사용해 무시한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

둘다 출력 되는건 동일하다. 형변환이 잘 되었을 땐 동일하게 동작하지만 형변환이 안될 경우에는 조금 다르게 동작한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위와 같이 형 변환이 안됐을 시에는 cast 경우 에러가 출력 되며 ofType 일 경우에는 complete 만 출력 된다.

filterWhen

filter의 비동기 버전이다. filter는 다들 알다시피 걸러내는 작업을 하는 메서드이다. filter와 동일한 동작을 하니 예제만 보고 넘어가자.

@Test
public void filterWhenTest() {
    Mono.just("filterWhen")
            .filterWhen((it) -> Mono.just(true))
            .subscribe(System.out::println);
}

filter 와 동일한 동작은 하니 그리 어렵진 않다. 사실 이건 운영에서 써본적은 없다.

repeat

메서드명 그대로 반복하여 구독한다. repeat()는 여러 메서드가 존재한다. 예를들어 repeat() 만 사용할 경우에는 무한정으로 구독하는 시스템이다. 실제로 필요한지는 잘모르겠다. 특정 조건에 만족하면 반복하는 메서드도 존재한다.

public final Flux<T> repeat(BooleanSupplier predicate)

또 한 원하는 만큼 반복도 가능하다.

@Test
public void repeatTest() {
    Mono.just("repeat")
            .repeat(2)
            .subscribe(System.out::println);
}

위의 소스는 2번을 더 반복한다는 뜻이다. 실제로 3번의 구독이 이루어진다. 만약 0으로 지정을 했다면 구독은 한번만 이루어진다.
이외에도 사실 repeat** 메서드가 많긴하지만 필자는 사용한적이 없다. 만약 사용한다면 테스트할때 사용하지 않나 싶다. 운영에서 사용할일은 극히 드물 것 같다.

doOnSubscribe, doOnRequest, doOnNext, doOnCancel, doOnTerminate, doAfterTerminate, doOnSuccess, doOnError, doFinally, doOnEach

doOn** 시리즈는 trigger에 해당한다. 로직상으로는 많이 사용하지 않겠지만 로그를 찍을 때 사용하면 적절할 듯 싶다. 필자도 대부분 로그를 찍을 때 사용했다. 물론 다른 경우에도 사용하겠지만 아직까지는 그런일은 없었다.
일단 먼저 사용법 부터 보자.

@Test
void doOnSeries() {
    Mono.just("doOnSeries")
            .doOnSubscribe(it -> System.out.println("doOnSubscribe"))
            .doOnRequest(it -> System.out.println("doOnRequest"))
            .doOnNext(it -> System.out.println("doOnNext"))
            .doOnEach(it -> System.out.println("doOnEach"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .doAfterTerminate(() -> System.out.println("doAfterTerminate"))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnSuccess(it -> System.out.println("doOnSuccess"))
            .doOnError(it -> System.out.println("doOnError"))
            .doFinally(it -> System.out.println("doFinally"))
            .subscribe();
}

사용법은 위와 같이 간단하다. 각 메서드 별로 알아보자.
doOnSubscribe 메서드는 구독이 시작 될 트리거 된다. 위 메서드 중 가장 먼저 실행 된다. 파라미터로는 Subscription 가 넘어 온다.
doOnRequest 메서드는 요청 받을 떄 트리거 된다. 기본적으로 파라미터는 Long의 Long.MAX_VALUE 값이 넘어온다.
doOnNext는 성공적으로 데이터가 방출 될 때 트리거 된다. 파라미터로는 해당 T 타입이 넘어 온다.
doOnCancel 메서드는 구독이 취소 됐을 때 넘어오는 이벤트다 파라미터로는 아무것도 넘어오지 않는다.
doOnTerminate 메서드는 완료 혹은 에러가 났을 떄 트리거 된다. 이벤트시기는 완료, 에러 이벤트 전에 트리거 된다. 이 역시 파라미터로는 아무것도 넘어오지 않는다.
doAfterTerminate 메서드는 doOnTerminate 와 동일하게 트리거 되지만 시점이 조금 다르다. 완료, 에러 이벤트 후에 트리거 된다. 역시 파라미터는 없다.
doOnSuccess 완료 되면 트리거 된다. 파라미터는 해당 T 타입이 넘어 온다.
doOnError 에러가 났을 때 트리거 된다. 파라미터로는 Throwable 가 넘어 온다.
doFinally 에러, 취소, 완료 될 때 트리거 된다. 모노가 종료되면 무슨 일이든 트리거 된다. 파라미터로는 SignalType이 넘어와 종료 이벤트 타입을 받을 수 있다. 자바의 try catch finally의 finally 과 동일한 느낌이다.
doOnEach 메서드는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트다. 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다. 위의 코드에서는 doOnEach 가 두번 출력된다. 데이터를 방출 할때 한번, 완료 되었을 때 한번.

retry

메서드명 그대로 에러가 발생하였을 때 재시도 하는 메서드이다. 이 또한 많은 메서드가 존재한다. 기본 retry() 메서드는 무한정(Long.MAX_VALUE 사실 무한은 아닌 듯)으로 재시도를 한다.
retry(long) 은 retry 횟수를 지정할 수 있다. 만약 2로 지정하였을 경우는 2번을 재시도 한다.
retry(Predicate) 은 특정 조건에 만족하면 재시도를 시도 한다.

@Test
void retryTest() {
    Mono.just("retry")
            .handle((it, sink) -> {
                System.out.println(it);
                sink.error(new NullPointerException());
            }).retry(2)
            .subscribe();
}


위의 코드는 재시도를 2번 시도하는 코드이다. 출력은 retry 가 3번 출력된다. 한번은 처음 구독했을 때와 두번은 retry를 시도했을 때 총 3번 출력 된다.

retryBackoff

retryBackoff 는 기본 retry 보다 조금 우아하게 재시도를 할 수 있다, backoff 설정를 할 수 있어 특정 시간만큼 Duration을 줄 수 있다. 이 역시 메서드는 많다.

public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor)

하지만 위의 메서드를 설명하면 대부분의 메서드는 설명된다.

위의 retry 는 사실 큰 도움은 되지 않을 수 있다. 대기시간도 없어 실패하면 바로 다시 재시도를 하기 때문이다. 만약 동시에 모든 호출이 실패하면 바로 또 모든 호출을 동시에 날리기 때문이다. 어쩌면 운영에서는 retryBackoff 메서드를 쓰는게 좀 더 많은 도움이 되지 않을까 싶다.

retryBackoff의 첫번째 파라미터인 numRetries는 retry와 동일하게 재시도 횟수를 지정하면 된다.
두 번째 파라미터인 firstBackoff는 첫 번째 재시도 할 때의 대기 시간이다.
세 번째 파라미터 maxBackoff는 최대로 해당 시간만큼 대기 할 수 있다는 뜻 이다. 그러기 때문에 firstBackoff 보다는 작으면 안된다.
jitterFactor는 위에서 말했던 것 처럼 동시에 모든 호출이 실패하면 또 다시 모든 호출은 동시에 재시도 하기 때문에 이를 분산시키기 위한 값이다. 최소값은 0.0 이며 최대값은 1.0 이다. 지정 하지 않을 경우에는 0.5가 기본값이다.

궁금해할진 모르겠지만 해당 알고리즘은 다음과 같다. i는 재시도 한 값이다.

nextBackoff = firstbackoff * 2^i
jitterOffset = (nextBackoff * 100 * jitterFactor) / 100
lowBound = max(firstbackoff - nextBackoff, -jitterOffset)
highBound = min(maxBackoff - nextBackoff, jitterOffset)
jitter = random (lowBound, highBound)
backoff = nextBackoff + jitter

사용법은 대략 다음과 같다.

@Test
void retryBackoffTest() throws InterruptedException {
    Mono.just("retry")
            .handle((it, sink) -> sink.error(new NullPointerException()))
            .retryBackoff(2, Duration.ofSeconds(3), Duration.ofSeconds(10), 0.5)
            .subscribe();
    sleep()
}


retryBackoff 는 다른 쓰레드로 돌리기 때문에 보여주기 위해 sleep를 줬다.

delayElement

사실 Mono의 이 메서드는 테스트에서 사용했지 운영에선 사용해본적이 없다. 메서드명 그대로 delay 를 줄 수 있는 메서드이다.
사용법 부터 살펴보자.

@Test
void delayElement() throws InterruptedException {
    Mono.just("delayElement")
            .delayElement(Duration.ofSeconds(3))
            .doOnNext(System.out::println)
            .subscribe();
    sleep
}

이 역시 다른 스레드로 돌아가기 때문에 sleep 을 줬다. delayElement은 해당 시간만큼 onNext 방출을 지연시킨다. 실행해보면 3초 후에 delayElement 가 출력된다.

onErrorResume, onErrorReturn, onErrorMap, onErrorContinue

오늘 마지막으로 살펴볼 아이는 onError** 시리즈이다. 이 역시 여러 종류의 메서드들이 있지만 기본적인 내용은 살펴보고 나머지는 해당 메서드를 살펴보는 게 좋다. 그리 어렵지 않은 내용이니 말이다.

onErrorResume 메서드는 에러가 발생하였을 경우 다른 Mono로 대체 할 수 있다.

@Test
void onErrorResumeTest() {
    Mono.just("onErrorResume")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorResume(it -> Mono.just("foo"))
            .subscribe(System.out::println);
}

위의 코드는 foo가 출력 되는 것을 볼 수 있다.

onErrorReturn은 에러가 발생했을 경우 다른 값으로 대체 할 수 있다.

@Test
void onErrorReturnTest() {
    Mono.just("onErrorReturn")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorReturn("foo")
            .subscribe(System.out::println);
}

이 역시 foo가 출력 된다.

onErrorMap 다른 에러로 변환할 수 있다.

@Test
void onErrorMapTest() {
    Mono.just("onErrorMap")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorMap(it -> new IllegalArgumentException())
            .subscribe(System.out::println, System.out::println);
}

위 처럼 에러를 변환하고 싶다면 사용하면 된다.

onErrorContinue는 연산 과정에서 오류가 발생하였을 때 복구하는 메서드이다. 넘어오는 파라미터는 에러와 해당 object이 넘어온다.

@Test
void onErrorContinueTest() {
    Mono.just("onErrorContinue")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorContinue((it, object) -> { })
            .subscribe(System.out::println, System.out::println);
}

위는 오류가 복구되어 기존의 있던 값 onErrorContinue가 출력 된다.

오늘은 여기까지 배워보도록 하자. 우리가 공부한 이외에도 훨씬 더 많은 메서드가 존재한다. 필자도 대부분 사용하는 것만 사용하기에 필자도 모르는 메서드가 많다. 물론 언젠가는 사용할 날이 오겠지.. 그럼 그때 다시 공부하는 걸루..

오늘은 이렇게 Mono 의 오퍼레이터에 대해서 알아봤다. 사실 위의 메서드만 알아도 어느정도는 충분한 개발을 할 수 있다고 믿는다. 물론 다 알면 좋겠지만.. 만약 여기에 나오지 않은 오퍼레이터들은 나중에 Flux 시간에 나올 수 도 있으니 참고하면 되겠다.
다음시간에는 Flux에 대해 알아보도록 하자.

Reactor 써보기 (1)

오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다.

요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자.
하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다.

위의 글들은 한번 읽어보면 좋을 것 같다. 토비느님의 방송 역시 함께 보면 개념적으로 더욱 이해가 빨리 될 듯 싶다. 꽤 많은 시리즈가 있으니 시간 날 때 짬짬이 보면 도움이 확실이 된다.

사실 Reactive Streams 구현체인 Reactor나 RxJava2나 사용법은 대부분 비슷하다. 같은 개발자가 만든건 아니지만 여러 회사들이 협업을 하면서 만든거니 아무래도 비슷할 수 밖에 없을 것도 같다. 아마도 구현체들 대부분 사용법은 비슷하지 않을까 싶다?

필자는 아무래도 거의 대부분 Spring을 사용하기에 Reactor로 먼저 접하게 되었다. 안드로이드에서는 Reactor 보다는 RxJava를 더 선호하고 많이 쓰는 것 같다. 하지만 상관없다. Spring 에선 RxJava 나 Reactor 나 혹은 다른 Reactive Streams 구현 된 어느 것을 써도 좋다. 이건 조만간 다시 한번 다뤄보기로 하겠다.

그럼 한번 어떤 메서드들이 있는지 주로 사용될만한 메서드 위주로 한번 살펴보도록 하자.

일단 모노 부터 만들어보자!

Mono

위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.)
위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다.
쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다.

Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다.

아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다.
Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다.

이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다.

그럼 이제 본격적으로 모노를 만들어 보자!

kotlin, java 모두 예제에 넣어봤다.

just


Mono<String> just = Mono.just("hello reactor");

코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions 디펜더시를 추가 해야 된다.

아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.

fromSupplier

만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.


Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");

from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다.
from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.

error

에러를 만드는 방법이다.


Mono<String> error = Mono.error(new NullPointerException()); Mono<String> errorSupplier = Mono.error(NullPointerException::new);

구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.

empty, never

빈 모노와 무기한으로 실행되는 모노를 만든다. 사실 never는 사용한 적이 단 한번도 없다.


Mono<String> empty = Mono.empty(); Mono<String> never = Mono.never();

해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다.
never 는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다.

아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..

zip

모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을 aggregating 하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다.

방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.


Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));

기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.


@Test void zipTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zip(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); } private Mono<String> testMethod1() { return Mono.just("foo") .delayElement(Duration.ofSeconds(1)) .doOnNext(System.out::println); } private Mono<String> testMethod2() { return Mono.just("bar") .doOnNext(System.out::println); }

위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 foo 가 출력된 후에 bar 출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar가 출력되고 1초후 foo가 실행 된다.

위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.


Mono.zip(testMethod1(), testMethod2()) .subscribe((Tuple2<String,String> it) -> { });

필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.

when

when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.


@Test void whenTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.when(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

이 역시 zip과 동일하게 오류를 방출하면 즉시 종료 된다.

delay

메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.


@Test void delayTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.delay(Duration.ofSeconds(1)) .doOnNext(System.out::println) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

위의 코드는 1초 후에 onNext 로 방출한다.

defer

defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.


Mono<String> defer = Mono.defer(() -> Mono.just("foo"));

음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty 에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.


Mono.just("foo") .switchIfEmpty(Mono.just("bar")) .subscribe(); }

만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just(“bar”)를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.


Mono.just("foo") .switchIfEmpty(Mono.defer(() -> Mono.just("bar"))) .subscribe(); }

위와 같이 코드를 작성한다면 Mono.just(“bar”)는 정말로 모노가 비어있을 때만 실행 된다.

from

reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.


Mono.from(Flux.just(1,2,3,4,5)) .subscribe(System.out::println);

결과는 1만 방출 되며 종료 된다.
꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다.
아래는 RxJava 를 사용한 예제이다.


Mono.from(Single.just("bar").toFlowable()) .subscribe(System.out::println);

동일하게 bar 가 방출 되며 종료 된다.

***DelayError

에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.


@Test void delayErrorTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException())) .subscribe((it) -> {}, System.out::println, countDownLatch::countDown); countDownLatch.await(); }

위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다.
출력 결과는 다음과 같다.

foo
foo
reactor.core.Exceptions$CompositeException: Multiple exceptions

만약 zip을 사용했다면 NullPointerException 에러만 방출 한다.

여기에선 zip 만 설명했지만 when 도 동일하다.

create

Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다.
말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.


@Test void createTest() { Mono.create(sink -> { client.async(request, new Listener() { @Override public void onFailure(Exception e) { sink.error(e); } @Override public void onResponse(Response response) { sink.success(response); } }); }); }

위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다.
만약 리스너 해제 및 자원 해제는 onDispose 메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel 메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.

using

이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다.
이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다.
사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.


@Test void usingTest() { Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ), it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { sink.success(attachment); } @Override public void failed(Throwable exc, ByteBuffer attachment) { sink.error(exc); } })), it -> { try { it.close(); } catch (IOException e) { } }); }

조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 using를 사용하는 코드를 참고 하면 되겠다.

usingWhen

이 것 역시 using과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher 타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.

@Test
void usingWhenTest() {
    Mono<String> data = Mono.just("foo");
    Mono.usingWhen(data, it ->
            Mono.just(it),
            it -> transition.commit(),
            (it, error) ->  transition.rollback(),
            it-> transition.rollback())
}

예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다.

오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다.

다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.

Spring WebClient

오늘은 Spring의 WebClient의 사용법에 대해서 몇가지 알아보도록 하자. 사용 API만 살펴 볼 예정이므로 reactive streams(reactor..) 들의 개념과 사용법은 다른 블로그를 살펴보길 바란다. reactive streams 대한 내용을 알고 보면 좋지만 몰라도 코드를 보는데는 문제가 없을 듯 하다.

WebClient는 Spring5 에 추가된 인터페이스다. spring5 이전에는 비동기 클라이언트로 AsyncRestTemplate를 사용을 했지만 spring5 부터는 Deprecated 되어 있다. 만약 spring5 이후 버전을 사용한다면 AsyncRestTemplate 보다는 WebClient 사용하는 것을 추천한다. 아직 spring 5.2(현재기준) 에서 AsyncRestTemplate 도 존재하긴 한다.

기본 문법

기본적으로 사용방법은 아주 간단하다. WebClient 인터페이스의 static 메서드인 create()를 사용해서 WebClient 를 생성하면 된다. 한번 살펴보자.

@Test
void test1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

create()는 두가지 메서드가 있는데 baseUrl를 지정해주는 것과 그렇지 않은 것 두가지가 존재한다. 원하는 API를 사용하면 되겠다.

@Test
void test1() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

API 자체도 명확하다. get(), post(), put(), patch(), 등 http method들을 모두 정의되어 있다.

webClient.get()
  .///
webClient.post()
  .///
webClient.put()
  .///
webClient.method(HttpMethod.GET)
  .///

또는 위와 같이 HttpMethod를 지정할 수 있다.

uri 또한 여러 메서드가 존재한다. 단순하게 string 으로 uri을 만들 수 도 있고 queryParam, pathVariable 등 명확하게 uri을 만들 수 도 있다. 위의 코드를 사실 RestTemplate 클래스를 자주 사용했다면 익숙한 문법일 수 있다.

@Test
void test1_3() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name=wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}


@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", Map.of("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri(it -> it.path("/sample")
                    .queryParam("name", "wonwoo")
                    .build()
            ).retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위와 같이 여러 방법이 존재하니 각자가 원하는 어떤것을 사용해도 좋다. 마지막 S uri(Function<UriBuilder, URI> uriFunction) API는 좀 더 세세하게 컨트롤 할 수 있으니 세세하게 컨트롤 할 일이 있다면 이 API를 사용하는게 좋다.

다음은 retrieve() 메서드인데 이 메서드는 request를 만들고 http request를 보내는 역할을 한다. 이 메서드 말고 exchange()가 존재하는데 약간 다르다. 사실 API만 살짝 다를뿐이지 retrieve() 내부에선 exchange() 메서드를 호출한다.

retrieve() 메서드는 ResponseSpec 타입을 리턴하고 exchange() 메서드는 Mono<ClientResponse> 를 리턴하고 있다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> it.bodyToMono(String.class));

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위의 test1_3 메서드와 test2 메서드는 동일한 동작을 한다. 위에서 말했다시피 exchange() 메서드는 ClientResponse를 사용해 좀 더 세세한 컨트롤을 하면 된다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> {
                if(it.statusCode() == HttpStatus.NOT_FOUND) {
                    throw new NotFoundException("....");
                }
                return it.bodyToMono(String.class);
            });

    StepVerifier.create(hello)
            .verifyError(NotFoundException.class);
}

이렇게 기본문법에 대해서 알아봤다. 그리 어려운 내용도 없는 듯 하다. 좀 더 범용성있게 사용하려면 아직은 부족하다. 좀 더 살펴보자.

formData 와 message

위에서 알아보지 않은게 있는데 바로 post나 put 기타 http 메서드에 자주 사용하는 formdata 와 message에 대해서 알아보자.
만약 formData 로 server에 보낸다면 다음과 같이 작성하면 된다.

import static org.springframework.web.reactive.function.BodyInserters.fromFormData;

@Test
void test3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.post()
            .uri("/sample")
            .body(fromFormData("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

fromFormData란 static 메서드를 사용해서 전달하면 된다. 만약 좀 더 많은 내용이 있다면 .with(key, value) 메서드를 체이닝해 사용하면 된다.

.body(fromFormData("name", "wonwoo").with("foo","bar").with("...","..."))

또는 MultiValueMap를 이용해서 fromFormData에 넣어도 된다.

이번엔 message에 대해서 알아보자. 일반적으로 json, xml 기타 message로 보낼때 유용하다. 한번 살펴보자.

@Test
void test3_1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .bodyValue(new Sample("wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

위와같이 bodyValue를 이용해서 message를 전달할 수 있다. 참고로 spring 5.2 이전버전에선 syncBody를 이용해야 한다. spring 5.2에선 syncBody는 Deprecated 되었다.

만약 전달하는 message가 Publisher 타입일 수 도 있다. 그럼 다음과 같이 작성하면 된다.

@Test
void test3_2() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .body(Mono.just(new Sample("wonwoo")), Sample.class)
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

filter

filter이용하면 client를 호출하기전에 인터셉터해서 추가적인 작업을 할 수 있다. 예를들면 로그를 출력 할 수도 있고 헤더정보 혹은 인증정보를 넣어 호출 할 수 있다.

필터를 사용하려면 ExchangeFilterFunction 인터페이스를 구현하면 된다. 추상 메서드는 하나뿐이라 람다를 이용해도 좋다.

@Test
void test4() {

    WebClient webClient = WebClient.builder().filter((request, next) -> next.exchange(ClientRequest.from(request)
            .header("foo", "bar").build())).baseUrl("http://localhost:8080")
            .build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위의 코드는 헤더 정보 를 추가하는 코드이다. 또한 한번에 여러 필터를 적용할 수 도 있다.

WebClient.builder().filters(exchangeFilterFunctions ->
        exchangeFilterFunctions.add(0, (request, next) -> {
            return next.exchange(request);
        }));

위의 코드는 0번째에 해당 필터를 삽입하는 코드이다. 물론 filter를 계속 체이닝해서 써도 상관 없다.

ClientHttpConnector

현재 spring에서는 reactive http client가 2개밖에 존재하지 않는다. netty와 jetty이다. 사실 spring을 사용한다면 그냥 netty를 사용하는게 정신건강에 좋을 듯 싶다.

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
</dependency>

위와 같이 jetty reactive httpclient를 먼저 디펜더시 받은 후에 다음과 같이 작성하면 된다.

@Test
void test5() {
    WebClient webClient = WebClient.builder().clientConnector(new JettyClientHttpConnector())
            .baseUrl("http://localhost:8080").build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

추가 적인 설정은 JettyClientHttpConnector 클래스와 org.eclipse.jetty.client.HttpClient 클래스를 살펴보면 되겠다.

default values

만약 기본으로 헤더정보 쿠키정보등 값을 지정하고 싶다면 다음과 같이 작성하면 된다.

@Test
void test6() {

    WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080")
            .defaultHeader("foo", "bar")
            .defaultCookie("foo", "BAR")
            .defaultRequest(it -> it.header("test", "sample")).build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위와 같이 작성하면 기본으로 위와 같은 값이 같이 전송된다. defaultRequest() 메서드를 사용하면 좀더 세세하게 컨트롤이 가능하니 참고하면 되겠다.

retrieve

위에서 잠깐 언급한 retrieve 메서드를 이용하는 경우에 보다 상세한 에러 코드들을 컨트롤 할 수 있다. 원한다면 사용해도 좋다.

@Test
void test7() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, __ -> Mono.error(new IllegalArgumentException("4xx")))
            .onStatus(HttpStatus::is5xxServerError, __ -> Mono.error(new IllegalArgumentException("5xx")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("4xx");

} 

onStatus 메서드를 이용해서 해당 코드를 작성후에 Mono type의 exception을 던지면 된다. 위의 코드는 4xx 에러 코드일땐 4xx라는 메시지를 던지고 5xx 에러 코드일 땐 5xx라는 메세지를 던진다는 코드이다.

onStatus() 메서드 말고도 onRawStatus() 메서드도 존재한다. 이것은 위와 같이 HttpStatus 코드가 아닌 int로된 코드를 리턴한다.

@Test
void test8() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .onRawStatus(it -> it == 400, __ -> Mono.error(new IllegalArgumentException("aaa")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("400");
}

이렇게 기본문법과 사용법에 대해서 알아봤다. 물론 좀 더 많은 메서드들이 있지만 필자가 자주 사용할만한 API 위주로 살펴봤다. 다른 궁금한 점이 있다면 해당 문서를 찾아보길 추천한다.

마지막으로 Spring boot를 사용할 때에 WebClient는 어떻게 사용해야 될까? 사실 기본적인 설정은 되어있다. 그래서 아주 쉽고 간단하게 사용할 수 있다.

spring boot

spring boot 를 사용할 때는 WebClient.Builder 인터페이스가 기본적으로 bean으로 등록 되어있다. 그래서 우리는 이걸 사용하면 된다.

@RestController
public class WebClientController {

    private final WebClient webClient;

    public WebClientController(WebClient.Builder builder) {
        this.webClient = builder.baseUrl("http://localhost:9999").build();
    }

    @GetMapping("/users")
    public Mono<String> findByName() {
        return webClient.get()
                .uri("/users")
                .retrieve()
                .bodyToMono(String.class);
    }
}

딱히 설명할 것도 없다. 만약 필터나 default values 가 필요하면 위에서 했던 그 방법을 그대로 이용하면 된다.

public WebClientController(WebClient.Builder builder) {
    this.webClient = builder
            .filter((request, next) -> next.exchange(request))
            .defaultHeader("foo", "bar")
            .baseUrl("http://localhost:8080")
            .build();
}

그리고 만약 전역적으로 커스텀할 코드들이 있다면 WebClientCustomizer 인터페이스를 이용해서 커스텀할 수 있다.

@Bean
WebClientCustomizer webClientCustomizer() {
    return builder -> builder.filter((request, next) -> next.exchange(request));
}

위와 같이 WebClientCustomizer 빈으로 등록하여 커스터마이징하면 된다.

번외로 kotlin 코드도 한번 살펴보자.

bodyToFlux(), bodyToMono(), body(), awaitExchange(), bodyToFlow(), awaitBody() 등 확장함수로 된 함수들이 몇가지 존재하니 참고하면 되겠다. 몇가지는 코루틴 관련 확장함수이다.

오늘은 이렇게 Spring의 WebClient에 대해서 살펴봤다. 이정도만 알아도 사용하기엔 충분할 듯 싶다. Spring 5에서 Non blocking http client를 사용한다면 꼭 WebClient를 사용하도록 하자!