From 2a0052dda66fa9529d379bd630202f6386ee1a8c Mon Sep 17 00:00:00 2001 From: Michal Horejsek Date: Thu, 10 Dec 2020 13:00:16 +0100 Subject: [PATCH] Fix listener locking --- pkg/listener/listener.go | 15 ++++++++++++--- unreleased.md | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 387197a3..7bb693d0 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -59,6 +59,9 @@ func New() Listener { // is emitted within last time duration (`limit`), event is dropped. Zero limit clears // the limit for the specific `eventName`. func (l *listener) SetLimit(eventName string, limit time.Duration) { + l.lock.Lock() + defer l.lock.Unlock() + if limit == 0 { delete(l.limits, eventName) return @@ -95,13 +98,13 @@ func (l *listener) Remove(eventName string, channel chan<- string) { // Emit emits an event in parallel to all listeners (channels). func (l *listener) Emit(eventName string, data string) { + l.lock.Lock() + defer l.lock.Unlock() + l.emit(eventName, data, false) } func (l *listener) emit(eventName, data string, isReEmit bool) { - l.lock.RLock() - defer l.lock.RUnlock() - if !l.shouldEmit(eventName, data) { log.Warn("Emit of ", eventName, " with data ", data, " skipped") return @@ -159,12 +162,18 @@ func (l *listener) clearLastEmits() { } func (l *listener) SetBuffer(eventName string) { + l.lock.Lock() + defer l.lock.Unlock() + if _, ok := l.buffered[eventName]; !ok { l.buffered[eventName] = []string{} } } func (l *listener) RetryEmit(eventName string) { + l.lock.Lock() + defer l.lock.Unlock() + if _, ok := l.channels[eventName]; !ok || len(l.channels[eventName]) == 0 { return } diff --git a/unreleased.md b/unreleased.md index e422297a..a578d882 100644 --- a/unreleased.md +++ b/unreleased.md @@ -16,3 +16,4 @@ Changelog [format](http://keepachangelog.com/en/1.0.0/) ### Changed * GODT-858 Bump go-rfc5322 dependency to v0.4.0 to handle some invalid RFC5322 groups. +* GODT-923 Fix listener locking.