MyException - 我的异常网
当前位置:我的异常网» 操作系统 » barrier and rwlock兑现POSIX源码

barrier and rwlock兑现POSIX源码

www.MyException.Cn  网友分享于:2013-02-20  浏览:7次
barrier and rwlock实现POSIX源码

前言

在《POSIX多线程程序设计》中,作者David R. Butenhof给我们展示了诸多实用pthread_mutex_t 和 pthread_cond_t构建的线程同步工具,我最喜欢的两个是barrier和rwlock。所以用C实现并在虚拟机上爽了一把。先贴出代码以及注释,以供大家查阅,共同进步。

下载源代码

barrier 等待所有线程进入同一状态

barrier用于停止线程,直到所有在barrier的线程都到达当前状态,才返回。barrier经常用于确保某些并行算法中所有合作的线程到达同一点。比如启动了N个线程对某大数组进行分段处理,在所有线程处理完之后,再对结果进行合并或展示,这个时候就可以使用barrier。其作用这时就相当于Win 下的WaitForMutipleObjects。

要使所有线程达到同一状态,然后阻塞,最简单的想法就是保存当前已到达指定状态的线程数,如果线程数量与指定的最大线程数不同,线程进入等待状态,相同,达到预设条件,唤醒其他线程。

要使线程等待,在POSIX中,就理所当然应该使用pthread_cond_t作为等待的对象。同时要修改当前线程的计数,需要一个互斥变量,阻止线程同时读写这个计数。barrier的声明如下:

/*
 * Structure describing a barrier.
 */
typedef struct barrier_tag {
	pthread_mutex_t	mutex;	/* Control access to barrier */
	pthread_cond_t	cv;	/* Wait for barrier */
	int		valid;	/* Set when valid */
	int		threshold;	/* number of threads required */
	int		counter;	/* current number of threads */
	unsigned long	cycle;		/* count cycles */
} barrier_t;
要提供的操作barrier的api也只需要三个:init, wait, destroy。

/*
 * Define barrier operations.
 */
extern int barrier_init(barrier_t * barrier, int count); /* dynamic initialization of barriers */
extern int barrier_destroy(barrier_t * barrier);         /* destroy the barrier */
extern int barrier_wait(barrier_t * barrier);            /* wait until the barrier is actived */

barrier_init

此函数用于初始化barrier_t,分别初始化mutex, cond变量,设置等待线程和需要等待线程总数,设置谓语动词cycle,设置barrier有效标志

/*
 * Initialize a barrier for use.
 */
int barrier_init(barrier_t * barrier, int count)
{
	int status;

	barrier->threshold = barrier->counter = count;
	barrier->cycle = 0;
	status = pthread_mutex_init(&barrier->mutex, NULL);
	if (status != 0) {
		return status;
	}
	status = pthread_cond_init(&barrier->cv, NULL);
	if (status != 0) {
		pthread_mutex_destroy(&barrier->mutex);
		return status;
	}
	barrier->valid = BARRIER_VALID;

	return 0;
}


barrier_destroy

销毁barrier, 为了防止重复销毁barrier, 先对valid标志进行检查,然后置空valid标志,这样就可以阻止其他线程再试图等待该barrier。最后一一销毁mutex和cond。

/*
 * Destroy a barrier when done use it.
 */
int barrier_destroy(barrier_t * barrier)
{
	int status, status2;

	if (barrier->valid != BARRIER_VALID) {
		return EINVAL;
	}

	/* Set barrier invalid. */
	status = pthread_mutex_lock(&barrier->mutex);
	if (status != 0) {
		return status;
	}
	if (barrier->counter != barrier->threshold) {
		pthread_mutex_unlock(&barrier->mutex);
		return EBUSY;
	}
	barrier->valid = 0;
	status = pthread_mutex_unlock(&barrier->mutex);
	if (status != 0) {
		return status;
	}
	
	status = pthread_mutex_destroy(&barrier->mutex);
	status2 = pthread_cond_destroy(&barrier->cv);
	return (status != 0 ? status : status2);
}

barrier_wait

等待所有线程到达同一状态点。试图wait时,先检查该barrier是否有效。然后互斥修改barrier->count。如果count为0,唤醒所有其他线程。否则等待cv。

/*
 * Wait all threads reached.
 */
int barrier_wait(barrier_t * barrier)
{
	int status, cycle, cancel, tmp;
	if (barrier->valid != BARRIER_VALID) {
                return EINVAL;
        }

	status = pthread_mutex_lock(&barrier->mutex);
	if (status != 0) {
		return status;
	}

	cycle = barrier->cycle;

	/* If the last thread arrived, wake others */
	if (--barrier->counter == 0) {
		barrier->counter = barrier->threshold;
		barrier->cycle ++;
		status = pthread_cond_broadcast(&barrier->cv);

		/* The last thread return -1 rather than 0, so that
		 * it can be used to do some special serial code following
		 * the barrier.
		 */
		if (status == 0) {
			status = -1;
		}
	} else {
		/* Wait with cancellation disabled, because barrier_wait
		 * should not be a cancellation point.
		 */
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel);

		while (cycle == barrier->cycle) {
			status = pthread_cond_wait(&barrier->cv, &barrier->mutex);
			if (status != 0) {
				break;
			}
		}
		pthread_setcancelstate(cancel, &tmp);
	}
	
	pthread_mutex_unlock(&barrier->mutex);
	return status;
}

开始读这段代码时,比较让我迷惑的是当前线程T1先调用pthread_mutex_lock获得mutex的拥有权,然后调用pthread_cond_wait(),最后才调用pthread_mutex_unlock。在该线程T1等待时,并没有释放mutex(pthread_mutex_unlock在方法的最后),当另外一个线程T2要获得mutex时,因为T1已经劫持了这把锁,T2理所当然要进入mutex的等待队列。如果是这样的话这段代码就不应该能够工作了。书中有这样两句话:等待条件变量总是返回锁住的互斥变量。还有,在条件变量上等待会导致以下原子操作:释放相关互斥量,等待其他线程发给该条件变量的信号或广播。也就是说当线程调用pthread_cond_wait时,会释放线程所拥有的mutex,当线程从pthead_cond_wait()返回时,会重新获得mutex的拥有权。如果这样解释起来,这段代码就可以正确运行了。(ps,如果有能力/时间,还是看原著的好,中国的直译式的翻译,是中国软件发展的软肋乎?!)


完整代码以及测试用例,在这里。


rwlock 读写锁

读写锁用于线程之间写异步,读同步操作(当然您也可以反过来,如果确实需要的话:P)。这里给出一个读优先的例子。

实现读写锁,只需要,在写的时候,查看当前是否有读或写线程,如果没有,那么设置当前有一个写线程,执行写操作;如果有等待被唤醒。而在读的时候只需要查看有没有写线程,如果没有,直接去读,不管是否有无其他读线程;如果有,则等待被唤醒。

要设计一个读写锁,需要一个mutex用于保护读写变量的安全,一个可读条件,一个可写条件。另外还需要当前读变量数,当前写变量数,当前等待的读变量数,当前等待的写变量数。

读写锁的结构如下:

typedef struct rwlock_tag{
	pthread_mutex_t mutex;     /* Access locker    */
	pthread_cond_t  read;      /* Wait for read    */
	pthread_cond_t  write;     /* Wait for write   */
	int             r_wait;    /* Waiting readers  */
	int             w_wait;    /* Waiting writers  */
	int             r_active;  /* Activing readers */
	int             w_active;  /* Activing writers */
	int             valid;     /* Set when valid   */
}rwlock_t;

其操作包括:

extern int rwlock_init(rwlock_t * rwlock);  
extern int rwlock_destroy(rwlock_t * rwlock);
extern int rwlock_readlock(rwlock_t * rwlock);
extern int rwlock_tryreadlock(rwlock_t * rwlock);
extern int rwlock_writelock(rwlock_t * rwlock);
extern int rwlock_trywritelock(rwlock_t * rwlock);
extern int rwlock_readunlock(rwlock_t * rwlock);
extern int rwlock_writeunlock(rwlock_t * rwlock);

读写锁的init和destroy与barrier的类似,这里不再赘述。由于读写的加解锁操作类似,只描述一个即可。这里只讲解一下readlock与readunlock的实现。


rwlock_readlock

实现读的原理前面已经叙述过:检查当前有木有正在读的线程,如果木有,增加正在读线程的计数,返回成功。由于读线程可以被取消,为了保证其他的读写线程可以正确工作,不能破坏rwlock的内部变量,当线程被取消时需要进行清理工作。明白原理,直接看代码吧。

static void  rwlock_readcleanup(void * arg) 
{
	rwlock_t * rwlock = (rwlock_t *)arg;
	--rwlock->r_wait;
	pthread_mutex_unlock(&rwlock->mutex);
}


int rwlock_readlock(rwlock_t * rwlock)
{
	int status;

	if (rwlock->valid != RWLOCK_VALID) {
		return EINVAL;
	}

	status = pthread_mutex_lock(&rwlock->mutex);
	if (status != 0) {
		return status;
	}

	if (rwlock->w_active > 0) {
		rwlock->r_wait++;
		/* As read lock allow thread be canceled,
		 * set cleanup to release resource.
		 */
		pthread_cleanup_push(rwlock_readcleanup, (void *)rwlock);

		while (rwlock->w_active > 0) { 
			status = pthread_cond_wait(&rwlock->read,
					 &rwlock->mutex);
			if (status != 0) {
				break;
			}
		}
		pthread_cleanup_pop(0);
		rwlock->r_wait--;
	} 

	if (status == 0) {
		rwlock->r_active ++;
	}

	pthread_mutex_unlock(&rwlock->mutex);
	return status;
}

rwlock_readunlock

读解锁时,只需要减少当前活动读线程计数,如果有待写线程,叫醒写线程。

int rwlock_readunlock(rwlock_t * rwlock)
{
	int status, status2;
	if (rwlock->valid != RWLOCK_VALID) {
		return EINVAL;
	}

	status = pthread_mutex_unlock(&rwlock->mutex);
	if (status != 0) {
		return status;
	}

	if (--rwlock->r_active == 0) {
		if (rwlock->w_wait > 0){
			status = pthread_cond_signal(&rwlock->write);
		}
	}

	status2 = pthread_mutex_unlock(&rwlock->mutex);
	return status != 0 ? status : status2;
}

打完收工!


后记

最近有些迷茫,原因是年龄的增长和自我感觉个人能力的不足以及目标的遥不可及。特别是每当看到大牛写的代码时,各种羡慕嫉妒恨,时常感到一阵压抑,脑中会充满各种问题,自己什么时候才能变成这样?有什么捷径?将来怎么办?

冰冻三尺,非一日之寒;为山九仞,岂一日之功。又:合抱之木,生于毫末;九层之台,起于累土;千里之行,始于足下。不积跬步,无以至千里。所以用心做好自己,从平时的点滴做起,每天进步一点点。关键时刻,然后抓住机遇,实现自己的目标。

大家共勉之!


文章评论

软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有