扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章给大家介绍Golang中怎么应付百万级请求,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
创新互联专注于揭阳网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供揭阳营销型网站建设,揭阳网站制作、揭阳网页设计、揭阳网站官网定制、小程序开发服务,打造揭阳网络公司原创品牌,更为您提供揭阳网站排名全网营销落地服务。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
幼稚地使用Go runtines
最开始的时候我们非常天真地实现一个POST的钩子方法如下,只是简单地将每个请求体的上传动作放到Go rutinues中让他们并行执行:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
在中等规模的负载情况下,这种方法对大部分人都是没有问题的,但在应对更大规模的请求量时候,我们很快就招架不住了。当我们把这个版本的代码部署到生产环境以后,我们期待能有大量的请求进来但实际还不能达到百万级别的数量级。我们完全低估了这个系统要处理的流量数。
但不管怎么说上面的方法都是欠妥的。因为它没有任何方法让我们去控制Go runtinues启动的数量。所以当我们的系统在面对每分钟百万级POST请求的时候很快就垮掉了。
再战
我们需要找到另外的方法。在一开始我们就在讨论如何让我们的请求处理程序的生命周期尽可能地缩短以及上传到S3的操作能在后台或者异步运行。当然,在Ruby on Rails里面你必须这么做,否则你将会阻塞到所有其他的网络请求处理程序。无论您使用的是美洲狮,独角兽还是过路人(请不要参与JRuby讨论)。然后我们想到使用消息队列这种比较常见的方法来处理来达到我们的目的,例如Resque, Sidekiq, SQS等等,还有数不清的工具因为实在有太多方法来实现这个功能。
所以在第二次迭代的时候,我们需要创建一个缓冲队列,我们会将任务放入队列里面然后再一个个地上传到S3上,但由于我们希望达到能够控制这个队列的最大容量的目的,并且我们有足够的RAM来允许我们将请求体储存到内存当中,所以我们认为直接使用了Go提供的channel,然后将我们的请求直接入队到channel中处理就可以了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }
我们会从channel中获取任务并且执行他们的上传操作
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } }
但说句老实话,我并不知道这是在干嘛。肯定是因为那时已经太晚还有我们已经喝了太多的红牛。
这个改动并没有让我们的困境得到任何改善,我们将并发任务放到了队列中执行仅仅是看上去好像解决了问题。但是我们的异步程序一次只会上传一个请求体到S3上面,但是我们的请求数此时远远大于我们上传到S3的数量,可想而知我们的缓冲队列很快就到达了他的极限爆满了,然后它阻挡了其他网络请求的入队操作。
相当于我们仅仅回避了问题,并且让我们的系统的崩溃时间进入了倒数。我们这个缺陷的版本发布以后,整个系统的延迟率在持续性地每分钟在上涨。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
接下来修改我们网络请求的钩子函数,负责创建一个Job的结构体的实例然后将其放入JobQueue channel中等待worker来获取执行。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) }
在我们网络服务初始化的时候创建一个Dispather并且调用Run()创建一个装有一定数量worker的线程池,用来接收和处理来自JobQueue的Job
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
下面是我们Dispather的实现
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }
当我们将这个版本发布到生产环境以后我们的延迟率马上有明显的下降,我们处理请求的能力有一个质的飞跃。
关于Golang中怎么应付百万级请求就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流