perf: significantly improve batch deletion performance with pipeline #123
This commit is contained in:
parent
655cd539ca
commit
cdac3c4496
|
@ -2073,7 +2073,7 @@ func (b *browserService) DeleteOneKey(server string, db int, k any) (resp types.
|
|||
}
|
||||
|
||||
// DeleteKeys delete keys sync with notification
|
||||
func (b *browserService) DeleteKeys(server string, db int, ks []any, notice bool, serialNo string) (resp types.JSResp) {
|
||||
func (b *browserService) DeleteKeys(server string, db int, ks []any, serialNo string) (resp types.JSResp) {
|
||||
// connect a new connection to export keys
|
||||
conf := Connection().getConnection(server)
|
||||
if conf == nil {
|
||||
|
@ -2096,47 +2096,28 @@ func (b *browserService) DeleteKeys(server string, db int, ks []any, notice bool
|
|||
cancelStopEvent := runtime.EventsOnce(ctx, cancelEvent, func(data ...any) {
|
||||
cancelFunc()
|
||||
})
|
||||
processEvent := "deleting:" + serialNo
|
||||
total := len(ks)
|
||||
var failed atomic.Int64
|
||||
var canceled bool
|
||||
var deletedKeys = make([]any, 0, total)
|
||||
var mutex sync.Mutex
|
||||
del := func(ctx context.Context, cli redis.UniversalClient) error {
|
||||
startTime := time.Now().Add(-10 * time.Second)
|
||||
supportUnlink := true
|
||||
for i, k := range ks {
|
||||
// emit progress per second
|
||||
if notice && (i >= total-1 || time.Now().Sub(startTime).Milliseconds() > 100) {
|
||||
startTime = time.Now()
|
||||
param := map[string]any{
|
||||
"total": total,
|
||||
"progress": i + 1,
|
||||
"processing": k,
|
||||
}
|
||||
runtime.EventsEmit(ctx, processEvent, param)
|
||||
// do some sleep to prevent blocking the Redis server
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
key := strutil.DecodeRedisKey(k)
|
||||
var delErr error
|
||||
if supportUnlink {
|
||||
if delErr = cli.Unlink(ctx, key).Err(); delErr != nil {
|
||||
supportUnlink = false
|
||||
delErr = nil
|
||||
const batchSize = 1000
|
||||
for i := 0; i < total; i += batchSize {
|
||||
pipe := cli.Pipeline()
|
||||
for j := 0; j < batchSize; j++ {
|
||||
if i+j < total {
|
||||
pipe.Del(ctx, strutil.DecodeRedisKey(ks[i+j]))
|
||||
}
|
||||
}
|
||||
if !supportUnlink {
|
||||
delErr = cli.Del(ctx, key).Err()
|
||||
}
|
||||
if notice {
|
||||
if delErr != nil {
|
||||
cmders, delErr := pipe.Exec(ctx)
|
||||
for j, cmder := range cmders {
|
||||
if cmder.(*redis.IntCmd).Val() != 1 {
|
||||
failed.Add(1)
|
||||
} else {
|
||||
// save deleted key
|
||||
mutex.Lock()
|
||||
deletedKeys = append(deletedKeys, k)
|
||||
deletedKeys = append(deletedKeys, ks[i+j])
|
||||
mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ const onConfirmDelete = async () => {
|
|||
deleting.value = true
|
||||
const { server, db, key, affectedKeys } = deleteForm
|
||||
await nextTick()
|
||||
browserStore.deleteKeys(server, db, affectedKeys, !deleteForm.async).catch((e) => {})
|
||||
browserStore.deleteKeys(server, db, affectedKeys).catch((e) => {})
|
||||
} catch (e) {
|
||||
$message.error(e.message)
|
||||
return
|
||||
|
@ -115,9 +115,9 @@ const onClose = () => {
|
|||
required>
|
||||
<n-input v-model:value="deleteForm.key" placeholder="" @input="resetAffected" />
|
||||
</n-form-item>
|
||||
<n-checkbox v-model:checked="deleteForm.async">
|
||||
{{ $t('dialogue.key.silent') }}
|
||||
</n-checkbox>
|
||||
<!-- <n-checkbox v-model:checked="deleteForm.async">-->
|
||||
<!-- {{ $t('dialogue.key.silent') }}-->
|
||||
<!-- </n-checkbox>-->
|
||||
<n-card
|
||||
v-if="deleteForm.showAffected"
|
||||
:title="$t('dialogue.key.affected_key') + `(${size(deleteForm.affectedKeys)})`"
|
||||
|
|
|
@ -263,7 +263,6 @@
|
|||
"confirm_delete_key": "Confirm Delete {num} Key(s)",
|
||||
"async_delete": "Asynchronously Execute",
|
||||
"async_delete_title": "Do not waiting for the operation's result",
|
||||
"silent": "Do not display deletion status in real time",
|
||||
"confirm_flush": "I know what I'm doing!",
|
||||
"confirm_flush_db": "Confirm Flush Database"
|
||||
},
|
||||
|
@ -271,8 +270,7 @@
|
|||
"success": "\"{key}\" has been deleted",
|
||||
"deleting": "Deleting",
|
||||
"doing": "Deleting key({index}/{count})",
|
||||
"completed": "Deletion process has been completed",
|
||||
"completed_status": "{success} successed, {fail} failed"
|
||||
"completed": "Deletion process has been completed, {success} successed, {fail} failed"
|
||||
},
|
||||
"field": {
|
||||
"new": "Add New Field",
|
||||
|
|
|
@ -263,7 +263,6 @@
|
|||
"confirm_delete_key": "确认删除{num}个键",
|
||||
"async_delete": "异步执行",
|
||||
"async_delete_title": "不等待操作结果",
|
||||
"silent": "不实时显示删除状态",
|
||||
"confirm_flush": "我知道我正在执行的操作!",
|
||||
"confirm_flush_db": "确认清空数据库"
|
||||
},
|
||||
|
@ -271,8 +270,7 @@
|
|||
"success": "{key} 已被删除",
|
||||
"deleting": "正在删除",
|
||||
"doing": "正在删除键({index}/{count})",
|
||||
"completed": "已完成删除操作",
|
||||
"completed_status": "成功{success}个,失败{fail}个"
|
||||
"completed": "已完成删除操作,成功{success}个,失败{fail}个"
|
||||
},
|
||||
"field": {
|
||||
"new": "添加新字段",
|
||||
|
|
|
@ -36,7 +36,7 @@ import {
|
|||
UpdateZSetValue,
|
||||
} from 'wailsjs/go/services/browserService.js'
|
||||
import useTabStore from 'stores/tab.js'
|
||||
import { decodeRedisKey, nativeRedisKey } from '@/utils/key_convert.js'
|
||||
import { nativeRedisKey } from '@/utils/key_convert.js'
|
||||
import { BrowserTabType } from '@/consts/browser_tab_type.js'
|
||||
import { KeyViewType } from '@/consts/key_view_type.js'
|
||||
import { ConnectionType } from '@/consts/connection_type.js'
|
||||
|
@ -1568,7 +1568,7 @@ const useBrowserStore = defineStore('browser', {
|
|||
// $message.error(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
// } else {
|
||||
// // some fail
|
||||
// $message.warn(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
// $message.warning(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
// }
|
||||
},
|
||||
|
||||
|
@ -1614,33 +1614,19 @@ const useBrowserStore = defineStore('browser', {
|
|||
* @param {string} server
|
||||
* @param {number} db
|
||||
* @param {string[]|number[][]} keys
|
||||
* @param {boolean} notice
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async deleteKeys(server, db, keys, notice) {
|
||||
async deleteKeys(server, db, keys) {
|
||||
const msgRef = $message.loading(i18nGlobal.t('dialogue.delete.deleting'), { duration: 0, closable: true })
|
||||
let deleted = []
|
||||
let failCount = 0
|
||||
let canceled = false
|
||||
const serialNo = Date.now().valueOf().toString()
|
||||
let maxProgress = 0
|
||||
const cancelEventFn = EventsOn('deleting:' + serialNo, ({ total, progress, processing }) => {
|
||||
// update delete progress
|
||||
if (progress > maxProgress) {
|
||||
maxProgress = progress
|
||||
}
|
||||
const k = decodeRedisKey(processing)
|
||||
msgRef.content = i18nGlobal.t('dialogue.delete.doing', {
|
||||
key: k,
|
||||
index: maxProgress,
|
||||
count: total,
|
||||
})
|
||||
})
|
||||
msgRef.onClose = () => {
|
||||
EventsEmit('delete:stop:' + serialNo)
|
||||
}
|
||||
try {
|
||||
const { data, success, msg } = await DeleteKeys(server, db, keys, notice, serialNo)
|
||||
const { success, msg, data } = await DeleteKeys(server, db, keys, serialNo)
|
||||
if (success) {
|
||||
canceled = get(data, 'canceled', false)
|
||||
deleted = get(data, 'deleted', [])
|
||||
|
@ -1649,7 +1635,6 @@ const useBrowserStore = defineStore('browser', {
|
|||
$message.error(msg)
|
||||
}
|
||||
} finally {
|
||||
cancelEventFn()
|
||||
msgRef.destroy()
|
||||
// clear checked keys
|
||||
const tab = useTabStore()
|
||||
|
@ -1661,40 +1646,13 @@ const useBrowserStore = defineStore('browser', {
|
|||
$message.info(i18nGlobal.t('dialogue.handle_cancel'))
|
||||
} else if (failCount <= 0) {
|
||||
// no fail
|
||||
let msg = i18nGlobal.t('dialogue.delete.completed')
|
||||
if (notice) {
|
||||
msg +=
|
||||
'\n' +
|
||||
i18nGlobal.t('dialogue.delete.completed_status', {
|
||||
success: deletedCount,
|
||||
fail: failCount,
|
||||
})
|
||||
}
|
||||
$message.success(msg)
|
||||
$message.success(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
} else if (failCount >= deletedCount) {
|
||||
// all fail
|
||||
let msg = i18nGlobal.t('dialogue.delete.completed')
|
||||
if (notice) {
|
||||
msg +=
|
||||
'\n' +
|
||||
i18nGlobal.t('dialogue.delete.completed_status', {
|
||||
success: deletedCount,
|
||||
fail: failCount,
|
||||
})
|
||||
}
|
||||
$message.error(msg)
|
||||
$message.error(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
} else {
|
||||
// some fail
|
||||
let msg = i18nGlobal.t('dialogue.delete.completed')
|
||||
if (notice) {
|
||||
msg +=
|
||||
'\n' +
|
||||
i18nGlobal.t('dialogue.delete.completed_status', {
|
||||
success: deletedCount,
|
||||
fail: failCount,
|
||||
})
|
||||
}
|
||||
$message.warn(msg)
|
||||
$message.warning(i18nGlobal.t('dialogue.delete.completed', { success: deletedCount, fail: failCount }))
|
||||
}
|
||||
// update ui
|
||||
timeout(100).then(async () => {
|
||||
|
@ -1765,7 +1723,12 @@ const useBrowserStore = defineStore('browser', {
|
|||
$message.error(i18nGlobal.t('dialogue.export.export_completed', { success: exported, fail: failCount }))
|
||||
} else {
|
||||
// some fail
|
||||
$message.warn(i18nGlobal.t('dialogue.export.export_completed', { success: exported, fail: failCount }))
|
||||
$message.warning(
|
||||
i18nGlobal.t('dialogue.export.export_completed', {
|
||||
success: exported,
|
||||
fail: failCount,
|
||||
}),
|
||||
)
|
||||
}
|
||||
},
|
||||
|
||||
|
|
Loading…
Reference in New Issue