refactor: refactor the method for delete selected keys

This commit is contained in:
Lykin 2023-12-19 20:10:01 +08:00
parent bce4e2323e
commit f98229b9fa
5 changed files with 202 additions and 77 deletions

View File

@ -234,60 +234,63 @@ func (b *browserService) CloseConnection(name string) (resp types.JSResp) {
return
}
func (b *browserService) createRedisClient(selConn types.ConnectionConfig) (client redis.UniversalClient, err error) {
hook := redis2.NewHook(selConn.Name, func(cmd string, cost int64) {
now := time.Now()
//last := strings.LastIndex(cmd, ":")
//if last != -1 {
// cmd = cmd[:last]
//}
b.cmdHistory = append(b.cmdHistory, cmdHistoryItem{
Timestamp: now.UnixMilli(),
Server: selConn.Name,
Cmd: cmd,
Cost: cost,
})
})
client, err = Connection().createRedisClient(selConn)
if err != nil {
err = fmt.Errorf("create conenction error: %s", err.Error())
return
}
_ = client.Do(b.ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err()
// add hook to each node in cluster mode
if cluster, ok := client.(*redis.ClusterClient); ok {
err = cluster.ForEachShard(b.ctx, func(ctx context.Context, cli *redis.Client) error {
cli.AddHook(hook)
return nil
})
if err != nil {
err = fmt.Errorf("get cluster nodes error: %s", err.Error())
return
}
} else {
client.AddHook(hook)
}
if _, err = client.Ping(b.ctx).Result(); err != nil && !errors.Is(err, redis.Nil) {
err = errors.New("can not connect to redis server:" + err.Error())
return
}
return
}
// get a redis client from local cache or create a new open
// if db >= 0, will also switch to db index
func (b *browserService) getRedisClient(connName string, db int) (item *connectionItem, err error) {
func (b *browserService) getRedisClient(server string, db int) (item *connectionItem, err error) {
var ok bool
var client redis.UniversalClient
if item, ok = b.connMap[connName]; ok {
if item, ok = b.connMap[server]; ok {
client = item.client
} else {
selConn := Connection().getConnection(connName)
selConn := Connection().getConnection(server)
if selConn == nil {
err = fmt.Errorf("no match connection \"%s\"", connName)
return
}
hook := redis2.NewHook(connName, func(cmd string, cost int64) {
now := time.Now()
//last := strings.LastIndex(cmd, ":")
//if last != -1 {
// cmd = cmd[:last]
//}
b.cmdHistory = append(b.cmdHistory, cmdHistoryItem{
Timestamp: now.UnixMilli(),
Server: connName,
Cmd: cmd,
Cost: cost,
})
})
client, err = Connection().createRedisClient(selConn.ConnectionConfig)
if err != nil {
err = fmt.Errorf("create conenction error: %s", err.Error())
return
}
_ = client.Do(b.ctx, "CLIENT", "SETNAME", url.QueryEscape(selConn.Name)).Err()
// add hook to each node in cluster mode
var cluster *redis.ClusterClient
if cluster, ok = client.(*redis.ClusterClient); ok {
err = cluster.ForEachShard(b.ctx, func(ctx context.Context, cli *redis.Client) error {
cli.AddHook(hook)
return nil
})
if err != nil {
err = fmt.Errorf("get cluster nodes error: %s", err.Error())
return
}
} else {
client.AddHook(hook)
}
if _, err = client.Ping(b.ctx).Result(); err != nil && err != redis.Nil {
err = errors.New("can not connect to redis server:" + err.Error())
err = fmt.Errorf("no match connection \"%s\"", server)
return
}
client, err = b.createRedisClient(selConn.ConnectionConfig)
ctx, cancelFunc := context.WithCancel(b.ctx)
item = &connectionItem{
client: client,
@ -300,7 +303,7 @@ func (b *browserService) getRedisClient(connName string, db int) (item *connecti
if item.stepSize <= 0 {
item.stepSize = consts.DEFAULT_LOAD_SIZE
}
b.connMap[connName] = item
b.connMap[server] = item
}
// BUG: go-redis might not be executing commands on the corresponding database
@ -315,7 +318,7 @@ func (b *browserService) getRedisClient(connName string, db int) (item *connecti
return
}
item.db = db
b.connMap[connName].db = db
b.connMap[server].db = db
}
}
return
@ -328,12 +331,12 @@ func (b *browserService) loadDBSize(ctx context.Context, client redis.UniversalC
}
// save current scan cursor
func (b *browserService) setClientCursor(connName string, db int, cursor uint64) {
if _, ok := b.connMap[connName]; ok {
func (b *browserService) setClientCursor(server string, db int, cursor uint64) {
if _, ok := b.connMap[server]; ok {
if cursor == 0 {
delete(b.connMap[connName].cursor, db)
delete(b.connMap[server].cursor, db)
} else {
b.connMap[connName].cursor[db] = cursor
b.connMap[server].cursor[db] = cursor
}
}
}
@ -1940,6 +1943,89 @@ func (b *browserService) DeleteOneKey(server string, db int, k any) (resp types.
return
}
// DeleteKeys delete keys sync with notification
func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo string) (resp types.JSResp) {
// connect a new connection to export keys
conf := Connection().getConnection(server)
if conf == nil {
resp.Msg = fmt.Sprintf("no connection profile named: %s", server)
return
}
var client redis.UniversalClient
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = b.createRedisClient(connConfig); err != nil {
resp.Msg = err.Error()
return
}
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()
cancelEvent := "delete:stop:" + serialNo
runtime.EventsOnce(ctx, cancelEvent, func(data ...any) {
cancelFunc()
})
processEvent := "deleting:" + serialNo
total := len(ks)
var failed atomic.Int64
var canceled bool
var deletedKeys = make([]any, 0, total)
var mutex sync.Mutex
del := func(ctx context.Context, cli redis.UniversalClient) error {
for i, k := range ks {
// emit progress per second
param := map[string]any{
"total": total,
"progress": i + 1,
"processing": k,
}
runtime.EventsEmit(b.ctx, processEvent, param)
key := strutil.DecodeRedisKey(k)
delErr := cli.Del(ctx, key).Err()
// do some sleep to prevent blocking the Redis server
time.Sleep(100 * time.Microsecond)
if err != nil {
failed.Add(1)
} else {
// save deleted key
mutex.Lock()
deletedKeys = append(deletedKeys, k)
mutex.Unlock()
}
if errors.Is(delErr, context.Canceled) || canceled {
canceled = true
break
}
}
return nil
}
if cluster, ok := client.(*redis.ClusterClient); ok {
// cluster mode
err = cluster.ForEachMaster(ctx, func(ctx context.Context, cli *redis.Client) error {
return del(ctx, cli)
})
} else {
err = del(ctx, client)
}
runtime.EventsOff(ctx, cancelEvent)
resp.Success = true
resp.Data = struct {
Canceled bool `json:"canceled"`
Deleted any `json:"deleted"`
Failed int64 `json:"failed"`
}{
Canceled: canceled,
Deleted: deletedKeys,
Failed: failed.Load(),
}
return
}
// ExportKey export keys
func (b *browserService) ExportKey(server string, db int, ks []any, path string) (resp types.JSResp) {
// connect a new connection to export keys
@ -1952,12 +2038,12 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
var err error
var connConfig = conf.ConnectionConfig
connConfig.LastDB = db
if client, err = Connection().createRedisClient(connConfig); err != nil {
if client, err = b.createRedisClient(connConfig); err != nil {
resp.Msg = err.Error()
return
}
// TODO: add cancel handle
ctx, cancelFunc := context.WithCancel(b.ctx)
defer client.Close()
defer cancelFunc()
file, err := os.Create(path)
@ -1970,7 +2056,8 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
writer := csv.NewWriter(file)
defer writer.Flush()
runtime.EventsOnce(ctx, "export:stop:"+path, func(data ...any) {
cancelEvent := "export:stop:" + path
runtime.EventsOnce(ctx, cancelEvent, func(data ...any) {
cancelFunc()
})
processEvent := "exporting:" + path
@ -1987,7 +2074,7 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
key := strutil.DecodeRedisKey(k)
content, dumpErr := client.Dump(ctx, key).Bytes()
if errors.Is(dumpErr, context.Canceled) {
if errors.Is(dumpErr, context.Canceled) || canceled {
canceled = true
break
}
@ -1998,6 +2085,7 @@ func (b *browserService) ExportKey(server string, db int, ks []any, path string)
}
}
runtime.EventsOff(ctx, cancelEvent)
resp.Success = true
resp.Data = struct {
Canceled bool `json:"canceled"`

View File

@ -1,5 +1,5 @@
<script setup>
import { computed, reactive, ref, watch } from 'vue'
import { computed, nextTick, reactive, ref, watch } from 'vue'
import useDialog from 'stores/dialog'
import { useI18n } from 'vue-i18n'
import { isEmpty, map, size } from 'lodash'
@ -70,6 +70,7 @@ const onConfirmDelete = async () => {
try {
deleting.value = true
const { server, db, key, affectedKeys } = deleteForm
await nextTick()
browserStore.deleteKeys(server, db, affectedKeys).catch((e) => {})
} catch (e) {
$message.error(e.message)

View File

@ -143,7 +143,7 @@
"remove_tip": "{type} \"{name}\" will be deleted",
"remove_group_tip": "Group \"{name}\" and all connections in it will be deleted",
"delete_key_succ": "\"{key}\" has been deleted",
"deleting_key": "Deleting key: {key} ({index}/{count})",
"deleting_key": "Deleting key({index}/{count}): {key}",
"delete_completed": "Deletion process has been completed, {success} successed, {fail} failed",
"rename_binary_key_fail": "Rename binary key name is unsupported",
"handle_succ": "Success!",
@ -276,7 +276,7 @@
"export": "Export",
"save_file": "Export Path",
"save_file_tip": "Select the export file save path",
"exporting": "Exporting key: {key} ({index}/{count})",
"exporting": "Exporting key({index}/{count}): {key}",
"export_completed": "Export process has been completed, {success} successed, {fail} failed"
},
"ttl": {

View File

@ -143,7 +143,7 @@
"remove_tip": "{type} \"{name}\" 将会被删除",
"remove_group_tip": "分组 \"{name}\"及其所有连接将会被删除",
"delete_key_succ": "{key} 已被删除",
"deleting_key": "正在删除键{key} ({index}/{count})",
"deleting_key": "正在删除键({index}/{count}){key}",
"delete_completed": "已完成删除操作,成功{success}个,失败{fail}个",
"rename_binary_key_fail": "不支持重命名二进制键名",
"handle_succ": "操作成功",
@ -275,7 +275,7 @@
"export": "确认导出",
"save_file": "导出路径",
"save_file_tip": "选择保存文件路径",
"exporting": "正在导出键{key} ({index}/{count})",
"exporting": "正在导出键({index}/{count}){key}",
"export_completed": "已完成导出操作,成功{success}个,失败{fail}个"
},
"ttl": {

View File

@ -25,7 +25,7 @@ import {
CloseConnection,
ConvertValue,
DeleteKey,
DeleteOneKey,
DeleteKeys,
ExportKey,
FlushDB,
GetCmdHistory,
@ -1974,35 +1974,52 @@ const useBrowserStore = defineStore('browser', {
*/
async deleteKeys(server, db, keys) {
const delMsgRef = $message.loading('', { duration: 0, closable: true })
let progress = 0
let count = size(keys)
let deletedCount = 0
let deleted = []
let failCount = 0
let canceled = false
const serialNo = Date.now().valueOf().toString()
const eventName = 'deleting:' + serialNo
const cancelEvent = 'deleting:' + serialNo
try {
for (const key of keys) {
delMsgRef.content = i18nGlobal.t('dialogue.deleting_key', {
key: decodeRedisKey(key),
index: ++progress,
count,
})
const { success } = await DeleteOneKey(server, db, key)
if (success) {
this._deleteKeyNode(server, db, key, false)
deletedCount += 1
} else {
failCount += 1
let maxProgress = 0
EventsOn(eventName, ({ total, progress, processing }) => {
// update delete progress
if (progress > maxProgress) {
maxProgress = progress
}
const k = decodeRedisKey(processing)
delMsgRef.content = i18nGlobal.t('dialogue.deleting_key', {
key: k,
index: maxProgress,
count: total,
})
// this._deleteKeyNode(server, db, k, false)
})
delMsgRef.onClose = () => {
EventsEmit(cancelEvent)
}
const { data, success, msg } = await DeleteKeys(server, db, keys, serialNo)
if (success) {
canceled = get(data, 'canceled', false)
deleted = get(data, 'deleted', [])
failCount = get(data, 'failed', 0)
} else {
$message.error(msg)
}
} finally {
delMsgRef.destroy()
EventsOff(eventName)
// clear checked keys
const tab = useTabStore()
tab.setCheckedKeys(server)
}
// refresh model data
const deletedCount = size(deleted)
this._tidyNode(server, db, '', true)
this._updateDBMaxKeys(server, db, -deletedCount)
if (failCount <= 0) {
if (canceled) {
$message.info(i18nGlobal.t('dialogue.handle_cancel'))
} else if (failCount <= 0) {
// no fail
$message.success(i18nGlobal.t('dialogue.delete_completed', { success: deletedCount, fail: failCount }))
} else if (failCount >= deletedCount) {
@ -2012,6 +2029,25 @@ const useBrowserStore = defineStore('browser', {
// some fail
$message.warn(i18nGlobal.t('dialogue.delete_completed', { success: deletedCount, fail: failCount }))
}
// FIXME: update tree view
// if (!isEmpty(deleted)) {
// let updateDeleted = []
// let count = size(deleted)
// for (const k of deleted) {
// updateDeleted.push(k)
// console.log(count)
// count -= 1
// if (size(updateDeleted) > 100 || count <= 0) {
// for (const dk of updateDeleted) {
// this._deleteKeyNode(server, db, dk, false)
// await nextTick()
// }
// updateDeleted = []
// console.warn('updateDeleted:', updateDeleted)
// }
// }
// }
},
/**