메뉴

문서정보

목차

원문 - The behavior of channels

소개

처음 go의 채널을 사용 했을 때, 나는 채널을 데이터 구조체(스트럭처)처럼 다루는 실수를 했다. 채널은 고루틴(goroutine)사이에서 큐를 제공하고, 자동으로 데이터를 동기화 해주는 것이라고 보았기 때문이다. 이렇게 채널을 구조체로 보는 것 때문에, 복잡하고 나쁜 동시성 코드를 만들게 됐다.

시간이 지나면서 채널을 스트럭처로 보는 대신, 행위에 중점을 두는게 좋다는 것을 알게 됐다. 이제 나는 채널을 구조체가 아닌 시그널링(signaling)로 바라본다. 채널은 하나의 고루틴에서 다른 고루틴으로 어떤 이벤트를 보내는 시그널이다. 시그널을 주고 받는 행위가 채널의 핵심이다. 채널을 시그널 관점에서 보면, 좀 더 깨끗하고 정확히 작동하는 코드를 만들 수 있다.

채널을 시그널 관점에서 이해하기 위해서 아래의 특성들을 살펴보려 한다.

전송 보장

전송 보장은 다른 고루틴으로 시그널을 보냈을 때, 그 고루틴이 받았다는 것을 어떻게 보장 받을 수 있는지에 대한 물음이다.

아래의 코드를 보자.
func main() {
	go func() {
		p := <-ch   // Receive
	}()
	ch <- "paper"   // Send
}
main 함수는 ch 채널로 문자열 "paper"를 전송한다. 이 문자열이 고루틴에 전송될 것을 보장 할 수 있을까 ? 고루틴이 먼저 실행될 수 있지만, main 함수의 "paper" 전송보다 느리게 시작될 수도 있을 것이다. 이 경우에는 어떨까 ? 고루틴에 sleep를 줘서 의도적으로 고루틴이 늦게 시작되게 만들어봤다.
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        for {
            time.Sleep(time.Second * 2)
            p := <-ch
            fmt.Println(">>>", p)
        }
    }()

    fmt.Println("Send paper")
    ch <- "paper"

    fmt.Println("Send apple")
    ch <- "apple"
    time.Sleep(time.Second * 2)
}
고루틴이 채널에서 메시지를 읽지 않으면, 채널 쓰기가 블럭되는 걸 확인 할 수 있을 것이다. 채널을 수신하는 쪽이 메시지를 읽어야지만 메시지를 송신 할 수 있다. 즉, 전송된 메시지가 반드시 도달 할 것이라는 것을 보장 할 수 있다.

이렇게 전송이 보장될지 안될지는 채널이 Buffered인지 Unbuffered인지에 따라 달라진다. 아래 그림을 보자.

 Guarantee Of Delivery

인생에 있어서 무엇을 보장 받는 다는 것이 중요한 것처럼, 소프트웨어에서도 (때때로 일정부분 보장을 포기해야 하는 경우도 있지만)보장은 매우 중요하다. Go에서 어떻게 메시지가 전달되는 걸 보장 할 수 있는지, 어떤 경우에 포기해야 하는지를 살펴보려 한다.

채널의 상태

채널의 상태는 채널이 어떻게 작동 할 지에 영향을 끼친다. 채널은 nil, open, close 3 개중 하나의 상태를 가질 수 있다. 아래 코드는 채널의 상태를 선언하고 배치하는 방법을 보여준다.
// ** nil 채널
// 채널의 zero value는 nil 이다.
var ch chan string

// 채널을 명시적으로 nil 상태로 설정 할 수도 있다.
ch = nil

// ** open 채널
ch := make(chan string)

// ** close 채널
close(ch)
채널의 상태에 따라서 전송과 수신이 어떻게 될지 결정된다.

 Channel State

채널이 nil이라면 채널에 대한 읽기와 쓰기 모두 블럭된다. 채널이 open상태라면 즉시 주고 받을 수 있다. 채널이 closed상태에서 시그널을 보내면 panic이 발생한다. 하지만 받는 쪽은 계속 시그널을 읽는다. 시그널을 기다리는 채널이 close 되면 채널 "타입의 제로값"을 반환한다.
package main

