Reactor 써보기 (3)

Reactor를 거의 일년만에 다시 작성한다. 요즘 영 귀찮아서 블로그를 잘 안썼더니 올해 처음으로 작성한다.

Flux

just, fromIterable, fromStream, range

just는 Mono 에서도 배웠다. 동일하게 Flux에서도 just를 통해 Flux를 생성할 수 있다.

@Test
void fluxJustTest() {
    Flux.just(1, 2, 3, 4, 5)
            .subscribe(System.out::println);
}

Mono 와는 조금 다르게 가변인자 파라미터를 받는다. 0 부터 N까지의 스트림을 만들수 있기 때문이다. 실행해보면 1 ~ 5까지 숫자가 출력되는 것을 볼수 있다.
여러개의 엘리먼트를 생성할 수 있으니 Iterable를 받아 생성할 수 도 있다.

@Test
void fluxFromIterableTest() {
    Flux.fromIterable(List.of(1, 2, 3, 4, 5))
            .subscribe(System.out::println);

}

또한 java8의 Stream을 받아 생성할 수 도 있다.

@Test
void fluxStreamTest() {
    Flux.fromStream(Stream.of(1, 2, 3, 4, 5))
            .subscribe(System.out::println);

}

위 처럼 작성해도 1부터5까지의 숫자가 출력된다.
어떤 범위를 표현하고 싶다면 range를 사용하면 된다.

@Test
void fluxRangeTest() {
    Flux.range(1, 10)
            .subscribe(System.out::println);
    }
}

첫번쨰 파라미터는 시작값이고 두번째 파라미터는 갯수를 의미한다.
위 내용을 실행해보면 1부터 10까지 숫자가 출력된다.

여기까지는 가장 기본적인 Flux의 생성법을 알아봤다. 대부분 위의 내용을 알고 있을 듯 싶다. 구글링해보면 대부분 위의 내용이 잘 표현되고 있으니 말이다.
우리는 좀 더 많은 내용을 다뤄보기로 하자.

concat, merge

concatmerge은 거의 동일하게 여러 Publisher를 합쳐 내보낸다.
하지만 조금 다른 부분이 있는데 concat은 순서대로 동작하며 이전 Publisher 먼저 구독하고 완료된 후에 다음 Publisher 가 동작한다.

@Test
void fluxConcatTest() throws InterruptedException {
    Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))
            .subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
}

위의 코드는 1 ~ 4까지 1초간격으로 출력되고 5 ~ 6까지는 이전 Publisher(1~4까지)가 완료된후에 한꺼번에 출력된다.

merge의 경우에는 순서가 없으며 다른 Publisher의 영향도 없다.

@Test
void fluxMergeTest() throws InterruptedException{
    Flux.merge(Flux.just(1, 2, 3, 4).delayElements(Duration.ofSeconds(1)), Flux.just(5, 6, 7, 8))
            .subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
}

위는 첫번째 Publisher와 상관없이 두번째 Publisher 동시에 구독한다. 첫번째 Publisher에게 delay를 주어서 5~8까지 먼저 출력 후 1~4가 다음에 출력된다.
만약 순서가 중요하다면 concat을 순서가 상관없다면 merge를 이용하면 된다.

concat은 순서대로 구독을한다.

만약 Publisher의 순서가 상관없고 마지막 시퀀스만 순서대로 방출하려면 어떻게 해야 될까?
Publisher가 빨라도 상관없다면 mergeSequential를 사용하면 된다. Publisher는 동시에 구독하지만 마지막 시퀀스는 구독 순서대로 방출한다.

동시라고 표현했지만 구독은 순서대로 구독한다. 그 차이가 거의 동시이기에 그렇게 표현했다.

mergeSequential

concatmerge를 조금 합쳐놓은듯한 느낌이다. Publisher는 순서와 상관없이 모두 동시에 구독하며 마지막 시퀀스는 구독 순서대로 방출한다.

위의 코드를 실행하면 어떻게 출력될까? 한번 merge경우에도 어떻게 출력될지 상상해보도록 하자.
Publisher는 순서와 상관없이 동시에 구독하지만 마지막 시퀀스는 순서를 보장한다.
그래서 5~8까지 doOnNext의 로그가 출력되고 1~4까지의 doOnNext와 마지막 시퀀스가 방출된다.

second : 5
second : 6
second : 7
second : 8
first : 1
1
first : 2
2
first : 3
3
first : 4
4
5
6
7
8

위는 mergeSequential의 결과이다. 마지막 시퀀스는 순서가 보장되었다.

mergeOrdered

merge의 종류도 많다. 사실 이건 언제 사용할지도 감이 잘 오지 않는다. 뭐 하다보면 생길지도 모르겠지만..
엘리먼트 순서와 관련이 있다. 일단 기본은 오름차순인데 내림차순으로 변경도 할수 있다.
Publisher들의 엘리먼트들 순서대로 가장 작은 엘리먼트를 비교하여 방출한다. 말이 어렵구만. 한번 예제를 보자.

예를들어 위의 두개의 Publisher 실행한다고 해보자.
9-2 를 비교하여 작은 엘리먼트를 방출한다.
2가 방출되었으니 같은 Publisher에 있는 2 다음의 10과 9를 비교하여 작은 엘리먼트인 9를 방출한다.
다음 9가 방출되었으니 6과 10을 비교하여 작은 엘리먼트인 6이 방출된다.
6이 방출되었으니 같은 Publisher에 있는 11과 10을 비교하여 작은 엘리먼트인 10을 방출한다.
이런식으로 모든 엘리먼트를 방출한다.

2
9
6
10
1
4
11
3

물론 필자는 숫자로 했지만 Comparable를 구현한 클래스라면 뭐든 가능하다.

combineLatest

이거 역시 말로 설명하기 힘들다. 메서드명과 같이 마지막으로 결합되는 엘리먼트를 방출하는 역할을 하는 메서드이다.
먼저 예제를 보자.

@Test
void fluxCombineLatestTest() throws InterruptedException {
    Flux.combineLatest(Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80)), Flux.just(4, 5, 6).delayElements(Duration.ofMillis(100)), (a, b) -> a + ", " + b)
            .subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
}

위에서 말했다시피 마지막으로 결합되는 즉시 방출한다.

예를들어 첫번째 Publisher는 80밀리세컨드로 delay를 시켰고 두번째 Publisher는 100밀리세컨드로 delay를 주었다.

1이 80ms 뒤에 방출되었지만 4가 20ms동안 더 대기를 해야되므로 100ms 뒤에 1, 4 가 출력된다.
그 후 2는 60ms 뒤에 방출되므로 2, 4 가 출력된다.
다음은 40ms 뒤에 두번째 Publisher에 있는 5가 방출 되어 2, 5가 출력된다.
이런식으로 마지막으로 결합되는 즉시 방출한다. 시원찮지만 대충 그림을 보면 아래와 같다.

     80ms 100ms   160ms  200ms   240ms
======== 1 ======== 2 ====   ==== 3 
========== 4 ======   ==== 5 ====   ====== 6

========== 1 ====== 2 ==== 2 ==== 3 ====== 3
           4        4      5      5        6

generate

generate는 신호를 차례대로 생성하는 메서드이다. 일단 코드를 보자.

@Test
void fluxGenerateTest() {
    Flux.generate(() -> 1, (number, sink) -> {
        if (number == 10) {
            sink.complete();
        } else {
            sink.next(number);
        }
        return number + 1;
    }).subscribe(System.out::println);
}

위의 코드는 1 부터 9까지 생성하는 Flux 이다. 첫번째 파라미터는 최초값이며 두번째 파라미터는 BiFunction을 받고 있으며 현재 상태를 사용하여 신호를 보낸후 다음 상태의 값을 반환한다. 내부 상태값을 갖고 있으니 상태에 맞게 next, complete, error 신호를 발생시키면 된다.
만약 무한으로 신호를 보내고 싶다면 complete, error 신호를 보내지 않으면 된다.

generate vs create

Flux 의 create는 설명하지 않았지만 Mono의 create와 사용법은 동일하기에 생략했다. (멀티스레드를 제외한 기능은 동일하다)
generate와 create의 차이점을 살펴보자.

  1. generate는 내부에 상태값이 있으며 create는 상태값이 없다.
  2. generate는 next를 한번만 호출해야되는 반면 create는 여러번 호출해도 된다.
  3. generate 동기식프로그램만 가능하지만 create는 비동기 멀티스레드 프로그래밍이 가능하다.

두 메서드를 비교를 했지만 쓰임새가 전혀 다르다. 오히려 createpush를 비교하는게 좀 더 나아 보인다.

좀 더 자세한 내용은 문서를 보는 것을 추천한다. 그래야 이해가 더 빠를 것 같다.
또한 실제로 직접 사용해보고 예제 코드들을 살펴봐야 좀 더 도움이 될 것이다.

오늘 이렇게 Flux 생성에 대해서 알아봤다. 좀 더 많은 사용법이 있지만 이전글 Mono 생성하기에서 겹치는 부분도 있고 필자가 써보지 않았던 메서드들을 작성하지 않았다.

다음시간에는 Flux 오퍼레이터에 대해 알아보도록 하자.

Spring Boot 2.4 Config file processing

Spring boot 2.4-m2 버전이 저번달(8/14)에 릴리즈 되었다. 그러기엔 이글이 좀 늦은 듯 하다. 한달이나 지나서야..원..
Spring Boot 의 m2 버전에서 여러 추가 기능이 있지만 제일 큰 변화는 아마도 Config file processing 처리하는 방법이 아닐까 싶다.
나중에 정식 릴리즈가 되면 다른 특징들도 알아보고 오늘은 Config file processing 관한 변화의 특징들을 알아보도록 하자.

spring boot 2.3 까지 Kubernetes 지원을 열심히 하고 있다. 하지만 그 중에 할 수 없었던 부분이 volume mounted configuration라는 기능이다. (사실 필자도 잘 모름) Kubernetes 에서 인기있는 기능이라고 하니.. 하지만 이 기능을 지원하기 위해서는 ConfigFileApplicationListener 클래스를 수정해야만 했다고 한다.

개발을 하다보면 때때로 변경하기 어려운 코드들이 존재한다. 사실 이건 어느 누구에게나 닥칠 수 있는 상황이라고 생각한다. 그 중 ConfigFileApplicationListener는 변경하기 어려운 코드중 하나라고 판단 되었다고 한다. 사실 ConfigFileApplicationListener 클래스는 코드를 잘못 작성하거나 혹은 테스트 코드 누락이 된 것이 아니라 기능을 추가 하면서 그 클래스가 할 수 있는 일을 다했다고 판단 되었다.

