본문 바로가기

카테고리 없음

spring webflux, kotlin coroutine 환경에서 aop 사용해보기

Background) Spring reactive cache

This topic is still relatively new. At the time of writing this article, there was no fluent integration between @Cacheable and reactive frameworks. The primary issue is that there are no non-blocking cache implementations (JSR-107 cache API is blocking). Only Redis is providing a reactive driver.

Despite the issue we mentioned in the previous paragraph, we can still use @Cacheable on our service methods. This will result in caching of our wrapper objects (Mono or Flux) but won’t cache the actual result of our method.

ref) https://www.baeldung.com/spring-webflux-cacheable

 

reactive framework 중에 @Cacheable 을 지원하는 framework 는 아직 없습니다. (non-blocking cache 구현)
redis 만 reacive driver 를 지원하고 있습니다.
webflux reactive 를 지원해주는 개인이 만들어 놓은 라이브러리 들도 있긴 합니다.

ref) https://github.com/Bryksin/redis-reactive-cache
 
아직 webflux 에서 spring-boot-starter-cache 를 통해 간편하게 annotation 방식의 AOP 로 redis cache 를 사용할 수 없습니다.

Problem) suspend 함수에 적용된 aop는 왜 제대로 동작하지 않는가

직접 Aspect 코드를 작성하여 redis 에 캐시를 하는 AOP 를 구현해보고자 했습니다.

Kotlin Coroutine 이 적용된 webflux 에 만약 @Cacheable 을 적용한다면 cache 에 COROUTINE_SUSPEND 가 저장되는 등 의도한 대로 동작하지 않는 것을 볼 수 있습니다.
이는 Kotlin Suspend 함수, Coroutine CSP 에서 이유를 찾아볼 수 있습니다.
 

Why) Kotlin 에서 Coroutine 을 지원하는 방법 CSP 

Kotlin 은 비동기 프로그래밍을 지원을 위해 CSP(Communicating Sequential Process) 기법을 사용합니다.
Suspend 함수는 Suspension point(중단점)을 제공하여 함수가 중단될 수 있도록 하며, 이를 통하여 blocking 로직으로 부터 벗어나서 비동기로 동작할 수 있도록 합니다.
Kotlin 은 이 중단점을 제공하기 위해 Suspend 함수를 컴파일 할 때, 함수 인자의 마지막에 Continuation 이라는 객체를 넣어줍니다.

// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/src/kotlin/coroutines/Continuation.kt
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

Continuation은 coroutine에서 실행되는 code block과 execution context를 실질적으로 소유한 객체로, coroutine의 실행 및 중단과 관련된 핵심적인 역할을 수행합니다. 이런 점에서 coroutine은 연속적으로 이어진 하나 이상의 Continuation의 집합체라고도 할 수 있습니다.

ref) https://velog.io/@koo8624/Kotlin-Coroutine의-동작-원리
 
Kotlin 표준 라이브러리에서 제공하는 가장 대표적인 suspend function 인 delay 는 대략 다음과 같은 형태로 구현되어 있습니다.

suspend fun delay(ms: Long) = suspendCoroutine { continuation ->
    Timer().schedule(object : TimerTask() {
        override fun run() {
            continuation.resume(Unit)
        }
    }, ms)
}

 
대략적으로 Continuation 이 어떤 동작을 하는 지 살펴보자면,
아래와 같이 suspend 함수를 사용하는 코드는 아래와 같이 컴파일 됩니다.

suspend fun findUserById(id: Long): User? {
        // getUserById 는 suspend 함수!!
        val user = userRepository.getUserById(id)
        return user
    }
