better code

This commit is contained in:
Alexander Tokmakov 2021-02-08 22:36:17 +03:00
parent 91d0924665
commit 78c1d69b8c
25 changed files with 146 additions and 125 deletions

View File

@ -15,7 +15,6 @@
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \ M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(BackgroundReplicatedSchedulePoolTask, "Number of active tasks in BackgroundReplicatedSchedulePoolTask. The pool is used by replicated database for executing DDL log coming from other replicas. One task corresponds to one replicated database") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \ M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \

View File

@ -129,6 +129,60 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
return statement_buf.str(); return statement_buf.str();
} }
void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemoryMetadata & metadata)
{
auto & ast_create_query = query->as<ASTCreateQuery &>();
bool has_structure = ast_create_query.columns_list && ast_create_query.columns_list->columns;
if (ast_create_query.as_table_function && !has_structure)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(ast_create_query.table));
assert(has_structure);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (metadata.select.select_query)
{
query->replace(ast_create_query.select, metadata.select.select_query);
}
/// MaterializedView is one type of CREATE query without storage.
if (ast_create_query.storage)
{
ASTStorage & storage_ast = *ast_create_query.storage;
bool is_extended_storage_def
= storage_ast.partition_by || storage_ast.primary_key || storage_ast.order_by || storage_ast.sample_by || storage_ast.settings;
if (is_extended_storage_def)
{
if (metadata.sorting_key.definition_ast)
storage_ast.set(storage_ast.order_by, metadata.sorting_key.definition_ast);
if (metadata.primary_key.definition_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key.definition_ast);
if (metadata.sampling_key.definition_ast)
storage_ast.set(storage_ast.sample_by, metadata.sampling_key.definition_ast);
if (metadata.table_ttl.definition_ast)
storage_ast.set(storage_ast.ttl_table, metadata.table_ttl.definition_ast);
else if (storage_ast.ttl_table != nullptr) /// TTL was removed
storage_ast.ttl_table = nullptr;
if (metadata.settings_changes)
storage_ast.set(storage_ast.settings, metadata.settings_changes);
}
}
}
DatabaseOnDisk::DatabaseOnDisk( DatabaseOnDisk::DatabaseOnDisk(
const String & name, const String & name,
const String & metadata_path_, const String & metadata_path_,

View File

@ -25,6 +25,8 @@ std::pair<String, StoragePtr> createTableFromAST(
*/ */
String getObjectDefinitionFromCreateQuery(const ASTPtr & query); String getObjectDefinitionFromCreateQuery(const ASTPtr & query);
void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemoryMetadata & metadata);
/* Class to provide basic operations with tables when metadata is stored on disk in .sql files. /* Class to provide basic operations with tables when metadata is stored on disk in .sql files.
*/ */

View File

@ -272,55 +272,7 @@ void DatabaseOrdinary::alterTable(const Context & context, const StorageID & tab
0, 0,
context.getSettingsRef().max_parser_depth); context.getSettingsRef().max_parser_depth);
auto & ast_create_query = ast->as<ASTCreateQuery &>(); applyMetadataChangesToCreateQuery(ast, metadata);
bool has_structure = ast_create_query.columns_list && ast_create_query.columns_list->columns;
if (ast_create_query.as_table_function && !has_structure)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(table_name));
assert(has_structure);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (metadata.select.select_query)
{
ast->replace(ast_create_query.select, metadata.select.select_query);
}
/// MaterializedView is one type of CREATE query without storage.
if (ast_create_query.storage)
{
ASTStorage & storage_ast = *ast_create_query.storage;
bool is_extended_storage_def
= storage_ast.partition_by || storage_ast.primary_key || storage_ast.order_by || storage_ast.sample_by || storage_ast.settings;
if (is_extended_storage_def)
{
if (metadata.sorting_key.definition_ast)
storage_ast.set(storage_ast.order_by, metadata.sorting_key.definition_ast);
if (metadata.primary_key.definition_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key.definition_ast);
if (metadata.sampling_key.definition_ast)
storage_ast.set(storage_ast.sample_by, metadata.sampling_key.definition_ast);
if (metadata.table_ttl.definition_ast)
storage_ast.set(storage_ast.ttl_table, metadata.table_ttl.definition_ast);
else if (storage_ast.ttl_table != nullptr) /// TTL was removed
storage_ast.ttl_table = nullptr;
if (metadata.settings_changes)
storage_ast.set(storage_ast.settings, metadata.settings_changes);
}
}
statement = getObjectDefinitionFromCreateQuery(ast); statement = getObjectDefinitionFromCreateQuery(ast);
{ {

View File

@ -134,6 +134,7 @@ std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String
ClusterPtr DatabaseReplicated::getCluster() const ClusterPtr DatabaseReplicated::getCluster() const
{ {
/// TODO Maintain up-to-date Cluster and allow to use it in Distributed tables
Strings hosts; Strings hosts;
Strings host_ids; Strings host_ids;
@ -149,6 +150,7 @@ ClusterPtr DatabaseReplicated::getCluster() const
if (hosts.empty()) if (hosts.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No hosts found"); throw Exception(ErrorCodes::LOGICAL_ERROR, "No hosts found");
Int32 cver = stat.cversion; Int32 cver = stat.cversion;
std::sort(hosts.begin(), hosts.end());
std::vector<zkutil::ZooKeeper::FutureGet> futures; std::vector<zkutil::ZooKeeper::FutureGet> futures;
futures.reserve(hosts.size()); futures.reserve(hosts.size());
@ -174,7 +176,6 @@ ClusterPtr DatabaseReplicated::getCluster() const
assert(!hosts.empty()); assert(!hosts.empty());
assert(hosts.size() == host_ids.size()); assert(hosts.size() == host_ids.size());
std::sort(hosts.begin(), hosts.end());
String current_shard = parseFullReplicaName(hosts.front()).first; String current_shard = parseFullReplicaName(hosts.front()).first;
std::vector<Strings> shards; std::vector<Strings> shards;
shards.emplace_back(); shards.emplace_back();
@ -327,9 +328,7 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_
if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0) if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io; return io;
//FIXME need list of all replicas, we can obtain it from zk Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
Strings hosts_to_wait;
hosts_to_wait.emplace_back(getFullReplicaName());
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, query_context, hosts_to_wait); auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, query_context, hosts_to_wait);
io.in = std::move(stream); io.in = std::move(stream);
return io; return io;
@ -338,7 +337,7 @@ BlockIO DatabaseReplicated::propose(const ASTPtr & query, const Context & query_
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot) void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 from_snapshot)
{ {
LOG_WARNING(log, "Will recover replica"); //LOG_WARNING(log, "Will recover replica");
//FIXME drop old tables //FIXME drop old tables
@ -355,7 +354,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
Context query_context = global_context; Context query_context = global_context;
query_context.makeQueryContext(); query_context.makeQueryContext();
query_context.getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY; query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context.setCurrentDatabase(database_name); query_context.setCurrentDatabase(database_name);
query_context.setCurrentQueryId(""); // generate random query_id query_context.setCurrentQueryId(""); // generate random query_id
@ -436,6 +435,8 @@ void DatabaseReplicated::renameTable(const Context & context, const String & tab
{ {
if (this != &to_database) if (this != &to_database)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases is not supported for Replicated engine"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases is not supported for Replicated engine");
if (table_name == to_table_name)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Cannot rename table to itself");
if (!isTableExist(table_name, context)) if (!isTableExist(table_name, context))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_name); throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_name);
if (exchange && !to_database.isTableExist(to_table_name, context)) if (exchange && !to_database.isTableExist(to_table_name, context))

View File

@ -48,7 +48,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
UInt32 our_log_ptr = parse<UInt32>(current_zookeeper->get(database->replica_path + "/log_ptr")); UInt32 our_log_ptr = parse<UInt32>(current_zookeeper->get(database->replica_path + "/log_ptr"));
UInt32 max_log_ptr = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/max_log_ptr")); UInt32 max_log_ptr = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
UInt32 logs_to_keep = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/logs_to_keep")); UInt32 logs_to_keep = parse<UInt32>(current_zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
if (our_log_ptr + logs_to_keep < max_log_ptr) if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
database->recoverLostReplica(current_zookeeper, 0); database->recoverLostReplica(current_zookeeper, 0);
} }

View File

@ -42,7 +42,6 @@ public:
NO_QUERY = 0, /// Uninitialized object. NO_QUERY = 0, /// Uninitialized object.
INITIAL_QUERY = 1, INITIAL_QUERY = 1,
SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed or ON CLUSTER query execution. SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed or ON CLUSTER query execution.
REPLICATED_LOG_QUERY = 3, /// Query from replicated DDL log.
}; };

