본문 바로가기

카테고리 없음

[Java 21] Virtual Thread 의 원리 - Continuation

 

들어가며

Java 21 에는 경량 스레드 모델인 Virtual Thread 가 추가되었습니다. 기존 Java 의 스레드는 커널의 스레드와 1 대 1 매칭이 되기 때문에, 다수의 스레드를 사용하여 프로그래밍 하기에는 컨텍스트 스위칭 비용이 많이 들었습니다. 이를 보완하기 위해 Spring Webflux 와 같은 Reactive Programing 이나 Kotlin 의 Coroutine 을 많은 곳에서 적용하고 있으며, Spring Webflux + Kotlin Coroutine 조합은 하나의 표준처럼 보여지기도 합니다.

 

하지만, Webflux 를 사용하기 위해서는 DB 등의 드라이버들이 Mono, Flux 를 사용하는 Reactive 방식을 지원해주어야 하며, 잘못하여 blocking 로직이 애플리케이션에 포함되는 경우, 성능이 되려 떨어지는 것을 볼 수도 있습니다.

 

무엇보다도, 개인적으로는 reactive 방식은 실행 스레드가 여러 군데로 돌아다녀서 이를 트래킹하기 어려우며, 디버깅할 때, StackTrace 등을 제대로 보기 힘들다는 단점이 가장 불편하게 다가오는 것 같습니다. 그리고 아직은 spring cache, aop 와 같이 reactive 방식을 아직 프레임워크에서 온전히 지원하지 않습니다. 아직은 미성숙한 기술을 도입함으로 인해 기존에 쉽게 되던 것을 어렵게 돌아 해결해야 하는 것도 개발 리소스를 많이 소모하는 일이며, 유지보수에도 많은 비용이 듭니다.

 

주로 reactive, coroutine 과 같은 비동기 모델들은, I/O 혹은 네트워크 호출 등과 같은 blocking 타임을 줄여 효율적으로 자원을 활용하자는 것이 핵심 골자입니다. 이를 위해서 subscribe-publicsh, suspend with CSP 과 같은 기술들이 사용됩니다.

 

위 기법들은, Java 라는 언어의 기반 위에서 프로그래밍 방식 바꾸어서 문제를 해결했습니다. 이는 개발자의 개발 방식이나 패러다임도 이를 따라가야 하며, 함수의 색 문제로 인해 하나의 방식이 강제되는 불편함도 야기합니다.

(비동기와 같은 특정 프로그래밍 방식을 적용하기 위해 전체의 코드가 그 패러다임을 따라가야 하는 문제 / Mono, Flux 혹은 suspend function)

 

반면에, Virtual Thread 는 JVM 자체에서 suspend, 중단 기능을 제공한다는 점에서 큰 차이가 있습니다. 개발자는 기존(MVC) 처럼 그대로 코드를 작성하면서, 비동기로 자원을 효율적으로 활용할 수 있습니다.

 

그렇다면 Java 21 에서는 어떻게 Virtual Thread 가 중단/재개 될 수 있도록 해준걸까요? 그 기술에 대해, Spring I/O 2024 에서 발표된 <Continuations: The magic behind virtual threads in Java by Balkrishna Rawool @ Spring I/O 2024> 를 정리해보면서, 한번 살펴볼까 합니다.

 

VirtualThread 동작 방식

Virtual Thread 의 동작 방식에 대해 잘 설명해주신 글을 쉽게 찾아볼 수 있습니다.

https://techblog.woowahan.com/15398/

 

Virtual Thread 는 park() 메소드가 호출되면 실행이 중단되고, unpark() 메소드로 중단되었던 지점부터 다시 수행될 수 있습니다.

중단 그리고 재개하는 방식을 이해하기 위해서는 Virtual Thread 가 가지고 있는 Continuation 을 보면 됩니다.

Continuation 이란

 

package jdk.internal.vm 패키지에 있는 Continuation 은 현재 프로그램의 상태를 저장하는 곳입니다. 따라서, 이 Continuation 을 통해서 우리는 실행을 중단하여 상태를 보관하고 다른 어느 곳에서라도 다시 상태를 불러와서 나머지 실행을 이어갈 수 있습니다. 한마디로, “연산의 나머지” 라고 표현될 수 있습니다.

 

이 Continutation 을 직접 만져보면서 구체적으로 어떻게 동작하는 지 살펴보도록 하겠습니다.

Continuation API 는 애플리케이션에서 직접 사용할 수 없도록 제한되어 있습니다.

따라서, 아래와 같이 해당 패키지에 접근할 수 있도록 VM option 을 전달해야 합니다.

 

원리를 이해하기 위해서 제한을 풀어보는 것이며, 실제 상용 코드에서는 해당 옵션을 사용해서는 안됩니다.

 

