From 8267fe24f77f6982b4d62d8d5d43cd1efc907136 Mon Sep 17 00:00:00 2001 From: Arch Librarian Date: Mon, 20 Sep 2004 16:51:22 +0000 Subject: More or less working acquire system Author: jgg Date: 1998-11-09 01:09:19 GMT More or less working acquire system --- apt-pkg/acquire-item.cc | 59 +++++++++++++++++++++++++++++++++++++------ apt-pkg/acquire-item.h | 11 +++++--- apt-pkg/acquire-method.cc | 4 +-- apt-pkg/acquire-worker.cc | 55 ++++++++++++++++++++++++++++++++++++++-- apt-pkg/acquire-worker.h | 11 +++++--- apt-pkg/acquire.cc | 64 +++++++++++++++++++++++++++++++---------------- apt-pkg/acquire.h | 64 ++++++++++++++++++++++++++++++++++++++--------- 7 files changed, 215 insertions(+), 53 deletions(-) diff --git a/apt-pkg/acquire-item.cc b/apt-pkg/acquire-item.cc index 9f9d082fe..ce8c9d44c 100644 --- a/apt-pkg/acquire-item.cc +++ b/apt-pkg/acquire-item.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-item.cc,v 1.7 1998/11/05 07:21:35 jgg Exp $ +// $Id: acquire-item.cc,v 1.8 1998/11/09 01:09:19 jgg Exp $ /* ###################################################################### Acquire Item - Item to acquire @@ -30,7 +30,8 @@ // Acquire::Item::Item - Constructor /*{{{*/ // --------------------------------------------------------------------- /* */ -pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), QueueCounter(0) +pkgAcquire::Item::Item(pkgAcquire *Owner) : Owner(Owner), FileSize(0), + Complete(false), QueueCounter(0) { Owner->Add(this); Status = StatIdle; @@ -59,6 +60,16 @@ void pkgAcquire::Item::Failed(string Message) } } /*}}}*/ +// Acquire::Item::Start - Item has begun to download /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Item::Start(string Message,unsigned long Size) +{ + Status = StatFetching; + if (FileSize == 0 && Complete == false) + FileSize = Size; +} + /*}}}*/ // Acquire::Item::Done - Item downloaded OK /*{{{*/ // --------------------------------------------------------------------- /* */ @@ -98,8 +109,19 @@ pkgAcqIndex::pkgAcqIndex(pkgAcquire *Owner,const pkgSourceList::Item *Location) DestFile = _config->FindDir("Dir::State::lists") + "partial/"; DestFile += URItoFileName(Location->PackagesURI()); - - QueueURI(Location->PackagesURI() + ".gz",Location->PackagesInfo()); + + // Create the item + Desc.URI = Location->PackagesURI() + ".gz"; + Desc.Description = Location->PackagesInfo(); + Desc.Owner = this; + + // Set the short description to the archive component + if (Location->Dist[Location->Dist.size() - 1] == '/') + Desc.ShortDesc = Location->Dist; + else + Desc.ShortDesc = Location->Dist + '/' + Location->Section; + + QueueURI(Desc); // Create the Release fetch class new pkgAcqIndexRel(Owner,Location); @@ -149,6 +171,7 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5) } Erase = false; + Complete = true; // Handle the unzipd case string FileName = LookupTag(Message,"Alt-Filename"); @@ -159,8 +182,10 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5) return; Decompression = true; + FileSize = 0; DestFile += ".decomp"; - QueueURI("copy:" + FileName,string()); + Desc.URI = "copy:" + FileName; + QueueURI(Desc); return; } @@ -177,10 +202,13 @@ void pkgAcqIndex::Done(string Message,unsigned long Size,string MD5) if (FileName == DestFile) Erase = true; + else + FileSize = 0; Decompression = true; DestFile += ".decomp"; - QueueURI("gzip:" + FileName,string()); + Desc.URI = "gzip:" + FileName,Location->PackagesInfo(); + QueueURI(Desc); } /*}}}*/ @@ -194,7 +222,18 @@ pkgAcqIndexRel::pkgAcqIndexRel(pkgAcquire *Owner, DestFile = _config->FindDir("Dir::State::lists") + "partial/"; DestFile += URItoFileName(Location->ReleaseURI()); - QueueURI(Location->ReleaseURI(),Location->ReleaseInfo()); + // Create the item + Desc.URI = Location->ReleaseURI(); + Desc.Description = Location->ReleaseInfo(); + Desc.Owner = this; + + // Set the short description to the archive component + if (Location->Dist[Location->Dist.size() - 1] == '/') + Desc.ShortDesc = Location->Dist; + else + Desc.ShortDesc = Location->Dist + '/' + Location->Section; + + QueueURI(Desc); } /*}}}*/ // AcqIndexRel::Custom600Headers - Insert custom request headers /*{{{*/ @@ -229,6 +268,8 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5) return; } + Complete = true; + // The files timestamp matches if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true) return; @@ -236,7 +277,9 @@ void pkgAcqIndexRel::Done(string Message,unsigned long Size,string MD5) // We have to copy it into place if (FileName != DestFile) { - QueueURI("copy:" + FileName,string()); + FileSize = 0; + Desc.URI = "copy:" + FileName; + QueueURI(Desc); return; } diff --git a/apt-pkg/acquire-item.h b/apt-pkg/acquire-item.h index 9ead29c9a..d50efc04b 100644 --- a/apt-pkg/acquire-item.h +++ b/apt-pkg/acquire-item.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-item.h,v 1.5 1998/11/05 07:21:36 jgg Exp $ +// $Id: acquire-item.h,v 1.6 1998/11/09 01:09:21 jgg Exp $ /* ###################################################################### Acquire Item - Item to acquire @@ -30,8 +30,8 @@ class pkgAcquire::Item protected: pkgAcquire *Owner; - inline void QueueURI(string URI,string Description) - {Owner->Enqueue(this,URI,Description);}; + inline void QueueURI(ItemDesc &Item) + {Owner->Enqueue(Item);}; void Rename(string From,string To); @@ -40,6 +40,8 @@ class pkgAcquire::Item // State of the item enum {StatIdle, StatFetching, StatDone, StatError} Status; string ErrorText; + unsigned long FileSize; + bool Complete; // Number of queues we are inserted into unsigned int QueueCounter; @@ -49,6 +51,7 @@ class pkgAcquire::Item virtual void Failed(string Message); virtual void Done(string Message,unsigned long Size,string Md5Hash); + virtual void Start(string Message,unsigned long Size); virtual string Custom600Headers() {return string();}; @@ -64,6 +67,7 @@ class pkgAcqIndex : public pkgAcquire::Item const pkgSourceList::Item *Location; bool Decompression; bool Erase; + pkgAcquire::ItemDesc Desc; public: @@ -79,6 +83,7 @@ class pkgAcqIndexRel : public pkgAcquire::Item protected: const pkgSourceList::Item *Location; + pkgAcquire::ItemDesc Desc; public: diff --git a/apt-pkg/acquire-method.cc b/apt-pkg/acquire-method.cc index 681015910..f1a247788 100644 --- a/apt-pkg/acquire-method.cc +++ b/apt-pkg/acquire-method.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-method.cc,v 1.4 1998/11/05 07:21:38 jgg Exp $ +// $Id: acquire-method.cc,v 1.5 1998/11/09 01:09:22 jgg Exp $ /* ###################################################################### Acquire Method @@ -314,7 +314,7 @@ void pkgAcqMethod::Status(const char *Format,...) // sprintf the description char S[1024]; - unsigned int Len = snprintf(S,sizeof(S),"101 Status\nURI: %s\n" + unsigned int Len = snprintf(S,sizeof(S),"102 Status\nURI: %s\n" "Message: ",CurrentURI.c_str()); vsnprintf(S+Len,sizeof(S)-Len,Format,args); diff --git a/apt-pkg/acquire-worker.cc b/apt-pkg/acquire-worker.cc index 2cbab7720..718944d73 100644 --- a/apt-pkg/acquire-worker.cc +++ b/apt-pkg/acquire-worker.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-worker.cc,v 1.10 1998/11/05 07:21:39 jgg Exp $ +// $Id: acquire-worker.cc,v 1.11 1998/11/09 01:09:23 jgg Exp $ /* ###################################################################### Acquire Worker @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -30,7 +31,8 @@ // Worker::Worker - Constructor for Queue startup /*{{{*/ // --------------------------------------------------------------------- /* */ -pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf) +pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf, + pkgAcquireStatus *Log) : Log(Log) { OwnerQ = Q; Config = Cnf; @@ -221,10 +223,15 @@ bool pkgAcquire::Worker::RunMessages() _error->Error("Method gave invalid 200 URI Start message"); break; } + CurrentItem = Itm; CurrentSize = 0; TotalSize = atoi(LookupTag(Message,"Size","0").c_str()); + Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str())); + if (Log != 0) + Log->Fetch(*Itm); + break; } @@ -238,9 +245,21 @@ bool pkgAcquire::Worker::RunMessages() } pkgAcquire::Item *Owner = Itm->Owner; + pkgAcquire::ItemDesc Desc = *Itm; OwnerQ->ItemDone(Itm); Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()), LookupTag(Message,"MD5-Hash")); + ItemDone(); + + // Log that we are done + if (Log != 0) + { + if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true || + StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true) + Log->IMSHit(Desc); + else + Log->Done(Desc); + } break; } @@ -254,8 +273,14 @@ bool pkgAcquire::Worker::RunMessages() } pkgAcquire::Item *Owner = Itm->Owner; + pkgAcquire::ItemDesc Desc = *Itm; OwnerQ->ItemDone(Itm); Owner->Failed(Message); + ItemDone(); + + if (Log != 0) + Log->Fail(Desc); + break; } @@ -419,3 +444,29 @@ bool pkgAcquire::Worker::MethodFailure() return false; } /*}}}*/ +// Worker::Pulse - Called periodically /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Worker::Pulse() +{ + if (CurrentItem == 0) + return; + + + struct stat Buf; + if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0) + return; + CurrentSize = Buf.st_size; +} + /*}}}*/ +// Worker::ItemDone - Called when the current item is finished /*{{{*/ +// --------------------------------------------------------------------- +/* */ +void pkgAcquire::Worker::ItemDone() +{ + CurrentItem = 0; + CurrentSize = 0; + TotalSize = 0; + Status = string(); +} + /*}}}*/ diff --git a/apt-pkg/acquire-worker.h b/apt-pkg/acquire-worker.h index ad1ea9165..95ba340ac 100644 --- a/apt-pkg/acquire-worker.h +++ b/apt-pkg/acquire-worker.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire-worker.h,v 1.6 1998/10/30 07:53:36 jgg Exp $ +// $Id: acquire-worker.h,v 1.7 1998/11/09 01:09:24 jgg Exp $ /* ###################################################################### Acquire Worker - Worker process manager @@ -33,6 +33,7 @@ class pkgAcquire::Worker // The access association Queue *OwnerQ; + pkgAcquireStatus *Log; MethodConfig *Config; string Access; @@ -62,6 +63,7 @@ class pkgAcquire::Worker bool SendConfiguration(); bool MethodFailure(); + void ItemDone(); public: @@ -70,12 +72,13 @@ class pkgAcquire::Worker string Status; unsigned long CurrentSize; unsigned long TotalSize; - + // Load the method and do the startup bool QueueItem(pkgAcquire::Queue::QItem *Item); - bool Start(); + bool Start(); + void Pulse(); - Worker(Queue *OwnerQ,MethodConfig *Config); + Worker(Queue *OwnerQ,MethodConfig *Config,pkgAcquireStatus *Log); Worker(MethodConfig *Config); ~Worker(); }; diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc index ed64c5e5f..e5972d5e9 100644 --- a/apt-pkg/acquire.cc +++ b/apt-pkg/acquire.cc @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire.cc,v 1.9 1998/11/06 02:52:20 jgg Exp $ +// $Id: acquire.cc,v 1.10 1998/11/09 01:09:25 jgg Exp $ /* ###################################################################### Acquire - File Acquiration @@ -22,12 +22,14 @@ #include #include #include + +#include /*}}}*/ // Acquire::pkgAcquire - Constructor /*{{{*/ // --------------------------------------------------------------------- /* We grab some runtime state from the configuration space */ -pkgAcquire::pkgAcquire() +pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log) { Queues = 0; Configs = 0; @@ -85,7 +87,7 @@ void pkgAcquire::Remove(Item *Itm) { if (*I == Itm) Items.erase(I); - } + } } /*}}}*/ // Acquire::Add - Add a worker /*{{{*/ @@ -124,10 +126,10 @@ void pkgAcquire::Remove(Worker *Work) it is construction which creates a queue (based on the current queue mode) and puts the item in that queue. If the system is running then the queue might be started. */ -void pkgAcquire::Enqueue(Item *Itm,string URI,string Description) +void pkgAcquire::Enqueue(ItemDesc &Item) { // Determine which queue to put the item in - string Name = QueueName(URI); + string Name = QueueName(Item.URI); if (Name.empty() == true) return; @@ -144,18 +146,18 @@ void pkgAcquire::Enqueue(Item *Itm,string URI,string Description) I->Startup(); } - Itm->Status = Item::StatIdle; + Item.Owner->Status = Item::StatIdle; // Queue it into the named queue - I->Enqueue(Itm,URI,Description); + I->Enqueue(Item); ToFetch++; // Some trace stuff if (Debug == true) { - clog << "Fetching " << URI << endl; - clog << " to " << Itm->DestFile << endl; - clog << " Queue is: " << QueueName(URI) << endl; + clog << "Fetching " << Item.URI << endl; + clog << " to " << Item.Owner->DestFile << endl; + clog << " Queue is: " << QueueName(Item.URI) << endl; } } /*}}}*/ @@ -275,6 +277,9 @@ bool pkgAcquire::Run() I->Startup(); // Run till all things have been acquired + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 500000; while (ToFetch > 0) { fd_set RFds; @@ -284,15 +289,26 @@ bool pkgAcquire::Run() FD_ZERO(&WFds); SetFds(Highest,&RFds,&WFds); - if (select(Highest+1,&RFds,&WFds,0,0) <= 0) + int Res = select(Highest+1,&RFds,&WFds,0,&tv); + if (Res < 0) { - Running = false; - return _error->Errno("select","Select has failed"); + _error->Errno("select","Select has failed"); + break; } RunFds(&RFds,&WFds); if (_error->PendingError() == true) break; + + // Timeout, notify the log class + if (Res == 0 || (Log != 0 && Log->Update == true)) + { + tv.tv_usec = 500000; + for (Worker *I = Workers; I != 0; I = I->NextAcquire) + I->Pulse(); + if (Log != 0) + Log->Pulse(this); + } } // Shut down the acquire bits @@ -313,6 +329,14 @@ void pkgAcquire::Bump() I->Bump(); } /*}}}*/ +// Acquire::WorkerStep - Step to the next worker /*{{{*/ +// --------------------------------------------------------------------- +/* Not inlined to advoid including acquire-worker.h */ +pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I) +{ + return I->NextAcquire; +}; + /*}}}*/ // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/ // --------------------------------------------------------------------- @@ -356,19 +380,15 @@ pkgAcquire::Queue::~Queue() // Queue::Enqueue - Queue an item to the queue /*{{{*/ // --------------------------------------------------------------------- /* */ -void pkgAcquire::Queue::Enqueue(Item *Owner,string URI,string Description) +void pkgAcquire::Queue::Enqueue(ItemDesc &Item) { // Create a new item - QItem *I = new QItem; + QItem *I = new QItem; I->Next = Items; Items = I; + *I = Item; - // Fill it in - Items->Owner = Owner; - Items->URI = URI; - Items->Description = Description; - Owner->QueueCounter++; - + Item.Owner->QueueCounter++; if (Items->Next == 0) Cycle(); } @@ -410,7 +430,7 @@ bool pkgAcquire::Queue::Startup() if (Cnf == 0) return false; - Workers = new Worker(this,Cnf); + Workers = new Worker(this,Cnf,Owner->Log); Owner->Add(Workers); if (Workers->Start() == false) return false; diff --git a/apt-pkg/acquire.h b/apt-pkg/acquire.h index a4ea45bff..0d8803c1d 100644 --- a/apt-pkg/acquire.h +++ b/apt-pkg/acquire.h @@ -1,6 +1,6 @@ // -*- mode: cpp; mode: fold -*- // Description /*{{{*/ -// $Id: acquire.h,v 1.8 1998/11/05 07:21:41 jgg Exp $ +// $Id: acquire.h,v 1.9 1998/11/09 01:09:26 jgg Exp $ /* ###################################################################### Acquire - File Acquiration @@ -41,6 +41,7 @@ #include +class pkgAcquireStatus; class pkgAcquire { public: @@ -49,6 +50,7 @@ class pkgAcquire class Queue; class Worker; struct MethodConfig; + struct ItemDesc; friend Item; friend Queue; @@ -61,8 +63,9 @@ class pkgAcquire Queue *Queues; Worker *Workers; MethodConfig *Configs; + pkgAcquireStatus *Log; unsigned long ToFetch; - + // Configurable parameters for the schedular enum {QueueHost,QueueAccess} QueueMode; bool Debug; @@ -73,7 +76,7 @@ class pkgAcquire void Add(Worker *Work); void Remove(Worker *Work); - void Enqueue(Item *Item,string URI,string Description); + void Enqueue(ItemDesc &Item); void Dequeue(Item *Item); string QueueName(string URI); @@ -88,11 +91,26 @@ class pkgAcquire MethodConfig *GetConfig(string Access); bool Run(); + + // Simple iteration mechanism + inline Worker *WorkersBegin() {return Workers;}; + Worker *WorkerStep(Worker *I); + inline Item **ItemsBegin() {return Items.begin();}; + inline Item **ItemsEnd() {return Items.end();}; - pkgAcquire(); + pkgAcquire(pkgAcquireStatus *Log = 0); ~pkgAcquire(); }; +// Description of an Item+URI +struct pkgAcquire::ItemDesc +{ + string URI; + string Description; + string ShortDesc; + Item *Owner; +}; + // List of possible items queued for download. class pkgAcquire::Queue { @@ -102,15 +120,19 @@ class pkgAcquire::Queue protected: // Queued item - struct QItem + struct QItem : pkgAcquire::ItemDesc { - QItem *Next; - - string URI; - string Description; - Item *Owner; + QItem *Next; pkgAcquire::Worker *Worker; - }; + + void operator =(pkgAcquire::ItemDesc const &I) + { + URI = I.URI; + Description = I.Description; + ShortDesc = I.ShortDesc; + Owner = I.Owner; + }; + }; // Name of the queue string Name; @@ -123,11 +145,12 @@ class pkgAcquire::Queue public: // Put an item into this queue - void Enqueue(Item *Owner,string URI,string Description); + void Enqueue(ItemDesc &Item); bool Dequeue(Item *Owner); // Find a Queued item QItem *FindItem(string URI,pkgAcquire::Worker *Owner); + bool ItemStart(QItem *Itm,unsigned long Size); bool ItemDone(QItem *Itm); bool Startup(); @@ -155,4 +178,21 @@ struct pkgAcquire::MethodConfig MethodConfig(); }; +class pkgAcquireStatus +{ + public: + + bool Update; + + // Each of these is called by the workers when an event occures + virtual void IMSHit(pkgAcquire::ItemDesc &Itm) {}; + virtual void Fetch(pkgAcquire::ItemDesc &Itm) {}; + virtual void Done(pkgAcquire::ItemDesc &Itm) {}; + virtual void Fail(pkgAcquire::ItemDesc &Itm) {}; + virtual void Pulse(pkgAcquire *Owner) {}; + + pkgAcquireStatus() : Update(false) {}; + virtual ~pkgAcquireStatus() {}; +}; + #endif -- cgit v1.2.3