Skip to content

Commit f0b338b

Browse files
committed
added identity operations
1 parent 878c0af commit f0b338b

File tree

8 files changed

+1942
-12
lines changed

8 files changed

+1942
-12
lines changed

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,6 @@
6868
"**/.*": {
6969
"when": "$(basename).isDirectory"
7070
}
71-
}
71+
},
72+
"python.languageServer": "None"
7273
}

README.md

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,69 @@ print(list(bag)) # [1, 2, 3, 4]
7272

7373
### ConcurrentDictionary
7474

75-
A thread-safe dictionary. It has a few notable methods:
76-
77-
- `assign_atomic()`
78-
- `get_locked()`
79-
- `update_atomic()`
75+
A thread-safe dictionary. It has several atomic methods for safe concurrent operations:
76+
77+
- `assign_atomic()` - Atomically assign a value to a key
78+
- `update_atomic()` - Atomically update a value using a function
79+
- `remove_atomic()` - Atomically remove a key and return its value
80+
- `put_if_absent()` - Atomically put a value only if the key doesn't exist
81+
- `replace_if_present()` - Atomically replace a value only if the key exists
82+
- `replace_if_equal()` - Atomically replace a value only if it equals the expected value
83+
- `remove_if_exists()` - Atomically remove a key if it exists
84+
- `get_and_remove()` - Atomically get and remove a value
85+
- `get_locked()` - Context manager for safe read-modify-write operations
8086

8187
#### ConcurrentDictionary's `assign_atomic()`
8288

8389
Assigns a dictionary value under a key in a thread-safe way.
8490
While `dict["somekey"] = value` is allowed, it's best to use `assign_atomic()` for clarity of intent. Using normal assignment will work but raise a UserWarning.
8591

92+
#### ConcurrentDictionary's `remove_atomic()`
93+
94+
Atomically removes a key from the dictionary and returns its value, or None if the key doesn't exist.
95+
96+
```python
97+
from concurrent_collections import ConcurrentDictionary
98+
99+
d = ConcurrentDictionary({'x': 1, 'y': 2})
100+
value = d.remove_atomic('x') # Returns 1, removes 'x'
101+
```
102+
103+
#### ConcurrentDictionary's `put_if_absent()`
104+
105+
Atomically puts a value for a key only if the key is not already present. Returns the existing value if the key exists, None if the key was added.
106+
107+
```python
108+
from concurrent_collections import ConcurrentDictionary
109+
110+
d = ConcurrentDictionary({'x': 1})
111+
existing = d.put_if_absent('x', 2) # Returns 1, no change
112+
existing = d.put_if_absent('y', 3) # Returns None, adds 'y': 3
113+
```
114+
115+
#### ConcurrentDictionary's `replace_if_present()`
116+
117+
Atomically replaces the value for a key only if the key exists. Returns True if the key was replaced, False if the key doesn't exist.
118+
119+
```python
120+
from concurrent_collections import ConcurrentDictionary
121+
122+
d = ConcurrentDictionary({'x': 1})
123+
replaced = d.replace_if_present('x', 2) # Returns True
124+
replaced = d.replace_if_present('y', 3) # Returns False
125+
```
126+
127+
#### ConcurrentDictionary's `replace_if_equal()`
128+
129+
Atomically replaces the value for a key only if the current value equals the expected value. Returns True if the value was replaced, False otherwise.
130+
131+
```python
132+
from concurrent_collections import ConcurrentDictionary
133+
134+
d = ConcurrentDictionary({'x': 1})
135+
replaced = d.replace_if_equal('x', 1, 2) # Returns True
136+
replaced = d.replace_if_equal('x', 1, 3) # Returns False (current value is 2)
137+
```
86138

87139
#### ConcurrentDictionary's `get_locked()`
88140

