libtdepim

weaver.cpp
1 /*
2  This file implements the Weaver, Job and Thread classes.
3 
4  $ Author: Mirko Boehm $
5  $ Copyright: (C) 2004, Mirko Boehm $
6  $ Contact: mirko@kde.org
7  http://www.kde.org
8  http://www.hackerbuero.org $
9  $ License: LGPL with the following explicit clarification:
10  This code may be linked against any version of the TQt toolkit
11  from Troll Tech, Norway. $
12 
13 */
14 
15 extern "C" {
16 #include <signal.h>
17 }
18 
19 #include <tqevent.h>
20 #include <tqapplication.h>
21 
22 #include "weaver.h"
23 
24 namespace KPIM {
25 namespace ThreadWeaver {
26 
/** Global switch for the debug output of the thread weaver. */
bool Debug (true);
/** Global verbosity threshold used together with Debug. */
int DebugLevel (2);
29 
30  Job::Job (TQObject* parent, const char* name)
31  : TQObject (parent, name),
32  m_finished (false),
33  m_mutex (new TQMutex (true) ),
34  m_thread (0)
35  {
36  }
37 
39  {
40  }
41 
42  void Job::lock()
43  {
44  m_mutex->lock();
45  }
46 
47  void Job::unlock()
48  {
49  m_mutex->unlock();
50  }
51 
52  void Job::execute(Thread *th)
53  {
54  m_mutex->lock();
55  m_thread = th;
56  m_mutex->unlock();
57 
58  run ();
59 
60  m_mutex->lock();
61  setFinished (true);
62  m_thread = 0;
63  m_mutex->unlock();
64  }
65 
67  {
68  TQMutexLocker l (m_mutex);
69  return m_thread;
70  }
71 
72  bool Job::isFinished() const
73  {
74  TQMutexLocker l (m_mutex);
75  return m_finished;
76  }
77 
78  void Job::setFinished(bool status)
79  {
80  TQMutexLocker l (m_mutex);
81  m_finished = status;
82  }
83 
85  {
86  switch ( e->action() )
87  {
88  case Event::JobStarted:
89  emit ( started() );
90  break;
91  case Event::JobFinished:
92  emit ( done() );
93  break;
94  case Event::JobSPR:
95  emit ( SPR () );
96  m_wc->wakeOne ();
97  break;
98  case Event::JobAPR:
99  emit ( APR () );
100  // no wake here !
101  break;
102  default:
103  break;
104  }
105  }
106 
108  {
109  m_mutex->lock ();
110  m_wc = new TQWaitCondition;
111  m_mutex->unlock ();
112 
113  thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
114  m_wc->wait ();
115 
116  m_mutex->lock ();
117  delete m_wc;
118  m_wc = 0;
119  m_mutex->unlock ();
120  }
121 
123  {
124  m_mutex->lock ();
125  m_wc = new TQWaitCondition;
126  m_mutex->unlock ();
127 
129  m_wc->wait ();
130  }
131 
132  void Job::wakeAPR ()
133  {
134  TQMutexLocker l(m_mutex);
135  if ( m_wc!=0 )
136  {
137  m_wc->wakeOne ();
138  delete m_wc;
139  m_wc = 0;
140  }
141  }
142 
143  const int Event::Type = TQEvent::User + 1000;
144 
145  Event::Event ( Action action, Thread *thread, Job *job)
146  : TQCustomEvent ( type () ),
147  m_action (action),
148  m_thread (thread),
149  m_job (job)
150  {
151  }
152 
153  int Event::type ()
154  {
155  return Type;
156  }
157 
159  {
160  if ( m_thread != 0)
161  {
162  return m_thread;
163  } else {
164  return 0;
165  }
166  }
167 
168  Job* Event::job () const
169  {
170  return m_job;
171  }
172 
174  {
175  return m_action;
176  }
177 
178  unsigned int Thread::sm_Id;
179 
181  : TQThread (),
182  m_parent ( parent ),
183  m_id ( makeId() )
184  {
185  }
186 
188  {
189  }
190 
191  unsigned int Thread::makeId()
192  {
193  static TQMutex mutex;
194  TQMutexLocker l (&mutex);
195 
196  return ++sm_Id;
197  }
198 
199  unsigned int Thread::id() const
200  {
201  return m_id;
202  }
203 
204  void Thread::run()
205  {
206  Job *job = 0;
207 
209 
210  while (true)
211  {
212  debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
213 
214  job = m_parent->applyForWork ( this, job );
215 
216  if (job == 0)
217  {
218  break;
219  } else {
220  post ( Event::JobStarted, job );
221  job->execute (this);
222  post ( Event::JobFinished, job );
223  }
224  }
225 
226  post ( Event::ThreadExiting );
227  }
228 
230  {
231  m_parent->post ( a, this, j);
232  }
233 
234  void Thread::msleep(unsigned long msec)
235  {
236  TQThread::msleep(msec);
237  }
238 
239  Weaver::Weaver(TQObject* parent, const char* name,
240  int inventoryMin, int inventoryMax)
241  : TQObject(parent, name),
242  m_active(0),
243  m_inventoryMin(inventoryMin),
244  m_inventoryMax(inventoryMax),
245  m_shuttingDown(false),
246  m_running (false),
247  m_suspend (false),
248  m_mutex ( new TQMutex(true) )
249  {
250  lock();
251 
252  for ( int count = 0; count < m_inventoryMin; ++count)
253  {
254  Thread *th = new Thread(this);
255  m_inventory.append(th);
256  // this will idle the thread, waiting for a job
257  th->start();
258 
259  emit (threadCreated (th) );
260  }
261 
262  unlock();
263  }
264 
265  Weaver::~Weaver()
266  {
267  lock();
268 
269  debug ( 1, "Weaver dtor: destroying inventory.\n" );
270 
271  m_shuttingDown = true;
272 
273  unlock();
274 
275  m_jobAvailable.wakeAll();
276 
277  // problem: Some threads might not be asleep yet, just finding
278  // out if a job is available. Those threads will suspend
279  // waiting for their next job (a rare case, but not impossible).
280  // Therefore, if we encounter a thread that has not exited, we
281  // have to wake it again (which we do in the following for
282  // loop).
283 
284  for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
285  {
286  if ( !th->finished() )
287  {
288  m_jobAvailable.wakeAll();
289  th->wait();
290  }
291 
292  emit (threadDestroyed (th) );
293  delete th;
294 
295  }
296 
297  m_inventory.clear();
298 
299  delete m_mutex;
300 
301  debug ( 1, "Weaver dtor: done\n" );
302 
303  }
304 
306  {
307  debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
308  ( m_mutex->locked() ? "locked" : "not locked" ) );
309  m_mutex->lock();
310  }
311 
313  {
314  m_mutex->unlock();
315 
316  debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
317  ( m_mutex->locked() ? "locked" : "not locked" ) );
318  }
319 
320  int Weaver::threads () const
321  {
322  TQMutexLocker l (m_mutex);
323  return m_inventory.count ();
324  }
325 
326  void Weaver::enqueue(Job* job)
327  {
328  lock();
329 
330  m_assignments.append(job);
331  m_running = true;
332 
333  unlock();
334 
335  assignJobs();
336  }
337 
338  void Weaver::enqueue (TQPtrList <Job> jobs)
339  {
340  lock();
341 
342  for ( Job * job = jobs.first(); job; job = jobs.next() )
343  {
344  m_assignments.append (job);
345  }
346 
347  unlock();
348 
349  assignJobs();
350  }
351 
352  bool Weaver::dequeue ( Job* job )
353  {
354  TQMutexLocker l (m_mutex);
355  return m_assignments.remove (job);
356  }
357 
359  {
360  TQMutexLocker l (m_mutex);
361  m_assignments.clear();
362  }
363 
364  void Weaver::suspend (bool state)
365  {
366  lock();
367 
368  if (state)
369  {
370  // no need to wake any threads here
371  m_suspend = true;
372  if ( m_active == 0 && isEmpty() )
373  { // instead of waking up threads:
375  }
376  } else {
377  m_suspend = false;
378  // make sure we emit suspended () even if all threads are sleeping:
379  assignJobs ();
380  debug (2, "Weaver::suspend: queueing resumed.\n" );
381  }
382 
383  unlock();
384  }
385 
387  {
388  m_jobAvailable.wakeAll();
389  }
390 
391  bool Weaver::event (TQEvent *e )
392  {
393  if ( e->type() >= TQEvent::User )
394  {
395 
396  if ( e->type() == Event::type() )
397  {
398  Event *event = (Event*) e;
399 
400  switch (event->action() )
401  {
402  case Event::JobFinished:
403  if ( event->job() !=0 )
404  {
405  emit (jobDone (event->job() ) );
406  }
407  break;
408  case Event::Finished:
409  emit ( finished() );
410  break;
411  case Event::Suspended:
412  emit ( suspended() );
413  break;
414  case Event::ThreadSuspended:
415  if (!m_shuttingDown )
416  {
417  emit (threadSuspended ( event->thread() ) );
418  }
419  break;
420  case Event::ThreadBusy:
421  if (!m_shuttingDown )
422  {
423  emit (threadBusy (event->thread() ) );
424  }
425  break;
426  default:
427  break;
428  }
429 
430  if ( event->job() !=0 )
431  {
432  event->job()->processEvent (event);
433  }
434  } else {
435  debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
436  }
437  return true;
438  } else {
439  // others - please make sure we are a TQObject!
440  return TQObject::event ( e );
441  }
442  }
443 
445  {
446  Event *e = new Event ( a, t, j);
447  TQApplication::postEvent (this, e);
448  }
449 
450  bool Weaver::isEmpty() const
451  {
452  TQMutexLocker l (m_mutex);
453  return m_assignments.count()==0;
454  }
455 
457  {
458  Job *rc = 0;
459  bool lastjob = false;
460  bool suspended = false;
461 
462  while (true)
463  {
464  lock();
465 
466  if (previous != 0)
467  { // cleanup and send events:
468  --m_active;
469 
470  debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
471  "%i active jobs left.\n",
472  queueLength(), m_active );
473 
474  if ( m_active == 0 && isEmpty() )
475  {
476  lastjob = true;
477  m_running = false;
478  post (Event::Finished);
479  debug ( 3, "Weaver::applyForWork: last job.\n" );
480  }
481 
482  if (m_active == 0 && m_suspend == true)
483  {
484  suspended = true;
486  debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
487  }
488 
489  m_jobFinished.wakeOne();
490  }
491 
492  previous = 0;
493 
494  if (m_shuttingDown == true)
495  {
496  unlock();
497 
498  return 0;
499  } else {
500  if ( !isEmpty() && m_suspend == false )
501  {
502  rc = m_assignments.getFirst();
503  m_assignments.removeFirst ();
504  ++m_active;
505 
506  debug ( 3, "Weaver::applyForWork: job assigned, "
507  "%i jobs in queue (%i active).\n",
508  m_assignments.count(), m_active );
509  unlock();
510 
511  post (Event::ThreadBusy, th);
512 
513  return rc;
514  } else {
515  unlock();
516 
517  post (Event::ThreadSuspended, th);
518  m_jobAvailable.wait();
519  }
520  }
521  }
522  }
523 
525  {
526  TQMutexLocker l (m_mutex);
527  return m_assignments.count();
528  }
529 
530  bool Weaver::isIdle () const
531  {
532  TQMutexLocker l (m_mutex);
533  return isEmpty() && m_active == 0;
534  }
535 
537  {
538  while ( !isIdle() )
539  {
540  debug (2, "Weaver::finish: not done, waiting.\n" );
541  m_jobFinished.wait();
542  }
543  debug (1, "Weaver::finish: done.\n\n\n" );
544  }
545 
546 }
547 }
548 
549 #include "weaver.moc"
A class to represent the events threads generate and send to the Weaver object.
Definition: weaver.h:100
Thread * thread() const
The thread that sent the event.
Definition: weaver.cpp:158
static int type()
Return the (custom defined) event type.
Definition: weaver.cpp:153
Action action() const
The action.
Definition: weaver.cpp:173
@ Suspended
Thread queueing halted.
Definition: weaver.h:105
@ ThreadStarted
A worker thread has started.
Definition: weaver.h:106
@ JobAPR
Asynchronous Process Request.
Definition: weaver.h:113
Job * job() const
The associated job.
Definition: weaver.cpp:168
A Job is a simple abstraction of an action that is to be executed in a thread context.
Definition: weaver.h:164
void triggerSPR()
Trigger a SPR.
Definition: weaver.cpp:107
virtual void execute(Thread *)
Perform the job.
Definition: weaver.cpp:52
virtual ~Job()
Destructor.
Definition: weaver.cpp:38
void unlock()
Unlock this Job's mutex.
Definition: weaver.cpp:47
Job(TQObject *parent=0, const char *name=0)
Construct a Job object.
Definition: weaver.cpp:30
void SPR()
This signal is emitted when the job needs some operation done by the main thread (usually the creator...
void lock()
Lock this Job's mutex.
Definition: weaver.cpp:42
virtual bool isFinished() const
Returns true if the job's execute method finished.
Definition: weaver.cpp:72
virtual void run()=0
The method that actually performs the job.
virtual void processEvent(Event *)
Process events related to this job (created by the processing thread or the weaver or whoever).
Definition: weaver.cpp:84
void APR()
Perform an Asynchronous Process Request.
void triggerAPR()
Trigger an APR.
Definition: weaver.cpp:122
void started()
This signal is emitted when a thread starts to process a job.
virtual void setFinished(bool status)
Call with status = true to mark this job as done.
Definition: weaver.cpp:78
void wakeAPR()
Wake the thread after an APR has been processed.
Definition: weaver.cpp:132
void done()
This signal is emitted when a job has been finished.
Thread * thread()
Return the thread that executes this job.
Definition: weaver.cpp:66
The class Thread is used to represent the worker threads in the weaver's inventory.
Definition: weaver.h:250
Thread(Weaver *parent)
Create a thread.
Definition: weaver.cpp:180
unsigned int id() const
Returns the thread id.
Definition: weaver.cpp:199
void run()
Overloaded to execute the assigned job.
Definition: weaver.cpp:204
void post(Event::Action, Job *=0)
Post an event, will be received and processed by the Weaver.
Definition: weaver.cpp:229
~Thread()
The destructor.
Definition: weaver.cpp:187
A weaver is the manager of worker threads (Thread objects) to which it assigns jobs from its queue.
Definition: weaver.h:297
void jobDone(Job *)
This signal is emitted when a job is done.
void assignJobs()
Schedule enqueued jobs to be executed by idle threads.
Definition: weaver.cpp:386
virtual void dequeue()
Remove all queued jobs.
Definition: weaver.cpp:358
virtual void enqueue(Job *)
Add a job to be executed.
Definition: weaver.cpp:326
virtual void finish()
Block until the weaver is idle, i.e. until no jobs are queued and none are being processed.
Definition: weaver.cpp:536
bool m_suspend
If m_suspend is true, no new jobs will be assigned to threads.
Definition: weaver.h:442
bool m_running
m_running is set to true when a job is enqueued and set to false when the job finishes that was the l...
Definition: weaver.h:437
int threads() const
Returns the current number of threads in the inventory.
Definition: weaver.cpp:320
virtual void suspend(bool state)
Suspend job execution if state = true, otherwise resume job execution if it was suspended.
Definition: weaver.cpp:364
TQWaitCondition m_jobAvailable
Wait condition all idle or done threads wait for.
Definition: weaver.h:427
void lock()
Lock the mutex for this weaver.
Definition: weaver.cpp:305
void post(Event::Action, Thread *=0, Job *=0)
Post an event that is handled by this object, but in the main (GUI) thread.
Definition: weaver.cpp:444
virtual Job * applyForWork(Thread *thread, Job *previous)
Assign a job to the calling thread.
Definition: weaver.cpp:456
bool event(TQEvent *)
Check incoming events for user defined ones.
Definition: weaver.cpp:391
int m_active
The number of jobs that are assigned to the worker threads, but not finished.
Definition: weaver.h:421
bool isEmpty() const
Is the queue empty?
Definition: weaver.cpp:450
TQWaitCondition m_jobFinished
Wait for a job to finish.
Definition: weaver.h:429
void unlock()
Unlock.
Definition: weaver.cpp:312
bool isIdle() const
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition: weaver.cpp:530
bool m_shuttingDown
Indicates if the weaver is shutting down and exiting its threads.
Definition: weaver.h:432
TQPtrList< Job > m_assignments
The job queue.
Definition: weaver.h:418
void finished()
This signal is emitted when the Weaver has finished ALL currently queued jobs.
int queueLength()
Returns the number of pending jobs.
Definition: weaver.cpp:524
TQPtrList< Thread > m_inventory
The thread inventory.
Definition: weaver.h:416
void suspended()
Thread queueing has been suspended.
TDEPIM classes for drag and drop of mails.