GO-并发技术 - Go语言中文社区

GO-并发技术


并发技术1:CSP并发理论

异步async 
并行:多个任务并发执行

同步sync 
串行:多个任务依次执行

阻塞block 
某个并发任务由于拿不到资源没法干活,从而无所事事地干等

异步回调async callback 
A线程唤起B线程,令其干活 
同时给B一个回调函数 
命令B在干完活以后,执行这个回调函数 
这个回调函数会与A线程发生交互 
A不必阻塞等待B执行的结果,AB两个线程可以并发执行 
利弊

  • 效率高
  • 回调地狱CallbackHell,逻辑线不清晰

共享内存

  • 多个并发线程通过共享内存的方式交互数据
  • 线程安全问题:AB间共享的数据地址可能被C并发修改

同步锁/资源锁 
为了解决共享内存所导致的线程安全问题,共享的内存地址在特定时间段被特定线程锁定 
加锁期间,其它线程无法访问,带来低效率问题

死锁 
A锁住B的资源 
B锁住A要的资源 
AB同时阻塞 
案例:小两口的冷战

  • 女:锁住女人的尊严,得到男人的尊严后才释放
  • 男:锁住男人的尊严,得到女人的尊严后才释放

线程池

  • 背景:线程的开销大
  • 内存:保存上下文数据
  • CPU:线程调度

    为了避免无度创建线程(内存溢出OutOfMemory),在一个池中创建一堆线程,循环利用这些线程,用完了以后重置并丢回池中.

  • 利弊

    利:避免了无度创建线程,降低了OOM的风险
    弊:用不用都占去了一大块内存开销
    

线程并发的弊端 
开线程占内存 
啥也不干就拿走1M栈空间 
1024条线程就占用1G内存 
线程切换占CPU 
内存共享不安全 
加了锁效率又低下 
回调地狱导致开发难度高

堆栈 

  • 变量和对象的名称
  • 引用堆地址

  • 杂乱无章地堆放各种数据
  • 没有栈对其进行引用时,就由nil进行引用
  • 被nil引用的堆地中的内容随时可能被垃圾回收器回收

垃圾回收

  • 一块堆内存如果没有被栈引用,就会被0号栈(空nil)所引用
  • 一切被nil引用的对内存,会随时被垃圾回收器(GarbageCollector=GC)回收

CSP模型

  • CommunicatingSequentialProcess
  • 可通信的序列化进程
  • 并发的进程间通过管道进行通信

共享内存 VS 管道

  • 内存共享:通过内存共享通信
  • 管道:通过通信共享内存

管道

  • 最早由CSP模型提出
  • 以点对点管道代替内存共享实现并发进程间的数据交互
  • 相比内存共享数据交互的相率要高很多

协程

  • coroutine
  • coorperte
  • 协作
  • IO时让出CPU
  • routine
  • 事务
  • 微线程/纤程

并发技术2:多协程

创建Goroutine

import (
    "fmt"
    "time"
)

func newTask() {
    for {
        fmt.Println("劳资是子协程")
        time.Sleep(time.Second)
    }
}

func main() {

    //开一条协程,与主协程并发地执行newTask()
    go newTask()

    //主协程赖着不死,主协程如果死了,子协程也得陪着死
    for {
        fmt.Println("this is a main goroutine")
        time.Sleep(time.Second)
    }

}

出让协程资源 
通过runtime.Gosched()出让协程资源,让其他协程优先执行

package main

import (
    "fmt"
    "runtime"
)

func main() {

    go func() {
        for i := 0; i < 5; i++ {
            fmt.Println("go")
        }
    }()

    for i := 0; i < 2; i++ {
        //让出时间片,先让别的协程执行,它执行完,再回来执行此协程
        //(詹姆斯协程:先排档期,你们先上)
        runtime.Gosched()
        fmt.Println("hello")
    }

}

协程自杀

package main

import (
    "fmt"
    "runtime"
    "time"
)

func test() {
    //遗嘱:临终前说的话
    defer fmt.Println("这是test的遗嘱")
    //自杀,触发提前执行遗嘱,暴毙,后边的好日子不过了,调用它的协程也暴毙
    runtime.Goexit()
    //自杀了,后边的好日子不过了
    fmt.Println("生活承诺的很多美好事情...")
    //到这是test的正常退出
}

func wildMan()  {
    for i:=0;i<6;i++{
        fmt.Println("我是野人,我不喜欢约束,我讨厌制约的我的主协程")
        time.Sleep(time.Second)
    }
}

func main() {

    //一个会暴毙的协程
    go func() {
        fmt.Println("aaaaaaaaaaaaaa")
        //test中有协程自杀程序runtime.Goexit()
        test()
        fmt.Println("bbbbbbbbbbbbbbb")
    }()

    //一个讨厌主协程约束的野人协程,主协程正常结束会把她带走
    //如果主协程暴毙,则野人协程失去约束
    go wildMan()

    for i:=0;i<3;i++ {
        time.Sleep(time.Second)
    }

    //主协程的暴毙,会令所有子协程失去牵制——野人永远失去控制
    //主协程暴毙的情况下,如果所有协程都结束了,程序崩溃:fatal error: no goroutines (main called runtime.Goexit) - deadlock!
    runtime.Goexit()
    fmt.Println("主协程正常返回,会带走所有子协程")

}

查看可用内核数

package main

import (
    "fmt"
    "runtime"
)

/*
可用内核越多,并发质量越高
*/

func main() {
    //把可用的最大逻辑CPU核心数设为1,返回先前的设置
    previousMaxProcs := runtime.GOMAXPROCS(1)

    //获得逻辑CPU核心数
    cpu_num := runtime.NumCPU()
    fmt.Println("cpu_num = ", cpu_num)//8
    fmt.Println("previousMaxProcs=",previousMaxProcs)//8

    for {
        //主协程打0,子协程打1
        go fmt.Print(1)
        fmt.Print(0)
    }
}

协程间公平竞争资源

package main

import (
    "fmt"
    "time"
)

func PrinterVII(str string) {
    for _, data := range str {
        fmt.Printf("%c", data)
        time.Sleep(time.Second)
    }
    fmt.Printf("n")
}

func person1VII() {
    PrinterVII("今生注定我爱你")
}

func person2VII() {
    PrinterVII("FUCKOFF")
}

func main() {
    go person1VII()
    go person2VII()

    for {
        time.Sleep(time.Second)
    }
}

并发技术3:管道通信

channel 介绍

channel 提供了一种通信机制,通过它,一个 goroutine 可以想另一 goroutine 发送消息。channel 本身还需关联了一个类型,也就是 channel 可以发送数据的类型。例如: 发送 int 类型消息的 channel 写作 chan int 。

channel 创建

channel 使用内置的 make 函数创建,下面声明了一个 chan int 类型的 channel:

ch := make(chan int)

c和 map 类似,make 创建了一个底层数据结构的引用,当赋值或参数传递时,只是拷贝了一个 channel 引用,指向相同的 channel 对象。和其他引用类型一样,channel 的空值为 nil 。使用 == 可以对类型相同的 channel 进行比较,只有指向相同对象或同为 nil 时,才返回 true

channel 的读写操作

ch := make(chan int)

// write to channel
ch <- x

// read from channel
x <- ch

// another way to read
x = <- chnnel 一定要初始化后才能进行读写操作,否则会永久阻塞。

channel 一定要初始化后才能进行读写操作,否则会永久阻塞。

关闭 channel

golang 提供了内置的 close 函数对 channel 进行关闭操作。

ch := make(chan int)
close(ch)

有关 channel 的关闭,你需要注意以下事项:

  1. 关闭一个未初始化(nil) 的 channel 会产生 panic
  2. 重复关闭同一个 channel 会产生 panic
  3. 向一个已关闭的 channel 中发送消息会产生 panic
  4. 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel中还未被读取的消息,若消息均已读出,则会读到类型的零值。从一个已关闭的 channel 中读取消息永远不会阻塞,并且会返回一个为
  5. false 的 ok-idiom,可以用它来判断 channel 是否关闭
  6. 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息
ch := make(chan int, 10)
ch <- 11
ch <- 12

close(ch)

for x := range ch {
    fmt.Println(x)
}

x, ok := <- ch
fmt.Println(x, ok)


-----
output:

