1
0
Fork 0

Ex10: CPP code working

This commit is contained in:
Eric Teunis de Boone 2020-07-09 21:22:39 +02:00
parent 5fbca6ec01
commit c5f7489717
4 changed files with 51 additions and 22 deletions

View File

@ -9,11 +9,11 @@ int main() {
ThreadPool tp; ThreadPool tp;
std::cout << "Add Producer" << std::endl; std::cout << "Add Producer" << std::endl;
tp.newProducer(); tp.addProducer();
// std::cout << "Add Consumers" << std::endl; std::cout << "Add Consumers" << std::endl;
// tp.newConsumer(); tp.addConsumer();
// tp.newConsumer(); tp.addConsumer();
std::cout << "Run" << std::endl; std::cout << "Run" << std::endl;
tp.run(); tp.run();

View File

@ -1,5 +1,6 @@
// Shared_Queue.hh // Shared_Queue.hh
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
@ -17,8 +18,9 @@ class SharedQueue
SharedQueue( const SharedQueue& other ) SharedQueue( const SharedQueue& other )
{ {
std::cout << "Copy Constructor SharedQueue" << std::endl;
// (Read)Lock the other queue for copying // (Read)Lock the other queue for copying
std::unique_lock<std::mutex> other_lock(other._mutex); std::unique_lock<std::mutex> other_lock(other._q_mutex);
_q = other._q; _q = other._q;
} }
@ -31,7 +33,7 @@ class SharedQueue
std::queue<T> _q; std::queue<T> _q;
// Synchronisation // Synchronisation
mutable std::mutex _q_mutex; std::mutex _q_mutex;
mutable std::condition_variable _q_cv; std::condition_variable _q_cv;
}; };
#endif #endif

View File

@ -13,42 +13,65 @@ void producer( const int id, SharedQueue<int>& q, int max_t, int max_n )
std::mt19937 mt(d()); std::mt19937 mt(d());
std::uniform_int_distribution<> distr(0., max_t); std::uniform_int_distribution<> distr(0., max_t);
for ( int n = 0; n < max_n ; n++ ) std::cout << " -- Starting Producer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
for ( int n = 0; n < max_n ; ++n )
{ {
n = distr(mt); std::cout << " -- Producer " << id << "; n = " << n << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(n));
int l = distr(mt);
std::this_thread::sleep_for(std::chrono::milliseconds(l));
// Lock q so we can push a new element // Lock q so we can push a new element
std::unique_lock<std::mutex> lock(*q.mutex()); std::unique_lock<std::mutex> lock(*q.mutex());
q.queue()->push(n); q.queue()->push(l);
std::cout << " -- Producer " << id << " pushed = " << l << std::endl;
// Notify one of the consumers // Notify one of the consumers
q.cv()->notify_one(); q.cv()->notify_one();
} }
std::cout << "Exiting Producer " << id << std::endl;
} }
/* void consumer( const int id, SharedQueue<int>& q, int max_t, int max_n )
void consumer( const int id, SharedQueue<int& q, int max_t, int max_n )
{ {
const int time_out = 5 ;// seconds
int b = 0; int b = 0;
for ( int n = 0; n < max_n ; n++ ){
std::cout << " -- Starting Consumer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
for ( int n = 0; n < max_n ; n++ )
{
std::cout << " -- Consumer " << id << "; n = " << n << std::endl;
{ // Scope the lock of the queue { // Scope the lock of the queue
std::unique_lock<std::mutex> q_lock(q_mutex); std::unique_lock<std::mutex> q_lock(*q.mutex());
while ( q.queue()->empty() ) { while ( q.queue()->empty() ) {
// Wait until we get a signal from the condition variable // Wait until we get a signal from the condition variable
q_cv->wait(q_lock); // or we reach a timeout
if ( q.cv()->wait_for(q_lock, std::chrono::seconds(time_out)) == std::cv_status::timeout )
{
// We reached the timeout
std::cout << " -- Consumer " << id << " timed out (" << time_out << "s)" << std::endl;
// this consumer does nothing after stopping the loop
// so we can return to break the loops
return;
}
} }
// Get a value and pop it off // Get a value and pop it off
b = q.front(); b = q.queue()->front();
q.pop(); q.queue()->pop();
} }
//std::cout << " -- Consumer " << id << " : " << b << std::endl; std::cout << " -- Consumer " << id << " read: " << b << std::endl;
// Slow down this thread to have a race between the consumers and the producers // Slow down this thread to have a race between the consumers and the producers
// This only works after we have successfully read an int // This only works after we have successfully read an int
std::this_thread::sleep_for(std::chrono::milliseconds(max_t)); std::this_thread::sleep_for(std::chrono::milliseconds(max_t));
} }
std::cout << "Exiting Consumer " << id << std::endl;
} }
*/

View File

@ -19,18 +19,22 @@ class ThreadPool
_consumers(), _consumers(),
_producers() _producers()
{}; {};
void newProducer( int max_t = 3000, int max_n = 10) auto addProducer( int max_t = 3000, int max_n = 10)
{ {
std::thread * tp = new std::thread( producer, _producers.size(), std::ref(_q), max_t, max_n ); std::thread * tp = new std::thread( producer, _producers.size(), std::ref(_q), max_t, max_n );
_producers.push_back(tp); _producers.push_back(tp);
return tp;
} }
void newConsumer( int max_t = 2000, int max_n = 10) auto addConsumer( int max_t = 2000, int max_n = 10)
{ {
std::thread * tp = new std::thread( consumer, _consumers.size(), std::ref(_q), max_t, max_n ); std::thread * tp = new std::thread( consumer, _consumers.size(), std::ref(_q), max_t, max_n );
_consumers.push_back(tp); _consumers.push_back(tp);
return tp;
} }
void run() void run()