// -*- mode: cpp; mode: fold -*- // Description /*{{{*/ // $Id: acquire.cc,v 1.6 1998/10/30 07:53:37 jgg Exp $ /* ###################################################################### Acquire - File Acquiration The core element for the schedual system is the concept of a named queue. Each queue is unique and each queue has a name derived from the URI. The degree of paralization can be controled by how the queue name is derived from the URI. ##################################################################### */ /*}}}*/ // Include Files /*{{{*/ #ifdef __GNUG__ #pragma implementation "apt-pkg/acquire.h" #endif #include #include #include #include #include #include /*}}}*/ // Acquire::pkgAcquire - Constructor /*{{{*/ // --------------------------------------------------------------------- /* We grab some runtime state from the configuration space */ pkgAcquire::pkgAcquire() { Queues = 0; Configs = 0; Workers = 0; ToFetch = 0; Running = false; string Mode = _config->Find("Acquire::Queue-Mode","host"); if (strcasecmp(Mode.c_str(),"host") == 0) QueueMode = QueueHost; if (strcasecmp(Mode.c_str(),"access") == 0) QueueMode = QueueAccess; Debug = _config->FindB("Debug::pkgAcquire",false); } /*}}}*/ // Acquire::~pkgAcquire - Destructor /*{{{*/ // --------------------------------------------------------------------- /* Free our memory, clean up the queues (destroy the workers) */ pkgAcquire::~pkgAcquire() { while (Items.size() != 0) delete Items[0]; while (Configs != 0) { MethodConfig *Jnk = Configs; Configs = Configs->Next; delete Jnk; } while (Queues != 0) { Queue *Jnk = Queues; Queues = Queues->Next; delete Jnk; } } /*}}}*/ // Acquire::Add - Add a new item /*{{{*/ // --------------------------------------------------------------------- /* This puts an item on the acquire list. This list is mainly for tracking item status */ void pkgAcquire::Add(Item *Itm) { Items.push_back(Itm); } /*}}}*/ // Acquire::Remove - Remove a item /*{{{*/ // --------------------------------------------------------------------- /* Remove an item from the acquire list. This is usually not used.. */ void pkgAcquire::Remove(Item *Itm) { for (vector::iterator I = Items.begin(); I < Items.end(); I++) { if (*I == Itm) Items.erase(I); } } /*}}}*/ // Acquire::Add - Add a worker /*{{{*/ // --------------------------------------------------------------------- /* A list of workers is kept so that the select loop can direct their FD usage. */ void pkgAcquire::Add(Worker *Work) { Work->NextAcquire = Workers; Workers = Work; } /*}}}*/ // Acquire::Remove - Remove a worker /*{{{*/ // --------------------------------------------------------------------- /* A worker has died. This can not be done while the select loop is running as it would require that RunFds could handling a changing list state and it cant.. */ void pkgAcquire::Remove(Worker *Work) { if (Running == true) abort(); Worker **I = &Workers; for (; *I != 0;) { if (*I == Work) *I = (*I)->NextAcquire; else I = &(*I)->NextAcquire; } } /*}}}*/ // Acquire::Enqueue - Queue an URI for fetching /*{{{*/ // --------------------------------------------------------------------- /* This is the entry point for an item. An item calls this function when it is construction which creates a queue (based on the current queue mode) and puts the item in that queue. If the system is running then the queue might be started. */ void pkgAcquire::Enqueue(Item *Itm,string URI,string Description) { // Determine which queue to put the item in string Name = QueueName(URI); if (Name.empty() == true) return; // Find the queue structure Queue *I = Queues; for (; I != 0 && I->Name != Name; I = I->Next); if (I == 0) { I = new Queue(Name,this); I->Next = Queues; Queues = I; if (Running == true) I->Startup(); } // Queue it into the named queue I->Enqueue(Itm,URI,Description); ToFetch++; // Some trace stuff if (Debug == true) { clog << "Fetching " << URI << endl; clog << " to " << Itm->DestFile << endl; clog << " Queue is: " << QueueName(URI) << endl; } } /*}}}*/ // Acquire::Dequeue - Remove an item from all queues /*{{{*/ // --------------------------------------------------------------------- /* This is called when an item is finished being fetched. It removes it from all the queues */ void pkgAcquire::Dequeue(Item *Itm) { Queue *I = Queues; for (; I != 0; I = I->Next) I->Dequeue(Itm); if (Debug == true) clog << "Dequeuing " << Itm->DestFile << endl; ToFetch--; } /*}}}*/ // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/ // --------------------------------------------------------------------- /* The string returned depends on the configuration settings and the method parameters. Given something like http://foo.org/bar it can return http://foo.org or http */ string pkgAcquire::QueueName(string Uri) { URI U(Uri); const MethodConfig *Config = GetConfig(U.Access); if (Config == 0) return string(); /* Single-Instance methods get exactly one queue per URI. This is also used for the Access queue method */ if (Config->SingleInstance == true || QueueMode == QueueAccess) return U.Access; return U.Access + ':' + U.Host; } /*}}}*/ // Acquire::GetConfig - Fetch the configuration information /*{{{*/ // --------------------------------------------------------------------- /* This locates the configuration structure for an access method. If a config structure cannot be found a Worker will be created to retrieve it */ pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access) { // Search for an existing config MethodConfig *Conf; for (Conf = Configs; Conf != 0; Conf = Conf->Next) if (Conf->Access == Access) return Conf; // Create the new config class Conf = new MethodConfig; Conf->Access = Access; Conf->Next = Configs; Configs = Conf; // Create the worker to fetch the configuration Worker Work(Conf); if (Work.Start() == false) return 0; return Conf; } /*}}}*/ // Acquire::SetFds - Deal with readable FDs /*{{{*/ // --------------------------------------------------------------------- /* Collect FDs that have activity monitors into the fd sets */ void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet) { for (Worker *I = Workers; I != 0; I = I->NextAcquire) { if (I->InReady == true && I->InFd >= 0) { if (Fd < I->InFd) Fd = I->InFd; FD_SET(I->InFd,RSet); } if (I->OutReady == true && I->OutFd >= 0) { if (Fd < I->OutFd) Fd = I->OutFd; FD_SET(I->OutFd,WSet); } } } /*}}}*/ // Acquire::RunFds - Deal with active FDs /*{{{*/ // --------------------------------------------------------------------- /* Dispatch active FDs over to the proper workers. It is very important that a worker never be erased while this is running! The queue class should never erase a worker except during shutdown processing. */ void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet) { for (Worker *I = Workers; I != 0; I = I->NextAcquire) { if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0) I->InFdReady(); if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0) I->OutFdReady(); } } /*}}}*/ // Acquire::Run - Run the fetch sequence /*{{{*/ // --------------------------------------------------------------------- /* This runs the queues. It manages a select loop for all of the Worker tasks. The workers interact with the queues and items to manage the actual fetch. */ bool pkgAcquire::Run() { Running = true; for (Queue *I = Queues; I != 0; I = I->Next) I->Startup(); // Run till all things have been acquired while (ToFetch > 0) { fd_set RFds; fd_set WFds; int Highest = 0; FD_ZERO(&RFds); FD_ZERO(&WFds); SetFds(Highest,&RFds,&WFds); if (select(Highest+1,&RFds,&WFds,0,0) <= 0) { Running = false; return _error->Errno("select","Select has failed"); } RunFds(&RFds,&WFds); if (_error->PendingError() == true) break; } for (Queue *I = Queues; I != 0; I = I->Next) I->Shutdown(); Running = false; return _error->PendingError(); } /*}}}*/ // pkgAcquire::Bump - Called when an item is dequeued /*{{{*/ // --------------------------------------------------------------------- /* This routine bumps idle queues in hopes that they will be able to fetch the dequeued item */ void pkgAcquire::Bump() { } /*}}}*/ // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/ // --------------------------------------------------------------------- /* */ pkgAcquire::MethodConfig::MethodConfig() { SingleInstance = false; PreScan = false; Pipeline = false; SendConfig = false; Next = 0; } /*}}}*/ // Queue::Queue - Constructor /*{{{*/ // --------------------------------------------------------------------- /* */ pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name), Owner(Owner) { Items = 0; Next = 0; Workers = 0; } /*}}}*/ // Queue::~Queue - Destructor /*{{{*/ // --------------------------------------------------------------------- /* */ pkgAcquire::Queue::~Queue() { Shutdown(); while (Items != 0) { QItem *Jnk = Items; Items = Items->Next; delete Jnk; } } /*}}}*/ // Queue::Enqueue - Queue an item to the queue /*{{{*/ // --------------------------------------------------------------------- /* */ void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description) { // Create a new item QItem *I = new QItem; I->Next = Items; Items = I; // Fill it in Items->Owner = Owner; Items->URI = URI; Items->Description = Description; Owner->QueueCounter++; if (Items->Next == 0) Cycle(); } /*}}}*/ // Queue::Dequeue - Remove an item from the queue /*{{{*/ // --------------------------------------------------------------------- /* */ void pkgAcquire::Queue::Dequeue(Item *Owner) { QItem **I = &Items; for (; *I != 0;) { if ((*I)->Owner == Owner) { QItem *Jnk= *I; *I = (*I)->Next; Owner->QueueCounter--; delete Jnk; } else I = &(*I)->Next; } } /*}}}*/ // Queue::Startup - Start the worker processes /*{{{*/ // --------------------------------------------------------------------- /* */ bool pkgAcquire::Queue::Startup() { Shutdown(); URI U(Name); pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access); if (Cnf == 0) return false; Workers = new Worker(this,Cnf); Owner->Add(Workers); if (Workers->Start() == false) return false; return Cycle(); } /*}}}*/ // Queue::Shutdown - Shutdown the worker processes /*{{{*/ // --------------------------------------------------------------------- /* */ bool pkgAcquire::Queue::Shutdown() { // Delete all of the workers while (Workers != 0) { pkgAcquire::Worker *Jnk = Workers; Workers = Workers->NextQueue; Owner->Remove(Jnk); delete Jnk; } return true; } /*}}}*/ // Queue::Finditem - Find a URI in the item list /*{{{*/ // --------------------------------------------------------------------- /* */ pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner) { for (QItem *I = Items; I != 0; I = I->Next) if (I->URI == URI && I->Worker == Owner) return I; return 0; } /*}}}*/ // Queue::ItemDone - Item has been completed /*{{{*/ // --------------------------------------------------------------------- /* The worker signals this which causes the item to be removed from the queue. If this is the last queue instance then it is removed from the main queue too.*/ bool pkgAcquire::Queue::ItemDone(QItem *Itm) { if (Itm->Owner->QueueCounter <= 1) Owner->Dequeue(Itm->Owner); else { Dequeue(Itm->Owner); Owner->Bump(); } return Cycle(); } /*}}}*/ // Queue::Cycle - Queue new items into the method /*{{{*/ // --------------------------------------------------------------------- /* This locates a new idle item and sends it to the worker */ bool pkgAcquire::Queue::Cycle() { if (Items == 0 || Workers == 0) return true; // Look for a queable item QItem *I = Items; for (; I != 0; I = I->Next) if (I->Owner->Status == pkgAcquire::Item::StatIdle) break; // Nothing to do, queue is idle. if (I == 0) return true; I->Worker = Workers; I->Owner->Status = pkgAcquire::Item::StatFetching; return Workers->QueueItem(I); } /*}}}*/