[clickhouse][server][dll][alter]support fetch part

This commit is contained in:
songenjie 2021-04-13 12:40:33 +08:00
parent 3effb74d31
commit 564136ec46
15 changed files with 198 additions and 31 deletions

View File

@ -62,7 +62,7 @@ enum class AccessType
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION, FETCH PART", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_TABLE, "", GROUP, ALTER) \

View File

@ -549,6 +549,8 @@
M(579, INCORRECT_PART_TYPE) \
M(580, CANNOT_SET_ROUNDING_MODE) \
M(581, TOO_LARGE_DISTRIBUTED_DEPTH) \
M(582, PART_DOESNT_EXIST) \
M(583, PART_ALREADY_EXISTS) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -245,7 +245,7 @@ void ASTAlterCommand::formatImpl(
else if (type == ASTAlterCommand::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
<< (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << DB::quote << from;

View File

@ -61,6 +61,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_fetch_part("FETCH PART");
ParserKeyword s_replace_partition("REPLACE PARTITION");
ParserKeyword s_freeze("FREEZE");
ParserKeyword s_unfreeze("UNFREEZE");
@ -428,6 +429,21 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_fetch_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->part = true;
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_freeze.ignore(pos, expected))
{
if (s_partition.ignore(pos, expected))

View File

@ -2909,7 +2909,12 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, ContextPtr /*query_context*/)
void MergeTreeData::fetchPartition(
const ASTPtr & /*partition*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const String & /*from*/,
bool /*fetch_part*/,
ContextPtr /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName());
}
@ -2972,7 +2977,7 @@ Pipe MergeTreeData::alterPartition(
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, command.part, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:

View File

@ -970,7 +970,12 @@ protected:
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;
/// Makes sense only for replicated tables
virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context);
virtual void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context);
void writePartLog(
PartLogElement::Type type,

View File

@ -82,6 +82,7 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.type = FETCH_PARTITION;
res.partition = command_ast->partition;
res.from_zookeeper_path = command_ast->from;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION)
@ -140,7 +141,10 @@ std::string PartitionCommand::typeToString() const
else
return "DROP DETACHED PARTITION";
case PartitionCommand::Type::FETCH_PARTITION:
return "FETCH PARTITION";
if (part)
return "FETCH PART";
else
return "FETCH PARTITION";
case PartitionCommand::Type::FREEZE_ALL_PARTITIONS:
return "FREEZE ALL";
case PartitionCommand::Type::FREEZE_PARTITION:

View File

@ -112,9 +112,11 @@ namespace ErrorCodes
extern const int NOT_A_LEADER;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int PARTITION_ALREADY_EXISTS;
extern const int PART_ALREADY_EXISTS;
extern const int TOO_MANY_RETRIES_TO_FETCH_PARTS;
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int PARTITION_DOESNT_EXIST;
extern const int PART_DOESNT_EXIST;
extern const int UNFINISHED;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
extern const int TOO_MANY_FETCHES;
@ -5356,11 +5358,11 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
}
}
void StorageReplicatedMergeTree::fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from_,
bool fetch_part,
ContextPtr query_context)
{
Macros::MacroExpansionInfo info;
@ -5373,40 +5375,54 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
String partition_id = getPartitionIDFromQuery(partition, query_context);
zkutil::ZooKeeperPtr zookeeper;
if (auxiliary_zookeeper_name != default_zookeeper_name)
{
zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
}
else
{
zookeeper = getZooKeeper();
LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
}
if (from.back() == '/')
from.resize(from.size() - 1);
if (fetch_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto replica_path_ = findReplicaHavingPart(part_name, from, zookeeper);
if (replica_path_.empty())
throw Exception("fetch part " + part_name + " not exists !", ErrorCodes::PART_DOESNT_EXIST);
/** Let's check that there is no such part in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a part may appear a little later.
*/
if (checkDetachPartIfExists(part_name))
throw Exception("Detached part " + part_name + " already exists.", ErrorCodes::PART_ALREADY_EXISTS);
LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
try
{
/// part name , metadata, replica path , true, 0, zookeeper
if (!fetchPart(part_name, metadata_snapshot, replica_path_, true, 0, zookeeper))
throw Exception("fetch part " + part_name + " failed! ", ErrorCodes::UNFINISHED);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
&& e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
throw;
LOG_INFO(log, e.displayText());
}
return;
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
*/
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
}
if (checkDetachPartitionIfExists(partition_id))
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
zkutil::Strings replicas;
zkutil::Strings active_replicas;
@ -6913,4 +6929,46 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_)
{
Strings replicas = zookeeper_->getChildren(zookeeper_path_ + "/replicas");
/// Select replicas in uniformly random order.
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
for (const String & replica : replicas)
{
if (zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/parts/" + part_name)
&& zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/is_active"))
return zookeeper_path_ + "/replicas/" + replica;
}
return {};
}
bool StorageReplicatedMergeTree::checkDetachPartIfExists(const String & part_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
if (dir_it.name() == part_name)
return true;
return false;
}
bool StorageReplicatedMergeTree::checkDetachPartitionIfExists(const String & partition_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) && part_info.partition_id == partition_name)
return true;
}
}
return false;
}
}

View File

@ -522,8 +522,11 @@ private:
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
bool checkReplicaHavePart(const String & replica, const String & part_name);
bool checkDetachPartIfExists(const String & part_name);
bool checkDetachPartitionIfExists(const String & partition_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
@ -626,7 +629,12 @@ private:
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed

View File

@ -156,6 +156,7 @@
"extractURLParameterNames"
"extractURLParameters"
"FETCH PARTITION"
"FETCH PART"
"FINAL"
"FIRST"
"firstSignificantSubdomain"

View File

@ -0,0 +1,28 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
</zookeeper>
<auxiliary_zookeepers>
<zookeeper2>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
</zookeeper2>
</auxiliary_zookeepers>
</yandex>

View File

@ -0,0 +1,40 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/zookeeper_config.xml"], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_fetch_part_from_allowed_zookeeper(start_cluster):
node.query(
"CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query("INSERT INTO simple VALUES ('2020-08-27', 1)")
node.query(
"CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query(
"ALTER TABLE simple2 FETCH PART '20200827_1_1_0' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
)
node.query("ALTER TABLE simple2 ATTACH PART '20200827_1_1_0';")
with pytest.raises(QueryRuntimeException):
node.query(
"ALTER TABLE simple2 FETCH PART '20200827_1_1_0' FROM 'zookeeper:/clickhouse/tables/0/simple';"
)
assert node.query("SELECT id FROM simple2").strip() == "1"

View File

@ -18,7 +18,7 @@ def start_cluster():
cluster.shutdown()
def test_fetch_part_from_allowed_zookeeper(start_cluster):
def test_fetch_partition_from_allowed_zookeeper(start_cluster):
node.query(
"CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)

View File

@ -89,7 +89,7 @@ def grant_option_check(grant_option_target, grant_target, user_name, table_type,
@Examples("privilege", [
("ALTER MOVE PARTITION",), ("ALTER MOVE PART",), ("MOVE PARTITION",), ("MOVE PART",),
("ALTER DELETE",), ("DELETE",),
("ALTER FETCH PARTITION",), ("FETCH PARTITION",),
("ALTER FETCH PARTITION",), ("ALTER FETCH PART",), ("FETCH PARTITION",), ("FETCH PART",),
("ALTER FREEZE PARTITION",), ("FREEZE PARTITION",),
("ALTER UPDATE",), ("UPDATE",),
("ALTER ADD COLUMN",), ("ADD COLUMN",),