libtdepim

weaver.cpp
1/*
2 This file implements the Weaver, Job and Thread classes.
3
4 $ Author: Mirko Boehm $
5 $ Copyright: (C) 2004, Mirko Boehm $
6 $ Contact: mirko@kde.org
7 http://www.kde.org
8 http://www.hackerbuero.org $
9 $ License: LGPL with the following explicit clarification:
10 This code may be linked against any version of the TQt toolkit
11 from Troll Tech, Norway. $
12
13*/
14
15extern "C" {
16#include <signal.h>
17}
18
19#include <tqevent.h>
20#include <tqapplication.h>
21
22#include "weaver.h"
23
24namespace KPIM {
25namespace ThreadWeaver {
26
// Global debug switches of the ThreadWeaver namespace.
// NOTE(review): presumably consulted by the debug() helper used throughout
// this file (its definition is not visible here) - confirm.
bool Debug = true;
int DebugLevel = 2;
29
30 Job::Job (TQObject* parent, const char* name)
31 : TQObject (parent, name),
32 m_finished (false),
33 m_mutex (new TQMutex (true) ),
34 m_thread (0)
35 {
36 }
37
39 {
40 }
41
42 void Job::lock()
43 {
44 m_mutex->lock();
45 }
46
48 {
49 m_mutex->unlock();
50 }
51
53 {
54 m_mutex->lock();
55 m_thread = th;
56 m_mutex->unlock();
57
58 run ();
59
60 m_mutex->lock();
61 setFinished (true);
62 m_thread = 0;
63 m_mutex->unlock();
64 }
65
67 {
68 TQMutexLocker l (m_mutex);
69 return m_thread;
70 }
71
72 bool Job::isFinished() const
73 {
74 TQMutexLocker l (m_mutex);
75 return m_finished;
76 }
77
78 void Job::setFinished(bool status)
79 {
80 TQMutexLocker l (m_mutex);
81 m_finished = status;
82 }
83
85 {
86 switch ( e->action() )
87 {
88 case Event::JobStarted:
89 emit ( started() );
90 break;
91 case Event::JobFinished:
92 emit ( done() );
93 break;
94 case Event::JobSPR:
95 emit ( SPR () );
96 m_wc->wakeOne ();
97 break;
98 case Event::JobAPR:
99 emit ( APR () );
100 // no wake here !
101 break;
102 default:
103 break;
104 }
105 }
106
108 {
109 m_mutex->lock ();
110 m_wc = new TQWaitCondition;
111 m_mutex->unlock ();
112
113 thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
114 m_wc->wait ();
115
116 m_mutex->lock ();
117 delete m_wc;
118 m_wc = 0;
119 m_mutex->unlock ();
120 }
121
123 {
124 m_mutex->lock ();
125 m_wc = new TQWaitCondition;
126 m_mutex->unlock ();
127
129 m_wc->wait ();
130 }
131
133 {
134 TQMutexLocker l(m_mutex);
135 if ( m_wc!=0 )
136 {
137 m_wc->wakeOne ();
138 delete m_wc;
139 m_wc = 0;
140 }
141 }
142
143 const int Event::Type = TQEvent::User + 1000;
144
145 Event::Event ( Action action, Thread *thread, Job *job)
146 : TQCustomEvent ( type () ),
147 m_action (action),
148 m_thread (thread),
149 m_job (job)
150 {
151 }
152
154 {
155 return Type;
156 }
157
159 {
160 if ( m_thread != 0)
161 {
162 return m_thread;
163 } else {
164 return 0;
165 }
166 }
167
168 Job* Event::job () const
169 {
170 return m_job;
171 }
172
174 {
175 return m_action;
176 }
177
178 unsigned int Thread::sm_Id;
179
181 : TQThread (),
182 m_parent ( parent ),
183 m_id ( makeId() )
184 {
185 }
186
188 {
189 }
190
191 unsigned int Thread::makeId()
192 {
193 static TQMutex mutex;
194 TQMutexLocker l (&mutex);
195
196 return ++sm_Id;
197 }
198
199 unsigned int Thread::id() const
200 {
201 return m_id;
202 }
203
205 {
206 Job *job = 0;
207
209
210 while (true)
211 {
212 debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
213
214 job = m_parent->applyForWork ( this, job );
215
216 if (job == 0)
217 {
218 break;
219 } else {
220 post ( Event::JobStarted, job );
221 job->execute (this);
222 post ( Event::JobFinished, job );
223 }
224 }
225
226 post ( Event::ThreadExiting );
227 }
228
230 {
231 m_parent->post ( a, this, j);
232 }
233
234 void Thread::msleep(unsigned long msec)
235 {
236 TQThread::msleep(msec);
237 }
238
239 Weaver::Weaver(TQObject* parent, const char* name,
240 int inventoryMin, int inventoryMax)
241 : TQObject(parent, name),
242 m_active(0),
243 m_inventoryMin(inventoryMin),
244 m_inventoryMax(inventoryMax),
245 m_shuttingDown(false),
246 m_running (false),
247 m_suspend (false),
248 m_mutex ( new TQMutex(true) )
249 {
250 lock();
251
252 for ( int count = 0; count < m_inventoryMin; ++count)
253 {
254 Thread *th = new Thread(this);
255 m_inventory.append(th);
256 // this will idle the thread, waiting for a job
257 th->start();
258
259 emit (threadCreated (th) );
260 }
261
262 unlock();
263 }
264
265 Weaver::~Weaver()
266 {
267 lock();
268
269 debug ( 1, "Weaver dtor: destroying inventory.\n" );
270
271 m_shuttingDown = true;
272
273 unlock();
274
275 m_jobAvailable.wakeAll();
276
277 // problem: Some threads might not be asleep yet, just finding
278 // out if a job is available. Those threads will suspend
279 // waiting for their next job (a rare case, but not impossible).
280 // Therefore, if we encounter a thread that has not exited, we
281 // have to wake it again (which we do in the following for
282 // loop).
283
284 for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
285 {
286 if ( !th->finished() )
287 {
288 m_jobAvailable.wakeAll();
289 th->wait();
290 }
291
292 emit (threadDestroyed (th) );
293 delete th;
294
295 }
296
297 m_inventory.clear();
298
299 delete m_mutex;
300
301 debug ( 1, "Weaver dtor: done\n" );
302
303 }
304
306 {
307 debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
308 ( m_mutex->locked() ? "locked" : "not locked" ) );
309 m_mutex->lock();
310 }
311
313 {
314 m_mutex->unlock();
315
316 debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
317 ( m_mutex->locked() ? "locked" : "not locked" ) );
318 }
319
320 int Weaver::threads () const
321 {
322 TQMutexLocker l (m_mutex);
323 return m_inventory.count ();
324 }
325
327 {
328 lock();
329
330 m_assignments.append(job);
331 m_running = true;
332
333 unlock();
334
335 assignJobs();
336 }
337
338 void Weaver::enqueue (TQPtrList <Job> jobs)
339 {
340 lock();
341
342 for ( Job * job = jobs.first(); job; job = jobs.next() )
343 {
344 m_assignments.append (job);
345 }
346
347 unlock();
348
349 assignJobs();
350 }
351
352 bool Weaver::dequeue ( Job* job )
353 {
354 TQMutexLocker l (m_mutex);
355 return m_assignments.remove (job);
356 }
357
359 {
360 TQMutexLocker l (m_mutex);
361 m_assignments.clear();
362 }
363
364 void Weaver::suspend (bool state)
365 {
366 lock();
367
368 if (state)
369 {
370 // no need to wake any threads here
371 m_suspend = true;
372 if ( m_active == 0 && isEmpty() )
373 { // instead of waking up threads:
375 }
376 } else {
377 m_suspend = false;
378 // make sure we emit suspended () even if all threads are sleeping:
379 assignJobs ();
380 debug (2, "Weaver::suspend: queueing resumed.\n" );
381 }
382
383 unlock();
384 }
385
387 {
388 m_jobAvailable.wakeAll();
389 }
390
391 bool Weaver::event (TQEvent *e )
392 {
393 if ( e->type() >= TQEvent::User )
394 {
395
396 if ( e->type() == Event::type() )
397 {
398 Event *event = (Event*) e;
399
400 switch (event->action() )
401 {
402 case Event::JobFinished:
403 if ( event->job() !=0 )
404 {
405 emit (jobDone (event->job() ) );
406 }
407 break;
408 case Event::Finished:
409 emit ( finished() );
410 break;
411 case Event::Suspended:
412 emit ( suspended() );
413 break;
414 case Event::ThreadSuspended:
415 if (!m_shuttingDown )
416 {
417 emit (threadSuspended ( event->thread() ) );
418 }
419 break;
420 case Event::ThreadBusy:
421 if (!m_shuttingDown )
422 {
423 emit (threadBusy (event->thread() ) );
424 }
425 break;
426 default:
427 break;
428 }
429
430 if ( event->job() !=0 )
431 {
432 event->job()->processEvent (event);
433 }
434 } else {
435 debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
436 }
437 return true;
438 } else {
439 // others - please make sure we are a TQObject!
440 return TQObject::event ( e );
441 }
442 }
443
445 {
446 Event *e = new Event ( a, t, j);
447 TQApplication::postEvent (this, e);
448 }
449
450 bool Weaver::isEmpty() const
451 {
452 TQMutexLocker l (m_mutex);
453 return m_assignments.count()==0;
454 }
455
457 {
458 Job *rc = 0;
459 bool lastjob = false;
460 bool suspended = false;
461
462 while (true)
463 {
464 lock();
465
466 if (previous != 0)
467 { // cleanup and send events:
468 --m_active;
469
470 debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
471 "%i active jobs left.\n",
473
474 if ( m_active == 0 && isEmpty() )
475 {
476 lastjob = true;
477 m_running = false;
478 post (Event::Finished);
479 debug ( 3, "Weaver::applyForWork: last job.\n" );
480 }
481
482 if (m_active == 0 && m_suspend == true)
483 {
484 suspended = true;
486 debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
487 }
488
489 m_jobFinished.wakeOne();
490 }
491
492 previous = 0;
493
494 if (m_shuttingDown == true)
495 {
496 unlock();
497
498 return 0;
499 } else {
500 if ( !isEmpty() && m_suspend == false )
501 {
502 rc = m_assignments.getFirst();
503 m_assignments.removeFirst ();
504 ++m_active;
505
506 debug ( 3, "Weaver::applyForWork: job assigned, "
507 "%i jobs in queue (%i active).\n",
508 m_assignments.count(), m_active );
509 unlock();
510
511 post (Event::ThreadBusy, th);
512
513 return rc;
514 } else {
515 unlock();
516
517 post (Event::ThreadSuspended, th);
518 m_jobAvailable.wait();
519 }
520 }
521 }
522 }
523
525 {
526 TQMutexLocker l (m_mutex);
527 return m_assignments.count();
528 }
529
530 bool Weaver::isIdle () const
531 {
532 TQMutexLocker l (m_mutex);
533 return isEmpty() && m_active == 0;
534 }
535
537 {
538 while ( !isIdle() )
539 {
540 debug (2, "Weaver::finish: not done, waiting.\n" );
541 m_jobFinished.wait();
542 }
543 debug (1, "Weaver::finish: done.\n\n\n" );
544 }
545
546}
547}
548
549#include "weaver.moc"
A class to represent the events threads generate and send to the Weaver object.
Definition: weaver.h:100
Thread * thread() const
The ID of the sender thread.
Definition: weaver.cpp:158
static int type()
Return the (custom defined) event type.
Definition: weaver.cpp:153
Action action() const
The action.
Definition: weaver.cpp:173
@ Suspended
Thread queueing halted.
Definition: weaver.h:105
@ ThreadStarted
A thread has been started.
Definition: weaver.h:106
@ JobAPR
Asynchronous Process Request.
Definition: weaver.h:113
Job * job() const
The associated job.
Definition: weaver.cpp:168
A Job is a simple abstraction of an action that is to be executed in a thread context.
Definition: weaver.h:164
void triggerSPR()
Trigger a SPR.
Definition: weaver.cpp:107
virtual void execute(Thread *)
Perform the job.
Definition: weaver.cpp:52
virtual ~Job()
Destructor.
Definition: weaver.cpp:38
void unlock()
Unlock this Job's mutex.
Definition: weaver.cpp:47
Job(TQObject *parent=0, const char *name=0)
Construct a Job object.
Definition: weaver.cpp:30
void SPR()
This signal is emitted when the job needs some operation done by the main thread (usually the creator...
void lock()
Lock this Job's mutex.
Definition: weaver.cpp:42
virtual bool isFinished() const
Returns true if the job's execute method finished.
Definition: weaver.cpp:72
virtual void run()=0
The method that actually performs the job.
virtual void processEvent(Event *)
Process events related to this job (created by the processing thread or the weaver or whoever).
Definition: weaver.cpp:84
void APR()
Perform an Asynchronous Process Request.
void triggerAPR()
Trigger an APR.
Definition: weaver.cpp:122
void started()
This signal is emitted when a thread starts to process a job.
virtual void setFinished(bool status)
Call with status = true to mark this job as done.
Definition: weaver.cpp:78
void wakeAPR()
Wake the thread after an APR has been processed.
Definition: weaver.cpp:132
void done()
This signal is emitted when a job has been finished.
Thread * thread()
Return the thread that executes this job.
Definition: weaver.cpp:66
The class Thread is used to represent the worker threads in the weaver's inventory.
Definition: weaver.h:250
Thread(Weaver *parent)
Create a thread.
Definition: weaver.cpp:180
unsigned int id() const
Returns the thread id.
Definition: weaver.cpp:199
void run()
Overloaded to execute the assigned job.
Definition: weaver.cpp:204
void post(Event::Action, Job *=0)
Post an event, will be received and processed by the Weaver.
Definition: weaver.cpp:229
~Thread()
The destructor.
Definition: weaver.cpp:187
A weaver is the manager of worker threads (Thread objects) to which it assigns jobs from its queue.
Definition: weaver.h:297
void jobDone(Job *)
This signal is emitted when a job is done.
void assignJobs()
Schedule enqueued jobs to be executed by idle threads.
Definition: weaver.cpp:386
virtual void dequeue()
Remove all queued jobs.
Definition: weaver.cpp:358
virtual void enqueue(Job *)
Add a job to be executed.
Definition: weaver.cpp:326
virtual void finish()
Get notified when a thread has finished a job.
Definition: weaver.cpp:536
bool m_suspend
If m_suspend is true, no new jobs will be assigned to threads.
Definition: weaver.h:442
bool m_running
m_running is set to true when a job is enqueued and set to false when the job finishes that was the l...
Definition: weaver.h:437
int threads() const
Returns the current number of threads in the inventory.
Definition: weaver.cpp:320
virtual void suspend(bool state)
Suspend job execution if state = true, otherwise resume job execution if it was suspended.
Definition: weaver.cpp:364
TQWaitCondition m_jobAvailable
Wait condition all idle or done threads wait for.
Definition: weaver.h:427
void lock()
Lock the mutex for this weaver.
Definition: weaver.cpp:305
void post(Event::Action, Thread *=0, Job *=0)
Post an event that is handled by this object, but in the main (GUI) thread.
Definition: weaver.cpp:444
virtual Job * applyForWork(Thread *thread, Job *previous)
Assign a job to the calling thread.
Definition: weaver.cpp:456
bool event(TQEvent *)
Check incoming events for user defined ones.
Definition: weaver.cpp:391
int m_active
The number of jobs that are assigned to the worker threads, but not finished.
Definition: weaver.h:421
bool isEmpty() const
Is the queue empty?
Definition: weaver.cpp:450
TQWaitCondition m_jobFinished
Wait for a job to finish.
Definition: weaver.h:429
void unlock()
Unlock.
Definition: weaver.cpp:312
bool isIdle() const
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition: weaver.cpp:530
bool m_shuttingDown
Indicates if the weaver is shutting down and exiting its threads.
Definition: weaver.h:432
TQPtrList< Job > m_assignments
The job queue.
Definition: weaver.h:418
void finished()
This signal is emitted when the Weaver has finished ALL currently queued jobs.
int queueLength()
Returns the number of pending jobs.
Definition: weaver.cpp:524
TQPtrList< Thread > m_inventory
The thread inventory.
Definition: weaver.h:416
void suspended()
Thread queueing has been suspended.
TDEPIM classes for drag and drop of mails.