A priority queue with O(1) operations: C, Linux implementation

I have implemented a priority queue that provides O(1) time complexity for its major operations, i.e. enqueue and dequeue. Dequeue scans at most PRI_MAX buckets, and PRI_MAX is a compile-time constant.
It is C, POSIX compliant code.

Things to note
– It uses buffer pools
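– Dequeue returns the highest-priority node from the queue. Priority 0 is the highest, and nodes of equal priority come out in FIFO order
– If the queue is empty (or the buffer pool is exhausted), the caller blocks on a condition variable
– Priorities range over 0..PRI_MAX-1 and are stored as an array of buckets
– Each priority bucket holds a list of the nodes that share that priority
– It is a thread-safe implementation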
– It could be used between two threads for data exchange (producer/consumer)

Here is the GitHub link

/* queue.h */

#include "stdlib.h"
#include "stdio.h"
#include "pthread.h"
#include "assert.h"
#include "string.h"
#include "unistd.h"

#define ASSERT(x) assert(x)
#define PRI_MAX 10
#define BUF_POOL_SIZE 1024
#define uint32_t int
#define LOG printf
#define LOG2 printf
#define LOG3 printf
#define LOCK(x) pthread_mutex_lock(&x)
#define UNLOCK(x) pthread_mutex_unlock(&x)

typedef enum bool_ {
  false,
  true
} bool;

typedef struct queue_stats_ {
  int enqueue;
  int dequeue;
  int wait_time;
} queue_stats;

int priority[PRI_MAX];

/*
 * List of nodes in a hash bucket
 */
 typedef struct node_ {
  int key;
  int priority;
  struct node_* next;
} node;

/*
 * Define the hash table
 * |p1| ->|a|->|b|->|c|
 * |p2|->|e|->|f|
 */
 typedef struct ptable_entry_ {
  int priority;
  node* n;
} ptable_entry;

typedef struct ptable_ {
  ptable_entry entry[PRI_MAX];
  node* last[PRI_MAX];
  node* buf_pool;
  node* free_bbuf_pool;
  int ent_count;
  pthread_mutex_t lock;
  pthread_cond_t cv;
  bool is_available;
  queue_stats *stats;
} ptable;

void create(ptable*);
void get_data(ptable*, int*, int*);
void put_data(ptable*, int key, int priority);
void destroy(ptable*);
void display(ptable*);
void put_buf(ptable* p, void* buf);
void create_pool(ptable** p, uint32_t num);
void* get_buf(ptable* p);
void display_buf_pool(ptable* p);

/*
 * Helper functions
 */

void add_a_node(ptable* p, node** last, node** m, int key, int priority);

/* queue.c */

#include "queue.h"

/*
 * Appends a node carrying (key, priority) to the tail of one priority
 * bucket, then wakes any thread waiting on the queue.
 *
 * p        - queue; p->lock is held for the whole operation
 * last     - address of the bucket's tail pointer (put_data passes &p->last[priority])
 * m        - address of the bucket's head pointer (put_data passes &p->entry[priority].n)
 * key      - payload stored in the node
 * priority - priority stored in the node
 *
 * Nodes come from a fixed-size buffer pool, so this function blocks
 * until a consumer returns a buffer when the pool is exhausted.
 */
void add_a_node(ptable* p, node** last, node** m, int key, int priority)
{
  ASSERT(p);

  LOCK(p->lock);

  node *n = (node*)get_buf(p);

#ifdef DEBUG
  /* Dumping the pool walks every free node; keep it out of normal builds
   * so enqueue stays O(1), matching the DEBUG-only dump in get_data(). */
  LOG3("oo-get_data-oo\n");
  display_buf_pool(p);
  LOG3("---get_data--\n");
#endif

  /*
   * pthread_cond_wait() may return spuriously, and producers and consumers
   * share one condition variable, so a wake-up does not guarantee a free
   * buffer: re-check the pool in a loop.
   */
  while (NULL == n) {
    LOG2("Buf pool is over. Waiting for dequeue\n");
    pthread_cond_wait(&p->cv, &p->lock);
    n = (node*)get_buf(p);
    if (NULL != n) {
      LOG2("Producer: wait over. Got a buffer back\n");
    }
  }

  /*
   * Nodes of equal priority are kept in FIFO order: append at the tail.
   */
  n->key = key;
  n->priority = priority;
  n->next = NULL;

  if (NULL == *m) {
    *m = n;
  } else {
    (*last)->next = n;
  }

  *last = n;
  LOG("Enqueue: %d\n", p->stats->enqueue++);

  p->is_available = true;
  /* Both roles wait on p->cv; broadcast so a consumer cannot miss the
   * wake-up because it went to another waiting producer. */
  pthread_cond_broadcast(&p->cv);
  UNLOCK(p->lock);
}

/*
 * Pops one node off the head of the free-buffer list.
 *
 * Returns the detached node, or NULL when the pool is empty.
 * No locking is done here; add_a_node() calls it with p->lock held.
 */
void* get_buf(ptable *p)
{
  node* taken = p->buf_pool;

  if (taken == NULL) {
    LOG2("\nBuffer overrun\n");
    return NULL;
  }

  p->buf_pool = taken->next;
  LOG2("Stealing a buffer %p\n", taken);
  return taken;
}

/*
 * Pushes a node back onto the head of the free-buffer list.
 * No locking is done here; get_data() calls it with p->lock held.
 *
 * The node's next pointer is always overwritten. A node handed back by
 * get_data() may still point at its successor in a priority bucket, and
 * that link must not carry live queue nodes into the free list.
 *
 * p   - queue owning the pool
 * buf - node previously obtained from get_buf(); NULL is ignored
 */
void put_buf(ptable* p, void* buf)
{
  node* head = (node*)buf;
  int pool_was_empty = 0;

  if (NULL == head) {
    return;
  }

  pool_was_empty = (NULL == p->buf_pool);

  head->next = p->buf_pool;   /* NULL when the pool was empty */
  p->buf_pool = head;

  if (pool_was_empty) {
    LOG2("Unstealing the last buffer %p\n", buf);
  } else {
    LOG2("Unstealing a buffer %p\n", buf);
  }
}

/*
 * Debug helper: prints every node currently on the free-buffer list,
 * numbered from 1. Does not take p->lock.
 */
void display_buf_pool(ptable* p)
{
  ASSERT(p);

  int idx = 1;
  node* cur;

  for (cur = p->buf_pool; cur != NULL; cur = cur->next) {
    LOG2("Buf %d: %p\n", idx, cur);
    idx = idx + 1;
  }
}

/*
 * Allocates `num` nodes as a singly linked free list and adds them to
 * (*p)->buf_pool.
 *
 * p   - address of the table pointer; (*p)->buf_pool must already hold
 *       NULL or a valid list, because it is read here
 * num - number of nodes to add; values <= 0 add nothing
 *
 * If the table already owns a pool, the new nodes are chained in front
 * of it instead of being leaked. If malloc() fails part-way, the nodes
 * allocated so far are still added and a message is printed.
 */
void create_pool(ptable** p, uint32_t num)
{
  node* head = NULL;
  node* tail = NULL;
  uint32_t i = 0;

  ASSERT(p && *p);

  for (i = 0; i < num; i++) {
    node* n = malloc(sizeof *n);

    if (NULL == n) {
      LOG("create_pool: allocated only %d of %d buffers\n", (int)i, (int)num);
      break;
    }
    n->next = NULL;

    if (NULL == head) {
      head = n;
    } else {
      tail->next = n;
    }
    tail = n;
  }

  /*
   * Set the buf pool: splice the new chain in front of whatever the
   * table already owns (NULL on first use).
   */
  if (NULL != tail) {
    tail->next = (*p)->buf_pool;
    (*p)->buf_pool = head;
  }

#ifdef DEBUG
  display_buf_pool(*p);
#endif
}

/*
 * Initialises a priority queue object with priorities 0..PRI_MAX-1.
 *
 * p must point to writable storage for a ptable (main() passes a fresh
 * malloc() block). Every field is initialised here, so the storage may
 * be uninitialised on entry. Aborts if the stats block cannot be allocated.
 */
void create(ptable* p)
{
  ASSERT(p);

  int i = 0;

  /*
   * Initialize the entries: every bucket starts empty.
   */
  for (i = 0; i < PRI_MAX; i++) {
    p->entry[i].priority = i;
    p->entry[i].n = NULL;
    p->last[i] = NULL;
  }

  /* create_pool() reads buf_pool, so it must not see heap garbage. */
  p->buf_pool = NULL;
  p->free_bbuf_pool = NULL;
  create_pool(&p, BUF_POOL_SIZE);

  /* The counters are incremented in place, so they must start at zero. */
  p->stats = calloc(1, sizeof(queue_stats));
  if (NULL == p->stats) {
    LOG("create: cannot allocate queue stats\n");
    abort();
  }

  /* Use the documented initialisers rather than relying on all-zero bytes. */
  pthread_mutex_init(&p->lock, NULL);
  pthread_cond_init(&p->cv, NULL);

  p->is_available = false;
  p->ent_count = PRI_MAX;
}

/*
 * Enqueues `key` with the given priority (0 = highest). Blocks while the
 * buffer pool is exhausted (see add_a_node).
 *
 * Priorities outside 0..PRI_MAX-1 trip the assertion. When assertions are
 * compiled out (NDEBUG), they are logged and dropped instead of indexing
 * past the bucket arrays.
 */
void put_data(ptable* p, int key, int priority)
{
  ASSERT(p);
  ASSERT(priority >= 0 && priority < PRI_MAX);

  if (priority < 0 || priority >= PRI_MAX) {
    LOG("put_data: priority %d out of range\n", priority);
    return;
  }

  add_a_node(p, &(p->last[priority]), &(p->entry[priority].n),
                key, priority);
}

/*
 * Removes the head node of the highest-priority non-empty bucket
 * (bucket 0 is checked first) and returns its key and priority through
 * *key and *pri. Blocks while the queue is empty.
 *
 * The node goes back to the buffer pool, and all waiters are woken,
 * because a producer may be blocked in add_a_node() waiting for a buffer.
 */
void get_data(ptable* p, int* key, int* pri)
{
  ASSERT(p);
  ASSERT(key && pri);

  LOCK(p->lock);

  for (;;) {
    int i = 0;

    while (false == p->is_available) {
      LOG2("Nothing in queue; waiting for data\n");
      pthread_cond_wait(&p->cv, &p->lock);
      LOG2("Waiting completed: got data\n");
    }

    for (i = 0; i < PRI_MAX; i++) {
      node* temp = p->entry[i].n;

      if (NULL == temp) {
        continue;
      }

      *key = temp->key;
      *pri = temp->priority;

      p->entry[i].n = temp->next;
      if (NULL == p->entry[i].n) {
        /* Bucket emptied: do not keep a tail pointer to a pooled node. */
        p->last[i] = NULL;
      }
      /* Detach before pooling so no queue link leaks into the free list. */
      temp->next = NULL;

      LOG(" Dequeued: %d\n", p->stats->dequeue++);
      put_buf(p, temp);
#ifdef DEBUG
      LOG3("oo-put_data-oo\n");
      display_buf_pool(p);
      LOG3("---put_data--\n");
#endif
      /* Producers and consumers share p->cv; wake them all. */
      pthread_cond_broadcast(&p->cv);
      UNLOCK(p->lock);
      return;
    }

    /* Every bucket was empty: mark the queue drained and wait again. */
    p->is_available = false;
  }
}

/*
 * Test code
 * Uses two threads, acting as producer and consumer.
 *
 * Producer thread: enqueues 16 items with keys 0..15 and priority
 * key % 9, then exits.
 *
 * p - the ptable shared with the consumer thread
 */
void* producer(void* p)
{
  ASSERT(p);

  ptable *table = (ptable*)p;
  int i = 0;

  printf("Thread producer\n");
  for (i = 0; i < 16; i++) {
    printf("Calling put_data %d\n\t", i);
    /*
     * Priorities cycle through buckets 0..8. Key and priority are taken
     * from the same value of i; reading and incrementing i within one
     * argument list would be undefined behaviour.
     */
    put_data(table, i, i % 9);
  }
  return NULL;
}

/*
 * Consumer thread: dequeues exactly 16 items (the number the producer
 * enqueues) and prints each one, then exits. A 17th get_data() call would
 * block forever, because no more data arrives.
 *
 * p - the ptable shared with the producer thread
 */
void* consumer(void* p)
{
  ptable *table = (ptable*)p;
  int key = 0;
  int priority = 0;
  int i = 0;

  /* Presumably a head start for the producer, so several priorities are
   * queued before dequeuing begins and the ordering is visible. */
  sleep(2);

  printf("Thread consumer\n");
  for (i = 0; i < 16; i++) {
    printf("Calling get_data\n");
    get_data(table, &key, &priority);
    printf("\nSearch-> Priority=%d key= %d\n", priority, key);
  }
  return NULL;
}

void cleanup(ptable *p)
{
  node *n = p->buf_pool;

  while(n) {
    node* temp = n;
    n = n->next;
    free(temp);
  }
  free(p);
}

int main()
{
  ptable *p = malloc(sizeof(ptable));
  create(p);

  pthread_t thread1, thread2;

  int iret1, iret2;

  iret1 = pthread_create( &thread1, NULL, producer, (void*) p);
  iret2 = pthread_create( &thread2, NULL, consumer, (void*) p);

  display(p);

  pthread_join( thread1, NULL);
  pthread_join( thread2, NULL);

  cleanup(p);
}

/*
 * Prints every queued node, bucket by bucket from priority 0 upwards.
 * Does not lock p; callers should keep the queue from changing meanwhile.
 */
void display(ptable* p)
{
  ASSERT(p);

  int bucket;

  for (bucket = 0; bucket < PRI_MAX; bucket++) {
    const ptable_entry *e = &p->entry[bucket];
    const node *cur;

    for (cur = e->n; cur != NULL; cur = cur->next) {
      printf("\nBucket=%d|Key=%d|Priority=%d\n", e->priority,
          cur->key,
          cur->priority);
    }
  }
}

XMPP, Openfire and Pidgin: A weekend buffet

My friend Pritam and I have set out on a small journey into the chat world. Our subjects are the XMPP-based Openfire server and the Pidgin client. Our objective is to create an XMPP-based private chat server that we will later host on a private IP. We plan to develop a plugin that customizes the server's behavior towards a client.

Why XMPP?

Because it is a widely used standard for message-oriented communication. The XMPP wiki is here.

Why Openfire? Because it is popular, free and open.

Our first day

– Installing and configuring Openfire is simple, and so are adding users and going online. On your local network, it is advisable to turn off the firewall. Read more here about the cryptic Openfire parameters.

– Installing Pidgin is simple. You should be able to connect to your local chat server in an hour.

– We wanted our server to interact with all users and we found “Message of the Day” plugin. This plugin is part of Openfire sources. Code is in Java. We played with the code of this plugin and enjoyed different behaviors.

– Building Openfire sources requires Apache ANT and JRE 1.5 or higher. You can find more details here.

After downloading the Openfire sources, you will find a “plugin” directory that contains the sources for “Message of the Day”. We have to build it. The following command builds all plugins, including this one:

xyz@localhost:~/Downloads/openfire/src/openfire_src/build> ~/apps/netbeans-6.9.1/java/ant/bin/ant plugins

– This run will build all plugins. Our plugin is created as “motd.jar”. Check if the “JAR” is created.

xyz@localhost:~/Downloads/openfire/src/openfire_src/build> ll ../target/openfire/plugins/motd.jar

– Now our plugin is ready, and we deploy it in our Openfire instance. Deployment is trivial: we copy the new JAR file to the plugins directory of Openfire, which automatically creates the necessary directory structure for our plugin.

xyz@localhost:~/Downloads/openfire/src/openfire_src/build> cp ../target/openfire/plugins/motd.jar ~/Downloads/openfire/plugins/
xyz@localhost:~/Downloads/openfire/src/openfire_src/build> ll !$
ll ~/Downloads/openfire/plugins/
total 60
drwxr-xr-x 3 xyz users 4096 2011-10-02 03:22 admin
drwxr-xr-x 5 xyz users 4096 2012-05-06 20:16 motd
-rw-r--r-- 1 xyz users 12621 2012-05-06 20:28 motd.jar
xyz@localhost:~/Downloads/openfire/src/openfire_src/build> ll ~/Downloads/openfire/plugins/
total 60
drwxr-xr-x 3 xyz users 4096 2011-10-02 03:22 admin
drwxr-xr-x 5 xyz users 4096 2012-05-06 20:28 motd
-rw-r--r-- 1 xyz users 12621 2012-05-06 20:28 motd.jar

– We played with different types of messages that we can send to a newly joined client. Then we dissected the Session type, the host name and many other useful attributes, including Packets and Messages. For example, the message of the day now tells users their name 😛

(8:30:27 PM) localhost: vishal@localhost/59b71259

This is the message from the Openfire server to a client as soon as the client creates a session.

– You should keep Openfire API handy with you.

– We have planned to do a few more experiments in coming weeks to understand:

  • How does an XMPP server process client connections?
  • How do we receive packets from, and send packets to, a client?
  • Could we get a slimmer Openfire? 😉

Synchronize two threads to print ordered even and odd numbers in C

Problem: You have two threads that independently print odd and even values. The goal is to synchronize these threads so that the combined output follows the natural (strictly increasing) order of the numbers. So the output should be 1, 2, 3, 4, 5, 6, 7, 8, …

It is an interesting problem and can be solved with a condition variable.

Following is C implementation of the solution:

#include "stdio.h"
#include "stdlib.h"
#include "pthread.h"

/* Shared state: both threads increment `count` under count_mutex and
 * signal condition_var to hand the turn to the other thread. */
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;
/* Thread entry points (odd and even printers), started via pthread_create(). */
void *functionCount1();
void *functionCount2();
int count = 0;
#define COUNT_DONE 200

/*
 * Starts the odd-printing and even-printing threads and waits for both.
 * The threads are meant to alternate so the combined output reads
 * 1, 2, 3, ... up to COUNT_DONE.
 * Returns 0 on success, EXIT_FAILURE if a thread cannot be created.
 */
int main(void)
{
  pthread_t thread1, thread2;

  if (pthread_create( &thread1, NULL, &functionCount1, NULL) != 0) {
    fprintf(stderr, "cannot create odd thread\n");
    return EXIT_FAILURE;
  }
  if (pthread_create( &thread2, NULL, &functionCount2, NULL) != 0) {
    /* thread1 would wait forever for its partner; exiting ends it. */
    fprintf(stderr, "cannot create even thread\n");
    return EXIT_FAILURE;
  }

  pthread_join( thread1, NULL);
  pthread_join( thread2, NULL);
  return 0;
}

/*
 * Thread that prints the odd numbers. It waits while `count` is odd
 * (the even thread's turn), then increments and prints it. Returns NULL
 * once `count` has reached COUNT_DONE, whichever thread got it there.
 *
 * arg - unused; present to match the pthread_create() start routine
 */
void *functionCount1(void *arg)
{
  (void)arg;

  for (;;) {
    pthread_mutex_lock( &count_mutex );

    /* A loop, not an if: pthread_cond_wait() may wake spuriously, and the
     * even thread's final signal must also release us. */
    while ( count % 2 != 0 && count < COUNT_DONE ) {
      pthread_cond_wait( &condition_var, &count_mutex );
    }

    /* The other thread may have completed the sequence while we waited;
     * incrementing again would print COUNT_DONE + 1. */
    if ( count >= COUNT_DONE ) {
      pthread_mutex_unlock( &count_mutex );
      return NULL;
    }

    count++;
    printf("Counter value functionCount1: %d\n",count);
    pthread_cond_signal( &condition_var );
    pthread_mutex_unlock( &count_mutex );
  }
}

/*
 * Thread that prints the even numbers. It waits while `count` is even
 * (the odd thread's turn), then increments and prints it. Returns NULL
 * once `count` has reached COUNT_DONE, whichever thread got it there.
 *
 * arg - unused; present to match the pthread_create() start routine
 */
void *functionCount2(void *arg)
{
  (void)arg;

  for (;;) {
    pthread_mutex_lock( &count_mutex );

    /* A loop, not an if: pthread_cond_wait() may wake spuriously, and the
     * odd thread's final signal must also release us. */
    while ( count % 2 == 0 && count < COUNT_DONE ) {
      pthread_cond_wait( &condition_var, &count_mutex );
    }

    /* The other thread may have completed the sequence while we waited. */
    if ( count >= COUNT_DONE ) {
      pthread_mutex_unlock( &count_mutex );
      return NULL;
    }

    count++;
    printf("Counter value functionCount2: %d\n",count);
    pthread_cond_signal( &condition_var );
    pthread_mutex_unlock( &count_mutex );
  }
}

Data Mining: Hash tree based support counting

Data Mining: Hash Tree based support counting

A hash tree is a very quick way to search for an item. When there are many itemsets, a hash tree can be used to find out whether a given itemset has the required support count. But how do we construct a hash tree? The links I came across were very abstract in describing the hash tree implementation.

Suppose we want to insert (i.e. hash)
the following 3-itemsets into the tree
(9,3,6)
(8,7,5)

We have taken the hash function h(x) = x mod 3. It has three possible values, h(x) ∈ {0, 1, 2}. Each value is a branch of a hash tree node, so each node has three branches.

Now, let’s insert the itemsets {8, 7, 5} and {9, 3, 6}.

Start with I1 = {8, 7, 5}

Image

Remember that we are considering 8 because it is at level 1 of the tree. At the next level we will consider 7, and at the level after that, 5.

The first item = 8
h(8) = 8 mod 3 = 2

Image

I2 = {9, 3, 6}
The first item = 9
h(9) = 9 mod 3 = 0

Image

Level 2
Now, the second item of set I1 = 7
h(7) = 7 mod 3 = 1

Image

Now, the second item of set I2 = 3
h(3) = 3 mod 3 = 0

Image


A gold mine: Data mining

To me, data mining means the science of sifting through huge data sets and drawing useful interpretations from them. It is a very interesting way to discover “behavior” in data. During my graduation, I could never appreciate how useful data analysis is for non-technical purposes, i.e. business decision making.

But maturity is probably dawning on me 😉 It is such an exciting field, drawing on statistics, computer science and business. Techniques such as decision trees and rule-based classifiers build on the fundamentals of information theory (entropy, information gain) and on distance measures such as Euclidean distance. They can be implemented with ease on huge data sets using emerging technologies like Hadoop.

My first encounter with a mining-like methodology in my professional work was through application performance profilers. Such a tool collects and saves samples from a running application. These samples are later analyzed to draw various conclusions about the application's performance.

Data mining is an interesting, promising field and I am keen to delve more into it.


Designing a performance stats framework

Performance stats are vital for understanding how efficiently your code runs, and they can help you find and fix bottlenecks. This post discusses how to plug these stats into your program.

There are primarily two classes of stats:

  • Counts
  • Time stamps

Stats are derived from events. You want to know how many times an event happened and how long an operation took to complete. So, segregate your events into either “count” or “time”. For time, you can compute the best, average and worst case of an operation.

Next, identify the fundamental events, e.g. “How many times was printf called?” or “What is the average time my program spends in each call?” To compute the latter, you need the number of printf calls and the total time spent across all of them. Such derived events depend on other events.

A perf stats framework should be designed around the following phases:

  • Collection: Just collect raw data, most frequent operation.
  • Processing: Perform all calculation to compute derived events
  • Visualization

Please keep in mind that the collection phase should be very quick. Do not perform any calculation during collection; this keeps the event-collection statements minimally intrusive. The code that collects events should be tiny and can be inlined or #define’d.

 


Problems and solution: Uninstalling VirtualBox old version

I had VirtualBox 3.2.8 installed on my system (Windows XP, i686). When I tried to install the latest version, 4.1.8, the installer asked me to uninstall version 3.2.8 first, and that required the original VirtualBox 3.2.8 installer executable.

Which I had deleted! 😦

Now, what to do?

  • I deleted VirtualBox folder from my program files directory.
  • Then, my friend Chetan suggested me to remove registry entries. Run “regedit” in ‘Start->Run’.
  • Now search(ctrl+F) for “virtualbox”; you would get many keys. The registry editor has two panes. Right side shows the key value and left side shows the containing location.
  • In the left pane, look for the folder shown with an “open” icon (it is the only folder that looks different from the others). Delete this folder.
  • Keep searching for the string by “F3” and keep deleting these folders.
  • You should get a message from Windows that registry scan finds no such word now.
  • Now, try installing VirtualBox. It worked for me 🙂

Python goof ups of a C/C++ programmer

Python is a new-age language compared to good old C, and writing Python code requires a subtle shift from the C mindset. Python offers so many things ready-made that you feel you have written very little code. Still, I goofed up while using a very common feature of C: pass by reference.

Python offers something similar, but with a caveat: you need to know which data objects are mutable and which are immutable. When you pass a mutable object, such as a list or a dictionary, the callee can change it in place. If you pass an immutable object, such as a string or a tuple, it stays unchanged in the caller’s scope. This is very similar to passing “references” (C++) for mutable objects and pointers to const data for immutable ones.

  1 class Node:
  2     def __init__(self, value, data=[]):
  3         self.char = value
  4         # We intend to keep a list of values for a key
  5         self.data = data
  6         # XXX: List of Node
  7         self.children = []
  8
  9
 10 n = Node('a')
 11 m = Node('b')
 12
 13 (n.data).append("north")
 14 (m.data).append("south")
 15
 16 print n.data
 17 print m.data

The output of the above code is:

ubuntu@ubuntu:~/Desktop/ToKeep/cprogs/pycon$ python pybug.py
['north', 'south']
['north', 'south']

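At first I did not understand why Python keeps a single instance of the default argument (the list). The reason is that default argument values are evaluated only once, when the def statement is executed. Every Node created without an explicit data argument therefore shares the same list object. The usual fix is to use data=None as the default and create a new list inside __init__ when data is None.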

This thread on Stack Overflow is very informative.

  • Omitting “this” pointer

In C++, “this”, the pointer to the current object, is passed implicitly. Python has a similar convention, “self” (a naming convention, not a keyword). But, contrary to what a C++ programmer might expect, “self” must be declared explicitly as the first parameter of every method. Otherwise you will see an error such as:

NameError: global name 'self' is not defined

  1 class pydbg:
  2     def __init__(self, modname=None):
  3         self.level = 0
  4         self.modname = modname
  5
  6     def DLOG(self, lvl=None, fmt=None, *args):
  7         self.level = lvl
  8         print '%s' %self.modname
  9         print fmt
 10         for arg in args:
 11             print arg

So, always add the “self” parameter when adding a method to your class.


GNU cflow: A tool to analyze C code flow

I was looking at a piece of code written by someone else and wondered whether there was a tool that could at least show its code flow. I googled for such a tool and found “GNU cflow”.

“cflow” is an open source tool, that can facilitate static analysis in form of code-flow of your program. cflow reports are hierarchical, hence very intuitive. If you desire a GUI form of this data, we have another open source utility called Graphviz.

I found it very useful for:

o) Reviewing a new code
o) Understanding legacy C code
o) Remember, it can’t handle function pointers 😦
 

A typical cflow invocation looks like this:

cflow --format=posix --omit-arguments --level-indent='0=\t'
--level-indent='1=\t' --level-indent=start='\t' "source_file" > "out.cf"

Output looks as follows:

$ ./cflow ./my2.c
main() :
    open()
    printf()
    read()

By default, “cflow” shows all symbols present in your program. We (Srikanth and I) didn’t like that, and hence changed the “cflow” to accommodate our request to ignore-symbols.

So, you just mention all symbols you’d like to ignore in a file “/tmp/skip_list.txt” in a format like this

Symbol_1
Symbol_2

The source code and binary are available here.

Once cflow has produced its call-flow output, a browser-viewable graphical view can be generated with the cflow2dot utility.


Extents in “ext3” file system

My Ubuntu Linux ships with the ext3 file system. This FS is very similar to the classical UNIX file-system model: a file is logically arranged as a set of blocks, managed through an array of block pointers. In ext3, each inode has an array of fifteen elements. Twelve elements of this array point directly to disk blocks. Usually, a disk block is 4KB, so these entries can address twelve 4KB blocks (L0 / level 0). In other words, a file of up to 48KB is always contained in L0 blocks.

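As soon as the file needs a thirteenth block, the thirteenth element of this array comes into play. It points to an L1 (level 1, single-indirect) block, which keeps pointers to 1024 L0 blocks, because a 4KB block holds 1024 four-byte block numbers. The thirteenth and later block requests therefore create entries in this L1 block. The fourteenth and fifteenth elements serve L2 and L3 (double- and triple-indirect) blocks.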

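Instead of allocating one block at a time, Linux optimizes the way disk blocks are allocated to a file with “extents”. An extent is a contiguous range of disk blocks, and the kernel allocates a whole extent at once. (Strictly speaking, on-disk extents are an ext4 feature. ext3 keeps the block-pointer scheme described above, but the FIEMAP interface used below reports any contiguous run of blocks as an extent.)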

Lets create a file and see how does Linux handle block-allocation:

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/*
 * Writes two 4096-byte blocks of 'a' followed by the 3 bytes "end" to
 * foo.txt, producing the 8195-byte file whose block mapping is examined
 * below. Returns 0 on success, 1 on any I/O error.
 */
int main(void)
{
  char buf[4096];
  int i;
  int fd;

  memset(buf, 'a', sizeof buf);

  /* O_CREAT requires the third (mode) argument; without it the new file
   * gets an undefined set of permission bits. */
  fd = open("foo.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
  if (fd < 0) {
    perror("open foo.txt");
    return 1;
  }

  for (i = 0; i < 2; i++) {
    if (write(fd, buf, sizeof buf) != (ssize_t)sizeof buf) {
      perror("write");
      close(fd);
      return 1;
    }
  }
  if (write(fd, "end", 3) != 3) {
    perror("write");
    close(fd);
    return 1;
  }

  if (close(fd) != 0) {
    perror("close");
    return 1;
  }
  return 0;
}

We have created an 8195-byte (8192 + 3) file.

kanaujia@ubuntu:~/Desktop/ToKeep/cprogs$ ls -l foo.txt
---------- 1 kanaujia kanaujia 8195 2011-10-16 10:42 foo.txt

How much space does this file occupy on disk?

kanaujia@ubuntu:~/Desktop/ToKeep/cprogs$ du -h !$
du -h foo.txt
12K    foo.txt

That means the file needs three 4KB disk blocks. Now let’s see how Linux allots these blocks in a single extent. The FS_IOC_FIEMAP ioctl() request gives user space access to this information. A file extent map structure is defined as follows:

[include/linux/fiemap.h]

struct fiemap {
  28        __u64 fm_start;         /* logical offset (inclusive) at
  29                                           * which to start mapping (in) */
  30        __u64 fm_length;        /* logical length of mapping which
  31                                              * userspace wants (in) */
  32        __u32 fm_flags;         /* FIEMAP_FLAG_* flags for request (in/out) */
  33        __u32 fm_mapped_extents; /* number of extents that were mapped (out) */
  34        __u32 fm_extent_count;  /* size of fm_extents array (in) */
  35        __u32 fm_reserved;
  36        struct fiemap_extent fm_extents[0]; /* array of mapped extents (out) */
  37};

A simple C program to fill-up this structure would fetch you extent information.

For my file, I got the following data:

kanaujia@ubuntu:~/Desktop/ToKeep/cprogs$ ./fiemap ./foo.txt
File ./foo.txt has 1 extents:
#    Logical                        Physical                           Length           Flags
0:    0000000000000000 0000000000000000 0000000000003000 0007

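We have only one extent to accommodate this file. This extent spans three disk blocks, as the length is 0x3000 (12KB). Note also the flags value 0x0007, which is FIEMAP_EXTENT_LAST | FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_DELALLOC, together with a physical offset of 0. This means the data had not yet been assigned disk blocks when the map was read (delayed allocation); running sync before fiemap shows the real physical location.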

Ref: