fix 'ALTER CLEAR INDEX/COLUMN' queries with compact parts

This commit is contained in:
Anton Popov 2020-04-10 16:36:51 +03:00
parent 76cf9b7f03
commit d018977f4b
8 changed files with 38 additions and 14 deletions

View File

@ -102,8 +102,7 @@ public:
written_offset_columns = written_offset_columns_;
}
using SkipIndices = std::vector<MergeTreeIndexPtr>;
const SkipIndices & getSkipIndices() { return skip_indices; }
const MergeTreeIndices & getSkipIndices() { return skip_indices; }
void initSkipIndices();
void initPrimaryIndex();
@ -126,7 +125,7 @@ protected:
CompressionCodecPtr default_codec;
std::vector<MergeTreeIndexPtr> skip_indices;
MergeTreeIndices skip_indices;
MergeTreeWriterSettings settings;

View File

@ -433,11 +433,6 @@ public:
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
std::vector<MergeTreeIndexPtr> getSkipIndices() const
{
return std::vector<MergeTreeIndexPtr>(std::begin(skip_indices), std::end(skip_indices));
}
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;

View File

@ -773,6 +773,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergedBlockOutputStream to{
new_data_part,
merging_columns,
data.skip_indices,
compression_codec,
merged_column_to_size,
data_settings->min_merge_bytes_to_use_direct_io,
@ -991,7 +992,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
@ -1043,8 +1043,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// All columns from part are changed and may be some more that were missing before in part
if (isCompactPart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList()))
{
auto part_indices = getIndicesForNewDataPart(data.skip_indices, for_file_renames);
mutateAllPartColumns(
new_data_part,
part_indices,
in,
time_of_mutation,
compression_codec,
@ -1260,6 +1262,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN)
{
removed_columns_from_compact_part.emplace(command.column_name);
for_file_renames.push_back(command);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
@ -1439,6 +1442,22 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
return all_columns;
}
MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
const MergeTreeIndices & all_indices,
const MutationCommands & commands_for_removes)
{
NameSet removed_indices;
for (const auto & command : commands_for_removes)
if (command.type == MutationCommand::DROP_INDEX)
removed_indices.insert(command.column_name);
MergeTreeIndices new_indices;
for (const auto & index : all_indices)
if (!removed_indices.count(index->name))
new_indices.push_back(index);
return new_indices;
}
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
BlockInputStreamPtr & input_stream,
@ -1503,6 +1522,7 @@ bool MergeTreeDataMergerMutator::shouldExecuteTTL(const Names & columns, const M
void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & compression_codec,
@ -1524,6 +1544,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergedBlockOutputStream out{
new_data_part,
new_data_part->getColumns(),
skip_indices,
compression_codec};
mutating_stream->readPrefix();
@ -1560,7 +1581,6 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);

View File

@ -160,6 +160,11 @@ private:
NamesAndTypesList all_columns,
const MutationCommands & commands_for_removes);
/// Get skip indcies, that should exists in the resulting data part.
static MergeTreeIndices getIndicesForNewDataPart(
const MergeTreeIndices & all_indices,
const MutationCommands & commands_for_removes);
bool shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const;
/// Return set of indices which should be recalculated during mutation also
@ -173,6 +178,7 @@ private:
/// Override all columns of new part using mutating_stream
void mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & codec,

View File

@ -294,7 +294,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
MergedBlockOutputStream out(new_data_part, columns, compression_codec);
MergedBlockOutputStream out(new_data_part, columns, data.skip_indices, compression_codec);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);

View File

@ -125,7 +125,7 @@ public:
size_t granularity;
};
using MergeTreeIndices = std::vector<MutableMergeTreeIndexPtr>;
using MergeTreeIndices = std::vector<MergeTreeIndexPtr>;
class MergeTreeIndexFactory : private boost::noncopyable

View File

@ -15,10 +15,11 @@ namespace ErrorCodes
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
bool blocks_are_granules_size)
: MergedBlockOutputStream(
data_part, columns_list_, default_codec, {},
data_part, columns_list_, skip_indices, default_codec, {},
data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
blocks_are_granules_size)
{
@ -27,6 +28,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold,
@ -49,7 +51,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
disk->createDirectories(part_path);
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);
writer = data_part->getWriter(columns_list, skip_indices, default_codec, writer_settings);
writer->initPrimaryIndex();
writer->initSkipIndices();
}

View File

@ -16,12 +16,14 @@ public:
MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
bool blocks_are_granules_size = false);
MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold,