您的位置:首页 > 其它

[C]只存储整形的无锁队列

2015-10-02 15:00 453 查看
/*
* Lock-free queue of int values, thread safe.
* The main idea comes from the paper "A practical nonblocking queue algorithm using compare-and-swap".
* lfrq_queue_int.h
* Values are enqueued at the tail
* and dequeued from the head.
*/

#ifndef __LOCK_FREE_RING_QUEUE_INT_H__
#define __LOCK_FREE_RING_QUEUE_INT_H__

/*
 * int_t  : the value type stored in the queue.
 * int_2t : a double-width word that is read and swapped atomically as a unit.
 *          The word-access macros below treat it as two consecutive int_t
 *          objects (assumes sizeof(int_2t) == 2 * sizeof(int_t) -- TODO confirm
 *          for every target this is built on).
 */
typedef int int_t;
typedef long long int_2t;

/*
 * Atomic helpers built on the GCC __sync builtins.
 *   sync_get_2t : atomic read, done as "add zero and return the previous value"
 *   sync_inc_t  : atomic post-increment (yields the value before the add)
 *   sync_dec_t  : atomic post-decrement (yields the value before the subtract)
 *   sync_cas_2t : compare-and-swap, non-zero when the swap took place
 */
#define sync_get_2t(val_ptr) \
	__sync_fetch_and_add((val_ptr), 0)
#define sync_inc_t(val_ptr) \
	__sync_fetch_and_add((val_ptr), 1)
#define sync_dec_t(val_ptr) \
	__sync_fetch_and_sub((val_ptr), 1)
#define sync_cas_2t(val_ptr, old_val, new_val) \
	__sync_bool_compare_and_swap((val_ptr), (old_val), (new_val))

/*
 * Access the two int_t halves of an int_2t lvalue `x`.
 * "low" is the int_t at the start of x's storage, "high" is the one right
 * after it. The *_part forms are lvalues and may be assigned to.
 */
#define int_2t_low_part_ptr(x)   (&((int_t *)(&(x)))[0])
#define int_2t_high_part_ptr(x)  (&((int_t *)(&(x)))[1])
#define int_2t_low_part(x)       (((int_t *)(&(x)))[0])
#define int_2t_high_part(x)      (((int_t *)(&(x)))[1])

/*
 * State of one lock-free ring queue of int_t values.
 *
 * m_head, m_tail and every element of m_data are int_2t words made of two
 * int_t halves: the first half carries the payload (a slot index for
 * m_head/m_tail, the stored value for a slot) and the second half is a
 * counter that the queue code increments each time it rewrites the word.
 */
typedef struct{
int_2t *m_data; /* ring of m_cap slots; a slot holding m_null is empty */
int_2t m_head; /* index of the next slot to dequeue from, plus update counter */
int_2t m_tail; /* index of the next slot to enqueue into, plus update counter */
int_t  m_cap; /* number of slots in m_data */
int_t m_null; /* sentinel value that marks an empty slot */
int_t  m_size; /* element count, adjusted atomically after each enqueue/dequeue */
}lfrq_queue_int_t;

/*
 * Create a queue with `cap` slots; `null_val` is the sentinel stored in empty
 * slots. Returns NULL when cap < 1 or when allocation fails.
 * NOTE(review): a value equal to null_val looks like an empty slot to the
 * queue code, so callers should presumably never enqueue it -- confirm.
 */
lfrq_queue_int_t* lfrq_queue_int_create(int_t cap,int_t null_val);
/* Release a queue obtained from lfrq_queue_int_create. */
void lfrq_queue_int_free(lfrq_queue_int_t *p);
/* Append `val` at the tail. */
int lfrq_queue_int_enqueue(lfrq_queue_int_t *p, int_t val); //returns 0 on success, -1 when the queue is full
/* Remove the value at the head into *val_ptr; returns 0 on success, -1 when the queue is empty. */
int lfrq_queue_int_dequeue(lfrq_queue_int_t *p, int_t *val_ptr);
/* Current element count as tracked by the queue. */
int_t lfrq_queue_int_size(lfrq_queue_int_t *p);
/* The sentinel value passed to lfrq_queue_int_create. */
int_t lfrq_queue_int_null(lfrq_queue_int_t *p);

#endif

=======================

/*
* Lock-free queue of int values, thread safe.
* The main idea comes from the paper "A practical nonblocking queue algorithm using compare-and-swap".
* queue.hpp
* enqueue into the tail
* dequeue from head
*/

#include"lfrq_queue_int.h"
#include<stdlib.h>

/*
 * Build an empty queue backed by a ring of `cap` slots.
 *
 * Every slot starts out holding `null_val` with its update counter at zero;
 * the head word, the tail word and the element count all start at zero.
 *
 * Returns the new queue, or NULL when cap < 1 or an allocation fails.
 * The caller releases the result with lfrq_queue_int_free.
 */
lfrq_queue_int_t* lfrq_queue_int_create(int_t cap, int_t null_val)
{
	lfrq_queue_int_t *queue;
	int_2t *slots;
	int idx;

	if (cap < 1) {
		return NULL;
	}

	queue = (lfrq_queue_int_t*)malloc(sizeof(lfrq_queue_int_t));
	if (queue == NULL) {
		return NULL;
	}

	slots = (int_2t*)calloc(cap, sizeof(int_2t));
	if (slots == NULL) {
		/* do not leak the header when the ring cannot be allocated */
		free(queue);
		return NULL;
	}

	/* mark every slot as empty: sentinel in the first half, counter 0 in the second */
	for (idx = 0; idx < cap; ++idx) {
		int_2t *slot = &slots[idx];
		int_2t_low_part(*slot) = null_val;
		int_2t_high_part(*slot) = 0;
	}

	queue->m_cap = cap;
	queue->m_null = null_val;
	queue->m_data = slots;
	queue->m_head = 0;
	queue->m_tail = 0;
	queue->m_size = 0;
	return queue;
}

/*
 * Release a queue: frees the slot array and then the queue structure itself.
 *
 * Passing NULL is a no-op, mirroring free(NULL), so callers can clean up
 * unconditionally (for example after a failed lfrq_queue_int_create).
 * The caller must make sure no other thread is still using the queue.
 */
void lfrq_queue_int_free(lfrq_queue_int_t *p)
{
	if (p == NULL) {
		return;
	}
	/*
	 * Original guard kept as-is: lfrq_queue_int_create never produces
	 * m_cap < 1, so this only filters structures that were not built by it.
	 */
	if (p->m_cap > 0) {
		free(p->m_data);
		free(p);
	}
}

/*
 * Append `val` at the tail of the queue without taking a lock.
 *
 * Returns 0 once the value has been stored, or -1 when the ring is full
 * (advancing the tail index would make it equal to the head index, so at most
 * m_cap - 1 values fit).
 *
 * The loop retries whenever its snapshot is stale or a compare-and-swap loses.
 *
 * NOTE(review): the tail snapshot is validated before the slot is read, not at
 * the moment of the slot CAS. It looks possible for a thread that stalls after
 * the validation to store into a slot that has meanwhile been filled and
 * emptied again, i.e. at an index the tail has already moved past -- confirm
 * whether the per-slot counter is meant to rule this out.
 * NOTE(review): a `val` equal to p->m_null would be stored as an "empty" slot;
 * presumably callers must never enqueue the sentinel -- confirm.
 */
int lfrq_queue_int_enqueue(lfrq_queue_int_t *p, int_t val)
{
for(;;){
/* snapshot both ends; the first half of each word is a slot index */
int_2t tail = sync_get_2t(&(p->m_tail));
int_2t head = sync_get_2t(&(p->m_head));
int_t tail_ptr = int_2t_low_part(tail);
int_t head_ptr = int_2t_low_part(head);
/* index the tail would move to, wrapping at the end of the ring */
int_t tail_next_ptr = tail_ptr+1;
if(tail_next_ptr==p->m_cap) tail_next_ptr=0;
/* tail changed since the snapshot: start over */
if(tail!=sync_get_2t(&(p->m_tail))) continue;
/* full: moving the tail forward would reach the head */
if(tail_next_ptr==head_ptr) return -1;
int_2t tail_ptr_old = sync_get_2t(&(p->m_data[tail_ptr]));
int_t tail_ptr_old_val = int_2t_low_part(tail_ptr_old);
if(tail_ptr_old_val==p->m_null){
/* slot is empty: try to install val and bump the slot's counter in one CAS */
int_2t tail_ptr_new = tail_ptr_old;
int_2t_low_part(tail_ptr_new) = val;
int_2t_high_part(tail_ptr_new) = int_2t_high_part(tail_ptr_old)+1;
if(sync_cas_2t(&(p->m_data[tail_ptr]),tail_ptr_old,tail_ptr_new)){
/* value stored; try to advance the tail (the CAS result is deliberately ignored) */
int_2t tail_new = tail;
int_2t_low_part(tail_new)=tail_next_ptr;
int_2t_high_part(tail_new)=int_2t_high_part(tail)+1;
sync_cas_2t(&(p->m_tail),tail,tail_new);
sync_inc_t(&(p->m_size));
return 0;
}
/* slot CAS lost: fall through and retry from the top */
}
else{
/* slot already holds a value while the tail still points at it: try to move the tail forward, then retry */
int_2t tail_new = tail;
int_2t_low_part(tail_new)=tail_next_ptr;
int_2t_high_part(tail_new)=int_2t_high_part(tail)+1;
sync_cas_2t(&(p->m_tail),tail,tail_new);
}
}
}

/*
 * Remove the value at the head of the queue without taking a lock.
 *
 * On success stores the value in *val_ptr and returns 0. Returns -1 when the
 * head and tail indices are equal (queue empty); *val_ptr is not written then.
 *
 * The loop retries whenever its snapshot is stale or a compare-and-swap loses.
 */
int lfrq_queue_int_dequeue(lfrq_queue_int_t *p, int_t *val_ptr)
{
for(;;){
/* snapshot both ends; the first half of each word is a slot index */
int_2t head = sync_get_2t(&(p->m_head));
int_2t tail = sync_get_2t(&(p->m_tail));
int_t head_ptr = int_2t_low_part(head);
int_t tail_ptr = int_2t_low_part(tail);
/* head changed since the snapshot: start over */
if(head!=sync_get_2t(&(p->m_head))) continue;
/* empty: head has caught up with the tail */
if(head_ptr==tail_ptr) return -1;
int_2t head_ptr_old = sync_get_2t(&(p->m_data[head_ptr]));
int_t head_ptr_old_val = int_2t_low_part(head_ptr_old);
if(head_ptr_old_val!=p->m_null){
/* slot holds a value: try to swap in the sentinel and bump the slot's counter in one CAS */
int_2t head_ptr_new = head_ptr_old;
int_2t_low_part(head_ptr_new)=p->m_null;
int_2t_high_part(head_ptr_new)=int_2t_high_part(head_ptr_old)+1;
if(sync_cas_2t(&(p->m_data[head_ptr]),head_ptr_old,head_ptr_new)){
/* value claimed; try to advance the head, wrapping at the end of the ring (the CAS result is deliberately ignored) */
int_2t head_new = head;
int_t head_new_ptr = int_2t_low_part(head)+1;
if(head_new_ptr==p->m_cap) head_new_ptr=0;
int_2t_low_part(head_new)=head_new_ptr;
int_2t_high_part(head_new)=int_2t_high_part(head)+1;
sync_cas_2t(&(p->m_head),head,head_new);
*val_ptr=head_ptr_old_val;
sync_dec_t(&(p->m_size));
return 0;
}
/* slot CAS lost: fall through and retry from the top */
}
else{
/* slot is already empty while the head still points at it: try to move the head forward, then retry */
int_2t head_new = head;
int_t head_new_ptr = int_2t_low_part(head)+1;
if(head_new_ptr==p->m_cap) head_new_ptr=0;
int_2t_low_part(head_new)=head_new_ptr;
int_2t_high_part(head_new)=int_2t_high_part(head)+1;
sync_cas_2t(&(p->m_head),head,head_new);
}
}
}
/*
 * Return the element count tracked by the queue.
 *
 * The counter is adjusted after the slot update in enqueue/dequeue, so under
 * concurrent use the result is a momentary approximation.
 */
