I have implemented a priority queue that enables O(1) time complexity for its major operations, i.e. enqueue and dequeue.
It is POSIX-compliant C code.
Things to note
– It uses buffer pools
– Its behavior is to return the highest priority node from the queue
– If the queue is empty/full, callers block on a condition variable (conditional wait)
– Priorities are defined from 0..PRI_MAX-1 as an array
– Each priority bucket has a list of nodes that share that priority
– It is a thread-safe implementation
– It could be used between two threads for data exchange (producer/consumer)
Here is the GitHub link
/* queue.h */
#ifndef QUEUE_H
#define QUEUE_H

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define ASSERT(x) assert(x)
#define PRI_MAX 10          /* priorities are 0 .. PRI_MAX-1; 0 is dequeued first */
#define BUF_POOL_SIZE 1024  /* number of nodes pre-allocated by create() */
#define LOG printf
#define LOG2 printf
#define LOG3 printf
#define LOCK(x) pthread_mutex_lock(&(x))
#define UNLOCK(x) pthread_mutex_unlock(&(x))

/*
 * Running counters, updated under the table lock.
 */
typedef struct queue_stats_ {
    int enqueue;
    int dequeue;
    int wait_time;
} queue_stats;

/*
 * Not used by the queue itself. Declared here and defined once in
 * queue.c so that including this header from several translation
 * units does not create duplicate definitions.
 */
extern int priority[PRI_MAX];

/*
 * List of nodes in a hash bucket
 */
typedef struct node_ {
    int key;
    int priority;
    struct node_ *next;
} node;

/*
 * Define the hash table
 * |p1|->|a|->|b|->|c|
 * |p2|->|e|->|f|
 */
typedef struct ptable_entry_ {
    int priority;
    node *n;               /* head of this bucket's FIFO list */
} ptable_entry;

typedef struct ptable_ {
    ptable_entry entry[PRI_MAX];
    node *last[PRI_MAX];   /* tail of each bucket, for O(1) append */
    node *buf_pool;        /* singly linked free list of nodes */
    node *free_bbuf_pool;  /* unused; kept so the struct layout is unchanged */
    int ent_count;
    pthread_mutex_t lock;  /* protects every field of the table */
    pthread_cond_t cv;     /* signalled on every enqueue and dequeue */
    bool is_available;     /* false once a scan found every bucket empty */
    queue_stats *stats;
} ptable;

void create(ptable *p);
void get_data(ptable *p, int *key, int *pri);
void put_data(ptable *p, int key, int priority);
void destroy(ptable *p);
void display(ptable *p);
void put_buf(ptable *p, void *buf);
void create_pool(ptable **p, uint32_t num);
void *get_buf(ptable *p);
void display_buf_pool(ptable *p);

/*
 * Helper functions
 */
void add_a_node(ptable *p, node **last, node **m, int key, int priority);

#endif /* QUEUE_H */

/* queue.c */
#ifndef QUEUE_H
#include "queue.h"   /* only needed when queue.h lives in its own file */
#endif
#include <unistd.h>  /* sleep(), used by the test consumer */

int priority[PRI_MAX];

/*
 * calloc() that never returns NULL: the queue API returns void, so an
 * allocation failure cannot be reported to the caller and we abort.
 */
static void *xcalloc(size_t count, size_t size)
{
    void *mem = calloc(count, size);

    if (mem == NULL) {
        fprintf(stderr, "queue: out of memory\n");
        abort();
    }
    return mem;
}

/*
 * Frees every node of a singly linked chain.
 */
static void free_chain(node *n)
{
    while (n != NULL) {
        node *next = n->next;

        free(n);
        n = next;
    }
}

/*
 * Unlinks and returns the first node of the lowest-numbered non-empty
 * bucket, or NULL when every bucket is empty. Caller holds p->lock.
 */
static node *pop_highest(ptable *p)
{
    for (int i = 0; i < PRI_MAX; i++) {
        node *head = p->entry[i].n;

        if (head != NULL) {
            p->entry[i].n = head->next;
            if (p->entry[i].n == NULL) {
                /* Bucket drained: do not keep a tail pointer to a freed node. */
                p->last[i] = NULL;
            }
            return head;
        }
    }
    return NULL;
}

/*
 * Adds a node of a given priority to the queue. Since a node is
 * allocated from a fixed size buffer pool, this function blocks
 * if pool has no free buffer object.
 *
 * p        - the queue
 * last     - address of the bucket's tail pointer
 * m        - address of the bucket's head pointer
 * key      - payload stored in the node
 * priority - priority stored in the node
 */
void add_a_node(ptable *p, node **last, node **m, int key, int priority)
{
    node *n;

    ASSERT(p);
    ASSERT(last);
    ASSERT(m);

    LOCK(p->lock);

    n = get_buf(p);
    if (NULL == n) {
        LOG2("Buf pool is over. Waiting for dequeue\n");
        /*
         * Re-check after every wakeup: the wakeup may be spurious, may
         * have been caused by another enqueue, or another producer may
         * already have taken the buffer that was returned.
         */
        do {
            pthread_cond_wait(&p->cv, &p->lock);
            n = get_buf(p);
        } while (NULL == n);
        LOG2("Producer: wait over. Got a buffer back\n");
    }

#ifdef DEBUG
    LOG3("oo-get_data-oo\n");
    display_buf_pool(p);
    LOG3("---get_data--\n");
#endif

    /*
     * Collided nodes are arranged in a list (queue)
     */
    n->key = key;
    n->priority = priority;
    n->next = NULL;

    if (NULL == *m) {
        *m = n;
    } else {
        (*last)->next = n;
    }
    *last = n;

    LOG("Enqueue: %d\n", p->stats->enqueue++);
    p->is_available = true;

    /*
     * Producers and consumers share one condition variable, so wake
     * everybody; each waiter re-checks its own predicate.
     */
    pthread_cond_broadcast(&p->cv);
    UNLOCK(p->lock);
}

/*
 * Gets a buffer from the buffer pool. Returns NULL when the pool is
 * empty. Does no locking of its own: the caller must hold p->lock.
 */
void *get_buf(ptable *p)
{
    node *head;

    ASSERT(p);

    head = p->buf_pool;
    if (head == NULL) {
        LOG2("\nBuffer overrun\n");
        return NULL;
    }

    p->buf_pool = head->next;
    head->next = NULL;
    LOG2("Stealing a buffer %p\n", (void *)head);
    return head;
}

/*
 * Returns a buffer to buffer pool. Does no locking of its own: the
 * caller must hold p->lock. A NULL buf is ignored.
 */
void put_buf(ptable *p, void *buf)
{
    node *head = buf;

    ASSERT(p);
    if (head == NULL) {
        return;
    }

    if (p->buf_pool) {
        LOG2("Unstealing a buffer %p\n", buf);
    } else {
        LOG2("Unstealing the last buffer %p\n", buf);
    }

    /*
     * Always overwrite next: the node still points at its old queue
     * successor, which must not become reachable from the free list.
     */
    head->next = p->buf_pool;
    p->buf_pool = head;
}

/*
 * Prints the address of every buffer in the pool. Does no locking of
 * its own: the caller must hold p->lock if other threads are running.
 */
void display_buf_pool(ptable *p)
{
    int i = 1;

    ASSERT(p);

    for (node *temp = p->buf_pool; temp != NULL; temp = temp->next) {
        LOG2("Buf %d: %p\n", i++, (void *)temp);
    }
}

/*
 * Allocates num nodes and installs them as the buffer pool of *p.
 * Does nothing if *p already has a pool or num is 0. (*p)->buf_pool
 * must have been initialised (NULL for "no pool") before the call.
 * Aborts if memory cannot be allocated.
 */
void create_pool(ptable **p, uint32_t num)
{
    node *head = NULL;

    ASSERT(p);
    ASSERT(*p);

    if (NULL != (*p)->buf_pool || num == 0) {
        return;
    }

    for (uint32_t i = 0; i < num; i++) {
        node *n = xcalloc(1, sizeof *n);

        n->next = head;
        head = n;
    }

    /*
     * Set the buf pool
     */
    (*p)->buf_pool = head;

#ifdef DEBUG
    display_buf_pool(*p);
#endif
}

/*
 * Create a priority queue object of priority ranging from 0..PRI_MAX-1.
 * p must point to storage for a ptable; its previous contents are
 * ignored. Release the queue with destroy().
 */
void create(ptable *p)
{
    ASSERT(p);

    /*
     * Initialize the entries
     */
    for (int i = 0; i < PRI_MAX; i++) {
        p->entry[i].priority = i;
        p->entry[i].n = NULL;
        p->last[i] = NULL;
    }

    /* create_pool() inspects buf_pool, so it must not be left indeterminate. */
    p->buf_pool = NULL;
    p->free_bbuf_pool = NULL;
    create_pool(&p, BUF_POOL_SIZE);

    p->stats = xcalloc(1, sizeof *p->stats);

    if (pthread_mutex_init(&p->lock, NULL) != 0 ||
        pthread_cond_init(&p->cv, NULL) != 0) {
        fprintf(stderr, "queue: cannot initialise synchronisation objects\n");
        abort();
    }

    p->is_available = false;
    p->ent_count = PRI_MAX;
}

/*
 * Releases everything create() allocated: queued nodes, pooled nodes,
 * the statistics block, the mutex and the condition variable. The
 * ptable itself is not freed. No other thread may be using the queue.
 * A NULL p is ignored.
 */
void destroy(ptable *p)
{
    if (p == NULL) {
        return;
    }

    for (int i = 0; i < PRI_MAX; i++) {
        free_chain(p->entry[i].n);
        p->entry[i].n = NULL;
        p->last[i] = NULL;
    }

    free_chain(p->buf_pool);
    p->buf_pool = NULL;

    free(p->stats);
    p->stats = NULL;

    pthread_cond_destroy(&p->cv);
    pthread_mutex_destroy(&p->lock);
}

