MCPcopy
hub / github.com/Allenxuxu/gev

github.com/Allenxuxu/gev @v0.5.0 sqlite

repository ↗ · DeepWiki ↗ · release v0.5.0 ↗
446 symbols 1,390 edges 61 files 225 documented · 50%
README

gev

Github Actions Go Report Card Codacy Badge GoDoc LICENSE Code Size Sourcegraph

中文 | English

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库 / websocket server,支持自定义协议,轻松快速搭建高性能服务器。

特点

  • 基于 epoll 和 kqueue 实现的高性能事件循环
  • 支持多核多线程
  • 动态扩容 Ring Buffer 实现的读写缓冲区
  • 异步读写
  • 自动清理空闲连接
  • SO_REUSEPORT 端口重用支持
  • 支持 WebSocket/Protobuf, 自定义协议
  • 支持定时任务,延时任务
  • 开箱即用的高性能 websocket server

网络模型

gev 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

性能测试

📈 测试数据

测试环境 Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

吞吐量测试

限制 GOMAXPROCS=1(单线程),1 个 work 协程

image

限制 GOMAXPROCS=4,4 个 work 协程

image

其他测试

速度测试

和同类库的简单性能比较, 压测方式与 evio 项目相同。

  • gnet
  • eviop
  • evio
  • net (标准库)

限制 GOMAXPROCS=1,1 个 work 协程

image

限制 GOMAXPROCS=1,4 个 work 协程

image

限制 GOMAXPROCS=4,4 个 work 协程

image

安装

go get -u github.com/Allenxuxu/gev

快速入门

echo demo

package main

import (
    "flag"
    "net/http"
    _ "net/http/pprof"
    "strconv"
    "time"

    "github.com/Allenxuxu/gev"
    "github.com/Allenxuxu/gev/log"
    "github.com/Allenxuxu/toolkit/sync/atomic"
)

type example struct {
    Count atomic.Int64
}

func (s *example) OnConnect(c *gev.Connection) {
    s.Count.Add(1)
    //log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
    //log.Println("OnMessage")
    out = data
    return
}

func (s *example) OnClose(c *gev.Connection) {
    s.Count.Add(-1)
    //log.Println("OnClose")
}

func main() {
    go func() {
        if err := http.ListenAndServe(":6060", nil); err != nil {
            panic(err)
        }
    }()

    handler := new(example)
    var port int
    var loops int

    flag.IntVar(&port, "port", 1833, "server port")
    flag.IntVar(&loops, "loops", -1, "num loops")
    flag.Parse()

    s, err := gev.NewServer(handler,
        gev.Network("tcp"),
        gev.Address(":"+strconv.Itoa(port)),
        gev.NumLoops(loops),
        gev.MetricsServer("", ":9091"),
    )
    if err != nil {
        panic(err)
    }

    s.RunEvery(time.Second*2, func() {
        log.Info("connections :", handler.Count.Get())
    })

    s.Start()
}

Handler 是一个接口,我们的程序必须实现它。

type CallBack interface {
    OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
    OnClose(c *Connection)
}

type Handler interface {
    CallBack
    OnConnect(c *Connection)
}

OnMessage 会在一个完整的数据帧到来时被回调。用户可此可以拿到数据,处理业务逻辑,并返回需要发送的数据。

在有数据到来时,gev 并非立刻回调 OnMessage ,而是会先回调一个 UnPacket 函数。大概执行逻辑如下:

ctx, receivedData := c.protocol.UnPacket(c, buffer)
for ctx != nil || len(receivedData) != 0 {
    sendData := c.callBack.OnMessage(c, ctx, receivedData)
    if sendData != nil {
        *tmpBuffer = append(*tmpBuffer, c.protocol.Packet(c, sendData)...)
    }

    ctx, receivedData = c.protocol.UnPacket(c, buffer)
}

protocol

UnPacket 函数中会查看 ringbuffer 中的数据是否是一个完整的数据帧,如果是则会将数据拆包并返回 payload 数据;如果还不是一个完整的数据帧,则直接返回。

UnPacket 的返回值 (interface{}, []byte) 会作为 OnMessage 的入参 ctx interface{}, data []byte 被传入并回调。ctx 被设计用来传递在 UnPacket 函数中解析数据帧时生成的特殊信息(复杂的数据帧协议会需要),data 则是用来传递 payload 数据。

type Protocol interface {
    UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
    Packet(c *Connection, data interface{}) []byte
}


type DefaultProtocol struct{}

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
    s, e := buffer.PeekAll()
    if len(e) > 0 {
        size := len(s) + len(e)
        userBuffer := *c.UserBuffer()
        if size > cap(userBuffer) {
            userBuffer = make([]byte, size)
            *c.UserBuffer() = userBuffer
        }

        copy(userBuffer, s)
        copy(userBuffer[len(s):], e)

        return nil, userBuffer
    } else {
        buffer.RetrieveAll()

        return nil, s
    }
}