int_t lfrq_queue_int_size(lfrq_queue_int_t *p)
{
	/*
	 * m_size is modified by other threads through atomic builtins; read it
	 * atomically as well (add zero, take the previous value) instead of using
	 * a plain load that would race with those updates.
	 */
	return __sync_fetch_and_add(&(p->m_size), 0);
}
/* Return the sentinel value this queue uses to mark an empty slot. */
int_t lfrq_queue_int_null(lfrq_queue_int_t *p)
{
	const int_t sentinel = p->m_null;

	return sentinel;
}
=========================

#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include"lfrq_queue_int.h"

#define QUEUE_SIZE	100
#define THREAD_NUM	100
#define LOOP_NUM	1000000

lfrq_queue_int_t *q;

int total=1;

/*
 * Producer thread body.
 *
 * Runs LOOP_NUM iterations; each one atomically takes the next value from the
 * shared counter `total` and spins until the queue accepts it.
 *
 * The argument is unused. Always returns NULL: a pthread start routine must
 * return a value, and the original fell off the end of the function.
 */
void* producer(void*p)
{
	int i;

	(void)p; /* no per-thread argument is needed */
	for(i=0;i<LOOP_NUM;++i){
		int d=__sync_fetch_and_add(&total,1);
		/* a non-zero result means the queue was full: retry until it fits */
		while(lfrq_queue_int_enqueue(q,d)){
		}
	}
	return NULL;
}

/*
 * Consumer thread body.
 *
 * `p` points to a long long accumulator; every dequeued value is added to it.
 * Runs LOOP_NUM iterations, spinning on each one until a value is available.
 *
 * Always returns NULL: a pthread start routine must return a value, and the
 * original fell off the end of the function.
 */
void* consumer(void*p)
{
	int d;
	int i;
	long long*cnt=(long long*)p;

	for(i=0;i<LOOP_NUM;++i){
		/* a non-zero result means the queue was empty: retry until a value arrives */
		while(lfrq_queue_int_dequeue(q,&d)){
		}
		*cnt += d;
	}
	return NULL;
}

/*
 * Stress test: THREAD_NUM producers push the values 1..THREAD_NUM*LOOP_NUM
 * through one queue while THREAD_NUM consumers pop and sum them. The per-thread
 * sums are printed, followed by the grand total and the expected total
 * N*(N+1)/2.
 *
 * Returns 0 after a completed run, 1 when the queue or a thread cannot be
 * created. Without a full set of threads the remaining ones would spin forever
 * and the join loop would touch a thread handle that was never initialised,
 * so the run is abandoned instead.
 */
int main(void)
{
	pthread_t threads[THREAD_NUM*2];
	long long cnt[THREAD_NUM];
	long long sum=0;
	/* widen before multiplying so larger test sizes cannot overflow int */
	long long expect=(long long)THREAD_NUM*LOOP_NUM;
	int i;

	q=lfrq_queue_int_create(QUEUE_SIZE,-1);
	if(q==NULL){
		printf("create lfrq error\n");
		return 1;
	}
	for(i=0;i<THREAD_NUM;++i) cnt[i]=0;

	/* producers occupy threads[0..THREAD_NUM-1] */
	for(i=0;i<THREAD_NUM;++i){
		if(pthread_create(threads+i,NULL,producer,NULL)){
			printf("Error at create thread %d\n",i);
			/* returning from main ends the process and any threads already started */
			return 1;
		}
	}
	/* consumers occupy threads[THREAD_NUM..2*THREAD_NUM-1]; each gets its own accumulator */
	for(i=0;i<THREAD_NUM;++i){
		if(pthread_create(threads+i+THREAD_NUM,NULL,consumer,cnt+i)){
			printf("Error at create thread %d\n",i+THREAD_NUM);
			return 1;
		}
	}
	for(i=0;i<THREAD_NUM*2;++i){
		if(pthread_join(threads[i],NULL)){
			printf("Error at join thread %d\n",i);
		}
	}
	for(i=0;i<THREAD_NUM;++i){
		sum+=cnt[i];
		printf("%d=%lld\n",i,cnt[i]);
	}
	printf("Actual Value : %lld\n",sum);
	/* the produced values are 1..N, so the total should be N*(N+1)/2 */
	expect=(expect*(expect+1))/2;
	printf("Expect Value : %lld\n",expect);
	lfrq_queue_int_free(q);
	return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: