diff options
Diffstat (limited to 'apt-pkg')
-rw-r--r-- | apt-pkg/acquire.cc | 110 |
1 files changed, 66 insertions, 44 deletions
diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc index 8bb72d549..6cf8b4c83 100644 --- a/apt-pkg/acquire.cc +++ b/apt-pkg/acquire.cc @@ -385,75 +385,97 @@ void pkgAcquire::Dequeue(Item *Itm) return http://foo.org or http */ string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config) { + constexpr int DEFAULT_HOST_LIMIT = 10; URI U(Uri); - - Config = GetConfig(U.Access); - if (Config == 0) - return string(); - - /* Single-Instance methods get exactly one queue per URI. This is - also used for the Access queue method */ - if (Config->SingleInstance == true || QueueMode == QueueAccess) + // Access mode forces all methods to be Single-Instance + if (QueueMode == QueueAccess) return U.Access; - int Limit = 10; - string AccessSchema = U.Access + ':'; - string FullQueueName; + Config = GetConfig(U.Access); + if (Config == nullptr) + return {}; + + // Single-Instance methods get exactly one queue per URI + if (Config->SingleInstance == true) + return U.Access; + // Host-less methods like rred, store, … if (U.Host.empty()) { - long existing = 0; + int existing = 0; // check how many queues exist already and reuse empty ones + auto const AccessSchema = U.Access + ':'; for (Queue const *I = Queues; I != 0; I = I->Next) - if (I->Name.compare(0, AccessSchema.length(), AccessSchema) == 0) + if (APT::String::Startswith(I->Name, AccessSchema)) { if (I->Items == nullptr) return I->Name; ++existing; } + int const Limit = _config->FindI("Acquire::QueueHost::Limit", #ifdef _SC_NPROCESSORS_ONLN - long cpuCount = sysconf(_SC_NPROCESSORS_ONLN) * 2; + sysconf(_SC_NPROCESSORS_ONLN) * 2 #else - long cpuCount = Limit; + DEFAULT_HOST_LIMIT #endif - Limit = _config->FindI("Acquire::QueueHost::Limit", cpuCount); + ); + // create a new worker if we don't have too many yet if (Limit <= 0 || existing < Limit) - strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), existing); - else - { - long const randomQueue = random() % Limit; - strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), randomQueue); - } + return AccessSchema + std::to_string(existing); + + // find the worker with the least to do + // we already established that there are no empty and we can't spawn new + Queue const *selected = nullptr; + auto selected_backlog = std::numeric_limits<decltype(HashStringList().FileSize())>::max(); + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (APT::String::Startswith(Q->Name, AccessSchema)) + { + decltype(selected_backlog) current_backlog = 0; + for (auto const *I = Q->Items; I != nullptr; I = I->Next) + { + auto const hashes = I->Owner->GetExpectedHashes(); + if (not hashes.empty()) + current_backlog += hashes.FileSize(); + else + current_backlog += I->Owner->FileSize; + } + if (current_backlog < selected_backlog) + { + selected = Q; + selected_backlog = current_backlog; + } + } - if (Debug) - clog << "Chose random queue " << FullQueueName << " for " << Uri << endl; - } else - { - Limit = _config->FindI("Acquire::QueueHost::Limit", Limit); - FullQueueName = AccessSchema + U.Host; + if (unlikely(selected == nullptr)) + return AccessSchema + "0"; + return selected->Name; } - unsigned int Instances = 0, SchemaLength = AccessSchema.length(); - - Queue *I = Queues; - for (; I != 0; I = I->Next) { + // most methods talking to remotes like http + else + { + auto const FullQueueName = U.Access + ':' + U.Host; // if the queue already exists, re-use it - if (I->Name == FullQueueName) - return FullQueueName; - - if (I->Name.compare(0, SchemaLength, AccessSchema) == 0) - Instances++; - } + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (Q->Name == FullQueueName) + return FullQueueName; - if (Debug) { - clog << "Found " << Instances << " instances of " << U.Access << endl; - } + int existing = 0; + // check how many queues exist already and reuse empty ones + auto const AccessSchema = U.Access + ':'; + for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next) + if (APT::String::Startswith(Q->Name, AccessSchema)) + ++existing; - if (Instances >= static_cast<decltype(Instances)>(Limit)) - return U.Access; + int const Limit = _config->FindI("Acquire::QueueHost::Limit", DEFAULT_HOST_LIMIT); + // if we have too many hosts open use a single generic for the rest + if (existing >= Limit) + return U.Access; - return FullQueueName; + // we can still create new named queues + return FullQueueName; + } } /*}}}*/ // Acquire::GetConfig - Fetch the configuration information /*{{{*/ |