From 52c005f9b244c21131ebcc91a44bc9f1a38bd7e3 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 21 Aug 2025 16:50:47 -0400 Subject: [PATCH 1/4] chore: upd gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index dea414f..a74ad8b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ !/extensions/available/.gitkeep !/extensions/enabled/.gitkeep +/tmp/events.bin /tmp/cache/di_* /tmp/plugin_artifacts/* /tmp/cache/database/* From 4a5f105a5221cb08a066d57e5c47b2e2d4e56203 Mon Sep 17 00:00:00 2001 From: Vladimir Barinov Date: Thu, 21 Aug 2025 18:18:04 -0400 Subject: [PATCH 2/4] feat(signaling): add experimental redis support and probably get rid of sqlite in future, cuz it sucks ass and slowing down the system :( --- chandler-example.yml | 2 ++ chandler/Signaling/SignalManager.php | 48 ++++++++++++++++++++++++++-- composer.json | 3 +- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/chandler-example.yml b/chandler-example.yml index 428ca16..7fe8f69 100644 --- a/chandler-example.yml +++ b/chandler-example.yml @@ -33,3 +33,5 @@ chandler: pass: "word" addr: "noreply@example.com" ssl: true + + redisUrl: "tcp://10.0.0.1:6379" diff --git a/chandler/Signaling/SignalManager.php b/chandler/Signaling/SignalManager.php index b38793f..459fe47 100644 --- a/chandler/Signaling/SignalManager.php +++ b/chandler/Signaling/SignalManager.php @@ -1,12 +1,14 @@ + * @author Vladimir Barinov */ class SignalManager { @@ -43,7 +45,7 @@ private function __construct() * @return array|null Array of events if there are any, null otherwise */ private function eventFor(int $for): ?array - { + { $since = $this->since - 1; $statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC"); $event = $statement->fetch(\PDO::FETCH_LAZY); @@ -66,8 +68,41 @@ private function eventFor(int $for): ?array */ function listen(\Closure $callback, int $for, int $time = 25): void { + try { + $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"], ['read_write_timeout' => $time]); + + // We will catch the old message first + $oldEvent = $this->eventFor($for); + + if ($oldEvent) { + list($id, $evt) = $oldEvent; + $id = crc32((string)$id); + $callback($evt, $id); + } + + // And then we will subscribe to user's channel + $subscriber = $redisClient->pubSubLoop(); + $subscriber->subscribe('im'.$for); + + foreach($subscriber as $event) { + if ($event->kind == 'message' && $event->channel == 'im'.$for) { + list($id, $evt) = json_decode($event->payload); + $id = crc32((string)$id); + $evt = unserialize(hex2bin($evt)); + $callback($evt, $id); + } + } + + // On timeout we're returning nothing + exit("[]"); + } + catch (Exception $e) + { + error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: ".$e->getMessage()); + } + $this->since = time() - 1; - for($i = 0; $i < $time; $i++) { + for($i = 0; $i < ($time / 5); $i++) { sleep(1); $event = $this->eventFor($for); @@ -131,7 +166,16 @@ function triggerEvent(object $event, int $for): bool $event = bin2hex(serialize($event)); $since = time(); + // add it to the history $this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')"); + $id = $this->connection->lastInsertId(); + + try { + $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); + $redisClient->publish('im'.$for, json_encode([$id, $event])); + } catch (Exception $e) { + error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage()); + } return true; } diff --git a/composer.json b/composer.json index 7ed21c6..31abd70 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,8 @@ "symfony/yaml": "^5.3", "guzzlehttp/guzzle": "^6.0", "wildbit/postmark-php": "^4.0", - "tracy/tracy": "^2.10" + "tracy/tracy": "^2.10", + "predis/predis": "^3.2" }, "suggest": { "ext-yaml": "for faster yaml parsing" From 419ccea795e2f6c07446059ac593fe3e0362e962 Mon Sep 17 00:00:00 2001 From: Vladimir Barinov Date: Sun, 22 Feb 2026 17:09:34 +0300 Subject: [PATCH 3/4] feat(signaling): redis will handle history, add some mechanisms chore: composer --- chandler/Signaling/SignalManager.php | 169 +++++++++++++++++++++------ composer.json | 2 +- composer.lock | 67 ++++++++++- 3 files changed, 198 insertions(+), 40 deletions(-) diff --git a/chandler/Signaling/SignalManager.php b/chandler/Signaling/SignalManager.php index 1dc2e56..ab705b4 100644 --- a/chandler/Signaling/SignalManager.php +++ b/chandler/Signaling/SignalManager.php @@ -17,14 +17,26 @@ class SignalManager { use TSimpleSingleton; + /** * @var int Latest event timestamp. */ private $since; + /** * @var \PDO PDO Connection to events SQLite DB. */ private $connection; + + /** + * @var RedisClient|null Redis client for events. + */ + private $redisClient; + + /** + * @var bool Whether to use Redis for operations. + */ + private $useRedis = false; /** * @internal @@ -32,13 +44,31 @@ class SignalManager private function __construct() { $this->since = time(); - $this->connection = new \PDO( - 'sqlite:' . CHANDLER_ROOT . '/tmp/events.bin', - null, - null, - [\PDO::ATTR_PERSISTENT => true] - ); - $this->connection->query("CREATE TABLE IF NOT EXISTS pool(id INTEGER PRIMARY KEY AUTOINCREMENT, since INTEGER, for INTEGER, event TEXT);"); + + // Check if Redis is configured + if (!empty(CHANDLER_ROOT_CONF["redisUrl"])) { + try { + $this->redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); + $this->useRedis = true; + // Test connection + $this->redisClient->ping(); + } catch (\Exception $e) { + error_log("Redis connection failed, falling back to SQLite: " . $e->getMessage()); + $this->useRedis = false; + $this->redisClient = null; + } + } + + // Initialize SQLite connection (fallback) + if (!$this->useRedis) { + $this->connection = new \PDO( + 'sqlite:' . CHANDLER_ROOT . '/tmp/events.bin', + null, + null, + [\PDO::ATTR_PERSISTENT => true] + ); + $this->connection->query("CREATE TABLE IF NOT EXISTS pool(id INTEGER PRIMARY KEY AUTOINCREMENT, since INTEGER, for INTEGER, event TEXT);"); + } } /** @@ -50,16 +80,32 @@ private function __construct() * @return array|null Array of events if there are any, null otherwise */ private function eventFor(int $for): ?array - { - $since = $this->since - 1; - $statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC"); - $event = $statement->fetch(\PDO::FETCH_LAZY); - if (!$event) { - return null; - } + { + if ($this->useRedis && $this->redisClient) { + // Use Redis + $since = $this->since - 1; + $key = "events:$for"; + $events = $this->redisClient->zrevrangebyscore($key, '+inf', $since, ['LIMIT' => [0, 1]]); + + if (empty($events)) { + return null; + } + + $eventData = json_decode($events[0], true); + $this->since = time(); + return [$eventData['id'], unserialize(hex2bin($eventData['event']))]; + } else { + // Use SQLite + $since = $this->since - 1; + $statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC"); + $event = $statement->fetch(\PDO::FETCH_LAZY); + if (!$event) { + return null; + } - $this->since = time(); - return [$event->id, unserialize(hex2bin($event->event))]; + $this->since = time(); + return [$event->id, unserialize(hex2bin($event->event))]; + } } /** @@ -135,13 +181,27 @@ public function listen(\Closure $callback, int $for, int $time = 25): void */ public function tipFor(int $for): int { - $statement = $this->connection->query("SELECT since FROM pool WHERE `for` = $for ORDER BY since DESC"); - $result = $statement->fetch(\PDO::FETCH_LAZY); - if (!$result) { - return 1; - } + if ($this->useRedis && $this->redisClient) { + // Use Redis + $key = "events:$for"; + $events = $this->redisClient->zrevrange($key, 0, 0, ['WITHSCORES' => true]); + + if (empty($events)) { + return 1; + } + + // Return the score (timestamp) of the latest event + return (int) reset($events); + } else { + // Use SQLite + $statement = $this->connection->query("SELECT since FROM pool WHERE `for` = $for ORDER BY since DESC"); + $result = $statement->fetch(\PDO::FETCH_LAZY); + if (!$result) { + return 1; + } - return $result->since; + return $result->since; + } } /** @@ -155,14 +215,32 @@ public function tipFor(int $for): int */ public function getHistoryFor(int $for, ?int $tip = null, int $limit = 1000): array { - $res = []; - $tip ??= $this->tipFor($for); - $query = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $tip ORDER BY since DESC LIMIT $limit"); - foreach ($query as $event) { - $res[] = unserialize(hex2bin($event["event"])); - } + if ($this->useRedis && $this->redisClient) { + // Use Redis + $res = []; + $tip ??= $this->tipFor($for); + $key = "events:$for"; + + // Get events with scores greater than tip + $events = $this->redisClient->zrevrangebyscore($key, '+inf', $tip, ['LIMIT' => [0, $limit]]); + + foreach ($events as $eventJson) { + $eventData = json_decode($eventJson, true); + $res[] = unserialize(hex2bin($eventData['event'])); + } + + return $res; + } else { + // Use SQLite + $res = []; + $tip ??= $this->tipFor($for); + $query = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $tip ORDER BY since DESC LIMIT $limit"); + foreach ($query as $event) { + $res[] = unserialize(hex2bin($event["event"])); + } - return $res; + return $res; + } } /** @@ -177,17 +255,34 @@ public function triggerEvent(object $event, int $for): bool { $event = bin2hex(serialize($event)); $since = time(); + $id = null; - // add it to the history - $this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')"); - $id = $this->connection->lastInsertId(); + if ($this->useRedis && $this->redisClient) { + // Use Redis + $key = "events:$for"; + $id = time() . rand(1000, 9999); // Simple ID generation + $eventData = [ + 'id' => $id, + 'event' => $event, + 'since' => $since + ]; + $this->redisClient->zadd($key, $since, json_encode($eventData)); + } else { + // Use SQLite + $this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')"); + $id = $this->connection->lastInsertId(); + } - try { - $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); - $redisClient->publish('im'.$for, json_encode([$id, $event])); - } catch (Exception $e) { - error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage()); + // Try to publish to Redis for real-time notifications (existing behavior) + if ($id !== null) { + try { + $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); + $redisClient->publish('im'.$for, json_encode([$id, $event])); + } catch (Exception $e) { + error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage()); + } } + return true; } } diff --git a/composer.json b/composer.json index 88dd4d6..10caa0c 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,7 @@ "symfony/yaml": "^7.3", "wildbit/postmark-php": "^7.0", "tracy/tracy": "^2.11", - "predis/predis": "^3.2" + "predis/predis": "^3.4" }, "suggest": { "ext-yaml": "for faster yaml parsing" diff --git a/composer.lock b/composer.lock index 4c29627..fcaa534 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "5387d7805bbcc789e0484cbc3159d4a4", + "content-hash": "bd0a1816f4f0697e5ac163c793348fd1", "packages": [ { "name": "doctrine/deprecations", @@ -1273,6 +1273,69 @@ }, "time": "2025-10-31T00:45:47+00:00" }, + { + "name": "predis/predis", + "version": "v3.4.0", + "source": { + "type": "git", + "url": "https://github.com/predis/predis.git", + "reference": "1183f5732e6b10efd33f64984a96726eaecb59aa" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/predis/predis/zipball/1183f5732e6b10efd33f64984a96726eaecb59aa", + "reference": "1183f5732e6b10efd33f64984a96726eaecb59aa", + "shasum": "" + }, + "require": { + "php": "^7.2 || ^8.0", + "psr/http-message": "^1.0|^2.0" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "^3.3", + "phpstan/phpstan": "^1.9", + "phpunit/phpcov": "^6.0 || ^8.0", + "phpunit/phpunit": "^8.0 || ~9.4.4" + }, + "suggest": { + "ext-relay": "Faster connection with in-memory caching (>=0.6.2)" + }, + "type": "library", + "autoload": { + "psr-4": { + "Predis\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Till Krüss", + "homepage": "https://till.im", + "role": "Maintainer" + } + ], + "description": "A flexible and feature-complete Redis/Valkey client for PHP.", + "homepage": "http://github.com/predis/predis", + "keywords": [ + "nosql", + "predis", + "redis" + ], + "support": { + "issues": "https://github.com/predis/predis/issues", + "source": "https://github.com/predis/predis/tree/v3.4.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/tillkruss", + "type": "github" + } + ], + "time": "2026-02-11T17:30:28+00:00" + }, { "name": "psr/http-client", "version": "1.0.3", @@ -4590,5 +4653,5 @@ "php": "~8.2" }, "platform-dev": {}, - "plugin-api-version": "2.9.0" + "plugin-api-version": "2.6.0" } From 14f23e8066eb9c014e3dbd33df5c3173d5f48f90 Mon Sep 17 00:00:00 2001 From: Vladimir Barinov Date: Sun, 22 Feb 2026 17:12:29 +0300 Subject: [PATCH 4/4] chore: lint --- chandler/Signaling/SignalManager.php | 58 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/chandler/Signaling/SignalManager.php b/chandler/Signaling/SignalManager.php index ab705b4..659891c 100644 --- a/chandler/Signaling/SignalManager.php +++ b/chandler/Signaling/SignalManager.php @@ -17,22 +17,22 @@ class SignalManager { use TSimpleSingleton; - + /** * @var int Latest event timestamp. */ private $since; - + /** * @var \PDO PDO Connection to events SQLite DB. */ private $connection; - + /** * @var RedisClient|null Redis client for events. */ private $redisClient; - + /** * @var bool Whether to use Redis for operations. */ @@ -44,7 +44,7 @@ class SignalManager private function __construct() { $this->since = time(); - + // Check if Redis is configured if (!empty(CHANDLER_ROOT_CONF["redisUrl"])) { try { @@ -58,7 +58,7 @@ private function __construct() $this->redisClient = null; } } - + // Initialize SQLite connection (fallback) if (!$this->useRedis) { $this->connection = new \PDO( @@ -86,11 +86,11 @@ private function eventFor(int $for): ?array $since = $this->since - 1; $key = "events:$for"; $events = $this->redisClient->zrevrangebyscore($key, '+inf', $since, ['LIMIT' => [0, 1]]); - + if (empty($events)) { return null; } - + $eventData = json_decode($events[0], true); $this->since = time(); return [$eventData['id'], unserialize(hex2bin($eventData['event']))]; @@ -126,21 +126,21 @@ public function listen(\Closure $callback, int $for, int $time = 25): void // We will catch the old message first $oldEvent = $this->eventFor($for); - + if ($oldEvent) { - list($id, $evt) = $oldEvent; - $id = crc32((string)$id); + [$id, $evt] = $oldEvent; + $id = crc32((string) $id); $callback($evt, $id); } // And then we will subscribe to user's channel $subscriber = $redisClient->pubSubLoop(); - $subscriber->subscribe('im'.$for); + $subscriber->subscribe('im' . $for); - foreach($subscriber as $event) { - if ($event->kind == 'message' && $event->channel == 'im'.$for) { - list($id, $evt) = json_decode($event->payload); - $id = crc32((string)$id); + foreach ($subscriber as $event) { + if ($event->kind == 'message' && $event->channel == 'im' . $for) { + [$id, $evt] = json_decode($event->payload); + $id = crc32((string) $id); $evt = unserialize(hex2bin($evt)); $callback($evt, $id); } @@ -148,10 +148,8 @@ public function listen(\Closure $callback, int $for, int $time = 25): void // On timeout we're returning nothing exit("[]"); - } - catch (Exception $e) - { - error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: ".$e->getMessage()); + } catch (Exception $e) { + error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: " . $e->getMessage()); } $this->since = time() - 1; @@ -185,11 +183,11 @@ public function tipFor(int $for): int // Use Redis $key = "events:$for"; $events = $this->redisClient->zrevrange($key, 0, 0, ['WITHSCORES' => true]); - + if (empty($events)) { return 1; } - + // Return the score (timestamp) of the latest event return (int) reset($events); } else { @@ -220,15 +218,15 @@ public function getHistoryFor(int $for, ?int $tip = null, int $limit = 1000): ar $res = []; $tip ??= $this->tipFor($for); $key = "events:$for"; - + // Get events with scores greater than tip $events = $this->redisClient->zrevrangebyscore($key, '+inf', $tip, ['LIMIT' => [0, $limit]]); - + foreach ($events as $eventJson) { $eventData = json_decode($eventJson, true); $res[] = unserialize(hex2bin($eventData['event'])); } - + return $res; } else { // Use SQLite @@ -256,7 +254,7 @@ public function triggerEvent(object $event, int $for): bool $event = bin2hex(serialize($event)); $since = time(); $id = null; - + if ($this->useRedis && $this->redisClient) { // Use Redis $key = "events:$for"; @@ -264,7 +262,7 @@ public function triggerEvent(object $event, int $for): bool $eventData = [ 'id' => $id, 'event' => $event, - 'since' => $since + 'since' => $since, ]; $this->redisClient->zadd($key, $since, json_encode($eventData)); } else { @@ -277,12 +275,12 @@ public function triggerEvent(object $event, int $for): bool if ($id !== null) { try { $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); - $redisClient->publish('im'.$for, json_encode([$id, $event])); + $redisClient->publish('im' . $for, json_encode([$id, $event])); } catch (Exception $e) { - error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage()); + error_log("Couldn't connect to Redis server and push the event. Exception Message: " . $e->getMessage()); } } - + return true; } }