MyException - 我的异常网
当前位置:我的异常网» 操作系统 » 一行来学POSIX thread 之 综合应用——线程池

一行来学POSIX thread 之 综合应用——线程池

www.MyException.Cn  网友分享于:2013-02-20  浏览:7次
一起来学POSIX thread 之 综合应用——线程池

一起来学POSIX thread 之 综合应用——线程池

1、为什么需要线程池?

部分应用程序需要执行很多细小的任务,对于每个任务都创建一个线程来完成,任务完成后销毁线程,而这就会产生一个问题:当执行的任务所需要的时间T1小于等于创建线程时间T2和销毁线程时间T3总和时即T1 <= T2 + T3,应用处理任务的响应能力会大大减弱,从而影响了应用程序性能,为了解决这类问题,线程池技术提供了很好的解决方案。线程池顾名思义就是把线程资源池化,在应用启动时一次性创建合适数量的线程,当需要执行任务时就从线程池中分配一个已经创建好的线程来执行,执行完在把线程归还,只在应用停时再一次性销毁所有的线程。

2、线程池的基本组成部分

一个简单的线程池至少包括下列的组成部分:

1)线程池管理器(ThreadPool:用于创建并管理线程池

2)工作线程(WorkThread:线程池中线程

3)任务接口(Task:每个任务必须实行的接口,以供工作线程调度任务执行。

3、Unix下的线程池实现

将给大家展示的线程池实现的类如下:

1CMutex:互斥量类,对POSIX互斥量的C++封装

2CAutoMutex:利用C++构造函数和析构函数的特性,实现CAutoMutex变量生命周期内对共享数据的加锁互斥量。

3CTask:任务的抽象基类,任何具体的任务都要继承该类,并实现自己的void * run()函数。

4CThread:线程基类,对POSIX线程的C++封装。

5CWorkThread:工作线程类,实现了执行任务的基本接口

6CThreadPool:线程池管理类,实现线程的创建管理和任务调度。

一下提供具体实现代码和一个简单的Demo如下:

AutoMutex.h

/**
  *	Author: ACb0y
  *	FileName: AutoMutex.h
  *	CreateTime: 2013年2月14日15:37:12
  *	Version: 1.0
  */

#ifndef __AUTO_MUTEX_LRL_20130214_HEADER__
#define __AUTO_MUTEX_LRL_20130214_HEADER__

#include "Mutex.h"

/*
	
  */
class CAutoMutex
{
///属性
public:
	///nothing.
protected:
	///nothing.
private:
	CMutex * m_pMutex;
	
///构造函数&& 析构函数
public:
	CAutoMutex(CMutex * pMutex)
	{
		m_pMutex = pMutex;
		m_pMutex->lock();
	}
	
	~CAutoMutex()
	{
		m_pMutex->unlock();
	}
protected:
	///nothing.
private:
	///nothing.
	
///服务
public:
	///nothing.
protected:
	///nothing.
private:
	///nothing.
};

#endif


Mutex.h

/**
  *	Author: ACb0y
  *	FileName: Mutex.h
  *	CreateTime: 2013年2月4日9:33:57
  *	Version: 1.0
  */

#ifndef __MUTEX_LRL_20130204_HEADER__
#define __MUTEX_LRL_20130204_HEADER__

#include <pthread.h>

class CMutex
{
///属性
public:
	///nothing.
protected:
	///nothing.
private:
	pthread_mutex_t m_sMutex;
	
///构造函数&&析构函数
public:
	CMutex(int nShared = PTHREAD_PROCESS_PRIVATE, int nType = PTHREAD_MUTEX_NORMAL);
	~CMutex();
protected:
	///nothing.
private:
	///禁止用户调用拷贝构造函数和赋值操作,调用时会编译报错
	CMutex(const CMutex & cMutex) 
	{
		///nothing.
	}
	CMutex & operator = (const CMutex & cMutex)
	{
		return *this;
	}

///服务
public:
	int lock();
	int unlock();
	int trylock();
protected:
	///nothing.
private:
	///nothing.
};

inline int CMutex::lock()
{
	return pthread_mutex_lock(&m_sMutex);
}

inline int CMutex::unlock()
{
	return pthread_mutex_unlock(&m_sMutex);
}

inline int CMutex::trylock()
{
	return pthread_mutex_trylock(&m_sMutex);
}

#endif

Mutex.cpp

/**
  *	Author: ACb0y
  *	FileName: Mutex.cpp
  *	CreateTime: 2013年2月4日15:42:15
  *	Version: 1.0
  */


#include "Mutex.h"

CMutex::CMutex(int nShared, int nType)
{
	pthread_mutexattr_t attr;
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_setpshared(&attr, nShared);
	pthread_mutexattr_settype(&attr, nType);
	pthread_mutex_init(&m_sMutex, &attr);
	pthread_mutexattr_destroy(&attr);
}

CMutex::~CMutex()
{
	pthread_mutex_destroy(&m_sMutex);
}


Task.h

/**
  *	Author: ACb0y
  *	FileName: Task.h
  *	CreateTime: 2013年2月13日23:26:44
  *	Version: 1.0
  */

#ifndef __TASK_LRL_20130213_HEADER__
#define __TASK_LRL_20130213_HEADER__

/*
	抽象任务类
  */
class CTask
{
///属性
public:
	///nothing.
protected:
	///nothing.
private:
	///nothing.

///构造函数&&析构函数
public:
	///nothing.
protected:
	///nothing.
private:
	///nothing.

///服务
public:
	virtual void * run() = 0;
protected:
	///nothing.
private:	
	///nothing.
};


#endif


Thread.h

/**
  *	Author: ACb0y
  *	FileName: Thread.h
  *	CreateTime: 2013年2月13日12:24:02
  *	Version: 1.0
  */

#ifndef __THREAD_LRL_20130213_HEADER__
#define __THREAD_LRL_20130213_HEADER__

#include <pthread.h>
#include <sched.h>

///定义线程启动函数指针
typedef void * (* pFuncThreadStart)(void *);

/*
	对POSIX线程操作的C++封装
  */
class CThread
{
///属性
public:
	enum EThreadState
	{
		ERR_ALREADERY_INITIALIZED = -6,
		ERR_AT_CREATE_THREAD = -5,
		ERR_AT_CREATE_SEM = -4,
		ERR_NO_TASK = -3,
		ERR_NOT_IDLE = -2,
		UNINITIALIZED = -1,
		IDLE = 0,
		RUNNING = 1,
		QUITED = 9
	};
protected:
	///nothing.
private:
	pthread_t m_nThreadId;
	
///构造函数&&虚构函数
public:
	CThread();
	virtual ~ CThread();
protected:
	///nothing.
private:
	///nothing.
	
///服务
public:
	int create(pFuncThreadStart pFuncStartRoutine, void * pArg, bool bDetached = false, bool bSetScope = false);
	int detach();
	int join(void ** pRetValue = NULL);
	void exit(void * pRetValue = NULL);
	void yield();
	void reset();
	bool isCurrent();
	pthread_t getThreadId();
protected:
	///nothing.
private:
	///nothing.
};

inline pthread_t CThread::getThreadId()
{
	return m_nThreadId;
}

inline int CThread::detach()
{
	return pthread_detach(m_nThreadId);
}

inline int CThread::join(void ** pRetValue)
{
	return pthread_join(m_nThreadId, pRetValue);
}

inline void CThread::exit(void * pRetValue)
{
	if (isCurrent())
	{
		pthread_exit(pRetValue);
	}
}

inline bool CThread::isCurrent()
{
	if (pthread_equal(m_nThreadId, pthread_self()) != 0)
	{
		return true;	///表明是同一线程
	}
	else
	{
		return false;
	}
}

inline void CThread::yield()
{
	sched_yield();
}

inline void CThread::reset()
{
	join();
	m_nThreadId = -1;
}


#endif



Thread.cpp

/**
  *	Author: ACb0y
  *	FileName: Thread.cpp
  *	CreateTime: 2013年2月13日14:16:20
  *	Version: 1.0
  */

#include "Thread.h"

CThread::CThread() : m_nThreadId(-1)
{
	///nothing.
}

CThread::~CThread()
{
	///nothing.
}

int CThread::create(pFuncThreadStart pFuncStartRoutine, void * pArg, bool bDetached, bool bSetScope)
{
	pthread_attr_t sThread_attr;
	int nStatus;
	nStatus = pthread_attr_init(&sThread_attr);
	if (nStatus != 0)
	{
		return -1;
	}

	if (bDetached)
	{
		nStatus = pthread_attr_setdetachstate(&sThread_attr, PTHREAD_CREATE_DETACHED);
		if (nStatus != 0)
		{
			pthread_attr_destroy(&sThread_attr);
			return -1;
		}
	}

	if (bSetScope)
	{
		nStatus = pthread_attr_setscope(&sThread_attr, PTHREAD_SCOPE_SYSTEM);
		if (nStatus != 0)
		{
			pthread_attr_destroy(&sThread_attr);
			return -1;
		}
	}

	nStatus = pthread_create(&m_nThreadId, &sThread_attr, pFuncStartRoutine, pArg);
	pthread_attr_destroy(&sThread_attr);
	return nStatus;
}




WorkThread.h

/**
  *	Author: ACb0y
  *	FileName: WorkThread.h
  *	CreateTime: 2013年2月14日12:20:43
  *	Version: 1.0
  */

#ifndef __WORK_THREAD_LRL_20130214_HEADER__
#define __WORK_THREAD_LRL_20130214_HEADER__

#include <semaphore.h>
#include "Thread.h"
#include "Task.h"

/*
	工作线程
  */
class CWorkThread : public CThread
{
///属性
public:
	///nothing.
protected:
	///线程编号
	int m_nNo;
	int m_nStatus;
	sem_t * m_pSem;
	bool m_bNeedQuit;
	bool m_bAutoFinish;
	CTask * m_pTask;
private:
	///nothing.

///构造函数&&析构函数
public:
	CWorkThread(int nNo = 0);
	~CWorkThread();
protected:
	///nothing.
private:
	///nothing.
	
///服务
public:
	int getNo();
	int getStatus();
	///创建线程并将线程挂起
	///调用成功返回IDLE
	int initialize();
	void setTask(CTask * pTask);
	///设置线程执行一次后自动退出
	void setAutoFinish();
	///释放信号量让线程恢复运行,启动成功返回RUNNING
	///当Task执行结束后,线程自动变成IDLE状态
	///如果之前有调用setAutoFinish,则线程自动结束,状态变成了QUITED
	int run();
	///通知并等待线程退出
	void finish();
protected:
	static void * doRun(void * pContext);
private:
	///nothing.
};

inline void CWorkThread::setTask(CTask * pTask)
{
	m_pTask = pTask;
}

inline int CWorkThread::getNo()
{
	return m_nNo;
}

inline int CWorkThread::getStatus()
{
	return m_nStatus;
}

inline void CWorkThread::setAutoFinish()
{
	m_bAutoFinish = true;
}


#endif


WorkThread.cpp

/**
  *	Author: ACb0y
  *	FileName: WorkThread.cpp
  *	CreateTime: 2013年2月14日13:01:35
  *	Version: 1.0
  */

#include "WorkThread.h"
#include "errno.h"

CWorkThread:: CWorkThread(int nNo)
	:m_pTask(NULL), m_nStatus(UNINITIALIZED), m_bNeedQuit(false), m_bAutoFinish(false), m_nNo(nNo), m_pSem(NULL)
{
	///nothing.
}

CWorkThread::~CWorkThread()
{
	finish();
}

int CWorkThread::initialize()
{
	m_pSem = new sem_t;
	if (m_nStatus != UNINITIALIZED && m_nStatus != QUITED)
	{
		return ERR_ALREADERY_INITIALIZED;
	}

	///创建一个线程间共享初值为0的匿名信号量
	if (sem_init(m_pSem, 0, 0) < 0)
	{
		return ERR_AT_CREATE_SEM;
	}

	if (create(&doRun, (void *)this) < 0)
	{
		return ERR_AT_CREATE_THREAD;
	}

	if (m_bNeedQuit)
	{
		m_bNeedQuit = false;
	}

	if (m_bAutoFinish)
	{
		m_bAutoFinish = false;
	}

	m_nStatus = IDLE;
	return m_nStatus;
}

int CWorkThread::run()
{
	if (m_nStatus != IDLE)
	{
		return ERR_NOT_IDLE;
	}

	if (NULL == m_pTask)
	{
		return ERR_NO_TASK;
	}

	m_nStatus = RUNNING;
	sem_post(m_pSem);
	return m_nStatus;
}

void CWorkThread::finish()
{
	if (m_nStatus != UNINITIALIZED && m_nStatus != QUITED)
	{
		m_bNeedQuit = true;
		sem_post(m_pSem);
		reset();
		sem_destroy(m_pSem);
		delete m_pSem;
		m_pSem = NULL;
	}
}

void * CWorkThread::doRun(void * pArg)
{
	CWorkThread * pWorkThread = (CWorkThread *)pArg;
	CTask * pTask = pWorkThread->m_pTask;
	
	while (!pWorkThread->m_bNeedQuit)
	{
		sem_wait(pWorkThread->m_pSem);
		if (RUNNING == pWorkThread->m_nStatus)
		{
			if (NULL == pTask)
			{
				pWorkThread->m_nStatus = ERR_NO_TASK;
			}
			else
			{
				///执行任务
				pTask->run();	
				pWorkThread->m_nStatus = IDLE;
			}
		}

		if (pWorkThread->m_bAutoFinish)
		{
			pWorkThread->detach();
			break;
		}
	}

	pWorkThread->m_nStatus = QUITED;
	return (void *)0;
}



ThreadPool.h

 /**
  *	Author: ACb0y
  *	FileName: ThreadPool.h
  *	CreateTime: 2013年2月14日16:13:26
  *	Version: 1.0
  */

#ifndef __THREAD_LRL_20130214_HEADER__
#define __THREAD_LRL_20130214_HEADER__

#include <string.h>
#include "WorkThread.h"
#include "Mutex.h"
#include "AutoMutex.h"

class CThreadPool
{
///属性
public:
	///nothing.
protected:
	///nothing.
private:
	///线程池大小
	int m_nPoolSize;
	///线程池初始处于挂起状态的线程数
	int m_nInitializeCount;
	///活动中的线程数
	int m_nAliveCount;
	///线程数组
	CWorkThread ** m_pThreads;
	///保护线程数组的互斥量
	CMutex m_cMutex;
///构造函数&& 析构函数
public:
	CThreadPool(int nPoolSize = 128, int nInitializeCount = 0);
	~CThreadPool();
protected:
	///nothing.
private:
	///nothing.
	
///服务
public:
	bool postTask(CTask * pTask);
	int getPoolSize();
	int getInitializeCount();
	int getAliveCount();
	void waitAliveFinish();
protected:
	///nothing.
private:
	///nothing.
};

inline int CThreadPool::getPoolSize()
{
	return m_nPoolSize;
}

inline int CThreadPool::getInitializeCount()
{
	return m_nInitializeCount;
}

inline int CThreadPool::getAliveCount()
{
	CAutoMutex cAutoMutex(&m_cMutex);
	return m_nAliveCount;
}

#endif


ThreadPool.cpp

/**
  *	Author: ACb0y
  *	FileName: ThreadPool.cpp
  *	CreateTime: 2013年2月14日16:57:49
  *	Version: 1.0
  */

#include <iostream>
#include "ThreadPool.h"
using namespace std;

CThreadPool::CThreadPool(int nPoolSize, int nInitializeCount)
	: m_nPoolSize(nPoolSize), m_nInitializeCount(nInitializeCount), m_nAliveCount(0), m_pThreads(NULL)
{
	m_pThreads = new CWorkThread * [nPoolSize];
	if (NULL == m_pThreads)
	{
		return;
	}

	memset(m_pThreads, 0, sizeof(CWorkThread *) * nPoolSize);
	for (int i = 0; i < m_nInitializeCount; ++i)
	{
		m_pThreads[i] = new CWorkThread(i + 1);
		if (NULL == m_pThreads)
		{
			break;
		}
		if (m_pThreads[i]->initialize() != CThread::IDLE)
		{
			break;
		}
		++m_nAliveCount;
	}
}

CThreadPool::~CThreadPool()
{
	if (NULL == m_pThreads)
	{
		return;
	}

	for (int i = 0; i < m_nAliveCount; ++i)
	{
		if (NULL == m_pThreads[i])
		{
			continue;
		}
		m_pThreads[i]->finish();
		delete m_pThreads[i];
		m_pThreads[i] = NULL;
	}
	delete [] m_pThreads;
	m_pThreads = NULL;
}

bool CThreadPool::postTask(CTask * pTask)
{
	bool bPostSuccess = false;
	CAutoMutex cAutoMutex(&m_cMutex);
	for (int i = 0; i < m_nAliveCount; ++i)
	{
		if (m_pThreads[i]->getStatus() == CThread::IDLE)
		{
			m_pThreads[i]->setTask(pTask);
			m_pThreads[i]->run();
			bPostSuccess = true;
			break;
		}
	}

	if (!bPostSuccess && m_nAliveCount < m_nPoolSize)
	{
		m_pThreads[m_nAliveCount] = new CWorkThread(m_nAliveCount + 1);
		if (m_pThreads[m_nAliveCount] != NULL)
		{
			if (m_pThreads[m_nAliveCount]->initialize() == CThread::IDLE)
			{
				m_pThreads[m_nAliveCount]->setTask(pTask);
				m_pThreads[m_nAliveCount]->run();
				++m_nAliveCount;
				bPostSuccess = true;
			}
		}
	}

	return bPostSuccess;
}

void CThreadPool::waitAliveFinish()
{
	if (NULL == m_pThreads)
	{
		return;
	}

	///依次调用所有的活动线程,等待运行结束
	for (int i = 0; i < m_nAliveCount; ++i)
	{
		if (NULL == m_pThreads[i])
		{
			continue;
		}
		m_pThreads[i]->finish();
		delete m_pThreads[i];
		m_pThreads[i] = NULL;
	}

	delete [] m_pThreads;
	m_nAliveCount = 0;
}


Demo文件:ThreadPoolTest.cpp

#include <iostream>
#include "ThreadPool.h"

using namespace std;

class CTest : public CTask
{
public:
	void * run()
	{
		int nCount = 0;
		while (true)
		{
			sleep(1);
			cout << "[" << ++nCount << "] sleep ..." << endl;
			if (nCount >= 3)
			{
				break;
			}
		}
		sleep(100);
	}
};

int main()
{
	CThreadPool cThreadPool(128,1);
	CTest cTest;
	cThreadPool.postTask(&cTest);
	while (true)
	{
		sleep(4);
		cout << "Current AliveCount = " << cThreadPool.getAliveCount() << endl;
		cThreadPool.postTask(&cTest);
		cout << "Add one task." << endl;
	}
}


附带上makefile文件(所有的源文件放在src目录下,makefile文件放在exe目录下,src,obj,exe为统一目录(ThreadPoolTest)下的目录):

MAIN_TARGET=../ThreadPoolTest

SRC_PATH1=../src
SRC_PATH2=../src
OBJ_PATH=../obj

FILE1_1=Thread.cpp
SRC1_1=$(SRC_PATH1)/$(FILE1_1)
OBJ1_1=$(OBJ_PATH)/$(FILE1_1:.cpp=.o)

FILE1_2=WorkThread.cpp
SRC1_2=$(SRC_PATH1)/$(FILE1_2)
OBJ1_2=$(OBJ_PATH)/$(FILE1_2:.cpp=.o)

FILE1_3=Mutex.cpp
SRC1_3=$(SRC_PATH1)/$(FILE1_3)
OBJ1_3=$(OBJ_PATH)/$(FILE1_3:.cpp=.o)

FILE1_4=ThreadPool.cpp
SRC1_4=$(SRC_PATH1)/$(FILE1_4)    
OBJ1_4=$(OBJ_PATH)/$(FILE1_4:.cpp=.o)

FILE2_1=ThreadPoolTest.cpp
SRC2_1=$(SRC_PATH2)/$(FILE2_1)
OBJ2_1=$(OBJ_PATH)/$(FILE2_1:.cpp=.o)

CXX=g++

INCLUDE_PATH=-I$(SRC_PATH1) -I$(SRC_PATH2)

COMPILE_CPP=$(CXX) $(INCLUDE_PATH) -g -o $@ -c $?

COMPILE_LINK=$(CXX) -o  $@ -lpthread

OBJS=$(OBJ1_1) $(OBJ1_2) $(OBJ1_3) $(OBJ1_4) $(OBJ2_1)

$(MAIN_TARGET)::$(OBJS)
	$(COMPILE_LINK) $(OBJS)

$(OBJ1_1)::$(SRC1_1)
	$(COMPILE_CPP)

$(OBJ1_2)::$(SRC1_2)
	$(COMPILE_CPP)

$(OBJ1_3)::$(SRC1_3)
	$(COMPILE_CPP)

$(OBJ1_4)::$(SRC1_4)
	$(COMPILE_CPP)

$(OBJ2_1)::$(SRC2_1)
	$(COMPILE_CPP)

Demo运行结果如下:


运行结果分析如下:初始化创建线程池时,只有一个线程,后续隔4秒就多添加一个任务,此时线程池就动态添加一个线程。


文章评论

软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有