Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
!/extensions/enabled/.gitkeep

/tmp
/tmp/events.bin

/htdocs/*
!/htdocs/.htaccess
Expand Down
2 changes: 2 additions & 0 deletions chandler-example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ chandler:
pass: "word"
addr: "noreply@example.com"
ssl: true

redisUrl: "tcp://10.0.0.1:6379"
197 changes: 167 additions & 30 deletions chandler/Signaling/SignalManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,70 @@
namespace Chandler\Signaling;

use Chandler\Patterns\TSimpleSingleton;
use Predis\Client as RedisClient;

/**
* Signal manager (singleton).
* Signals are events, that are meant to be recieved by end user.
*
* @author kurotsun <celestine@vriska.ru>
* @author Vladimir Barinov <veselcraft@icloud.com>
*/
class SignalManager
{
use TSimpleSingleton;

/**
* @var int Latest event timestamp.
*/
private $since;

/**
* @var \PDO PDO Connection to events SQLite DB.
*/
private $connection;

/**
* @var RedisClient|null Redis client for events.
*/
private $redisClient;

/**
* @var bool Whether to use Redis for operations.
*/
private $useRedis = false;

/**
* @internal
*/
private function __construct()
{
$this->since = time();
$this->connection = new \PDO(
'sqlite:' . CHANDLER_ROOT . '/tmp/events.bin',
null,
null,
[\PDO::ATTR_PERSISTENT => true]
);
$this->connection->query("CREATE TABLE IF NOT EXISTS pool(id INTEGER PRIMARY KEY AUTOINCREMENT, since INTEGER, for INTEGER, event TEXT);");

// Check if Redis is configured
if (!empty(CHANDLER_ROOT_CONF["redisUrl"])) {
try {
$this->redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]);
$this->useRedis = true;
// Test connection
$this->redisClient->ping();
} catch (\Exception $e) {
error_log("Redis connection failed, falling back to SQLite: " . $e->getMessage());
$this->useRedis = false;
$this->redisClient = null;
}
}

// Initialize SQLite connection (fallback)
if (!$this->useRedis) {
$this->connection = new \PDO(
'sqlite:' . CHANDLER_ROOT . '/tmp/events.bin',
null,
null,
[\PDO::ATTR_PERSISTENT => true]
);
$this->connection->query("CREATE TABLE IF NOT EXISTS pool(id INTEGER PRIMARY KEY AUTOINCREMENT, since INTEGER, for INTEGER, event TEXT);");
}
}

/**
Expand All @@ -49,15 +81,31 @@ private function __construct()
*/
private function eventFor(int $for): ?array
{
$since = $this->since - 1;
$statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC");
$event = $statement->fetch(\PDO::FETCH_LAZY);
if (!$event) {
return null;
}
if ($this->useRedis && $this->redisClient) {
// Use Redis
$since = $this->since - 1;
$key = "events:$for";
$events = $this->redisClient->zrevrangebyscore($key, '+inf', $since, ['LIMIT' => [0, 1]]);

$this->since = time();
return [$event->id, unserialize(hex2bin($event->event))];
if (empty($events)) {
return null;
}

$eventData = json_decode($events[0], true);
$this->since = time();
return [$eventData['id'], unserialize(hex2bin($eventData['event']))];
} else {
// Use SQLite
$since = $this->since - 1;
$statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC");
$event = $statement->fetch(\PDO::FETCH_LAZY);
if (!$event) {
return null;
}

$this->since = time();
return [$event->id, unserialize(hex2bin($event->event))];
}
}

