github.com/twmb/franz-go @v1.21.5 sqlite

9,127 symbols 38,094 edges 285 files 4,501 documented · 49%
README

franz-go

A complete Apache Kafka client written in Go

Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every Kafka feature from Apache Kafka v0.8.0 onward. It has support for transactions, regex topic consuming, the latest partitioning strategies, data loss detection, closest replica fetching, and more. If a client KIP exists, this library aims to support it.

This library attempts to provide an intuitive API while interacting with Kafka the way Kafka expects (timeouts, etc.).

Features

  • Feature complete client (Kafka >= 0.8.0 through v4.2+) minus the next generation group protocol, which is implemented but deliberately hidden due to concerns with the broker implementation (see the 1.19.0 release notes for how to opt in)
  • Full Exactly-Once-Semantics (EOS)
  • Idempotent & transactional producers
  • Simple (legacy) consumer
  • Group consumers with eager (roundrobin, range, sticky) and cooperative (cooperative-sticky) balancers, with optional rack-aware partition assignment (KIP-881)
  • Share group (queue) consumers (KIP-932)
  • All compression types supported: gzip, snappy, lz4, zstd
  • SSL/TLS provided through custom dialer options
  • All SASL mechanisms supported (GSSAPI/Kerberos, PLAIN, SCRAM, and OAUTHBEARER)
  • Low-level admin functionality supported through a simple Request function
  • A high-level admin package with many helper types to make cluster administration easy
  • Utilizes modern & idiomatic Go (support for contexts, variadic configuration options, ...)
  • Highly performant by avoiding channels and goroutines where not necessary
  • Written in pure Go (no wrapper lib for a C library or other bindings)
  • Ability to add detailed log messages or metrics using hooks
  • Plug-in metrics support for prometheus, zap, etc.
  • A schema registry client and convenience Serde type for encoding and decoding

Works with any Kafka compatible brokers:

  • Redpanda: the fastest and most efficient Kafka compatible event streaming platform
  • Kafka: the original Java project
  • Confluent Platform
  • Microsoft Event Hubs
      • Event Hubs does not support producing with compression; be sure to use kgo.ProducerBatchCompression(kgo.NoCompression()).
  • Amazon MSK

Install

This repo contains multiple tags to allow separate features to be developed and released independently. The main client is in franz-go. Plugins are released from plugin/{plugin}. The raw-protocol package is released from pkg/kmsg, and the admin package is released from pkg/kadm.

The main client is located in the package github.com/twmb/franz-go/pkg/kgo, while the root of the project is at github.com/twmb/franz-go. There are a few extra packages within the project, as well as a few sub-modules. To use the main kgo package,

go get github.com/twmb/franz-go

To use a plugin,

go get github.com/twmb/franz-go/plugin/kzap

To use kadm,

go get github.com/twmb/franz-go/pkg/kadm

As an example, your require section in go.mod may look like this:

require (
    github.com/twmb/franz-go v1.21.0
    github.com/twmb/franz-go/pkg/kmsg v1.13.1
)

Getting started

Here's a basic overview of producing and consuming:

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    seeds := []string{"localhost:9092"}
    // One client can both produce and consume!
    // Consuming can either be direct (no consumer group), or through a group. Below, we use a group.
    cl, err := kgo.NewClient(
        kgo.SeedBrokers(seeds...),
        kgo.ConsumerGroup("my-group-identifier"),
        kgo.ConsumeTopics("foo"),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    ctx := context.Background()

    // 1.) Producing a message
    // All record production goes through Produce, and the callback can be used
    // to allow for synchronous or asynchronous production.
    var wg sync.WaitGroup
    wg.Add(1)
    record := &kgo.Record{Topic: "foo", Value: []byte("bar")}
    cl.Produce(ctx, record, func(_ *kgo.Record, err error) {
        defer wg.Done()
        if err != nil {
            fmt.Printf("record had a produce error: %v\n", err)
        }
    })
    wg.Wait()

    // Alternatively, ProduceSync exists to synchronously produce a batch of records.
    if err := cl.ProduceSync(ctx, record).FirstErr(); err != nil {
        fmt.Printf("record had a produce error while synchronously producing: %v\n", err)
    }

    // 2.) Consuming messages from a topic
    for {
        fetches := cl.PollFetches(ctx)
        if errs := fetches.Errors(); len(errs) > 0 {
            // All errors are retried internally when fetching, but non-retriable errors are
            // returned from polls so that users can notice and take action.
            panic(fmt.Sprint(errs))
        }

        // We can iterate through a record iterator...
        iter := fetches.RecordIter()
        for !iter.Done() {
            record := iter.Next()
            fmt.Println(string(record.Value), "from an iterator!")
        }

        // or a callback function.
        fetches.EachPartition(func(p kgo.FetchTopicPartition) {
            for _, record := range p.Records {
                fmt.Println(string(record.Value), "from range inside a callback!")
            }

            // We can even use a second callback!
            p.EachRecord(func(record *kgo.Record) {
                fmt.Println(string(record.Value), "from a second callback!")
            })
        })
    }
}

This only shows producing and consuming in the most basic sense, and does not show the full list of options to customize how the client runs, nor does it show transactional producing / consuming. Check out the examples directory for more!

API reference documentation can be found on GoDev. Supplementary information can be found in the docs directory:

docs
├── admin requests — an overview of how to issue admin requests
├── metrics and logging — a small writeup on how to enable metrics & logging in franz-go, as well as a few thoughts on latency tracking
├── package layout — describes the packages in franz-go
├── producing and consuming — descriptions of producing & consuming & the guarantees
├── transactions — a description of transactions and the safety even in a pre-KIP-447 world
└── testing — describes how to write tests for your code that uses franz-go

Who uses this?

In alphabetical order,

If you use this library and want on the list above, please either open a PR or comment on #142!

Version Pinning

By default, the client issues an ApiVersions request when it connects to a broker and then uses, for each request, the highest version that broker supports. If you want to pin to an exact version, you can use the MaxVersions option.

Kafka 0.10.0 introduced the ApiVersions request; if you are working with brokers older than that, you must pin versions with the kversion package and pass the result to the client through the MaxVersions option.
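
To make the negotiation described above concrete, here is a minimal standalone sketch. It is a toy model, not franz-go's implementation: the versions type, the negotiate function, and the sample version numbers are all hypothetical. It assumes the client picks, per API key, the lower of its own maximum and the broker's maximum, and that a pin can only lower that choice.

```go
package main

import "fmt"

// versions maps a Kafka API key to the highest request version supported.
// This type is illustrative only; it is not part of franz-go.
type versions map[int16]int16

// negotiate models choosing a request version per API key for one broker:
// the lower of what the client and the broker support, further capped by
// an optional pin (a nil pin means "no pinning").
func negotiate(clientMax, brokerMax, pin versions) versions {
	out := make(versions)
	for key, v := range clientMax {
		b, ok := brokerMax[key]
		if !ok {
			continue // the broker did not advertise this request at all
		}
		if b < v {
			v = b
		}
		if p, pinned := pin[key]; pinned && p < v {
			v = p
		}
		out[key] = v
	}
	return out
}

func main() {
	// Sample data only: key 0 is Produce, 1 is Fetch, 3 is Metadata.
	client := versions{0: 9, 1: 13, 3: 12}
	broker := versions{0: 7, 1: 11} // toy broker that omits key 3
	pin := versions{1: 4}           // cap Fetch at v4

	got := negotiate(client, broker, pin)
	fmt.Println(got[0], got[1], len(got)) // prints "7 4 2"
}
```

With a broker older than 0.10.0 there is no ApiVersions response to fill the broker side, which is why the pinned set has to be supplied up front in that case.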

Metrics & logging

Plug-in packages exist that let you easily add prometheus metrics, go-metrics, zap logging, etc. to your client. See the plugin directory for more information. These plugins are provided under dedicated modules, e.g. github.com/twmb/franz-go/plugin/kprom@v1.3.0.

The franz-go client takes a neutral approach to metrics by providing hooks that you can use to plug in your own metrics.

All connections, disconnections, reads, writes, and throttles can be hooked into, as well as per-batch produce & consume metrics. If there is an aspect of the library that you wish you could have insight into, please open an issue and we can discuss adding another hook.

Hooks allow you to log in the event of specific errors, or to trace latencies, count bytes, etc., all with your favorite monitoring systems.

In addition to hooks, logging can be plugged in with a general Logger interface. A basic logger is provided if you just want to write to a given file in a simple format. All logs have a message and then key/value pairs of supplementary information. It is recommended to always use a logger and to use LogLevelInfo.

See this example for an expansive example of integrating with prometheus! Alternatively, see this example for how to use the plug-in prometheus package!

Performance

This client is fast, and is consistently among the fastest and most CPU- and memory-efficient Kafka clients in Go. To measure it yourself, see the bench example; the compare subdirectory has code for comparing against other clients.

Supported KIPs

Theoretically, this library supports every (non-Java-specific) client facing KIP. Any KIP that simply adds or modifies a protocol is supported by code generation.

KIP                                           | Kafka release        | Status
----------------------------------------------+----------------------+----------------------------
KIP-1 - Disallow acks > 1                     | 0.8.3                | Supported & Enforced
KIP-4 - Request protocol changes              | 0.9.0 through 0.10.1 | Supported
KIP-8 - Flush method on Producer              | 0.8.3                | Supported
KIP-12 - SASL & SSL                           | 0.9.0                | Supported
KIP-13 - Throttling (on broker)               | 0.9.0                | Supported
KIP-15 - Close with a timeout                 | 0.9.0                | Supported (via context)
KIP-19 - Request timeouts                     | 0.9.0                | Supported
KIP-22 - Custom partitioners                  | 0.9.0                | Supported
KIP-31 - Relative offsets in message sets     | 0.10.0               | Supported
KIP-32 - Timestamps in message set v1         | 0.10.0               | Supported
KIP-35 - ApiVersion                           | 0.10.0               | Supported
KIP-40 - ListGroups and DescribeGroups        | 0.9.0                | Supported
KIP-41 - max.poll.records                     | 0.10.0               | Supported (via PollRecords)
KIP-42 - Producer & consumer interceptors     | 0.10.0               | Supported via hooks
KIP-43 - SASL PLAIN & handshake               | 0.10.0               | Supported
KIP-48 - Delegation tokens                    | 1.1                  | Supported
KIP-54 - Sticky partitioning                  | 0.11.0               | Supported
KIP-57 - Fix lz4                              | 0.10.0               | Supported
KIP-62 - background heartbeats & improvements | 0.10.1               | Supported
KIP-70 - On{Assigned,Revoked}                 | 0.10.1               | Supported
KIP-74 - Fetch response size limits           | 0.10.1               | Supported
KIP-78 - ClusterID

Extension points: exported contracts (how you extend this code)

Opt (Interface)
Opt is an option to configure Metrics. [16 implementers]
plugin/kprom/config.go
Opt (Interface)
Opt applies options to the logger. [16 implementers]
plugin/kzap/kzap.go
Opt (Interface)
Opt applies options to the logger. [16 implementers]
plugin/klogr/klogr.go
MeterOpt (Interface)
MeterOpt interface used for setting optional config properties. [16 implementers]
plugin/kotel/meter.go
Opt (Interface)
Opt is an option to configure Metrics. [16 implementers]
plugin/kvictoria/config.go
Opt (Interface)
Opt applies options to further tune how metrics are gathered. [16 implementers]
plugin/kgmetrics/kgmetrics.go
Request (Interface)
Request represents a type that can be requested to Kafka. [95 implementers]
pkg/kmsg/api.go
ClientOpt (Interface)
ClientOpt is an option to configure a client. [16 implementers]
pkg/sr/clientopt.go

Core symbols: most depended-on inside this repo

Complete
called by 660
pkg/kmsg/internal/kbin/primitives.go
AppendUvarint
called by 590
pkg/kmsg/internal/kbin/primitives.go
Len
called by 526
pkg/kmsg/api.go
AppendEach
called by 522
pkg/kmsg/api.go
internalReadTags
called by 503
pkg/kmsg/api.go
AppendInt32
called by 488
pkg/kmsg/internal/kbin/primitives.go
Int32
called by 484
pkg/kmsg/internal/kbin/primitives.go
Ok
called by 433
pkg/kmsg/internal/kbin/primitives.go

Shape

Method 5,299
Function 2,442
Struct 1,142
TypeAlias 152
Interface 79
FuncType 13

Languages

Go 99%
Python 1%

Modules by API surface

pkg/kmsg/generated.go: 4,066 symbols
pkg/kgo/client.go: 198 symbols
pkg/kadm/groups.go: 159 symbols
pkg/kgo/config.go: 130 symbols
pkg/kfake/groups.go: 124 symbols
pkg/kfake/behavior_test.go: 108 symbols
generate/gen.go: 107 symbols
pkg/kfake/persist.go: 103 symbols
generate/main.go: 97 symbols
pkg/kgo/consumer.go: 96 symbols
pkg/kgo/sink.go: 95 symbols
pkg/kgo/source.go: 87 symbols

Dependencies (from manifests, versioned)

github.com/IBM/sarama v1.48.2 · 1×
github.com/VictoriaMetrics/metrics v1.43.2 · 1×
github.com/aws/aws-sdk-go v1.55.8 · 1×
github.com/beorn7/perks v1.0.1 · 1×
github.com/cespare/xxhash/v2 v2.3.0 · 1×
github.com/confluentinc/confluent-kafka-go v1.9.2 · 1×
github.com/eapache/queue v1.1.0 · 1×
github.com/go-logr/logr v1.4.3 · 1×
github.com/go-logr/stdr v1.2.2 · 1×
github.com/go-viper/mapstructure/v2 v2.5.0 · 1×

For agents

$ claude mcp add franz-go \
  -- python -m otcore.mcp_server <graph>