开发者

golang的tunny的用法示例教程

开发者 https://www.devze.com 2023-11-21 11:11 出处:网络 作者: codecraft
目录序WorkerclosureWorkercallbackWorkerPoolProcessSetSizeClose实例小结序 本文主要研究一下tunny
目录
  • Worker
  • closureWorker
  • callbackWorker
  • Pool
    • Process
    • SetSize
    • Close
  • 实例
    • 小结

      本文主要研究一下tunny

      Worker

      type Worker interface {
          // Process will synchronously perform a job and return the result.
          Process(interface{}) interface{}
          // blockUntilReady is called before each job is processed and must block the
          // calling goroutine until the Worker is ready to process the next job.
          BlockUntilReady()
          // Interrupt is called when a job is cancelled. The worker is responsible
          // for unblocking the Process implementation.
          Interrupt()
          // Terminate is called when a Worker is removed from the processing pool
          // and is responsible for cleaning up any held resources.
          Terminate()
      }

       Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法

      closureWorker

      type closureWorker struct {
          processor func(interface{}) interface{}
      }
      func (w *closureWorker) Process(payload interface{}) interface{} {
          return w.processor(payload)
      }
      func (w *closureWorker) BlockUntilReady() {}
      func (w *closureWorker) Interrupt()       {}
      func (w *closureWorker) Terminate()       {}

       closureWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法委托给processor

      callbackWorker

      type callbackWorker struct{}
      func (w *callbackWorker) Process(payload interface{}) interface{} {
          f, ok := payload.(func())
          if !ok {
              return ErrJobNotFunc
          }
          f()
          return nil
      }
      func (w *callbackWorker) BlockUntilReady() {}
      func (w *callbackWorker) Interrupt()       {}
      func (w *callbackWorker) TerIZUolZQpZminate()       {}

       www.devze.com;callbackWorker定义了processor属性,它实现了Worker接口的Process、BlockUntilReady、Interrupt、Terminate方法,其中Process方法执行的是payload函数

      Pool

      type Pool struct {
          queuedJobs int64
          ctor    func() Worker
          workers []*workerWrapper
          reqChan chan workRequest
          workerMut sync.Mutex
      }
      func New(n int, ctor func() Worker) *Pool {
          p := &Pool{
              ctor:    ctor,
              reqChan: make(chan workRequest),
      编程    }
          p.SetSize(n)
          return p
      }
      func NewFunc(n int, f func(interface{}) interface{}) *Pool {
          return New(n, func() Worker {
              return &closureWorker{
                  processor: f,
              }
          })
      }
      func NewCallback(n int) *Pool {
          return New(n, func() Worker {
              return &callbackWorker{}
          })
      }

       Pool定义了queuedJobs、ctor、workers、reqChan、workerMut属性;New方法根据n和ctor创建Pool;NewFunc方法根据n和f来创建closureWorker;NewCallback方法创建callbackWorker

      Process

      func (p *Pool) Process(payload interface{}) interface{} {
          atomic.AddInt64(&p.queuedJobs, 1)
          request, open := <-p.reqChan
          if !open {
              panic(ErrPoolNotRunning)
          }
          request.jobChan <- payload
          payload, open = <-request.retChan
          if !open {
              panic(ErrWorkerClosed)
          }
          atomic.AddInt64(&p.queuedJobs, -1)
          return payload
      }

       Process方法首先递增queuedJobs,然后从reqChan读取request,然后往jobChan写入payload,之后再等待retChan,最后递减queuedJobs

      SetSize

      func (p *Pool) SetSize(n int) {
          p.workerMut.Lock()
          defer p.workerMut.Unlock()
          lWorkers := len(p.workers)
          if lWorkers == n {
              return
          }
          // Add extra workers if N > len(workers)
          for i := lWorkers; i < n; i++ {
              p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
          }
          // Asynchronously stop all workers > N
          for i := n; i < lWorkers; i++ {
         IZUolZQpZ     p.workers[i].stop()
          }
          // Synchronously wait for all workers > N to stop
          for i := n; i < lWorkers; i++ {
              p.workers[i].join()
          }
          // Remove stopp编程客栈ed workers from slice
          p.workers = p.workers[:n]
      }

       SetSize方法首先通过workerMut加锁,然后根据lWorkers创建newWorkerWrapper,之后执行worker.stop,再执行worker.join(),然后清空workers

      Close

      func (p *Pool) Close() {
          p.SetSize(0)
          close(p.reqChan)
      }

       Close方法执行SetSize(0)及close(p.reqChan)

      实例

      func TestFuncJob(t *testing.T) {
          pool := NewFunc(10, func(in interface{}) interface{} {
              intVal := in.(int)
              return intVal * 2
          })
          defer pool.Close()
          for i := 0; i < 10; i++ {
              ret := pool.Process(10)
              if exp, act := 20, ret.(int); exp != act {
                  t.Errorf("Wrong result: %v != %v", act, exp)
              }
          }
      }

       TestFuncJob通过NewFunc创建pool,

      小结

      tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是closureWorker,NewCallback方法创建的是callbackWorker。

      doc

      • tunny

      以上就是golang的tunny的详细内容,更多关于golang tunny的资料请关注编程客栈(www.devze.com)其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号