Skip to content

Commit df0ccd4

Browse files
committed
imporve TDD facades
1 parent a3369cf commit df0ccd4

File tree

5 files changed

+48
-326
lines changed

5 files changed

+48
-326
lines changed

streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
* @param <OutV> output value type (exposed by this facade)
3232
*/
3333
public class GenericReadOnlyKeyValueStoreFacade<K, InV, OutV> implements ReadOnlyKeyValueStore<K, OutV> {
34-
protected final ReadOnlyKeyValueStore<K, InV> inner;
35-
protected final Function<InV, OutV> valueConverter;
34+
private final ReadOnlyKeyValueStore<K, InV> inner;
35+
private final Function<InV, OutV> valueConverter;
3636

3737
public GenericReadOnlyKeyValueStoreFacade(final ReadOnlyKeyValueStore<K, InV> inner,
3838
final Function<InV, OutV> valueConverter) {

streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
* @param <OutV> output value type (exposed by this facade)
3434
*/
3535
public class GenericReadOnlyWindowStoreFacade<K, InV, OutV> implements ReadOnlyWindowStore<K, OutV> {
36-
protected final ReadOnlyWindowStore<K, InV> inner;
37-
protected final Function<InV, OutV> valueConverter;
36+
private final ReadOnlyWindowStore<K, InV> inner;
37+
private final Function<InV, OutV> valueConverter;
3838

3939
public GenericReadOnlyWindowStoreFacade(final ReadOnlyWindowStore<K, InV> inner,
4040
final Function<InV, OutV> valueConverter) {

streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java

Lines changed: 12 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,49 +1281,12 @@ public void waitObject(final Object obj, final Supplier<Boolean> condition, fina
12811281
}
12821282
}
12831283

1284-
static class KeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
1284+
static class KeyValueStoreFacade<K, V> extends GenericReadOnlyKeyValueStoreFacade<K, ValueAndTimestamp<V>, V> implements KeyValueStore<K, V> {
12851285
private final TimestampedKeyValueStore<K, V> inner;
1286-
private final GenericReadOnlyKeyValueStoreFacade<K, ValueAndTimestamp<V>, V> readFacade;
12871286

1288-
public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
1289-
this.inner = inner;
1290-
this.readFacade = new GenericReadOnlyKeyValueStoreFacade<>(inner, ValueConverters.extractValue());
1291-
}
1292-
1293-
@Override
1294-
public V get(final K key) {
1295-
return readFacade.get(key);
1296-
}
1297-
1298-
@Override
1299-
public KeyValueIterator<K, V> range(final K from, final K to) {
1300-
return readFacade.range(from, to);
1301-
}
1302-
1303-
@Override
1304-
public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
1305-
return readFacade.reverseRange(from, to);
1306-
}
1307-
1308-
@Override
1309-
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
1310-
final PS prefixKeySerializer) {
1311-
return readFacade.prefixScan(prefix, prefixKeySerializer);
1312-
}
1313-
1314-
@Override
1315-
public KeyValueIterator<K, V> all() {
1316-
return readFacade.all();
1317-
}
1318-
1319-
@Override
1320-
public KeyValueIterator<K, V> reverseAll() {
1321-
return readFacade.reverseAll();
1322-
}
1323-
1324-
@Override
1325-
public long approximateNumEntries() {
1326-
return readFacade.approximateNumEntries();
1287+
public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store) {
1288+
super(store, ValueConverters.extractValue());
1289+
this.inner = store;
13271290
}
13281291

13291292
@Override
@@ -1384,25 +1347,17 @@ public Position getPosition() {
13841347
}
13851348
}
13861349

1387-
static class WindowStoreFacade<K, V> implements WindowStore<K, V> {
1350+
static class WindowStoreFacade<K, V> extends GenericReadOnlyWindowStoreFacade<K, ValueAndTimestamp<V>, V> implements WindowStore<K, V> {
13881351
private final TimestampedWindowStore<K, V> inner;
1389-
private final GenericReadOnlyWindowStoreFacade<K, ValueAndTimestamp<V>, V> readFacade;
13901352

13911353
public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
1354+
super(store, ValueConverters.extractValue());
13921355
this.inner = store;
1393-
this.readFacade = new GenericReadOnlyWindowStoreFacade<>(store, ValueConverters.extractValue());
13941356
}
13951357

13961358
@Override
1397-
public V fetch(final K key, final long time) {
1398-
return readFacade.fetch(key, time);
1399-
}
1400-
1401-
@Override
1402-
public WindowStoreIterator<V> fetch(final K key,
1403-
final Instant timeFrom,
1404-
final Instant timeTo) {
1405-
return readFacade.fetch(key, timeFrom, timeTo);
1359+
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
1360+
inner.init(stateStoreContext, root);
14061361
}
14071362

14081363
@Override
@@ -1413,89 +1368,17 @@ public WindowStoreIterator<V> fetch(final K key,
14131368
}
14141369

14151370
@Override
1416-
public WindowStoreIterator<V> backwardFetch(final K key,
1417-
final Instant timeFrom,
1418-
final Instant timeTo) {
1419-
return readFacade.backwardFetch(key, timeFrom, timeTo);
1420-
}
1421-
1422-
@Override
1423-
public WindowStoreIterator<V> backwardFetch(final K key,
1424-
final long timeFrom,
1425-
final long timeTo) {
1426-
return backwardFetch(key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
1427-
}
1428-
1429-
@Override
1430-
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
1431-
final K keyTo,
1432-
final Instant timeFrom,
1433-
final Instant timeTo) {
1434-
return readFacade.fetch(keyFrom, keyTo, timeFrom, timeTo);
1371+
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
1372+
final long timeTo) {
1373+
return fetchAll(Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
14351374
}
14361375

14371376
@Override
14381377
public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
14391378
final K keyTo,
14401379
final long timeFrom,
14411380
final long timeTo) {
1442-
return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom),
1443-
Instant.ofEpochMilli(timeTo));
1444-
}
1445-
1446-
@Override
1447-
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
1448-
final K keyTo,
1449-
final Instant timeFrom,
1450-
final Instant timeTo) {
1451-
return readFacade.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
1452-
}
1453-
1454-
@Override
1455-
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
1456-
final K keyTo,
1457-
final long timeFrom,
1458-
final long timeTo) {
1459-
return backwardFetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
1460-
}
1461-
1462-
@Override
1463-
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom,
1464-
final Instant timeTo) {
1465-
return readFacade.fetchAll(timeFrom, timeTo);
1466-
}
1467-
1468-
@Override
1469-
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
1470-
final long timeTo) {
1471-
return fetchAll(Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
1472-
}
1473-
1474-
@Override
1475-
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant timeFrom,
1476-
final Instant timeTo) {
1477-
return readFacade.backwardFetchAll(timeFrom, timeTo);
1478-
}
1479-
1480-
@Override
1481-
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
1482-
final long timeTo) {
1483-
return backwardFetchAll(Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
1484-
}
1485-
1486-
@Override
1487-
public KeyValueIterator<Windowed<K>, V> all() {
1488-
return readFacade.all();
1489-
}
1490-
1491-
@Override
1492-
public KeyValueIterator<Windowed<K>, V> backwardAll() {
1493-
return readFacade.backwardAll();
1494-
}
1495-
1496-
@Override
1497-
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
1498-
inner.init(stateStoreContext, root);
1381+
return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
14991382
}
15001383

15011384
@Override

streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java

Lines changed: 6 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
package org.apache.kafka.streams;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
20-
import org.apache.kafka.common.serialization.StringSerializer;
2120
import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
2221
import org.apache.kafka.streams.processor.StateStore;
2322
import org.apache.kafka.streams.processor.StateStoreContext;
24-
import org.apache.kafka.streams.state.KeyValueIterator;
23+
import org.apache.kafka.streams.query.Position;
2524
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
2625
import org.apache.kafka.streams.state.ValueAndTimestamp;
2726

@@ -142,90 +141,11 @@ public void shouldReturnIsOpen() {
142141
}
143142

144143
@Test
145-
public void shouldGetAndConvertValue() {
146-
when(mockedKeyValueTimestampStore.get("key1"))
147-
.thenReturn(ValueAndTimestamp.make("value1", 42L));
148-
when(mockedKeyValueTimestampStore.get("key2"))
149-
.thenReturn(null);
150-
151-
assertThat(keyValueStoreFacade.get("key1"), is("value1"));
152-
assertNull(keyValueStoreFacade.get("key2"));
153-
}
154-
155-
@Test
156-
public void shouldRangeAndConvertValues() {
157-
@SuppressWarnings("unchecked")
158-
final KeyValueIterator<String, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
159-
when(mockedKeyValueTimestampStore.range("from", "to")).thenReturn(mockIterator);
160-
when(mockIterator.hasNext()).thenReturn(true, true, false);
161-
when(mockIterator.next())
162-
.thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 10L)))
163-
.thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 20L)));
164-
165-
final KeyValueIterator<String, String> iterator = keyValueStoreFacade.range("from", "to");
166-
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
167-
assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
168-
}
169-
170-
@Test
171-
public void shouldReverseRangeAndConvertValues() {
172-
@SuppressWarnings("unchecked")
173-
final KeyValueIterator<String, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
174-
when(mockedKeyValueTimestampStore.reverseRange("from", "to")).thenReturn(mockIterator);
175-
when(mockIterator.hasNext()).thenReturn(true, false);
176-
when(mockIterator.next())
177-
.thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 10L)));
178-
179-
final KeyValueIterator<String, String> iterator = keyValueStoreFacade.reverseRange("from", "to");
180-
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
181-
}
182-
183-
@Test
184-
public void shouldPrefixScanAndConvertValues() {
185-
@SuppressWarnings("unchecked")
186-
final KeyValueIterator<String, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
187-
final StringSerializer serializer = new StringSerializer();
188-
when(mockedKeyValueTimestampStore.prefixScan("prefix", serializer)).thenReturn(mockIterator);
189-
when(mockIterator.hasNext()).thenReturn(true, false);
190-
when(mockIterator.next())
191-
.thenReturn(KeyValue.pair("prefix-key", ValueAndTimestamp.make("value", 10L)));
192-
193-
final KeyValueIterator<String, String> iterator = keyValueStoreFacade.prefixScan("prefix", serializer);
194-
assertThat(iterator.next(), is(KeyValue.pair("prefix-key", "value")));
195-
}
196-
197-
@Test
198-
public void shouldGetAllAndConvertValues() {
199-
@SuppressWarnings("unchecked")
200-
final KeyValueIterator<String, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
201-
when(mockedKeyValueTimestampStore.all()).thenReturn(mockIterator);
202-
when(mockIterator.hasNext()).thenReturn(true, true, false);
203-
when(mockIterator.next())
204-
.thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 10L)))
205-
.thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 20L)));
206-
207-
final KeyValueIterator<String, String> iterator = keyValueStoreFacade.all();
208-
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
209-
assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
210-
}
211-
212-
@Test
213-
public void shouldReverseAllAndConvertValues() {
214-
@SuppressWarnings("unchecked")
215-
final KeyValueIterator<String, ValueAndTimestamp<String>> mockIterator = mock(KeyValueIterator.class);
216-
when(mockedKeyValueTimestampStore.reverseAll()).thenReturn(mockIterator);
217-
when(mockIterator.hasNext()).thenReturn(true, false);
218-
when(mockIterator.next())
219-
.thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 10L)));
220-
221-
final KeyValueIterator<String, String> iterator = keyValueStoreFacade.reverseAll();
222-
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
223-
}
224-
225-
@Test
226-
public void shouldReturnApproximateNumEntries() {
227-
when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L);
144+
public void shouldReturnPosition() {
145+
when(mockedKeyValueTimestampStore.getPosition())
146+
.thenReturn(Position.emptyPosition());
228147

229-
assertThat(keyValueStoreFacade.approximateNumEntries(), is(42L));
148+
assertThat(keyValueStoreFacade.getPosition(), is(Position.emptyPosition()));
149+
verify(mockedKeyValueTimestampStore, times(1)).getPosition();
230150
}
231151
}

0 commit comments

Comments
 (0)