From c5f7489717c6bc5d98d7dde0838a707349ce4dc7 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Thu, 9 Jul 2020 21:22:39 +0200 Subject: [PATCH] Ex10: CPP code working --- ex10/main.cpp | 8 ++++---- ex10/shared_queue.hh | 8 +++++--- ex10/threadpool.cc | 49 ++++++++++++++++++++++++++++++++------------ ex10/threadpool.hh | 8 ++++++-- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/ex10/main.cpp b/ex10/main.cpp index 125ad4e..f42467d 100644 --- a/ex10/main.cpp +++ b/ex10/main.cpp @@ -9,11 +9,11 @@ int main() { ThreadPool tp; std::cout << "Add Producer" << std::endl; - tp.newProducer(); + tp.addProducer(); -// std::cout << "Add Consumers" << std::endl; -// tp.newConsumer(); -// tp.newConsumer(); + std::cout << "Add Consumers" << std::endl; + tp.addConsumer(); + tp.addConsumer(); std::cout << "Run" << std::endl; tp.run(); diff --git a/ex10/shared_queue.hh b/ex10/shared_queue.hh index deb5e9b..143f802 100644 --- a/ex10/shared_queue.hh +++ b/ex10/shared_queue.hh @@ -1,5 +1,6 @@ // Shared_Queue.hh #include +#include #include #include @@ -17,8 +18,9 @@ class SharedQueue SharedQueue( const SharedQueue& other ) { + std::cout << "Copy Constructor SharedQueue" << std::endl; // (Read)Lock the other queue for copying - std::unique_lock other_lock(other._mutex); + std::unique_lock other_lock(other._q_mutex); _q = other._q; } @@ -31,7 +33,7 @@ class SharedQueue std::queue _q; // Synchronisation - mutable std::mutex _q_mutex; - mutable std::condition_variable _q_cv; + std::mutex _q_mutex; + std::condition_variable _q_cv; }; #endif diff --git a/ex10/threadpool.cc b/ex10/threadpool.cc index 1f57809..674ede4 100644 --- a/ex10/threadpool.cc +++ b/ex10/threadpool.cc @@ -13,42 +13,65 @@ void producer( const int id, SharedQueue& q, int max_t, int max_n ) std::mt19937 mt(d()); std::uniform_int_distribution<> distr(0., max_t); - for ( int n = 0; n < max_n ; n++ ) + std::cout << " -- Starting Producer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl; + for ( int n = 0; n < max_n ; ++n ) { - n = distr(mt); - std::this_thread::sleep_for(std::chrono::milliseconds(n)); + std::cout << " -- Producer " << id << "; n = " << n << std::endl; + + int l = distr(mt); + std::this_thread::sleep_for(std::chrono::milliseconds(l)); // Lock q so we can push a new element std::unique_lock lock(*q.mutex()); - q.queue()->push(n); + q.queue()->push(l); + + std::cout << " -- Producer " << id << " pushed = " << l << std::endl; // Notify one of the consumers q.cv()->notify_one(); + } + + + std::cout << "Exiting Producer " << id << std::endl; } -/* -void consumer( const int id, SharedQueue& q, int max_t, int max_n ) { + const int time_out = 5 ;// seconds int b = 0; - for ( int n = 0; n < max_n ; n++ ){ + + std::cout << " -- Starting Consumer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl; + for ( int n = 0; n < max_n ; n++ ) + { + std::cout << " -- Consumer " << id << "; n = " << n << std::endl; { // Scope the lock of the queue - std::unique_lock q_lock(q_mutex); + std::unique_lock q_lock(*q.mutex()); while ( q.queue()->empty() ) { // Wait until we get a signal from the condition variable - q_cv->wait(q_lock); + // or we reach a timeout + if ( q.cv()->wait_for(q_lock, std::chrono::seconds(time_out)) == std::cv_status::timeout ) + { + // We reached the timeout + std::cout << " -- Consumer " << id << " timed out (" << time_out << "s)" << std::endl; + + // this consumer does nothing after stopping the loop + // so we can return to break the loops + return; + } } // Get a value and pop it off - b = q.front(); - q.pop(); + b = q.queue()->front(); + q.queue()->pop(); } - //std::cout << " -- Consumer " << id << " : " << b << std::endl; + std::cout << " -- Consumer " << id << " read: " << b << std::endl; // Slow down this thread to have a race between the consumers and the producers // This only works after we have successfully read an int std::this_thread::sleep_for(std::chrono::milliseconds(max_t)); } + + std::cout << "Exiting Consumer " << id << std::endl; } -*/ diff --git a/ex10/threadpool.hh b/ex10/threadpool.hh index 4baabb8..e9e56a6 100644 --- a/ex10/threadpool.hh +++ b/ex10/threadpool.hh @@ -19,18 +19,22 @@ class ThreadPool _consumers(), _producers() {}; - void newProducer( int max_t = 3000, int max_n = 10) + auto addProducer( int max_t = 3000, int max_n = 10) { std::thread * tp = new std::thread( producer, _producers.size(), std::ref(_q), max_t, max_n ); _producers.push_back(tp); + + return tp; } - void newConsumer( int max_t = 2000, int max_n = 10) + auto addConsumer( int max_t = 2000, int max_n = 10) { std::thread * tp = new std::thread( consumer, _consumers.size(), std::ref(_q), max_t, max_n ); _consumers.push_back(tp); + + return tp; } void run()