// the sender gofunc() { for { if value := rand.Intn(Max); value == 0 { // The only sender can close // the channel safely. close(dataCh) return } else { dataCh <- value } } }()
// receivers for i := 0; i < NumReceivers; i++ { gofunc() { defer wgReceivers.Done()
// Receive values until dataCh is // closed and the value buffer queue // of dataCh becomes empty. for value := range dataCh { log.Println(value) } }() }
funcmain() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1) // ... dataCh := make(chanint, 100) stopCh := make(chanstruct{}) // stopCh is an additional signal channel. // Its sender is the receiver of channel dataCh. // Its reveivers are the senders of channel dataCh. // senders for i := 0; i < NumSenders; i++ { gofunc() { for { value := rand.Intn(MaxRandomNumber) select { case <- stopCh: return case dataCh <- value: } } }() } // the receiver gofunc() { defer wgReceivers.Done() for value := range dataCh { if value == MaxRandomNumber-1 { // the receiver of the dataCh channel is // also the sender of the stopCh cahnnel. // It is safe to close the stop channel here. close(stopCh) return } log.Println(value) } }() // ... wgReceivers.Wait() }
funcmain() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const MaxRandomNumber = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chanint, 100) stopCh := make(chanstruct{}) // stopCh is an additional signal channel. // Its sender is the moderator goroutine shown below. // Its reveivers are all senders and receivers of dataCh. toStop := make(chanstring, 1) // the channel toStop is used to notify the moderator // to close the additional signal channel (stopCh). // Its senders are any senders and receivers of dataCh. // Its reveiver is the moderator goroutine shown below. var stoppedBy string // moderator gofunc() { stoppedBy = <- toStop // part of the trick used to notify the moderator // to close the additional signal channel. close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { gofunc(id string) { for { value := rand.Intn(MaxRandomNumber) if value == 0 { // here, a trick is used to notify the moderator // to close the additional signal channel. select { case toStop <- "sender#" + id: default: } return } // the first select here is to try to exit the // goroutine as early as possible. select { case <- stopCh: return default: } select {