Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/janus.eventhandler.rabbitmqevh.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ general: {
route_key = "janus-events" # Routing key to use when publishing messages
#exchange_type = "fanout" # Rabbitmq exchange_type can be one of the available types: direct, topic, headers and fanout (fanout by defualt).
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.
#block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established.
#declare_outgoing_queue = true # By default (for backwards compatibility), we declare an outgoing queue. Set this to false to disable that behavior

#ssl_enable = false # Whether ssl support must be enabled
Expand Down
1 change: 1 addition & 0 deletions conf/janus.transport.rabbitmq.jcfg.sample
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ general: {
#queue_autodelete = false # Whether or not incoming queue should autodelete after janus disconnects from RabbitMQ
#queue_exclusive = false # Whether or not incoming queue should only allow one subscriber
#heartbeat = 60 # Defines the seconds without communication that should pass before considering the TCP connection unreachable.
#block_startup_until_connected = true # Whether to block the server from starting until a connection to the RabbitMQ server is established.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.


#ssl_enabled = false # Whether ssl support must be enabled
#ssl_verify_peer = true # Whether peer verification must be enabled
Expand Down
28 changes: 27 additions & 1 deletion src/events/janus_rabbitmqevh.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ static GThread *handler_thread;
static GThread *in_thread;
static void *jns_rmqevh_hdlr(void *data);
static void *jns_rmqevh_hrtbt(void *data);
int janus_rabbitmqevh_tryconnect(void);
int janus_rabbitmqevh_connect(void);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why there's two functions here now. You made it inline in the transport, why not do the same in the event handler?


/* Queue of events to handle */
Expand Down Expand Up @@ -121,6 +122,7 @@ static gboolean ssl_verify_peer = FALSE;
static gboolean ssl_verify_hostname = FALSE;
static char *route_key = NULL, *exchange = NULL, *exchange_type = NULL ;
static uint16_t heartbeat = 0;
static gboolean block_startup_until_connected = FALSE;
static uint16_t rmqport = AMQP_PROTOCOL_PORT;
static gboolean declare_outgoing_queue = TRUE;

Expand Down Expand Up @@ -236,6 +238,10 @@ int janus_rabbitmqevh_init(const char *config_path) {
heartbeat = 0;
}

item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected");
if(item && item->value && janus_is_true(item->value))
block_startup_until_connected = TRUE;

/* SSL config*/
item = janus_config_get(config, config_general, janus_config_type_item, "ssl_enable");
if(!item || !item->value || !janus_is_true(item->value)) {
Expand Down Expand Up @@ -346,6 +352,26 @@ int janus_rabbitmqevh_init(const char *config_path) {
}

int janus_rabbitmqevh_connect(void) {
uint8_t retry_interval = 5; // Retry interval in seconds

while (!g_atomic_int_get(&stopping)) {
if (janus_rabbitmqevh_tryconnect() == 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code style: we prefer no space between the check (while, if etc.) and what comes next. As such, this should be

while(...

and

if(...

There's a few other occurrences of the same, so please change them there too.

return 0;
}

if (!block_startup_until_connected) {
break;
}

JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Connection failed, retrying in %d seconds\n", retry_interval);
g_usleep(retry_interval * 1000000);
}

JANUS_LOG(LOG_FATAL, "Failed to connect to RabbitMQ\n");
return -1;
}

int janus_rabbitmqevh_tryconnect(void){
rmq_conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
int status = AMQP_STATUS_OK;
Expand Down Expand Up @@ -665,7 +691,7 @@ static void *jns_rmqevh_hrtbt(void *data) {
}
if(!g_atomic_int_get(&stopping)) {
JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Trying to reconnect\n");
int result = janus_rabbitmqevh_connect();
int result = janus_rabbitmqevh_tryconnect();
if(result < 0) {
g_usleep(5000000);
} else {
Expand Down
23 changes: 22 additions & 1 deletion src/transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ static gboolean declare_outgoing_queue = FALSE, declare_outgoing_queue_admin = F
amqp_boolean_t queue_durable = 0, queue_exclusive = 0, queue_autodelete = 0,
queue_durable_admin = 0, queue_exclusive_admin = 0, queue_autodelete_admin = 0;
static uint16_t heartbeat = 0;
static gboolean block_startup_until_connected = FALSE;

/* Transport implementation */
int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_path) {
Expand Down Expand Up @@ -312,6 +313,11 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
heartbeat = 0;
}

/* Check if we need to block startup until RabbitMQ is connected */
item = janus_config_get(config, config_general, janus_config_type_item, "block_startup_until_connected");
if(item && item->value && janus_is_true(item->value))
block_startup_until_connected = TRUE;

/* Now check if the Janus API must be supported */
item = janus_config_get(config, config_general, janus_config_type_item, "enabled");
if(item == NULL) {
Expand Down Expand Up @@ -455,7 +461,22 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
rmq_client = g_malloc0(sizeof(janus_rabbitmq_client));

/* Connect */
int result = janus_rabbitmq_connect();
uint8_t retry_interval = 5; // Retry interval in seconds
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

int result = -1;
while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
result = janus_rabbitmq_connect();
if (result == 0) {
break;
}

if (!block_startup_until_connected) {
break;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these empty lines are unneeded: I think you can make this block more compact without them.

JANUS_LOG(LOG_ERR, "RabbitMQ: Connection failed, retrying in %d seconds\n", retry_interval);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this should be a LOG_WARN instead, since you're retrying.

g_usleep(retry_interval * 1000000);
}

if(result < 0) {
goto error;
}
Expand Down