线程池与线程封装
1. 线程池
1. ThreadPool.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
| #pragma once #include <iostream> #include <vector> #include <string> #include <queue> #include <pthread.h> #include <unistd.h> using namespace std;
// Bookkeeping record for one pool worker: its pthread id and display name.
struct ThreadInfo
{
    pthread_t tid;     // id written by pthread_create()
    std::string name;  // label used in log output
};
// Default argument of the ThreadPool constructor: how many ThreadInfo slots
// (and therefore worker threads) a pool gets when no count is given.
static const int default_num = 5;
template <class T> class ThreadPool { public: void Lock() { pthread_mutex_lock(&mutex_); } void Unlock() { pthread_mutex_unlock(&mutex_); } void Wakeup() { pthread_cond_signal(&cond_); } void ThreadSleep() { pthread_cond_wait(&cond_, &mutex_); } bool IsQueueEmpty() { return tasks_.empty(); } string GetThreadName(pthread_t tid) { for (const auto &ti : threads_) { if (ti.tid == tid) return ti.name; }
return "Unknown"; }
public: static void *HandlerTask(void *args) { ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); string name = tp->GetThreadName(pthread_self()); while (true) { tp->Lock(); while (tp->IsQueueEmpty()) { cout << name << " 等待任务..." << endl; tp->ThreadSleep(); } T t = tp->Pop(); tp->Unlock();
t(); cout << name << " 运行任务,结果是:" << t.get_result() << endl; } return nullptr; } void Start() { int num = threads_.size(); cout << "启动 " << num << " 个线程..." << endl; for (int i = 0; i < num; i++) { threads_[i].name = "[线程 " + to_string(i + 1) + "]"; pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this); cout << "创建线程 " << threads_[i].name << endl; } } T Pop() { T t = tasks_.front(); tasks_.pop(); return t; } void Push(const T &t) { Lock(); tasks_.push(t); Wakeup(); Unlock(); } static ThreadPool<T> *GetInstance() { if (nullptr == tp_) { pthread_mutex_lock(&lock_); if (nullptr == tp_) { cout << "log:单例线程池首次创建完成!" << endl; tp_ = new ThreadPool<T>(); } pthread_mutex_unlock(&lock_); }
return tp_; }
private: ThreadPool(int num = default_num) : threads_(num) { pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&cond_, nullptr); } ~ThreadPool() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&cond_); } ThreadPool(const ThreadPool<T> &) = delete; const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
private: vector<ThreadInfo> threads_; queue<T> tasks_;
pthread_mutex_t mutex_; pthread_cond_t cond_;
static ThreadPool<T> *tp_; static pthread_mutex_t lock_; };
// Out-of-class definitions of ThreadPool's static members.
// tp_ starts as nullptr; GetInstance() assigns it on first use.
template <class T> ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
// lock_ is statically initialized, so it needs no pthread_mutex_init call.
template <class T> pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
|
2. Task.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
| #pragma once #include <iostream> #include <string> using namespace std;
// The five operator characters Task handles; Main.cc picks one at random from this string.
const string opers = "+-*/%";
// Result codes reported by Task::get_error_code().
enum
{
    SUCCESS = 0,
    DIV_ERROR = 1,      // division by zero
    MOD_ERROR = 2,      // modulo by zero
    UNKNOWN_ERROR = 3,  // operator is not one of + - * / %
    OVERFLOW_ERROR = 4  // mathematical result does not fit in int
};

/// One integer arithmetic job: `x op y`, evaluated by run() / operator().
///
/// After run(), get_result() holds the value and get_error_code() one of the
/// codes above. On any error the result is 0.
class Task
{
public:
    /// Creates the task 0 + 0.
    Task() = default;

    /// Creates the task `x op y`. An unsupported `op` is replaced by '+' for
    /// display purposes and the task is marked UNKNOWN_ERROR; that mark is
    /// kept by run() instead of being silently turned into an addition.
    Task(int x, int y, char op = '+') : _x(x), _y(y), _op(op)
    {
        if (!is_supported(op))
        {
            _op = '+';
            _op_supported = false;
            _code = UNKNOWN_ERROR;
        }
    }

    /// Evaluates the task and stores result and error code.
    void run()
    {
        _ret = 0;
        _code = SUCCESS;

        if (!_op_supported)
        {
            _code = UNKNOWN_ERROR;
            return;
        }

        // Compute in a wider type so that int overflow (including
        // INT_MIN / -1 and INT_MIN % -1) is detected instead of being
        // undefined behavior.
        static_assert(sizeof(long long) > sizeof(int), "need a type wider than int");
        const long long lhs = _x;
        const long long rhs = _y;
        long long wide = 0;

        switch (_op)
        {
        case '+':
            wide = lhs + rhs;
            break;
        case '-':
            wide = lhs - rhs;
            break;
        case '*':
            wide = lhs * rhs;
            break;
        case '/':
            if (_y == 0)
            {
                _code = DIV_ERROR;
                return;
            }
            wide = lhs / rhs;
            break;
        case '%':
            if (_y == 0)
            {
                _code = MOD_ERROR;
                return;
            }
            wide = lhs % rhs;
            break;
        default:
            _code = UNKNOWN_ERROR;
            return;
        }

        // Round-trip through int: a value that changes did not fit.
        // (An out-of-range conversion is not undefined, merely
        // implementation-defined before C++20.)
        const int narrow = static_cast<int>(wide);
        if (static_cast<long long>(narrow) != wide)
        {
            _code = OVERFLOW_ERROR;
            return;
        }
        _ret = narrow;
    }

    /// Same as run(); lets a Task be used as a callable.
    void operator()() { run(); }

    /// Returns the task as text with the answer still unknown, e.g. "3+4= ???".
    std::string get_task() const
    {
        return std::to_string(_x) + _op + std::to_string(_y) + "= ???";
    }

    /// Returns the task, its stored result and its error code as text.
    std::string get_ret() const
    {
        std::string ret = std::to_string(_x) + _op + std::to_string(_y) + "=" + std::to_string(_ret) + " [错误代码:" + std::to_string(_code) + "]";
        return ret;
    }

    char get_operator() const { return _op; }
    int get_first_operand() const { return _x; }
    int get_second_operand() const { return _y; }
    int get_result() const { return _ret; }
    int get_error_code() const { return _code; }

private:
    /// True for the five operators run() knows how to evaluate.
    static bool is_supported(char op)
    {
        return op == '+' || op == '-' || op == '*' || op == '/' || op == '%';
    }

    int _x = 0;
    int _y = 0;
    int _ret = 0;
    char _op = '+';
    int _code = SUCCESS;
    bool _op_supported = true; // false when the constructor rejected the operator
};
|
3. Main.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| #include "ThreadPool.hpp" #include "Task.hpp" #include <signal.h>
// Main-loop flag cleared by signalHandler(). `volatile sig_atomic_t` is the
// object type a signal handler can portably write; `volatile bool` carries no
// such guarantee.
volatile sig_atomic_t running = 1;
/// Handler for SIGINT/SIGTERM: reports the signal number on stdout and asks
/// the main loop to stop by clearing `running`.
///
/// Only async-signal-safe operations are used. The message is assembled in a
/// stack buffer and emitted with a single write(2); iostreams must not be
/// touched here because the signal may interrupt a thread that is in the
/// middle of a `cout` operation.
void signalHandler(int signum)
{
    static const char kPrefix[] = "\n接收到信号 ";
    static const char kSuffix[] = ",正在退出...\n";

    // Both literals plus room for a sign and up to 10 decimal digits.
    char message[sizeof(kPrefix) + sizeof(kSuffix) + 12];
    size_t length = 0;

    for (size_t i = 0; i + 1 < sizeof(kPrefix); ++i)
        message[length++] = kPrefix[i];

    // Manual integer formatting: snprintf is not async-signal-safe.
    unsigned int magnitude = static_cast<unsigned int>(signum);
    if (signum < 0)
    {
        message[length++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[10];
    size_t digit_count = 0;
    do
    {
        digits[digit_count++] = static_cast<char>('0' + magnitude % 10);
        magnitude /= 10;
    } while (magnitude != 0);
    while (digit_count > 0)
        message[length++] = digits[--digit_count];

    for (size_t i = 0; i + 1 < sizeof(kSuffix); ++i)
        message[length++] = kSuffix[i];

    // Best effort: there is nothing useful to do here if the write fails.
    const ssize_t written = write(STDOUT_FILENO, message, length);
    (void)written;

    running = false;
}
int main() { signal(SIGINT, signalHandler); signal(SIGTERM, signalHandler); cout << "线程池启动中..." << endl; sleep(1); ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance(); tp->Start(); srand(time(nullptr) ^ getpid()); while(running) { int x = rand() % 20 + 1; usleep(10); int y = rand() % 10; char op = opers[rand() % opers.size()];
Task t(x, y, op); tp->Push(t); cout << "[主线程] 创建任务: " << t.get_task() << endl;
sleep(1); } cout << "程序正常退出" << endl; return 0; }
|
2. C++ 语言层面上的线程封装 demo(简易)
1. MyThread.hpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| #pragma once #include <iostream> #include <pthread.h> #include <string> #include <ctime> using namespace std;
// Signature of the function a Thread executes: no arguments, no result.
using callback_t = void (*)();

// Counter Thread::run() uses to build "thread-N" names. Being `static` at
// namespace scope, every translation unit that includes this header gets its
// own copy.
static int thread_num = 0;
/// Minimal object wrapper around one pthread that runs a `void()` callback.
///
/// Usage: construct with a callback, call run() to start the thread, then
/// join() to wait for it. run() hands `this` to the new thread, so the object
/// must stay at the same address (no copy, move or container reallocation)
/// between run() and join().
class Thread
{
public:
    /// pthread entry point; `arg` is the Thread that was started.
    /// Must return a value: flowing off the end of a non-void function is
    /// undefined behavior.
    static void *thread_func(void *arg)
    {
        Thread *thread = static_cast<Thread *>(arg);
        thread->Enter_callback();
        return nullptr;
    }

public:
    /// Stores the callback; no thread is created until run().
    Thread(callback_t cb)
        : tid_(), name_(""), start_timestamp_(0), isrunning_(false), cb_(cb)
    {
    }

    ~Thread() = default;

    /// Names the thread "thread-N", records the start time and launches it.
    /// Does nothing if the thread is already running. If pthread_create fails
    /// the object stays in the not-running state.
    void run()
    {
        if (isrunning_)
            return;

        name_ = "thread-" + std::to_string(thread_num++);
        start_timestamp_ = static_cast<uint64_t>(time(nullptr));
        // Only mark as running once a thread really exists, so that join()
        // never waits on an id that was not produced by pthread_create.
        if (pthread_create(&tid_, nullptr, thread_func, this) == 0)
            isrunning_ = true;
    }

    /// Invokes the stored callback, if any.
    void Enter_callback()
    {
        if (cb_ != nullptr)
            cb_();
    }

    /// True between a successful run() and the matching join().
    bool is_runing() const { return isrunning_; }

    /// Value of time(nullptr) taken by the last run(); 0 before the first run().
    uint64_t start_timestamp() const { return start_timestamp_; }

    /// Name assigned by the last run(); empty before the first run().
    std::string name() const { return name_; }

    /// Waits for the thread to finish. No-op when no thread is running, which
    /// also makes a second join() harmless.
    void join()
    {
        if (!isrunning_)
            return;
        pthread_join(tid_, nullptr);
        isrunning_ = false;
    }

private:
    pthread_t tid_;
    std::string name_;
    uint64_t start_timestamp_;
    bool isrunning_;

    callback_t cb_;
};
|
2. Main.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| #include "MyThread.hpp" #include <vector> #include <unistd.h>
// Number of Thread objects main() creates; print() also uses it as its loop bound.
int threads_num = 3;
/// Callback run by every demo thread: prints one numbered line per second,
/// `threads_num` times in total, then a closing line.
void print()
{
    for (int round = 0; round < threads_num; ++round)
    {
        std::cout << "我是一个 C++ 语言层面上封装的线程!"
                  << "代号是:" << round << std::endl;
        sleep(1);
    }

    std::cout << "所有线程都结束了!" << std::endl;
}
int main() { vector<Thread> threads; for (int i = 0; i < threads_num; ++i) { threads.push_back(Thread(print)); }
cout << "开始启动所有线程喽~" << endl;
for(auto &x : threads) { x.run(); cout << "线程 " << x.name() << " 启动成功!其时间戳的值是:" << x.start_timestamp() << endl; }
for(auto &x : threads) { x.join(); }
cout << "所有线程都结束了!" << endl;
return 0; }
|
当然,这个 demo 仅封装了 pthread_create 和 pthread_join 两个函数,也没有进行任何加锁:多个线程同时向 cout 输出时内容可能交错;如果在多个线程中并发调用 run(),全局计数器 thread_num 的自增还会产生数据竞争。整体来说并不完美,仅用于展示如何对底层线程接口进行封装。