Fix bug in execution of TTL GROUP BY

commit c977c33d6d
parent bf85b09a16
Author: alesapin
Date:   2021-06-27 19:18:15 +03:00

9 changed files with 174 additions and 69 deletions


@@ -36,88 +36,110 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
         storage_.getContext()->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
 
     aggregator = std::make_unique<Aggregator>(params);
+
+    if (isMinTTLExpired())
+        new_ttl_info.finished = true;
 }
 
 void TTLAggregationAlgorithm::execute(Block & block)
 {
-    if (!block)
-    {
-        if (!aggregation_result.empty())
-        {
-            MutableColumns result_columns = header.cloneEmptyColumns();
-            finalizeAggregates(result_columns);
-            block = header.cloneWithColumns(std::move(result_columns));
-        }
-
-        return;
-    }
-
-    const auto & column_names = header.getNames();
-    MutableColumns result_columns = header.cloneEmptyColumns();
-    MutableColumns aggregate_columns = header.cloneEmptyColumns();
-
-    auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
-    auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
-
-    size_t rows_aggregated = 0;
-    size_t current_key_start = 0;
-    size_t rows_with_current_key = 0;
-
-    for (size_t i = 0; i < block.rows(); ++i)
-    {
-        UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
-        bool where_filter_passed = !where_column || where_column->getBool(i);
-        bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
-
-        bool same_as_current = true;
-        for (size_t j = 0; j < description.group_by_keys.size(); ++j)
-        {
-            const String & key_column = description.group_by_keys[j];
-            const IColumn * values_column = block.getByName(key_column).column.get();
-            if (!same_as_current || (*values_column)[i] != current_key_value[j])
-            {
-                values_column->get(i, current_key_value[j]);
-                same_as_current = false;
-            }
-        }
-
-        if (!same_as_current)
-        {
-            if (rows_with_current_key)
-                calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
-            finalizeAggregates(result_columns);
-
-            current_key_start = rows_aggregated;
-            rows_with_current_key = 0;
-        }
-
-        if (ttl_expired)
-        {
-            ++rows_with_current_key;
-            ++rows_aggregated;
-            for (const auto & name : column_names)
-            {
-                const IColumn * values_column = block.getByName(name).column.get();
-                auto & column = aggregate_columns[header.getPositionByName(name)];
-                column->insertFrom(*values_column, i);
-            }
-        }
-        else
-        {
-            new_ttl_info.update(cur_ttl);
-            for (const auto & name : column_names)
-            {
-                const IColumn * values_column = block.getByName(name).column.get();
-                auto & column = result_columns[header.getPositionByName(name)];
-                column->insertFrom(*values_column, i);
-            }
-        }
-    }
-
-    if (rows_with_current_key)
-        calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
+    bool some_rows_were_aggregated = false;
+    MutableColumns result_columns = header.cloneEmptyColumns();
+
+    if (!block) /// Empty block -- no more data, but we may still have some accumulated rows
+    {
+        if (!aggregation_result.empty()) /// Still have some aggregated data, let's update TTL
+        {
+            finalizeAggregates(result_columns);
+            some_rows_were_aggregated = true;
+        }
+        else /// No block, all aggregated, just finish
+        {
+            return;
+        }
+    }
+    else
+    {
+        const auto & column_names = header.getNames();
+        MutableColumns aggregate_columns = header.cloneEmptyColumns();
+
+        auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
+        auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
+
+        size_t rows_aggregated = 0;
+        size_t current_key_start = 0;
+        size_t rows_with_current_key = 0;
+
+        for (size_t i = 0; i < block.rows(); ++i)
+        {
+            UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
+            bool where_filter_passed = !where_column || where_column->getBool(i);
+            bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
+
+            bool same_as_current = true;
+            for (size_t j = 0; j < description.group_by_keys.size(); ++j)
+            {
+                const String & key_column = description.group_by_keys[j];
+                const IColumn * values_column = block.getByName(key_column).column.get();
+                if (!same_as_current || (*values_column)[i] != current_key_value[j])
+                {
+                    values_column->get(i, current_key_value[j]);
+                    same_as_current = false;
+                }
+            }
+
+            if (!same_as_current)
+            {
+                if (rows_with_current_key)
+                {
+                    some_rows_were_aggregated = true;
+                    calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
+                }
+                finalizeAggregates(result_columns);
+
+                current_key_start = rows_aggregated;
+                rows_with_current_key = 0;
+            }
+
+            if (ttl_expired)
+            {
+                ++rows_with_current_key;
+                ++rows_aggregated;
+                for (const auto & name : column_names)
+                {
+                    const IColumn * values_column = block.getByName(name).column.get();
+                    auto & column = aggregate_columns[header.getPositionByName(name)];
+                    column->insertFrom(*values_column, i);
+                }
+            }
+            else
+            {
+                for (const auto & name : column_names)
+                {
+                    const IColumn * values_column = block.getByName(name).column.get();
+                    auto & column = result_columns[header.getPositionByName(name)];
+                    column->insertFrom(*values_column, i);
+                }
+            }
+        }
+
+        if (rows_with_current_key)
+        {
+            some_rows_were_aggregated = true;
+            calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
+        }
+    }
 
     block = header.cloneWithColumns(std::move(result_columns));
+
+    /// If some rows were aggregated we have to recalculate ttl infos
+    if (some_rows_were_aggregated)
+    {
+        auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column);
+        for (size_t i = 0; i < block.rows(); ++i)
+            new_ttl_info.update(getTimestampByIndex(ttl_column_after_aggregation.get(), i));
+    }
 }
 
 void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
@@ -133,6 +155,7 @@ void TTLAggregationAlgorithm::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
     aggregator->executeOnBlock(aggregate_chunk, length, aggregation_result, key_columns,
                                columns_for_aggregator, no_more_keys);
+
 }
 
 void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
@@ -140,6 +163,7 @@ void TTLAggregationAlgorithm::finalizeAggregates(MutableColumns & result_columns)
     if (!aggregation_result.empty())
     {
         auto aggregated_res = aggregator->convertToBlocks(aggregation_result, true, 1);
+
         for (auto & agg_block : aggregated_res)
         {
             for (const auto & it : description.set_parts)
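The substance of the fix in this file: the old code updated new_ttl_info only from rows that bypassed aggregation (the removed new_ttl_info.update(cur_ttl) call), so rows produced by TTL GROUP BY contributed nothing and a part could end up with wrong TTL bounds. Now, whenever anything was aggregated, the TTL expression is re-evaluated over the output block, and the new `finished` flag records that the part was already fully expired when processed. A minimal sketch of the recalculation idea, using stand-in types rather than the real ClickHouse API:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    /// Stand-in for MergeTreeDataPartTTLInfo (illustrative, not the real type);
    /// update() mirrors the min/max rules visible in the header diff below.
    struct TTLInfoSketch
    {
        uint32_t min = 0;
        uint32_t max = 0;

        void update(uint32_t t)
        {
            if (t && (!min || t < min))
                min = t;
            max = std::max(max, t);
        }
    };

    /// After GROUP BY rewrites rows, TTL bounds must be folded over the
    /// timestamps of the *output* rows -- the input timestamps no longer
    /// describe what survived aggregation.
    TTLInfoSketch recalculateAfterAggregation(const std::vector<uint32_t> & output_ttl_values)
    {
        TTLInfoSketch info;
        for (uint32_t t : output_ttl_values)
            info.update(t);
        return info;
    }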


@@ -21,6 +21,8 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
         new_ttl_info = old_ttl_info;
         is_fully_empty = false;
     }
+    else
+        new_ttl_info.finished = true;
 }
 
 void TTLColumnAlgorithm::execute(Block & block)


@@ -9,6 +9,8 @@ TTLDeleteAlgorithm::TTLDeleteAlgorithm(
 {
     if (!isMinTTLExpired())
         new_ttl_info = old_ttl_info;
+    else
+        new_ttl_info.finished = true;
 }
 
 void TTLDeleteAlgorithm::execute(Block & block)
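The column and delete algorithms now initialize the flag identically: keep the old info while the minimal TTL is still in the future, otherwise mark the freshly computed info as finished. A sketch of the shared pattern, with a stub type and a hypothetical free function (not the actual class hierarchy):

    struct TTLInfoStub
    {
        bool finished = false;
        /// min/max omitted for brevity
    };

    /// Hypothetical distillation of the constructors above: an unexpired part
    /// keeps its previous TTL info; a fully expired part will be completely
    /// processed, so its TTL never needs to be selected again.
    void initNewTTLInfo(bool min_ttl_expired, const TTLInfoStub & old_info, TTLInfoStub & new_info)
    {
        if (!min_ttl_expired)
            new_info = old_info;
        else
            new_info.finished = true;
    }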


@@ -55,6 +55,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
             MergeTreeDataPartTTLInfo ttl_info;
             ttl_info.min = col["min"].getUInt();
             ttl_info.max = col["max"].getUInt();
+
+            if (col.has("finished"))
+                ttl_info.finished = col["finished"].getUInt();
+
             String name = col["name"].getString();
             columns_ttl.emplace(name, ttl_info);
@@ -67,6 +71,9 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
         table_ttl.min = table["min"].getUInt();
         table_ttl.max = table["max"].getUInt();
 
+        if (table.has("finished"))
+            table_ttl.finished = table["finished"].getUInt();
+
         updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
     }
@@ -77,6 +84,10 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
             MergeTreeDataPartTTLInfo ttl_info;
             ttl_info.min = elem["min"].getUInt();
             ttl_info.max = elem["max"].getUInt();
+
+            if (elem.has("finished"))
+                ttl_info.finished = elem["finished"].getUInt();
+
             String expression = elem["expression"].getString();
             ttl_info_map.emplace(expression, ttl_info);
@@ -126,6 +137,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
             writeIntText(it->second.min, out);
             writeString(",\"max\":", out);
             writeIntText(it->second.max, out);
+            writeString(R"(,"finished":)", out);
+            writeIntText(static_cast<uint8_t>(it->second.finished), out);
             writeString("}", out);
         }
         writeString("]", out);
@@ -138,6 +151,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
         writeIntText(table_ttl.min, out);
         writeString(R"(,"max":)", out);
         writeIntText(table_ttl.max, out);
+        writeString(R"(,"finished":)", out);
+        writeIntText(static_cast<uint8_t>(table_ttl.finished), out);
         writeString("}", out);
     }
@@ -159,6 +174,8 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
             writeIntText(it->second.min, out);
             writeString(R"(,"max":)", out);
             writeIntText(it->second.max, out);
+            writeString(R"(,"finished":)", out);
+            writeIntText(static_cast<uint8_t>(it->second.finished), out);
             writeString("}", out);
         }
         writeString("]", out);
@@ -202,6 +219,39 @@ time_t MergeTreeDataPartTTLInfos::getMinimalMaxRecompressionTTL() const
     return max;
 }
 
+bool MergeTreeDataPartTTLInfos::hasAnyNonFinishedTTLs() const
+{
+    auto has_non_finished_ttl = [] (const TTLInfoMap & map) -> bool
+    {
+        for (const auto & [name, info] : map)
+        {
+            if (!info.finished)
+                return true;
+        }
+        return false;
+    };
+
+    if (!table_ttl.finished)
+        return true;
+
+    if (has_non_finished_ttl(columns_ttl))
+        return true;
+
+    if (has_non_finished_ttl(rows_where_ttl))
+        return true;
+
+    if (has_non_finished_ttl(moves_ttl))
+        return true;
+
+    if (has_non_finished_ttl(recompression_ttl))
+        return true;
+
+    if (has_non_finished_ttl(group_by_ttl))
+        return true;
+
+    return false;
+}
+
 std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max)
 {
     time_t best_ttl_time = 0;
@@ -232,4 +282,5 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescriptions & descriptions, const TTLInfoMap & ttl_info_map, time_t current_time, bool use_max)
     return best_ttl_time ? *best_entry_it : std::optional<TTLDescription>();
 }
+
 }


@@ -14,6 +14,11 @@ struct MergeTreeDataPartTTLInfo
     time_t min = 0;
     time_t max = 0;
 
+    /// This TTL was computed on a completely expired part. It doesn't make sense
+    /// to select such parts for TTL again, but it does make sense to recalculate
+    /// TTL for a merge with multiple parts.
+    bool finished = false;
+
     void update(time_t time)
     {
         if (time && (!min || time < min))
@@ -28,6 +33,7 @@ struct MergeTreeDataPartTTLInfo
             min = other_info.min;
 
         max = std::max(other_info.max, max);
+        finished &= other_info.finished;
     }
 };
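The combine rule is a logical AND: a merged part's TTL counts as finished only if it was finished in every source part, which is why a merge of expired and non-expired parts stays eligible for further TTL processing. A worked example with a standalone stub:

    struct InfoStub
    {
        bool finished = false;
    };

    /// Mirrors the new `finished &= other_info.finished;` line above.
    InfoStub combine(InfoStub a, const InfoStub & b)
    {
        a.finished &= b.finished;
        return a;
    }

    /// combine(InfoStub{true}, InfoStub{false}).finished == false
    ///     -> the merged part remains a TTL candidate
    /// combine(InfoStub{true}, InfoStub{true}).finished == true
    ///     -> TTL never has to run on the merged part again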
@@ -60,6 +66,9 @@ struct MergeTreeDataPartTTLInfos
     void write(WriteBuffer & out) const;
     void update(const MergeTreeDataPartTTLInfos & other_infos);
 
+    /// Whether there are any TTLs that were not calculated on completely expired parts.
+    bool hasAnyNonFinishedTTLs() const;
+
     void updatePartMinMaxTTL(time_t time_min, time_t time_max)
     {
         if (time_min && (!part_min_ttl || time_min < part_min_ttl))


@@ -111,6 +111,9 @@ bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part &
     if (only_drop_parts)
         return false;
 
+    if (!part.ttl_infos->hasAnyNonFinishedTTLs())
+        return true;
+
     return !part.shall_participate_in_merges;
 }
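This is where the flag pays off: the delete-TTL merge selector now reports TTL as already satisfied for a part whose every TTL info is finished, so no further TTL merge is scheduled for it. (Note the returned value here must be true: isTTLAlreadySatisfied == true means "skip this part", matching the header comment that such parts should not be selected for TTL again.) A hedged sketch of the decision as a free-standing predicate, with illustrative names only:

    /// Returns true when the part should NOT be picked for a TTL merge.
    bool ttlAlreadySatisfied(bool only_drop_parts, bool has_non_finished_ttls, bool shall_participate_in_merges)
    {
        if (only_drop_parts)
            return false;          /// dropping whole parts is decided elsewhere
        if (!has_non_finished_ttls)
            return true;           /// every TTL already ran on a fully expired part
        return !shall_participate_in_merges;
    }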


@@ -551,6 +551,10 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
                 break;
             }
 
+            /// This replica is inactive, don't check anything
+            if (!inactive_replicas.empty() && inactive_replicas.count(replica))
+                break;
+
             /// It may be already removed from zk, but local in-memory mutations
             /// state was not updated.
             if (!getZooKeeper()->exists(fs::path(zookeeper_path) / "mutations" / mutation_id))


@@ -351,6 +351,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
             ORDER BY id PARTITION BY toDayOfMonth(date)
             TTL date + INTERVAL 3 SECOND
+            SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
         '''.format(suff=num_run, replica=node.name))
 
     node.query(
@@ -359,6 +360,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
            ORDER BY id PARTITION BY toDayOfMonth(date)
            TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
+           SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
        '''.format(suff=num_run, replica=node.name))
 
     node.query(
@@ -367,6 +369,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
            ORDER BY id PARTITION BY toDayOfMonth(date)
            TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
+           SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
        '''.format(suff=num_run, replica=node.name))
 
     node_left.query("INSERT INTO test_ttl_delete VALUES (now(), 1)")
@@ -397,9 +400,9 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
     node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
     node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
 
-    exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete")
-    node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
-    node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
+    exec_query_with_retry(node_left, "OPTIMIZE TABLE test_ttl_delete FINAL")
+    node_left.query("OPTIMIZE TABLE test_ttl_group_by FINAL", timeout=20)
+    node_left.query("OPTIMIZE TABLE test_ttl_where FINAL", timeout=20)
 
     # After OPTIMIZE TABLE, it is not guaranteed that everything is merged.
     # Possible scenario (for test_ttl_group_by):
@@ -414,6 +417,10 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
     node_right.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
     node_right.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
 
+    exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete")
+    node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
+    node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
+
     assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
     assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"


@@ -79,7 +79,10 @@ def test_upgrade_while_mutation(start_cluster):
     node3.restart_with_latest_version(signal=9)
 
-    exec_query_with_retry(node3, "ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
+    # wait until the replica becomes active
+    exec_query_with_retry(node3, "SYSTEM RESTART REPLICA mt1")
+
+    node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
 
     # will delete nothing, but previous async mutation will finish with this query
     assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")