refactor IStorage::rename(...)

This commit is contained in:
Alexander Tokmakov 2019-11-11 17:28:28 +03:00
parent ab4d452145
commit cff9b02db2
33 changed files with 131 additions and 66 deletions

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
{
extern const int UNKNOWN_TABLE;
extern const int TABLE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
}
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, const Context & context_)
@ -62,6 +63,31 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
return DatabaseWithDictionaries::detachTable(name);
}
void DatabaseAtomic::renameTable(const Context & context, const String & table_name, IDatabase & to_database,
const String & to_table_name, TableStructureWriteLockHolder &)
{
//FIXME
if (typeid(*this) != typeid(to_database))
throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED);
StoragePtr table = tryGetTable(context, table_name);
if (!table)
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
table->renameInMemory(to_database.getDatabaseName(), to_table_name);
ASTPtr ast = getQueryFromMetadata(getObjectMetadataPath(table_name));
if (!ast)
throw Exception("There is no metadata file for table " + backQuote(table_name) + ".", ErrorCodes::FILE_DOESNT_EXIST);
ast->as<ASTCreateQuery &>().table = to_table_name;
/// NOTE Non-atomic.
to_database.createTable(context, to_table_name, table, ast);
removeTable(context, table_name);
}
}

View File

@ -21,6 +21,13 @@ public:
const StoragePtr & table,
const ASTPtr & query) override;
void renameTable(
const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name,
TableStructureWriteLockHolder & lock) override;
void attachTable(const String & name, const StoragePtr & table, const String & relative_table_path = {}) override;
StoragePtr detachTable(const String & name) override;

View File

@ -301,21 +301,21 @@ public:
* In this function, you need to rename the directory with the data, if any.
* Called when the table structure is locked for write.
*/
virtual void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/,
virtual void rename(const String & /*new_path_to_table_data*/, const String & new_database_name, const String & new_table_name,
TableStructureWriteLockHolder &)
{
throw Exception("Method rename is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
renameInMemory(new_database_name, new_table_name);
}
// TODO refactor rename() and renameAtomic()
// TODO refactor rename() and renameInMemory()
/**
* Just updates names of database and table without moving any data on disk
* Can be called only from DatabaseAtomic.
*/
virtual void renameAtomic(const String & /*new_database_name*/, const String & /*new_table_name*/)
virtual void renameInMemory(const String & /*new_database_name*/, const String & /*new_table_name*/)
{
throw Exception("Method renameAtomic is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method rename is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** ALTER tables in the form of column changes that do not affect the change to Storage or its parameters.

View File

@ -192,7 +192,7 @@ void StorageKafka::shutdown()
}
void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageKafka::renameInMemory(const String & new_database_name, const String & new_table_name)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -49,7 +49,7 @@ public:
const ASTPtr & query,
const Context & context) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override;
void updateDependencies() override;

View File

@ -1177,17 +1177,14 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
}
void MergeTreeData::rename(
const String & /*new_path_to_db*/, const String & new_database_name,
const String & new_path_to_table_data, const String & new_database_name,
const String & new_table_name, TableStructureWriteLockHolder &)
{
auto new_file_db_name = escapeForFileName(new_database_name);
auto new_file_table_name = escapeForFileName(new_table_name);
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
{
auto new_full_path = disk->getClickHouseDataPath() + new_file_db_name + '/' + new_file_table_name + '/';
auto new_full_path = disk->getPath() + new_path_to_table_data;
if (Poco::File{new_full_path}.exists())
throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
@ -1196,21 +1193,19 @@ void MergeTreeData::rename(
for (const auto & disk : disks)
{
auto full_path = disk->getPath() + relative_data_path;
auto new_db_path = disk->getClickHouseDataPath() + new_file_db_name + '/';
auto new_full_path = disk->getPath() + new_path_to_table_data;
Poco::File db_file{new_db_path};
if (!db_file.exists())
db_file.createDirectory();
Poco::File new_full_path_parent{Poco::Path(new_full_path).makeParent()};
if (!new_full_path_parent.exists())
new_full_path_parent.createDirectories();
auto new_full_path = new_db_path + new_file_table_name + '/';
Poco::File{full_path}.renameTo(new_full_path);
}
global_context.dropCaches();
database_name = new_database_name;
table_name = new_table_name;
relative_data_path = "data/" + new_file_db_name + '/' + new_file_table_name + '/';
renameInMemory(new_database_name, new_table_name);
}
void MergeTreeData::dropAllData()

View File

@ -538,9 +538,15 @@ public:
/// Moves the entire data directory.
/// Flushes the uncompressed blocks cache and the marks cache.
/// Must be called with locked lockStructureForAlter().
void rename(const String & new_path_to_db, const String & new_database_name,
void rename(const String & new_path_to_table_data, const String & new_database_name,
const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
/// Check if the ALTER can be performed:
/// - all needed columns are present.
/// - all type conversions can be done.

View File

@ -73,7 +73,7 @@ public:
void shutdown() override;
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -82,8 +82,9 @@ public:
/// Removes temporary data in local filesystem.
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void rename(const String & /*new_path_to_table_data*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
//TODO do we need no move data on disk and update path?
table_name = new_table_name;
database_name = new_database_name;
}

View File

@ -323,7 +323,7 @@ Strings StorageFile::getDataPaths() const
return paths;
}
void StorageFile::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageFile::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
if (!is_db_table)
throw Exception("Can't rename table '" + table_name + "' binded to user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);
@ -333,7 +333,7 @@ void StorageFile::rename(const String & new_path_to_db, const String & new_datab
std::unique_lock<std::shared_mutex> lock(rwlock);
std::string path_new = getTablePath(new_path_to_db + escapeForFileName(new_table_name), format_name);
std::string path_new = getTablePath(context_global.getPath() + new_path_to_table_data, format_name);
Poco::File(Poco::Path(path_new).parent()).createDirectories();
Poco::File(paths[0]).renameTo(path_new);

View File

@ -38,7 +38,13 @@ public:
const ASTPtr & query,
const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
Strings getDataPaths() const override;

View File

@ -209,7 +209,7 @@ BlockInputStreams StorageHDFS::read(
return result;
}
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageHDFS::renameInMemory(const String & new_database_name, const String & new_table_name)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -30,7 +30,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override;
protected:
StorageHDFS(const String & uri_,

View File

@ -425,7 +425,7 @@ StorageLog::StorageLog(
const ConstraintsDescription & constraints_,
size_t max_compress_block_size_,
const Context & context_)
: path(context_.getPath() + relative_path_), table_name(table_name_), database_name(database_name_),
: base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + "sizes.json")
{
@ -513,17 +513,16 @@ void StorageLog::loadMarks()
}
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
String new_path = base_path + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
table_name = new_table_name;
database_name = new_database_name;
renameInMemory(new_database_name, new_table_name);
file_checker.setPath(path + "sizes.json");
for (auto & file : files)

View File

@ -38,7 +38,13 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
@ -63,6 +69,7 @@ protected:
const Context & context_);
private:
String base_path;
String path;
String table_name;
String database_name;

View File

@ -299,8 +299,7 @@ static void executeRenameQuery(Context & global_context, const String & database
}
void StorageMaterializedView::rename(
const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageMaterializedView::renameInMemory(const String & new_database_name, const String & new_table_name)
{
if (has_inner_table && tryGetTargetTable())
{

View File

@ -43,7 +43,7 @@ public:
void mutate(const MutationCommands & commands, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override;
void shutdown() override;

View File

@ -44,7 +44,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -42,7 +42,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -38,7 +38,7 @@ public:
return std::make_shared<NullBlockOutputStream>(getSampleBlock());
}
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -3746,10 +3746,10 @@ void StorageReplicatedMergeTree::drop(TableStructureWriteLockHolder &)
void StorageReplicatedMergeTree::rename(
const String & new_path_to_db, const String & new_database_name,
const String & new_path_to_table_data, const String & new_database_name,
const String & new_table_name, TableStructureWriteLockHolder & lock)
{
MergeTreeData::rename(new_path_to_db, new_database_name, new_table_name, lock);
MergeTreeData::rename(new_path_to_table_data, new_database_name, new_table_name, lock);
/// Update table name in zookeeper
auto zookeeper = getZooKeeper();

View File

@ -117,7 +117,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
bool supportsIndexForIn() const override { return true; }

View File

@ -164,7 +164,7 @@ BlockInputStreams StorageS3::read(
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
}
void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageS3::renameInMemory(const String & new_database_name, const String & new_table_name)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -51,7 +51,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override;
private:
Poco::URI uri;

View File

@ -101,7 +101,8 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
if (relative_path_.empty())
throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
path = context_.getPath() + relative_path_;
base_path = context_.getPath();
path = base_path + relative_path_;
}
@ -196,15 +197,14 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
void StorageSetOrJoinBase::rename(
const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name) + "/";
String new_path = base_path + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
table_name = new_table_name;
database_name = new_database_name;
renameInMemory(new_database_name, new_table_name);
}

View File

@ -22,7 +22,13 @@ public:
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
@ -37,6 +43,7 @@ protected:
const ConstraintsDescription & constraints_,
const Context & context_);
String base_path;
String path;
String table_name;
String database_name;

View File

@ -202,7 +202,7 @@ StorageStripeLog::StorageStripeLog(
bool attach,
size_t max_compress_block_size_,
const Context & context_)
: path(context_.getPath() + relative_path_), table_name(table_name_), database_name(database_name_),
: base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + "sizes.json"),
log(&Logger::get("StorageStripeLog"))
@ -218,17 +218,16 @@ StorageStripeLog::StorageStripeLog(
}
void StorageStripeLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageStripeLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name) + "/";
Poco::File(path).renameTo(new_path_to_db + escapeForFileName(new_table_name));
String new_path = base_path + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
table_name = new_table_name;
database_name = new_database_name;
renameInMemory(new_database_name, new_table_name);
file_checker.setPath(path + "sizes.json");
}

View File

@ -40,7 +40,13 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
@ -58,6 +64,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
private:
String base_path;
String path;
String table_name;
String database_name;

View File

@ -331,7 +331,7 @@ StorageTinyLog::StorageTinyLog(
bool attach,
size_t max_compress_block_size_,
const Context & context_)
: path(context_.getPath() + relative_path_), table_name(table_name_), database_name(database_name_),
: base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
max_compress_block_size(max_compress_block_size_),
file_checker(path + "sizes.json"),
log(&Logger::get("StorageTinyLog"))
@ -373,17 +373,16 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
}
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void StorageTinyLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name) + "/";
String new_path = base_path + new_path_to_table_data;
Poco::File(path).renameTo(new_path);
path = new_path;
table_name = new_table_name;
database_name = new_database_name;
renameInMemory(new_database_name, new_table_name);
file_checker.setPath(path + "sizes.json");
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)

View File

@ -39,7 +39,13 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
@ -57,6 +63,7 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
private:
String base_path;
String path;
String table_name;
String database_name;

View File

@ -186,7 +186,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
}
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
void IStorageURLBase::renameInMemory(const String & new_database_name, const String & new_table_name)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -29,7 +29,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void renameInMemory(const String & new_database_name, const String & new_table_name) override;
protected:
IStorageURLBase(

View File

@ -30,7 +30,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
void renameInMemory(const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;