3434import org .slf4j .LoggerFactory ;
3535
3636import java .util .ArrayList ;
37+ import java .util .Collection ;
38+ import java .util .EnumMap ;
3739import java .util .EnumSet ;
3840import java .util .List ;
41+ import java .util .Map ;
3942import java .util .concurrent .CopyOnWriteArrayList ;
4043import java .util .concurrent .ExecutorService ;
4144import java .util .concurrent .Future ;
45+ import java .util .stream .Stream ;
46+
47+ import static java .util .Collections .unmodifiableMap ;
48+ import static java .util .EnumSet .allOf ;
49+ import static java .util .function .Function .identity ;
50+ import static java .util .stream .Collectors .toMap ;
51+ import static java .util .stream .Stream .concat ;
4252
4353/**
4454 * Per-cache component that manages cache event listener registrations, and provides event delivery based on desired
@@ -55,10 +65,12 @@ public class CacheEventDispatcherImpl<K, V> implements CacheEventDispatcher<K, V
5565 private static final Logger LOGGER = LoggerFactory .getLogger (CacheEventDispatcherImpl .class );
5666 private final ExecutorService unOrderedExectuor ;
5767 private final ExecutorService orderedExecutor ;
58- private int listenersCount = 0 ;
59- private int orderedListenerCount = 0 ;
60- private final List <EventListenerWrapper <K , V >> syncListenersList = new CopyOnWriteArrayList <>();
61- private final List <EventListenerWrapper <K , V >> aSyncListenersList = new CopyOnWriteArrayList <>();
68+
69+ private final Map <EventType , List <EventListenerWrapper <K , V >>> syncListenersList = unmodifiableMap (allOf (EventType .class ).stream ()
70+ .collect (toMap (identity (), t -> new CopyOnWriteArrayList <>(), (a , b ) -> { throw new AssertionError (); }, () -> new EnumMap <>(EventType .class ))));
71+ private final Map <EventType , List <EventListenerWrapper <K , V >>> asyncListenersList = unmodifiableMap (allOf (EventType .class ).stream ()
72+ .collect (toMap (identity (), t -> new CopyOnWriteArrayList <>(), (a , b ) -> { throw new AssertionError (); }, () -> new EnumMap <>(EventType .class ))));
73+
6274 private final StoreEventListener <K , V > eventListener = new StoreListener ();
6375
6476 private volatile Cache <K , V > listenerSource ;
@@ -94,69 +106,76 @@ public void registerCacheEventListener(CacheEventListener<? super K, ? super V>
94106 * @param wrapper the listener wrapper to register
95107 */
96108 private synchronized void registerCacheEventListener (EventListenerWrapper <K , V > wrapper ) {
97- if (aSyncListenersList .contains (wrapper ) || syncListenersList .contains (wrapper )) {
109+
110+ if (allListeners ().anyMatch (wrapper ::equals )) {
98111 throw new IllegalStateException ("Cache Event Listener already registered: " + wrapper .getListener ());
99112 }
100113
101- if (wrapper .isOrdered () && orderedListenerCount ++ == 0 ) {
114+ boolean firstListener = !allListeners ().findAny ().isPresent ();
115+
116+ if (wrapper .isOrdered () && (firstListener || allListeners ().noneMatch (EventListenerWrapper ::isOrdered ))) {
102117 storeEventSource .setEventOrdering (true );
103118 }
104119
105120 switch (wrapper .getFiringMode ()) {
106121 case ASYNCHRONOUS :
107- aSyncListenersList . add (wrapper );
122+ wrapper . getEventTypes (). forEach ( type -> asyncListenersList . get ( type ). add (wrapper ) );
108123 break ;
109124 case SYNCHRONOUS :
110- if (syncListenersList . isEmpty ()) {
125+ if (! syncListeners (). findAny (). isPresent ()) {
111126 storeEventSource .setSynchronous (true );
112127 }
113- syncListenersList .add (wrapper );
128+ wrapper . getEventTypes (). forEach ( type -> syncListenersList .get ( type ). add (wrapper ) );
114129 break ;
115130 default :
116131 throw new AssertionError ("Unhandled EventFiring value: " + wrapper .getFiringMode ());
117132 }
118133
119- if (listenersCount ++ == 0 ) {
134+ if (firstListener ) {
120135 storeEventSource .addEventListener (eventListener );
121136 }
122137 }
123138
139+ private Stream <EventListenerWrapper <K , V >> allListeners () {
140+ return concat (asyncListeners (), syncListeners ());
141+ }
142+
143+ private Stream <EventListenerWrapper <K , V >> syncListeners () {
144+ return syncListenersList .values ().stream ().flatMap (Collection ::stream );
145+ }
146+
147+ private Stream <EventListenerWrapper <K , V >> asyncListeners () {
148+ return asyncListenersList .values ().stream ().flatMap (Collection ::stream );
149+ }
150+
124151 /**
125152 * {@inheritDoc}
126153 */
127154 @ Override
128- public void deregisterCacheEventListener (CacheEventListener <? super K , ? super V > listener ) {
155+ public synchronized void deregisterCacheEventListener (CacheEventListener <? super K , ? super V > listener ) {
129156 EventListenerWrapper <K , V > wrapper = new EventListenerWrapper <>(listener );
130157
131- if (!removeWrapperFromList (wrapper , aSyncListenersList )) {
132- if (!removeWrapperFromList (wrapper , syncListenersList )) {
133- throw new IllegalStateException ("Unknown cache event listener: " + listener );
134- }
158+ boolean removed = Stream .of (asyncListenersList , syncListenersList )
159+ .flatMap (list -> list .values ().stream ())
160+ .map (list -> list .remove (wrapper ))
161+ .reduce ((a , b ) -> a || b ).orElse (false );
162+
163+ if (!removed ) {
164+ throw new IllegalStateException ("Unknown cache event listener: " + listener );
135165 }
136- }
137166
138- /**
139- * Synchronized to make sure listener removal is atomic
140- *
141- * @param wrapper the listener wrapper to unregister
142- * @param listenersList the listener list to remove from
143- */
144- private synchronized boolean removeWrapperFromList (EventListenerWrapper <K , V > wrapper , List <EventListenerWrapper <K , V >> listenersList ) {
145- int index = listenersList .indexOf (wrapper );
146- if (index != -1 ) {
147- EventListenerWrapper <K , V > containedWrapper = listenersList .remove (index );
148- if (containedWrapper .isOrdered () && --orderedListenerCount == 0 ) {
167+ if (!allListeners ().findAny ().isPresent ()) {
168+ storeEventSource .setSynchronous (false );
169+ storeEventSource .setEventOrdering (false );
170+ storeEventSource .removeEventListener (eventListener );
171+ } else {
172+ if (allListeners ().noneMatch (EventListenerWrapper ::isOrdered )) {
149173 storeEventSource .setEventOrdering (false );
150174 }
151- if (--listenersCount == 0 ) {
152- storeEventSource .removeEventListener (eventListener );
153- }
154- if (syncListenersList .isEmpty ()) {
175+ if (!syncListeners ().findAny ().isPresent ()) {
155176 storeEventSource .setSynchronous (false );
156177 }
157- return true ;
158178 }
159- return false ;
160179 }
161180
162181 /**
@@ -167,8 +186,8 @@ public synchronized void shutdown() {
167186 storeEventSource .removeEventListener (eventListener );
168187 storeEventSource .setEventOrdering (false );
169188 storeEventSource .setSynchronous (false );
170- syncListenersList .clear ( );
171- aSyncListenersList . clear ( );
189+ syncListenersList .values (). forEach ( Collection :: clear );
190+ asyncListenersList . values (). forEach ( Collection :: clear );
172191 unOrderedExectuor .shutdown ();
173192 orderedExecutor .shutdown ();
174193 }
@@ -188,11 +207,13 @@ void onEvent(CacheEvent<K, V> event) {
188207 } else {
189208 executor = unOrderedExectuor ;
190209 }
191- if (!aSyncListenersList .isEmpty ()) {
192- executor .submit (new EventDispatchTask <>(event , aSyncListenersList ));
210+ List <EventListenerWrapper <K , V >> asyncTargets = asyncListenersList .get (event .getType ());
211+ if (!asyncTargets .isEmpty ()) {
212+ executor .submit (new EventDispatchTask <>(event , asyncTargets ));
193213 }
194- if (!syncListenersList .isEmpty ()) {
195- Future <?> future = executor .submit (new EventDispatchTask <>(event , syncListenersList ));
214+ List <EventListenerWrapper <K , V >> syncTargets = syncListenersList .get (event .getType ());
215+ if (!syncTargets .isEmpty ()) {
216+ Future <?> future = executor .submit (new EventDispatchTask <>(event , syncTargets ));
196217 try {
197218 future .get ();
198219 } catch (Exception e ) {
0 commit comments