처음으로 Continuataion 의 namespace 와 같은 역할을 하는 ContinutationScope 를 만들어줍니다.

Continuation 에 scope 와 실행하고자 하는 로직을 Runnable 로 넘겨 생성해줍니다.

실행 결과를 보면, cont.run() 으로 Continuation 을 실행할 수 있으며, Runnable 내부에서 Continuation.yield(scope) 로 실행을 중단시킬 수 있으며, 다른 로직을 수행한 다음에 다시, cont.run() 을 통해 중단 지점부터 다시 수행되도록 할 수 있습니다.

 

그럼 Continuation.yield(scope) 에서 어떤 마법을 부려주기에 이게 가능할걸까요?

 

Continuation 이해하기

 

먼저 cont.run() 을 하게 되면, Runnable 내부의 로직들이 Stack 에 쌓이게 됩니다.

 

 

그러다가 Cont.yield() 가 호출되게 되면 cont.run() 이후 Stack 을 cont 가 가지고 Heap 메모리 영역으로 이동하게 됩니다.

 

 

다시 cont.run() 이 호출되면 cont 가 Head 으로 가져갔던 Stack 부분을 다시 cont.run() 이후에 쌓고 실행을 이어나가게 됩니다.

 

Continuation 만 있으면 우리는 간단하게 VirtualThread 를 직접 만들어볼 수 있습니다.

public class VirtualThread {
    private static final AtomicInteger COUNTER = new AtomicInteger(1);
    public static ContinuationScope SCOPE = new ContinuationScope("VirtualThread");

    private Continuation cont;
    private int id;

    public VirtualThread(Runnable runnable) {
        cont = new Continuation(SCOPE, runnable);
        id = COUNTER.getAndIncrement();
    }

    public void run() {
        System.out.println(STR."VirtualThread \{id} is running on \{Thread.currentThread()}");
        cont.run();
    }
}

 

public class VirtualThreadScheduler {

    private Queue<VirtualThread> queue = new ConcurrentLinkedQueue<>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);

    public void start() {
        while (true) {
            if (!queue.isEmpty()) {
                var vt = queue.remove();
                executor.submit(vt::run);
            }
        }
    }

    public void schedule(VirtualThread vt) {
        queue.add(vt);
    }
}


public class VtApplication {
    public static final VirtualThreadScheduler SCHEDULER = new VirtualThreadScheduler();

    public static void main(String[] args) {
        new Thread(SCHEDULER::start).start();

        var vt1 = new VirtualThread(() -> {
            System.out.println("1.1");
            System.out.println("1.2");
            System.out.println("1.3");
            System.out.println("1.4");
        });

        var vt2 = new VirtualThread(() -> {
            System.out.println("2.1");
            System.out.println("2.2");
            System.out.println("2.3");
            System.out.println("2.4");
        });

        SCHEDULER.schedule(vt1);
        SCHEDULER.schedule(vt2);
    }
}


// output
VirtualThread 1 is running on Thread[#23,pool-1-thread-1,5,main]
1.1
1.2
1.3
1.4
VirtualThread 2 is running on Thread[#23,pool-1-thread-1,5,main]
2.1
2.2
2.3
2.4

 

위에서 프린트된 Thread.currentThread() 는 사실 Platform Thread 입니다. VirtualThread 마다 서로 다른 값을 가지게 하고 싶다면, Project Loom 에서는 ThreadLocal 과 유사하지만, VirtualThread 에 더 적합한 ScopedValue 를 제공합니다.

public class VirtualThreadScheduler {
    public static final ScopedValue<VirtualThread> CURRENT_VIRTUAL_THREAD = ScopedValue.newInstance();

    private Queue<VirtualThread> queue = new ConcurrentLinkedQueue<>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);

    public void start() {
        while (true) {
            if (!queue.isEmpty()) {
                var vt = queue.remove();
                executor.submit(() -> ScopedValue.where(CURRENT_VIRTUAL_THREAD, vt)
                        .run(vt::run));
            }
        }
    }

    public void schedule(VirtualThread vt) {
        queue.add(vt);
    }
}

 

위와 같이 ScopedValue 를 쓴다면, 각 VirtualThread 마다 CURRENT_VIRTUAL_THREAD 에는 자기 자신을 값으로 넣을 수 있게 됩니다.

 

이제 I/O 작업이나 Network 요청과 같이 blocking 로직이 있는 상황에서 VirtualThread 가 어떻게 중단되고 재개되는 지 간단하게 살펴보겠습니다.

public class WaitingOperation {

    public static void perform(String name, int duration) {
        System.out.println(STR."Waiting for \{name} for \{duration} seconds");

        var virtualThread = CURRENT_VIRTUAL_THREAD.get();
        var timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                SCHEDULER.schedule(virtualThread);
                timer.cancel();
            }
        }, duration * 1000L);
        Continuation.yield(VirtualThread.SCOPE);
    }
}

