GitHub 源码
https://github.com/panjf2000/gnet
📖 简介
gnet
是一个基于事件驱动的高性能和轻量级网络框架。它直接使用 epoll 和 kqueue 系统调用而非标准 Go 网络包:net 来构建网络应用,它的工作原理类似两个开源的网络库:netty 和 libuv,这也使得 gnet
达到了一个远超 Go net 的性能表现。
gnet
设计开发的初衷不是为了取代 Go 的标准网络库:net,而是为了创造出一个类似于 Redis、Haproxy 能高效处理网络包的 Go 语言网络服务器框架。
gnet
的卖点在于它是一个高性能、轻量级、非阻塞的纯 Go 实现的传输层(TCP/UDP/Unix Domain Socket)网络框架,开发者可以使用 gnet
来实现自己的应用层网络协议(HTTP、RPC、Redis、WebSocket 等等),从而构建出自己的应用层网络应用:比如在 gnet
上实现 HTTP 协议就可以创建出一个 HTTP 服务器 或者 Web 开发框架,实现 Redis 协议就可以创建出自己的 Redis 服务器等等。
gnet
衍生自另一个项目: evio
,但拥有更丰富的功能特性,且性能远胜之。
🚀 功能
- 高性能 的基于多线程/Go 程网络模型的 event-loop 事件驱动
- 内置 goroutine 池,由开源库 ants 提供支持
- 内置 bytes 内存池,由开源库 bytebufferpool 提供支持
- 整个生命周期是无锁的
- 简洁的 APIs
- 基于 Ring-Buffer 的高效内存利用
- 支持多种网络协议/IPC 机制:
TCP
、UDP
和Unix Domain Socket
- 支持多种负载均衡算法:
Round-Robin(轮询)
、Source Addr Hash(源地址哈希)
和Least-Connections(最少连接数)
- 支持两种事件驱动机制:Linux 里的
epoll
以及 FreeBSD/Darwin 里的kqueue
- 支持异步写操作
- 灵活的事件定时器
- SO_REUSEPORT 端口重用
- 内置多种编解码器,支持对 TCP 数据流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,参考自 netty codec,而且支持自定制编解码器
- 支持 Windows 平台,基于
IOCP 事件驱动机制Go 标准网络库 - 实现
gnet
客户端
💡 核心设计
多线程/Go 程网络模型
主从多 Reactors
gnet
重新设计开发了一个新内置的多线程/Go 程网络模型:『主从多 Reactors』,这也是 netty
默认的多线程网络模型,下面是这个模型的原理图:
它的运行流程如下面的时序图:
主从多 Reactors + 线程/Go 程池
你可能会问一个问题:如果我的业务逻辑是阻塞的,那么在 EventHandler.React
注册方法里的逻辑也会阻塞,从而导致阻塞 event-loop 线程,这时候怎么办?
正如你所知,基于 gnet
编写你的网络服务器有一条最重要的原则:永远不能让你业务逻辑(一般写在 EventHandler.React
里)阻塞 event-loop 线程,这也是 netty
的一条最重要的原则,否则的话将会极大地降低服务器的吞吐量。
我的回答是,基于 gnet
的另一种多线程/Go 程网络模型:『带线程/Go 程池的主从多 Reactors』可以解决阻塞问题,这个新网络模型通过引入一个 worker pool 来解决业务逻辑阻塞的问题:它会在启动的时候初始化一个 worker pool,然后在把 EventHandler.React
里面的阻塞代码放到 worker pool 里执行,从而避免阻塞 event-loop 线程。
模型的架构图如下所示:
它的运行流程如下面的时序图:
gnet
通过利用 ants goroutine 池(一个基于 Go 开发的高性能的 goroutine 池 ,实现了对大规模 goroutines 的调度管理、goroutines 复用)来实现『主从多 Reactors + 线程/Go 程池』网络模型。关于 ants
的全部功能和使用,可以在 ants 文档 里找到。
gnet
内部集成了 ants
以及提供了 pool.goroutine.Default()
方法来初始化一个 ants
goroutine 池,然后你可以把 EventHandler.React
中阻塞的业务逻辑提交到 goroutine 池里执行,最后在 goroutine 池里的代码调用 gnet.Conn.AsyncWrite([]byte)
方法把处理完阻塞逻辑之后得到的输出数据异步写回客户端,这样就可以避免阻塞 event-loop 线程。
有关在 gnet
里使用 ants
goroutine 池的细节可以到这里进一步了解。
自动扩容的 Ring-Buffer
gnet
内置了 inbound 和 outbound 两个 buffers,基于 Ring-Buffer 原理实现,分别用来缓冲输入输出的网络数据以及管理内存。
对于 TCP 协议的流数据,使用 gnet
不需要业务方为了解析应用层协议而自己维护和管理 buffers, gnet
会替业务方完成缓冲和管理网络数据的任务,降低业务代码的复杂性以及降低开发者的心智负担,使得开发者能够专注于业务逻辑而非一些底层实现。
🎉 开始使用
前提
gnet
需要 Go 版本 >= 1.9。
安装
1go get -u github.com/panjf2000/gnet
gnet
支持作为一个 Go module 被导入,基于 Go 1.11 Modules (Go 1.11+),只需要在你的项目里直接 import "github.com/panjf2000/gnet"
,然后运行 go [build|run|test]
自动下载和构建需要的依赖包。
使用示例
详细的文档在这里:gnet 接口文档,不过下面我们先来了解下使用 gnet
的简略方法。
用 gnet
来构建网络服务器是非常简单的,只需要实现 gnet.EventHandler
接口然后把你关心的事件函数注册到里面,最后把它连同监听地址一起传递给 gnet.Serve
函数就完成了。在服务器开始工作之后,每一条到来的网络连接会在各个事件之间传递,如果你想在某个事件中关闭某条连接或者关掉整个服务器的话,直接在事件函数里把 gnet.Action
设置成 Cosed
或者 Shutdown
就行了。
Echo 服务器是一种最简单网络服务器,把它作为 gnet
的入门例子在再合适不过了,下面是一个最简单的 echo server,它监听了 9000 端口:
不带阻塞逻辑的 echo 服务器
Old version(<=v1.0.0-rc.4)
1package main
2
3import (
4 "log"
5
6 "github.com/panjf2000/gnet"
7)
8
9type echoServer struct {
10 *gnet.EventServer
11}
12
13func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
14 out = c.Read()
15 c.ResetBuffer()
16 return
17}
18
19func main() {
20 echo := new(echoServer)
21 log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
22}
1package main
2
3import (
4 "log"
5
6 "github.com/panjf2000/gnet"
7)
8
9type echoServer struct {
10 *gnet.EventServer
11}
12
13func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
14 out = frame
15 return
16}
17
18func main() {
19 echo := new(echoServer)
20 log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
21}
正如你所见,上面的例子里 gnet
实例只注册了一个 EventHandler.React
事件。一般来说,主要的业务逻辑代码会写在这个事件方法里,这个方法会在服务器接收到客户端写过来的数据之时被调用,此时的输入参数: frame
已经是解码过后的一个完整的网络数据包,一般来说你需要实现 gnet
的codec 接口作为你自己的业务编解码器来处理 TCP 组包和分包的问题,如果你不实现那个接口的话,那么 gnet
将会使用默认的 codec,这意味着在 EventHandler.React
被触发调用之时输入参数: frame
里存储的是所有网络数据:包括最新的以及还在 buffer 里的旧数据,然后处理输入数据(这里只是把数据 echo 回去)并且在处理完之后把需要输出的数据赋值给 out
变量并返回,接着输出的数据会经过编码,最后被写回客户端。
带阻塞逻辑的 echo 服务器
Old version(<=v1.0.0-rc.4)
1package main
2
3import (
4 "log"
5 "time"
6
7 "github.com/panjf2000/gnet"
8 "github.com/panjf2000/gnet/pool/goroutine"
9)
10
11type echoServer struct {
12 *gnet.EventServer
13 pool *goroutine.Pool
14}
15
16func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
17 data := append([]byte{}, c.Read()...)
18 c.ResetBuffer()
19
20 // Use ants pool to unblock the event-loop.
21 _ = es.pool.Submit(func() {
22 time.Sleep(1 * time.Second)
23 c.AsyncWrite(data)
24 })
25
26 return
27}
28
29func main() {
30 p := goroutine.Default()
31 defer p.Release()
32
33 echo := &echoServer{pool: p}
34 log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
35}
1package main
2
3import (
4 "log"
5 "time"
6
7 "github.com/panjf2000/gnet"
8 "github.com/panjf2000/gnet/pool/goroutine"
9)
10
11type echoServer struct {
12 *gnet.EventServer
13 pool *goroutine.Pool
14}
15
16func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
17 data := append([]byte{}, frame...)
18
19 // Use ants pool to unblock the event-loop.
20 _ = es.pool.Submit(func() {
21 time.Sleep(1 * time.Second)
22 c.AsyncWrite(data)
23 })
24
25 return
26}
27
28func main() {
29 p := goroutine.Default()
30 defer p.Release()
31
32 echo := &echoServer{pool: p}
33 log.Fatal(gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true)))
34}
正如我在『主从多 Reactors + 线程/Go 程池』那一节所说的那样,如果你的业务逻辑里包含阻塞代码,那么你应该把这些阻塞代码变成非阻塞的,比如通过把这部分代码放到独立的 goroutines 去运行,但是要注意一点,如果你的服务器处理的流量足够的大,那么这种做法将会导致创建大量的 goroutines 极大地消耗系统资源,所以我一般建议你用 goroutine pool 来做 goroutines 的复用和管理,以及节省系统资源。
各种 gnet 示例:
TCP Echo Server
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7
8 "github.com/panjf2000/gnet"
9)
10
11type echoServer struct {
12 *gnet.EventServer
13}
14
15func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
16 log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
17 srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
18 return
19}
20func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
21 // Echo synchronously.
22 out = frame
23 return
24
25 /*
26 // Echo asynchronously.
27 data := append([]byte{}, frame...)
28 go func() {
29 time.Sleep(time.Second)
30 c.AsyncWrite(data)
31 }()
32 return
33 */
34}
35
36func main() {
37 var port int
38 var multicore bool
39
40 // Example command: go run echo.go --port 9000 --multicore=true
41 flag.IntVar(&port, "port", 9000, "--port 9000")
42 flag.BoolVar(&multicore, "multicore", false, "--multicore true")
43 flag.Parse()
44 echo := new(echoServer)
45 log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore)))
46}
UDP Echo Server
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7
8 "github.com/panjf2000/gnet"
9)
10
11type echoServer struct {
12 *gnet.EventServer
13}
14
15func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
16 log.Printf("UDP Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
17 srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
18 return
19}
20func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
21 // Echo synchronously.
22 out = frame
23 return
24
25 /*
26 // Echo asynchronously.
27 data := append([]byte{}, frame...)
28 go func() {
29 time.Sleep(time.Second)
30 c.SendTo(data)
31 }()
32 return
33 */
34}
35
36func main() {
37 var port int
38 var multicore, reuseport bool
39
40 // Example command: go run echo.go --port 9000 --multicore=true --reuseport=true
41 flag.IntVar(&port, "port", 9000, "--port 9000")
42 flag.BoolVar(&multicore, "multicore", false, "--multicore true")
43 flag.BoolVar(&reuseport, "reuseport", false, "--reuseport true")
44 flag.Parse()
45 echo := new(echoServer)
46 log.Fatal(gnet.Serve(echo, fmt.Sprintf("udp://:%d", port), gnet.WithMulticore(multicore), gnet.WithReusePort(reuseport)))
47}
UDS Echo Server
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7
8 "github.com/panjf2000/gnet"
9)
10
11type echoServer struct {
12 *gnet.EventServer
13}
14
15func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
16 log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n",
17 srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
18 return
19}
20func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
21 // Echo synchronously.
22 out = frame
23 return
24
25 /*
26 // Echo asynchronously.
27 data := append([]byte{}, frame...)
28 go func() {
29 time.Sleep(time.Second)
30 c.AsyncWrite(data)
31 }()
32 return
33 */
34}
35
36func main() {
37 var addr string
38 var multicore bool
39
40 // Example command: go run echo.go --sock echo.sock --multicore=true
41 flag.StringVar(&addr, "sock", "echo.sock", "--port 9000")
42 flag.BoolVar(&multicore, "multicore", false, "--multicore true")
43 flag.Parse()
44
45 echo := new(echoServer)
46 log.Fatal(gnet.Serve(echo, fmt.Sprintf("unix://%s", addr), gnet.WithMulticore(multicore)))
47}
HTTP Server
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7 "strconv"
8 "strings"
9 "time"
10 "unsafe"
11
12 "github.com/panjf2000/gnet"
13)
14
15var res string
16
17type request struct {
18 proto, method string
19 path, query string
20 head, body string
21 remoteAddr string
22}
23
24type httpServer struct {
25 *gnet.EventServer
26}
27
28var errMsg = "Internal Server Error"
29var errMsgBytes = []byte(errMsg)
30
31type httpCodec struct {
32 req request
33}
34
35func (hc *httpCodec) Encode(c gnet.Conn, buf []byte) (out []byte, err error) {
36 if c.Context() == nil {
37 return buf, nil
38 }
39 return appendResp(out, "500 Error", "", errMsg+"\n"), nil
40}
41
42func (hc *httpCodec) Decode(c gnet.Conn) (out []byte, err error) {
43 buf := c.Read()
44 c.ResetBuffer()
45
46 // process the pipeline
47 var leftover []byte
48pipeline:
49 leftover, err = parseReq(buf, &hc.req)
50 // bad thing happened
51 if err != nil {
52 c.SetContext(err)
53 return nil, err
54 } else if len(leftover) == len(buf) {
55 // request not ready, yet
56 return
57 }
58 out = appendHandle(out, res)
59 buf = leftover
60 goto pipeline
61}
62
63func (hs *httpServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
64 log.Printf("HTTP server is listening on %s (multi-cores: %t, loops: %d)\n",
65 srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
66 return
67}
68
69func (hs *httpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
70 if c.Context() != nil {
71 // bad thing happened
72 out = errMsgBytes
73 action = gnet.Close
74 return
75 }
76 // handle the request
77 out = frame
78 return
79}
80
81func main() {
82 var port int
83 var multicore bool
84
85 // Example command: go run http.go --port 8080 --multicore=true
86 flag.IntVar(&port, "port", 8080, "server port")
87 flag.BoolVar(&multicore, "multicore", true, "multicore")
88 flag.Parse()
89
90 res = "Hello World!\r\n"
91
92 http := new(httpServer)
93 hc := new(httpCodec)
94
95 // Start serving!
96 log.Fatal(gnet.Serve(http, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithCodec(hc)))
97}
98
99// appendHandle handles the incoming request and appends the response to
100// the provided bytes, which is then returned to the caller.
101func appendHandle(b []byte, res string) []byte {
102 return appendResp(b, "200 OK", "", res)
103}
104
105// appendResp will append a valid http response to the provide bytes.
106// The status param should be the code plus text such as "200 OK".
107// The head parameter should be a series of lines ending with "\r\n" or empty.
108func appendResp(b []byte, status, head, body string) []byte {
109 b = append(b, "HTTP/1.1"...)
110 b = append(b, ' ')
111 b = append(b, status...)
112 b = append(b, '\r', '\n')
113 b = append(b, "Server: gnet\r\n"...)
114 b = append(b, "Date: "...)
115 b = time.Now().AppendFormat(b, "Mon, 02 Jan 2006 15:04:05 GMT")
116 b = append(b, '\r', '\n')
117 if len(body) > 0 {
118 b = append(b, "Content-Length: "...)
119 b = strconv.AppendInt(b, int64(len(body)), 10)
120 b = append(b, '\r', '\n')
121 }
122 b = append(b, head...)
123 b = append(b, '\r', '\n')
124 if len(body) > 0 {
125 b = append(b, body...)
126 }
127 return b
128}
129
130func b2s(b []byte) string {
131 return *(*string)(unsafe.Pointer(&b))
132}
133
134// parseReq is a very simple http request parser. This operation
135// waits for the entire payload to be buffered before returning a
136// valid request.
137func parseReq(data []byte, req *request) (leftover []byte, err error) {
138 sdata := b2s(data)
139 var i, s int
140 var head string
141 var clen int
142 var q = -1
143 // method, path, proto line
144 for ; i < len(sdata); i++ {
145 if sdata[i] == ' ' {
146 req.method = sdata[s:i]
147 for i, s = i+1, i+1; i < len(sdata); i++ {
148 if sdata[i] == '?' && q == -1 {
149 q = i - s
150 } else if sdata[i] == ' ' {
151 if q != -1 {
152 req.path = sdata[s:q]
153 req.query = req.path[q+1 : i]
154 } else {
155 req.path = sdata[s:i]
156 }
157 for i, s = i+1, i+1; i < len(sdata); i++ {
158 if sdata[i] == '\n' && sdata[i-1] == '\r' {
159 req.proto = sdata[s:i]
160 i, s = i+1, i+1
161 break
162 }
163 }
164 break
165 }
166 }
167 break
168 }
169 }
170 if req.proto == "" {
171 return data, fmt.Errorf("malformed request")
172 }
173 head = sdata[:s]
174 for ; i < len(sdata); i++ {
175 if i > 1 && sdata[i] == '\n' && sdata[i-1] == '\r' {
176 line := sdata[s : i-1]
177 s = i + 1
178 if line == "" {
179 req.head = sdata[len(head)+2 : i+1]
180 i++
181 if clen > 0 {
182 if len(sdata[i:]) < clen {
183 break
184 }
185 req.body = sdata[i : i+clen]
186 i += clen
187 }
188 return data[i:], nil
189 }
190 if strings.HasPrefix(line, "Content-Length:") {
191 n, err := strconv.ParseInt(strings.TrimSpace(line[len("Content-Length:"):]), 10, 64)
192 if err == nil {
193 clen = int(n)
194 }
195 }
196 }
197 }
198 // not enough data
199 return data, nil
200}
Push Server
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7 "sync"
8 "time"
9
10 "github.com/panjf2000/gnet"
11)
12
13type pushServer struct {
14 *gnet.EventServer
15 tick time.Duration
16 connectedSockets sync.Map
17}
18
19func (ps *pushServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
20 log.Printf("Push server is listening on %s (multi-cores: %t, loops: %d), "+
21 "pushing data every %s ...\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop, ps.tick.String())
22 return
23}
24func (ps *pushServer) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
25 log.Printf("Socket with addr: %s has been opened...\n", c.RemoteAddr().String())
26 ps.connectedSockets.Store(c.RemoteAddr().String(), c)
27 return
28}
29func (ps *pushServer) OnClosed(c gnet.Conn, err error) (action gnet.Action) {
30 log.Printf("Socket with addr: %s is closing...\n", c.RemoteAddr().String())
31 ps.connectedSockets.Delete(c.RemoteAddr().String())
32 return
33}
34func (ps *pushServer) Tick() (delay time.Duration, action gnet.Action) {
35 log.Println("It's time to push data to clients!!!")
36 ps.connectedSockets.Range(func(key, value interface{}) bool {
37 addr := key.(string)
38 c := value.(gnet.Conn)
39 c.AsyncWrite([]byte(fmt.Sprintf("heart beating to %s\n", addr)))
40 return true
41 })
42 delay = ps.tick
43 return
44}
45func (ps *pushServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
46 out = frame
47 return
48}
49
50func main() {
51 var port int
52 var multicore bool
53 var interval time.Duration
54 var ticker bool
55
56 // Example command: go run push.go --port 9000 --tick 1s --multicore=true
57 flag.IntVar(&port, "port", 9000, "server port")
58 flag.BoolVar(&multicore, "multicore", true, "multicore")
59 flag.DurationVar(&interval, "tick", 0, "pushing tick")
60 flag.Parse()
61 if interval > 0 {
62 ticker = true
63 }
64 push := &pushServer{tick: interval}
65 log.Fatal(gnet.Serve(push, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore), gnet.WithTicker(ticker)))
66}
Codec Client/Server
Client:
1// Reference https://github.com/smallnest/goframe/blob/master/_examples/goclient/client.go
2
3package main
4
5import (
6 "encoding/binary"
7 "fmt"
8 "net"
9
10 "github.com/smallnest/goframe"
11)
12
13func main() {
14 conn, err := net.Dial("tcp", "127.0.0.1:9000")
15 if err != nil {
16 panic(err)
17 }
18 defer conn.Close()
19
20 encoderConfig := goframe.EncoderConfig{
21 ByteOrder: binary.BigEndian,
22 LengthFieldLength: 4,
23 LengthAdjustment: 0,
24 LengthIncludesLengthFieldLength: false,
25 }
26
27 decoderConfig := goframe.DecoderConfig{
28 ByteOrder: binary.BigEndian,
29 LengthFieldOffset: 0,
30 LengthFieldLength: 4,
31 LengthAdjustment: 0,
32 InitialBytesToStrip: 4,
33 }
34
35 fc := goframe.NewLengthFieldBasedFrameConn(encoderConfig, decoderConfig, conn)
36 err = fc.WriteFrame([]byte("hello"))
37 if err != nil {
38 panic(err)
39 }
40 err = fc.WriteFrame([]byte("world"))
41 if err != nil {
42 panic(err)
43 }
44
45 buf, err := fc.ReadFrame()
46 if err != nil {
47 panic(err)
48 }
49 fmt.Println("received: ", string(buf))
50 buf, err = fc.ReadFrame()
51 if err != nil {
52 panic(err)
53 }
54 fmt.Println("received: ", string(buf))
55}
Server:
1package main
2
3import (
4 "encoding/binary"
5 "flag"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/panjf2000/gnet"
11 "github.com/panjf2000/gnet/pool/goroutine"
12)
13
14type codecServer struct {
15 *gnet.EventServer
16 addr string
17 multicore bool
18 async bool
19 codec gnet.ICodec
20 workerPool *goroutine.Pool
21}
22
23func (cs *codecServer) OnInitComplete(srv gnet.Server) (action gnet.Action) {
24 log.Printf("Test codec server is listening on %s (multi-cores: %t, loops: %d)\n",
25 srv.Addr.String(), srv.Multicore, srv.NumEventLoop)
26 return
27}
28
29func (cs *codecServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) {
30 if cs.async {
31 data := append([]byte{}, frame...)
32 _ = cs.workerPool.Submit(func() {
33 c.AsyncWrite(data)
34 })
35 return
36 }
37 out = frame
38 return
39}
40
41func testCodecServe(addr string, multicore, async bool, codec gnet.ICodec) {
42 var err error
43 if codec == nil {
44 encoderConfig := gnet.EncoderConfig{
45 ByteOrder: binary.BigEndian,
46 LengthFieldLength: 4,
47 LengthAdjustment: 0,
48 LengthIncludesLengthFieldLength: false,
49 }
50 decoderConfig := gnet.DecoderConfig{
51 ByteOrder: binary.BigEndian,
52 LengthFieldOffset: 0,
53 LengthFieldLength: 4,
54 LengthAdjustment: 0,
55 InitialBytesToStrip: 4,
56 }
57 codec = gnet.NewLengthFieldBasedFrameCodec(encoderConfig, decoderConfig)
58 }
59 cs := &codecServer{addr: addr, multicore: multicore, async: async, codec: codec, workerPool: goroutine.Default()}
60 err = gnet.Serve(cs, addr, gnet.WithMulticore(multicore), gnet.WithTCPKeepAlive(time.Minute*5), gnet.WithCodec(codec))
61 if err != nil {
62 panic(err)
63 }
64}
65
66func main() {
67 var port int
68 var multicore bool
69
70 // Example command: go run server.go --port 9000 --multicore=true
71 flag.IntVar(&port, "port", 9000, "server port")
72 flag.BoolVar(&multicore, "multicore", true, "multicore")
73 flag.Parse()
74 addr := fmt.Sprintf("tcp://:%d", port)
75 testCodecServe(addr, multicore, false, nil)
76}
Custom Codec Demo with Client/Server
protocol intro:
1// CustomLengthFieldProtocol
2// 测试用的协议,由以下字段构成:
3// version+actionType+dataLength+data
4// 其中 version+actionType+dataLength 为 header,data 为 payload
5type CustomLengthFieldProtocol struct {
6 Version uint16
7 ActionType uint16
8 DataLength uint32
9 Data []byte
10}
11
12// Encode ...
13func (cc *CustomLengthFieldProtocol) Encode(c gnet.Conn, buf []byte) ([]byte, error) {
14 result := make([]byte, 0)
15
16 buffer := bytes.NewBuffer(result)
17
18 // 取出`React()`时存入的参数
19 item := c.Context().(CustomLengthFieldProtocol)
20
21 if err := binary.Write(buffer, binary.BigEndian, item.ActionType); err != nil {
22 s := fmt.Sprintf("Pack version error , %v", err)
23 return nil, errors.New(s)
24 }
25
26 if err := binary.Write(buffer, binary.BigEndian, item.ActionType); err != nil {
27 s := fmt.Sprintf("Pack type error , %v", err)
28 return nil, errors.New(s)
29 }
30 dataLen := uint32(len(buf))
31 if err := binary.Write(buffer, binary.BigEndian, dataLen); err != nil {
32 s := fmt.Sprintf("Pack datalength error , %v", err)
33 return nil, errors.New(s)
34 }
35 if dataLen > 0 {
36 if err := binary.Write(buffer, binary.BigEndian, buf); err != nil {
37 s := fmt.Sprintf("Pack data error , %v", err)
38 return nil, errors.New(s)
39 }
40 }
41
42 return buffer.Bytes(), nil
43}
44
45// Decode ...
46func (cc *CustomLengthFieldProtocol) Decode(c gnet.Conn) ([]byte, error) {
47 // parse header
48 headerLen := DefaultHeadLength // uint16+uint16+uint32
49 if size, header := c.ReadN(headerLen); size == headerLen {
50 byteBuffer := bytes.NewBuffer(header)
51 var pbVersion, actionType uint16
52 var dataLength uint32
53 binary.Read(byteBuffer, binary.BigEndian, &pbVersion)
54 binary.Read(byteBuffer, binary.BigEndian, &actionType)
55 binary.Read(byteBuffer, binary.BigEndian, &dataLength)
56 // to check the protocol version and actionType,
57 // reset buffer if the version or actionType is not correct
58 if pbVersion != DefaultProtocolVersion || isCorrectAction(actionType) == false {
59 c.ResetBuffer()
60 log.Println("not normal protocol:", pbVersion, DefaultProtocolVersion, actionType, dataLength)
61 return nil, errors.New("not normal protocol")
62 }
63 // parse payload
64 dataLen := int(dataLength) //max int32 can contain 210MB payload
65 protocolLen := headerLen + dataLen
66 if dataSize, data := c.ReadN(protocolLen); dataSize == protocolLen {
67 c.ShiftN(protocolLen)
68
69 // return the payload of the data
70 return data[headerLen:], nil
71 }
72 return nil, errors.New("not enough payload data")
73
74 }
75 return nil, errors.New("not enough header data")
76}
Client/Server:
Check out the source code.
更详细的代码在这里:gnet 示例。
I/O 事件
gnet
目前支持的 I/O 事件如下:
EventHandler.OnInitComplete
当 server 初始化完成之后调用。EventHandler.OnOpened
当连接被打开的时候调用。EventHandler.OnClosed
当连接被关闭的之后调用。EventHandler.React
当 server 端接收到从 client 端发送来的数据的时候调用。(你的核心业务代码一般是写在这个方法里)EventHandler.Tick
服务器启动的时候会调用一次,之后就以给定的时间间隔定时调用一次,是一个定时器方法。EventHandler.PreWrite
预先写数据方法,在 server 端写数据回 client 端之前调用。
定时器
EventHandler.Tick
会每隔一段时间触发一次,间隔时间你可以自己控制,设定返回的 delay
变量就行。
定时器的第一次触发是在 gnet server 启动之后,如果你要设置定时器,别忘了设置 option 选项: WithTicker(true)
。
1events.Tick = func() (delay time.Duration, action Action) {
2 log.Printf("tick")
3 delay = time.Second
4 return
5}
UDP 支持
gnet
支持 UDP 协议,所以在 gnet.Serve
里绑定允许绑定 UDP 地址, gnet
的 UDP 支持有如下的特性:
- 网络数据的读入和写出不做缓冲,会一次性读写客户端。
EventHandler.OnOpened
和EventHandler.OnClosed
这两个事件在 UDP 下不可用,唯一可用的事件是React
。- TCP 里的异步写操作是
AsyncWrite([]byte)
方法,而在 UDP 里对应的方法是SendTo([]byte)
。
Unix Domain Socket 支持
gnet
还支持 UDS(Unix Domain Socket) 机制,只需要把类似 "unix://xxx" 的 UDS 地址传参给 gnet.Serve
函数绑定就行了。
在 gnet
里使用 UDS 和使用 TCP 没有什么不同,所有 TCP 协议下可以使用的事件函数都可以在 UDS 中使用。
使用多核
gnet.WithMulticore(true)
参数指定了 gnet
是否会使用多核来进行服务,如果是 true
的话就会使用多核,否则就是单核运行,利用的核心数一般是机器的 CPU 数量。
负载均衡
gnet
目前支持三种负载均衡算法: Round-Robin(轮询)
、 Source Addr Hash(源地址哈希)
和 Least-Connections(最少连接数)
,你可以通过传递 functional option 的 LB
(RoundRobin/LeastConnections/SourceAddrHash) 的值给 gnet.Serve
来指定要使用的负载均衡算法。
如果没有显示地指定,那么 gnet
将会使用 Round-Robin
作为默认的负载均衡算法。
SO_REUSEPORT 端口复用
服务器支持 SO_REUSEPORT 端口复用特性,允许多个 sockets 监听同一个端口,然后内核会帮你做好负载均衡,每次只唤醒一个 socket 来处理 connect
请求,避免惊群效应。
默认情况下, gnet
也不会有惊群效应,因为 gnet
默认的网络模型是主从多 Reactors,只会有一个主 reactor 在监听端口以及接受新连接。所以,开不开启 SO_REUSEPORT
选项是无关紧要的,只是开启了这个选项之后 gnet
的网络模型将会切换成 evio
的旧网络模型,这一点需要注意一下。
开启这个功能也很简单,使用 functional options 设置一下即可:
1gnet.Serve(events, "tcp://:9000", gnet.WithMulticore(true), gnet.WithReusePort(true)))
多种内置的 TCP 流编解码器
gnet
内置了多种用于 TCP 流分包的编解码器。
目前一共实现了 4 种常见的编解码器:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,基本上能满足大多数应用场景的需求了;而且 gnet
还允许用户实现自己的编解码器:只需要实现 gnet.ICodec 接口,并通过 functional options 替换掉内部默认的编解码器即可。
这里有一个使用编解码器对 TCP 流分包的例子。
📊 性能测试
TechEmpower 性能测试
1# Hardware
2CPU: 28 HT Cores Intel(R) Xeon(R) Gold 5120 CPU @ 2.20GHz
3Mem: 32GB RAM
4OS : Ubuntu 18.04.3 4.15.0-88-generic #88-Ubuntu
5Net: Switched 10-gigabit ethernet
6Go : go1.14.x linux/amd64
这是包含全部编程语言框架的性能排名前 50 的结果,总榜单包含了全世界共计 382 个框架。
这是 Go 语言分类下的全部排名。
完整的排行可以通过 Full ranking list of Plaintext 查看。
同类型的网络库性能对比
Linux (epoll)
系统参数
1# Machine information
2 OS : Ubuntu 18.04/x86_64
3 CPU : 8 Virtual CPUs
4 Memory : 16.0 GiB
5
6# Go version and configurations
7Go Version : go1.12.9 linux/amd64
8GOMAXPROCS=8
Echo Server
HTTP Server
FreeBSD (kqueue)
系统参数
1# Machine information
2 OS : macOS Mojave 10.14.6/x86_64
3 CPU : 4 CPUs
4 Memory : 8.0 GiB
5
6# Go version and configurations
7Go Version : go version go1.12.9 darwin/amd64
8GOMAXPROCS=4
Echo Server
HTTP Server
📚 相关文章
- A Million WebSockets and Go
- Going Infinite, handling 1M websockets connections in Go
- Go netpoll I/O 多路复用构建原生网络模型之源码深度解析
- gnet: 一个轻量级且高性能的 Golang 网络库
- 最快的 Go 网络框架 gnet 来啦!
微信公众号

