English | 简体中文 | 日本語 | Translators Wanted!
🎆 mochi-co/mqtt は新しい mochi-mqtt organisation の一部です. このページをお読みください
Mochi MQTT は Goで書かれたMQTT v5に完全に準拠しているMQTTブローカーで、IoTプロジェクトやテレメトリの開発プロジェクト向けに設計されています。 スタンドアロンのバイナリで使ったり、アプリケーションにライブラリとして組み込むことができ、プロジェクトのメンテナンス性と品質を確保できるように配慮しながら、 軽量で可能な限り速く動作するように設計されています。
MQTT は MQ Telemetry Transportを意味します。 Pub/Sub型のシンプルで軽量なメッセージプロトコルで、低帯域、高遅延、不安定なネットワーク下での制約を考慮して設計されています(MQTTについて詳しくはこちら)。 Mochi MQTTはMQTTプロトコルv5.0.0に完全準拠した実装をしています。
MQTTv5とそれ以前との互換性から、サーバーはv5とv3両方のクライアントを受け入れることができますが、v5とv3のクライアントが接続された場合はv5でクライアント向けの特徴と機能はv3クライアントにダウングレードされます(ユーザープロパティなど)。 MQTT v3.0.0 と v3.1.1 のサポートはハイブリッド互換性があるとみなされます。それはv3と仕様に制限されていない場合、例えば、送信メッセージ、保持メッセージの有効期限とQoSフロー制御制限などについては、よりモダンで安全なv5の動作が使用されます
クリティカルなイシュー出ない限り、新しいリリースがされるのは週末です。
Mochi MQTTはスタンドアロンのブローカーとして使うことができます。単純にこのレポジトリーをチェックアウトして、cmd/main.go を起動すると内部の cmd フォルダのエントリポイントにしてtcp (:1883), websocket (:1882), dashboard (:8080)のポートを外部にEXPOSEします。
cd cmd
go build -o mqtt && ./mqtt
Dockerレポジトリの official Mochi MQTT image から Pullして起動することができます。
docker pull mochimqtt/server
or
docker run mochimqtt/server
これは実装途中です。file-based configuration は、この実装をよりよくサポートするために開発中です。 より実質的なdockerのサポートが議論されています。Docker環境で使っている方は是非この議論に参加してください。 ここ や ここ。
cmd/main.goの Websocket, TCP, Statsサーバを実行するために、シンプルなDockerfileが提供されます。
docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
Mochi MQTTをパッケージとしてインポートするにはほんの数行のコードで始めることができます。
import (
"log"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
)
func main() {
// Create signals channel to run server until interrupted
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
// Create the new MQTT Server.
server := mqtt.New(nil)
// Allow all connections.
_ = server.AddHook(new(auth.AllowHook), nil)
// Create a TCP listener on a standard port.
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
// Run server until interrupted
<-done
// Cleanup
}
ブローカーの動作例は examplesフォルダにあります。
サーバは様々なプロトコルのコネクションのリスナーに対応しています。現在の対応リスナーは、
| Listener | Usage |
|---|---|
| listeners.NewTCP | TCPリスナー |
| listeners.NewUnixSock | Unixソケットリスナー |
| listeners.NewNet | net.Listenerリスナー |
| listeners.NewWebsocket | Websocketリスナー |
| listeners.NewHTTPStats | HTTP $SYSダッシュボード |
| listeners.NewHTTPHealthCheck | ヘルスチェック応答を提供するためのHTTPヘルスチェックリスナー(クラウドインフラ) |
新しいリスナーを開発するためには
listeners.Listenerを使ってください。使ったら是非教えてください!
TLSを設定するには*listeners.Configを渡すことができます。
examples フォルダと cmd/main.goに使用例があります。
たくさんのオプションが利用可能です。サーバーの動作を変更したり、特定の機能へのアクセスを制限することができます。
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
},
ClientNetWriteBufferSize: 4096,
ClientNetReadBufferSize: 4096,
SysTopicResendInterval: 10,
InlineClient: false,
})
mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。
必要に応じてClientNetWriteBufferSizeとClientNetReadBufferSizeはクライアントの使用するメモリに合わせて設定できます。
Capabilities.MaximumClientWritesPendingのサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。
いくつかのデフォルトの設定を決める際にいくつかの決定がなされましたのでここに記しておきます:
- デフォルトとして、敵対的なネットワーク上のDoSアタックにさらされるのを防ぐために server.Options.Capabilities.MaximumMessageExpiryIntervalは86400 (24時間)に、とセットされています。有効期限を無限にすると、保持、送信メッセージが無限に蓄積されるからです。もし信頼できる環境であったり、より大きな保存期間が可能であれば、この設定はオーバーライドできます(0 を設定すると有効期限はなくなります。)
ユニバーサルイベントフックシステムは、開発者にサーバとクライアントの様々なライフサイクルをフックすることができ、ブローカーの機能を追加/変更することができます。それらのユニバーサルフックは認証、永続ストレージ、デバッグツールなど、あらゆるものに使用されています。 フックは複数重ねることができ、サーバに複数のフックを設定することができます。それらは追加した順番に動作します。いくつかのフックは値を変えて、その値は動作コードに返される前にあとに続くフックに渡されます。
| Type | Import | Info |
|---|---|---|
| Access Control | mochi-mqtt/server/hooks/auth . AllowHook | すべてのトピックに対しての読み書きをすべてのクライアントに対して許可します。 |
| Access Control | mochi-mqtt/server/hooks/auth . Auth | ルールベースのアクセスコントロール台帳です。 |
| Persistence | mochi-mqtt/server/hooks/storage/bolt | BoltDB を使った永続ストレージ (非推奨). |
| Persistence | mochi-mqtt/server/hooks/storage/badger | BadgerDBを使った永続ストレージ |
| Persistence | mochi-mqtt/server/hooks/storage/redis | Redisを使った永続ストレージ |
| Debugging | mochi-mqtt/server/hooks/debug | パケットフローを可視化するデバッグ用のフック |
たくさんの内部関数が開発者に公開されています、なので、上記の例を使って自分でフックを作ることができます。もし作ったら是非Open an issueに投稿して教えてください!
デフォルトで、Mochi MQTTはアクセスコントロールルールにDENY-ALLを使用しています。コネクションを許可するためには、アクセスコントロールフックを上書きする必要があります。一番単純なのはauth.AllowAllフックで、ALLOW-ALLルールがすべてのコネクション、サブスクリプション、パブリッシュに適用されます。使い方は下記のようにするだけです:
server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)
もしインターネットや信頼できないネットワークにさらされる場合は行わないでください。これは開発・テスト・デバッグ用途のみであるべきです。
Auth Ledgerは構造体で定義したアクセスルールの洗練された仕組みを提供します。Auth Ledgerルール2つの形式から成ります、認証ルール(コネクション)とACLルール(パブリッシュ、サブスクライブ)です。
認証ルールは4つのクライテリアとアサーションフラグがあります: | Criteria | Usage | | -- | -- | | Client | 接続クライアントのID | | Username | 接続クライアントのユーザー名 | | Password | 接続クライアントのパスワード | | Remote | クライアントのリモートアドレスもしくはIP | | Allow | true(このユーザーを許可する)もしくはfalse(このユーザを拒否する) |
アクセスコントロールルールは3つのクライテリアとフィルターマッチがあります: | Criteria | Usage | | -- | -- | | Client | 接続クライアントのID | | Username | 接続クライアントのユーザー名 | | Remote | クライアントのリモートアドレスもしくはIP | | Filters | 合致するフィルターの配列 |
ルールはインデックス順(0,1,2,3)に処理され、はじめに合致したルールが適用されます。 hooks/auth/ledger.go の構造体を見てください。
server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
Ledger: &auth.Ledger{
Auth: auth.AuthRules{ // Auth disallows all by default
{Username: "peach", Password: "password1", Allow: true},
{Username: "melon", Password: "password2", Allow: true},
{Remote: "127.0.0.1:*", Allow: true},
{Remote: "localhost:*", Allow: true},
},
ACL: auth.ACLRules{ // ACL allows all by default
{Remote: "127.0.0.1:*"}, // local superuser allow all
{
// user melon can read and write to their own topic
Username: "melon", Filters: auth.Filters{
"melon/#": auth.ReadWrite,
"updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
},
},
{
// Otherwise, no clients have publishing permissions
Filters: auth.Filters{
"#": auth.ReadOnly,
"updates/#": auth.Deny,
},
},
},
}
})
ledgeはデータフィールドを使用してJSONもしくはYAML形式で保存したものを使用することもできます。
err := server.AddHook(new(auth.Hook), &auth.Options{
Data: data, // build ledger from byte slice: yaml or json
})
より詳しくはexamples/auth/encoded/main.goを見てください。
ブローカーに永続性を提供する基本的な Redis ストレージフックが利用可能です。他のフックと同じ方法で、いくつかのオプションを使用してサーバーに追加できます。それはフック内部で github.com/go-redis/redis/v8 を使用し、Optionsの値で詳しい設定を行うことができます。
err := server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: "localhost:6379", // default redis address
Password: "", // your password
DB: 0, // your redis db
},
})
if err != nil {
log.Fatal(err)
}
Redisフックがどのように動くか、どのように使用するかについての詳しくは、examples/persistence/redis/main.go か hooks/storage/redis のソースコードを見てください。
もしファイルベースのストレージのほうが適しているのであれば、BadgerDBストレージも使用することができます。それもまた、他のフックと同様に追加、設定することができます(オプションは若干少ないです)。
err := server.AddHook(new(badger.Hook), &badger.Options{
Path: badgerPath,
})
if err != nil {
log.Fatal(err)
}
badgerフックがどのように動くか、どのように使用するかについての詳しくは、examples/persistence/badger/main.go か hooks/storage/badger のソースコードを見てください。
BoltDBフックはBadgerに代わって非推奨となりましたが、もし必要ならば examples/persistence/bolt/main.goをチェックしてください。
ブローカーとクライアントのライフサイクルに関わるたくさんのフックが利用できます。
そのすべてのフックとmqtt.Hookインターフェイスの関数シグネチャはhooks.goに記載されています。
もっと柔軟なイベントフックはOnPacketRead、OnPacketEncodeとOnPacketSentです。それらは、すべての流入パケットと流出パケットをコントロール及び変更に使用されるフックです。
| Function | Usage |
|---|---|
| OnStarted | サーバーが正常にスタートした際に呼ばれます。 |
| OnStopped | サーバーが正常に終了した際に呼ばれます。 |
| OnConnectAuthenticate | ユーザーがサーバと認証を試みた際に呼ばれます。このメソッドはサーバーへのアクセス |
$ claude mcp add server \
-- python -m otcore.mcp_server <graph>