
在Golang中实现一个简单的任务队列,核心思路是利用goroutine和channel来实现并发任务的提交与执行。这种方式轻量、高效,适合处理异步任务,比如发送邮件、处理上传、定时任务等。
使用Channel和Goroutine构建基础任务队列
Go的channel天然适合做任务队列。定义一个任务函数类型,用channel接收任务,多个worker并发处理。
示例代码:
package mainimport ( "fmt" "time" )
// 任务函数类型 type Task func()
// 创建任务队列 func NewTaskQueue(workerCount int) chan<- Task { taskChan := make(chan Task, 100) // 带缓冲的channel
// 启动worker for i := 0; i < workerCount; i++ { go func() { for task := range taskChan { task() // 执行任务 } }() } return taskChan}
立即学习“go语言免费学习笔记(深入)”;
func main() { queue := NewTaskQueue(3) // 3个worker
// 提交任务 for i := 1; i <= 5; i++ { i := i task := func() { fmt.Printf("处理任务 %d\n", i) time.Sleep(time.Second) } queue <- task } // 等待一段时间让任务执行完 time.Sleep(6 * time.Second)}
立即学习“go语言免费学习笔记(深入)”;
添加任务完成通知和优雅关闭
实际使用中,可能需要知道任务何时完成,或在程序退出时停止队列。
可以通过关闭channel并使用WaitGroup来管理生命周期。
func NewTaskQueueWithClose(workerCount int) (chan<- Task, func()) {
taskChan := make(chan Task, 100)
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range taskChan {
task()
}
}()
}
// 返回关闭函数
closeFunc := func() {
close(taskChan)
wg.Wait() // 等待所有worker完成
}
return taskChan, closeFunc}
立即学习“go语言免费学习笔记(深入)”;
调用close函数后,所有worker会在处理完剩余任务后退出,确保不丢失任务。
支持优先级或延迟任务的扩展思路
如果需要更复杂的功能,比如优先级或延迟执行,可以引入heap或time.Timer。
简单延迟任务示例:
func DelayTask(queue chan<- Task, delay time.Duration, task Task) {
time.AfterFunc(delay, func() {
queue <- task
})
}
这样可以在指定延迟后自动提交任务。
基本上就这些。用channel + goroutine就能快速搭建一个可靠的任务队列,不复杂但容易忽略关闭和缓冲设置。根据实际需求调整worker数量和channel容量即可。










