rewritten threads with heavy debug information pyrit
authorRadek Brich <radek.brich@devl.cz>
Fri, 28 Mar 2008 17:13:21 +0100
branchpyrit
changeset 50 14a727b70d07
parent 49 558fde7da82a
child 51 89fec8668768
rewritten threads with heavy debug information disabling pthreads library will not be possible any more
include/raytracer.h
include/sampler.h
include/scene.h
src/raytracer.cc
src/sampler.cc
--- a/include/raytracer.h	Fri Mar 28 00:53:20 2008 +0100
+++ b/include/raytracer.h	Fri Mar 28 17:13:21 2008 +0100
@@ -28,6 +28,7 @@
 #define RAYTRACER_H
 
 #include <vector>
+#include <queue>
 
 #include "common.h"
 #include "container.h"
@@ -60,9 +61,30 @@
 	int max_depth;
 
 	Vector3 SphereDistribute(int i, int n, Float extent, Vector3 &normal);
+	
+	queue<Sample*> sample_queue;
+	bool sample_queue_end;
+	pthread_mutex_t sample_queue_mutex, sampler_mutex;
+	pthread_cond_t sample_queue_cond, worker_ready_cond;
+
+	static void *raytrace_worker(void *d);
 public:
 	Raytracer(): top(NULL), camera(NULL), lights(), bg_colour(0.0, 0.0, 0.0),
-		ao_samples(0), num_threads(2), max_depth(3) {};
+		ao_samples(0), num_threads(2), max_depth(3)
+	{
+		pthread_mutex_init(&sample_queue_mutex, NULL);
+		pthread_mutex_init(&sampler_mutex, NULL);
+		pthread_cond_init (&sample_queue_cond, NULL);
+		pthread_cond_init (&worker_ready_cond, NULL);
+	};
+	~Raytracer()
+	{
+		pthread_mutex_destroy(&sample_queue_mutex);
+		pthread_mutex_destroy(&sampler_mutex);
+		pthread_cond_destroy (&sample_queue_cond);
+		pthread_cond_destroy (&worker_ready_cond);
+	}
+
 	void render();
 	Colour raytrace(Ray &ray, int depth, Shape *origin_shape);
 	void addshape(Shape *shape) { top->addShape(shape); };
--- a/include/sampler.h	Fri Mar 28 00:53:20 2008 +0100
+++ b/include/sampler.h	Fri Mar 28 17:13:21 2008 +0100
@@ -62,7 +62,7 @@
 	void resetBuffer(Float *abuffer, int &aw, int &ah) { buffer = abuffer; w = aw; h = ah; };
 	virtual void init() = 0;
 	virtual int initSampleSet() = 0;
-	virtual Sample *nextSample(Sample *prev) = 0;
+	virtual Sample *nextSample() = 0;
 	virtual void saveSample(Sample *samp, Colour &col) = 0;
 };
 
@@ -84,12 +84,13 @@
 	int phase;
 	int subsample;
 	int oversample; // 0 = no, 1 = 5x, 2 = 9x, 3 = 16x
+	int sx,sy,osa_samp; // current sample properties
 public:
 	DefaultSampler(Float *abuffer, int &aw, int &ah):
 		Sampler(abuffer, aw, ah), phase(-1), subsample(8), oversample(0) {};
 	void init();
 	int initSampleSet();
-	Sample *nextSample(Sample *prev);
+	Sample *nextSample();
 	void saveSample(Sample *samp, Colour &col);
 
 	void setSubsample(int sub) { subsample = sub; };
--- a/include/scene.h	Fri Mar 28 00:53:20 2008 +0100
+++ b/include/scene.h	Fri Mar 28 17:13:21 2008 +0100
@@ -52,6 +52,7 @@
 {
 public:
 	Vector3 o, dir;
+	Ray(): o(), dir() {};
 	Ray(const Vector3 &ao, const Vector3 &adir):
 		o(ao), dir(adir) {};
 };
--- a/src/raytracer.cc	Fri Mar 28 00:53:20 2008 +0100
+++ b/src/raytracer.cc	Fri Mar 28 17:13:21 2008 +0100
@@ -396,39 +396,139 @@
 }
 #endif
 
+void *Raytracer::raytrace_worker(void *d)
+{
+	Raytracer *rt = (Raytracer*)d;
+	Sample *sample;
+	Colour col;
+	Ray ray;
+	for (;;)
+	{
+		cout<<"#worker "<<pthread_self()<<" locking queue"<<endl;
+		pthread_mutex_lock(&rt->sample_queue_mutex);
+		pthread_cond_signal(&rt->worker_ready_cond);
+
+		// if queue is empty, wait for samples
+		while (rt->sample_queue.empty()) {
+			if (rt->sample_queue_end)
+			{
+				cout<<"#worker "<<pthread_self()<<" end of queue, unlocking queue and exiting..."<<endl;
+				pthread_mutex_unlock(&rt->sample_queue_mutex);
+				pthread_exit(NULL);
+			}
+			cout<<"#worker "<<pthread_self()<<" waiting for cond"<<endl;
+			pthread_cond_wait(&rt->sample_queue_cond, &rt->sample_queue_mutex);
+		}
+
+
+		sample = rt->sample_queue.front();
+		rt->sample_queue.pop();
+		cout<<"#worker "<<pthread_self()<<" unlocking queue"<<endl;
+		pthread_mutex_unlock(&rt->sample_queue_mutex);
+
+		// do the work
+		cout<<"#worker "<<pthread_self()<<" locking sampler (camera)"<<endl;
+		pthread_mutex_lock(&rt->sampler_mutex);
+		ray = rt->camera->makeRay(sample);
+		cout<<"#worker "<<pthread_self()<<" unlocking sampler (camera)"<<endl;
+		pthread_mutex_unlock(&rt->sampler_mutex);
+
+		cout<<"#worker "<<pthread_self()<<" ray tracing..."<<endl;
+		col = rt->raytrace(ray, 0, NULL);
+		
+		// save the result
+		cout<<"#worker "<<pthread_self()<<" locking sampler"<<endl;
+		pthread_mutex_lock(&rt->sampler_mutex);
+		rt->sampler->saveSample(sample, col);
+		cout<<"#worker "<<pthread_self()<<" unlocking sampler"<<endl;
+		pthread_mutex_unlock(&rt->sampler_mutex);
+
+		delete sample;
+	}
+}
+
 void Raytracer::render()
 {
 	if (!sampler || !camera || !top)
 		return;
 
+	sample_queue_end = false;
+
 	// create workers
-	// ...
+	dbgmsg(1, "* using %d threads\n", num_threads);
+	pthread_t threads[num_threads];
+	for (int t = 0; t < num_threads; t++)
+	{
+		int rc = pthread_create(&threads[t], NULL, raytrace_worker, (void*)this);
+		if (rc) {
+			dbgmsg(0, "\nE pthread_create unsuccessful, return code was %d\n", rc);
+			exit(1);
+		}
+	}
 
 	sampler->init();
 	int sampnum = 0;
+	cout<<"locking sampler"<<endl;
+	pthread_mutex_lock(&sampler_mutex);
 	while ( (sampnum = sampler->initSampleSet()) > 0 )
 	{
-		Sample *sample, *prev = NULL;
-		while ( (sample = sampler->nextSample(prev)) != NULL )
+		Sample *sample;
+		while ( (sample = sampler->nextSample()) != NULL )
 		{
-			Ray ray = camera->makeRay(sample);
-			//raystack->push(ray);
+			cout<<"unlocking sampler and locking queue"<<endl;
+			pthread_mutex_unlock(&sampler_mutex);
+			pthread_mutex_lock(&sample_queue_mutex);
+			sample_queue.push(sample);
 
-			// in worker:
-			Colour col = raytrace(ray, 0, NULL);
-			sampler->saveSample(sample, col);
+			if (sample_queue.size() > 1000)
+			{
+			    while (sample_queue.size() > 100)
+			    {
+				pthread_cond_signal(&sample_queue_cond);
+				pthread_cond_wait(&worker_ready_cond, &sample_queue_mutex);
+			    }
+			}
 
-			delete prev;
-			prev = sample;
+			cout<<"sending signal and unlocking queue"<<endl;
+			pthread_cond_signal(&sample_queue_cond);
+			pthread_mutex_unlock(&sample_queue_mutex);
+
 			sampnum--;
 			if ((sampnum % 1000) == 0)
 				dbgmsg(2, "\b\b\b\b\b\b\b\b%8d", sampnum);
+
+
+			cout<<"locking sampler"<<endl;
+			pthread_mutex_lock(&sampler_mutex);
 		}
 		dbgmsg(2, "\n");
 	}
+	cout<<"unlocking sampler"<<endl;
+	pthread_mutex_unlock(&sampler_mutex);
 
 	// wait for workers
-	// ...
+	dbgmsg(2, "- waiting for threads to finish\n");
+
+	cout<<"locking queue"<<endl;
+	pthread_mutex_lock(&sample_queue_mutex);
+	sample_queue_end = true;
+	while (!sample_queue.empty())
+	{
+		cout<<"broadcasting signal and unlocking queue"<<endl;
+		pthread_cond_broadcast(&sample_queue_cond);
+		pthread_mutex_unlock(&sample_queue_mutex);
+		cout<<"locking queue"<<endl;
+		pthread_mutex_lock(&sample_queue_mutex);
+	}
+	cout<<"broadcasting signal and unlocking queue"<<endl;
+	pthread_cond_broadcast(&sample_queue_cond);
+	pthread_mutex_unlock(&sample_queue_mutex);
+
+	cout<<"joining threads"<<endl;
+	for (int t = 0; t < num_threads; t++)
+		pthread_join(threads[t], NULL);
+
+	cout<<"done!"<<endl;
 
 #if 0
 
--- a/src/sampler.cc	Fri Mar 28 00:53:20 2008 +0100
+++ b/src/sampler.cc	Fri Mar 28 17:13:21 2008 +0100
@@ -42,6 +42,7 @@
 	{
 		cout << "phase 1" << endl;
 		phase++;
+		sx = -1;
 		return w*h*samples;
 	}
 	else if ( phase == 1 && oversample )
@@ -60,7 +61,7 @@
 	}
 }
 
