mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Merge pull request #25743 from ClickHouse/fix_aggregation_ttl
Fix bug in execution of TTL GROUP BY
This commit is contained in:
commit
4c85dae572
@ -37,88 +37,115 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
|
|||||||
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression);
|
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression);
|
||||||
|
|
||||||
aggregator = std::make_unique<Aggregator>(params);
|
aggregator = std::make_unique<Aggregator>(params);
|
||||||
|
|
||||||
|
if (isMaxTTLExpired())
|
||||||
|
new_ttl_info.finished = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLAggregationAlgorithm::execute(Block & block)
|
void TTLAggregationAlgorithm::execute(Block & block)
|
||||||
{
|
{
|
||||||
if (!block)
|
|
||||||
{
|
|
||||||
if (!aggregation_result.empty())
|
|
||||||
{
|
|
||||||
MutableColumns result_columns = header.cloneEmptyColumns();
|
|
||||||
finalizeAggregates(result_columns);
|
|
||||||
block = header.cloneWithColumns(std::move(result_columns));
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
bool some_rows_were_aggregated = false;
|
||||||
}
|
|
||||||
|
|
||||||
const auto & column_names = header.getNames();
|
|
||||||
MutableColumns result_columns = header.cloneEmptyColumns();
|
MutableColumns result_columns = header.cloneEmptyColumns();
|
||||||
MutableColumns aggregate_columns = header.cloneEmptyColumns();
|
|
||||||
|
|
||||||
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
if (!block) /// Empty block -- no more data, but we may still have some accumulated rows
|
||||||
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
|
||||||
|
|
||||||
size_t rows_aggregated = 0;
|
|
||||||
size_t current_key_start = 0;
|
|
||||||
size_t rows_with_current_key = 0;
|
|
||||||
|
|
||||||
for (size_t i = 0; i < block.rows(); ++i)
|
|
||||||
{
|
{
|
||||||
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
|
if (!aggregation_result.empty()) /// Still have some aggregated data, let's update TTL
|
||||||
bool where_filter_passed = !where_column || where_column->getBool(i);
|
|
||||||
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
|
|
||||||
|
|
||||||
bool same_as_current = true;
|
|
||||||
for (size_t j = 0; j < description.group_by_keys.size(); ++j)
|
|
||||||
{
|
{
|
||||||
const String & key_column = description.group_by_keys[j];
|
|
||||||
const IColumn * values_column = block.getByName(key_column).column.get();
|
|
||||||
if (!same_as_current || (*values_column)[i] != current_key_value[j])
|
|
||||||
{
|
|
||||||
values_column->get(i, current_key_value[j]);
|
|
||||||
same_as_current = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!same_as_current)
|
|
||||||
{
|
|
||||||
if (rows_with_current_key)
|
|
||||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
|
||||||
finalizeAggregates(result_columns);
|
finalizeAggregates(result_columns);
|
||||||
|
some_rows_were_aggregated = true;
|
||||||
current_key_start = rows_aggregated;
|
|
||||||
rows_with_current_key = 0;
|
|
||||||
}
|
}
|
||||||
|
else /// No block, all aggregated, just finish
|
||||||
if (ttl_expired)
|
|
||||||
{
|
{
|
||||||
++rows_with_current_key;
|
return;
|
||||||
++rows_aggregated;
|
|
||||||
for (const auto & name : column_names)
|
|
||||||
{
|
|
||||||
const IColumn * values_column = block.getByName(name).column.get();
|
|
||||||
auto & column = aggregate_columns[header.getPositionByName(name)];
|
|
||||||
column->insertFrom(*values_column, i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
new_ttl_info.update(cur_ttl);
|
|
||||||
for (const auto & name : column_names)
|
|
||||||
{
|
|
||||||
const IColumn * values_column = block.getByName(name).column.get();
|
|
||||||
auto & column = result_columns[header.getPositionByName(name)];
|
|
||||||
column->insertFrom(*values_column, i);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
const auto & column_names = header.getNames();
|
||||||
|
MutableColumns aggregate_columns = header.cloneEmptyColumns();
|
||||||
|
|
||||||
if (rows_with_current_key)
|
auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
||||||
|
|
||||||
|
size_t rows_aggregated = 0;
|
||||||
|
size_t current_key_start = 0;
|
||||||
|
size_t rows_with_current_key = 0;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < block.rows(); ++i)
|
||||||
|
{
|
||||||
|
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
|
||||||
|
bool where_filter_passed = !where_column || where_column->getBool(i);
|
||||||
|
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
|
||||||
|
|
||||||
|
bool same_as_current = true;
|
||||||
|
for (size_t j = 0; j < description.group_by_keys.size(); ++j)
|
||||||
|
{
|
||||||
|
const String & key_column = description.group_by_keys[j];
|
||||||
|
const IColumn * values_column = block.getByName(key_column).column.get();
|
||||||
|
if (!same_as_current || (*values_column)[i] != current_key_value[j])
|
||||||
|
{
|
||||||
|
values_column->get(i, current_key_value[j]);
|
||||||
|
same_as_current = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!same_as_current)
|
||||||
|
{
|
||||||
|
if (rows_with_current_key)
|
||||||
|
{
|
||||||
|
some_rows_were_aggregated = true;
|
||||||
|
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||||
|
}
|
||||||
|
finalizeAggregates(result_columns);
|
||||||
|
|
||||||
|
current_key_start = rows_aggregated;
|
||||||
|
rows_with_current_key = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ttl_expired)
|
||||||
|
{
|
||||||
|
++rows_with_current_key;
|
||||||
|
++rows_aggregated;
|
||||||
|
for (const auto & name : column_names)
|
||||||
|
{
|
||||||
|
const IColumn * values_column = block.getByName(name).column.get();
|
||||||
|
auto & column = aggregate_columns[header.getPositionByName(name)];
|
||||||
|
column->insertFrom(*values_column, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (const auto & name : column_names)
|
||||||
|
{
|
||||||
|
const IColumn * values_column = block.getByName(name).column.get();
|
||||||
|
auto & column = result_columns[header.getPositionByName(name)];
|
||||||
|
column->insertFrom(*values_column, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rows_with_current_key)
|
||||||
|
{
|
||||||
|
some_rows_were_aggregated = true;
|
||||||
|
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
block = header.cloneWithColumns(std::move(result_columns));
|
block = header.cloneWithColumns(std::move(result_columns));
|
||||||
|
|
||||||
|
/// If some rows were aggregated we have to recalculate ttl info's
|
||||||
|
if (some_rows_were_aggregated)
|
||||||
|
{
|
||||||
|
auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column);
|
||||||
|
auto where_column_after_aggregation = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
|
||||||
|
for (size_t i = 0; i < block.rows(); ++i)
|
||||||
|
{
|
||||||
|
bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i);
|
||||||
|
if (where_filter_passed)
|
||||||
|
new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
|
void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
|
||||||
@ -134,6 +161,7 @@ void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggrega
|
|||||||
|
|
||||||
aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
|
aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
|
||||||
columns_for_aggregator, no_more_keys);
|
columns_for_aggregator, no_more_keys);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
|
void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
|
||||||
@ -141,6 +169,7 @@ void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns
|
|||||||
if (!aggregation_result.empty())
|
if (!aggregation_result.empty())
|
||||||
{
|
{
|
||||||
auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1);
|
auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1);
|
||||||
|
|
||||||
for (auto & agg_block : aggregated_res)
|
for (auto & agg_block : aggregated_res)
|
||||||
{
|
{
|
||||||
for (const auto & it : description.set_parts)
|
for (const auto & it : description.set_parts)
|
||||||
|
@ -21,6 +21,9 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
|
|||||||
new_ttl_info = old_ttl_info;
|
new_ttl_info = old_ttl_info;
|
||||||
is_fully_empty = false;
|
is_fully_empty = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isMaxTTLExpired())
|
||||||
|
new_ttl_info.finished = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLColumnAlgorithm::execute(Block & block)
|
void TTLColumnAlgorithm::execute(Block & block)
|
||||||
|
@ -9,6 +9,9 @@ TTLDeleteAlgorithm::TTLDeleteAlgorithm(
|
|||||||
{
|
{
|
||||||
if (!isMinTTLExpired())
|
if (!isMinTTLExpired())
|
||||||
new_ttl_info = old_ttl_info;
|
new_ttl_info = old_ttl_info;
|
||||||
|
|
||||||
|
if (isMaxTTLExpired())
|
||||||
|
new_ttl_info.finished = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLDeleteAlgorithm::execute(Block & block)
|
void TTLDeleteAlgorithm::execute(Block & block)
|
||||||
|
@ -752,13 +752,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
|||||||
bool force_ttl = false;
|
bool force_ttl = false;
|
||||||
for (const auto & part : parts)
|
for (const auto & part : parts)
|
||||||
{
|
{
|
||||||
new_data_part->ttl_infos.update(part->ttl_infos);
|
|
||||||
if (metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(metadata_snapshot))
|
if (metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(metadata_snapshot))
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
|
LOG_INFO(log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
|
||||||
need_remove_expired_values = true;
|
need_remove_expired_values = true;
|
||||||
force_ttl = true;
|
force_ttl = true;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
new_data_part->ttl_infos.update(part->ttl_infos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl;
|
const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl;
|
||||||
@ -939,7 +942,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
|||||||
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns);
|
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, deduplicate_by_columns);
|
||||||
|
|
||||||
if (need_remove_expired_values)
|
if (need_remove_expired_values)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Outdated rows found in source parts, TTLs processing enabled for merge");
|
||||||
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
|
merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
|
||||||
|
}
|
||||||
|
|
||||||
if (metadata_snapshot->hasSecondaryIndices())
|
if (metadata_snapshot->hasSecondaryIndices())
|
||||||
{
|
{
|
||||||
|
@ -55,6 +55,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
|||||||
MergeTreeDataPartTTLInfo ttl_info;
|
MergeTreeDataPartTTLInfo ttl_info;
|
||||||
ttl_info.min = col["min"].getUInt();
|
ttl_info.min = col["min"].getUInt();
|
||||||
ttl_info.max = col["max"].getUInt();
|
ttl_info.max = col["max"].getUInt();
|
||||||
|
|
||||||
|
if (col.has("finished"))
|
||||||
|
ttl_info.finished = col["finished"].getUInt();
|
||||||
|
|
||||||
String name = col["name"].getString();
|
String name = col["name"].getString();
|
||||||
columns_ttl.emplace(name, ttl_info);
|
columns_ttl.emplace(name, ttl_info);
|
||||||
|
|
||||||
@ -67,6 +71,9 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
|||||||
table_ttl.min = table["min"].getUInt();
|
table_ttl.min = table["min"].getUInt();
|
||||||
table_ttl.max = table["max"].getUInt();
|
table_ttl.max = table["max"].getUInt();
|
||||||
|
|
||||||
|
if (table.has("finished"))
|
||||||
|
table_ttl.finished = table["finished"].getUInt();
|
||||||
|
|
||||||
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
|
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,6 +84,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
|||||||
MergeTreeDataPartTTLInfo ttl_info;
|
MergeTreeDataPartTTLInfo ttl_info;
|
||||||
ttl_info.min = elem["min"].getUInt();
|
ttl_info.min = elem["min"].getUInt();
|
||||||
ttl_info.max = elem["max"].getUInt();
|
ttl_info.max = elem["max"].getUInt();
|
||||||
|
|
||||||
|
if (elem.has("finished"))
|
||||||
|
ttl_info.finished = elem["finished"].getUInt();
|
||||||
|
|
||||||
String expression = elem["expression"].getString();
|
String expression = elem["expression"].getString();
|
||||||
ttl_info_map.emplace(expression, ttl_info);
|
ttl_info_map.emplace(expression, ttl_info);
|
||||||
|
|
||||||
@ -126,6 +137,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
|
|||||||
writeIntText(it->second.min, out);
|
writeIntText(it->second.min, out);
|
||||||
writeString(",\"max\":", out);
|
writeString(",\"max\":", out);
|
||||||
writeIntText(it->second.max, out);
|
writeIntText(it->second.max, out);
|
||||||
|
writeString(R"(,"finished":)", out);
|
||||||
|
writeIntText(static_cast<uint8_t>(it->second.finished), out);
|
||||||
writeString("}", out);
|
writeString("}", out);
|
||||||
}
|
}
|
||||||
writeString("]", out);
|
writeString("]", out);
|
||||||
@ -138,6 +151,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
|
|||||||
writeIntText(table_ttl.min, out);
|
writeIntText(table_ttl.min, out);
|
||||||
writeString(R"(,"max":)", out);
|
writeString(R"(,"max":)", out);
|
||||||
writeIntText(table_ttl.max, out);
|
writeIntText(table_ttl.max, out);
|
||||||
|
writeString(R"(,"finished":)", out);
|
||||||
|
writeIntText(static_cast<uint8_t>(table_ttl.finished), out);
|
||||||
writeString("}", out);
|
writeString("}", out);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,6 +174,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
|
|||||||
writeIntText(it->second.min, out);
|
writeIntText(it->second.min, out);
|
||||||
writeString(R"(,"max":)", out);
|
writeString(R"(,"max":)", out);
|
||||||
writeIntText(it->second.max, out);
|
writeIntText(it->second.max, out);
|
||||||
|
writeString(R"(,"finished":)", out);
|
||||||
|
writeIntText(static_cast<uint8_t>(it->second.finished), out);
|
||||||
writeString("}", out);
|
writeString("}", out);
|
||||||
}
|
}
|
||||||
writeString("]", out);
|
writeString("]", out);
|
||||||
@ -202,6 +219,39 @@ time_t MergeTreeDataPartTTLInfos::getMinimalMaxRecompressionTTL() const
|
|||||||
return max;
|
return max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool MergeTreeDataPartTTLInfos::hasAnyNonFinishedTTLs() const
|
||||||
|
{
|
||||||
|
auto has_non_finished_ttl = [] (const TTLInfoMap & map) -> bool
|
||||||
|
{
|
||||||
|
for (const auto & [name, info] : map)
|
||||||
|
{
|
||||||
|
if (!info.finished)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!table_ttl.finished)
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (has_non_finished_ttl(columns_ttl))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (has_non_finished_ttl(rows_where_ttl))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (has_non_finished_ttl(moves_ttl))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (has_non_finished_ttl(recompression_ttl))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (has_non_finished_ttl(group_by_ttl))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max)
|
std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max)
|
||||||
{
|
{
|
||||||
time_t best_ttl_time = 0;
|
time_t best_ttl_time = 0;
|
||||||
@ -232,4 +282,5 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
|
|||||||
return best_ttl_time ? *best_entry_it : std::optional<TTLDescription>();
|
return best_ttl_time ? *best_entry_it : std::optional<TTLDescription>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,11 @@ struct MergeTreeDataPartTTLInfo
|
|||||||
time_t min = 0;
|
time_t min = 0;
|
||||||
time_t max = 0;
|
time_t max = 0;
|
||||||
|
|
||||||
|
/// This TTL was computed on a completely expired part. It doesn't make sense
|
||||||
|
/// to select such parts for TTL again. But it makes sense to recalculate TTL
|
||||||
|
/// again for merge with multiple parts.
|
||||||
|
bool finished = false;
|
||||||
|
|
||||||
void update(time_t time)
|
void update(time_t time)
|
||||||
{
|
{
|
||||||
if (time && (!min || time < min))
|
if (time && (!min || time < min))
|
||||||
@ -28,6 +33,7 @@ struct MergeTreeDataPartTTLInfo
|
|||||||
min = other_info.min;
|
min = other_info.min;
|
||||||
|
|
||||||
max = std::max(other_info.max, max);
|
max = std::max(other_info.max, max);
|
||||||
|
finished &= other_info.finished;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -60,6 +66,9 @@ struct MergeTreeDataPartTTLInfos
|
|||||||
void write(WriteBuffer & out) const;
|
void write(WriteBuffer & out) const;
|
||||||
void update(const MergeTreeDataPartTTLInfos & other_infos);
|
void update(const MergeTreeDataPartTTLInfos & other_infos);
|
||||||
|
|
||||||
|
/// Has any TTLs which are not calculated on completely expired parts.
|
||||||
|
bool hasAnyNonFinishedTTLs() const;
|
||||||
|
|
||||||
void updatePartMinMaxTTL(time_t time_min, time_t time_max)
|
void updatePartMinMaxTTL(time_t time_min, time_t time_max)
|
||||||
{
|
{
|
||||||
if (time_min && (!part_min_ttl || time_min < part_min_ttl))
|
if (time_min && (!part_min_ttl || time_min < part_min_ttl))
|
||||||
|
@ -111,6 +111,10 @@ bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part &
|
|||||||
if (only_drop_parts)
|
if (only_drop_parts)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
/// All TTL satisfied
|
||||||
|
if (!part.ttl_infos->hasAnyNonFinishedTTLs())
|
||||||
|
return true;
|
||||||
|
|
||||||
return !part.shall_participate_in_merges;
|
return !part.shall_participate_in_merges;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,6 +552,10 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This replica is inactive, don't check anything
|
||||||
|
if (!inactive_replicas.empty() && inactive_replicas.count(replica))
|
||||||
|
break;
|
||||||
|
|
||||||
/// It may already be removed from zk, but local in-memory mutations
|
/// It may already be removed from zk, but local in-memory mutations
|
||||||
/// state was not updated.
|
/// state was not updated.
|
||||||
if (!getZooKeeper()->exists(fs::path(zookeeper_path) / "mutations" / mutation_id))
|
if (!getZooKeeper()->exists(fs::path(zookeeper_path) / "mutations" / mutation_id))
|
||||||
|
@ -351,6 +351,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
|
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
|
||||||
ORDER BY id PARTITION BY toDayOfMonth(date)
|
ORDER BY id PARTITION BY toDayOfMonth(date)
|
||||||
TTL date + INTERVAL 3 SECOND
|
TTL date + INTERVAL 3 SECOND
|
||||||
|
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
|
||||||
'''.format(suff=num_run, replica=node.name))
|
'''.format(suff=num_run, replica=node.name))
|
||||||
|
|
||||||
node.query(
|
node.query(
|
||||||
@ -359,6 +360,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
|
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
|
||||||
ORDER BY id PARTITION BY toDayOfMonth(date)
|
ORDER BY id PARTITION BY toDayOfMonth(date)
|
||||||
TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
|
TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
|
||||||
|
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
|
||||||
'''.format(suff=num_run, replica=node.name))
|
'''.format(suff=num_run, replica=node.name))
|
||||||
|
|
||||||
node.query(
|
node.query(
|
||||||
@ -367,6 +369,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
|
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
|
||||||
ORDER BY id PARTITION BY toDayOfMonth(date)
|
ORDER BY id PARTITION BY toDayOfMonth(date)
|
||||||
TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
|
TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
|
||||||
|
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
|
||||||
'''.format(suff=num_run, replica=node.name))
|
'''.format(suff=num_run, replica=node.name))
|
||||||
|
|
||||||
node_left.query("INSERT INTO test_ttl_delete VALUES (now(), 1)")
|
node_left.query("INSERT INTO test_ttl_delete VALUES (now(), 1)")
|
||||||
@ -397,9 +400,9 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
|
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
|
||||||
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
|
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
|
||||||
|
|
||||||
exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete")
|
exec_query_with_retry(node_left, "OPTIMIZE TABLE test_ttl_delete FINAL")
|
||||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
node_left.query("OPTIMIZE TABLE test_ttl_group_by FINAL", timeout=20)
|
||||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
node_left.query("OPTIMIZE TABLE test_ttl_where FINAL", timeout=20)
|
||||||
|
|
||||||
# After OPTIMIZE TABLE, it is not guaranteed that everything is merged.
|
# After OPTIMIZE TABLE, it is not guaranteed that everything is merged.
|
||||||
# Possible scenario (for test_ttl_group_by):
|
# Possible scenario (for test_ttl_group_by):
|
||||||
@ -414,6 +417,10 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
node_right.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
node_right.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
||||||
node_right.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
node_right.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
||||||
|
|
||||||
|
exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete")
|
||||||
|
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
||||||
|
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
||||||
|
|
||||||
assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
||||||
assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
||||||
|
|
||||||
|
@ -16,11 +16,11 @@ ttl_01280_3
|
|||||||
2 1 0 3
|
2 1 0 3
|
||||||
3 1 8 2
|
3 1 8 2
|
||||||
ttl_01280_4
|
ttl_01280_4
|
||||||
1 1 0 4
|
0 4
|
||||||
10 2 13 9
|
13 9
|
||||||
ttl_01280_5
|
ttl_01280_5
|
||||||
1 2 7 5
|
1 2 7 5
|
||||||
2 3 6 5
|
2 3 6 5
|
||||||
ttl_01280_6
|
ttl_01280_6
|
||||||
1 5 3 5
|
1 3 5
|
||||||
2 10 3 5
|
2 3 5
|
||||||
|
@ -80,7 +80,7 @@ insert into ttl_01280_4 values (1, 5, 4, 9, now())"
|
|||||||
|
|
||||||
sleep 2
|
sleep 2
|
||||||
optimize "ttl_01280_4"
|
optimize "ttl_01280_4"
|
||||||
$CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_4 ORDER BY a, b, x, y"
|
$CLICKHOUSE_CLIENT --query "select x, y from ttl_01280_4 ORDER BY a, b, x, y"
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5"
|
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5"
|
||||||
|
|
||||||
@ -107,7 +107,7 @@ insert into ttl_01280_6 values (1, 5, 3, 5, now())"
|
|||||||
|
|
||||||
sleep 2
|
sleep 2
|
||||||
optimize "ttl_01280_6"
|
optimize "ttl_01280_6"
|
||||||
$CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_6 ORDER BY a, b, x, y"
|
$CLICKHOUSE_CLIENT --query "select a, x, y from ttl_01280_6 ORDER BY a, b, x, y"
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_1"
|
$CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_1"
|
||||||
$CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_2"
|
$CLICKHOUSE_CLIENT -q "DROP TABLE ttl_01280_2"
|
||||||
|
Loading…
Reference in New Issue
Block a user