support TTL TO [DISK|VOLUME] [IF EXISTS]

This commit is contained in:
Anton Popov 2022-02-10 19:18:01 +03:00
parent 437940b29d
commit 70986a70a1
14 changed files with 161 additions and 28 deletions

View File

@ -0,0 +1,34 @@
#include <Disks/IStoragePolicy.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_VOLUME;
extern const int UNKNOWN_DISK;
}
DiskPtr IStoragePolicy::getDiskByName(const String & disk_name) const
{
auto disk = tryGetDiskByName(disk_name);
if (!disk)
throw Exception(ErrorCodes::UNKNOWN_DISK,
"No such disk {} in storage policy {}", backQuote(disk_name), backQuote(getName()));
return disk;
}
VolumePtr IStoragePolicy::getVolumeByName(const String & volume_name) const
{
auto volume = tryGetVolumeByName(volume_name);
if (!volume)
throw Exception(ErrorCodes::UNKNOWN_VOLUME,
"No such volume {} in storage policy {}", backQuote(volume_name), backQuote(getName()));
return volume;
}
}

View File

@ -39,7 +39,8 @@ public:
/// Used when it's not important, for example for
/// mutations files
virtual DiskPtr getAnyDisk() const = 0;
virtual DiskPtr getDiskByName(const String & disk_name) const = 0;
virtual DiskPtr tryGetDiskByName(const String & disk_name) const = 0;
DiskPtr getDiskByName(const String & disk_name) const;
/// Get free space from most free disk
virtual UInt64 getMaxUnreservedFreeSpace() const = 0;
/// Reserves space on any volume with index > min_volume_index or returns nullptr
@ -53,7 +54,8 @@ public:
virtual ReservationPtr makeEmptyReservationOnLargestDisk() const = 0;
/// Get volume by index.
virtual VolumePtr getVolume(size_t index) const = 0;
virtual VolumePtr getVolumeByName(const String & volume_name) const = 0;
virtual VolumePtr tryGetVolumeByName(const String & volume_name) const = 0;
VolumePtr getVolumeByName(const String & volume_name) const;
/// Checks if storage policy can be replaced by another one.
virtual void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const = 0;
/// Find volume index, which contains disk

View File

@ -179,7 +179,7 @@ DiskPtr StoragePolicy::getAnyDisk() const
}
DiskPtr StoragePolicy::getDiskByName(const String & disk_name) const
DiskPtr StoragePolicy::tryGetDiskByName(const String & disk_name) const
{
for (auto && volume : volumes)
for (auto && disk : volume->getDisks())
@ -265,11 +265,11 @@ VolumePtr StoragePolicy::getVolume(size_t index) const
}
VolumePtr StoragePolicy::getVolumeByName(const String & volume_name) const
VolumePtr StoragePolicy::tryGetVolumeByName(const String & volume_name) const
{
auto it = volume_index_by_volume_name.find(volume_name);
if (it == volume_index_by_volume_name.end())
throw Exception("No such volume " + backQuote(volume_name) + " in storage policy " + backQuote(name), ErrorCodes::UNKNOWN_VOLUME);
return nullptr;
return getVolume(it->second);
}

View File

@ -52,7 +52,7 @@ public:
/// mutations files
DiskPtr getAnyDisk() const override;
DiskPtr getDiskByName(const String & disk_name) const override;
DiskPtr tryGetDiskByName(const String & disk_name) const override;
/// Get free space from most free disk
UInt64 getMaxUnreservedFreeSpace() const override;
@ -84,7 +84,7 @@ public:
/// Get volume by index.
VolumePtr getVolume(size_t index) const override;
VolumePtr getVolumeByName(const String & volume_name) const override;
VolumePtr tryGetVolumeByName(const String & volume_name) const override;
/// Checks if storage policy can be replaced by another one.
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const override;

View File

