Go语言5-并发

我们常听到GO的一个主要特性介绍:“天生支持高并发”,什么叫天生支持呢?

并发和并行不是一回事,并行是多个处理器同时执行不同指令,而并发是指一个时间只能执行一条指令,但cpu不断切换进程/线程达到同时执行的效果。

一、线程和进程

开始前先了解下操作系统进程和线程的概念。

上图为一个应用程序运行时线程和进程的简要描述,程序启动时,操作系统启动一个进程,这个进程维护程序需要的各种资源包括内存、文件、设备句柄和线程等。进程间的资源是相互隔离的,有的软件采用多进程并发模型,比如nginx,和大多数web服务器一样,采用master/worker模型,一个master进程管理站个或者多个worker进程。多进程的方式系统开销大,所有进程都是由内核管理。

进程可以有一个或者多个线程,其中初始线程成为主线程。多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多最有效的一种模式。相比多进程,开销较小。线程共享进程的大部分资源,并参与CPU的调度, 当然线程自己也是拥有自己的资源的,例如,栈,寄存器等等。但是,高并发时,所能创建的线程数有限,且CPU切换开销大,很可能耗尽内存和CPU资源。

二、协程

协程(Coroutine)本质上是一种用户态线程,也可以说是轻量级线程,不需要操作系统来进行抢占式调度,系统开销极小。通过在线程中实现调度,避免了陷入内核级别的上下文切换造成的性能损失,进而突破了线程在IO上的性能瓶颈,可以有效提高线程的任务并发性,而避免多线程的缺点。使用协程的优点是编程简单,结构清晰;缺点是需要语言的支持,如果不支持,则需要用户在程序中自行实现调度器。

三、goroutine

goroutine是Go语言中的轻量级线程实现,本质上也是协程,由Go运行时(runtime)管理。我们说GO天生支持并发,就是指Go在语言层面管理、调度、执行goroutine的支持。通过关键词go,可以让函数独立执行。先看一个示例:

package main
import (
	"fmt"
)
func main() {
	for i := 1; i < 10; i++ {
		go printB()
	}
}
func printB() {
	fmt.Println("A:")
}

在main函数中条用printB 10次,但通过关键字go调用,运行时发现并没有输出。这就对了,printB并发执行,但main函数结束后程序就退出了,所以没有输出。但怎么做到等这10个printB运行完再结束呢?在java中,main函数本身是守护线程,所以能做到等其他线程都执行完才结束,go是怎么做呢?在java中,并发时最需要考虑的是共享数据的安全性,共享内存一般采用加锁或者通过一些并发安全或者具有原子性操作的库完成。

四、锁与共享资源

针对上面提出的疑问,一般情况都通过对共享资源加锁,GO也提供了很好的支持:原子操作,锁

atomic

原子操作函数指go在底层通过加锁等机制来实现同步访问变量和指针。

import (
	"fmt"
	"sync"
	"sync/atomic"
	"runtime"
)
var (
	counter int64
	wg      sync.WaitGroup // wg:用来等待程序结束
)
func incCounter(id int) {
	defer wg.Done()// 在函数结束时调用down通知main函数工作已经完成
	for count := 0; count < 100; count++ {
		// 安全地 counter + 1:同一时刻只能有一个 goroutine 运行并完成这个加法操作
		atomic.AddInt64(&counter, 1)
		runtime.Gosched()
	}
}
func main() {
	wg.Add(2)// 要等待两个goroutine
	go incCounter(1)
	go incCounter(2)
	wg.Wait()//等待goroutine结束
	fmt.Println("Final Counter:",counter)
}

atmoic 包的 AddInt64 函数。这个函数会同步整型 的加法, 方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。

mutual 互斥锁

另一种同步访问共享资源的方式是使用互斥锁(mutex)。

package main

import (
	"fmt"
	"sync"
	"runtime"
)

