rdex-client: untested pthread fifo for consumer callbacks
[rdex:client.git] / src / pfifo.c
1 /* =====================================================================
2 rdex -- reaction-diffusion explorer
3 Copyright (C) 2009  Claude Heiland-Allen <claudiusmaximus@goto10.org>
4 ------------------------------------------------------------------------
5 pthread-based fifo
6 ===================================================================== */
7
8 #include <assert.h>
9 #include <string.h>
10 #include "pfifo.h"
11
12 //======================================================================
13 // prototypes
14 void *pfifo_consumerthread(void *);
15
16 //======================================================================
17 // create a new fifo with a consumer thread
18 struct pfifo *pfifo_create(pfifo_consumer *consumer, void *consumerdata) {
19   struct pfifo *p = malloc(sizeof(struct pfifo));
20   if (!p) return 0;
21   list_init(&(p->list));
22   p->consumer = consumer;
23   p->consumerdata = consumerdata;
24   pthread_create(&p->thread, 0, pfifo_consumerthread, p);
25   return p;
26 }
27
28 //======================================================================
29 // append to the fifo, copying data
30 void pfifo_enqueue(struct pfifo *p, size_t length, const void *data) {
31   struct pfifo_node *n = malloc(sizeof(struct pfifo_node));
32   assert(n);
33   n->length = length;
34   n->data = malloc(length);
35   assert(n->data);
36   memcpy(n->data, data, length);
37   pthread_mutex_lock(p->mutex);
38   list_inserttail(&(p->list), &(n->node));
39   pthread_mutex_unlock(p->mutex);
40   pthread_cond_signal(p->nonempty);
41 }
42
43 //======================================================================
44 // loop running consumer callback on each fifo item, freeing data
45 void *pfifo_consumerthread(void *fifo) {
46   struct pfifo *p = fifo;
47   while (1) {
48     pthread_mutex_lock(p->mutex);
49     while (list_isempty(&(p->list))) pthread_cond_wait(p->nonempty, p->mutex);
50     struct pfifo_node *n = (struct pfifo_node *) list_removehead(&(p->list));
51     pthread_mutex_unlock(p->mutex);
52     p->consumer(p->consumerdata, n->length, n->data);
53     free(n->data);
54     free(n);
55   }
56 }
57
58
59 // EOF