Reactor 써보기 (1)

오랜만에 글을 쓰니 좀 어색하다. 요 근래 계속 글을 안썼더니 말이다.

요즘 필자는 회사에서 Spring Webflux를 사용하고 있다. 그래서 좀 더 잘 사용해보자라는 의미에서 Reactor 를 공부해보도록 하자.
하지만 여기에선 Reactive Streams 에 대해 개념적으로는 설명하지 않겠다. 이미 다른 블로그에 좋은 글들이 많으니 그걸 보고 개념을 이해하면 좋겠다.

위의 글들은 한번 읽어보면 좋을 것 같다. 토비느님의 방송 역시 함께 보면 개념적으로 더욱 이해가 빨리 될 듯 싶다. 꽤 많은 시리즈가 있으니 시간 날 때 짬짬이 보면 도움이 확실이 된다.

사실 Reactive Streams 구현체인 Reactor나 RxJava2나 사용법은 대부분 비슷하다. 같은 개발자가 만든건 아니지만 여러 회사들이 협업을 하면서 만든거니 아무래도 비슷할 수 밖에 없을 것도 같다. 아마도 구현체들 대부분 사용법은 비슷하지 않을까 싶다?

필자는 아무래도 거의 대부분 Spring을 사용하기에 Reactor로 먼저 접하게 되었다. 안드로이드에서는 Reactor 보다는 RxJava를 더 선호하고 많이 쓰는 것 같다. 하지만 상관없다. Spring 에선 RxJava 나 Reactor 나 혹은 다른 Reactive Streams 구현 된 어느 것을 써도 좋다. 이건 조만간 다시 한번 다뤄보기로 하겠다.

그럼 한번 어떤 메서드들이 있는지 주로 사용될만한 메서드 위주로 한번 살펴보도록 하자.

일단 모노 부터 만들어보자!

Mono

위의 그림은 reactor Mono 에 대한 그림이다. 사실 처음 보는 개발자분들이라면 이게 뭔가 싶기도 하다. (필자도 가끔 뭔가 싶기도 하다.)
위의 그림은 마블다이어그램이라 하는데 해당 오퍼레이션들이 어떤 행위를 하는지 나타낸 그림이다.
쉽게 생각하면 왼쪽에서 오른쪽으로의 흐름을 나타내며 오버레이터를 통해 어떤 결과가 어떤식으로 나오고 있는지 생각하면 될듯 싶다.

Mono 는 Reactive Streams 의 구현체로 0 또는 1의 스트림을 만들 수 있다. 나중에 배울 Flux도 마찬가지로 Reactive Streams의 구현체이며 0 부터 N 까지의 스트림을 만들 수 있으니 참고 하면 되겠다.

아주 쉽게 비교를 하자면 java8에 나온 Optional 과 Stream 으로 비교할 수 있을 것 같다.
Optional 은 비어있거나 값을 가지고 있고 Stream 은 0 ~ N 까지의 연속된 요소들을 의미한다.

이렇게 비교하면 좀 더 접근하기 쉬울 것 같아 비교를 해봤다.

그럼 이제 본격적으로 모노를 만들어 보자!

kotlin, java 모두 예제에 넣어봤다.

just


Mono<String> just = Mono.just("hello reactor");

코틀린의 경우 원래 reactor 에 기본 확장확함수가 있었는데 Deprecated 되고 extensions 디펜더시를 추가 해야 된다.

아주 기본적인 사용법이다. 모노를 만드는 가장 쉬운 방법이다. 자바의 Optional과 조금 비슷해 보인다.

fromSupplier

만약 지연된 처리가 필요하다면 fromSupplier 을 사용하면 된다.


Mono<String> fromSupplier = Mono.fromSupplier(() -> "hello reactor");

from** 으로 시작하는 메서드는 다양하다. Callable, CompletionStage, Runnable, Future 등 여러 메서드들이 있으니 필요에 따라 사용하면 되겠다.
from** 메서드는 기본 값으로 사용하거나 fallback 으로 운영에서도 종종 사용하는 편이다.

error

에러를 만드는 방법이다.


Mono<String> error = Mono.error(new NullPointerException()); Mono<String> errorSupplier = Mono.error(NullPointerException::new);

구독할 때 에러를 방출하여 종료한다. 이것 역시 운영에서 종종 사용하는 편이다.

empty, never

빈 모노와 무기한으로 실행되는 모노를 만든다. 사실 never는 사용한 적이 단 한번도 없다.


Mono<String> empty = Mono.empty(); Mono<String> never = Mono.never();

해당 메서드는 데이터를 방출하지 않는다. 사실상 아무 것도 하지 않는다. empty 는 완료신호는 오지만 never 경우는 무기한으로 실행되므로 오류, 완료 등 어떠한 신호도 오지 않는다.
never 는 필요에 따라 테스트할 경우 사용한다 하는데 필자는 그런 경우가 없어 사용한 적은 없다.

아주 기본적인 모노를 만드는 경우를 살펴봤다. 아직 모노만 만드는데 반도 못온 느낌이다. 이러다 오늘은 모노만 만들다 끝나겠는걸..

zip

모노를 만드는 메서드 중에 필자가 아마 가장많이 쓰지 않나 싶다. 물론 각자가 다 다르겠지만 필자의 경우 각각의 Mono 들을 aggregating 하는 경우가 많았다. 아마 가장 많이는 사용하지 않더라도 프로젝트에 reactor 를 사용한다면 꼭 한번을 쓸 일이 있을 듯 하다.

방금도 이야기 했지만 모노들을 aggregating 하는 역할을 한다. 모노들이 각각 동작하므로 여러 모노들을 한꺼번에 동작하게 만들 때 유용하게 쓰인다.


Mono<String> zip = Mono.zip(Mono.just("foo"), Mono.just("bar"));

기본 사용법은 위와 같다. 필자가 말한대로 모노들이 각각 동작하는지는 테스트해보자.


@Test void zipTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zip(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); } private Mono<String> testMethod1() { return Mono.just("foo") .delayElement(Duration.ofSeconds(1)) .doOnNext(System.out::println); } private Mono<String> testMethod2() { return Mono.just("bar") .doOnNext(System.out::println); }

위의 메서드들 중에 하나는 delay 를 주었다. 만약 차례대로 실행이 되어야 한다면 foo 가 출력된 후에 bar 출력 되어야 한다. 하지만 그렇지 않다. 실행시키지마자 bar가 출력되고 1초후 foo가 실행 된다.

위와 같이 zip의 파라미터가 2개 일 경우에는 Tuple2<T1, T2> 로 생산 된다. zip으로 8개 까지 가능하며 그 후로는 Iterable 타입으로 넘겨야한다.


Mono.zip(testMethod1(), testMethod2()) .subscribe((Tuple2<String,String> it) -> { });

필자는 어떤 타입인지 보여주기 위함이지 타입은 제거해도 좋다. 참고로 zip의 하나라도 empty 거나 오류를 방출하면 즉시 종료된다. empty 경우엔 onNext도 방출하지 않는다.

when

when 은 zip 과 유사하지만 onNext 를 방출하지 않는다. 단지 각각의 모노를 독립적으로 실행 시킬때만 사용하면 된다. 이 역시 종종 사용할 경우가 있다.


@Test void whenTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.when(testMethod1(), testMethod2()) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

이 역시 zip과 동일하게 오류를 방출하면 즉시 종료 된다.

delay

메스드명 그대로 delay를 줄 수 있는 모노를 만들 수 있다. 해당 Duration 만큼 지연된 후에 onNext를 방출한다. 방출되는 Long 의 값은 0 이다.


@Test void delayTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.delay(Duration.ofSeconds(1)) .doOnNext(System.out::println) .subscribe(it -> { }, (throwable) -> { }, countDownLatch::countDown); countDownLatch.await(); }

위의 코드는 1초 후에 onNext 로 방출한다.

defer

defer 메서드는 fromSupplier 와 비슷하게 지연된 Mono 처리를 하고 싶다면 해당 메서드를 이용하면 된다.


Mono<String> defer = Mono.defer(() -> Mono.just("foo"));

음 간단한 예 로는 아직 배우진 않았지만 switchIfEmpty 에 아주 적합할 수 있다. 만약 모노가 비었을 때 해당 메서드를 사용하여 다른 모노로 대체할 수 있는 fallback 메서드이다.


Mono.just("foo") .switchIfEmpty(Mono.just("bar")) .subscribe(); }

만약 위와 같은 코드가 있다면 모노가 비어있지 않았음에도 불구하고 Mono.just(“bar”)를 매번 호출 한다. 사실 위와 같은 코드라면 많은 상관은 없지만 만약 다른 무거운 작업을 한다 가정하면 사실 불필요한 작업을 더 하는 꼴이 된다. 좀 더 우아하게 이 때 defer 메서드를 사용하면 된다.


Mono.just("foo") .switchIfEmpty(Mono.defer(() -> Mono.just("bar"))) .subscribe(); }

위와 같이 코드를 작성한다면 Mono.just(“bar”)는 정말로 모노가 비어있을 때만 실행 된다.

from

reactive streams API 의 Publisher 타입을 Mono 로 바꿀 수 있다. 1개 이상의 스트림일 경우 (예 : Flux) 첫번째 onNext 만 방출 되며 종료 된다.


Mono.from(Flux.just(1,2,3,4,5)) .subscribe(System.out::println);

결과는 1만 방출 되며 종료 된다.
꼭 Reactor 만 되는 것은 아니다 Publisher 타입을 구현한 것이라면 해당 메서드를 사용할 수 있다.
아래는 RxJava 를 사용한 예제이다.


Mono.from(Single.just("bar").toFlowable()) .subscribe(System.out::println);

동일하게 bar 가 방출 되며 종료 된다.

***DelayError

에러를 지연시키며 모든 예외가 결합되서 에러를 발생시킨다. 에러가 나더라도 zip 의 모든 Mono 를 실행 시킨다.


@Test void delayErrorTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.zipDelayError(testMethod1(), Mono.error(new NullPointerException()), testMethod1(), Mono.error(new IllegalArgumentException())) .subscribe((it) -> {}, System.out::println, countDownLatch::countDown); countDownLatch.await(); }

위와 같은 코드가 있다면 모든 zip의 Mono 를 실행하고 나머지 에러를 결합해서 보여준다.
출력 결과는 다음과 같다.

foo
foo
reactor.core.Exceptions$CompositeException: Multiple exceptions

만약 zip을 사용했다면 NullPointerException 에러만 방출 한다.

여기에선 zip 만 설명했지만 when 도 동일하다.

create

Listener 혹은 callback 기반의 모노를 만들 수 있다. 예를 들어 비동기 콜백 코드를 모노로 만들 수 있다 생각하면 되겠다.
말보다는 코드를 보면 훨씬 이해가 빠를듯 싶다.


@Test void createTest() { Mono.create(sink -> { client.async(request, new Listener() { @Override public void onFailure(Exception e) { sink.error(e); } @Override public void onResponse(Response response) { sink.success(response); } }); }); }

위와 같이 비동기 콜백 코드는 모노 형태로 바꾸어 사용할 수 있다.
만약 리스너 해제 및 자원 해제는 onDispose 메서드를 사용해서 처리 할 수 있으며 취소 시그널을 받고 싶다면 onCancel 메서드를 사용하면 된다. 만약 다른 라이브러리를 쓰는데 비동기 콜백 코드로 작성되어 있다면 쉽게 모노 바꿀 수 있어 좋다. 필자도 종종 운영에서 사용했다.

using

이 메서드는 사실 사용해보지 않았다. 그리고 사용할 일도 없었던거 같았다. 마블 다이어그램도 복잡하다.
이 메서드는 외부 자원을 스트리밍하고 해제하는 역할을 한다. 아마 가장 많이 사용할 곳은 파일을 읽고 쓰고 하는곳이 아닐까 싶다. 혹은 socket을 열고 닫고 하는? 대략 그런 부분에서 많이 사용될 듯 싶다.
사실 필자도 써본적이 없어 그냥 간단한 사용법만 가져왔다.


@Test void usingTest() { Mono.using(() -> AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.READ), it -> Mono.create(sink -> it.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { sink.success(attachment); } @Override public void failed(Throwable exc, ByteBuffer attachment) { sink.error(exc); } })), it -> { try { it.close(); } catch (IOException e) { } }); }

조금 복잡해 보이긴해도 그닥 어려운 내용은 아니다. 만약 좀 더 궁금하다면 using를 사용하는 코드를 참고 하면 되겠다.

usingWhen

이 것 역시 using과 동일하게 사용해본적이 없다. using 과 사용법은 거의 동일하나 타입이 Publisher 타입이다. 사용곳은 아마 주로 트랜잭션 처리에 사용가능 할 듯 하다.

@Test
void usingWhenTest() {
    Mono<String> data = Mono.just("foo");
    Mono.usingWhen(data, it ->
            Mono.just(it),
            it -> transition.commit(),
            (it, error) ->  transition.rollback(),
            it-> transition.rollback())
}

예제가 영 시원찮다. 나중에 좀 더 나온 샘플 코드가 생각나거나 필자가 사용할 일이 있다면 좀 더 구체적으로 남기겠다.

오늘은 중간에도 말했다시피 모노를 만드는 것으로 끝이났다. 이렇게 보니까 모노로 만들 수 있는 메서드가 생각 보다 많은 것 같다. 오버로딩 된 메서드들은 따로 설명하지 않아도 행위 자체는 동일 하기에 필요하다면 문서를 보는 것을 추천한다.

다음 편은 이어서 Mono 의 오퍼레이터에 대해서 살펴보도록 하자.

Spring boot 2.2

오늘은 좀 늦은감이 있지만 그래도 spring boot 2.2 의 변화에 대해서 알아보도록 하자. 물론 예전에 틈틈이 특정부분은 관련해서도 남기긴 했지만 정리하는 의미에서 다시 한번 살펴보도록 하자. 물론 이것도 필지가 자주 사용할 것들 혹은 자주 사용하는 것들만 정리하니 나머지는 해당 문서를 참고하면 되겠다.

Spring Framework 5.2

알다시피 Spring Framework 5.2로 업그레이드 되었다. 관련해서는 해당 문서를 찾아보면 더 좋을 듯 싶다.
해당 문서는 여기를 살펴보자.

JMX now disabled by default

JMX는 더 이상 기본적으로 비활성화 되어있다. 만약 이 기능을 사용하고 싶다면 spring.jmx.enabled=true를 사용하여 활성화 시킬 수 있다.
사실 이 기능은 많은 사용자가 사용하고 있지 않다고 판단되고 생각보다 많은 리소스를 필요로 하기 때문에 이와 같은 결정을 내린 것이라고 한다.

Jakarta EE dependencies

다들 알다시피 java ee는 몇년전에 이클립스 재단에 넘어갔다. 그러면서 아마도 네임스페이스를 변경작업을 진행했을 것이다. 스프링부트도 그에 맞게 javax. 에서 jakarta.를 사용할 수 있게 디펜더시가 추가 되었다. 아직은 javax의 디펜더시가 존재하지만 추후에는 사라질 예정이니 만약 직접적으로 디펜더시를 받고 있다면 마이그레이션 하는게 좋다.

<jakarta-activation.version>1.2.1</jakarta-activation.version>
<jakarta-annotation.version>1.3.5</jakarta-annotation.version>
<jakarta-jms.version>2.0.3</jakarta-jms.version>
<jakarta-json.version>1.1.6</jakarta-json.version>
<jakarta-json-bind.version>1.0.2</jakarta-json-bind.version>
<jakarta-mail.version>1.6.4</jakarta-mail.version>

...

위는 디펜더시 관리하는 부분의 일부를 가져왔다.
또한 com.sun.mail:javax.mailcom.sun.mail:jakarta.mail 변경이 되었고, org.glassfish:javax.elorg.glassfish:jakarta.el로 artifact ID 가 변경 되었으므로 이것 역시 직접적으로 선언해서 사용한다면 마이그레이션하는게 정신건강에 좋을 듯 싶다.

JUnit 5

기본적으로 spring boot starter test 에는 junit5가 디폴트로 설정되어 있다. 또한 vintage engine 도 포함되어 있으니 junit4를 이용해도 좋다. 만약 spring boot 2.2로 마이그레이션을 한다 한들 문제는 없다. 점진적으로 junit5로 업그레이드도 가능하다. 왜냐하면 동일한 모듈에서 junit5 와 junit4를 혼합하여 테스트를 작성해도 아무 문제가 없다.

허나 주의할 점이 하나 있다. junit4의 listener 기능을 사용한다면 위의 junit5의 모듈들을 사용할 수 없다.

<configuration>
    <properties>
        <property>
            <name>listener</name>
            <value>com.example.CustomRunListener</value>
        </property>
    </properties>
</configuration>

만약 위의 같은 기능을 현재 사용하고 있다면 vintage engine을 사용할 수 없으니 junit4를 직접적으로 선언하여 사용해야 한다.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.mockito</groupId>
                <artifactId>mockito-junit-jupiter</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

Elasticsearch & Reactive Elasticsearch Auto-configuration

Elasticsearch 관련해 조금 변경 사항이 있다. Elasticsearch 쪽의 TransportClient가 7.0부터 Deprecated 되어 아마도 추후에는 해당 관련 설정은 다 삭제 될 것으로 보인다. 클러스터 노드를 설정할 수 있는 spring.data.elasticsearch.cluster-nodesspring.data.elasticsearch.properties 프로퍼티도 Deprecated 되었다.

jest 또한 Deprecated 되어 이제는 RestHighLevelClient를 권장하는 듯 하다.

Reactive Elasticsearch 자동설정이 추가 되었다. 해당 설정은 spring.data.elasticsearch.client.reactive. 들을 이용하면 된다. 해당 설정후에 ReactiveElasticsearchClient를 직접사용해도 되고, Spring 스럽게(?) 만든 ReactiveElasticsearchOperations 을 이용해도 좋다.

또한 ReactiveElasticsearchRepository을 이용하여 Spring data 스럽게(?) 만든 repository도 이용할 수 있다.

public class FooController {

    private final ReactiveElasticsearchClient reactiveElasticsearchClient;
    private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;

    public FooController(ReactiveElasticsearchClient reactiveElasticsearchClient,
                         ReactiveElasticsearchOperations reactiveElasticsearchOperations) {
        this.reactiveElasticsearchClient = reactiveElasticsearchClient;
        this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
    }

    // ...
}

public interface FooRepository implements ReactiveElasticsearchRepository<Foo, String> {

    //...
}

Hibernate Dialect

예전에는 아무 설정 하지 않았을 경우 Dialect 를 spring boot 가 결정하곤 했다. 하지만 이제는 spring boot가 결정하지 않고 해당 JPA 컨테이너가 결정하기로 변경하였다. 사실 하이버네이트 말곤 다른 구현체들은 잘 모르겠다. 테스트도 해보지 않아서.. 만약 동작하지 않는다면 명시적으로 선언하면 된다.

Actuator HTTP Trace and Auditing are disabled by default

Actuator HTTP Trace 와 Auditing 기본적으로 활성화가 되지 않는다. 기본적으로는 인 메모리를 사용하기 때문에 불필요한 메모리를 사용하고 클러스터의 친화적이지 않기 때문에 비활성으로 기본값을 변경하였다. 만약 운영에서 사용하려면 Spring Cloud Sleuth 또는 그와 유사한 어떠한 것을 사용해도 좋다.

혹시나 간단한 테스트를 하기 위해 인메모리라도 사용하고 싶다면 다음과 같이 설정하면 된다.

@Bean
public InMemoryHttpTraceRepository traceRepository() {
    return new InMemoryHttpTraceRepository();
}

Logback max history

로그백 기본 max history가 0일에서 7일로 변경되었다. 만약 그 값을 변경하고 싶다면 logging.file.max-history 프로퍼티를 사용해 변경하면 된다.

Sample projects renamed and relocated

Sample projects 들의 이름(smoke)과 소스 재배치가 되었다. 궁금하다면 github을 찾아 보면 된다.

Java 13 support

Spring Boot 2.2는 Java 13에 지원한다. 또한 Java 8 및 11도 여전히 지원하고 있다.

Lazy initialization

모든 빈들을 초기화를 지연 시킬수 있다. spring.main.lazy-initialization 이용해서 모든 빈들의 생성을 지연시킬 수 있는 옵션이다. 여기서 주의할 점은 http 첫 요청이 조금 느리고, startup시 에러가 발생한 코드가 들어있다면 startup시 찾지 못할 수도 있다. 사실 http 첫 요청이 느린건 괜찮지만 후자인 startup시 에러가 발생하지 않는다면 조금 크리티컬한 부분일 수 도 있다. 아마도 운영환경에선 사용하지 않는 것을 추천하고 싶다.

만약 때때로 특정 클래스는 초기화를 지연시킬 필요가 없거나 그러고 싶지 않은 경우도 있을 것이다. 그럴때는 @Lazy(false)어노테이션을 해당 빈에 설정하면 된다.

@Bean
@Lazy(false)
public Filters filters() {
    return new Filters();
}

위의 경우는 초기화 지연을 하지 않는다. 만약 개발자가 직접 컨트롤 할 수 없는 클래스들은 LazyInitializationExcludeFilter를 이용해서 제외 시키면 된다.

@Bean
static LazyInitializationExcludeFilter integrationLazyInitExcludeFilter() {
    return LazyInitializationExcludeFilter.forBeanTypes(Filters.class);
}

Spring Data Moore

Spring Boot 2.2는 Spring Data Moore(spring data 2.2) 를 제공한다. 사실 이부분은 추후에 다룰 예정이니 그때 글을 참고하면 될 것같다.

@ConfigurationProperties scanning

@ConfigurationProperties 어노테이션을 스캐닝 할 수 있는 어노테이션이 추가 되었다. spring boot 2.2 이전엔 @ConfigurationProperties 어노테이션을 직접사용할 경우에 @Component 나 @EnableConfigurationProperties 어노테이션을 사용했지만 이제는 그럴 필요 없다. @ConfigurationPropertiesScan 어노테이션을 사용하면 @ConfigurationProperties 어노테이션이 달린 클래스들은 모두 스캔하므로 추가적인 작업을 할 필요 없다.

@EnableConfigurationProperties(FooProperties.class)
@Configuration
public class Config {
}

or 

@Component
@ConfigurationProperties("foo")
public class FooProperties {
}

예전에는 위와같이 작업을 했다면 이제는 그럴 필요 없이 아래와 같이 작업을 하면 된다.

@SpringBootApplication
@ConfigurationPropertiesScan
public class Application {
   //.. main 
}

참고로 spring 2.2.0 에서는 @SpringBootApplication 어노테이션에 @ConfigurationPropertiesScan 어노테이션이 메타 어노테이션으로 존재했지만 spring boot 2.2.1에서는 버그로 인해 제거 되었다. 그 이유가 궁금하다면 여기를 살펴보면 된다.

Immutable @ConfigurationProperties binding

불변의 @ConfigurationProperties가 추가 되었다. 코틀린을 좀 더 위한거겠지? 어떻게 사용하는지 보자.

@ConfigurationProperties("http")
@ConstructorBinding
public class BarProperties {

    private final String url;
    private final Duration timeout;
    private final LocalDate date;

    public BarProperties(String url, @DefaultValue("10s") Duration timeout,
                         @DateTimeFormat(pattern = "yyyyMMdd") LocalDate date) {
        this.url = url;
        this.timeout = timeout;
        this.date = date;
    }

    public Duration getTimeout() {
        return timeout;
    }

    public String getUrl() {
        return url;
    }

    public LocalDate getDate() {
        return date;
    }
}

위와 같이 @ConstructorBinding 어노테이션을 이용해 생성자 바인딩을 한다고 알려줘야 한다. 그런후엔 생성자로 파라미터들을 받을 수 있다. @DefaultValue 어노테이션을 이용해 기본값을 넣을 수도 있고 @DateTimeFormat 어노테이션을 이용해 date format도 지정할 수 있다.

http.url=http://localhost
http.date=20191111

코틀린 코드도 한번 보자.

더 심플한 코드가 되었다. 이래서 코틀린을 써야.. 아무튼 좀 더 코틀린스럽게(?) 코드가 되었다.

RSocket Support

Rsocket을 지원하기 시작했다. Rsocket이 뭔지 궁금한 분들을 여기를 참고하면 되겠다.
spring-boot-starter-rsocket인 starter 를 디펜더시 받으면 자동설정이 동작한다. spring.rsocket.server. 프로퍼티들을 이용해서 설정할 수 있으니 참고하면 되겠다.
CBOR과 JSON을 사용하여 인코딩 디코딩 설정을 자동으로 구성해주고 있다. 또한 spring-security-rsocket가 클래스패스에 있을 경우 시큐리티도 자동으로 설정 된다.
아주 간단하게 샘플 코드만 보도록 하고 나머지는 해당 문서를 보는 것을 추천한다.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

@Controller
public class RSocketController {

    @MessageMapping("foo.{name}")
    public Mono<Foo> foo(@DestinationVariable String name) {
        return Mono.just(new Foo(name));
    }
}

public class Foo {

    private final String name;

    public Foo(@JsonProperty("name") String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

@SpringBootTest(properties = "spring.rsocket.server.port=0")
class RSocketControllerTests {

    @LocalRSocketServerPort
    private int port;

    private final RSocketRequester.Builder builder;

    @Autowired
    RSocketControllerTests(RSocketRequester.Builder builder) {
        this.builder = builder;
    }

    @Test
    void rsocketTest() {
        RSocketRequester requester = this.builder
                .connectTcp("localhost", this.port).block(Duration.ofSeconds(10));
        Mono<Foo> result = requester.route("foo.wonwoo").retrieveMono(Foo.class);
        StepVerifier.create(result)
                .assertNext(it -> Assertions.assertThat(it.getName()).isEqualTo("wonwoo"))
                .verifyComplete();
    }
}

음 딱히 어려운 부분은 없다. 우리가 자주 사용하는 web Controller와 비슷한 느낌이라 많은 거부감은 없는 듯 싶다.

RestTemplateBuilder request customisation

RestTemplateBuilder 에 몇가지 추가 되었다. 기본헤더를 넣을 수 있거나 request를 커스텀할 수 있는 기능이다.

public RestTemplateBuilder defaultHeader(String name, String... values)

public RestTemplateBuilder requestCustomizers(RestTemplateRequestCustomizer<?>... requestCustomizers)

요청 정보 커스터마이저 메서드는 좀 더 있지만 비슷한 맥략이라 생략했다.

private final RestTemplateBuilder builder;

public TestController(RestTemplateBuilder builder) {
    this.builder = builder.defaultHeader("foo", "bar").requestCustomizers(request -> ...).build();
}

Plain text support for Thread dump endpoint

actuator endpoint 중 하나인 Thread dump를 Plain text 내려받을 수 있다. 기존에는 json만 존재했지만 이제는 Plain text도 추가 되어 Thread Dump Analyzer
https://fastthread.io 여기에서 비쥬얼라이징 할 수 있으니 참고하면 되겠다. 사실 테스트는 fastthread 여기에서만 해봤다.

Qualifier for Spring Batch datasource

여러 데이터 소스가 있는 경우 @BatchDataSource어노테이션으로 spring batch에서 사용하는 datasource를 표시할 수 있다.

Health indicator groups

Health indicator를 그룹핑하여 사용할 수 있다. 쿠버네티스의 liveness, readiness 프로브의 대해 다른 상태를 표시할 수 있다는 그런내용?..

management.endpoint.health.group.foo.include=db,redis

위와 같이 설정했다면 엔드포인트 /actuator/health/foo를 실행 할 수 있다. 그러면 db와 redis만 Health indicator에 포함되어 체크한다. 만약 redis를 포함하고 싶지 않다면 redis를 제외 시키면 된다.

management.endpoint.health.group.foo.include=db

이외에도 좀 더 많은 프로퍼티가 존재하니 참고하면 되겠다.

management.endpoint.health.group.foo.show-details=
management.endpoint.health.group.foo.roles=
management.endpoint.health.group.foo.exclude=
management.endpoint.health.group.foo.show-components=

하나만 제외하고 나머지 프로퍼티들은 원래 있던 기능이니 해당 문서를 살펴보길 추천한다. show-components 바로 밑에 설명하겠다.

Health Endpoint component detail

Health Endpoint 의 component detail을 볼 수 있는 기능이 생겼다. 위에서 잠깐 언급했지만 그 프로퍼티는 show-components를 이용하면 된다. 이것 역시 show-details과 동일하게 NEVER, WHEN_AUTHORIZED, ALWAYS 등 3가지 타입이 존재한다.
NEVER는 표시 하지 않겠다는 의미, WHEN_AUTHORIZED 인증 후에 표시 하겠다는 의미, ALWAYS 항상 표시 하겠다는 의미를 갖고 있다.

이 기능은 세부정보와는 다르게 컴포넌트들의 Health 정보의 상태만 보여주는 기능이다.

http :8080/actuator/health

{
    "components": {
        "db": {
            "status": "UP"
        },
        "diskSpace": {
            "status": "UP"
        },
        "ping": {
            "status": "UP"
        }
    },
    "status": "UP"
}

show-details과는 다르게 각 컴포넌트 별 status만 출력 되는 것을 볼 수 있다.

오늘은 위와 같이 spring boot 2.2에 대해서 알아봤다. 사실 더 많은 내용이 있긴 한데 필자가 잘 쓰지 않거나 이해 되지 않은 내용은 작성하지 않았으니 꼭 해당 문서를 통해 확인하길 바란다.

Spring WebClient

오늘은 Spring의 WebClient의 사용법에 대해서 몇가지 알아보도록 하자. 사용 API만 살펴 볼 예정이므로 reactive streams(reactor..) 들의 개념과 사용법은 다른 블로그를 살펴보길 바란다. reactive streams 대한 내용을 알고 보면 좋지만 몰라도 코드를 보는데는 문제가 없을 듯 하다.

WebClient는 Spring5 에 추가된 인터페이스다. spring5 이전에는 비동기 클라이언트로 AsyncRestTemplate를 사용을 했지만 spring5 부터는 Deprecated 되어 있다. 만약 spring5 이후 버전을 사용한다면 AsyncRestTemplate 보다는 WebClient 사용하는 것을 추천한다. 아직 spring 5.2(현재기준) 에서 AsyncRestTemplate 도 존재하긴 한다.

기본 문법

기본적으로 사용방법은 아주 간단하다. WebClient 인터페이스의 static 메서드인 create()를 사용해서 WebClient 를 생성하면 된다. 한번 살펴보자.

@Test
void test1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

create()는 두가지 메서드가 있는데 baseUrl를 지정해주는 것과 그렇지 않은 것 두가지가 존재한다. 원하는 API를 사용하면 되겠다.

@Test
void test1() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

API 자체도 명확하다. get(), post(), put(), patch(), 등 http method들을 모두 정의되어 있다.

webClient.get()
  .///
webClient.post()
  .///
webClient.put()
  .///
webClient.method(HttpMethod.GET)
  .///

또는 위와 같이 HttpMethod를 지정할 수 있다.

uri 또한 여러 메서드가 존재한다. 단순하게 string 으로 uri을 만들 수 도 있고 queryParam, pathVariable 등 명확하게 uri을 만들 수 도 있다. 위의 코드를 사실 RestTemplate 클래스를 자주 사용했다면 익숙한 문법일 수 있다.

@Test
void test1_3() {

    WebClient webClient = WebClient.create();
    Mono<String> hello = webClient.get()
            .uri("http://localhost:8080/sample?name=wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}


@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", Map.of("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

@Test
void test1_3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.get()
            .uri(it -> it.path("/sample")
                    .queryParam("name", "wonwoo")
                    .build()
            ).retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위와 같이 여러 방법이 존재하니 각자가 원하는 어떤것을 사용해도 좋다. 마지막 S uri(Function<UriBuilder, URI> uriFunction) API는 좀 더 세세하게 컨트롤 할 수 있으니 세세하게 컨트롤 할 일이 있다면 이 API를 사용하는게 좋다.

다음은 retrieve() 메서드인데 이 메서드는 request를 만들고 http request를 보내는 역할을 한다. 이 메서드 말고 exchange()가 존재하는데 약간 다르다. 사실 API만 살짝 다를뿐이지 retrieve() 내부에선 exchange() 메서드를 호출한다.

retrieve() 메서드는 ResponseSpec 타입을 리턴하고 exchange() 메서드는 Mono<ClientResponse> 를 리턴하고 있다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> it.bodyToMono(String.class));

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

위의 test1_3 메서드와 test2 메서드는 동일한 동작을 한다. 위에서 말했다시피 exchange() 메서드는 ClientResponse를 사용해 좀 더 세세한 컨트롤을 하면 된다.

@Test
void test2() {

    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .exchange()
            .flatMap(it -> {
                if(it.statusCode() == HttpStatus.NOT_FOUND) {
                    throw new NotFoundException("....");
                }
                return it.bodyToMono(String.class);
            });

    StepVerifier.create(hello)
            .verifyError(NotFoundException.class);
}

이렇게 기본문법에 대해서 알아봤다. 그리 어려운 내용도 없는 듯 하다. 좀 더 범용성있게 사용하려면 아직은 부족하다. 좀 더 살펴보자.

formData 와 message

위에서 알아보지 않은게 있는데 바로 post나 put 기타 http 메서드에 자주 사용하는 formdata 와 message에 대해서 알아보자.
만약 formData 로 server에 보낸다면 다음과 같이 작성하면 된다.

import static org.springframework.web.reactive.function.BodyInserters.fromFormData;

@Test
void test3() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.post()
            .uri("/sample")
            .body(fromFormData("name", "wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

fromFormData란 static 메서드를 사용해서 전달하면 된다. 만약 좀 더 많은 내용이 있다면 .with(key, value) 메서드를 체이닝해 사용하면 된다.

.body(fromFormData("name", "wonwoo").with("foo","bar").with("...","..."))

또는 MultiValueMap를 이용해서 fromFormData에 넣어도 된다.

이번엔 message에 대해서 알아보자. 일반적으로 json, xml 기타 message로 보낼때 유용하다. 한번 살펴보자.

@Test
void test3_1() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .bodyValue(new Sample("wonwoo"))
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

위와같이 bodyValue를 이용해서 message를 전달할 수 있다. 참고로 spring 5.2 이전버전에선 syncBody를 이용해야 한다. spring 5.2에선 syncBody는 Deprecated 되었다.

만약 전달하는 message가 Publisher 타입일 수 도 있다. 그럼 다음과 같이 작성하면 된다.

@Test
void test3_2() {

    WebClient webClient = WebClient.create("http://localhost:8080");
    Mono<String> hello = webClient.put()
            .uri("/sample")
            .body(Mono.just(new Sample("wonwoo")), Sample.class)
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("wonwoo")
            .verifyComplete();
}

filter

filter이용하면 client를 호출하기전에 인터셉터해서 추가적인 작업을 할 수 있다. 예를들면 로그를 출력 할 수도 있고 헤더정보 혹은 인증정보를 넣어 호출 할 수 있다.

필터를 사용하려면 ExchangeFilterFunction 인터페이스를 구현하면 된다. 추상 메서드는 하나뿐이라 람다를 이용해도 좋다.

@Test
void test4() {

    WebClient webClient = WebClient.builder().filter((request, next) -> next.exchange(ClientRequest.from(request)
            .header("foo", "bar").build())).baseUrl("http://localhost:8080")
            .build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위의 코드는 헤더 정보 를 추가하는 코드이다. 또한 한번에 여러 필터를 적용할 수 도 있다.

WebClient.builder().filters(exchangeFilterFunctions ->
        exchangeFilterFunctions.add(0, (request, next) -> {
            return next.exchange(request);
        }));

위의 코드는 0번째에 해당 필터를 삽입하는 코드이다. 물론 filter를 계속 체이닝해서 써도 상관 없다.

ClientHttpConnector

현재 spring에서는 reactive http client가 2개밖에 존재하지 않는다. netty와 jetty이다. 사실 spring을 사용한다면 그냥 netty를 사용하는게 정신건강에 좋을 듯 싶다.

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
</dependency>

위와 같이 jetty reactive httpclient를 먼저 디펜더시 받은 후에 다음과 같이 작성하면 된다.

@Test
void test5() {
    WebClient webClient = WebClient.builder().clientConnector(new JettyClientHttpConnector())
            .baseUrl("http://localhost:8080").build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();
}

추가 적인 설정은 JettyClientHttpConnector 클래스와 org.eclipse.jetty.client.HttpClient 클래스를 살펴보면 되겠다.

default values

만약 기본으로 헤더정보 쿠키정보등 값을 지정하고 싶다면 다음과 같이 작성하면 된다.

@Test
void test6() {

    WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080")
            .defaultHeader("foo", "bar")
            .defaultCookie("foo", "BAR")
            .defaultRequest(it -> it.header("test", "sample")).build();

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .expectNext("hello wonwoo!")
            .verifyComplete();

}

위와 같이 작성하면 기본으로 위와 같은 값이 같이 전송된다. defaultRequest() 메서드를 사용하면 좀더 세세하게 컨트롤이 가능하니 참고하면 되겠다.

retrieve

위에서 잠깐 언급한 retrieve 메서드를 이용하는 경우에 보다 상세한 에러 코드들을 컨트롤 할 수 있다. 원한다면 사용해도 좋다.

@Test
void test7() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample1?name={name}", "wonwoo")
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, __ -> Mono.error(new IllegalArgumentException("4xx")))
            .onStatus(HttpStatus::is5xxServerError, __ -> Mono.error(new IllegalArgumentException("5xx")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("4xx");

} 

onStatus 메서드를 이용해서 해당 코드를 작성후에 Mono type의 exception을 던지면 된다. 위의 코드는 4xx 에러 코드일땐 4xx라는 메시지를 던지고 5xx 에러 코드일 땐 5xx라는 메세지를 던진다는 코드이다.

onStatus() 메서드 말고도 onRawStatus() 메서드도 존재한다. 이것은 위와 같이 HttpStatus 코드가 아닌 int로된 코드를 리턴한다.

@Test
void test8() {
    WebClient webClient = WebClient.create("http://localhost:8080");

    Mono<String> hello = webClient.get()
            .uri("/sample?name={name}", "wonwoo")
            .retrieve()
            .onRawStatus(it -> it == 400, __ -> Mono.error(new IllegalArgumentException("aaa")))
            .bodyToMono(String.class);

    StepVerifier.create(hello)
            .verifyErrorMessage("400");
}

이렇게 기본문법과 사용법에 대해서 알아봤다. 물론 좀 더 많은 메서드들이 있지만 필자가 자주 사용할만한 API 위주로 살펴봤다. 다른 궁금한 점이 있다면 해당 문서를 찾아보길 추천한다.

마지막으로 Spring boot를 사용할 때에 WebClient는 어떻게 사용해야 될까? 사실 기본적인 설정은 되어있다. 그래서 아주 쉽고 간단하게 사용할 수 있다.

spring boot

spring boot 를 사용할 때는 WebClient.Builder 인터페이스가 기본적으로 bean으로 등록 되어있다. 그래서 우리는 이걸 사용하면 된다.

@RestController
public class WebClientController {

    private final WebClient webClient;

    public WebClientController(WebClient.Builder builder) {
        this.webClient = builder.baseUrl("http://localhost:9999").build();
    }

    @GetMapping("/users")
    public Mono<String> findByName() {
        return webClient.get()
                .uri("/users")
                .retrieve()
                .bodyToMono(String.class);
    }
}

딱히 설명할 것도 없다. 만약 필터나 default values 가 필요하면 위에서 했던 그 방법을 그대로 이용하면 된다.

public WebClientController(WebClient.Builder builder) {
    this.webClient = builder
            .filter((request, next) -> next.exchange(request))
            .defaultHeader("foo", "bar")
            .baseUrl("http://localhost:8080")
            .build();
}

그리고 만약 전역적으로 커스텀할 코드들이 있다면 WebClientCustomizer 인터페이스를 이용해서 커스텀할 수 있다.

@Bean
WebClientCustomizer webClientCustomizer() {
    return builder -> builder.filter((request, next) -> next.exchange(request));
}

위와 같이 WebClientCustomizer 빈으로 등록하여 커스터마이징하면 된다.

번외로 kotlin 코드도 한번 살펴보자.

bodyToFlux(), bodyToMono(), body(), awaitExchange(), bodyToFlow(), awaitBody() 등 확장함수로 된 함수들이 몇가지 존재하니 참고하면 되겠다. 몇가지는 코루틴 관련 확장함수이다.

오늘은 이렇게 Spring의 WebClient에 대해서 살펴봤다. 이정도만 알아도 사용하기엔 충분할 듯 싶다. Spring 5에서 Non blocking http client를 사용한다면 꼭 WebClient를 사용하도록 하자!