MCPcopy
hub / github.com/NVIDIA/aistore / Test_CompletionCount

Function Test_CompletionCount

transport/obj_test.go:572–633  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

570}
571
572func Test_CompletionCount(t *testing.T) {
573 tutils.CheckSkip(t, tutils.SkipTestArgs{Long: true})
574 var (
575 numSent int64
576 numCompleted, numReceived atomic.Int64
577 )
578
579 receive := func(hdr transport.ObjHdr, objReader io.Reader, err error) error {
580 cos.Assert(err == nil)
581 written, _ := io.Copy(io.Discard, objReader)
582 cos.Assert(written == hdr.ObjAttrs.Size)
583 numReceived.Inc()
584 return nil
585 }
586 callback := func(_ transport.ObjHdr, _ io.ReadCloser, _ interface{}, _ error) {
587 numCompleted.Inc()
588 }
589
590 ts := httptest.NewServer(objmux)
591 defer ts.Close()
592
593 trname := "cmpl-cnt"
594 err := transport.HandleObjStream(trname, receive)
595 tassert.CheckFatal(t, err)
596 defer transport.Unhandle(trname)
597 httpclient := transport.NewIntraDataClient()
598 url := ts.URL + transport.ObjURLPath(trname)
599 t.Setenv("AIS_STREAM_BURST_NUM", "256")
600 stream := transport.NewObjStream(httpclient, url, cos.GenTie(), nil) // provide for sizeable queue at any point
601 random := newRand(mono.NanoTime())
602 rem := int64(0)
603 for idx := 0; idx < 10000; idx++ {
604 if idx%7 == 0 {
605 hdr := genStaticHeader(random)
606 hdr.ObjAttrs.Size = 0
607 hdr.Opaque = []byte(strconv.FormatInt(104729*int64(idx), 10))
608 stream.Send(&transport.Obj{Hdr: hdr, Callback: callback})
609 rem = random.Int63() % 13
610 } else {
611 hdr, rr := makeRandReader(random, false)
612 stream.Send(&transport.Obj{Hdr: hdr, Reader: rr, Callback: callback})
613 }
614 numSent++
615 if numSent > 5000 && rem == 3 {
616 stream.Stop()
617 break
618 }
619 }
620 // collect all pending completions until timeout
621 started := time.Now()
622 for numCompleted.Load() < numSent {
623 time.Sleep(time.Millisecond * 10)
624 if time.Since(started) > time.Second*10 {
625 break
626 }
627 }
628 if numSent == numCompleted.Load() {
629 tlog.Logf("sent %d = %d completed, %d received\n", numSent, numCompleted.Load(), numReceived.Load())

Callers

nothing calls this directly

Calls 15

CheckSkipFunction · 0.92
AssertFunction · 0.92
HandleObjStreamFunction · 0.92
CheckFatalFunction · 0.92
UnhandleFunction · 0.92
NewIntraDataClientFunction · 0.92
ObjURLPathFunction · 0.92
NewObjStreamFunction · 0.92
GenTieFunction · 0.92
NanoTimeFunction · 0.92
LogfFunction · 0.92
newRandFunction · 0.85

Tested by

no test coverage detected