ConfigFileApplicationListener 문제

spring.profiles: local
foo.username: userb
foo.password: userb
---
spring.profiles: '!dev'
spring.profiles.include: local
foo.username: userc

위와 같이 설정후에 --spring.profiles.active=prod 로 실행하는 경우 어떤 값들이 나와야 할까? 조금 애매한 부분이 많다. prod으로 프로파일을 설정했으니 !dev 프로파일이 동작할텐데 foo.username 과 foo.password 값은 어떤 값이 될까? include가 local 로 되어 있으니 local 프로파일을 오버라이딩 할까? 그럼 foo.password 의 값은 뭐가 될까? 사실 돌려보면 다음과 같다.

userc
null

근데 이게 맞는 걸까? 필자도 잘 모르겠다. 사실 이건 프로파일을처리 할 때 활성화 되지 않았다. (prod 와 정확한 매칭되는게 없어) 그래서 local 이라는 프로파일을 포함시키지 못한다. 이러한 처리는 자주 논란이 되었다고 한다. 문제가 제기 되면 그 문제를 해결 할때마다 다른 문제들이 생기곤 한다. 그래서 Spring boot 쪽에선 이와 같은 문제를 해결하고자 다시 재정의 했다.

  • 문서의 정렬
  • 프로파일에 더이상 추가 프로파일 작성 불가

Spring boot 에선 위와 같이 새롭게 정의 하였다.

문서 정렬

문서 정렬은 아주 간단하다. 하위 문서가 상위 문서를 오버라이딩한다.

spring.profiles: test
foo.username: wonwoo
foo.password: wonwoo123
---
spring.profiles: dev
foo.username: fidel
foo.password: fidel123
---
foo.username: overridden-wonwoo

위와 같이 문서가 작성 되고 --spring.profiles.active=test로 실행을 한다면 어떤 결과가 나올까? spring2.3 이전버전과 spring 2.4 버전에선 다른 결과가 출력 된다.

2.3

wonwoo
wonwoo123

2.4

overridden-wonwoo
wonwoo123

위와 같이 하위 문서가 상위 문서를 재정의 하도록 변경 되었다. 추후에 2.4로 마이그레이션 할 때 주의할 점이다.

멀티 프로퍼티

yaml을 파일과 비슷한 기능이다. yaml 파일은 ---구분자를 통해 한 파일에 여러 프로파일을 작성 할 수 있었다. 이제는 properties 파일도 가능하게 지원한다. (필자는 프로퍼티파일을 더 선호에서 좋은 기능같다.)

spring.profiles=test
foo.username=wonwoo
foo.password=wonwoo123
#---
spring.profiles=dev
foo.username=fidel
foo.password=fidel123

프로퍼티파일은 #--- 구분자를 통해 구분할 수 있다. 이제는 여러 파일로 나누지 않고도 한 파일에 환경별로 분리 할 수 있어 좋은 것 같다. 하지만 아직 IDEA 에선 빨간줄이..

spring.config.activate.on-profile

spring.profiles에서 spring.config.activate.on-profile로 변경 되었다.
아직은 spring.profiles 을 사용할 수는 있지만 deprecated 되어 있으니 참고하면 되겠다.

spring.config.activate.on-profile=test
foo.username=wonwoo
foo.password=wonwoo123
#---
spring.config.activate.on-profile=dev
foo.username=fidel
foo.password=fidel123

위와 같이 작성하면 기존의 spring.profiles 과 동일한 효과를 얻을 수 있다.

하지만 여기서 조금 주의할 점이 있다.

만약 spring.config.activate.on-profile 설정과 spring.profiles 설정을 함께 사용하면 안된다.

spring.config.activate.on-profile=test
spring.profiles=dev
foo.username=wonwoo
foo.password=wonwoo123

위와 같이 사용할 경우에는 에러가 발생한다. 하지만 아래와 같은 경우는 상관없다.

spring.config.activate.on-profile=test
foo.username=wonwoo
foo.password=wonwoo123
#---
spring.profiles=dev
foo.username=fidel
foo.password=fidel123

각 환경별로는 다르게 사용해도 에러가 나지 않는다. 하지만 일관성있게 사용하는 것이 좋아보인다.

프로파일에 더이상 추가 프로파일 작성 불가

이제는 정의된 프로파일에 더 이상 추가 프로파일을 작성 할 수 없다. 다음 설정 파일을 보자.

spring.config.activate.on-profile=test
spring.profiles.include=dev
foo.username=wonwoo
foo.password=wonwoo123
#---
spring.config.activate.on-profile=dev
foo.username=fidel
foo.password=fidel123

이전에는 위와 같이 특정 프로파일에 추가적으로 프로파일을 포함시킬 수 있었다. 하지만 이제는 더이상 추가적으로 프로파일을 포함 할 수 없다. 위와 같이 작성한다면 에러가 발생한다.

spring.profiles 을 사용해도 마찬가지다.
좀 더 빠르게 이해가 되며 쉽게 관리 할 수 있다는 장점이 있을 것 같다. 허나 여러 종류의 프로파일들을 함께 사용하고 싶을 때는 어떻게 할까?
Spring boot 의 새기능인 Profiles Groups 기능을 사용하면 된다.

