Skip to content

Commit e1823f8

Browse files
authored
feat: Add sync sharding (#1891)
#### Summary Goes with cloudquery/plugin-pb-go#401. ~~Still testing this so in draft~~ Part of cloudquery/cloudquery-issues#2214 (internal issue) ---
1 parent b05d24b commit e1823f8

File tree

12 files changed

+198
-18
lines changed

12 files changed

+198
-18
lines changed

examples/simple_plugin/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ require (
5454
github.com/oapi-codegen/runtime v1.1.1 // indirect
5555
github.com/pierrec/lz4/v4 v4.1.21 // indirect
5656
github.com/pmezard/go-difflib v1.0.0 // indirect
57+
github.com/samber/lo v1.47.0 // indirect
5758
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
5859
github.com/spf13/cobra v1.8.1 // indirect
5960
github.com/spf13/pflag v1.0.5 // indirect

examples/simple_plugin/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
126126
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
127127
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
128128
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
129+
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
130+
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
129131
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
130132
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
131133
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/hashicorp/go-retryablehttp v0.7.7
2323
github.com/invopop/jsonschema v0.12.0
2424
github.com/rs/zerolog v1.33.0
25+
github.com/samber/lo v1.47.0
2526
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
2627
github.com/spf13/cobra v1.8.1
2728
github.com/stretchr/testify v1.9.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
126126
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
127127
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
128128
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
129+
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
130+
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
129131
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw=
130132
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
131133
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=

internal/servers/plugin/v3/plugin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,12 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
178178
Connection: req.Backend.Connection,
179179
}
180180
}
181+
if req.Shard != nil {
182+
syncOptions.Shard = &plugin.Shard{
183+
Num: req.Shard.Num,
184+
Total: req.Shard.Total,
185+
}
186+
}
181187

182188
go func() {
183189
defer flushMetrics()

plugin/plugin_source.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,18 @@ type BackendOptions struct {
1515
Connection string
1616
}
1717

18+
// Shard carries the shard number and shard count for a sync.
// The v3 gRPC server copies both fields from the Sync request's Shard message
// when that message is non-nil.
// NOTE(review): the scheduler's sharding helper indexes with num-1, so Num is
// presumably 1-based — confirm against the caller that forwards this value.
type Shard struct {
19+
// Num is this shard's number.
Num int32
20+
// Total is the number of shards the sync is split into.
Total int32
21+
}
22+
1823
type SyncOptions struct {
1924
Tables []string
2025
SkipTables []string
2126
SkipDependentTables bool
2227
DeterministicCQID bool
2328
BackendOptions *BackendOptions
29+
Shard *Shard
2430
}
2531

2632
type SourceClient interface {

scheduler/scheduler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cloudquery/plugin-sdk/v4/message"
1414
"github.com/cloudquery/plugin-sdk/v4/schema"
1515
"github.com/rs/zerolog"
16+
"github.com/samber/lo"
1617
"github.com/thoas/go-funk"
1718
"go.opentelemetry.io/otel"
1819
"go.opentelemetry.io/otel/attribute"
@@ -90,6 +91,12 @@ func WithInvocationID(invocationID string) Option {
9091
}
9192
}
9293

94+
// WithShard returns a SyncOption that records the given shard on the sync
// client. num is the 1-based shard number (shardTableClients selects its
// chunk with num-1) and total is the number of shards.
func WithShard(num int32, total int32) SyncOption {
95+
return func(s *syncClient) {
96+
s.shard = &shard{num: num, total: total}
97+
}
98+
}
99+
93100
type Client interface {
94101
ID() string
95102
}
@@ -119,6 +126,11 @@ type Scheduler struct {
119126
invocationID string
120127
}
121128

129+
// shard is the scheduler's internal record of the shard a sync is limited to.
type shard struct {
130+
// num is the 1-based shard number; shardTableClients indexes its chunks with num-1.
num int32
131+
// total is the number of shards; shardTableClients divides the table-client count by it.
total int32
132+
}
133+
122134
type syncClient struct {
123135
tables schema.Tables
124136
client schema.ClientMeta
@@ -128,6 +140,8 @@ type syncClient struct {
128140
metrics *Metrics
129141
logger zerolog.Logger
130142
invocationID string
143+
144+
shard *shard
131145
}
132146

133147
func NewScheduler(opts ...Option) *Scheduler {
@@ -346,3 +360,24 @@ func maxDepth(tables schema.Tables) uint64 {
346360
}
347361
return depth
348362
}
363+
364+
func shardTableClients(tableClients []tableClient, shard *shard) []tableClient {
365+
// For sharding to work as expected, tableClients must be deterministic between different shards.
366+
if shard == nil || len(tableClients) == 0 {
367+
return tableClients
368+
}
369+
num := int(shard.num)
370+
total := int(shard.total)
371+
chunkSize := len(tableClients) / total
372+
if chunkSize == 0 {
373+
chunkSize = 1
374+
}
375+
chunks := lo.Chunk(tableClients, chunkSize)
376+
if num > len(chunks) {
377+
return nil
378+
}
379+
if len(chunks) > total && num == total {
380+
return append(chunks[num-1], chunks[num]...)
381+
}
382+
return chunks[num-1]
383+
}

scheduler/scheduler_debug.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
6767
}
6868
}
6969
shuffle(allClients, seed)
70+
allClients = shardTableClients(allClients, s.shard)
7071

7172
var wg sync.WaitGroup
7273
for _, tc := range allClients {

scheduler/scheduler_dfs.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,34 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
4040
s.metrics.initWithClients(table, clients)
4141
}
4242

43-
var wg sync.WaitGroup
43+
tableClients := make([]tableClient, 0)
4444
for i, table := range s.tables {
45-
table := table
46-
clients := preInitialisedClients[i]
47-
for _, client := range clients {
48-
client := client
49-
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
50-
// This means context was cancelled
51-
wg.Wait()
52-
return
53-
}
54-
wg.Add(1)
55-
go func() {
56-
defer wg.Done()
57-
defer s.scheduler.tableSems[0].Release(1)
58-
// not checking for error here as nothing much todo.
59-
// the error is logged and this happens when context is cancelled
60-
s.resolveTableDfs(ctx, table, client, nil, resolvedResources, 1)
61-
}()
45+
for _, client := range preInitialisedClients[i] {
46+
tableClients = append(tableClients, tableClient{table: table, client: client})
6247
}
6348
}
49+
tableClients = shardTableClients(tableClients, s.shard)
50+
51+
var wg sync.WaitGroup
52+
for _, tc := range tableClients {
53+
table := tc.table
54+
cl := tc.client
55+
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
56+
// This means context was cancelled
57+
wg.Wait()
58+
return
59+
}
60+
wg.Add(1)
61+
go func() {
62+
defer wg.Done()
63+
defer s.scheduler.tableSems[0].Release(1)
64+
// not checking for error here as nothing much to do.
65+
// the error is logged and this happens when context is cancelled
66+
// Round Robin currently uses the DFS algorithm to resolve the tables, but this
67+
// may change in the future.
68+
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
69+
}()
70+
}
6471

6572
// Wait for all the worker goroutines to finish
6673
wg.Wait()

scheduler/scheduler_round_robin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
3737
}
3838

3939
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
40+
tableClients = shardTableClients(tableClients, s.shard)
4041

4142
var wg sync.WaitGroup
4243
for _, tc := range tableClients {

scheduler/scheduler_shuffle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
4444
// however, if the table order changes, the seed will change and the shuffle order will be different,
4545
// so users have a little bit of control over the randomization.
4646
seed := hashTableNames(tableNames)
47+
tableClients = shardTableClients(tableClients, s.shard)
4748
shuffle(tableClients, seed)
4849

4950
var wg sync.WaitGroup

scheduler/scheduler_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,3 +481,120 @@ func TestScheduler_Cancellation(t *testing.T) {
481481
}
482482
}
483483
}
484+
485+
func Test_shardTableClients(t *testing.T) {
486+
type testCase struct {
487+
name string
488+
tableClients []tableClient
489+
shard *shard
490+
expected []tableClient
491+
}
492+
493+
tests := []testCase{
494+
{
495+
name: "nil shard returns all table clients",
496+
tableClients: []tableClient{
497+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
498+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
499+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
500+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
501+
},
502+
expected: []tableClient{
503+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
504+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
505+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
506+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
507+
},
508+
},
509+
{
510+
name: "nil table clients",
511+
tableClients: nil,
512+
shard: &shard{num: 1, total: 2},
513+
expected: nil,
514+
},
515+
{
516+
name: "empty table clients",
517+
tableClients: []tableClient{},
518+
shard: &shard{num: 1, total: 2},
519+
expected: []tableClient{},
520+
},
521+
{
522+
name: "even shard 1 of 2",
523+
tableClients: []tableClient{
524+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
525+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
526+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
527+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
528+
},
529+
shard: &shard{num: 1, total: 2},
530+
expected: []tableClient{
531+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
532+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
533+
},
534+
},
535+
{
536+
name: "even shard 2 of 2",
537+
tableClients: []tableClient{
538+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
539+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
540+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
541+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
542+
},
543+
shard: &shard{num: 2, total: 2},
544+
expected: []tableClient{
545+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
546+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
547+
},
548+
},
549+
{
550+
name: "uneven split 1 of 2",
551+
tableClients: []tableClient{
552+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
553+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
554+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
555+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
556+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
557+
},
558+
shard: &shard{num: 1, total: 2},
559+
expected: []tableClient{
560+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
561+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
562+
},
563+
},
564+
{
565+
name: "uneven split 2 of 2",
566+
tableClients: []tableClient{
567+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
568+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
569+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
570+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
571+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
572+
},
573+
shard: &shard{num: 2, total: 2},
574+
expected: []tableClient{
575+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
576+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
577+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_5"}},
578+
},
579+
},
580+
{
581+
name: "more shards than table clients",
582+
tableClients: []tableClient{
583+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_1"}},
584+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_2"}},
585+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_3"}},
586+
{client: &testExecutionClient{}, table: &schema.Table{Name: "table_4"}},
587+
},
588+
shard: &shard{num: 5, total: 100},
589+
expected: nil,
590+
},
591+
}
592+
593+
for _, tc := range tests {
594+
tc := tc
595+
t.Run(tc.name, func(t *testing.T) {
596+
actual := shardTableClients(tc.tableClients, tc.shard)
597+
require.Equal(t, tc.expected, actual)
598+
})
599+
}
600+
}

0 commit comments

Comments
 (0)