mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Alter table freeze with verbose output
This commit is contained in:
parent
3b5cd9412f
commit
c75830e878
@ -393,6 +393,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingBool, allow_experimental_geo_types, false, "Allow geo data types such as Point, Ring, Polygon, MultiPolygon", 0) \
|
||||
M(SettingBool, data_type_default_nullable, false, "Data types without NULL or NOT NULL will make Nullable", 0) \
|
||||
M(SettingBool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
|
||||
M(SettingBool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
|
||||
\
|
||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||
\
|
||||
|
@ -34,6 +34,7 @@ InterpreterAlterQuery::InterpreterAlterQuery(const ASTPtr & query_ptr_, const Co
|
||||
|
||||
BlockIO InterpreterAlterQuery::execute()
|
||||
{
|
||||
BlockIO res;
|
||||
const auto & alter = query_ptr->as<ASTAlterQuery &>();
|
||||
|
||||
if (!alter.cluster.empty())
|
||||
@ -86,7 +87,13 @@ BlockIO InterpreterAlterQuery::execute()
|
||||
if (!partition_commands.empty())
|
||||
{
|
||||
table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, context.getSettingsRef());
|
||||
table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
|
||||
auto partition_commands_pipes = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
|
||||
if (!partition_commands_pipes.empty())
|
||||
{
|
||||
QueryPipeline pipeline;
|
||||
pipeline.init(std::move(partition_commands_pipes));
|
||||
res.pipeline = std::move(pipeline);
|
||||
}
|
||||
}
|
||||
|
||||
if (!live_view_commands.empty())
|
||||
@ -113,7 +120,7 @@ BlockIO InterpreterAlterQuery::execute()
|
||||
table->alter(alter_commands, context, alter_lock);
|
||||
}
|
||||
|
||||
return {};
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
|
@ -65,10 +65,13 @@ void Chunk::setColumns(MutableColumns columns_, UInt64 num_rows_)
|
||||
|
||||
void Chunk::checkNumRowsIsConsistent()
|
||||
{
|
||||
for (auto & column : columns)
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
{
|
||||
auto & column = columns[i];
|
||||
if (column->size() != num_rows)
|
||||
throw Exception("Invalid number of rows in Chunk column " + column->getName()+ ": expected " +
|
||||
throw Exception("Invalid number of rows in Chunk column " + column->getName()+ " position " + toString(i) + ": expected " +
|
||||
toString(num_rows) + ", got " + toString(column->size()), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
MutableColumns Chunk::mutateColumns()
|
||||
|
@ -355,7 +355,7 @@ public:
|
||||
/** ALTER tables with regard to its partitions.
|
||||
* Should handle locks for each command on its own.
|
||||
*/
|
||||
virtual void alterPartition(const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, const PartitionCommands & /* commands */, const Context & /* context */)
|
||||
virtual Pipes alterPartition(const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, const PartitionCommands & /* commands */, const Context & /* context */)
|
||||
{
|
||||
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -1662,9 +1662,9 @@ void MergeTreeData::changeSettings(
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeData::freezeAll(const String & with_name, const StorageMetadataPtr & metadata_snapshot, const Context & context, TableLockHolder &)
|
||||
PartitionCommandsResultInfo MergeTreeData::freezeAll(const String & with_name, const StorageMetadataPtr & metadata_snapshot, const Context & context, TableLockHolder &)
|
||||
{
|
||||
freezePartitionsByMatcher([] (const DataPartPtr &){ return true; }, metadata_snapshot, with_name, context);
|
||||
return freezePartitionsByMatcher([] (const DataPartPtr &) { return true; }, metadata_snapshot, with_name, context);
|
||||
}
|
||||
|
||||
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
|
||||
@ -2468,7 +2468,7 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
|
||||
}
|
||||
|
||||
|
||||
void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context, TableLockHolder &)
|
||||
PartitionCommandsResultInfo MergeTreeData::freezePartition(const ASTPtr & partition_ast, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context, TableLockHolder &)
|
||||
{
|
||||
std::optional<String> prefix;
|
||||
String partition_id;
|
||||
@ -2492,7 +2492,7 @@ void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const StorageM
|
||||
LOG_DEBUG(log, "Freezing parts with partition ID {}", partition_id);
|
||||
|
||||
|
||||
freezePartitionsByMatcher(
|
||||
return freezePartitionsByMatcher(
|
||||
[&prefix, &partition_id](const DataPartPtr & part)
|
||||
{
|
||||
if (prefix)
|
||||
@ -3319,7 +3319,7 @@ MergeTreeData::PathsWithDisks MergeTreeData::getRelativeDataPathsWithDisks() con
|
||||
return res;
|
||||
}
|
||||
|
||||
void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context)
|
||||
PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context)
|
||||
{
|
||||
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
|
||||
String default_shadow_path = clickhouse_path + "shadow/";
|
||||
@ -3331,6 +3331,10 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const StorageMe
|
||||
/// Acquire a snapshot of active data parts to prevent removing while doing backup.
|
||||
const auto data_parts = getDataParts();
|
||||
|
||||
String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
|
||||
|
||||
PartitionCommandsResultInfo result;
|
||||
|
||||
size_t parts_processed = 0;
|
||||
for (const auto & part : data_parts)
|
||||
{
|
||||
@ -3339,11 +3343,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const StorageMe
|
||||
|
||||
part->volume->getDisk()->createDirectories(shadow_path);
|
||||
|
||||
String backup_path = shadow_path
|
||||
+ (!with_name.empty()
|
||||
? escapeForFileName(with_name)
|
||||
: toString(increment))
|
||||
+ "/";
|
||||
String backup_path = shadow_path + backup_name + "/";
|
||||
|
||||
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
|
||||
|
||||
@ -3356,10 +3356,17 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const StorageMe
|
||||
part->volume->getDisk()->removeIfExists(backup_part_path + "/" + DELETE_ON_DESTROY_MARKER_PATH);
|
||||
|
||||
part->is_frozen.store(true, std::memory_order_relaxed);
|
||||
result.push_back(PartitionCommandResultInfo{
|
||||
.partition_id = part->info.partition_id,
|
||||
.part_name = part->name,
|
||||
.backup_path = backup_path,
|
||||
.backup_name = backup_name,
|
||||
});
|
||||
++parts_processed;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Freezed {} parts", parts_processed);
|
||||
return result;
|
||||
}
|
||||
|
||||
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
|
||||
|
@ -515,7 +515,7 @@ public:
|
||||
TableLockHolder & table_lock_holder);
|
||||
|
||||
/// Freezes all parts.
|
||||
void freezeAll(
|
||||
PartitionCommandsResultInfo freezeAll(
|
||||
const String & with_name,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const Context & context,
|
||||
@ -541,7 +541,7 @@ public:
|
||||
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
|
||||
* or if 'with_name' is specified - backup is created in directory with specified name.
|
||||
*/
|
||||
void freezePartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context, TableLockHolder & table_lock_holder);
|
||||
PartitionCommandsResultInfo freezePartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context, TableLockHolder & table_lock_holder);
|
||||
|
||||
|
||||
public:
|
||||
@ -836,7 +836,7 @@ protected:
|
||||
|
||||
/// Common part for |freezePartition()| and |freezeAll()|.
|
||||
using MatcherFn = std::function<bool(const DataPartPtr &)>;
|
||||
void freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context);
|
||||
PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, const Context & context);
|
||||
|
||||
bool canReplacePartition(const DataPartPtr & src_part) const;
|
||||
|
||||
|
@ -3,6 +3,9 @@
|
||||
#include <Storages/DataDestinationType.h>
|
||||
#include <Parsers/ASTAlterQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -97,4 +100,86 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
|
||||
return {};
|
||||
}
|
||||
|
||||
std::string PartitionCommand::typeToString() const
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case PartitionCommand::Type::ATTACH_PARTITION:
|
||||
if (part)
|
||||
return "ATTACH PART";
|
||||
else
|
||||
return "ATTACH PARTITION";
|
||||
case PartitionCommand::Type::MOVE_PARTITION:
|
||||
return "MOVE PARTITION";
|
||||
case PartitionCommand::Type::DROP_PARTITION:
|
||||
if (detach)
|
||||
return "DETACH PARTITION";
|
||||
else
|
||||
return "DROP PARTITION";
|
||||
case PartitionCommand::Type::DROP_DETACHED_PARTITION:
|
||||
if (part)
|
||||
return "DROP DETACHED PART";
|
||||
else
|
||||
return "DROP DETACHED PARTITION";
|
||||
case PartitionCommand::Type::FETCH_PARTITION:
|
||||
return "FETCH PARTITION";
|
||||
case PartitionCommand::Type::FREEZE_ALL_PARTITIONS:
|
||||
return "FREEZE ALL";
|
||||
case PartitionCommand::Type::FREEZE_PARTITION:
|
||||
return "FREEZE PARTITION";
|
||||
case PartitionCommand::Type::REPLACE_PARTITION:
|
||||
return "REPLACE PARTITION";
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
std::shared_ptr<SourceFromSingleChunk> convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result)
|
||||
{
|
||||
Block header {
|
||||
ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "command_type"),
|
||||
ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "partition_id"),
|
||||
ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "part_name"),
|
||||
};
|
||||
|
||||
for (const auto & command_result : commands_result)
|
||||
{
|
||||
if (!command_result.old_part_name.empty() && !header.has("old_part_name"))
|
||||
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "old_part_name"));
|
||||
|
||||
if (!command_result.backup_name.empty() && !header.has("backup_name"))
|
||||
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "backup_name"));
|
||||
|
||||
if (!command_result.backup_path.empty() && !header.has("backup_path"))
|
||||
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "backup_path"));
|
||||
}
|
||||
|
||||
MutableColumns res_columns = header.cloneEmptyColumns();
|
||||
|
||||
for (const auto & command_result : commands_result)
|
||||
{
|
||||
res_columns[0]->insert(command_result.command_type);
|
||||
res_columns[1]->insert(command_result.partition_id);
|
||||
res_columns[2]->insert(command_result.part_name);
|
||||
if (header.has("old_part_name"))
|
||||
{
|
||||
size_t pos = header.getPositionByName("old_part_name");
|
||||
res_columns[pos]->insert(command_result.old_part_name);
|
||||
}
|
||||
if (header.has("backup_name"))
|
||||
{
|
||||
size_t pos = header.getPositionByName("backup_name");
|
||||
res_columns[pos]->insert(command_result.backup_name);
|
||||
}
|
||||
if (header.has("backup_path"))
|
||||
{
|
||||
size_t pos = header.getPositionByName("backup_path");
|
||||
res_columns[pos]->insert(command_result.backup_path);
|
||||
}
|
||||
}
|
||||
|
||||
Chunk chunk(std::move(res_columns), commands_result.size());
|
||||
|
||||
return std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Core/Types.h>
|
||||
#include <Parsers/IAST.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
@ -66,9 +67,39 @@ struct PartitionCommand
|
||||
String move_destination_name;
|
||||
|
||||
static std::optional<PartitionCommand> parse(const ASTAlterCommand * command);
|
||||
/// Convert type of the command to string (use not only type, but also
|
||||
/// different flags)
|
||||
std::string typeToString() const;
|
||||
};
|
||||
|
||||
using PartitionCommands = std::vector<PartitionCommand>;
|
||||
|
||||
/// Result of exectuin of a single partition commands. Partition commands quite
|
||||
/// different, so some fields will be empty for some commands. Currently used in
|
||||
/// ATTACH and FREEZE commands.
|
||||
struct PartitionCommandResultInfo
|
||||
{
|
||||
/// Command type, always filled
|
||||
String command_type;
|
||||
/// Partition id, always filled
|
||||
String partition_id;
|
||||
/// Part name, always filled
|
||||
String part_name;
|
||||
/// Part name in /detached directory, filled in ATTACH
|
||||
String old_part_name;
|
||||
/// Path to backup directory, filled in FREEZE
|
||||
String backup_path;
|
||||
/// Name of the backup (specified by user or increment value), filled in
|
||||
/// FREEZE
|
||||
String backup_name;
|
||||
};
|
||||
|
||||
using PartitionCommandsResultInfo = std::vector<PartitionCommandResultInfo>;
|
||||
|
||||
/// Convert partition comands result to Source from single Chunk, which will be
|
||||
/// used to print info to the user. Tries to create narrowest table for given
|
||||
/// results. For example, if all commands were FREEZE commands, than
|
||||
/// old_part_name column will be absent.
|
||||
std::shared_ptr<SourceFromSingleChunk> convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result);
|
||||
|
||||
}
|
||||
|
@ -250,11 +250,11 @@ void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & command
|
||||
}
|
||||
}
|
||||
|
||||
void StorageMaterializedView::alterPartition(
|
||||
Pipes StorageMaterializedView::alterPartition(
|
||||
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context)
|
||||
{
|
||||
checkStatementCanBeForwarded();
|
||||
getTargetTable()->alterPartition(query, metadata_snapshot, commands, context);
|
||||
return getTargetTable()->alterPartition(query, metadata_snapshot, commands, context);
|
||||
}
|
||||
|
||||
void StorageMaterializedView::checkAlterPartitionIsPossible(
|
||||
|
@ -51,7 +51,7 @@ public:
|
||||
|
||||
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override;
|
||||
|
||||
void alterPartition(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context) override;
|
||||
Pipes alterPartition(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context) override;
|
||||
|
||||
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;
|
||||
|
||||
|
@ -1007,24 +1007,29 @@ bool StorageMergeTree::optimize(
|
||||
return true;
|
||||
}
|
||||
|
||||
void StorageMergeTree::alterPartition(
|
||||
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context)
|
||||
Pipes StorageMergeTree::alterPartition(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const PartitionCommands & commands,
|
||||
const Context & query_context)
|
||||
{
|
||||
PartitionCommandsResultInfo result;
|
||||
for (const PartitionCommand & command : commands)
|
||||
{
|
||||
PartitionCommandsResultInfo current_command_results;
|
||||
switch (command.type)
|
||||
{
|
||||
case PartitionCommand::DROP_PARTITION:
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
dropPartition(command.partition, command.detach, context);
|
||||
dropPartition(command.partition, command.detach, query_context);
|
||||
break;
|
||||
|
||||
case PartitionCommand::DROP_DETACHED_PARTITION:
|
||||
dropDetached(command.partition, command.part, context);
|
||||
dropDetached(command.partition, command.part, query_context);
|
||||
break;
|
||||
|
||||
case PartitionCommand::ATTACH_PARTITION:
|
||||
attachPartition(command.partition, command.part, context);
|
||||
current_command_results = attachPartition(command.partition, command.part, query_context);
|
||||
break;
|
||||
|
||||
case PartitionCommand::MOVE_PARTITION:
|
||||
@ -1032,18 +1037,18 @@ void StorageMergeTree::alterPartition(
|
||||
switch (*command.move_destination_type)
|
||||
{
|
||||
case PartitionCommand::MoveDestinationType::DISK:
|
||||
movePartitionToDisk(command.partition, command.move_destination_name, command.part, context);
|
||||
movePartitionToDisk(command.partition, command.move_destination_name, command.part, query_context);
|
||||
break;
|
||||
|
||||
case PartitionCommand::MoveDestinationType::VOLUME:
|
||||
movePartitionToVolume(command.partition, command.move_destination_name, command.part, context);
|
||||
movePartitionToVolume(command.partition, command.move_destination_name, command.part, query_context);
|
||||
break;
|
||||
|
||||
case PartitionCommand::MoveDestinationType::TABLE:
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String dest_database = context.resolveDatabase(command.to_database);
|
||||
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, context);
|
||||
movePartitionToTable(dest_storage, command.partition, context);
|
||||
String dest_database = query_context.resolveDatabase(command.to_database);
|
||||
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
|
||||
movePartitionToTable(dest_storage, command.partition, query_context);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -1053,30 +1058,44 @@ void StorageMergeTree::alterPartition(
|
||||
case PartitionCommand::REPLACE_PARTITION:
|
||||
{
|
||||
checkPartitionCanBeDropped(command.partition);
|
||||
String from_database = context.resolveDatabase(command.from_database);
|
||||
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, context);
|
||||
replacePartitionFrom(from_storage, command.partition, command.replace, context);
|
||||
String from_database = query_context.resolveDatabase(command.from_database);
|
||||
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
|
||||
replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
|
||||
}
|
||||
break;
|
||||
|
||||
case PartitionCommand::FREEZE_PARTITION:
|
||||
{
|
||||
auto lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
freezePartition(command.partition, metadata_snapshot, command.with_name, context, lock);
|
||||
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
|
||||
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
|
||||
}
|
||||
break;
|
||||
|
||||
case PartitionCommand::FREEZE_ALL_PARTITIONS:
|
||||
{
|
||||
auto lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
|
||||
freezeAll(command.with_name, metadata_snapshot, context, lock);
|
||||
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
|
||||
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
IStorage::alterPartition(query, metadata_snapshot, commands, context); // should throw an exception.
|
||||
IStorage::alterPartition(query, metadata_snapshot, commands, query_context); // should throw an exception.
|
||||
}
|
||||
|
||||
for (auto & command_result : current_command_results)
|
||||
command_result.command_type = command.typeToString();
|
||||
result.insert(result.end(), current_command_results.begin(), current_command_results.end());
|
||||
}
|
||||
|
||||
if (query_context.getSettingsRef().alter_partition_verbose_result)
|
||||
{
|
||||
auto source = convertCommandsResultToSource(result);
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(Pipe(source));
|
||||
return pipes;
|
||||
}
|
||||
|
||||
return { };
|
||||
}
|
||||
|
||||
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context)
|
||||
@ -1114,24 +1133,32 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
|
||||
}
|
||||
|
||||
|
||||
void StorageMergeTree::attachPartition(
|
||||
PartitionCommandsResultInfo StorageMergeTree::attachPartition(
|
||||
const ASTPtr & partition, bool attach_part, const Context & context)
|
||||
{
|
||||
// TODO: should get some locks to prevent race with 'alter … modify column'
|
||||
|
||||
PartitionCommandsResultInfo results;
|
||||
PartsTemporaryRename renamed_parts(*this, "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts);
|
||||
|
||||
for (size_t i = 0; i < loaded_parts.size(); ++i)
|
||||
{
|
||||
LOG_INFO(log, "Attaching part {} from {}", loaded_parts[i]->name, renamed_parts.old_and_new_names[i].second);
|
||||
String old_name = renamed_parts.old_and_new_names[i].first;
|
||||
renameTempPartAndAdd(loaded_parts[i], &increment);
|
||||
renamed_parts.old_and_new_names[i].first.clear();
|
||||
|
||||
results.push_back(PartitionCommandResultInfo{
|
||||
.partition_id = loaded_parts[i]->info.partition_id,
|
||||
.part_name = loaded_parts[i]->name,
|
||||
.old_part_name = old_name,
|
||||
});
|
||||
|
||||
LOG_INFO(log, "Finished attaching part");
|
||||
}
|
||||
|
||||
/// New parts with other data may appear in place of deleted parts.
|
||||
context.dropCaches();
|
||||
return results;
|
||||
}
|
||||
|
||||
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
|
||||
|
@ -61,7 +61,7 @@ public:
|
||||
bool deduplicate,
|
||||
const Context & context) override;
|
||||
|
||||
void alterPartition(
|
||||
Pipes alterPartition(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & /* metadata_snapshot */,
|
||||
const PartitionCommands & commands,
|
||||
@ -149,7 +149,8 @@ private:
|
||||
|
||||
// Partition helpers
|
||||
void dropPartition(const ASTPtr & partition, bool detach, const Context & context);
|
||||
void attachPartition(const ASTPtr & partition, bool part, const Context & context);
|
||||
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, bool part, const Context & context);
|
||||
|
||||
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
|
||||
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);
|
||||
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
|
||||
|
@ -3851,14 +3851,16 @@ void StorageReplicatedMergeTree::alter(
|
||||
}
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::alterPartition(
|
||||
Pipes StorageReplicatedMergeTree::alterPartition(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const PartitionCommands & commands,
|
||||
const Context & query_context)
|
||||
{
|
||||
PartitionCommandsResultInfo result;
|
||||
for (const PartitionCommand & command : commands)
|
||||
{
|
||||
PartitionCommandsResultInfo current_command_results;
|
||||
switch (command.type)
|
||||
{
|
||||
case PartitionCommand::DROP_PARTITION:
|
||||
@ -3871,7 +3873,7 @@ void StorageReplicatedMergeTree::alterPartition(
|
||||
break;
|
||||
|
||||
case PartitionCommand::ATTACH_PARTITION:
|
||||
attachPartition(command.partition, metadata_snapshot, command.part, query_context);
|
||||
current_command_results = attachPartition(command.partition, metadata_snapshot, command.part, query_context);
|
||||
break;
|
||||
case PartitionCommand::MOVE_PARTITION:
|
||||
{
|
||||
@ -3911,18 +3913,31 @@ void StorageReplicatedMergeTree::alterPartition(
|
||||
case PartitionCommand::FREEZE_PARTITION:
|
||||
{
|
||||
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
|
||||
freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
|
||||
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
|
||||
}
|
||||
break;
|
||||
|
||||
case PartitionCommand::FREEZE_ALL_PARTITIONS:
|
||||
{
|
||||
auto lock = lockForShare(query_context.getCurrentQueryId(), query_context.getSettingsRef().lock_acquire_timeout);
|
||||
freezeAll(command.with_name, metadata_snapshot, query_context, lock);
|
||||
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
|
||||
}
|
||||
break;
|
||||
}
|
||||
for (auto & command_result : current_command_results)
|
||||
command_result.command_type = command.typeToString();
|
||||
result.insert(result.end(), current_command_results.begin(), current_command_results.end());
|
||||
}
|
||||
|
||||
if (query_context.getSettingsRef().alter_partition_verbose_result)
|
||||
{
|
||||
auto source = convertCommandsResultToSource(result);
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(Pipe(source));
|
||||
return pipes;
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
@ -4028,12 +4043,15 @@ void StorageReplicatedMergeTree::truncate(
|
||||
}
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool attach_part, const Context & query_context)
|
||||
PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
|
||||
const ASTPtr & partition,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
bool attach_part,
|
||||
const Context & query_context)
|
||||
{
|
||||
// TODO: should get some locks to prevent race with 'alter … modify column'
|
||||
|
||||
assertNotReadonly();
|
||||
|
||||
PartitionCommandsResultInfo results;
|
||||
PartsTemporaryRename renamed_parts(*this, "detached/");
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
|
||||
|
||||
@ -4044,7 +4062,13 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, const
|
||||
output.writeExistingPart(loaded_parts[i]);
|
||||
renamed_parts.old_and_new_names[i].first.clear();
|
||||
LOG_DEBUG(log, "Attached part {} as {}", old_name, loaded_parts[i]->name);
|
||||
results.push_back(PartitionCommandResultInfo{
|
||||
.partition_id = loaded_parts[i]->info.partition_id,
|
||||
.part_name = loaded_parts[i]->name,
|
||||
.old_part_name = old_name,
|
||||
});
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
|
||||
|
@ -111,7 +111,7 @@ public:
|
||||
|
||||
void alter(const AlterCommands & params, const Context & query_context, TableLockHolder & table_lock_holder) override;
|
||||
|
||||
void alterPartition(
|
||||
Pipes alterPartition(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const PartitionCommands & commands,
|
||||
@ -534,7 +534,7 @@ private:
|
||||
|
||||
// Partition helpers
|
||||
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
|
||||
void attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context);
|
||||
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context);
|
||||
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
|
||||
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context);
|
||||
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, const Context & query_context);
|
||||
|
@ -0,0 +1,18 @@
|
||||
command_type partition_id part_name backup_name backup_path
|
||||
FREEZE ALL 0 0_1_1_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 1 1_2_2_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 2 2_3_3_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 3 3_4_4_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 4 4_5_5_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 5 5_6_6_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 6 6_7_7_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 7 7_8_8_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 8 8_9_9_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 9 9_10_10_0 test_01417 shadow/test_01417/
|
||||
command_type partition_id part_name backup_name backup_path
|
||||
FREEZE PARTITION 3 3_4_4_0 test_01417_single_part shadow/test_01417_single_part/
|
||||
command_type partition_id part_name old_part_name
|
||||
ATTACH PARTITION 3 3_12_12_0 3_4_4_0
|
||||
command_type partition_id part_name backup_name backup_path old_part_name
|
||||
FREEZE PARTITION 7 7_8_8_0 test_01417_single_part_7 shadow/test_01417_single_part_7/
|
||||
ATTACH PART 5 5_13_13_0 5_6_6_0
|
28
tests/queries/0_stateless/01417_freeze_partition_verbose.sql
Normal file
28
tests/queries/0_stateless/01417_freeze_partition_verbose.sql
Normal file
@ -0,0 +1,28 @@
|
||||
DROP TABLE IF EXISTS table_for_freeze;
|
||||
|
||||
CREATE TABLE table_for_freeze
|
||||
(
|
||||
key UInt64,
|
||||
value String
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY key
|
||||
PARTITION BY key % 10;
|
||||
|
||||
INSERT INTO table_for_freeze SELECT number, toString(number) from numbers(10);
|
||||
|
||||
ALTER TABLE table_for_freeze FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze FREEZE PARTITION '3' WITH NAME 'test_01417_single_part' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze DETACH PARTITION '3';
|
||||
|
||||
INSERT INTO table_for_freeze VALUES (3, '3');
|
||||
|
||||
ALTER TABLE table_for_freeze ATTACH PARTITION '3' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze DETACH PARTITION '5';
|
||||
|
||||
ALTER TABLE table_for_freeze FREEZE PARTITION '7' WITH NAME 'test_01417_single_part_7', ATTACH PART '5_6_6_0' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
DROP TABLE IF EXISTS table_for_freeze;
|
@ -0,0 +1,18 @@
|
||||
command_type partition_id part_name backup_name backup_path
|
||||
FREEZE ALL 0 0_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 1 1_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 2 2_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 3 3_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 4 4_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 5 5_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 6 6_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 7 7_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 8 8_0_0_0 test_01417 shadow/test_01417/
|
||||
FREEZE ALL 9 9_0_0_0 test_01417 shadow/test_01417/
|
||||
command_type partition_id part_name backup_name backup_path
|
||||
FREEZE PARTITION 3 3_0_0_0 test_01417_single_part shadow/test_01417_single_part/
|
||||
command_type partition_id part_name old_part_name
|
||||
ATTACH PARTITION 3 3_3_3_0 3_0_0_0
|
||||
command_type partition_id part_name backup_name backup_path old_part_name
|
||||
FREEZE PARTITION 7 7_0_0_0 test_01417_single_part_7 shadow/test_01417_single_part_7/
|
||||
ATTACH PART 5 5_2_2_0 5_0_0_0
|
@ -0,0 +1,28 @@
|
||||
DROP TABLE IF EXISTS table_for_freeze_replicated;
|
||||
|
||||
CREATE TABLE table_for_freeze_replicated
|
||||
(
|
||||
key UInt64,
|
||||
value String
|
||||
)
|
||||
ENGINE = ReplicatedMergeTree('/test/table_for_freeze_replicated', '1')
|
||||
ORDER BY key
|
||||
PARTITION BY key % 10;
|
||||
|
||||
INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated FREEZE PARTITION '3' WITH NAME 'test_01417_single_part' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated DETACH PARTITION '3';
|
||||
|
||||
INSERT INTO table_for_freeze_replicated VALUES (3, '3');
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated ATTACH PARTITION '3' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated DETACH PARTITION '5';
|
||||
|
||||
ALTER TABLE table_for_freeze_replicated FREEZE PARTITION '7' WITH NAME 'test_01417_single_part_7', ATTACH PART '5_0_0_0' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;
|
||||
|
||||
DROP TABLE IF EXISTS table_for_freeze_replicated;
|
Loading…
Reference in New Issue
Block a user