11
12
0 false

channel 的类型

channel 分为不带缓存的 channel 和带缓存的 channel。

无缓存的 channel

从无缓存的 channel 中读取消息会阻塞,直到有 goroutine 向该 channel 中发送消息;同理,向无缓存的 channel 中发送消息也会阻塞,直到有 goroutine 从 channel 中读取消息。

通过无缓存的 channel 进行通信时,接收者收到数据 happens before 发送者 goroutine 唤醒

有缓存的 channel

有缓存的 channel 的声明方式为指定 make 函数的第二个参数,该参数为 channel 缓存的容量

ch := make(chan int, 10)

有缓存的 channel 类似一个阻塞队列(采用环形数组实现)。当缓存未满时,向 channel 中发送消息时不会阻塞,当缓存满时,发送操作将被阻塞,直到有其他 goroutine 从中读取消息;相应的,当 channel 中消息不为空时,读取消息不会出现阻塞,当 channel 为空时,读取操作会造成阻塞,直到有 goroutine 向 channel 中写入消息。

ch := make(chan int, 3)

// blocked, read from empty buffered channel
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// blocked, send to full buffered channel
ch <- 4

通过 len 函数可以获得 chan 中的元素个数,通过 cap 函数可以得到 channel 的缓存长度。

实例

通过channel实现同步

导入依赖

import (
    "fmt"
    "time"
)
//语法点①:创建int类型的无缓存管道
//var ch = make(chan int)
var ch = make(chan int,0)

func Printer(str string) {
    for _, data := range str {
        fmt.Printf("%c", data)
        time.Sleep(time.Second)
    }
    fmt.Printf("n")
}

func person1() {
    //打印完需要7秒钟
    //劳资不打印完是不会往管道中塞数据的,阻塞不死你丫的
    Printer("今生注定我爱你")

    //箭头指向管道内部,写数据
    //在打完今生注定我爱你(耗时7秒钟)后,才写入数据
    //语法点②:向管道里写数据,无论读写,箭头只能朝左
    //语法点⑤:如果管道缓存已满,则阻塞等待至有人取出数据腾出空间,再写入
    ch <- 666
}

func person2() {
    //箭头指向管道外面,代表从管道中拿出数据,读数据

    //语法点③:从管理取出数据,但不不接收
    //语法点⑥:管道里没数据时,阻塞死等
    <-ch

    //语法点④:从管理取出数据,且使用data变量接收
    //data:=<-ch
    //fmt.Println("读出数据:",data)

    //终于妈的可以打印了
    Printer("FUCKOFF")
}

func main() {

    go person1()
    go person2()

    //主协程赖着不死
    for {
        time.Sleep(time.Second)
    }
}

通过channel实现同步和数据交互

package main

import (
    "fmt"
    "time"
)

func main() {
    //创建无缓存管道
    ch := make(chan string)

    //5、主协程结束
    defer fmt.Println("主协程也结束")

    //子协程负责写数据
    go func() {
        //3、结束任务
        defer fmt.Println("子协程调用完毕")

        //1、缓缓打印2次序号
        for i := 0; i < 2; i++ {
            fmt.Println("子协程 i= ", i)
            time.Sleep(time.Second)
        }

        //2、向管道发送数据
        ch <- "我是子协程,工作完毕"
    }()

    //4、阻塞接收
    str := <-ch
    fmt.Println("str = ", str)
}

无缓冲的channel

package main

import (
    "fmt"
    "time"
)

func main() {
    //创建一个无缓冲的管道
    ch := make(chan int, 1)

    //长度0,缓存能力0
    fmt.Printf("len(ch) = %d, cap(ch)=%dn", len(ch), cap(ch))

    go func() {
        //向管道中存入0,被阻塞,存入1,被阻塞,存入2
        for i := 0; i < 3; i++ {
            fmt.Println("子协程: i = ", i)
            ch <- i

            fmt.Println("5秒以内被打印出来给杰神100万!")
        }
    }()

    //睡眠2秒
    time.Sleep(5 * time.Second)

    //读取0,被阻塞,读取1,被阻塞,读取2
    for i := 0; i < 3; i++ {
        num := <-ch
        fmt.Println("num = ", num)
    }

}