static Object findUserById$suspendImpl(final UserSuspendService $this, long id, Continuation $completion) {
      Object $continuation;
      label20: {
				
				...

         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return UserSuspendService.findUserById$suspendImpl($this, 0L, (Continuation)this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      Object var10000;
      switch (((<undefinedtype>)$continuation).label) {
         case 0:
            ResultKt.throwOnFailure($result);
            UserRepository var8 = $this.userRepository;
            ((<undefinedtype>)$continuation).label = 1;
            var10000 = var8.getUserById(id, (Continuation)$continuation);
            if (var10000 == COROUTINE_SUSPENDED) {
               return COROUTINE_SUSPENDED;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      User user = (User)var10000;
      return user;
   }

getUserById 는 suspend 함수이기 때문에 중단점이 될 수 있어야 합니다.
아래 switch 문을 보면, getUserById 에서 COROUTINE_SUSPENDED 가 반환되면, 바로 COROUTINE_SUSPENDED 를 리턴합니다. getUserById 함수에서 아직 값이 준비되지 않았다면 , 우선 COROUTINE_SUSPENDED 을 반환하여 중단되도록 한 것입니다. 
현재 수행하는 함수를 COROUTINE_SUSPENDED 를 반환하며 빠져 나온 것이고, 이는 Thread 를 Free 시키는 효과가 있습니다.
함수의 실행이 중단 된 것!
 
이후에 getUserById 으로 부터 값이 준비되면, Continuation 의 resumeWith 를 통해 다시 중단됐던 함수를 실행합니다.
처음 getUserById 을 호출할 때, label 을 1 로 바꾸어 다시 switch 문을 실행했을 때 다음 case 가 수행될 수 있도록 실행 시점을 바꾸었습니다. 따라서, 다시 중단됐던 함수가 실행되면 getUserById 호출(중단점) 이후의 동작을 할 수 있게 됩니다.
 
Continuation 을 활용하여 callback 과 같은 방식으로 동작하기 때문에  

val result = joinPoint.proceed()

을 호출하면 COROUTINE_SUSPENDED 를 반환할 뿐 나머지 로직을 수행할 수 없음과 더불어 본래 result 도 받을 수 없습니다.
 
결론적으로, AOP 를 적용하더라도 Kotlin Continuation 가 제대로 동작하도록 조치를 취해 줘야 합니다.
 

How to do) suspendCoroutineUniterceptedOrReturn, startCoroutineUninterceptedOrReturn

Spring project repo 에서 이와 관련된 issue 를 살펴보니 

ref) https://github.com/spring-projects/spring-framework/issues/22462#issuecomment-940844810
 
Kotlin Coroutine API 를 사용하여 COROUTINE_SUSPENDED 가 아닌 제대로된 반환 값을 받아와서 AOP 로직을 수행할 수 있도록 한 코드를 발견했습니다.
해당 코드는 간단하게 이야기해서, arg 의 마지막 인자로 들어오는 Continuation 을 활용하여, Continuation 체인 안에서 AOP 로 수행하고자 하는 로직을 넣도록 하는 것입니다.

// https://gist.github.com/pjanczyk/5d958821bafd911a5996bc0b66788ea3
val ProceedingJoinPoint.coroutineContinuation: Continuation<Any?>
    get() = this.args.last() as Continuation<Any?>

val ProceedingJoinPoint.coroutineArgs: Array<Any?>
    get() = this.args.sliceArray(0 until this.args.size - 1)

suspend fun ProceedingJoinPoint.proceedCoroutine(
    args: Array<Any?> = this.coroutineArgs
): Any? =
    suspendCoroutineUninterceptedOrReturn { continuation ->
        this.proceed(args + continuation)
    }

fun ProceedingJoinPoint.runCoroutine(
    block: suspend () -> Any?
): Any? =
    block.startCoroutineUninterceptedOrReturn(this.coroutineContinuation)

startCoroutineUninterceptedOrReturn

Java 에서 call 할 수 있는 API 로 Continuation 을 통해 Coroutine 을 조작할여 수행할 수 있도록 합니다.

Starts an unintercepted coroutine without a receiver and with result type T and executes it until its first suspension. Returns the result of the coroutine or throws its exception if it does not suspend or COROUTINE_SUSPENDED if it suspends. In the latter case, the completion continuation is invoked when the coroutine completes with a result or an exception.

전달 받은 Continuation 을 가지고 suspend 함수 block 을 실행 할 수 있도록 합니다.

suspendCoroutineUniterceptedOrReturn

suspendCoroutineUninterceptedOrReturn() 함수는 전달 된 코드 블럭에서 호출 코루틴(Continuation) 정보에 접근할 수 있도록 해줍니다. 또한, 전달 된 코드 블럭에서 COROUTINE_SUSPENDED 라는 미리 정의 된 값을 반환 할 경우에는 코루틴이 처리를 위해 시간이 필요하여 값을 바로 반환하지 않고 처리가 완료되면 continuation 파라미터를 통해 결과를 전달할 것임을 나타내고, 그 이외의 값을 반환할 경우에는 중단 없이 바로 결과 값을 반환한 것을 나타냅니다.

ref) https://myungpyo.medium.com/코루틴-공식-가이드-자세히-읽기-part-2-dive-1-4c468828319

적용하기

@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
annotation class CoroutineCacheable(
    val prefix: String,
    val keyExpression: String,
    val ttlSecond: Int
)

class CoroutineCacheImpl: CoroutineCache<Any> {
    private val store = ConcurrentHashMap<String, Any>()

    override suspend fun get(key: String): Any? {
        delay(1)
        return store[key]
    }

    override suspend fun put(key: String, value: Any) {
        delay(1)
        store[key] = value
    }
}
// for both spring 6.1.0+ and earlier
    @Around("@annotation(com.example.coroutineaop.aop.CoroutineCacheable) && suspendFunctionPointCut()")
    fun coroutineCacheable(joinPoint: ProceedingJoinPoint): Any? {
        val coroutineCacheable = (joinPoint.signature as MethodSignature).method.getAnnotation(CoroutineCacheable::class.java)
        val prefix = coroutineCacheable.prefix
        val expire = coroutineCacheable.ttlSecond

        val keyValue = getKeyValue(joinPoint, coroutineCacheable.keyExpression)
        val cacheKey = "$prefix:$keyValue"

        return joinPoint.runCoroutine {
        	// get 은 suspend 함수
            coroutineCache.get(cacheKey)?.let { cached ->
                return@runCoroutine cached
            }

            joinPoint.proceedCoroutine().let { rtn ->
                if (rtn is Mono<*>) {
                    // for spring 6.1.0 and later
                    rtn.awaitSingleOrNull()?.let { result ->
                        coroutineCache.put(cacheKey, result)
                        return@runCoroutine result
                    }
                } else {
                    rtn?.let {
                        coroutineCache.put(cacheKey, rtn)
                        return@runCoroutine rtn
                    }
                }
            }
        }
    }

위 코드를 통해 이제 COROUTINE_SUSPENDED 가 아닌 제대로된 반환값을 가지고 로직을 수행할 수 있게 되었습니다.

Plus) Spring AOP is not compatible with Kotlin Coroutines #22462

ref) https://github.com/spring-projects/spring-framework/issues/22462
 
위 코드를 보면 spring 6.1.0 이후 부터는 mono 가 반환되어 awaitSingleOrNull 를 통해 본래 반환값을 가져오는 것을 볼 수 있습니다.
(Spring Boot 의 경우 3.2 부터 사용할 수 있는 것으로 보입니다. Spring Boot 3.2 Release Notes)
 
사실 Spring AOP 에서 Kotlin Coroutine 을 support 하는 논의는 오래전부터 있었습니다. 
Spring 6.1.0-M5 에 이를 반영하여, AopUtils#invokeJoinpointUsingReflection 에서 아래 invokeSuspendingFuction 를 사용하여 Suspend 함수일 경우, Mono 를 반환하여 Reactive support 하는 MoehodInterceptor 에서 처리되도록 하였습니다.

따라서, 해당 이후 버전을 사용한다면, 아래와 같이 Kotlin Coroutine API 를 사용하지 않고도 바른 값을 가져올 수 있게 되었습니다.
하지만, Cache 관련 로직이 Coroutine Suspend 를 사용한다면, 위에서 한 방식처럼 Continutation 체인 안에 올바르게 관련 로직을 포함하여야 합니다.

// only for spring 6.1.0 and later
@Around("@annotation(com.example.coroutineaop.aop.Cacheable)")
fun cacheable(joinPoint: ProceedingJoinPoint): Any? {
    val method = (joinPoint.signature as MethodSignature).method
    val cacheable = method.getAnnotation(Cacheable::class.java)
    val prefix = cacheable.prefix
    val expire = cacheable.ttlSecond

    val keyValue = getKeyValue(joinPoint, cacheable.keyExpression)
    val cacheKey = "$prefix:$keyValue"
	
    // get 은 suspend 함수가 아니다
    cache.get(cacheKey)?.let { cached ->
        return cached
    }

    return joinPoint.proceed().let { rtn ->
        if (KotlinDetector.isKotlinReflectPresent() && KotlinDetector.isSuspendingFunction(method)) {
            return@let (rtn as Mono<*>).map { result ->
                cache.put(cacheKey, result)
                result
            }
        }

        rtn?.let {
            cache.put(cacheKey, rtn)
            rtn
        }
    }
}

참고

Add Coroutines support for @Cacheable

Spring v6.1.0-RC2 에서도 @Cacheable 에서 Coroutine Suspend 함수를 지원할 수 있도록 하였습니다.

https://github.com/spring-projects/spring-framework/commit/466c8d8f23a982cb53f1cfad0143afd28e9c27ab

Webflux Netty 와 Kotlin Coroutine 은 어떻게 연결되어 있을까?

CoRouterFunctionDsl 을 보면 아래와 같이 

public fun <T> mono(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> {
    require(context[Job] === null) { "Mono context cannot contain job in it." +
            "Its lifecycle should be managed via Disposable handle. Had $context" }
    return monoInternal(GlobalScope, context, block)
}

private fun <T> monoInternal(
    scope: CoroutineScope, // support for legacy mono in scope
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
    val reactorContext = context.extendReactorContext(sink.currentContext())
    val newContext = scope.newCoroutineContext(context + reactorContext)
    val coroutine = MonoCoroutine(newContext, sink)
    sink.onDispose(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
Creates a cold mono that runs a given block in a coroutine and emits its result

 
Coroutine 을 생성/실행하는 Mono 를 반환하도록 합니다. 즉 Mono / Flux 기반의 인터페이스에서 Coroutine 인터페이스로 옮길 수 있게 된 것입니다.

@Transactional 은?

Kotlin Coroutine Webflux 에서 @Transactional 은 정상 동작 하는데, 이것도 AOP 일텐데 어떻게 지원해주고 있을까요?
Support suspending functions annotated with @Transactional #23575
위 이슈에서 먼저 지원되기 시작했습니다.

ref) https://github.com/spring-projects/spring-framework/commit/5429c7afebaa3255ea80197224023c29c7d552ec
Mono 등 reactive 로 동작하는 Transaction AOP 를 coroutine 인터페이스와 연결한 것을 알 수 있습니다.
 

업데이트 2024/07/07 ) coroutine aware CglibAopProxy

앞서, spring 6.1.0 이후 부터는 AopUtils#invokeJoinpointUsingReflection 에서 invokeSuspendingFuction 를 사용하여 Suspend 함수일 경우, Mono 를 반환하여 Reactive support 하는 MoehodInterceptor 에서 처리하도록 변경되었다고 했습니다.

 

CglibAopProxy 에서도 suspend 함수라면 mono 로 응답이 왔을 것이므로 awaitSIngleOrNull 을 통해 Mono 를 Croutine 으로 변환해줍니다. 

Aspect 에서 Coroutine 을 실행하기 위해 joinPoint.runCoroutine 으로 실행한 suspend block 은 아래와 같이 반환값이 CORUTINE_SUSPENDED 입니다. CORUTINE_SUSPENDED 는 단순 enum 값으로 awaitSIngleOrNull 은 mono 가 아닌 값은 Mono.just(value) 로 바꾼 후 await 합니다. 

 

suspend 함수를 호출하는 것은 coroutine context 일 것으로 CORUTINE_SUSPENDED 값을 알아서 잘 처리해줄 것이라고 기대해볼 수 있습니다. (Continuation 을 통해 다시 돌아올 수 있음)

하지만, proxy 로직에서 target method 가 suspend 함수이면 mono 혹은 flow 가 반환될 것이라 기대하고 있습니다. 혹시 해당 proxy 부분에 mono, flow 관련된 로직이 추가될 경우, Spring AOP 의 기대와 다르게 CORUTINE_SUSPENDED 가 return 되고 있으므로 누락될 가능성이 있습니다. 

따라서, 변경된 suspend 함수에 대한 Spring AOP 로직에 맞추어 Coroutine 을 실행하는 Aspect 에서도 Coroutine 을 Mono 로 변환하여 리턴해주는 것이 서로 약속된 패러다임을 지킬 수 있어 보다 안전하다고 보여집니다.

fun ProceedingJoinPoint.runCoroutine(
    block: suspend () -> Any?
): Mono<*> {
    val continuation = this.coroutineContinuation
    return mono {
        block.startCoroutineUninterceptedOrReturn(continuation)
    }
}

 

Coroutine 을 mono 로 변환하여 반환하였고 proxy 에서도 기대한 바와 같이 mono 로 처리될 수 있는 것을 볼 수 잇습니다.

 

또한, Mono return 값을 jointPoint.proceedCoroutine 에서 await 하도록 하면 Aspect 에서 매번 Mono 인지 아닌지 고민해줄 필요 없이 실제 값으로 생각하고 처리할 수 있도록 공통 로직을 빼줄 수도 있습니다.

suspend fun ProceedingJoinPoint.proceedCoroutine(
    args: Array<Any?> = this.coroutineArgs
): Any? {
    val rtn = suspendCoroutineUninterceptedOrReturn<Any?> { continuation ->
        this.proceed(args + continuation)
    }

    if (rtn is Mono<*>) {
        // for spring 6.1.0 and later
        return rtn.awaitSingleOrNull()
    }
    return rtn
}


 
sample code) https://github.com/imeansu/kotlin-coroutine-spring-aop

 

GitHub - imeansu/kotlin-coroutine-spring-aop

Contribute to imeansu/kotlin-coroutine-spring-aop development by creating an account on GitHub.

github.com