1
0
Fork 0

Ex10: almost able to compile

This commit is contained in:
Eric Teunis de Boone 2020-07-09 18:09:10 +02:00
parent 70f6134222
commit 5fbca6ec01
4 changed files with 126 additions and 35 deletions

24
ex10/main.cpp Normal file
View File

@ -0,0 +1,24 @@
#include "shared_queue.hh"
#include "threadpool.hh"
#include <string>
#include <iostream>
int main() {
std::cout << "Set up ThreadPool" << std::endl;
ThreadPool tp;
std::cout << "Add Producer" << std::endl;
tp.newProducer();
// std::cout << "Add Consumers" << std::endl;
// tp.newConsumer();
// tp.newConsumer();
std::cout << "Run" << std::endl;
tp.run();
std::cout << "Finishing up" << std::endl;
return 0;
}

View File

@ -8,11 +8,30 @@
template <class T>
class SharedQueue
{
public:
SharedQueue() :
_q(),
_q_mutex(),
_q_cv()
{};
SharedQueue( const SharedQueue& other )
{
// (Read)Lock the other queue for copying
std::unique_lock<std::mutex> other_lock(other._mutex);
_q = other._q;
}
std::queue<T> * queue() { return& _q; }
std::mutex * mutex() { return& _q_mutex; }
std::condition_variable * cv() { return& _q_cv; }
private:
// Queue
std::queue<T> queue;
std::queue<T> _q;
// Synchronisation
std::mutex mutex;
std::condition_variable cv;
mutable std::mutex _q_mutex;
mutable std::condition_variable _q_cv;
};
#endif

54
ex10/threadpool.cc Normal file
View File

@ -0,0 +1,54 @@
// ThreadPool.cc
#include "threadpool.hh"
#include "shared_queue.hh"
#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
void producer( const int id, SharedQueue<int>& q, int max_t, int max_n )
{
std::random_device d;
std::mt19937 mt(d());
std::uniform_int_distribution<> distr(0., max_t);
for ( int n = 0; n < max_n ; n++ )
{
n = distr(mt);
std::this_thread::sleep_for(std::chrono::milliseconds(n));
// Lock q so we can push a new element
std::unique_lock<std::mutex> lock(*q.mutex());
q.queue()->push(n);
// Notify one of the consumers
q.cv()->notify_one();
}
}
/*
void consumer( const int id, SharedQueue<int& q, int max_t, int max_n )
{
int b = 0;
for ( int n = 0; n < max_n ; n++ ){
{ // Scope the lock of the queue
std::unique_lock<std::mutex> q_lock(q_mutex);
while ( q.queue()->empty() ) {
// Wait until we get a signal from the condition variable
q_cv->wait(q_lock);
}
// Get a value and pop it off
b = q.front();
q.pop();
}
//std::cout << " -- Consumer " << id << " : " << b << std::endl;
// Slow down this thread to have a race between the consumers and the producers
// This only works after we have successfully read an int
std::this_thread::sleep_for(std::chrono::milliseconds(max_t));
}
}
*/

View File

@ -7,60 +7,54 @@
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
// Implementations for producers and consumers
void producer( const int id, SharedQueue<int>& _q, int max_t, int max_n );
void consumer( const int id, SharedQueue<int>& _q, int max_t, int max_n );
class ThreadPool
{
public:
/**
* @param int max_t Maximum time in milliseconds for a producer iteration.
* @param int max_n Maximum number of iterations.
* @return std::thread The created thread.
*/
std::thread newProducer( int max_t = 3000, int max_n = 10)
ThreadPool() :
_q(),
_consumers(),
_producers()
{};
void newProducer( int max_t = 3000, int max_n = 10)
{
std::thread t( producer, std::ref(q), producers.size(), max_t, max_n );
producers.push_back(t);
std::thread * tp = new std::thread( producer, _producers.size(), std::ref(_q), max_t, max_n );
return t;
_producers.push_back(tp);
}
std::thread newConsumer( int max_t = 2000, int max_n = 10)
void newConsumer( int max_t = 2000, int max_n = 10)
{
std::thread cons( consumer, std::ref(q), consumers.size(), max_t, max_n );
consumers.push_back(cons);
std::thread * tp = new std::thread( consumer, _consumers.size(), std::ref(_q), max_t, max_n );
return cons;
_consumers.push_back(tp);
}
void run()
{
// join all producers
for ( int i = 0; i < producers.size() ; i++ )
for ( int i = 0; i < _producers.size() ; i++ )
{
producers[i].join();
(*_producers[i]).join();
}
// join all consumers
for ( int i = 0; i < consumers.size() ; i++ )
for ( int i = 0; i < _consumers.size() ; i++ )
{
consumers[i].join();
(*_consumers[i]).join();
}
}
private:
// Implementations for producers and consumers
void static producer( SharedQueue<int>& q, const int id, int max_t, int max_n );
void static consumer( SharedQueue<int>& q, const int id, int max_t = 2000, int max_n = 10 );
// Disallow copying for now
ThreadPool( const ThreadPool& tp );
// Consumer and Producers threads
std::vector<std::thread> consumers;
std::vector<std::thread> producers;
std::vector<std::thread * > _consumers;
std::vector<std::thread * > _producers;
// queue
SharedQueue<int> q;
SharedQueue<int> _q;
};
#endif