Несколько goroutines прослушивания на одном канале
у меня есть несколько goroutines пытается получить на том же канале одновременно. Похоже, что последний goroutine, который начинает получать на канале, получает значение. Это где-то в спецификации языка или это неопределенное поведение?
c := make(chan string)
for i := 0; i < 5; i++ {
go func(i int) {
<-c
c <- fmt.Sprintf("goroutine %d", i)
}(i)
}
c <- "hi"
fmt.Println(<-c)
выход:
goroutine 4
EDIT:
Я просто понял, что это сложнее, чем я думал. Сообщение передается по все горутины.
c := make(chan string)
for i := 0; i < 5; i++ {
go func(i int) {
msg := <-c
c <- fmt.Sprintf("%s, hi from %d", msg, i)
}(i)
}
c <- "original"
fmt.Println(<-c)
выход:
original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4
5 ответов:
Да, это сложно, но есть несколько эмпирических правил, которые должны сделать вещи гораздо более простыми.
- предпочитаю использовать формальные аргументы для каналов вы переходите к go-подпрограммам вместо доступа к каналам в глобальной области. Вы можете получить больше проверки компилятора таким образом, и лучше модульность тоже.
- избегайте чтения и записи на одном и том же канале в определенной процедуре go-routine (включая основной один.) В противном случае, тупик является гораздо большим риском.
вот альтернативная версия вашей программы, применяя эти два руководства. Этот случай демонстрирует много писателей и одного читателя на канале:
c := make(chan string) for i := 1; i <= 5; i++ { go func(i int, co chan<- string) { for j := 1; j <= 5; j++ { co <- fmt.Sprintf("hi from %d.%d", i, j) } }(i, c) } for i := 1; i <= 25; i++ { fmt.Println(<-c) }http://play.golang.org/p/quQn7xePLw
Он создает пять go-подпрограмм, записывающих в один канал, каждый из которых записывает пять раз. Основная go-рутина читает все двадцать пять сообщений - вы можете заметить, что в порядке их следования появляются в часто не последовательные (т. е. параллелизм очевиден).
этот пример демонстрирует особенность каналов Go: возможно иметь несколько писателей, совместно использующих один канал; Go будет перемежать сообщения автоматически.
то же самое относится к одному писателю и нескольким читателям на одном канале, как показано во втором примере здесь:
c := make(chan int) var w sync.WaitGroup w.Add(5) for i := 1; i <= 5; i++ { go func(i int, ci <-chan int) { j := 1 for v := range ci { time.Sleep(time.Millisecond) fmt.Printf("%d.%d got %d\n", i, j, v) j += 1 } w.Done() }(i, c) } for i := 1; i <= 25; i++ { c <- i } close(c) w.Wait()этой второй пример включает в себя ожидание, наложенное на основной goroutine, который будет в противном случае выходите быстро и заставьте другие пять goroutines быть прекращенными рано (спасибо Олов для этого исправления).
в обоих примерах буферизация не требовалась. Как правило, хороший принцип заключается в том, чтобы рассматривать буферизацию только как усилитель производительности. Если ваша программа не блокируется без буферы, это не будет тупик С буферы либо (но наоборот не всегда правда). С тем еще одно эмпирическое правило, Начните без буферизации, а затем добавьте его позже по мере необходимости.
поздний ответ, но я надеюсь, что это поможет другим в будущем, как длинный опрос, "глобальная" кнопка, трансляция для всех?
Эффективный Go объясняет выпуска:
приемники всегда блокируются, пока нет данных для приема.
это означает, что вы не можете иметь более 1 goroutine прослушивания 1 канала и ожидать, что все goroutines получить то же значение.
запустить код Пример.
package main import "fmt" func main() { c := make(chan int) for i := 1; i <= 5; i++ { go func(i int) { for v := range c { fmt.Printf("count %d from goroutine #%d\n", v, i) } }(i) } for i := 1; i <= 25; i++ { c<-i } close(c) }вы не увидите "счет 1" более одного раза, хотя есть 5 goroutines прослушивания канала. Это связано с тем, что когда первый goroutine блокирует канал, все остальные goroutines должны ждать в очереди. Когда канал разблокирован, счетчик уже был получен и удален из канала, поэтому следующий goroutine в строке получает следующее значение счетчика.
Это сложно.
кроме того, посмотрите, что происходит с
GOMAXPROCS = NumCPU+1. Например,package main import ( "fmt" "runtime" ) func main() { runtime.GOMAXPROCS(runtime.NumCPU() + 1) fmt.Print(runtime.GOMAXPROCS(0)) c := make(chan string) for i := 0; i < 5; i++ { go func(i int) { msg := <-c c <- fmt.Sprintf("%s, hi from %d", msg, i) }(i) } c <- ", original" fmt.Println(<-c) }выход:
5, original, hi from 0, hi from 4и, посмотрите, что происходит с буферизацией каналов. Например,
package main import "fmt" func main() { c := make(chan string, 5+1) for i := 0; i < 5; i++ { go func(i int) { msg := <-c c <- fmt.Sprintf("%s, hi from %d", msg, i) }(i) } c <- "original" fmt.Println(<-c) }выход:
originalвы должны быть в состоянии объяснить эти случаи.
Я изучил существующие решения и создал простую широковещательную библиотеку https://github.com/grafov/bcast.
group := bcast.NewGroup() // you created the broadcast group go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members member := group.Join() // then you join member(s) from other goroutine(s) member.Send("test message") // or send messages of any type to the group member1 := group.Join() // then you join member(s) from other goroutine(s) val := member1.Recv() // and for example listen for messages
для нескольких goroutine слушать на одном канале, да, это возможно. ключевым моментом является само сообщение, вы можете определить какое-то сообщение следующим образом:
package main import ( "fmt" "sync" ) type obj struct { msg string receiver int } func main() { ch := make(chan *obj) // both block or non-block are ok var wg sync.WaitGroup receiver := 25 // specify receiver count sender := func() { o := &obj { msg: "hello everyone!", receiver: receiver, } ch <- o } recv := func(idx int) { defer wg.Done() o := <-ch fmt.Printf("%d received at %d\n", idx, o.receiver) o.receiver-- if o.receiver > 0 { ch <- o // forward to others } else { fmt.Printf("last receiver: %d\n", idx) } } go sender() for i:=0; i<reciever; i++ { wg.Add(1) go recv(i) } wg.Wait() }выход случайный:
5 received at 25 24 received at 24 6 received at 23 7 received at 22 8 received at 21 9 received at 20 10 received at 19 11 received at 18 12 received at 17 13 received at 16 14 received at 15 15 received at 14 16 received at 13 17 received at 12 18 received at 11 19 received at 10 20 received at 9 21 received at 8 22 received at 7 23 received at 6 2 received at 5 0 received at 4 1 received at 3 3 received at 2 4 received at 1 last receiver 4
Comments