2019-10-01 16:50:08 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
2019-10-10 16:30:30 +00:00
|
|
|
#include <Storages/MergeTree/IMergeTreeReader.h>
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced by this translation unit; only declared here (extern).
namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}
|
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
/// Rewrites the columns of `header` so that their types match the ones listed in `data_part->columns`.
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
///
/// Columns of the part that are absent from `header` are ignored; columns whose type already
/// matches are left untouched. Returns the (possibly modified) header by value.
static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
{
    for (const auto & part_column : data_part->columns)
    {
        /// Only columns that the header actually contains are of interest.
        if (!header.has(part_column.name))
            continue;

        auto & header_column = header.getByName(part_column.name);

        /// Nothing to do when the header already carries the part's type.
        if (header_column.type->equals(*part_column.type))
            continue;

        /// Adopt the part's type and replace the column with a freshly created one of that type.
        header_column.type = part_column.type;
        header_column.column = header_column.type->createColumn();
    }

    /// `header` is a named rvalue reference, so the explicit move is what hands the block out without copying.
    return std::move(header);
}
|
|
|
|
|
|
|
|
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
|
2018-10-17 03:13:00 +00:00
|
|
|
const MergeTreeData & storage_,
|
2017-03-24 13:52:50 +00:00
|
|
|
const MergeTreeData::DataPartPtr & owned_data_part_,
|
2019-02-10 16:55:12 +00:00
|
|
|
UInt64 max_block_size_rows_,
|
2017-03-24 13:52:50 +00:00
|
|
|
size_t preferred_block_size_bytes_,
|
2017-06-30 16:28:27 +00:00
|
|
|
size_t preferred_max_column_in_block_size_bytes_,
|
2019-07-19 14:56:00 +00:00
|
|
|
Names required_columns_,
|
2019-10-04 15:40:05 +00:00
|
|
|
MarkRanges mark_ranges_,
|
2017-03-24 13:52:50 +00:00
|
|
|
bool use_uncompressed_cache_,
|
2019-01-07 10:40:58 +00:00
|
|
|
const PrewhereInfoPtr & prewhere_info_,
|
2019-08-03 11:02:40 +00:00
|
|
|
bool check_columns_,
|
2019-12-18 15:54:45 +00:00
|
|
|
const MergeTreeReaderSettings & reader_settings_,
|
2019-01-07 10:40:58 +00:00
|
|
|
const Names & virt_column_names_,
|
2017-04-05 20:34:19 +00:00
|
|
|
size_t part_index_in_query_,
|
2017-03-24 13:52:50 +00:00
|
|
|
bool quiet)
|
2017-04-01 07:20:54 +00:00
|
|
|
:
|
2019-12-19 11:46:43 +00:00
|
|
|
<<<<<<< HEAD:dbms/src/Storages/MergeTree/MergeTreeSelectBlockInputStream.cpp
|
2019-01-07 10:40:58 +00:00
|
|
|
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
|
2019-12-18 16:41:11 +00:00
|
|
|
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
|
2019-10-10 16:30:30 +00:00
|
|
|
reader_settings_, use_uncompressed_cache_, virt_column_names_},
|
2019-07-19 14:56:00 +00:00
|
|
|
required_columns{required_columns_},
|
2019-12-19 11:46:43 +00:00
|
|
|
=======
|
2019-10-01 16:50:08 +00:00
|
|
|
MergeTreeBaseSelectProcessor{
|
2019-10-02 11:57:17 +00:00
|
|
|
replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
|
2019-10-01 16:50:08 +00:00
|
|
|
storage_, prewhere_info_, max_block_size_rows_,
|
2017-06-30 16:28:27 +00:00
|
|
|
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
|
2019-01-07 10:40:58 +00:00
|
|
|
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
|
2019-10-01 16:50:08 +00:00
|
|
|
required_columns{std::move(required_columns_)},
|
2019-12-19 11:46:43 +00:00
|
|
|
>>>>>>> upstream/master:dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp
|
2017-04-05 20:34:19 +00:00
|
|
|
data_part{owned_data_part_},
|
2017-07-28 17:34:02 +00:00
|
|
|
part_columns_lock(data_part->columns_lock),
|
2019-10-04 15:40:05 +00:00
|
|
|
all_mark_ranges(std::move(mark_ranges_)),
|
2017-04-05 20:34:19 +00:00
|
|
|
part_index_in_query(part_index_in_query_),
|
2019-08-03 11:02:40 +00:00
|
|
|
check_columns(check_columns_),
|
2017-04-05 20:34:19 +00:00
|
|
|
path(data_part->getFullPath())
|
2017-03-24 13:52:50 +00:00
|
|
|
{
|
|
|
|
/// Let's estimate total number of rows for progress bar.
|
|
|
|
for (const auto & range : all_mark_ranges)
|
2018-05-23 19:34:37 +00:00
|
|
|
total_marks_count += range.end - range.begin;
|
|
|
|
|
2019-07-15 20:26:55 +00:00
|
|
|
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
|
2017-03-24 13:52:50 +00:00
|
|
|
|
|
|
|
if (!quiet)
|
2017-04-05 20:34:19 +00:00
|
|
|
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << data_part->name
|
2017-03-24 13:52:50 +00:00
|
|
|
<< ", approx. " << total_rows
|
|
|
|
<< (all_mark_ranges.size() > 1
|
2019-07-15 20:26:55 +00:00
|
|
|
? ", up to " + toString(total_rows)
|
2017-03-24 13:52:50 +00:00
|
|
|
: "")
|
2019-03-25 13:55:24 +00:00
|
|
|
<< " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
|
2017-03-24 13:52:50 +00:00
|
|
|
|
2018-02-23 10:02:29 +00:00
|
|
|
addTotalRowsApprox(total_rows);
|
2019-10-02 11:57:17 +00:00
|
|
|
ordered_names = header_without_virtual_columns.getNames();
|
2018-04-16 12:21:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
bool MergeTreeSelectProcessor::getNewTask()
|
2017-04-06 17:21:45 +00:00
|
|
|
try
|
2016-11-20 12:43:20 +00:00
|
|
|
{
|
2018-05-23 19:34:37 +00:00
|
|
|
/// Produce no more than one task
|
|
|
|
if (!is_first_task || total_marks_count == 0)
|
2017-04-05 20:34:19 +00:00
|
|
|
{
|
2017-04-06 17:21:45 +00:00
|
|
|
finish();
|
2017-04-05 20:34:19 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
is_first_task = false;
|
|
|
|
|
2019-07-19 14:56:00 +00:00
|
|
|
task_columns = getReadTaskColumns(storage, data_part, required_columns, prewhere_info, check_columns);
|
2017-04-05 20:34:19 +00:00
|
|
|
|
|
|
|
/** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor,
|
|
|
|
* and remove this reverse. */
|
|
|
|
MarkRanges remaining_mark_ranges = all_mark_ranges;
|
|
|
|
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
|
|
|
|
|
2018-09-03 05:06:19 +00:00
|
|
|
auto size_predictor = (preferred_block_size_bytes == 0)
|
|
|
|
? nullptr
|
|
|
|
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
|
2017-04-05 20:34:19 +00:00
|
|
|
|
2019-07-19 14:56:00 +00:00
|
|
|
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
|
|
|
const auto & column_names = task_columns.columns.getNames();
|
|
|
|
column_name_set = NameSet{column_names.begin(), column_names.end()};
|
|
|
|
|
2018-04-11 14:31:54 +00:00
|
|
|
task = std::make_unique<MergeTreeReadTask>(
|
2019-07-19 14:56:00 +00:00
|
|
|
data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
|
|
|
|
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
|
|
|
task_columns.should_reorder, std::move(size_predictor));
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (!reader)
|
|
|
|
{
|
|
|
|
if (use_uncompressed_cache)
|
2019-01-04 12:10:00 +00:00
|
|
|
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-01-04 12:10:00 +00:00
|
|
|
owned_mark_cache = storage.global_context.getMarkCache();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-10-10 16:30:30 +00:00
|
|
|
reader = data_part->getReader(task_columns.columns, all_mark_ranges,
|
|
|
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-04-11 14:31:54 +00:00
|
|
|
if (prewhere_info)
|
2019-10-10 16:30:30 +00:00
|
|
|
pre_reader = data_part->getReader(task_columns.pre_columns, all_mark_ranges,
|
|
|
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2017-04-05 20:34:19 +00:00
|
|
|
return true;
|
|
|
|
}
|
2017-04-06 17:21:45 +00:00
|
|
|
catch (...)
|
2017-04-05 20:34:19 +00:00
|
|
|
{
|
2017-04-06 17:21:45 +00:00
|
|
|
/// Suspicion of the broken part. A part is added to the queue for verification.
|
|
|
|
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
throw;
|
|
|
|
}
|
2017-03-24 13:52:50 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
void MergeTreeSelectProcessor::finish()
|
2017-04-06 17:21:45 +00:00
|
|
|
{
|
|
|
|
/** Close the files (before destroying the object).
|
|
|
|
* When many sources are created, but simultaneously reading only a few of them,
|
|
|
|
* buffers don't waste memory.
|
|
|
|
*/
|
|
|
|
reader.reset();
|
|
|
|
pre_reader.reset();
|
2017-07-28 17:34:02 +00:00
|
|
|
part_columns_lock.unlock();
|
2017-04-06 17:21:45 +00:00
|
|
|
data_part.reset();
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
/// Compiler-generated destructor, defined out of line in this file.
MergeTreeSelectProcessor::~MergeTreeSelectProcessor() = default;
|
2017-03-24 13:52:50 +00:00
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|