public class VtApplication {
    public static final VirtualThreadScheduler SCHEDULER = new VirtualThreadScheduler();

    public static void main(String[] args) {
        new Thread(SCHEDULER::start).start();

        var vt1 = new VirtualThread(() -> {
            System.out.println("1.1");
            System.out.println("1.2");
            WaitingOperation.perform("Network", 2);
            System.out.println("1.3");
            System.out.println("1.4");
        });

        var vt2 = new VirtualThread(() -> {
            System.out.println("2.1");
            System.out.println("2.2");
            WaitingOperation.perform("DB", 5);
            System.out.println("2.3");
            System.out.println("2.4");
        });

        SCHEDULER.schedule(vt1);
        SCHEDULER.schedule(vt2);
    }
}

 

위와 같은 blocking 로직을 추가해보겠습니다.

VirtualThread 1 is running on Thread[#23,pool-1-thread-1,5,main]
VirtualThread 2 is running on Thread[#24,pool-1-thread-2,5,main]
1.1
1.2
2.1
2.2
Waiting for Network for 2 seconds
Waiting for DB for 5 seconds
VirtualThread 1 is running on Thread[#27,pool-1-thread-3,5,main]
1.3
1.4
VirtualThread 2 is running on Thread[#23,pool-1-thread-1,5,main]
2.3
2.4

 

Continuation.yield(VirtualThread.SCOPE) 을 실행한 WaitingOperation 은 blocking 동안 중단되었고 다른 VirtualThread 를 실행한 것을 볼 수 있으며, VirtualThread 를 실행하는 Platform Thread 도 계속 변경되는 것도 확인할 수 있습니다.

 

Continuation.yield 내부 구현은 어떻게 되어 있을까요?

Continuation.yield

 

yield 는 내부적으로 yield0 를 호출하고 있으며, yield0 는 다시 doYield 를 호출합니다.

doYield 는 들어가보니 JNI 를 통해 네이티브 코드를 호출하고 있습니다. 그래도 궁금하니 doYield 가 어떻게 구현되어 있는지 살짝 봐보겠습니다.

 

openJdk repo 에서 찾아보면 중단하는 doYield 그리고 재개하는 enterSpecial 의 C++ 구현을 찾아볼 수 있었습니다.

https://github.com/openjdk/jdk/blob/367e0a65561f95aad61b40930d5f46843fee3444/src/hotspot/share/runtime/continuationFreezeThaw.cpp

/************************************************

Thread-stack layout on freeze/thaw.
See corresponding stack-chunk layout in instanceStackChunkKlass.hpp

            +----------------------------+
            |      .                     |
            |      .                     |
            |      .                     |
            |   carrier frames           |
            |                            |
            |----------------------------|
            |                            |
            |    Continuation.run        |
            |                            |
            |============================|
            |    enterSpecial frame      |
            |  pc                        |
            |  rbp                       |
            |  -----                     |
        ^   |  int argsize               | = ContinuationEntry
        |   |  oopDesc* cont             |
        |   |  oopDesc* chunk            |
        |   |  ContinuationEntry* parent |
        |   |  ...                       |
        |   |============================| <------ JavaThread::_cont_entry = entry->sp()
        |   |  ? alignment word ?        |
        |   |----------------------------| <--\
        |   |                            |    |
        |   |  ? caller stack args ?     |    |   argsize (might not be 2-word aligned) words
Address |   |                            |    |   Caller is still in the chunk.
        |   |----------------------------|    |
        |   |  pc (? return barrier ?)   |    |  This pc contains the return barrier when the bottom-most frame
        |   |  rbp                       |    |  isn't the last one in the continuation.
        |   |                            |    |
        |   |    frame                   |    |
        |   |                            |    |
            +----------------------------|     \__ Continuation frames to be frozen/thawed
            |                            |     /
            |    frame                   |    |
            |                            |    |
            |----------------------------|    |
            |                            |    |
            |    frame                   |    |
            |                            |    |
            |----------------------------| <--/
            |                            |
            |    doYield/safepoint stub  | When preempting forcefully, we could have a safepoint stub
            |                            | instead of a doYield stub
            |============================| <- the sp passed to freeze
            |                            |
            |  Native freeze/thaw frames |
            |      .                     |
            |      .                     |
            |      .                     |
            +----------------------------+

************************************************/

 

///////////

enum class oop_kind { NARROW, WIDE };
template <oop_kind oops, typename BarrierSetT>
class Config {
public:
  typedef Config<oops, BarrierSetT> SelfT;
  using OopT = std::conditional_t<oops == oop_kind::NARROW, narrowOop, oop>;

  static int freeze(JavaThread* thread, intptr_t* const sp) {
    return freeze_internal<SelfT>(thread, sp);
  }

  static intptr_t* thaw(JavaThread* thread, Continuation::thaw_kind kind) {
    return thaw_internal<SelfT>(thread, kind);
  }
};


// https://github.com/openjdk/jdk/blob/34edc7358f733cdf433d0ff50921bcb5a94c5e35/src/hotspot/share/runtime/continuationFreezeThaw.cpp#L662
void FreezeBase::freeze_fast_copy(stackChunkOop chunk, int chunk_start_sp CONT_JFR_ONLY(COMMA bool chunk_is_allocated))

// https://github.com/openjdk/jdk/blob/367e0a65561f95aad61b40930d5f46843fee3444/src/hotspot/share/runtime/continuationFreezeThaw.cpp#L1895
void ThawBase::copy_from_chunk(intptr_t* from, intptr_t* to, int size)

// https://github.com/openjdk/jdk/blob/34edc7358f733cdf433d0ff50921bcb5a94c5e35/src/hotspot/share/oops/stackChunkOop.inline.hpp#L330
inline void stackChunkOopDesc::copy_from_stack_to_chunk(intptr_t* from, intptr_t* to, int size)

 

doYield 는 freeze, enterSpecial 은 thaw 로 연결되며 내부 구현을 보면 stack memory 를 memcopy 하여 chuck 로 힙에 저장해놓고, 다시 실행할 때 가져와 stack 에 부어주는 것으로 보여집니다.

결국, 우리가 앞서 본 것 처럼 JNI 를 통해 정말로 stack 을 copy 해놨다가 다시 재개할 시점에 다시 불러오고 있는 것을 확인할 수 있습니다.

 

실제로 Spring MVC 에서 사용해본다면, 언제 VirtualThread 는 중단될까요? JPA 를 통해 간단히 확인해보겠습니다.

사례 연구) Spring Jpa

Spring Boot 3.2 & Java 21 에서는 Virtual Thread 를 사용하기 위해서는 아래 property 만 추가해주면 끝납니다.

spring.threads.virtual.enabled=true

 

실험해보기 위해 간단히 Jpa 로 mysql 으로 부터 데이터를 받아오는 함수를 작성해줍니다.

@Repository
public interface SimpleRepository extends JpaRepository<SimpleEntity, Long> {
    List<SimpleEntity> findAll();
}

// ----------------------------------

@Service
public class SimpleService {
	public List<SimpleEntity> getAllSimples() {
	        return simpleRepository.findAll();
	}
}

 

db 는 mysql 이지만 아직 mysql connector 에서는 synchronized 블럭으로 인한 pinning 이슈가 다 해결되지 않아서 mariadb connector 를 사용했습니다.

더보기

(mydql connector)

아래 StackTrace 는 db 조회 시에 virtualThread 의 park 호출하기 전까지의 내용입니다.

executeQuery 하면 driver 는 NioSocketImpl 을 통해서 db 와 통신하게 되고, 이 통신 과정에서 IOStatus 에 따라서 park 를 호출합니다. 

implRead:309, NioSocketImpl (sun.nio.ch)
read:346, NioSocketImpl (sun.nio.ch)
read:796, NioSocketImpl$1 (sun.nio.ch)
read:1099, Socket$SocketInputStream (java.net)
fill:291, BufferedInputStream (java.io)
read1:347, BufferedInputStream (java.io)
implRead:420, BufferedInputStream (java.io)
read:399, BufferedInputStream (java.io)
readReusablePacket:62, PacketReader (org.mariadb.jdbc.client.socket.impl)
readPacket:153, ClientMessage (org.mariadb.jdbc.message)
readPacket:886, StandardClient (org.mariadb.jdbc.client.impl)
readResults:825, StandardClient (org.mariadb.jdbc.client.impl)
readResponse:744, StandardClient (org.mariadb.jdbc.client.impl)
execute:668, StandardClient (org.mariadb.jdbc.client.impl)
executeInternal:91, ClientPreparedStatement (org.mariadb.jdbc)
executeQuery:290, ClientPreparedStatement (org.mariadb.jdbc)
executeQuery:52, ProxyPreparedStatement (com.zaxxer.hikari.pool)
executeQuery:-1, HikariProxyPreparedStatement (com.zaxxer.hikari.pool)
executeQuery:246, DeferredResultSetAccess (org.hibernate.sql.results.jdbc.internal)
getResultSet:167, DeferredResultSetAccess (org.hibernate.sql.results.jdbc.internal)

 

NioSocketImpl 의 park 메서드는 virtualThread 일 경우 기존의 커널에 요청하는 Net.poll 이 아닌 Poller.poll 을 따라

 

virtualThread 의 park 를 호출하고 위에서 우리가 보았던 것처럼 I/O 동안에 Continuation 에 실행 흐름을 담아두는 로직으로 이어집니다.

 

reference)