MCPcopy
hub / github.com/gobwas/ws

github.com/gobwas/ws @v1.4.0 sqlite

repository ↗ · DeepWiki ↗ · release v1.4.0 ↗
493 symbols 1,557 edges 63 files 251 documented · 51%
README

ws

GoDoc CI

RFC6455 WebSocket implementation in Go.

Features

  • Zero-copy upgrade
  • No intermediate allocations during I/O
  • Low-level API which allows to build your own logic of packet handling and buffers reuse
  • High-level wrappers and helpers around API in wsutil package, which allow to start fast without digging the protocol internals

Documentation

GoDoc.

Why

Existing WebSocket implementations do not allow users to reuse I/O buffers between connections in clear way. This library aims to export efficient low-level interface for working with the protocol without forcing only one way it could be used.

By the way, if you want get the higher-level tools, you can use wsutil package.

Status

Library is tagged as v1* so its API must not be broken during some improvements or refactoring.

This implementation of RFC6455 passes Autobahn Test Suite and currently has about 78% coverage.

Examples

Example applications using ws are developed in separate repository ws-examples.

Usage

The higher-level example of WebSocket echo server:

package main

import (
    "net/http"

    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
)

func main() {
    http.ListenAndServe(":8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        conn, _, _, err := ws.UpgradeHTTP(r, w)
        if err != nil {
            // handle error
        }
        go func() {
            defer conn.Close()

            for {
                msg, op, err := wsutil.ReadClientData(conn)
                if err != nil {
                    // handle error
                }
                err = wsutil.WriteServerMessage(conn, op, msg)
                if err != nil {
                    // handle error
                }
            }
        }()
    }))
}

Lower-level, but still high-level example:

import (
    "net/http"
    "io"

    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
)

func main() {
    http.ListenAndServe(":8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        conn, _, _, err := ws.UpgradeHTTP(r, w)
        if err != nil {
            // handle error
        }
        go func() {
            defer conn.Close()

            var (
                state  = ws.StateServerSide
                reader = wsutil.NewReader(conn, state)
                writer = wsutil.NewWriter(conn, state, ws.OpText)
            )
            for {
                header, err := reader.NextFrame()
                if err != nil {
                    // handle error
                }

                // Reset writer to write frame with right operation code.
                writer.Reset(conn, state, header.OpCode)

                if _, err = io.Copy(writer, reader); err != nil {
                    // handle error
                }
                if err = writer.Flush(); err != nil {
                    // handle error
                }
            }
        }()
    }))
}

We can apply the same pattern to read and write structured responses through a JSON encoder and decoder.:

    ...
    var (
        r = wsutil.NewReader(conn, ws.StateServerSide)
        w = wsutil.NewWriter(conn, ws.StateServerSide, ws.OpText)
        decoder = json.NewDecoder(r)
        encoder = json.NewEncoder(w)
    )
    for {
        hdr, err = r.NextFrame()
        if err != nil {
            return err
        }
        if hdr.OpCode == ws.OpClose {
            return io.EOF
        }
        var req Request
        if err := decoder.Decode(&req); err != nil {
            return err
        }
        var resp Response
        if err := encoder.Encode(&resp); err != nil {
            return err
        }
        if err = w.Flush(); err != nil {
            return err
        }
    }
    ...

The lower-level example without wsutil:

package main

import (
    "net"
    "io"

    "github.com/gobwas/ws"
)

func main() {
    ln, err := net.Listen("tcp", "localhost:8080")
    if err != nil {
        log.Fatal(err)
    }

    for {
        conn, err := ln.Accept()
        if err != nil {
            // handle error
        }
        _, err = ws.Upgrade(conn)
        if err != nil {
            // handle error
        }

        go func() {
            defer conn.Close()

            for {
                header, err := ws.ReadHeader(conn)
                if err != nil {
                    // handle error
                }

                payload := make([]byte, header.Length)
                _, err = io.ReadFull(conn, payload)
                if err != nil {
                    // handle error
                }
                if header.Masked {
                    ws.Cipher(payload, header.Mask, 0)
                }

                // Reset the Masked flag, server frames must not be masked as
                // RFC6455 says.
                header.Masked = false

                if err := ws.WriteHeader(conn, header); err != nil {
                    // handle error
                }
                if _, err := conn.Write(payload); err != nil {
                    // handle error
                }

                if header.OpCode == ws.OpClose {
                    return
                }
            }
        }()
    }
}

Zero-copy upgrade

Zero-copy upgrade helps to avoid unnecessary allocations and copying while handling HTTP Upgrade request.