import (
    "fmt"
    "time"
)

func main() {
    sch := make(chan string)
    ich := make(chan int)
    go func() {
        d, ok := <-sch
        fmt.Printf("Read Str %#v %#v\n", d, ok)
    }()

    go func() {
        d, ok := <-ich
        fmt.Printf("Read Int %#v %#v\n", d, ok)
    }()

    time.Sleep(time.Second * 1)
    fmt.Println("Close")
    close(sch)
    close(ich)
    time.Sleep(time.Second * 1)
}
		
채널에 제로값이 전달된건지 아니면, 채널이 닫혀서 제로값이 전달된 건지는 두번째 리턴값으로 알 수 있다. 채널이 닫힌 경우 more에 false가 반환된다.

각 상태에서 전송과 수신이 어떻게 작동하는지, 그리고 어느 때 전송을 보장할 수 있는지를 알고 있다면 각 선택의 결과로 발생하는 비용과 편익을 분석 할 수 있다. 많은 경우 코드를 읽는 것만으로도 빠르게 버그를 발견할 수 있다. 채널이 어떻게 작동할지를 알 수 있기 때문이다.

데이터가 있을 때와 없을 때

시그널은 데이터를 포함하며 우리는 채널을 통해서 데이터를 보낼 수 있다.
ch <- "paper"
데이터는 보통 아래와 같은 목적으로 전송한다. 체널을 닫는 경우 데이터를 포함하지 않은 시그널을 전송한다.
close(ch)
데이터를 포함하지 않는 시그널은 아래의 경우에 사용한다. 아래코드를 보자.
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        for {
            p, more := <-ch
            fmt.Println(">>>", p, more)
        }
    }()

    fmt.Println("Send Data")
    ch <- "paper"
    time.Sleep(time.Second * 2)
    fmt.Println("close paper")
    close(ch)
    time.Sleep(time.Second * 40)
}
close(ch)를 호출하면, 데이터가 전송되지 않으며 more가 false인 것을 알 수 있다.

시그널에 데이터를 보내기

시그널에 데이터를 함께 보낼 경우, buffered 채널인지 unbufered 채널인지에 따라서 전송보장이 결정된다.

 Signaling with data

Unbuffered, Buffered > 1Buffered = 1 3개의 채널 옵션이 있다.

버퍼의 크기에 따라서 보장(Guarantee)의 상태가 달라지기 때문에, 임의의 크기가 아닌 잘 정의된 제약조건에 따라서 계산된 크기를 설정해야한다.

데이터가 없는 시그널 보내기

데이터 없이 시그널은 주로 작업 취소를 위해서 사용한다. 고루틴은 다른 고루틴에 신호를 보내서 작업을 취소하게 하고 다른 작업을 진행 할 수 있다. 취소는 Unbuffered 채널과 Buffered 채널 모두를 사용하여 구현할 수 있지만 Buffred 채널을 사용 할 때는 코딩에 신경을 좀 써줘야 한다.

 Signaling Without Data

내장 함수 close를 이용해서 데이터 없이 시그널을 보낼 수 있다. close를 이용해서 채널을 닫더라도 여전히 채널에서 시그널을 기다릴 수 있다. 하지만 닫힌 채널은 항상 시그널을 보내기 때문에 블럭되지 않고 계속 반환된다.

context를 이용해서 데이터 없는 시그널을 보낼 수도 있고, 많은 표준 라이브러리들이 context를 이용해서 시그널을 보내고 있다. 실제 context 패키지는 Unbuffered 채널과 close를 이용해서 데이터 없는 시그널을 전송한다.

시나리오

이상 채널과 시그널에 대해서 살펴봤는데, 이 특성을 이해하기 위해서 시나리오를 선정하고 이를 코드로 구현해보려 한다.

Signal With Data - Guarantee - Unbuffered 채널

Wait For Task
당신은 신입사원에게에게 작업을 맡기려는 관리자다. 신입사원인만큼 당신은 직접 신입사원에게 업무를 지시해야 한다. 신입사원이 업무내용이 담긴 종이를 받으면 업무를 시작한다.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wait := sync.WaitGroup{}
	ch := make(chan string)

	wait.Add(1)
	go func() {
		fmt.Println("신입사원 >> 일을 기다리는 중")
		p := <-ch
		fmt.Println("신입 사원 >> 내가 할 일은", p)
		time.Sleep(time.Second * 2)
		fmt.Println("Complete job..")
		wait.Done()
	}()
	fmt.Println("매니저 >> 어떤 일을 맡길지 생각 중...")
	time.Sleep(time.Second * 2)
	fmt.Println("매니저 >> 바닥청소를 맡기자")
	ch <- "바닥청소"
	wait.Wait()
}
		

11 줄에서 Unbuffered 채널을 만들었다. 고루틴에서 신입사원은 작업을 기다린다. 앞서 반든 채널 ch에서 시그널이 오면, 작업내용(string)을 읽어서 작업을 수행한다. 작업 수행에 2초가 걸린다.

main 함수에서 매니저는 어떤 일을 맡길지 2초 정도 고민한 후 "바닥청소"를 맡긴다. 결과적으로 신입사원은 2초를 기다린뒤 업무를 받게 된다.

Wait For Result
신입사원도 언젠가는 중고사원이 된다. 신입사원은 중고사원이 됐고 이제 스스로 일을 찾아서 할 수 있게 됐다. 이제 당신은 사원에게 더 이상 어떤 일을 하라고 일일이 명령을 내릴 필요가 없게 됐다. 대신 사원은 자신이 할 일을 당신에게 보고를 한다.
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wait := sync.WaitGroup{}
    ch := make(chan string)

    wait.Add(1)
    go func() {
        fmt.Println("신입사원 >> 오늘 할 일 계획 세우는 중...")
        time.Sleep(time.Second * 2)
        ch <- "바닥 청소를 하겠습니다"
        wait.Done()
    }()
    fmt.Println("매니저 >> 업무 보고를 기다리는 중...")
    job := <-ch
    fmt.Println("신입사원 Job >> ", job)
    wait.Wait()
}

		
11 줄에서 string 타입의 Unbuffered 채널을 만들었다. 14 줄의 고루틴이 신입사원 루틴으로 2초 동안 어떤 일을 할지 고민한 다음 바닥 청소를 하겠다는 시그널을 전송한다. 매니저는 21 줄에서 신입사원의 시그널을 기다린다.

Unbuffered 채널은 시그널의 전송을 보장한다. 훌륭하긴 하지만 대기시간을 알 수 없다는 문제가 있다. 이 시나리오에서 매니저는 도달하지 않는 시그널을 무한정 기다릴 수도 있다. 그러므로 개발자는 time.After등을 이용해서 일정시간 동안 시그널이 없을 때 타임아웃 시그널을 발생하는 등의 코드를 넣어야 한다.
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)

    go func() {
        fmt.Println("신입사원 >> 오늘 할 일 계획 세우는 중...")
        time.Sleep(time.Second * 100)
        ch <- "바닥 청소를 하겠습니다"
    }()
    fmt.Println("매니저 >> 업무 보고를 기다리는 중...")
    select {
    case job := <-ch:
        fmt.Println("신입사원 Job >> ", job)
    case <-time.After(3 * time.Second):
        fmt.Println("타임아웃. 아직 출근을 안했나")
    }
}

Signal With Data - No Guarantee - Buffered Channels > 1

전송한 시그널을 수신했다는 것을 알 필요가 없을 때, 사용 할 수 있다. Fan Out과 Drop 가 대표적인 시나리오다. Fan out은 메시지 지향 미들웨어 소프트웨어에서 정보 교환을 위해서 사용하는 메시징 패턴이다. 메시징을 전달하는 프로세스를 중지하지 않고, 여러 대상으로 메시지를 전달(분산)할 수 있는 패턴이다.

