Merge pull request #6695 from yandex/improve-table-locks

Avoid possible deadlock in TRUNCATE of Replicated table.
This commit is contained in:
alexey-milovidov 2019-08-28 21:05:26 +03:00 committed by GitHub
commit 7b9b67fd71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 159 additions and 202 deletions

View File

@ -115,26 +115,6 @@ void DatabaseDictionary::removeTable(
throw Exception("DatabaseDictionary: removeTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::renameTable(
const Context &,
const String &,
IDatabase &,
const String &)
{
throw Exception("DatabaseDictionary: renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseDictionary::alterTable(
const Context &,
const String &,
const ColumnsDescription &,
const IndicesDescription &,
const ConstraintsDescription &,
const ASTModifier &)
{
throw Exception("DatabaseDictionary: alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
time_t DatabaseDictionary::getTableMetadataModificationTime(
const Context &,
const String &)

View File

@ -60,20 +60,6 @@ public:
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & table_name) override;
void renameTable(
const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name) override;
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
time_t getTableMetadataModificationTime(
const Context & context,
const String & table_name) override;

View File

@ -5,6 +5,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/parseAddress.h>
#include "config_core.h"
#if USE_MYSQL

View File

@ -39,26 +39,6 @@ void DatabaseMemory::removeTable(
detachTable(table_name);
}
void DatabaseMemory::renameTable(
const Context &,
const String &,
IDatabase &,
const String &)
{
throw Exception("DatabaseMemory: renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DatabaseMemory::alterTable(
const Context &,
const String &,
const ColumnsDescription &,
const IndicesDescription &,
const ConstraintsDescription &,
const ASTModifier &)
{
throw Exception("DatabaseMemory: alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
time_t DatabaseMemory::getTableMetadataModificationTime(
const Context &,
const String &)

View File

@ -37,20 +37,6 @@ public:
const Context & context,
const String & table_name) override;
void renameTable(
const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name) override;
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
time_t getTableMetadataModificationTime(
const Context & context,
const String & table_name) override;

View File

@ -5,6 +5,8 @@
#include <mysqlxx/Pool.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -61,21 +63,11 @@ public:
throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void renameTable(const Context &, const String &, IDatabase &, const String &) override
{
throw Exception("MySQL database engine does not support rename table.", ErrorCodes::NOT_IMPLEMENTED);
}
void createTable(const Context &, const String &, const StoragePtr &, const ASTPtr &) override
{
throw Exception("MySQL database engine does not support create table.", ErrorCodes::NOT_IMPLEMENTED);
}
void alterTable(const Context &, const String &, const ColumnsDescription &, const IndicesDescription &, const ConstraintsDescription &, const ASTModifier &) override
{
throw Exception("MySQL database engine does not support alter table.", ErrorCodes::NOT_IMPLEMENTED);
}
private:
struct MySQLStorageInfo
{

View File

@ -355,7 +355,8 @@ void DatabaseOrdinary::renameTable(
const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name)
const String & to_table_name,
TableStructureWriteLockHolder & lock)
{
DatabaseOrdinary * to_database_concrete = typeid_cast<DatabaseOrdinary *>(&to_database);
@ -372,7 +373,7 @@ void DatabaseOrdinary::renameTable(
{
table->rename(context.getPath() + "/data/" + escapeForFileName(to_database_concrete->name) + "/",
to_database_concrete->name,
to_table_name);
to_table_name, lock);
}
catch (const Exception &)
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <Databases/DatabasesCommon.h>
#include <Common/ThreadPool.h>
namespace DB
@ -35,7 +36,8 @@ public:
const Context & context,
const String & table_name,
IDatabase & to_database,
const String & to_table_name) override;
const String & to_table_name,
TableStructureWriteLockHolder &) override;
void alterTable(
const Context & context,

View File

@ -4,6 +4,7 @@
#include <Parsers/IAST.h>
#include <Storages/IStorage_fwd.h>
#include <Databases/IDatabase.h>
#include <mutex>
/// General functionality for several different database engines.

View File

@ -1,16 +1,9 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <Poco/File.h>
#include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <ctime>
#include <functional>
@ -21,8 +14,16 @@ namespace DB
{
class Context;
struct Settings;
struct ConstraintsDescription;
class ColumnsDescription;
struct IndicesDescription;
struct TableStructureWriteLockHolder;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/** Allows to iterate over tables.
@ -102,22 +103,29 @@ public:
/// Rename the table and possibly move the table to another database.
virtual void renameTable(
const Context & context,
const String & name,
IDatabase & to_database,
const String & to_name) = 0;
const Context & /*context*/,
const String & /*name*/,
IDatabase & /*to_database*/,
const String & /*to_name*/,
TableStructureWriteLockHolder &)
{
throw Exception(getEngineName() + ": renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
using ASTModifier = std::function<void(IAST &)>;
/// Change the table structure in metadata.
/// You must call under the TableStructureLock of the corresponding table . If engine_modifier is empty, then engine does not change.
virtual void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) = 0;
const Context & /*context*/,
const String & /*name*/,
const ColumnsDescription & /*columns*/,
const IndicesDescription & /*indices*/,
const ConstraintsDescription & /*constraints*/,
const ASTModifier & /*engine_modifier*/)
{
throw Exception(getEngineName() + ": renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns time of table's metadata change, 0 if there is no corresponding metadata file.
virtual time_t getTableMetadataModificationTime(

View File

@ -80,7 +80,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = database_and_table.second->lockExclusively(context.getCurrentQueryId());
/// Drop table data, don't touch metadata
database_and_table.second->truncate(query_ptr, context);
database_and_table.second->truncate(query_ptr, context, table_lock);
}
else if (kind == ASTDropQuery::Kind::Drop)
{
@ -94,7 +94,7 @@ BlockIO InterpreterDropQuery::executeToTable(String & database_name_, String & t
database_and_table.first->removeTable(context, database_and_table.second->getTableName());
/// Delete table data
database_and_table.second->drop();
database_and_table.second->drop(table_lock);
database_and_table.second->is_dropped = true;
String database_data_path = database_and_table.first->getDataPath();
@ -128,7 +128,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
/// Drop table data, don't touch metadata
table->truncate(query_ptr, context);
table->truncate(query_ptr, context, table_lock);
}
else if (kind == ASTDropQuery::Kind::Drop)
{
@ -137,7 +137,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(String & table_name, ASTDr
/// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId());
/// Delete table data
table->drop();
table->drop(table_lock);
table->is_dropped = true;
}
}

View File

@ -26,6 +26,8 @@ struct RenameDescription
to_table_name(elem.to.table)
{}
TableStructureWriteLockHolder from_table_lock;
String from_database_name;
String from_table_name;
@ -75,7 +77,7 @@ BlockIO InterpreterRenameQuery::execute()
}
};
std::set<UniqueTableName> unique_tables_from;
std::map<UniqueTableName, TableStructureWriteLockHolder> tables_from_locks;
/// Don't allow to drop tables (that we are renaming); don't allow to create tables in places where tables will be renamed.
std::map<UniqueTableName, std::unique_ptr<DDLGuard>> table_guards;
@ -87,7 +89,11 @@ BlockIO InterpreterRenameQuery::execute()
UniqueTableName from(descriptions.back().from_database_name, descriptions.back().from_table_name);
UniqueTableName to(descriptions.back().to_database_name, descriptions.back().to_table_name);
unique_tables_from.emplace(from);
if (!tables_from_locks.count(from))
if (auto table = context.tryGetTable(from.database_name, from.table_name))
tables_from_locks.emplace(from, table->lockExclusively(context.getCurrentQueryId()));
descriptions.back().from_table_lock = tables_from_locks[from];
if (!table_guards.count(from))
table_guards.emplace(from, context.getDDLGuard(from.database_name, from.table_name));
@ -96,13 +102,6 @@ BlockIO InterpreterRenameQuery::execute()
table_guards.emplace(to, context.getDDLGuard(to.database_name, to.table_name));
}
std::vector<TableStructureWriteLockHolder> locks;
locks.reserve(unique_tables_from.size());
for (const auto & names : unique_tables_from)
if (auto table = context.tryGetTable(names.database_name, names.table_name))
locks.emplace_back(table->lockExclusively(context.getCurrentQueryId()));
/** All tables are locked. If there are more than one rename in chain,
* we need to hold global lock while doing all renames. Order matters to avoid deadlocks.
* It provides atomicity of all RENAME chain as a whole, from the point of view of DBMS client,
@ -114,12 +113,12 @@ BlockIO InterpreterRenameQuery::execute()
if (descriptions.size() > 1)
lock = context.getLock();
for (const auto & elem : descriptions)
for (auto & elem : descriptions)
{
context.assertTableDoesntExist(elem.to_database_name, elem.to_table_name);
context.getDatabase(elem.from_database_name)->renameTable(
context, elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name);
context, elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name, elem.from_table_lock);
}
return {};

View File

@ -22,6 +22,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int SETTINGS_ARE_NOT_SUPPORTED;
extern const int UNKNOWN_SETTING;
extern const int TABLE_IS_DROPPED;
}
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))

View File

@ -9,11 +9,13 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableStructureLockHolder.h>
#include <Storages/CheckResults.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <Common/SettingsChanges.h>
#include <Storages/ConstraintsDescription.h>
#include <optional>
#include <shared_mutex>
@ -24,7 +26,6 @@ namespace DB
namespace ErrorCodes
{
extern const int TABLE_IS_DROPPED;
extern const int NOT_IMPLEMENTED;
}
@ -261,12 +262,12 @@ public:
* The table is not usable during and after call to this method.
* If you do not need any action other than deleting the directory with data, you can leave this method blank.
*/
virtual void drop() {}
virtual void drop(TableStructureWriteLockHolder &) {}
/** Clear the table data and leave it empty.
* Must be called under lockForAlter.
*/
virtual void truncate(const ASTPtr & /*query*/, const Context & /* context */)
virtual void truncate(const ASTPtr & /*query*/, const Context & /* context */, TableStructureWriteLockHolder &)
{
throw Exception("Truncate is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
@ -276,7 +277,8 @@ public:
* In this function, you need to rename the directory with the data, if any.
* Called when the table structure is locked for write.
*/
virtual void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/)
virtual void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/,
TableStructureWriteLockHolder &)
{
throw Exception("Method rename is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -4,6 +4,8 @@
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
namespace DB
{

View File

@ -189,7 +189,7 @@ void StorageKafka::shutdown()
}
void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name)
void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -1,8 +1,6 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
@ -11,6 +9,8 @@
#include <ext/shared_ptr_helper.h>
#include <mutex>
#include <atomic>
namespace DB
{
@ -40,10 +40,9 @@ public:
BlockOutputStreamPtr write(
const ASTPtr & query,
const Context & context
) override;
const Context & context) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void updateDependencies() override;

View File

@ -470,7 +470,7 @@ StorageLiveView::~StorageLiveView()
no_users_thread.detach();
}
void StorageLiveView::drop()
void StorageLiveView::drop(TableStructureWriteLockHolder &)
{
global_context.removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),

View File

@ -104,7 +104,7 @@ public:
}
void checkTableCanBeDropped() const override;
void drop() override;
void drop(TableStructureWriteLockHolder &) override;
void startup() override;
void shutdown() override;

View File

@ -1690,7 +1690,7 @@ void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr
empty_columns.clear();
}
void MergeTreeData::freezeAll(const String & with_name, const Context & context)
void MergeTreeData::freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder &)
{
freezePartitionsByMatcher([] (const DataPartPtr &){ return true; }, with_name, context);
}
@ -2550,7 +2550,7 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
}
void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String & with_name, const Context & context)
void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String & with_name, const Context & context, TableStructureReadLockHolder &)
{
std::optional<String> prefix;
String partition_id;

View File

@ -549,7 +549,7 @@ public:
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
/// Freezes all parts.
void freezeAll(const String & with_name, const Context & context);
void freezeAll(const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
/// Should be called if part data is suspected to be corrupted.
void reportBrokenPart(const String & name) const
@ -577,7 +577,7 @@ public:
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
* or if 'with_name' is specified - backup is created in directory with specified name.
*/
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context);
void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context, TableStructureReadLockHolder & table_lock_holder);
size_t getColumnCompressedSize(const std::string & name) const
{

View File

@ -73,7 +73,11 @@ public:
void shutdown() override;
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override { table_name = new_table_name; database_name = new_database_name; }
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;
}
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override

