Fix bugs after method split

This commit is contained in:
alesapin 2020-03-19 17:11:37 +03:00
parent ce8eb9293a
commit 204d0ac955
4 changed files with 22 additions and 35 deletions

View File

@ -946,15 +946,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const ReservationPtr & space_reservation,
TableStructureReadLockHolder & table_lock_holder)
{
auto check_not_cancelled = [&]()
{
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
};
check_not_cancelled();
checkOperationIsNotCanceled(merge_entry);
if (future_part.parts.size() != 1)
throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
@ -1000,8 +992,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
LOG_DEBUG(log, "All columns:" << all_columns.toString());
if (!for_interpreter.empty())
{
interpreter.emplace(storage_from_source_part, for_interpreter, context_for_reading, true);
@ -1091,6 +1081,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
mutateSomePartColumns(
source_part,
indices_to_recalc,
updated_header,
new_data_part,
in,
time_of_mutation,
@ -1408,8 +1399,7 @@ std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalc(
if (!indices_to_recalc.empty() && input_stream)
{
auto indices_recalc_syntax =
SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, updated_columns);
auto indices_recalc_syntax = SyntaxAnalyzer(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
@ -1453,14 +1443,6 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
auto check_not_cancelled = [&]()
{
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
};
if (data.hasPrimaryKey() || data.hasSkipIndices())
mutating_stream = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.primary_key_and_skip_indices_expr));
@ -1479,7 +1461,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
out.writePrefix();
Block block;
while (check_not_cancelled() && (block = mutating_stream->read()))
while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
{
minmax_idx.update(block, data.minmax_idx_columns);
out.write(block);
@ -1498,6 +1480,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
void MergeTreeDataMergerMutator::mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
@ -1505,24 +1488,17 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
MergeListEntry & merge_entry,
bool need_remove_expired_values) const
{
auto check_not_cancelled = [&]()
{
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
};
if (mutating_stream == nullptr)
throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);
if (need_remove_expired_values)
mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, new_data_part, time_of_mutation, true);
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
new_data_part,
mutating_stream->getHeader(),
mutation_header,
/* sync = */ false,
compression_codec,
/* skip_offsets = */ false,
@ -1536,7 +1512,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
out.writePrefix();
Block block;
while (check_not_cancelled() && (block = mutating_stream->read()))
while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
{
out.write(block);
@ -1590,4 +1566,13 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
}
bool MergeTreeDataMergerMutator::checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const
{
if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);
return true;
}
}

View File

@ -183,6 +183,7 @@ private:
void mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
@ -216,6 +217,9 @@ private:
const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const;
bool checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const;
private:
MergeTreeData & data;
const size_t background_pool_size;

View File

@ -667,12 +667,10 @@ bool StorageMergeTree::tryMutatePart()
if (current_mutations_by_version.empty())
return false;
LOG_DEBUG(log, "Looking at parts");
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
{
LOG_DEBUG(log, "Iterating parts");
if (currently_merging_mutating_parts.count(part))
continue;

View File

@ -19,7 +19,7 @@ function thread1()
function thread2()
{
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done
while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String '0'; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done
}
function thread3()