View File

@ -79,7 +79,6 @@ namespace CurrentMetrics
extern const Metric BackgroundSchedulePoolTask; extern const Metric BackgroundSchedulePoolTask;
extern const Metric BackgroundBufferFlushSchedulePoolTask; extern const Metric BackgroundBufferFlushSchedulePoolTask;
extern const Metric BackgroundDistributedSchedulePoolTask; extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric BackgroundReplicatedSchedulePoolTask;
extern const Metric BackgroundMessageBrokerSchedulePoolTask; extern const Metric BackgroundMessageBrokerSchedulePoolTask;
} }

View File

@ -622,7 +622,6 @@ public:
BackgroundSchedulePool & getSchedulePool() const; BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getMessageBrokerSchedulePool() const; BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
BackgroundSchedulePool & getDistributedSchedulePool() const; BackgroundSchedulePool & getDistributedSchedulePool() const;
BackgroundSchedulePool & getReplicatedSchedulePool() const;
/// Has distributed_ddl configuration or not. /// Has distributed_ddl configuration or not.
bool hasDistributedDDL() const; bool hasDistributedDDL() const;

View File

@ -296,7 +296,7 @@ String DatabaseReplicatedTask::getShardID() const
std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from_context) std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from_context)
{ {
auto query_context = DDLTaskBase::makeQueryContext(from_context); auto query_context = DDLTaskBase::makeQueryContext(from_context);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::REPLICATED_LOG_QUERY; //FIXME why do we need separate query kind? query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentDatabase(database->getDatabaseName()); query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<MetadataTransaction>(); auto txn = std::make_shared<MetadataTransaction>();
@ -340,7 +340,7 @@ void MetadataTransaction::commit()
assert(state == CREATED); assert(state == CREATED);
state = FAILED; state = FAILED;
current_zookeeper->multi(ops); current_zookeeper->multi(ops);
state = COMMITED; state = COMMITTED;
} }
} }

