Go的并发与任务控制

goroutine和channel

在golang中,使用goroutine创建轻量级线程(协程),用来独立执行任务,实现并发操作。使用goroutine和channel实现编程是go的一种特有的并发模式,叫做CSP(Communicating Sequential Process)模式。实现并发的关键点是处理多任务之间的通信,在OS中我们知道,例如:管道、共享内存、消息队列等都是常见的进程通信方法。CSP模式中,使用管道channel完成不同goroutine之间的通信。

注意点

  1. 使用go routine的地方往往需要保持程序持续运行,写代码时容易忘记for的使用
  2. 关于父子协程(main除外)没有直接的控制关系,也就是说父协程kill掉后,子协程依旧可以继续执行;但是main停到后所有的子协程都会被kill掉
  3. golang在select-case中,若没有default同时没有一个case可以执行,那么该select将会阻塞,直到有某一个case执行

并发编程

通常来讲,golang并发编程三要素有:

  • 生成器
  • 服务/任务

此外,还要有合适的接收多任务结果的方法。

package main

import (
	"fmt"
	"math/rand"
	"time"
)
// 生成器,用来创建独立运行的goroutine
func gen(name string) chan string {
	c := make(chan string)
	go func() {
        // 任务name:
		// 使用管道 c 做一些任务,并返回 c 以便其他任务使用
        // 对于并发执行的任务,会有异步性和不确定性
        // 每一个任务产生结果的时间是不同的
        for {
            
        }
	}()
	return c
}
// 第1种接收多任务结果的方法:
// 该方法适用于不知道需要多少任务的情况
// 中心思想:
// 1)创建一个公用管道,用于接收所有任务的数据
// 2)为每一个任务都生成一个 goroutine ,每个goroutine只负责收集自己的结果,放入公用管道中
// 3)公用管道中有所有任务的处理结果,将公用管道返回,拿到所有任务的结果
func fanIn(chs ...chan string) chan string {
	c := make(chan string)
	for _, ch := range chs {
		go func(in chan string) {
			for {
				c <- <-in
			}
		}(ch)	// 由于golang值传递的特性,避免丢失参数,必须将拷贝值传入
	}
	return c
}
// 第2中接收多任务结果的方法:
// 适用于知道多少个任务的情况
// 中心思想:
// 1)创建一个公用管道,用于接收所有任务的数据
// 2)只用一个 goroutine ,通过 for + select 选择已经准备好数据的任务
// 3)由于任务并发执行的异步性、不确定性,for每次只会选择一个任务去接收数据
// 4)公用管道中有所有任务的处理结果,将公用管道返回,拿到所有任务的结果
func fanInBySelect(c1, c2 chan string) chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case m := <-c1:
				c <- m
			case m := <-c2:
				c <- m
			}
		}
	}()
	return c
}

func main() {
    // 创建两个任务,分别用1个channel进行通信
    m1, m2 := gen("m1"), gen("m2")
    //c := fanIn(m1, m2)
    c := fanInBySelect(m1, m2)
    fmt.Println(<-c)   
}

并发编程-Demo

package main

import (
	"fmt"
	"math/rand"
	"time"
)

/*
- 任务生成器
- 接收任务名字
- 返回一个channel
*/
func WorkerGen(name string) chan string {
	workChan := make(chan string)

	// 启一个 go routine 不定期发送消息
	go func() {
		for {
			// 随机睡眠 1ms - 100ms
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
			message := name + " send a message."
			/*
				- 当message没有被取出时,该 goroutine会被阻塞
				- 但是不影响 父 协程执行
			*/
			workChan <- message
		}
	}()
	return workChan
}

/*
- 方法一
- 适用于不知道并发任务数量的情况
- 通过便利来处理管道
*/
func WorkerDealUnknownNum(workers ...chan string) {
	// 创建公用管道
	commenChan := make(chan string)

	for _, workChan := range workers {
		/*
			- 这里对每个 workchan 都开一个 goroutine
			- 防止某个 worker 被阻塞,导致整个程序都不能正常执行,避免死锁
		*/
		go func(inChan chan string) {
			for { 	// 需要不停的放数据
				msg := <-inChan
				commenChan <- msg
			}

		}(workChan) // 必须显示的传入所调用的任务的工作管道
	}

	// 再开一个 goroutine 保持从 commonChannel 中拿数据
	go func() {
		for {	// 需要不停的拿数据
			tempMsg := <-commenChan
			fmt.Println(tempMsg)
		}
	}()
}

/*
- 方法二
- 适用于已知任务数量的情况
- 使用 select 对任务进行选择
- 这里假设三个
*/
func WorkerDealKnownNum(worker0, worker1, worker2 chan string) {
	// 创建公用管道
	commonchan := make(chan string)

	// 起一个 goroutine 保持对worker的判断
	go func() {
		for {
			// 每次循环只拿一个管道的信息
			select {
			case msg := <- worker0:
				commonchan <- msg
			case msg := <- worker1:
				commonchan <- msg
			case msg := <- worker2:
				commonchan <- msg
			}
		}
	}()

	// 起一个 goroutine 保持输出
	go func() {
		for {
			msg := <- commonchan
			fmt.Println(msg)
		}
	}()
}

func main() {

	worker0 := WorkerGen("work0")
	worker1 := WorkerGen("work1")
	worker2 := WorkerGen("work2")

    // WorkerDealUnknownNum(worker0, worker1, worker2)
	WorkerDealKnownNum(worker0, worker1, worker2)
	// 停留 5s 看输出
	time.Sleep(time.Second * 5)
}

任务控制

常用的并发任务控制有这几种方法:

  • 非阻塞等待
  • 超时机制
  • 任务中断退出
  • 使用sync.WaitGroup

非阻塞等待

使用case-default实现,如果此时没有准备好数据,就走default做别的任务。

func nonBlockingWait(c chan string) (string, bool) {
	select {
	case m := <-c:
		return m, true
	default:
		return "", false
	}
}

超时机制

给定一个timeout,在select中,若规定的timeout内没有消息,则time.After会向管道发送数据,表示规定时间已到,时间超过,就去做别的事情。

func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
	select {
	case m := <-c:
		return m, true
	case <-time.After(timeout):
		return "", false
	}
}

任务中断退出

package main

import (
	"fmt"
	"math/rand"
	"time"
)
// 使用channel,让主进程和goroutine之间通知任务结束
func msgGen(name string, done chan struct{}) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for {
			select {
			case <-time.After(time.Duration(rand.Intn(5000)) * time.Millisecond):
				c <- fmt.Sprintf("service %s: message %d", name, i)
			case <-done:	// goroutine 中得到通知,任务结束
				fmt.Println("cleaning up")
				time.Sleep(2*time.Second)
				fmt.Println("cleanup done")
				done<- struct{}{}	// 清理任务后,具备结束条件,告知主进程,然后结束
				return
			}
			i++
		}
	}()
	return c
}


func main() {
	done := make(chan struct{})
	m1 := msgGen("server1", done)
	for i := 0; i < 5; i++ {
		if m, ok := timeoutWait(m1, time.Duration(rand.Intn(2000))*time.Millisecond); ok {
			fmt.Println(m)
		} else {
			// fmt.Println("no message from service2")
			fmt.Println("service1 timeout")
		}
	}
	done <- struct{}{}	// 主进程向 done 管道通知,结束
	<-done				// 收到 goroutine 结束的消息,然后结束
	time.Sleep(time.Second)

}

sync.WaitGroup

// 使用背景:combine 需要等 4 个任务都完成之后 才可以处理返回值
// 使用 sync.WaitGroup 来等待任务完成
func combine(r image.Rectangle,
	c1, c2, c3, c4 <-chan image.Image) <-chan string {

	c := make(chan string)

	go func() {
		// 使用 waitgroup 是因为:
		// 开了 goroutine 后 goroutine 的生命周期 就不受这个函数的影响了
		// 如果 goroutine 还没有执行完这个函数就结束了
		// 这个函数。自然拿不到 goroutine 的结果,也就无法完成任务
		var wg sync.WaitGroup
		newImage := image.NewNRGBA(r)
		// 复制块
		copy := func(dst draw.Image, r image.Rectangle,
			src image.Image, sp image.Point, index string) {
			draw.Draw(dst, r, src, sp, draw.Src)
			// 记得释放一个 mutex
			wg.Done() // 匿名函数 使用外面的 变量
		}

		wg.Add(4)
		var s1, s2, s3, s4 image.Image
		var ok1, ok2, ok3, ok4 bool

		// 使用循环做,保证 4 个任务都完成
		for {
			select {
			case s1, ok1 = <-c1:
				go copy(newImage, s1.Bounds(), s1,
					image.Point{r.Min.X, r.Min.Y}, "1")
			case s2, ok2 = <-c2:
				go copy(newImage, s2.Bounds(), s2,
					image.Point{r.Max.X / 2, r.Min.Y}, "2")
			case s3, ok3 = <-c3:
				go copy(newImage, s3.Bounds(), s3,
					image.Point{r.Min.X, r.Max.Y / 2}, "3")
			case s4, ok4 = <-c4:
				go copy(newImage, s4.Bounds(), s4,
					image.Point{r.Max.X / 2, r.Max.Y / 2}, "4")
			}
			// 结束条件
			if ok1 && ok2 && ok3 && ok4 {
				break
			}
		}
		// 先阻塞 保证goroutine运行完
		wg.Wait()
		buf2 := new(bytes.Buffer)
		jpeg.Encode(buf2, newImage, nil)
		c <- base64.StdEncoding.EncodeToString(buf2.Bytes())
	}()
	return c
}

