diff --git a/cryptor/rsa_private_example.pem b/cryptor/rsa_private_example.pem index bb8a8a3..8cd50d3 100644 --- a/cryptor/rsa_private_example.pem +++ b/cryptor/rsa_private_example.pem @@ -1,51 +1,51 @@ -----BEGIN rsa private key----- -MIIJKAIBAAKCAgEA3aIM62NSgImSuiZwlvnVY9U+nJbq/77e5TMQa8h0OiY8d/c4 -OfyCHM2vQIlxl28a3WpbUkI3kebkDEefLDoBxaL1+A+x8S5Lvy0zfNk/Eo2vxNmJ -OnWF8FLAPFhZq7DzGIGsmgDHe9uXDhsbosfq34chnYOQuZ9/mFy9+Jv8axI+Cgbr -d0N6MVlZ50LEQ3x5SY0xVnCPDxPFKPumvZNeD3prp5BvX4v3IoA5Xx1Kkm2tM4yI -kJrfZFX+B+BGjpyCt6HU+yz+i7gtSjtZ5IvpnpvsJ0SseSz9AXCIfofFxqpO40Vr -GRGK3bKyTDaZjvybt7+5IMb4dUyzKMdjujjLStKP0S+W2phEaNWHLEzXA3AXOJ+n -RN6oplso0BNtyiC9sWKSJUWbUqmdZXmbVuB6vZDb5Ae9KLgeHeb37HUdFbNs5rus -pfdoDaXZzUI2zM2M8A0NRzxQJUGbDxqyPvPUEr84O1P6A6WQ1JlOpljGB6fprgF7 -Su/HHYesEYPbTUoV/WrcYTVdoOiC7K5jNgRVapbng8NNDvaQvsJwKgeRDonzxsMG -21Iqd0/ekaRp+t8BwX6kHJtkadGUkF5aDoWecnyk8baN/JAVuGv4IN8K7dtKBOTm -KxPFtyPHAl6rUlc/oKnQ/36MoRTXJwiqRCGnaSrjaU++9M2VC6zBecCERYECAwEA -AQKCAgEAjdRM9jlKK41eQxekR0k7gDaPab++RMkNdJj38jGGB0w+t/qRlbH8RZhu -hRsvgNwN0hFkvUA4tXqPBziyKKg6SBJf202X7qJUwNOZNlUD4sie6ZbYFXvtqXwb -HsLfJ1sGRfF91dOX1LASe2lnhwTuTfr4zQbLj639BjCbNUQFBTPYVaxV9K1OvdPT -D4YPeKxoJWRgZVOEiP561h4sdvaeY8NQrxtj2j4EeaSakj55YTkkdG+DWR5yxI+v -D7U7EboggIjkdZQ2lIzZFr7iaLoMV36qYfq1cJoUkl5ESsxyCQ8lipT600EBn5vi -M5lhLTqEH9NmEg6iItZhdEAclqgPlvI4wXjdqwM080zYwlcWGM5tu7c0C+kPryC/ -PnUTxGx+IDh92H/iuedoPuYKB3cQtMCFsmo7MyamSJXGhU/cxsSHNZR6P5mM2X8j -Kg25x95m+hiM8NoqlhAsDVKd/yb1WeoZSjwdQWllLSp98jgKeuG8bK7Ww9ZY/brv -Pp6GRivRce6WHwgKwRv0Srt8VIzdpT+/DZCkgxkUaBRHMN1Gd4QaFqc9d0ZQmgnN -oJoVL0FbWMiRHnbQOF0Y91oBut8vGYEIvOujqEhwvSsLus0oQpHGn7YpOxgtr3Nl -LJiQhwVJ7Ye/7u8A1BQR6BuO6DC0YE1KwdFGeomcupvD9Ke1NJECggEBAPMgsMpq -ykI+kesCeTljGmg4o+ApbH+pGlySUWvLraBXfKr17t7k+4q5t8BzuITK0IFdtRxc -mI5HM5A6UcImmq5NVWVckn0UoYDY6dilccqIX7OBjNBPz85DjzA6QNddOXxVjOqq -LbTZ01dCszbHxJDe2SJ9MGIMbgqYosKREh5M+T8vsAZsBtZgXfLqjjDS9QRWmOAx -KWUAJsf4IgbbZ2+GVOjs2qw9BdRo3tvXxQTjfOBO0Jpoe3COTuLiJDMF1FclbETt -B339JImevoAQCGxVJAcDuI+Qhz+j+NSEpJlinkI0V+l5F6BTnQeQopifllPO0Q01 -3fjr6uhKeWMmZ40CggEBAOleBlOu2eBAHk+YiWXqosBAuNFK59/2TEiH3KmDDoP1 -71WPQOh0H9ERNp5F0ZCdZrOit3sbVYvxJ86KBJpZvEuhx6EED6uL7xcAsqv5sHfW -z4F3Z7qbdkrHgceYEjM3n4aiOEz0js4hq7LkfpyT4VZrWmivrJAnU2u7+1u+JNE3 -AfXyKL5TCNyyfPVhbFJ8aM5znB2UZ9IKFSnMZrxRMuPMVzMOUn5JyLUaVGXCW4LT -bL3FGXZX8X8K2wTb7Pp7zVxkDyZvU0fkLrQwx9vIwK++jIP0AdzaV4r9HM5Iynsb -8Z1DoNHCN7KIF3bMoqeaQUDSTCQ7oRaNhh3xA4dYbsUCggEAQ0fQpLNYtWxLRRWy -Jkdej2jdMLNF6y4ItYVoMsRyj+SmA0l7iQMk+Qbb6s4bSeQ2PxaHgAm/zd+2TTtW -VLwKIiIUd7BeeW60IsvkKqfeDYYftbUsGpl7kEDx5w630uFhfx7NmELv0xRUf9ld -btNpeg2xWPH76aY27Ye/wsgSk4AJmYrA04YhfkG8vfRa1PgMBd9Q/vmb0u9vy/bG -s88TmLE73hltiix46Ib85SmYw/mQHSKyZ4hyYHuBKRgbnGMIl/UrOQe/AwaCjfL4 -FMhbDF+jUK2e7Vu5kcr2mRj709aOpROHIHz6JMv+sJE97a58E0UwZM97Vd8zaoTx -gpamIQKCAQArrWdtvioVKKsDpr8AjjvL09FDist/RW/dm2AXceoDlMIot1kkqKdT -z+7zDIo+kNcqA+hnaCRIvuf+ZiKaaPUvCqZ8YnA0YUpsebr3KRJ4O4I27wxBBtvK -/zAxFStC3sRCxJXZAWTA+9hQ8ScpUxw3unv/X/HiQRoB7fsLnrjxV2RMjfhGNvBP -rjBpFMTbY2GSUl0DxETyMOTpH9KSqHfn3tTrP2D9Nf4Ut0rYiNnr0Hpnwj4Twj32 -0ydO74KZFxbGlgun2+owaGq9WuvtHNPDkNxnzgGTPmJoJxt/GGydQgukrYWp/LnD -9mi92WsQB3TzFukdVvO9btuNOxC4AjspAoIBAFeJ8ND4aVzHHfh9BJKTytlFMNXY -Z7VrCNxXN5ab1L2DL1JeUWAF0pwQifkcSUWB0nXvf5+zfhclIOkCOQSjQgVEZbF1 -uma21nrLd+EgrqaF3XtBIiYfZCiGUcI2dj+WT15ZBiSEqX5VbhTMBD37GyvXIZBp -Gs95zje9JJ8kDtaTLYf08FtveLAyPwt4WaQnNY+5dE6wHQij+ueMgNlnxy1i0CRK -xzrxTxksQkdJ0BQTiRlAsqwTpEI8toCeeUZk3zOcX4maxjlabiGp7ZAgqyEkP8wA -guchSFfon1+Ukf6jnOhGNFVZwL2cXxlDiZGozTdztLc5GF2IQ/NagL7eyyo= +MIIJKAIBAAKCAgEAw31V78b2gezAzuWcjAqseqWAib2AwuUpf9UdBTD0NVjxWR68 +SNT09BXaUW6OGdGl0QgNOoalYlj/sk9zsHHIdEDtez/RJCflBUE6knSquB5Dmyge +lVVJ0JgJO95j6ivwuVLl2Ub/TpGoc2mkN8PiqJG6GU6Nd8T7YWEtGH4zpI7Osn5V +WEkKGyXZ3YHkoi5OxtDxJ4EERPhr/n5L7Ah4rOZUq8A+2qje8/c1SKwxzmAwiHqX +3haYcCtXqesd0jG5TmeEWkmjEXyRrPbVjLVzEhQoSIxy+L76gH5OfPjmVR5UzhV0 +PZdHZbxJEpwQT0yjMn8HrUlWHVRNVijzYJNVlvezdJCM1UQm6WzDJeVAnwntw9FJ +VgBON8eXG/48kZC+s/tBHJG18P2dh19C5lmMTrMh5LhB4E9+EVBW7Prb79sNMThq +BSmi6qeeEPPsBo7NbEi7AkNu/y2uJNl0nT1aPsFRlXyx1wHLSg1lI9RNNgXJpXdO +/rxA7ieXQHz4QExXxOdkUgvqDSRSA/3nz2jHU8BD5t0PX6GDLPRuiJtG2Nyz3LlX +07LBeGtP6LxZLBaCAb+WLlw294qSWFDAr8bAlenRJYPzH4WXqlgJ4luArF5dI1sf +q3XPjatI6N0Ytm7XCD4jHRFnXm0SwA+Xc5HCIb4y4AfQbSlkmJu5FhSajsMCAwEA +AQKCAgA6ORciPfDnS1s1g993OiqasLYyz/UMlvIgN5nYFPd2+BGRyHyteHeoRuXq +APJjsDQhqLM/7iBNxVIKim1bbNgV/8x7O8uPaC5mvUGzovnBsonSKUXKlkdQ0CdX +5Gl/siyVFaKNbv7VKL6x+RHG5hmYuSZjKSkrNcjk/OtQcy3wYfQf506F5+MRqNlV +hr1QrFgcENjyOhPKcu6/MmBsYS2H2WX4bV6LA2a75LHC/j4rSCzMSS74H2Sxa8dF +PVh5ZQPa3SvnNC1UJPTCPhXmPZiTFtvUl7gaa87x/CxmPj9jtI/vfSZMvcSvAUAG +QeL9q0aj4H+p1cDHJXePJM0vw1HnCdjP0p6gCUaShoeOyyKfuzMTnGWEeez3TUS3 +iaxjTrIPO7XhZKEszhPU+LZQ//G7pnN39qKd5edQaSUxQtg+Oro6NLUrK7MjsQH0 +K/5RDtraR4Cj0CcuzP5aAqNAixTkQWxyVA4pVtiLT5hKxGfWO7BSkZw21ciucPU7 +SKDzVAUvGug8+2T3MpmFEkCi2KHIyH6RnaFJFQ0D+v9nQ6sYzjysOgc5UZGG/FfH +QYyj6mOCZf0PfBpTivEmObQBzX+kgv2Rt/qAURPBew0bNNyMD1N1R8lq/xxDEtC2 +ARoulvxUDxWyCnPTG/wyRs2PWv1C1+HmC9piJaJy+yxODN9qKQKCAQEA94S4bKZX +m5F+sofMT3DaCaHTW8koV6jcFiXqUosIJsqPis+FbEoEOU4A7TRGsdmgHnoc456u +IVhKOyVyrs4dygUzx+GDdt3ydLr0aiaBa7Xi04922+zSbyKrgWyo2ao9EmQGYgkn +n9jdiCM8FIYqDJ19eGmlBt0XALi6NAWLx6LgJMG+HpI1NACofpGhCff3XWQv3m0U +Vjr3gRIiosjH5buCWpBY72eGSCMuKo3+CqYnWTt1aPYYzxFf/SgqlP4Qumv4+9G/ +MHj23hTeDyOhlJQ9JdXTpDQxkZytx7GFwoyZ8ihaOTmJ137CsHuwNs82XUqS9Oqp +lqOoDzmiHo2+xwKCAQEAyjA1S31tfQ8GlwCxipDyyhfVjZQ1ECxdihHEy15EZkAU +4ymwtu4GMcAiMnud2tapswd3He6W8HMnkSOmE22WpBNL3uUGQfWmG35vY5YN/opY +/dV6G5ixz6U015aYfDiFR4RI5AN9uj3V5rJ4YHxsu2eXixOQN17GYcZgOc9Oo10f +nwJSaEL4FyFLL09mL6N/WSnyWju/lYpSwBTFg/8aTGyJVyqWmMSAXN9hF35iCRX7 +xaUiGC0BPFB+Q2HPKKOHQx6n94sEZh3XXNwAlkkEEyVKGurTGnkIxud2YYJr+pCO +9j9JndtC7Ubk0cdc/sq1sfotpwsyBJ0tzYU4iGYkJQKCAQEAgfOEgEa1KFAzHLh/ +ZBXz46q6tS3fqGZXZqlxZPTpyrP+uPpLKpeRsCRsCB2yIB4KGvYxxK/C7G7WSkdW +9r/wk+g3T3Hu+7F7Yh3lxPWCbXKJLFneJYko0oB5WvIZlG30snshfscfrqTFanxV +RtFJsmXdQiL1Ka1aLl7OJBHlWWMCG0o87wAEPtE5pXn/+qPdk4LOPxE15DABUhqb +ZKvVh5/yyGmQx+Cqd6VAz8bM4fpqHkdBVWIflFei9d7ovGDHl5iomE1++4wG0IJU +M86ZPtC1ae/htgxRGgoQkw95ErOO/7x/4ht7a1B6Fv5Zfe08ta45iazKbPRvf87a +dwAkcQKCAQAhUnqmG2DdGIncfyfDj4oEFpOChueL59Puumj1n26kb3FB0xAlxerN +bj30lfz5ISHMuLFABJ9pt4DQ4dAwKYKHPgrmDFbQrhyXMWpIPmXGCqUKCJHcVHkR +oMeptE8jQeltVPbyZUTmWeST5dwDAt6ksDkAmdPN3FfTq28Wd/0c3W8ltKtCNa9P +mJQckvhjZ43IcypXBrvXeHmU4JeA3p1F9SlZbVJukQwawlHtHk4n8zCQbVvGIjyu +5RE6J7eNmg3j2PyTxpyN++MPkc6i9huu0Bl7gDIUjdgNd/5Bqc4D+KknGl0WoBlS +o7l4fWPtNhll4LZ/bk/rBMCUu50KldLlAoIBAH9mnRqvMYisUMpy0suOpL6OqV9L +Ys7SjT2VgP/g//4/ykN2XPXbquf2E9u5pQZPs8eXvRtTN3c78KV894f3sRFJV/YT +pZjJB0cvyDgBZO9sHQ9tBRb4V4bWjhOI+7qYfOtxuCaTghlAVsTZOCZ6W44r96hm +I5uEHQrLQmc7UUnUVE+LCyNq07CITssHBUWgHAsFUo5zrj4YWtqc5u5h3ZqhvAhk +XcXh3PSuNMaZPe8V5BHFXdZKRH1+yuuv8ADiSG49lUamkKilhIPeMiELve2O8zx8 +FUULEeqffZ6Cp75CFd9OvwXWxAef9H6Btku9otZ9731yYLBzeYhFKkhZHDI= -----END rsa private key----- diff --git a/cryptor/rsa_public_example.pem b/cryptor/rsa_public_example.pem index 066e325..2a560f7 100644 --- a/cryptor/rsa_public_example.pem +++ b/cryptor/rsa_public_example.pem @@ -1,14 +1,14 @@ -----BEGIN rsa public key----- -MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA3aIM62NSgImSuiZwlvnV -Y9U+nJbq/77e5TMQa8h0OiY8d/c4OfyCHM2vQIlxl28a3WpbUkI3kebkDEefLDoB -xaL1+A+x8S5Lvy0zfNk/Eo2vxNmJOnWF8FLAPFhZq7DzGIGsmgDHe9uXDhsbosfq -34chnYOQuZ9/mFy9+Jv8axI+Cgbrd0N6MVlZ50LEQ3x5SY0xVnCPDxPFKPumvZNe -D3prp5BvX4v3IoA5Xx1Kkm2tM4yIkJrfZFX+B+BGjpyCt6HU+yz+i7gtSjtZ5Ivp -npvsJ0SseSz9AXCIfofFxqpO40VrGRGK3bKyTDaZjvybt7+5IMb4dUyzKMdjujjL -StKP0S+W2phEaNWHLEzXA3AXOJ+nRN6oplso0BNtyiC9sWKSJUWbUqmdZXmbVuB6 -vZDb5Ae9KLgeHeb37HUdFbNs5ruspfdoDaXZzUI2zM2M8A0NRzxQJUGbDxqyPvPU -Er84O1P6A6WQ1JlOpljGB6fprgF7Su/HHYesEYPbTUoV/WrcYTVdoOiC7K5jNgRV -apbng8NNDvaQvsJwKgeRDonzxsMG21Iqd0/ekaRp+t8BwX6kHJtkadGUkF5aDoWe -cnyk8baN/JAVuGv4IN8K7dtKBOTmKxPFtyPHAl6rUlc/oKnQ/36MoRTXJwiqRCGn -aSrjaU++9M2VC6zBecCERYECAwEAAQ== +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAw31V78b2gezAzuWcjAqs +eqWAib2AwuUpf9UdBTD0NVjxWR68SNT09BXaUW6OGdGl0QgNOoalYlj/sk9zsHHI +dEDtez/RJCflBUE6knSquB5DmygelVVJ0JgJO95j6ivwuVLl2Ub/TpGoc2mkN8Pi +qJG6GU6Nd8T7YWEtGH4zpI7Osn5VWEkKGyXZ3YHkoi5OxtDxJ4EERPhr/n5L7Ah4 +rOZUq8A+2qje8/c1SKwxzmAwiHqX3haYcCtXqesd0jG5TmeEWkmjEXyRrPbVjLVz +EhQoSIxy+L76gH5OfPjmVR5UzhV0PZdHZbxJEpwQT0yjMn8HrUlWHVRNVijzYJNV +lvezdJCM1UQm6WzDJeVAnwntw9FJVgBON8eXG/48kZC+s/tBHJG18P2dh19C5lmM +TrMh5LhB4E9+EVBW7Prb79sNMThqBSmi6qeeEPPsBo7NbEi7AkNu/y2uJNl0nT1a +PsFRlXyx1wHLSg1lI9RNNgXJpXdO/rxA7ieXQHz4QExXxOdkUgvqDSRSA/3nz2jH +U8BD5t0PX6GDLPRuiJtG2Nyz3LlX07LBeGtP6LxZLBaCAb+WLlw294qSWFDAr8bA +lenRJYPzH4WXqlgJ4luArF5dI1sfq3XPjatI6N0Ytm7XCD4jHRFnXm0SwA+Xc5HC +Ib4y4AfQbSlkmJu5FhSajsMCAwEAAQ== -----END rsa public key----- diff --git a/eventbus/eventbus.go b/eventbus/eventbus.go new file mode 100644 index 0000000..d6377e0 --- /dev/null +++ b/eventbus/eventbus.go @@ -0,0 +1,185 @@ +// Copyright 2025 dudaodong@gmail.com. All rights reserved. +// Use of this source code is governed by MIT license + +// Package eventbus implements a simple event bus. +package eventbus + +import ( + "fmt" + "sort" + "sync" +) + +// Event is the struct that is passed to the event listener, now it directly uses the generic Payload type. +type Event[T any] struct { + Topic string + Payload T +} + +// EventBus is the struct that holds the listeners and the error handler. +type EventBus[T any] struct { + // listeners map[string][]*EventListener[T] + listeners sync.Map + mu sync.RWMutex + errorHandler func(err error) +} + +// EventListener is the struct that holds the listener function and its priority. +type EventListener[T any] struct { + priority int + listener func(eventData T) + async bool + filter func(eventData T) bool +} + +// NewEventBus creates a new EventBus. +func NewEventBus[T any]() *EventBus[T] { + return &EventBus[T]{ + listeners: sync.Map{}, + } +} + +// Subscribe subscribes to an event with a specific event topic and listener function. +func (eb *EventBus[T]) Subscribe(topic string, listener func(eventData T), async bool, priority int, filter func(eventData T) bool) { + eb.mu.Lock() + defer eb.mu.Unlock() + + el := &EventListener[T]{ + priority: priority, + listener: listener, + async: async, + filter: filter, + } + + listenersInterface, _ := eb.listeners.LoadOrStore(topic, []*EventListener[T]{}) + listeners := listenersInterface.([]*EventListener[T]) + + listeners = append(listeners, el) + sort.Slice(listeners, func(i, j int) bool { + return listeners[i].priority > listeners[j].priority + }) + + eb.listeners.Store(topic, listeners) +} + +// Unsubscribe unsubscribes from an event with a specific event topic and listener function. +func (eb *EventBus[T]) Unsubscribe(topic string, listener func(eventData T)) { + eb.mu.Lock() + defer eb.mu.Unlock() + + listenersInterface, ok := eb.listeners.Load(topic) + if !ok { + return + } + + listeners := listenersInterface.([]*EventListener[T]) + listenerPtr := fmt.Sprintf("%p", listener) + + var updatedListeners []*EventListener[T] + for _, l := range listeners { + if fmt.Sprintf("%p", l.listener) != listenerPtr { + updatedListeners = append(updatedListeners, l) + } + } + + eb.listeners.Store(topic, updatedListeners) +} + +// Publish publishes an event with a specific event topic and data payload. +func (eb *EventBus[T]) Publish(event Event[T]) { + eb.mu.RLock() + defer eb.mu.RUnlock() + + listenersInterface, exists := eb.listeners.Load(event.Topic) + if !exists { + return + } + + listeners := listenersInterface.([]*EventListener[T]) + + for _, listener := range listeners { + if listener.filter != nil && !listener.filter(event.Payload) { + continue + } + + if listener.async { + go eb.publishToListener(listener, event) + } else { + eb.publishToListener(listener, event) + } + } +} + +func (eb *EventBus[T]) publishToListener(listener *EventListener[T], event Event[T]) { + defer func() { + if r := recover(); r != nil && eb.errorHandler != nil { + eb.errorHandler(fmt.Errorf("%v", r)) + } + }() + + listener.listener(event.Payload) +} + +// SetErrorHandler sets the error handler function. +func (eb *EventBus[T]) SetErrorHandler(handler func(err error)) { + eb.errorHandler = handler +} + +// ClearListeners clears all the listeners. +func (eb *EventBus[T]) ClearListeners() { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.listeners = sync.Map{} +} + +// ClearListenersByTopic clears all the listeners by topic. +func (eb *EventBus[T]) ClearListenersByTopic(topic string) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.listeners.Delete(topic) +} + +// GetListenersCount returns the number of listeners for a specific event topic. +func (eb *EventBus[T]) GetListenersCount(topic string) int { + eb.mu.RLock() + defer eb.mu.RUnlock() + + listenersInterface, ok := eb.listeners.Load(topic) + if !ok { + return 0 + } + + listeners := listenersInterface.([]*EventListener[T]) + return len(listeners) +} + +// GetAllListenersCount returns the total number of listeners. +func (eb *EventBus[T]) GetAllListenersCount() int { + eb.mu.RLock() + defer eb.mu.RUnlock() + + count := 0 + eb.listeners.Range(func(key, value interface{}) bool { + count += len(value.([]*EventListener[T])) + return true + }) + + return count +} + +// GetEvents returns all the events topics. +func (eb *EventBus[T]) GetEvents() []string { + eb.mu.RLock() + defer eb.mu.RUnlock() + + var events []string + + eb.listeners.Range(func(key, value interface{}) bool { + events = append(events, key.(string)) + return true + }) + + return events +} diff --git a/eventbus/eventbus_test.go b/eventbus/eventbus_test.go new file mode 100644 index 0000000..15ce3de --- /dev/null +++ b/eventbus/eventbus_test.go @@ -0,0 +1,220 @@ +package eventbus + +import ( + "sync" + "testing" + "time" + + "github.com/duke-git/lancet/v2/internal" +) + +func TestEventBus_Subscribe(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_Subscribe") + + eb := NewEventBus[int]() + + eb.Subscribe("event1", func(eventData int) { + assert.Equal(1, eventData) + }, false, 0, nil) + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) +} + +func TestEventBus_Unsubscribe(t *testing.T) { + + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_Unsubscribe") + + eb := NewEventBus[int]() + + receivedData := 0 + listener := func(eventData int) { + receivedData = eventData + } + + eb.Subscribe("event1", listener, false, 0, nil) + eb.Unsubscribe("event1", listener) + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) + + assert.Equal(0, receivedData) +} + +func TestEventBus_Subscribe_WithFilter(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_Subscribe_WithFilter") + + eb := NewEventBus[int]() + + receivedData := 0 + listener := func(eventData int) { + receivedData = eventData + } + + filter := func(eventData int) bool { + return eventData == 1 + } + + eb.Subscribe("event1", listener, false, 0, filter) + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) + eb.Publish(Event[int]{Topic: "event1", Payload: 2}) + + assert.Equal(1, receivedData) +} + +func TestEventBus_Subscribe_WithPriority(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_Subscribe_WithPriority") + + eb := NewEventBus[int]() + + var receivedData []int + listener1 := func(eventData int) { + receivedData = append(receivedData, 1) + } + + listener2 := func(eventData int) { + receivedData = append(receivedData, 2) + } + + eb.Subscribe("event1", listener1, false, 1, nil) + eb.Subscribe("event1", listener2, false, 2, nil) + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) + + assert.Equal([]int{2, 1}, receivedData) +} + +func TestEventBus_Subscribe_Async(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_Subscribe_Async") + + eb := NewEventBus[string]() + + var wg sync.WaitGroup + wg.Add(1) + + eb.Subscribe("event1", func(eventData string) { + time.Sleep(100 * time.Millisecond) + assert.Equal("hello", eventData) + wg.Done() + }, true, 1, nil) + + eb.Publish(Event[string]{Topic: "event1", Payload: "hello"}) + + wg.Wait() +} + +func TestEventBus_ErrorHandler(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_ErrorHandler") + + eb := NewEventBus[string]() + + eb.SetErrorHandler(func(err error) { + assert.Equal("error", err.Error()) + }) + + eb.Subscribe("event1", func(eventData string) { + panic("error") + }, false, 0, nil) + + eb.Publish(Event[string]{Topic: "event1", Payload: "hello"}) +} + +func TestEventBus_ClearListeners(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_ClearListeners") + + eb := NewEventBus[int]() + + receivedData1 := 0 + receivedData2 := 0 + + eb.Subscribe("event1", func(eventData int) { + receivedData1 = eventData + }, false, 0, nil) + + eb.Subscribe("event2", func(eventData int) { + receivedData2 = eventData + }, false, 0, nil) + + eb.ClearListeners() + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) + eb.Publish(Event[int]{Topic: "event1", Payload: 2}) + + assert.Equal(0, receivedData1) + assert.Equal(0, receivedData2) +} + +func TestEventBus_ClearListenersByTopic(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_ClearListenersByTopic") + + eb := NewEventBus[int]() + + receivedData1 := 0 + receivedData2 := 0 + + eb.Subscribe("event1", func(eventData int) { + receivedData1 = eventData + }, false, 0, nil) + + eb.Subscribe("event2", func(eventData int) { + receivedData2 = eventData + }, false, 0, nil) + + eb.ClearListenersByTopic("event1") + + eb.Publish(Event[int]{Topic: "event1", Payload: 1}) + eb.Publish(Event[int]{Topic: "event2", Payload: 2}) + + assert.Equal(0, receivedData1) + assert.Equal(2, receivedData2) +} + +func TestEventBus_GetListenersCount(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_GetListenersCount") + + eb := NewEventBus[int]() + + eb.Subscribe("event1", func(eventData int) {}, false, 0, nil) + eb.Subscribe("event1", func(eventData int) {}, false, 0, nil) + eb.Subscribe("event2", func(eventData int) {}, false, 0, nil) + + assert.Equal(2, eb.GetListenersCount("event1")) + assert.Equal(1, eb.GetListenersCount("event2")) +} + +func TestEventBus_GetAllListenersCount(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_GetAllListenersCount") + + eb := NewEventBus[int]() + + eb.Subscribe("event1", func(eventData int) {}, false, 0, nil) + eb.Subscribe("event1", func(eventData int) {}, false, 0, nil) + eb.Subscribe("event2", func(eventData int) {}, false, 0, nil) + + assert.Equal(3, eb.GetAllListenersCount()) +} + +func TestEventBus_GetEvents(t *testing.T) { + t.Parallel() + assert := internal.NewAssert(t, "TestEventBus_GetEvents") + + eb := NewEventBus[int]() + + eb.Subscribe("event1", func(eventData int) {}, false, 0, nil) + eb.Subscribe("event2", func(eventData int) {}, false, 0, nil) + + events := eb.GetEvents() + + assert.Equal(2, len(events)) + assert.Equal("event1", events[0]) + assert.Equal("event2", events[1]) +}