-Sample* DefaultSampler::nextSample(Sample *prev)
+Sample* DefaultSampler::nextSample()
 {
 	DefaultSample *s = new DefaultSample;
 
@@ -84,58 +85,56 @@
 	const Float *osax = osaSx[oversample];
 	const Float *osay = osaSy[oversample];
 
-	if (!prev)
+	if (sx < 0)
 	{
 		// first sample
 		s->x = -(Float)w/h/2.0;
 		s->y = -0.5;
-		s->sx = 0;
-		s->sy = 0;
-		s->osa_samp = 0;
+		sx = 0;
+		sy = 0;
+		osa_samp = 0;
 	}
 	else
 	{
-		DefaultSample *sp = static_cast<DefaultSample*>(prev);
-
-		s->osa_samp = sp->osa_samp + 1;
+		osa_samp++;
 
-		if (oversample && oversample <= 3 && s->osa_samp < samples)
+		if (oversample && oversample <= 3 && osa_samp < samples)
 		{
-			s->sx = sp->sx;
-			s->sy = sp->sy;
-			s->x = osax[s->osa_samp]/h + (Float)s->sx/h - (Float)w/h/2.0;
-			s->y = osay[s->osa_samp]/h + (Float)s->sy/h - 0.5;
+			s->x = osax[osa_samp]/h + (Float)sx/h - (Float)w/h/2.0;
+			s->y = osay[osa_samp]/h + (Float)sy/h - 0.5;
 		}
 		else
 		{
-			s->sx = sp->sx + 1;
-			s->sy = sp->sy;
-			if (s->sx >= w)
+			sx++;
+			if (sx >= w)
 			{
-				s->sx = 0;
-				s->sy++;
+				sx = 0;
+				sy++;
 			}
-			if (s->sy >= h)
+			if (sy >= h)
 			{
 				delete s;
 				return NULL;
 			}
-			s->x = (Float)s->sx/h - (Float)w/h/2.0;
-			s->y = (Float)s->sy/h - 0.5;
-			s->osa_samp = 0;
+			s->x = (Float)sx/h - (Float)w/h/2.0;
+			s->y = (Float)sy/h - 0.5;
+			osa_samp = 0;
 		}
 	}
 
-	if (s->osa_samp == 0 && oversample && oversample <= 3)
+	if (osa_samp == 0 && oversample && oversample <= 3)
 	{
 		s->x += osax[0]/h;
 		s->y += osay[0]/h;
-		Float *buf = buffer + 3*(s->sy*w + s->sx);
+		Float *buf = buffer + 3*(sy*w + sx);
 		*(buf++) = 0;
 		*(buf++) = 0;
 		*(buf++) = 0;
 	}
 
+	s->sx = sx;
+	s->sy = sy;
+	s->osa_samp = osa_samp;
 	return s;
 }