mirror of
https://github.com/tiny-craft/tiny-rdm.git
synced 2025-04-13 01:58:05 +08:00
pref: redesign the throttle of pub/sub
This commit is contained in:
parent
84d1141f8c
commit
16fea2b6cc
@ -120,8 +120,10 @@ func (p *pubsubService) StartSubscribe(server string) (resp types.JSResp) {
|
||||
}
|
||||
|
||||
func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Message, closeCh <-chan struct{}, eventName string) {
|
||||
lastEmitTime := time.Now().Add(-1 * time.Minute)
|
||||
cache := make([]subMessage, 0, 1000)
|
||||
ticker := time.NewTicker(300 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case data := <-ch:
|
||||
@ -134,10 +136,19 @@ func (p *pubsubService) processSubscribe(mutex *sync.Mutex, ch <-chan *redis.Mes
|
||||
Channel: data.Channel,
|
||||
Message: data.Payload,
|
||||
})
|
||||
if time.Now().Sub(lastEmitTime) > 300*time.Millisecond || len(cache) > 300 {
|
||||
if len(cache) > 300 {
|
||||
runtime.EventsEmit(p.ctx, eventName, cache)
|
||||
cache = cache[:0:cap(cache)]
|
||||
}
|
||||
}()
|
||||
|
||||
case <-ticker.C:
|
||||
func() {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
if len(cache) > 0 {
|
||||
runtime.EventsEmit(p.ctx, eventName, cache)
|
||||
cache = cache[:0:cap(cache)]
|
||||
lastEmitTime = time.Now()
|
||||
}
|
||||
}()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user