program story

N 채널을 듣는 방법?

inputbox 2020. 8. 11. 08:25
반응형

N 채널을 듣는 방법? (동적 선택 문)


두 개의 고 루틴을 실행하는 무한 루프를 시작하려면 아래 코드를 사용할 수 있습니다.

메시지를받은 후 새로운 고 루틴을 시작하고 영원히 계속됩니다.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

이제 N 고 루틴에 대해 동일한 동작을 원하지만이 경우 select 문은 어떻게 보일까요?

이것은 내가 시작한 코드 비트이지만 select 문을 코딩하는 방법이 혼란 스럽습니다.

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

reflect 패키지 Select함수를 사용하여이를 수행 할 수 있습니다 .

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select는 케이스 목록에 설명 된 선택 작업을 실행합니다. Go select 문과 마찬가지로 케이스 중 하나 이상이 진행될 때까지 차단하고 균일 한 의사 랜덤 선택을 한 다음 해당 케이스를 실행합니다. 선택한 케이스의 인덱스를 반환하고 해당 케이스가 수신 작업 인 경우 수신 된 값과 해당 값이 채널의 전송에 해당하는지 여부를 나타내는 부울을 반환합니다 (채널이 닫혀 있기 때문에 수신 된 0 값이 아님).

SelectCase선택할 채널, 작업 방향 및 전송 작업의 경우 보낼 값을 식별하는 구조체 배열을 전달합니다.

따라서 다음과 같이 할 수 있습니다.

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
# ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

http://play.golang.org/p/8zwvSk4kjx 에서 좀 더 구체화 된 예제로 실험 할 수 있습니다.


공유 "집계"채널로 메시지를 "전달"하는 goroutine에서 각 채널을 래핑하여이를 수행 할 수 있습니다. 예를 들면 :

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

메시지가 시작된 채널을 알아야하는 경우 집계 채널로 전달하기 전에 추가 정보가있는 구조체로 래핑 할 수 있습니다.

내 (제한된) 테스트 에서이 방법은 reflect 패키지를 사용하여 크게 수행됩니다.

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

여기에 벤치 마크 코드


이전 답변에 대한 몇 가지 의견을 확장하고 여기에 더 명확한 비교를 제공하기 위해 동일한 입력, 읽을 채널 조각 및 각 값을 호출하는 함수가 주어 졌을 때 지금까지 제시된 두 접근 방식의 예가 있습니다. 채널 가치의 출처.

접근 방식에는 세 가지 주요 차이점이 있습니다.

  • Complexity. Although it may partially be a reader preference I find the channel approach more idiomatic, straight-forward, and readable.

  • Performance. On my Xeon amd64 system the goroutines+channels out performs the reflect solution by about two orders of magnitude (in general reflection in Go is often slower and should only be used when absolutely required). Of course, if there is any significant delay in either the function processing the results or in the writing of values to the input channels this performance difference can easily become insignificant.

  • Blocking/buffering semantics. The importantance of this depends on the use case. Most often it either won't matter or the slight extra buffering in the goroutine merging solution may be helpful for throughput. However, if it is desirable to have the semantics that only a single writer is unblocked and it's value fully handled before any other writer is unblocked, then that can only be achieved with the reflect solution.

Note, both approaches can be simplified if either the "id" of the sending channel isn't required or if the source channels will never be closed.

Goroutine merging channel:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Reflection select:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Full code on the Go playground.]


Why this approach wouldn't work assuming that somebody is sending events?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}

참고URL : https://stackoverflow.com/questions/19992334/how-to-listen-to-n-channels-dynamic-select-statement

반응형