有缓存的channel

package main

import (
    "fmt"
    "time"
)

func main() {
    //创建3缓存的管道
    ch := make(chan int, 3)
    //长度0,缓存能力3(即使没人读,也能写入3个值)
    fmt.Printf("len(ch) = %d, cap(ch) = %dn", len(ch), cap(ch))

    //一次性存入3个:012,3456789
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
            fmt.Printf("子协程存入[%d]: len(ch) = %d, cap(ch) = %dn", i, len(ch), cap(ch))
            //time.Sleep(1 * time.Second)
        }
    }()

    //time.Sleep(5 * time.Second)

    //一次性读取3个:012,345,678,9
    for i := 0; i < 10; i++ {
        num := <-ch
        fmt.Println("num = ", num)
    }
    time.Sleep(1*time.Nanosecond)
}

并发技术4:同步调度

等待组

import (
    "time"
    "fmt"
    "sync"
)

//主协程等待子协程全部结束:通过管道阻塞
func main0() {
    chanRets := make(chan int, 3)
    fmt.Println(len(chanRets),cap(chanRets))
    for i := 0; i < 3; i++ {
        go func(index int) {
            ret := getFibonacci(index)
            chanRets <- ret
            fmt.Println(index,ret)
        }(i)
    }

    for{
        if len(chanRets)==3{
            time.Sleep(time.Nanosecond)
            break
        }
    }
}

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        //等待组中协程数+1(主协程中)
        wg.Add(1)

        go func(index int) {
            ret := getFibonacci(index)
            fmt.Println(index,ret)
            //等待组中协程数-1(子协程中)
            wg.Done()
        }(i)
    }

    //阻塞至等待组中的协程数为0
    wg.Wait()

}

func getFibonacci(n int) int {
    x, y := 1, 1
    for i := 0; i < n; i++ {
        x, y = y, x+y
    }
    <-time.After(3 * time.Second)
    return x
}

互斥锁案例1

package main

import (
    "fmt"
    "time"
    "sync"
)

func main() {

    //必须保证并发安全的数据
    type Account struct {
        money float32
    }

    var wg sync.WaitGroup
    account := Account{1000}
    fmt.Println(account)

    //资源互斥锁(谁抢到锁,谁先访问资源,其他人阻塞等待)
    //全局就这么一把锁,谁先抢到谁操作,其他人被阻塞直到锁释放
    var mt sync.Mutex

    //银行卡取钱
    wg.Add(1)
    go func() {
        //拿到互斥锁
        mt.Lock()

        //加锁的访问
        fmt.Println("取钱前:",account.money)
        account.money -= 500
        time.Sleep(time.Nanosecond)
        fmt.Println("取钱后:",account.money)
        wg.Done()

        //释放互斥锁
        mt.Unlock()
    }()

    //存折存钱
    wg.Add(1)
    go func() {
        //拿到互斥锁(如果别人先抢到,则阻塞等待)
        mt.Lock()

        fmt.Println("存钱前:",account.money)
        account.money += 500
        time.Sleep(time.Nanosecond)
        fmt.Println("存钱后:",account.money)
        wg.Done()

        //释放互斥锁
        mt.Unlock()
    }()

    wg.Wait()
}

互斥锁案例2

package main

import (
    "sync"
    "fmt"
    "time"
)

//必须保证并发安全的数据
type Account struct {
    name  string
    money float32

    //定义该数据的互斥锁
    mt    sync.Mutex
}

//本方法不能被并发执行——并发安全的
func (a *Account) saveGet(amount float32) {
    //先将资源锁起来
    a.mt.Lock()

    //执行操作
    fmt.Println("操作前:", a.money)
    a.money += amount
    fmt.Println("操作后:", a.money)
    <-time.After(3 * time.Second)

    //释放资源
    a.mt.Unlock()
}

//本方法可以被并发执行——不是并发安全的,无此必要
func (a *Account) getName() string {
    return a.name
}

