20 #include "tdeio/sessiondata.h"
21 #include "tdeio/slaveconfig.h"
22 #include "tdeio/scheduler.h"
23 #include "tdeio/authinfo.h"
24 #include "tdeio/slave.h"
25 #include <tqptrlist.h>
28 #include <dcopclient.h>
31 #include <tdeglobal.h>
32 #include <tdeprotocolmanager.h>
33 #include <kprotocolinfo.h>
35 #include <kstaticdeleter.h>
36 #include <tdesu/client.h>
41 #define MAX_SLAVE_IDLE (3*60)
43 using namespace TDEIO;
45 template class TQDict<TDEIO::Scheduler::ProtocolInfo>;
49 class TDEIO::SlaveList:
public TQPtrList<Slave>
70 class TDEIO::Scheduler::JobData
73 JobData() : checkOnHold(false) { }
81 class TDEIO::Scheduler::ExtraJobData:
public TQPtrDict<TDEIO::Scheduler::JobData>
84 ExtraJobData() { setAutoDelete(
true); }
87 class TDEIO::Scheduler::ProtocolInfo
90 ProtocolInfo() : maxSlaves(1), skipCount(0)
92 joblist.setAutoDelete(
false);
95 TQPtrList<SimpleJob> joblist;
96 SlaveList activeSlaves;
102 class TDEIO::Scheduler::ProtocolInfoDict :
public TQDict<TDEIO::Scheduler::ProtocolInfo>
105 ProtocolInfoDict() { }
107 TDEIO::Scheduler::ProtocolInfo *
get(
const TQString &protocol);
110 TDEIO::Scheduler::ProtocolInfo *
111 TDEIO::Scheduler::ProtocolInfoDict::get(
const TQString &protocol)
113 ProtocolInfo *info = find(protocol);
116 info =
new ProtocolInfo;
117 info->protocol = protocol;
120 insert(protocol, info);
126 Scheduler::Scheduler()
127 : DCOPObject(
"TDEIO::Scheduler" ),
128 TQObject(kapp,
"scheduler"),
129 slaveTimer(0,
"Scheduler::slaveTimer"),
130 coSlaveTimer(0,
"Scheduler::coSlaveTimer"),
131 cleanupTimer(0,
"Scheduler::cleanupTimer")
135 protInfoDict =
new ProtocolInfoDict;
136 slaveList =
new SlaveList;
137 idleSlaves =
new SlaveList;
138 coIdleSlaves =
new SlaveList;
139 extraJobData =
new ExtraJobData;
140 sessionData =
new SessionData;
141 slaveConfig = SlaveConfig::self();
142 connect(&slaveTimer, TQ_SIGNAL(timeout()), TQ_SLOT(startStep()));
143 connect(&coSlaveTimer, TQ_SIGNAL(timeout()), TQ_SLOT(slotScheduleCoSlave()));
144 connect(&cleanupTimer, TQ_SIGNAL(timeout()), TQ_SLOT(slotCleanIdleSlaves()));
148 Scheduler::~Scheduler()
150 protInfoDict->setAutoDelete(
true);
151 delete protInfoDict; protInfoDict = 0;
152 delete idleSlaves; idleSlaves = 0;
153 delete coIdleSlaves; coIdleSlaves = 0;
154 slaveList->setAutoDelete(
true);
155 delete slaveList; slaveList = 0;
156 delete extraJobData; extraJobData = 0;
157 delete sessionData; sessionData = 0;
162 Scheduler::debug_info()
166 bool Scheduler::process(
const TQCString &fun,
const TQByteArray &data, TQCString &replyType, TQByteArray &replyData )
168 if ( fun !=
"reparseSlaveConfiguration(TQString)" )
169 return DCOPObject::process( fun, data, replyType, replyData );
171 slaveConfig = SlaveConfig::self();
173 TQDataStream stream( data, IO_ReadOnly );
177 kdDebug( 7006 ) <<
"reparseConfiguration( " << proto <<
" )" << endl;
179 slaveConfig->
reset();
180 sessionData->reset();
183 Slave *slave = slaveList->first();
184 for (; slave; slave = slaveList->next() )
187 slave->
send( CMD_REPARSECONFIGURATION );
193 QCStringList Scheduler::functions()
195 QCStringList funcs = DCOPObject::functions();
196 funcs <<
"void reparseSlaveConfiguration(TQString)";
201 JobData *jobData =
new JobData;
204 if (job->command() == CMD_GET)
206 jobData->checkOnHold = checkOnHold;
209 extraJobData->replace(job, jobData);
211 slaveTimer.start(0,
true);
213 if (newJobs.count() > 150)
214 kdDebug() <<
"WARNING - TDEIO::Scheduler got more than 150 jobs! This shows a misuse in your app (yes, a job is a TQObject)." << endl;
218 void Scheduler::_scheduleJob(
SimpleJob *job) {
219 newJobs.removeRef(job);
220 JobData *jobData = extraJobData->find(job);
223 kdFatal(7006) <<
"BUG! _ScheduleJob(): No extraJobData for job!" << endl;
226 TQString protocol = jobData->protocol;
228 ProtocolInfo *protInfo = protInfoDict->get(protocol);
229 protInfo->joblist.append(job);
231 slaveTimer.start(0,
true);
234 void Scheduler::_cancelJob(
SimpleJob *job) {
236 Slave *slave = job->slave();
240 JobData *jobData = extraJobData->find(job);
244 newJobs.removeRef(job);
245 ProtocolInfo *protInfo = protInfoDict->get(jobData->protocol);
246 protInfo->joblist.removeRef(job);
249 slave = slaveList->first();
250 for(; slave; slave = slaveList->next())
252 JobList *list = coSlaves.find(slave);
253 if (list && list->removeRef(job))
259 extraJobData->remove(job);
263 kdDebug(7006) <<
"Scheduler: killing slave " << slave->slave_pid() << endl;
265 _jobFinished( job, slave );
266 slotSlaveDied( slave);
269 void Scheduler::startStep()
271 while(newJobs.count())
273 (void) startJobDirect();
275 TQDictIterator<TDEIO::Scheduler::ProtocolInfo> it(*protInfoDict);
278 if (startJobScheduled(it.current()))
return;
283 void Scheduler::setupSlave(
TDEIO::Slave *slave,
const KURL &url,
const TQString &protocol,
const TQString &proxy ,
bool newSlave,
const TDEIO::MetaData *config)
285 TQString host = url.host();
286 int port = url.port();
287 TQString user = url.user();
288 TQString passwd = url.pass();
291 (slave->
host() != host) ||
292 (slave->
port() != port) ||
293 (slave->
user() != user) ||
294 (slave->
passwd() != passwd))
296 slaveConfig = SlaveConfig::self();
299 sessionData->configDataFor( configData, protocol, host );
301 configData[
"UseProxy"] = proxy;
303 TQString autoLogin = configData[
"EnableAutoLogin"].lower();
304 if ( autoLogin ==
"true" )
308 bool usern = (protocol ==
"ftp");
311 configData[
"autoLoginUser"] = l.login;
312 configData[
"autoLoginPass"] = l.password;
316 TQMap<TQString, TQStringList>::ConstIterator it = l.macdef.begin();
317 for ( ; it != l.macdef.end(); ++it )
318 macdef += it.key() +
'\\' + it.data().join(
"\\" ) +
'\n';
319 configData[
"autoLoginMacro"] = macdef;
324 configData += *config;
326 slave->setProtocol(url.protocol());
327 slave->
setHost(host, port, user, passwd);
331 bool Scheduler::startJobScheduled(ProtocolInfo *protInfo)
333 if (protInfo->joblist.isEmpty())
338 bool newSlave =
false;
343 if (protInfo->skipCount > 2)
348 protInfo->skipCount = 0;
349 job = protInfo->joblist.at(0);
350 slave = findIdleSlave(protInfo, job, dummy );
356 Slave *firstSlave = 0;
357 for(uint i = 0; (i < protInfo->joblist.count()) && (i < 10); i++)
359 job = protInfo->joblist.at(i);
360 slave = findIdleSlave(protInfo, job, exact);
376 protInfo->skipCount = 0;
378 protInfo->skipCount++;
383 if ( protInfo->maxSlaves >
static_cast<int>(protInfo->activeSlaves.count()) )
386 slave = createSlave(protInfo, job, job->
url());
388 slaveTimer.start(0,
true);
399 protInfo->activeSlaves.append(slave);
400 idleSlaves->removeRef(slave);
401 protInfo->joblist.removeRef(job);
405 JobData *jobData = extraJobData->find(job);
406 setupSlave(slave, job->
url(), jobData->protocol, jobData->proxy, newSlave);
409 slaveTimer.start(0,
true);
413 bool Scheduler::startJobDirect()
417 JobData *jobData = extraJobData->find(job);
420 kdFatal(7006) <<
"BUG! startjobDirect(): No extraJobData for job!"
424 TQString protocol = jobData->protocol;
425 ProtocolInfo *protInfo = protInfoDict->get(protocol);
427 bool newSlave =
false;
431 Slave *slave = findIdleSlave(protInfo, job, dummy);
436 slave = createSlave(protInfo, job, job->
url());
442 idleSlaves->removeRef(slave);
445 setupSlave(slave, job->
url(), protocol, jobData->proxy, newSlave);
450 static Slave *searchIdleList(SlaveList *idleSlaves,
const KURL &url,
const TQString &protocol,
bool &exact)
452 TQString host = url.
host();
453 int port = url.port();
454 TQString user = url.user();
457 for(
Slave *slave = idleSlaves->first();
459 slave = idleSlaves->next())
462 (host == slave->
host()) &&
463 (port == slave->
port()) &&
464 (user == slave->
user()))
471 for(
Slave *slave = idleSlaves->first();
473 slave = idleSlaves->next())
481 Slave *Scheduler::findIdleSlave(ProtocolInfo *,
SimpleJob *job,
bool &exact)
484 JobData *jobData = extraJobData->find(job);
487 kdFatal(7006) <<
"BUG! findIdleSlave(): No extraJobData for job!" << endl;
490 if (jobData->checkOnHold)
492 slave = Slave::holdSlave(jobData->protocol, job->
url());
499 bool bCanReuse = (job->command() == CMD_GET);
503 bCanReuse = (job->command() == CMD_GET || job->command() == CMD_SPECIAL);
507 TQString resume = (!outgoing.contains(
"resume")) ? TQString() : outgoing[
"resume"];
508 kdDebug(7006) <<
"Resume metadata is '" << resume <<
"'" << endl;
509 bCanReuse = (resume.isEmpty() || resume ==
"0");
515 if (job->
url() == urlOnHold)
517 kdDebug(7006) <<
"HOLD: Reusing held slave for " << urlOnHold.prettyURL() << endl;
522 kdDebug(7006) <<
"HOLD: Discarding held slave (" << urlOnHold.prettyURL() <<
")" << endl;
532 return searchIdleList(idleSlaves, job->
url(), jobData->protocol, exact);
535 Slave *Scheduler::createSlave(ProtocolInfo *protInfo,
SimpleJob *job,
const KURL &url)
542 slaveList->append(slave);
543 idleSlaves->append(slave);
546 connect(slave, TQ_SIGNAL(slaveStatus(pid_t,
const TQCString &,
const TQString &,
bool)),
547 TQ_SLOT(slotSlaveStatus(pid_t,
const TQCString &,
const TQString &,
bool)));
549 connect(slave,TQ_SIGNAL(authorizationKey(
const TQCString&,
const TQCString&,
bool)),
550 sessionData,TQ_SLOT(slotAuthData(
const TQCString&,
const TQCString&,
bool)));
551 connect(slave,TQ_SIGNAL(delAuthorization(
const TQCString&)), sessionData,
552 TQ_SLOT(slotDelAuthData(
const TQCString&)));
556 kdError() <<
": couldn't create slave : " << errortext << endl;
559 protInfo->joblist.removeRef(job);
560 extraJobData->remove(job);
561 job->slotError( error, errortext );
567 void Scheduler::slotSlaveStatus(pid_t,
const TQCString &,
const TQString &,
bool)
573 JobData *jobData = extraJobData->take(job);
576 kdFatal(7006) <<
"BUG! _jobFinished(): No extraJobData for job!" << endl;
579 ProtocolInfo *protInfo = protInfoDict->get(jobData->protocol);
581 slave->disconnect(job);
582 protInfo->activeSlaves.removeRef(slave);
585 JobList *list = coSlaves.find(slave);
588 assert(slave->isConnected());
589 assert(!coIdleSlaves->contains(slave));
590 coIdleSlaves->append(slave);
591 if (!list->isEmpty())
592 coSlaveTimer.start(0,
true);
597 assert(!slave->isConnected());
598 idleSlaves->append(slave);
604 if (protInfo->joblist.count())
606 slaveTimer.start(0,
true);
613 ProtocolInfo *protInfo = protInfoDict->get(slave->
slaveProtocol());
614 protInfo->activeSlaves.removeRef(slave);
615 if (slave == slaveOnHold)
620 idleSlaves->removeRef(slave);
621 JobList *list = coSlaves.find(slave);
625 disconnectSlave(slave);
628 if (!slaveList->removeRef(slave))
629 kdDebug(7006) <<
"Scheduler: BUG!! Slave " << slave <<
"/" << slave->slave_pid() <<
" died, but is NOT in slaveList!!!\n" << endl;
634 void Scheduler::slotCleanIdleSlaves()
636 for(
Slave *slave = idleSlaves->first();slave;)
638 if (slave->
idleTime() >= MAX_SLAVE_IDLE)
641 Slave *removeSlave = slave;
642 slave = idleSlaves->next();
643 idleSlaves->removeRef(removeSlave);
644 slaveList->removeRef(removeSlave);
646 removeSlave->deref();
650 slave = idleSlaves->next();
656 void Scheduler::_scheduleCleanup()
658 if (idleSlaves->count())
660 if (!cleanupTimer.isActive())
661 cleanupTimer.start( MAX_SLAVE_IDLE*1000,
true );
667 Slave *slave = job->slave();
668 slave->disconnect(job);
679 void Scheduler::_publishSlaveOnHold()
684 slaveOnHold->
hold(urlOnHold);
687 void Scheduler::_removeSlaveOnHold()
698 Scheduler::_getConnectedSlave(
const KURL &url,
const TDEIO::MetaData &config )
703 Slave *slave = searchIdleList(idleSlaves, url, protocol, dummy);
706 ProtocolInfo *protInfo = protInfoDict->get(protocol);
707 slave = createSlave(protInfo, 0, url);
711 idleSlaves->removeRef(slave);
713 setupSlave(slave, url, protocol, proxy,
true, &config);
715 slave->
send( CMD_CONNECT );
716 connect(slave, TQ_SIGNAL(connected()),
717 TQ_SLOT(slotSlaveConnected()));
718 connect(slave, TQ_SIGNAL(error(
int,
const TQString &)),
719 TQ_SLOT(slotSlaveError(
int,
const TQString &)));
721 coSlaves.insert(slave,
new TQPtrList<SimpleJob>());
727 Scheduler::slotScheduleCoSlave()
730 slaveConfig = SlaveConfig::self();
731 for(
Slave *slave = coIdleSlaves->first();
735 nextSlave = coIdleSlaves->next();
736 JobList *list = coSlaves.find(slave);
738 if (list && !list->isEmpty())
741 coIdleSlaves->removeRef(slave);
744 assert(!coIdleSlaves->contains(slave));
746 KURL url =job->
url();
747 TQString host = url.host();
748 int port = url.port();
750 if (slave->
host() ==
"<reset>")
752 TQString user = url.user();
753 TQString passwd = url.pass();
757 slave->setProtocol(url.protocol());
758 slave->
setHost(host, port, user, passwd);
761 assert(slave->
protocol() == url.protocol());
762 assert(slave->
host() == host);
763 assert(slave->
port() == port);
770 Scheduler::slotSlaveConnected()
774 slave->setConnected(
true);
775 disconnect(slave, TQ_SIGNAL(connected()),
776 this, TQ_SLOT(slotSlaveConnected()));
777 emit slaveConnected(slave);
778 assert(!coIdleSlaves->contains(slave));
779 coIdleSlaves->append(slave);
780 coSlaveTimer.start(0,
true);
784 Scheduler::slotSlaveError(
int errorNr,
const TQString &errorMsg)
787 if (!slave->isConnected() || (coIdleSlaves->find(slave) != -1))
790 emit slaveError(slave, errorNr, errorMsg);
801 (!newJobs.removeRef(job)))
803 kdDebug(7006) <<
"_assignJobToSlave(): ERROR, nonmatching or unknown job." << endl;
808 JobList *list = coSlaves.find(slave);
812 kdDebug(7006) <<
"_assignJobToSlave(): ERROR, unknown slave." << endl;
817 assert(list->contains(job) == 0);
819 coSlaveTimer.start(0,
true);
828 JobList *list = coSlaves.take(slave);
833 while(!list->isEmpty())
835 Job *job = list->take(0);
839 coIdleSlaves->removeRef(slave);
840 assert(!coIdleSlaves->contains(slave));
841 disconnect(slave, TQ_SIGNAL(connected()),
842 this, TQ_SLOT(slotSlaveConnected()));
843 disconnect(slave, TQ_SIGNAL(error(
int,
const TQString &)),
844 this, TQ_SLOT(slotSlaveError(
int,
const TQString &)));
847 idleSlaves->append(slave);
848 slave->
send( CMD_DISCONNECT );
850 slave->setConnected(
false);
857 Scheduler::_checkSlaveOnHold(
bool b)
863 Scheduler::_registerWindow(TQWidget *wid)
869 if (!m_windowList.contains(obj))
874 WId windowId = wid->winId();
875 m_windowList.insert(obj, windowId);
876 connect(wid, TQ_SIGNAL(destroyed(TQObject *)),
879 TQDataStream stream(params, IO_WriteOnly);
881 if( !kapp->dcopClient()->send(
"kded",
"kded",
882 "registerWindowId(long int)", params ) )
883 kdDebug(7006) <<
"Could not register window with kded!" << endl;
893 TQMap<TQObject *, WId>::Iterator it = m_windowList.find(obj);
894 if (it == m_windowList.end())
896 WId windowId = it.data();
897 disconnect( it.key(), TQ_SIGNAL(destroyed(TQObject *)),
899 m_windowList.remove( it );
903 TQDataStream stream(params, IO_WriteOnly);
905 kapp->dcopClient()->send(
"kded",
"kded",
906 "unregisterWindowId(long int)", params );
917 void Scheduler::virtual_hook(
int id,
void* data )
918 { DCOPObject::virtual_hook(
id, data ); }
922 #include "scheduler.moc"
static int maxSlaves(const TQString &protocol)
Returns the soft limit on the number of slaves for this protocol.
static TQString slaveProtocol(const KURL &url, TQString &proxy)
Return the protocol to use in order to handle the given url It's usually the same,...
static void reparseConfiguration()
Force a reload of the general config file of io-slaves ( tdeioslaverc).
void close()
Closes the connection.
The base class for all jobs.
virtual void kill(bool quietly=true)
Abort this job.
bool lookup(const KURL &url, AutoLogin &login, bool userealnetrc=false, TQString type=TQString::null, int mode=(exactOnly|defaultOnly))
Looks up the login information for the given url.
void reload()
Reloads the auto login information.
static NetRC * self()
A reference to the instance of the class.
The TDEIO::Scheduler manages io-slaves for the application.
static bool connect(const char *signal, const TQObject *receiver, const char *member)
Function to connect signals emitted by the scheduler.
void slotUnregisterWindow(TQObject *)
A simple job (one url and one command).
const KURL & url() const
Returns the SimpleJob's URL.
virtual void kill(bool quietly=true)
Abort job.
MetaData configData(const TQString &protocol, const TQString &host)
Query slave configuration for slaves of type protocol when dealing with host.
void reset()
Undo any changes made by calls to setConfigData.
Attention developers: If you change the implementation of TDEIO::Slave, do not use connection() or sl...
static Slave * createSlave(const TQString &protocol, const KURL &url, int &error, TQString &error_text)
Creates a new slave.
TQString slaveProtocol()
The actual protocol used to handle the request.
void suspend()
Suspends the operation of the attached tdeioslave.
void kill()
Force termination.
void setIdle()
Marks this slave as idle.
TQString protocol()
The protocol this slave handles.
void setHost(const TQString &host, int port, const TQString &user, const TQString &passwd)
Set host for url.
void hold(const KURL &url)
Puts the tdeioslave associated with url at halt.
void send(int cmd, const TQByteArray &data=TQByteArray())
Sends the given command to the tdeioslave.
TDE_DEPRECATED Connection * connection()
void resetHost()
Clear host info.
void setConfig(const MetaData &config)
Configure slave.
The transfer job pumps data into and/or out of a Slave.
A namespace for TDEIO globals.
TDEIO_EXPORT TransferJob * get(const KURL &url, bool reload=false, bool showProgressInfo=true)
Get (a.k.a.
Contains auto login information.