Spin up the multiprocess event returner
(self)
| 1240 | self._warned_returners[event_return] = now |
| 1241 | |
| 1242 | def run(self): |
| 1243 | """ |
| 1244 | Spin up the multiprocess event returner |
| 1245 | """ |
| 1246 | if self.opts["event_return_niceness"] and not salt.utils.platform.is_windows(): |
| 1247 | log.info( |
| 1248 | "setting EventReturn niceness to %i", self.opts["event_return_niceness"] |
| 1249 | ) |
| 1250 | os.nice(self.opts["event_return_niceness"]) |
| 1251 | |
| 1252 | self.event = get_event("master", opts=self.opts, listen=True) |
| 1253 | events = self.event.iter_events(full=True) |
| 1254 | self.event.fire_event({}, "salt/event_listen/start") |
| 1255 | try: |
| 1256 | # events below is a generator, we will iterate until we get the salt/event/exit tag |
| 1257 | oldestevent = None |
| 1258 | for event in events: |
| 1259 | |
| 1260 | if event["tag"] == "salt/event/exit": |
| 1261 | # We're done eventing |
| 1262 | self.stop = True |
| 1263 | if self._filter( |
| 1264 | event, |
| 1265 | allow=self.opts["event_return_whitelist"], |
| 1266 | deny=self.opts["event_return_blacklist"], |
| 1267 | ): |
| 1268 | # This event passed the filter, add it to the queue |
| 1269 | self.event_queue.append(event) |
| 1270 | too_long_in_queue = False |
| 1271 | |
| 1272 | # if max_seconds is >0, then we want to make sure we flush the queue |
| 1273 | # every event_return_queue_max_seconds seconds. If it's 0, don't |
| 1274 | # apply any of this logic |
| 1275 | if self.event_return_queue_max_seconds > 0: |
| 1276 | rightnow = datetime.datetime.now() |
| 1277 | if not oldestevent: |
| 1278 | oldestevent = rightnow |
| 1279 | age_in_seconds = (rightnow - oldestevent).seconds |
| 1280 | if age_in_seconds > 0: |
| 1281 | log.debug( |
| 1282 | "Oldest event in queue is %s seconds old.", age_in_seconds |
| 1283 | ) |
| 1284 | if age_in_seconds >= self.event_return_queue_max_seconds: |
| 1285 | too_long_in_queue = True |
| 1286 | oldestevent = None |
| 1287 | else: |
| 1288 | too_long_in_queue = False |
| 1289 | |
| 1290 | if too_long_in_queue: |
| 1291 | log.debug( |
| 1292 | "Oldest event has been in queue too long, will flush queue" |
| 1293 | ) |
| 1294 | |
| 1295 | # If we are over the max queue size or the oldest item in the queue has been there too long |
| 1296 | # then flush the queue |
| 1297 | if ( |
| 1298 | len(self.event_queue) >= self.event_return_queue |
| 1299 | or too_long_in_queue |
nothing calls this directly
no test coverage detected