社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
@author 鲁伟林
记录《Go语言实战》中各章节学习过程,写下一些自己的思考和总结。希望维护管理此仓库,记录学习过程中部分心得,以与其他同行参考。
本博客中涉及的完整代码:
GitHub地址: https://github.com/thinkingfioa/go-learning
本人博客地址: https://blog.csdn.net/thinking_fioa
文中如若有任何错误,欢迎指出。个人邮箱: thinking_fioa@163.com
Go语言里的并发指的是能让某个函数独立于其他函数运行的能力。当一个函数创建为goroutine时,Go会将其视为独立的工作单元。
Go语言的并发同步模型来自一个叫作通信顺序进程(CSP)。CSP是一种消息传递模型,在goroutine之间传递数据来传递消息,而不是对数据进行加锁来实现同步。
操作系统会在物理处理器上调度线程来执行,而Go语言的运行会在逻辑处理器上调度goroutine来运行。在1.5版本后,Go语言的运行默认会在每个可用的物理处理器分配一个逻辑处理器。
如果创建一个goroutine并准备运行,这个goroutine会被放到调度器的全局运行队列中。之后,调度器就将队列中的goroutine分配一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中,如下图6-2
下面是一段代码,有两个goroutine匿名函数,同时逻辑处理器的数量设置为2(runtime.GOMAXPROCS(2))
func main() {
runtime.GOMAXPROCS(2)
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("start program")
go func() {
defer wg.Done()
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
go func() {
defer wg.Done()
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
wg.Wait()
fmt.Println("Finisg")
}
竞争状态:两个或多个gorotine在没有相互同步的情况下,访问某个共享的资源,并试图同时读和写这个资源。int类型是非线程安全,在多gorotine访问时需要保护操作。下面代码就存在竞争状态
var (
count int
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go addCount()
go addCount()
wg.Wait()
fmt.Printf("count %d", count)
}
func addCount() {
defer wg.Done()
for i := 0; i < 10000; i++ {
if i == 500 {
// 当前的gorotine从线程退出,并放回本地运行队列
runtime.Gosched()
}
count++
}
}
Go语言提供了传统的同步gorotine的机制,就是对共享资源加锁,类似于其他语言。atomic和sync包里的函数提供了很好的解决方案
6.4.1 原子函数
使用 atomic包中的AddInt64函数
var (
count int64
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go addCount(10000)
go addCount(10000)
wg.Wait()
fmt.Printf("count %d", count)
}
func addCount(loopNum int) {
defer wg.Done()
for i := 0; i < loopNum; i++ {
//count++
atomic.AddInt64(&count, 1)
}
}
注: 使用原子方法:atomic.AddInt64(&count, 1),来保证线程安全
使用原子函数LoadInt64和StoreInt64
原子函数LoadInt64和StoreInt64提供了一种安全地读和写一个整型值的方式
var (
shutdown int64
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(10 * time.Second)
atomic.StoreInt64(&shutdown, 1)
wg.Wait()
fmt.Println("Finish")
}
func doWork(name string) {
defer wg.Done()
for {
fmt.Printf("output %sn", name)
time.Sleep(250 * time.Millisecond)
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("%s shutdown.n", name)
break
}
}
}
6.4.2 互斥锁(mutex)
另一种同步访问共享资源的方式是使用互斥锁(mutex)。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个goroutine可以执行这个临界区代码。使用mutex.Lock()和mutex.Unlock()声明一段代码是临界区
var (
count int64
wg sync.WaitGroup
mutex sync.Mutex
)
func main() {
wg.Add(2)
go addCount(10000)
go addCount(10000)
wg.Wait()
fmt.Printf("count %dn", count)
}
func addCount(loopNum int) {
defer wg.Done()
for i := 0; i < loopNum; i++ {
mutex.Lock()
count++
mutex.Unlock()
}
}
Go语言不仅提供了原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源
Go语言使用内置函数make来创建一个通道。通道有两种类型,一个是无缓冲的通道,一个是有缓冲的通道
6.5.1 无缓冲的通道
无缓冲的通道是指在接收前没有能力保存任何值的通道。通道要求发送gorotine和接收gorotine同时准备好,才能完成发送和接收操作,否则将一直阻塞等待。
无缓冲的通道操作是一个阻塞操作,数据没有被成功接收,发送gorotine将一直阻塞。
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
court := make(chan int)
wg.Add(2)
go player("thinking", court)
go player("ppp", court)
court <- 1
wg.Wait()
}
func player(name string, court chan int) {
defer wg.Done()
for {
ball, ok := <-court
if !ok {
fmt.Printf("Player %s Wonn", name)
return
}
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missedn", name)
close(court)
return
}
fmt.Printf("Player %s Hit %dn", name, ball)
ball++
court <- ball
}
}
6.5.2 有缓冲的通道
有缓冲的通道是一种在被接收前能存储一个或多个值的通道。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。有点类似于Java语言中的LinkedBlockingQueue
const (
numberWorker = 4
taskNum = 10
)
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
wg.Add(numberWorker)
tasks := make(chan string, taskNum)
// 启动${numberWorker}个goroutine
for gr := 1; gr <= numberWorker; gr++ {
go worker(tasks, gr)
}
// 提交任务
for post := 1; post <= taskNum; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}
// 关闭通道
close(tasks)
wg.Wait()
}
func worker(tasks chan string, work int) {
defer wg.Done()
for {
// 获取任务
task, ok := <-tasks
if !ok {
fmt.Printf("Worker: %d : Shutting Downn", work)
return
}
fmt.Printf("Worker: %d : Started %sn", work, task)
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
fmt.Printf("Worker: %d : Completed %sn", work, task)
}
}
主要介绍3个可以在实际工程中使用的包: runner、pool和work
runner包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以使用runner包来终止程序
runner支持以下终止点:
run.go 代码如下
type Runner struct {
// 接收操作系统发送的中断信号
interrupt chan os.Signal
complete chan error
// 接收超时事件
timeout <-chan time.Time
// 执行的函数,必须是一个接收整数且什么都不返回的函数
tasks []func(int)
}
var ErrTimeout = errors.New("received timeout")
var ErrInterrupt = errors.New("received interrupt")
func New(d time.Duration) *Runner {
return &Runner{
// interrupt通道的缓冲区容量初始化为1,确保语言运行是发送这个事件的时候不会阻塞
interrupt: make(chan os.Signal, 1),
// 当任务完成或退出后,返回一个error或者nil值,将等待main函数区接收这个值
complete: make(chan error),
// 在指定duration时间后,向通道发送一个time.Time的值
timeout: time.After(d),
}
}
func (r *Runner) add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
func (r *Runner) Start() error {
//
signal.Notify(r.interrupt, os.Interrupt)
go func() {
r.complete <- r.run()
}()
select {
case err := <-r.complete:
return err
case <-r.timeout:
return ErrTimeout
}
}
func (r *Runner) run() error {
for id, task := range r.tasks {
if r.gotInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
// 当发生中断事件信号时,停止接收后续信号
signal.Stop(r.interrupt)
return true
default:
return false
}
}
main.go
const timeout = 3 * time.Second
func main() {
log.Printf("Starting work.")
r := runner.New(timeout)
r.Add(createTask(), createTask(), createTask())
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println("Terminating due to timeout.")
os.Exit(1)
case runner.ErrInterrupt:
log.Println("Terminating due to interrupt.")
os.Exit(2)
}
}
}
func createTask() func(int) {
return func(id int) {
log.Printf("Processor - Task #%d.", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
7.1.1 代码解释
pool包展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。
当资源池资源不够时,创建新的资源分配,如下factory字段是一个函数类型,可以用该函数创建新的资源
pool.go代码如下:
type Pool struct {
m sync.Mutex
//有缓冲的通道资源池
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
var ErrPoolClosed = errors.New("Pool has been closed.")
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size value is wrong")
}
return &Pool{
resources: make(chan io.Closer, size),
factory: fn,
}, nil
}
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
log.Printf("Acquire:", "New Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
return p.factory()
}
}
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.resources <- r:
log.Printf("Release:", "In Queue")
default:
log.Println("Release:", "Closing")
r.Close()
}
}
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
// 在清空通道里的资源之前,将通道关闭,如果不这样做,会发生死锁
close(p.resources)
for r := range p.resources {
r.Close()
}
}
main.go代码
const (
maxGoroutines = 25
pooledResources = 2
)
type dbConnection struct {
ID int32
}
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
for query := 0; query < maxGoroutines; query++ {
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("Shutdown Program.")
p.Close()
}
func performQueries(query int, p *pool.Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("Query: QID[%d] CID[%d]n", query, conn.(*dbConnection).ID)
}
7.2.1 代码解释
work包的目的是展示如何使用无缓冲的通道来创建一个goroutine池,多个goroutine执行并控制一组工作,让其并发执行。
使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组goroutine配合执行。
work.go代码如下:
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutine int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutine)
for i := 0; i < maxGoroutine; i++ {
go func() {
for w := range p.work {
w.Task()
}
// goroutine结束
p.wg.Done()
}()
}
return &p
}
func (p *Pool) Run(w Worker) {
p.work <- w
}
func (p *Pool) Shutdown() {
close(p.work)
// 等待所有的goroutine终止
p.wg.Wait()
}
7.3.1 代码解释
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!