Category Archives: linux

A Filesystem in a File (C, Linux)

I wrote a file system that lives in a single file and mimics standard calls such as open and close.
Please feel free to enhance it and report any bugs 🙂

https://github.com/vishalkanaujia/dev-tools/tree/master/file-system-in-a-file

Netlink Sockets: Adding a new socket family

  • Kernel version 3.13, Ubuntu 14.04
  • $ uname -a
    Linux ubuntu 3.13.0-37-generic #64-Ubuntu SMP Mon Sep 22 21:28:38 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
  • We need two changes:
  • User space
    • Locate the file “netlink.h”
    • $ sudo locate netlink.h
    • You will find the header file at “/usr/include/linux/netlink.h”
    • Add the new family
    • #define NETLINK_MY     17
    • Keep the number less than 32 (MAX_LINKS)
    • In your user application, add “#define NETLINK_MY 17”
  • Kernel space
    • Locate the netlink file for kernel space
    • /usr/src/linux-headers-3.13.0-24/include/uapi/linux/netlink.h
    • Add the exact same definition here as well
  • No recompilation of kernel is required.
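On the user-space side, the new family is opened like any other netlink protocol. The following is a minimal sketch, assuming the NETLINK_MY value of 17 from the steps above; the helper names make_nl_addr and open_my_netlink are hypothetical. Note that socket() is expected to fail (typically with EPROTONOSUPPORT) until some kernel code, for example a module calling netlink_kernel_create(), registers the protocol.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <linux/netlink.h>

#ifndef NETLINK_MY
#define NETLINK_MY 17   /* must match the value added to netlink.h */
#endif

/* The kernel rejects protocol numbers at or above MAX_LINKS (32). */
static_assert(NETLINK_MY < MAX_LINKS, "netlink protocol number out of range");

/* Fill a netlink address; a pid of 0 addresses the kernel. */
static struct sockaddr_nl make_nl_addr(unsigned int pid, unsigned int groups)
{
    struct sockaddr_nl addr;

    memset(&addr, 0, sizeof(addr));
    addr.nl_family = AF_NETLINK;
    addr.nl_pid = pid;
    addr.nl_groups = groups;
    return addr;
}

/*
 * Open a socket of the new family and bind it to this process.
 * Returns the descriptor, or -1 with errno set.
 */
static int open_my_netlink(void)
{
    struct sockaddr_nl local;
    int fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_MY);

    if (fd < 0)
        return -1;

    local = make_nl_addr((unsigned int)getpid(), 0);
    if (bind(fd, (struct sockaddr *)&local, sizeof(local)) < 0) {
        int saved = errno;   /* close() may overwrite errno */
        close(fd);
        errno = saved;
        return -1;
    }
    return fd;
}
```

A caller would check the return value of open_my_netlink() and report strerror(errno) on failure, which makes a missing kernel-side registration easy to spot.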

Linux FUSE Internals for developers

In this post, I will cover FUSE internals for FUSE 2.9.3.

  • Install the packages fuse and fuse-devel on CentOS.
  • getattr() is a must in a FUSE file system. Even a trivial implementation is okay.
    • Just be careful with the file size in the stat structure. If you forget to compile your file system with the 64-bit flag (-D_FILE_OFFSET_BITS=64), stat.st_size is a signed 32-bit integer (31 bits for the value).
    • In that case your file size must stay below 2GB. Otherwise, it overflows to zero.
  • In the user application, be careful with file I/O operations. A read() issued immediately after a write() on the same descriptor fetches nothing, because the file offset sits just past the data written. You should first lseek() to the beginning of the file in your application.
  • FUSE has two modes of operation:
    • Single thread (very low performance, easy to debug)
    • Multi-thread (default operation)
  • Multi-thread mode spawns multiple threads during read operations. For writes, I observed almost single-thread-like behavior.
  • The code for the multi-threaded I/O implementation is in lib/fuse_loop_mt.c.
  • FUSE uses worker threads to handle I/O requests, using struct fuse_worker. A worker thread is created in fuse_start_thread().
  • Each worker runs the fuse_do_work() function. This is an infinite loop that terminates only on session exit or when the number of active threads exceeds what is required.
  • The user's implementations of the file system APIs are registered in a const struct fuse_operations. It holds the address of every implemented API. FUSE ultimately calls these functions for file system operations.
  • FUSE 2.7 reads 8K of data by default, in two 4K chunks
    • The reads cover the last 4K and the first 4K data block
  • An example:

    I set the file size to 4MB in the getattr() implementation. If you forget to compile with the 64-bit flags, you will get zero-length files.

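    #include <string.h>
    #include <sys/stat.h>

    /* Reports "/" as a directory and every other path as a read-only 4MB regular file. */
    int bb_getattr(const char *path, struct stat *statbuf)
    {
        int retstat = 0;
        memset(statbuf, 0, sizeof(struct stat));
        if (strcmp(path, "/") == 0) {
            statbuf->st_mode = S_IFDIR | 0755;
            statbuf->st_nlink = 2;
        } else {
            statbuf->st_mode = S_IFREG | 0444;
            statbuf->st_nlink = 1;
            /* 4MB; on 32-bit builds st_size is 64 bits wide only with -D_FILE_OFFSET_BITS=64 */
            statbuf->st_size = 4 * 1024 * 1024;
        }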
        return retstat;
    }
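
The 64-bit flag issue can be caught at compile time instead of showing up as zero-length files. The following is a minimal sketch, not part of the original file system: it assumes a C11 compiler for static_assert, and the helper names fits_in_32bit_off_t and store_size are hypothetical.

```c
#define _FILE_OFFSET_BITS 64   /* normally passed as -D_FILE_OFFSET_BITS=64 */
#include <assert.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>

/* Refuse to build if st_size cannot hold sizes of 2GB and beyond. */
static_assert(sizeof(off_t) == 8, "compile with -D_FILE_OFFSET_BITS=64");

/* Returns 1 if the size would still fit in a signed 32-bit st_size. */
static int fits_in_32bit_off_t(int64_t size)
{
    return size >= 0 && size <= INT32_MAX;
}

/* Stores a size in a stat buffer and returns what st_size reports back. */
static int64_t store_size(struct stat *statbuf, int64_t size)
{
    statbuf->st_size = (off_t)size;
    return (int64_t)statbuf->st_size;
}
```

With a 64-bit off_t, store_size() returns a 3GB size unchanged, while fits_in_32bit_off_t() shows that the same value would not survive a 32-bit build.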

    The sequence of calls and their arguments is as follows:

    bb_getattr(path="/abcd.txt", statbuf=0xc5387960)
        rootdir = "/tmp", path = "/abcd.txt"
    bb_open(path="/abcd.txt", fi=0xc5daaa50)
        rootdir = "/tmp", path = "/abcd.txt"
        fi:
        flags = 0x00008002
        fh_old = 0x00000000
        writepage = 0
        direct_io = 0
        keep_cache = 0
        fh = 0x0000000000000001
        lock_owner = 0x0000000000000000
    bb_write(path="/abcd.txt", buf=0xc4966050, size=10, offset=0, fi=0xc5387a50)
        fi:
        flags = 0x00000000
        fh_old = 0x00000001
        writepage = 0
        direct_io = 0
        keep_cache = 0
        fh = 0x0000000000000001
        lock_owner = 0x0000000000000000
    bb_read(path="/abcd.txt", buf=0x06ccbd90, size=12288, offset=4096, fi=0xc5daaa50)  <- Here
        fi:
        flags = 0x00000000
        fh_old = 0x00000001
        writepage = 0
        direct_io = 0
        keep_cache = 0
        fh = 0x0000000000000001
        lock_owner = 0x0000000000000000
    
    bb_read(path="/abcd.txt", buf=0x06ccbd90, size=4096, offset=0, fi=0xc5daaa50)
        fi:
        flags = 0x00000000
        fh_old = 0x00000001
        writepage = 0
        direct_io = 0
        keep_cache = 0
        fh = 0x0000000000000001
        lock_owner = 0x0000000000000000
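
The earlier advice about calling lseek() before reading back written data is not specific to FUSE, so it can be reproduced on an ordinary file. The following is a small sketch under stated assumptions: the helper write_then_read and the /tmp file template are hypothetical and only serve the illustration.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Writes msg to a fresh temporary file, optionally rewinds, then reads.
 * Returns the number of bytes read, or -1 on error.
 */
static ssize_t write_then_read(const char *msg, char *out, size_t outlen,
                               int rewind_first)
{
    char path[] = "/tmp/rw_demo_XXXXXX";   /* hypothetical location */
    size_t len = strlen(msg);
    ssize_t n = -1;
    int fd;

    assert(msg && out);

    fd = mkstemp(path);                    /* opened read/write */
    if (fd < 0)
        return -1;
    unlink(path);                          /* removed once fd is closed */

    if (write(fd, msg, len) == (ssize_t)len) {
        /* After write() the offset points just past the written data. */
        if (!rewind_first || lseek(fd, 0, SEEK_SET) == 0)
            n = read(fd, out, outlen);
    }
    close(fd);
    return n;
}
```

Calling write_then_read("hello", buf, sizeof(buf), 0) reads 0 bytes because the offset is already at end of file; with rewind_first set to 1 the same call reads the 5 bytes back.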
WRITE stack trace
================
(gdb) bt
#0  bb_write (path=0x7ffc68000990 "/test_file.0", buf=0x7ffff6f42060 "", size=4096, offset=4096, fi=0x7ffff6f40550) at bbfs.c:136
#1  0x00007ffff7dc885f in fuse_fs_write_buf (fs=0x280f090, path=0x7ffc68000990 "/test_file.0", buf=0x7ffff6f40580, off=4096, fi=0x7ffff6f40550)
    at fuse.c:1878
#2  0x00007ffff7dccb37 in fuse_lib_write_buf (req=0x7ffc680008c0, ino=2, buf=0x7ffff6f40580, off=4096, fi=0x7ffff6f40550) at fuse.c:3278
#3  0x00007ffff7dd461b in do_write_buf (req=0x7ffc680008c0, nodeid=2, inarg=0x7ffff6f42038, ibuf=0x7ffff6f40800) at fuse_lowlevel.c:1300
#4  0x00007ffff7dd7369 in fuse_ll_process_buf (data=0x280f220, buf=0x7ffff6f40800, ch=0x280ece0) at fuse_lowlevel.c:2437
#5  0x00007ffff7dd9aa5 in fuse_session_process_buf (se=0x280ed30, buf=0x7ffff6f40800, ch=0x280ece0) at fuse_session.c:87
#6  0x00007ffff7dd0f6a in fuse_do_work (data=0x7ffff00008c0) at fuse_loop_mt.c:117
#7  0x00000037bc2079d1 in start_thread () from /lib64/libpthread.so.0
#8  0x00000037bbee8b6d in clone () from /lib64/libc.so.6

READ stack trace
=================
(gdb) bt
#0  bb_read (path=0x7ffff38c55b0 "/test_file.0", buf=0x7ffff38c56c0 "", size=4096, offset=8192, fi=0x7ffff79635d0) at bbfs.c:111
#1  0x00007ffff7dc841e in fuse_fs_read_buf (fs=0x280f090, path=0x7ffff38c55b0 "/test_file.0", bufp=0x7ffff7963578, size=4096, off=8192, 
    fi=0x7ffff79635d0) at fuse.c:1794
#2  0x00007ffff7dcca1d in fuse_lib_read (req=0x7ffff002a1e0, ino=2, size=4096, off=8192, fi=0x7ffff79635d0) at fuse.c:3252
#3  0x00007ffff7dd42c7 in do_read (req=0x7ffff002a1e0, nodeid=2, inarg=0x7ffff7965038) at fuse_lowlevel.c:1232
#4  0x00007ffff7dd73ce in fuse_ll_process_buf (data=0x280f220, buf=0x7ffff7963800, ch=0x280ece0) at fuse_lowlevel.c:2441
#5  0x00007ffff7dd9aa5 in fuse_session_process_buf (se=0x280ed30, buf=0x7ffff7963800, ch=0x280ece0) at fuse_session.c:87
#6  0x00007ffff7dd0f6a in fuse_do_work (data=0x280ee30) at fuse_loop_mt.c:117
#7  0x00000037bc2079d1 in start_thread () from /lib64/libpthread.so.0
#8  0x00000037bbee8b6d in clone () from /lib64/libc.so.6

Compiling FUSE based file system with your FUSE build

Suppose hello.c implements the file system APIs and your FUSE installation resides in /home/k/Desktop/my_fuse_2.9.3.

$ gcc -g hello.c -o hi -D_FILE_OFFSET_BITS=64 -I/home/k/Desktop/my_fuse_2.9.3/include \
    -L/home/k/Desktop/my_fuse_2.9.3/lib -lfuse -lpthread \
    -Wl,-rpath,/home/k/Desktop/my_fuse_2.9.3/lib

Pylucene: Installation on Ubuntu

If you want to install pylucene automatically, try the Synaptic package manager or apt-get. This installation gives you Pylucene 2.3, which is old. If you want pylucene 3.6 or higher, follow the manual installation discussed later in this post.

Automatic installation (pylucene 2.3)

  • Install everything mentioned here
  • sudo apt-get install pylucene
  • sudo apt-get install python-dev
  • I borrowed a test program from here. The immediate error you would see if you try to run a pylucene based program is:
kanaujia@ubuntu:~/work/Py$ python myluce.py 
Traceback (most recent call last):
  File "myluce.py", line 11, in <module>
    import lucene
  File "/usr/lib/python2.7/dist-packages/lucene/__init__.py", line 2, in <module>
    import os, _lucene
ImportError: libjvm.so: cannot open shared object file: No such file or directory
  • Run the following:
    $ ldconfig -p | grep libjvm

    If you find nothing, see the next point.

  • Make sure you have Java JDK/ JRE available on your machine.
    root@ubuntu:~# find / -type f -name libjvm.so 
    /usr/lib/jvm/java-6-openjdk-i386/jre/lib/i386/cacao/libjvm.so
    /usr/lib/jvm/java-6-openjdk-i386/jre/lib/i386/server/libjvm.so
    /usr/lib/jvm/java-6-openjdk-i386/jre/lib/i386/client/libjvm.so
    /usr/lib/jvm/java-6-openjdk-i386/jre/lib/i386/jamvm/libjvm.so
  • Export the path to your library.
    root@ubuntu:~/work/Py$ export LD_LIBRARY_PATH=/usr/lib/jvm/java-6-openjdk-i386/jre/lib/i386/server:$LD_LIBRARY_PATH
  • And, you are done 🙂
    root@ubuntu:~/work/Py$ python myluce.py
    Usage:
    myluce.py <field_name> <index_url>

Installing Pylucene 3.0 or higher version manually

  1. Install everything mentioned here
  2. wget https://bootstrap.pypa.io/ez_setup.py -O - | python
  3. sudo apt-get install python-dev
  4. Get the package from http://lucene.apache.org/pylucene
  5. Find out the JVM path on your machine
    $ sudo update-java-alternatives -l
    java-1.6.0-openjdk-i386 1061 /usr/lib/jvm/java-1.6.0-openjdk-i386
  6. Now go to your pylucene package and extract it.

Note: I am following instructions at: http://lucene.apache.org/pylucene/install.html

  • cd ./pylucene-3.6.1-2/
  • pushd jcc
  • <edit setup.py to match your environment>

Open setup.py and search for “JDK”. Update the JDK path to what you found in step 5.

    JDK = {
        'darwin': JAVAHOME,
        'ipod': '/usr/include/gcc',
        #'linux2': '/usr/lib/jvm/java-6-openjdk',
        'linux2': '/usr/lib/jvm/java-1.6.0-openjdk-i386',
        'sunos5': '/usr/jdk/instances/jdk1.6.0',
        'win32': JAVAHOME,
        'mingw32': JAVAHOME,
        'freebsd7': '/usr/local/diablo-jdk1.6.0'
    }
    • python setup.py build
    • sudo python setup.py install
    • popd
    • <edit Makefile to match your environment>
    $ vi Makefile

    Uncomment the following:

    # Mac OS X 10.6 (64-bit Python 2.6, Java 1.6)
    PREFIX_PYTHON=/usr
    ANT=ant
    PYTHON=$(PREFIX_PYTHON)/bin/python
    JCC=$(PYTHON) -m jcc.__main__ --shared --arch x86_64
    NUM_FILES=4
    • make

    This ‘make’ is very slow. On my dual core laptop, it took about 15 minutes to complete.

    • sudo make install
    • make test (look for failures)
    • Done… sigh (Thanks a bunch to this blog)

Ubuntu on a Windows host: Alternative to VirtualBox and VMWare

Wubi is a cool alternative to VMPlayer and VirtualBox to run Ubuntu “almost” natively on your Windows system. It gives you a dual boot machine without partitioning your filesystem.

How to do it
• Install Wubi
• Place your Ubuntu ISO in the _same_ place where your Wubi binaries are
• Install Ubuntu from Wubi installer

After the Ubuntu installation and a reboot, you will see a dual boot option of Windows and Ubuntu. Ubuntu runs on bare hardware except for disk accesses.

How it works
Wubi is based on loopback devices in Linux. A loopback device exports a file as a device. You can create a file system on this “file” and mount it.
Wubi creates a file in your Windows NTFS file system (“root.disk”) which is exported as a loopback device in Ubuntu. This file is formatted to a file-system and used by Ubuntu.
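
The idea of treating a file as a device can be illustrated from user space. The sketch below is only an analogy and not how the kernel loop driver is implemented: it addresses a backing file in fixed-size blocks with pread() and pwrite(), much like a file system addresses a block device. BLOCK_SIZE, the /tmp template, and all helper names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#define BLOCK_SIZE 1024   /* hypothetical block size */

/* Creates an anonymous backing file of nblocks zero-filled blocks. */
static int create_backing_file(uint32_t nblocks)
{
    char path[] = "/tmp/backing_XXXXXX";   /* hypothetical location */
    int fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);                          /* removed once fd is closed */
    if (ftruncate(fd, (off_t)nblocks * BLOCK_SIZE) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Writes one whole block at block number blkno. Returns 0 on success. */
static int block_write(int fd, uint32_t blkno, const void *buf)
{
    off_t off = (off_t)blkno * BLOCK_SIZE;

    assert(buf);
    return pwrite(fd, buf, BLOCK_SIZE, off) == BLOCK_SIZE ? 0 : -1;
}

/* Reads one whole block; fails if the block lies past the end of the file. */
static int block_read(int fd, uint32_t blkno, void *buf)
{
    off_t off = (off_t)blkno * BLOCK_SIZE;

    assert(buf);
    return pread(fd, buf, BLOCK_SIZE, off) == BLOCK_SIZE ? 0 : -1;
}
```

A file system placed on top of such a file only ever sees numbered blocks, which is why the same ext4 code can run on root.disk as on a real partition.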

In my Ubuntu system:

kanaujia@ubuntu:/tmp$ sudo mount
[sudo] password for kanaujia: 
/dev/loop0 on / type ext4 (rw,errors=remount-ro)

kanaujia@ubuntu:/tmp$ sudo losetup -a
/dev/loop0: [0801]:115068 (/host/ubuntu/disks/root.disk)

kanaujia@ubuntu:/tmp$ cat !$
cat /etc/fstab
# /etc/fstab: static file system information.
#
# Use 'blkid' to print the universally unique identifier for a
# device; this may be used with UUID= as a more robust way to name devices
# that works even if disks are added and removed. See fstab(5).
#
# <file system> <mount point>   <type>  <options>       <dump>  <pass>
proc            /proc           proc    nodev,noexec,nosuid 0       0
/host/ubuntu/disks/root.disk /               ext4    loop,errors=remount-ro 0       1
/host/ubuntu/disks/swap.disk none            swap    loop,sw         0       0

That’s it! It is a simple concept used beautifully. I wonder whether this setup has a negative performance impact; I will find that out later.

Anyway, for fun, I experimented with creating my own file system on a loopback device:

Create a file with random data
==============================
kanaujia@ubuntu:/tmp$ dd if=/dev/urandom of=/home/kanaujia/Desktop/myfs bs=1M count=10
10+0 records in
10+0 records out
10485760 bytes (10 MB) copied, 1.16245 s, 9.0 MB/s

Create a mount point
====================
kanaujia@ubuntu:/tmp$ sudo mkdir /mnt/myfs

Update /etc/fstab
=================
kanaujia@ubuntu:/tmp$ sudo vi /etc/fstab

Setup the loopback device
=========================
kanaujia@ubuntu:/tmp$ sudo losetup /dev/loop1 /home/kanaujia/Desktop/myfs

Format the device as a file-system
==================================
kanaujia@ubuntu:/tmp$ mkfs.ext3 -c /dev/loop1
mke2fs 1.42 (29-Nov-2011)
mkfs.ext3: Permission denied while trying to determine filesystem size
kanaujia@ubuntu:/tmp$ sudo mkfs.ext3 -c /dev/loop1
mke2fs 1.42 (29-Nov-2011)
Discarding device blocks: done                            
Filesystem label=
OS type: Linux
Block size=1024 (log=0)
Fragment size=1024 (log=0)
Stride=0 blocks, Stripe width=0 blocks
2560 inodes, 10240 blocks
512 blocks (5.00%) reserved for the super user
First data block=1
Maximum filesystem blocks=10485760
2 block groups
8192 blocks per group, 8192 fragments per group
1280 inodes per group
Superblock backups stored on blocks: 
    8193

Checking for bad blocks (read-only test): done                                                 
Allocating group tables: done                            
Writing inode tables: done                            
Creating journal (1024 blocks): done
Writing superblocks and filesystem accounting information: done

kanaujia@ubuntu:/tmp$ sudo mount /dev/loop1
kanaujia@ubuntu:/tmp$ mount
/dev/loop0 on / type ext4 (rw,errors=remount-ro)
proc on /proc type proc (rw,noexec,nosuid,nodev)
sysfs on /sys type sysfs (rw,noexec,nosuid,nodev)
none on /sys/fs/fuse/connections type fusectl (rw)
none on /sys/kernel/debug type debugfs (rw)
none on /sys/kernel/security type securityfs (rw)
udev on /dev type devtmpfs (rw,mode=0755)
devpts on /dev/pts type devpts (rw,noexec,nosuid,gid=5,mode=0620)
tmpfs on /run type tmpfs (rw,noexec,nosuid,size=10%,mode=0755)
none on /run/lock type tmpfs (rw,noexec,nosuid,nodev,size=5242880)
none on /run/shm type tmpfs (rw,nosuid,nodev)
/dev/sda1 on /host type fuseblk (rw,nosuid,nodev,relatime,user_id=0,group_id=0,allow_other,blksize=4096)
gvfs-fuse-daemon on /home/kanaujia/.gvfs type fuse.gvfs-fuse-daemon (rw,nosuid,nodev,user=kanaujia)
/dev/loop1 on /mnt/myfs type ext3 (rw,noexec,nosuid,nodev)

kanaujia@ubuntu:/tmp$ cd /mnt/
kanaujia@ubuntu:/mnt$ ls
myfs

kanaujia@ubuntu:/mnt$ cd myfs/

kanaujia@ubuntu:/mnt/myfs$ ls
lost+found

kanaujia@ubuntu:/mnt/myfs$ ll
total 17
drwxr-xr-x 3 root root  1024 Jul 11 13:32 ./
drwxr-xr-x 3 root root  4096 Jul 11 13:30 ../
drwx------ 2 root root 12288 Jul 11 13:32 lost+found/

kanaujia@ubuntu:/mnt/myfs$ sudo touch hh
kanaujia@ubuntu:/mnt/myfs$ ls
hh  lost+found
kanaujia@ubuntu:/mnt/myfs$ ll
total 17
drwxr-xr-x 3 root root  1024 Jul 11 13:34 ./
drwxr-xr-x 3 root root  4096 Jul 11 13:30 ../
-rw-r--r-- 1 root root     0 Jul 11 13:34 hh
drwx------ 2 root root 12288 Jul 11 13:32 lost+found/

References:
Loopback Devices in Linux
http://csulb.pnguyen.net/loopbackDev.html

Is your Ubuntu 11.04 slow?

I am using Ubuntu 11.04 as a VM hosted on a Windows machine. I usually suspend the Ubuntu VM and resume my work later. Of late, I observed that the VM was performing very slowly. I checked the CPU consumption and found that a process “telepathy-logger” was using 100% of the CPU!

After digging more, I found that this is a daemon process used by the Empathy messenger. In fact, this is a known bug in “telepathy-logger”: when a machine resumes, the daemon misbehaves and consumes up to 100% CPU.

Solution:

  • Kill the daemon: $ killall telepathy-logger
  • Do not use Empathy messenger if you plan to resume a suspended workstation

A priority queue with O(1) operations: C, Linux implementation

I have implemented a priority queue that enables O(1) time complexity for the major operations, i.e. enqueue and dequeue.
It is POSIX-compliant C code.

Things to note
– It uses buffer pools
– It returns the highest priority node from the queue
– If the queue is empty or full, we wait on a condition variable
– Priorities are defined from 0..PRI_MAX-1 as an array
– Each priority bucket has a list of nodes that share that priority
– It is a thread-safe implementation
– It could be used between two threads for data exchange (producer/consumer)
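
The bucket layout described in the notes above can be sketched without the locking. This is a simplified, single-threaded illustration and not the code from the repository; the bq_ names and the fixed capacity BQ_CAP are hypothetical. Enqueue appends to the tail of its bucket in O(1), and dequeue scans at most PRI_MAX buckets, a constant that does not depend on the number of queued items.

```c
#include <assert.h>
#include <stddef.h>

#define PRI_MAX 10
#define BQ_CAP  64   /* hypothetical node capacity */

typedef struct bq_node {
    int key;
    struct bq_node *next;
} bq_node;

typedef struct {
    bq_node *head[PRI_MAX];   /* first node of each priority bucket */
    bq_node *tail[PRI_MAX];   /* last node, for O(1) append */
    bq_node nodes[BQ_CAP];    /* fixed pool of nodes */
    bq_node *free_list;       /* unused nodes */
} bq;

static void bq_init(bq *q)
{
    for (int i = 0; i < PRI_MAX; i++)
        q->head[i] = q->tail[i] = NULL;
    q->free_list = NULL;
    for (int i = 0; i < BQ_CAP; i++) {
        q->nodes[i].next = q->free_list;
        q->free_list = &q->nodes[i];
    }
}

/* Returns 0 on success, -1 if the pool is exhausted. */
static int bq_put(bq *q, int key, int pri)
{
    bq_node *n = q->free_list;

    assert(pri >= 0 && pri < PRI_MAX);
    if (n == NULL)
        return -1;
    q->free_list = n->next;
    n->key = key;
    n->next = NULL;
    if (q->head[pri] == NULL)
        q->head[pri] = n;
    else
        q->tail[pri]->next = n;
    q->tail[pri] = n;
    return 0;
}

/* Removes the oldest item from the lowest-numbered non-empty bucket. */
static int bq_get(bq *q, int *key, int *pri)
{
    for (int i = 0; i < PRI_MAX; i++) {
        bq_node *n = q->head[i];
        if (n != NULL) {
            q->head[i] = n->next;
            *key = n->key;
            *pri = i;
            n->next = q->free_list;   /* return the node to the pool */
            q->free_list = n;
            return 0;
        }
    }
    return -1;   /* queue is empty */
}
```

As in the full implementation, bucket 0 is treated as the highest priority, and items within one bucket come out in FIFO order.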

Here is the GitHub link

/* queue.h */

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>   /* bool, true, false */
#include <stdint.h>    /* uint32_t */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>    /* sleep() in the test code */

#define ASSERT(x) assert(x)
#define PRI_MAX 10
#define BUF_POOL_SIZE 1024
#define LOG printf
#define LOG2 printf
#define LOG3 printf
#define LOCK(x) pthread_mutex_lock(&(x))
#define UNLOCK(x) pthread_mutex_unlock(&(x))

typedef struct queue_stats_ {
  int enqueue;
  int dequeue;
  int wait_time;
} queue_stats;

int priority[PRI_MAX];

/*
 * List of nodes in a hash bucket
 */
typedef struct node_ {
  int key;
  int priority;
  struct node_* next;
} node;

/*
 * Define the hash table
 * |p1|->|a|->|b|->|c|
 * |p2|->|e|->|f|
 */
typedef struct ptable_entry_ {
  int priority;
  node* n;
} ptable_entry;

typedef struct ptable_ {
  ptable_entry entry[PRI_MAX];
  node* last[PRI_MAX];
  node* buf_pool;
  node* free_bbuf_pool;
  int ent_count;
  pthread_mutex_t lock;
  pthread_cond_t cv;
  bool is_available;
  queue_stats *stats;
} ptable;

void create(ptable*);
void get_data(ptable*, int*, int*);
void put_data(ptable*, int key, int priority);
void destroy(ptable*);
void display(ptable*);
void put_buf(ptable* p, void* buf);
void create_pool(ptable** p, uint32_t num);
void* get_buf(ptable* p);
void display_buf_pool(ptable* p);

/*
 * Helper functions
 */

void add_a_node(ptable* p, node** last, node** m, int key, int priority);

/* queue.c */

#include "queue.h"

/*
 * Adds a node of a given priority to the queue. Since a node is
 * allocated from a fixed size buffer pool, this function blocks
 * if pool has no free buffer object. 
 */
void add_a_node(ptable* p, node** last, node** m, int key, int priority)
{
  ASSERT(p);

  LOCK(p->lock);
  node *n = NULL;

#ifdef DEBUG
  LOG3("oo-get_data-oo\n");
  display_buf_pool(p);
  LOG3("---get_data--\n");
#endif

  /*
   * Re-check the pool after every wakeup: the condition variable is
   * shared with the consumer, and wakeups may be spurious.
   */
  while (NULL == (n = (node*)get_buf(p))) {
    LOG2("Buf pool is over. Waiting for dequeue\n");
    pthread_cond_wait(&p->cv, &p->lock);
  }

  /*
   * Collided nodes are arranged in a list (queue)
   */
  n->key = key;
  n->priority = priority;
  n->next = NULL;

  if (NULL == *m) {
    *m = n;
  } else {
    (*last)->next = n;
  }

  *last = n;
  LOG("Enqueue: %d\n", p->stats->enqueue++);

  p->is_available = true;
  pthread_cond_signal(&p->cv);
  UNLOCK(p->lock);
}

/*
 * Gets a buffer from the buffer pool
 */
void* get_buf(ptable *p)
{
  /*
   * Pop the head of the free list, if any.
   * The caller must hold p->lock.
   */
  node* head = p->buf_pool;

  if(p->buf_pool != NULL) {
    p->buf_pool = head->next;
    LOG2("Stealing a buffer %p\n", head);
    return head;
  } else {
    LOG2("\nBuffer overrun\n");
    return NULL;
  }
}

/*
 * Returns a buffer to buffer pool
 */
void put_buf(ptable* p, void* buf)
{
  /*
   * Push the buffer onto the free list. Always reset next: the node
   * may still point into the queue it was just removed from.
   */
  node* head = (node*)buf;
  head->next = p->buf_pool;
  p->buf_pool = head;
  LOG2("Unstealing a buffer %p\n", buf);
}

void display_buf_pool(ptable* p)
{
  ASSERT(p);

  int i = 1;
  node* temp = p->buf_pool;

  while(temp) {
    LOG2("Buf %d: %p\n", i++, temp);
    temp = temp->next;
  }
}

void create_pool(ptable** p, uint32_t num)
{
  node* head = NULL;
  node* temp = NULL;
  uint32_t i = 0;

  ASSERT(num > 0);

  head = malloc(sizeof(node));
  ASSERT(head);
  temp = head;

  for (i = 1; i < num; i++) {
    temp->next = malloc(sizeof(node));
    ASSERT(temp->next);
    temp = temp->next;
  }
  temp->next = NULL;

  /*
   * Set the buf pool. The table comes from malloc(), so buf_pool
   * is assigned unconditionally instead of being tested for NULL.
   */
  (*p)->buf_pool = head;

#ifdef DEBUG
  display_buf_pool(*p);
#endif

}

/*
 * Create a priority queue object of priority ranging from 0..PRI_MAX-1
 */
void create(ptable* p)
{
  ASSERT(p);

  int i = 0;

  /*
   * Initialize the entries
   */
  for(i = 0; i < PRI_MAX; i++) {
    p->entry[i].priority = i;
    p->entry[i].n = NULL;
    p->last[i] = NULL;
  }

  create_pool(&p, BUF_POOL_SIZE);

  /* calloc() so that the counters start at zero */
  p->stats = calloc(1, sizeof(queue_stats));
  ASSERT(p->stats);

  pthread_mutex_init(&(p->lock), NULL);
  pthread_cond_init(&(p->cv), NULL);

  p->is_available = false;
  p->ent_count = PRI_MAX;
}

/*
 * Adds a node to the queue 
 */
void put_data(ptable* p, int key, int priority)
{
  ASSERT(p);
  ASSERT(priority >= 0 && priority < PRI_MAX);

  add_a_node(p, &(p->last[priority]), &(p->entry[priority].n),
                key, priority);
}

/*
 * Gets the highest priority node from the queue. If queue is empty,
 * then this routine blocks.
 */
void get_data(ptable* p, int* key, int* pri)
{
  ASSERT(p);

  LOCK(p->lock);
  int i = 0;
  node* temp = NULL;

wait_again:
  while (false == p->is_available) {
    /*
     * Else wait for the next element to get in
     */
    LOG2("Nothing in queue; waiting for data\n");
    pthread_cond_wait(&p->cv, &p->lock);
    LOG2("Waiting completed: got data\n");
  }

  for (i = 0; i < PRI_MAX; i++) {
    if (NULL != p->entry[i].n) {
      temp = (p->entry[i].n);

      *key = p->entry[i].n->key;
      *pri = p->entry[i].n->priority;

      p->entry[i].n = temp->next;

      LOG(" Dequeued: %d\n", p->stats->dequeue++);
      put_buf(p, temp);
#ifdef DEBUG
      LOG3("oo-put_data-oo\n");
      display_buf_pool(p);
      LOG3("---put_data--\n");
#endif
      pthread_cond_signal(&p->cv);
      UNLOCK(p->lock);
      return;
    }
  }
  p->is_available = false;
  goto wait_again;
}

/*
 * Test code
 * Uses two threads, acting as producer and consumer
 */
void* producer(void* p)
{
  ASSERT(p);

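  ptable *table = (ptable*)p;
  int i = 0;

  printf("Thread producer\n");
  /*
   * We stop the producer after enqueuing 16 messages
   */
  for (i = 0; i < 16; i++) {
    printf("Calling put_data %d\n\t", i);
    /*
     * Priorities cycle through 0..8, below the limit PRI_MAX.
     * i is incremented in the loop header, not inside the argument
     * list, where reading and modifying it is undefined behavior.
     */
    put_data(table, i, i % 9);
  }
  return NULL;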
}

void* consumer(void* p)
{
  sleep(2);
  ptable *table = (ptable*)p;

  int key, priority;
  int i = 0;

  printf("Thread consumer\n");
  /*
   * We stop the consumer after dequeuing 16 messages.
   * One more call to get_data would block forever since there
   * would be no data from the producer
   */
  for (i = 0; i < 16; i++) {
    printf("Calling get_data\n");
    get_data(table, &key, &priority);
    printf("\nSearch-> Priority=%d key= %d\n", priority, key);
  }
  return NULL;
}

void cleanup(ptable *p)
{
  node *n = p->buf_pool;

  while(n) {
    node* temp = n;
    n = n->next;
    free(temp);
  }
  pthread_cond_destroy(&(p->cv));
  pthread_mutex_destroy(&(p->lock));
  free(p->stats);
  free(p);
}

int main()
{
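  ptable *p = malloc(sizeof(ptable));
  ASSERT(p);
  create(p);

  pthread_t thread1, thread2;

  int iret1, iret2;

  iret1 = pthread_create( &thread1, NULL, producer, (void*) p);
  iret2 = pthread_create( &thread2, NULL, consumer, (void*) p);
  ASSERT(0 == iret1 && 0 == iret2);

  /* The queue is shared with the running threads; display it under the lock */
  LOCK(p->lock);
  display(p);
  UNLOCK(p->lock);

  pthread_join( thread1, NULL);
  pthread_join( thread2, NULL);

  cleanup(p);
  return 0;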
}

/*
 * Function to display the queue
 */
void display(ptable* p)
{
  ASSERT(p);
  int i = 0;
  node* t = NULL;
  for(i = 0; i < PRI_MAX; i++) {
    t = p->entry[i].n;
    while(t) {
      printf("\nBucket=%d|Key=%d|Priority=%d\n", p->entry[i].priority,
          t->key,
          t->priority);
      t = t->next;
    }
  }
}