遇到一个非常离奇的事情,我使用读写锁尝试了一下,发现出现了死锁,我本来以为是死锁出现问题,后来原来我犯了一个超级低级的错误。
package main | |
import ( | |
"fmt" | |
"sync" | |
) | |
var wg sync.WaitGroup | |
var rwLock sync.RWMutex | |
func main() { | |
wg.Add(20) | |
Data := 100 | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func(i int) { | |
// time.Sleep(time.Duration(i+1)*time.Second) | |
fmt.Println("wait Read") | |
rwLock.RLock() | |
fmt.Println("Reading") | |
fmt.Printf("Data: %v\n", Data) | |
// time.Sleep(time.Second) | |
defer func() { | |
fmt.Println("UnLock Read") | |
rwLock.RUnlock() | |
wg.Done() | |
}() | |
}(i) | |
//wg.Add(1) | |
go func(i int) { | |
// time.Sleep(time.Duration(i+10)*time.Second) | |
fmt.Println("wait write") | |
rwLock.Lock() | |
fmt.Println("writing") | |
Data += i + 1 | |
// time.Sleep(time.Second * 5) | |
defer func() { | |
fmt.Println("writed over") | |
rwLock.Unlock() | |
wg.Done() | |
}() | |
}(i) | |
wg.wait() | |
} | |
//wg.Wait() | |
} |
这个错误低级的可以,不是死锁出现了问题,而是 waitgroup 出现了问题,waitgroup 放在了 for 循环之内,导致其永远得不到释放。
关于读写锁,这里有一篇文章 go 的读写锁
# once 只执行一次
package main | |
import ( | |
"fmt" | |
"sync" | |
) | |
var once sync.Once | |
func main() { | |
for i := 0; i < 10; i++ { | |
fmt.Printf("i: %v\n", i) | |
once.Do(ones) | |
} | |
once.Do(second) | |
} | |
func ones() { | |
fmt.Println("ones") | |
} | |
func second() { | |
fmt.Println("second") | |
} |
# go 通道
# 定义
channel := make(chan int)
channel := make(chna int, 2)
可以定义有缓存的通道,这样就可以实现一步通信
通道主要维护了一个锁和两个队列, 发送队列
和 接收队列
,如果通道有缓冲区,那么通道还会维护维护一个缓冲队列。如果缓冲队列为空,那么发送队列一定为空,如果缓冲队列未空,那么接受队列一定为空。任意时刻,接受队列和缓冲队列一定有一个为空。
# 通道的遍历
可以看这篇文章:
channel 遍历
# select 来读取通道
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
var chanInt = make(chan int) | |
var chanStr = make(chan string) | |
func main() { | |
go func() { | |
chanInt <- 100 | |
chanStr <- "hello" | |
close(chanInt) | |
close(chanStr) | |
}() | |
for { | |
select { | |
case r := <-chanInt: | |
fmt.Printf("chanInt: %v\n", r) | |
case r := <-chanStr: | |
fmt.Printf("chanStr: %v\n", r) | |
default: | |
fmt.Println("default...") | |
} | |
time.Sleep(time.Second) | |
} | |
} |
注意通道关闭后仍然可读,并且读到的是默认值。
通道关闭后还可读,但通道关闭后不能写,不能 reopen 只能重新再开一个通道,并且关闭后的通道不能再关闭,否则会报 panic。
# timer 定时器
func main() { | |
timer := time.NewTimer(time.Second * 2) | |
fmt.Printf("time.Now(): %v\n", time.Now()) | |
t1 := <-timer.C | |
fmt.Printf("t1: %v\n", t1) | |
t2 := time.After(time.Second*2) | |
fmt.Printf("t2: %v\n", <-t2) | |
} |
<- timer.C
才会触发定时器,返回当前的时间
time.After
会直接 wait,并且返回一个 channel,要得到时间只需要 <-t2
注意:定时器只有使用时才会触发等待,而 After
立即触发等待。
# sync.Cond
这个玩意有点难理解,我想了挺久才想明白这个干嘛用的。。。。。
Cond
主要是为了解决这样一个场景 —— 一个进程通知多个进程。只有当某一个协程完成后,其他协程才能继续执行,这就需要 1 对 N 的消息通知,而 Cond 就是做这样的工作的。
Cond
初始化需要关联一个锁,这个锁用于防止消息竞争,比如对于下面这个代码中,通过加锁保证了只有一个协程可以写数据,同时也保证了只有一个协程可以读数据或校验数据,锁也可以换成读写锁。
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"sync" | |
"time" | |
) | |
var wg sync.WaitGroup | |
func main() { | |
rand.Seed(time.Now().UnixNano()) | |
const N = 10 | |
var values [N]string | |
cond := sync.NewCond(&sync.Mutex{}) | |
for i := 0; i < N; i++ { | |
d := time.Second * time.Duration(rand.Intn(10)) / 10 | |
go func(i int) { | |
time.Sleep(d) // 模拟一个工作负载 | |
cond.L.Lock() | |
// 下面的修改必须在 cond.L 被锁定的时候执行 | |
values[i] = string('a' + i) | |
cond.Broadcast() // 可以在 cond.L 被解锁后发出通知 | |
cond.L.Unlock() | |
// 上面的通知也可以在 cond.L 未锁定的时候发出。 | |
//cond.Broadcast () // 上面的调用也可以放在这里 | |
}(i) | |
} | |
// 此函数必须在 cond.L 被锁定的时候调用。 | |
checkCondition := func() bool { | |
for i := 0; i < N; i++ { | |
if values[i] == "" { | |
return false | |
} | |
} | |
return true | |
} | |
f:= func (i int) { | |
defer wg.Done() | |
defer cond.L.Unlock() | |
cond.L.Lock() | |
for !checkCondition(){ // 这里要是 for 循环,因为只有满足条件才能执行下面的代码 | |
cond.Wait() | |
} | |
fmt.Println(i) | |
} | |
wg.Add(3) | |
go f(1) | |
go f(2) | |
go f(3) | |
wg.Wait() | |
} |
这几行代码实现的功能就是,values 被全部填充完毕后,其他协程才能执行。 checkcondition
的作用就是条件检查。
注意 wait
只有在加锁的条件下才可以使用,当 wait
执行时,会先 Unlock
然后阻塞自身,并塞到等待队列里。用 signal
会从等待队列中唤醒第一个协程,用 Broadcast
会唤醒等待队列中的所有协程。唤醒操作不需要加锁。