summaryrefslogtreecommitdiff
path: root/apt-pkg
diff options
context:
space:
mode:
authorDavid Kalnischkies <david@kalnischkies.de>2019-04-27 16:13:02 +0200
committerDavid Kalnischkies <david@kalnischkies.de>2019-07-08 13:18:31 +0200
commit79b1a82983e737e74359bc306d9edb357c5bdd46 (patch)
tree28fe86abdf88e54e8e9407e81e59d01e149a2ef2 /apt-pkg
parent8ff87e9cd37a4436eb7e56f814a099cb30845ae1 (diff)
Distribute host-less work based on backlog of the queues
Work like applying patches via rred can be performed by many concurrent rred processes, but we can't just spawn new ones forever: We limit us to the number of CPUs which can drive them and reuse existing ones if they have nothing to do at the moment. The problem arises if we have reached the limit of queues and all of them are busy which is more likely to happen on "slow" machines with few CPUs. In this case we opted for random distribution, but that can result in many big files (e.g. Contents) being added to one queue while the others get none or only small files. Ideally we would ask the methods how much they still have to do, but they only know that for the current item, not for all items in the queue, so we use the filesize of the expected result.
Diffstat (limited to 'apt-pkg')
-rw-r--r--apt-pkg/acquire.cc110
1 files changed, 66 insertions, 44 deletions
diff --git a/apt-pkg/acquire.cc b/apt-pkg/acquire.cc
index 8bb72d549..6cf8b4c83 100644
--- a/apt-pkg/acquire.cc
+++ b/apt-pkg/acquire.cc
@@ -385,75 +385,97 @@ void pkgAcquire::Dequeue(Item *Itm)
return http://foo.org or http */
string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
{
+ constexpr int DEFAULT_HOST_LIMIT = 10;
URI U(Uri);
-
- Config = GetConfig(U.Access);
- if (Config == 0)
- return string();
-
- /* Single-Instance methods get exactly one queue per URI. This is
- also used for the Access queue method */
- if (Config->SingleInstance == true || QueueMode == QueueAccess)
+ // Access mode forces all methods to be Single-Instance
+ if (QueueMode == QueueAccess)
return U.Access;
- int Limit = 10;
- string AccessSchema = U.Access + ':';
- string FullQueueName;
+ Config = GetConfig(U.Access);
+ if (Config == nullptr)
+ return {};
+
+ // Single-Instance methods get exactly one queue per URI
+ if (Config->SingleInstance == true)
+ return U.Access;
+ // Host-less methods like rred, store, …
if (U.Host.empty())
{
- long existing = 0;
+ int existing = 0;
// check how many queues exist already and reuse empty ones
+ auto const AccessSchema = U.Access + ':';
for (Queue const *I = Queues; I != 0; I = I->Next)
- if (I->Name.compare(0, AccessSchema.length(), AccessSchema) == 0)
+ if (APT::String::Startswith(I->Name, AccessSchema))
{
if (I->Items == nullptr)
return I->Name;
++existing;
}
+ int const Limit = _config->FindI("Acquire::QueueHost::Limit",
#ifdef _SC_NPROCESSORS_ONLN
- long cpuCount = sysconf(_SC_NPROCESSORS_ONLN) * 2;
+ sysconf(_SC_NPROCESSORS_ONLN) * 2
#else
- long cpuCount = Limit;
+ DEFAULT_HOST_LIMIT
#endif
- Limit = _config->FindI("Acquire::QueueHost::Limit", cpuCount);
+ );
+ // create a new worker if we don't have too many yet
if (Limit <= 0 || existing < Limit)
- strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), existing);
- else
- {
- long const randomQueue = random() % Limit;
- strprintf(FullQueueName, "%s%ld", AccessSchema.c_str(), randomQueue);
- }
+ return AccessSchema + std::to_string(existing);
+
+ // find the worker with the least to do
+ // we already established that there are no empty and we can't spawn new
+ Queue const *selected = nullptr;
+ auto selected_backlog = std::numeric_limits<decltype(HashStringList().FileSize())>::max();
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (APT::String::Startswith(Q->Name, AccessSchema))
+ {
+ decltype(selected_backlog) current_backlog = 0;
+ for (auto const *I = Q->Items; I != nullptr; I = I->Next)
+ {
+ auto const hashes = I->Owner->GetExpectedHashes();
+ if (not hashes.empty())
+ current_backlog += hashes.FileSize();
+ else
+ current_backlog += I->Owner->FileSize;
+ }
+ if (current_backlog < selected_backlog)
+ {
+ selected = Q;
+ selected_backlog = current_backlog;
+ }
+ }
- if (Debug)
- clog << "Chose random queue " << FullQueueName << " for " << Uri << endl;
- } else
- {
- Limit = _config->FindI("Acquire::QueueHost::Limit", Limit);
- FullQueueName = AccessSchema + U.Host;
+ if (unlikely(selected == nullptr))
+ return AccessSchema + "0";
+ return selected->Name;
}
- unsigned int Instances = 0, SchemaLength = AccessSchema.length();
-
- Queue *I = Queues;
- for (; I != 0; I = I->Next) {
+ // most methods talking to remotes like http
+ else
+ {
+ auto const FullQueueName = U.Access + ':' + U.Host;
// if the queue already exists, re-use it
- if (I->Name == FullQueueName)
- return FullQueueName;
-
- if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
- Instances++;
- }
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (Q->Name == FullQueueName)
+ return FullQueueName;
- if (Debug) {
- clog << "Found " << Instances << " instances of " << U.Access << endl;
- }
+ int existing = 0;
+ // check how many queues exist already and reuse empty ones
+ auto const AccessSchema = U.Access + ':';
+ for (Queue const *Q = Queues; Q != nullptr; Q = Q->Next)
+ if (APT::String::Startswith(Q->Name, AccessSchema))
+ ++existing;
- if (Instances >= static_cast<decltype(Instances)>(Limit))
- return U.Access;
+ int const Limit = _config->FindI("Acquire::QueueHost::Limit", DEFAULT_HOST_LIMIT);
+ // if we have too many hosts open use a single generic for the rest
+ if (existing >= Limit)
+ return U.Access;
- return FullQueueName;
+ // we can still create new named queues
+ return FullQueueName;
+ }
}
/*}}}*/
// Acquire::GetConfig - Fetch the configuration information /*{{{*/