单向Channel

可以将 channel 隐式转换为单向队列,只收或只发。

c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
send <- 1
// <-send // Error: receive from send-only type chan<- int
<-recv
// recv <- 2 // Error: send to receive-only type <-chan int

不能将单向 channel 转换为普通 channel

d := (chan int)(send) // Error: cannot convert type chan<- int to type chan int
d := (chan int)(recv) // Error: cannot convert type <-chan int to type chan int

选择Channel

如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。

package main

import (
	"fmt"
	"os"
)

func main() {
	a, b := make(chan int, 3), make(chan int)

	go func() {
		v, ok, s := 0, false, ""

		for {
			select { // 随机选择可用 channel,接收数据。
			case v, ok = <-a:
				s = "a"
			case v, ok = <-b:
				s = "b"
			}

			if ok {
				fmt.Println(s, v)
			} else {
				os.Exit(0) // 退出程序
			}
		}
	}()

	for i := 0; i < 5; i++ {
		select { // 随机选择可用 channel,发送数据。
		case a <- i:
		case b <- i:
		}
	}

	close(a) // 关闭channel防止死锁
	close(b)
	select {} // 没有可用 channel,阻塞 main goroutine。
}

应用场景

并发任务打包

用简单工厂模式打包并发任务和 channel

package main

import (
	"math/rand"
	"time"
)

func NewTest() chan int {
	c := make(chan int)
	rand.Seed(time.Now().UnixNano())

	go func() {
		time.Sleep(time.Second)
		c <- rand.Int()
	}()

	return c
}

func main() {
	for i:=0;i<10;i++ {
		println(<-NewTest()) // 打包使用并发,不同循环间的线程不会相互干扰
	} // 等待 goroutine 结束返回。
}

使用channel实现信号量

也就是上一章中说到的使用channel做同步锁

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(3)

	sem := make(chan int, 1)

	for i := 0; i < 3; i++ {
		go func(id int) {
			defer wg.Done()

			sem <- 1 // 向 sem 发送数据,阻塞或者成功。

			for x := 0; x < 3; x++ {
				fmt.Println(id, x)
			}

			<-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。
		}(i)
	}

	wg.Wait()
}

使用 closed channel 发出退出通知

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	quit := make(chan bool)

	for i := 0; i < 2; i++ {
		wg.Add(1)

		go func(id int) {
			defer wg.Done()

			for {
				select {
				case <-quit: // closed channel 不会阻塞,因此可用作退出通知。
					return
				default: // 执行正常任务。
					task(id)
				}
			}
		}(i)
	}

	time.Sleep(time.Second * 5) // 让测试 goroutine 运行一会。

	close(quit) // 发出退出通知。
	wg.Wait()
}

//执行任务
func task(id int){
	fmt.Println(id, time.Now().Nanosecond())
	time.Sleep(time.Second)
}

timeout模式

为避免一个线程运行时间过长,可以通过 channel 为线程实现 timeout

以计算斐波那契数列为例说明

package main

import (
	"fmt"
	"time"
)

const size int64 = 60 //计算位数

func main() {
	data := make(chan int64)

	go func() {
		//计算斐波那契数列
		record := make([]int64, size)
		var i int64 = 0
		for ; i < size; i++ {
			if i < 2 {
				record[i] = i
			} else {
				record[i] = record[i-1] + record[i-2]
			}
			fmt.Println(i+1, ":", record[i])
			time.Sleep(time.Millisecond * 100)
		}
		data <- record[size-1]
	}()

	select {
	case v := <-data: // 如果算出,输出结果
		fmt.Println("result:", v)
	case <-time.After(time.Second * 5): // 如果超时,停止执行
		fmt.Println("timeout!")
		return
	}
}

将封装和timeout结合起来

package main

import (
	"fmt"
	"sync"
	"time"
)

const round int = 2   //线程数
const size int64 = 60 //计算位数

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(round)

	for i := 1; i <= round; i++  {
		go func(i int) {
			defer wg.Done()
			task(fmt.Sprint("routine", i))
		}(i) // 向多线程传值务必使用这种方式,而不要直接在线程函数体内直接使用外部的变量!!(channel 除外)
	}

	wg.Wait()
}

// 封装
func task(name string) {
	data := make(chan int64)

	go func() {
		//计算斐波那契数列
		record := make([]int64, size)
		var i int64 = 0
		for ; i < size; i++ {
			if i < 2 {
				record[i] = i
			} else {
				record[i] = record[i-1] + record[i-2]
			}
			fmt.Println(name, "---", i+1, ":", record[i])
			time.Sleep(time.Millisecond * 100)
		}
		data <- record[size-1]
	}()

	select {
	case v := <-data: // 如果算出,输出结果
		fmt.Println(name, "result:", v)
	case <-time.After(time.Second * 5): // 如果超时,停止执行
		fmt.Println(name, "timeout!")
	}
}


封装+timeout+closed channel

其实上面的代码是有问题的

当你在 main 中添加一个等待

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(round)

	for i := 1; i <= round; i++ {
		go func(i int) {
			defer wg.Done()
			task(fmt.Sprint("routine", i))
		}(i)
	}

	wg.Wait()
    // 添加等待
	time.Sleep(time.Second * 10)
}

你会发现一个有趣的现象,那就是 其实程序还在计算斐波那契数列,这是因为 负责计算斐波那契的线程并没有被停掉,添加等待前没有问题仅仅是因为 main 结束了

所以,还应该使用上面的 closed channel 发出退出通知

package main

import (
	"fmt"
	"sync"
	"time"
)

const round int = 2                 //线程数
const size int64 = 80               //计算位数
const timeout time.Duration = 1     //超时时间
const sleepTime time.Duration = 100 //休眠时间

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(round)

	for i := 1; i <= round; i++ {
		go func(i int) {
			defer wg.Done()
			task(fmt.Sprint("routine", i))
		}(i)
	}

	wg.Wait()
	time.Sleep(time.Second * (timeout+1))
}

func task(name string) {
	data := make(chan int64)

	go func(name string) {
		//计算斐波那契数列
		record := make([]int64, size)
		var i int64 = 0
		for ; i < size; i++ {
            // 判断是否有关闭通知
			select {
			case <-data: //收到关闭通知
				fmt.Println(name, "收到关闭通知,进程关闭")
				return
			default:
				if i < 2 {
					record[i] = i
				} else {
					record[i] = record[i-1] + record[i-2]
				}
				fmt.Println(name, "---", i+1, ":", record[i])
				time.Sleep(time.Millisecond * sleepTime)
			}
		}
		data <- record[size-1]
		defer close(data) //发出关闭通知,通常管道由发送方关闭
	}(name)

	select {
	case v := <-data: // 如果算出,输出结果
		fmt.Println(name, "result:", v)
	case <-time.After(time.Second * timeout): // 如果超时,停止执行
		fmt.Println(name, "timeout!")
	}
}

参考

Go边看边练

Q.E.D.


此 生 无 悔 恋 真 白 ,来 世 愿 入 樱 花 庄 。