From f98229b9fad72546c0ccc7692ab7dadd4cee536a Mon Sep 17 00:00:00 2001 From: Lykin <137850705+tiny-craft@users.noreply.github.com> Date: Tue, 19 Dec 2023 20:10:01 +0800 Subject: [PATCH] refactor: refactor the method for delete selected keys --- backend/services/browser_service.go | 198 +++++++++++++----- .../components/dialogs/DeleteKeyDialog.vue | 3 +- frontend/src/langs/en-us.json | 4 +- frontend/src/langs/zh-cn.json | 4 +- frontend/src/stores/browser.js | 70 +++++-- 5 files changed, 202 insertions(+), 77 deletions(-) diff --git a/backend/services/browser_service.go b/backend/services/browser_service.go index b0219b0..16a2f91 100644 --- a/backend/services/browser_service.go +++ b/backend/services/browser_service.go @@ -234,60 +234,63 @@ func (b *browserService) CloseConnection(name string) (resp types.JSResp) { return } +func (b *browserService) createRedisClient(selConn types.ConnectionConfig) (client redis.UniversalClient, err error) { + hook := redis2.NewHook(selConn.Name, func(cmd string, cost int64) { + now := time.Now() + //last := strings.LastIndex(cmd, ":") + //if last != -1 { + // cmd = cmd[:last] + //} + b.cmdHistory = append(b.cmdHistory, cmdHistoryItem{ + Timestamp: now.UnixMilli(), + Server: selConn.Name, + Cmd: cmd, + Cost: cost, + }) + }) + + client, err = Connection().createRedisClient(selConn) + if err != nil { + err = fmt.Errorf("create conenction error: %s", err.Error()) + return + } + + _ = client.Do(b.ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err() + // add hook to each node in cluster mode + if cluster, ok := client.(*redis.ClusterClient); ok { + err = cluster.ForEachShard(b.ctx, func(ctx context.Context, cli *redis.Client) error { + cli.AddHook(hook) + return nil + }) + if err != nil { + err = fmt.Errorf("get cluster nodes error: %s", err.Error()) + return + } + } else { + client.AddHook(hook) + } + + if _, err = client.Ping(b.ctx).Result(); err != nil && !errors.Is(err, redis.Nil) { + err = errors.New("can not connect to redis server:" + err.Error()) + return + } + return +} + // get a redis client from local cache or create a new open // if db >= 0, will also switch to db index -func (b *browserService) getRedisClient(connName string, db int) (item *connectionItem, err error) { +func (b *browserService) getRedisClient(server string, db int) (item *connectionItem, err error) { var ok bool var client redis.UniversalClient - if item, ok = b.connMap[connName]; ok { + if item, ok = b.connMap[server]; ok { client = item.client } else { - selConn := Connection().getConnection(connName) + selConn := Connection().getConnection(server) if selConn == nil { - err = fmt.Errorf("no match connection \"%s\"", connName) - return - } - - hook := redis2.NewHook(connName, func(cmd string, cost int64) { - now := time.Now() - //last := strings.LastIndex(cmd, ":") - //if last != -1 { - // cmd = cmd[:last] - //} - b.cmdHistory = append(b.cmdHistory, cmdHistoryItem{ - Timestamp: now.UnixMilli(), - Server: connName, - Cmd: cmd, - Cost: cost, - }) - }) - - client, err = Connection().createRedisClient(selConn.ConnectionConfig) - if err != nil { - err = fmt.Errorf("create conenction error: %s", err.Error()) - return - } - - _ = client.Do(b.ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err() - // add hook to each node in cluster mode - var cluster *redis.ClusterClient - if cluster, ok = client.(*redis.ClusterClient); ok { - err = cluster.ForEachShard(b.ctx, func(ctx context.Context, cli *redis.Client) error { - cli.AddHook(hook) - return nil - }) - if err != nil { - err = fmt.Errorf("get cluster nodes error: %s", err.Error()) - return - } - } else { - client.AddHook(hook) - } - - if _, err = client.Ping(b.ctx).Result(); err != nil && err != redis.Nil { - err = errors.New("can not connect to redis server:" + err.Error()) + err = fmt.Errorf("no match connection \"%s\"", server) return } + client, err = b.createRedisClient(selConn.ConnectionConfig) ctx, cancelFunc := context.WithCancel(b.ctx) item = &connectionItem{ client: client, @@ -300,7 +303,7 @@ func (b *browserService) getRedisClient(connName string, db int) (item *connecti if item.stepSize <= 0 { item.stepSize = consts.DEFAULT_LOAD_SIZE } - b.connMap[connName] = item + b.connMap[server] = item } // BUG: go-redis might not be executing commands on the corresponding database @@ -315,7 +318,7 @@ func (b *browserService) getRedisClient(connName string, db int) (item *connecti return } item.db = db - b.connMap[connName].db = db + b.connMap[server].db = db } } return @@ -328,12 +331,12 @@ func (b *browserService) loadDBSize(ctx context.Context, client redis.UniversalC } // save current scan cursor -func (b *browserService) setClientCursor(connName string, db int, cursor uint64) { - if _, ok := b.connMap[connName]; ok { +func (b *browserService) setClientCursor(server string, db int, cursor uint64) { + if _, ok := b.connMap[server]; ok { if cursor == 0 { - delete(b.connMap[connName].cursor, db) + delete(b.connMap[server].cursor, db) } else { - b.connMap[connName].cursor[db] = cursor + b.connMap[server].cursor[db] = cursor } } } @@ -1940,6 +1943,89 @@ func (b *browserService) DeleteOneKey(server string, db int, k any) (resp types. return } +// DeleteKeys delete keys sync with notification +func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo string) (resp types.JSResp) { + // connect a new connection to export keys + conf := Connection().getConnection(server) + if conf == nil { + resp.Msg = fmt.Sprintf("no connection profile named: %s", server) + return + } + var client redis.UniversalClient + var err error + var connConfig = conf.ConnectionConfig + connConfig.LastDB = db + if client, err = b.createRedisClient(connConfig); err != nil { + resp.Msg = err.Error() + return + } + ctx, cancelFunc := context.WithCancel(b.ctx) + defer client.Close() + defer cancelFunc() + + cancelEvent := "delete:stop:" + serialNo + runtime.EventsOnce(ctx, cancelEvent, func(data ...any) { + cancelFunc() + }) + processEvent := "deleting:" + serialNo + total := len(ks) + var failed atomic.Int64 + var canceled bool + var deletedKeys = make([]any, 0, total) + var mutex sync.Mutex + del := func(ctx context.Context, cli redis.UniversalClient) error { + for i, k := range ks { + // emit progress per second + param := map[string]any{ + "total": total, + "progress": i + 1, + "processing": k, + } + runtime.EventsEmit(b.ctx, processEvent, param) + + key := strutil.DecodeRedisKey(k) + delErr := cli.Del(ctx, key).Err() + // do some sleep to prevent blocking the Redis server + time.Sleep(100 * time.Microsecond) + if err != nil { + failed.Add(1) + } else { + // save deleted key + mutex.Lock() + deletedKeys = append(deletedKeys, k) + mutex.Unlock() + } + if errors.Is(delErr, context.Canceled) || canceled { + canceled = true + break + } + } + return nil + } + + if cluster, ok := client.(*redis.ClusterClient); ok { + // cluster mode + err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error { + return del(ctx, cli) + }) + } else { + err = del(ctx, client) + } + + runtime.EventsOff(ctx, cancelEvent) + resp.Success = true + resp.Data = struct { + Canceled bool `json:"canceled"` + Deleted any `json:"deleted"` + Failed int64 `json:"failed"` + }{ + Canceled: canceled, + Deleted: deletedKeys, + Failed: failed.Load(), + } + return +} + // ExportKey export keys func (b *browserService) ExportKey(server string, db int, ks []any, path string) (resp types.JSResp) { // connect a new connection to export keys @@ -1952,12 +2038,12 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string) var err error var connConfig = conf.ConnectionConfig connConfig.LastDB = db - if client, err = Connection().createRedisClient(connConfig); err != nil { + if client, err = b.createRedisClient(connConfig); err != nil { resp.Msg = err.Error() return } - // TODO: add cancel handle ctx, cancelFunc := context.WithCancel(b.ctx) + defer client.Close() defer cancelFunc() file, err := os.Create(path) @@ -1970,7 +2056,8 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string) writer := csv.NewWriter(file) defer writer.Flush() - runtime.EventsOnce(ctx, "export:stop:"+path, func(data ...any) { + cancelEvent := "export:stop:" + path + runtime.EventsOnce(ctx, cancelEvent, func(data ...any) { cancelFunc() }) processEvent := "exporting:" + path @@ -1987,7 +2074,7 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string) key := strutil.DecodeRedisKey(k) content, dumpErr := client.Dump(ctx, key).Bytes() - if errors.Is(dumpErr, context.Canceled) { + if errors.Is(dumpErr, context.Canceled) || canceled { canceled = true break } @@ -1998,6 +2085,7 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string) } } + runtime.EventsOff(ctx, cancelEvent) resp.Success = true resp.Data = struct { Canceled bool `json:"canceled"` diff --git a/frontend/src/components/dialogs/DeleteKeyDialog.vue b/frontend/src/components/dialogs/DeleteKeyDialog.vue index 0fc260a..209e634 100644 --- a/frontend/src/components/dialogs/DeleteKeyDialog.vue +++ b/frontend/src/components/dialogs/DeleteKeyDialog.vue @@ -1,5 +1,5 @@