| 31 | } |
| 32 | |
| 33 | func main() { |
| 34 | // 1. 连接 MySQL 数据库 |
| 35 | cftool.Register[ServerConfig](fmt.Sprintf("root:%s", confPath)) |
| 36 | cftool.ReadFile(confPath) |
| 37 | |
| 38 | handler := &NSQHandler{} |
| 39 | autowire.Autowired(handler) |
| 40 | err := autowire.CheckComplete() |
| 41 | if err != nil { |
| 42 | log.Fatal("check autowired:", err) |
| 43 | return |
| 44 | } |
| 45 | // 2. 创建 NSQ 消费者 |
| 46 | config := nsq.NewConfig() |
| 47 | hostname, err := os.Hostname() |
| 48 | if err != nil { |
| 49 | log.Fatalf("Failed to get hostname: %v", err) |
| 50 | return |
| 51 | } |
| 52 | nsqConfig := handler.nsqConfig |
| 53 | consumer, err := nsq.NewConsumer(fmt.Sprintf("%s_ai_event", nsqConfig.TopicPrefix), hostname, config) |
| 54 | if err != nil { |
| 55 | log.Fatalf("Failed to create NSQ consumer: %v", err) |
| 56 | } |
| 57 | |
| 58 | consumer.AddHandler(handler) |
| 59 | |
| 60 | // 4. 连接到 NSQ |
| 61 | //nsqAddress := "172.18.166.219:9150" // NSQ 地址 |
| 62 | err = consumer.ConnectToNSQD(nsqConfig.Addr) |
| 63 | if err != nil { |
| 64 | log.Fatalf("Failed to connect to NSQ: %v", err) |
| 65 | } |
| 66 | log.Println("Connected to NSQ") |
| 67 | |
| 68 | // 5. 捕获系统信号,优雅关闭 |
| 69 | sigChan := make(chan os.Signal, 1) |
| 70 | signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) |
| 71 | <-sigChan |
| 72 | |
| 73 | // 优雅停止消费者 |
| 74 | consumer.Stop() |
| 75 | <-consumer.StopChan |
| 76 | log.Println("NSQ Consumer stopped") |
| 77 | } |