버퍼링된 채널은 데이터를 저장하기 위한 공간을 가지고 있다. 때문에 얼마나 많은 공간을 만들지를 결정해야 한다. 이러한 질문에 답을 할 준비가 돼 있지 않다면, 1보다 더 큰 Buffered 채널을 사용하지 않는게 나을 수 있다.

시나리오 1 - Fan Out
 Fan out Pattern

Fan Out 패턴을 사용하면, 하나 이상의 워커(worker)에 작업을 던질 수 있다. 각 워커는 한번에 하나의 작업만을 처리 할 수 있기 때문에, 얼마나 많은 작업을 처리 할지를 예상 할 수 있다. 개발자는 채널의 크기로 작업의 규모를 설정 할 수 있다. 이 패턴은 워커의 작업결과를 받아볼 필요가 없다는 이점이 있다.

다시 매니저라고 상상해보자. 이번에는 여러명의 직원을 고용해서 팀단위로 업무를 수행 할 수 있다. 이 직원들은 업무를 수행하고 그 결과를 매니저에게 보고한다.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    emps := 20
    ch := make(chan string, emps)

    for e := 0; e < emps; e++ {
        go func(id int) {
            time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            ch <- fmt.Sprintf("직원(%d) : 작업시작", id)
        }(e)
    }

    for emps > 0 {
        p := <-ch
        fmt.Println(p)
        emps--
    }
}
		
11 줄에서 20의 크기를 가지는 string 타입의 채널을 만들었다. 이는 20개의 시그널을 저장할 수 있는 버퍼(작업공간)를 만들었음을 의미한다.

작업 준비를 마친 종업원은 어떤 업무를 시작했다는 것을 채널로 전송한다. 관리자는 채널에서 데이터를 읽어서 종업원의 전체 업무 상태를 관리 할 수 있다.

시나리오 2 - Drop
당신이 고용한 작업자가 있다. 그리고 이들 작업자가 처리해야 할 한 무더기의 일이 있다. 이 작업자들은 일을 끝내면, 다음 일을 찾아서 진행을 한다. 모든 일이 완료되면 작업은 끝난다. 이때 관리자는 각 작업자가 업무 하나를 끝냈는지의 여부는 중요하지 않다. 관리자에게 중요한 것은 일의 처리 속도다. 일을 처리 할 여유가 없다면 일을 줄이거나 작업자를 늘려주면 되고, 일을 생각보다 빨리 처리해서 작업자들이 놀고 있다면 더 많은 작업을 할당 하면 된다(혹은 작업자를 줄이거나)
package main

import (
    "fmt"
)
    
func main() {
    const cap = 5
    ch := make(chan string, cap)

    go func() {
        for p := range ch {
            fmt.Println("작업자 : 작업 :", p)
        }
    }()

    const work = 20
    i := 0
    for w := 0; w < work; w++ {
        i++
        select {
        case ch <- fmt.Sprintf("작업 %d", i):
            fmt.Println("관리자 : 작업 추가")
        default:
            fmt.Println("관리자 : 작업량 초과")
        }
    }

    close(ch)
}   
코드를 실행하면 drop 되는 작업들이 생길 것이다. 아래와 같이 잠시 쉬는 걸로 drop 작업을 줄일 수 있다.
        default:
            fmt.Println("관리자 : 작업량 초과")
            time.Sleep(time.Millisecond * 2)
Buffered가 1보다 큰 채널은 전송한 신호를 반드시 수신한다는 보장이 없다. 하지만 두 개의 고루틴 간에 통신지연을 없앨 수 있다. Fan Out 시나리오에서는 작업 처리자에 대한 버퍼를 만들 수 있다. 관리자는 작업자가 작업을 끝나는 걸 기다릴 필요 없이 동시에 업무를 수행 하도록 할 수 있다. Drop 시나리오에서는 작업을 (Drop이 있지만)중단 없이 업무를 수행 할 수 있다.

어떤 작업은 완전한 전송 보장 보다, 대기 시간의 감소(이는 결국 업무 처리 속도를 높일 수 있음을 의미한다)가 중요 할 수 있다.

Signal With Data - Delayed Guarantee - Buffered Channel 1

새로운 신호를 보내기 전에 이전 신호가 처리됐는지를 알아야 할 때 사용 할 수 있는 방법이다.
시나리오 1 - Wait For Tasks
새로운 직원들이 들어왔다. 이 직원들은 하나 이상의 업무를 처리해야 한다. 당신은 직원들에게 작업을 차례로 할당해야 한다. 하지만 새로운 작업을 시작하려면 이전에 하고 있던 작업을 완료해야 한다. 한번에 하나의 작업만 수행 할 수 있으므로, 작업을 전환 할 때 지연이 발생 할 수 있다. 직원이 다음일을 수행 할 것이라는 것을 보장하려면 어떻게 해야 할까 ? 아래와 같이 크기가 1인 작업대를 만들면 될 것이다. 아래 그림을 보자.

 버퍼크기가 1인 채널

3명의 작업자가 모두 작업을 하고 있다. 버퍼에는 작업이 하나가 있는데, 이는 작업을 할 수 있는(놀고 있는) 작업자가 한명도 없다는 의미다. 따라서 관리자는 대기해야 한다. 작업자중 한명이 작업을 끝내면 버퍼에 있는 작업을 가져갈테고, 관리자는 작업을 밀어 넣을 수 있을 거다.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func waitForTasks() {
    ch := make(chan string, 1)

    for i := 0; i < 3; i++ {
        go func(id int) {
            for p := range ch {
                fmt.Printf("작업 수행 ( %d ) : %s\n", id, p)
                time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
            }
        }(i)
    }

    const work = 10
    i := 0
    for w := 0; w < work; w++ {
        i++
        ch <- fmt.Sprintf("작업 %d", i)
    }
    time.Sleep(time.Second * 2)
}

func main() {
    waitForTasks()
}

		

Signal Without Data - Context

마지막 시나리오에서는 context 패키지를 사용해서 실행중인 고루틴을 취소하는 방법을 보여준다.

매니저는 직원을 고용해서 일을 마쳤다. 매니저는 직원이 일을 마치지 못했다고 해서 무한정 기다릴 생각이 없다. 업무를 중단하고 리뷰를 해서 일을 마치지 못한 이유를 확인해야 한다.
package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func waitForTasks() {
    duration := 80 * time.Millisecond
    ctx, cancel := context.WithTimeout(context.Background(), duration)
    defer cancel()

    ch := make(chan string, 1)
    rand.Seed(time.Now().UTC().UnixNano())
    go func() {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        ch <- "paper"
    }()

    select {
    case p := <-ch:
        fmt.Println("work complete", p)
    case <-ctx.Done():
        fmt.Println("moving on")
    }
}

func main() {
    waitForTasks()
}
context.WithTimeout 을 이용해서 80밀리세컨트 이내에 작업을 끝내지 못하면, 작업을 중단하도록 설정했다. 고루틴은 랜덤하게 0~100 밀리세컨드를 쉬기 때문에 20%의 확률로 시간내에 작업을 완료하지 못할 것이다. 그러면 context는 데이터 없는 시그널(Signal without Data)을 발생 할 것이다. select 문에서 이 시그널을 잡은 다음 프로세스를 종료하고, cancle() 함수가 실행 된다.

정리

시그널의 전송이 보장될지를 판단하기 위해서는 채널과 동시성 그리고 서비스에 대한 이해가 필요하다. 이러한 이해는 동시성 프로그램과 관련 알고리즘을 최적화 하는데 도움을 줄 것이다. 또한 잠재적인 버그를 해결하고 기분 나쁜 느낌이 드는 나쁜 코드를 제거하는데도 도움을 줄 것이다.

이 글에서는 여러 시나리오들을 예로 어떤 시그널을 사용하는게 좋을지를 살펴봤다. 비록 몇 가지 패턴만 살펴봤으며, 모든 패턴과 규칙에는 예외가 있기 마련이지만 기초를 다지는데 도움이 될 것이다. 지금까지의 내용을 요약했다. 채널을 이용해서 고루틴을 조율하고 조정 할 수 있다. Unbuffered channels Buffered channels 채널 닫기 nil 채널