/**
Expand All @@ -73,8 +121,39 @@ private function eventFor(int $for): ?array
*/
public function listen(\Closure $callback, int $for, int $time = 25): void
{
try {
$redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"], ['read_write_timeout' => $time]);

// We will catch the old message first
$oldEvent = $this->eventFor($for);

if ($oldEvent) {
[$id, $evt] = $oldEvent;
$id = crc32((string) $id);
$callback($evt, $id);
}

// And then we will subscribe to user's channel
$subscriber = $redisClient->pubSubLoop();
$subscriber->subscribe('im' . $for);

foreach ($subscriber as $event) {
if ($event->kind == 'message' && $event->channel == 'im' . $for) {
[$id, $evt] = json_decode($event->payload);
$id = crc32((string) $id);
$evt = unserialize(hex2bin($evt));
$callback($evt, $id);
}
}

// On timeout we're returning nothing
exit("[]");
} catch (Exception $e) {
error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: " . $e->getMessage());
}

$this->since = time() - 1;
for ($i = 0; $i < $time; $i++) {
for ($i = 0; $i < ($time / 5); $i++) {
sleep(1);

$event = $this->eventFor($for);
Expand All @@ -100,13 +179,27 @@ public function listen(\Closure $callback, int $for, int $time = 25): void
*/
public function tipFor(int $for): int
{
$statement = $this->connection->query("SELECT since FROM pool WHERE `for` = $for ORDER BY since DESC");
$result = $statement->fetch(\PDO::FETCH_LAZY);
if (!$result) {
return 1;
}
if ($this->useRedis && $this->redisClient) {
// Use Redis
$key = "events:$for";
$events = $this->redisClient->zrevrange($key, 0, 0, ['WITHSCORES' => true]);

if (empty($events)) {
return 1;
}

return $result->since;
// Return the score (timestamp) of the latest event
return (int) reset($events);
} else {
// Use SQLite
$statement = $this->connection->query("SELECT since FROM pool WHERE `for` = $for ORDER BY since DESC");
$result = $statement->fetch(\PDO::FETCH_LAZY);
if (!$result) {
return 1;
}

return $result->since;
}
}

/**
Expand All @@ -120,14 +213,32 @@ public function tipFor(int $for): int
*/
public function getHistoryFor(int $for, ?int $tip = null, int $limit = 1000): array
{
$res = [];
$tip ??= $this->tipFor($for);
$query = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $tip ORDER BY since DESC LIMIT $limit");
foreach ($query as $event) {
$res[] = unserialize(hex2bin($event["event"]));
}
if ($this->useRedis && $this->redisClient) {
// Use Redis
$res = [];
$tip ??= $this->tipFor($for);
$key = "events:$for";

// Get events with scores greater than tip
$events = $this->redisClient->zrevrangebyscore($key, '+inf', $tip, ['LIMIT' => [0, $limit]]);

foreach ($events as $eventJson) {
$eventData = json_decode($eventJson, true);
$res[] = unserialize(hex2bin($eventData['event']));
}

return $res;
} else {
// Use SQLite
$res = [];
$tip ??= $this->tipFor($for);
$query = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $tip ORDER BY since DESC LIMIT $limit");
foreach ($query as $event) {
$res[] = unserialize(hex2bin($event["event"]));
}

return $res;
return $res;
}
}

/**
Expand All @@ -142,8 +253,34 @@ public function triggerEvent(object $event, int $for): bool
{
$event = bin2hex(serialize($event));
$since = time();
$id = null;

if ($this->useRedis && $this->redisClient) {
// Use Redis
$key = "events:$for";
$id = time() . rand(1000, 9999); // Simple ID generation
$eventData = [
'id' => $id,
'event' => $event,
'since' => $since,
];
$this->redisClient->zadd($key, $since, json_encode($eventData));
} else {
// Use SQLite
$this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')");
$id = $this->connection->lastInsertId();
}

// Try to publish to Redis for real-time notifications (existing behavior)
if ($id !== null) {
try {
$redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]);
$redisClient->publish('im' . $for, json_encode([$id, $event]));
} catch (Exception $e) {
error_log("Couldn't connect to Redis server and push the event. Exception Message: " . $e->getMessage());
}
}

$this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')");
return true;
}
}
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"firebase/php-jwt": "^6.11",
"symfony/yaml": "^7.3",
"wildbit/postmark-php": "^7.0",
"tracy/tracy": "^2.11"
"tracy/tracy": "^2.11",
"predis/predis": "^3.4"
},
"suggest": {
"ext-yaml": "for faster yaml parsing"
Expand Down
67 changes: 65 additions & 2 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.