Rename move partition to partition_expr to move partition to table partition_expr

This commit is contained in:
Guillaume Tassery 2019-09-17 13:59:09 +02:00
parent 3308902c03
commit acaa9cdbad
12 changed files with 36 additions and 57 deletions

View File

@ -182,7 +182,8 @@ void ASTAlterCommand::formatImpl(
case MoveDestinationType::VOLUME:
settings.ostr << "VOLUME ";
break;
case MoveDestinationType::PARTITION:
case MoveDestinationType::TABLE:
settings.ostr << "TABLE ";
if (!to_database.empty())
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
@ -193,7 +194,7 @@ void ASTAlterCommand::formatImpl(
<< (settings.hilite ? hilite_none : "");
return;
}
if (move_destination_type != MoveDestinationType::PARTITION)
if (move_destination_type != MoveDestinationType::TABLE)
{
WriteBufferFromOwnString move_destination_name_buf;
writeQuoted(move_destination_name, move_destination_name_buf);

View File

@ -132,7 +132,7 @@ public:
{
DISK,
VOLUME,
PARTITION,
TABLE,
};
MoveDestinationType move_destination_type;
@ -152,7 +152,7 @@ public:
String from_table;
/// To distinguish REPLACE and ATTACH PARTITION partition FROM db.table
bool replace = true;
/// MOVE PARTITION partition TO db.table
/// MOVE PARTITION partition TO TABLE db.table
String to_database;
String to_table;

View File

@ -63,13 +63,13 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_from("FROM");
ParserKeyword s_to("TO");
ParserKeyword s_in_partition("IN PARTITION");
ParserKeyword s_with("WITH");
ParserKeyword s_name("NAME");
ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_to_table("TO TABLE");
ParserKeyword s_delete_where("DELETE WHERE");
ParserKeyword s_update("UPDATE");
@ -240,12 +240,16 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->move_destination_type = ASTAlterCommand::MoveDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::VOLUME;
else if (s_to.ignore(pos) && parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::PARTITION;
else if (s_to_table.ignore(pos))
{
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = ASTAlterCommand::MoveDestinationType::TABLE;
}
else
return false;
if (command->move_destination_type != ASTAlterCommand::MoveDestinationType::PARTITION)
if (command->move_destination_type != ASTAlterCommand::MoveDestinationType::TABLE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
@ -265,14 +269,16 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->move_destination_type = ASTAlterCommand::MoveDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::VOLUME;
else if (s_to.ignore(pos) && parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
else if (s_to_table.ignore(pos))
{
command->move_destination_type = ASTAlterCommand::MoveDestinationType::PARTITION;
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = ASTAlterCommand::MoveDestinationType::TABLE;
}
else
return false;
if (command->move_destination_type != ASTAlterCommand::MoveDestinationType::PARTITION)
if (command->move_destination_type != ASTAlterCommand::MoveDestinationType::TABLE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))

View File

@ -53,13 +53,13 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
case ASTAlterCommand::MoveDestinationType::VOLUME:
res.move_destination_type = PartitionCommand::MoveDestinationType::VOLUME;
break;
case ASTAlterCommand::MoveDestinationType::PARTITION:
res.move_destination_type = PartitionCommand::MoveDestinationType::PARTITION;
case ASTAlterCommand::MoveDestinationType::TABLE:
res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE;
res.to_database = command_ast->to_database;
res.to_table = command_ast->to_table;
break;
}
if (res.move_destination_type != PartitionCommand::MoveDestinationType::PARTITION)
if (res.move_destination_type != PartitionCommand::MoveDestinationType::TABLE)
res.move_destination_name = command_ast->move_destination_name;
return res;
}

View File

@ -61,7 +61,7 @@ struct PartitionCommand
{
DISK,
VOLUME,
PARTITION,
TABLE,
};
MoveDestinationType move_destination_type;

View File

@ -988,11 +988,11 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, context);
break;
case PartitionCommand::MoveDestinationType::PARTITION:
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database;
auto dest_storage = context.getTable(dest_database, command.to_table);
movePartitionTo(dest_storage, command.partition, context);
movePartitionToTable(dest_storage, command.partition, context);
break;
}
@ -1170,7 +1170,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
}
}
void StorageMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
@ -1220,13 +1220,10 @@ void StorageMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTP
try
{
{
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
/// and we should be able to rollback already added (Precomitted) parts
Transaction transaction(*dest_table_storage);
auto data_parts_lock = lockParts();
/// Populate transaction
for (MutableDataPartPtr & part : dst_parts)
dest_table_storage->renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);

View File

@ -129,7 +129,7 @@ private:
void clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context);
void attachPartition(const ASTPtr & partition, bool part, const Context & context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
friend class MergeTreeBlockOutputStream;

View File

@ -3445,11 +3445,11 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::MoveDestinationType::VOLUME:
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
break;
case PartitionCommand::MoveDestinationType::PARTITION:
case PartitionCommand::MoveDestinationType::TABLE:
checkPartitionCanBeDropped(command.partition);
String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database;
auto dest_storage = query_context.getTable(dest_database, command.to_table);
movePartitionTo(dest_storage, command.partition, query_context);
movePartitionToTable(dest_storage, command.partition, query_context);
break;
}
}
@ -4975,7 +4975,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
}
}
void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = dest_table->lockStructureForShare(false, context.getCurrentQueryId());
@ -5001,9 +5001,6 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
@ -5014,25 +5011,12 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
if (drop_range.getBlocksCount() > 1)
{
/// We have to prohibit merges in drop_range, since new merge log entry appeared after this REPLACE FROM entry
/// could produce new merged part instead in place of just deleted parts.
/// It is better to prohibit them on leader replica (like DROP PARTITION makes),
/// but it is inconvenient for a user since he could actually use source table from this replica.
/// Therefore prohibit merges on the initializer server now and on the remaining servers when log entry will be executed.
/// It does not provides strong guarantees, but is suitable for intended use case (assume merges are quite rare).
{
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInBlockRange(drop_range_fake_part_name);
}
std::lock_guard merge_selecting_lock(merge_selecting_mutex);
queue.disableMergesInBlockRange(drop_range_fake_part_name);
}
for (size_t i = 0; i < src_all_parts.size(); ++i)
{
/// We also make some kind of deduplication to avoid duplicated parts in case of ATTACH PARTITION
/// Assume that merges in the partition are quite rare
/// Save deduplication block ids with special prefix replace_partition
auto & src_part = src_all_parts[i];
if (!dest_table_storage->canReplacePartition(src_part))
@ -5041,7 +5025,7 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
ErrorCodes::LOGICAL_ERROR);
String hash_hex = src_part->checksums.getTotalChecksumHex();
String block_id_path = ""; //(dest_table_storage->zookeeper_path + "/blocks/" + partition_id + "_replace_from_" + hash_hex);
String block_id_path = "";
auto lock = dest_table_storage->allocateBlockNumber(partition_id, zookeeper, block_id_path);
if (!lock)
@ -5096,7 +5080,6 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
entry_replace.columns_version = columns_version;
}
/// We are almost ready to commit changes, remove fetches and merges from drop range
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
@ -5114,7 +5097,6 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
/// It is unnecessary to add parts to working set until we commit log entry
zookeeper->multi(ops);
ops.clear();
}
@ -5153,14 +5135,11 @@ void StorageReplicatedMergeTree::movePartitionTo(const StoragePtr & dest_table,
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove);
/// Speedup removing of replaced parts from filesystem
parts_to_remove.clear();
cleanup_thread.wakeup();
/// If necessary, wait until the operation is performed on all replicas.
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
dest_table_storage->waitForAllReplicasToProcessLogEntry(entry);

View File

@ -517,7 +517,7 @@ private:
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
void attachPartition(const ASTPtr & partition, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void movePartitionTo(const StoragePtr & source_table, const ASTPtr & partition, const Context & query_context);
void movePartitionToTable(const StoragePtr & source_table, const ASTPtr & partition, const Context & query_context);
void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context);
/// Check granularity of already existing replicated table in zookeeper if it exists

View File

@ -20,7 +20,7 @@ INSERT INTO test_move_partition_src SELECT number % 2, number FROM system.number
SELECT count() FROM test_move_partition_src;
SELECT count() FROM test_move_partition_dest;
ALTER TABLE test_move_partition_src MOVE PARTITION 1 TO test_move_partition_dest;
ALTER TABLE test_move_partition_src MOVE PARTITION 1 TO TABLE test_move_partition_dest;
SELECT count() FROM test_move_partition_src;
SELECT count() FROM test_move_partition_dest;

View File

@ -1,9 +1,5 @@
#!/usr/bin/env bash
# Because REPLACE PARTITION does not forces immediate removal of replaced data parts from local filesystem
# (it tries to do it as quick as possible, but it still performed in separate thread asynchronously)
# and when we do DETACH TABLE / ATTACH TABLE or SYSTEM RESTART REPLICA, these files may be discovered
# and discarded after restart with Warning/Error messages in log. This is Ok.
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
@ -48,7 +44,7 @@ $CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.dst;"
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE simple';"
query_with_retry "ALTER TABLE test.src MOVE PARTITION 1 TO test.dst;"
query_with_retry "ALTER TABLE test.src MOVE PARTITION 1 TO TABLE test.dst;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA test.dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM test.src;"

View File

@ -273,7 +273,7 @@ For the query to run successfully, the following conditions must be met:
#### MOVE PARTITION {#alter_move-partition}
``` sql
ALTER TABLE table_source MOVE PARTITION partition_expr TO table_dest
ALTER TABLE table_source MOVE PARTITION partition_expr TO TABLE table_dest
```
This query move the data partition from the `table_source` to `table_dest` with deleting the data from `table_source`.