(t *testing.T)
| 570 | } |
| 571 | |
| 572 | func Test_CompletionCount(t *testing.T) { |
| 573 | tutils.CheckSkip(t, tutils.SkipTestArgs{Long: true}) |
| 574 | var ( |
| 575 | numSent int64 |
| 576 | numCompleted, numReceived atomic.Int64 |
| 577 | ) |
| 578 | |
| 579 | receive := func(hdr transport.ObjHdr, objReader io.Reader, err error) error { |
| 580 | cos.Assert(err == nil) |
| 581 | written, _ := io.Copy(io.Discard, objReader) |
| 582 | cos.Assert(written == hdr.ObjAttrs.Size) |
| 583 | numReceived.Inc() |
| 584 | return nil |
| 585 | } |
| 586 | callback := func(_ transport.ObjHdr, _ io.ReadCloser, _ interface{}, _ error) { |
| 587 | numCompleted.Inc() |
| 588 | } |
| 589 | |
| 590 | ts := httptest.NewServer(objmux) |
| 591 | defer ts.Close() |
| 592 | |
| 593 | trname := "cmpl-cnt" |
| 594 | err := transport.HandleObjStream(trname, receive) |
| 595 | tassert.CheckFatal(t, err) |
| 596 | defer transport.Unhandle(trname) |
| 597 | httpclient := transport.NewIntraDataClient() |
| 598 | url := ts.URL + transport.ObjURLPath(trname) |
| 599 | t.Setenv("AIS_STREAM_BURST_NUM", "256") |
| 600 | stream := transport.NewObjStream(httpclient, url, cos.GenTie(), nil) // provide for sizeable queue at any point |
| 601 | random := newRand(mono.NanoTime()) |
| 602 | rem := int64(0) |
| 603 | for idx := 0; idx < 10000; idx++ { |
| 604 | if idx%7 == 0 { |
| 605 | hdr := genStaticHeader(random) |
| 606 | hdr.ObjAttrs.Size = 0 |
| 607 | hdr.Opaque = []byte(strconv.FormatInt(104729*int64(idx), 10)) |
| 608 | stream.Send(&transport.Obj{Hdr: hdr, Callback: callback}) |
| 609 | rem = random.Int63() % 13 |
| 610 | } else { |
| 611 | hdr, rr := makeRandReader(random, false) |
| 612 | stream.Send(&transport.Obj{Hdr: hdr, Reader: rr, Callback: callback}) |
| 613 | } |
| 614 | numSent++ |
| 615 | if numSent > 5000 && rem == 3 { |
| 616 | stream.Stop() |
| 617 | break |
| 618 | } |
| 619 | } |
| 620 | // collect all pending completions until timeout |
| 621 | started := time.Now() |
| 622 | for numCompleted.Load() < numSent { |
| 623 | time.Sleep(time.Millisecond * 10) |
| 624 | if time.Since(started) > time.Second*10 { |
| 625 | break |
| 626 | } |
| 627 | } |
| 628 | if numSent == numCompleted.Load() { |
| 629 | tlog.Logf("sent %d = %d completed, %d received\n", numSent, numCompleted.Load(), numReceived.Load()) |
Nothing calls this directly.
No test coverage detected.