View File

@ -144,7 +144,7 @@ struct MetadataTransaction
enum State enum State
{ {
CREATED, CREATED,
COMMITED, COMMITTED,
FAILED FAILED
}; };
@ -154,10 +154,11 @@ struct MetadataTransaction
bool is_initial_query; bool is_initial_query;
Coordination::Requests ops; Coordination::Requests ops;
void addOps(Coordination::Requests & other_ops) void moveOpsTo(Coordination::Requests & other_ops)
{ {
std::move(ops.begin(), ops.end(), std::back_inserter(other_ops)); std::move(ops.begin(), ops.end(), std::back_inserter(other_ops));
ops.clear(); ops.clear();
state = COMMITTED;
} }
void commit(); void commit();

View File

@ -42,7 +42,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED; extern const int TIMEOUT_EXCEEDED;
extern const int UNFINISHED; extern const int UNFINISHED;
@ -51,7 +50,6 @@ namespace ErrorCodes
extern const int CANNOT_ASSIGN_ALTER; extern const int CANNOT_ASSIGN_ALTER;
extern const int CANNOT_ALLOCATE_MEMORY; extern const int CANNOT_ALLOCATE_MEMORY;
extern const int MEMORY_LIMIT_EXCEEDED; extern const int MEMORY_LIMIT_EXCEEDED;
extern const int INCORRECT_QUERY;
} }

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY; extern const int INCORRECT_QUERY;
extern const int NOT_IMPLEMENTED;
} }
@ -49,7 +50,7 @@ BlockIO InterpreterAlterQuery::execute()
auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary); auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name); DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name); auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock(); guard->releaseTableLock();
@ -60,8 +61,6 @@ BlockIO InterpreterAlterQuery::execute()
auto alter_lock = table->lockForAlter(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); auto alter_lock = table->lockForAlter(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr(); auto metadata_snapshot = table->getInMemoryMetadataPtr();
//FIXME commit MetadataTransaction for all ALTER kinds. Now its' implemented only for metadata alter.
/// Add default database to table identifiers that we can encounter in e.g. default expressions, /// Add default database to table identifiers that we can encounter in e.g. default expressions,
/// mutation expression, etc. /// mutation expression, etc.
AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName()); AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName());
@ -95,6 +94,14 @@ BlockIO InterpreterAlterQuery::execute()
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
} }
if (typeid_cast<DatabaseReplicated *>(database.get()))
{
int command_types_count = !mutation_commands.empty() + !partition_commands.empty() + !live_view_commands.empty() + !alter_commands.empty();
if (1 < command_types_count)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "For Replicated databases it's not allowed "
"to execute ALTERs of different types in single query");
}
if (!mutation_commands.empty()) if (!mutation_commands.empty())
{ {
MutationsInterpreter(table, metadata_snapshot, mutation_commands, context, false).validate(); MutationsInterpreter(table, metadata_snapshot, mutation_commands, context, false).validate();

View File

@ -149,7 +149,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
engine = makeASTFunction("Replicated", engine = makeASTFunction("Replicated",
std::make_shared<ASTLiteral>(fmt::format("/clickhouse/db/{}/", create.database)), std::make_shared<ASTLiteral>(fmt::format("/clickhouse/db/{}/", create.database)),
std::make_shared<ASTLiteral>("s1"), std::make_shared<ASTLiteral>("s1"),
std::make_shared<ASTLiteral>("r1")); std::make_shared<ASTLiteral>("r" + toString(getpid())));
} }
engine->no_empty_args = true; engine->no_empty_args = true;
@ -573,8 +573,9 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
/// Set the table engine if it was not specified explicitly. /// Set the table engine if it was not specified explicitly.
setEngine(create); setEngine(create);
create.as_database.clear(); assert(as_database_saved.empty() && as_table_saved.empty());
create.as_table.clear(); std::swap(create.as_database, as_database_saved);
std::swap(create.as_table, as_table_saved);
return properties; return properties;
} }
@ -722,7 +723,7 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
const auto * kind = create.is_dictionary ? "Dictionary" : "Table"; const auto * kind = create.is_dictionary ? "Dictionary" : "Table";
const auto * kind_upper = create.is_dictionary ? "DICTIONARY" : "TABLE"; const auto * kind_upper = create.is_dictionary ? "DICTIONARY" : "TABLE";
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind == ClientInfo::QueryKind::REPLICATED_LOG_QUERY && !internal) if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY && !internal)
{ {
if (create.uuid == UUIDHelpers::Nil) if (create.uuid == UUIDHelpers::Nil)
throw Exception("Table UUID is not specified in DDL log", ErrorCodes::LOGICAL_ERROR); throw Exception("Table UUID is not specified in DDL log", ErrorCodes::LOGICAL_ERROR);
@ -753,7 +754,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
} }
else else
{ {
assert(context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY);
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY; bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster) if (create.uuid != UUIDHelpers::Nil && !is_on_cluster)
throw Exception(ErrorCodes::INCORRECT_QUERY, throw Exception(ErrorCodes::INCORRECT_QUERY,
@ -850,7 +850,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
"Data directory {} must be inside {} to attach it", String(data_path), String(user_files)); "Data directory {} must be inside {} to attach it", String(data_path), String(user_files));
} }
} }
else if (create.attach && !create.attach_short_syntax && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) else if (create.attach && !create.attach_short_syntax && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
auto * log = &Poco::Logger::get("InterpreterCreateQuery"); auto * log = &Poco::Logger::get("InterpreterCreateQuery");
LOG_WARNING(log, "ATTACH TABLE query with full table definition is not recommended: " LOG_WARNING(log, "ATTACH TABLE query with full table definition is not recommended: "
@ -874,16 +874,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way. /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create); TableProperties properties = setProperties(create);
/// DDL log for replicated databases can not
/// contain the right database name for every replica
/// therefore for such queries the AST database
/// field is modified right before an actual execution
if (context.getClientInfo().query_kind == ClientInfo::QueryKind::REPLICATED_LOG_QUERY)
{
create.database = current_database;
}
//TODO make code better if possible
DatabasePtr database; DatabasePtr database;
bool need_add_to_database = !create.temporary; bool need_add_to_database = !create.temporary;
if (need_add_to_database) if (need_add_to_database)
@ -893,7 +883,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{ {
auto guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table); auto guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table);
database = DatabaseCatalog::instance().getDatabase(create.database); database = DatabaseCatalog::instance().getDatabase(create.database);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
assertOrSetUUID(create, database); assertOrSetUUID(create, database);
guard->releaseTableLock(); guard->releaseTableLock();
@ -930,9 +920,6 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table); guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table);
database = DatabaseCatalog::instance().getDatabase(create.database); database = DatabaseCatalog::instance().getDatabase(create.database);
//TODO do we need it?
if (database->getEngineName() == "Replicated" && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed");
assertOrSetUUID(create, database); assertOrSetUUID(create, database);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard. /// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
@ -1107,9 +1094,10 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, dictionary_name); auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, dictionary_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name); DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
assertOrSetUUID(create, database); if (!create.attach)
assertOrSetUUID(create, database);
guard->releaseTableLock(); guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context); return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
} }
@ -1266,15 +1254,14 @@ AccessRightsElements InterpreterCreateQuery::getRequiredAccess() const
return required_access; return required_access;
} }
void InterpreterCreateQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, const Context &) const void InterpreterCreateQuery::extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, const Context &) const
{ {
const auto & create = ast->as<const ASTCreateQuery &>();
elem.query_kind = "Create"; elem.query_kind = "Create";
if (!create.as_table.empty()) if (!as_table_saved.empty())
{ {
String database = backQuoteIfNeed(create.as_database.empty() ? context.getCurrentDatabase() : create.as_database); String database = backQuoteIfNeed(as_database_saved.empty() ? context.getCurrentDatabase() : as_database_saved);
elem.query_databases.insert(database); elem.query_databases.insert(database);
elem.query_tables.insert(database + "." + backQuoteIfNeed(create.as_table)); elem.query_tables.insert(database + "." + backQuoteIfNeed(as_table_saved));
} }
} }

