goroutine和channel
在golang中,使用goroutine创建轻量级线程(协程),用来独立执行任务,实现并发操作。使用goroutine和channel实现编程是go的一种特有的并发模式,叫做CSP(Communicating Sequential Process)模式。实现并发的关键点是处理多任务之间的通信,在OS中我们知道,例如:管道、共享内存、消息队列等都是常见的进程通信方法。CSP模式中,使用管道channel完成不同goroutine之间的通信。
注意点
- 使用go routine的地方往往需要保持程序持续运行,写代码时容易忘记
for
的使用 - 关于父子协程(
main
除外)没有直接的控制关系,也就是说父协程kill掉后,子协程依旧可以继续执行;但是main
停到后所有的子协程都会被kill掉 - golang在
select-case
中,若没有default
同时没有一个case
可以执行,那么该select
将会阻塞,直到有某一个case
执行
并发编程
通常来讲,golang并发编程三要素有:
- 生成器
- 服务/任务
此外,还要有合适的接收多任务结果的方法。
package main
import (
"fmt"
"math/rand"
"time"
)
// 生成器,用来创建独立运行的goroutine
func gen(name string) chan string {
c := make(chan string)
go func() {
// 任务name:
// 使用管道 c 做一些任务,并返回 c 以便其他任务使用
// 对于并发执行的任务,会有异步性和不确定性
// 每一个任务产生结果的时间是不同的
for {
}
}()
return c
}
// 第1种接收多任务结果的方法:
// 该方法适用于不知道需要多少任务的情况
// 中心思想:
// 1)创建一个公用管道,用于接收所有任务的数据
// 2)为每一个任务都生成一个 goroutine ,每个goroutine只负责收集自己的结果,放入公用管道中
// 3)公用管道中有所有任务的处理结果,将公用管道返回,拿到所有任务的结果
func fanIn(chs ...chan string) chan string {
c := make(chan string)
for _, ch := range chs {
go func(in chan string) {
for {
c <- <-in
}
}(ch) // 由于golang值传递的特性,避免丢失参数,必须将拷贝值传入
}
return c
}
// 第2中接收多任务结果的方法:
// 适用于知道多少个任务的情况
// 中心思想:
// 1)创建一个公用管道,用于接收所有任务的数据
// 2)只用一个 goroutine ,通过 for + select 选择已经准备好数据的任务
// 3)由于任务并发执行的异步性、不确定性,for每次只会选择一个任务去接收数据
// 4)公用管道中有所有任务的处理结果,将公用管道返回,拿到所有任务的结果
func fanInBySelect(c1, c2 chan string) chan string {
c := make(chan string)
go func() {
for {
select {
case m := <-c1:
c <- m
case m := <-c2:
c <- m
}
}
}()
return c
}
func main() {
// 创建两个任务,分别用1个channel进行通信
m1, m2 := gen("m1"), gen("m2")
//c := fanIn(m1, m2)
c := fanInBySelect(m1, m2)
fmt.Println(<-c)
}
并发编程-Demo
package main
import (
"fmt"
"math/rand"
"time"
)
/*
- 任务生成器
- 接收任务名字
- 返回一个channel
*/
func WorkerGen(name string) chan string {
workChan := make(chan string)
// 启一个 go routine 不定期发送消息
go func() {
for {
// 随机睡眠 1ms - 100ms
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
message := name + " send a message."
/*
- 当message没有被取出时,该 goroutine会被阻塞
- 但是不影响 父 协程执行
*/
workChan <- message
}
}()
return workChan
}
/*
- 方法一
- 适用于不知道并发任务数量的情况
- 通过便利来处理管道
*/
func WorkerDealUnknownNum(workers ...chan string) {
// 创建公用管道
commenChan := make(chan string)
for _, workChan := range workers {
/*
- 这里对每个 workchan 都开一个 goroutine
- 防止某个 worker 被阻塞,导致整个程序都不能正常执行,避免死锁
*/
go func(inChan chan string) {
for { // 需要不停的放数据
msg := <-inChan
commenChan <- msg
}
}(workChan) // 必须显示的传入所调用的任务的工作管道
}
// 再开一个 goroutine 保持从 commonChannel 中拿数据
go func() {
for { // 需要不停的拿数据
tempMsg := <-commenChan
fmt.Println(tempMsg)
}
}()
}
/*
- 方法二
- 适用于已知任务数量的情况
- 使用 select 对任务进行选择
- 这里假设三个
*/
func WorkerDealKnownNum(worker0, worker1, worker2 chan string) {
// 创建公用管道
commonchan := make(chan string)
// 起一个 goroutine 保持对worker的判断
go func() {
for {
// 每次循环只拿一个管道的信息
select {
case msg := <- worker0:
commonchan <- msg
case msg := <- worker1:
commonchan <- msg
case msg := <- worker2:
commonchan <- msg
}
}
}()
// 起一个 goroutine 保持输出
go func() {
for {
msg := <- commonchan
fmt.Println(msg)
}
}()
}
func main() {
worker0 := WorkerGen("work0")
worker1 := WorkerGen("work1")
worker2 := WorkerGen("work2")
// WorkerDealUnknownNum(worker0, worker1, worker2)
WorkerDealKnownNum(worker0, worker1, worker2)
// 停留 5s 看输出
time.Sleep(time.Second * 5)
}
任务控制
常用的并发任务控制有这几种方法:
- 非阻塞等待
- 超时机制
- 任务中断退出
- 使用
sync.WaitGroup
非阻塞等待
使用case-default
实现,如果此时没有准备好数据,就走default
做别的任务。
func nonBlockingWait(c chan string) (string, bool) {
select {
case m := <-c:
return m, true
default:
return "", false
}
}
超时机制
给定一个timeout
,在select
中,若规定的timeout
内没有消息,则time.After
会向管道发送数据,表示规定时间已到,时间超过,就去做别的事情。
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
select {
case m := <-c:
return m, true
case <-time.After(timeout):
return "", false
}
}
任务中断退出
package main
import (
"fmt"
"math/rand"
"time"
)
// 使用channel,让主进程和goroutine之间通知任务结束
func msgGen(name string, done chan struct{}) chan string {
c := make(chan string)
go func() {
i := 0
for {
select {
case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
c <- fmt.Sprintf("service %s: message %d", name, i)
case <-done: // goroutine 中得到通知,任务结束
fmt.Println("cleaning up")
time.Sleep(2*time.Second)
fmt.Println("cleanup done")
done<- struct{}{} // 清理任务后,具备结束条件,告知主进程,然后结束
return
}
i++
}
}()
return c
}
func main() {
done := make(chan struct{})
m1 := msgGen("server1", done)
for i := 0; i < 5; i++ {
if m, ok := timeoutWait(m1, time.Duration(rand.Intn(2000))*time.Millisecond); ok {
fmt.Println(m)
} else {
// fmt.Println("no message from service2")
fmt.Println("service1 timeout")
}
}
done <- struct{}{} // 主进程向 done 管道通知,结束
<-done // 收到 goroutine 结束的消息,然后结束
time.Sleep(time.Second)
}
sync.WaitGroup
// 使用背景:combine 需要等 4 个任务都完成之后 才可以处理返回值
// 使用 sync.WaitGroup 来等待任务完成
func combine(r image.Rectangle,
c1, c2, c3, c4 <-chan image.Image) <-chan string {
c := make(chan string)
go func() {
// 使用 waitgroup 是因为:
// 开了 goroutine 后 goroutine 的生命周期 就不受这个函数的影响了
// 如果 goroutine 还没有执行完这个函数就结束了
// 这个函数。自然拿不到 goroutine 的结果,也就无法完成任务
var wg sync.WaitGroup
newImage := image.NewNRGBA(r)
// 复制块
copy := func(dst draw.Image, r image.Rectangle,
src image.Image, sp image.Point, index string) {
draw.Draw(dst, r, src, sp, draw.Src)
// 记得释放一个 mutex
wg.Done() // 匿名函数 使用外面的 变量
}
wg.Add(4)
var s1, s2, s3, s4 image.Image
var ok1, ok2, ok3, ok4 bool
// 使用循环做,保证 4 个任务都完成
for {
select {
case s1, ok1 = <-c1:
go copy(newImage, s1.Bounds(), s1,
image.Point{r.Min.X, r.Min.Y}, "1")
case s2, ok2 = <-c2:
go copy(newImage, s2.Bounds(), s2,
image.Point{r.Max.X / 2, r.Min.Y}, "2")
case s3, ok3 = <-c3:
go copy(newImage, s3.Bounds(), s3,
image.Point{r.Min.X, r.Max.Y / 2}, "3")
case s4, ok4 = <-c4:
go copy(newImage, s4.Bounds(), s4,
image.Point{r.Max.X / 2, r.Max.Y / 2}, "4")
}
// 结束条件
if ok1 && ok2 && ok3 && ok4 {
break
}
}
// 先阻塞 保证goroutine运行完
wg.Wait()
buf2 := new(bytes.Buffer)
jpeg.Encode(buf2, newImage, nil)
c <- base64.StdEncoding.EncodeToString(buf2.Bytes())
}()
return c
}
任务控制-Demo
以:超时机制、任务中断退出两种情况为例
package main
import (
"fmt"
"math/rand"
"strings"
"time"
)
/**
* msgGen
* @Description: 消息生成器
* @param name: 用来标识发送消息的实体
* @param done: 传递从调度程序来的结束消息
* @return chan: 将自己的消息通过chan传递出去
**/
func msgGen(name string, done chan struct{}) chan string {
c := make(chan string)
go func() {
i := 0
for {
select {
// time.After函数在到达规定时间后,返回一个 <-chan 类型的管道
case <-time.After(time.Duration(rand.Intn(3000)) * time.Millisecond):
// 随机的发送消息
c <- fmt.Sprintf("service %s: message %d", name, i)
// 如果结束通知到来,结束,并通知主协程 自己已经结束了
case <-done:
fmt.Println("cleaning up")
time.Sleep(2 * time.Second)
fmt.Println("cleanup done")
done <- struct{}{}
return
}
i++
}
}()
return c
}
/**
* msgGenTimeout
* @Description: 该消息生成器一旦生成消息超时,自动done掉
* @param name: 服务名标识
* @return chan: 通过管道传出消息
**/
func msgGenTimeout(name string) chan string {
c := make(chan string)
go func() {
i := 0
for { // 如果没有触发超时,就一直发送数据
select {
// 每轮消息 10ms 内随机发送
case <-time.After(time.Duration(rand.Intn(10)) * time.Millisecond):
c <- fmt.Sprintf("service %s: send message %d", name, i)
// 超时计数在每轮的 100ms 内随机超时
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
c <- fmt.Sprintf("%s timeout", name)
return // 超时自动退出
}
i++
}
}()
return c
}
/**
* timeoutWait
* @Description: 模拟超时
* @param c: 查看管道 c 中是否有消息
* @param timeout: 超时时间
* @return string: 返回消息或者空
* @return bool: 超时标志
**/
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
select {
case m := <-c:
return m, true
case <-time.After(timeout):
return "", false
}
}
/**
* timeoutMechanism
* @Description: 并发任务控制 -- 超时自动退出
* 假设有n=3个协程,如果其中某个协程规定时间内没有发出消息,则退出并通知主协程
**/
func timeoutMechanism() {
m1 := msgGenTimeout("service1")
m2 := msgGenTimeout("service2")
m3 := msgGenTimeout("service3")
i := 0 // 用来计数有几个协程timeout
for {
if i == 3 {
break
}
select {
case msg := <-m1:
if strings.Contains(msg, "timeout") {
// 超时消息
fmt.Println(msg,"===================")
i++
} else {
fmt.Println(msg)
}
case msg := <-m2:
if strings.Contains(msg, "timeout") {
fmt.Println(msg,"===================")
i++
} else {
fmt.Println(msg)
}
case msg := <-m3:
if strings.Contains(msg, "timeout") {
fmt.Println(msg,"===================")
i++
} else {
fmt.Println(msg)
}
}
}
}
/**
* notifyToExit
* @Description: 并发任务控制 -- 主进程通过 channel 通知子协程退出
**/
func notifyToExit() {
done := make(chan struct{})
m1 := msgGen("server1", done)
for i := 0; i < 5; i++ {
if m, ok := timeoutWait(m1, time.Duration(rand.Intn(2000))*time.Millisecond); ok {
// 如果 msgGen 有消息产生,就打印出来
// 每轮消息在 5000ms 内随机产生 --- 每轮超时的时间在 2000ms 内
fmt.Println(m)
} else {
// fmt.Println("no message from service2")
fmt.Println("service1 timeout")
}
}
// 主进程通知子协程中断退出
done <- struct{}{}
// 子协程退出后通知父协程
<-done
time.Sleep(time.Second)
}
func main() {
//timeoutMechanism() // 超时自动退出
notifyToExit() // 主进程通知结束
}