Add special logic for alter

This commit is contained in:
alesapin 2018-11-29 14:55:34 +03:00
parent 8f3b7e063f
commit b2fd3e804a
4 changed files with 28 additions and 19 deletions

View File

@ -1224,7 +1224,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
if (expression)
{
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, expression->getRequiredColumns(), false);
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
auto compression_settings = this->context.chooseCompressionSettings(
part->bytes_on_disk,

View File

@ -607,7 +607,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
UInt64 watch_prev_elapsed = 0;
/// We count total amount of bytes in parts
/// and use direct_io + aio is there are more than setting
/// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io
bool read_with_direct_io = false;
if (data.settings.min_merge_bytes_to_use_direct_io != 0)
{
@ -629,7 +629,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeSequentialBlockInputStream>(
data, part, merging_column_names, read_with_direct_io);
data, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(MergeProgressCallback(
merge_entry, sum_input_rows_upper_bound, column_sizes, watch_prev_elapsed, merge_alg));
@ -775,7 +775,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeSequentialBlockInputStream>(
data, parts[part_num], column_names, read_with_direct_io);
data, parts[part_num], column_names, read_with_direct_io, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
merge_entry, sum_input_rows_exact, column_sizes, column_name, watch_prev_elapsed));

View File

@ -13,6 +13,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
const MergeTreeData::DataPartPtr & data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet)
: storage(storage_)
, data_part(data_part_)
@ -29,17 +30,21 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
addTotalRowsApprox(data_part->rows_count);
header = storage.getSampleBlockForColumns(columns_to_read);
fixHeader(header);
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
auto column_names_with_helper_columns = columns_to_read;
/// take columns from data_part (header was fixed by fixHeader)
NamesAndTypesList columns_for_reader = header.getNamesAndTypesList();
if (take_column_types_from_storage)
{
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
auto column_names_with_helper_columns = columns_to_read;
injectRequiredColumns(storage, data_part, column_names_with_helper_columns);
columns_for_reader = physical_columns.addTypes(column_names_with_helper_columns);
}
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, data_part, column_names_with_helper_columns);
reader = std::make_unique<MergeTreeReader>(
data_part->getFullPath(), data_part, physical_columns.addTypes(column_names_with_helper_columns), /* uncompressed_cache = */ nullptr,
data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
mark_cache.get(), /* save_marks_in_cache = */ false, storage,
MarkRanges{MarkRange(0, data_part->marks_count)},
/* bytes to use AIO (this is hack) */
@ -79,19 +84,22 @@ try
bool continue_reading = (current_mark != 0);
size_t rows_readed = reader->readRows(current_mark, continue_reading, storage.index_granularity, res);
res.checkNumberOfRows();
if (res)
{
res.checkNumberOfRows();
current_row += rows_readed;
current_mark += (rows_readed / storage.index_granularity);
current_row += rows_readed;
current_mark += (rows_readed / storage.index_granularity);
bool should_reorder = false, should_evaluate_missing_defaults = false;
reader->fillMissingColumns(res, should_reorder, should_evaluate_missing_defaults, res.rows());
bool should_reorder = false, should_evaluate_missing_defaults = false;
reader->fillMissingColumns(res, should_reorder, should_evaluate_missing_defaults, res.rows());
if (res && should_evaluate_missing_defaults)
reader->evaluateMissingDefaults(res);
if (should_evaluate_missing_defaults)
reader->evaluateMissingDefaults(res);
if (res && should_reorder)
reader->reorderColumns(res, header.getNames(), nullptr);
if (should_reorder)
reader->reorderColumns(res, header.getNames(), nullptr);
}
}
else
{

View File

@ -17,6 +17,7 @@ public:
const MergeTreeData::DataPartPtr & data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet = false
);