Merge pull request #10130 from CurtizJ/polymorphic-parts

Fix some kinds of alters with compact parts
This commit is contained in:
alexey-milovidov 2020-04-11 23:02:48 +03:00 committed by GitHub
commit cd979d7997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 135 additions and 71 deletions

View File

@ -102,8 +102,7 @@ public:
written_offset_columns = written_offset_columns_; written_offset_columns = written_offset_columns_;
} }
using SkipIndices = std::vector<MergeTreeIndexPtr>; const MergeTreeIndices & getSkipIndices() { return skip_indices; }
const SkipIndices & getSkipIndices() { return skip_indices; }
void initSkipIndices(); void initSkipIndices();
void initPrimaryIndex(); void initPrimaryIndex();
@ -126,7 +125,7 @@ protected:
CompressionCodecPtr default_codec; CompressionCodecPtr default_codec;
std::vector<MergeTreeIndexPtr> skip_indices; MergeTreeIndices skip_indices;
MergeTreeWriterSettings settings; MergeTreeWriterSettings settings;

View File

@ -32,6 +32,8 @@ IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_
, all_mark_ranges(all_mark_ranges_) , all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part)) , alter_conversions(storage.getAlterConversionsForPart(data_part))
{ {
for (const NameAndTypePair & column_from_part : data_part->getColumns())
columns_from_part[column_from_part.name] = column_from_part.type;
} }
IMergeTreeReader::~IMergeTreeReader() = default; IMergeTreeReader::~IMergeTreeReader() = default;
@ -183,6 +185,23 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
} }
} }
/// Resolves a requested column to the name and type recorded for it in `columns_from_part`.
/// Lookup order:
///   1. the requested name itself;
///   2. if that is absent and `alter_conversions` reports the column as renamed, its old name.
/// When neither lookup succeeds, the requested column is returned unchanged.
NameAndTypePair IMergeTreeReader::getColumnFromPart(const NameAndTypePair & required_column) const
{
    const auto not_found = columns_from_part.end();

    auto stored = columns_from_part.find(required_column.name);

    /// The rename check is consulted only when the direct lookup misses.
    if (stored == not_found && alter_conversions.isColumnRenamed(required_column.name))
    {
        String previous_name = alter_conversions.getColumnOldName(required_column.name);
        stored = columns_from_part.find(previous_name);
    }

    if (stored == not_found)
        return required_column;

    return {stored->first, stored->second};
}
void IMergeTreeReader::performRequiredConversions(Columns & res_columns) void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
{ {
try try
@ -209,10 +228,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns)
if (res_columns[pos] == nullptr) if (res_columns[pos] == nullptr)
continue; continue;
if (columns_from_part.count(name_and_type->name)) copy_block.insert({res_columns[pos], getColumnFromPart(*name_and_type).type, name_and_type->name});
copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name});
else
copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
} }
DB::performRequiredConversions(copy_block, columns, storage.global_context); DB::performRequiredConversions(copy_block, columns, storage.global_context);

View File

@ -4,7 +4,6 @@
#include <Storages/MergeTree/MergeTreeReaderStream.h> #include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h> #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
namespace DB namespace DB
{ {
@ -59,6 +58,9 @@ public:
MergeTreeData::DataPartPtr data_part; MergeTreeData::DataPartPtr data_part;
protected: protected:
/// Returns actual column type in part, which can differ from table metadata.
NameAndTypePair getColumnFromPart(const NameAndTypePair & required_column) const;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size. /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints; ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk /// Stores states for IDataType::deserializeBinaryBulk
@ -67,8 +69,6 @@ protected:
/// Columns that are read. /// Columns that are read.
NamesAndTypesList columns; NamesAndTypesList columns;
std::unordered_map<String, DataTypePtr> columns_from_part;
UncompressedCache * uncompressed_cache; UncompressedCache * uncompressed_cache;
MarkCache * mark_cache; MarkCache * mark_cache;
@ -78,8 +78,13 @@ protected:
MarkRanges all_mark_ranges; MarkRanges all_mark_ranges;
friend class MergeTreeRangeReader::DelayedStream; friend class MergeTreeRangeReader::DelayedStream;
private:
/// Alter conversions, which must be applied on fly if required /// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions; MergeTreeData::AlterConversions alter_conversions;
/// Actual data type of columns in part
std::unordered_map<String, DataTypePtr> columns_from_part;
}; };
} }

View File

@ -433,11 +433,6 @@ public:
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
std::vector<MergeTreeIndexPtr> getSkipIndices() const
{
return std::vector<MergeTreeIndexPtr>(std::begin(skip_indices), std::end(skip_indices));
}
/// Total size of active parts in bytes. /// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const; size_t getTotalActiveSizeInBytes() const;

View File

@ -773,6 +773,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergedBlockOutputStream to{ MergedBlockOutputStream to{
new_data_part, new_data_part,
merging_columns, merging_columns,
data.skip_indices,
compression_codec, compression_codec,
merged_column_to_size, merged_column_to_size,
data_settings->min_merge_bytes_to_use_direct_io, data_settings->min_merge_bytes_to_use_direct_io,
@ -991,7 +992,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames); splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
UInt64 watch_prev_elapsed = 0; UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0); MergeStageProgress stage_progress(1.0);
@ -1043,8 +1043,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// All columns from part are changed and may be some more that were missing before in part /// All columns from part are changed and may be some more that were missing before in part
if (isCompactPart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList())) if (isCompactPart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList()))
{ {
auto part_indices = getIndicesForNewDataPart(data.skip_indices, for_file_renames);
mutateAllPartColumns( mutateAllPartColumns(
new_data_part, new_data_part,
part_indices,
in, in,
time_of_mutation, time_of_mutation,
compression_codec, compression_codec,
@ -1260,6 +1262,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN) else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN)
{ {
removed_columns_from_compact_part.emplace(command.column_name); removed_columns_from_compact_part.emplace(command.column_name);
for_file_renames.push_back(command);
} }
else if (command.type == MutationCommand::Type::RENAME_COLUMN) else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{ {
@ -1439,6 +1442,22 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
return all_columns; return all_columns;
} }
/// Returns the entries of `all_indices` whose name is not targeted by a DROP_INDEX
/// command in `commands_for_removes`. Relative order of the kept indices is preserved.
/// The dropped index name is read from the command's `column_name` field.
MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
    const MergeTreeIndices & all_indices,
    const MutationCommands & commands_for_removes)
{
    /// Pass 1: collect the names of all indices that are being dropped.
    NameSet dropped_names;
    for (const auto & cmd : commands_for_removes)
    {
        if (command_is_not_drop_index:
            false) {}
        if (cmd.type != MutationCommand::DROP_INDEX)
            continue;
        dropped_names.insert(cmd.column_name);
    }

    /// Pass 2: keep every index that was not named above.
    MergeTreeIndices kept_indices;
    for (const auto & candidate : all_indices)
    {
        const bool is_dropped = dropped_names.count(candidate->name) != 0;
        if (is_dropped)
            continue;
        kept_indices.push_back(candidate);
    }

    return kept_indices;
}
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate( std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
BlockInputStreamPtr & input_stream, BlockInputStreamPtr & input_stream,
@ -1503,6 +1522,7 @@ bool MergeTreeDataMergerMutator::shouldExecuteTTL(const Names & columns, const M
void MergeTreeDataMergerMutator::mutateAllPartColumns( void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part, MergeTreeData::MutableDataPartPtr new_data_part,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream, BlockInputStreamPtr mutating_stream,
time_t time_of_mutation, time_t time_of_mutation,
const CompressionCodecPtr & compression_codec, const CompressionCodecPtr & compression_codec,
@ -1524,6 +1544,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergedBlockOutputStream out{ MergedBlockOutputStream out{
new_data_part, new_data_part,
new_data_part->getColumns(), new_data_part->getColumns(),
skip_indices,
compression_codec}; compression_codec};
mutating_stream->readPrefix(); mutating_stream->readPrefix();
@ -1560,7 +1581,6 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
if (mutating_stream == nullptr) if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (need_remove_expired_values) if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true); mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);

View File

@ -160,6 +160,11 @@ private:
NamesAndTypesList all_columns, NamesAndTypesList all_columns,
const MutationCommands & commands_for_removes); const MutationCommands & commands_for_removes);
/// Get skip indices that should exist in the resulting data part.
static MergeTreeIndices getIndicesForNewDataPart(
const MergeTreeIndices & all_indices,
const MutationCommands & commands_for_removes);
bool shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const; bool shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const;
/// Return set of indices which should be recalculated during mutation also /// Return set of indices which should be recalculated during mutation also
@ -173,6 +178,7 @@ private:
/// Override all columns of new part using mutating_stream /// Override all columns of new part using mutating_stream
void mutateAllPartColumns( void mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part, MergeTreeData::MutableDataPartPtr new_data_part,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream, BlockInputStreamPtr mutating_stream,
time_t time_of_mutation, time_t time_of_mutation,
const CompressionCodecPtr & codec, const CompressionCodecPtr & codec,

View File

@ -294,7 +294,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// either default lz4 or compression method with zero thresholds on absolute and relative part size. /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0); auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
MergedBlockOutputStream out(new_data_part, columns, compression_codec); MergedBlockOutputStream out(new_data_part, columns, data.skip_indices, compression_codec);
out.writePrefix(); out.writePrefix();
out.writeWithPermutation(block, perm_ptr); out.writeWithPermutation(block, perm_ptr);

View File

@ -125,7 +125,7 @@ public:
size_t granularity; size_t granularity;
}; };
using MergeTreeIndices = std::vector<MutableMergeTreeIndexPtr>; using MergeTreeIndices = std::vector<MergeTreeIndexPtr>;
class MergeTreeIndexFactory : private boost::noncopyable class MergeTreeIndexFactory : private boost::noncopyable

