feat: support pub/sub

This commit is contained in:
Lykin 2024-01-17 20:47:25 +08:00
parent ffed68ae4c
commit c76a0a505f
14 changed files with 668 additions and 18 deletions

View File

@ -0,0 +1,184 @@
package services
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"github.com/wailsapp/wails/v2/pkg/runtime"
"strconv"
"sync"
"time"
"tinyrdm/backend/types"
)
type pubsubItem struct {
client *redis.Client
pubsub *redis.PubSub
mutex sync.Mutex
closeCh chan struct{}
eventName string
}
type subMessage struct {
Timestamp int64 `json:"timestamp"`
Channel string `json:"channel"`
Message string `json:"message"`
}
type pubsubService struct {
ctx context.Context
ctxCancel context.CancelFunc
mutex sync.Mutex
items map[string]*pubsubItem
}
var pubsub *pubsubService
var oncePubsub sync.Once
func Pubsub() *pubsubService {
if pubsub == nil {
oncePubsub.Do(func() {
pubsub = &pubsubService{
items: map[string]*pubsubItem{},
}
})
}
return pubsub
}
func (p *pubsubService) getItem(server string) (*pubsubItem, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
item, ok := p.items[server]
if !ok {
var err error
conf := Connection().getConnection(server)
if conf == nil {
return nil, fmt.Errorf("no connection profile named: %s", server)
}
var uniClient redis.UniversalClient
if uniClient, err = Connection().createRedisClient(conf.ConnectionConfig); err != nil {
return nil, err
}
var client *redis.Client
if client, ok = uniClient.(*redis.Client); !ok {
return nil, errors.New("create redis client fail")
}
item = &pubsubItem{
client: client,
}
p.items[server] = item
}
return item, nil
}
func (p *pubsubService) Start(ctx context.Context) {
p.ctx, p.ctxCancel = context.WithCancel(ctx)
}
// Publish publish message to channel
func (p *pubsubService) Publish(server, channel, payload string) (resp types.JSResp) {
rdb, err := Browser().getRedisClient(server, -1)
if err != nil {
resp.Msg = err.Error()
return
}
var received int64
received, err = rdb.client.Publish(p.ctx, channel, payload).Result()
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
resp.Data = struct {
Received int64 `json:"received"`
}{
Received: received,
}
return
}
// StartSubscribe start to subscribe a channel
func (p *pubsubService) StartSubscribe(server string) (resp types.JSResp) {
item, err := p.getItem(server)
if err != nil {
resp.Msg = err.Error()
return
}
item.closeCh = make(chan struct{})
item.eventName = "sub:" + strconv.Itoa(int(time.Now().Unix()))
item.pubsub = item.client.PSubscribe(p.ctx, "*")
go p.processSubscribe(&item.mutex, item.pubsub.Channel(), item.closeCh, item.eventName)
resp.Success = true
resp.Data = struct {
EventName string `json:"eventName"`
}{
EventName: item.eventName,
}
return
}
func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Message, closeCh <-chan struct{}, eventName string) {
lastEmitTime := time.Now().Add(-1 * time.Minute)
cache := make([]subMessage, 0, 1000)
for {
select {
case data := <-ch:
go func() {
timestamp := time.Now().UnixMilli()
mutex.Lock()
defer mutex.Unlock()
cache = append(cache, subMessage{
Timestamp: timestamp,
Channel: data.Channel,
Message: data.Payload,
})
if time.Now().Sub(lastEmitTime) > 300*time.Millisecond || len(cache) > 300 {
runtime.EventsEmit(p.ctx, eventName, cache)
cache = cache[:0:cap(cache)]
lastEmitTime = time.Now()
}
}()
case <-closeCh:
// subscribe stopped
return
}
}
}
// StopSubscribe stop subscribe by server name
func (p *pubsubService) StopSubscribe(server string) (resp types.JSResp) {
p.mutex.Lock()
defer p.mutex.Unlock()
item, ok := p.items[server]
if !ok || item.pubsub == nil {
resp.Success = true
return
}
//item.pubsub.Unsubscribe(p.ctx, "*")
item.pubsub.Close()
close(item.closeCh)
delete(p.items, server)
resp.Success = true
return
}
// StopAll stop all subscribe
func (p *pubsubService) StopAll() {
if p.ctxCancel != nil {
p.ctxCancel()
}
for server := range p.items {
p.StopSubscribe(server)
}
}

