do not run support threads if no tasks left

This commit is contained in:
Sema Checherinda 2023-02-22 16:21:08 +01:00
parent e5cbe4311e
commit 08dc874a37

View File

@ -63,6 +63,19 @@ public:
} }
}; };
struct WorkerState
{
struct Task
{
DiskPtr disk;
String file;
std::atomic<size_t> * counter = nullptr;
};
std::vector<Task> tasks;
std::atomic<size_t> next_task = {0};
};
class DetachedPartsSource : public ISource class DetachedPartsSource : public ISource
{ {
public: public:
@ -113,7 +126,7 @@ private:
const std::vector<UInt8> columns_mask; const std::vector<UInt8> columns_mask;
const UInt64 block_size; const UInt64 block_size;
const bool has_bytes_on_disk_column; const bool has_bytes_on_disk_column;
static const size_t support_threads = 35; const size_t support_threads = 35;
StoragesInfo current_info; StoragesInfo current_info;
DetachedPartsInfo detached_parts; DetachedPartsInfo detached_parts;
@ -134,20 +147,7 @@ private:
if (!has_bytes_on_disk_column) if (!has_bytes_on_disk_column)
return; return;
struct Task WorkerState worker_state;
{
DiskPtr disk;
String file;
std::atomic<size_t> * counter = nullptr;
};
struct SharedState
{
std::vector<Task> tasks;
std::atomic<size_t> next_task = {0};
};
SharedState shared_state;
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id) for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
{ {
@ -161,7 +161,7 @@ private:
auto * counter = &parts_sizes[p_id - begin]; auto * counter = &parts_sizes[p_id - begin];
for (auto & file : listing) for (auto & file : listing)
shared_state.tasks.push_back({disk, file, counter}); worker_state.tasks.push_back({disk, file, counter});
} }
std::vector<std::future<void>> futures; std::vector<std::future<void>> futures;
@ -173,21 +173,27 @@ private:
futures.clear(); futures.clear();
}); });
for (size_t i = 0; i < support_threads; ++i) auto max_thread_to_run = std::min(support_threads, worker_state.tasks.size() / 10);
for (size_t i = 0; i < max_thread_to_run; ++i)
{ {
if (worker_state.next_task.load() >= worker_state.tasks.size())
break;
auto worker = [&worker_state] ()
{
for (auto id = worker_state.next_task++; id < worker_state.tasks.size(); id = worker_state.next_task++)
{
auto & task = worker_state.tasks.at(id);
auto size = task.disk->getFileSize(task.file);
task.counter->fetch_add(size);
}
};
futures.push_back( futures.push_back(
scheduleFromThreadPool<void>( scheduleFromThreadPool<void>(
[&shared_state] () std::move(worker),
{ IOThreadPool::get(),
for (auto id = shared_state.next_task++; id < shared_state.tasks.size(); id = shared_state.next_task++) "DP_BytesOnDisk"));
{
auto & task = shared_state.tasks.at(id);
auto size = task.disk->getFileSize(task.file);
task.counter->fetch_add(size);
}
},
IOThreadPool::get(),
"DP_BytesOnDisk"));
} }
/// Exceptions are propagated /// Exceptions are propagated