1
0
mirror of https://github.com/duke-git/lancet.git synced 2026-02-04 12:52:28 +08:00

feat: add eventbus

This commit is contained in:
dudaodong
2025-01-14 15:57:43 +08:00
parent 7653afa919
commit cb308f628c
4 changed files with 466 additions and 61 deletions

185
eventbus/eventbus.go Normal file
View File

@@ -0,0 +1,185 @@
// Copyright 2025 dudaodong@gmail.com. All rights reserved.
// Use of this source code is governed by MIT license
// Package eventbus implements a simple event bus.
package eventbus
import (
"fmt"
"sort"
"sync"
)
// Event is the struct that is passed to the event listener, now it directly uses the generic Payload type.
type Event[T any] struct {
Topic string
Payload T
}
// EventBus is the struct that holds the listeners and the error handler.
type EventBus[T any] struct {
// listeners map[string][]*EventListener[T]
listeners sync.Map
mu sync.RWMutex
errorHandler func(err error)
}
// EventListener is the struct that holds the listener function and its priority.
type EventListener[T any] struct {
priority int
listener func(eventData T)
async bool
filter func(eventData T) bool
}
// NewEventBus creates a new EventBus.
func NewEventBus[T any]() *EventBus[T] {
return &EventBus[T]{
listeners: sync.Map{},
}
}
// Subscribe subscribes to an event with a specific event topic and listener function.
func (eb *EventBus[T]) Subscribe(topic string, listener func(eventData T), async bool, priority int, filter func(eventData T) bool) {
eb.mu.Lock()
defer eb.mu.Unlock()
el := &EventListener[T]{
priority: priority,
listener: listener,
async: async,
filter: filter,
}
listenersInterface, _ := eb.listeners.LoadOrStore(topic, []*EventListener[T]{})
listeners := listenersInterface.([]*EventListener[T])
listeners = append(listeners, el)
sort.Slice(listeners, func(i, j int) bool {
return listeners[i].priority > listeners[j].priority
})
eb.listeners.Store(topic, listeners)
}
// Unsubscribe unsubscribes from an event with a specific event topic and listener function.
func (eb *EventBus[T]) Unsubscribe(topic string, listener func(eventData T)) {
eb.mu.Lock()
defer eb.mu.Unlock()
listenersInterface, ok := eb.listeners.Load(topic)
if !ok {
return
}
listeners := listenersInterface.([]*EventListener[T])
listenerPtr := fmt.Sprintf("%p", listener)
var updatedListeners []*EventListener[T]
for _, l := range listeners {
if fmt.Sprintf("%p", l.listener) != listenerPtr {
updatedListeners = append(updatedListeners, l)
}
}
eb.listeners.Store(topic, updatedListeners)
}
// Publish publishes an event with a specific event topic and data payload.
func (eb *EventBus[T]) Publish(event Event[T]) {
eb.mu.RLock()
defer eb.mu.RUnlock()
listenersInterface, exists := eb.listeners.Load(event.Topic)
if !exists {
return
}
listeners := listenersInterface.([]*EventListener[T])
for _, listener := range listeners {
if listener.filter != nil && !listener.filter(event.Payload) {
continue
}
if listener.async {
go eb.publishToListener(listener, event)
} else {
eb.publishToListener(listener, event)
}
}
}
func (eb *EventBus[T]) publishToListener(listener *EventListener[T], event Event[T]) {
defer func() {
if r := recover(); r != nil && eb.errorHandler != nil {
eb.errorHandler(fmt.Errorf("%v", r))
}
}()
listener.listener(event.Payload)
}
// SetErrorHandler sets the error handler function.
func (eb *EventBus[T]) SetErrorHandler(handler func(err error)) {
eb.errorHandler = handler
}
// ClearListeners clears all the listeners.
func (eb *EventBus[T]) ClearListeners() {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.listeners = sync.Map{}
}
// ClearListenersByTopic clears all the listeners by topic.
func (eb *EventBus[T]) ClearListenersByTopic(topic string) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.listeners.Delete(topic)
}
// GetListenersCount returns the number of listeners for a specific event topic.
func (eb *EventBus[T]) GetListenersCount(topic string) int {
eb.mu.RLock()
defer eb.mu.RUnlock()
listenersInterface, ok := eb.listeners.Load(topic)
if !ok {
return 0
}
listeners := listenersInterface.([]*EventListener[T])
return len(listeners)
}
// GetAllListenersCount returns the total number of listeners.
func (eb *EventBus[T]) GetAllListenersCount() int {
eb.mu.RLock()
defer eb.mu.RUnlock()
count := 0
eb.listeners.Range(func(key, value interface{}) bool {
count += len(value.([]*EventListener[T]))
return true
})
return count
}
// GetEvents returns all the events topics.
func (eb *EventBus[T]) GetEvents() []string {
eb.mu.RLock()
defer eb.mu.RUnlock()
var events []string
eb.listeners.Range(func(key, value interface{}) bool {
events = append(events, key.(string))
return true
})
return events
}

220
eventbus/eventbus_test.go Normal file
View File

@@ -0,0 +1,220 @@
package eventbus
import (
"sync"
"testing"
"time"
"github.com/duke-git/lancet/v2/internal"
)
func TestEventBus_Subscribe(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_Subscribe")
eb := NewEventBus[int]()
eb.Subscribe("event1", func(eventData int) {
assert.Equal(1, eventData)
}, false, 0, nil)
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
}
func TestEventBus_Unsubscribe(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_Unsubscribe")
eb := NewEventBus[int]()
receivedData := 0
listener := func(eventData int) {
receivedData = eventData
}
eb.Subscribe("event1", listener, false, 0, nil)
eb.Unsubscribe("event1", listener)
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
assert.Equal(0, receivedData)
}
func TestEventBus_Subscribe_WithFilter(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_Subscribe_WithFilter")
eb := NewEventBus[int]()
receivedData := 0
listener := func(eventData int) {
receivedData = eventData
}
filter := func(eventData int) bool {
return eventData == 1
}
eb.Subscribe("event1", listener, false, 0, filter)
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
eb.Publish(Event[int]{Topic: "event1", Payload: 2})
assert.Equal(1, receivedData)
}
func TestEventBus_Subscribe_WithPriority(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_Subscribe_WithPriority")
eb := NewEventBus[int]()
var receivedData []int
listener1 := func(eventData int) {
receivedData = append(receivedData, 1)
}
listener2 := func(eventData int) {
receivedData = append(receivedData, 2)
}
eb.Subscribe("event1", listener1, false, 1, nil)
eb.Subscribe("event1", listener2, false, 2, nil)
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
assert.Equal([]int{2, 1}, receivedData)
}
func TestEventBus_Subscribe_Async(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_Subscribe_Async")
eb := NewEventBus[string]()
var wg sync.WaitGroup
wg.Add(1)
eb.Subscribe("event1", func(eventData string) {
time.Sleep(100 * time.Millisecond)
assert.Equal("hello", eventData)
wg.Done()
}, true, 1, nil)
eb.Publish(Event[string]{Topic: "event1", Payload: "hello"})
wg.Wait()
}
func TestEventBus_ErrorHandler(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_ErrorHandler")
eb := NewEventBus[string]()
eb.SetErrorHandler(func(err error) {
assert.Equal("error", err.Error())
})
eb.Subscribe("event1", func(eventData string) {
panic("error")
}, false, 0, nil)
eb.Publish(Event[string]{Topic: "event1", Payload: "hello"})
}
func TestEventBus_ClearListeners(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_ClearListeners")
eb := NewEventBus[int]()
receivedData1 := 0
receivedData2 := 0
eb.Subscribe("event1", func(eventData int) {
receivedData1 = eventData
}, false, 0, nil)
eb.Subscribe("event2", func(eventData int) {
receivedData2 = eventData
}, false, 0, nil)
eb.ClearListeners()
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
eb.Publish(Event[int]{Topic: "event1", Payload: 2})
assert.Equal(0, receivedData1)
assert.Equal(0, receivedData2)
}
func TestEventBus_ClearListenersByTopic(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_ClearListenersByTopic")
eb := NewEventBus[int]()
receivedData1 := 0
receivedData2 := 0
eb.Subscribe("event1", func(eventData int) {
receivedData1 = eventData
}, false, 0, nil)
eb.Subscribe("event2", func(eventData int) {
receivedData2 = eventData
}, false, 0, nil)
eb.ClearListenersByTopic("event1")
eb.Publish(Event[int]{Topic: "event1", Payload: 1})
eb.Publish(Event[int]{Topic: "event2", Payload: 2})
assert.Equal(0, receivedData1)
assert.Equal(2, receivedData2)
}
func TestEventBus_GetListenersCount(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_GetListenersCount")
eb := NewEventBus[int]()
eb.Subscribe("event1", func(eventData int) {}, false, 0, nil)
eb.Subscribe("event1", func(eventData int) {}, false, 0, nil)
eb.Subscribe("event2", func(eventData int) {}, false, 0, nil)
assert.Equal(2, eb.GetListenersCount("event1"))
assert.Equal(1, eb.GetListenersCount("event2"))
}
func TestEventBus_GetAllListenersCount(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_GetAllListenersCount")
eb := NewEventBus[int]()
eb.Subscribe("event1", func(eventData int) {}, false, 0, nil)
eb.Subscribe("event1", func(eventData int) {}, false, 0, nil)
eb.Subscribe("event2", func(eventData int) {}, false, 0, nil)
assert.Equal(3, eb.GetAllListenersCount())
}
func TestEventBus_GetEvents(t *testing.T) {
t.Parallel()
assert := internal.NewAssert(t, "TestEventBus_GetEvents")
eb := NewEventBus[int]()
eb.Subscribe("event1", func(eventData int) {}, false, 0, nil)
eb.Subscribe("event2", func(eventData int) {}, false, 0, nil)
events := eb.GetEvents()
assert.Equal(2, len(events))
assert.Equal("event1", events[0])
assert.Equal("event2", events[1])
}