5、并发编程

并发编程

并发从程序设计的角度,就是希望通过某些机制让计算机可以在一个时间段内,执行多个任务。让一个或多个物理 CPU 在多个程序之间多路复用,提高对计算机资源的利用率。

Go 语言通过编译器运行时(runtime),从语言上支持了并发的特性。Go 语言的并发通过 goroutine 特性完成。goroutine 类似于线程,但是可以根据需要创建多个 goroutine 并发工作**。goroutine 是由 Go 语言的运行时调度完成,而线程是由操作系统调度完成。**

并发编程实现模型

并发模型一般分为:多进程编程、多线程编程、非阻塞/异步 IO 编程以及基于协程的编程。

常见概念

并发的作用

例子

1、一方面我们需要灵敏响应的图形用户界面,一方面程序还需要执行大量的运算或者 IO 密集操作,而我们需要让界面响应与运算同时执行。

2、当我们的 Web 服务器面对大量用户请求时,需要有更多的“Web 服务器工作单元”来分别响应用户。

3、我们的事务处于分布式环境上,相同的工作单元在不同的计算机上处理着被分片的数据,计算机的 CPU 从单内核(core)向多内核发展,而我们的程序都是串行的,计算机硬件的能力没有得到发挥。

4、我们的程序因为 IO 操作被阻塞,整个程序处于停滞状态,其他 IO 无关的任务无法执行。

作用

Goroutine

goroutine 是协程的 Go 语言实现,它是语言原生支持的,相对于一般由库实现协程的方式,goroutine 更加强大,它的调度一定程度上是由 go 运行时(runtime)管理。

其好处之一是,当某 goroutine 发生阻塞时(例如同步IO操作等),会自动出让 CPU 给其它 goroutine。

goroutine是非常轻量级的,它就是一段代码,一个函数入口,以及在堆上为其分配的一个堆栈(初始大小为4K,会随着程序的执行自动增长删除)。所以它非常廉价,我们可以很轻松的创建上万个 goroutine。

Go 程序从 main 包的 main() 函数开始,在程序启动时,Go 程序就会为 main() 函数创建一个默认的 goroutine。

开启一个运行时goroutine

func myFunc(){
    
}
//方式一:调用方法的语句前添加go关键字
go myFunc()
//方式二:匿名函数
go func(){
    
}()

所有 goroutine 在 main() 函数结束时会一同结束,终止 goroutine 的最好方法就是自然返回 goroutine 对应的函数。

channel

channel 是Go语言在语言级别提供的 goroutine 间的通信方式。我们可以使用 channel 在两个或多个 goroutine 之间传递消息。

channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,我们建议用分布式系统的方法来解决,比如使用 Socket 或者 HTTP 等通信协议。Go语言对于网络方面也有非常完善的支持。

Go语言提倡使用通信的方法代替共享内存,当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针

channel 是类型相关的,也就是说,一个 channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。

通道的特性

Go语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。

通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

声明和创建

定义一个 channel 时,也需要定义发送到 channel 的值的类型,注意,chan 类型的空值是 nil,必须使用 make 创建 channel

//声明,chanName:通道变量名称,chanType:通道内的数据类型
var chanName chan chanType
//声明并定义
var chanName chan chanType := make(chan chanType)
//声明并定义,使用自动类型推导
chanName := make(chan chanType)

var c chan int = make(chan int)
c := make(chan int)
c := make(chan string)

关闭通道

关闭 channel 非常简单,直接使用Go语言内置的 close() 函数即可

close(chName)

发送和接收

通道创建后,就可以使用通道进行发送和接收操作。

发送

通道的发送使用特殊的操作符<-,将数据通过通道发送的格式为:

chanName <- value

发送后阻塞

把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,例如,现在创建一个chan,但是没有接收方

package main

func main() {
	ch := make(chan string)
	ch <- "hello"
}

/*
fatal error: all goroutines are asleep - deadlock!
*/

运行时发现所有的 goroutine(包括main)都处于等待 goroutine。也就是说所有 goroutine 中的 channel 并没有形成发送和接收对应的代码。

接收

接收方式一共有四种:

1、阻塞接收数据

阻塞模式接收数据时,将接收变量作为<-操作符的左值,执行该语句时将会阻塞,直到接收到数据并赋值给 value变量。

value := <-chanName

2、非阻塞接收数据

使用非阻塞方式从通道接收数据时,语句不会发生阻塞,非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。

value, ok := <-chanName
/*
value:表示接收到的数据。未接收到数据时,value 为通道类型的零值。
ok:表示是否接收到数据,false表示通道已经关闭
*/

3、接收任意数据,忽略接收的数据

阻塞接收数据后,忽略从通道返回的数据,执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。

<-chanName

4、循环接收

通道的数据接收可以借用 for range 语句进行多个元素的接收操作,使用后所在的goroutine将会阻塞循环接收

for value := range chanName {
    
}

例子

package main

import "fmt"

func send(ch chan string) {
	for {
		var s string
		fmt.Scanln(&s)
		ch <- s
		fmt.Println("send ok")
	}
}
func accept(ch chan string) {
	for v := range ch {
		if v == "break" {
			break
		}
		fmt.Println("accept ok:", v)
	}
}

func main() {
    //创建一个通道,用于两个goroutine通信
	ch := make(chan string)
    //main函数执行完后,关闭通道
    defer close(ch)
    //开启一个goroutine用于发送
	go send(ch)
    //使用main的gotroutine进行接收
	accept(ch)
}

/*
hello
send ok
accept ok: hello
world
send ok
accept ok: world
break
*/

注意:使用main函数所在goroutine直接阻塞接收的目的就是,不让main函数return,因为一旦main函数return,那么所有的goroutine都会结束

单向通道

所谓的单向 channel 概念,其实只是对 channel 的一种使用限制,比如限制一个通道在这个函数中的读写,因此,单向通道有利于代码接口的严谨性。

声明

//只写的通道
var chanName chan<- chanType
//只读的通道
var chanName <-chan chanType

实例

ch := make(chan string)
var onlyWrite chan<- string = ch
var onlyRead <-chan string = ch

通道的缓冲

无缓冲通道

Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

阻塞指的是由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足才解除阻塞。

同步指的是在两个或多个协程(线程)之间,保持数据内容一致性的机制。

缓冲通道

Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同**:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。**

在无缓冲通道的基础上,为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲通道中有数据,接收时将不会发生阻塞,直到通道中没有数据可读时,通道将会再度阻塞。

例如收取快递,无缓冲通道:快递员打电话让取件人取快递,这时发送方(快递员)处在阻塞状态,快递员需要等待取件人取快递,当快递取完,此时取件人又处于阻塞状态,等待快递员打电话取快递;缓冲通道:快递员将快递放入快递柜(缓冲区),此时快递员可以继续放其他快递,而取件人可以等有空了再取这些快递,只有当快递柜放不下了,快递员才会进入阻塞状态,而取件人只有快递柜里的快递都取完了,才会进入阻塞状态。

创建带缓冲的通道
chanName := make(chan chanType,size)
/*
chanName:通道实例变量名
chanType:通道所内数据类型
size:通道缓冲区大小
*/

select-case

select 的用法与 switch 语言非常类似,由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。

golang 的 select 就是监听 IO 操作,当 IO 操作发生时,触发相应的动作每个case语句里必须是一个IO操作,确切的说,应该是一个面向channel的IO操作

语法

ch : make(chan int)
select {
    case <-ch:
        //如果从ch通道成功接收数据,则执行该分支代码
    case ch <- 1:
        //如果成功向ch通道成功发送数据,则执行该分支代码
    default:
        // 如果上面都没有成功(处于阻塞),则进入 default 分支处理流程
}

使用注意

1、case后面必须是对chan的io操作,select是go在语言层面提供的IO多路复用机制,专门用于检测多个channel是否准备完毕:可读可写

2、select语句中除default外,每个case操作一个channel,要么读要么写,各case的执行顺序是完全随机(假)的

3、当所有的case都无法执行(处于阻塞),如果有default语句,那么会执行default分支;如果没有default语句,那么会假随机到任意的一个case,并进行阻塞等待

4、对于空的select(),会引起死锁,例如我们编写http服务的时候,可以使用空select阻塞main函数,要注意除了main函数所在的goroutine,一定要有一直活动的goroutine,否则会报deadlock

5、对于for中的select{}, 也有可能会引起cpu占用过高的问题

6、对于case条件语句中,如果存在信道值为nil的读写操作,则该分支将被忽略,可以理解为从select语句中删除了这个case语句

控制超时

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)

	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
		}
	}()
exit:
	for {
		select {
		case i := <-ch:
			fmt.Println(i)
		case <-time.After(3 * time.Second):
			fmt.Println("timeout")
			break exit
		}
	}
	fmt.Println("end")
}

/*
0
1
2
3
4
##等待3秒后##
timeout
end
*/

自己实现超时控制

package main

import (
	"fmt"
	"time"
)

//和After原理一样,调用函数返回一个chan,i秒后将i写入通道
func timeout(i int) chan int {
	//非阻塞通道
	ch := make(chan int, 1)
	go func() {
		time.Sleep(time.Duration(i) * time.Second)
		ch <- i
	}()
	return ch
}

func main() {
	ch := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
		}
	}()

exit:
	for {
		select {
		case i := <-ch:
			fmt.Println(i)
		case <-timeout(3):
			fmt.Println("timeout")
			break exit
		}
	}
	fmt.Println("end")
}

/*
0
1
2
3
4
##等待3秒后##
timeout
end
*/

锁的作用就是某个协程(线程)在访问某个资源时先锁住,防止其它协程的访问,等访问完毕解锁后其他协程再来加锁进行访问。

Go 语言中的 sync 包提供了两种锁类型,分别为:sync.Mutex sync.RWMutex,即互斥锁读写锁

互斥锁

每个资源都对应于一个可称为 “互斥锁” 的标记,这个标记用来保证在任意时刻,只能有一个协程(线程)访问该资源。其它的协程只能等待。

互斥锁是传统并发编程对共享资源进行访问控制的主要手段,在 Golang 中,它由标准库 sync 中的Mutex结构体类型表示。sync.Mutex类型 只有两个公开的指针方法,Lock 和 Unlock。Lock 锁定当前的共享资源,Unlock 进行解锁。

在使用互斥锁时,**一定要注意:对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助defer。**锁定后,立即使用 defer 语句保证互斥锁及时解锁。

使用

// 定义互斥锁变量 mutex
var lock sync.Mutex
// 对需要访问的资源加锁
lock.Lock( )
// 资源访问结束解锁
lock.Unlock( )

例子

在如下案例中,两个协程同时对num分别执行100000次+1,结果应该是200000,但是实际执行后,结果不正确是随机的

package main

import (
	"fmt"
)

var (
	num  = 0
)

func add() {
	num++
}

func main() {
	ch := make(chan bool)

	go func() {
		for i := 0; i < 100000; i++ {
			add()
		}
		ch <- true
	}()
	for i := 0; i < 100000; i++ {
		add()
	}
	<-ch
	fmt.Println(num)
}

对add函数加锁后,发现执行的结果正确为200000

package main

import (
	"fmt"
	"sync"
)

var (
	num  = 0
	lock sync.Mutex
)

func add() {
	lock.Lock()
	defer lock.Unlock()
	num++
}

func main() {
	ch := make(chan bool)

	go func() {
		for i := 0; i < 100000; i++ {
			add()
		}
		ch <- true
	}()
	for i := 0; i < 100000; i++ {
		add()
	}
	<-ch
	fmt.Println(num)
}

读写锁

读写锁里面其实也是有一个互斥锁的结构,读写锁的实现本质也是借助于互斥锁来实现的。

读写锁是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个 goroutine 可同时获取读锁(调用 RLock() 方法;而写锁(调用 Lock() 方法)会阻止任何其他 goroutine(无论读和写)进来,整个锁相当于由该 goroutine 独占。

互斥锁的问题

互斥锁的本质是当一个goroutine访问的时候,其他 goroutine 都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。

其实,当我们对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少goroutine 同时读取,都是可以的。

使用

1、读锁
// 定义一个读写锁
var rwLock sync.RWMutex
// 锁住需要读取的数据
rwLock.RLock()
// 释放锁
rwLock.RUnLock()
2、写锁
// 定义一个读写锁
var rwLock sync.RWMutex
// 锁住需要写入的数据
rwLock.Lock()
// 释放锁
rwLock.UnLock()

等待组

Go语言中除了可以使用通道(channel)和互斥锁进行两个并发程序间的同步外,还可以使用等待组进行多个任务的同步,等待组可以保证在并发环境中完成指定数量的任务后在继续执行下面代码

var wg sync.WaitGroup
// 设置需要等待的协程数
wg.Add(num)
// 一个协程处理结束
wg.Done()
// 等到所有的协程结束
wg.Wait()
package main

import (
	"fmt"
	"sync"
	"time"
)

func pri() {
	for i := 0; i < 3; i++ {
		fmt.Println("pri,i =", i)
		time.Sleep(1 * time.Second)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		pri()
	}()
	go func() {
		defer wg.Done()
		pri()
	}()
	wg.Wait()
	fmt.Println("all goroutines over")
}

/*
pri,i = 0
pri,i = 0
pri,i = 1
pri,i = 1
pri,i = 2
pri,i = 2
all goroutines over
*/