MCPcopy
hub / github.com/mochi-mqtt/server

github.com/mochi-mqtt/server @v2.7.9 sqlite

repository ↗ · DeepWiki ↗ · release v2.7.9 ↗
1,539 symbols 7,922 edges 78 files 666 documented · 43%
README

Mochi-MQTT Server

build status Coverage Status Go Report Card Go Reference contributions welcome

English | 简体中文 | 日本語 | Translators Wanted!

🎆 mochi-co/mqtt は新しい mochi-mqtt organisation の一部です. このページをお読みください

Mochi-MQTTは MQTT v5 (と v3.1.1)に完全に準拠しているアプリケーションに組み込み可能なハイパフォーマンスなbroker/serverです.

Mochi MQTT は Goで書かれたMQTT v5に完全に準拠しているMQTTブローカーで、IoTプロジェクトやテレメトリの開発プロジェクト向けに設計されています。 スタンドアロンのバイナリで使ったり、アプリケーションにライブラリとして組み込むことができ、プロジェクトのメンテナンス性と品質を確保できるように配慮しながら、 軽量で可能な限り速く動作するように設計されています。

MQTTとは?

MQTT は MQ Telemetry Transportを意味します。 Pub/Sub型のシンプルで軽量なメッセージプロトコルで、低帯域、高遅延、不安定なネットワーク下での制約を考慮して設計されています(MQTTについて詳しくはこちら)。 Mochi MQTTはMQTTプロトコルv5.0.0に完全準拠した実装をしています。

Mochi-MQTTのもつ機能

  • MQTTv5への完全な準拠とMQTT v3.1.1 および v3.0.0 との互換性:
    • MQTT v5で拡張されたユーザープロパティ
    • トピック・エイリアス
    • 共有サブスクリプション
    • サブスクリプションオプションとサブスクリプションID
    • メッセージの有効期限
    • クライアントセッション
    • 送受信QoSフロー制御クォータ
    • サーバサイド切断と認証パケット
    • Will遅延間隔
    • 上記に加えてQoS(0,1,2)、$SYSトピック、retain機能などすべてのMQTT v1の特徴を持ちます
  • Developer-centric:
    • 開発者が制御できるように、ほとんどのコアブローカーのコードをエクスポートにしてアクセスできるようにしました。
    • フル機能で柔軟なフックベースのインターフェイスにすることで簡単に'プラグイン'を開発できるようにしました。
    • 特別なインラインクライアントを利用することでパケットインジェクションを行うか、既存のクライアントとしてマスカレードすることができます。
  • パフォーマンスと安定性:
    • 古典的なツリーベースのトピックサブスクリプションモデル
    • クライアント固有に書き込みバッファーをもたせることにより、読み込みの遅さや不規則なクライアントの挙動の問題を回避しています。
    • MQTT v5 and MQTT v3のすべてのPaho互換性テストをpassしています。
    • 慎重に検討された多くのユニットテストシナリオでテストされています。
  • TCP, Websocket (SSL/TLSを含む), $SYSのダッシュボードリスナー
  • フックを利用した保存機能としてRedis, Badger, Boltを使うことができます(自作のHookも可能です)。
  • フックを利用したルールベース認証機能とアクセス制御リストLedgerを使うことができます(自作のHookも可能です)。

互換性に関する注意事項

MQTTv5とそれ以前との互換性から、サーバーはv5とv3両方のクライアントを受け入れることができますが、v5とv3のクライアントが接続された場合はv5でクライアント向けの特徴と機能はv3クライアントにダウングレードされます(ユーザープロパティなど)。 MQTT v3.0.0 と v3.1.1 のサポートはハイブリッド互換性があるとみなされます。それはv3と仕様に制限されていない場合、例えば、送信メッセージ、保持メッセージの有効期限とQoSフロー制御制限などについては、よりモダンで安全なv5の動作が使用されます

リリースされる時期について

クリティカルなイシュー出ない限り、新しいリリースがされるのは週末です。

