This commit is contained in:
Alexander Tokmakov 2021-01-26 20:51:25 +03:00
parent 3146a1a954
commit f20d5e3b41
12 changed files with 78 additions and 51 deletions

View File

@ -115,8 +115,8 @@ void DatabaseAtomic::dropTable(const Context & context, const String & table_nam
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
table = getTableUnlocked(table_name, lock); table = getTableUnlocked(table_name, lock);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID()); table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
auto txn = context.getMetadataTransaction();
if (auto txn = context.getMetadataTransaction()) if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database txn->commit(); /// Commit point (a sort of) for Replicated database
/// NOTE: replica will be lost if server crashes before the following rename /// NOTE: replica will be lost if server crashes before the following rename
@ -241,7 +241,8 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
} }
/// Table renaming actually begins here /// Table renaming actually begins here
if (auto txn = context.getMetadataTransaction()) auto txn = context.getMetadataTransaction();
if (txn && !context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database txn->commit(); /// Commit point (a sort of) for Replicated database
/// NOTE: replica will be lost if server crashes before the following rename /// NOTE: replica will be lost if server crashes before the following rename
@ -301,7 +302,8 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
DatabaseCatalog::instance().addUUIDMapping(query.uuid); DatabaseCatalog::instance().addUUIDMapping(query.uuid);
locked_uuid = true; locked_uuid = true;
if (auto txn = query_context.getMetadataTransaction()) auto txn = query_context.getMetadataTransaction();
if (txn && !query_context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database txn->commit(); /// Commit point (a sort of) for Replicated database
/// NOTE: replica will be lost if server crashes before the following renameNoReplace(...) /// NOTE: replica will be lost if server crashes before the following renameNoReplace(...)
@ -335,7 +337,8 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
if (table_id.uuid != actual_table_id.uuid) if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER); throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
if (auto txn = query_context.getMetadataTransaction()) auto txn = query_context.getMetadataTransaction();
if (txn && !query_context.isInternalSubquery())
txn->commit(); /// Commit point (a sort of) for Replicated database txn->commit(); /// Commit point (a sort of) for Replicated database
/// NOTE: replica will be lost if server crashes before the following rename /// NOTE: replica will be lost if server crashes before the following rename

View File

@ -64,7 +64,7 @@ public:
void shutdown() override; void shutdown() override;
void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach = false) override; void loadStoredObjects(Context & context, bool has_force_restore_data_flag, bool force_attach) override;
String getFullReplicaName() const { return shard_name + '|' + replica_name; } String getFullReplicaName() const { return shard_name + '|' + replica_name; }

View File

@ -2522,8 +2522,7 @@ void Context::initMetadataTransaction(MetadataTransactionPtr txn)
MetadataTransactionPtr Context::getMetadataTransaction() const MetadataTransactionPtr Context::getMetadataTransaction() const
{ {
//FIXME assert(!metadata_transaction || hasQueryContext());
//assert(query_context == this);
return metadata_transaction; return metadata_transaction;
} }

View File

@ -536,6 +536,7 @@ public:
const Context & getQueryContext() const; const Context & getQueryContext() const;
Context & getQueryContext(); Context & getQueryContext();
bool hasQueryContext() const { return query_context != nullptr; } bool hasQueryContext() const { return query_context != nullptr; }
bool isInternalSubquery() const { return hasQueryContext() && query_context != this; }
const Context & getSessionContext() const; const Context & getSessionContext() const;
Context & getSessionContext(); Context & getSessionContext();

View File

@ -85,9 +85,10 @@ struct DDLTaskBase
ExecutionStatus execution_status; ExecutionStatus execution_status;
bool was_executed = false; bool was_executed = false;
std::atomic_bool completely_processed = false;
DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {} DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {}
DDLTaskBase(const DDLTaskBase &) = delete; DDLTaskBase(const DDLTaskBase &) = delete;
DDLTaskBase(DDLTaskBase &&) = default;
virtual ~DDLTaskBase() = default; virtual ~DDLTaskBase() = default;
void parseQueryFromEntry(const Context & context); void parseQueryFromEntry(const Context & context);

View File

@ -341,9 +341,10 @@ void DDLWorker::scheduleTasks()
auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end()); auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end());
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_task->entry_name); begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_task->entry_name);
current_tasks.clear(); current_tasks.clear();
//FIXME better way of maintaning current tasks list and min_task name;
} }
assert(current_tasks.empty());
for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it) for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{ {
String entry_name = *it; String entry_name = *it;
@ -378,12 +379,8 @@ void DDLWorker::scheduleTasks()
DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task) DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
{ {
//assert(current_tasks.size() <= pool_size + 1); std::remove_if(current_tasks.begin(), current_tasks.end(), [](const DDLTaskPtr & t) { return t->completely_processed.load(); });
//if (current_tasks.size() == pool_size) assert(current_tasks.size() <= pool_size);
//{
// assert(current_tasks.front()->ops.empty()); //FIXME
// current_tasks.pop_front();
//}
current_tasks.emplace_back(std::move(task)); current_tasks.emplace_back(std::move(task));
return *current_tasks.back(); return *current_tasks.back();
} }
@ -555,6 +552,8 @@ void DDLWorker::processTask(DDLTaskBase & task)
active_node->reset(); active_node->reset();
task.ops.clear(); task.ops.clear();
} }
task.completely_processed = true;
} }
@ -572,6 +571,9 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, const Storage
// Setting alters should be executed on all replicas // Setting alters should be executed on all replicas
if (alter->isSettingsAlter()) if (alter->isSettingsAlter())
return false; return false;
if (alter->isFreezeAlter())
return false;
} }
return storage->supportsReplication(); return storage->supportsReplication();
@ -856,28 +858,20 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
void DDLWorker::initializeMainThread() void DDLWorker::initializeMainThread()
{ {
auto reset_state = [&](bool reset_pool = true) assert(!initialized);
{ assert(max_id == 0);
initialized = false; assert(current_tasks.empty());
/// It will wait for all threads in pool to finish and will not rethrow exceptions (if any).
/// We create new thread pool to forget previous exceptions.
if (reset_pool)
worker_pool = std::make_unique<ThreadPool>(pool_size);
/// Clear other in-memory state, like server just started.
current_tasks.clear();
max_id = 0;
};
setThreadName("DDLWorker"); setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread"); LOG_DEBUG(log, "Started DDLWorker thread");
do while (!stop_flag)
{ {
try try
{ {
auto zookeeper = getAndSetZooKeeper(); auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / ""); zookeeper->createAncestors(fs::path(queue_dir) / "");
initialized = true; initialized = true;
return;
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
@ -885,33 +879,29 @@ void DDLWorker::initializeMainThread()
{ {
/// A logical error. /// A logical error.
LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true)); LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
reset_state(false);
assert(false); /// Catch such failures in tests with debug build assert(false); /// Catch such failures in tests with debug build
} }
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds(5);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(log, "Cannot initialize DDL queue."); tryLogCurrentException(log, "Cannot initialize DDL queue.");
reset_state(false);
sleepForSeconds(5);
} }
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds(5);
} }
while (!initialized && !stop_flag);
} }
void DDLWorker::runMainThread() void DDLWorker::runMainThread()
{ {
auto reset_state = [&](bool reset_pool = true) auto reset_state = [&]()
{ {
initialized = false; initialized = false;
/// It will wait for all threads in pool to finish and will not rethrow exceptions (if any). /// It will wait for all threads in pool to finish and will not rethrow exceptions (if any).
/// We create new thread pool to forget previous exceptions. /// We create new thread pool to forget previous exceptions.
if (reset_pool) if (1 < pool_size)
worker_pool = std::make_unique<ThreadPool>(pool_size); worker_pool = std::make_unique<ThreadPool>(pool_size);
/// Clear other in-memory state, like server just started. /// Clear other in-memory state, like server just started.
current_tasks.clear(); current_tasks.clear();
@ -944,6 +934,7 @@ void DDLWorker::runMainThread()
if (Coordination::isHardwareError(e.code)) if (Coordination::isHardwareError(e.code))
{ {
initialized = false; initialized = false;
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
} }
else if (e.code == Coordination::Error::ZNONODE) else if (e.code == Coordination::Error::ZNONODE)
{ {
@ -953,10 +944,10 @@ void DDLWorker::runMainThread()
} }
else else
{ {
LOG_ERROR(log, "Unexpected ZooKeeper error: {}", getCurrentExceptionMessage(true)); LOG_ERROR(log, "Unexpected ZooKeeper error, will try to restart main thread: {}", getCurrentExceptionMessage(true));
reset_state(); reset_state();
} }
sleepForSeconds(5); sleepForSeconds(1);
} }
catch (...) catch (...)
{ {

View File

@ -13,6 +13,10 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
InterpreterRenameQuery::InterpreterRenameQuery(const ASTPtr & query_ptr_, Context & context_) InterpreterRenameQuery::InterpreterRenameQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) : query_ptr(query_ptr_), context(context_)
@ -78,6 +82,9 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name); DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY) if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::REPLICATED_LOG_QUERY)
{ {
if (1 < descriptions.size())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Database {} is Replicated, "
"it does not support renaming of multiple tables in single query.", elem.from_database_name);
return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr); return typeid_cast<DatabaseReplicated *>(database.get())->propose(query_ptr);
} }
else else

View File

@ -31,12 +31,13 @@ namespace ErrorCodes
bool isSupportedAlterType(int type) bool isSupportedAlterType(int type)
{ {
assert(type != ASTAlterCommand::NO_TYPE);
static const std::unordered_set<int> unsupported_alter_types{ static const std::unordered_set<int> unsupported_alter_types{
/// It's dangerous, because it may duplicate data if executed on multiple replicas
ASTAlterCommand::ATTACH_PARTITION, ASTAlterCommand::ATTACH_PARTITION,
ASTAlterCommand::REPLACE_PARTITION, /// Usually followed by ATTACH PARTITION
ASTAlterCommand::FETCH_PARTITION, ASTAlterCommand::FETCH_PARTITION,
ASTAlterCommand::FREEZE_PARTITION, /// Logical error
ASTAlterCommand::FREEZE_ALL,
ASTAlterCommand::NO_TYPE, ASTAlterCommand::NO_TYPE,
}; };

View File

@ -344,7 +344,7 @@ void ASTAlterCommand::formatImpl(
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
} }
bool ASTAlterQuery::isSettingsAlter() const bool ASTAlterQuery::isOneCommandTypeOnly(const ASTAlterCommand::Type & type) const
{ {
if (command_list) if (command_list)
{ {
@ -353,7 +353,7 @@ bool ASTAlterQuery::isSettingsAlter() const
for (const auto & child : command_list->children) for (const auto & child : command_list->children)
{ {
const auto & command = child->as<const ASTAlterCommand &>(); const auto & command = child->as<const ASTAlterCommand &>();
if (command.type != ASTAlterCommand::MODIFY_SETTING) if (command.type != type)
return false; return false;
} }
return true; return true;
@ -361,6 +361,16 @@ bool ASTAlterQuery::isSettingsAlter() const
return false; return false;
} }
bool ASTAlterQuery::isSettingsAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::MODIFY_SETTING);
}
bool ASTAlterQuery::isFreezeAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::FREEZE_PARTITION) || isOneCommandTypeOnly(ASTAlterCommand::FREEZE_ALL);
}
/** Get the text that identifies this element. */ /** Get the text that identifies this element. */
String ASTAlterQuery::getID(char delim) const String ASTAlterQuery::getID(char delim) const
{ {

View File

@ -189,6 +189,8 @@ public:
bool isSettingsAlter() const; bool isSettingsAlter() const;
bool isFreezeAlter() const;
String getID(char) const override; String getID(char) const override;
ASTPtr clone() const override; ASTPtr clone() const override;
@ -200,6 +202,8 @@ public:
protected: protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
bool isOneCommandTypeOnly(const ASTAlterCommand::Type & type) const;
}; };
} }

View File

@ -89,6 +89,7 @@ StorageMaterializedView::StorageMaterializedView(
else else
{ {
/// We will create a query to create an internal table. /// We will create a query to create an internal table.
auto create_context = Context(local_context);
auto manual_create_query = std::make_shared<ASTCreateQuery>(); auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = getStorageID().database_name; manual_create_query->database = getStorageID().database_name;
manual_create_query->table = generateInnerTableName(getStorageID()); manual_create_query->table = generateInnerTableName(getStorageID());
@ -99,7 +100,7 @@ StorageMaterializedView::StorageMaterializedView(
manual_create_query->set(manual_create_query->columns_list, new_columns_list); manual_create_query->set(manual_create_query->columns_list, new_columns_list);
manual_create_query->set(manual_create_query->storage, query.storage->ptr()); manual_create_query->set(manual_create_query->storage, query.storage->ptr());
InterpreterCreateQuery create_interpreter(manual_create_query, local_context); InterpreterCreateQuery create_interpreter(manual_create_query, create_context);
create_interpreter.setInternal(true); create_interpreter.setInternal(true);
create_interpreter.execute(); create_interpreter.execute();
@ -205,7 +206,8 @@ static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context,
drop_query->no_delay = no_delay; drop_query->no_delay = no_delay;
drop_query->if_exists = true; drop_query->if_exists = true;
ASTPtr ast_drop_query = drop_query; ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); auto drop_context = Context(global_context);
InterpreterDropQuery drop_interpreter(ast_drop_query, drop_context);
drop_interpreter.execute(); drop_interpreter.execute();
} }
} }

View File

@ -162,7 +162,12 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None: while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None:
sleep(0.01) sleep(0.01)
if not args.database: need_drop_database = not args.database
if need_drop_database and args.no_drop_if_fail:
maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and (proc.stdout is None or 'Exception' not in proc.stdout)
need_drop_database = not maybe_passed
if need_drop_database:
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10) seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10)
try: try:
@ -181,9 +186,10 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
total_time = (datetime.now() - start_time).total_seconds() total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files. if not args.show_db_name:
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file)) # Normalize randomized database names in stdout, stderr files.
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file)) os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file))
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file))
stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b'' stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b''
stdout = str(stdout, errors='replace', encoding='utf-8') stdout = str(stdout, errors='replace', encoding='utf-8')
@ -884,6 +890,8 @@ if __name__ == '__main__':
parser.add_argument('--hung-check', action='store_true', default=False) parser.add_argument('--hung-check', action='store_true', default=False)
parser.add_argument('--force-color', action='store_true', default=False) parser.add_argument('--force-color', action='store_true', default=False)
parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)') parser.add_argument('--database', help='Database for tests (random name test_XXXXXX by default)')
parser.add_argument('--no-drop-if-fail', action='store_true', help='Do not drop database for test if test has failed')
parser.add_argument('--show-db-name', action='store_true', help='Do not replace random database name with "default"')
parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total') parser.add_argument('--parallel', default='1/1', help='One parallel test run number/total')
parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel') parser.add_argument('-j', '--jobs', default=1, nargs='?', type=int, help='Run all tests in parallel')
parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context') parser.add_argument('-U', '--unified', default=3, type=int, help='output NUM lines of unified context')