我们必要找到另一种的要领。从一开始我们就开始接头怎样让哀求处理赏罚措施的生命周期尽也许的短,并在靠山发生处理赏罚。虽然,这是在 RubyonRails必必要做的工作,不然,不管你是行使puma,unicorn照旧 passenger,你的全部的可用的web worker都将阻塞。
那么我们就必要操作常见的办理方案来完成这项事变,好比Resque,Sidekiq, SQS等。虽然尚有其他器材,由于有许多要领可以实现。
因此,我们第二次改造是建设一个buffer channel,我们可以将一些功课哀求扔举办列并将它们上传到S3,因为我们可以节制行列的最大长度,而且有足够的RAM来列队处理赏罚内存中的功课,因此我们以为只要在通道行列中缓冲功课就行了。
- var Queue chan Payload
-
- func init() {
- Queue = make(chan Payload, MAX_QUEUE)
- }
-
- func payloadHandler(w http.ResponseWriter, r *http.Request) {
- ...
- // Go through each payload and queue items individually to be posted to S3
- for _, payload := range content.Payloads {
- Queue <- payload
- }
- ...
- }
然后,为了将使命从buffer channel中取出并处理赏罚它们,我们正在行使这样的方法:
- func StartProcessor() {
- for {
- select {
- case job := <-Queue:
- job.payload.UploadToS3() // <-- STILL NOT GOOD
- }
- }
- }
说真话,我不知道我们在想什么,这必定是一个难得的夜晚。这种要领并没有给我们带来什么晋升,我们用一个缓冲的行列替代了有缺陷的并发,壹贝偾推迟了题目的发生时刻罢了。我们的同步处理赏罚器每次只向S3上传一个有用载荷,因为传入哀求的速度远宏大于单个处理赏罚器上传到S3的手段,因此我们的buffer channel敏捷到达极限,行列已经阻塞而且无法再往里边添加功课。
我们只是简朴的绕过了这个题目,最终导致我们的体系完全瓦解。在我们陈设这个有缺陷的版本后,我们的耽误一连的升高。

更好的办理方案
我们抉择在Go channel上行使一个通用模式来建设一个 2-tier(双重)channel体系,一个用来处理赏罚列队的job,一个用来节制有几多worker在 JobQueue上并发事变。
这个设法是将上传到S3的并行速率进步到一个可一连的速率,同时不会造成呆板瘫痪,也不会激发S3的毗连错误。
以是我们选择建设一个 Job/Worker模式。对付那些认识Java,C#等的人来说,可以将其视为Golang行使channel来实现WorkerThread-Pool的方法。
- var (
- MaxWorker = os.Getenv("MAX_WORKERS")
- MaxQueue = os.Getenv("MAX_QUEUE")
- )
-
- // Job represents the job to be run
- type Job struct {
- Payload Payload
- }
-
- // A buffered channel that we can send work requests on.
- var JobQueue chan Job
-
- // Worker represents the worker that executes the job
- type Worker struct {
- WorkerPool chan chan Job
- JobChannel chan Job
- quit chan bool
- }
-
- func NewWorker(workerPool chan chan Job) Worker {
- return Worker{
- WorkerPool: workerPool,
- JobChannel: make(chan Job),
- quit: make(chan bool)}
- }
-
- // Start method starts the run loop for the worker, listening for a quit channel in
- // case we need to stop it
- func (w Worker) Start() {
- go func() {
- for {
- // register the current worker into the worker queue.
- w.WorkerPool <- w.JobChannel
-
- select {
- case job := <-w.JobChannel:
- // we have received a work request.
- if err := job.Payload.UploadToS3(); err != nil {
- log.Errorf("Error uploading to S3: %s", err.Error())
- }
-
- case <-w.quit:
- // we have received a signal to stop
- return
- }
- }
- }()
- }
-
- // Stop signals the worker to stop listening for work requests.
- func (w Worker) Stop() {
- go func() {
- w.quit <- true
- }()
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|