본문 바로가기
Study OR Book/코틀린 코루틴

[코틀린 코루틴] 16장 - 채널

by Baest 2025. 6. 12.

 

 

데이터를 Stream 처럼 전송하기 위한 인터페이스
Kafka 처럼 데이터를 제공하는 produce와 소비하는 consumer 의 역할로 구성

개념 및 특징

  • 송신자와 수신자의 수에 제한이 없음
  • 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있음
  • Queue와 동일하게 첫번째로 제공된 데이터는 항상 첫번째로 소비되는 것 보장
  • 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스
    • SendChannel: 원소(데이터)를 보내거나 채널을 닫는 용도
    • ReceiveChannel: 원소(데이터)를 받을 때 사용
    • send, receive 모두 중단 함수 (원소를 보내고, 받는 함수가 중단 함수인 것은 필수)
    • receive: 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단
    • send: 채널의 용량이 다 찼을 때 중단 (용량 제한)
    • 중단 함수가 아닌 send, receive를 해야한다면, trySend, tryReceive 사용
interface SendChannel<in E> {
    suspend fun send(element:E)
    fun close(): Boolean
    // ...
}

interface ReceiveChannel<out E> {
    suspend fun recevie(): E
    fun cancel(cause: CancellationException? = null)
    // ...
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

 

 