/*
 * Adds a node to the queue. priority must be in 0..PRI_MAX-1.
 * Blocks while the buffer pool is exhausted.
 */
void put_data(ptable *p, int key, int priority)
{
    ASSERT(p);
    ASSERT(priority >= 0);
    ASSERT(priority < PRI_MAX);

    add_a_node(p, &(p->last[priority]), &(p->entry[priority].n), key, priority);
}

/*
 * Gets the highest priority node from the queue. If queue is empty,
 * then this routine blocks. The node's key and priority are stored
 * through key and pri, and the node goes back to the buffer pool.
 */
void get_data(ptable *p, int *key, int *pri)
{
    node *n;

    ASSERT(p);
    ASSERT(key);
    ASSERT(pri);

    LOCK(p->lock);

    for (;;) {
        while (false == p->is_available) {
            /*
             * Else wait for the next element to get in
             */
            LOG2("Nothing in queue; waiting for data\n");
            pthread_cond_wait(&p->cv, &p->lock);
            LOG2("Waiting completed: got data\n");
        }

        n = pop_highest(p);
        if (n != NULL) {
            break;
        }

        /* The flag was stale: every bucket is empty, so wait again. */
        p->is_available = false;
    }

    *key = n->key;
    *pri = n->priority;

    LOG(" Dequeued: %d\n", p->stats->dequeue++);
    put_buf(p, n);

#ifdef DEBUG
    LOG3("oo-put_data-oo\n");
    display_buf_pool(p);
    LOG3("---put_data--\n");
#endif

    /* A buffer is free again: wake any producer blocked on the pool. */
    pthread_cond_broadcast(&p->cv);
    UNLOCK(p->lock);
}

/*
 * Function to display the queue. Takes the table lock so that it can
 * run while producer and consumer threads are active.
 */
void display(ptable *p)
{
    ASSERT(p);

    LOCK(p->lock);
    for (int i = 0; i < PRI_MAX; i++) {
        for (node *t = p->entry[i].n; t != NULL; t = t->next) {
            printf("\nBucket=%d|Key=%d|Priority=%d\n",
                   p->entry[i].priority, t->key, t->priority);
        }
    }
    UNLOCK(p->lock);
}

/*
 * Test code
 * Uses two threads, acting as producer and consumer
 */
enum { TEST_MESSAGES = 16 };

void *producer(void *p)
{
    ptable *table = p;

    ASSERT(table);
    printf("Thread producer\n");

    /*
     * We break the producer after enqueuing 16 messages
     */
    for (int i = 0; i < TEST_MESSAGES; i++) {
        printf("Calling put_data %d\n\t", i);
        /*
         * Message i is enqueued with priority i % 9, i.e. in buckets
         * 0..PRI_MAX-2.
         */
        put_data(table, i, i % (PRI_MAX - 1));
    }
    return NULL;
}

void *consumer(void *p)
{
    ptable *table = p;
    int key;
    int prio;

    ASSERT(table);

    /* Give the producer a head start so several buckets are populated. */
    sleep(2);
    printf("Thread consumer\n");

    /*
     * We break the consumer after dequeuing 16 messages.
     * The next call to get_data would block since there
     * will be no more data from the producer.
     */
    for (int i = 0; i < TEST_MESSAGES; i++) {
        printf("Calling get_data\n");
        get_data(table, &key, &prio);
        printf("\nSearch-> Priority=%d key= %d\n", prio, key);
    }
    return NULL;
}

/*
 * Tears down a heap-allocated queue: releases its contents and then
 * the ptable itself.
 */
void cleanup(ptable *p)
{
    destroy(p);
    free(p);
}

int main(void)
{
    pthread_t thread1;
    pthread_t thread2;
    ptable *p = malloc(sizeof *p);

    if (p == NULL) {
        fprintf(stderr, "queue: out of memory\n");
        return EXIT_FAILURE;
    }
    create(p);

    if (pthread_create(&thread1, NULL, producer, p) != 0) {
        fprintf(stderr, "cannot start producer thread\n");
        cleanup(p);
        return EXIT_FAILURE;
    }
    if (pthread_create(&thread2, NULL, consumer, p) != 0) {
        fprintf(stderr, "cannot start consumer thread\n");
        /* The producer never blocks here (16 messages, 1024 buffers). */
        pthread_join(thread1, NULL);
        cleanup(p);
        return EXIT_FAILURE;
    }

    display(p);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    cleanup(p);
    return EXIT_SUCCESS;
}
Hi,
Nice post.
Where can we get the source code?
All the “#include” statements are blank from the above.
Mickael
LikeLike
Mickael,
Thanks for pointing this out. I have fixed the problems.
Let me know.
LikeLike