关于版权和转载
本文由 潘建锋 创作,采用 署名 4.0 国际 (CC BY 4.0) 国际许可协议进行授权。本站文章除注明转载/出处外,均为本站原创或翻译,转载时请务必署名,否则,本人将保留一切追究责任的权利。
转载规范
标题:gnet: 一个轻量级且高性能的 Go 网络框架作者:潘建锋
原文:HTTPS://strikefreedom.top/go-event-loop-networking-library-gnet
Comments | 27 条评论
如何做 player 进入和退出呢?进入时人数加1,全局广播。退出时人数减1,全局广播。
进入还可以做判断,每次判断是否存在,不存在就加1;退出怎么处理呢?
知道了,谢谢,使用
Conn.AsyncWrite(data)
同问?请指教
请教,server 端,建立好监听以后,
gnet.Serve(echo, "tcp://:9000", gnet.WithMulticore(true))
如何在server端往client端发数据?调用什么函数?谢谢
最新版本1.6.3为什么go get之后不能用了? undefined: gnet.Events
拷贝代码会把行号一起拷贝下来
evio里面reactor的实现好像是没有区分主的reactor和从的reactor,每个reactor都可以负责接受请求;gnet这里是把两者做了区分。这两种实现方式各有什么优缺点呢
谢谢大佬关注:我还有个疑惑
这2个应该都不是阻塞的行为 那么为什么会需要这个异步写的方法,直接conn->write不行吗👀️
不会,这个方法是往任务队列里添加任务以及可能往 eventfd 里写一个信号,不会阻塞。
麻烦大佬解惑:
AsyncWrite
这个方法不会阻塞住eventloop嘛感谢,我学下一下。我准备把goroutine池用在我的项目里,嘿嘿。
Sketch
大佬,能指点一二吗。😰
我比较好奇这些设计图是用什么工具画的 👍
好像的确应该是像你说的那样,我过会儿改下,谢谢指出!
看到 ring_buffer.go:314 有点疑惑,
为什么不是分成 r.r->r.size 0->r.w 两次
对,开启多核是多 IO 线程,Reactor 线程还是只有一个,多主 Reactors 的实现暂时没有计划,而且其实单 Reactor 已经足够了,Redis 的 IO multiplexer 不也是单线程,一样性能极高。
大佬,多核也是只有一个主 reactor 在监听端口吗?后面会有多主 Reactor 的计划吗
定时推送示例:
https://github.com/panjf2000/gnet/blob/master/examples/push-server/main.go
在Event.Opened 方法里可以在连接第一次被打开的时候主动写往客户端写数据,或者调用 conn.Wake() 方法也可以异步写数据。
大佬 主动给客户端发消息 怎么操作
嗯,谢谢提醒。
图片地址建议用绝对路径,不然社区帖子上显示不了 🤣
别别,不是大佬,小菜鸟😅,那个 bug 我也发现了,已解决,谢谢提醒~~
👍 感谢大佬关注,evio 里 eventfd 有个只 write 但是没去 read 的 bug ,如果没处理需要处理下。
嗯,ring-buffer 那一块的逻辑部分借鉴自你的项目,源码里保留版权了😄
😳 感觉和我的做法很像啊