Merge branch 'master' into s3_optimize

This commit is contained in:
chen 2022-12-29 23:22:11 +08:00 committed by GitHub
commit 6ad3bb2caa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 610 additions and 368 deletions

View File

@ -10,7 +10,7 @@
#include <base/MoveOrCopyIfThrow.h>
/** Pool for limited size objects that cannot be used from different threads simultaneously.
* The main use case is to have fixed size of objects that can be reused in difference threads during their lifetime
* The main use case is to have fixed size of objects that can be reused in different threads during their lifetime
* and have to be initialized on demand.
* Two main properties of pool are allocated objects size and borrowed objects size.
* Allocated objects size is size of objects that are currently allocated by the pool.

View File

@ -61,14 +61,8 @@ namespace
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
}
}

View File

@ -2,11 +2,10 @@
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <filesystem>
#include <thread>
namespace fs = std::filesystem;
namespace DB
@ -97,9 +96,13 @@ std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
LOG_TRACE(getLog(), "Starting {}", serviceAlias());
/// We will terminate it with the KILL signal instead of the TERM signal,
/// because it's more reliable for arbitrary third-party ODBC drivers.
/// The drivers can spawn threads, install their own signal handlers... we don't care.
ShellCommand::Config command_config(path.string());
command_config.arguments = cmd_args;
command_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy(true);
command_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy(true, SIGKILL);
return ShellCommand::executeDirect(command_config);
}

View File

@ -72,11 +72,11 @@ ShellCommand::~ShellCommand()
if (process_terminated_normally)
return;
LOG_TRACE(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
LOG_TRACE(getLogger(), "Will kill shell command pid {} with signal {}", pid, config.terminate_in_destructor_strategy.termination_signal);
int retcode = kill(pid, SIGTERM);
int retcode = kill(pid, config.terminate_in_destructor_strategy.termination_signal);
if (retcode != 0)
LOG_WARNING(getLogger(), "Cannot kill shell command pid {} errno '{}'", pid, errnoToString());
LOG_WARNING(getLogger(), "Cannot kill shell command pid {}, error: '{}'", pid, errnoToString());
}
else
{

View File

@ -27,18 +27,18 @@ namespace DB
class ShellCommand final
{
public:
~ShellCommand();
struct DestructorStrategy final
{
explicit DestructorStrategy(bool terminate_in_destructor_, size_t wait_for_normal_exit_before_termination_seconds_ = 0)
: terminate_in_destructor(terminate_in_destructor_)
explicit DestructorStrategy(bool terminate_in_destructor_, int termination_signal_, size_t wait_for_normal_exit_before_termination_seconds_ = 0)
: terminate_in_destructor(terminate_in_destructor_), termination_signal(termination_signal_)
, wait_for_normal_exit_before_termination_seconds(wait_for_normal_exit_before_termination_seconds_)
{
}
bool terminate_in_destructor;
int termination_signal;
/// If terminate in destructor is true, command will wait until send SIGTERM signal to created process
size_t wait_for_normal_exit_before_termination_seconds = 0;
@ -64,7 +64,7 @@ public:
bool pipe_stdin_only = false;
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false);
DestructorStrategy terminate_in_destructor_strategy = DestructorStrategy(false, 0);
};
/// Run the command using /bin/sh -c.

View File

@ -401,7 +401,7 @@ void DatabaseReplicated::createEmptyLogEntry(const ZooKeeperPtr & current_zookee
bool DatabaseReplicated::waitForReplicaToProcessAllEntries(UInt64 timeout_ms)
{
if (!ddl_worker)
if (!ddl_worker || is_probably_dropped)
return false;
return ddl_worker->waitForReplicaToProcessAllEntries(timeout_ms);
}
@ -473,9 +473,10 @@ void DatabaseReplicated::startupTables(ThreadPool & thread_pool, LoadingStrictne
chassert(!TSA_SUPPRESS_WARNING_FOR_READ(tables_metadata_digest));
TSA_SUPPRESS_WARNING_FOR_WRITE(tables_metadata_digest) = digest;
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
if (is_probably_dropped)
return;
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();
}
@ -491,7 +492,7 @@ bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool
LOG_TEST(log, "Current in-memory metadata digest: {}", tables_metadata_digest);
/// Database is probably being dropped
if (!local_context->getZooKeeperMetadataTransaction() && !ddl_worker->isCurrentlyActive())
if (!local_context->getZooKeeperMetadataTransaction() && (!ddl_worker || !ddl_worker->isCurrentlyActive()))
return true;
UInt64 local_digest = 0;
@ -1019,8 +1020,51 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
return ast;
}
void DatabaseReplicated::dropReplica(
DatabaseReplicated * database, const String & database_zookeeper_path, const String & full_replica_name)
{
assert(!database || database_zookeeper_path == database->zookeeper_path);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name: {}", full_replica_name);
auto zookeeper = Context::getGlobalContextInstance()->getZooKeeper();
String database_mark = zookeeper->get(database_zookeeper_path);
if (database_mark != REPLICATED_DATABASE_MARK)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path {} does not look like a path of Replicated database", database_zookeeper_path);
String database_replica_path = fs::path(database_zookeeper_path) / "replicas" / full_replica_name;
if (!zookeeper->exists(database_replica_path))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica {} does not exist (database path: {})",
full_replica_name, database_zookeeper_path);
if (zookeeper->exists(database_replica_path + "/active"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica {} is active, cannot drop it (database path: {})",
full_replica_name, database_zookeeper_path);
zookeeper->set(database_replica_path, DROPPED_MARK, -1);
/// Notify other replicas that cluster configuration was changed (if we can)
if (database)
database->createEmptyLogEntry(zookeeper);
zookeeper->tryRemoveRecursive(database_replica_path);
if (zookeeper->tryRemove(database_zookeeper_path + "/replicas") == Coordination::Error::ZOK)
{
/// It was the last replica, remove all metadata
zookeeper->tryRemoveRecursive(database_zookeeper_path);
}
}
void DatabaseReplicated::drop(ContextPtr context_)
{
if (is_probably_dropped)
{
/// Don't need to drop anything from ZooKeeper
DatabaseAtomic::drop(context_);
return;
}
auto current_zookeeper = getZooKeeper();
current_zookeeper->set(replica_path, DROPPED_MARK, -1);
createEmptyLogEntry(current_zookeeper);
@ -1038,8 +1082,6 @@ void DatabaseReplicated::drop(ContextPtr context_)
void DatabaseReplicated::stopReplication()
{
if (is_probably_dropped)
return;
if (ddl_worker)
ddl_worker->shutdown();
}
@ -1055,7 +1097,7 @@ void DatabaseReplicated::shutdown()
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
if (txn && txn->isInitialQuery() && !txn->isCreateOrReplaceQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);

View File

@ -77,6 +77,8 @@ public:
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
static void dropReplica(DatabaseReplicated * database, const String & database_zookeeper_path, const String & full_replica_name);
friend struct DatabaseReplicatedTask;
friend class DatabaseReplicatedDDLWorker;
private:

View File

@ -2,6 +2,7 @@
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DDLTask.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/ServerUUID.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -36,6 +37,13 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
auto zookeeper = getAndSetZooKeeper();
if (database->is_readonly)
database->tryConnectToZooKeeperAndInitDatabase(LoadingStrictnessLevel::ATTACH);
if (database->is_probably_dropped)
{
/// The flag was set in tryConnectToZooKeeperAndInitDatabase
LOG_WARNING(log, "Exiting main thread, because the database was probably dropped");
/// NOTE It will not stop cleanup thread until DDLWorker::shutdown() call (cleanup thread will just do nothing)
break;
}
initializeReplication();
initialized = true;
return true;
@ -62,6 +70,16 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
/// Invariant: replica is lost if it's log_ptr value is less then max_log_ptr - logs_to_keep.
auto zookeeper = getAndSetZooKeeper();
/// Create "active" node (remove previous one if necessary)
String active_path = fs::path(database->replica_path) / "active";
String active_id = toString(ServerUUID::get());
zookeeper->handleEphemeralNodeExistence(active_path, active_id);
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
active_node_holder.reset();
active_node_holder_zookeeper = zookeeper;
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
String log_ptr_str = zookeeper->get(database->replica_path + "/log_ptr");
UInt32 our_log_ptr = parse<UInt32>(log_ptr_str);
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));

View File

@ -1,5 +1,6 @@
#pragma once
#include <Interpreters/DDLWorker.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
@ -49,6 +50,12 @@ private:
String current_task;
std::atomic<UInt32> logs_to_keep = std::numeric_limits<UInt32>::max();
/// EphemeralNodeHolder has reference to ZooKeeper, it may become dangling
ZooKeeperPtr active_node_holder_zookeeper;
/// It will remove "active" node when database is detached
zkutil::EphemeralNodeHolderPtr active_node_holder;
};
}

View File

@ -145,13 +145,9 @@ MongoDBDictionarySource::MongoDBDictionarySource(
connection->connect(host, port);
if (!user.empty())
{
#if POCO_VERSION >= 0x01070800
Poco::MongoDB::Database poco_db(db);
if (!poco_db.authenticate(*connection, user, password, method.empty() ? Poco::MongoDB::Database::AUTH_SCRAM_SHA1 : method))
throw Exception(ErrorCodes::MONGODB_CANNOT_AUTHENTICATE, "Cannot authenticate in MongoDB, incorrect user or password");
#else
authenticate(*connection, db, user, password);
#endif
}
}
}

View File

@ -1,4 +1,5 @@
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include "Common/Exception.h"
#if USE_AZURE_BLOB_STORAGE
@ -176,7 +177,9 @@ void AzureObjectStorage::removeObject(const StoredObject & object)
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
throw Exception(
ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file (path: {}) in AzureBlob Storage, reason: {}",
path, delete_info.RawResponse ? delete_info.RawResponse->GetReasonPhrase() : "Unknown");
}
void AzureObjectStorage::removeObjects(const StoredObjects & objects)
@ -187,22 +190,50 @@ void AzureObjectStorage::removeObjects(const StoredObjects & objects)
LOG_TEST(log, "Removing object: {} (total: {})", object.absolute_path, objects.size());
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", object.absolute_path);
throw Exception(
ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file (path: {}) in AzureBlob Storage, reason: {}",
object.absolute_path, delete_info.RawResponse ? delete_info.RawResponse->GetReasonPhrase() : "Unknown");
}
}
void AzureObjectStorage::removeObjectIfExists(const StoredObject & object)
{
auto client_ptr = client.get();
try
{
LOG_TEST(log, "Removing single object: {}", object.absolute_path);
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
}
catch (const Azure::Storage::StorageException & e)
{
/// If object doesn't exist...
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
return;
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
auto client_ptr = client.get();
for (const auto & object : objects)
{
try
{
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
}
catch (const Azure::Storage::StorageException & e)
{
/// If object doesn't exist...
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
return;
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
}
ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) const

View File

@ -133,8 +133,13 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
void finalize() override
{
/// The client for an object storage may do retries internally
/// and there could be a situation when a query succeeded, but the response is lost
/// due to network error or similar. And when it will retry an operation it may receive
/// a 404 HTTP code. We don't want to threat this code as a real error for deletion process
/// (e.g. throwing some exceptions) and thus we just use method `removeObjectsIfExists`
if (!delete_metadata_only && !objects_to_remove.empty())
object_storage.removeObjects(objects_to_remove);
object_storage.removeObjectsIfExist(objects_to_remove);
}
};
@ -213,8 +218,10 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperati
void finalize() override
{
/// Read comment inside RemoveObjectStorageOperation class
/// TL;DR Don't pay any attention to 404 status code
if (!objects_to_remove.empty())
object_storage.removeObjects(objects_to_remove);
object_storage.removeObjectsIfExist(objects_to_remove);
}
};
@ -307,7 +314,9 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
remove_from_remote.insert(remove_from_remote.end(), remote_paths.begin(), remote_paths.end());
}
}
object_storage.removeObjects(remove_from_remote);
/// Read comment inside RemoveObjectStorageOperation class
/// TL;DR Don't pay any attention to 404 status code
object_storage.removeObjectsIfExist(remove_from_remote);
}
}
};
@ -352,8 +361,10 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
void finalize() override
{
/// Read comment inside RemoveObjectStorageOperation class
/// TL;DR Don't pay any attention to 404 status code
if (!objects_to_remove.empty())
object_storage.removeObjects(objects_to_remove);
object_storage.removeObjectsIfExist(objects_to_remove);
}
};

View File

@ -227,7 +227,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if ((create.storage->engine->name == "MaterializeMySQL" || create.storage->engine->name == "MaterializedMySQL")
&& !getContext()->getSettingsRef().allow_experimental_database_materialized_mysql
&& !internal)
&& !internal && !create.attach)
{
throw Exception("MaterializedMySQL is an experimental database engine. "
"Enable allow_experimental_database_materialized_mysql to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
@ -235,7 +235,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (create.storage->engine->name == "Replicated"
&& !getContext()->getSettingsRef().allow_experimental_database_replicated
&& !internal)
&& !internal && !create.attach)
{
throw Exception("Replicated is an experimental database engine. "
"Enable allow_experimental_database_replicated to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
@ -243,7 +243,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (create.storage->engine->name == "MaterializedPostgreSQL"
&& !getContext()->getSettingsRef().allow_experimental_database_materialized_postgresql
&& !internal)
&& !internal && !create.attach)
{
throw Exception("MaterializedPostgreSQL is an experimental database engine. "
"Enable allow_experimental_database_materialized_postgresql to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);

View File

@ -483,6 +483,9 @@ BlockIO InterpreterSystemQuery::execute()
case Type::DROP_REPLICA:
dropReplica(query);
break;
case Type::DROP_DATABASE_REPLICA:
dropDatabaseReplica(query);
break;
case Type::SYNC_REPLICA:
syncReplica(query);
break;
@ -781,6 +784,75 @@ bool InterpreterSystemQuery::dropReplicaImpl(ASTSystemQuery & query, const Stora
return true;
}
void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
{
if (query.replica.empty())
throw Exception("Replica name is empty", ErrorCodes::BAD_ARGUMENTS);
auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query)
{
if (!query.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query.replica_zk_path))
return;
if (replicated->getFullReplicaName() != query.replica)
return;
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "There is a local database {}, which has the same path in ZooKeeper "
"and the same replica name. Please check the path in query. "
"If you want to drop replica of this database, use `DROP DATABASE`", replicated->getDatabaseName());
};
if (query.database)
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase());
DatabasePtr database = DatabaseCatalog::instance().getDatabase(query.getDatabase());
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get()))
{
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.replica);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database {} is not Replicated, cannot drop replica", query.getDatabase());
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
else if (query.is_drop_whole_replica)
{
auto databases = DatabaseCatalog::instance().getDatabases();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);
for (auto & elem : databases)
{
DatabasePtr & database = elem.second;
auto * replicated = dynamic_cast<DatabaseReplicated *>(database.get());
if (!replicated)
continue;
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_DROP_REPLICA, elem.first))
{
LOG_INFO(log, "Access {} denied, skipping database {}", "SYSTEM DROP REPLICA", elem.first);
continue;
}
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(replicated, replicated->getZooKeeperPath(), query.replica);
LOG_TRACE(log, "Dropped replica {} of Replicated database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}
else if (!query.replica_zk_path.empty())
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
/// This check is actually redundant, but it may prevent from some user mistakes
for (auto & elem : DatabaseCatalog::instance().getDatabases())
if (auto * replicated = dynamic_cast<DatabaseReplicated *>(elem.second.get()))
check_not_local_replica(replicated, query);
DatabaseReplicated::dropReplica(nullptr, query.replica_zk_path, query.replica);
LOG_INFO(log, "Dropped replica {} of Replicated database with path {}", query.replica, query.replica_zk_path);
}
else
throw Exception("Invalid query", ErrorCodes::LOGICAL_ERROR);
}
void InterpreterSystemQuery::syncReplica(ASTSystemQuery &)
{
getContext()->checkAccess(AccessType::SYSTEM_SYNC_REPLICA, table_id);
@ -981,6 +1053,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
break;
}
case Type::DROP_REPLICA:
case Type::DROP_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_REPLICA, query.getDatabase(), query.getTable());
break;

View File

@ -66,6 +66,7 @@ private:
void dropReplica(ASTSystemQuery & query);
bool dropReplicaImpl(ASTSystemQuery & query, const StoragePtr & table);
void dropDatabaseReplica(ASTSystemQuery & query);
void flushDistributed(ASTSystemQuery & query);
void restartDisk(String & name);

View File

@ -185,7 +185,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
{
print_identifier(database->as<ASTIdentifier>()->name());
}
else if (type == Type::DROP_REPLICA)
else if (type == Type::DROP_REPLICA || type == Type::DROP_DATABASE_REPLICA)
{
print_drop_replica();
}

View File

@ -36,6 +36,7 @@ public:
RESTART_REPLICA,
RESTORE_REPLICA,
DROP_REPLICA,
DROP_DATABASE_REPLICA,
SYNC_REPLICA,
SYNC_DATABASE_REPLICA,
SYNC_TRANSACTION_LOG,

View File

@ -150,6 +150,49 @@ enum class SystemQueryTargetType
return true;
}
[[nodiscard]] static bool parseDropReplica(std::shared_ptr<ASTSystemQuery> & res, IParser::Pos & pos, Expected & expected, bool database)
{
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->replica = ast->as<ASTLiteral &>().value.safeGet<String>();
if (ParserKeyword{"FROM"}.ignore(pos, expected))
{
// way 1. parse replica database
// way 2. parse replica table
// way 3. parse replica zkpath
if (ParserKeyword{"DATABASE"}.ignore(pos, expected))
{
ParserIdentifier database_parser;
if (!database_parser.parse(pos, res->database, expected))
return false;
}
else if (!database && ParserKeyword{"TABLE"}.ignore(pos, expected))
{
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
}
else if (ParserKeyword{"ZKPATH"}.ignore(pos, expected))
{
ASTPtr path_ast;
if (!ParserStringLiteral{}.parse(pos, path_ast, expected))
return false;
String zk_path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
if (!zk_path.empty() && zk_path[zk_path.size() - 1] == '/')
zk_path.pop_back();
res->replica_zk_path = zk_path;
}
else
return false;
}
else
res->is_drop_whole_replica = true;
return true;
}
bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
if (!ParserKeyword{"SYSTEM"}.ignore(pos, expected))
@ -194,46 +237,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
return false;
break;
}
case Type::DROP_REPLICA:
{
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
res->replica = ast->as<ASTLiteral &>().value.safeGet<String>();
if (ParserKeyword{"FROM"}.ignore(pos, expected))
{
// way 1. parse replica database
// way 2. parse replica tables
// way 3. parse replica zkpath
if (ParserKeyword{"DATABASE"}.ignore(pos, expected))
{
ParserIdentifier database_parser;
if (!database_parser.parse(pos, res->database, expected))
if (!parseDropReplica(res, pos, expected, /* database */ false))
return false;
break;
}
else if (ParserKeyword{"TABLE"}.ignore(pos, expected))
case Type::DROP_DATABASE_REPLICA:
{
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
}
else if (ParserKeyword{"ZKPATH"}.ignore(pos, expected))
{
ASTPtr path_ast;
if (!ParserStringLiteral{}.parse(pos, path_ast, expected))
if (!parseDropReplica(res, pos, expected, /* database */ true))
return false;
String zk_path = path_ast->as<ASTLiteral &>().value.safeGet<String>();
if (!zk_path.empty() && zk_path[zk_path.size() - 1] == '/')
zk_path.pop_back();
res->replica_zk_path = zk_path;
}
else
return false;
}
else
res->is_drop_whole_replica = true;
break;
}

