본문 바로가기
Study OR Book/코틀린 코루틴

[코틀린 코루틴] 24장 - 공유플로우와 상태플로우

by Baest 2025. 7. 14.

공유플로우(SharedFlow)

SharedFlow: 이벤트 스트림을 여러 수집자에게 브로드캐스팅하는 플로우

 

특징

  • 여러 수집자가 동시에 값을 수집할 수 있음
  • 수집자가 없어도 값을 방출할 수 있음
  • 버퍼링과 리플레이 기능 제공
  • 완료나 예외를 공유하지 않음

 

아래는 브로드캐스트 채널과 비슷한 MutableSharedFlow 이다.

공유플로우를 통해 메시지를 보내면 대기하고 있는 모든 코루틴이 수신하게 된다.

 

  • replay = 0 이라는 의미는 새로운 수집자에게 이전 값을 보내지 않는다.
  • 즉, 수집자(각 launch의 collect)가 구독한 시점 이후에 내보내지는 값만 받게 된다.
  • 아래 두 개의 코루틴이 동시에 같은 SharedFlow를 구독하게 되며, 각각 #1, #2로 구분되어 출력된다.
suspend fun main(): Unit = coroutineScope {
	val mutableSharedFlow = 
    	MutableSharedFlow<String>(replay = 0)
    // 또는 MutableSharedFlow<String>()
    
    launch {
    	mutableSharedFlow.collect {
        	println("#1 receuved $it")
        }
    }
    
    launch {
    	mutableSharedFlow.collect {
        	println("#2 receuved $it")
        }
    }
    
    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}

// (1초 후 )
// #1 receuved Message1
// #2 receuved Message1
// #1 receuved Message1
// #2 receuved Message2

 

위와 다르게 메시지를 보내는 작업을 유지할 수도 있는데, replay 인자를 설정하면 마지막으로 전송한 값들이 셋팅된 수만큼 저장된다.

아래와 같이 replay가 1로 설정됐을 경우 값이 내보내진 이후 추가된 수집자도 메세지를 받게 된다.

val mutableSharedFlow = MutableSharedFlow<String>(replay = 1)

// 값을 먼저 방출
mutableSharedFlow.emit("Message1")

// 그 다음에 수집자 추가
launch {
    mutableSharedFlow.collect {
        println("Late subscriber received: $it")
    }
}

 

 

정리하자면,

  • 동시 수집: SharedFlow는 여러 수집자가 동시에 같은 값을 받을 수 있음
  • replay = 0: 구독 시점 이후 값만 받음
  • 순서 보장: 방출 순서대로 각 수집자에게 전달됨
  • 브로드캣트: 하나의 값이 모든 활성 수집자에게 전달

 

목적: 이벤트 시스템이나 실시간 데이터 스트리밍에서 유용한 패턴으로 사용될 수 있다.

 

shareIn

다양한 클래스가 변화를 감자히는 상황에서 하나의 플로우로 여러 개의 플로우를 만들고 싶을 때 SharedFlow를 사용할 수 있다.
그리고 Flow를 SharedFlow로 바꾸는 가장 쉬운 방법은 shareIn 함수를 사용하는 것이다.

 

shareIn은 Cold Flow를 Hot Flow(ShareFlow)로 변환하는 연산자라고 할 수 있다.

여러 수집자가 동일한 업스트림 플로우를 공유할 수 있도록 해준다.

  • Cold Flow: 각 수집자마다 독립적으로 실행 (새로운 데이터 소스 생성)
  • Hot Flow(ShareFlow): 하나의 데이터 소스를 여러 수집자가 공유

 

shareIn 함수 구조

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

 

  • scope: CoroutineScope
    • SharedFlow가 활성화될 스코프
    • 이 스코프가 취소되면 SharedFlow도 취소
  • started: SharingStarted
    • 언제 업스트림 플로우를 시작할지 결정
    • SharingStarted.Eagerly: 즉시 시작
    • SharingStarted.Lazily: 첫 번째 수집자가 나타날 때 시작
    • SharingStarted.WhileSubscribed(): 수집자가 있는 동안만 활성화
  • replay: Int
    • 새로운 수집자에게 보낼 최근 값의 개수

 