View File

@ -95,5 +95,8 @@ private:
/// Is this an internal query - not from the user. /// Is this an internal query - not from the user.
bool internal = false; bool internal = false;
bool force_attach = false; bool force_attach = false;
mutable String as_database_saved;
mutable String as_table_saved;
}; };
} }

View File

@ -129,7 +129,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
/// Prevents recursive drop from drop database query. The original query must specify a table. /// Prevents recursive drop from drop database query. The original query must specify a table.
bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty(); bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty();
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) && bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY &&
!is_drop_or_detach_database; !is_drop_or_detach_database;
if (is_replicated_ddl_query) if (is_replicated_ddl_query)
{ {
@ -137,6 +137,13 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH TABLE is not allowed for Replicated databases. " throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH TABLE is not allowed for Replicated databases. "
"Use DETACH TABLE PERMANENTLY or SYSTEM RESTART REPLICA"); "Use DETACH TABLE PERMANENTLY or SYSTEM RESTART REPLICA");
if (query.kind == ASTDropQuery::Kind::Detach)
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
else if (query.kind == ASTDropQuery::Kind::Truncate)
context.checkAccess(AccessType::TRUNCATE, table_id);
else if (query.kind == ASTDropQuery::Kind::Drop)
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
ddl_guard->releaseTableLock(); ddl_guard->releaseTableLock();
table.reset(); table.reset();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query.clone(), context); return typeid_cast<DatabaseReplicated *>(database.get())->propose(query.clone(), context);
@ -214,13 +221,15 @@ BlockIO InterpreterDropQuery::executeToDictionary(
bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty(); bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty();
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) && bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY &&
!is_drop_or_detach_database; !is_drop_or_detach_database;
if (is_replicated_ddl_query) if (is_replicated_ddl_query)
{ {
if (kind == ASTDropQuery::Kind::Detach) if (kind == ASTDropQuery::Kind::Detach)
throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH DICTIONARY is not allowed for Replicated databases."); throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH DICTIONARY is not allowed for Replicated databases.");
context.checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
ddl_guard->releaseTableLock(); ddl_guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context); return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr, context);
} }

