From 5fbca6ec014e33d4fe3e7ce9638c29667138513e Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Thu, 9 Jul 2020 18:09:10 +0200 Subject: [PATCH] Ex10: almost able to compile --- ex10/main.cpp | 24 ++++++++++++++++++++ ex10/shared_queue.hh | 29 ++++++++++++++++++++---- ex10/threadpool.cc | 54 ++++++++++++++++++++++++++++++++++++++++++++ ex10/threadpool.hh | 54 ++++++++++++++++++++------------------------ 4 files changed, 126 insertions(+), 35 deletions(-) create mode 100644 ex10/main.cpp create mode 100644 ex10/threadpool.cc diff --git a/ex10/main.cpp b/ex10/main.cpp new file mode 100644 index 0000000..125ad4e --- /dev/null +++ b/ex10/main.cpp @@ -0,0 +1,24 @@ +#include "shared_queue.hh" +#include "threadpool.hh" + +#include +#include +int main() { + + std::cout << "Set up ThreadPool" << std::endl; + ThreadPool tp; + + std::cout << "Add Producer" << std::endl; + tp.newProducer(); + +// std::cout << "Add Consumers" << std::endl; +// tp.newConsumer(); +// tp.newConsumer(); + + std::cout << "Run" << std::endl; + tp.run(); + + std::cout << "Finishing up" << std::endl; + + return 0; +} diff --git a/ex10/shared_queue.hh b/ex10/shared_queue.hh index 4ba5ff9..deb5e9b 100644 --- a/ex10/shared_queue.hh +++ b/ex10/shared_queue.hh @@ -8,11 +8,30 @@ template class SharedQueue { - // Queue - std::queue queue; + public: + SharedQueue() : + _q(), + _q_mutex(), + _q_cv() + {}; - // Synchronisation - std::mutex mutex; - std::condition_variable cv; + SharedQueue( const SharedQueue& other ) + { + // (Read)Lock the other queue for copying + std::unique_lock other_lock(other._mutex); + _q = other._q; + } + + std::queue * queue() { return& _q; } + std::mutex * mutex() { return& _q_mutex; } + std::condition_variable * cv() { return& _q_cv; } + + private: + // Queue + std::queue _q; + + // Synchronisation + mutable std::mutex _q_mutex; + mutable std::condition_variable _q_cv; }; #endif diff --git a/ex10/threadpool.cc b/ex10/threadpool.cc new file mode 100644 index 0000000..1f57809 --- /dev/null +++ b/ex10/threadpool.cc @@ -0,0 +1,54 @@ +// ThreadPool.cc +#include "threadpool.hh" +#include "shared_queue.hh" + +#include +#include +#include +#include + +void producer( const int id, SharedQueue& q, int max_t, int max_n ) +{ + std::random_device d; + std::mt19937 mt(d()); + std::uniform_int_distribution<> distr(0., max_t); + + for ( int n = 0; n < max_n ; n++ ) + { + n = distr(mt); + std::this_thread::sleep_for(std::chrono::milliseconds(n)); + + // Lock q so we can push a new element + std::unique_lock lock(*q.mutex()); + q.queue()->push(n); + + // Notify one of the consumers + q.cv()->notify_one(); + } +} +/* +void consumer( const int id, SharedQueue q_lock(q_mutex); + + while ( q.queue()->empty() ) { + // Wait until we get a signal from the condition variable + q_cv->wait(q_lock); + } + + // Get a value and pop it off + b = q.front(); + q.pop(); + } + + //std::cout << " -- Consumer " << id << " : " << b << std::endl; + + // Slow down this thread to have a race between the consumers and the producers + // This only works after we have successfully read an int + std::this_thread::sleep_for(std::chrono::milliseconds(max_t)); + } +} +*/ diff --git a/ex10/threadpool.hh b/ex10/threadpool.hh index c47ccb6..4baabb8 100644 --- a/ex10/threadpool.hh +++ b/ex10/threadpool.hh @@ -7,60 +7,54 @@ #ifndef THREAD_POOL_H #define THREAD_POOL_H +// Implementations for producers and consumers +void producer( const int id, SharedQueue& _q, int max_t, int max_n ); +void consumer( const int id, SharedQueue& _q, int max_t, int max_n ); + class ThreadPool { public: - - /** - * @param int max_t Maximum time in milliseconds for a producer iteration. - * @param int max_n Maximum number of iterations. - * @return std::thread The created thread. - */ - std::thread newProducer( int max_t = 3000, int max_n = 10) + ThreadPool() : + _q(), + _consumers(), + _producers() + {}; + void newProducer( int max_t = 3000, int max_n = 10) { - std::thread t( producer, std::ref(q), producers.size(), max_t, max_n ); - producers.push_back(t); - - return t; + std::thread * tp = new std::thread( producer, _producers.size(), std::ref(_q), max_t, max_n ); + + _producers.push_back(tp); } - std::thread newConsumer( int max_t = 2000, int max_n = 10) + void newConsumer( int max_t = 2000, int max_n = 10) { - std::thread cons( consumer, std::ref(q), consumers.size(), max_t, max_n ); - consumers.push_back(cons); - - return cons; + std::thread * tp = new std::thread( consumer, _consumers.size(), std::ref(_q), max_t, max_n ); + + _consumers.push_back(tp); } void run() { // join all producers - for ( int i = 0; i < producers.size() ; i++ ) + for ( int i = 0; i < _producers.size() ; i++ ) { - producers[i].join(); + (*_producers[i]).join(); } // join all consumers - for ( int i = 0; i < consumers.size() ; i++ ) + for ( int i = 0; i < _consumers.size() ; i++ ) { - consumers[i].join(); + (*_consumers[i]).join(); } } private: - // Implementations for producers and consumers - void static producer( SharedQueue& q, const int id, int max_t, int max_n ); - void static consumer( SharedQueue& q, const int id, int max_t = 2000, int max_n = 10 ); - - // Disallow copying for now - ThreadPool( const ThreadPool& tp ); - // Consumer and Producers threads - std::vector consumers; - std::vector producers; + std::vector _consumers; + std::vector _producers; // queue - SharedQueue q; + SharedQueue _q; }; #endif