From 16fea2b6ccd75b8dae0221866291ea55705f2ef1 Mon Sep 17 00:00:00 2001 From: Lykin <137850705+tiny-craft@users.noreply.github.com> Date: Tue, 8 Apr 2025 16:39:20 +0800 Subject: [PATCH] pref: redesign the throttle of pub/sub --- backend/services/pubsub_service.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/backend/services/pubsub_service.go b/backend/services/pubsub_service.go index 4929751..5108840 100644 --- a/backend/services/pubsub_service.go +++ b/backend/services/pubsub_service.go @@ -120,8 +120,10 @@ func (p *pubsubService) StartSubscribe(server string) (resp types.JSResp) { } func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Message, closeCh <-chan struct{}, eventName string) { - lastEmitTime := time.Now().Add(-1 * time.Minute) cache := make([]subMessage, 0, 1000) + ticker := time.NewTicker(300 * time.Millisecond) + defer ticker.Stop() + for { select { case data := <-ch: @@ -134,10 +136,19 @@ func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Mes Channel: data.Channel, Message: data.Payload, }) - if time.Now().Sub(lastEmitTime) > 300*time.Millisecond || len(cache) > 300 { + if len(cache) > 300 { + runtime.EventsEmit(p.ctx, eventName, cache) + cache = cache[:0:cap(cache)] + } + }() + + case <-ticker.C: + func() { + mutex.Lock() + defer mutex.Unlock() + if len(cache) > 0 { runtime.EventsEmit(p.ctx, eventName, cache) cache = cache[:0:cap(cache)] - lastEmitTime = time.Now() } }()