1
0
Fork 0
This repository has been archived on 2021-11-03. You can view files and clone it, but cannot push or open issues or pull requests.
uni-m.cds-adv-prog/ex10/ThreadPool.cc

78 lines
2.2 KiB
C++

// ThreadPool.cc
#include "threadpool.hh"
#include "shared_queue.hh"
#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
void producer( const int id, SharedQueue<int>& q, int max_t, int max_n )
{
std::random_device d;
std::mt19937 mt(d());
std::uniform_int_distribution<> distr(0., max_t);
std::cout << " -- Starting Producer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
for ( int n = 0; n < max_n ; ++n )
{
std::cout << " -- Producer " << id << "; n = " << n << std::endl;
int l = distr(mt);
std::this_thread::sleep_for(std::chrono::milliseconds(l));
// Lock q so we can push a new element
std::unique_lock<std::mutex> lock(*q.mutex());
q.queue()->push(l);
std::cout << " -- Producer " << id << " pushed = " << l << std::endl;
// Notify one of the consumers
q.cv()->notify_one();
}
std::cout << "Exiting Producer " << id << std::endl;
}
void consumer( const int id, SharedQueue<int>& q, int max_t, int max_n )
{
const int time_out = 5 ;// seconds
int b = 0;
std::cout << " -- Starting Consumer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl;
for ( int n = 0; n < max_n ; n++ )
{
std::cout << " -- Consumer " << id << "; n = " << n << std::endl;
{ // Scope the lock of the queue
std::unique_lock<std::mutex> q_lock(*q.mutex());
while ( q.queue()->empty() ) {
// Wait until we get a signal from the condition variable
// or we reach a timeout
if ( q.cv()->wait_for(q_lock, std::chrono::seconds(time_out)) == std::cv_status::timeout )
{
// We reached the timeout
std::cout << " -- Consumer " << id << " timed out (" << time_out << "s)" << std::endl;
// this consumer does nothing after stopping the loop
// so we can return to break the loops
return;
}
}
// Get a value and pop it off
b = q.queue()->front();
q.queue()->pop();
}
std::cout << " -- Consumer " << id << " read: " << b << std::endl;
// Slow down this thread to have a race between the consumers and the producers
// This only works after we have successfully read an int
std::this_thread::sleep_for(std::chrono::milliseconds(max_t));
}
std::cout << "Exiting Consumer " << id << std::endl;
}