MCPcopy
hub / github.com/sa7mon/S3Scanner / WorkMQ

Function WorkMQ

worker/mq_worker.go:22–118  ·  view source on GitHub ↗
(threadID int, wg *sync.WaitGroup, conn *amqp.Connection, provider provider.StorageProvider, queue string,
	threads int, doEnumerate bool, writeToDB bool)

Source from the content-addressed store, hash-verified

20}
21
22func WorkMQ(threadID int, wg *sync.WaitGroup, conn *amqp.Connection, provider provider.StorageProvider, queue string,
23 threads int, doEnumerate bool, writeToDB bool) {
24 _, once := os.LookupEnv("TEST_MQ") // If we're being tested, exit after one bucket is scanned
25 defer wg.Done()
26
27 // Wrap the whole thing in a for (while) loop so if the mq server kills the channel, we start it up again
28 for {
29 ch, chErr := mq.Connect(conn, queue, threads, threadID)
30 if chErr != nil {
31 FailOnError(chErr, "couldn't connect to message queue")
32 }
33
34 msgs, consumeErr := ch.Consume(queue, fmt.Sprintf("%s_%v", queue, threadID), false, false, false, false, nil)
35 if consumeErr != nil {
36 log.Error(fmt.Errorf("failed to register a consumer: %w", consumeErr))
37 return
38 }
39
40 for j := range msgs {
41 bucketToScan := bucket.Bucket{}
42
43 unmarshalErr := json.Unmarshal(j.Body, &bucketToScan)
44 if unmarshalErr != nil {
45 log.Error(unmarshalErr)
46 }
47
48 if !bucket.IsValidS3BucketName(bucketToScan.Name) {
49 log.Info(fmt.Sprintf("invalid | %s", bucketToScan.Name))
50 FailOnError(j.Ack(false), "failed to ack")
51 continue
52 }
53
54 b, existsErr := provider.BucketExists(&bucketToScan)
55 if existsErr != nil {
56 log.WithFields(log.Fields{"bucket": b.Name, "step": "checkExists"}).Error(existsErr)
57 FailOnError(j.Reject(false), "failed to reject")
58 }
59 if b.Exists == bucket.BucketNotExist {
60 // ack the message and skip to the next
61 log.Infof("not_exist | %s", b.Name)
62 FailOnError(j.Ack(false), "failed to ack")
63 continue
64 }
65
66 scanErr := provider.Scan(b, false)
67 if scanErr != nil {
68 log.WithFields(log.Fields{"bucket": b}).Error(scanErr)
69 FailOnError(j.Reject(false), "failed to reject")
70 continue
71 }
72
73 if doEnumerate {
74 if b.PermAllUsersRead != bucket.PermissionAllowed {
75 PrintResult(&bucketToScan, false)
76 FailOnError(j.Ack(false), "failed to ack")
77 if writeToDB {
78 dbErr := db.StoreBucket(&bucketToScan)
79 if dbErr != nil {

Callers 2

RunFunction · 0.92
TestMqWorkFunction · 0.85

Calls 8

ConnectFunction · 0.92
IsValidS3BucketNameFunction · 0.92
StoreBucketFunction · 0.92
FailOnErrorFunction · 0.85
PrintResultFunction · 0.85
BucketExistsMethod · 0.65
ScanMethod · 0.65
EnumerateMethod · 0.65

Tested by 1

TestMqWorkFunction · 0.68