kdecore Library API Documentation

kresolvermanager.cpp

00001 /*  -*- C++ -*-
00002  *  Copyright (C) 2003 Thiago Macieira <thiago.macieira@kdemail.net>
00003  *
00004  *
00005  *  Permission is hereby granted, free of charge, to any person obtaining
00006  *  a copy of this software and associated documentation files (the
00007  *  "Software"), to deal in the Software without restriction, including
00008  *  without limitation the rights to use, copy, modify, merge, publish,
00009  *  distribute, sublicense, and/or sell copies of the Software, and to
00010  *  permit persons to whom the Software is furnished to do so, subject to
00011  *  the following conditions:
00012  *
00013  *  The above copyright notice and this permission notice shall be included 
00014  *  in all copies or substantial portions of the Software.
00015  *
00016  *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
00017  *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
00018  *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
00019  *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
00020  *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
00021  *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
00022  *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00023  */
00024 
00025 #include "config.h"
00026 
00027 #include <sys/types.h>
00028 #include <netinet/in.h>
00029 #include <limits.h>
00030 #include <unistd.h>     // only needed for pid_t
00031 
00032 #ifdef HAVE_RES_INIT
00033 # include <sys/stat.h>
00034 # include <resolv.h>
00035 #endif
00036 
00037 #include <qapplication.h>
00038 #include <qstring.h>
00039 #include <qcstring.h>
00040 #include <qptrlist.h>
00041 #include <qtimer.h>
00042 #include <qmutex.h>
00043 #include <qthread.h>
00044 #include <qwaitcondition.h>
00045 #include <qsemaphore.h>
00046 
00047 #include "kresolver.h"
00048 #include "kresolver_p.h"
00049 #include "kresolverworkerbase.h"
00050 #include "kresolverstandardworkers_p.h"
00051 
00052 using namespace KNetwork;
00053 using namespace KNetwork::Internal;
00054 
00055 /*
00056  * Explanation on how the resolver system works
00057 
00058    When KResolver::start is called, it calls KResolverManager::enqueue to add
00059    an entry to the queue. KResolverManager::enqueue will verify the availability
00060    of a worker thread: if one is available, it will dispatch the request to it.
00061    If no threads are available, it will then decide whether to launch a thread
00062    or to queue for the future.
00063 
00064    (This process is achieved by always queueing the new request, starting a
00065    new thread if necessary and then notifying of the availability of data
00066    to all worker threads).
00067 
00068  * Worker thread
00069    A new thread, when started, will enter its event loop
00070    immediately. That is, it'll first try to acquire new data to
00071    process, which means it will lock and unlock the manager mutex in
00072    the process.
00073 
00074    If it finds no new data, it'll wait on the feedWorkers condition
00075    for a certain maximum time. If that time expires and there's still
00076    no data, the thread will exit, in order to save system resources.
00077 
00078    If it finds data, however, it'll set up and call the worker class
00079    that has been selected by the manager. Once that worker is done,
00080    the thread releases the data through KResolverManager::releaseData.
00081 
00082  * Data requesting/releasing
00083    A worker thread always calls upon functions on the resolver manager
00084    in order to acquire and release data.
00085 
00086    When data is being requested, the KResolverManager::requestData
00087    function will look through the newRequests list and return the first
00088    unfinished request it finds, marking it InProgress and moving it to currentRequests.
00089 
00090    When the worker class has returned, the worker thread will release
00091    that data through the KResolverManager::releaseData function. If the
00092    worker class has requested no further data (nRequests == 0), the
00093    request's status is marked to be Done. It'll then look at the
00094    requestor for that data: if it was requested by another worker,
00095    it'll decrement the requests count for that one and add the results
00096    to a list. And, finally, if the requests count for the requestor
00097    becomes 0, it'll repeat this process for the requestor as well
00098    (change status to Done, check for a requestor).
00099  */
00100 
00101 namespace
00102 {
00103 
00104 /*
00105  * This class is used to control the access to the
00106  * system's resolver API.
00107  *
00108  * It is necessary to periodically poll /etc/resolv.conf and reload
00109  * it if any changes are noticed. This class does exactly that.
00110  *
00111  * However, there's also the problem of reloading the structure while
00112  * some threads are in progress. Therefore, we keep a usage reference count.
00113  */
class ResInitUsage
{
#ifdef HAVE_RES_INIT
  time_t mTime;         // mtime of /etc/resolv.conf seen at the last res_init()
  QWaitCondition cond;  // woken whenever useCount drops back to zero
  QMutex mutex;         // protects mTime and useCount
  int useCount;         // usages begun with operator++ and not yet ended

  /*
   * Returns true if /etc/resolv.conf can be stat()ed and its modification
   * time is newer than the one recorded by the last reResInit().
   */
  bool shouldResInit()
  {
    struct stat st;
    if (stat("/etc/resolv.conf", &st) != 0)
      return false;

    //qDebug("ResInitUsage: /etc/resolv.conf updated");
    return mTime < st.st_mtime;
  }

  /*
   * Calls res_init() and records the current modification time of
   * /etc/resolv.conf (mTime is left untouched if stat() fails).
   */
  void reResInit()
  {
    //qDebug("ResInitUsage: calling res_init()");
    res_init();

    struct stat st;
    if (stat("/etc/resolv.conf", &st) == 0)
      mTime = st.st_mtime;
  }

public:
  ResInitUsage()
    : mTime(0), useCount(0)
  { }

  /*
   * Marks the end of usage to the resolver tools
   */
  void operator--(int)
  {
    QMutexLocker locker(&mutex);
    if (--useCount == 0)
      // we've reached 0, wake up anyone that's waiting to call res_init
      cond.wakeAll();
  }

  /*
   * Marks the beginning of usage of the resolver API
   *
   * If /etc/resolv.conf has changed, res_init() is called first, but only
   * once no other thread is using the API.
   */
  void operator++(int)
  {
    QMutexLocker locker(&mutex);

    // The condition is re-evaluated after every wake-up: by the time this
    // thread gets the mutex back, another thread may already have reloaded
    // the configuration (nothing left to do) or may have started using the
    // API again (res_init() must not be called yet).
    while (shouldResInit())
      {
        if (useCount == 0)
          {
            reResInit();
            break;
          }

        // other threads are already using the API, so wait till
        // it's all clear
        //qDebug("ResInitUsage: waiting for libresolv to be clear");
        cond.wait(&mutex);
      }

    useCount++;
  }

#else
  // No res_init() available: usage tracking is a no-op.
public:
  ResInitUsage()
  { }

  void operator--(int)
  { }

  void operator++(int)
  { }
#endif

} resInit;
00199 
00200 } // anonymous namespace
00201 
00202 /*
00203  * parameters
00204  */
// An idle worker thread waits at most maxThreadWaitTime milliseconds for new
// data in KResolverManager::requestData. If it wakes up and there is still
// nothing to process, it exits.
//
// The value is handed, through an int parameter, to QWaitCondition::wait(),
// whose timeout is an unsigned long: -1 converts to ULONG_MAX there, which
// means "wait forever". It is spelled as -1 because ULONG_MAX itself does not
// fit in an int.
static const int maxThreadWaitTime = -1; // wait forever

