2020-07-09 18:09:10 +02:00
|
|
|
// ThreadPool.cc
|
|
|
|
#include "threadpool.hh"
|
|
|
|
#include "shared_queue.hh"
|
|
|
|
|
|
|
|
#include <chrono>
|
|
|
|
#include <iostream>
|
|
|
|
#include <mutex>
|
|
|
|
#include <random>
|
|
|
|
|
|
|
|
void producer( const int id, SharedQueue<int>& q, int max_t, int max_n )
|
|
|
|
{
|
|
|
|
std::random_device d;
|
|
|
|
std::mt19937 mt(d());
|
|
|
|
std::uniform_int_distribution<> distr(0., max_t);
|
|
|
|
|
2020-07-09 21:22:39 +02:00
|
|
|
std::cout << " -- Starting Producer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
|
|
|
|
for ( int n = 0; n < max_n ; ++n )
|
2020-07-09 18:09:10 +02:00
|
|
|
{
|
2020-07-09 21:22:39 +02:00
|
|
|
std::cout << " -- Producer " << id << "; n = " << n << std::endl;
|
|
|
|
|
|
|
|
int l = distr(mt);
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(l));
|
2020-07-09 18:09:10 +02:00
|
|
|
|
|
|
|
// Lock q so we can push a new element
|
|
|
|
std::unique_lock<std::mutex> lock(*q.mutex());
|
2020-07-09 21:22:39 +02:00
|
|
|
q.queue()->push(l);
|
|
|
|
|
|
|
|
std::cout << " -- Producer " << id << " pushed = " << l << std::endl;
|
2020-07-09 18:09:10 +02:00
|
|
|
|
|
|
|
// Notify one of the consumers
|
|
|
|
q.cv()->notify_one();
|
2020-07-09 21:22:39 +02:00
|
|
|
|
2020-07-09 18:09:10 +02:00
|
|
|
}
|
2020-07-09 21:22:39 +02:00
|
|
|
|
|
|
|
|
|
|
|
std::cout << "Exiting Producer " << id << std::endl;
|
2020-07-09 18:09:10 +02:00
|
|
|
}
|
2020-07-09 21:22:39 +02:00
|
|
|
void consumer( const int id, SharedQueue<int>& q, int max_t, int max_n )
|
2020-07-09 18:09:10 +02:00
|
|
|
{
|
2020-07-09 21:22:39 +02:00
|
|
|
const int time_out = 5 ;// seconds
|
2020-07-09 18:09:10 +02:00
|
|
|
int b = 0;
|
2020-07-09 21:22:39 +02:00
|
|
|
|
|
|
|
std::cout << " -- Starting Consumer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
|
|
|
|
for ( int n = 0; n < max_n ; n++ )
|
|
|
|
{
|
|
|
|
std::cout << " -- Consumer " << id << "; n = " << n << std::endl;
|
2020-07-09 18:09:10 +02:00
|
|
|
{ // Scope the lock of the queue
|
2020-07-09 21:22:39 +02:00
|
|
|
std::unique_lock<std::mutex> q_lock(*q.mutex());
|
2020-07-09 18:09:10 +02:00
|
|
|
|
|
|
|
while ( q.queue()->empty() ) {
|
|
|
|
// Wait until we get a signal from the condition variable
|
2020-07-09 21:22:39 +02:00
|
|
|
// or we reach a timeout
|
|
|
|
if ( q.cv()->wait_for(q_lock, std::chrono::seconds(time_out)) == std::cv_status::timeout )
|
|
|
|
{
|
|
|
|
// We reached the timeout
|
|
|
|
std::cout << " -- Consumer " << id << " timed out (" << time_out << "s)" << std::endl;
|
|
|
|
|
|
|
|
// this consumer does nothing after stopping the loop
|
|
|
|
// so we can return to break the loops
|
|
|
|
return;
|
|
|
|
}
|
2020-07-09 18:09:10 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Get a value and pop it off
|
2020-07-09 21:22:39 +02:00
|
|
|
b = q.queue()->front();
|
|
|
|
q.queue()->pop();
|
2020-07-09 18:09:10 +02:00
|
|
|
}
|
|
|
|
|
2020-07-09 21:22:39 +02:00
|
|
|
std::cout << " -- Consumer " << id << " read: " << b << std::endl;
|
2020-07-09 18:09:10 +02:00
|
|
|
|
|
|
|
// Slow down this thread to have a race between the consumers and the producers
|
|
|
|
// This only works after we have successfully read an int
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(max_t));
|
|
|
|
}
|
2020-07-09 21:22:39 +02:00
|
|
|
|
|
|
|
std::cout << "Exiting Consumer " << id << std::endl;
|
2020-07-09 18:09:10 +02:00
|
|
|
}
|