var (
	counter int64
	wg      sync.WaitGroup
	mutex   sync.Mutex
)
func incCounter(id int) {
	defer wg.Done()
	for count := 0; count < 100; count++ {
		mutex.Lock() //同一时刻只允许一个goroutine进入
		{
			value := counter
			runtime.Gosched()
			value ++
			counter = value
		}
		mutex.Unlock()
	}
}
func main() {
	wg.Add(2)
	go incCounter(1)
	go incCounter(2)
	wg.Wait()
	fmt.Println("Final Counter:", counter)
}

直到调用 Unlock()函数之后,其他 goroutine 才能进入lock区域。

加锁是比较传统的共享资源方式,但是,Go语言还提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。下面就来了解goroutine的通讯方式channel。

五、 channel

channel是进程内的通信方式,因此通过channel传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。channel是类型相关的,也就是说,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。例如,下面的示例表示声明了一个int的channel类型,只能传递int值。

var ch chan int
var ch1 chan<- int  //ch1只能写
var ch2 <-chan int  //ch2只能读

注意使用var声明的Channel没有初始化,往里写入数据将抛出panic,读取则一直阻塞。

从示例看到<-,他说channel的操作符,箭头的指向就是数据的流向。在go语言中,有4种引用类型:slice,map,channel,interface。Slice,map,channel一般都通过make进行初始化,并设置缓存大小:

ch := make(chan int)
ch := make(chan int, 100)

如果不设置缓存大小,默认为0,用来构建默认的“无缓冲channel”,也称为“同步channel”。

chan T          // 可以接收和发送类型为 T 的数据
chan<- float64  // 只可以用来发送 float64 类型的数据
<-chan int      // 只可以用来接收 int 类型的数据

可以通过内建的close方法可以关闭Channel。

close(ch)
v, ok := <-ch //检查Channel是否已经被关闭

buffer

举个例子。碎沙机和拉沙车是两个goroutine,独立工作。如果需要他们配合工作,把碎沙机的沙通过传送带(channel)给车拉走。

只要有一方不在传送带工作,另一方就需要等着(阻塞)。车没回来,碎沙机就得停止工作。

有buffer,就好像在传送带加了一个存放沙的池子,如果池子不满,碎沙机器可以一直不停工作。

func main() {
	ch1 := make(chan int)
	go func() {
			fmt.Println("goroutine1 revice value",<-ch1)
	}()
	ch1 <- 1
	ch1 <- 2 // 没有消费者,会一直阻塞。此处因为所以其他goroutine都关闭了,此处导致fatal error:
	time.Sleep(5*100*time.Millisecond)
	fmt.Println("Process done.")
}

如果把ch1 := make(chan int)改为ch1 := make(chan int,1),buffer为1,goroutine消费掉一个,因为容量为1,2还是可以放入Channel,不会阻塞。所以对于有buffer的Channel,特性如下:
buffer满了,发送方阻塞;buffer空了,消费方阻塞

发送数据(Sender)

表达式:

SendStmt = Channel "<-" Expression .
Channel  = Expression .

在发送数据前,Expression会先计算出结果。By default sends and receives block until both the sender and receiver are ready。并且,send被执行前通讯一直被阻塞着,如下,利用channel的通讯,在没有使用任何同步方法的情况下实现了同步,也解决了之前提出的问题。

messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
  • 往一个已经被close的channel中继续发送数据会报错
  • 往nil channel中发送数据会一致被阻塞
  • 从一个nil channel中接收数据会一直被阻塞
  • 从一个被close的channel中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值

同步

上面通过sender和receiver的阻塞,可以实现同步。下面在一个示例:

func worker(done chan bool) {
	fmt.Print("working...")
	time.Sleep(time.Second)
	fmt.Println("done")
	done <- true
}
func main() {
	done := make(chan bool)
	go worker(done)
	<-done
}

很简单,main函数在等待接收done的数据,一直阻塞,直到done <- true被执行。我们对示例稍作修改:

func worker(done chan bool, i int) {
	fmt.Printf("working%d...",i)
	fmt.Printf("done%d",i)
	fmt.Println()
	done <- true
}
func main() {
	done := make(chan bool, 1)
	for i :=0; i<10; i++{
		go worker(done,i)
	}
	<-done
}

从执行结果可知,只能保证至少一个working,done被执行,因为接收者收到数据后,程序就退出了。

Range

可以通过range迭代channel的元素,也就是一直从channel取出数据直到通道被关闭。

func main() {
	c := make(chan int)
	go func() {
		for i := 0; i < 10; i = i + 1 {
			c <- i
		}
		close(c)
	}()
	for i := range c {
		fmt.Println(i)
	}
	fmt.Println("Finished")
}

上面的示例会打印1到9,才执行finished,但如果close(c)被注释掉,程序会出现错误,因为已经没有goroutines在运行,但程序还在不断从chenel获取数据。现在在上面示例中添加一个goroutine如下:

go func() {
		time.Sleep(1 * time.Hour)
}()

再执行程序,程序会一直阻塞在range处。

Select

select语句可以用于多个channel的读或者写。它与switch语句比较类似,只不过select只用于channel。 如果有多个channel可以处理,那么select随机选择一个channel处理:

for {  // send random sequence of bits to c
    select {
    case c <- 0:  // note: no statement, no fallthrough, no folding of cases
    case c <- 1:
    }
}

如果所有channel都不能处理,如果有default语句,则执行default,如果没有default,则会阻塞,直到有channel可以处理。一个处理nil channel,没有default的select会永远阻塞。这常用于daemon程序。

select {}  // block forever

真正使用时可能如下:

func main() {

	go func(){
		for {
			time.Sleep(time.Second * 1)
			fmt.Println("do some work")
		}
	}()
	select {}
}

上面提到一个会一直阻塞的情况,而select本身没有超时设置,可以通过time实现:

timeout := make(chan bool, 1)
go func() {
    time.Sleep(1 * time.Second)
    timeout <- true
}()

select {
case <-ch:
    // a read from ch has occurred
case <-timeout:
    // the read from ch has timed out
}

根据上面提供的模式,我们来实验下:

c1 := make(chan string, 1)
timeout := make(chan bool)
go func() {
	time.Sleep(time.Second * 2)
	c1 <- "result 1"
}()
go func() {
	time.Sleep(time.Second * 1)
	timeout <- true
}()
select {
case res := <-c1:
	fmt.Println(res)
case <- timeout:
	fmt.Println("timeout")
}

timeout

也不是非得一个匿名函数,timeout可以简写:

select {
case res := <-c1:
	fmt.Println(res)
case <- time.After(time.Second * 1):
	fmt.Println("timeout")
}

特殊状态数据

对于初始化非关闭的Channel读写都认为是正常状态。

关闭的 Channel
  1. 写:panic
  2. 读:Channel 类型的零值
  3. 读(有buffer):读取buffer中数据,继续读取零值

从以上特点可以知,如果使用select方式读取,如果Channel关闭,将会陷入死循环:

func main() {
	ch1 := make(chan int)
	go func() {
		for {
			select {
			case val := <-ch1:
				fmt.Println("Received value: ", val)
			}
		}

	}()
	for i := 0; i <= 10; i++ {
		ch1 <- i
	}
	close(ch1)
	time.Sleep(100* time.Microsecond)
	fmt.Println("Process done.")
}

如上,定时器结束前,一直输出Received value: 0

可以换种方式使用select:

go func() {
	var closed bool
	for {
		if closed {
			break
		}
		select {
		case val, ok := <-ch1:
			fmt.Println("Received value: ", val)
			if !ok {
				closed = true
				break
			}
		}
	}
}()

当然for...range是一个好的选择。

基于Channel 关闭后的行为,如何关闭它才是安全的呢?有一条比较
通用的适用原则,即不要从接收端关闭Channel

写入值为nil的Channel

当一个Channel 的值为nil 时,写入这个Channel 的操作会永远阻塞,读也永远阻塞。所以在使用Channel 时一定要注意这一点,在对其写入或者读取时一定要保证这个Channel 已经被初始化。


引用:
[1]. Go语言学习:Channel
[2]. Go Channel 详解
[3]. 《Go语言实战》
[4]. 《Go语言编程》
[5]. http://www.infoq.com/cn/minibooks/freewheel-minibook-go

CONTENTS