阻塞式队列-支持多线程

//*****************************************************************************
//
// blocking queue source file
//
// author: yaobyron@gmail.com
// update: 2014/06/03
// version: v1.0
//
//*****************************************************************************

/******************************************************************************
* TODO:
* 2014/06/03 implement blocking queue
*
* History:
* 2014/06/03 create this file
******************************************************************************/

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "blockqueue.h"

/*******************************************************************************
* Function Name : blockqueue_init
* Description : create and init blockqueue
* Input : void
* Output : void
* Return : blockqueue_t * // blockqueue head point
*******************************************************************************/
blockqueue_t *blockqueue_init(void)
{
blockqueue_t *bq;

bq = malloc(sizeof(blockqueue_t));
if (!(bq))
return NULL;

pthread_mutex_init(&(bq->w_mutex), NULL);
pthread_mutex_init(&(bq->r_mutex), NULL);
pthread_mutex_lock(&(bq->r_mutex));

return bq;
}

/*******************************************************************************
* Function Name : blockqueue_push
* Description : Append one item to the tail of the blockqueue. The queue
*               stores the caller's pointer and length as-is; the payload is
*               not copied. If the item node cannot be allocated the call
*               returns without queuing anything.
* Input : blockqueue_t *queue //blockqueue point
* void *data // data address point
* unsigned int len // data length
* Output : blockqueue_t *queue //blockqueue point
* Return : void
*******************************************************************************/
void blockqueue_push(blockqueue_t *queue, void *data, unsigned int len)
{
    blockqueue_item_t *bqi;
    int was_empty;

    bqi = malloc(sizeof(*bqi));
    if (bqi == NULL)
        return;

    bqi->data.data = data;
    bqi->data.len = len;
    bqi->next = NULL;

    pthread_mutex_lock(&queue->w_mutex);

    was_empty = (queue->head == NULL);
    if (was_empty) {
        queue->head = bqi;
    } else {
        queue->tail->next = bqi;
    }
    queue->tail = bqi;
    queue->size++;

    /* r_mutex is held exactly while the queue is empty (blockqueue_init
     * leaves it locked, and blockqueue_pop releases it itself whenever
     * items remain). Release it only on the empty -> non-empty transition;
     * unlocking it on every push would unlock an already-unlocked mutex.
     *
     * NOTE(review): this still unlocks a mutex from a thread that may not
     * be the one that locked it, which POSIX leaves undefined for default
     * mutexes. A semaphore or condition variable would be the portable
     * gate, but that needs a new member in blockqueue_t. */
    if (was_empty)
        pthread_mutex_unlock(&queue->r_mutex);

    pthread_mutex_unlock(&queue->w_mutex);
}

/*******************************************************************************
* Function Name : blockqueue_pop
* Description : Remove the head item from the blockqueue and return its
*               payload descriptor. r_mutex is locked first, then w_mutex;
*               r_mutex is released again only if items remain after the
*               removal. The item node is freed; the payload pointer is
*               returned untouched.
* Input : blockqueue_t *queue //blockqueue point
* Output : blockqueue_t *queue //blockqueue point after pop
* Return : blockqueue_data_t // data address point and data length;
*                            // {NULL, 0} if the list turned out to be empty
*******************************************************************************/
blockqueue_data_t blockqueue_pop(blockqueue_t *queue)
{
    blockqueue_item_t *bqi;
    blockqueue_data_t data;

    pthread_mutex_lock(&queue->r_mutex);
    pthread_mutex_lock(&queue->w_mutex);

    data.data = NULL;
    data.len = 0;

    bqi = queue->head;
    if (bqi == NULL) {
        /* Nothing to hand out; r_mutex deliberately stays locked here. */
        pthread_mutex_unlock(&queue->w_mutex);
        return data;
    }

    data = bqi->data;
    queue->head = bqi->next;
    if (queue->size == 1) {
        /* Last item removed: reset both ends of the list. */
        queue->head = NULL;
        queue->tail = NULL;
    }
    queue->size--;
    free(bqi);

    /* Keep r_mutex locked when the queue has just become empty. */
    if (queue->size > 0)
        pthread_mutex_unlock(&queue->r_mutex);

    pthread_mutex_unlock(&queue->w_mutex);

    return data;
}

/*******************************************************************************
* Function Name : blockqueue_peek
* Description : Return the payload descriptor of the head item without
*               removing it. Never waits for data: an empty queue yields
*               {NULL, 0}. The head is read under w_mutex, the lock that
*               blockqueue_push and blockqueue_pop hold while they modify
*               the list, so the node cannot be freed in the middle of the
*               read.
* Input : blockqueue_t *queue //blockqueue point
* Output : blockqueue_t *queue //blockqueue point
* Return : blockqueue_data_t // data address point and data length
*******************************************************************************/
blockqueue_data_t blockqueue_peek(blockqueue_t *queue)
{
    blockqueue_data_t data;

    data.data = NULL;
    data.len = 0;

    pthread_mutex_lock(&queue->w_mutex);
    if (queue->head != NULL)
        data = queue->head->data;
    pthread_mutex_unlock(&queue->w_mutex);

    return data;
}

/*******************************************************************************
* Function Name : blockqueue_isempty
* Description : Report the current item count of the blockqueue.
*               Despite the name, the result is the count itself: 0 means
*               the queue is empty, any non-zero value means it is not.
*               The count is read without taking any lock.
* Input : blockqueue_t *queue //blockqueue point
* Output : void
* Return : unsigned int // number of items currently queued
*******************************************************************************/
unsigned int blockqueue_isempty(blockqueue_t *queue)
{
    unsigned int count = queue->size;

    return count;
}
#if 0
/* Debug helper: pop and print items for as long as queue->size reads
 * non-zero. Each payload is printed as a C string. */
void print_queue_items(blockqueue_t *queue)
{
    blockqueue_data_t data;

    while (queue->size) {
        data = blockqueue_pop(queue);
        fprintf(stderr, "size:%d data:%s len:%u\n", queue->size, (char *)data.data, data.len);
    }
}

void *pthread_job(void *arg)
{
blockqueue_data_t data;
blockqueue_t *queue = (blockqueue_t *)arg;

while(1){
data = blockqueue_pop(queue);
fprintf(stderr, “queue size:%d job-data:%s len:%u\n”, queue->size, (char *)data.data, data.len);
sleep(1);
}
}

/* Demo driver: creates a queue, pre-loads two items, starts one consumer
 * thread, then pushes "data 3" every three seconds forever. Each length
 * passed to blockqueue_push is sizeof the literal, so it includes the
 * terminating NUL. */
int main(int argc, char *argv[])
{
    blockqueue_t *queue;
    pthread_t tid;

    (void)argc;
    (void)argv;

    queue = blockqueue_init();
    if (queue == NULL) {
        fprintf(stderr, "blockqueue_init failed\n");
        return 1;
    }

    blockqueue_push(queue, "data 1", sizeof("data 1"));
    blockqueue_push(queue, "data 2", sizeof("data 2"));

    if (pthread_create(&tid, NULL, pthread_job, queue) != 0) {
        fprintf(stderr, "pthread_create failed\n");
        return 1;
    }

    while (1) {
        blockqueue_push(queue, "data 3", sizeof("data 3"));
        sleep(3);
    }

    return 0;
}
#endif

blockqueue.h

发表评论

电子邮件地址不会被公开。 必填项已用*标注