아래와 같이 shareIn을 사용할 수 있다

suspend fun main() = coroutineScope {
    val coldFlow = flow {
        println("Flow started")
        repeat(5) {
            delay(1000)
            emit("Value $it")
        }
    }
    
    val sharedFlow = coldFlow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 1
    )
    
    delay(2000)
    
    launch {
        sharedFlow.collect {
            println("Collector 1: $it")
        }
    }
    
    delay(2000)
    
    launch {
        sharedFlow.collect {
            println("Collector 2: $it")
        }
    }
}

 

실시간 데이터 수집 공유 시 사용 예시

  • 수집자가 있을 때만 활성화되며, 없으면 중단됨
@Service
class BrowserDataCollectionService {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    
    fun collectBrowserData(): SharedFlow<BrowserData> = flow {
        while (true) {
            val data = collectDataFromBrowser()
            emit(data)
            delay(1000)
        }
    }.shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(),
        replay = 0
    )
}

 

 

상태플로우(StateFlow)

상태플로우는 공유플로우의 개념이 확장된 것이며, replay 값이 1로 셋팅된 공유플로우와 유사하게 작동한다.
상태플로우는 value 프로퍼티로 접근 가능한 값 하나를 항상 가지고 있다.

즉, 현재 상태를 유지하고 상태 변경을 관찰할 수 있는 플로우라고 보면 된다.

 

특징

  • 항상 현재 값을 가지고 있음
  • 새로운 수집자는 즉시 현재 값을 받음
  • 중복된 값은 내보내지 않음
  • 완료되지 않고, 예외를 던지지 않음

 

val stateFlow = MutableStateFlow("초기값")

// 값 변경
stateFlow.value = "새로운 값"

// 현재 값 읽기
println("현재 값: ${stateFlow.value}")

// 수집
stateFlow.collect { value ->
    println("상태 변경: $value")
}

 

 

stateIn

일반적인 Flow를 StateFlow로 변환하는 연산자이다.
shareIn과 유사하지만 상태 관리에 특화되어 있다.

 

stateIn은 Flow의 최신 값을 상태로 유지하며, 새로운 수집자는 즉시 현재 상태를 받을 수 있다.

stateFlow는 항상 값을 가져야하고, 값이 명시되지 않았을 경우 첫 번째 값이 계산될 때까지 기다려야한다.

 

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

 

  • scope: CoroutineScope
    • StateFlow가 활성화될 스코프
  • started: SharingStarted
    • 언제 업스트림 플로우를 시작할지 결정
    • shareIn과 같은 옵션 가짐
  • initialValue: T
    • StateFlow의 초기값(필수)

 

SharingStarted.Eagerly 옵션을 사용한 경우의 예시 코드

  • 즉시 값을 감지하기 시작하고 플로우로 값을 전송
  • replay 값에 제한이 있고, 감지 시작하기 전에 값이 나오면 일부 유실될 수 있다는 점에 주의
susppend fun main(): Unit = couroutinScope {
	val flow = flowOf("A","B","C")
    
    val sharedFlow: SharedFlow<String> = flow.shareIn(
    	scope = this,
        started = SharingStarted.Eagerly,
    )
    
    delay(100)
    launch {
    	sharedFlow.collect { println("#1 $it") }
    }
    println("Done")
}
// (0.1초 후)
// Done

 

공유플로우와 상태플로우 차이

용도

  • 공유플로우: 이벤트 스트림(클릭, 네트워크 요청 등)
  • 상태플로우: 상태 관라(UI 상태, 데이터 상태 등)

값 보관

  • 공유플로우: replay 설정에 따라 최근 값들 보관
  • 상태플로우: 항상 현재 값 하나만 보관

중복 처리

  • 공유플로우: 중복 값도 모두 내보냄
  • 상태플로우: 중복 값은 내보내지 않음