开发者

Go高级特性探究之协程池详解

开发者 https://www.devze.com 2023-11-21 10:43 出处:网络 作者: tracy小猫
目录PoolNewpool 函数Submit 函数createWorker 函数incRunning、decRunning 函数Stop 函数解决函数传参问题优雅关闭协程池保证协程安全测试用例改进总结在并发编程中,协程是 GRqGeNmvGo 语言的核心特性之一,但是在
目录
  • Pool
    • Newpool 函数
    • Submit 函数
    • createWorker 函数
    • incRunning、decRunning 函数
    • Stop 函数
  • 解决函数传参问题
    • 优雅关闭协程池
      • 保证协程安全
        • 测试用例
          • 改进
            • 总结

              在并发编程中,协程是 GRqGeNmvGo 语言的核心特性之一,但是在实际应用中,协程的创建和销毁成本比较高。当需要同时处理大量的任务时,创建大量的协程会导致系统开销变大,进而影响程序的性能。这时候,就需要使用协程池来管理协程的生命周期,将协程的创建和销毁成本降至最小,提高程序的并发性能。

              本文将介绍如何使用 Go 协程池构造一个协程池,并解决函数传参问题、优雅关闭协程池和保证协程安全的问题。

              Pool

              type Pool struct {
                 capacity       uint64            // 最大协程数
                 runningWorkers uint64            // 当前正在运行的协程数
                 status         int64             // 协程池的状态
                 chtask         chan *Task        // 执行任务的 channel
                 PanicHandler   func(interface{}) // 处理协程中的 panic 异常
                 sync.Once
                 sync.Mutex
              }

              Pool 类型是协程池的主要类型,包含了以下属性:

              • capacity:最大协程数。
              • runningWorkers:当前正在运行的协程数。
              • status:协程池的状态。
              • chTask:执行任务的 channel。
              • PanicHandler:处理协程中的 panic 异常。
              • sync.Once:防止 Stop 函数被多次调用。
              • sync.Mutex: 用于锁定协程池的状态和 channel。

              同时 Pool 类型包含以下函数:

              • NewPool:用于初始化协程池。
              • Submit:将任务放到 channel 中供协程进行任务处理。
              • createWorker:用于创建并启动一个协程来执行任务。
              • incRunning:增加协程池的运行协程数。
              • decRunning:减少协程池的运行协程数。
              • Stop:关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。

              NewPool 函数

              NewPool 函数用于创建和初始化一个协程池。将最大协程数 n 和处理协程中 panic 异常的函数 panicHandler 传入函数中,创建一个 Pool 类型,并将属性初始化后返回一个 Pool 的指针类型。

              func NewPool(n uint64, panicHandler func(interface{})) *Pool {
                 return &Pool{
                    capacity:     n,
                    status:       Running,
                    chTask:       make(chan *Task, n),
                    PanicHandler: panicHandler,
                 }
              }

              Submit 函数

              Submit 函数用于将任务放到 channel 中供协程进行任务处理。首先判断协程池状态是否为 Stopped,如果已经关闭,则返回一个错误;接着加锁,并判断 channel 中是否已满,如果已经满了,则返回一个错误,否则将任务放到 channel 中并返回 nil。

              // 将任务放到 channel 中供协程进行任务处理
              func (p *Pool) Submit(t *Task) error {
                 if p.status == Stopped {
                    return errors.New("协程池已关闭,不能提交任务")
                 }
                 p.Lock()
                 defer p.Unlock()
                 if len(p.chTask) =编程= int(p.capacity) {
                    return errors.New("协程池已满,不能接受新任务")
                 }
                 p.chTask <- t
                 return nil
              }

              createWorker 函数

              createWorker 函数用于创建并启动一个协程来执行任务。首先增加当前运行的协程数,然后在一个 go 协程内执行任务。如果在执行任务的过程中出现 panic 异常,则调用 PanicHandler 处理函数,如果没有设置 PanicHandler 处理函数,则直接将异常信息打印出来。执行完任务后,减少当前运行的协程数。

              // 初始化协程池的协程数量
              func (p *Pool) createWorker() {
                 p.incRunning()
                 // 每一个协程获取一个任务,执行任务
                 go func() {
                    defer func() {
                       if r := recover(); r != nil js{
                          if p.PanicHandler != nil {
                             p.PanicHandler(r)
                          } else {
                             fmt.Println("Panic:", r)
                          }
                       }
                       p.decRunning()
                    }()
                    for {
                       select {
                       case t := <-p.chTask:
                          if t == nil {
                             return
                          }
                          t.Handler(t.Params...)
                       }
                    }
                 }()
              }

              incRunning、decRunning 函数

              incRunning、decRunning 函数用于增加和减少协程池的运行协程数,使用了 atomic.AddUint64 函数来保证操作的原子性。

              // 增加协程池的运行协程数
              func (p *Pool) incRunning() {
                 atomic.AddUint64(&p.runningWorkers, 1)
              }
              // 减少协程池的运行协程数
              func (p *Pool) decRunning() {
                 atomic.AddUint64(&p.runningWorkers, ^uint64(0))
              }

              Stop 函数

              Stop 函数用于关闭协程池,停止接受任务并且等待所有任务执行完毕后关闭协程池。首先判断协程池状态是否为 Running,如果已经关闭,则直接返回;接着将协程池状态设置为 Stopped,然后使用 sync.Once 确保关闭 channel 的操作仅被执行一次,同时创建运行的协程数个协程,等待它们执行完毕后关闭协程池。

              // 关闭协程池
              func (p *Pool) Stop() {
                 if p.status == Running {
                    p.status = Stopped
                    p.Once.Do(func() {
                       close(p.chTask)
                       for i := uint64(0); i <php p.runningWorkers; i++ {
                          p.createWorker()
                       }
                    })
                 }
              }

              解决函数传参问题

              在使用协程池时,需要向协程池提交任务,但是协程池内部的协程如何知道要执行什么样的任务,参数又应该如何传递呢?

              为了解决这个问题,可以定义一个 Task 结构体,用于存储要执行的函数和函数参数,如下所示:

              type Task struct {
                Handler func(v ...interface{})
                Params []interface{}
              }

              Task 类型是一个结构体,用于封装协程池的任务。其中 Handler 是一个函数类型,用于任务执行的函数;Params 是一个可变参数,调用 Handler 时传递给它的参数。

              其中,Handler 是一个无返回值的函数,且该函数可接受变长参数,Params 是一个任意类型的切片,用于传递函数的参数列表。

              在向协程池提交任务时,可以将 Task 对象作为参数进行提交。

              pool.Submit(&Task{
                Handler: func(v ...interface{}) {
                  // 执行任务的代码
                },
                Params: []interface{}{...}, // 任务的参数列表
              })

              在协程内部,可以通过调用 Task.Handler 方法,并将 Task.Params 作为参数传递进去,来运行具体的任务。

              select {
              case t := <-p.chTask:
                if t == nil {
                  return
                }
                t.Handler(t.Params...)
              }

              通过这种方式,协程池就能够动态地执行不同的任务,并且传递任意类型和数量的参数。

              优雅关闭协程池

              在使用协程池时,如何正确地关闭协程池,以避免因未正确关闭而导致的内存泄漏和程序崩溃呢?

              首先,需要明确协程池的运行状态,通过内部的 status 参数控制协程池的开关。当协程池处于运行状态时,协程池才能够接受新的任务,否则应该拒绝新的任务请求,并尽快释放内部的资源。

              其次,在关闭协程池时,需要确保所有的已运行的协程都已经执行完任务并退出。这时,可以使用 sync.Once 来执行一次协程池的清理工作。当协程池处于关闭状态时,不再接受新的任务,并通知所有的协程退出任务循环,最终实现协程池的优雅关闭。

              func (p *Pool) Stop() {
              if p.status == Running {
               p.status = Stopped
               p.Once.Do(func() {
               close(p.chTask)
               for i := uint64(0); i < p.runningWorkers; i++ {
                p.createWorker()
               }
               })
              }
              }

              保证协程安全

              在使用协程池时,需要注意线程安全问题,尤其是在多个协程同时访问协程池时,需要保证协程池的内部状态是线程安全的。

              同时对于状态的变更以及数量的增减,还需要保证代码的安全性。

              为了保证线程安全,可以使用互斥锁 sync.Mutex 来锁定协程池,以避免多个协程同时读写协程池的运行状态和其他内部参数。

              在协程池的内部实现中,使用的 sync.Once 只会单次执行的特性可以保证协程池只会初始化一次,防止因多次初始化而导致的内存泄漏或其他异常。

              测试用例

              为了测试协程池的正确性,以下是一个简单的测试用例。该测试用例创建一个容量为 3 的协程池,并向其中提交 10 个任务,每个任务随机睡眠一段时间,并输出当前时间。

              package main
              import (
              "fmt"
              "math/rand"
              "sync"
              "testing"
              "time"
              )
              func TestPool(t *testing.T) {
              pool := NewPool(3, func(err interface{}) {
               fmt.Println("发生 panic,错误信息:", err)
              })
              var wg sync.WaitGroup
              for i := 0; i < 10; i++ {
               wg.Add(1)
               go func(id int) {
               defer wg.Done()
               task := \&Task{
                Handler: func(v ...interface{}) {
                fmt.Printf("任务 %d 开始执行,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05"))
                rand.Seed(time.Now().UnixNano())
                time.Sleep(time.Duration(rand.Intn(5)) \* time.Second)
                fmt.Printf("任务 %d 执行完毕,时间:%v\n", id, time.Now().Format("2006-01-02 15:04:05"))
                },
                Params: \[]interface{}{},
               }
               pool.Submit(task)
               }(i)
              }
              wg.Wait()
              }

              输出结果如下:

              任务 0 开始执行,时间:2021-10-05 16:52:22

              任务 1 开始执行,时间:2021-10-05 16:52:22

              任务 2 开始执行,时间:2021-10-05 16:52:22

              任务 0 执行完毕,时间:2021-10-05 16:52:27

              任务 3 开始执行,时间:2021-10-05 16:52:27

              任务 4 开始执行,时间:2021-10-05 16:52:27

              任务 1 执行完毕,时间:2021-10-05 16:52:28

              任务 5 开始执行,时间:2021-10-05 16:52:28

              任务 6 开始执行python,时间:2021-10-05 16:52:28

              任务 7 开始执行,时间:2021-10-05 16:52:28

              任务 4 执行完毕,时间:2021-10-05 16:52:29

              任务 8 开始执行,时间:2021-10-05 16:52:29

              任务 9 开始执行,时间:2021-10-05 16:52:29

              任务 2 执行完毕,时间:2021-10-05 16:52:32

              任务 5 执行完毕,时间:2021-10-05 16:52:33

              任务 7 执行完毕,时间:2021-10-05 16:52:33

              任务 6 执行完毕,时间:2021-10-05 16:52:34

              任务 3 执行完毕,时间:2021-10-05 16:52:35

              任务 9 执行完毕,时间:2021-10-05 16:52:35

              任务 8 执行完毕,时间:2021-10-05 16:52:37

              从输出结果可以看出,协程池成功并行处理了所有的任务,并且在容量限制的情况下,成功地保证了协程池的线程安全性。

              改进

              可考虑增加对协程池容量的动态调整算法,例如在高峰期时增加协程池的容量,低谷期时降低协程池的容量。另外可以增加协程池的超时控制机制,以避免任务执行时间过长导致系统资源浪费和性能下降。

              总结

              协程池是 Go 语言中一种重要的并发编程模式,通过协程池可以高效地管理协程的生命周期、避免协程的频繁创建和销毁,提高程序的并发性能。在使用协程池时,需要注意解决函数传参问题、优雅关闭协程池和保证协程安全的问题,通过合理使用互斥锁和 sync.Once 可以有效解决这些问题,从而保证协程池的正确性和高效性。

              以上就是Go高级特性探究之协程池详解的详细内容,更多关于Go协程池的资料请关注我们其它相关文章!

              0

              精彩评论

              暂无评论...
              验证码 换一张
              取 消

              关注公众号