@@ -126,6 +178,70 @@ Additionally, there are [other queue classes in the `multiprocessing` module](ht
126178
- `SimpleQueue` (again)
127179

128180

181+
## Equality and Identity Semantics
182+
183+
### ConcurrentBag Equality
184+
185+
`ConcurrentBag` compares as a **multiset** - order doesn't matter, but element frequency does:
186+
187+
```python
188+
from concurrent_collections import ConcurrentBag
189+
190+
# These are equal (same elements, same frequencies)
191+
bag1 = ConcurrentBag([1, 2, 2, 3])
192+
bag2 = ConcurrentBag([2, 1, 3, 2])
193+
assert bag1 == bag2 # True
194+
195+
# These are not equal (different frequencies)
196+
bag3 = ConcurrentBag([1, 2, 3, 3])
197+
assert bag1 != bag3 # True
198+
```
199+
200+
### ConcurrentQueue Equality
201+
202+
`ConcurrentQueue` compares elements in order, taking snapshots for consistency during concurrent operations:
203+
204+
```python
205+
from concurrent_collections import ConcurrentQueue
206+
207+
# These are equal (same elements, same order)
208+
queue1 = ConcurrentQueue([1, 2, 3])
209+
queue2 = ConcurrentQueue([1, 2, 3])
210+
assert queue1 == queue2 # True
211+
212+
# These are not equal (different order)
213+
queue3 = ConcurrentQueue([3, 2, 1])
214+
assert queue1 != queue3 # True
215+
```
216+
217+
### ConcurrentDictionary Equality
218+
219+
`ConcurrentDictionary` compares key-value pairs, order doesn't matter:
220+
221+
```python
222+
from concurrent_collections import ConcurrentDictionary
223+
224+
# These are equal (same key-value pairs)
225+
dict1 = ConcurrentDictionary({'a': 1, 'b': 2})
226+
dict2 = ConcurrentDictionary({'b': 2, 'a': 1})
227+
assert dict1 == dict2 # True
228+
229+
# These are not equal (different values)
230+
dict3 = ConcurrentDictionary({'a': 1, 'b': 3})
231+
assert dict1 != dict3 # True
232+
```
233+
234+
## Thread Safety Guarantees
235+
236+
All collections provide the following guarantees:
237+
238+
1. **Atomic Operations**: All individual operations (append, remove, get, set) are atomic
239+
2. **Consistent Snapshots**: Iteration and equality comparisons take consistent snapshots
240+
3. **No Race Conditions**: Multiple threads can safely access and modify the collections
241+
4. **Identity Consistency**: Hash values and equality comparisons are consistent within a single operation
242+
243+
**Note**: While individual operations are thread-safe, compound operations (like checking length then conditionally modifying) should use the provided atomic methods or context managers to ensure consistency.
244+
129245
## License
130246

131247
MIT License

concurrent_collections/concurrent_bag.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import threading
2-
from typing import Generic, Iterable, Iterator, List, Optional, TypeVar
2+
from typing import Generic, Iterable, Iterator, List, Optional, TypeVar, Any
33

44
T = TypeVar('T')
55

@@ -54,4 +54,38 @@ def clear(self) -> None:
5454

5555
def __repr__(self) -> str:
5656
with self._lock:
57-
return f"ConcurrentBag({self._items!r})"
57+
return f"ConcurrentBag({self._items!r})"
58+
59+
def __eq__(self, other: Any) -> bool:
60+
"""
61+
Thread-safe equality comparison.
62+
63+
Two ConcurrentBag instances are equal if they have the same elements with the same frequency
64+
(multiset equality). Order does not matter for bag equality.
65+
66+
Example:
67+
ConcurrentBag([1, 2, 2, 3]) == ConcurrentBag([2, 1, 3, 2]) # True
68+
ConcurrentBag([1, 2, 2, 3]) == ConcurrentBag([1, 2, 3, 3]) # False (different frequencies)
69+
"""
70+
if not isinstance(other, ConcurrentBag):
71+
return False
72+
73+
with self._lock:
74+
with other._lock:
75+
# Compare as multisets by counting element frequencies
76+
from collections import Counter
77+
return Counter(self._items) == Counter(other._items)
78+
79+
def __hash__(self) -> int:
80+
"""
81+
Thread-safe hash computation.
82+
83+
The hash is computed based on the multiset content (element frequencies).
84+
Note: The hash will change if the bag is modified.
85+
"""
86+
with self._lock:
87+
# Convert to frozenset of (element, count) pairs for consistent hashing
88+
from collections import Counter
89+
counter = Counter(self._items)
90+
items = frozenset(counter.items())
91+
return hash(items)

concurrent_collections/concurrent_deque.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import threading
22
from collections import deque
3-
from typing import Generic, Iterable, Iterator, Optional, TypeVar
3+
from typing import Generic, Iterable, Iterator, Optional, TypeVar, Any
44

55
T = TypeVar('T')
66

@@ -48,4 +48,37 @@ def extendleft(self, iterable: Iterable[T]) -> None:
4848

4949
def __repr__(self) -> str:
5050
with self._lock:
51-
return f"ConcurrentQueue({list(self._deque)})"
51+
return f"ConcurrentQueue({list(self._deque)})"
52+
53+
def __eq__(self, other: Any) -> bool:
54+
"""
55+
Thread-safe equality comparison.
56+
57+
Two ConcurrentQueue instances are equal if they have the same elements in the same order.
58+
For concurrent operations, this comparison takes a snapshot of both queues at the time
59+
of comparison to ensure consistency.
60+
61+
Note: Due to the concurrent nature of the queue, the order may change between
62+
comparisons, but this method provides a consistent snapshot-based comparison.
63+
"""
64+
if not isinstance(other, ConcurrentQueue):
65+
return False
66+
67+
with self._lock:
68+
with other._lock:
69+
# Take snapshots for consistent comparison
70+
self_snapshot = list(self._deque)
71+
other_snapshot = list(other._deque)
72+
return self_snapshot == other_snapshot
73+
74+
def __hash__(self) -> int:
75+
"""
76+
Thread-safe hash computation.
77+
78+
The hash is computed based on the current state of the queue.
79+
Note: The hash will change if the queue is modified.
80+
"""
81+
with self._lock:
82+
# Convert to tuple for consistent hashing
83+
items = tuple(self._deque)
84+
return hash(items)

concurrent_collections/concurrent_dict.py

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def update_atomic(self, key: K, func: Callable[[V], V]) -> None:
112112
Example:
113113
d = ConcurrentDictionary({'x': 0})
114114
# Atomically increment the value for 'x'
115-
d.modify_atomic('x', lambda v: v + 1)
115+
d.update_atomic('x', lambda v: v + 1)
116116
"""
117117
with self._lock:
118118
if key in self._dict:
@@ -123,6 +123,103 @@ def update_atomic(self, key: K, func: Callable[[V], V]) -> None:
123123
# If the key does not exist, we can set it directly
124124
self._dict[key] = func(None) # type: ignore
125125

126+
def remove_atomic(self, key: K) -> Optional[V]:
127+
"""
128+
Atomically remove a key from the dictionary and return its value.
129+
130+
This method ensures that the removal is performed atomically,
131+
preventing race conditions in concurrent environments.
132+
133+
Returns the value associated with the key, or None if the key doesn't exist.
134+
135+
Example:
136+
d = ConcurrentDictionary({'x': 1, 'y': 2})
137+
value = d.remove_atomic('x') # Returns 1, removes 'x'
138+
"""
139+
with self._lock:
140+
return self._dict.pop(key, None)
141+
142+
def remove_if_exists(self, key: K) -> bool:
143+
"""
144+
Atomically remove a key from the dictionary if it exists.
145+
146+
Returns True if the key was removed, False if it didn't exist.
147+
148+
Example:
149+
d = ConcurrentDictionary({'x': 1})
150+
removed = d.remove_if_exists('x') # Returns True
151+
removed = d.remove_if_exists('y') # Returns False
152+
"""
153+
with self._lock:
154+
if key in self._dict:
155+
del self._dict[key]
156+
return True
157+
return False
158+
159+
def get_and_remove(self, key: K, default: Optional[V] = None) -> Optional[V]:
160+
"""
161+
Atomically get the value for a key and remove it from the dictionary.
162+
163+
This is equivalent to pop(key, default) but with a more descriptive name
164+
for atomic operations.
165+
166+
Example:
167+
d = ConcurrentDictionary({'x': 1})
168+
value = d.get_and_remove('x') # Returns 1, removes 'x'
169+
"""
170+
return self.pop(key, default)
171+
172+
def put_if_absent(self, key: K, value: V) -> Optional[V]:
173+
"""
174+
Atomically put a value for a key only if the key is not already present.
175+
176+
Returns the existing value if the key exists, None if the key was added.
177+
178+
Example:
179+
d = ConcurrentDictionary({'x': 1})
180+
existing = d.put_if_absent('x', 2) # Returns 1, no change
181+
existing = d.put_if_absent('y', 3) # Returns None, adds 'y': 3
182+
"""
183+
with self._lock:
184+
if key in self._dict:
185+
return self._dict[key]
186+
else:
187+
self._dict[key] = value
188+
return None
189+
190+
def replace_if_present(self, key: K, value: V) -> bool:
191+
"""
192+
Atomically replace the value for a key only if the key exists.
193+
194+
Returns True if the key was replaced, False if the key doesn't exist.
195+
196+
Example:
197+
d = ConcurrentDictionary({'x': 1})
198+
replaced = d.replace_if_present('x', 2) # Returns True
199+
replaced = d.replace_if_present('y', 3) # Returns False
200+
"""
201+
with self._lock:
202+
if key in self._dict:
203+
self._dict[key] = value
204+
return True
205+
return False
206+
207+
def replace_if_equal(self, key: K, old_value: V, new_value: V) -> bool:
208+
"""
209+
Atomically replace the value for a key only if the current value equals old_value.
210+
211+
Returns True if the value was replaced, False otherwise.
212+
213+
Example:
214+
d = ConcurrentDictionary({'x': 1})
215+
replaced = d.replace_if_equal('x', 1, 2) # Returns True
216+
replaced = d.replace_if_equal('x', 1, 3) # Returns False (current value is 2)
217+
"""
218+
with self._lock:
219+
if key in self._dict and self._dict[key] == old_value:
220+
self._dict[key] = new_value
221+
return True
222+
return False
126223

127224
def pop(self, key: K, default: Optional[V] = None) -> Optional[V]:
128225
with self._lock:
@@ -171,4 +268,29 @@ def __iter__(self) -> Iterator[K]:
171268

172269
def __repr__(self) -> str:
173270
with self._lock:
174-
return f"ConcurrentDictionary({self._dict!r})"
271+
return f"ConcurrentDictionary({self._dict!r})"
272+
273+
def __eq__(self, other: Any) -> bool:
274+
"""
275+
Thread-safe equality comparison.
276+
277+
Two ConcurrentDictionary instances are equal if they have the same key-value pairs.
278+
"""
279+
if not isinstance(other, ConcurrentDictionary):
280+
return False
281+
282+
with self._lock:
283+
with other._lock:
284+
return self._dict == other._dict
285+
286+
def __hash__(self) -> int:
287+
"""
288+
Thread-safe hash computation.
289+
290+
The hash is computed based on the current state of the dictionary.
291+
Note: The hash will change if the dictionary is modified.
292+
"""
293+
with self._lock:
294+
# Convert to frozenset of items for consistent hashing
295+
items = frozenset(self._dict.items())
296+
return hash(items)

0 commit comments

Comments
 (0)