View File

@ -78,15 +78,9 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
auto name_and_type = columns.begin(); auto name_and_type = columns.begin();
for (size_t i = 0; i < columns_num; ++i, ++name_and_type) for (size_t i = 0; i < columns_num; ++i, ++name_and_type)
{ {
const auto & [name, type] = *name_and_type; const auto & [name, type] = getColumnFromPart(*name_and_type);
auto position = data_part->getColumnPosition(name); auto position = data_part->getColumnPosition(name);
if (!position && alter_conversions.isColumnRenamed(name))
{
String old_name = alter_conversions.getColumnOldName(name);
position = data_part->getColumnPosition(old_name);
}
if (!position && typeid_cast<const DataTypeArray *>(type.get())) if (!position && typeid_cast<const DataTypeArray *>(type.get()))
{ {
/// If array of Nested column is missing in part, /// If array of Nested column is missing in part,
@ -118,7 +112,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
bool append = res_columns[i] != nullptr; bool append = res_columns[i] != nullptr;
if (!append) if (!append)
res_columns[i] = column_it->type->createColumn(); res_columns[i] = getColumnFromPart(*column_it).type->createColumn();
mutable_columns[i] = res_columns[i]->assumeMutable(); mutable_columns[i] = res_columns[i]->assumeMutable();
} }
@ -132,15 +126,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
if (!res_columns[pos]) if (!res_columns[pos])
continue; continue;
auto [name, type] = *name_and_type; auto [name, type] = getColumnFromPart(*name_and_type);
if (alter_conversions.isColumnRenamed(name))
{
String old_name = alter_conversions.getColumnOldName(name);
if (!data_part->getColumnPosition(name) && data_part->getColumnPosition(old_name))
name = old_name;
}
auto & column = mutable_columns[pos]; auto & column = mutable_columns[pos];
try try

View File

@ -41,28 +41,10 @@ MergeTreeReaderWide::MergeTreeReaderWide(
{ {
try try
{ {
for (const NameAndTypePair & column_from_part : data_part->getColumns())
columns_from_part[column_from_part.name] = column_from_part.type;
for (const NameAndTypePair & column : columns) for (const NameAndTypePair & column : columns)
{ {
if (columns_from_part.count(column.name)) auto column_from_part = getColumnFromPart(column);
{ addStreams(column_from_part.name, *column_from_part.type, profile_callback_, clock_type_);
addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_);
}
else
{
if (alter_conversions.isColumnRenamed(column.name))
{
String old_name = alter_conversions.getColumnOldName(column.name);
if (columns_from_part.count(old_name))
addStreams(old_name, *columns_from_part[old_name], profile_callback_, clock_type_);
}
else
{
addStreams(column.name, *column.type, profile_callback_, clock_type_);
}
}
} }
} }
catch (...) catch (...)
@ -93,19 +75,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
auto name_and_type = columns.begin(); auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{ {
String name = name_and_type->name; auto [name, type] = getColumnFromPart(*name_and_type);
if (alter_conversions.isColumnRenamed(name))
{
String original_name = alter_conversions.getColumnOldName(name);
if (!columns_from_part.count(name) && columns_from_part.count(original_name))
name = original_name;
}
DataTypePtr type;
if (columns_from_part.count(name))
type = columns_from_part[name];
else
type = name_and_type->type;
/// The column is already present in the block so we will append the values to the end. /// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr; bool append = res_columns[pos] != nullptr;

View File

@ -15,10 +15,11 @@ namespace ErrorCodes
MergedBlockOutputStream::MergedBlockOutputStream( MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part, const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_, const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec, CompressionCodecPtr default_codec,
bool blocks_are_granules_size) bool blocks_are_granules_size)
: MergedBlockOutputStream( : MergedBlockOutputStream(
data_part, columns_list_, default_codec, {}, data_part, columns_list_, skip_indices, default_codec, {},
data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io, data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
blocks_are_granules_size) blocks_are_granules_size)
{ {
@ -27,6 +28,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergedBlockOutputStream::MergedBlockOutputStream( MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part, const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_, const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec, CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size, const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold, size_t aio_threshold,
@ -49,7 +51,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
disk->createDirectories(part_path); disk->createDirectories(part_path);
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings); writer = data_part->getWriter(columns_list, skip_indices, default_codec, writer_settings);
writer->initPrimaryIndex(); writer->initPrimaryIndex();
writer->initSkipIndices(); writer->initSkipIndices();
} }

View File

@ -16,12 +16,14 @@ public:
MergedBlockOutputStream( MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part, const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_, const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec, CompressionCodecPtr default_codec,
bool blocks_are_granules_size = false); bool blocks_are_granules_size = false);
MergedBlockOutputStream( MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part, const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_, const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec, CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size, const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold, size_t aio_threshold,

View File

@ -0,0 +1 @@
999000

View File

@ -0,0 +1,12 @@
-- Checks that a column's type can be changed with ALTER ... MODIFY COLUMN and that the
-- converted values can be read back afterwards.
-- NOTE(review): the table name and the large min_bytes_for_wide_part threshold suggest the
-- inserted parts are meant to be stored in compact format -- confirm.
DROP TABLE IF EXISTS mt_compact;
CREATE TABLE mt_compact (d Date, id UInt32, s String)
ENGINE = MergeTree ORDER BY id PARTITION BY d
SETTINGS min_bytes_for_wide_part = 10000000, index_granularity = 128;
-- Two inserts of 1000 rows each, with different values of the partition key `d`.
-- `s` holds the decimal string form of `number` (0..999).
INSERT INTO mt_compact SELECT toDate('2020-01-05'), number, toString(number) FROM numbers(1000);
INSERT INTO mt_compact SELECT toDate('2020-01-06'), number, toString(number) FROM numbers(1000);
-- Change `s` from String to UInt64 so that it can be summed numerically below.
ALTER TABLE mt_compact MODIFY COLUMN s UInt64;
-- Each insert contributes 0 + 1 + ... + 999 = 499500, so the expected total is 999000.
SELECT sum(s) from mt_compact;
DROP TABLE IF EXISTS mt_compact;

View File

@ -0,0 +1,4 @@
1 0
2 3
1 0
2 0

View File

@ -0,0 +1,11 @@
-- Checks ALTER ... CLEAR COLUMN ... IN PARTITION: clearing `y` in one partition must not
-- change the values of `y` stored in the other partition.
DROP TABLE IF EXISTS clear_column;
-- Partitioned by `x`, so the two rows inserted below have different partition key values.
CREATE TABLE clear_column(x UInt32, y UInt32) ENGINE MergeTree ORDER BY x PARTITION by x;
INSERT INTO clear_column VALUES (1, 1), (2, 3);
-- Clear `y` only in partition 1; expected rows afterwards: (1, 0) and (2, 3).
ALTER TABLE clear_column CLEAR COLUMN y IN PARTITION 1;
SELECT * FROM clear_column ORDER BY x;
-- Clear `y` in partition 2 as well; expected rows afterwards: (1, 0) and (2, 0).
ALTER TABLE clear_column CLEAR COLUMN y IN PARTITION 2;
SELECT * FROM clear_column ORDER BY x;
DROP TABLE clear_column;

View File

@ -0,0 +1,32 @@
-- Exercises ADD INDEX, MATERIALIZE INDEX and CLEAR INDEX for a minmax skip index,
-- applied one partition at a time.
-- NOTE(review): min_rows_for_wide_part is far above the 10 rows inserted; together with the
-- table name this suggests the parts are meant to be in compact format -- confirm.
DROP TABLE IF EXISTS minmax_compact;
CREATE TABLE minmax_compact
(
u64 UInt64,
i64 Int64,
i32 Int32
) ENGINE = MergeTree()
PARTITION BY i32
ORDER BY u64
SETTINGS index_granularity = 2, min_rows_for_wide_part = 1000000;
-- Six rows with i32 = 1 and four rows with i32 = 2; only the rows with u64 = 0 and u64 = 5 have i64 = 2.
INSERT INTO minmax_compact VALUES (0, 2, 1), (1, 1, 1), (2, 1, 1), (3, 1, 1), (4, 1, 1), (5, 2, 1), (6, 1, 2), (7, 1, 2), (8, 1, 2), (9, 1, 2);
-- NOTE(review): presumably makes the ALTER statements below wait until their mutations finish -- confirm.
SET mutations_sync = 1;
ALTER TABLE minmax_compact ADD INDEX idx (i64, u64 * i64) TYPE minmax GRANULARITY 1;
-- Build the index for partition 1 only.
-- NOTE(review): the max_rows_to_read limits below look chosen so that each query succeeds only
-- if the index lets the server skip rows that cannot match `i64 = 2` -- confirm.
ALTER TABLE minmax_compact MATERIALIZE INDEX idx IN PARTITION 1;
set max_rows_to_read = 8;
SELECT count() FROM minmax_compact WHERE i64 = 2;
-- Build the index for partition 2 as well, then tighten the row limit.
ALTER TABLE minmax_compact MATERIALIZE INDEX idx IN PARTITION 2;
set max_rows_to_read = 6;
SELECT count() FROM minmax_compact WHERE i64 = 2;
-- Remove the index data from both partitions. With the limit still at 6, the next query is
-- expected to fail with server error 158, as declared by the inline hint on that line.
ALTER TABLE minmax_compact CLEAR INDEX idx IN PARTITION 1;
ALTER TABLE minmax_compact CLEAR INDEX idx IN PARTITION 2;
SELECT count() FROM minmax_compact WHERE i64 = 2; -- { serverError 158 }
-- Raise the limit to the total number of inserted rows (10); the same query is expected to succeed.
set max_rows_to_read = 10;
SELECT count() FROM minmax_compact WHERE i64 = 2;