如何用GO每秒处理100万条数据请求

简介: 最近看了一篇文章,用go处理每分钟达百万条的数据请求原文地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/翻译地址:https://www.jianshu.com/p/21de03ac682c这里作者为处理高峰期高并发的数据请求,用了3个版本的处理方式,下面是自己的一些理解:第一种方式很简单,就是用go的协程处理请求,来一条请求开一个协程处理,由于每个请求是一个数据上传任务,有一定的耗时和资源消耗,当高峰期请求突然增多达到每分钟百万条的时候,不可避免的造成了携程爆炸,系统崩溃。

最近看了一篇文章,用go处理每分钟达百万条的数据请求
原文地址:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
翻译地址:https://www.jianshu.com/p/21de03ac682c

这里作者为处理高峰期高并发的数据请求,用了3个版本的处理方式,下面是自己的一些理解:

第一种方式很简单,就是用go的协程处理请求,来一条请求开一个协程处理,由于每个请求是一个数据上传
任务,有一定的耗时和资源消耗,当高峰期请求突然增多达到每分钟百万条的时候,不可避免的造成了携程爆炸,系统崩溃。

第二种方式用channel做了一个任务队列,来一条请求加到任务队列里,简单的生产者消费者模式,但这种方式治标不治本,高峰期的时候,任务队列长度一样
会爆炸,造成内存不够用,所以也不太合适。

第三种方式就有点意思了,这里贴一下完整的代码,帮助理解。

package main

import (
"fmt"
"time"
"runtime"
)

var (
//最大任务队列数
MaxWorker = 10
)

//有效载荷
type Payload struct {
Num int
Test string
}

//待执行的工作
type Job struct {
Payload Payload
}

//任务队列channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
JobChannel chan Job //每个工作者单元包含一个任务管道 用于获取任务
quit chan bool //退出信号
no int //编号
}

// 停止信号
func (w Worker) Stop() {
go func() {

  w.quit <- true

}()
}

//调度中心
type Dispatcher struct {
//工作者池(任务队列池)
WorkerPool chan chan Job
//工作者单元数量
MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
//创建工作者池,存放待处理的任务队列,maxWorkers为最大任务队列数
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
fmt.Println("创建一个新工作者单元")
return Worker{

  WorkerPool: workerPool,
  JobChannel: make(chan Job),
  quit:       make(chan bool),
  no:         no,

}
}

//循环 监听任务和结束信号
func (w Worker) Start() {
//启动协程,监听任务
go func() {

  for {
     // register the current worker into the worker queue.
     //工作者放回工作者池
     w.WorkerPool <- w.JobChannel
     //fmt.Println("w.WorkerPool <- w.JobChannel", w)
     select {
     case job := <-w.JobChannel:
        //fmt.Println("job := <-w.JobChannel")
        // 收到任务,执行打印任务
        fmt.Println(job.Payload.Test)
         //执行任务需要1秒时间
         time.Sleep(500 * time.Microsecond)
     case <-w.quit:
        // 收到退出信号,停止监听,结束该协程
        return
     }
  }

}()
}

//调度,任务分发
func (d *Dispatcher) dispatch() {
for {

  select {
  //从任务队列中获取任务
  case job := <-JobQueue:
     //fmt.Println("job := <-JobQueue:")
     go func(job Job) {
        //等待空闲worker (任务多的时候会阻塞这里)
        //从(10个)工作者池中获取一个任务队列channel,
        jobChannel := <-d.WorkerPool
        //fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
        // 将任务放到上述woker的私有任务channal中,jobChannel是一个无缓冲信道,每次只能放一个任务
        jobChannel <- job
        //fmt.Println("jobChannel <- job")
     }(job)
  }

}
}

//工作者池的初始化,注意Run为Dispatcher结构体指针的方法,所以此方法内对Dispathcer的修改在方法外也可见
func (d *Dispatcher) Run() {
// starting n number of workers
//创建10个工作者单元
for i := 1; i < d.MaxWorkers+1; i++ {

  worker := NewWorker(d.WorkerPool, i)
  worker.Start()

}
go d.dispatch()
}

//新建任务并放入任务队列
func addQueue() {
for i := 0; i < 1000000; i++ {

  time.Sleep(10*time.Microsecond)
  fmt.Println("当前请求数:",i)
  // 新建一个任务
  payLoad := Payload{Num: 1, Test:"this is Test string"}
  work := Job{Payload: payLoad}
  // 任务放入任务队列channal
  fmt.Println("新任务入队列!")
  JobQueue <- work
  //fmt.Println("队列总长度:",cap(JobQueue))
  //fmt.Println("队列未消费任务数量:",len(JobQueue))
  //fmt.Println("JobQueue <- work")
  fmt.Println("当前协程数:", runtime.NumGoroutine())
  //time.Sleep(1 * time.Second)

}
}

func main() {
//建立任务队列,每个任务队列中可以放10个任务
JobQueue = make(chan Job, 10)
fmt.Println("成功建立任务队列!")
//新建任务分发器
dispatcher := NewDispatcher(MaxWorker)
fmt.Println("成功建立任务分发器!")
dispatcher.Run()

//time.Sleep(1 * time.Second)
go addQueue()
time.Sleep(1000 * time.Second)
}

这种方式简单来说就是用一个两级channel作为一个任务队列池,队列池中存放多个任务队列,来一个任务后加入其中的一个任务队列,每个任务队列开一个协程处理
数据上传任务,基本过程如下图所示:

    这种方式很好的解决了每个任务处理都要开协称协程造成携程太多的问题,因为总共只有10个协程在处理work,并且工作池中只有10个任务队列,美个任务队列只存放一个任务,所以也不存在队列长度爆炸的问题,但是这种方式也有问题,就是当工作池中没有任务队列空闲的时候,新来的任务只能开一个协程等待空闲的任务队列,如果访问高峰期过长或者任务处理很耗时,会造成等待任务队列的协程急剧增长,最后也会造成系统内存崩溃。这种方式的好处在于等待任务队列的协程都是轻量级协程,每个协程占用的内存资源很少,所以如果只是处理高峰期每分钟百万条的数据请求,是可以的,相当于高峰期把每个任务缓存在轻量级协程里了,过了高峰期,轻量级协程就会减少,不会造成协程爆炸导致内存崩溃。但是如果每分钟百万条数据请求是常态,这种方式肯定也会造成内存崩溃。
   解决的方式有两种:
 1.数据库级别的缓存作为任务队列,硬盘比内存更大更便宜,但是这种方式的延迟会很大
 2.就是加机器了做负载均衡了,没有什么事是加一台机器解决不了的,如果有,那就加两台。
目录
相关文章
|
16天前
|
Go 开发者
掌握Go语言:Go语言结构体,精准封装数据,高效管理实体对象(22)
掌握Go语言:Go语言结构体,精准封装数据,高效管理实体对象(22)
|
5月前
|
NoSQL API Go
go-mongox:简单高效,让文档操作和 bson 数据构造更流畅
`go-mongox` 基于 **泛型** 对 `MongoDB` 官方框架进行了二次封装,它通过使用链式调用的方式,让我们能够丝滑地操作文档。同时,其还提供了多种类型的 `bson` 构造器,帮助我们高效的构建 `bson` 数据。
72 0
|
6月前
|
JSON Go 数据格式
Go 语言怎么处理三方接口返回数据?
Go 语言怎么处理三方接口返回数据?
69 0
|
8月前
|
编译器 Go
Go的数据竞争
数据竞争(Data Race)
52 0
|
4月前
|
Go
go 将函数 当做参数传递 实现 不同类型数据求和
go 将函数 当做参数传递 实现 不同类型数据求和
37 1
|
5月前
|
JSON Linux 测试技术
go语言处理数据、基本通信以及环境配置 -- json,protobuf,grpc
go语言处理数据、基本通信以及环境配置 -- json,protobuf,grpc
|
6月前
|
安全 Go 开发者
Go 语言使用标准库 sync 包的 mutex 互斥锁解决数据静态
Go 语言使用标准库 sync 包的 mutex 互斥锁解决数据静态
24 0
|
存储 JSON Go
在go语言中通过Post的方法提交json的数据
1.把URL及info的对像这两个参数发给login函数把结构体对象转换成json, 2.用POST方法提交JSON的数据到服务器上 3.通过调用Client.Do方法得到服务器的响应response的JSON 4.把服务器响应回来的JSON解析成结构体对象来存储相应的信息 5.调用解析JSON的结构体对象的各属性得到相应的信息
745 0
|
11月前
|
运维 安全 网络协议
避坑:Go并发编程时,如何避免发生竞态条件和数据竞争
大家都知道,Go是一种支持并发编程的编程语言,但并发编程也是比较复杂和容易出错的。比如本篇分享的问题:竞态条件和数据竞争的问题。
150 0
|
12月前
|
前端开发 JavaScript Go
[GO实战]投票系统-实现按钮将前端数据传给后端并接收
[GO实战]投票系统-实现按钮将前端数据传给后端并接收