单向Channel
可以将 channel
隐式转换为单向队列,只收或只发。
c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
send <- 1
// <-send // Error: receive from send-only type chan<- int
<-recv
// recv <- 2 // Error: send to receive-only type <-chan int
不能将单向 channel
转换为普通 channel
d := (chan int)(send) // Error: cannot convert type chan<- int to type chan int
d := (chan int)(recv) // Error: cannot convert type <-chan int to type chan int
选择Channel
如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。
package main
import (
"fmt"
"os"
)
func main() {
a, b := make(chan int, 3), make(chan int)
go func() {
v, ok, s := 0, false, ""
for {
select { // 随机选择可用 channel,接收数据。
case v, ok = <-a:
s = "a"
case v, ok = <-b:
s = "b"
}
if ok {
fmt.Println(s, v)
} else {
os.Exit(0) // 退出程序
}
}
}()
for i := 0; i < 5; i++ {
select { // 随机选择可用 channel,发送数据。
case a <- i:
case b <- i:
}
}
close(a) // 关闭channel防止死锁
close(b)
select {} // 没有可用 channel,阻塞 main goroutine。
}
应用场景
并发任务打包
用简单工厂模式打包并发任务和 channel
package main
import (
"math/rand"
"time"
)
func NewTest() chan int {
c := make(chan int)
rand.Seed(time.Now().UnixNano())
go func() {
time.Sleep(time.Second)
c <- rand.Int()
}()
return c
}
func main() {
for i:=0;i<10;i++ {
println(<-NewTest()) // 打包使用并发,不同循环间的线程不会相互干扰
} // 等待 goroutine 结束返回。
}
使用channel实现信号量
也就是上一章中说到的使用channel做同步锁
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(3)
sem := make(chan int, 1)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
sem <- 1 // 向 sem 发送数据,阻塞或者成功。
for x := 0; x < 3; x++ {
fmt.Println(id, x)
}
<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。
}(i)
}
wg.Wait()
}
使用 closed channel 发出退出通知
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
quit := make(chan bool)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-quit: // closed channel 不会阻塞,因此可用作退出通知。
return
default: // 执行正常任务。
task(id)
}
}
}(i)
}
time.Sleep(time.Second * 5) // 让测试 goroutine 运行一会。
close(quit) // 发出退出通知。
wg.Wait()
}
//执行任务
func task(id int){
fmt.Println(id, time.Now().Nanosecond())
time.Sleep(time.Second)
}
timeout模式
为避免一个线程运行时间过长,可以通过 channel
为线程实现 timeout
以计算斐波那契数列为例说明
package main
import (
"fmt"
"time"
)
const size int64 = 60 //计算位数
func main() {
data := make(chan int64)
go func() {
//计算斐波那契数列
record := make([]int64, size)
var i int64 = 0
for ; i < size; i++ {
if i < 2 {
record[i] = i
} else {
record[i] = record[i-1] + record[i-2]
}
fmt.Println(i+1, ":", record[i])
time.Sleep(time.Millisecond * 100)
}
data <- record[size-1]
}()
select {
case v := <-data: // 如果算出,输出结果
fmt.Println("result:", v)
case <-time.After(time.Second * 5): // 如果超时,停止执行
fmt.Println("timeout!")
return
}
}
将封装和timeout结合起来
package main
import (
"fmt"
"sync"
"time"
)
const round int = 2 //线程数
const size int64 = 60 //计算位数
func main() {
wg := new(sync.WaitGroup)
wg.Add(round)
for i := 1; i <= round; i++ {
go func(i int) {
defer wg.Done()
task(fmt.Sprint("routine", i))
}(i) // 向多线程传值务必使用这种方式,而不要直接在线程函数体内直接使用外部的变量!!(channel 除外)
}
wg.Wait()
}
// 封装
func task(name string) {
data := make(chan int64)
go func() {
//计算斐波那契数列
record := make([]int64, size)
var i int64 = 0
for ; i < size; i++ {
if i < 2 {
record[i] = i
} else {
record[i] = record[i-1] + record[i-2]
}
fmt.Println(name, "---", i+1, ":", record[i])
time.Sleep(time.Millisecond * 100)
}
data <- record[size-1]
}()
select {
case v := <-data: // 如果算出,输出结果
fmt.Println(name, "result:", v)
case <-time.After(time.Second * 5): // 如果超时,停止执行
fmt.Println(name, "timeout!")
}
}
封装+timeout+closed channel
其实上面的代码是有问题的
当你在 main
中添加一个等待
func main() {
wg := new(sync.WaitGroup)
wg.Add(round)
for i := 1; i <= round; i++ {
go func(i int) {
defer wg.Done()
task(fmt.Sprint("routine", i))
}(i)
}
wg.Wait()
// 添加等待
time.Sleep(time.Second * 10)
}
你会发现一个有趣的现象,那就是 其实程序还在计算斐波那契数列,这是因为 负责计算斐波那契的线程并没有被停掉,添加等待前没有问题仅仅是因为 main
结束了
所以,还应该使用上面的 closed channel
发出退出通知
package main
import (
"fmt"
"sync"
"time"
)
const round int = 2 //线程数
const size int64 = 80 //计算位数
const timeout time.Duration = 1 //超时时间
const sleepTime time.Duration = 100 //休眠时间
func main() {
wg := new(sync.WaitGroup)
wg.Add(round)
for i := 1; i <= round; i++ {
go func(i int) {
defer wg.Done()
task(fmt.Sprint("routine", i))
}(i)
}
wg.Wait()
time.Sleep(time.Second * (timeout+1))
}
func task(name string) {
data := make(chan int64)
go func(name string) {
//计算斐波那契数列
record := make([]int64, size)
var i int64 = 0
for ; i < size; i++ {
// 判断是否有关闭通知
select {
case <-data: //收到关闭通知
fmt.Println(name, "收到关闭通知,进程关闭")
return
default:
if i < 2 {
record[i] = i
} else {
record[i] = record[i-1] + record[i-2]
}
fmt.Println(name, "---", i+1, ":", record[i])
time.Sleep(time.Millisecond * sleepTime)
}
}
data <- record[size-1]
defer close(data) //发出关闭通知,通常管道由发送方关闭
}(name)
select {
case v := <-data: // 如果算出,输出结果
fmt.Println(name, "result:", v)
case <-time.After(time.Second * timeout): // 如果超时,停止执行
fmt.Println(name, "timeout!")
}
}
参考
Q.E.D.