@@ -3,8 +3,10 @@ package api
3
3
import (
4
4
"context"
5
5
"math/big"
6
+ "sync"
6
7
"sync/atomic"
7
8
"testing"
9
+ "time"
8
10
9
11
"github.com/stretchr/testify/require"
10
12
"google.golang.org/protobuf/proto"
@@ -35,3 +37,86 @@ func TestActionRadio(t *testing.T) {
35
37
radio .OnAdded (selp )
36
38
r .Equal (uint64 (1 ), atomic .LoadUint64 (& broadcastCount ))
37
39
}
40
+
41
+ func TestActionRadioRetry (t * testing.T ) {
42
+ r := require .New (t )
43
+ broadcastCnt := uint64 (0 )
44
+ pendings := make ([]* action.SealedEnvelope , 0 )
45
+ mutex := sync.Mutex {}
46
+ ar := NewActionRadio (func (ctx context.Context , chainID uint32 , msg proto.Message ) error {
47
+ atomic .AddUint64 (& broadcastCnt , 1 )
48
+ return nil
49
+ }, 0 , WithRetry (func () chan * action.SealedEnvelope {
50
+ ch := make (chan * action.SealedEnvelope , 1 )
51
+ go func () {
52
+ mutex .Lock ()
53
+ for _ , p := range pendings {
54
+ ch <- p
55
+ }
56
+ mutex .Unlock ()
57
+ close (ch )
58
+ }()
59
+ return ch
60
+ }, 3 , 20 * time .Millisecond ))
61
+ ar .tickInterval = 10 * time .Millisecond
62
+
63
+ r .NoError (ar .Start ())
64
+ defer ar .Stop ()
65
+
66
+ setPending := func (acts ... * action.SealedEnvelope ) {
67
+ mutex .Lock ()
68
+ pendings = acts
69
+ mutex .Unlock ()
70
+ }
71
+
72
+ tsf1 , err := action .SignedTransfer (identityset .Address (1 ).String (), identityset .PrivateKey (1 ), 1 , big .NewInt (1 ), nil , 10000 , big .NewInt (1 ))
73
+ r .NoError (err )
74
+ tsf2 , err := action .SignedTransfer (identityset .Address (1 ).String (), identityset .PrivateKey (1 ), 2 , big .NewInt (1 ), nil , 10000 , big .NewInt (1 ))
75
+ r .NoError (err )
76
+ tsf3 , err := action .SignedTransfer (identityset .Address (1 ).String (), identityset .PrivateKey (1 ), 3 , big .NewInt (1 ), nil , 10000 , big .NewInt (1 ))
77
+ r .NoError (err )
78
+
79
+ // -- case 1: retry pending actions at most 3 times
80
+ r .Equal (uint64 (0 ), atomic .LoadUint64 (& broadcastCnt ))
81
+ // add first action
82
+ ar .OnAdded (tsf1 )
83
+ r .Equal (uint64 (1 ), atomic .LoadUint64 (& broadcastCnt ))
84
+ // add second action
85
+ ar .OnAdded (tsf2 )
86
+ r .Equal (uint64 (2 ), atomic .LoadUint64 (& broadcastCnt ))
87
+ // set tsf1 as pending
88
+ time .Sleep (ar .retryInterval )
89
+ setPending (tsf1 )
90
+ // first retry after interval
91
+ time .Sleep (ar .retryInterval )
92
+ r .Equal (uint64 (3 ), atomic .LoadUint64 (& broadcastCnt ))
93
+ // retry 3 at most
94
+ time .Sleep (ar .retryInterval * 10 )
95
+ r .Equal (uint64 (5 ), atomic .LoadUint64 (& broadcastCnt ))
96
+ // tsf1 confirmed
97
+ setPending ()
98
+ ar .OnRemoved (tsf1 )
99
+
100
+ // -- case 2: retry + 1 if receive again
101
+ setPending (tsf2 )
102
+ // first retry after interval
103
+ time .Sleep (ar .retryInterval )
104
+ r .Equal (uint64 (6 ), atomic .LoadUint64 (& broadcastCnt ))
105
+ // receive tsf2 again and retry+1
106
+ ar .OnRejected (context .Background (), tsf2 , action .ErrExistedInPool )
107
+ time .Sleep (ar .retryInterval * 10 )
108
+ r .Equal (uint64 (7 ), atomic .LoadUint64 (& broadcastCnt ))
109
+
110
+ // -- case 3: ignore if receive again from API
111
+ ar .OnAdded (tsf3 )
112
+ r .Equal (uint64 (8 ), atomic .LoadUint64 (& broadcastCnt ))
113
+ time .Sleep (ar .retryInterval )
114
+ setPending (tsf3 )
115
+ // first retry after interval
116
+ time .Sleep (ar .retryInterval )
117
+ r .Equal (uint64 (9 ), atomic .LoadUint64 (& broadcastCnt ))
118
+ // receive tsf3 again from API
119
+ ar .OnRejected (WithAPIContext (context .Background ()), tsf3 , action .ErrExistedInPool )
120
+ time .Sleep (ar .retryInterval * 10 )
121
+ r .Equal (uint64 (11 ), atomic .LoadUint64 (& broadcastCnt ))
122
+ }
0 commit comments