@ -1,13 +1,17 @@
#include <Columns/Collator.h>
#include <Common/quoteString.h>
#include <Parsers/ASTTTLElement.h>
#include <IO/Operators.h>
#include <base/EnumReflection.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ASTPtr ASTTTLElement::clone() const
{
auto clone = std::make_shared<ASTTTLElement>(*this);
@ -29,13 +33,21 @@ ASTPtr ASTTTLElement::clone() const
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
ttl()->formatImpl(settings, state, frame);
if (mode == TTLMode::MOVE && destination_type == DataDestinationType::DISK)
if (mode == TTLMode::MOVE)
{
settings.ostr << " TO DISK " << quoteString(destination_name);
}
else if (mode == TTLMode::MOVE && destination_type == DataDestinationType::VOLUME)
{
settings.ostr << " TO VOLUME " << quoteString(destination_name);
if (destination_type == DataDestinationType::DISK)
settings.ostr << " TO DISK ";
else if (destination_type == DataDestinationType::VOLUME)
settings.ostr << " TO VOLUME ";
else
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unsupported destination type {} for TTL MOVE",
magic_enum::enum_name(destination_type));
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << quoteString(destination_name);
}
else if (mode == TTLMode::GROUP_BY)
{

View File

@ -16,16 +16,18 @@ public:
TTLMode mode;
DataDestinationType destination_type;
String destination_name;
bool if_exists = false;
ASTs group_by_key;
ASTs group_by_assignments;
ASTPtr recompression_codec;
ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_)
ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_, bool if_exists_)
: mode(mode_)
, destination_type(destination_type_)
, destination_name(destination_name_)
, if_exists(if_exists_)
, ttl_expr_pos(-1)
, where_expr_pos(-1)
{

View File

@ -2360,6 +2360,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_delete("DELETE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
@ -2414,9 +2415,13 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr group_by_key;
ASTPtr recompression_codec;
ASTPtr group_by_assignments;
bool if_exists = false;
if (mode == TTLMode::MOVE)
{
if (s_if_exists.ignore(pos))
if_exists = true;
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
return false;
@ -2448,7 +2453,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name);
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name, if_exists);
ttl_element->setTTL(std::move(ttl_expr));
if (where_expr)
ttl_element->setWhere(std::move(where_expr));

View File

@ -650,13 +650,14 @@ void MergeTreeData::checkTTLExpressions(const StorageInMemoryMetadata & new_meta
{
for (const auto & move_ttl : new_table_ttl.move_ttl)
{
if (!getDestinationForMoveTTL(move_ttl))
if (!move_ttl.if_exists && !getDestinationForMoveTTL(move_ttl))
{
String message;
if (move_ttl.destination_type == DataDestinationType::DISK)
message = "No such disk " + backQuote(move_ttl.destination_name) + " for given storage policy.";
message = "No such disk " + backQuote(move_ttl.destination_name) + " for given storage policy";
else
message = "No such volume " + backQuote(move_ttl.destination_name) + " for given storage policy.";
message = "No such volume " + backQuote(move_ttl.destination_name) + " for given storage policy";
throw Exception(message, ErrorCodes::BAD_TTL_EXPRESSION);
}
}
@ -3363,9 +3364,6 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Active, partition_id);
auto disk = getStoragePolicy()->getDiskByName(name);
if (!disk)
throw Exception("Disk " + name + " does not exists on policy " + getStoragePolicy()->getName(), ErrorCodes::UNKNOWN_DISK);
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
return part_ptr->volume->getDisk()->getName() == disk->getName();
@ -4112,10 +4110,10 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
SpacePtr destination_ptr = getDestinationForMoveTTL(*move_ttl_entry, is_insert);
if (!destination_ptr)
{
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME)
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME && !move_ttl_entry->if_exists)
LOG_WARNING(log, "Would like to reserve space on volume '{}' by TTL rule of table '{}' but volume was not found or rule is not applicable at the moment",
move_ttl_entry->destination_name, log_name);
else if (move_ttl_entry->destination_type == DataDestinationType::DISK)
else if (move_ttl_entry->destination_type == DataDestinationType::DISK && !move_ttl_entry->if_exists)
LOG_WARNING(log, "Would like to reserve space on disk '{}' by TTL rule of table '{}' but disk was not found or rule is not applicable at the moment",
move_ttl_entry->destination_name, log_name);
}
@ -4149,7 +4147,7 @@ SpacePtr MergeTreeData::getDestinationForMoveTTL(const TTLDescription & move_ttl
auto policy = getStoragePolicy();
if (move_ttl.destination_type == DataDestinationType::VOLUME)
{
auto volume = policy->getVolumeByName(move_ttl.destination_name);
auto volume = policy->tryGetVolumeByName(move_ttl.destination_name);
if (!volume)
return {};
@ -4161,7 +4159,8 @@ SpacePtr MergeTreeData::getDestinationForMoveTTL(const TTLDescription & move_ttl
}
else if (move_ttl.destination_type == DataDestinationType::DISK)
{
auto disk = policy->getDiskByName(move_ttl.destination_name);
auto disk = policy->tryGetDiskByName(move_ttl.destination_name);
if (!disk)
return {};

View File

@ -112,6 +112,7 @@ TTLDescription::TTLDescription(const TTLDescription & other)
, aggregate_descriptions(other.aggregate_descriptions)
, destination_type(other.destination_type)
, destination_name(other.destination_name)
, if_exists(other.if_exists)
, recompression_codec(other.recompression_codec)
{
if (other.expression)
@ -149,6 +150,7 @@ TTLDescription & TTLDescription::operator=(const TTLDescription & other)
aggregate_descriptions = other.aggregate_descriptions;
destination_type = other.destination_type;
destination_name = other.destination_name;
if_exists = other.if_exists;
if (other.recompression_codec)
recompression_codec = other.recompression_codec->clone();
@ -185,9 +187,10 @@ TTLDescription TTLDescription::getTTLFromAST(
}
else /// rows TTL
{
result.mode = ttl_element->mode;
result.destination_type = ttl_element->destination_type;
result.destination_name = ttl_element->destination_name;
result.mode = ttl_element->mode;
result.if_exists = ttl_element->if_exists;
if (ttl_element->mode == TTLMode::DELETE)
{

View File

@ -75,6 +75,10 @@ struct TTLDescription
/// Name of destination disk or volume
String destination_name;
/// If true, do nothing if DISK or VOLUME doesn't exist .
/// Only valid for table MOVE TTLs.
bool if_exists = false;
/// Codec name which will be used to recompress data
ASTPtr recompression_codec;

View File

@ -76,6 +76,14 @@
</volumes>
</jbod1_with_jbod2>
<only_jbod_1>
<volumes>
<main>
<disk>jbod1</disk>
</main>
</volumes>
</only_jbod_1>
<only_jbod2>
<volumes>
<main>

View File

@ -1174,3 +1174,57 @@ def test_disabled_ttl_move_on_insert(started_cluster, name, dest_type, engine):
node1.query("DROP TABLE IF EXISTS {} NO DELAY".format(name))
except:
pass
@pytest.mark.parametrize("name,dest_type", [
pytest.param("replicated_mt_move_if_exists", "DISK", id="replicated_disk"),
pytest.param("replicated_mt_move_if_exists", "VOLUME", id="replicated_volume"),
])
def test_ttl_move_if_exists(started_cluster, name, dest_type):
name = unique_table_name(name)
try:
query_template = """
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_mt_move_if_exists', '{node_name}')
ORDER BY tuple()
TTL d1 TO {dest_type} {if_exists} 'external'
SETTINGS storage_policy='{policy}'
"""
with pytest.raises(QueryRuntimeException):
node1.query(query_template.format( \
name=name, node_name=node1.name, dest_type=dest_type, \
if_exists='', policy='only_jbod_1'))
for (node, policy) in zip([node1, node2], ['only_jbod_1', 'small_jbod_with_external']):
node.query(query_template.format( \
name=name, node_name=node.name, dest_type=dest_type, \
if_exists='IF EXISTS', policy=policy))
data = [] # 10MB in total
for i in range(10):
data.append(("randomPrintableASCII(1024*1024)", "toDateTime({})".format(time.time() - 1)))
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
node2.query("SYSTEM SYNC REPLICA {}".format(name))
time.sleep(5)
used_disks1 = get_used_disks_for_table(node1, name)
assert set(used_disks1) == {"jbod1"}
used_disks2 = get_used_disks_for_table(node2, name)
assert set(used_disks2) == {"external"}
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
assert node2.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
finally:
try:
node1.query("DROP TABLE IF EXISTS {} NO DELAY".format(name))
node2.query("DROP TABLE IF EXISTS {} NO DELAY".format(name))
except:
pass

View File

@ -0,0 +1 @@
CREATE TABLE default.t_ttl_move_if_exists\n(\n `d` DateTime,\n `a` UInt32\n)\nENGINE = MergeTree\nORDER BY tuple()\nTTL d TO DISK IF EXISTS \'non_existing_disk\'\nSETTINGS index_granularity = 8192

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS t_ttl_move_if_exists;
CREATE TABLE t_ttl_move_if_exists (d DateTime, a UInt32)
ENGINE = MergeTree ORDER BY tuple()
TTL d TO DISK IF EXISTS 'non_existing_disk';
SHOW CREATE TABLE t_ttl_move_if_exists;
DROP TABLE IF EXISTS t_ttl_move_if_exists;