MCPcopy
hub / github.com/uber/aresdb / NewIngestionAssignmentTask

Function NewIngestionAssignmentTask

controller/tasks/etcd/ingestion_assignment.go:84–121  ·  view source on GitHub ↗

NewIngestionAssignmentTask creates a new instance of ingestionAssignmentTask

(p common.IngestionAssignmentTaskParams)

Source from the content-addressed store, hash-verified

82
83// NewIngestionAssignmentTask creates a new instance of ingestionAssignmentTask
84func NewIngestionAssignmentTask(p common.IngestionAssignmentTaskParams) common.Task {
85 var iaconfig ingestionAssignmentTaskConfig
86 logger := p.Logger.With("task", taskTagValue)
87 scope := p.Scope.Tagged(map[string]string{"task": taskTagValue})
88
89 if err := p.ConfigProvider.Get(ingestionAssignmentConfigKey).Populate(&iaconfig); err != nil {
90 logger.Fatal("failed to load config")
91 }
92
93 serviceID := services.NewServiceID().
94 SetEnvironment(p.EtcdClient.Environment).
95 SetZone(p.EtcdClient.Zone).
96 SetName(p.EtcdClient.ServiceName)
97 leaderService, err := p.EtcdClient.Services.LeaderService(serviceID, nil)
98 if err != nil {
99 logger.Fatal("failed to create leader service")
100 }
101
102 task := &ingestionAssignmentTask{
103 intervalSeconds: iaconfig.IntervalInSeconds,
104 logger: logger,
105 scope: scope,
106 stopChan: make(chan struct{}, 1),
107
108 zone: p.EtcdClient.Zone,
109 environment: p.EtcdClient.Environment,
110
111 etcdServices: p.EtcdClient.Services,
112 leaderElection: NewLeaderElector(leaderService),
113 namespaceMutator: p.NamespaceMutator,
114 jobMutator: p.JobMutator,
115 schemaMutator: p.SchemaMutator,
116 assignmentsMutator: p.AssignmentsMutator,
117 subscriberMutator: p.SubscriberMutator,
118 configHashes: make(map[string]configHash),
119 }
120 return task
121}
122
123// Run starts the ingestionAssignmentTask
124func (ia *ingestionAssignmentTask) Run() {

Callers 1

Calls 5

NewLeaderElectorFunction · 0.85
WithMethod · 0.65
GetMethod · 0.65
FatalMethod · 0.65
SetNameMethod · 0.65

Tested by 1