Reactor 써보기 (1)

오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다.

요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자.
하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다.

위의 글들은 한번 읽어보면 좋을 것 같다. 토비느님의 방송 역시 함께 보면 개념적으로 더욱 이해가 빨리 될 듯 싶다. 꽤 많은 시리즈가 있으니 시간 날 때 짬짬이 보면 도움이 확실이 된다.

사실 Reactive Streams 구현체인 Reactor나 RxJava2나 사용법은 대부분 비슷하다. 같은 개발자가 만든건 아니지만 여러 회사들이 협업을 하면서 만든거니 아무래도 비슷할 수 밖에 없을 것도 같다. 아마도 구현체들 대부분 사용법은 비슷하지 않을까 싶다?

필자는 아무래도 거의 대부분 Spring을 사용하기에 Reactor로 먼저 접하게 되었다. 안드로이드에서는 Reactor 보다는 RxJava를 더 선호하고 많이 쓰는 것 같다. 하지만 상관없다. Spring 에선 RxJava 나 Reactor 나 혹은 다른 Reactive Streams 구현 된 어느 것을 써도 좋다. 이건 조만간 다시 한번 다뤄보기로 하겠다.

그럼 한번 어떤 메서드들이 있는지 주로 사용될만한 메서드 위주로 한번 살펴보도록 하자.

일단 모노 부터 만들어보자!

Mono

위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.)
위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다.
쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다.

Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다.

아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다.
Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다.

이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다.

그럼 이제 본격적으로 모노를 만들어 보자!

kotlin, java 모두 예제에 넣어봤다.

just


Mono<String> just = Mono.just("hello reactor");

코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions 디펜더시를 추가 해야 된다.

아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.

fromSupplier

만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.


Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");

from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다.
from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.

error

에러를 만드는 방법이다.


Mono<String> error = Mono.error(new NullPointerException()); Mono<String> errorSupplier = Mono.error(NullPointerException::new);

구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.

empty, never

빈 모노와 무기한으로 실행되는 모노를 만든다. 사실 never는 사용한 적이 단 한번도 없다.


Mono<String> empty = Mono.empty(); Mono<String> never = Mono.never();

해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다.
never 는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다.

아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..

zip

모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을 aggregating 하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다.

방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.


Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));

기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.


@Test void zipTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zip(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); } private Mono<String> testMethod1() { return Mono.just("foo") .delayElement(Duration.ofSeconds(1)) .doOnNext(System.out::println); } private Mono<String> testMethod2() { return Mono.just("bar") .doOnNext(System.out::println); }

위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 foo 가 출력된 후에 bar 출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar가 출력되고 1초후 foo가 실행 된다.

위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.


Mono.zip(testMethod1(), testMethod2()) .subscribe((Tuple2<String,String> it) -> { });

필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.

when

when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.


@Test void whenTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.when(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

이 역시 zip과 동일하게 오류를 방출하면 즉시 종료 된다.

delay

메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.


@Test void delayTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.delay(Duration.ofSeconds(1)) .doOnNext(System.out::println) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

위의 코드는 1초 후에 onNext 로 방출한다.

defer

defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.


Mono<String> defer = Mono.defer(() -> Mono.just("foo"));

음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty 에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.


Mono.just("foo") .switchIfEmpty(Mono.just("bar")) .subscribe(); }

만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just(“bar”)를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.


Mono.just("foo") .switchIfEmpty(Mono.defer(() -> Mono.just("bar"))) .subscribe(); }

위와 같이 코드를 작성한다면 Mono.just(“bar”)는 정말로 모노가 비어있을 때만 실행 된다.

from

reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.


Mono.from(Flux.just(1,2,3,4,5)) .subscribe(System.out::println);

결과는 1만 방출 되며 종료 된다.
꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다.
아래는 RxJava 를 사용한 예제이다.


Mono.from(Single.just("bar").toFlowable()) .subscribe(System.out::println);

동일하게 bar 가 방출 되며 종료 된다.

***DelayError

에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.


@Test void delayErrorTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException())) .subscribe((it) -> {}, System.out::println, countDownLatch::countDown); countDownLatch.await(); }

위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다.
출력 결과는 다음과 같다.

foo
foo
reactor.core.Exceptions$CompositeException: Multiple exceptions

만약 zip을 사용했다면 NullPointerException 에러만 방출 한다.

여기에선 zip 만 설명했지만 when 도 동일하다.

create

Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다.
말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.


@Test void createTest() { Mono.create(sink -> { client.async(request, new Listener() { @Override public void onFailure(Exception e) { sink.error(e); } @Override public void onResponse(Response response) { sink.success(response); } }); }); }

위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다.
만약 리스너 해제 및 자원 해제는 onDispose 메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel 메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.

using

이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다.
이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다.
사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.


@Test void usingTest() { Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ), it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { sink.success(attachment); } @Override public void failed(Throwable exc, ByteBuffer attachment) { sink.error(exc); } })), it -> { try { it.close(); } catch (IOException e) { } }); }

조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 using를 사용하는 코드를 참고 하면 되겠다.

usingWhen

이 것 역시 using과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher 타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.

@Test
void usingWhenTest() {
    Mono<String> data = Mono.just("foo");
    Mono.usingWhen(data, it ->
            Mono.just(it),
            it -> transition.commit(),
            (it, error) ->  transition.rollback(),
            it-> transition.rollback())
}

예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다.

오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다.

다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.

Spring WebClient

오늘은 Spring의 WebClient의 사용법에 대해서 몇가지 알아보도록 하자. 사용 API만 살펴 볼 예정이므로 reactive streams(reactor..) 들의 개념과 사용법은 다른 블로그를 살펴보길 바란다. reactive streams 대한 내용을 알고 보면 좋지만 몰라도 코드를 보는데는 문제가 없을 듯 하다.

WebClient는 Spring5 에 추가된 인터페이스다. spring5 이전에는 비동기 클라이언트로 AsyncRestTemplate를 사용을 했지만 spring5 부터는 Deprecated 되어 있다. 만약 spring5 이후 버전을 사용한다면 AsyncRestTemplate 보다는 WebClient 사용하는 것을 추천한다. 아직 spring 5.2(현재기준) 에서 AsyncRestTemplate 도 존재하긴 한다.

기본 문법

기본적으로 사용방법은 아주 간단하다. WebClient 인터페이스의 static 메서드인 create()를 사용해서 WebClient 를 생성하면 된다. 한번 살펴보자.

@Test
void test1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

create()는 두가지 메서드가 있는데 baseUrl를 지정해주는 것과 그렇지 않은 것 두가지가 존재한다. 원하는 API를 사용하면 되겠다.

@Test
void test1() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

API 자체도 명확하다. get(), post(), put(), patch(), 등 http method들을 모두 정의되어 있다.

webClient.get()
  .///
webClient.post()
  .///
webClient.put()
  .///
webClient.method(HttpMethod.GET)
  .///

또는 위와 같이 HttpMethod를 지정할 수 있다.

uri 또한 여러 메서드가 존재한다. 단순하게 string 으로 uri을 만들 수 도 있고 queryParam, pathVariable 등 명확하게 uri을 만들 수 도 있다. 위의 코드를 사실 RestTemplate 클래스를 자주 사용했다면 익숙한 문법일 수 있다.

@Test
void test1_3() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name=wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}


@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", Map.of("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri(it -> it.path("/sample")
                    .queryParam("name", "wonwoo")
                    .build()
            ).retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위와 같이 여러 방법이 존재하니 각자가 원하는 어떤것을 사용해도 좋다. 마지막 S uri(Function<UriBuilder, URI> uriFunction) API는 좀 더 세세하게 컨트롤 할 수 있으니 세세하게 컨트롤 할 일이 있다면 이 API를 사용하는게 좋다.

다음은 retrieve() 메서드인데 이 메서드는 request를 만들고 http request를 보내는 역할을 한다. 이 메서드 말고 exchange()가 존재하는데 약간 다르다. 사실 API만 살짝 다를뿐이지 retrieve() 내부에선 exchange() 메서드를 호출한다.

retrieve() 메서드는 ResponseSpec 타입을 리턴하고 exchange() 메서드는 Mono<ClientResponse> 를 리턴하고 있다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> it.bodyToMono(String.class));

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위의 test1_3 메서드와 test2 메서드는 동일한 동작을 한다. 위에서 말했다시피 exchange() 메서드는 ClientResponse를 사용해 좀 더 세세한 컨트롤을 하면 된다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> {
                if(it.statusCode() == HttpStatus.NOT_FOUND) {
                    throw new NotFoundException("....");
                }
                return it.bodyToMono(String.class);
            });

    StepVerifier.create(hello)
            .verifyError(NotFoundException.class);
}

이렇게 기본문법에 대해서 알아봤다. 그리 어려운 내용도 없는 듯 하다. 좀 더 범용성있게 사용하려면 아직은 부족하다. 좀 더 살펴보자.

formData 와 message

위에서 알아보지 않은게 있는데 바로 post나 put 기타 http 메서드에 자주 사용하는 formdata 와 message에 대해서 알아보자.
만약 formData 로 server에 보낸다면 다음과 같이 작성하면 된다.

import static org.springframework.web.reactive.function.BodyInserters.fromFormData;

@Test
void test3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.post()
            .uri("/sample")
            .body(fromFormData("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

fromFormData란 static 메서드를 사용해서 전달하면 된다. 만약 좀 더 많은 내용이 있다면 .with(key, value) 메서드를 체이닝해 사용하면 된다.

.body(fromFormData("name", "wonwoo").with("foo","bar").with("...","..."))

또는 MultiValueMap를 이용해서 fromFormData에 넣어도 된다.

이번엔 message에 대해서 알아보자. 일반적으로 json, xml 기타 message로 보낼때 유용하다. 한번 살펴보자.

@Test
void test3_1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .bodyValue(new Sample("wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

위와같이 bodyValue를 이용해서 message를 전달할 수 있다. 참고로 spring 5.2 이전버전에선 syncBody를 이용해야 한다. spring 5.2에선 syncBody는 Deprecated 되었다.

만약 전달하는 message가 Publisher 타입일 수 도 있다. 그럼 다음과 같이 작성하면 된다.

@Test
void test3_2() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .body(Mono.just(new Sample("wonwoo")), Sample.class)
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

filter

filter이용하면 client를 호출하기전에 인터셉터해서 추가적인 작업을 할 수 있다. 예를들면 로그를 출력 할 수도 있고 헤더정보 혹은 인증정보를 넣어 호출 할 수 있다.

필터를 사용하려면 ExchangeFilterFunction 인터페이스를 구현하면 된다. 추상 메서드는 하나뿐이라 람다를 이용해도 좋다.

@Test
void test4() {

    WebClient webClient = WebClient.builder().filter((request, next) -> next.exchange(ClientRequest.from(request)
            .header("foo", "bar").build())).baseUrl("http://localhost:8080")
            .build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위의 코드는 헤더 정보 를 추가하는 코드이다. 또한 한번에 여러 필터를 적용할 수 도 있다.

WebClient.builder().filters(exchangeFilterFunctions ->
        exchangeFilterFunctions.add(0, (request, next) -> {
            return next.exchange(request);
        }));

위의 코드는 0번째에 해당 필터를 삽입하는 코드이다. 물론 filter를 계속 체이닝해서 써도 상관 없다.

ClientHttpConnector

현재 spring에서는 reactive http client가 2개밖에 존재하지 않는다. netty와 jetty이다. 사실 spring을 사용한다면 그냥 netty를 사용하는게 정신건강에 좋을 듯 싶다.

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
</dependency>

위와 같이 jetty reactive httpclient를 먼저 디펜더시 받은 후에 다음과 같이 작성하면 된다.

@Test
void test5() {
    WebClient webClient = WebClient.builder().clientConnector(new JettyClientHttpConnector())
            .baseUrl("http://localhost:8080").build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

추가 적인 설정은 JettyClientHttpConnector 클래스와 org.eclipse.jetty.client.HttpClient 클래스를 살펴보면 되겠다.

default values

만약 기본으로 헤더정보 쿠키정보등 값을 지정하고 싶다면 다음과 같이 작성하면 된다.

@Test
void test6() {

    WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080")
            .defaultHeader("foo", "bar")
            .defaultCookie("foo", "BAR")
            .defaultRequest(it -> it.header("test", "sample")).build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위와 같이 작성하면 기본으로 위와 같은 값이 같이 전송된다. defaultRequest() 메서드를 사용하면 좀더 세세하게 컨트롤이 가능하니 참고하면 되겠다.

retrieve

위에서 잠깐 언급한 retrieve 메서드를 이용하는 경우에 보다 상세한 에러 코드들을 컨트롤 할 수 있다. 원한다면 사용해도 좋다.

@Test
void test7() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, __ -> Mono.error(new IllegalArgumentException("4xx")))
            .onStatus(HttpStatus::is5xxServerError, __ -> Mono.error(new IllegalArgumentException("5xx")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("4xx");

} 

onStatus 메서드를 이용해서 해당 코드를 작성후에 Mono type의 exception을 던지면 된다. 위의 코드는 4xx 에러 코드일땐 4xx라는 메시지를 던지고 5xx 에러 코드일 땐 5xx라는 메세지를 던진다는 코드이다.

onStatus() 메서드 말고도 onRawStatus() 메서드도 존재한다. 이것은 위와 같이 HttpStatus 코드가 아닌 int로된 코드를 리턴한다.

@Test
void test8() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .onRawStatus(it -> it == 400, __ -> Mono.error(new IllegalArgumentException("aaa")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("400");
}

이렇게 기본문법과 사용법에 대해서 알아봤다. 물론 좀 더 많은 메서드들이 있지만 필자가 자주 사용할만한 API 위주로 살펴봤다. 다른 궁금한 점이 있다면 해당 문서를 찾아보길 추천한다.

마지막으로 Spring boot를 사용할 때에 WebClient는 어떻게 사용해야 될까? 사실 기본적인 설정은 되어있다. 그래서 아주 쉽고 간단하게 사용할 수 있다.

spring boot

spring boot 를 사용할 때는 WebClient.Builder 인터페이스가 기본적으로 bean으로 등록 되어있다. 그래서 우리는 이걸 사용하면 된다.

@RestController
public class WebClientController {

    private final WebClient webClient;

    public WebClientController(WebClient.Builder builder) {
        this.webClient = builder.baseUrl("http://localhost:9999").build();
    }

    @GetMapping("/users")
    public Mono<String> findByName() {
        return webClient.get()
                .uri("/users")
                .retrieve()
                .bodyToMono(String.class);
    }
}

딱히 설명할 것도 없다. 만약 필터나 default values 가 필요하면 위에서 했던 그 방법을 그대로 이용하면 된다.

public WebClientController(WebClient.Builder builder) {
    this.webClient = builder
            .filter((request, next) -> next.exchange(request))
            .defaultHeader("foo", "bar")
            .baseUrl("http://localhost:8080")
            .build();
}

그리고 만약 전역적으로 커스텀할 코드들이 있다면 WebClientCustomizer 인터페이스를 이용해서 커스텀할 수 있다.

@Bean
WebClientCustomizer webClientCustomizer() {
    return builder -> builder.filter((request, next) -> next.exchange(request));
}

위와 같이 WebClientCustomizer 빈으로 등록하여 커스터마이징하면 된다.

번외로 kotlin 코드도 한번 살펴보자.

bodyToFlux(), bodyToMono(), body(), awaitExchange(), bodyToFlow(), awaitBody() 등 확장함수로 된 함수들이 몇가지 존재하니 참고하면 되겠다. 몇가지는 코루틴 관련 확장함수이다.

오늘은 이렇게 Spring의 WebClient에 대해서 살펴봤다. 이정도만 알아도 사용하기엔 충분할 듯 싶다. Spring 5에서 Non blocking http client를 사용한다면 꼭 WebClient를 사용하도록 하자!

Spring WebFlux HandlerMethodArgumentResolver

오늘은 Spring WebFlux의 HandlerMethodArgumentResolver에 대해서 알아보도록 하자.

사실 WebFlux 이전에 WebMvc에도 동일한 기능이 존재한다. 인터페이스명까지 동일하니 거부감은 사실 없다. 기존의 mvc의 기능과 동일은 하나 WebFlux API에 맞춰진 형태라 생각하면 된다. 어떤 기능인지는 여기를 참고해도 되고 다른 블로그 혹은 문서를 살펴봐도 좋다.

WebMvc 클래스는 org.springframework.web.method.support.HandlerMethodArgumentResolver와 같고 WebFlux의 클래스는 org.springframework.web.reactive.result.method.HandlerMethodArgumentResolver 이와 같다.

HandlerMethodArgumentResolverSupport

위의 내용을 알아보기 전에 WebMvc에는 존재 하지 않지만 WebFlux에 존재하는 클래스인 HandlerMethodArgumentResolverSupport 를 살펴보자. HandlerMethodArgumentResolverSupport 에는 protected 메서드가 3개 존재하는데 checkParameterType, checkParameterTypeNoReactiveWrapper, checkAnnotatedParamNoReactiveWrapper 라는 메서드이다.

어떤 일들은 하는지 한번살펴보자. 일반적으로는 HandlerMethodArgumentResolver.supportsParameter 메서드에서 많이 사용하고 있다. 물론 위의 메서드를 사용하지 않아도 된다.

checkParameterType

checkParameterType 메서드는 reactive 랩퍼 클래스를 허용한다는 메서드이다. 한마디로 파라미터에 reactive 랩퍼 클래스를 사용해도 된다는 의미를 갖고 있다.

사용법은 아래와 같다.

@Override
public boolean supportsParameter(MethodParameter parameter) {
    return checkParameterType(parameter, UserInfo.class::isAssignableFrom);
}

두번째 파라미터는 Predicate<Class<?>>로 해당 조건이 만족하면 true를 리턴하고 그렇지 않으면 false를 리턴한다.
UserInfo 라는 클래스를 파라미터로 허용한다는 뜻인데 그게 만약 reactive 클래스라도 허용한다는 뜻이다.

@GetMapping("/")
public Mono<Message> message(Mono<UserInfo> userInfo) {

    //...
}

@GetMapping("/")
public Mono<Message> message(UserInfo userInfo) {

     //...
}

위와 같이 Mono<UserInfo>UserInfo 둘다 가능하다는 뜻이다.

checkParameterTypeNoReactiveWrapper

checkParameterType 메서드와는 다르게 reactive 스타일을 허용하지 않는다.
사용법은 다음과 같다.

@Override
public boolean supportsParameter(MethodParameter parameter) {
    return checkParameterTypeNoReactiveWrapper(parameter, UserInfo.class::isAssignableFrom);
}

두번째 파라미터는 이 또한 Predicate<Class<?>>로 해당 조건이 만족하면 true를 리턴하고 그렇지 않으면 false를 리턴한다.

@GetMapping("/")
public Mono<Message> message(Mono<UserInfo> userInfo) {

    //...
}

만약 위와 같이 작성했지만 파라미터를 reactive 스타일로 받는다음 에러가 발생한다.

FooHandlerMethodArgumentResolver does not support reactive type wrapper: reactor.core.publisher.Mono<ml.wonwoo.example.UserInfo>

위의 메서드를 사용할 경우에는 아래와 같이 사용해야 된다.

@GetMapping("/")
public Mono<Message> message(UserInfo userInfo) {

    //...
}


checkAnnotatedParamNoReactiveWrapper

위의 두 내용은 Type과 관련있었지만 이 메서드는 애노테이션과 관련이 있다. 이 메서드는 리액티브 스타일을 허용 하지 않는다.

사용법은 다음과 같다.

@Override
public boolean supportsParameter(MethodParameter parameter) {
    return checkAnnotatedParamNoReactiveWrapper(parameter, CurrentUser.class,
            (annotation , clazz) -> !UserInfo.class.isAssignableFrom(clazz));
}

두번째 파라미터는 BiPredicate<A, Class<?>>로 해당 조건이 만족하면 true를 리턴하고 그렇지 않으면 false를 리턴한다.

위와 같이 작성하면 아래와 같이 파라미터로 사용가능하다.

@GetMapping("/")
public Mono<Message> message(@CurrentUser User user) {

     //...
}

근데 왜 checkAnnotatedParamReactiveWrapper 메서드는 만들지 않았을까?

어쨌든 WebFlux에서는 HandlerMethodArgumentResolverSupport 대부분 상속받아서 구현하고 있으니 HandlerMethodArgumentResolverSupport 상속받아서 구현해도 되며 HandlerMethodArgumentResolver 를 직접 구현해도 상관없다.

하지만 필자는 HandlerMethodArgumentResolverSupport 상속받아서 구현했다.

resolveArgument

위에서는 어떤 파라미터로 받을지 체크하는 부분이였다면 resolveArgument는 해당 파라미터에 대한 구현을 직접해줘야 한다.

만약 reactive 스타일로 파라미터로 받지 않는다면 예전처럼 webmvc 구현해도 되지만 그렇지 않고 reactive 스타일도 지원한다면 추가적으로 조금 구현해야 되는게 있다.

참고로 동기적으로 구현하고 싶다면 SyncHandlerMethodArgumentResolver 를 구현하면 된다.

해당 파라미터가 reactive 스타일의 파라미터인지 알아야 한다. 하지만 이것보다 중요한건 대부분 reactor를 사용하겠지만 만약 다른 Reactive Streams API(rxjava1, rxjava2, jdkFlow) 혹은 coroutine 을 사용한다면 그에 맞게 변환을 해줘야 한다. 그 클래스가 ReactiveAdapterRegistry 이며 자세한건 문서를 찾아보자.

참고로 HandlerMethodArgumentResolverSupport 의 메서드들도 ReactiveAdapterRegistry를 사용한다.

@Override
public Mono<Object> resolveArgument(MethodParameter parameter, BindingContext bindingContext, ServerWebExchange exchange) {

    ResolvableType resolvableType = ResolvableType.forMethodParameter(parameter);

    ReactiveAdapter adapter = (resolvableType != null ? getAdapterRegistry().getAdapter(resolvableType.resolve()) : null);

    Mono<UserInfo> userMono = Mono.just(new UserInfo("wonwoo", "wonwoo@test.com"));

    return userMono
            .map(userInfo -> adapter != null ? adapter.fromPublisher(userMono) : userInfo);
}

위와 같이 해당 타입의 ReactiveAdapter를 가져와 Publisher 하면 된다. 아주 간단하다. 크게 복잡한 내용은 없는 것 같다.
그럼 다음과 같이 사용가능하다.

//reactor
@GetMapping("/")
public Mono<UserInfo> hello(Mono<UserInfo> userInfo) {

    return userInfo;
}

@GetMapping("/")
public Flux<UserInfo> hello(Flux<UserInfo> userInfo) {

    return userInfo;
}

//rxjava 1,2 
@GetMapping("/")
public Single<UserInfo> hello(Single<UserInfo> userInfo) {

    return userInfo;
}

@GetMapping("/")
public Observable<UserInfo> hello(Observable<UserInfo> userInfo) {

    return userInfo;
}

@GetMapping("/")
public Maybe<UserInfo> hello(Maybe<UserInfo> userInfo) {

    return userInfo;
}

@GetMapping("/")
public Flowable<UserInfo> hello(Flowable<UserInfo> userInfo) {

    return userInfo;
}


//jdk
@GetMapping("/")
public Flow.Publisher<UserInfo> hello(Flow.Publisher<UserInfo> userInfo) {

    return userInfo;
}

rxjava는 잘 몰라서 그냥 테스트만 해봤다.

오늘은 이렇게 Spring WebFlux의 HandlerMethodArgumentResolver 대해서 알아봤다. 만약 사용할 일이 있다면 언젠든지 사용해도 좋다. 어려운 내용이 아니므로 한번씩 해보는 것도 나쁘지 않아 보인다.

WebFlux도 공부할게 많다.