View File

@ -80,7 +80,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context); database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name); DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
if (1 < descriptions.size()) if (1 < descriptions.size())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database {} is Replicated, " throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database {} is Replicated, "

View File

@ -454,7 +454,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries /// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
bool is_on_cluster = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY; bool is_on_cluster = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::REPLICATED_LOG_QUERY && bool is_replicated_database = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated"; DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";
bool allow_uuid_macro = is_on_cluster || is_replicated_database || args.query.attach; bool allow_uuid_macro = is_on_cluster || is_replicated_database || args.query.attach;

View File

@ -4284,24 +4284,12 @@ void StorageReplicatedMergeTree::alter(
if (auto txn = query_context.getMetadataTransaction()) if (auto txn = query_context.getMetadataTransaction())
{ {
txn->addOps(ops); txn->moveOpsTo(ops);
/// NOTE: IDatabase::alterTable(...) is called when executing ALTER_METADATA queue entry without query context, /// NOTE: IDatabase::alterTable(...) is called when executing ALTER_METADATA queue entry without query context,
/// so we have to update metadata of DatabaseReplicated here. /// so we have to update metadata of DatabaseReplicated here.
/// It also may cause "Table columns structure in ZooKeeper is different" error on server startup
/// even for Ordinary and Atomic databases.
String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name); String metadata_zk_path = txn->zookeeper_path + "/metadata/" + escapeForFileName(table_id.table_name);
auto ast = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, query_context); auto ast = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, query_context);
auto & ast_create_query = ast->as<ASTCreateQuery &>(); applyMetadataChangesToCreateQuery(ast, future_metadata);
//FIXME copy-paste
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(future_metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(future_metadata.secondary_indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(future_metadata.constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, getObjectDefinitionFromCreateQuery(ast), -1)); ops.emplace_back(zkutil::makeSetRequest(metadata_zk_path, getObjectDefinitionFromCreateQuery(ast), -1));
} }
@ -4450,7 +4438,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
else else
{ {
String partition_id = getPartitionIDFromQuery(partition, query_context); String partition_id = getPartitionIDFromQuery(partition, query_context);
did_drop = dropAllPartsInPartition(*zookeeper, partition_id, entry, detach); did_drop = dropAllPartsInPartition(*zookeeper, partition_id, entry, query_context, detach);
} }
if (did_drop) if (did_drop)
@ -4474,7 +4462,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
void StorageReplicatedMergeTree::truncate( void StorageReplicatedMergeTree::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder & table_lock) const ASTPtr &, const StorageMetadataPtr &, const Context & query_context, TableExclusiveLockHolder & table_lock)
{ {
table_lock.release(); /// Truncate is done asynchronously. table_lock.release(); /// Truncate is done asynchronously.
@ -4490,7 +4478,7 @@ void StorageReplicatedMergeTree::truncate(
{ {
LogEntry entry; LogEntry entry;
if (dropAllPartsInPartition(*zookeeper, partition_id, entry, false)) if (dropAllPartsInPartition(*zookeeper, partition_id, entry, query_context, false))
waitForAllReplicasToProcessLogEntry(entry); waitForAllReplicasToProcessLogEntry(entry);
} }
} }
@ -5274,6 +5262,9 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
requests.emplace_back(zkutil::makeCreateRequest( requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential)); mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
if (auto txn = query_context.getMetadataTransaction())
txn->moveOpsTo(requests);
Coordination::Responses responses; Coordination::Responses responses;
Coordination::Error rc = zookeeper->tryMulti(requests, responses); Coordination::Error rc = zookeeper->tryMulti(requests, responses);
@ -5775,6 +5766,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
} }
} }
if (auto txn = context.getMetadataTransaction())
txn->moveOpsTo(ops);
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
@ -6243,7 +6237,7 @@ bool StorageReplicatedMergeTree::dropPart(
} }
bool StorageReplicatedMergeTree::dropAllPartsInPartition( bool StorageReplicatedMergeTree::dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach) zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, const Context & query_context, bool detach)
{ {
MergeTreePartInfo drop_range_info; MergeTreePartInfo drop_range_info;
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info)) if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
@ -6275,6 +6269,8 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
Coordination::Requests ops; Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version. ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); /// Just update version.
if (auto txn = query_context.getMetadataTransaction())
txn->moveOpsTo(ops);
Coordination::Responses responses = zookeeper.multi(ops); Coordination::Responses responses = zookeeper.multi(ops);
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created; String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;

View File

@ -134,7 +134,7 @@ public:
*/ */
void drop() override; void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override; void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context & query_context, TableExclusiveLockHolder &) override;
void checkTableCanBeRenamed() const override; void checkTableCanBeRenamed() const override;
@ -577,7 +577,7 @@ private:
bool dropPart(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop); bool dropPart(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
bool dropAllPartsInPartition( bool dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, bool detach); zkutil::ZooKeeper & zookeeper, String & partition_id, LogEntry & entry, const Context & query_context, bool detach);
// Partition helpers // Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context, bool throw_if_noop) override; void dropPartition(const ASTPtr & partition, bool detach, bool drop_part, const Context & query_context, bool throw_if_noop) override;

View File

@ -41,7 +41,7 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context
} }
} }
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster) const void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster)
{ {
const String & cluster_name = name_and_cluster.first; const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second; const ClusterPtr & cluster = name_and_cluster.second;

View File

@ -29,7 +29,7 @@ protected:
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>; using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override; void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster) const; static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster);
}; };
} }

View File

@ -147,7 +147,16 @@ def test_alters_from_different_replicas(started_cluster):
main_node.query("SYSTEM FLUSH DISTRIBUTED testdb.dist") main_node.query("SYSTEM FLUSH DISTRIBUTED testdb.dist")
main_node.query("ALTER TABLE testdb.concurrent_test UPDATE StartDate = addYears(StartDate, 1) WHERE 1") main_node.query("ALTER TABLE testdb.concurrent_test UPDATE StartDate = addYears(StartDate, 1) WHERE 1")
main_node.query("ALTER TABLE testdb.concurrent_test DELETE WHERE UserID % 2") res = main_node.query("ALTER TABLE testdb.concurrent_test DELETE WHERE UserID % 2")
assert "shard1|replica1" in res and "shard1|replica2" in res and "shard1|replica3" in res
assert "shard2|replica1" in res and "shard2|replica2" in res
expected = "1\t1\tmain_node\n" \
"1\t2\tdummy_node\n" \
"1\t3\tcompeting_node\n" \
"2\t1\tsnapshotting_node\n" \
"2\t2\tsnapshot_recovering_node\n"
assert main_node.query("SELECT shard_num, replica_num, host_name FROM system.clusters WHERE cluster='testdb'") == expected
# test_drop_and_create_replica # test_drop_and_create_replica
main_node.query("DROP DATABASE testdb") main_node.query("DROP DATABASE testdb")

View File

@ -113,8 +113,8 @@ timeout $TIMEOUT bash -c thread7 2> /dev/null &
wait wait
$CLICKHOUSE_CLIENT -q "SELECT 'Still alive'" $CLICKHOUSE_CLIENT -q "SELECT 'Still alive'"
$CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY database_for_dict.dict1" $CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY IF NOT EXISTS database_for_dict.dict1"
$CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY database_for_dict.dict2" $CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY IF NOT EXISTS database_for_dict.dict2"
$CLICKHOUSE_CLIENT -n -q " $CLICKHOUSE_CLIENT -n -q "
DROP TABLE table_for_dict1; DROP TABLE table_for_dict1;

View File

@ -103,6 +103,12 @@
"memory_tracking", /// FIXME remove it before merge "memory_tracking", /// FIXME remove it before merge
"memory_tracking", "memory_tracking",
"memory_usage", "memory_usage",
"01188_attach_table_from_pat",
"01110_dictionary_layout_without_arguments",
"01018_ddl_dictionaries_create",
"01018_ddl_dictionaries_select",
"01414_freeze_does_not_prevent_alters",
"01018_ddl_dictionaries_bad_queries",
"01686_rocksdb", "01686_rocksdb",
"01550_mutation_subquery", "01550_mutation_subquery",
"01070_mutations_with_dependencies", "01070_mutations_with_dependencies",