View File

@ -34,7 +34,6 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
void drop() override {}
static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);
template <typename ForwardIterator>

View File

@ -415,7 +415,7 @@ void StorageDistributed::shutdown()
}
void StorageDistributed::truncate(const ASTPtr &, const Context &)
void StorageDistributed::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
std::lock_guard lock(cluster_nodes_mutex);

View File

@ -77,12 +77,17 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop() override {}
void drop(TableStructureWriteLockHolder &) override {}
/// Removes temporary data in local filesystem.
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;
}
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override { table_name = new_table_name; database_name = new_database_name; }
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
void alter(

View File

@ -264,13 +264,7 @@ BlockOutputStreamPtr StorageFile::write(
}
void StorageFile::drop()
{
/// Extra actions are not required.
}
void StorageFile::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageFile::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
if (!is_db_table)
throw Exception("Can't rename table '" + table_name + "' binded to user-defined file (or FD)", ErrorCodes::DATABASE_ACCESS_DENIED);

View File

@ -38,9 +38,7 @@ public:
const ASTPtr & query,
const Context & context) override;
void drop() override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
String getDataPath() const override { return path; }

View File

@ -148,7 +148,7 @@ BlockInputStreams StorageHDFS::read(
max_block_size)};
}
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -30,7 +30,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
protected:
StorageHDFS(const String & uri_,

View File

@ -55,7 +55,7 @@ StorageJoin::StorageJoin(
}
void StorageJoin::truncate(const ASTPtr &, const Context &)
void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
Poco::File(path).remove(true);
Poco::File(path).createDirectories();

View File

@ -26,7 +26,7 @@ class StorageJoin : public ext::shared_ptr_helper<StorageJoin>, public StorageSe
public:
String getName() const override { return "Join"; }
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
/// Access the innards.
JoinPtr & getJoin() { return join; }

View File

@ -512,7 +512,7 @@ void StorageLog::loadMarks()
}
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
@ -530,7 +530,7 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_databa
marks_file = Poco::File(path + escapeForFileName(table_name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}
void StorageLog::truncate(const ASTPtr &, const Context &)
void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
std::shared_lock<std::shared_mutex> lock(rwlock);

View File

@ -38,11 +38,11 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
std::string full_path() const { return path + escapeForFileName(table_name) + '/';}

View File

@ -232,7 +232,7 @@ static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context,
}
void StorageMaterializedView::drop()
void StorageMaterializedView::drop(TableStructureWriteLockHolder &)
{
global_context.removeDependency(
DatabaseAndTableName(select_database_name, select_table_name),
@ -242,7 +242,7 @@ void StorageMaterializedView::drop()
executeDropQuery(ASTDropQuery::Kind::Drop, global_context, target_database_name, target_table_name);
}
void StorageMaterializedView::truncate(const ASTPtr &, const Context &)
void StorageMaterializedView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
if (has_inner_table)
executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, target_database_name, target_table_name);
@ -299,7 +299,8 @@ static void executeRenameQuery(Context & global_context, const String & database
}
void StorageMaterializedView::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)
void StorageMaterializedView::rename(
const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
if (has_inner_table && tryGetTargetTable())
{

View File

@ -33,9 +33,9 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop() override;
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
@ -43,7 +43,7 @@ public:
void mutate(const MutationCommands & commands, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void shutdown() override;

View File

@ -123,13 +123,13 @@ BlockOutputStreamPtr StorageMemory::write(
}
void StorageMemory::drop()
void StorageMemory::drop(TableStructureWriteLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();
}
void StorageMemory::truncate(const ASTPtr &, const Context &)
void StorageMemory::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();

View File

@ -40,11 +40,15 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop() override;
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override { table_name = new_table_name; database_name = new_database_name; }
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;
}
private:
String database_name;

View File

@ -42,8 +42,11 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void drop() override {}
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override { table_name = new_table_name; database_name = new_database_name; }
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;
}
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked

View File

@ -157,13 +157,13 @@ void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
global_context.checkPartitionCanBeDropped(database_name, table_name, partition_size);
}
void StorageMergeTree::drop()
void StorageMergeTree::drop(TableStructureWriteLockHolder &)
{
shutdown();
dropAllData();
}
void StorageMergeTree::truncate(const ASTPtr &, const Context &)
void StorageMergeTree::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
{
/// Asks to complete merges and does not allow them to start.
@ -181,7 +181,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &)
clearOldPartsFromFilesystem();
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
@ -995,7 +995,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, context);
freezePartition(command.partition, command.with_name, context, lock);
}
break;
@ -1020,7 +1020,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
freezeAll(command.with_name, context);
freezeAll(command.with_name, context, lock);
}
break;

View File

@ -55,10 +55,10 @@ public:
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
void drop() override;
void truncate(const ASTPtr &, const Context &) override;
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;

View File

@ -38,7 +38,7 @@ public:
return std::make_shared<NullBlockOutputStream>(getSampleBlock());
}
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -3132,7 +3132,6 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
{
/// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads will yield to deadlock.
/// TODO Check all other "wait" places.
for (auto & merge_entry : merge_entries)
waitForAllReplicasToProcessLogEntry(merge_entry);
}
@ -3484,7 +3483,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezePartition(command.partition, command.with_name, query_context);
freezePartition(command.partition, command.with_name, query_context, lock);
}
break;
@ -3509,7 +3508,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
freezeAll(command.with_name, query_context);
freezeAll(command.with_name, query_context, lock);
}
break;
}
@ -3633,8 +3632,10 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
}
void StorageReplicatedMergeTree::truncate(const ASTPtr & query, const Context & query_context)
void StorageReplicatedMergeTree::truncate(const ASTPtr & query, const Context & query_context, TableStructureWriteLockHolder & table_lock)
{
table_lock.release(); /// Truncate is done asynchronously.
assertNotReadonly();
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
@ -3701,7 +3702,7 @@ void StorageReplicatedMergeTree::checkPartitionCanBeDropped(const ASTPtr & parti
}
void StorageReplicatedMergeTree::drop()
void StorageReplicatedMergeTree::drop(TableStructureWriteLockHolder &)
{
{
auto zookeeper = tryGetZooKeeper();
@ -3731,7 +3732,8 @@ void StorageReplicatedMergeTree::drop()
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageReplicatedMergeTree::rename(
const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';

View File

@ -109,11 +109,11 @@ public:
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
void drop() override;
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
bool supportsIndexForIn() const override { return true; }

View File

@ -126,7 +126,7 @@ void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block);
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
void StorageSet::truncate(const ASTPtr &, const Context &)
void StorageSet::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
Poco::File(path).remove(true);
Poco::File(path).createDirectories();
@ -193,7 +193,8 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
}
void StorageSetOrJoinBase::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageSetOrJoinBase::rename(
const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name);

View File

@ -22,7 +22,7 @@ public:
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
@ -69,7 +69,7 @@ public:
/// Access the insides.
SetPtr & getSet() { return set; }
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
private:
SetPtr set;

View File

@ -223,7 +223,7 @@ StorageStripeLog::StorageStripeLog(
}
void StorageStripeLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageStripeLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
@ -294,7 +294,7 @@ CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Conte
return file_checker.check();
}
void StorageStripeLog::truncate(const ASTPtr &, const Context &)
void StorageStripeLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
if (table_name.empty())
throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);

View File

@ -40,7 +40,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
@ -55,7 +55,7 @@ public:
String getDataPath() const override { return full_path(); }
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
private:
String path;

View File

@ -378,7 +378,7 @@ void StorageTinyLog::addFiles(const String & column_name, const IDataType & type
}
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
@ -424,7 +424,7 @@ CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context
return file_checker.check();
}
void StorageTinyLog::truncate(const ASTPtr &, const Context &)
void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
if (table_name.empty())
throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);

View File

@ -39,7 +39,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;
@ -54,7 +54,7 @@ public:
String getDataPath() const override { return full_path(); }
void truncate(const ASTPtr &, const Context &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
private:
String path;

View File

@ -187,7 +187,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context)};
}
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -29,7 +29,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
protected:
IStorageURLBase(

View File

@ -30,7 +30,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name) override
void rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override
{
table_name = new_table_name;
database_name = new_database_name;

View File

@ -18,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TABLE_IS_DROPPED;
}
StorageSystemColumns::StorageSystemColumns(const std::string & name_)

View File

@ -17,6 +17,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_DROPPED;
}
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const
{
bool has_state_column = false;