diff options
author | David Kalnischkies <david@kalnischkies.de> | 2016-04-06 01:08:57 +0200 |
---|---|---|
committer | David Kalnischkies <david@kalnischkies.de> | 2016-04-07 13:48:31 +0200 |
commit | 38f8704e419ed93f433129e20df5611df6652620 (patch) | |
tree | b49599c1b86bb22109fc1f68468e385baf6d9ce0 /apt-pkg/acquire-worker.cc | |
parent | 57f16d51f4158dce1a49f6d5f5f05f057125b871 (diff) |
stop handling items in doomed transactions
With the previous commit we track the state of transactions, so we can
now use our knowledge to avoid processing data for a transaction which
was already closed (via an abort in this case).
This is needed as multiple independent processes are interacting in the
process, so there isn't a simple immediate full-engine stop and it would
also be bad to teach each and every item how to check if its manager
has failed subordinate and what to do in that case.
In the pdiff case, which deals (potentially) with many items during its
lifetime e.g. a hashsum mismatch in another file can abort the
transaction the file we try to patch via pdiff belongs to. This causes
some of the items (which are already done) to be aborted with it, but
items still in the process of acquisition continue in the processing and
will later try to use all the items together failing in strange ways as
cleanup already happened.
The chosen solution is to dry up the communication channels instead by
ignoring new requests for data acquisition, canceling requests which are
not assigned to a queue and not calling Done/Failed on items anymore.
This means that e.g. already started or pending (e.g. pipelined)
downloads aren't stopped and continue as normal for now, but they remain
in partial/ and aren't processed further so the next update command will
pick them up and put them to good use while the current process fails
updating (for this transaction group) in an orderly fashion.
Closes: 817240
Thanks: Barr Detwix & Vincent Lefevre for log files
Diffstat (limited to 'apt-pkg/acquire-worker.cc')
-rw-r--r-- | apt-pkg/acquire-worker.cc | 115 |
1 files changed, 63 insertions, 52 deletions
diff --git a/apt-pkg/acquire-worker.cc b/apt-pkg/acquire-worker.cc index f901847f7..ca1fd4836 100644 --- a/apt-pkg/acquire-worker.cc +++ b/apt-pkg/acquire-worker.cc @@ -172,6 +172,25 @@ bool pkgAcquire::Worker::ReadMessages() // --------------------------------------------------------------------- /* This takes the messages from the message queue and runs them through the parsers in order. */ +enum class APT_HIDDEN MessageType { + CAPABILITIES = 100, + LOG = 101, + STATUS = 102, + REDIRECT = 103, + WARNING = 104, + URI_START = 200, + URI_DONE = 201, + URI_FAILURE = 400, + GENERAL_FAILURE = 401, + MEDIA_CHANGE = 403 +}; +static bool isDoomedItem(pkgAcquire::Item const * const Itm) +{ + auto const TransItm = dynamic_cast<pkgAcqTransactionItem const * const>(Itm); + if (TransItm == nullptr) + return false; + return TransItm->TransactionManager->State != pkgAcqTransactionItem::TransactionStarted; +} bool pkgAcquire::Worker::RunMessages() { while (MessageQueue.empty() == false) @@ -184,7 +203,7 @@ bool pkgAcquire::Worker::RunMessages() // Fetch the message number char *End; - int Number = strtol(Message.c_str(),&End,10); + MessageType const Number = static_cast<MessageType>(strtoul(Message.c_str(),&End,10)); if (End == Message.c_str()) return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str()); @@ -210,27 +229,23 @@ bool pkgAcquire::Worker::RunMessages() // Determine the message number and dispatch switch (Number) { - // 100 Capabilities - case 100: + case MessageType::CAPABILITIES: if (Capabilities(Message) == false) return _error->Error("Unable to process Capabilities message from %s",Access.c_str()); break; - // 101 Log - case 101: + case MessageType::LOG: if (Debug == true) clog << " <- (log) " << LookupTag(Message,"Message") << endl; break; - // 102 Status - case 102: + case MessageType::STATUS: Status = LookupTag(Message,"Message"); break; - // 103 Redirect - case 103: + case MessageType::REDIRECT: { - if (Itm == 0) + if (Itm == nullptr) { _error->Error("Method gave invalid 103 Redirect message"); break; @@ -248,11 +263,13 @@ bool pkgAcquire::Worker::RunMessages() // and then put it in the main queue again std::vector<Item*> const ItmOwners = Itm->Owners; OwnerQ->ItemDone(Itm); - Itm = NULL; - for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O) + Itm = nullptr; + for (auto const &Owner: ItmOwners) { - pkgAcquire::Item *Owner = *O; pkgAcquire::ItemDesc &desc = Owner->GetItemDesc(); + if (Log != nullptr) + Log->Done(desc); + // if we change site, treat it as a mirror change if (URI::SiteOnly(NewURI) != URI::SiteOnly(desc.URI)) { @@ -270,22 +287,19 @@ bool pkgAcquire::Worker::RunMessages() } } desc.URI = NewURI; - OwnerQ->Owner->Enqueue(desc); - - if (Log != 0) - Log->Done(desc); + if (isDoomedItem(Owner) == false) + OwnerQ->Owner->Enqueue(desc); } break; } - // 104 Warning - case 104: + + case MessageType::WARNING: _error->Warning("%s: %s", Itm->Owner->DescURI().c_str(), LookupTag(Message,"Message").c_str()); break; - // 200 URI Start - case 200: + case MessageType::URI_START: { - if (Itm == 0) + if (Itm == nullptr) { _error->Error("Method gave invalid 200 URI Start message"); break; @@ -295,26 +309,24 @@ bool pkgAcquire::Worker::RunMessages() CurrentSize = 0; TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10); ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10); - for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O) + for (auto const Owner: Itm->Owners) { - (*O)->Start(Message, TotalSize); - + Owner->Start(Message, TotalSize); // Display update before completion - if (Log != 0) + if (Log != nullptr) { if (Log->MorePulses == true) - Log->Pulse((*O)->GetOwner()); - Log->Fetch((*O)->GetItemDesc()); + Log->Pulse(Owner->GetOwner()); + Log->Fetch(Owner->GetItemDesc()); } } break; } - // 201 URI Done - case 201: + case MessageType::URI_DONE: { - if (Itm == 0) + if (Itm == nullptr) { _error->Error("Method gave invalid 201 URI Done message"); break; @@ -363,9 +375,8 @@ bool pkgAcquire::Worker::RunMessages() bool const isIMSHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) || StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false); - for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O) + for (auto const Owner: ItmOwners) { - pkgAcquire::Item * const Owner = *O; HashStringList const ExpectedHashes = Owner->GetExpectedHashes(); if(_config->FindB("Debug::pkgAcquire::Auth", false) == true) { @@ -412,10 +423,12 @@ bool pkgAcquire::Worker::RunMessages() else // hashsum mismatch Owner->Status = pkgAcquire::Item::StatAuthError; + if (consideredOkay == true) { - Owner->Done(Message, ReceivedHashes, Config); - if (Log != 0) + if (isDoomedItem(Owner) == false) + Owner->Done(Message, ReceivedHashes, Config); + if (Log != nullptr) { if (isIMSHit) Log->IMSHit(Owner->GetItemDesc()); @@ -425,8 +438,9 @@ bool pkgAcquire::Worker::RunMessages() } else { - Owner->Failed(Message,Config); - if (Log != 0) + if (isDoomedItem(Owner) == false) + Owner->Failed(Message,Config); + if (Log != nullptr) Log->Fail(Owner->GetItemDesc()); } } @@ -434,10 +448,9 @@ bool pkgAcquire::Worker::RunMessages() break; } - // 400 URI Failure - case 400: + case MessageType::URI_FAILURE: { - if (Itm == 0) + if (Itm == nullptr) { std::string const msg = LookupTag(Message,"Message"); _error->Error("Method gave invalid 400 URI Failure message: %s", msg.c_str()); @@ -447,13 +460,13 @@ bool pkgAcquire::Worker::RunMessages() PrepareFiles("400::URIFailure", Itm); // Display update before completion - if (Log != 0 && Log->MorePulses == true) + if (Log != nullptr && Log->MorePulses == true) for (pkgAcquire::Queue::QItem::owner_iterator O = Itm->Owners.begin(); O != Itm->Owners.end(); ++O) Log->Pulse((*O)->GetOwner()); std::vector<Item*> const ItmOwners = Itm->Owners; OwnerQ->ItemDone(Itm); - Itm = NULL; + Itm = nullptr; bool errTransient; { @@ -463,27 +476,25 @@ bool pkgAcquire::Worker::RunMessages() errTransient = std::find(std::begin(reasons), std::end(reasons), failReason) != std::end(reasons); } - for (pkgAcquire::Queue::QItem::owner_iterator O = ItmOwners.begin(); O != ItmOwners.end(); ++O) + for (auto const Owner: ItmOwners) { if (errTransient) - (*O)->Status = pkgAcquire::Item::StatTransientNetworkError; - (*O)->Failed(Message,Config); - - if (Log != 0) - Log->Fail((*O)->GetItemDesc()); + Owner->Status = pkgAcquire::Item::StatTransientNetworkError; + if (isDoomedItem(Owner) == false) + Owner->Failed(Message,Config); + if (Log != nullptr) + Log->Fail(Owner->GetItemDesc()); } ItemDone(); break; } - // 401 General Failure - case 401: + case MessageType::GENERAL_FAILURE: _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str()); break; - // 403 Media Change - case 403: + case MessageType::MEDIA_CHANGE: MediaChange(Message); break; } |