Processing of all non-websocket headers is made in place with use of registered user callbacks whose arguments are only valid until callback returns.

The simple example looks like this:

package main

import (
    "net"
    "log"

    "github.com/gobwas/ws"
)

func main() {
    ln, err := net.Listen("tcp", "localhost:8080")
    if err != nil {
        log.Fatal(err)
    }
    u := ws.Upgrader{
        OnHeader: func(key, value []byte) (err error) {
            log.Printf("non-websocket header: %q=%q", key, value)
            return
        },
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            // handle error
        }

        _, err = u.Upgrade(conn)
        if err != nil {
            // handle error
        }
    }
}

Usage of ws.Upgrader here brings ability to control incoming connections on tcp level and simply not to accept them by some logic.

Zero-copy upgrade is for high-load services which have to control many resources such as connections buffers.

The real life example could be like this:

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "net/http"
    "runtime"

    "github.com/gobwas/httphead"
    "github.com/gobwas/ws"
)

func main() {
    ln, err := net.Listen("tcp", "localhost:8080")
    if err != nil {
        // handle error
    }

    // Prepare handshake header writer from http.Header mapping.
    header := ws.HandshakeHeaderHTTP(http.Header{
        "X-Go-Version": []string{runtime.Version()},
    })

    u := ws.Upgrader{
        OnHost: func(host []byte) error {
            if string(host) == "github.com" {
                return nil
            }
            return ws.RejectConnectionError(
                ws.RejectionStatus(403),
                ws.RejectionHeader(ws.HandshakeHeaderString(
                    "X-Want-Host: github.com\r\n",
                )),
            )
        },
        OnHeader: func(key, value []byte) error {
            if string(key) != "Cookie" {
                return nil
            }
            ok := httphead.ScanCookie(value, func(key, value []byte) bool {
                // Check session here or do some other stuff with cookies.
                // Maybe copy some values for future use.
                return true
            })
            if ok {
                return nil
            }
            return ws.RejectConnectionError(
                ws.RejectionReason("bad cookie"),
                ws.RejectionStatus(400),
            )
        },
        OnBeforeUpgrade: func() (ws.HandshakeHeader, error) {
            return header, nil
        },
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }
        _, err = u.Upgrade(conn)
        if err != nil {
            log.Printf("upgrade error: %s", err)
        }
    }
}

Compression

There is a ws/wsflate package to support Permessage-Deflate Compression Extension.

It provides minimalistic I/O wrappers to be used in conjunction with any deflate implementation (for example, the standard library's compress/flate).

It is also compatible with wsutil's reader and writer by providing wsflate.MessageState type, which implements wsutil.SendExtension and wsutil.RecvExtension interfaces.

package main

import (
    "bytes"
    "log"
    "net"

    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsflate"
)

func main() {
    ln, err := net.Listen("tcp", "localhost:8080")
    if err != nil {
        // handle error
    }
    e := wsflate.Extension{
        // We are using default parameters here since we use
        // wsflate.{Compress,Decompress}Frame helpers below in the code.
        // This assumes that we use standard compress/flate package as flate
        // implementation.
        Parameters: wsflate.DefaultParameters,
    }
    u := ws.Upgrader{
        Negotiate: e.Negotiate,
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }

        // Reset extension after previous upgrades.
        e.Reset()

        _, err = u.Upgrade(conn)
        if err != nil {
            log.Printf("upgrade error: %s", err)
            continue
        }
        if _, ok := e.Accepted(); !ok {
            log.Printf("didn't negotiate compression for %s", conn.RemoteAddr())
            conn.Close()
            continue
        }

        go func() {
            defer conn.Close()
            for {
                frame, err := ws.ReadFrame(conn)
                if err != nil {
                    // Handle error.
                    return
                }

                frame = ws.UnmaskFrameInPlace(frame)

                if wsflate.IsCompressed(frame.Header) {
                    // Note that even after successful negotiation of
                    // compression extension, both sides are able to send
                    // non-compressed messages.
                    frame, err = wsflate.DecompressFrame(frame)
                    if err != nil {
                        // Handle error.
                        return
                    }
                }

                // Do something with frame...

                ack := ws.NewTextFrame([]byte("this is an acknowledgement"))

                // Compress response unconditionally.
                ack, err = wsflate.CompressFrame(ack)
                if err != nil {
                    // Handle error.
                    return
                }
                if err = ws.WriteFrame(conn, ack); err != nil {
                    // Handle error.
                    return
                }
            }
        }()
    }
}

