Improve shared pool and add settings

Antonio Andelic 2022-03-10 11:06:50 +00:00
parent cce318273b
commit 28e9508c4e
5 changed files with 34 additions and 54 deletions


@@ -45,6 +45,7 @@
#include <Core/ServerUUID.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadHelpers.h>
#include <IO/IOThreadPool.h>
#include <IO/UseSSL.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/DDLWorker.h>
@@ -554,6 +555,7 @@ if (ThreadFuzzer::instance().isEffective())
config().getUInt("thread_pool_queue_size", 10000)
);
IOThreadPool::initialize(100, 0, 10000);
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
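
For orientation, the IOThreadPool::initialize call above is the one-time setup of the shared IO pool that the rest of this commit relies on. Below is an illustrative restatement of that call with the literals named, assuming the arguments follow the usual ThreadPool order of max threads, max free threads, queue size; the helper and constant names are mine, not from the commit.

#include <cstddef>
#include <IO/IOThreadPool.h>

/// Illustrative sketch of the startup call above; not part of the commit.
void initSharedIOPool()
{
    constexpr size_t max_io_threads = 100;   /// cap on concurrently running IO jobs
    constexpr size_t max_free_threads = 0;   /// idle threads are not kept around
    constexpr size_t queue_size = 10000;     /// bound on jobs waiting in the pool's queue
    DB::IOThreadPool::initialize(max_io_threads, max_free_threads, queue_size);
}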


@@ -46,6 +46,8 @@ class IColumn;
M(UInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 mean that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has an effect only if the SELECT part is run in parallel, see 'max_threads' setting.", 0) \
M(UInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(MaxThreads, max_download_threads, 4, "The maximum number of threads to download data (e.g. from s3).", 0) \
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximum size of the buffer for parallel downloading (e.g. from s3) per thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
M(UInt64, max_distributed_connections, 1024, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).", 0) \
M(UInt64, max_query_size, DBMS_DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)", 0) \
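
A rough consequence of the two new defaults, as back-of-the-envelope arithmetic rather than anything stated in the commit: each download thread owns one buffer, so peak parallel-download buffer memory per query is about max_download_threads * max_download_buffer_size, i.e. 4 * 10 MiB = 40 MiB with the defaults above.

#include <cassert>
#include <cstddef>

/// Sanity check of the defaults introduced above; illustrative only.
int main()
{
    constexpr std::size_t max_download_threads = 4;                     /// default of the new MaxThreads setting
    constexpr std::size_t max_download_buffer_size = 10 * 1024 * 1024;  /// default per-thread buffer, 10 MiB
    /// Assuming one buffer per download thread, peak buffer memory is their product.
    assert(max_download_threads * max_download_buffer_size == 40ull * 1024 * 1024);
    return 0;
}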


@@ -19,13 +19,27 @@ ParallelReadBuffer::ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader
, max_working_readers(max_working_readers_)
, reader_factory(std::move(reader_factory_))
{
initializeWorkers();
std::unique_lock<std::mutex> lock{mutex};
addReaders(lock);
}
void ParallelReadBuffer::initializeWorkers()
bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer_lock*/)
{
for (size_t i = 0; i < max_working_readers; ++i)
pool->scheduleOrThrow([this] { processor(); });
auto reader = reader_factory->getReader();
if (!reader)
{
return false;
}
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader->first), reader->second));
pool->scheduleOrThrow([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });
return true;
}
void ParallelReadBuffer::addReaders(std::unique_lock<std::mutex> & buffer_lock)
{
while (read_workers.size() < max_working_readers && addReaderToPool(buffer_lock))
;
}
off_t ParallelReadBuffer::seek(off_t offset, int whence)
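
The two new functions above replace the old per-thread worker loop: readers are now created lazily and each one is handed straight to the shared pool. A self-contained sketch of that pattern follows, using stand-in names (Worker, get_next, schedule are placeholders for ReadWorker, reader_factory->getReader() and pool->scheduleOrThrow(), not ClickHouse identifiers).

#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

struct Worker { int id = 0; };

/// Lazily top the worker list up to `max_working` entries; each new worker is
/// immediately scheduled on a shared executor, mirroring addReaders()/addReaderToPool().
void addWorkers(
    std::vector<std::shared_ptr<Worker>> & workers,
    std::size_t max_working,
    const std::function<std::optional<Worker>()> & get_next,
    const std::function<void(std::shared_ptr<Worker>)> & schedule)
{
    while (workers.size() < max_working)
    {
        auto next = get_next();           /// plays the role of reader_factory->getReader()
        if (!next)
            break;                        /// input exhausted, nothing more to schedule
        auto worker = workers.emplace_back(std::make_shared<Worker>(*next));
        schedule(worker);                 /// plays the role of pool->scheduleOrThrow(...)
    }
}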
@@ -98,7 +112,8 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
resetWorkingBuffer();
emergency_stop = false;
initializeWorkers();
lock.lock();
addReaders(lock);
return offset;
}
@@ -134,7 +149,7 @@ bool ParallelReadBuffer::nextImpl()
[this]()
{
/// Check if no more readers left or current reader can be processed
return emergency_stop || (all_created && read_workers.empty()) || currentWorkerReady();
return emergency_stop || currentWorkerReady();
});
if (emergency_stop)
@@ -149,10 +164,10 @@ bool ParallelReadBuffer::nextImpl()
}
if (worker_removed)
reader_condvar.notify_all();
addReaders(lock);
/// All readers processed, stop
if (read_workers.empty() && all_created)
if (read_workers.empty())
{
all_completed = true;
return false;
@@ -171,52 +186,22 @@ bool ParallelReadBuffer::nextImpl()
return true;
}
void ParallelReadBuffer::processor()
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
{
std::lock_guard lock{mutex};
++active_working_reader;
}
while (!emergency_stop)
{
ReadWorkerPtr worker;
{
/// Create new read worker and put it into the end of the queue
/// reader_factory is not thread safe, so we call getReader under lock
std::unique_lock lock(mutex);
reader_condvar.wait(lock, [this] { return emergency_stop || read_workers.size() < max_working_readers; });
if (emergency_stop)
break;
auto reader = reader_factory->getReader();
if (!reader)
{
all_created = true;
next_condvar.notify_all();
break;
}
worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader->first), reader->second));
}
/// Start processing
readerThreadFunction(std::move(worker));
}
{
SCOPE_EXIT({
std::lock_guard lock{mutex};
--active_working_reader;
if (active_working_reader == 0)
{
readers_done.notify_all();
}
}
}
});
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
try
{
while (!emergency_stop)
@@ -263,7 +248,6 @@ void ParallelReadBuffer::onBackgroundException()
void ParallelReadBuffer::finishAndWait()
{
emergency_stop = true;
reader_condvar.notify_all();
std::unique_lock lock{mutex};
readers_done.wait(lock, [&] { return active_working_reader == 0; });


@@ -28,8 +28,6 @@ private:
/// Finished readers removed from queue and data from next readers processed
bool nextImpl() override;
void initializeWorkers();
class Segment : private boost::noncopyable
{
public:
@@ -131,8 +129,8 @@ private:
[[noreturn]] void handleEmergencyStop();
/// Create new readers in a loop and process them with readerThreadFunction.
void processor();
void addReaders(std::unique_lock<std::mutex> & buffer_lock);
bool addReaderToPool(std::unique_lock<std::mutex> & buffer_lock);
/// Process read_worker, read data and save into internal segments queue
void readerThreadFunction(ReadWorkerPtr read_worker);


@@ -290,15 +290,9 @@ namespace
if (supports_ranges && content_length)
{
[[maybe_unused]] static auto initialized = std::invoke(
[]
{
IOThreadPool::initialize(100, 4, 10000);
return true;
});
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
*content_length,
10 * 1024 * 1024,
context->getSettings().max_download_buffer_size,
request_uri,
http_method,
callback,
@@ -313,7 +307,7 @@ namespace
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(std::move(read_buffer_factory), &IOThreadPool::get(), 4),
std::make_unique<ParallelReadBuffer>(std::move(read_buffer_factory), &IOThreadPool::get(), context->getSettings().max_download_threads),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
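
Taken together, the construction site above is where the pieces of this commit meet: the factory is sized by max_download_buffer_size, the buffer runs on the shared IOThreadPool initialized at server startup, and max_download_threads caps the workers. A hypothetical helper capturing that wiring is sketched below; the function name and template are mine, and only the constructor call and IOThreadPool::get() are taken from the hunk above.

#include <memory>
#include <Core/Settings.h>
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>

/// Hypothetical glue, not part of the commit: hand a reader factory to the shared
/// IO pool, capped by the new max_download_threads setting.
template <typename FactoryPtr>
std::unique_ptr<DB::ParallelReadBuffer> makeParallelReader(FactoryPtr factory, const DB::Settings & settings)
{
    return std::make_unique<DB::ParallelReadBuffer>(
        std::move(factory),
        &DB::IOThreadPool::get(),          /// shared pool, initialized once in Server.cpp
        settings.max_download_threads);    /// replaces the previously hard-coded 4
}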