fix:can not fully load in cluster mode
This commit is contained in:
parent
b0dfe348bd
commit
c433539869
|
@ -457,14 +457,16 @@ func (b *browserService) OpenDatabase(server string, db int) (resp types.JSResp)
|
||||||
// @return loaded keys
|
// @return loaded keys
|
||||||
// @return next cursor
|
// @return next cursor
|
||||||
// @return scan error
|
// @return scan error
|
||||||
func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalClient, match, keyType string, cursor uint64, count int64) ([]any, uint64, error) {
|
func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalClient, match, keyType string, cursor uint64) ([]any, uint64, error) {
|
||||||
var err error
|
var err error
|
||||||
filterType := len(keyType) > 0
|
filterType := len(keyType) > 0
|
||||||
scanSize := int64(Preferences().GetScanSize())
|
scanSize := int64(Preferences().GetScanSize())
|
||||||
// define sub scan function
|
|
||||||
scan := func(ctx context.Context, cli redis.UniversalClient, count int64, appendFunc func(k []any)) error {
|
// 定义子扫描函数
|
||||||
|
scan := func(ctx context.Context, cli *redis.Client, initialCursor uint64, appendFunc func(k []any)) error {
|
||||||
var loadedKey []string
|
var loadedKey []string
|
||||||
var scanCount int64
|
cursor := initialCursor // 为每个节点使用独立的游标
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if filterType {
|
if filterType {
|
||||||
loadedKey, cursor, err = cli.ScanType(ctx, cursor, match, scanSize, keyType).Result()
|
loadedKey, cursor, err = cli.ScanType(ctx, cursor, match, scanSize, keyType).Result()
|
||||||
|
@ -473,15 +475,16 @@ func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalCli
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else {
|
|
||||||
ks := sliceutil.Map(loadedKey, func(i int) any {
|
|
||||||
return strutil.EncodeRedisKey(loadedKey[i])
|
|
||||||
})
|
|
||||||
scanCount += int64(len(ks))
|
|
||||||
appendFunc(ks)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count > 0 && scanCount > count) || cursor == 0 {
|
ks := sliceutil.Map(loadedKey, func(i int) any {
|
||||||
|
return strutil.EncodeRedisKey(loadedKey[i])
|
||||||
|
})
|
||||||
|
|
||||||
|
appendFunc(ks)
|
||||||
|
|
||||||
|
// 如果游标为0,表示扫描完成
|
||||||
|
if cursor == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -492,25 +495,33 @@ func (b *browserService) scanKeys(ctx context.Context, client redis.UniversalCli
|
||||||
if cluster, ok := client.(*redis.ClusterClient); ok {
|
if cluster, ok := client.(*redis.ClusterClient); ok {
|
||||||
// cluster mode
|
// cluster mode
|
||||||
var mutex sync.Mutex
|
var mutex sync.Mutex
|
||||||
var totalMaster int64
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
|
cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
|
||||||
totalMaster += 1
|
wg.Add(1)
|
||||||
|
go func(cli *redis.Client) {
|
||||||
|
defer wg.Done()
|
||||||
|
initialCursor := uint64(0) // Each client starts with cursor 0
|
||||||
|
err := scan(ctx, cli, initialCursor, func(k []any) {
|
||||||
|
mutex.Lock()
|
||||||
|
keys = append(keys, k...)
|
||||||
|
mutex.Unlock()
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
//Error handling, such as logging
|
||||||
|
}
|
||||||
|
}(cli)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
partCount := count / max(totalMaster, 1)
|
|
||||||
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
|
wg.Wait() // 等待所有 goroutine 完成
|
||||||
// FIXME: BUG? can not fully load in cluster mode? maybe remove the shared "cursor"
|
|
||||||
return scan(ctx, cli, partCount, func(k []any) {
|
|
||||||
mutex.Lock()
|
|
||||||
keys = append(keys, k...)
|
|
||||||
mutex.Unlock()
|
|
||||||
})
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
err = scan(ctx, client, count, func(k []any) {
|
// 非集群模式
|
||||||
|
err = scan(ctx, client.(*redis.Client), cursor, func(k []any) {
|
||||||
keys = append(keys, k...)
|
keys = append(keys, k...)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return keys, cursor, err
|
return keys, cursor, err
|
||||||
}
|
}
|
||||||
|
@ -551,7 +562,7 @@ func (b *browserService) LoadNextKeys(server string, db int, match, keyType stri
|
||||||
exactMatch = false
|
exactMatch = false
|
||||||
}
|
}
|
||||||
|
|
||||||
client, ctx, count := item.client, item.ctx, item.stepSize
|
client, ctx := item.client, item.ctx
|
||||||
var matchKeys []any
|
var matchKeys []any
|
||||||
var maxKeys int64
|
var maxKeys int64
|
||||||
cursor := item.cursor[db]
|
cursor := item.cursor[db]
|
||||||
|
@ -563,7 +574,7 @@ func (b *browserService) LoadNextKeys(server string, db int, match, keyType stri
|
||||||
}
|
}
|
||||||
b.setClientCursor(server, db, 0)
|
b.setClientCursor(server, db, 0)
|
||||||
} else {
|
} else {
|
||||||
matchKeys, cursor, err = b.scanKeys(ctx, client, match, keyType, cursor, count)
|
matchKeys, cursor, err = b.scanKeys(ctx, client, match, keyType, cursor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Msg = err.Error()
|
resp.Msg = err.Error()
|
||||||
return
|
return
|
||||||
|
@ -604,7 +615,7 @@ func (b *browserService) LoadNextAllKeys(server string, db int, match, keyType s
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cursor := item.cursor[db]
|
cursor := item.cursor[db]
|
||||||
matchKeys, _, err = b.scanKeys(ctx, client, match, keyType, cursor, 0)
|
matchKeys, _, err = b.scanKeys(ctx, client, match, keyType, cursor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Msg = err.Error()
|
resp.Msg = err.Error()
|
||||||
return
|
return
|
||||||
|
@ -641,7 +652,7 @@ func (b *browserService) LoadAllKeys(server string, db int, match, keyType strin
|
||||||
matchKeys = []any{match}
|
matchKeys = []any{match}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
matchKeys, _, err = b.scanKeys(ctx, client, match, keyType, 0, 0)
|
matchKeys, _, err = b.scanKeys(ctx, client, match, keyType, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Msg = err.Error()
|
resp.Msg = err.Error()
|
||||||
return
|
return
|
||||||
|
@ -2309,7 +2320,7 @@ func (b *browserService) DeleteKeysByPattern(server string, db int, pattern stri
|
||||||
defer cancelFunc()
|
defer cancelFunc()
|
||||||
|
|
||||||
var ks []any
|
var ks []any
|
||||||
ks, _, err = b.scanKeys(ctx, client, pattern, "", 0, 0)
|
ks, _, err = b.scanKeys(ctx, client, pattern, "", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Msg = err.Error()
|
resp.Msg = err.Error()
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue