| 20 | } |
| 21 | |
| 22 | func WorkMQ(threadID int, wg *sync.WaitGroup, conn *amqp.Connection, provider provider.StorageProvider, queue string, |
| 23 | threads int, doEnumerate bool, writeToDB bool) { |
| 24 | _, once := os.LookupEnv("TEST_MQ") // If we're being tested, exit after one bucket is scanned |
| 25 | defer wg.Done() |
| 26 | |
| 27 | // Wrap the whole thing in a for (while) loop so if the mq server kills the channel, we start it up again |
| 28 | for { |
| 29 | ch, chErr := mq.Connect(conn, queue, threads, threadID) |
| 30 | if chErr != nil { |
| 31 | FailOnError(chErr, "couldn't connect to message queue") |
| 32 | } |
| 33 | |
| 34 | msgs, consumeErr := ch.Consume(queue, fmt.Sprintf("%s_%v", queue, threadID), false, false, false, false, nil) |
| 35 | if consumeErr != nil { |
| 36 | log.Error(fmt.Errorf("failed to register a consumer: %w", consumeErr)) |
| 37 | return |
| 38 | } |
| 39 | |
| 40 | for j := range msgs { |
| 41 | bucketToScan := bucket.Bucket{} |
| 42 | |
| 43 | unmarshalErr := json.Unmarshal(j.Body, &bucketToScan) |
| 44 | if unmarshalErr != nil { |
| 45 | log.Error(unmarshalErr) |
| 46 | } |
| 47 | |
| 48 | if !bucket.IsValidS3BucketName(bucketToScan.Name) { |
| 49 | log.Info(fmt.Sprintf("invalid | %s", bucketToScan.Name)) |
| 50 | FailOnError(j.Ack(false), "failed to ack") |
| 51 | continue |
| 52 | } |
| 53 | |
| 54 | b, existsErr := provider.BucketExists(&bucketToScan) |
| 55 | if existsErr != nil { |
| 56 | log.WithFields(log.Fields{"bucket": b.Name, "step": "checkExists"}).Error(existsErr) |
| 57 | FailOnError(j.Reject(false), "failed to reject") |
| 58 | } |
| 59 | if b.Exists == bucket.BucketNotExist { |
| 60 | // ack the message and skip to the next |
| 61 | log.Infof("not_exist | %s", b.Name) |
| 62 | FailOnError(j.Ack(false), "failed to ack") |
| 63 | continue |
| 64 | } |
| 65 | |
| 66 | scanErr := provider.Scan(b, false) |
| 67 | if scanErr != nil { |
| 68 | log.WithFields(log.Fields{"bucket": b}).Error(scanErr) |
| 69 | FailOnError(j.Reject(false), "failed to reject") |
| 70 | continue |
| 71 | } |
| 72 | |
| 73 | if doEnumerate { |
| 74 | if b.PermAllUsersRead != bucket.PermissionAllowed { |
| 75 | PrintResult(&bucketToScan, false) |
| 76 | FailOnError(j.Ack(false), "failed to ack") |
| 77 | if writeToDB { |
| 78 | dbErr := db.StoreBucket(&bucketToScan) |
| 79 | if dbErr != nil { |