diff --git a/dbms/src/DataStreams/IBlockInputStream.h b/dbms/src/DataStreams/IBlockInputStream.h index 988f15bffb7..b66fe70e2c7 100644 --- a/dbms/src/DataStreams/IBlockInputStream.h +++ b/dbms/src/DataStreams/IBlockInputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -108,7 +109,9 @@ public: template void forEachChild(F && f) { - std::lock_guard lock(children_mutex); + /// NOTE: Acquire a read lock, therefore f() should be thread safe + std::shared_lock lock(children_mutex); + for (auto & child : children) if (f(*child)) return; @@ -116,7 +119,7 @@ public: protected: BlockInputStreams children; - std::mutex children_mutex; + std::shared_mutex children_mutex; private: TableStructureReadLocks table_locks; diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.h b/dbms/src/DataStreams/IProfilingBlockInputStream.h index a9601d5c265..5febcb18c56 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.h @@ -190,7 +190,7 @@ protected: void addChild(BlockInputStreamPtr & child) { - std::lock_guard lock(children_mutex); + std::unique_lock lock(children_mutex); children.push_back(child); } @@ -231,7 +231,9 @@ private: template void forEachProfilingChild(F && f) { - std::lock_guard lock(children_mutex); + /// NOTE: Acquire a read lock, therefore f() should be thread safe + std::shared_lock lock(children_mutex); + for (auto & child : children) if (IProfilingBlockInputStream * p_child = dynamic_cast(child.get())) if (f(*p_child)) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index e3d39e5f98d..bb51273c2c8 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -589,6 +589,12 @@ QuotaForIntervals & Context::getQuota() } void Context::checkDatabaseAccessRights(const std::string & database_name) const +{ + auto lock = getLock(); + checkDatabaseAccessRightsImpl(database_name); +} + +void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const { if (client_info.current_user.empty() || (database_name == "system")) { @@ -603,8 +609,8 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) { auto lock = getLock(); - checkDatabaseAccessRights(from.first); - checkDatabaseAccessRights(where.first); + checkDatabaseAccessRightsImpl(from.first); + checkDatabaseAccessRightsImpl(where.first); shared->view_dependencies[from].insert(where); // Notify table of dependencies change @@ -616,8 +622,8 @@ void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAnd void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) { auto lock = getLock(); - checkDatabaseAccessRights(from.first); - checkDatabaseAccessRights(where.first); + checkDatabaseAccessRightsImpl(from.first); + checkDatabaseAccessRightsImpl(where.first); shared->view_dependencies[from].erase(where); // Notify table of dependencies change @@ -638,7 +644,7 @@ Dependencies Context::getDependencies(const String & database_name, const String } else { - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); } ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name)); @@ -653,7 +659,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); return shared->databases.end() != it @@ -665,7 +671,7 @@ bool Context::isDatabaseExist(const String & database_name) const { auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); return shared->databases.end() != shared->databases.find(db); } @@ -680,7 +686,7 @@ void Context::assertTableExists(const String & database_name, const String & tab auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() == it) @@ -697,7 +703,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String String db = resolveDatabase(database_name, current_database); if (check_database_access_rights) - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() != it && it->second->isTableExist(*this, table_name)) @@ -711,7 +717,7 @@ void Context::assertDatabaseExists(const String & database_name, bool check_data String db = resolveDatabase(database_name, current_database); if (check_database_access_rights) - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); if (shared->databases.end() == shared->databases.find(db)) throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE); @@ -723,7 +729,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); if (shared->databases.end() != shared->databases.find(db)) throw Exception("Database " + backQuoteIfNeed(db) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS); @@ -790,7 +796,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta } String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() == it) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 670bda401bf..69f18c913b0 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -178,6 +178,7 @@ public: void assertDatabaseExists(const String & database_name, bool check_database_acccess_rights = true) const; void assertDatabaseDoesntExist(const String & database_name) const; + void checkDatabaseAccessRights(const std::string & database_name) const; Tables getExternalTables() const; StoragePtr tryGetExternalTable(const String & table_name) const; @@ -392,7 +393,7 @@ private: * If access is denied, throw an exception. * NOTE: This method should always be called when the `shared->mutex` mutex is acquired. */ - void checkDatabaseAccessRights(const std::string & database_name) const; + void checkDatabaseAccessRightsImpl(const std::string & database_name) const; EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const; ExternalDictionaries & getExternalDictionariesImpl(bool throw_on_error) const; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index ad70fac5390..3025f841ade 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -960,15 +960,25 @@ public: { Block res; if (num_hosts_finished >= waiting_hosts.size()) + { + if (first_exception) + throw Exception(*first_exception); + return res; + } auto zookeeper = context.getZooKeeper(); size_t try_number = 0; - while(res.rows() == 0) + while (res.rows() == 0) { if (isCancelled()) + { + if (first_exception) + throw Exception(*first_exception); + return res; + } if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds) { @@ -1020,6 +1030,9 @@ public: UInt16 port; Cluster::Address::fromString(host_id, host, port); + if (status.code != 0 && first_exception == nullptr) + first_exception = std::make_unique("There was an error on " + host + ": " + status.message, status.code); + ++num_hosts_finished; columns[0]->insert(host); @@ -1092,11 +1105,14 @@ private: Strings current_active_hosts; /// Hosts that were in active state at the last check size_t num_hosts_finished = 0; + /// Save the first detected error and throw it at the end of excecution + std::unique_ptr first_exception; + Int64 timeout_seconds = 120; }; -BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context) +BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, const NameSet & query_databases) { /// Remove FORMAT and INTO OUTFILE if exists ASTPtr query_ptr = query_ptr_->clone(); @@ -1128,13 +1144,26 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont entry.query = queryToString(query_ptr); entry.initiator = ddl_worker.getCommonHostID(); + /// Check database access rights, assume that all servers have the same users config + NameSet databases_to_check_access_rights; + Cluster::AddressesWithFailover shards = cluster->getShardsAddresses(); + for (const auto & shard : shards) { for (const auto & addr : shard) + { entry.hosts.emplace_back(addr); + + /// Expand empty database name to shards' default database name + for (const String & database : query_databases) + databases_to_check_access_rights.emplace(database.empty() ? addr.default_database : database); + } } + for (const String & database : databases_to_check_access_rights) + context.checkDatabaseAccessRights(database.empty() ? context.getCurrentDatabase() : database); + String node_path = ddl_worker.enqueueQuery(entry); BlockIO io; diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index f9c296d373a..d640b6d0bc8 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -18,7 +18,8 @@ struct DDLLogEntry; struct DDLTask; -BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context); +/// Pushes distributed DDL query to the queue +BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const NameSet & query_databases); class DDLWorker diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index f4708a67c3d..bc7861ad41c 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -42,7 +42,7 @@ BlockIO InterpreterAlterQuery::execute() auto & alter = typeid_cast(*query_ptr); if (!alter.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {alter.table}); const String & table_name = alter.table; String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database; diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 9b22376f2ff..9b4deee75e1 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -66,7 +66,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Contex BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) { if (!create.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {create.database}); String database_name = create.database; @@ -439,7 +439,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) { if (!create.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + { + NameSet databases{create.database}; + if (!create.to_table.empty()) + databases.emplace(create.to_database); + + return executeDDLQueryOnCluster(query_ptr, context, databases); + } String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index 0fdf2b1ccf4..839b714a499 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -32,7 +32,7 @@ BlockIO InterpreterDropQuery::execute() checkAccess(drop); if (!drop.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {drop.database}); String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp index 1710f881fe4..bddd74432f3 100644 --- a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp +++ b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp @@ -147,7 +147,6 @@ public: } /// KILL QUERY could be killed also - /// Probably interpreting KILL QUERIES as complete (not internal) queries is extra functionality if (isCancelled()) break; diff --git a/dbms/src/Interpreters/InterpreterRenameQuery.cpp b/dbms/src/Interpreters/InterpreterRenameQuery.cpp index 00aa95ee6fb..d241e620455 100644 --- a/dbms/src/Interpreters/InterpreterRenameQuery.cpp +++ b/dbms/src/Interpreters/InterpreterRenameQuery.cpp @@ -39,7 +39,16 @@ BlockIO InterpreterRenameQuery::execute() ASTRenameQuery & rename = typeid_cast(*query_ptr); if (!rename.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + { + NameSet databases; + for (const auto & elem : rename.elements) + { + databases.emplace(elem.from.database); + databases.emplace(elem.to.database); + } + + return executeDDLQueryOnCluster(query_ptr, context, databases); + } String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index 59c481e6e3a..8f8053a401c 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -1,6 +1,9 @@ #include #include +#include +#include #include +#include #include #include #include @@ -19,21 +22,70 @@ namespace ErrorCodes } +/// Should we execute the query even if max_concurrent_queries limit is exhausted +static bool isUnlimitedQuery(const IAST * ast) +{ + if (!ast) + return false; + + /// It is KILL QUERY + if (typeid_cast(ast)) + return true; + + /// It is SELECT FROM system.processes + if (auto ast_selects = typeid_cast(ast)) + { + if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty()) + return false; + + auto ast_select = typeid_cast(ast_selects->list_of_selects->children[0].get()); + + if (!ast_select) + return false; + + auto ast_database = ast_select->database(); + if (!ast_database) + return false; + + auto ast_table = ast_select->table(); + if (!ast_table) + return false; + + auto ast_database_id = typeid_cast(ast_database.get()); + if (!ast_database_id) + return false; + + auto ast_table_id = typeid_cast(ast_table.get()); + if (!ast_table_id) + return false; + + return ast_database_id->name == "system" && ast_table_id->name == "processes"; + } + + return false; +} + + ProcessList::EntryPtr ProcessList::insert( const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings) { EntryPtr res; - bool is_kill_query = ast && typeid_cast(ast); if (client_info.current_query_id.empty()) throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR); + bool is_unlimited_query = isUnlimitedQuery(ast); + { std::lock_guard lock(mutex); - if (!is_kill_query && max_size && cur_size >= max_size - && (!settings.queue_max_wait_ms.totalMilliseconds() || !have_space.tryWait(mutex, settings.queue_max_wait_ms.totalMilliseconds()))) - throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); + if (!is_unlimited_query && max_size && cur_size >= max_size) + { + if (!settings.queue_max_wait_ms.totalMilliseconds() || !have_space.tryWait(mutex, settings.queue_max_wait_ms.totalMilliseconds())) + { + throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); + } + } /** Why we use current user? * Because initial one is passed by client and credentials for it is not verified, @@ -50,7 +102,7 @@ ProcessList::EntryPtr ProcessList::insert( if (user_process_list != user_to_queries.end()) { - if (!is_kill_query && settings.max_concurrent_queries_for_user + if (!is_unlimited_query && settings.max_concurrent_queries_for_user && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user) throw Exception("Too many simultaneous queries for user " + client_info.current_user + ". Current: " + toString(user_process_list->second.queries.size()) @@ -191,31 +243,37 @@ void ProcessListElement::setQueryStreams(const BlockIO & io) query_stream_in = io.in; query_stream_out = io.out; - query_streams_initialized = true; + query_streams_status = QueryStreamsStatus::Initialized; } void ProcessListElement::releaseQueryStreams() { - std::lock_guard lock(query_streams_mutex); + BlockInputStreamPtr in; + BlockOutputStreamPtr out; - query_streams_initialized = false; - query_streams_released = true; - query_stream_in.reset(); - query_stream_out.reset(); + { + std::lock_guard lock(query_streams_mutex); + + query_streams_status = QueryStreamsStatus::Released; + in = std::move(query_stream_in); + out = std::move(query_stream_out); + } + + /// Destroy streams outside the mutex lock } bool ProcessListElement::streamsAreReleased() { std::lock_guard lock(query_streams_mutex); - return query_streams_released; + return query_streams_status == QueryStreamsStatus::Released; } bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const { std::lock_guard lock(query_streams_mutex); - if (!query_streams_initialized) + if (query_streams_status != QueryStreamsStatus::Initialized) return false; in = query_stream_in; diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index ecc29d671fe..2d7d3227eb7 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -91,8 +91,14 @@ private: BlockInputStreamPtr query_stream_in; BlockOutputStreamPtr query_stream_out; - bool query_streams_initialized{false}; - bool query_streams_released{false}; + enum QueryStreamsStatus + { + NotInitialized, + Initialized, + Released + }; + + QueryStreamsStatus query_streams_status{NotInitialized}; public: ProcessListElement( diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp new file mode 100644 index 00000000000..16a84b4b2f6 --- /dev/null +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.cpp @@ -0,0 +1,42 @@ +#include "ReplicatedMergeTreeAddress.h" +#include +#include +#include + +namespace DB +{ + + +void ReplicatedMergeTreeAddress::writeText(WriteBuffer & out) const +{ + out + << "host: " << escape << host << '\n' + << "port: " << replication_port << '\n' + << "tcp_port: " << queries_port << '\n' + << "database: " << escape << database << '\n' + << "table: " << escape << table << '\n'; +} + +void ReplicatedMergeTreeAddress::readText(ReadBuffer & in) +{ + in + >> "host: " >> escape >> host >> "\n" + >> "port: " >> replication_port >> "\n" + >> "tcp_port: " >> queries_port >> "\n" + >> "database: " >> escape >> database >> "\n" + >> "table: " >> escape >> table >> "\n"; +} + +String ReplicatedMergeTreeAddress::toString() const +{ + WriteBufferFromOwnString out; + writeText(out); + return out.str(); +} + +void ReplicatedMergeTreeAddress::fromString(const String & str) +{ + ReadBufferFromString in(str); + readText(in); +} +} diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h index 325b2dc617b..b50ec72f3a5 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAddress.h @@ -1,8 +1,7 @@ +#pragma once +#include #include -#include #include -#include -#include namespace DB @@ -18,44 +17,19 @@ struct ReplicatedMergeTreeAddress String database; String table; - ReplicatedMergeTreeAddress() {} - ReplicatedMergeTreeAddress(const String & str) + ReplicatedMergeTreeAddress() = default; + explicit ReplicatedMergeTreeAddress(const String & str) { fromString(str); } - void writeText(WriteBuffer & out) const - { - out - << "host: " << escape << host << '\n' - << "port: " << replication_port << '\n' - << "tcp_port: " << queries_port << '\n' - << "database: " << escape << database << '\n' - << "table: " << escape << table << '\n'; - } + void writeText(WriteBuffer & out) const; - void readText(ReadBuffer & in) - { - in - >> "host: " >> escape >> host >> "\n" - >> "port: " >> replication_port >> "\n" - >> "tcp_port: " >> queries_port >> "\n" - >> "database: " >> escape >> database >> "\n" - >> "table: " >> escape >> table >> "\n"; - } + void readText(ReadBuffer & in); - String toString() const - { - WriteBufferFromOwnString out; - writeText(out); - return out.str(); - } + String toString() const; - void fromString(const String & str) - { - ReadBufferFromString in(str); - readText(in); - } + void fromString(const String & str); }; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 5affd77ac7b..37ef004dd55 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -292,16 +292,10 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart() void ReplicatedMergeTreeRestartingThread::activateReplica() { - auto host_port = storage.context.getInterserverIOAddress(); auto zookeeper = storage.getZooKeeper(); - /// How other replicas can access this. - ReplicatedMergeTreeAddress address; - address.host = host_port.first; - address.replication_port = host_port.second; - address.queries_port = storage.context.getTCPPort(); - address.database = storage.database_name; - address.table = storage.table_name; + /// How other replicas can access this one. + ReplicatedMergeTreeAddress address = storage.getReplicatedMergeTreeAddress(); String is_active_path = storage.replica_path + "/is_active"; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c2c4555e7e7..fb56172645b 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3004,6 +3004,10 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str table_name = new_table_name; full_path = new_full_path; + /// Update table name in zookeeper + auto zookeeper = getZooKeeper(); + zookeeper->set(replica_path + "/host", getReplicatedMergeTreeAddress().toString()); + /// TODO: You can update names of loggers. } @@ -3766,4 +3770,17 @@ void StorageReplicatedMergeTree::clearBlocksInPartition( LOG_TRACE(log, "Deleted " << to_delete_futures.size() << " deduplication block IDs in partition ID " << partition_id); } +ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAddress() const +{ + auto host_port = context.getInterserverIOAddress(); + + ReplicatedMergeTreeAddress res; + res.host = host_port.first; + res.replication_port = host_port.second; + res.queries_port = context.getTCPPort(); + res.database = database_name; + res.table = table_name; + return res; +} + } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 457e834ea1c..0cb6dbb004c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -451,6 +452,9 @@ private: void clearBlocksInPartition( zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num); + /// Info about how other replicas can access this one. + ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const; + protected: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. */ diff --git a/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml b/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml new file mode 100644 index 00000000000..5b6084eea7b --- /dev/null +++ b/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml @@ -0,0 +1,16 @@ + + + + + default + default + + ::/0 + + + + db1 + + + + diff --git a/dbms/tests/integration/test_distributed_ddl/test.py b/dbms/tests/integration/test_distributed_ddl/test.py index 8b7e46443d5..8621f723ac1 100755 --- a/dbms/tests/integration/test_distributed_ddl/test.py +++ b/dbms/tests/integration/test_distributed_ddl/test.py @@ -315,6 +315,24 @@ def test_macro(started_cluster): ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'") ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'") + +def test_allowed_databases(started_cluster): + instance = cluster.instances['ch2'] + instance.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster") + instance.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster") + + instance.query("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + + with pytest.raises(Exception): + instance.query("CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + with pytest.raises(Exception): + instance.query("CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + with pytest.raises(Exception): + instance.query("DROP DATABASE db2 ON CLUSTER cluster", settings={"user" : "restricted_user"}) + + instance.query("DROP DATABASE db1 ON CLUSTER cluster", settings={"user" : "restricted_user"}) + + if __name__ == '__main__': with contextmanager(started_cluster)() as cluster: for name, instance in cluster.instances.items(): diff --git a/dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql b/dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql index 0ace86c2e5e..7625c6e01b1 100644 --- a/dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql +++ b/dbms/tests/queries/0_stateless/00446_clear_column_in_partition_zookeeper.sql @@ -61,11 +61,3 @@ ALTER TABLE test.clear_column1 CLEAR COLUMN s IN PARTITION '200002'; ALTER TABLE test.clear_column1 CLEAR COLUMN s IN PARTITION '200012', CLEAR COLUMN i IN PARTITION '200012'; -- Drop empty partition also Ok ALTER TABLE test.clear_column1 DROP PARTITION '200012', DROP PARTITION '200011'; - - --- check optimize for non-leader replica (it is not related with CLEAR COLUMN) -OPTIMIZE TABLE test.clear_column1; -OPTIMIZE TABLE test.clear_column2; - -DROP TABLE IF EXISTS test.clear_column1; -DROP TABLE IF EXISTS test.clear_column2; diff --git a/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.reference b/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.reference new file mode 100644 index 00000000000..087a2f3b9d7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.reference @@ -0,0 +1,2 @@ +0 1 1 +0 1 2 diff --git a/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql b/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql new file mode 100644 index 00000000000..f66ab550bd4 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql @@ -0,0 +1,20 @@ +DROP TABLE IF EXISTS test.clear_column1; +DROP TABLE IF EXISTS test.clear_column2; +CREATE TABLE test.clear_column1 (p Int64, i Int64, v UInt64) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/test/clear_column', '1', v) PARTITION BY p ORDER BY i; +CREATE TABLE test.clear_column2 (p Int64, i Int64, v UInt64) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/test/clear_column', '2', v) PARTITION BY p ORDER BY i; + +INSERT INTO test.clear_column1 VALUES (0, 1, 0); +INSERT INTO test.clear_column1 VALUES (0, 1, 1); + +OPTIMIZE TABLE test.clear_column1; +OPTIMIZE TABLE test.clear_column2; +SELECT * FROM test.clear_column1; + +RENAME TABLE test.clear_column2 TO test.clear_column3; + +INSERT INTO test.clear_column1 VALUES (0, 1, 2); +OPTIMIZE TABLE test.clear_column3; +SELECT * FROM test.clear_column1; + +DROP TABLE IF EXISTS test.clear_column1; +DROP TABLE IF EXISTS test.clear_column2; \ No newline at end of file