// ThreadPool.cc #include "threadpool.hh" #include "shared_queue.hh" #include #include #include #include void producer( const int id, SharedQueue& q, int max_t, int max_n ) { std::random_device d; std::mt19937 mt(d()); std::uniform_int_distribution<> distr(0., max_t); std::cout << " -- Starting Producer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl; for ( int n = 0; n < max_n ; ++n ) { std::cout << " -- Producer " << id << "; n = " << n << std::endl; int l = distr(mt); std::this_thread::sleep_for(std::chrono::milliseconds(l)); // Lock q so we can push a new element std::unique_lock lock(*q.mutex()); q.queue()->push(l); std::cout << " -- Producer " << id << " pushed = " << l << std::endl; // Notify one of the consumers q.cv()->notify_one(); } std::cout << "Exiting Producer " << id << std::endl; } void consumer( const int id, SharedQueue& q, int max_t, int max_n ) { const int time_out = 5 ;// seconds int b = 0; std::cout << " -- Starting Consumer " << id << " (max_t: " << max_t << " max_n: " << max_n << ")" << std::endl; for ( int n = 0; n < max_n ; n++ ) { std::cout << " -- Consumer " << id << "; n = " << n << std::endl; { // Scope the lock of the queue std::unique_lock q_lock(*q.mutex()); while ( q.queue()->empty() ) { // Wait until we get a signal from the condition variable // or we reach a timeout if ( q.cv()->wait_for(q_lock, std::chrono::seconds(time_out)) == std::cv_status::timeout ) { // We reached the timeout std::cout << " -- Consumer " << id << " timed out (" << time_out << "s)" << std::endl; // this consumer does nothing after stopping the loop // so we can return to break the loops return; } } // Get a value and pop it off b = q.queue()->front(); q.queue()->pop(); } std::cout << " -- Consumer " << id << " read: " << b << std::endl; // Slow down this thread to have a race between the consumers and the producers // This only works after we have successfully read an int std::this_thread::sleep_for(std::chrono::milliseconds(max_t)); } std::cout << "Exiting Consumer " << id << std::endl; }