Background) Spring reactive cache
This topic is still relatively new. At the time of writing this article, there was no fluent integration between @Cacheable and reactive frameworks. The primary issue is that there are no non-blocking cache implementations (JSR-107 cache API is blocking). Only Redis is providing a reactive driver.
Despite the issue we mentioned in the previous paragraph, we can still use @Cacheable on our service methods. This will result in caching of our wrapper objects (Mono or Flux) but won’t cache the actual result of our method.
ref) https://www.baeldung.com/spring-webflux-cacheable
reactive framework 중에 @Cacheable 을 지원하는 framework 는 아직 없습니다. (non-blocking cache 구현)
redis 만 reacive driver 를 지원하고 있습니다.
webflux reactive 를 지원해주는 개인이 만들어 놓은 라이브러리 들도 있긴 합니다.
ref) https://github.com/Bryksin/redis-reactive-cache
아직 webflux 에서 spring-boot-starter-cache 를 통해 간편하게 annotation 방식의 AOP 로 redis cache 를 사용할 수 없습니다.
Problem) suspend 함수에 적용된 aop는 왜 제대로 동작하지 않는가
직접 Aspect 코드를 작성하여 redis 에 캐시를 하는 AOP 를 구현해보고자 했습니다.
Kotlin Coroutine 이 적용된 webflux 에 만약 @Cacheable 을 적용한다면 cache 에 COROUTINE_SUSPEND 가 저장되는 등 의도한 대로 동작하지 않는 것을 볼 수 있습니다.
이는 Kotlin Suspend 함수, Coroutine CSP 에서 이유를 찾아볼 수 있습니다.
Why) Kotlin 에서 Coroutine 을 지원하는 방법 CSP
Kotlin 은 비동기 프로그래밍을 지원을 위해 CSP(Communicating Sequential Process) 기법을 사용합니다.
Suspend 함수는 Suspension point(중단점)을 제공하여 함수가 중단될 수 있도록 하며, 이를 통하여 blocking 로직으로 부터 벗어나서 비동기로 동작할 수 있도록 합니다.
Kotlin 은 이 중단점을 제공하기 위해 Suspend 함수를 컴파일 할 때, 함수 인자의 마지막에 Continuation 이라는 객체를 넣어줍니다.
// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/src/kotlin/coroutines/Continuation.kt
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
Continuation은 coroutine에서 실행되는 code block과 execution context를 실질적으로 소유한 객체로, coroutine의 실행 및 중단과 관련된 핵심적인 역할을 수행합니다. 이런 점에서 coroutine은 연속적으로 이어진 하나 이상의 Continuation의 집합체라고도 할 수 있습니다.
ref) https://velog.io/@koo8624/Kotlin-Coroutine의-동작-원리
Kotlin 표준 라이브러리에서 제공하는 가장 대표적인 suspend function 인 delay 는 대략 다음과 같은 형태로 구현되어 있습니다.
suspend fun delay(ms: Long) = suspendCoroutine { continuation ->
Timer().schedule(object : TimerTask() {
override fun run() {
continuation.resume(Unit)
}
}, ms)
}
대략적으로 Continuation 이 어떤 동작을 하는 지 살펴보자면,
아래와 같이 suspend 함수를 사용하는 코드는 아래와 같이 컴파일 됩니다.
suspend fun findUserById(id: Long): User? {
// getUserById 는 suspend 함수!!
val user = userRepository.getUserById(id)
return user
}
static Object findUserById$suspendImpl(final UserSuspendService $this, long id, Continuation $completion) {
Object $continuation;
label20: {
...
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return UserSuspendService.findUserById$suspendImpl($this, 0L, (Continuation)this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch (((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
UserRepository var8 = $this.userRepository;
((<undefinedtype>)$continuation).label = 1;
var10000 = var8.getUserById(id, (Continuation)$continuation);
if (var10000 == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
User user = (User)var10000;
return user;
}
getUserById 는 suspend 함수이기 때문에 중단점이 될 수 있어야 합니다.
아래 switch 문을 보면, getUserById 에서 COROUTINE_SUSPENDED 가 반환되면, 바로 COROUTINE_SUSPENDED 를 리턴합니다. getUserById 함수에서 아직 값이 준비되지 않았다면 , 우선 COROUTINE_SUSPENDED 을 반환하여 중단되도록 한 것입니다.
현재 수행하는 함수를 COROUTINE_SUSPENDED 를 반환하며 빠져 나온 것이고, 이는 Thread 를 Free 시키는 효과가 있습니다.
함수의 실행이 중단 된 것!
이후에 getUserById 으로 부터 값이 준비되면, Continuation 의 resumeWith 를 통해 다시 중단됐던 함수를 실행합니다.
처음 getUserById 을 호출할 때, label 을 1 로 바꾸어 다시 switch 문을 실행했을 때 다음 case 가 수행될 수 있도록 실행 시점을 바꾸었습니다. 따라서, 다시 중단됐던 함수가 실행되면 getUserById 호출(중단점) 이후의 동작을 할 수 있게 됩니다.
Continuation 을 활용하여 callback 과 같은 방식으로 동작하기 때문에
val result = joinPoint.proceed()
을 호출하면 COROUTINE_SUSPENDED 를 반환할 뿐 나머지 로직을 수행할 수 없음과 더불어 본래 result 도 받을 수 없습니다.
결론적으로, AOP 를 적용하더라도 Kotlin Continuation 가 제대로 동작하도록 조치를 취해 줘야 합니다.
How to do) suspendCoroutineUniterceptedOrReturn, startCoroutineUninterceptedOrReturn
Spring project repo 에서 이와 관련된 issue 를 살펴보니
ref) https://github.com/spring-projects/spring-framework/issues/22462#issuecomment-940844810
Kotlin Coroutine API 를 사용하여 COROUTINE_SUSPENDED 가 아닌 제대로된 반환 값을 받아와서 AOP 로직을 수행할 수 있도록 한 코드를 발견했습니다.
해당 코드는 간단하게 이야기해서, arg 의 마지막 인자로 들어오는 Continuation 을 활용하여, Continuation 체인 안에서 AOP 로 수행하고자 하는 로직을 넣도록 하는 것입니다.
// https://gist.github.com/pjanczyk/5d958821bafd911a5996bc0b66788ea3
val ProceedingJoinPoint.coroutineContinuation: Continuation<Any?>
get() = this.args.last() as Continuation<Any?>
val ProceedingJoinPoint.coroutineArgs: Array<Any?>
get() = this.args.sliceArray(0 until this.args.size - 1)
suspend fun ProceedingJoinPoint.proceedCoroutine(
args: Array<Any?> = this.coroutineArgs
): Any? =
suspendCoroutineUninterceptedOrReturn { continuation ->
this.proceed(args + continuation)
}
fun ProceedingJoinPoint.runCoroutine(
block: suspend () -> Any?
): Any? =
block.startCoroutineUninterceptedOrReturn(this.coroutineContinuation)
startCoroutineUninterceptedOrReturn
Java 에서 call 할 수 있는 API 로 Continuation 을 통해 Coroutine 을 조작할여 수행할 수 있도록 합니다.
Starts an unintercepted coroutine without a receiver and with result type T and executes it until its first suspension. Returns the result of the coroutine or throws its exception if it does not suspend or COROUTINE_SUSPENDED if it suspends. In the latter case, the completion continuation is invoked when the coroutine completes with a result or an exception.
전달 받은 Continuation 을 가지고 suspend 함수 block 을 실행 할 수 있도록 합니다.
suspendCoroutineUniterceptedOrReturn
suspendCoroutineUninterceptedOrReturn() 함수는 전달 된 코드 블럭에서 호출 코루틴(Continuation) 정보에 접근할 수 있도록 해줍니다. 또한, 전달 된 코드 블럭에서 COROUTINE_SUSPENDED 라는 미리 정의 된 값을 반환 할 경우에는 코루틴이 처리를 위해 시간이 필요하여 값을 바로 반환하지 않고 처리가 완료되면 continuation 파라미터를 통해 결과를 전달할 것임을 나타내고, 그 이외의 값을 반환할 경우에는 중단 없이 바로 결과 값을 반환한 것을 나타냅니다.
ref) https://myungpyo.medium.com/코루틴-공식-가이드-자세히-읽기-part-2-dive-1-4c468828319
적용하기
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
annotation class CoroutineCacheable(
val prefix: String,
val keyExpression: String,
val ttlSecond: Int
)
class CoroutineCacheImpl: CoroutineCache<Any> {
private val store = ConcurrentHashMap<String, Any>()
override suspend fun get(key: String): Any? {
delay(1)
return store[key]
}
override suspend fun put(key: String, value: Any) {
delay(1)
store[key] = value
}
}
// for both spring 6.1.0+ and earlier
@Around("@annotation(com.example.coroutineaop.aop.CoroutineCacheable) && suspendFunctionPointCut()")
fun coroutineCacheable(joinPoint: ProceedingJoinPoint): Any? {
val coroutineCacheable = (joinPoint.signature as MethodSignature).method.getAnnotation(CoroutineCacheable::class.java)
val prefix = coroutineCacheable.prefix
val expire = coroutineCacheable.ttlSecond
val keyValue = getKeyValue(joinPoint, coroutineCacheable.keyExpression)
val cacheKey = "$prefix:$keyValue"
return joinPoint.runCoroutine {
// get 은 suspend 함수
coroutineCache.get(cacheKey)?.let { cached ->
return@runCoroutine cached
}
joinPoint.proceedCoroutine().let { rtn ->
if (rtn is Mono<*>) {
// for spring 6.1.0 and later
rtn.awaitSingleOrNull()?.let { result ->
coroutineCache.put(cacheKey, result)
return@runCoroutine result
}
} else {
rtn?.let {
coroutineCache.put(cacheKey, rtn)
return@runCoroutine rtn
}
}
}
}
}
위 코드를 통해 이제 COROUTINE_SUSPENDED 가 아닌 제대로된 반환값을 가지고 로직을 수행할 수 있게 되었습니다.
Plus) Spring AOP is not compatible with Kotlin Coroutines #22462
ref) https://github.com/spring-projects/spring-framework/issues/22462
위 코드를 보면 spring 6.1.0 이후 부터는 mono 가 반환되어 awaitSingleOrNull 를 통해 본래 반환값을 가져오는 것을 볼 수 있습니다.
(Spring Boot 의 경우 3.2 부터 사용할 수 있는 것으로 보입니다. Spring Boot 3.2 Release Notes)
사실 Spring AOP 에서 Kotlin Coroutine 을 support 하는 논의는 오래전부터 있었습니다.
Spring 6.1.0-M5 에 이를 반영하여, AopUtils#invokeJoinpointUsingReflection 에서 아래 invokeSuspendingFuction 를 사용하여 Suspend 함수일 경우, Mono 를 반환하여 Reactive support 하는 MoehodInterceptor 에서 처리되도록 하였습니다.
따라서, 해당 이후 버전을 사용한다면, 아래와 같이 Kotlin Coroutine API 를 사용하지 않고도 바른 값을 가져올 수 있게 되었습니다.
하지만, Cache 관련 로직이 Coroutine Suspend 를 사용한다면, 위에서 한 방식처럼 Continutation 체인 안에 올바르게 관련 로직을 포함하여야 합니다.
// only for spring 6.1.0 and later
@Around("@annotation(com.example.coroutineaop.aop.Cacheable)")
fun cacheable(joinPoint: ProceedingJoinPoint): Any? {
val method = (joinPoint.signature as MethodSignature).method
val cacheable = method.getAnnotation(Cacheable::class.java)
val prefix = cacheable.prefix
val expire = cacheable.ttlSecond
val keyValue = getKeyValue(joinPoint, cacheable.keyExpression)
val cacheKey = "$prefix:$keyValue"
// get 은 suspend 함수가 아니다
cache.get(cacheKey)?.let { cached ->
return cached
}
return joinPoint.proceed().let { rtn ->
if (KotlinDetector.isKotlinReflectPresent() && KotlinDetector.isSuspendingFunction(method)) {
return@let (rtn as Mono<*>).map { result ->
cache.put(cacheKey, result)
result
}
}
rtn?.let {
cache.put(cacheKey, rtn)
rtn
}
}
}
참고
Add Coroutines support for @Cacheable
Spring v6.1.0-RC2 에서도 @Cacheable 에서 Coroutine Suspend 함수를 지원할 수 있도록 하였습니다.
https://github.com/spring-projects/spring-framework/commit/466c8d8f23a982cb53f1cfad0143afd28e9c27ab
Webflux Netty 와 Kotlin Coroutine 은 어떻게 연결되어 있을까?
CoRouterFunctionDsl 을 보면 아래와 같이
public fun <T> mono(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> {
require(context[Job] === null) { "Mono context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return monoInternal(GlobalScope, context, block)
}
private fun <T> monoInternal(
scope: CoroutineScope, // support for legacy mono in scope
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
Creates a cold mono that runs a given block in a coroutine and emits its result
Coroutine 을 생성/실행하는 Mono 를 반환하도록 합니다. 즉 Mono / Flux 기반의 인터페이스에서 Coroutine 인터페이스로 옮길 수 있게 된 것입니다.
@Transactional 은?
Kotlin Coroutine Webflux 에서 @Transactional 은 정상 동작 하는데, 이것도 AOP 일텐데 어떻게 지원해주고 있을까요?
Support suspending functions annotated with @Transactional #23575
위 이슈에서 먼저 지원되기 시작했습니다.
ref) https://github.com/spring-projects/spring-framework/commit/5429c7afebaa3255ea80197224023c29c7d552ec
Mono 등 reactive 로 동작하는 Transaction AOP 를 coroutine 인터페이스와 연결한 것을 알 수 있습니다.
업데이트 2024/07/07 ) coroutine aware CglibAopProxy
앞서, spring 6.1.0 이후 부터는 AopUtils#invokeJoinpointUsingReflection 에서 invokeSuspendingFuction 를 사용하여 Suspend 함수일 경우, Mono 를 반환하여 Reactive support 하는 MoehodInterceptor 에서 처리하도록 변경되었다고 했습니다.
CglibAopProxy 에서도 suspend 함수라면 mono 로 응답이 왔을 것이므로 awaitSIngleOrNull 을 통해 Mono 를 Croutine 으로 변환해줍니다.
Aspect 에서 Coroutine 을 실행하기 위해 joinPoint.runCoroutine 으로 실행한 suspend block 은 아래와 같이 반환값이 CORUTINE_SUSPENDED 입니다. CORUTINE_SUSPENDED 는 단순 enum 값으로 awaitSIngleOrNull 은 mono 가 아닌 값은 Mono.just(value) 로 바꾼 후 await 합니다.
suspend 함수를 호출하는 것은 coroutine context 일 것으로 CORUTINE_SUSPENDED 값을 알아서 잘 처리해줄 것이라고 기대해볼 수 있습니다. (Continuation 을 통해 다시 돌아올 수 있음)
하지만, proxy 로직에서 target method 가 suspend 함수이면 mono 혹은 flow 가 반환될 것이라 기대하고 있습니다. 혹시 해당 proxy 부분에 mono, flow 관련된 로직이 추가될 경우, Spring AOP 의 기대와 다르게 CORUTINE_SUSPENDED 가 return 되고 있으므로 누락될 가능성이 있습니다.
따라서, 변경된 suspend 함수에 대한 Spring AOP 로직에 맞추어 Coroutine 을 실행하는 Aspect 에서도 Coroutine 을 Mono 로 변환하여 리턴해주는 것이 서로 약속된 패러다임을 지킬 수 있어 보다 안전하다고 보여집니다.
fun ProceedingJoinPoint.runCoroutine(
block: suspend () -> Any?
): Mono<*> {
val continuation = this.coroutineContinuation
return mono {
block.startCoroutineUninterceptedOrReturn(continuation)
}
}
Coroutine 을 mono 로 변환하여 반환하였고 proxy 에서도 기대한 바와 같이 mono 로 처리될 수 있는 것을 볼 수 잇습니다.
또한, Mono return 값을 jointPoint.proceedCoroutine 에서 await 하도록 하면 Aspect 에서 매번 Mono 인지 아닌지 고민해줄 필요 없이 실제 값으로 생각하고 처리할 수 있도록 공통 로직을 빼줄 수도 있습니다.
suspend fun ProceedingJoinPoint.proceedCoroutine(
args: Array<Any?> = this.coroutineArgs
): Any? {
val rtn = suspendCoroutineUninterceptedOrReturn<Any?> { continuation ->
this.proceed(args + continuation)
}
if (rtn is Mono<*>) {
// for spring 6.1.0 and later
return rtn.awaitSingleOrNull()
}
return rtn
}
sample code) https://github.com/imeansu/kotlin-coroutine-spring-aop