mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #8422 from excitoon-favorites/bettertestalterttl
Fixed `ALTER MODIFY TTL`
This commit is contained in:
commit
c711ce500b
@ -633,6 +633,10 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
|
||||
|
||||
if (new_ttl_table_ast)
|
||||
{
|
||||
std::vector<TTLEntry> update_move_ttl_entries;
|
||||
ASTPtr update_ttl_table_ast = nullptr;
|
||||
TTLEntry update_ttl_table_entry;
|
||||
|
||||
bool seen_delete_ttl = false;
|
||||
for (auto ttl_element_ptr : new_ttl_table_ast->children)
|
||||
{
|
||||
@ -647,8 +651,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
|
||||
auto new_ttl_table_entry = create_ttl_entry(ttl_element.children[0]);
|
||||
if (!only_check)
|
||||
{
|
||||
ttl_table_ast = ttl_element.children[0];
|
||||
ttl_table_entry = new_ttl_table_entry;
|
||||
update_ttl_table_ast = ttl_element.children[0];
|
||||
update_ttl_table_entry = new_ttl_table_entry;
|
||||
}
|
||||
|
||||
seen_delete_ttl = true;
|
||||
@ -671,11 +675,18 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
|
||||
}
|
||||
|
||||
if (!only_check)
|
||||
{
|
||||
move_ttl_entries.emplace_back(std::move(new_ttl_entry));
|
||||
}
|
||||
update_move_ttl_entries.emplace_back(std::move(new_ttl_entry));
|
||||
}
|
||||
}
|
||||
|
||||
if (!only_check)
|
||||
{
|
||||
ttl_table_entry = update_ttl_table_entry;
|
||||
ttl_table_ast = update_ttl_table_ast;
|
||||
|
||||
auto move_ttl_entries_lock = std::lock_guard<std::mutex>(move_ttl_entries_mutex);
|
||||
move_ttl_entries = update_move_ttl_entries;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3293,7 +3304,7 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
|
||||
ReservationPtr reservation;
|
||||
|
||||
auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
|
||||
if (ttl_entry != nullptr)
|
||||
if (ttl_entry)
|
||||
{
|
||||
SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy);
|
||||
if (!destination_ptr)
|
||||
@ -3352,27 +3363,28 @@ bool MergeTreeData::TTLEntry::isPartInDestination(const StoragePolicyPtr & polic
|
||||
return false;
|
||||
}
|
||||
|
||||
const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
const MergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move) const
|
||||
{
|
||||
const MergeTreeData::TTLEntry * result = nullptr;
|
||||
/// Prefer TTL rule which went into action last.
|
||||
time_t max_max_ttl = 0;
|
||||
std::vector<DB::MergeTreeData::TTLEntry>::const_iterator best_entry_it;
|
||||
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
auto lock = std::lock_guard(move_ttl_entries_mutex);
|
||||
for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
|
||||
{
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
|
||||
/// Prefer TTL rule which went into action last.
|
||||
if (ttl_info_it != ttl_infos.moves_ttl.end()
|
||||
&& ttl_info_it->second.max <= time_of_move
|
||||
&& max_max_ttl <= ttl_info_it->second.max)
|
||||
{
|
||||
result = &ttl_entry;
|
||||
best_entry_it = ttl_entry_it;
|
||||
max_max_ttl = ttl_info_it->second.max;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return max_max_ttl ? *best_entry_it : std::optional<MergeTreeData::TTLEntry>();
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
|
@ -737,12 +737,17 @@ public:
|
||||
bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
|
||||
};
|
||||
|
||||
const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
std::optional<TTLEntry> selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
|
||||
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
|
||||
TTLEntriesByName column_ttl_entries_by_name;
|
||||
|
||||
TTLEntry ttl_table_entry;
|
||||
|
||||
/// This mutex is required for background move operations which do not obtain global locks.
|
||||
mutable std::mutex move_ttl_entries_mutex;
|
||||
|
||||
/// Vector rw operations have to be done under "move_ttl_entries_mutex".
|
||||
std::vector<TTLEntry> move_ttl_entries;
|
||||
|
||||
String sampling_expr_column_name;
|
||||
|
@ -127,14 +127,14 @@ bool MergeTreePartsMover::selectPartsForMove(
|
||||
if (!can_move(part, &reason))
|
||||
continue;
|
||||
|
||||
const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto to_insert = need_to_move.find(part->disk);
|
||||
ReservationPtr reservation;
|
||||
if (ttl_entry_ptr)
|
||||
if (ttl_entry)
|
||||
{
|
||||
auto destination = ttl_entry_ptr->getDestination(policy);
|
||||
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
|
||||
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
|
||||
auto destination = ttl_entry->getDestination(policy);
|
||||
if (destination && !ttl_entry->isPartInDestination(policy, *part))
|
||||
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry->getDestination(policy));
|
||||
}
|
||||
|
||||
if (reservation) /// Found reservation by TTL rule.
|
||||
|
@ -74,37 +74,37 @@ def test_rule_with_invalid_destination(started_cluster, name, engine, alter):
|
||||
{expression}
|
||||
SETTINGS storage_policy='{policy}'
|
||||
""".format(expression=x, name=name, engine=engine, policy=policy)
|
||||
|
||||
|
||||
if alter:
|
||||
node1.query(get_command(None, "small_jbod_with_external"))
|
||||
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node1.query(get_command("TTL d1 TO DISK 'unknown'", "small_jbod_with_external"))
|
||||
|
||||
|
||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
||||
|
||||
|
||||
if alter:
|
||||
node1.query(get_command(None, "small_jbod_with_external"))
|
||||
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node1.query(get_command("TTL d1 TO VOLUME 'unknown'", "small_jbod_with_external"))
|
||||
|
||||
|
||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
||||
|
||||
|
||||
if alter:
|
||||
node1.query(get_command(None, "only_jbod2"))
|
||||
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node1.query(get_command("TTL d1 TO DISK 'jbod1'", "only_jbod2"))
|
||||
|
||||
|
||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
||||
|
||||
|
||||
if alter:
|
||||
node1.query(get_command(None, "only_jbod2"))
|
||||
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node1.query(get_command("TTL d1 TO VOLUME 'external'", "only_jbod2"))
|
||||
|
||||
|
||||
finally:
|
||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
||||
|
||||
@ -501,13 +501,17 @@ def test_moves_after_merges_work(started_cluster, name, engine, positive):
|
||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name,engine,positive", [
|
||||
("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
|
||||
("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
|
||||
("mt_test_moves_after_merges_work","MergeTree()",1),
|
||||
("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",1),
|
||||
@pytest.mark.parametrize("name,engine,positive,bar", [
|
||||
("mt_test_moves_after_alter_do_not_work","MergeTree()",0,"DELETE"),
|
||||
("replicated_mt_test_moves_after_alter_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_do_not_work', '1')",0,"DELETE"),
|
||||
("mt_test_moves_after_alter_work","MergeTree()",1,"DELETE"),
|
||||
("replicated_mt_test_moves_after_alter_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_work', '1')",1,"DELETE"),
|
||||
("mt_test_moves_after_alter_do_not_work","MergeTree()",0,"TO DISK 'external'"),
|
||||
("replicated_mt_test_moves_after_alter_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_do_not_work', '1')",0,"TO DISK 'external'"),
|
||||
("mt_test_moves_after_alter_work","MergeTree()",1,"TO DISK 'external'"),
|
||||
("replicated_mt_test_moves_after_alter_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_alter_work', '1')",1,"TO DISK 'external'"),
|
||||
])
|
||||
def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive):
|
||||
def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive, bar):
|
||||
try:
|
||||
node1.query("""
|
||||
CREATE TABLE {name} (
|
||||
@ -523,8 +527,8 @@ def test_ttls_do_not_work_after_alter(started_cluster, name, engine, positive):
|
||||
node1.query("""
|
||||
ALTER TABLE {name}
|
||||
MODIFY TTL
|
||||
d1 + INTERVAL 15 MINUTE
|
||||
""".format(name=name)) # That shall disable TTL.
|
||||
d1 + INTERVAL 15 MINUTE {bar}
|
||||
""".format(name=name, bar=bar)) # That shall disable TTL.
|
||||
|
||||
data = [] # 10MB in total
|
||||
for i in range(10):
|
||||
@ -578,14 +582,14 @@ limitations under the License."""
|
||||
) ENGINE = {engine}
|
||||
ORDER BY tuple()
|
||||
PARTITION BY p1
|
||||
TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2',
|
||||
TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2',
|
||||
d1 + INTERVAL 60 SECOND TO VOLUME 'external'
|
||||
SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
|
||||
""".format(name=name, engine=engine))
|
||||
|
||||
node1.query("""
|
||||
ALTER TABLE {name} MODIFY
|
||||
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
|
||||
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
|
||||
d1 + INTERVAL 5 SECOND TO VOLUME 'external',
|
||||
d1 + INTERVAL 10 SECOND DELETE
|
||||
""".format(name=name))
|
||||
@ -622,6 +626,96 @@ limitations under the License."""
|
||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name,engine", [
|
||||
("concurrently_altering_ttl_mt","MergeTree()"),
|
||||
("concurrently_altering_ttl_replicated_mt","ReplicatedMergeTree('/clickhouse/concurrently_altering_ttl_replicated_mt', '1')",),
|
||||
])
|
||||
def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
|
||||
try:
|
||||
node1.query("""
|
||||
CREATE TABLE {name} (
|
||||
EventDate Date,
|
||||
number UInt64
|
||||
) ENGINE = {engine}
|
||||
ORDER BY tuple()
|
||||
PARTITION BY toYYYYMM(EventDate)
|
||||
SETTINGS storage_policy='jbods_with_external'
|
||||
""".format(name=name, engine=engine))
|
||||
|
||||
values = list({ random.randint(1, 1000000) for _ in range(0, 1000) })
|
||||
|
||||
def insert(num):
|
||||
for i in range(num):
|
||||
day = random.randint(11, 30)
|
||||
value = values.pop()
|
||||
month = '0' + str(random.choice([3, 4]))
|
||||
node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
|
||||
|
||||
def alter_move(num):
|
||||
def produce_alter_move(node, name):
|
||||
move_type = random.choice(["PART", "PARTITION"])
|
||||
if move_type == "PART":
|
||||
for _ in range(10):
|
||||
try:
|
||||
parts = node1.query("SELECT name from system.parts where table = '{}' and active = 1".format(name)).strip().split('\n')
|
||||
break
|
||||
except QueryRuntimeException:
|
||||
pass
|
||||
else:
|
||||
raise Exception("Cannot select from system.parts")
|
||||
|
||||
move_part = random.choice(["'" + part + "'" for part in parts])
|
||||
else:
|
||||
move_part = random.choice([201903, 201904])
|
||||
|
||||
move_disk = random.choice(["DISK", "VOLUME"])
|
||||
if move_disk == "DISK":
|
||||
move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
|
||||
else:
|
||||
move_volume = random.choice(["'main'", "'external'"])
|
||||
try:
|
||||
node1.query("ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
|
||||
name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume))
|
||||
except QueryRuntimeException as ex:
|
||||
pass
|
||||
|
||||
for i in range(num):
|
||||
produce_alter_move(node1, name)
|
||||
|
||||
def alter_update(num):
|
||||
for i in range(num):
|
||||
node1.query("ALTER TABLE {} UPDATE number = number + 1 WHERE 1".format(name))
|
||||
|
||||
def alter_modify_ttl(num):
|
||||
for i in range(num):
|
||||
ttls = []
|
||||
for j in range(random.randint(1, 10)):
|
||||
what = random.choice(["TO VOLUME 'main'", "TO VOLUME 'external'", "TO DISK 'jbod1'", "TO DISK 'jbod2'", "TO DISK 'external'"])
|
||||
when = "now()+{}".format(random.randint(-1, 5))
|
||||
ttls.append("{} {}".format(when, what))
|
||||
node1.query("ALTER TABLE {} MODIFY TTL {}".format(name, ", ".join(ttls)))
|
||||
|
||||
def optimize_table(num):
|
||||
for i in range(num):
|
||||
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
|
||||
|
||||
p = Pool(15)
|
||||
tasks = []
|
||||
for i in range(5):
|
||||
tasks.append(p.apply_async(insert, (100,)))
|
||||
tasks.append(p.apply_async(alter_move, (100,)))
|
||||
tasks.append(p.apply_async(alter_update, (100,)))
|
||||
tasks.append(p.apply_async(alter_modify_ttl, (100,)))
|
||||
tasks.append(p.apply_async(optimize_table, (100,)))
|
||||
|
||||
for task in tasks:
|
||||
task.get(timeout=60)
|
||||
|
||||
assert node1.query("SELECT 1") == "1\n"
|
||||
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
|
||||
finally:
|
||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
||||
|
||||
@pytest.mark.parametrize("name,positive", [
|
||||
("test_double_move_while_select_negative", 0),
|
||||
("test_double_move_while_select_positive", 1),
|
||||
|
Loading…
Reference in New Issue
Block a user