Go 并发入门:从 goroutine、channel 到工作池

发布时间:2026/7/1 11:31:14
Go 并发入门:从 goroutine、channel 到工作池 刚开始学 Go 的并发时最容易产生一种错觉只要在函数前面加一个 go程序就会变快。这句话只对了一小半。go关键字确实可以启动一个 goroutine让一段代码和当前代码“同时推进”。但真正写出可靠的并发程序还要回答几个问题主程序怎么知道 goroutine 已经结束goroutine 之间怎么传递数据多个 goroutine 同时改同一个变量会发生什么任务很多时能不能限制同时工作的 goroutine 数量这篇文章从新手视角讲清楚这些问题。我们会先理解几个重要名词再逐步写代码最后实现一个完整的工作池。这篇文章适合谁如果你已经会写最基本的 Go 程序例如package main import fmt func main() { fmt.Println(hello, go) }并且知道func、for、struct、import的基本用法那就可以开始学习并发了。先分清并发和并行很多新手会把“并发”和“并行”混在一起。并发并发指的是程序能够同时处理多个任务。注意这里的“同时处理”不一定表示同一瞬间真的有多个任务在 CPU 上运行。它更强调任务的组织方式程序可以在多个任务之间切换、等待、调度。举个生活例子你一边烧水一边切菜一边等电饭锅煮饭。你不是同时拥有三双手而是在多个任务之间安排时间。并行并行指的是多个任务在同一时刻真的一起执行。例如你的电脑有多个 CPU 核心两个任务分别跑在不同核心上这就是并行。Go 里的关系Go 让你很容易写并发程序。至于它是否并行运行要看运行环境、CPU 核心数、调度器和GOMAXPROCS等因素。对新手来说先记住一句话并发是一种程序结构并行是一种运行状态。写 Go 并发程序时我们最先关心的是怎么把任务拆开怎么让任务协作怎么安全地拿到结果。重要名词速查先把几个词放在这里后面看到代码会轻松很多。goroutinegoroutine 是 Go 里的轻量级并发执行单元。你可以把它粗略理解成“由 Go 运行时管理的轻量任务”。启动一个 goroutine 很简单go doSomething()这表示启动一个新的 goroutine 去执行doSomething()当前函数继续往下走。channelchannel 是 goroutine 之间传递数据的管道。例如ch : make(chan string)这行代码创建了一个传递string的 channel。发送数据ch - hello接收数据msg : -chWaitGroupsync.WaitGroup用来等待一组 goroutine 结束。常见流程是Add登记有几个任务 Done某个任务完成 Wait等待所有任务完成Mutexsync.Mutex是互斥锁。当多个 goroutine 要同时访问同一份共享数据时Mutex 可以保证同一时刻只有一个 goroutine 进入关键区域。race conditionrace condition 通常翻译成竞态条件。如果多个 goroutine 同时读写同一个变量而且没有使用 channel、Mutex 等同步手段程序结果就可能变得不确定。这类问题很难靠肉眼发现所以 Go 提供了 race detector可以用-race检查。第一个 goroutine先看一个最小例子package main import ( fmt time ) func sayHello() { fmt.Println(hello from goroutine) } func main() { go sayHello() fmt.Println(hello from main) // 暂停一小会儿给 goroutine 运行的机会。 // 注意真实项目里不应该用 Sleep 等待 goroutine 结束。 time.Sleep(100 * time.Millisecond) }运行后你可能看到hello from main hello from goroutine也可能顺序反过来。这是第一个要适应的地方并发程序里的执行顺序不一定固定。为什么要 Sleep如果去掉time.Sleepfunc main() { go sayHello() fmt.Println(hello from main) }程序可能只打印hello from main原因是main函数结束时整个程序就结束了。新启动的 goroutine 还没来得及执行程序可能已经退出。但是Sleep不是好办法。它只是“猜一个等待时间”。正确方式是用sync.WaitGroup。用 WaitGroup 等 goroutine 结束下面把上面的例子改成可靠版本package main import ( fmt sync ) func sayHello(wg *sync.WaitGroup) { // defer 表示函数结束前执行。 // 无论函数中间怎么返回Done 都会被调用。 defer wg.Done() fmt.Println(hello from goroutine) } func main() { var wg sync.WaitGroup // Add(1) 表示接下来有 1 个任务需要等待。 wg.Add(1) // 把 WaitGroup 指针传给 goroutine。 go sayHello(wg) fmt.Println(hello from main) // Wait 会阻塞直到所有登记的任务都调用 Done。 wg.Wait() }这里有三个细节很重要wg.Add(1)要在启动 goroutine 之前调用。goroutine 里要调用wg.Done()。WaitGroup通常传指针也就是*sync.WaitGroup。如果你把WaitGroup复制给 goroutine容易出现等待状态不一致的问题。一次启动多个 goroutine我们再写一个稍微真实一点的例子同时处理 5 个任务。package main import ( fmt sync time ) func handleTask(id int, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(task %d started\n, id) // 模拟耗时操作比如请求接口、读文件、计算数据。 time.Sleep(500 * time.Millisecond) fmt.Printf(task %d finished\n, id) } func main() { var wg sync.WaitGroup for i : 1; i 5; i { wg.Add(1) go handleTask(i, wg) } wg.Wait() fmt.Println(all tasks done) }这段代码里5 个任务会并发执行。总耗时大约接近 500ms而不是 5 * 500ms。新手易错点循环变量在并发代码里循环变量很容易被写错。下面这个写法在旧代码中很常见for i : 1; i 5; i { wg.Add(1) go func() { defer wg.Done() fmt.Println(i) }() }为了让代码意图更清楚可以显式把i作为参数传进去for i : 1; i 5; i { wg.Add(1) go func(id int) { defer wg.Done() fmt.Println(id) }(i) }这个写法的意思是每次循环都把当前的i复制一份传给 goroutine 里的id。channel让 goroutine 传递数据WaitGroup可以等待任务结束但它不负责传递结果。如果 goroutine 算出了一个值想交给 main该怎么办这时就可以用 channel。package main import fmt func sendMessage(ch chan string) { // 把字符串发送到 channel。 ch - hello from goroutine } func main() { // 创建一个传递 string 的 channel。 ch : make(chan string) go sendMessage(ch) // 从 channel 接收数据。 msg : -ch fmt.Println(msg) }这里的执行过程是main 创建 channel main 启动 goroutine goroutine 往 channel 发送字符串 main 从 channel 接收字符串 main 打印结果无缓冲 channel上面的make(chan string)创建的是无缓冲 channel。无缓冲 channel 的特点是发送方和接收方必须同时准备好数据才能交接。如果只有发送没有接收发送方会阻塞。如果只有接收没有发送接收方会阻塞。这像两个人当面递东西一个人伸手递另一个人也要伸手接。带缓冲 channel带缓冲 channel 可以临时存放一些数据。package main import fmt func main() { // 创建一个容量为 2 的 channel。 ch : make(chan string, 2) // 因为 channel 有 2 个缓冲位所以这两次发送不会阻塞。 ch - first ch - second fmt.Println(-ch) fmt.Println(-ch) }它像一个容量有限的队列。但是不要把缓冲当成“万能加速器”。缓冲只是改变阻塞时机不会自动解决设计问题。close 和 range告诉接收方没有更多数据了如果发送方会发送多个值接收方怎么知道什么时候结束可以用close关闭 channel。package main import fmt func producer(ch chan int) { for i : 1; i 3; i { ch - i } // 关闭 channel表示不会再发送新数据。 close(ch) } func main() { ch : make(chan int) go producer(ch) // range 会一直接收直到 channel 被关闭。 for value : range ch { fmt.Println(value) } fmt.Println(done) }关于 close 的规则新手可以先记住三条通常由发送方关闭 channel。不要在接收方关闭 channel。不要重复关闭同一个 channel否则会 panic。为什么通常由发送方关闭因为只有发送方最清楚“以后还会不会继续发送数据”。单向 channel让函数职责更清楚Go 支持把 channel 参数声明成“只能发送”或“只能接收”。func producer(ch chan- int) { // chan- int 表示这个函数只能发送 int。 ch - 1 } func consumer(ch -chan int) { // -chan int 表示这个函数只能接收 int。 value : -ch fmt.Println(value) }这样做的好处是函数签名直接表达了职责。在多人协作或项目变大以后这种写法能减少误用。select同时等待多个 channelselect有点像 channel 专用的switch。它可以同时等待多个 channel 操作哪个先准备好就执行哪个分支。package main import ( fmt time ) func main() { ch : make(chan string) go func() { time.Sleep(2 * time.Second) ch - result }() select { case msg : -ch: fmt.Println(got:, msg) case -time.After(1 * time.Second): fmt.Println(timeout) } }这段代码会打印timeout因为 goroutine 要 2 秒才发送结果而time.After(1 * time.Second)1 秒后就会触发。select 常见用途select常用于设置超时监听取消信号同时处理多个 channel避免 goroutine 永远阻塞共享变量和竞态条件很多新手第一次写并发计数器会这样写package main import ( fmt sync ) func main() { var wg sync.WaitGroup count : 0 for i : 0; i 1000; i { wg.Add(1) go func() { defer wg.Done() // 这行不是安全的。 // count 本质上包含读取、加一、写回三个步骤。 count }() } wg.Wait() fmt.Println(count) }你可能期望结果永远是1000但实际上不一定。因为count不是一个不可拆分的动作。它大致可以理解成读取 count 计算 count 1 写回 count如果两个 goroutine 同时读到count 10它们都算出11再都写回11就丢了一次加法。这就是竞态条件。用 Mutex 保护共享变量如果多个 goroutine 必须共享一个变量就要用同步手段保护它。下面用sync.Mutex改写计数器package main import ( fmt sync ) func main() { var wg sync.WaitGroup var mu sync.Mutex count : 0 for i : 0; i 1000; i { wg.Add(1) go func() { defer wg.Done() // 加锁同一时刻只允许一个 goroutine 进入下面的区域。 mu.Lock() count mu.Unlock() }() } wg.Wait() fmt.Println(count) }被锁保护的区域通常叫关键区域。更稳妥的写法是用defer解锁mu.Lock() defer mu.Unlock() count在很小的代码块里直接Unlock也可以但一旦中间逻辑变复杂defer更不容易漏掉。channel 和 Mutex 怎么选新手常问我应该用 channel还是用 Mutex可以先用这个简单判断如果重点是传递数据和组织任务用 channel。 如果重点是保护共享状态用 Mutex。例如多个 worker 从任务队列里拿任务适合 channel。多个 goroutine 更新同一个 map适合 Mutex。一个 goroutine 生产数据另一个 goroutine 消费数据适合 channel。统计全局计数器适合 Mutex 或 atomic。不要为了“看起来更 Go”而强行用 channel。写并发程序的目标是清楚、正确、可维护。用 race detector 检查数据竞争Go 提供了 race detector。如果你的代码在main.go里可以这样运行go run -race main.go如果是测试go test -race ./...它不能证明程序绝对没有并发问题但能帮你发现很多真实的数据竞争。建议你养成习惯并发代码写完后跑一遍-race。工作池为什么需要 worker pool现在进入最后一个例子工作池。假设你有 10000 个任务要处理例如给 10000 个用户发通知处理 10000 张图片请求 10000 个 URL计算 10000 条数据一种直接写法是每个任务启动一个 goroutine。for _, job : range jobs { go handle(job) }这在任务少时没问题但任务很多时可能造成goroutine 数量过多内存压力变大数据库、接口、文件系统被打爆错误处理和退出控制变复杂工作池的核心思想是任务可以很多但同时工作的 worker 数量固定。比如有 10000 个任务但只启动 5 个 worker。每个 worker 不断从任务 channel 里取任务处理完一个再取下一个。工作池结构图可以把工作池想象成这样jobs channel main ----------------- worker 1 ----------------- worker 2 ----------------- worker 3 worker 1 --------------\ worker 2 --------------- results channel --- main 收集结果 worker 3 --------------/里面有三个角色main创建任务、启动 worker、收集结果。jobs任务队列。results结果队列。完整工作池例子下面是完整可运行代码package main import ( fmt sync time ) // Job 表示一个任务。 // 这里为了方便演示让任务内容变成“计算某个数字的平方”。 type Job struct { ID int Number int } // Result 表示任务处理结果。 type Result struct { JobID int Number int Square int } // worker 是真正干活的函数。 // // 参数说明 // id worker 编号方便观察是哪一个 worker 在处理任务。 // jobs 只接收的 channelworker 从这里拿任务。 // results 只发送的 channelworker 把结果发到这里。 // wg 通知 main这个 worker 什么时候结束。 func worker(id int, jobs -chan Job, results chan- Result, wg *sync.WaitGroup) { defer wg.Done() // range jobs 会不断从 jobs channel 接收任务。 // 当 jobs 被关闭并且里面的数据被取完后循环自动结束。 for job : range jobs { fmt.Printf(worker %d started job %d\n, id, job.ID) // 模拟耗时操作。 time.Sleep(300 * time.Millisecond) result : Result{ JobID: job.ID, Number: job.Number, Square: job.Number * job.Number, } // 把处理结果发送给 results channel。 results - result fmt.Printf(worker %d finished job %d\n, id, job.ID) } } func main() { const workerCount 3 const jobCount 8 jobs : make(chan Job) results : make(chan Result) var wg sync.WaitGroup // 1. 启动固定数量的 worker。 for i : 1; i workerCount; i { wg.Add(1) go worker(i, jobs, results, wg) } // 2. 单独启动一个 goroutine 发送任务。 go func() { for i : 1; i jobCount; i { jobs - Job{ ID: i, Number: i, } } // 所有任务都发送完后关闭 jobs。 // worker 收到关闭信号后会在取完剩余任务后退出。 close(jobs) }() // 3. 单独启动一个 goroutine 等待所有 worker 结束。 go func() { wg.Wait() // 所有 worker 都结束了说明不会再产生新结果。 // 这时可以关闭 results让 main 的 range 循环结束。 close(results) }() // 4. main 收集所有结果。 for result : range results { fmt.Printf( result: job%d number%d square%d\n, result.JobID, result.Number, result.Square, ) } fmt.Println(all jobs done) }可能输出如下worker 1 started job 1 worker 2 started job 2 worker 3 started job 3 worker 2 finished job 2 worker 2 started job 4 result: job2 number2 square4 worker 1 finished job 1 worker 1 started job 5 result: job1 number1 square1 ... all jobs done每次运行的顺序可能不同这是正常的。并发程序通常不应该依赖日志顺序判断正确性。工作池代码逐段讲解为什么 worker 数量固定const workerCount 3这表示最多同时有 3 个 worker 处理任务。即使有 8 个任务也不会同时启动 8 个 worker。任务会排队哪个 worker 空闲了就从jobs里取下一个。为什么 jobs 由发送方关闭close(jobs)jobs是 main 负责发送的所以 main 最清楚任务什么时候发送完。关闭jobs后worker 里的这段循环会自然结束for job : range jobs { // 处理任务 }为什么 results 不能马上关闭很多新手会想close(jobs) close(results)这通常是错的。因为jobs关闭只表示“不再发送新任务”不表示 worker 已经处理完已有任务。worker 可能还在计算并准备往results发送结果。如果你过早关闭resultsworker 再发送数据就会 panic。所以正确顺序是关闭 jobs 等待 worker 全部结束 关闭 results这就是为什么代码里有go func() { wg.Wait() close(results) }()为什么 results 用 range 接收for result : range results { fmt.Println(result) }只要results没关闭range就会继续等结果。当所有 worker 结束results被关闭后循环自然结束。这比手动数“收到了几个结果”更适合很多真实场景。新手常见问题goroutine 越多越好吗不是。goroutine 很轻量但不是没有成本。启动过多 goroutine 会带来内存、调度、外部资源压力。如果任务数量很多优先考虑工作池、限流、批处理。channel 需要每次都 close 吗不是。只有当接收方需要通过关闭信号判断“不会再有数据”时才需要 close。如果只是发送一个值、接收一个值不一定要 close。close channel 是不是会清空数据不是。关闭 channel 后里面已经发送但还没接收的数据仍然可以被接收。range ch会把剩余数据取完然后结束。WaitGroup 能不能重复使用可以但要非常小心。新手阶段建议一个 WaitGroup 对应一批任务不要在上一批Wait还没结束时又开始乱加任务。Mutex 会不会让并发变慢锁会让部分代码串行执行但它换来的是正确性。错误的并发比慢一点的并发更危险。如果锁竞争很严重再考虑调整数据结构、减少共享状态、改用 channel 或分片锁等方案。学 Go 并发的建议路线你可以按这个顺序练习用go启动一个 goroutine。用WaitGroup等多个 goroutine 结束。用 channel 从 goroutine 拿结果。用close和range处理多条数据。用select做超时控制。用Mutex保护共享变量。用go run -race检查数据竞争。写一个工作池限制并发数量。如果这 8 步都能写顺Go 并发的入门门槛基本就跨过去了。总结Go 的并发模型很适合写清楚的任务协作代码。你需要记住几件事goroutine用来并发执行任务。WaitGroup用来等待一组 goroutine 结束。channel用来在 goroutine 之间传递数据。close用来告诉接收方不会再有新数据。select用来同时等待多个 channel 操作。Mutex用来保护共享变量。-race可以帮助发现数据竞争。工作池可以限制并发数量让大量任务更可控。最后再强调一句并发程序首先要正确其次才是快。不要为了“看起来高级”而滥用 goroutine。能用简单同步写清楚就先写清楚等真的遇到吞吐量、延迟、资源限制问题再引入更复杂的并发结构。参考资料A Tour of Go: GoroutinesA Tour of Go: ChannelsGo sync package documentationThe Go Memory ModelData Race DetectorGo Concurrency Patterns: Pipelines and cancellation