
I have been working in the anti-spam, anti-virus and anti-malware industry for over 15 years at a few different companies, and now I know how complex these systems can end up being because of the massive amount of data we handle daily.

Currently I am CEO of smsjunk.com and Chief Architect Officer at KnowBe4, both companies active in the cybersecurity industry.

What is interesting is that for the last 10 years or so as a Software Engineer, all the web backend development that I have been involved in has been done mostly in Ruby on Rails. Don’t get me wrong, I love Ruby on Rails and I believe it’s an amazing environment, but after a while you start thinking and designing systems in the Ruby way, and you forget how efficient and simple your software architecture could have been if you could leverage multi-threading, parallelization, fast execution and small memory overhead. For many years, I was a C/C++, Delphi and C# developer, and I have just started realizing how much less complex things can be with the right tool for the job.

I am not very big on the language and framework wars that the interwebs are always fighting about. I believe efficiency, productivity and code maintainability rely mostly on how simply you can architect your solution.

The Problem

While working on a piece of our anonymous telemetry and analytics system, our goal was to be able to handle a large number of POST requests from millions of endpoints. The web handler would receive a JSON document that may contain a collection of many payloads that needed to be written to Amazon S3, so that our map-reduce systems could later operate on this data.

Traditionally we would look into creating a worker-tier architecture, utilizing things such as:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • and so on…

And set up two different clusters, one for the web front-end and another for the workers, so we could scale up the amount of background work we can handle.

But since the beginning, our team knew that we should do this in Go, because during the discussion phases we saw that this could potentially be a very high-traffic system. I have been using Go for about 2 years or so, and we had developed a few systems here at work, but none that would get this amount of load.

We started by creating a few structures to define the web request payload that we would be receiving through the POST calls, and a method to upload it into our S3 bucket.

type PayloadCollection struct {
    WindowsVersion string    `json:"version"`
    Token          string    `json:"token"`
    Payloads       []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collisions in
    // case we get the same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(p)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked 'private'
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

Naive approach to goroutines

Initially we went with a very naive implementation of the POST handler, just trying to parallelize the job processing with a simple goroutine:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3() // <----- DON'T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

For moderate loads, this could work for the majority of people, but it quickly proved not to work very well at large scale. We were expecting a lot of requests, but not in the order of magnitude we started seeing when we deployed the first version to production. We completely underestimated the amount of traffic.

The approach above is bad in several different ways. There is no way to control how many goroutines we are spawning. And since we were getting 1 million POST requests per minute, this code of course crashed and burned very quickly.
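
To make the failure mode concrete, here is a minimal, self-contained sketch (not taken from our system) that spawns one goroutine per payload exactly the way the handler above does. With a slow upload standing in for S3, the goroutine count and memory usage grow without bound:

package main

import (
    "fmt"
    "runtime"
    "time"
)

// simulateUpload stands in for a slow S3 upload (hypothetical stand-in).
func simulateUpload() {
    time.Sleep(30 * time.Second)
}

func main() {
    // Pretend one minute's worth of payloads arrives at once:
    // one goroutine per payload, just like the naive handler.
    for i := 0; i < 1000000; i++ {
        go simulateUpload()
    }

    // Every goroutine is still alive, each holding its own stack.
    fmt.Println("live goroutines:", runtime.NumGoroutine())
}

Each goroutine needs at least a few kilobytes of stack, so a sustained million-requests-per-minute load with multiple payloads per request translates into gigabytes of memory and an ever-growing scheduler backlog.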

Trying again

We needed to find a different way. From the beginning we had been discussing how we needed to keep the lifetime of the request handler very short and spawn processing in the background. Of course, this is what you must do in the Ruby on Rails world, otherwise you will block all the available web worker processes, whether you are using puma, unicorn or passenger (let’s not get into the JRuby discussion, please). Then we would have needed to leverage common solutions to do this, such as Resque, Sidekiq, SQS, etc. The list goes on, since there are many ways of achieving this.

So the second iteration was to create a buffered channel where we could queue up jobs and upload them to S3. Since we could control the maximum number of items in our queue and we had plenty of RAM to hold jobs in memory, we thought it would be okay to just buffer jobs in the channel queue.

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

And then to actually dequeue jobs and process them, we were using something similar to this:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.UploadToS3() // <-- STILL NOT GOOD
        }
    }
}

To be honest, I have no idea what we were thinking. This must have been a late night full of Red Bulls. This approach didn’t buy us anything: we had traded flawed concurrency for a buffered queue that was simply postponing the problem. Our synchronous processor was only uploading one payload at a time to S3, and since the rate of incoming requests was much larger than the ability of the single processor to upload to S3, our buffered channel was quickly reaching its limit and blocking the request handler’s ability to queue more items.

We were simply avoiding the problem and had started a countdown to the eventual death of our system. Our latency rates kept increasing at a constant rate within minutes of deploying this flawed version.
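
A minimal sketch (again, not from our system) of why the buffered channel only postpones the problem: once the buffer fills faster than the single consumer drains it, the send in the handler blocks, and the handler’s latency becomes the consumer’s latency:

package main

import (
    "fmt"
    "time"
)

func main() {
    // A tiny buffer and a deliberately slow consumer, standing in for
    // MAX_QUEUE and the single synchronous S3 uploader.
    queue := make(chan int, 3)

    go func() {
        for job := range queue {
            time.Sleep(time.Second) // simulated slow upload
            fmt.Println("uploaded", job)
        }
    }()

    for i := 0; i < 10; i++ {
        start := time.Now()
        queue <- i // blocks as soon as the buffer is full
        fmt.Printf("queued %d after %v\n", i, time.Since(start))
    }
}

With a single slow consumer, every send past the third one waits roughly a second, which is exactly the back-pressure our request handlers started to feel.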

The Better Solution

We decided to utilize a common pattern when using Go channels and create a two-tier channel system: one for queuing jobs and another to control how many workers operate on the JobQueue concurrently.

The idea was to parallelize the uploads to S3 at a somewhat sustainable rate, one that would not cripple the machine nor start generating connection errors from S3. So we opted for a Job/Worker pattern. For those familiar with Java, C#, etc., think of this as the Golang way of implementing a worker thread pool using channels instead.

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool chan chan Job
    JobChannel chan Job
    quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

We modified our web request handler to create an instance of the Job struct with the payload and send it into the JobQueue channel for the workers to pick up.

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let's create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

During our web server initialization we create a Dispatcher and call Run() to create the pool of workers and to start listening for jobs that appear in the JobQueue.

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

Below is the code for our dispatcher implementation:

type Dispatcher struct {
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
    // maximum number of workers to start
    maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.WorkerPool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}
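
For completeness, here is a minimal sketch of how the pieces above might be wired together at startup; this is our own assumption rather than code from the original post, and the route, port and hard-coded sizes are illustrative only:

func main() {
    // For brevity these are hard-coded here; in production they come from
    // the MAX_WORKERS / MAX_QUEUE environment variables described below.
    maxWorkers := 100
    maxQueue := 10000

    // The buffered channel that payloadHandler pushes jobs onto.
    JobQueue = make(chan Job, maxQueue)

    // Create the worker pool and start dispatching jobs to it.
    dispatcher := NewDispatcher(maxWorkers)
    dispatcher.Run()

    // Hypothetical route and port.
    http.HandleFunc("/payload", payloadHandler)
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}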

Note that we provide the maximum number of workers to be instantiated and added to our pool of workers. Since we used Amazon Elasticbeanstalk for this project with a dockerized Go environment, and we always try to follow the 12-factor methodology to configure our systems in production, we read these values from environment variables. That way we can control the number of workers and the maximum size of the job queue, and quickly tweak these values without re-deploying the cluster.

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)
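
Note that os.Getenv returns strings, so before they can be used as the worker count and queue size they have to be parsed to integers somewhere. The original code does not show that step; a minimal sketch with an assumed helper and assumed defaults might look like this:

// parseEnvInt reads an integer from the environment, falling back to a
// default when the variable is unset or malformed (hypothetical helper).
func parseEnvInt(name string, fallback int) int {
    v, err := strconv.Atoi(os.Getenv(name))
    if err != nil {
        return fallback
    }
    return v
}

var (
    MaxWorker = parseEnvInt("MAX_WORKERS", 100)
    MaxQueue  = parseEnvInt("MAX_QUEUE", 10000)
)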

Immediately after we deployed it, we saw all of our latency rates drop to insignificant numbers and our ability to handle requests surge drastically.

Minutes after our Elastic Load Balancers were fully warmed up, we saw our ElasticBeanstalk application serving close to 1 million requests per minute. We usually have a few hours during the morning in which our traffic spikes to more than a million requests per minute.

As soon as we deployed the new code, the number of servers we needed dropped considerably, from 100 servers to about 20 servers.

After we had properly configured our cluster and the auto-scaling settings, we were able to lower it even further, to only 4 EC2 c4.large instances, with Elastic Auto-Scaling set to spawn a new instance if CPU stays above 90% for 5 minutes straight.

Conclusion

Simplicity always wins in my book. We could have designed a complex system with many queues, background workers, and complex deployments, but instead we decided to leverage the power of Elasticbeanstalk auto-scaling and the efficient, simple approach to concurrency that Golang provides out of the box.

It’s not every day that you have a cluster of only 4 machines, probably much less powerful than my current MacBook Pro, handling POST requests and writing to an Amazon S3 bucket 1 million times every minute.

There is always a right tool for the job. Sometimes, when your Ruby on Rails system needs a very powerful web handler, think a little outside of the Ruby ecosystem for simpler yet more powerful alternative solutions.

Before you go…

I would really appreciate it if you followed us on Twitter and shared this post with your friends. You can find me on Twitter at http://twitter.com/mcastilho

Reposted from: https://my.oschina.net/lemonwater/blog/1526925