Roadmap

  • 新しい特徴やイベントフックのリクエストは open an issue へ!
  • クラスターのサポート
  • メトリックスサポートの強化
  • ファイルベースの設定(Dockerイメージのサポート)

Quick Start

GoでのBrokerの動かし方

Mochi MQTTはスタンドアロンのブローカーとして使うことができます。単純にこのレポジトリーをチェックアウトして、cmd/main.go を起動すると内部の cmd フォルダのエントリポイントにしてtcp (:1883), websocket (:1882), dashboard (:8080)のポートを外部にEXPOSEします。

cd cmd
go build -o mqtt && ./mqtt

Dockerで利用する

Dockerレポジトリの official Mochi MQTT image から Pullして起動することができます。

docker pull mochimqtt/server
or
docker run mochimqtt/server

これは実装途中です。file-based configuration は、この実装をよりよくサポートするために開発中です。 より実質的なdockerのサポートが議論されています。Docker環境で使っている方は是非この議論に参加してください。 ここここ

cmd/main.goの Websocket, TCP, Statsサーバを実行するために、シンプルなDockerfileが提供されます。

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

Mochi MQTTを使って開発するには

パッケージをインポート

Mochi MQTTをパッケージとしてインポートするにはほんの数行のコードで始めることができます。

import (
  "log"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/hooks/auth"
  "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
  // Create signals channel to run server until interrupted
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  // Create the new MQTT Server.
  server := mqtt.New(nil)

  // Allow all connections.
  _ = server.AddHook(new(auth.AllowHook), nil)

  // Create a TCP listener on a standard port.
  tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }


  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // Run server until interrupted
  <-done

  // Cleanup
}

ブローカーの動作例は examplesフォルダにあります。

Network Listeners

サーバは様々なプロトコルのコネクションのリスナーに対応しています。現在の対応リスナーは、

Listener Usage
listeners.NewTCP TCPリスナー
listeners.NewUnixSock Unixソケットリスナー
listeners.NewNet net.Listenerリスナー
listeners.NewWebsocket Websocketリスナー
listeners.NewHTTPStats HTTP $SYSダッシュボード
listeners.NewHTTPHealthCheck ヘルスチェック応答を提供するためのHTTPヘルスチェックリスナー(クラウドインフラ)

新しいリスナーを開発するためには listeners.Listener を使ってください。使ったら是非教えてください!

TLSを設定するには*listeners.Configを渡すことができます。

examples フォルダと cmd/main.goに使用例があります。

設定できるオプションと機能

たくさんのオプションが利用可能です。サーバーの動作を変更したり、特定の機能へのアクセスを制限することができます。

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    MaximumClientWritesPending: 3,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 4096,
  ClientNetReadBufferSize: 4096,
  SysTopicResendInterval: 10,
  InlineClient: false,
})

mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。 必要に応じてClientNetWriteBufferSizeClientNetReadBufferSizeはクライアントの使用するメモリに合わせて設定できます。 Capabilities.MaximumClientWritesPendingのサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。

デフォルト設定に関する注意事項

いくつかのデフォルトの設定を決める際にいくつかの決定がなされましたのでここに記しておきます: - デフォルトとして、敵対的なネットワーク上のDoSアタックにさらされるのを防ぐために server.Options.Capabilities.MaximumMessageExpiryIntervalは86400 (24時間)に、とセットされています。有効期限を無限にすると、保持、送信メッセージが無限に蓄積されるからです。もし信頼できる環境であったり、より大きな保存期間が可能であれば、この設定はオーバーライドできます(0 を設定すると有効期限はなくなります。)

Event Hooks

ユニバーサルイベントフックシステムは、開発者にサーバとクライアントの様々なライフサイクルをフックすることができ、ブローカーの機能を追加/変更することができます。それらのユニバーサルフックは認証、永続ストレージ、デバッグツールなど、あらゆるものに使用されています。 フックは複数重ねることができ、サーバに複数のフックを設定することができます。それらは追加した順番に動作します。いくつかのフックは値を変えて、その値は動作コードに返される前にあとに続くフックに渡されます。

Type Import Info
Access Control mochi-mqtt/server/hooks/auth . AllowHook すべてのトピックに対しての読み書きをすべてのクライアントに対して許可します。
Access Control mochi-mqtt/server/hooks/auth . Auth ルールベースのアクセスコントロール台帳です。
Persistence mochi-mqtt/server/hooks/storage/bolt BoltDB を使った永続ストレージ (非推奨).
Persistence mochi-mqtt/server/hooks/storage/badger BadgerDBを使った永続ストレージ
Persistence mochi-mqtt/server/hooks/storage/redis Redisを使った永続ストレージ
Debugging mochi-mqtt/server/hooks/debug パケットフローを可視化するデバッグ用のフック

たくさんの内部関数が開発者に公開されています、なので、上記の例を使って自分でフックを作ることができます。もし作ったら是非Open an issueに投稿して教えてください!

アクセスコントロール

Allow Hook

デフォルトで、Mochi MQTTはアクセスコントロールルールにDENY-ALLを使用しています。コネクションを許可するためには、アクセスコントロールフックを上書きする必要があります。一番単純なのはauth.AllowAllフックで、ALLOW-ALLルールがすべてのコネクション、サブスクリプション、パブリッシュに適用されます。使い方は下記のようにするだけです:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

もしインターネットや信頼できないネットワークにさらされる場合は行わないでください。これは開発・テスト・デバッグ用途のみであるべきです。

Auth Ledger

Auth Ledgerは構造体で定義したアクセスルールの洗練された仕組みを提供します。Auth Ledgerルール2つの形式から成ります、認証ルール(コネクション)とACLルール(パブリッシュ、サブスクライブ)です。

認証ルールは4つのクライテリアとアサーションフラグがあります: | Criteria | Usage | | -- | -- | | Client | 接続クライアントのID | | Username | 接続クライアントのユーザー名 | | Password | 接続クライアントのパスワード | | Remote | クライアントのリモートアドレスもしくはIP | | Allow | true(このユーザーを許可する)もしくはfalse(このユーザを拒否する) |

アクセスコントロールルールは3つのクライテリアとフィルターマッチがあります: | Criteria | Usage | | -- | -- | | Client | 接続クライアントのID | | Username | 接続クライアントのユーザー名 | | Remote | クライアントのリモートアドレスもしくはIP | | Filters | 合致するフィルターの配列 |

ルールはインデックス順(0,1,2,3)に処理され、はじめに合致したルールが適用されます。 hooks/auth/ledger.go の構造体を見てください。

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth disallows all by default
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL allows all by default
      {Remote: "127.0.0.1:*"}, // local superuser allow all
      {
        // user melon can read and write to their own topic
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
        },
      },
      {
        // Otherwise, no clients have publishing permissions
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  }
})

ledgeはデータフィールドを使用してJSONもしくはYAML形式で保存したものを使用することもできます。

err := server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // build ledger from byte slice: yaml or json
})

より詳しくはexamples/auth/encoded/main.goを見てください。

永続ストレージ

Redis

ブローカーに永続性を提供する基本的な Redis ストレージフックが利用可能です。他のフックと同じ方法で、いくつかのオプションを使用してサーバーに追加できます。それはフック内部で github.com/go-redis/redis/v8 を使用し、Optionsの値で詳しい設定を行うことができます。

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // default redis address
    Password: "",               // your password
    DB:       0,                // your redis db
  },
})
if err != nil {
  log.Fatal(err)
}

Redisフックがどのように動くか、どのように使用するかについての詳しくは、examples/persistence/redis/main.gohooks/storage/redis のソースコードを見てください。

Badger DB

もしファイルベースのストレージのほうが適しているのであれば、BadgerDBストレージも使用することができます。それもまた、他のフックと同様に追加、設定することができます(オプションは若干少ないです)。

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

badgerフックがどのように動くか、どのように使用するかについての詳しくは、examples/persistence/badger/main.gohooks/storage/badger のソースコードを見てください。

BoltDBフックはBadgerに代わって非推奨となりましたが、もし必要ならば examples/persistence/bolt/main.goをチェックしてください。

イベントフックを利用した開発

ブローカーとクライアントのライフサイクルに関わるたくさんのフックが利用できます。 そのすべてのフックとmqtt.Hookインターフェイスの関数シグネチャはhooks.goに記載されています。

もっと柔軟なイベントフックはOnPacketRead、OnPacketEncodeとOnPacketSentです。それらは、すべての流入パケットと流出パケットをコントロール及び変更に使用されるフックです。

Function Usage
OnStarted サーバーが正常にスタートした際に呼ばれます。
OnStopped サーバーが正常に終了した際に呼ばれます。
OnConnectAuthenticate ユーザーがサーバと認証を試みた際に呼ばれます。このメソッドはサーバーへのアクセス

Extension points exported contracts — how you extend this code

Listener (Interface)
Listener is an interface for network listeners. A network listener listens for incoming client connections and adds them [7 …
listeners/listeners.go
Serializable (Interface)
Serializable is an interface for objects that can be serialized and deserialized. [4 implementers]
hooks/storage/storage.go
Hook (Interface)
Hook provides an interface of handlers for different events which occur during the lifecycle of the broker. [1 implementers]
hooks.go
ReadFn (FuncType)
ReadFn is the function signature for the function used for reading and processing new packets.
clients.go
InlineSubFn (FuncType)
InlineSubFn is the signature for a callback function which will be called when an inline client receives a message on a
topics.go
BufferPool (Interface)
(no doc) [2 implementers]
mempool/bufpool.go
EstablishFn (FuncType)
EstablishFn is a callback function for establishing new clients.
listeners/listeners.go
Serializable (Interface)
Serializable is an interface for objects that can be serialized and deserialized. [4 implementers]
hooks/storage/badger/badger.go

Core symbols most depended-on inside this repo

Get
called by 368
mempool/bufpool.go
Error
called by 268
packets/codes.go
Close
called by 249
listeners/listeners.go
SetOpts
called by 169
hooks.go
Provides
called by 118
hooks.go
Add
called by 116
hooks.go
Init
called by 111
hooks.go
get
called by 110
topics.go

Shape

Function 838
Method 592
Struct 90
TypeAlias 10
Interface 5
FuncType 4

Languages

Go100%

Modules by API surface

server_test.go183 symbols
hooks.go131 symbols
topics.go65 symbols
topics_test.go63 symbols
server.go62 symbols
hooks/storage/redis/redis_test.go59 symbols
hooks/storage/badger/badger_test.go59 symbols
hooks/storage/pebble/pebble_test.go58 symbols
hooks/storage/bolt/bolt_test.go58 symbols
hooks_test.go57 symbols
packets/packets.go55 symbols
clients_test.go48 symbols

Dependencies from manifests, versioned

github.com/DataDog/zstdv1.4.5 · 1×
github.com/alicebob/gopher-jsonv0.0.0-2020052007255 · 1×
github.com/alicebob/miniredis/v2v2.23.0 · 1×
github.com/beorn7/perksv1.0.1 · 1×
github.com/cespare/xxhash/v2v2.2.0 · 1×
github.com/cockroachdb/errorsv1.11.1 · 1×
github.com/cockroachdb/logtagsv0.0.0-2023011820175 · 1×
github.com/cockroachdb/pebblev1.1.0 · 1×
github.com/cockroachdb/redactv1.1.5 · 1×
github.com/cockroachdb/tokenbucketv0.0.0-2023080717453 · 1×
github.com/davecgh/go-spewv1.1.1 · 1×

For agents

$ claude mcp add server \
  -- python -m otcore.mcp_server <graph>

⬇ download graph artifact