View File

@ -31,6 +31,8 @@ const props = defineProps({
buttonStyle: [String, Object],
buttonClass: [String, Object],
small: Boolean,
secondary: Boolean,
tertiary: Boolean,
})
const hasTooltip = computed(() => {
@ -44,13 +46,15 @@ const hasTooltip = computed(() => {
<n-button
:class="props.buttonClass"
:color="props.color"
:disabled="disabled"
:disabled="props.disabled"
:focusable="false"
:loading="loading"
:size="small ? 'small' : ''"
:secondary="props.secondary"
:size="props.small ? 'small' : ''"
:style="props.buttonStyle"
:text="!border"
:type="type"
:tertiary="props.tertiary"
:text="!props.border"
:type="props.type"
@click.prevent="emit('click')">
<template #icon>
<slot>
@ -67,13 +71,15 @@ const hasTooltip = computed(() => {
v-else
:class="props.buttonClass"
:color="props.color"
:disabled="disabled"
:disabled="props.disabled"
:focusable="false"
:loading="loading"
:size="small ? 'small' : ''"
:secondary="props.secondary"
:size="props.small ? 'small' : ''"
:style="props.buttonStyle"
:text="!border"
:type="type"
:tertiary="props.tertiary"
:text="!props.border"
:type="props.type"
@click.prevent="emit('click')">
<template #icon>
<slot>

View File

@ -340,7 +340,6 @@ const changeHistory = (prev) => {
*/
const flushTermInput = (flushCmd = false) => {
const currentLine = getCurrentInput()
console.log('===send cmd', currentLine, currentLine.length)
EventsEmit(`cmd:input:${props.name}`, currentLine)
inputCursor = 0
// historyIndex = inputHistory.length

View File

@ -12,6 +12,7 @@ import Copy from '@/components/icons/Copy.vue'
import Export from '@/components/icons/Export.vue'
import Delete from '@/components/icons/Delete.vue'
import IconButton from '@/components/common/IconButton.vue'
import Bottom from '@/components/icons/Bottom.vue'
const themeVars = useThemeVars()
@ -118,7 +119,7 @@ const onCleanLog = () => {
<div class="content-log content-container fill-height flex-box-v">
<n-form class="flex-item" label-align="left" label-placement="left" label-width="auto" size="small">
<n-form-item :feedback="$t('monitor.warning')" :label="$t('monitor.actions')">
<n-space>
<n-space :wrap="false" :wrap-item="false" style="width: 100%">
<n-button
v-if="!isMonitoring"
:focusable="false"
@ -153,6 +154,16 @@ const onCleanLog = () => {
t-tooltip="monitor.save_log"
@click="onExportLog" />
</n-button-group>
<icon-button
:icon="Bottom"
:secondary="data.autoShowLast"
:type="data.autoShowLast ? 'primary' : 'default'"
border
size="18"
stroke-width="3.5"
t-tooltip="monitor.always_show_last"
@click="data.autoShowLast = !data.autoShowLast" />
<div class="flex-item-expand" />
<icon-button
:icon="Delete"
border
@ -165,9 +176,6 @@ const onCleanLog = () => {
<n-form-item :label="$t('monitor.search')">
<n-input v-model:value="data.keyword" clearable placeholder="" />
</n-form-item>
<n-form-item :label="$t('monitor.always_show_last')">
<n-switch v-model:value="data.autoShowLast" />
</n-form-item>
</n-form>
<n-virtual-list ref="listRef" :item-size="25" :items="displayList" class="list-wrapper">
<template #default="{ item }">

View File

@ -0,0 +1,294 @@
<script setup>
import { computed, nextTick, onMounted, onUnmounted, reactive, ref } from 'vue'
import { debounce, get, isEmpty, size, uniq } from 'lodash'
import { useI18n } from 'vue-i18n'
import { useThemeVars } from 'naive-ui'
import useBrowserStore from 'stores/browser.js'
import { EventsOff, EventsOn } from 'wailsjs/runtime/runtime.js'
import dayjs from 'dayjs'
import Publish from '@/components/icons/Publish.vue'
import Subscribe from '@/components/icons/Subscribe.vue'
import Pause from '@/components/icons/Pause.vue'
import Delete from '@/components/icons/Delete.vue'
import { Publish as PublishSend, StartSubscribe, StopSubscribe } from 'wailsjs/go/services/pubsubService.js'
import Checked from '@/components/icons/Checked.vue'
import Bottom from '@/components/icons/Bottom.vue'
import IconButton from '@/components/common/IconButton.vue'
const themeVars = useThemeVars()
const browserStore = useBrowserStore()
const i18n = useI18n()
const props = defineProps({
server: {
type: String,
},
})
const data = reactive({
subscribeEvent: '',
list: [],
keyword: '',
autoShowLast: true,
ellipsisMessage: false,
channelHistory: [],
})
const publishData = reactive({
channel: '',
message: '',
received: 0,
lastShowReceived: -1,
})
const tableRef = ref(null)
const columns = computed(() => [
{
title: () => i18n.t('pubsub.time'),
key: 'timestamp',
width: 180,
align: 'center',
titleAlign: 'center',
render: ({ timestamp }, index) => {
return dayjs(timestamp).format('YYYY-MM-DD HH:mm:ss')
},
},
{
title: () => i18n.t('pubsub.channel'),
key: 'channel',
filterOptionValue: data.client,
resizable: true,
filter: (value, row) => {
return value === '' || row.client === value.toString() || row.addr === value.toString()
},
width: 200,
align: 'center',
titleAlign: 'center',
ellipsis: {
tooltip: {
style: {
maxWidth: '50vw',
maxHeight: '50vh',
},
scrollable: true,
},
},
},
{
title: () => i18n.t('pubsub.message'),
key: 'message',
titleAlign: 'center',
filterOptionValue: data.keyword,
resizable: true,
className: 'content-value',
ellipsis: data.ellipsisMessage
? {
tooltip: {
style: {
maxWidth: '50vw',
maxHeight: '50vh',
},
scrollable: true,
},
}
: undefined,
filter: (value, row) => {
return value === '' || !!~row.cmd.indexOf(value.toString())
},
},
])
onMounted(() => {
// try to stop prev subscribe first
onStopSubscribe()
})
onUnmounted(() => {
onStopSubscribe()
})
const isSubscribing = computed(() => {
return !isEmpty(data.subscribeEvent)
})
const publishEnable = computed(() => {
return !isEmpty(publishData.channel)
})
const _scrollToBottom = () => {
nextTick(() => {
tableRef.value?.scrollTo({ position: 'bottom' })
})
}
const scrollToBottom = debounce(_scrollToBottom, 300, { leading: true, trailing: true })
const onStartSubscribe = async () => {
if (isSubscribing.value) {
return
}
const { data: ret, success, msg } = await StartSubscribe(props.server)
if (!success) {
$message.error(msg)
return
}
data.subscribeEvent = get(ret, 'eventName')
EventsOn(data.subscribeEvent, (content) => {
if (content instanceof Array) {
data.list.push(...content)
} else {
data.list.push(content)
}
if (data.autoShowLast) {
scrollToBottom()
}
})
}
const onStopSubscribe = async () => {
const { success, msg } = await StopSubscribe(props.server)
if (!success) {
$message.error(msg)
return
}
EventsOff(data.subscribeEvent)
data.subscribeEvent = ''
}
const onCleanLog = () => {
data.list = []
}
const onPublish = async () => {
if (isEmpty(publishData.channel)) {
return
}
const {
success,
msg,
data: { received = 0 },
} = await PublishSend(props.server, publishData.channel, publishData.message || '')
if (!success) {
publishData.received = 0
if (!isEmpty(msg)) {
$message.error(msg)
}
return
}
publishData.message = ''
publishData.received = received
publishData.lastShowReceived = Date.now()
// save channel history
data.channelHistory = uniq(data.channelHistory.concat(publishData.channel))
// hide send status after 2 seconds
setTimeout(() => {
if (publishData.lastShowReceived > 0 && Date.now() - publishData.lastShowReceived > 2000) {
publishData.lastShowReceived = -1
}
}, 2100)
}
</script>
<template>
<div class="content-log content-container fill-height flex-box-v">
<n-form class="flex-item" label-align="left" label-placement="left" label-width="auto" size="small">
<n-form-item :show-label="false">
<n-space :wrap="false" :wrap-item="false" style="width: 100%">
<n-button
v-if="!isSubscribing"
:focusable="false"
secondary
strong
type="success"
@click="onStartSubscribe">
<template #icon>
<n-icon :component="Subscribe" size="18" />
</template>
{{ $t('pubsub.subscribe') }}
</n-button>
<n-button v-else :focusable="false" secondary strong type="warning" @click="onStopSubscribe">
<template #icon>
<n-icon :component="Pause" size="18" />
</template>
{{ $t('pubsub.unsubscribe') }}
</n-button>
<icon-button
:icon="Bottom"
:secondary="data.autoShowLast"
:type="data.autoShowLast ? 'primary' : 'default'"
border
size="18"
stroke-width="3.5"
t-tooltip="monitor.always_show_last"
@click="data.autoShowLast = !data.autoShowLast" />
<div class="flex-item-expand" />
<icon-button
:icon="Delete"
border
size="18"
stroke-width="3.5"
t-tooltip="pubsub.clear"
@click="onCleanLog" />
</n-space>
</n-form-item>
</n-form>
<n-data-table
ref="tableRef"
:columns="columns"
:data="data.list"
:loading="data.loading"
class="flex-item-expand"
flex-height
size="small"
virtual-scroll />
<div class="total-message">{{ $t('pubsub.receive_message', { total: size(data.list) }) }}</div>
<div class="flex-box-h publish-input">
<n-input-group>
<n-auto-complete
v-model:value="publishData.channel"
:get-show="() => true"
:options="data.channelHistory"
:placeholder="$t('pubsub.channel')"
style="width: 35%; max-width: 200px"
@keydown.enter="onPublish" />
<n-input
v-model:value="publishData.message"
:placeholder="$t('pubsub.message')"
@keydown.enter="onPublish">
<template #suffix>
<transition mode="out-in" name="fade">
<n-tag v-show="publishData.lastShowReceived > 0" bordered size="small" type="success">
<template #icon>
<n-icon :component="Checked" size="16" />
</template>
{{ publishData.received }}
</n-tag>
</transition>
</template>
</n-input>
</n-input-group>
<n-button :disabled="!publishEnable" type="info" @click="onPublish">
<template #icon>
<n-icon :component="Publish" size="18" />
</template>
{{ $t('pubsub.publish') }}
</n-button>
</div>
</div>
</template>
<style lang="scss" scoped>
@import '@/styles/content';
.total-message {
margin: 10px 0 0;
}
.publish-input {
margin: 10px 0 0;
gap: 10px;
}
</style>

View File

@ -215,6 +215,7 @@ const onListLimitChanged = (limit) => {
:loading="data.loading"
class="flex-item-expand"
flex-height
striped
virtual-scroll
@update:sorter="({ order }) => (data.sortOrder = order)" />
</div>

View File

@ -0,0 +1,33 @@
<script setup>
const props = defineProps({
strokeWidth: {
type: [Number, String],
default: 4,
},
})
</script>
<template>
<svg fill="none" viewBox="0 0 48 48" xmlns="http://www.w3.org/2000/svg">
<path
:stroke-width="props.strokeWidth"
d="M24.0083 33.8995V6"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M36 22L24 34L12 22"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M36 42H12"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
</svg>
</template>
<style lang="scss" scoped></style>

View File

@ -11,17 +11,17 @@ const props = defineProps({
<svg fill="none" viewBox="0 0 48 48" xmlns="http://www.w3.org/2000/svg">
<g clip-path="url(#icon-25e88d94353e4f38)">
<path
:stroke-width="props.strokeWidth"
d="M42 20V39C42 40.6569 40.6569 42 39 42H9C7.34315 42 6 40.6569 6 39V9C6 7.34315 7.34315 6 9 6H30"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="4" />
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M16 20L26 28L41 7"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="4" />
stroke-linejoin="round" />
</g>
<defs>
<clipPath id="icon-25e88d94353e4f38">

View File

@ -0,0 +1,27 @@
<script setup>
const props = defineProps({
strokeWidth: {
type: [Number, String],
default: 4,
},
})
</script>
<template>
<svg fill="none" viewBox="0 0 48 48" xmlns="http://www.w3.org/2000/svg">
<path
:stroke-width="props.strokeWidth"
d="M14 24L15.25 25.25M44 14L24 34L22.75 32.75"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M4 24L14 34L34 14"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
</svg>
</template>
<style lang="scss" scoped></style>

View File

@ -0,0 +1,26 @@
<script setup>
const props = defineProps({
strokeWidth: {
type: [Number, String],
default: 3,
},
})
</script>
<template>
<svg fill="none" viewBox="0 0 48 48" xmlns="http://www.w3.org/2000/svg">
<path
:stroke-width="props.strokeWidth"
d="M43 5L29.7 43L22.1 25.9L5 18.3L43 5Z"
stroke="currentColor"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M43.0001 5L22.1001 25.9"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
</svg>
</template>
<style lang="scss" scoped></style>

View File

@ -0,0 +1,45 @@
<script setup>
const props = defineProps({
strokeWidth: {
type: [Number, String],
default: 3,
},
})
</script>
<template>
<svg fill="none" viewBox="0 0 48 48" xmlns="http://www.w3.org/2000/svg">
<path
:stroke-width="props.strokeWidth"
d="M24 28.6292C26.5104 28.6292 28.5455 26.6004 28.5455 24.0979C28.5455 21.5954 26.5104 19.5667 24 19.5667C21.4897 19.5667 19.4546 21.5954 19.4546 24.0979C19.4546 26.6004 21.4897 28.6292 24 28.6292Z"
fill="none"
stroke="currentColor"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M16 15C10.6667 19.9706 10.6667 28.0294 16 33"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M32 33C37.3333 28.0294 37.3333 19.9706 32 15"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M9.85786 10C2.04738 17.7861 2.04738 30.4098 9.85786 38.1959"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
<path
:stroke-width="props.strokeWidth"
d="M38.1421 38.1959C45.9526 30.4098 45.9526 17.7861 38.1421 10"
stroke="currentColor"
stroke-linecap="round"
stroke-linejoin="round" />
</svg>
</template>
<style lang="scss" scoped></style>

View File

@ -386,5 +386,18 @@
"save_log": "Save Log",
"clean_log": "Clean Log",
"always_show_last": "Always To Last Line"
},
"pubsub": {
"title": "Pub/Sub",
"publish": "Publish",
"subscribe": "Subscribe",
"unsubscribe": "Unsubscribe",
"clear": "Clear Message",
"time": "Time",
"filter": "Filter",
"channel": "Channel",
"message": "Message",
"receive_message": "Received Messages {total}",
"always_show_last": "Always To Last Line"
}
}

View File

@ -136,7 +136,7 @@
"cli": "命令行",
"slow_log": "慢日志",
"cmd_monitor": "监控命令",
"pub_message": "推送/订阅"
"pub_message": "发布/订阅"
}
},
"ribbon": {
@ -386,5 +386,18 @@
"save_log": "保存日志",
"clean_log": "清空日志",
"always_show_last": "总是显示最新"
},
"pubsub": {
"title": "发布订阅",
"publish": "发布",
"subscribe": "开启订阅",
"unsubscribe": "取消订阅",
"clear": "清空消息",
"time": "时间",
"filter": "筛选",
"channel": "频道",
"message": "消息",
"receive_message": "已接收消息 {total} 条",
"always_show_last": "总是显示最新"
}
}

View File

@ -15,6 +15,7 @@
.content-value {
user-select: text;
cursor: text;
}
.tab-content {