데이터를 Stream 처럼 전송하기 위한 인터페이스
Kafka 처럼 데이터를 제공하는 produce와 소비하는 consumer 의 역할로 구성
개념 및 특징
- 송신자와 수신자의 수에 제한이 없음
- 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있음
- Queue와 동일하게 첫번째로 제공된 데이터는 항상 첫번째로 소비되는 것 보장
- 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스
- SendChannel: 원소(데이터)를 보내거나 채널을 닫는 용도
- ReceiveChannel: 원소(데이터)를 받을 때 사용
- send, receive 모두 중단 함수 (원소를 보내고, 받는 함수가 중단 함수인 것은 필수)
- receive: 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단
- send: 채널의 용량이 다 찼을 때 중단 (용량 제한)
- 중단 함수가 아닌 send, receive를 해야한다면, trySend, tryReceive 사용
interface SendChannel<in E> {
suspend fun send(element:E)
fun close(): Boolean
// ...
}
interface ReceiveChannel<out E> {
suspend fun recevie(): E
fun cancel(cause: CancellationException? = null)
// ...
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
- 일반적으로 수신자는 제공되는 데이터의 양을 모르기 때문에, 송신자가 보내는 만큼 기다리는 방식 선호
- 채널이 닫히기 전까지, for 또는 consumeEach 함수 사용 가능
- 아래 코드의 문제점은 channel을 닫는 것을 잊을 수 있다는 점
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
println("Producing next one")
delay(1000)
channel.send(index * 2)
}
channel.close()
}
launch {
for (elm in channel) {
println(elm)
}
// 또는
// channel.consumeEach { elm ->
// println(elm)
// }
}
}
- 위와 같은 이슈를 해결하기 위해 ReceiveChannel을 반환하는 코루틴 빌더인 produce 함수 사용
- produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 관계없이 채널을 닫음
- produce는 새로운 코루틴을 생성하고, 그 안에서 데이터를 채널을 통해 비동기로 전송하게됨
- 채널을 만드는 가장 인기 있는 방법 추가적으로 가장 안전하며 편리하다고 함
suspend fun main(): Unit = coroutineScope {
val channel = produce {
repeat(5) { index ->
println("Producing next one")
delay(1000)
send(index * 2)
}
}
for (elm in channel) {
println(elm)
}
}
채널 타입
크게 네 가지로 구분 가능하다.
- 무제한: 제한 없는 용량 버퍼를 가지며, send가 중단되지 않음
- 버퍼: 특정 용량 크기 또는 Channel.BUFFERED(기본값: 64)로 설정된 채널
- 랑데뷰: 용량이 0인 채널로, 송신자와 수신자가 만날 때만 원소 교환
- 융합: 버퍼 크기가 1인 Channel.CONFLATED를 가진 채널로 새로운 원소가 이전 원소 대체
버퍼 오버플로일 때
채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때의 행동 정의가 가능하다.
아래는 오버플로우와 관련된 옵션이다.
- SUSPEND: 버퍼가 가득 찼을 때, send 메소드가 중단
- DROP_OLDEST: 버퍼가 가득 찼을 때, 가장 오래된 원소 제거
- DROP_LATEST: 버퍼가 가득 찼을 때, 가장 최근의 원소 제거
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
repeat(5) { index >
channel.send(index * 2)
delay(100)
println("Sent")
}
channel.close()
}
delay(1000)
for (elm in channel) {
println(elm)
delay(1000)
}
}
전달되지 않은 원소 핸들러
onUndeliveredElement: 채널에 보낸 값이 소비되지 않고 유실될 경우 이를 감지하고 정리할 수 있는 콜백 함수이다.
대부분의 경우 채널이 닫히거나 취소되었음을 의미하는데, send, receive, receiveOrNull, hasNext가 에러를 던질 때 발생할 수도 있다.
팬아웃(Fan-out)
여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있는데, 적절히 처리하려면 반드시 for 루프를 사용해야한다.
(consumeEach는 여러 개의 코루틴이 사용할 때 안전하지 않다.)
원소는 공평하게 배분되며, 채널은 원소를 기다리는 코루틴들을 FIFO 큐로 가지고 있다.
팬인(Fan-in)
여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있다.
다수의 채널을 하나의 채널로 합쳐야 하는 경우 produce 함수로 여러 개의 채널을 합치는 fanIn 함수를 사용할 수 있다.
fun <T> CoroutineScope.fanIn(
channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
for (channel in channels) {
launch {
for (elm in channel) {
send(elm)
}
}
}
}
실제 처리는 아래와 같이 진행됨
val c1 = produce {
repeat(5) {
send("from c1: $it")
delay(100)
}
}
val c2 = produce {
repeat(5) {
send("from c2: $it")
delay(150)
}
}
val merged = coroutineScope.fanIn(listOf(c1, c2))
for (i in 1..10) {
println(merged.receive())
}
파이프라인
한 채널로부터 받은 우너소를 다른 채널로 전송하는 경우를 파이프라인이라고 부른다.
즉, 여러 코루틴이 체인처럼 연결되어 데이터가 흐르듯이 전달 및 처리되는 것이 파이프라인이다.
아래와 같은 유즈케이스에서 사용될 수 있다.
- 웹 크롤러 → URL 생산 → HTML 파싱 → 정제 → DB 저장
- Kafka 컨슈머 → 필터링 → 변환 → 결과 채널로 전달
- 실시간 로그 파이프라인 → 필터 → 집계 → 알림
// 메인 로직
val numbers: ReceiveChannel<Int> = produce {
for (i in 1..3) {
send(i)
}
}
fun CoroutineScope.square(input: ReceiveChannel<Int>) = produce {
for (x in input) send(x * x)
}
fun CoroutineScope.stringify(input: ReceiveChannel<Int>) = produce {
for (x in input) send("Result: $x")
}
// 사용
val squared = square(numbers)
val strings = stringify(squared)
for (msg in strings) {
println(msg)
}
// 파이프라인 구조
produce(1~3)
↓
square(제곱)
↓
stringify(문자열 변환)
↓
출력
// 결과
Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
실제 사용 사례
- 채널 셋팅
@Configuration
class ChannelConfig(
@Value("\${batch.size}")
private val batchSize: Int,
) {
@Bean
fun TaskRequestChannel(): Channel<TaskRequest> {
return Channel(capacity = batchSize * 2)
}
}
- 카프카 컨슈머 셋팅
@Component
class TaskConsumerListener(
private val taskRequestChannel: Channel<TaskRequest>,
) {
private val logger = LoggerFactory.getLogger(javaClass)
@KafkaListener(
topics = ["task-request"],
groupId = "\${spring.kafka.consumer.group-id}",
clientIdPrefix = "test",
containerFactory = "byteArrayKafkaListenerContainerFactory",
)
fun consume(
record: ConsumerRecord<String, ByteArray>,
acknowledgment: Acknowledgment,
) {
runCatching {
val message = record.value()
externalTaskRequestChannel.trySend(message).getOrThrow()
acknowledgment.acknowledge()
}.onFailure {
logger.error("Kafka consume failed: ${it.message}", it)
throw it
}
}
}
- 채널의 데이터 소비
@Component
class CommandLineRunnerSample(
private val taskRequestChannel: Channel<TaskRequest>,
) : CommandLineRunner {
private val logger = LoggerFactory.getLogger(CommandLineRunner::class.java)
private val buffer = mutableListOf<ExternalTaskRequest>()
override fun run(args: Array<String>): Unit =
runBlocking {
CoroutineScope(Dispatchers.IO).launch {
while (isActive) {
// 채널에서 데이터 수신 후 버퍼 전달
val task = taskRequestChannel.receive()
buffer.add(task)
// 생략
}
}
}
}
요약
- 코루틴끼리 통신할 때 사용하는 강력한 기본 도구
- 송신자, 수신자 수에 제한 없음
- 채널에 전송된 데이터는 단 한 번 받는 것이 보장
- 추후 나올 flow와 채널을 연결해서 사용하는 경우가 많음
'Study OR Book > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 28장 - 다른 언어에서의 코루틴 사용법 (1) | 2025.08.06 |
---|---|
[코틀린 코루틴] 24장 - 공유플로우와 상태플로우 (2) | 2025.07.14 |
[코틀린 코루틴] 13장 - 코루틴 스코프 만들기 (1) | 2025.06.11 |
[코틀린 코루틴] 2장 시퀀스 빌더 (0) | 2025.05.20 |
[코틀린 코루틴] 1장 코틀린 코루틴을 배워야 하는 이유 (0) | 2025.05.20 |