3535import org .slf4j .LoggerFactory ;
3636
3737import java .util .ArrayList ;
38+ import java .util .Collection ;
39+ import java .util .EnumMap ;
3840import java .util .EnumSet ;
3941import java .util .List ;
42+ import java .util .Map ;
4043import java .util .concurrent .CopyOnWriteArrayList ;
4144import java .util .concurrent .ExecutorService ;
4245import java .util .concurrent .Future ;
46+ import java .util .stream .Stream ;
47+
48+ import static java .util .Collections .unmodifiableMap ;
49+ import static java .util .EnumSet .allOf ;
50+ import static java .util .function .Function .identity ;
51+ import static java .util .stream .Collectors .toMap ;
52+ import static java .util .stream .Stream .concat ;
4353
4454/**
4555 * Per-cache component that manages cache event listener registrations, and provides event delivery based on desired
@@ -56,10 +66,12 @@ public class CacheEventDispatcherImpl<K, V> implements CacheEventDispatcher<K, V
5666 private static final Logger LOGGER = LoggerFactory .getLogger (CacheEventDispatcherImpl .class );
5767 private final ExecutorService unOrderedExectuor ;
5868 private final ExecutorService orderedExecutor ;
59- private int listenersCount = 0 ;
60- private int orderedListenerCount = 0 ;
61- private final List <EventListenerWrapper <K , V >> syncListenersList = new CopyOnWriteArrayList <>();
62- private final List <EventListenerWrapper <K , V >> aSyncListenersList = new CopyOnWriteArrayList <>();
69+
70+ private final Map <EventType , List <EventListenerWrapper <K , V >>> syncListenersList = unmodifiableMap (allOf (EventType .class ).stream ()
71+ .collect (toMap (identity (), t -> new CopyOnWriteArrayList <>(), (a , b ) -> { throw new AssertionError (); }, () -> new EnumMap <>(EventType .class ))));
72+ private final Map <EventType , List <EventListenerWrapper <K , V >>> asyncListenersList = unmodifiableMap (allOf (EventType .class ).stream ()
73+ .collect (toMap (identity (), t -> new CopyOnWriteArrayList <>(), (a , b ) -> { throw new AssertionError (); }, () -> new EnumMap <>(EventType .class ))));
74+
6375 private final StoreEventListener <K , V > eventListener = new StoreListener ();
6476
6577 private volatile Cache <K , V > listenerSource ;
@@ -95,69 +107,76 @@ public void registerCacheEventListener(CacheEventListener<? super K, ? super V>
95107 * @param wrapper the listener wrapper to register
96108 */
97109 private synchronized void registerCacheEventListener (EventListenerWrapper <K , V > wrapper ) {
98- if (aSyncListenersList .contains (wrapper ) || syncListenersList .contains (wrapper )) {
110+
111+ if (allListeners ().anyMatch (wrapper ::equals )) {
99112 throw new IllegalStateException ("Cache Event Listener already registered: " + wrapper .getListener ());
100113 }
101114
102- if (wrapper .isOrdered () && orderedListenerCount ++ == 0 ) {
115+ boolean firstListener = !allListeners ().findAny ().isPresent ();
116+
117+ if (wrapper .isOrdered () && (firstListener || allListeners ().noneMatch (EventListenerWrapper ::isOrdered ))) {
103118 storeEventSource .setEventOrdering (true );
104119 }
105120
106121 switch (wrapper .getFiringMode ()) {
107122 case ASYNCHRONOUS :
108- aSyncListenersList . add (wrapper );
123+ wrapper . getEventTypes (). forEach ( type -> asyncListenersList . get ( type ). add (wrapper ) );
109124 break ;
110125 case SYNCHRONOUS :
111- if (syncListenersList . isEmpty ()) {
126+ if (! syncListeners (). findAny (). isPresent ()) {
112127 storeEventSource .setSynchronous (true );
113128 }
114- syncListenersList .add (wrapper );
129+ wrapper . getEventTypes (). forEach ( type -> syncListenersList .get ( type ). add (wrapper ) );
115130 break ;
116131 default :
117132 throw new AssertionError ("Unhandled EventFiring value: " + wrapper .getFiringMode ());
118133 }
119134
120- if (listenersCount ++ == 0 ) {
135+ if (firstListener ) {
121136 storeEventSource .addEventListener (eventListener );
122137 }
123138 }
124139
140+ private Stream <EventListenerWrapper <K , V >> allListeners () {
141+ return concat (asyncListeners (), syncListeners ());
142+ }
143+
144+ private Stream <EventListenerWrapper <K , V >> syncListeners () {
145+ return syncListenersList .values ().stream ().flatMap (Collection ::stream );
146+ }
147+
148+ private Stream <EventListenerWrapper <K , V >> asyncListeners () {
149+ return asyncListenersList .values ().stream ().flatMap (Collection ::stream );
150+ }
151+
125152 /**
126153 * {@inheritDoc}
127154 */
128155 @ Override
129- public void deregisterCacheEventListener (CacheEventListener <? super K , ? super V > listener ) {
156+ public synchronized void deregisterCacheEventListener (CacheEventListener <? super K , ? super V > listener ) {
130157 EventListenerWrapper <K , V > wrapper = new EventListenerWrapper <>(listener );
131158
132- if (!removeWrapperFromList (wrapper , aSyncListenersList )) {
133- if (!removeWrapperFromList (wrapper , syncListenersList )) {
134- throw new IllegalStateException ("Unknown cache event listener: " + listener );
135- }
159+ boolean removed = Stream .of (asyncListenersList , syncListenersList )
160+ .flatMap (list -> list .values ().stream ())
161+ .map (list -> list .remove (wrapper ))
162+ .reduce ((a , b ) -> a || b ).orElse (false );
163+
164+ if (!removed ) {
165+ throw new IllegalStateException ("Unknown cache event listener: " + listener );
136166 }
137- }
138167
139- /**
140- * Synchronized to make sure listener removal is atomic
141- *
142- * @param wrapper the listener wrapper to unregister
143- * @param listenersList the listener list to remove from
144- */
145- private synchronized boolean removeWrapperFromList (EventListenerWrapper <K , V > wrapper , List <EventListenerWrapper <K , V >> listenersList ) {
146- int index = listenersList .indexOf (wrapper );
147- if (index != -1 ) {
148- EventListenerWrapper <K , V > containedWrapper = listenersList .remove (index );
149- if (containedWrapper .isOrdered () && --orderedListenerCount == 0 ) {
168+ if (!allListeners ().findAny ().isPresent ()) {
169+ storeEventSource .setSynchronous (false );
170+ storeEventSource .setEventOrdering (false );
171+ storeEventSource .removeEventListener (eventListener );
172+ } else {
173+ if (allListeners ().noneMatch (EventListenerWrapper ::isOrdered )) {
150174 storeEventSource .setEventOrdering (false );
151175 }
152- if (--listenersCount == 0 ) {
153- storeEventSource .removeEventListener (eventListener );
154- }
155- if (syncListenersList .isEmpty ()) {
176+ if (!syncListeners ().findAny ().isPresent ()) {
156177 storeEventSource .setSynchronous (false );
157178 }
158- return true ;
159179 }
160- return false ;
161180 }
162181
163182 /**
@@ -168,8 +187,8 @@ public synchronized void shutdown() {
168187 storeEventSource .removeEventListener (eventListener );
169188 storeEventSource .setEventOrdering (false );
170189 storeEventSource .setSynchronous (false );
171- syncListenersList .clear ( );
172- aSyncListenersList . clear ( );
190+ syncListenersList .values (). forEach ( Collection :: clear );
191+ asyncListenersList . values (). forEach ( Collection :: clear );
173192 unOrderedExectuor .shutdown ();
174193 orderedExecutor .shutdown ();
175194 }
@@ -189,11 +208,13 @@ void onEvent(CacheEvent<K, V> event) {
189208 } else {
190209 executor = unOrderedExectuor ;
191210 }
192- if (!aSyncListenersList .isEmpty ()) {
193- executor .submit (new EventDispatchTask <>(event , aSyncListenersList ));
211+ List <EventListenerWrapper <K , V >> asyncTargets = asyncListenersList .get (event .getType ());
212+ if (!asyncTargets .isEmpty ()) {
213+ executor .submit (new EventDispatchTask <>(event , asyncTargets ));
194214 }
195- if (!syncListenersList .isEmpty ()) {
196- Future <?> future = executor .submit (new EventDispatchTask <>(event , syncListenersList ));
215+ List <EventListenerWrapper <K , V >> syncTargets = syncListenersList .get (event .getType ());
216+ if (!syncTargets .isEmpty ()) {
217+ Future <?> future = executor .submit (new EventDispatchTask <>(event , syncTargets ));
197218 try {
198219 future .get ();
199220 } catch (Exception e ) {
0 commit comments