任务控制-Demo

以:超时机制、任务中断退出两种情况为例

package main

import (
	"fmt"
	"math/rand"
	"strings"
	"time"
)

/**
 *  msgGen
 *  @Description: 消息生成器
 *  @param name: 用来标识发送消息的实体
 *  @param done: 传递从调度程序来的结束消息
 *  @return chan: 将自己的消息通过chan传递出去
 **/
func msgGen(name string, done chan struct{}) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for {
			select {
			// time.After函数在到达规定时间后,返回一个 <-chan 类型的管道
			case <-time.After(time.Duration(rand.Intn(3000)) * time.Millisecond):
				// 随机的发送消息
				c <- fmt.Sprintf("service %s: message %d", name, i)
			// 如果结束通知到来,结束,并通知主协程 自己已经结束了
			case <-done:
				fmt.Println("cleaning up")
				time.Sleep(2 * time.Second)
				fmt.Println("cleanup done")
				done <- struct{}{}
				return
			}
			i++
		}
	}()
	return c
}

/**
 *  msgGenTimeout
 *  @Description:	该消息生成器一旦生成消息超时,自动done掉
 *  @param name:	服务名标识
 *  @return chan:	通过管道传出消息
 **/
func msgGenTimeout(name string) chan string {
	c := make(chan string)
	go func() {
		i := 0
		for { // 如果没有触发超时,就一直发送数据
			select {
			// 每轮消息 10ms 内随机发送
			case <-time.After(time.Duration(rand.Intn(10)) * time.Millisecond):
				c <- fmt.Sprintf("service %s: send message %d", name, i)
			// 超时计数在每轮的 100ms 内随机超时
			case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
				c <- fmt.Sprintf("%s timeout", name)
				return // 超时自动退出
			}
		i++
		}
	}()
	return c
}

/**
 *  timeoutWait
 *  @Description: 模拟超时
 *  @param c: 查看管道 c 中是否有消息
 *  @param timeout: 超时时间
 *  @return string: 返回消息或者空
 *  @return bool: 超时标志
 **/
func timeoutWait(c chan string, timeout time.Duration) (string, bool) {
	select {
	case m := <-c:
		return m, true
	case <-time.After(timeout):
		return "", false
	}
}

/**
 *  timeoutMechanism
 *  @Description: 并发任务控制 -- 超时自动退出
 *				假设有n=3个协程,如果其中某个协程规定时间内没有发出消息,则退出并通知主协程
 **/
func timeoutMechanism() {
	m1 := msgGenTimeout("service1")
	m2 := msgGenTimeout("service2")
	m3 := msgGenTimeout("service3")

	i := 0 // 用来计数有几个协程timeout
	for {
		if i == 3 {
			break
		}
		select {
		case msg := <-m1:
			if strings.Contains(msg, "timeout") {
				// 超时消息
				fmt.Println(msg,"===================")
				i++
			} else {
				fmt.Println(msg)
			}
		case msg := <-m2:
			if strings.Contains(msg, "timeout") {
				fmt.Println(msg,"===================")
				i++
			} else {
				fmt.Println(msg)
			}
		case msg := <-m3:
			if strings.Contains(msg, "timeout") {
				fmt.Println(msg,"===================")
				i++
			} else {
				fmt.Println(msg)
			}
		}
	}

}

/**
 *  notifyToExit
 *  @Description: 并发任务控制 -- 主进程通过 channel 通知子协程退出
 **/
func notifyToExit() {
	done := make(chan struct{})
	m1 := msgGen("server1", done)
	for i := 0; i < 5; i++ {
		if m, ok := timeoutWait(m1, time.Duration(rand.Intn(2000))*time.Millisecond); ok {
			// 如果 msgGen 有消息产生,就打印出来
			// 每轮消息在 5000ms 内随机产生  ---  每轮超时的时间在 2000ms 内
			fmt.Println(m)
		} else {
			// fmt.Println("no message from service2")
			fmt.Println("service1 timeout")
		}
	}
	// 主进程通知子协程中断退出
	done <- struct{}{}
	// 子协程退出后通知父协程
	<-done
	time.Sleep(time.Second)
}

func main() {
	//timeoutMechanism()	// 超时自动退出
	notifyToExit()			// 主进程通知结束
}
Licensed under CC BY-NC-SA 4.0
自认为是幻象波普星的来客
Built with Hugo
主题 StackJimmy 设计