2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeReader.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeReadPool.h>
|
2018-11-29 09:19:42 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
|
2018-11-29 09:19:42 +00:00
|
|
|
MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
|
2017-03-24 13:52:50 +00:00
|
|
|
const size_t thread,
|
|
|
|
const MergeTreeReadPoolPtr & pool,
|
|
|
|
const size_t min_marks_to_read_,
|
2019-02-10 16:55:12 +00:00
|
|
|
const UInt64 max_block_size_rows_,
|
2019-01-07 10:40:58 +00:00
|
|
|
size_t preferred_block_size_bytes_,
|
|
|
|
size_t preferred_max_column_in_block_size_bytes_,
|
|
|
|
const MergeTreeData & storage_,
|
|
|
|
const bool use_uncompressed_cache_,
|
|
|
|
const PrewhereInfoPtr & prewhere_info_,
|
2017-03-24 13:52:50 +00:00
|
|
|
const Settings & settings,
|
2019-01-07 10:40:58 +00:00
|
|
|
const Names & virt_column_names_)
|
2017-03-24 13:52:50 +00:00
|
|
|
:
|
2019-01-07 10:40:58 +00:00
|
|
|
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
|
|
|
|
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, settings.min_bytes_to_use_direct_io,
|
|
|
|
settings.max_read_buffer_size, use_uncompressed_cache_, true, virt_column_names_},
|
2017-03-24 13:52:50 +00:00
|
|
|
thread{thread},
|
|
|
|
pool{pool}
|
|
|
|
{
|
|
|
|
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
|
2019-04-01 10:34:22 +00:00
|
|
|
/// If granularity is adaptive it doesn't make sense
|
|
|
|
/// Maybe it will make sence to add settings `max_block_size_bytes`
|
|
|
|
if (max_block_size_rows && !storage.index_granularity_info.is_adaptive)
|
2017-03-24 13:52:50 +00:00
|
|
|
{
|
2019-04-01 10:34:22 +00:00
|
|
|
size_t fixed_index_granularity = storage.index_granularity_info.fixed_index_granularity;
|
|
|
|
min_marks_to_read = (min_marks_to_read_ * fixed_index_granularity + max_block_size_rows - 1)
|
|
|
|
/ max_block_size_rows * max_block_size_rows / fixed_index_granularity;
|
2017-03-24 13:52:50 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
min_marks_to_read = min_marks_to_read_;
|
2018-04-19 15:18:26 +00:00
|
|
|
|
|
|
|
ordered_names = getHeader().getNames();
|
2017-03-24 13:52:50 +00:00
|
|
|
}
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
|
2018-11-29 09:19:42 +00:00
|
|
|
Block MergeTreeThreadSelectBlockInputStream::getHeader() const
|
2018-01-09 01:51:08 +00:00
|
|
|
{
|
2018-02-19 03:56:08 +00:00
|
|
|
auto res = pool->getHeader();
|
2018-04-16 12:21:36 +00:00
|
|
|
executePrewhereActions(res, prewhere_info);
|
2018-09-07 20:23:28 +00:00
|
|
|
injectVirtualColumns(res);
|
2018-02-19 03:56:08 +00:00
|
|
|
return res;
|
2018-08-10 04:02:56 +00:00
|
|
|
}
|
2018-01-09 01:51:08 +00:00
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
/// Requests read task from MergeTreeReadPool and signals whether it got one
|
2018-11-29 09:19:42 +00:00
|
|
|
bool MergeTreeThreadSelectBlockInputStream::getNewTask()
|
2016-11-20 12:43:20 +00:00
|
|
|
{
|
2018-04-19 15:18:26 +00:00
|
|
|
task = pool->getTask(min_marks_to_read, thread, ordered_names);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (!task)
|
|
|
|
{
|
|
|
|
/** Close the files (before destroying the object).
|
|
|
|
* When many sources are created, but simultaneously reading only a few of them,
|
|
|
|
* buffers don't waste memory.
|
|
|
|
*/
|
2017-04-06 17:21:45 +00:00
|
|
|
reader.reset();
|
|
|
|
pre_reader.reset();
|
2017-04-01 07:20:54 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2017-03-24 13:52:50 +00:00
|
|
|
const std::string path = task->data_part->getFullPath();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// Allows pool to reduce number of threads in case of too slow reads.
|
|
|
|
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info) { pool->profileFeedback(info); };
|
|
|
|
|
|
|
|
if (!reader)
|
|
|
|
{
|
2018-10-17 14:56:15 +00:00
|
|
|
auto rest_mark_ranges = pool->getRestMarks(path, task->mark_ranges[0]);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (use_uncompressed_cache)
|
2019-01-04 12:10:00 +00:00
|
|
|
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
|
|
|
|
owned_mark_cache = storage.global_context.getMarkCache();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
reader = std::make_unique<MergeTreeReader>(
|
2017-03-24 13:52:50 +00:00
|
|
|
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
|
2018-10-17 14:56:15 +00:00
|
|
|
storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-04-11 14:31:54 +00:00
|
|
|
if (prewhere_info)
|
2017-04-01 07:20:54 +00:00
|
|
|
pre_reader = std::make_unique<MergeTreeReader>(
|
2017-03-24 13:52:50 +00:00
|
|
|
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
|
2018-10-17 14:56:15 +00:00
|
|
|
storage, rest_mark_ranges, min_bytes_to_use_direct_io,
|
2017-04-01 07:20:54 +00:00
|
|
|
max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2018-10-17 14:56:15 +00:00
|
|
|
/// in other case we can reuse readers, anyway they will be "seeked" to required mark
|
|
|
|
if (path != last_readed_part_path)
|
2018-10-03 17:12:38 +00:00
|
|
|
{
|
2018-10-17 14:56:15 +00:00
|
|
|
auto rest_mark_ranges = pool->getRestMarks(path, task->mark_ranges[0]);
|
2018-10-03 17:10:23 +00:00
|
|
|
/// retain avg_value_size_hints
|
|
|
|
reader = std::make_unique<MergeTreeReader>(
|
|
|
|
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
|
2018-10-17 14:56:15 +00:00
|
|
|
storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
|
2018-10-03 17:10:23 +00:00
|
|
|
reader->getAvgValueSizeHints(), profile_callback);
|
|
|
|
|
|
|
|
if (prewhere_info)
|
|
|
|
pre_reader = std::make_unique<MergeTreeReader>(
|
|
|
|
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
|
2018-10-17 14:56:15 +00:00
|
|
|
storage, rest_mark_ranges, min_bytes_to_use_direct_io,
|
2018-10-03 17:10:23 +00:00
|
|
|
max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback);
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2018-10-04 08:52:56 +00:00
|
|
|
last_readed_part_path = path;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
return true;
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-11-29 09:19:42 +00:00
|
|
|
MergeTreeThreadSelectBlockInputStream::~MergeTreeThreadSelectBlockInputStream() = default;
|
2017-03-24 13:52:50 +00:00
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|