View File

@ -71,28 +71,22 @@ static bool pollFd(int fd, size_t timeout_milliseconds, int events)
pfd.events = events;
pfd.revents = 0;
Stopwatch watch;
int res;
while (true)
{
Stopwatch watch;
res = poll(&pfd, 1, static_cast<int>(timeout_milliseconds));
if (res < 0)
{
if (errno == EINTR)
{
watch.stop();
timeout_milliseconds -= watch.elapsedMilliseconds();
watch.start();
continue;
}
else
{
if (errno != EINTR)
throwFromErrno("Cannot poll", ErrorCodes::CANNOT_POLL);
}
const auto elapsed = watch.elapsedMilliseconds();
if (timeout_milliseconds <= elapsed)
break;
timeout_milliseconds -= elapsed;
}
else
{
@ -474,7 +468,7 @@ Pipe ShellCommandSourceCoordinator::createPipe(
std::unique_ptr<ShellCommand> process;
std::unique_ptr<ShellCommandHolder> process_holder;
auto destructor_strategy = ShellCommand::DestructorStrategy{true /*terminate_in_destructor*/, configuration.command_termination_timeout_seconds};
auto destructor_strategy = ShellCommand::DestructorStrategy{true /*terminate_in_destructor*/, SIGTERM, configuration.command_termination_timeout_seconds};
command_config.terminate_in_destructor_strategy = destructor_strategy;
bool is_executable_pool = (process_pool != nullptr);

View File

@ -29,111 +29,11 @@ namespace DB
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int MONGODB_CANNOT_AUTHENTICATE;
extern const int UNKNOWN_TYPE;
extern const int MONGODB_ERROR;
}
#if POCO_VERSION < 0x01070800
/// See https://pocoproject.org/forum/viewtopic.php?f=10&t=6326&p=11426&hilit=mongodb+auth#p11485
void authenticate(Poco::MongoDB::Connection & connection, const std::string & database, const std::string & user, const std::string & password)
{
Poco::MongoDB::Database db(database);
/// Challenge-response authentication.
std::string nonce;
/// First step: request nonce.
{
auto command = db.createCommand();
command->setNumberToReturn(1);
command->selector().add<Int32>("getnonce", 1);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*command, response);
if (response.documents().empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned empty response for 'getnonce' command",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
auto doc = response.documents()[0];
try
{
double ok = doc->get<double>("ok", 0);
if (ok != 1)
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
" has field 'ok' missing or having wrong value",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
nonce = doc->get<std::string>("nonce", "");
if (nonce.empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that"
" has field 'nonce' missing or empty",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
catch (Poco::NotFoundException & e)
{
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'getnonce' command that has missing required field: "
+ e.displayText(),
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
}
/// Second step: use nonce to calculate digest and send it back to the server.
/// Digest is hex_md5(n.nonce + username + hex_md5(username + ":mongo:" + password))
{
std::string first = user + ":mongo:" + password;
Poco::MD5Engine md5;
md5.update(first);
std::string digest_first(Poco::DigestEngine::digestToHex(md5.digest()));
std::string second = nonce + user + digest_first;
md5.reset();
md5.update(second);
std::string digest_second(Poco::DigestEngine::digestToHex(md5.digest()));
auto command = db.createCommand();
command->setNumberToReturn(1);
command->selector()
.add<Int32>("authenticate", 1)
.add<std::string>("user", user)
.add<std::string>("nonce", nonce)
.add<std::string>("key", digest_second);
Poco::MongoDB::ResponseMessage response;
connection.sendRequest(*command, response);
if (response.empty())
throw Exception(
"Cannot authenticate in MongoDB: server returned empty response for 'authenticate' command",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
auto doc = response.documents()[0];
try
{
double ok = doc->get<double>("ok", 0);
if (ok != 1)
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that"
" has field 'ok' missing or having wrong value",
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
catch (Poco::NotFoundException & e)
{
throw Exception(
"Cannot authenticate in MongoDB: server returned response for 'authenticate' command that has missing required field: "
+ e.displayText(),
ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
}
}
#endif
std::unique_ptr<Poco::MongoDB::Cursor> createCursor(const std::string & database, const std::string & collection, const Block & sample_block_to_select)
{
auto cursor = std::make_unique<Poco::MongoDB::Cursor>(database, collection);

View File

@ -1836,6 +1836,9 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector &
void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts)
{
if (parts.empty())
return;
{
auto lock = lockParts();
@ -1852,12 +1855,12 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
(*it)->assertState({DataPartState::Deleting});
LOG_DEBUG(log, "Finally removing part from memory {}", part->name);
data_parts_indexes.erase(it);
}
}
LOG_DEBUG(log, "Removing {} parts from memory: Parts: [{}]", parts.size(), fmt::join(parts, ", "));
/// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
/// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
@ -1910,12 +1913,13 @@ void MergeTreeData::flushAllInMemoryPartsIfNeeded()
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);
if (parts_to_remove.empty())
return 0;
clearPartsFromFilesystem(parts_to_remove);
removePartsFinally(parts_to_remove);
/// This is needed to close files to avoid they reside on disk after being deleted.
/// NOTE: we can drop files from cache more selectively but this is good enough.
if (!parts_to_remove.empty())
getContext()->dropMMappedFileCache();
return parts_to_remove.size();
@ -1980,7 +1984,8 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
ThreadPool pool(num_threads);
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
LOG_DEBUG(log, "Removing {} parts from filesystem: {} (concurrently)", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
LOG_DEBUG(
log, "Removing {} parts from filesystem (concurrently): Parts: [{}]", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
for (const DataPartPtr & part : parts_to_remove)
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
@ -2005,7 +2010,8 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
}
else if (!parts_to_remove.empty())
{
LOG_DEBUG(log, "Removing {} parts from filesystem: {}", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
LOG_DEBUG(
log, "Removing {} parts from filesystem (serially): Parts: [{}]", parts_to_remove.size(), fmt::join(parts_to_remove, ", "));
for (const DataPartPtr & part : parts_to_remove)
{
preparePartForRemoval(part)->remove();

View File

@ -72,16 +72,14 @@ void StorageMongoDB::connectIfNotConnected()
auto auth_db = database_name;
if (auth_source != query_params.end())
auth_db = auth_source->second;
#if POCO_VERSION >= 0x01070800
if (!username.empty() && !password.empty())
{
Poco::MongoDB::Database poco_db(auth_db);
if (!poco_db.authenticate(*connection, username, password, Poco::MongoDB::Database::AUTH_SCRAM_SHA1))
throw Exception("Cannot authenticate in MongoDB, incorrect user or password", ErrorCodes::MONGODB_CANNOT_AUTHENTICATE);
}
#else
authenticate(*connection, database_name, username, password);
#endif
authenticated = true;
}
}
@ -213,7 +211,6 @@ StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, C
if (engine_args.size() >= 6)
configuration.options = checkAndGetLiteralArgument<String>(engine_args[5], "database");
}
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));

View File

@ -1010,10 +1010,14 @@ class TestCase:
seconds_left = max(
args.timeout - (datetime.now() - start_time).total_seconds(), 20
)
try:
drop_database_query = "DROP DATABASE " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
try:
# It's possible to get an error "New table appeared in database being dropped or detached. Try again."
for _ in range(1, 60):
try:
clickhouse_execute(
args,
drop_database_query,
@ -1022,6 +1026,12 @@ class TestCase:
"log_comment": args.testcase_basename,
},
)
except HTTPError as e:
if need_retry(args, e.message, e.message, 0):
continue
raise
break
except socket.timeout:
total_time = (datetime.now() - start_time).total_seconds()
return (

View File

@ -46,6 +46,22 @@ def get_genuine_zk():
return cluster.get_kazoo_client("zoo1")
# FIXME: this sleep is a workaround for the bug that is fixed by this patch [1].
#
# The problem is that after AUTH_FAILED (that is caused by the line above)
# there can be a race, because of which, stop() will hang indefinitely.
#
# [1]: https://github.com/python-zk/kazoo/pull/688
def zk_auth_failure_workaround():
time.sleep(2)
def zk_stop_and_close(zk):
if zk:
zk.stop()
zk.close()
@pytest.mark.parametrize(("get_zk"), [get_genuine_zk, get_fake_zk])
def test_remove_acl(started_cluster, get_zk):
auth_connection = None
@ -118,13 +134,12 @@ def test_remove_acl(started_cluster, get_zk):
assert acl.acl_list == ["ALL"]
assert acl.perms == 31
finally:
if auth_connection:
auth_connection.stop()
auth_connection.close()
zk_stop_and_close(auth_connection)
@pytest.mark.parametrize(("get_zk"), [get_genuine_zk, get_fake_zk])
def test_digest_auth_basic(started_cluster, get_zk):
try:
auth_connection = None
no_auth_connection = None
@ -163,14 +178,8 @@ def test_digest_auth_basic(started_cluster, get_zk):
with pytest.raises(AuthFailedError):
no_auth_connection.add_auth("world", "anyone")
# FIXME: this sleep is a workaround for the bug that is fixed by this patch [1].
#
# The problem is that after AUTH_FAILED (that is caused by the line above)
# there can be a race, because of which, stop() will hang indefinitely.
#
# [1]: https://github.com/python-zk/kazoo/pull/688
time.sleep(5)
no_auth_connection.stop()
zk_auth_failure_workaround()
zk_stop_and_close(no_auth_connection)
# session became broken, reconnect
no_auth_connection = get_zk()
@ -197,32 +206,30 @@ def test_digest_auth_basic(started_cluster, get_zk):
for path in ["/test_no_acl", "/test_all_acl"]:
no_auth_connection.set(path, b"auth_added")
assert no_auth_connection.get(path)[0] == b"auth_added"
if auth_connection:
auth_connection.stop()
auth_connection.close()
if no_auth_connection:
no_auth_connection.stop()
no_auth_connection.close()
finally:
zk_stop_and_close(auth_connection)
zk_stop_and_close(no_auth_connection)
def test_super_auth(started_cluster):
auth_connection = get_fake_zk()
try:
auth_connection.add_auth("digest", "user1:password1")
auth_connection.create("/test_super_no_acl", b"")
auth_connection.create(
"/test_super_all_acl", b"data", acl=[make_acl("auth", "", all=True)]
)
finally:
zk_stop_and_close(auth_connection)
super_connection = get_fake_zk()
try:
super_connection.add_auth("digest", "super:admin")
for path in ["/test_super_no_acl", "/test_super_all_acl"]:
super_connection.set(path, b"value")
assert super_connection.get(path)[0] == b"value"
finally:
zk_stop_and_close(super_connection)
@pytest.mark.parametrize(("get_zk"), [get_genuine_zk, get_fake_zk])
@ -254,24 +261,15 @@ def test_digest_auth_multiple(started_cluster, get_zk):
assert other_auth_connection.get("/test_multi_all_acl")[0] == b"Y"
finally:
if auth_connection:
auth_connection.stop()
auth_connection.close()
if one_auth_connection:
auth_connection.stop()
auth_connection.close()
if other_auth_connection:
auth_connection.stop()
auth_connection.close()
zk_stop_and_close(auth_connection)
zk_stop_and_close(one_auth_connection)
zk_stop_and_close(other_auth_connection)
@pytest.mark.parametrize(("get_zk"), [get_genuine_zk, get_fake_zk])
def test_partial_auth(started_cluster, get_zk):
auth_connection = None
try:
auth_connection = get_zk()
try:
auth_connection.add_auth("digest", "user1:password1")
auth_connection.create(
@ -372,52 +370,81 @@ def test_partial_auth(started_cluster, get_zk):
with pytest.raises(NoAuthError):
auth_connection.delete("/test_partial_acl_delete/subnode")
finally:
if auth_connection:
auth_connection.stop()
auth_connection.close()
zk_stop_and_close(auth_connection)
def test_bad_auth(started_cluster):
def test_bad_auth_1(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
auth_connection.add_auth("world", "anyone")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_2(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 1")
auth_connection.add_auth("adssagf", "user1:password1")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_3(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 2")
auth_connection.add_auth("digest", "")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_4(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 3")
auth_connection.add_auth("", "user1:password1")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_5(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 4")
auth_connection.add_auth("digest", "user1")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_6(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 5")
auth_connection.add_auth("digest", "user1:password:otherpassword")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_7(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 6")
auth_connection.add_auth("auth", "user1:password")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_8(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(AuthFailedError):
print("Sending 7")
auth_connection.add_auth("world", "somebody")
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_9(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(InvalidACLError):
print("Sending 8")
@ -436,7 +463,11 @@ def test_bad_auth(started_cluster):
)
],
)
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_10(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(InvalidACLError):
print("Sending 9")
@ -455,7 +486,11 @@ def test_bad_auth(started_cluster):
)
],
)
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_11(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(InvalidACLError):
print("Sending 10")
@ -468,7 +503,11 @@ def test_bad_auth(started_cluster):
)
],
)
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_12(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(InvalidACLError):
print("Sending 11")
@ -487,7 +526,11 @@ def test_bad_auth(started_cluster):
)
],
)
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_bad_auth_13(started_cluster):
auth_connection = get_fake_zk()
with pytest.raises(InvalidACLError):
print("Sending 12")
@ -506,9 +549,16 @@ def test_bad_auth(started_cluster):
)
],
)
zk_auth_failure_workaround()
zk_stop_and_close(auth_connection)
def test_auth_snapshot(started_cluster):
connection = None
connection1 = None
connection2 = None
try:
connection = get_fake_zk()
connection.add_auth("digest", "user1:password1")
@ -529,11 +579,14 @@ def test_auth_snapshot(started_cluster):
for i in range(100):
connection.create(
f"/test_snapshot_acl/path{i}", b"data", acl=[make_acl("auth", "", all=True)]
f"/test_snapshot_acl/path{i}",
b"data",
acl=[make_acl("auth", "", all=True)],
)
node.restart_clickhouse()
zk_stop_and_close(connection)
connection = get_fake_zk()
with pytest.raises(NoAuthError):
@ -551,6 +604,7 @@ def test_auth_snapshot(started_cluster):
for i in range(100):
assert connection.get(f"/test_snapshot_acl/path{i}")[0] == b"data"
zk_stop_and_close(connection1)
connection1 = get_fake_zk()
connection1.add_auth("digest", "user2:password2")
@ -559,6 +613,7 @@ def test_auth_snapshot(started_cluster):
with pytest.raises(NoAuthError):
connection1.get("/test_snapshot_acl")
zk_stop_and_close(connection2)
connection2 = get_fake_zk()
assert connection2.get("/test_snapshot_acl2")[0] == b"data"
with pytest.raises(NoAuthError):
@ -566,6 +621,10 @@ def test_auth_snapshot(started_cluster):
with pytest.raises(NoAuthError):
connection2.get("/test_snapshot_acl1")
finally:
zk_stop_and_close(connection)
zk_stop_and_close(connection1)
zk_stop_and_close(connection2)
@pytest.mark.parametrize(("get_zk"), [get_genuine_zk, get_fake_zk])
@ -630,10 +689,5 @@ def test_get_set_acl(started_cluster, get_zk):
"/test_set_get_acl", acls=[make_acl("auth", "", all=True)], version=0
)
finally:
if auth_connection:
auth_connection.stop()
auth_connection.close()
if other_auth_connection:
other_auth_connection.stop()
other_auth_connection.close()
zk_stop_and_close(auth_connection)
zk_stop_and_close(other_auth_connection)

View File

@ -331,8 +331,10 @@ def test_bridge_dies_with_parent(ch_cluster):
)
logging.debug(f"Bridge is running, gdb output:\n{out}")
try:
assert clickhouse_pid is None
assert bridge_pid is None
finally:
instance.start_clickhouse(20)
instance.query("DROP DICTIONARY lib_dict_c")

View File

@ -75,6 +75,7 @@ def test_mysql_ddl_for_mysql_database(started_cluster):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')"
)
@ -122,11 +123,13 @@ def test_clickhouse_ddl_for_mysql_database(started_cluster):
"root", "clickhouse", started_cluster.mysql_ip, started_cluster.mysql_port
)
) as mysql_node:
mysql_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query(
"CREATE TABLE `test_database`.`test_table` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;"
)
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')"
)
@ -157,10 +160,13 @@ def test_clickhouse_dml_for_mysql_database(started_cluster):
"root", "clickhouse", started_cluster.mysql_ip, started_cluster.mysql_port
)
) as mysql_node:
mysql_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query(
"CREATE TABLE `test_database`.`test_table` ( `i``d` int(11) NOT NULL, PRIMARY KEY (`i``d`)) ENGINE=InnoDB;"
)
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', test_database, 'root', 'clickhouse')"
)
@ -193,9 +199,8 @@ def test_clickhouse_join_for_mysql_database(started_cluster):
"root", "clickhouse", started_cluster.mysql_ip, started_cluster.mysql_port
)
) as mysql_node:
mysql_node.query(
"CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'"
)
mysql_node.query("DROP DATABASE IF EXISTS test")
mysql_node.query("CREATE DATABASE test DEFAULT CHARACTER SET 'utf8'")
mysql_node.query(
"CREATE TABLE test.t1_mysql_local ("
"pays VARCHAR(55) DEFAULT 'FRA' NOT NULL,"
@ -209,6 +214,8 @@ def test_clickhouse_join_for_mysql_database(started_cluster):
"opco VARCHAR(5) DEFAULT ''"
")"
)
clickhouse_node.query("DROP TABLE IF EXISTS default.t1_remote_mysql SYNC")
clickhouse_node.query("DROP TABLE IF EXISTS default.t2_remote_mysql SYNC")
clickhouse_node.query(
"CREATE TABLE default.t1_remote_mysql AS mysql('mysql57:3306','test','t1_mysql_local','root','clickhouse')"
)
@ -266,6 +273,7 @@ def test_column_comments_for_mysql_database_engine(started_cluster):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', 'test_database', 'root', 'clickhouse')"
)
@ -298,9 +306,11 @@ def test_data_types_support_level_for_mysql_database_engine(started_cluster):
"root", "clickhouse", started_cluster.mysql_ip, started_cluster.mysql_port
)
) as mysql_node:
mysql_node.query("DROP DATABASE IF EXISTS test")
mysql_node.query(
"CREATE DATABASE IF NOT EXISTS test DEFAULT CHARACTER SET 'utf8'"
)
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MySQL('mysql57:3306', test, 'root', 'clickhouse')",
settings={"mysql_datatypes_support_level": "decimal,datetime64"},

View File

@ -659,8 +659,10 @@ def test_bridge_dies_with_parent(started_cluster):
)
logging.debug(f"Bridge is running, gdb output:\n{out}")
try:
assert clickhouse_pid is None
assert bridge_pid is None
finally:
node1.start_clickhouse(20)

View File

@ -88,3 +88,9 @@ test_with_engine Log
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t2"
# It is not enough to kill the commands running the queries, we also have to kill the queries, the server might be still running
# to avoid the following error:
# Code: 219. DB::Exception: New table appeared in database being dropped or detached. Try again. (DATABASE_NOT_EMPTY)
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE current_database = currentDatabase() SYNC FORMAT Null"

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash
# Tags: no-debug
# Tags: no-debug, no-tsan, no-msan, no-ubsan, no-asan
# ^ because inserting a 50 MB file can be slow.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -126,4 +127,3 @@ DROP TABLE commits;
DROP TABLE file_changes;
DROP TABLE line_changes;
"

View File

@ -0,0 +1,15 @@
t
1
2
2
2
2
2
2
rdb_default 1 1
rdb_default 1 2
2
2
2
t
rdb_default_3 1 1

View File

@ -0,0 +1,47 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
db="rdb_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT -q "system flush logs"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.t as system.query_log" # Suppress style check: current_database=$CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT -q "show tables from $db"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from table t" 2>&1| grep -Fac "SYNTAX_ERROR"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db" 2>&1| grep -Fac "There is a local database"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from zkpath '/test/$CLICKHOUSE_DATABASE/rdb'" 2>&1| grep -Fac "There is a local database"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from zkpath '/test/$CLICKHOUSE_DATABASE/rdb/'" 2>&1| grep -Fac "There is a local database"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from zkpath '/test/$CLICKHOUSE_DATABASE/'" 2>&1| grep -Fac "does not look like a path of Replicated database"
$CLICKHOUSE_CLIENT -q "system drop database replica 's2|r1' from zkpath '/test/$CLICKHOUSE_DATABASE/rdb'" 2>&1| grep -Fac "does not exist"
$CLICKHOUSE_CLIENT -q "system drop database replica 's2/r1' from zkpath '/test/$CLICKHOUSE_DATABASE/rdb'" 2>&1| grep -Fac "Invalid replica name"
db2="${db}_2"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db2 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r2')"
$CLICKHOUSE_CLIENT -q "system sync database replica $db"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num from system.clusters where cluster='$db' order by shard_num, replica_num"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"
$CLICKHOUSE_CLIENT -q "detach database $db2"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r2' from database $db"
$CLICKHOUSE_CLIENT -q "attach database $db2" 2>/dev/null
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db2.t2 as system.query_log" 2>&1| grep -Fac "Database is in readonly mode" # Suppress style check: current_database=$CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT -q "detach database $db"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from zkpath '/test/$CLICKHOUSE_DATABASE/rdb/'"
$CLICKHOUSE_CLIENT -q "attach database $db" 2>/dev/null
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=none -q "create table $db.t2 as system.query_log" 2>&1| grep -Fac "Database is in readonly mode" # Suppress style check: current_database=$CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT -q "show tables from $db"
db3="${db}_3"
$CLICKHOUSE_CLIENT --allow_experimental_database_replicated=1 -q "create database $db3 engine=Replicated('/test/$CLICKHOUSE_DATABASE/rdb', 's1', 'r1')"
$CLICKHOUSE_CLIENT -q "system sync database replica $db3"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num from system.clusters where cluster='$db3'"
$CLICKHOUSE_CLIENT -q "drop database $db"
$CLICKHOUSE_CLIENT -q "drop database $db2"
$CLICKHOUSE_CLIENT -q "drop database $db3"

View File

@ -0,0 +1 @@
2148-06-07 1969-01-01 2105 2105 1969-01-01 10:42:00.000

View File

@ -0,0 +1,5 @@
CREATE TEMPORARY TABLE my_table (col_date Date, col_date32 Date32, col_datetime DateTime('UTC'), col_datetime32 DateTime32('UTC'), col_datetime64 DateTime64);
insert into `my_table` (`col_date`, `col_date32`, `col_datetime`, `col_datetime32`, `col_datetime64`) values (parseDateTime64BestEffort('1969-01-01'), '1969-01-01', parseDateTime64BestEffort('1969-01-01 10:42:00'), parseDateTime64BestEffort('1969-01-01 10:42:00'), parseDateTime64BestEffort('1969-01-01 10:42:00'));
-- The values for Date32 and DateTime64 will be year 1969, while the values of Date, DateTime will contain a value affected by implementation-defined overflow and can be arbitrary.
SELECT col_date, col_date32, toYear(col_datetime), toYear(col_datetime32), col_datetime64 FROM my_table;