update test cases and fix bug

This commit is contained in:
terrylin 2021-08-03 15:37:25 +08:00
parent d468bd7af3
commit ffe9eab1d0
6 changed files with 66 additions and 29 deletions

View File

@ -82,11 +82,11 @@ TTLBlockInputStream::TTLBlockInputStream(
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
move_ttl, TTLUpdateType::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
recompression_ttl, TTLUpdateType::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
Block reorderColumns(Block block, const Block & header)

View File

@ -22,33 +22,33 @@ TTLCalcInputStream::TTLCalcInputStream(
{
const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
rows_ttl, old_ttl_infos.table_ttl, current_time_, force_));
rows_ttl, TTLUpdateType::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
}
for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
where_ttl, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
where_ttl, TTLUpdateType::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
group_by_ttl, TTLUpdateType::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
if (metadata_snapshot_->hasAnyColumnTTL())
{
for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
{
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
description, old_ttl_infos.columns_ttl[name], current_time_, force_));
description, TTLUpdateType::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
}
}
for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
move_ttl, TTLUpdateType::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
recompression_ttl, TTLUpdateType::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
Block TTLCalcInputStream::readImpl()

View File

@ -4,8 +4,15 @@ namespace DB
{
TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
const TTLDescription & description_,
const TTLUpdateType ttl_update_type_,
const String ttl_update_key_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
, ttl_update_type(ttl_update_type_)
, ttl_update_key(ttl_update_key_)
{
}
@ -24,26 +31,32 @@ void TTLUpdateInfoAlgorithm::execute(Block & block)
void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
if (description.mode == TTLMode::RECOMPRESS)
if (ttl_update_type == TTLUpdateType::RECOMPRESSION_TTL)
{
data_part->ttl_infos.recompression_ttl[description.result_column] = new_ttl_info;
data_part->ttl_infos.recompression_ttl[ttl_update_key] = new_ttl_info;
}
else if (description.mode == TTLMode::MOVE)
else if (ttl_update_type == TTLUpdateType::MOVES_TTL)
{
data_part->ttl_infos.moves_ttl[description.result_column] = new_ttl_info;
data_part->ttl_infos.moves_ttl[ttl_update_key] = new_ttl_info;
}
else if (description.mode == TTLMode::GROUP_BY)
else if (ttl_update_type == TTLUpdateType::GROUP_BY_TTL)
{
data_part->ttl_infos.group_by_ttl[description.result_column] = new_ttl_info;
data_part->ttl_infos.group_by_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (description.mode == TTLMode::DELETE)
else if (ttl_update_type == TTLUpdateType::ROWS_WHERE_TTL)
{
data_part->ttl_infos.rows_where_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (ttl_update_type == TTLUpdateType::TABLE_TTL)
{
if (description.where_expression)
data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
else
data_part->ttl_infos.table_ttl = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (ttl_update_type == TTLUpdateType::COLUMNS_TTL)
{
data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}

View File

@ -5,14 +5,35 @@
namespace DB
{
enum class TTLUpdateType
{
COLUMNS_TTL,
TABLE_TTL,
ROWS_WHERE_TTL,
MOVES_TTL,
RECOMPRESSION_TTL,
GROUP_BY_TTL,
};
/// Calculates new ttl_info and does nothing with data.
class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
{
public:
TTLUpdateInfoAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
TTLUpdateInfoAlgorithm(
const TTLDescription & description_,
const TTLUpdateType ttl_update_type_,
const String ttl_update_key_,
const TTLInfo & old_ttl_info_,
time_t current_time_, bool force_
);
void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override;
private:
const TTLUpdateType ttl_update_type;
const String ttl_update_key;
};
}

View File

@ -169,15 +169,15 @@ def test_modify_ttl(started_cluster):
node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20)
node1.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node2.query("SELECT id FROM test_ttl") == "2\n3\n"
node2.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node1.query("SELECT id FROM test_ttl") == "3\n"
node1.query("ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node2.query("SELECT id FROM test_ttl") == ""
@ -196,15 +196,15 @@ def test_modify_column_ttl(started_cluster):
node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20)
node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node2.query("SELECT id FROM test_ttl") == "42\n2\n3\n"
node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node1.query("SELECT id FROM test_ttl") == "42\n42\n3\n"
node1.query("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2")
time.sleep(5) # TTL merges shall happen.
time.sleep(6) # TTL merges shall happen.
assert node2.query("SELECT id FROM test_ttl") == "42\n42\n42\n"
@ -328,7 +328,7 @@ def test_ttl_empty_parts(started_cluster):
node.query("SYSTEM START TTL MERGES test_ttl_empty_parts")
optimize_with_retry(node1, 'test_ttl_empty_parts')
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_4_1_6\n"
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_5_1_6\n"
assert node1.query("SELECT count() FROM test_ttl_empty_parts") == "3000\n"

View File

@ -10,6 +10,9 @@ SELECT count() FROM system.parts WHERE table = 'ttl_empty_parts' AND database =
ALTER TABLE ttl_empty_parts MODIFY TTL d;
SELECT sleep(3) format Null;
SELECT sleep(3) format Null;
-- To be sure, that task, which clears outdated parts executed.
DETACH TABLE ttl_empty_parts;
ATTACH TABLE ttl_empty_parts;