185 lines
3.9 KiB
Go
185 lines
3.9 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/redis/go-redis/v9"
|
|
"github.com/wailsapp/wails/v2/pkg/runtime"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
"tinyrdm/backend/types"
|
|
)
|
|
|
|
type pubsubItem struct {
|
|
client *redis.Client
|
|
pubsub *redis.PubSub
|
|
mutex sync.Mutex
|
|
closeCh chan struct{}
|
|
eventName string
|
|
}
|
|
|
|
type subMessage struct {
|
|
Timestamp int64 `json:"timestamp"`
|
|
Channel string `json:"channel"`
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
type pubsubService struct {
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
mutex sync.Mutex
|
|
items map[string]*pubsubItem
|
|
}
|
|
|
|
var pubsub *pubsubService
|
|
var oncePubsub sync.Once
|
|
|
|
func Pubsub() *pubsubService {
|
|
if pubsub == nil {
|
|
oncePubsub.Do(func() {
|
|
pubsub = &pubsubService{
|
|
items: map[string]*pubsubItem{},
|
|
}
|
|
})
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
func (p *pubsubService) getItem(server string) (*pubsubItem, error) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
item, ok := p.items[server]
|
|
if !ok {
|
|
var err error
|
|
conf := Connection().getConnection(server)
|
|
if conf == nil {
|
|
return nil, fmt.Errorf("no connection profile named: %s", server)
|
|
}
|
|
var uniClient redis.UniversalClient
|
|
if uniClient, err = Connection().createRedisClient(conf.ConnectionConfig); err != nil {
|
|
return nil, err
|
|
}
|
|
var client *redis.Client
|
|
if client, ok = uniClient.(*redis.Client); !ok {
|
|
return nil, errors.New("create redis client fail")
|
|
}
|
|
item = &pubsubItem{
|
|
client: client,
|
|
}
|
|
p.items[server] = item
|
|
}
|
|
return item, nil
|
|
}
|
|
|
|
func (p *pubsubService) Start(ctx context.Context) {
|
|
p.ctx, p.ctxCancel = context.WithCancel(ctx)
|
|
}
|
|
|
|
// Publish publish message to channel
|
|
func (p *pubsubService) Publish(server, channel, payload string) (resp types.JSResp) {
|
|
rdb, err := Browser().getRedisClient(server, -1)
|
|
if err != nil {
|
|
resp.Msg = err.Error()
|
|
return
|
|
}
|
|
|
|
var received int64
|
|
received, err = rdb.client.Publish(p.ctx, channel, payload).Result()
|
|
if err != nil {
|
|
resp.Msg = err.Error()
|
|
return
|
|
}
|
|
|
|
resp.Success = true
|
|
resp.Data = struct {
|
|
Received int64 `json:"received"`
|
|
}{
|
|
Received: received,
|
|
}
|
|
return
|
|
}
|
|
|
|
// StartSubscribe start to subscribe a channel
|
|
func (p *pubsubService) StartSubscribe(server string) (resp types.JSResp) {
|
|
item, err := p.getItem(server)
|
|
if err != nil {
|
|
resp.Msg = err.Error()
|
|
return
|
|
}
|
|
|
|
item.closeCh = make(chan struct{})
|
|
item.eventName = "sub:" + strconv.Itoa(int(time.Now().Unix()))
|
|
item.pubsub = item.client.PSubscribe(p.ctx, "*")
|
|
|
|
go p.processSubscribe(&item.mutex, item.pubsub.Channel(), item.closeCh, item.eventName)
|
|
resp.Success = true
|
|
resp.Data = struct {
|
|
EventName string `json:"eventName"`
|
|
}{
|
|
EventName: item.eventName,
|
|
}
|
|
return
|
|
}
|
|
|
|
func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Message, closeCh <-chan struct{}, eventName string) {
|
|
lastEmitTime := time.Now().Add(-1 * time.Minute)
|
|
cache := make([]subMessage, 0, 1000)
|
|
for {
|
|
select {
|
|
case data := <-ch:
|
|
go func() {
|
|
timestamp := time.Now().UnixMilli()
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
cache = append(cache, subMessage{
|
|
Timestamp: timestamp,
|
|
Channel: data.Channel,
|
|
Message: data.Payload,
|
|
})
|
|
if time.Now().Sub(lastEmitTime) > 300*time.Millisecond || len(cache) > 300 {
|
|
runtime.EventsEmit(p.ctx, eventName, cache)
|
|
cache = cache[:0:cap(cache)]
|
|
lastEmitTime = time.Now()
|
|
}
|
|
}()
|
|
|
|
case <-closeCh:
|
|
// subscribe stopped
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// StopSubscribe stop subscribe by server name
|
|
func (p *pubsubService) StopSubscribe(server string) (resp types.JSResp) {
|
|
p.mutex.Lock()
|
|
defer p.mutex.Unlock()
|
|
|
|
item, ok := p.items[server]
|
|
if !ok || item.pubsub == nil {
|
|
resp.Success = true
|
|
return
|
|
}
|
|
|
|
//item.pubsub.Unsubscribe(p.ctx, "*")
|
|
item.pubsub.Close()
|
|
close(item.closeCh)
|
|
delete(p.items, server)
|
|
resp.Success = true
|
|
return
|
|
}
|
|
|
|
// StopAll stop all subscribe
|
|
func (p *pubsubService) StopAll() {
|
|
if p.ctxCancel != nil {
|
|
p.ctxCancel()
|
|
}
|
|
|
|
for server := range p.items {
|
|
p.StopSubscribe(server)
|
|
}
|
|
}
|