func main() {
    a := Account{name: "张全蛋", money: 1000}

    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        //调用一个加锁的方法(同步)
        a.saveGet(500)
        wg.Done()
    }()

    wg.Add(1)
    go func() {
        //调用一个加锁的方法(同步)
        a.saveGet(-500)
        wg.Done()
    }()

    for i:=0;i<3 ;i++  {
        wg.Add(1)
        go func() {
            //调用一个普通的没有访问锁的方法(异步)
            fmt.Println(a.getName())
            wg.Done()
        }()
    }

    wg.Wait()
}

通过信号量控制并发数

package main

import (
    "fmt"
    "time"
    "sync"
)

/*信号量:通过控制管道的“带宽”(缓存能力)控制并发数*/

func main() {

    //定义信号量为5“带宽”的管道
    sema = make(chan int, 5)

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(index int) {
            ret := getPingfangshu(index)
            fmt.Println(index, ret)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

//该函数只允许5并发执行
var sema chan int
func getPingfangshu(i int) int {
    sema <- 1
    <-time.After(2 * time.Second)
    <- sema
    return i
}

并发技术5:死锁问题

1. 同一个goroutine中,使用同一个 channel 读写

package main
func main(){
    ch:=make(chan int)  //这就是在main程里面发生的死锁情况
    ch<-6   //  这里会发生一直阻塞的情况,执行不到下面一句
    <-ch
}

这是最简单的死锁情况 
看运行结果 
这里写图片描述

2. 2个 以上的go程中, 使用同一个 channel 通信。 读写channel 先于 go程创建。

 package main

func main(){
    ch:=make(chan int)
    ch<-666    //这里一直阻塞,运行不到下面
    go func (){
        <-ch  //这里虽然创建了子go程用来读出数据,但是上面会一直阻塞运行不到下面
    }()
}

这里如果想不成为死锁那匿名函数go程就要放到ch<-666这条语句前面 
这里写图片描述

3. 2个以上的go程中,使用多个 channel 通信。 A go 程 获取channel 1 的同时,尝试使用channel 2, 同一时刻,B go 程 获取channel 2 的同时,尝试使用channel 1

package main
func main()  {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {    //匿名子go程
        for {
            select {    //这里互相等对方造成死锁
            case <-ch1:   //这里ch1有数据读出才会执行下一句
                ch2 <- 777
            }
        }
    }()
    for {         //主go程
        select {
        case <-ch2 : //这里ch2有数据读出才会执行下一句
            ch1 <- 999
        }
    }
}

第三种是互相等对方造成死锁 
4.在go语言中, channel 和 读写锁、互斥锁 尽量避免交叉混用。——“隐形死锁”。如果必须使用。推荐借助“条件变量”

package main

import (
    "runtime"
    "math/rand"
    "time"
    "fmt"
    "sync"
)
// 使用读写锁
var rwMutex2 sync.RWMutex

func readGo2(idx int, in <-chan int)  {     // 读go程
    for {
        time.Sleep(time.Millisecond * 500)      // 放大实验现象// 一个go程可以读 无限 次。
        rwMutex2.RLock()    // 读模式加  读写锁
        num := <-in         // 从 公共的 channel 中获取数据
        fmt.Printf("%dth 读 go程,读到:%dn", idx, num)
        rwMutex2.RUnlock()  // 解锁 读写锁
    }
}

func writeGo2(idx int, out chan<- int)  {
    for {                                   // 一个go程可以写 无限 次。
        // 生产一个随机数
        num := rand.Intn(500)
        rwMutex2.Lock()     // 写模式加  读写锁
        out <- num
        fmt.Printf("-----%dth 写 go程,写入:%dn", idx, num)
        rwMutex2.Unlock()   // 解锁  读写锁

        //time.Sleep(time.Millisecond * 200)        // 放大实验现象
    }
}

func main()  {
    // 播种随机数种子。
    rand.Seed(time.Now().UnixNano())

    // 创建 模拟公共区的 channel
    ch := make(chan int, 5)

    for i:=0; i<5; i++ {        // 同时创建 N 个 读go程
            go readGo2(i+1, ch)
    }
    for i:=0; i<5; i++ {        // 同时创建 N 个 写go程
        go writeGo2(i+1, ch)
    }
    for {                       // 防止 主 go 程 退出
        runtime.GC()
    }
}

这是一种隐形的死锁,我们来看一下结果: 
这里写图片描述

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/super_lixiang/article/details/82708252
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