You can use compression with wsutil package this way:

    // Upgrade somehow and negotiate compression to get the conn...

    // Initialize flate reader. We are using nil as a source io.Reader because
    // we will Reset() it in the message i/o loop below.
    fr := wsflate.NewReader(nil, func(r io.Reader) wsflate.Decompressor {
        return flate.NewReader(r)
    })
    // Initialize flate writer. We are using nil as a destination io.Writer
    // because we will Reset() it in the message i/o loop below.
    fw := wsflate.NewWriter(nil, func(w io.Writer) wsflate.Compressor {
        f, _ := flate.NewWriter(w, 9)
        return f
    })

    // Declare compression message state variable.
    //
    // It has two goals:
    // - Allow users to check whether received message is compressed or not.
    // - Help wsutil.Reader and wsutil.Writer to set/unset appropriate
    //   WebSocket header bits while writing next frame to the wire (it
    //   implements wsutil.RecvExtension and wsutil.SendExtension).
    var msg wsflate.MessageState

    // Initialize WebSocket reader as previously. 
    // Please note the use of Reader.Extensions field as well as
    // of ws.StateExtended flag.
    rd := &wsutil.Reader{
        Source:     conn,
        State:      ws.StateServerSide | ws.StateExtended,
        Extensions: []wsutil.RecvExtension{
            &msg, 
        },
    }

    // Initialize WebSocket writer with ws.StateExtended flag as well.
    wr := wsutil.NewWriter(conn, ws.StateServerSide|ws.StateExtended, 0)
    // Use the message state as wsutil.SendExtension.
    wr.SetExtensions(&msg)

    for {
        h, err := rd.NextFrame()
        if err != nil {
            // handle error.
        }
        if h.OpCode.IsControl() {
            // handle control frame.
        }
        if !msg.IsCompressed() {
            // handle uncompressed frame (skipped for the sake of example
            // simplicity).
        }

        // Reset the writer to echo same op code.
        wr.Reset(h.OpCode)

        // Reset both flate reader and writer to start the new round of i/o.
        fr.Reset(rd)
        fw.Reset(wr)

        // Copy whole message from reader to writer decompressing it and
        // compressing again.
        if _, err := io.Copy(fw, fr); err != nil {
            // handle error.
        }
        // Flush any remaining buffers from flate writer to WebSocket writer.
        if err := fw.Close(); err != nil {
            // handle error.
        }
        // Flush the whole WebSocket message to the wire.
        if err := wr.Flush(); err != nil {
            // handle error.
        }
    }

Extension points exported contracts — how you extend this code

ReadResetter (Interface)
ReadResetter is an optional interface that Decompressor can implement. [7 implementers]
wsflate/reader.go
WriteResetter (Interface)
WriteResetter is an optional interface that Compressor can implement. [7 implementers]
wsflate/writer.go
Buffer (Interface)
Buffer is an interface representing some bytes buffering object. [1 implementers]
wsflate/helper.go
RecvExtension (Interface)
RecvExtension is an interface for clearing fragment header RSV bits. [1 implementers]
wsutil/extenstion.go
HandshakeHeader (Interface)
HandshakeHeader is the interface that writes both upgrade request or response headers into a given io.Writer.
http.go
RejectOption (FuncType)
RejectOption represents an option used to control the way connection is rejected.
errors.go
FrameHandlerFunc (FuncType)
FrameHandlerFunc handles parsed frame header and its body represented by io.Reader. Note that reader represents already
wsutil/reader.go
Decompressor (Interface)
Decompressor is an interface holding deflate decompression implementation.
wsflate/reader.go

Core symbols most depended-on inside this repo

WriteString
called by 44
http.go
NewFrame
called by 40
frame.go
Bytes
called by 27
wsflate/helper.go
MustCompileFrame
called by 25
frame.go
MaskFrame
called by 17
frame.go
NewCloseFrame
called by 16
frame.go
WriteFrame
called by 15
write.go
NewTextFrame
called by 15
frame.go

Shape

Function 259
Method 155
Struct 52
TypeAlias 14
Interface 8
FuncType 5

Languages

Go100%

Modules by API surface

frame.go39 symbols
http.go35 symbols
wsutil/writer.go34 symbols
server_test.go27 symbols
wsutil/helper.go23 symbols
dialer_test.go20 symbols
wsutil/writer_test.go19 symbols
util_test.go19 symbols
wsflate/helper.go15 symbols
dialer.go15 symbols
wsutil/reader.go13 symbols
autobahn/main.go13 symbols

Dependencies from manifests, versioned

github.com/gobwas/httpheadv0.1.0 · 1×
github.com/gobwas/poolv0.2.1 · 1×
golang.org/x/sysv0.6.0 · 1×

For agents

$ claude mcp add ws \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact