TP(Thread Pool,线程池)框架是一种用于管理和调度线程资源的技术,它可以提高系统性能,减少线程创建和销毁的开销。在TP框架中,消息队列是一种常用的任务调度和处理机制。以下是一个简单的消息队列实现:
class Task {
public:
virtual void execute() = 0; // 纯虚函数,子类需要实现具体的任务处理逻辑
};
#include<queue>
#include <mutex>
#include<condition_variable>
class MessageQueue {
public:
void push(Task* task) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(task);
cv_.notify_one(); // 通知等待的线程有新任务到来
}
Task* pop() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); }); // 当队列为空时,线程等待
Task* task = queue_.front();
queue_.pop();
return task;
}
private:
std::queue<Task*> queue_;
std::mutex mutex_;
std::condition_variable cv_;
};
#include<vector>
#include<thread>
class ThreadPool {
public:
ThreadPool(size_t num_threads, MessageQueue& message_queue) : message_queue_(message_queue) {
for (size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back(&ThreadPool::worker, this);
}
}
~ThreadPool() {
for (auto& thread : threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
void add_task(Task* task) {
message_queue_.push(task);
}
private:
void worker() {
while (true) {
Task* task = message_queue_.pop();
if (task) {
task->execute();
delete task; // 任务执行完毕后,释放任务对象内存
}
}
}
std::vector<std::thread> threads_;
MessageQueue& message_queue_;
};
class MyTask : public Task {
public:
MyTask(int data) : data_(data) {}
void execute() override {
// 在这里实现具体的任务处理逻辑
std::cout << "Processing task with data: "<< data_<< std::endl;
}
private:
int data_;
};
int main() {
MessageQueue message_queue;
ThreadPool thread_pool(4, message_queue); // 创建一个包含4个工作线程的线程池
// 添加任务到线程池
for (int i = 0; i < 10; ++i) {
thread_pool.add_task(new MyTask(i));
}
// 主线程等待用户输入,以便观察任务执行情况
std::cin.get();
return 0;
}
这个简单的TP框架中的消息队列实现可以根据实际需求进行扩展和优化。例如,可以添加任务优先级、限制队列大小等功能。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。