MyException - 我的异常网
当前位置:我的异常网» Go » 小弟我读《通过Go来处理每分钟达百万的数据请求》

小弟我读《通过Go来处理每分钟达百万的数据请求》

www.MyException.Cn  网友分享于:2013-09-28  浏览:0次
我读《通过Go来处理每分钟达百万的数据请求》

我读《通过Go来处理每分钟达百万的数据请求》

原文

原文作者为Malwarebytes公司的首席架构师Marcio Castilho http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

问题描述

当我们的服务端需要处理大量的耗时任务时,我们一般都会考虑将耗时任务异步处理。

简单粗暴法

golang恰恰给我们的异步处理带来了很大的便利--go func()。然而,绝大多数的时候,我们不能简单粗暴的创建协程来处理异步任务,原因是不可控。虽然协程相对于线程占用的系统资源更少,但这并不代表我们可以无休止的创建协程。积水成江,不停创建协程也有压垮系统的风险。这里引用原作者的demo,一个执行耗时任务的handler。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
             w.WriteHeader(http.StatusMethodNotAllowed)
             return
    }
    // Read the body into a string for json decoding
     var content = &PayloadCollection{}
     err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
             w.Header().Set("Content-Type", "application/json; charset=UTF-8")
             w.WriteHeader(http.StatusBadRequest)
             return
     }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }
    w.WriteHeader(http.StatusOK)
}

这就是我们遇到的第一个问题,简单粗暴起协程处理耗时任务导致的系统不可控性。我们自然而然就会想,怎么能让系统更可控呢。

优雅的方法

一个很自然的思路是,建立任务队列。golang提供了线程安全的任务队列实现方式--带缓冲的channal。但是这样只是延后了请求的爆发。作者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将作者的代码修改了一下,单文件执行,以记录这一模式。

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待执行的工作
type Job struct {
    Payload Payload
}

//任务channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
    JobChannel chan Job      //每个工作者单元包含一个任务管道 用于获取任务
    quit       chan bool     //退出信号
    no         int           //编号
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("创建一个新工作者单元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//循环  监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任务
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信号
                return
            }
        }
    }()
}

// 停止信号
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//调度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者数量
    MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 20; i++ {
        // 新建一个任务
        payLoad := Payload{Num: 1}
        work := Job{Payload: payLoad}
        // 任务放入任务队列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work")
        time.Sleep(1 * time.Second)
    }
}

/*
一个任务的执行过程如下
JobQueue <- work  新任务入队
job := <-JobQueue: 调度中心收到任务
jobChannel := <-d.WorkerPool 从工作者池取到一个工作者
jobChannel <- job 任务给到工作者
job := <-w.JobChannel 工作者取出任务
{{1}} 执行任务
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
*/

这样,我们已经能够主动的控制worker的数量。这时候,我们不妨设想一下,我们完全解决问题了么?如果有大量的任务同时涌入,会发生什么样的结果。程序会阻塞等待可用的workerjobChannel := <-d.WorkerPool

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

不要忘记,这个调度方法也是在不断的创建协程等待空闲的worker。我们改一下代码如下:

package main

import (
    "fmt"
    "reflect"
    "time"
    "runtime"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待执行的工作
type Job struct {
    Payload Payload
}

//任务channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
    JobChannel chan Job      //每个工作者单元包含一个任务管道 用于获取任务
    quit       chan bool     //退出信号
    no         int           //编号
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("创建一个新工作者单元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//循环  监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任务
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信号
                return
            }
        }
    }()
}

// 停止信号
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//调度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者数量
    MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                fmt.Println("等待空闲worker (任务多的时候会阻塞这里")
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()
    time.Sleep(1 * time.Second)
    go addQueue()
    time.Sleep(1000 * time.Second)
}

func addQueue() {
    for i := 0; i < 100; i++ {
        // 新建一个任务
        payLoad := Payload{Num: i}
        work := Job{Payload: payLoad}
        // 任务放入任务队列channal
        JobQueue <- work
        fmt.Println("JobQueue <- work", i)
        fmt.Println("当前协程数:", runtime.NumGoroutine())
        time.Sleep(100 * time.Millisecond)
    }
}

执行结果如下:

...
...

JobQueue <- work 97
当前协程数: 100
job := <-JobQueue:
等待空闲worker (任务多的时候会阻塞这里
JobQueue <- work 98
当前协程数: 101
job := <-JobQueue:
等待空闲worker (任务多的时候会阻塞这里
JobQueue <- work 99
当前协程数: 102

我们发现,我们依然没能控制住协程数量,我们只是控制住了worker的数量。这种情况下,如果worker数量设置的合理且异步任务耗时不是特别长的情况下一般没有问题。但是出于安全的考虑,我要把这个阻塞的协程数做一个控制,如果达到限制时候拒绝服务以保护系统该怎么处理?

真正控制协程数量(并发执行的任务数)

我们可以控制并发执行(包括等待执行)的任务数。我们加入任务使用如下判断。用一个带缓冲的Channel控制并发执行的任务数。当任务异步处理完成的时候执行<- DispatchNumControl释放控制即可。用这种方法,我们可以根据压测结果设置合适的并发数从而保证系统能够尽可能的发挥自己的能力,同时保证不会因为任务量太大而崩溃(因为达到极限的时候,系统会告诉调用方--我很忙)。

//用于控制并发处理的协程数
var DispatchNumControl = make(chan bool, 10000)

func Limit(work Job) bool {
   select {
   case <-time.After(time.Millisecond * 100):
      fmt.println("我很忙")
      return false
   case DispatchNumControl <- true:
   // 任务放入任务队列channal
      jobChannel <- work
      return true
   }
}

总结

总结一波,协程是个好的设计,但任何东西都不能过度使用。我们做系统设计的时候,一定也要时刻想着控制--要对自己设计的系统有着足够的控制力。

文章评论

总结2014中国互联网十大段子
总结2014中国互联网十大段子
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
程序员和编码员之间的区别
程序员和编码员之间的区别
程序员都该阅读的书
程序员都该阅读的书
如何成为一名黑客
如何成为一名黑客
程序员的鄙视链
程序员的鄙视链
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
漫画:程序员的工作
漫画:程序员的工作
程序员应该关注的一些事儿
程序员应该关注的一些事儿
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
那些争议最大的编程观点
那些争议最大的编程观点
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
程序员必看的十大电影
程序员必看的十大电影
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
鲜为人知的编程真相
鲜为人知的编程真相
我是如何打败拖延症的
我是如何打败拖延症的
为什么程序员都是夜猫子
为什么程序员都是夜猫子
老程序员的下场
老程序员的下场
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
我的丈夫是个程序员
我的丈夫是个程序员
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
 程序员的样子
程序员的样子
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
代码女神横空出世
代码女神横空出世
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
一个程序员的时间管理
一个程序员的时间管理
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
旅行,写作,编程
旅行,写作,编程
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
10个调试和排错的小建议
10个调试和排错的小建议
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有