Merge branch 'master' into dont_finalize_s3_buffer

This commit is contained in:
alesapin 2022-05-03 12:16:15 +02:00
commit 2c856bdeca
7 changed files with 1033 additions and 430 deletions

View File

@ -167,6 +167,34 @@ Config is read from multiple files (in XML or YAML format) and merged into singl
For queries and subsystems other than `Server` config is accessible using `Context::getConfigRef()` method. Every subsystem that is capable of reloading it's config without server restart should register itself in reload callback in `Server::main()` method. Note that if newer config has an error, most subsystems will ignore new config, log warning messages and keep working with previously loaded config. Due to the nature of `AbstractConfiguration` it is not possible to pass reference to specific section, so `String config_prefix` is usually used instead.
## Threads and jobs {#threads-and-jobs}
To execute queries and do side activities ClickHouse allocates threads from one of thread pools to avoid frequent thread creation and destruction. There are a few thread pools, which are selected depending on a purpose and structure of a job:
* Server pool for incoming client sessions.
* Global thread pool for general purpose jobs, background activities and standalone threads.
* IO thread pool for jobs that are mostly blocked on some IO and are not CPU-intensive.
* Background pools for periodic tasks.
* Pools for preemptable tasks that can be split into steps.
Server pool is a `Poco::ThreadPool` class instance defined in `Server::main()` method. It can have at most `max_connection` threads. Every thread is dedicated to a single active connection.
Global thread pool is `GlobalThreadPool` singleton class. To allocate thread from it `ThreadFromGlobalPool` is used. It has an interface similar to `std::thread`, but pulls thread from the global pool and does all necessary initializations. It is configured with the following settings:
* `max_thread_pool_size` - limit on thread count in pool.
* `max_thread_pool_free_size` - limit on idle thread count waiting for new jobs.
* `thread_pool_queue_size` - limit on scheduled job count.
Global pool is universal and all pools described below are implemented on top of it. This can be thought of as a hierarchy of pools. Any specialized pool takes its threads from the global pool using `ThreadPool` class. So the main purpose of any specialized pool is to apply limit on the number of simultaneous jobs and do job scheduling. If there are more jobs scheduled than threads in a pool, `ThreadPool` accumulates jobs in a queue with priorities. Each job has an integer priority. Default priority is zero. All jobs with higher priority values are started before any job with lower priority value. But there is no difference between already executing jobs, thus priority matters only when the pool in overloaded.
IO thread pool is implemented as a plain `ThreadPool` accessible via `IOThreadPool::get()` method. It is configured in the same way as global pool with `max_io_thread_pool_size`, `max_io_thread_pool_free_size` and `io_thread_pool_queue_size` settings. The main purpose of IO thread pool is to avoid exhaustion of the global pool with IO jobs, which could prevent queries from fully utilizing CPU.
For periodic task execution there is `BackgroundSchedulePool` class. You can register tasks using `BackgroundSchedulePool::TaskHolder` objects and the pool ensures that no task runs two jobs at the same time. It also allows you to postpone task execution to a specific instant in the future or temporarily deactivate task. Global `Context` provides a few instances of this class for different purposes. For general purpose tasks `Context::getSchedulePool()` is used.
There are also specialized thread pools for preemptable tasks. Such `IExecutableTask` task can be split into ordered sequence of jobs, called steps. To schedule these tasks in a manner allowing short tasks to be prioritied over long ones `MergeTreeBackgroundExecutor` is used. As name suggests it is used for background MergeTree related operations such as merges, mutations, fetches and moves. Pool instances are available using `Context::getCommonExecutor()` and other similar methods.
No matter what pool is used for a job, at start `ThreadStatus` instance is created for this job. It encapsulates all per-thread information: thread id, query id, performance counters, resource consumption and many other useful data. Job can access it via thread local pointer by `CurrentThread::get()` call, so we do not need to pass it to every function.
If thread is related to query execution, then the most important thing attached to `ThreadStatus` is query context `ContextPtr`. Every query has its master thread in the server pool. Master thread does the attachment by holding an `ThreadStatus::QueryScope query_scope(query_context)` object. Master thread also creates a thread group represented with `ThreadGroupStatus` object. Every additional thread that is allocated during this query execution is attached to its thread group by `CurrentThread::attachTo(thread_group)` call. Thread groups are used to aggregate profile event counters and track memory consumption by all threads dedicated to a single task (see `MemoryTracker` and `ProfileEvents::Counters` classes for more information).
## Distributed Query Execution {#distributed-query-execution}
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store data itself it only provides a “view” to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The `Distributed` table requests remote servers to process a query just up to a stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to remote servers and does not send much intermediate data over the network.

View File

@ -22,7 +22,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE, false);
}
}

View File

@ -22,10 +22,12 @@ MergeTreeReaderStream::MergeTreeReaderStream(
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache, size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
bool is_low_cardinality_dictionary_)
: disk(std::move(disk_))
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
, marks_count(marks_count_)
, file_size(file_size_)
, mark_cache(mark_cache_)
@ -126,23 +128,59 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
auto right_mark = marks_loader.getMark(right_mark_non_included);
result_right_offset = right_mark.offset_in_compressed_file;
/// Let's go to the right and find mark with bigger offset in compressed file
bool found_bigger_mark = false;
for (size_t i = right_mark_non_included + 1; i < marks_count; ++i)
bool need_to_check_marks_from_the_right = false;
/// If the end of range is inside the block, we will need to read it too.
if (right_mark.offset_in_decompressed_block > 0)
{
const auto & candidate_mark = marks_loader.getMark(i);
if (result_right_offset < candidate_mark.offset_in_compressed_file)
{
result_right_offset = candidate_mark.offset_in_compressed_file;
found_bigger_mark = true;
break;
}
need_to_check_marks_from_the_right = true;
}
else if (is_low_cardinality_dictionary)
{
/// Also, in LowCardinality dictionary several consecutive marks can point to
/// the same offset. So to get true bytes offset we have to get first
/// non-equal mark.
/// Example:
/// Mark 186, points to [2003111, 0]
/// Mark 187, points to [2003111, 0]
/// Mark 188, points to [2003111, 0] <--- for example need to read until 188
/// Mark 189, points to [2003111, 0] <--- not suitable, because have same offset
/// Mark 190, points to [2003111, 0]
/// Mark 191, points to [2003111, 0]
/// Mark 192, points to [2081424, 0] <--- what we are looking for
/// Mark 193, points to [2081424, 0]
/// Mark 194, points to [2081424, 0]
/// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
/// one granule may require reading of two dictionaries which starts from different marks.
/// The only correct way is to take offset from at least next different granule from the right one.
/// Check test_s3_low_cardinality_right_border.
need_to_check_marks_from_the_right = true;
}
if (!found_bigger_mark)
/// Let's go to the right and find mark with bigger offset in compressed file
if (need_to_check_marks_from_the_right)
{
/// If there are no marks after the end of range, just use file size
result_right_offset = file_size;
bool found_bigger_mark = false;
for (size_t i = right_mark_non_included + 1; i < marks_count; ++i)
{
const auto & candidate_mark = marks_loader.getMark(i);
if (result_right_offset < candidate_mark.offset_in_compressed_file)
{
result_right_offset = candidate_mark.offset_in_compressed_file;
found_bigger_mark = true;
break;
}
}
if (!found_bigger_mark)
{
/// If there are no marks after the end of range, just use file size
result_right_offset = file_size;
}
}
}
else if (right_mark_non_included == 0)

View File

@ -24,7 +24,8 @@ public:
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache, UncompressedCache * uncompressed_cache,
size_t file_size_, const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
bool is_low_cardinality_dictionary_);
void seekToMark(size_t index);
@ -46,6 +47,8 @@ private:
std::string path_prefix;
std::string data_file_extension;
bool is_low_cardinality_dictionary = false;
size_t marks_count;
size_t file_size;

View File

@ -180,12 +180,14 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
if (!data_file_exists)
return;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
disk, data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
profile_callback, clock_type));
profile_callback, clock_type, is_lc_dict));
};
data_part->getSerialization(name_and_type)->enumerateStreams(callback);

File diff suppressed because it is too large Load Diff

View File

@ -77,6 +77,7 @@ def started_cluster():
def test_s3_right_border(started_cluster):
node1.query("drop table if exists s3_low_cardinality")
node1.query(
"""
CREATE TABLE s3_low_cardinality
@ -112,3 +113,22 @@ SETTINGS storage_policy = 's3', min_bytes_for_wide_part = 0, index_granularity
)
== "1300000\n"
)
def test_s3_right_border_2(started_cluster):
node1.query("drop table if exists s3_low_cardinality")
node1.query(
"create table s3_low_cardinality (key UInt32, str_column LowCardinality(String)) engine = MergeTree order by (key) settings storage_policy = 's3', min_bytes_for_wide_part = 0, index_granularity = 8192, min_compress_block_size=1, merge_max_block_size=10000"
)
node1.query(
"insert into s3_low_cardinality select number, number % 8000 from numbers(8192)"
)
node1.query(
"insert into s3_low_cardinality select number = 0 ? 0 : (number + 8192 * 1), number % 8000 + 1 * 8192 from numbers(8192)"
)
node1.query(
"insert into s3_low_cardinality select number = 0 ? 0 : (number + 8192 * 2), number % 8000 + 2 * 8192 from numbers(8192)"
)
node1.query("optimize table s3_low_cardinality final")
res = node1.query("select * from s3_low_cardinality where key = 9000")
assert res == "9000\t9000\n"