  • 일반적으로 수신자는 제공되는 데이터의 양을 모르기 때문에, 송신자가 보내는 만큼 기다리는 방식 선호
    • 채널이 닫히기 전까지, for 또는 consumeEach 함수 사용 가능
    • 아래 코드의 문제점은 channel을 닫는 것을 잊을 수 있다는 점
suspend fun main(): Unit = coroutineScope {
	val channel = Channel<Int>()
    launch {
    	repeat(5) { index ->
        	println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }
    
    launch {
    	for (elm in channel) {
        	println(elm)
        }
        // 또는
        // channel.consumeEach { elm ->
        // 	println(elm)
        // }
    }
}

 

 

  • 위와 같은 이슈를 해결하기 위해 ReceiveChannel을 반환하는 코루틴 빌더인 produce 함수 사용
    • produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 관계없이 채널을 닫음
    • produce는 새로운 코루틴을 생성하고, 그 안에서 데이터를 채널을 통해 비동기로 전송하게됨
    • 채널을 만드는 가장 인기 있는 방법 추가적으로 가장 안전하며 편리하다고 함
suspend fun main(): Unit = coroutineScope {
	val channel = produce {
    	repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }
    
    for (elm in channel) {
    	println(elm)
    }
}

 

 

채널 타입

크게 네 가지로 구분 가능하다.

 

  1. 무제한: 제한 없는 용량 버퍼를 가지며, send가 중단되지 않음
  2. 버퍼: 특정 용량 크기 또는 Channel.BUFFERED(기본값: 64)로 설정된 채널
  3. 랑데뷰: 용량이 0인 채널로, 송신자와 수신자가 만날 때만 원소 교환
  4. 융합: 버퍼 크기가 1인 Channel.CONFLATED를 가진 채널로 새로운 원소가 이전 원소 대체

 

버퍼 오버플로일 때

채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때의 행동 정의가 가능하다.

아래는 오버플로우와 관련된 옵션이다.

 

  • SUSPEND: 버퍼가 가득 찼을 때, send 메소드가 중단
  • DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소 제거
  • DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소 제거
suspend fun main(): Unit = coroutineScope {
	val channel = Channel<Int>(
    	capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    
    launch {
    	repeat(5) { index >
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }
    
    delay(1000)
    for (elm in channel) {
    	println(elm)
        delay(1000)
    }
}

 

전달되지 않은 원소 핸들러

onUndeliveredElement: 채널에 보낸 값이 소비되지 않고 유실될 경우 이를 감지하고 정리할 수 있는 콜백 함수이다.

대부분의 경우 채널이 닫히거나 취소되었음을 의미하는데, send, receive, receiveOrNull, hasNext가 에러를 던질 때 발생할 수도 있다.

 

팬아웃(Fan-out)

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있는데, 적절히 처리하려면 반드시 for 루프를 사용해야한다.

(consumeEach는 여러 개의 코루틴이 사용할 때 안전하지 않다.)

 

원소는 공평하게 배분되며, 채널은 원소를 기다리는 코루틴들을 FIFO 큐로 가지고 있다.

 

팬인(Fan-in)

여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있다.

 

다수의 채널을 하나의 채널로 합쳐야 하는 경우 produce 함수로 여러 개의 채널을 합치는 fanIn 함수를 사용할 수 있다.

 

fun <T> CoroutineScope.fanIn(
	channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
	for (channel in channels) {
    	launch {
        	for (elm in channel) {
            	send(elm)
            }
        }
    }
}

 

실제 처리는 아래와 같이 진행됨

val c1 = produce {
    repeat(5) {
        send("from c1: $it")
        delay(100)
    }
}

val c2 = produce {
    repeat(5) {
        send("from c2: $it")
        delay(150)
    }
}

val merged = coroutineScope.fanIn(listOf(c1, c2))

for (i in 1..10) {
    println(merged.receive())
}

 

파이프라인

한 채널로부터 받은 우너소를 다른 채널로 전송하는 경우를 파이프라인이라고 부른다.

즉, 여러 코루틴이 체인처럼 연결되어 데이터가 흐르듯이 전달 및 처리되는 것이 파이프라인이다.

 

아래와 같은 유즈케이스에서 사용될 수 있다.

 

  • 웹 크롤러 → URL 생산 → HTML 파싱 → 정제 → DB 저장
  • Kafka 컨슈머 → 필터링 → 변환 → 결과 채널로 전달
  • 실시간 로그 파이프라인 → 필터 → 집계 → 알림

 

 

// 메인 로직
val numbers: ReceiveChannel<Int> = produce {
    for (i in 1..3) {
        send(i)
    }
}

fun CoroutineScope.square(input: ReceiveChannel<Int>) = produce {
    for (x in input) send(x * x)
}

fun CoroutineScope.stringify(input: ReceiveChannel<Int>) = produce {
    for (x in input) send("Result: $x")
}

// 사용
val squared = square(numbers)
val strings = stringify(squared)

for (msg in strings) {
    println(msg)
}



// 파이프라인 구조
produce(1~3)
   ↓
square(제곱)
   ↓
stringify(문자열 변환)
   ↓
출력


// 결과
Result: 1
Result: 4
Result: 9
Result: 16
Result: 25

 

실제 사용 사례

- 채널 셋팅

@Configuration
class ChannelConfig(
    @Value("\${batch.size}")
    private val batchSize: Int,
) {
    @Bean
    fun TaskRequestChannel(): Channel<TaskRequest> {
        return Channel(capacity = batchSize * 2)
    }
}

 

- 카프카 컨슈머 셋팅

@Component
class TaskConsumerListener(
    private val taskRequestChannel: Channel<TaskRequest>,
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = ["task-request"],
        groupId = "\${spring.kafka.consumer.group-id}",
        clientIdPrefix = "test",
        containerFactory = "byteArrayKafkaListenerContainerFactory",
    )
    fun consume(
        record: ConsumerRecord<String, ByteArray>,
        acknowledgment: Acknowledgment,
    ) {
        runCatching {
            val message = record.value()
            externalTaskRequestChannel.trySend(message).getOrThrow()
            acknowledgment.acknowledge()
        }.onFailure {
            logger.error("Kafka consume failed: ${it.message}", it)
            throw it
        }
    }
}

 

 

- 채널의 데이터 소비

@Component
class CommandLineRunnerSample(
    private val taskRequestChannel: Channel<TaskRequest>,
) : CommandLineRunner {
    private val logger = LoggerFactory.getLogger(CommandLineRunner::class.java)
    private val buffer = mutableListOf<ExternalTaskRequest>()

    override fun run(args: Array<String>): Unit =
        runBlocking {
            CoroutineScope(Dispatchers.IO).launch {
                while (isActive) {
                	// 채널에서 데이터 수신 후 버퍼 전달
                    val task = taskRequestChannel.receive()
                    buffer.add(task)

                    // 생략
                }
            }
        }
}

 

요약

  • 코루틴끼리 통신할 때 사용하는 강력한 기본 도구
  • 송신자, 수신자 수에 제한 없음
  • 채널에 전송된 데이터는 단 한 번 받는 것이 보장
  • 추후 나올 flow와 채널을 연결해서 사용하는 경우가 많음