Skip to content

Commit d9652b3

Browse files
committed
Add rtpp_queue_get_items_by(). Rework single-packet method to
use buld methods under the hood.
1 parent 7d543d9 commit d9652b3

File tree

2 files changed

+50
-84
lines changed

2 files changed

+50
-84
lines changed

src/rtpp_queue.c

Lines changed: 48 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -82,29 +82,6 @@ circ_buf_push(circ_buf_t *c, struct rtpp_wi *data)
8282
return(0); /* return success to indicate successful push. */
8383
}
8484

85-
static int
86-
circ_buf_pop(circ_buf_t *c, struct rtpp_wi **data)
87-
{
88-
unsigned int next;
89-
90-
if (circ_buf_isempty(c))
91-
return(-1);
92-
93-
next = c->tail + 1; /* next is where tail will point to after this read. */
94-
if (next == c->buflen)
95-
next = 0;
96-
#if RTPQ_DEBUG
97-
assert(c->tail < c->buflen);
98-
#endif
99-
100-
*data = c->buffer[c->tail]; /* Read data and then move */
101-
#if RTPQ_DEBUG
102-
c->buffer[c->tail] = NULL;
103-
#endif
104-
c->tail = next; /* tail to next offset. */
105-
return(0); /* return success to indicate successful pop. */
106-
}
107-
10885
static unsigned int
10986
circ_buf_popmany(circ_buf_t *c, struct rtpp_wi *data[], unsigned int howmany)
11087
{
@@ -341,6 +318,30 @@ rtpp_queue_getclen(const struct rtpp_queue *queue)
341318
return (clen);
342319
}
343320

321+
static int
322+
rtpp_queue_extract_items(struct rtpp_queue *queue, struct rtpp_wi **items, int ilen)
323+
{
324+
int i, j;
325+
326+
i = circ_buf_popmany(&queue->circb, items, ilen);
327+
if ((i == ilen) || (queue->length == 0))
328+
return (i);
329+
items += i;
330+
ilen -= i;
331+
for (j = 0; j < ilen; j++) {
332+
items[j] = queue->head;
333+
queue->head = items[j]->next;
334+
if (queue->head == NULL) {
335+
queue->tail = NULL;
336+
j += 1;
337+
break;
338+
}
339+
}
340+
queue->length -= j;
341+
i += j;
342+
return (i);
343+
}
344+
344345
unsigned int
345346
rtpp_queue_setqlen(struct rtpp_queue *queue, unsigned int qlen)
346347
{
@@ -413,29 +414,9 @@ rtpp_queue_get_item_by(struct rtpp_queue *queue, struct timespec *deadline, int
413414
{
414415
struct rtpp_wi *wi;
415416

416-
pthread_mutex_lock(&queue->mutex);
417-
while (rtpp_queue_getclen(queue) == 0) {
418-
int rc = pthread_cond_timedwait(&queue->cond, &queue->mutex, deadline);
419-
if (rval != NULL)
420-
*rval = rc;
421-
pthread_mutex_unlock(&queue->mutex);
417+
if (rtpp_queue_get_items_by(queue, &wi, 1, deadline, rval) == 0)
422418
return (NULL);
423-
}
424-
#if RTPQ_DEBUG
425-
assert(rtpp_queue_getclen(queue) > 0);
426-
#endif
427-
if (circ_buf_pop(&queue->circb, &wi) == 0) {
428-
pthread_mutex_unlock(&queue->mutex);
429-
return (wi);
430-
}
431-
wi = queue->head;
432-
#if RTPQ_DEBUG
433-
assert(rtpp_queue_getclen(queue) > 0);
434-
#endif
435-
RTPPQ_REMOVE_HEAD(queue);
436-
pthread_mutex_unlock(&queue->mutex);
437419
wi->next = NULL;
438-
439420
return (wi);
440421
}
441422

@@ -444,36 +425,16 @@ rtpp_queue_get_item(struct rtpp_queue *queue, int return_on_wake)
444425
{
445426
struct rtpp_wi *wi;
446427

447-
pthread_mutex_lock(&queue->mutex);
448-
while (rtpp_queue_getclen(queue) == 0) {
449-
pthread_cond_wait(&queue->cond, &queue->mutex);
450-
if (rtpp_queue_getclen(queue) == 0 && return_on_wake != 0) {
451-
pthread_mutex_unlock(&queue->mutex);
452-
return (NULL);
453-
}
454-
}
455-
#if RTPQ_DEBUG
456-
assert(rtpp_queue_getclen(queue) > 0);
457-
#endif
458-
if (circ_buf_pop(&queue->circb, &wi) == 0) {
459-
pthread_mutex_unlock(&queue->mutex);
460-
return (wi);
461-
}
462-
wi = queue->head;
463-
#if RTPQ_DEBUG
464-
assert(rtpp_queue_getclen(queue) > 0);
465-
#endif
466-
RTPPQ_REMOVE_HEAD(queue);
467-
pthread_mutex_unlock(&queue->mutex);
428+
if (rtpp_queue_get_items(queue, &wi, 1, return_on_wake) == 0)
429+
return (NULL);
468430
wi->next = NULL;
469-
470431
return (wi);
471432
}
472433

473434
int
474435
rtpp_queue_get_items(struct rtpp_queue *queue, struct rtpp_wi **items, int ilen, int return_on_wake)
475436
{
476-
int i, j;
437+
int i;
477438

478439
pthread_mutex_lock(&queue->mutex);
479440
while (rtpp_queue_getclen(queue) == 0) {
@@ -483,26 +444,29 @@ rtpp_queue_get_items(struct rtpp_queue *queue, struct rtpp_wi **items, int ilen,
483444
return (0);
484445
}
485446
}
486-
/* Pull out of circular buffer first */
487-
i = circ_buf_popmany(&queue->circb, items, ilen);
488-
if ((i == ilen) || (queue->length == 0))
489-
goto done;
490-
items += i;
491-
ilen -= i;
492-
for (j = 0; j < ilen; j++) {
493-
items[j] = queue->head;
494-
queue->head = items[j]->next;
495-
if (queue->head == NULL) {
496-
queue->tail = NULL;
497-
j += 1;
447+
i = rtpp_queue_extract_items(queue, items, ilen);
448+
pthread_mutex_unlock(&queue->mutex);
449+
return (i);
450+
}
451+
452+
int
453+
rtpp_queue_get_items_by(struct rtpp_queue *queue, struct rtpp_wi **items, int ilen,
454+
struct timespec *deadline, int *rval)
455+
{
456+
int i, rc;
457+
458+
pthread_mutex_lock(&queue->mutex);
459+
while (rtpp_queue_getclen(queue) == 0) {
460+
rc = pthread_cond_timedwait(&queue->cond, &queue->mutex, deadline);
461+
if (rtpp_queue_getclen(queue) != 0)
498462
break;
499-
}
463+
if (rval != NULL)
464+
*rval = rc;
465+
pthread_mutex_unlock(&queue->mutex);
466+
return (0);
500467
}
501-
queue->length -= j;
502-
i += j;
503-
done:
468+
i = rtpp_queue_extract_items(queue, items, ilen);
504469
pthread_mutex_unlock(&queue->mutex);
505-
506470
return (i);
507471
}
508472

src/rtpp_queue.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ struct rtpp_wi *rtpp_queue_get_item(struct rtpp_queue *queue, int return_on_wake
8686
struct rtpp_wi *rtpp_queue_get_item_by(struct rtpp_queue *queue, struct timespec *,
8787
int *);
8888
int rtpp_queue_get_items(struct rtpp_queue *, struct rtpp_wi **, int, int);
89+
int rtpp_queue_get_items_by(struct rtpp_queue *, struct rtpp_wi **, int,
90+
struct timespec *, int *);
8991
int rtpp_queue_get_length(struct rtpp_queue *);
9092
unsigned int rtpp_queue_setqlen(struct rtpp_queue *, unsigned int);
9193

0 commit comments

Comments
 (0)