func (d *DefaultProtocol) Packet(c *Connection, data interface{}) []byte {
    return data.([]byte)
}

如上,gev 提供一个默认的 Protocol 实现,会将接受缓冲区中( ringbuffer )的所有数据取出。 在实际使用中,通常会有自己的数据帧协议,gev 可以以插件的形式来设置:在创建 Server 的时候通过可变参数设置。

s, err := gev.NewServer(handler,gev.Protocol(&ExampleProtocol{}))

更详细的使用方式可以参考示例:自定义协议

Connection 还提供 Send 方法来发送数据。Send 并不会立刻发送数据,而是先添加到 event loop 的任务队列中,然后唤醒 event loop 去发送。

更详细的使用方式可以参考示例:服务端定时推送

func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error

Connection ShutdownWrite 会关闭写端,从而断开连接。

更详细的使用方式可以参考示例:限制最大连接数

func (c *Connection) ShutdownWrite() error

RingBuffer 是一个动态扩容的循环缓冲区实现。

WebSocket 支持

WebSocket 协议构建在 TCP 协议之上的,所以 gev 无需内置它,而是通过插件的形式提供支持,在 plugins/websocket 目录。

code

type Protocol struct {
    upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
    return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
    upgraded := c.Context()
    if upgraded == nil {
        var err error
        out, _, err = p.upgrade.Upgrade(buffer)
        if err != nil {
            log.Println("Websocket Upgrade :", err)
            return
        }
        c.SetContext(true)
    } else {
        header, err := ws.VirtualReadHeader(buffer)
        if err != nil {
            log.Println(err)
            return
        }
        if buffer.VirtualLength() >= int(header.Length) {
            buffer.VirtualFlush()

            payload := make([]byte, int(header.Length))
            _, _ = buffer.Read(payload)

            if header.Masked {
                ws.Cipher(payload, header.Mask, 0)
            }

            ctx = &header
            out = payload
        } else {
            buffer.VirtualRevert()
        }
    }
    return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
    return data
}

详细实现可以插件实现查看 源码,使用方式可以查看 websocket 示例

示例

请我喝杯咖啡

Paypal: Paypal/AllenXuxu

致谢

感谢 JetBrains 提供的免费开源 License

参考

本项目受 evio 启发,参考 muduo 实现。

Extension points exported contracts — how you extend this code

Handler (Interface)
Handler Server 注册接口 [17 implementers]
server.go
Handler (Interface)
Handler Server 注册接口 [17 implementers]
server_std.go
WSHandler (Interface)
WSHandler WebSocket Server 注册接口 [17 implementers]
plugins/websocket/wrap.go
Protocol (Interface)
Protocol 自定义协议编解码接口 [4 implementers]
protocol.go
CallBack (Interface)
(no doc) [17 implementers]
connection.go
Socket (Interface)
Socket 接口 [2 implementers]
eventloop/eventloop.go
Logger (Interface)
Logger is a generic logging interface [1 implementers]
log/log.go
Option (FuncType)
Option ...
options.go

Core symbols most depended-on inside this repo

Get
called by 45
context.go
WriteString
called by 34
plugins/websocket/ws/http.go
Close
called by 29
eventloop/eventloop.go
Write
called by 29
plugins/websocket/ws/http.go
Error
called by 27
log/log.go
Set
called by 23
context.go
Address
called by 22
options.go
NewServer
called by 20
server.go

Shape

Method 222
Function 146
Struct 51
TypeAlias 11
Interface 9
FuncType 7

Languages

Go100%

Modules by API surface

server_std.go31 symbols
plugins/websocket/ws/frame.go28 symbols
example/protobuf/proto/user.pb.go28 symbols
plugins/websocket/ws/http.go26 symbols
log/log.go26 symbols
connection.go23 symbols
eventloop/eventloop.go17 symbols
server_test.go14 symbols
poller/epoll.go14 symbols
server_conn_test.go12 symbols
options.go11 symbols
server.go10 symbols

Dependencies from manifests, versioned

github.com/Allenxuxu/eviopv0.0.0-2019090112380 · 1×
github.com/Allenxuxu/ringbufferv0.0.11 · 1×
github.com/Allenxuxu/toolkitv0.0.1 · 1×
github.com/RussellLuo/timingwheelv0.0.0-2020102901590 · 1×
github.com/gobwas/httpheadv0.0.0-2018013018473 · 1×
github.com/gobwas/poolv0.2.0 · 1×
github.com/libp2p/go-reuseportv0.0.1 · 1×
github.com/panjf2000/gnetv1.4.0 · 1×
github.com/tidwall/eviov1.0.2 · 1×

For agents

$ claude mcp add gev \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact