Tests pass. [#CLICKHOUSE-2902]

This commit is contained in:
Vitaliy Lyudvichenko 2017-04-05 23:34:19 +03:00 committed by alexey-milovidov
parent 82af2278fc
commit 4b566304ca
9 changed files with 151 additions and 134 deletions

View File

@ -30,79 +30,24 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
bool save_marks_in_cache_,
const Names & virt_column_names,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseBlockInputStream{storage_, prewhere_actions_, prewhere_column_, max_block_size_rows_, preferred_block_size_bytes_,
min_bytes_to_use_direct_io_, max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_},
owned_data_part{owned_data_part_},
part_columns_lock{new Poco::ScopedReadRWLock(owned_data_part->columns_lock)},
min_bytes_to_use_direct_io_, max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names},
ordered_names{column_names},
data_part{owned_data_part_},
part_columns_lock{new Poco::ScopedReadRWLock(data_part->columns_lock)},
all_mark_ranges(mark_ranges_),
path(owned_data_part->getFullPath())
part_index_in_query(part_index_in_query_),
check_columns(check_columns),
path(data_part->getFullPath())
{
try
{
log = &Logger::get("MergeTreeBlockInputStream");
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, owned_data_part, column_names).empty();
bool remove_prewhere_column = false;
Names pre_column_names;
if (prewhere_actions)
{
pre_column_names = prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, owned_data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a column of the table, you do not need to output a column with it
/// (from storage expect to receive only the columns of the table).
remove_prewhere_column = !pre_name_set.count(prewhere_column);
Names post_column_names;
for (const auto & name : column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
column_names = post_column_names;
}
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (check_columns)
{
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may be not true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(owned_data_part->columns, pre_column_names);
if (!column_names.empty())
storage.check(owned_data_part->columns, column_names);
pre_columns = storage.getColumnsList().addTypes(pre_column_names);
columns = storage.getColumnsList().addTypes(column_names);
}
else
{
pre_columns = owned_data_part->columns.addTypes(pre_column_names);
columns = owned_data_part->columns.addTypes(column_names);
}
/** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor,
* and remove this reverse. */
MarkRanges remaining_mark_ranges = all_mark_ranges;
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
task = std::make_unique<MergeTreeReadTask>(owned_data_part, remaining_mark_ranges, 0, ordered_names, column_name_set,
columns, pre_columns, remove_prewhere_column, should_reorder);
/// Let's estimate total number of rows for progress bar.
size_t total_rows = 0;
for (const auto & range : all_mark_ranges)
@ -110,7 +55,7 @@ try
total_rows *= storage.index_granularity;
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1
? ", up to " + toString((all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity)
@ -123,12 +68,12 @@ catch (const Exception & e)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(owned_data_part->name);
storage.reportBrokenPart(data_part->name);
throw;
}
catch (...)
{
storage.reportBrokenPart(owned_data_part->name);
storage.reportBrokenPart(data_part->name);
throw;
}
}
@ -154,11 +99,78 @@ String MergeTreeBlockInputStream::getID() const
return res.str();
}
Block MergeTreeBlockInputStream::readImpl()
bool MergeTreeBlockInputStream::getNewTask()
{
if (!task || task->mark_ranges.empty())
return Block();
/// Produce only one task
if (!is_first_task)
{
task.reset();
return false;
}
is_first_task = false;
Names pre_column_names, column_names = ordered_names;
bool remove_prewhere_column = false;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, data_part, column_names).empty();
if (prewhere_actions)
{
pre_column_names = prewhere_actions->getRequiredColumns();
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a column of the table, you do not need to output a column with it
/// (from storage expect to receive only the columns of the table).
remove_prewhere_column = !pre_name_set.count(prewhere_column);
Names post_column_names;
for (const auto & name : column_names)
if (!pre_name_set.count(name))
post_column_names.push_back(name);
column_names = post_column_names;
}
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
column_name_set = NameSet{column_names.begin(), column_names.end()};
if (check_columns)
{
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may be not true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(data_part->columns, pre_column_names);
if (!column_names.empty())
storage.check(data_part->columns, column_names);
pre_columns = storage.getColumnsList().addTypes(pre_column_names);
columns = storage.getColumnsList().addTypes(column_names);
}
else
{
pre_columns = data_part->columns.addTypes(pre_column_names);
columns = data_part->columns.addTypes(column_names);
}
/** @note you could simply swap `reverse` in if and else branches of MergeTreeDataSelectExecutor,
* and remove this reverse. */
MarkRanges remaining_mark_ranges = all_mark_ranges;
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
task = std::make_unique<MergeTreeReadTask>(data_part, remaining_mark_ranges, part_index_in_query, ordered_names, column_name_set,
columns, pre_columns, remove_prewhere_column, should_reorder);
if (preferred_block_size_bytes)
task->size_predictor = std::make_shared<MergeTreeBlockSizePredictor>(storage, *task);
if (!reader)
{
@ -168,40 +180,47 @@ Block MergeTreeBlockInputStream::readImpl()
owned_mark_cache = storage.context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, owned_data_part, columns, owned_uncompressed_cache.get(),
path, data_part, columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, owned_data_part, pre_columns, owned_uncompressed_cache.get(),
path, data_part, pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
}
if (preferred_block_size_bytes)
return true;
}
Block MergeTreeBlockInputStream::readImpl()
{
Block res;
while (!res && !isCancelled())
{
if (!task->size_predictor)
task->size_predictor = std::make_unique<MergeTreeBlockSizePredictor>(storage, *task);
task->size_predictor->startBlock();
}
if (!task && !getNewTask())
break;
Block res = readFromPart(task.get());
res = readFromPart();
if (res)
injectVirtualColumns(res, task.get());
if (res)
injectVirtualColumns(res);
if (task->mark_ranges.empty())
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.reset();
owned_data_part.reset();
task.reset();
if (task->mark_ranges.empty())
{
/** Close the files (before destroying the object).
* When many sources are created, but simultaneously reading only a few of them,
* buffers don't waste memory.
*/
reader.reset();
pre_reader.reset();
part_columns_lock.reset();
data_part.reset();
task.reset();
}
}
return res;

View File

@ -28,6 +28,8 @@ public:
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool save_marks_in_cache,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);
~MergeTreeBlockInputStream() override;
@ -40,19 +42,23 @@ protected:
Block readImpl() override;
bool getNewTask() override;
private:
Names ordered_names;
NameSet column_name_set;
NamesAndTypesList columns;
NamesAndTypesList pre_columns;
std::unique_ptr<MergeTreeReadTask> task;
MergeTreeData::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
MergeTreeData::DataPartPtr data_part; /// Кусок не будет удалён, пока им владеет этот объект.
std::unique_ptr<Poco::ScopedReadRWLock> part_columns_lock; /// Не дадим изменить список столбцов куска, пока мы из него читаем.
MarkRanges all_mark_ranges; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
size_t part_index_in_query = 0;
bool check_columns;
String path;
bool is_first_task = true;
};
}

View File

@ -63,4 +63,6 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData
}
MergeTreeReadTask::~MergeTreeReadTask() = default;
}

View File

@ -51,6 +51,8 @@ struct MergeTreeReadTask
ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns},
remove_prewhere_column{remove_prewhere_column}, should_reorder{should_reorder}
{}
virtual ~MergeTreeReadTask();
};
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
@ -104,6 +106,7 @@ struct MergeTreeBlockSizePredictor
info.bytes_per_row = info.bytes_per_row_global;
bytes_per_row_global += info.bytes_per_row_global;
}
bytes_per_row_current = bytes_per_row_global;
}
void startBlock()

View File

@ -1016,7 +1016,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
/// Apply the expression and write the result to temporary files.
if (expression)
{
MarkRanges ranges(1, MarkRange(0, part->size));
MarkRanges ranges{MarkRange(0, part->size)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);

View File

@ -665,8 +665,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, column_name_, MarkRanges(1, MarkRange(0, parts[part_num]->size)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, true);
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed});

View File

@ -710,19 +710,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true);
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
res.push_back(source_stream);
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
res.back() = std::make_shared<AddingConstColumnBlockInputStream<String>>(
res.back(), std::make_shared<DataTypeString>(), part.data_part->name, "_part");
else if (virt_column == "_part_index")
res.back() = std::make_shared<AddingConstColumnBlockInputStream<UInt64>>(
res.back(), std::make_shared<DataTypeUInt64>(), part.part_index_in_query, "_part_index");
}
}
}
@ -766,17 +756,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true);
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
source_stream, std::make_shared<DataTypeString>(), part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = std::make_shared<AddingConstColumnBlockInputStream<UInt64>>(
source_stream, std::make_shared<DataTypeUInt64>(), part.part_index_in_query, "_part_index");
}
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
}

View File

@ -35,6 +35,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
max_read_buffer_size(max_read_buffer_size),
use_uncompressed_cache(use_uncompressed_cache),
save_marks_in_cache(save_marks_in_cache),
virt_column_names(virt_column_names),
max_block_size_marks(max_block_size_rows / storage.index_granularity)
{
}
@ -92,13 +93,13 @@ Block MergeTreeThreadBlockInputStream::readImpl()
if (!task && !getNewTask())
break;
res = readFromPart(task.get());
res = readFromPart();
if (res)
injectVirtualColumns(res, task.get());
injectVirtualColumns(res);
if (task->mark_ranges.empty())
task = {};
task.reset();
}
return res;
@ -162,7 +163,7 @@ bool MergeTreeThreadBlockInputStream::getNewTask()
}
Block MergeTreeBaseBlockInputStream::readFromPart(MergeTreeReadTask * task)
Block MergeTreeBaseBlockInputStream::readFromPart()
{
Block res;
@ -355,6 +356,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart(MergeTreeReadTask * task)
marks_to_read = std::min(marks_to_read, std::max(1UL, recommended_marks));
}
LOG_TRACE(log, "Will read " << marks_to_read << " marks");
reader->readRange(range.begin, range.begin + marks_to_read, res);
if (preferred_block_size_bytes)
@ -374,7 +376,8 @@ Block MergeTreeBaseBlockInputStream::readFromPart(MergeTreeReadTask * task)
reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
}
LOG_TRACE(log, "Read block with " << res.rows() << " rows");
LOG_TRACE(log, "task->ordered_names.size()=" << task->ordered_names.size());
LOG_TRACE(log, "Read block with " << res.rows() << " rows: " << res.dumpStructure());
if (preferred_block_size_bytes && bytes_exceeded)
{
@ -387,7 +390,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart(MergeTreeReadTask * task)
}
void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block, const MergeTreeReadTask * task)
void MergeTreeBaseBlockInputStream::injectVirtualColumns(Block & block)
{
const auto rows = block.rows();

View File

@ -32,13 +32,16 @@ struct MergeTreeBaseBlockInputStream : public IProfilingBlockInputStream
protected:
Block readFromPart(MergeTreeReadTask * task);
void injectVirtualColumns(Block & block, const MergeTreeReadTask * task);
/// Creates new this->task, and initializes readers
virtual bool getNewTask() = 0;
/// We will call progressImpl manually.
void progress(const Progress & value) override {}
Block readFromPart();
void injectVirtualColumns(Block & block);
protected:
MergeTreeData & storage;
@ -55,6 +58,10 @@ protected:
bool use_uncompressed_cache;
bool save_marks_in_cache;
Names virt_column_names;
std::unique_ptr<MergeTreeReadTask> task;
std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache;
@ -62,11 +69,8 @@ protected:
MergeTreeReaderPtr reader;
MergeTreeReaderPtr pre_reader;
size_t max_block_size_marks;
Names virt_column_names;
Logger * log;
size_t max_block_size_marks;
};
@ -100,13 +104,12 @@ protected:
private:
/// Requests read task from MergeTreeReadPool and signals whether it got one
bool getNewTask();
bool getNewTask() override;
/// "thread" index (there are N threads and each thread is assigned index in interval [0..N-1])
size_t thread;
std::shared_ptr<MergeTreeReadPool> pool;
std::shared_ptr<MergeTreeReadTask> task;
size_t min_marks_to_read;
};