Skip to content

Commit 02684d8

Browse files
committed
add test
1 parent 8a6425b commit 02684d8

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed

api/action_radio_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package api
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"google.golang.org/protobuf/proto"
12+
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/iotexproject/iotex-core/v2/action"
16+
"github.com/iotexproject/iotex-core/v2/test/identityset"
17+
)
18+
19+
func TestActionRadioRetry(t *testing.T) {
20+
r := require.New(t)
21+
broadcastCnt := uint64(0)
22+
pendings := make([]*action.SealedEnvelope, 0)
23+
mutex := sync.Mutex{}
24+
ar := NewActionRadio(func(ctx context.Context, chainID uint32, msg proto.Message) error {
25+
atomic.AddUint64(&broadcastCnt, 1)
26+
return nil
27+
}, 0, WithRetry(func() chan *action.SealedEnvelope {
28+
ch := make(chan *action.SealedEnvelope, 1)
29+
go func() {
30+
mutex.Lock()
31+
for _, p := range pendings {
32+
ch <- p
33+
}
34+
mutex.Unlock()
35+
close(ch)
36+
}()
37+
return ch
38+
}, 3, 20*time.Millisecond))
39+
ar.tickInterval = 10 * time.Millisecond
40+
41+
r.NoError(ar.Start())
42+
defer ar.Stop()
43+
44+
setPending := func(acts ...*action.SealedEnvelope) {
45+
mutex.Lock()
46+
pendings = acts
47+
mutex.Unlock()
48+
}
49+
50+
tsf1, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, 10000, big.NewInt(1))
51+
r.NoError(err)
52+
tsf2, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 2, big.NewInt(1), nil, 10000, big.NewInt(1))
53+
r.NoError(err)
54+
tsf3, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 3, big.NewInt(1), nil, 10000, big.NewInt(1))
55+
r.NoError(err)
56+
57+
// -- case 1: retry pending actions at most 3 times
58+
r.Equal(uint64(0), atomic.LoadUint64(&broadcastCnt))
59+
// add first action
60+
ar.OnAdded(tsf1)
61+
r.Equal(uint64(1), atomic.LoadUint64(&broadcastCnt))
62+
// add second action
63+
ar.OnAdded(tsf2)
64+
r.Equal(uint64(2), atomic.LoadUint64(&broadcastCnt))
65+
// set tsf1 as pending
66+
time.Sleep(ar.retryInterval)
67+
setPending(tsf1)
68+
// first retry after interval
69+
time.Sleep(ar.retryInterval)
70+
r.Equal(uint64(3), atomic.LoadUint64(&broadcastCnt))
71+
// retry 3 at most
72+
time.Sleep(ar.retryInterval * 10)
73+
r.Equal(uint64(5), atomic.LoadUint64(&broadcastCnt))
74+
// tsf1 confirmed
75+
setPending()
76+
ar.OnRemoved(tsf1)
77+
78+
// -- case 2: retry + 1 if receive again
79+
setPending(tsf2)
80+
// first retry after interval
81+
time.Sleep(ar.retryInterval)
82+
r.Equal(uint64(6), atomic.LoadUint64(&broadcastCnt))
83+
// receive tsf2 again and retry+1
84+
ar.OnRejected(context.Background(), tsf2, action.ErrExistedInPool)
85+
time.Sleep(ar.retryInterval * 10)
86+
r.Equal(uint64(7), atomic.LoadUint64(&broadcastCnt))
87+
88+
// -- case 3: ignore if receive again from API
89+
ar.OnAdded(tsf3)
90+
r.Equal(uint64(8), atomic.LoadUint64(&broadcastCnt))
91+
time.Sleep(ar.retryInterval)
92+
setPending(tsf3)
93+
// first retry after interval
94+
time.Sleep(ar.retryInterval)
95+
r.Equal(uint64(9), atomic.LoadUint64(&broadcastCnt))
96+
// receive tsf3 again from API
97+
ar.OnRejected(WithAPIContext(context.Background()), tsf3, action.ErrExistedInPool)
98+
time.Sleep(ar.retryInterval * 10)
99+
r.Equal(uint64(11), atomic.LoadUint64(&broadcastCnt))
100+
}

0 commit comments

Comments
 (0)