()
| 469 | } |
| 470 | |
| 471 | func (d *dataNode) Serve() { |
| 472 | // wait for server is ready to serve |
| 473 | <-d.readyCh |
| 474 | |
| 475 | // start advertising to the cluster |
| 476 | d.advertise() |
| 477 | // enable archiving jobs |
| 478 | if !d.opts.ServerConfig().SchedulerOff { |
| 479 | d.memStore.GetScheduler().EnableJobType(memCom.ArchivingJobType, true) |
| 480 | d.opts.InstrumentOptions().Logger().Info("archiving jobs enabled") |
| 481 | } |
| 482 | |
| 483 | // start server |
| 484 | router := mux.NewRouter() |
| 485 | httpWrappers := append([]utils.HTTPHandlerWrapper{utils.WithMetricsFunc}, d.opts.HTTPWrappers()...) |
| 486 | schemaRouter := router.PathPrefix("/schema") |
| 487 | if d.opts.ServerConfig().Cluster.Enable { |
| 488 | schemaRouter = schemaRouter.Methods(http.MethodGet) |
| 489 | } |
| 490 | |
| 491 | d.handlers.schemaHandler.Register(schemaRouter.Subrouter(), httpWrappers...) |
| 492 | d.handlers.enumHandler.Register(router.PathPrefix("/schema").Subrouter(), httpWrappers...) |
| 493 | d.handlers.dataHandler.Register(router.PathPrefix("/data").Subrouter(), httpWrappers...) |
| 494 | d.handlers.queryHandler.Register(router.PathPrefix("/query").Subrouter(), httpWrappers...) |
| 495 | |
| 496 | router.PathPrefix("/swagger/").Handler(d.handlers.swaggerHandler) |
| 497 | router.PathPrefix("/node_modules/").Handler(d.handlers.nodeModuleHandler) |
| 498 | router.HandleFunc("/health", utils.WithMetricsFunc(d.handlers.healthCheckHandler.HealthCheck)) |
| 499 | router.HandleFunc("/version", d.handlers.healthCheckHandler.Version) |
| 500 | |
| 501 | // Support CORS calls. |
| 502 | allowOrigins := handlers.AllowedOrigins([]string{"*"}) |
| 503 | allowHeaders := handlers.AllowedHeaders([]string{"Accept", "Accept-Language", "Content-Language", "Origin", "Content-Type"}) |
| 504 | allowMethods := handlers.AllowedMethods([]string{"GET", "PUT", "POST", "DELETE", "OPTIONS"}) |
| 505 | |
| 506 | // record time from data node started to actually serving |
| 507 | d.metrics.restartTimer.Record(utils.Now().Sub(d.startedAt)) |
| 508 | |
| 509 | // start batch reporter |
| 510 | batchStatsReporter := memstore.NewBatchStatsReporter(5*60, d.memStore, d) |
| 511 | go batchStatsReporter.Run() |
| 512 | |
| 513 | d.opts.InstrumentOptions().Logger().Infof("Starting HTTP server on port %d with max connection %d", d.opts.ServerConfig().Port, d.opts.ServerConfig().HTTP.MaxConnections) |
| 514 | utils.LimitServe(d.opts.ServerConfig().Port, handlers.CORS(allowOrigins, allowHeaders, allowMethods)(mixedHandler(d.grpcServer, router)), d.opts.ServerConfig().HTTP) |
| 515 | } |
| 516 | |
| 517 | func (d *dataNode) advertise() { |
| 518 | serviceID := services.NewServiceID(). |
nothing calls this directly
no test coverage detected