Profiles Groups

이제는 Profiles을 group 을 지정할 수 있다. 여러 프로파일들을 한꺼번에 그룹지어 하나의 프로파일로 만들수 있다.

spring.config.activate.on-profile=test
foo.username=wonwoo
foo.password=wonwoo123
#---
spring.config.activate.on-profile=dev
foo.username=fidel
foo.password=fidel123
#---
spring.profiles.group.testdev=test,dev

위와 같이 test, dev 을 그룹지어 하나의 프로파일을 만들 수 있다. include 기능보다 좀 더 쉽게 추론할 수 있을 듯 싶다.
또 한 @Configuration 사용해 @Profile 설정을 할 떄 유용하다.

@Profile("devDb")
@Configuration
class DevDataBase {

}

@Profile("devMessage")
@Configuration
class DevMessage {

}

spring.profiles.group.dev=devDb,devMessage

이와 같이 dev 프로파일에 message 설정과 db 설정을 한꺼번에 넣을 수 있다. 테스트할 때도 좋은 기능같고 다른 설정으로 배포할 경우에도 좋은 기능 같다.

Import Properties

이전 Spring boot는 application.properties, application.yml 이외에 추가적인 설정파일을 가져오는데 spring.config.additional-location 사용하여 가져올 수 있었으나 application.properties 로 작성시에는 가져오지 못하고 환경변수나 인자로 넘거야 했었다. 또 한 파일 유형이 너무 제한적이다.

어쩄든 이 불편함을 수정하기 위해 새로운 설정으로 추가적인 파일들을 추가할 수 있다.

spring.config.import=classpath:/test/test.properties
spring.config.import=file:/test/test.properties
spring.config.import=configtree:/test/test.properties

위와 같이 spring.config.import를 이용해서 추가적인 파일들을 가져올 수 있다. 클래스패스, 파일, 혹은 configtree(Kubernetes 에서 사용하는 듯) 를 prefix를 사용해 다양한 유형들을 설정할 수 있다.

이 뿐만이 아니라 다른 파일 유형들도 적용할 수 있다. 예를 들어 추후에는 archaius, vault, zookeeper 와 같이 외부 설정을 불러 올 수 있도록 확장가능하다.

archaius://
vault://
zookeeper://

대략 위와 같은 모양이 되겠다.

만약 import할 대상의 파일이 없을 경우 에러가 발생한다. 그래서 파일이 없더라도 에러가 발생하지 않고 무시할 수 있는 기능이 있다. 그러기 위해서는 다음과 같이 optional: 을 prefix로 지정하면 된다.

spring.config.import=optional:classpath:/test/test.properties
spring.config.import=optional:file:/test/test.properties
spring.config.import=optional:configtree:/test/test.properties

위와 같이 optional: 사용하면 파일이 없더라도 에러가 나지 않는다. 뿐만아니라 기존에 사용하고 있던 spring.config.additional-location 설정도 동일하게 optional을 사용할 수 있다.

만약 모든 항목에 optional 처럼 기능을 작동하고 싶다면 spring.config.on-location-not-found=ignore, 혹은 SpringApplication.setDefaultProperties(…​) 메서드를 사용해서 속성을 넣을 수 있다.

spring.config.import=classpath:/test/test.properties

ignore 설정 후 위와 같이 optional:를 제거해도 파일이 없다는 에러는 발생하지 않는다.

레거시 사용

기존의 레거시(ConfigFileApplicationListener)를 사용해서 기존과 동일한 프로파일 설정들을 사용할 수 있다. 위의 내용이 아직 불편하다면 굳이 사용할 필요 없이 기존 설정과 동일하게 사용할 수 있다.

spring.config.use-legacy-processing=true

spring.config.use-legacy-processing 를 사용하면 기존 프로파일 설정(ConfigFileApplicationListener) 그대로 사용할 수 있다. 하지만 천천히 조금은 익숙해져야 되지 않을까 싶다.

오늘은 이렇게 spring boot 2.4의 새로운 config file processing 대해서 알아봤다. 만약에 추후에 2.4로 마이그레이션 할 때 조금 눈여겨 봐야할 특징들이다. 아직은 레거시도 지원하기에 급하지 않겠지만 Spring boot 특성상 바로 다음 메이저 업그레이드할 때 삭제 될 것 같다. Spring boot 는 대부분 deprecated 시킨 버전 바로 다음 버전에 대부분 삭제했다. 아마도 2.5 에는 삭제 되지 않을까 생각된다.

Reactor 써보기 (2)

오늘은 저번시간에 이어서 Reactor 써보기 (2)를 준비했다.

저번시간에는 Mono를 만드는 것을 배웠다. 필자도 아직까지는 써보지 않은 것들도 많이 존재한다. 대부분 쓰는 것들만 자주 쓰기에..

오늘은 저번시간에 이어서 Mono 오퍼레이터를 알아보도록 하자. 물론 다 알아보진 못할 거 같고(워낙 많아서..) 자주 사용될만한 것들 위주로 살펴 볼 예정이니 여기에 없는 것들은 문서를 찾아보면 되겠다.

map, flatMap, flatMapMany, filter

사실 이것들은 java8 에 나온 Stream API의 map 과 flatMap 과 사용법은 동일하다. 흔히 함수형 프로그래밍에서 말하는 functor, monad 라 하는 것들 처럼 의미하는 바도 동일하다. 이 부분을 더 알고 싶다면 함수형 프로그래밍을 공부하면 되겠다. 사실 필자도 잘..

이 오퍼레이션은 대부분 다 알고 있을거라 생각되기 때문에 간단한 사용법만 보고 넘어가자!

@Test
public void mapTest() {
    Mono.just("hello")
            .map(it -> it + " world")
            .subscribe(System.out::println);
}
@Test
public void flatMapTest() {
    Mono.just("hello")
            .flatMap(it -> Mono.just(it + " world"))
            .subscribe(System.out::println);
}
@Test
void filterTest() {
    Mono.just("filter")
            .filter(it -> it.equals("filter"))
            .subscribe(System.out::println);
}

사실 flatMapMany는 java의 Stream API 에는 존재하지는 않지만 flatMap과 동일하다.

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper)

Type 이 Mono 냐 Publisher냐의 차이이며 리턴타입도 Flux로 리턴한다.
flatMapMany 일경우에는 Mono, Flux 혹은 다른 reactive streams api 타입도 가능하다.

@Test
public void flatMapManyTest() {
    Mono.just("hello")
            .flatMapMany(it -> Flux.just("$it world", "$it world!!"))
            .subscribe(System.out::println);
}

java8의 Stream API를 사용했다면 그리 어려운 내용은 아닌 듯 싶다.
사실 이것들은 굉장히 자주 사용하는 메서드 중 하나다. 이것만 알아도? 대부분 무난하게 개발도 가능할 듯 싶다.
하지만 reactor 에서는 좀 더 우아한 방법의 메서드들이 존재하니 살펴보자.

zipWhen

위에서 간단하게 맛보기로 아마? 기존의 알던 오퍼레이터를 알아봤지만 이제부터는 reactor 만의 메서드를 알아보도록 하자.

이것은 필자가 실제로 map, flatMap 만큼 자주 사용하는 메서드중 하나다. flatMap가 사용법은 동일하지만 onNext 방출이 다르다. zip 이란 메서드에 걸맞게 묶어서 결과를 받을 수 있다.

@Test
public void zipWhenTest() {
    Mono.just("hello")
            .zipWhen(it -> Mono.just(it + " world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

필자는 타입을 보여주기 위함이지 생략해도 문제 없다.
출력은 다음과 같을 것이다.

hello, hello world

쉽게 생각해서 처음 Mono를 onNext 신호를 받을 수 있다. 만약 처음 만든 Mono 를 onNext 에서 사용해야 된다면 zipWhen 메서드를 사용하면 된다.

zipWith

zipWith 메서드는 사실 저번에 배운 Mono.zip 과 동작 방식은 동일하다. 이 역시 모노들이 각각 동작한다. 일단 예제를 살펴보자.

@Test
public void zipWithTest() {
    Mono.just("hello")
            .zipWith(Mono.just("world"))
            .subscribe((Tuple2<String, String> it) -> System.out.println(it.getT1() + ", " + it.getT2()));
}

zip 과 동일하게 onNext 신호는 동일하다. 실제로 내부적으로는 Mono.zip 을 사용한다. 사실 필자는 이 메서드는 잘 사용하지 않는다. 그냥 Mono.zip() 을 더욱 선호하는 편이긴 하다. 만약 aggregating 하는 Mono 들이 여러개 늘어 난다면 사실 코드 보기가 좀 지저분해 보일 수도 있을 거 같아 그냥 zip 메서드를 사용한다. 사용하고 싶다면 위 예제처럼 두개의 모노만을 묶는다면 사용해도 괜찮을 듯 싶다.

만약 Mono 가 여러개라면 위와 같은 코드가 될 것 같다. 만약 더 많은 모노가 있다면 onNext 타입이 정말 복잡할 듯 싶다.

concatWith

이것은 메서드명 그대로 연결하는 메서드이다. 연결후 Flux 타입으로 리턴한다. 파라미터는 꼭 Mono타입이 아니더라도 가능하다.

public final Flux<T> concatWith(Publisher<? extends T> other)
@Test
public void concatWithTest() {
    Mono.just("hello")
            .concatWith(Mono.just("world"))
            .subscribe(System.out::println);
}

출력은 다음과 같다

hello
world

딱히 어려운 부분은 없다.

이 메서드는 순차적으로 진행되기 때문에 만약 병렬로 진행해도 된다면 아래의 mergeWith를 사용하면 된다.

mergeWith

concatWith 와 사용법은 동일하지만 이 메서드는 순차적이지 않다. concatWith 달리 각각 실행 후 합쳐서 onNext를 방출한다. 순차적으로 실행시킬 필요가 없을 경우 이 메서드를 사용하면 된다. 그렇기 때문에 순서는 보장하지 않는다. 이 역시 합치는 기능이므로 Flux 를 리턴한다.

public final Flux<T> mergeWith(Publisher<? extends T> other)

이 역시 파라미터 타입은 꼭 Mono 일 필요는 없다.

@Test
public void mergeWithTest() {
    Mono.just("hello")
            .mergeWith(Mono.just("world"))
            .subscribe(System.out::println);
}

필자의 경우 종종 mergeWith 는 사용했다. 아쉽게도 concatWith는 사용한적이 없다. 언젠가 필요할 때가 있겠지..

then

then 오퍼레이션은 몇가지 존재하는데 대부분 비슷하다. 조금씩 다르니 참고하면 되겠다.
아무 파라미터가 없는 경우 then()은 리턴 값은 Void 타입이다. 즉, onNext는 방출하지 않는 다는 뜻과 같다. onNext만 방출하지 않지 에러와 완료신호는 방출한다.

@Test
fun `then test`() {
    Mono.just("then").then()
        .subscribe({ println(it) }, {println("error")}) { println("complete") }
}

출력 결과는 보면 complete 이외엔 아무것도 출력 되지 않는다. 만약 error 가 방출 될경우에는 error 메시지가 출력 된다.
이것은 종종 운영에서도 사용했다. 예를들어 message 를 publisher 한다거나 혹은 consumer 할 때 주로 사용했다. 혹은 spring 의 EventListener를 사용할 때도 사용했다. 종종 쓸일이 있으니 알아두면 좋을 것 같다.

then 메서드에 파라미터를 받는 메스드가 존재한다. 메서드 형태는 다음과 같다.

public final <V> Mono<V> then(Mono<V> other)

해당 모노를 완료시킨후에 다른 모노를 연결하는 오퍼레이터이다. 이는 순차적이다. 이 메서드는 onNext를 방출 할 수 있다.

@Test
public void thenTest() {
    Mono.just("then1").then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}


위의 결과는 then2complete 가 출력된다. 만약 기존 모노가 empty 일지 라도 다음 모노는 실행 시킨다.

@Test
public void thenTest() {
    Mono.empty().then(Mono.just("then2"))
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위처럼 작성해도 동일하게 동작한다. 이 메서드 역시 운영에서 종종사용했다.
더 많은 메서드들이 있긴한데 간단하게 설명만 하고 넘어가도록 하자.

thenEmpty 는 Void 타입을 받는 Publisher이다. 즉, 실행만 시키고 onNext는 방출하지 않는다는 뜻이다. 내부적으로는 then(Mono other) 메서드를 호출한다.
thenReturn은 Publisher 타입이 아닌 T 타입을 받는 메서드이다. 내부적으로는 then(Mono other)를 호출한다.
thenMany는 Publisher 타입을 받는 메서드이다. then(Mono other)랑 비슷해보이지만 리턴 타입은 Flux를 리턴한다.

handle

handle 메서드는 좀 더 우아하게 onNext, 에러 혹은 완료 신호를 줄 수 있다. 글 보다 코드를 보는게 이해가 빠를 듯 싶다.

@Test
public void handleTest() {

    Mono.just("bar")
            .handle((it, sink) -> {
                if (it.equals("foo")) {
                    sink.next(it);
                } else if (it.equals("bar")) {
                    sink.error(new NullPointerException());
                } else {
                    sink.complete();
                }
            }).subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

}

위와 같이 특정 조건에 의해 next, error, complete 신호를 보낼 수 있다. 꼭 해당 타입과 sink의 제네릭 타입이 같지 않아도 된다.
foo, bar 혹은 그외의 것들을 넣어 한번씩 테스트해보는게 이해가 더욱 빠를 듯 싶다. 이 역시 운영에서 아주 많이 사용한다.

cast, ofType

cast 혹은 ofType을 써서 형 변환을 할 수 있다. cast 와 ofType 두 메서드 모두 타입 캐스팅을 할 수 있지만 조금 다르다. cast는 타입캐스팅 할 때 형이 맞지 않다면 에러가 발생하지만 ofType 의 경우에는 에러가 나지 않고 무시된다. ofType은 내부적으로 filter를 사용해 무시한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(Number.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

둘다 출력 되는건 동일하다. 형변환이 잘 되었을 땐 동일하게 동작하지만 형변환이 안될 경우에는 조금 다르게 동작한다.

@Test
public void castTest() {
    Mono.just(1)
            .cast(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));

    Mono.just(1)
            .ofType(String.class)
            .subscribe(System.out::println, (e) -> System.out.println("error"), () -> System.out.println("complete"));
}

위와 같이 형 변환이 안됐을 시에는 cast 경우 에러가 출력 되며 ofType 일 경우에는 complete 만 출력 된다.

filterWhen

filter의 비동기 버전이다. filter는 다들 알다시피 걸러내는 작업을 하는 메서드이다. filter와 동일한 동작을 하니 예제만 보고 넘어가자.

@Test
public void filterWhenTest() {
    Mono.just("filterWhen")
            .filterWhen((it) -> Mono.just(true))
            .subscribe(System.out::println);
}

filter 와 동일한 동작은 하니 그리 어렵진 않다. 사실 이건 운영에서 써본적은 없다.

repeat

메서드명 그대로 반복하여 구독한다. repeat()는 여러 메서드가 존재한다. 예를들어 repeat() 만 사용할 경우에는 무한정으로 구독하는 시스템이다. 실제로 필요한지는 잘모르겠다. 특정 조건에 만족하면 반복하는 메서드도 존재한다.

public final Flux<T> repeat(BooleanSupplier predicate)

또 한 원하는 만큼 반복도 가능하다.

@Test
public void repeatTest() {
    Mono.just("repeat")
            .repeat(2)
            .subscribe(System.out::println);
}

위의 소스는 2번을 더 반복한다는 뜻이다. 실제로 3번의 구독이 이루어진다. 만약 0으로 지정을 했다면 구독은 한번만 이루어진다.
이외에도 사실 repeat** 메서드가 많긴하지만 필자는 사용한적이 없다. 만약 사용한다면 테스트할때 사용하지 않나 싶다. 운영에서 사용할일은 극히 드물 것 같다.

doOnSubscribe, doOnRequest, doOnNext, doOnCancel, doOnTerminate, doAfterTerminate, doOnSuccess, doOnError, doFinally, doOnEach

doOn** 시리즈는 trigger에 해당한다. 로직상으로는 많이 사용하지 않겠지만 로그를 찍을 때 사용하면 적절할 듯 싶다. 필자도 대부분 로그를 찍을 때 사용했다. 물론 다른 경우에도 사용하겠지만 아직까지는 그런일은 없었다.
일단 먼저 사용법 부터 보자.

@Test
void doOnSeries() {
    Mono.just("doOnSeries")
            .doOnSubscribe(it -> System.out.println("doOnSubscribe"))
            .doOnRequest(it -> System.out.println("doOnRequest"))
            .doOnNext(it -> System.out.println("doOnNext"))
            .doOnEach(it -> System.out.println("doOnEach"))
            .doOnCancel(() -> System.out.println("doOnCancel"))
            .doAfterTerminate(() -> System.out.println("doAfterTerminate"))
            .doOnTerminate(() -> System.out.println("doOnTerminate"))
            .doOnSuccess(it -> System.out.println("doOnSuccess"))
            .doOnError(it -> System.out.println("doOnError"))
            .doFinally(it -> System.out.println("doFinally"))
            .subscribe();
}

사용법은 위와 같이 간단하다. 각 메서드 별로 알아보자.
doOnSubscribe 메서드는 구독이 시작 될 트리거 된다. 위 메서드 중 가장 먼저 실행 된다. 파라미터로는 Subscription 가 넘어 온다.
doOnRequest 메서드는 요청 받을 떄 트리거 된다. 기본적으로 파라미터는 Long의 Long.MAX_VALUE 값이 넘어온다.
doOnNext는 성공적으로 데이터가 방출 될 때 트리거 된다. 파라미터로는 해당 T 타입이 넘어 온다.
doOnCancel 메서드는 구독이 취소 됐을 때 넘어오는 이벤트다 파라미터로는 아무것도 넘어오지 않는다.
doOnTerminate 메서드는 완료 혹은 에러가 났을 떄 트리거 된다. 이벤트시기는 완료, 에러 이벤트 전에 트리거 된다. 이 역시 파라미터로는 아무것도 넘어오지 않는다.
doAfterTerminate 메서드는 doOnTerminate 와 동일하게 트리거 되지만 시점이 조금 다르다. 완료, 에러 이벤트 후에 트리거 된다. 역시 파라미터는 없다.
doOnSuccess 완료 되면 트리거 된다. 파라미터는 해당 T 타입이 넘어 온다.
doOnError 에러가 났을 때 트리거 된다. 파라미터로는 Throwable 가 넘어 온다.
doFinally 에러, 취소, 완료 될 때 트리거 된다. 모노가 종료되면 무슨 일이든 트리거 된다. 파라미터로는 SignalType이 넘어와 종료 이벤트 타입을 받을 수 있다. 자바의 try catch finally의 finally 과 동일한 느낌이다.
doOnEach 메서드는 데이터를 방출 할 때, 혹은 완료 에러가 발생했을 때의 고급 이벤트다. 파라미터로 Signal 이 넘어온다. 이 신호에는 context 도 포함되어 있다. 주로 모니터링으로 사용한다고 한다. 위의 코드에서는 doOnEach 가 두번 출력된다. 데이터를 방출 할때 한번, 완료 되었을 때 한번.

retry

메서드명 그대로 에러가 발생하였을 때 재시도 하는 메서드이다. 이 또한 많은 메서드가 존재한다. 기본 retry() 메서드는 무한정(Long.MAX_VALUE 사실 무한은 아닌 듯)으로 재시도를 한다.
retry(long) 은 retry 횟수를 지정할 수 있다. 만약 2로 지정하였을 경우는 2번을 재시도 한다.
retry(Predicate) 은 특정 조건에 만족하면 재시도를 시도 한다.

@Test
void retryTest() {
    Mono.just("retry")
            .handle((it, sink) -> {
                System.out.println(it);
                sink.error(new NullPointerException());
            }).retry(2)
            .subscribe();
}


위의 코드는 재시도를 2번 시도하는 코드이다. 출력은 retry 가 3번 출력된다. 한번은 처음 구독했을 때와 두번은 retry를 시도했을 때 총 3번 출력 된다.

retryBackoff

retryBackoff 는 기본 retry 보다 조금 우아하게 재시도를 할 수 있다, backoff 설정를 할 수 있어 특정 시간만큼 Duration을 줄 수 있다. 이 역시 메서드는 많다.

public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor)

하지만 위의 메서드를 설명하면 대부분의 메서드는 설명된다.

위의 retry 는 사실 큰 도움은 되지 않을 수 있다. 대기시간도 없어 실패하면 바로 다시 재시도를 하기 때문이다. 만약 동시에 모든 호출이 실패하면 바로 또 모든 호출을 동시에 날리기 때문이다. 어쩌면 운영에서는 retryBackoff 메서드를 쓰는게 좀 더 많은 도움이 되지 않을까 싶다.

retryBackoff의 첫번째 파라미터인 numRetries는 retry와 동일하게 재시도 횟수를 지정하면 된다.
두 번째 파라미터인 firstBackoff는 첫 번째 재시도 할 때의 대기 시간이다.
세 번째 파라미터 maxBackoff는 최대로 해당 시간만큼 대기 할 수 있다는 뜻 이다. 그러기 때문에 firstBackoff 보다는 작으면 안된다.
jitterFactor는 위에서 말했던 것 처럼 동시에 모든 호출이 실패하면 또 다시 모든 호출은 동시에 재시도 하기 때문에 이를 분산시키기 위한 값이다. 최소값은 0.0 이며 최대값은 1.0 이다. 지정 하지 않을 경우에는 0.5가 기본값이다.

궁금해할진 모르겠지만 해당 알고리즘은 다음과 같다. i는 재시도 한 값이다.

nextBackoff = firstbackoff * 2^i
jitterOffset = (nextBackoff * 100 * jitterFactor) / 100
lowBound = max(firstbackoff - nextBackoff, -jitterOffset)
highBound = min(maxBackoff - nextBackoff, jitterOffset)
jitter = random (lowBound, highBound)
backoff = nextBackoff + jitter

사용법은 대략 다음과 같다.

@Test
void retryBackoffTest() throws InterruptedException {
    Mono.just("retry")
            .handle((it, sink) -> sink.error(new NullPointerException()))
            .retryBackoff(2, Duration.ofSeconds(3), Duration.ofSeconds(10), 0.5)
            .subscribe();
    sleep()
}


retryBackoff 는 다른 쓰레드로 돌리기 때문에 보여주기 위해 sleep를 줬다.

delayElement

사실 Mono의 이 메서드는 테스트에서 사용했지 운영에선 사용해본적이 없다. 메서드명 그대로 delay 를 줄 수 있는 메서드이다.
사용법 부터 살펴보자.

@Test
void delayElement() throws InterruptedException {
    Mono.just("delayElement")
            .delayElement(Duration.ofSeconds(3))
            .doOnNext(System.out::println)
            .subscribe();
    sleep
}

이 역시 다른 스레드로 돌아가기 때문에 sleep 을 줬다. delayElement은 해당 시간만큼 onNext 방출을 지연시킨다. 실행해보면 3초 후에 delayElement 가 출력된다.

onErrorResume, onErrorReturn, onErrorMap, onErrorContinue

오늘 마지막으로 살펴볼 아이는 onError** 시리즈이다. 이 역시 여러 종류의 메서드들이 있지만 기본적인 내용은 살펴보고 나머지는 해당 메서드를 살펴보는 게 좋다. 그리 어렵지 않은 내용이니 말이다.

onErrorResume 메서드는 에러가 발생하였을 경우 다른 Mono로 대체 할 수 있다.

@Test
void onErrorResumeTest() {
    Mono.just("onErrorResume")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorResume(it -> Mono.just("foo"))
            .subscribe(System.out::println);
}

위의 코드는 foo가 출력 되는 것을 볼 수 있다.

onErrorReturn은 에러가 발생했을 경우 다른 값으로 대체 할 수 있다.

@Test
void onErrorReturnTest() {
    Mono.just("onErrorReturn")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorReturn("foo")
            .subscribe(System.out::println);
}

이 역시 foo가 출력 된다.

onErrorMap 다른 에러로 변환할 수 있다.

@Test
void onErrorMapTest() {
    Mono.just("onErrorMap")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorMap(it -> new IllegalArgumentException())
            .subscribe(System.out::println, System.out::println);
}

위 처럼 에러를 변환하고 싶다면 사용하면 된다.

onErrorContinue는 연산 과정에서 오류가 발생하였을 때 복구하는 메서드이다. 넘어오는 파라미터는 에러와 해당 object이 넘어온다.

@Test
void onErrorContinueTest() {
    Mono.just("onErrorContinue")
            .handle((__, sink) -> sink.error(new NullPointerException()))
            .onErrorContinue((it, object) -> { })
            .subscribe(System.out::println, System.out::println);
}

위는 오류가 복구되어 기존의 있던 값 onErrorContinue가 출력 된다.

오늘은 여기까지 배워보도록 하자. 우리가 공부한 이외에도 훨씬 더 많은 메서드가 존재한다. 필자도 대부분 사용하는 것만 사용하기에 필자도 모르는 메서드가 많다. 물론 언젠가는 사용할 날이 오겠지.. 그럼 그때 다시 공부하는 걸루..

오늘은 이렇게 Mono 의 오퍼레이터에 대해서 알아봤다. 사실 위의 메서드만 알아도 어느정도는 충분한 개발을 할 수 있다고 믿는다. 물론 다 알면 좋겠지만.. 만약 여기에 나오지 않은 오퍼레이터들은 나중에 Flux 시간에 나올 수 도 있으니 참고하면 되겠다.
다음시간에는 Flux에 대해 알아보도록 하자.