加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 建站 > 正文

使用Go处理每分钟百万请求

发布时间:2019-07-05 20:53:22 所属栏目:建站 来源:MarcioCastilho
导读:这篇文章在medium上很火,作者以现实案例来说明,讲得很好。 我们常常传闻行使Go的goroutine和channel很轻易实现高并发,那是不是所有代码都放在goroutine中运行就可以实现高并发措施了呢?很显然并不是。这篇文章将教各人怎样一步一步写出一个简朴的, 高

我们必要找到另一种的要领。从一开始我们就开始接头怎样让哀求处理赏罚措施的生命周期尽也许的短,并在靠山发生处理赏罚。虽然,这是在 RubyonRails必必要做的工作,不然,不管你是行使puma,unicorn照旧 passenger,你的全部的可用的web worker都将阻塞。

那么我们就必要操作常见的办理方案来完成这项事变,好比Resque,Sidekiq, SQS等。虽然尚有其他器材,由于有许多要领可以实现。

因此,我们第二次改造是建设一个buffer channel,我们可以将一些功课哀求扔举办列并将它们上传到S3,因为我们可以节制行列的最大长度,而且有足够的RAM来列队处理赏罚内存中的功课,因此我们以为只要在通道行列中缓冲功课就行了。

  1. var Queue chan Payload 
  2.  
  3. func init() { 
  4.     Queue = make(chan Payload, MAX_QUEUE) 
  5.  
  6. func payloadHandler(w http.ResponseWriter, r *http.Request) { 
  7.     ... 
  8.     // Go through each payload and queue items individually to be posted to S3 
  9.     for _, payload := range content.Payloads { 
  10.         Queue <- payload 
  11.     } 
  12.     ... 

然后,为了将使命从buffer channel中取出并处理赏罚它们,我们正在行使这样的方法:

  1. func StartProcessor() { 
  2.     for { 
  3.         select { 
  4.         case job := <-Queue: 
  5.             job.payload.UploadToS3()  // <-- STILL NOT GOOD 
  6.         } 
  7.     } 

说真话,我不知道我们在想什么,这必定是一个难得的夜晚。这种要领并没有给我们带来什么晋升,我们用一个缓冲的行列替代了有缺陷的并发,壹贝偾推迟了题目的发生时刻罢了。我们的同步处理赏罚器每次只向S3上传一个有用载荷,因为传入哀求的速度远宏大于单个处理赏罚器上传到S3的手段,因此我们的buffer channel敏捷到达极限,行列已经阻塞而且无法再往里边添加功课。

我们只是简朴的绕过了这个题目,最终导致我们的体系完全瓦解。在我们陈设这个有缺陷的版本后,我们的耽误一连的升高。

行使Go处理赏罚每分钟百万哀求

更好的办理方案

我们抉择在Go channel上行使一个通用模式来建设一个 2-tier(双重)channel体系,一个用来处理赏罚列队的job,一个用来节制有几多worker在 JobQueue上并发事变。

这个设法是将上传到S3的并行速率进步到一个可一连的速率,同时不会造成呆板瘫痪,也不会激发S3的毗连错误。

以是我们选择建设一个 Job/Worker模式。对付那些认识Java,C#等的人来说,可以将其视为Golang行使channel来实现WorkerThread-Pool的方法。

  1. var ( 
  2.     MaxWorker = os.Getenv("MAX_WORKERS") 
  3.     MaxQueue  = os.Getenv("MAX_QUEUE") 
  4.  
  5. // Job represents the job to be run 
  6. type Job struct { 
  7.     Payload Payload 
  8.  
  9. // A buffered channel that we can send work requests on. 
  10. var JobQueue chan Job 
  11.  
  12. // Worker represents the worker that executes the job 
  13. type Worker struct { 
  14.     WorkerPool  chan chan Job 
  15.     JobChannel  chan Job 
  16.     quit        chan bool 
  17.  
  18. func NewWorker(workerPool chan chan Job) Worker { 
  19.     return Worker{ 
  20.         WorkerPool: workerPool, 
  21.         JobChannel: make(chan Job), 
  22.         quit:       make(chan bool)} 
  23.  
  24. // Start method starts the run loop for the worker, listening for a quit channel in 
  25. // case we need to stop it 
  26. func (w Worker) Start() { 
  27.     go func() { 
  28.         for { 
  29.             // register the current worker into the worker queue. 
  30.             w.WorkerPool <- w.JobChannel 
  31.  
  32.             select { 
  33.             case job := <-w.JobChannel: 
  34.                 // we have received a work request. 
  35.                 if err := job.Payload.UploadToS3(); err != nil { 
  36.                     log.Errorf("Error uploading to S3: %s", err.Error()) 
  37.                 } 
  38.  
  39.             case <-w.quit: 
  40.                 // we have received a signal to stop 
  41.                 return 
  42.             } 
  43.         } 
  44.     }() 
  45.  
  46. // Stop signals the worker to stop listening for work requests. 
  47. func (w Worker) Stop() { 
  48.     go func() { 
  49.         w.quit <- true 
  50.     }() 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读