// a new worker thread is only started while fewer than maxThreads are running
static const int maxThreads = 5;

// process id recorded by the KResolverManager constructor (debugging aid)
static pid_t pid;       // FIXME -- disable when everything is ok
00212 
00213 KResolverThread::KResolverThread()
00214   : data(0L)
00215 {
00216 }
00217 
00218 // remember! This function runs in a separate thread!
00219 void KResolverThread::run()
00220 {
00221   // initialisation
00222   // enter the loop already
00223 
00224   //qDebug("KResolverThread(thread %u/%p): started", pid, (void*)QThread::currentThread());
00225   KResolverManager::manager()->registerThread(this);
00226   while (true)
00227     {
00228       data = KResolverManager::manager()->requestData(this, ::maxThreadWaitTime);
00229       //qDebug("KResolverThread(thread %u/%p) got data %p", KResolverManager::pid, 
00230       //       (void*)QThread::currentThread(), (void*)data);
00231       if (data)
00232     {
00233       // yes, we got data
00234       // process it!
00235       
00236       // 1) set up
00237       ;
00238       
00239       // 2) run it
00240       data->worker->run();
00241       
00242       // 3) release data
00243       KResolverManager::manager()->releaseData(this, data);
00244       
00245       // now go back to the loop
00246     }
00247       else
00248     break;
00249     }
00250 
00251   KResolverManager::manager()->unregisterThread(this);
00252   //qDebug("KResolverThread(thread %u/%p): exiting", pid, (void*)QThread::currentThread());
00253 }
00254 
00255 static KResolverManager *globalManager;
00256 
00257 KResolverManager* KResolverManager::manager()
00258 {
00259   if (globalManager == 0L)
00260     new KResolverManager();
00261   return globalManager;
00262 }
00263 
00264 KResolverManager::KResolverManager()
00265   : runningThreads(0), availableThreads(0)
00266 {
00267   globalManager = this;
00268   workers.setAutoDelete(true);
00269   currentRequests.setAutoDelete(true);
00270   initStandardWorkers();
00271 
00272   pid = getpid();
00273 }
00274 
00275 KResolverManager::~KResolverManager()
00276 {
00277   // this should never be called
00278 
00279   // kill off running threads
00280   for (workers.first(); workers.current(); workers.next())
00281     workers.current()->terminate();
00282 }
00283 
00284 void KResolverManager::registerThread(KResolverThread* )
00285 {
00286 }
00287 
00288 void KResolverManager::unregisterThread(KResolverThread*)
00289 {
00290 }
00291 
00292 // this function is called by KResolverThread::run
00293 RequestData* KResolverManager::requestData(KResolverThread *th, int maxWaitTime)
00294 {
00296   // This function is called in a worker thread!!
00298 
00299   resInit++;
00300 
00301   // lock the mutex, so that the manager thread or other threads won't
00302   // interfere.
00303   QMutexLocker locker(&mutex);
00304   RequestData *data = findData(th);
00305 
00306   if (data)
00307     // it found something, that's good
00308     return data;
00309 
00310   // nope, nothing found; sleep for a while
00311   availableThreads++;
00312   feedWorkers.wait(&mutex, maxWaitTime);
00313   availableThreads--;
00314 
00315   data = findData(th);
00316   if (data == 0L)
00317     {
00318       // if we could find no data, this thread will exit
00319       runningThreads--;
00320       resInit--;
00321     }
00322   return data;
00323 }
00324 
00325 RequestData* KResolverManager::findData(KResolverThread* th)
00326 {
00328   // This function is called by @ref requestData above and must
00329   // always be called with a locked mutex
00331 
00332   // now find data to be processed
00333   for (RequestData *curr = newRequests.first(); curr; curr = newRequests.next())
00334     if (!curr->worker->m_finished)
00335       {
00336     // found one
00337     if (curr->obj)
00338       curr->obj->status = KResolver::InProgress;
00339     curr->worker->th = th;
00340 
00341     // move it to the currentRequests list
00342     currentRequests.append(newRequests.take());
00343 
00344     return curr;
00345       }
00346 
00347   // found nothing!
00348   return 0L;
00349 }
00350 
00351 // this function is called by KResolverThread::run
00352 void KResolverManager::releaseData(KResolverThread *, RequestData* data)
00353 {
00355   // This function is called in a worker thread!!
00357 
00358   resInit--;
00359 
00360   //qDebug("KResolverManager::releaseData(%u/%p): %p has been released", pid, 
00361 //   (void*)QThread::currentThread(), (void*)data);
00362 
00363   if (data->obj)
00364     {
00365       data->obj->status = KResolver::PostProcessing;    
00366     }
00367       
00368   data->worker->m_finished = true;
00369   data->worker->th = 0L;    // this releases the object
00370 
00371   // handle finished requests
00372   handleFinished();
00373 }
00374 
00375 // this function is called by KResolverManager::releaseData above
00376 void KResolverManager::handleFinished()
00377 {  
00378   bool redo = false;
00379   QPtrQueue<RequestData> doneRequests;
00380 
00381   mutex.lock();
00382 
00383   // loop over all items on the currently running list
00384   // we loop from the last to the first so that we catch requests with "requestors" before
00385   // we catch the requestor itself.
00386   RequestData *curr = currentRequests.last();
00387   while (curr)
00388     {
00389       if (curr->worker->th == 0L)
00390     {
00391       if (handleFinishedItem(curr))
00392         {
00393           doneRequests.enqueue(currentRequests.take());
00394           if (curr->requestor &&
00395           curr->requestor->nRequests == 0 && 
00396           curr->requestor->worker->m_finished)
00397         // there's a requestor that is now finished
00398         redo = true;
00399         }
00400     }
00401       
00402       curr = currentRequests.prev();
00403     }
00404       
00405   //qDebug("KResolverManager::handleFinished(%u): %d requests to notify", pid, doneRequests.count());
00406   while (RequestData *d = doneRequests.dequeue())
00407     doNotifying(d);
00408 
00409   mutex.unlock();
00410 
00411   if (redo)
00412     {
00413       //qDebug("KResolverManager::handleFinished(%u): restarting processing to catch requestor",
00414     //     pid);
00415       handleFinished();
00416     }
00417 }
00418 
00419 // This function is called by KResolverManager::handleFinished above
00420 bool KResolverManager::handleFinishedItem(RequestData* curr)
00421                       
00422 {
00423   // for all items that aren't currently running, remove from the list
00424   // this includes all finished or cancelled requests
00425 
00426   if (curr->worker->m_finished && curr->nRequests == 0)
00427     {
00428       // this one has finished
00429       if (curr->obj)
00430     curr->obj->status = KResolver::PostProcessing; // post-processing is run in doNotifying()
00431 
00432       if (curr->requestor)
00433     --curr->requestor->nRequests;
00434 
00435       //qDebug("KResolverManager::handleFinishedItem(%u): removing %p since it's done",
00436     //     pid, (void*)curr);
00437       return true;
00438     }
00439   return false;
00440 }
00441 
00442 
00443 
00444 void KResolverManager::registerNewWorker(KResolverWorkerFactoryBase *factory)
00445 {
00446   workerFactories.append(factory);
00447 }
00448 
00449 KResolverWorkerBase* KResolverManager::findWorker(KResolverPrivate* p)
00450 {
00452   // this function can be called on any user thread
00454 
00455   // this function is called with an unlocked mutex and it's expected to be 
00456   // thread-safe!
00457   // but the factory list is expected not to be changed asynchronously
00458 
00459   // This function is responsible for finding a suitable worker for the given
00460   // input. That means we have to do a costly operation to create each worker
00461   // class and call their preprocessing functions. The first one that
00462   // says they can process (i.e., preprocess() returns true) will get the job.
00463 
00464   KResolverWorkerBase *worker;
00465   for (KResolverWorkerFactoryBase *factory = workerFactories.first(); factory; 
00466        factory = workerFactories.next())
00467     {
00468       worker = factory->create();
00469 
00470       // set up the data the worker needs to preprocess
00471       worker->input = &p->input;
00472 
00473       if (worker->preprocess())
00474     {
00475       // good, this one says it can process
00476       if (worker->m_finished)      
00477         p->status = KResolver::PostProcessing;
00478       else
00479         p->status = KResolver::Queued;
00480       return worker;
00481     }
00482 
00483       // no, try again
00484       delete worker;
00485     }
00486 
00487   // found no worker
00488   return 0L;
00489 }
00490 
00491 void KResolverManager::doNotifying(RequestData *p)
00492 {
00494   // This function may be called on any thread
00495   // any thread at all: user threads, GUI thread, manager thread or worker thread
00497 
00498   // Notification and finalisation
00499   //
00500   // Once a request has finished the normal processing, we call the
00501   // post processing function.
00502   //
00503   // After that is done, we will consolidate all results in the object's
00504   // KResolverResults and then post an event indicating that the signal
00505   // be emitted
00506   //
00507   // In case we detect that the object is waiting for completion, we do not
00508   // post the event, for KResolver::wait will take care of emitting the
00509   // signal.
00510   //
00511   // Once we release the mutex on the object, we may no longer reference it
00512   // for it might have been deleted.
00513 
00514   // "User" objects are those that are not created by the manager. Note that
00515   // objects created by worker threads are considered "user" objects. Objects
00516   // created by the manager are those created for KResolver::resolveAsync.
00517   // We should delete them.
00518 
00519   if (p->obj)
00520     {
00521       // lock the object
00522       p->obj->mutex.lock();
00523       KResolver* parent = p->obj->parent; // is 0 for non-"user" objects
00524       KResolverResults& r = p->obj->results;
00525 
00526       if (p->obj->status == KResolver::Canceled)
00527     {
00528       p->obj->status = KResolver::Canceled;
00529       p->obj->errorcode = KResolver::Canceled;
00530       p->obj->syserror = 0;
00531       r.setError(KResolver::Canceled, 0);
00532     }
00533       else if (p->worker)
00534     {
00535       // post processing
00536       p->worker->postprocess(); // ignore the result
00537 
00538       // copy the results from the worker thread to the final
00539       // object
00540       r = p->worker->results;
00541 
00542       // reset address
00543       r.setAddress(p->input->node, p->input->service);
00544 
00545       //qDebug("KResolverManager::doNotifying(%u/%p): for %p whose status is %d and has %d results", 
00546          //pid, (void*)QThread::currentThread(), (void*)p, p->obj->status, r.count());
00547 
00548       p->obj->errorcode = r.error();
00549       p->obj->syserror = r.systemError();
00550       p->obj->status = !r.isEmpty() ? 
00551         KResolver::Success : KResolver::Failed;
00552     }
00553       else
00554     {
00555       r.empty();
00556       r.setError(p->obj->errorcode, p->obj->syserror);
00557     }
00558 
00559       // check whether there's someone waiting
00560       if (!p->obj->waiting && parent)
00561     // no, so we must post an event requesting that the signal be emitted
00562     // sorry for the C-style cast, but neither static nor reintepret cast work
00563     // here; I'd have to do two casts
00564     QApplication::postEvent(parent, new QEvent((QEvent::Type)(ResolutionCompleted)));
00565 
00566       // release the mutex
00567       p->obj->mutex.unlock();
00568     }
00569   else
00570     {
00571       // there's no object!
00572       if (p->worker)
00573     p->worker->postprocess();
00574     }
00575 
00576   delete p->worker;
00577 
00578   // ignore p->requestor and p->nRequests
00579   // they have been dealt with by the main loop
00580 
00581   delete p;
00582 
00583   // notify any objects waiting in KResolver::wait
00584   notifyWaiters.wakeAll();
00585 }
00586 
00587 // enqueue a new request
00588 // this function is called from KResolver::start and 
00589 // from KResolverWorkerBase::enqueue
00590 void KResolverManager::enqueue(KResolver *obj, RequestData *requestor)
00591 {
00592   RequestData *newrequest = new RequestData;
00593   newrequest->nRequests = 0;
00594   newrequest->obj = obj->d;
00595   newrequest->input = &obj->d->input;
00596   newrequest->requestor = requestor;
00597 
00598   // when processing a new request, find the most
00599   // suitable worker
00600   if ((newrequest->worker = findWorker(obj->d)) == 0L)
00601     {
00602       // oops, problem
00603       // cannot find a worker class for this guy
00604       obj->d->status = KResolver::Failed;
00605       obj->d->errorcode = KResolver::UnsupportedFamily;
00606       obj->d->syserror = 0;
00607 
00608       doNotifying(newrequest);
00609       return;
00610     }
00611 
00612   // no, queue it
00613   // p->status was set in findWorker!
00614   if (requestor)
00615     requestor->nRequests++;
00616 
00617   if (!newrequest->worker->m_finished)
00618     dispatch(newrequest);
00619   else if (newrequest->nRequests > 0)
00620     {
00621       mutex.lock();
00622       currentRequests.append(newrequest);
00623       mutex.unlock();
00624     }
00625   else
00626     // already done
00627     doNotifying(newrequest);
00628 }
00629 
00630 // a new request has been created
00631 // dispatch it
00632 void KResolverManager::dispatch(RequestData *data)
00633 {
00634   // As stated in the beginning of the file, this function
00635   // is supposed to verify the availability of threads, start
00636   // any if necessary
00637 
00638   QMutexLocker locker(&mutex);
00639 
00640   // add to the queue
00641   newRequests.append(data);
00642 
00643   // check if we need to start a new thread
00644   //
00645   // we depend on the variables availableThreads and runningThreads to
00646   // know if we are supposed to start any threads:
00647   // - if availableThreads > 0, then there is at least one thread waiting,
00648   //    blocked in KResolverManager::requestData. It can't unblock
00649   //    while we are holding the mutex locked, therefore we are sure that
00650   //    our event will be handled
00651   // - if availableThreads == 0:
00652   //   - if runningThreads < maxThreads
00653   //     we will start a new thread, which will certainly block in
00654   //     KResolverManager::requestData because we are holding the mutex locked
00655   //   - if runningThreads == maxThreads
00656   //     This situation generally means that we have already maxThreads running
00657   //     and that all of them are processing. We will not start any new threads,
00658   //     but will instead wait for one to finish processing and request new data
00659   //
00660   //     There's a possible race condition here, which goes unhandled: if one of
00661   //     threads has timed out waiting for new data and is in the process of
00662   //     exiting. In that case, availableThreads == 0 and runningThreads will not
00663   //     have decremented yet. This means that we will not start a new thread
00664   //     that we could have. However, since there are other threads working, our
00665   //     event should be handled soon.
00666   //     It won't be handled if and only if ALL threads are in the process of 
00667   //     exiting. That situation is EXTREMELY unlikely and is not handled either.
00668   //
00669   if (availableThreads == 0 && runningThreads < maxThreads)
00670     {
00671       // yes, a new thread should be started
00672 
00673       // find if there's a finished one
00674       KResolverThread *th = workers.first();
00675       while (th && th->running())
00676     th = workers.next();
00677 
00678       if (th == 0L)
00679     // no, create one
00680     th = new KResolverThread;
00681       else
00682     workers.take();
00683 
00684       th->start();
00685       workers.append(th);
00686       runningThreads++;
00687     }
00688 
00689   feedWorkers.wakeAll();
00690 
00691   // clean up idle threads
00692   workers.first();
00693   while (workers.current())
00694     {
00695       if (!workers.current()->running())
00696     workers.remove();
00697       else
00698     workers.next();
00699     }
00700 }
00701 
00702 // this function is called by KResolverManager::dequeue
00703 bool KResolverManager::dequeueNew(KResolver* obj)
00704 {
00705   // This function must be called with a locked mutex
00706   // Deadlock warning:
00707   // always lock the global mutex first if both mutexes must be locked
00708 
00709   KResolverPrivate *d = obj->d;
00710 
00711   // check if it's in the new request list
00712   RequestData *curr = newRequests.first(); 
00713   while (curr)
00714     if (curr->obj == d)
00715       {
00716     // yes, this object is still in the list
00717     // but it has never been processed
00718     d->status = KResolver::Canceled;
00719     d->errorcode = KResolver::Canceled;
00720     d->syserror = 0;
00721     newRequests.take();
00722 
00723     delete curr->worker;
00724     delete curr;
00725     
00726     return true;
00727       }
00728     else
00729       curr = newRequests.next();
00730 
00731   // check if it's running
00732   curr = currentRequests.first();
00733   while (curr)
00734     if (curr->obj == d)
00735       {
00736     // it's running. We cannot simply take it out of the list.
00737     // it will be handled when the thread that is working on it finishes
00738     d->mutex.lock();
00739 
00740     d->status = KResolver::Canceled;
00741     d->errorcode = KResolver::Canceled;
00742     d->syserror = 0;
00743 
00744     // disengage from the running threads
00745     curr->obj = 0L;
00746     curr->input = 0L;
00747     if (curr->worker)
00748       curr->worker->input = 0L;
00749 
00750     d->mutex.unlock();
00751       }
00752     else
00753       curr = currentRequests.next();
00754 
00755   return false;
00756 }
00757 
00758 // this function is called by KResolver::cancel
00759 // it's expected to be thread-safe
00760 void KResolverManager::dequeue(KResolver *obj)
00761 {
00762   QMutexLocker locker(&mutex);
00763   dequeueNew(obj);
00764 }
KDE Logo
This file is part of the documentation for kdecore Library Version 3.3.2.
Documentation copyright © 1996-2004 the KDE developers.
Generated on Sun Jan 15 13:32:30 2006 by doxygen 1.4.2 written by Dimitri van Heesch, © 1997-2003