diff --git a/docker/test/integration/runner/Dockerfile b/docker/test/integration/runner/Dockerfile index 38d8ed5f223..d6c127c8421 100644 --- a/docker/test/integration/runner/Dockerfile +++ b/docker/test/integration/runner/Dockerfile @@ -47,11 +47,13 @@ ENV TZ=Etc/UTC RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone ENV DOCKER_CHANNEL stable +# Unpin the docker version after the release 24.0.3 is released +# https://github.com/moby/moby/issues/45770#issuecomment-1618255130 RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \ && add-apt-repository "deb https://download.docker.com/linux/ubuntu $(lsb_release -c -s) ${DOCKER_CHANNEL}" \ && apt-get update \ && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \ - docker-ce \ + docker-ce='5:23.*' \ && rm -rf \ /var/lib/apt/lists/* \ /var/cache/debconf \ diff --git a/docker/test/upgrade/run.sh b/docker/test/upgrade/run.sh index 8fd514eaa93..82a88272df9 100644 --- a/docker/test/upgrade/run.sh +++ b/docker/test/upgrade/run.sh @@ -76,7 +76,8 @@ sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-serv # But we still need default disk because some tables loaded only into it sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml \ | sed "s|
<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" \ - > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml + > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
+mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
diff --git a/docs/en/engines/table-engines/integrations/odbc.md b/docs/en/engines/table-engines/integrations/odbc.md index e29e56c10b2..71085feb626 100644 --- a/docs/en/engines/table-engines/integrations/odbc.md +++ b/docs/en/engines/table-engines/integrations/odbc.md @@ -54,7 +54,7 @@ $ sudo mysql ``` sql mysql> CREATE USER 'clickhouse'@'localhost' IDENTIFIED BY 'clickhouse'; -mysql> GRANT ALL PRIVILEGES ON *.* TO 'clickhouse'@'clickhouse' WITH GRANT OPTION; +mysql> GRANT ALL PRIVILEGES ON *.* TO 'clickhouse'@'localhost' WITH GRANT OPTION; ``` Then configure the connection in `/etc/odbc.ini`. @@ -66,7 +66,7 @@ DRIVER = /usr/local/lib/libmyodbc5w.so SERVER = 127.0.0.1 PORT = 3306 DATABASE = test -USERNAME = clickhouse +USER = clickhouse PASSWORD = clickhouse ``` @@ -83,6 +83,9 @@ $ isql -v mysqlconn Table in MySQL: ``` text +mysql> CREATE DATABASE test; +Query OK, 1 row affected (0,01 sec) + mysql> CREATE TABLE `test`.`test` ( -> `int_id` INT NOT NULL AUTO_INCREMENT, -> `int_nullable` INT NULL DEFAULT NULL, @@ -91,10 +94,10 @@ mysql> CREATE TABLE `test`.`test` ( -> PRIMARY KEY (`int_id`)); Query OK, 0 rows affected (0,09 sec) -mysql> insert into test (`int_id`, `float`) VALUES (1,2); +mysql> insert into test.test (`int_id`, `float`) VALUES (1,2); Query OK, 1 row affected (0,00 sec) -mysql> select * from test; +mysql> select * from test.test; +------+----------+-----+----------+ | int_id | int_nullable | float | float_nullable | +------+----------+-----+----------+
diff --git a/docs/en/operations/system-tables/index.md b/docs/en/operations/system-tables/index.md index 508419783ef..1b720098fc7 100644 --- a/docs/en/operations/system-tables/index.md +++ b/docs/en/operations/system-tables/index.md @@ -13,6 +13,7 @@ System tables provide information about: - Server states, processes, and environment. - Server’s internal processes. +- Options used when the ClickHouse binary was built. 
System tables: diff --git a/docs/zh/sql-reference/functions/functions-for-nulls.md b/docs/zh/sql-reference/functions/functions-for-nulls.md index 4dd30970923..b3dca3ac549 100644 --- a/docs/zh/sql-reference/functions/functions-for-nulls.md +++ b/docs/zh/sql-reference/functions/functions-for-nulls.md @@ -192,7 +192,7 @@ SELECT coalesce(mail, phone, CAST(icq,'Nullable(String)')) FROM aBook **返回值** - 如果`x`不为`NULL`,返回非`Nullable`类型的原始值。 -- 如果`x`为`NULL`,返回对应非`Nullable`类型的默认值。 +- 如果`x`为`NULL`,则返回任意值。 **示例** diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index caca7cfb50d..2afcd48dafb 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -8,7 +8,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -50,6 +52,8 @@ #include #include +#include "config.h" + #if defined(FUZZING_MODE) #include #endif @@ -170,6 +174,13 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str return system_database; } +static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context_) +{ + auto databaseCombiner = std::make_shared(name_, context_); + databaseCombiner->registerNextDatabase(std::make_shared(name_, "", context_)); + databaseCombiner->registerNextDatabase(std::make_shared(name_, context_)); + return databaseCombiner; +} /// If path is specified and not empty, will try to setup server environment and load existing metadata void LocalServer::tryInitPath() @@ -669,7 +680,7 @@ void LocalServer::processConfig() * if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons. */ std::string default_database = config().getString("default_database", "_local"); - DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared(default_database, global_context)); + DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context)); global_context->setCurrentDatabase(default_database); applyCmdOptions(global_context); diff --git a/programs/server/dashboard.html b/programs/server/dashboard.html index 951b7db3aa3..ea818e05e31 100644 --- a/programs/server/dashboard.html +++ b/programs/server/dashboard.html @@ -12,7 +12,8 @@ --chart-background: white; --shadow-color: rgba(0, 0, 0, 0.25); --input-shadow-color: rgba(0, 255, 0, 1); - --error-color: white; + --error-color: red; + --auth-error-color: white; --legend-background: rgba(255, 255, 255, 0.75); --title-color: #666; --text-color: black; @@ -258,7 +259,7 @@ width: 60%; padding: .5rem; - color: var(--error-color); + color: var(--auth-error-color); display: flex; flex-flow: row nowrap; @@ -906,9 +907,9 @@ async function draw(idx, chart, url_params, query) { if (error) { const errorMatch = errorMessages.find(({ regex }) => error.match(regex)) - if (errorMatch) { - const match = error.match(errorMatch.regex) - const message = errorMatch.messageFunc(match) + const match = error.match(errorMatch.regex) + const message = errorMatch.messageFunc(match) + if (message) { const authError = new Error(message) throw authError } @@ -930,7 +931,7 @@ async function draw(idx, chart, url_params, query) { let title_div = chart.querySelector('.title'); if (error) { error_div.firstChild.data = error; - title_div.style.display = 'none'; + title_div.style.display = 'none'; error_div.style.display = 'block'; return false; } else { @@ -1019,13 +1020,15 @@ async function drawAll() { firstLoad = false; } else { 
enableReloadButton(); + enableRunButton(); } - if (!results.includes(false)) { + if (results.includes(true)) { const element = document.querySelector('.inputs'); element.classList.remove('unconnected'); const add = document.querySelector('#add'); add.style.display = 'block'; - } else { + } + else { const charts = document.querySelector('#charts') charts.style.height = '0px'; } @@ -1050,6 +1053,13 @@ function disableReloadButton() { reloadButton.classList.add('disabled') } +function disableRunButton() { + const runButton = document.getElementById('run') + runButton.value = 'Reloading...' + runButton.disabled = true + runButton.classList.add('disabled') +} + function enableReloadButton() { const reloadButton = document.getElementById('reload') reloadButton.value = 'Reload' @@ -1057,11 +1067,19 @@ function enableReloadButton() { reloadButton.classList.remove('disabled') } +function enableRunButton() { + const runButton = document.getElementById('run') + runButton.value = 'Ok' + runButton.disabled = false + runButton.classList.remove('disabled') +} + function reloadAll() { updateParams(); drawAll(); saveState(); - disableReloadButton() + disableReloadButton(); + disableRunButton(); } document.getElementById('params').onsubmit = function(event) { diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 6e989e10f76..1723f85dc16 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -101,9 +101,8 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti add_column("name", "columns", true, suggestion_limit); } - /// FIXME: Forbid this query using new analyzer because of bug https://github.com/ClickHouse/ClickHouse/issues/50669 - /// We should remove this restriction after resolving this bug. - query = "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM (" + query + ") WHERE notEmpty(res) SETTINGS allow_experimental_analyzer=0"; + /// FIXME: This query does not work with the new analyzer because of bug https://github.com/ClickHouse/ClickHouse/issues/50669 + query = "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM (" + query + ") WHERE notEmpty(res)"; return query; } diff --git a/src/Common/OptimizedRegularExpression.cpp b/src/Common/OptimizedRegularExpression.cpp index 5df9ce76098..c542945c78d 100644 --- a/src/Common/OptimizedRegularExpression.cpp +++ b/src/Common/OptimizedRegularExpression.cpp @@ -540,7 +540,7 @@ bool OptimizedRegularExpressionImpl::match(const char * subject, si } } - return re2->Match(StringPieceType(subject, subject_size), 0, subject_size, RegexType::UNANCHORED, nullptr, 0); + return re2->Match({subject, subject_size}, 0, subject_size, RegexType::UNANCHORED, nullptr, 0); } } @@ -585,9 +585,9 @@ bool OptimizedRegularExpressionImpl::match(const char * subject, si return false; } - StringPieceType piece; + std::string_view piece; - if (!RegexType::PartialMatch(StringPieceType(subject, subject_size), *re2, &piece)) + if (!RegexType::PartialMatch({subject, subject_size}, *re2, &piece)) return false; else { @@ -652,10 +652,10 @@ unsigned OptimizedRegularExpressionImpl::match(const char * subject return 0; } - DB::PODArrayWithStackMemory pieces(limit); + DB::PODArrayWithStackMemory pieces(limit); if (!re2->Match( - StringPieceType(subject, subject_size), + {subject, subject_size}, 0, subject_size, RegexType::UNANCHORED, diff --git a/src/Common/OptimizedRegularExpression.h b/src/Common/OptimizedRegularExpression.h index f6b59f0a465..51f1bc200e4 100644 --- 
a/src/Common/OptimizedRegularExpression.h +++ b/src/Common/OptimizedRegularExpression.h @@ -52,7 +52,6 @@ public: using MatchVec = std::vector; using RegexType = std::conditional_t; - using StringPieceType = std::conditional_t; OptimizedRegularExpressionImpl(const std::string & regexp_, int options = 0); /// NOLINT /// StringSearcher store pointers to required_substring, it must be updated on move. diff --git a/src/Common/SensitiveDataMasker.cpp b/src/Common/SensitiveDataMasker.cpp index 34db78d00fb..b59a4758822 100644 --- a/src/Common/SensitiveDataMasker.cpp +++ b/src/Common/SensitiveDataMasker.cpp @@ -5,7 +5,6 @@ #include #include -#include #include @@ -44,7 +43,7 @@ private: const std::string regexp_string; const RE2 regexp; - const re2::StringPiece replacement; + const std::string_view replacement; #ifndef NDEBUG mutable std::atomic matches_count = 0; diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index 9b0743d89c3..7a602afe7e7 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -67,8 +67,8 @@ ThreadGroup::ThreadGroup() : master_thread_id(CurrentThread::get().thread_id) {} -ThreadStatus::ThreadStatus() - : thread_id{getThreadId()} +ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_) + : thread_id{getThreadId()}, check_current_thread_on_destruction(check_current_thread_on_destruction_) { last_rusage = std::make_unique(); @@ -201,8 +201,11 @@ ThreadStatus::~ThreadStatus() /// Only change current_thread if it's currently being used by this ThreadStatus /// For example, PushingToViews chain creates and deletes ThreadStatus instances while running in the main query thread - if (current_thread == this) + if (check_current_thread_on_destruction) + { + assert(current_thread == this); current_thread = nullptr; + } } void ThreadStatus::updatePerformanceCounters() diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 3b6b947471e..7c8dbdb68bd 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -224,8 +224,10 @@ private: Poco::Logger * log = nullptr; + bool check_current_thread_on_destruction; + public: - ThreadStatus(); + explicit ThreadStatus(bool check_current_thread_on_destruction_ = true); ~ThreadStatus(); ThreadGroupPtr getThreadGroup() const; diff --git a/src/Common/parseGlobs.cpp b/src/Common/parseGlobs.cpp index 07cce38afff..33747f6eece 100644 --- a/src/Common/parseGlobs.cpp +++ b/src/Common/parseGlobs.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -33,14 +32,14 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob std::string escaped_with_globs = buf_for_escaping.str(); static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and "," - re2::StringPiece input(escaped_with_globs); - re2::StringPiece matched; + std::string_view input(escaped_with_globs); + std::string_view matched; std::ostringstream oss_for_replacing; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss_for_replacing.exceptions(std::ios::failbit); size_t current_index = 0; while (RE2::FindAndConsume(&input, enum_or_range, &matched)) { - std::string buffer{matched}; + std::string buffer(matched); oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '('; if (buffer.find(',') == std::string::npos) diff --git 
a/src/Compression/CompressionCodecLZ4.cpp b/src/Compression/CompressionCodecLZ4.cpp index a39052f80b7..3dbb6be9a99 100644 --- a/src/Compression/CompressionCodecLZ4.cpp +++ b/src/Compression/CompressionCodecLZ4.cpp @@ -42,7 +42,6 @@ private: UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; mutable LZ4::PerformanceStatistics lz4_stat; - ASTPtr codec_desc; }; diff --git a/src/Core/SettingsQuirks.cpp b/src/Core/SettingsQuirks.cpp index 3326f42adf5..37a0f2db3e2 100644 --- a/src/Core/SettingsQuirks.cpp +++ b/src/Core/SettingsQuirks.cpp @@ -1,10 +1,11 @@ +#include #include #include #include #include #include #include -#include + namespace { @@ -71,6 +72,12 @@ void applySettingsQuirks(Settings & settings, Poco::Logger * log) } } +#if defined(THREAD_SANITIZER) + settings.use_hedged_requests.value = false; + if (log) + LOG_WARNING(log, "use_hedged_requests has been disabled for the build with Thread Sanitizer, because they are using fibers, leading to a failed assertion inside TSan"); +#endif + if (!queryProfilerWorks()) { if (settings.query_profiler_real_time_period_ns) diff --git a/src/Databases/DatabaseFactory.cpp b/src/Databases/DatabaseFactory.cpp index e1c8afa52c0..9d90c61bb41 100644 --- a/src/Databases/DatabaseFactory.cpp +++ b/src/Databases/DatabaseFactory.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,14 @@ #include #endif +#if USE_AWS_S3 +#include +#endif + +#if USE_HDFS +#include +#endif + namespace fs = std::filesystem; namespace DB @@ -131,13 +140,13 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String static const std::unordered_set database_engines{"Ordinary", "Atomic", "Memory", "Dictionary", "Lazy", "Replicated", "MySQL", "MaterializeMySQL", "MaterializedMySQL", - "PostgreSQL", "MaterializedPostgreSQL", "SQLite"}; + "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; if (!database_engines.contains(engine_name)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine name `{}` does not exist", engine_name); static const std::unordered_set engines_with_arguments{"MySQL", "MaterializeMySQL", "MaterializedMySQL", - "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite"}; + "Lazy", "Replicated", "PostgreSQL", "MaterializedPostgreSQL", "SQLite", "Filesystem", "S3", "HDFS"}; static const std::unordered_set engines_with_table_overrides{"MaterializeMySQL", "MaterializedMySQL", "MaterializedPostgreSQL"}; bool engine_may_have_arguments = engines_with_arguments.contains(engine_name); @@ -432,6 +441,63 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String } #endif + else if (engine_name == "Filesystem") + { + const ASTFunction * engine = engine_define->engine; + + /// If init_path is empty, then the current path will be used + std::string init_path; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filesystem database requires at most 1 argument: filesystem_path"); + + const auto & arguments = engine->arguments->children; + init_path = safeGetLiteralValue(arguments[0], engine_name); + } + + return std::make_shared(database_name, init_path, context); + } + +#if USE_AWS_S3 + else if (engine_name == "S3") + { + const ASTFunction * engine = engine_define->engine; + + DatabaseS3::Configuration config; + + if (engine->arguments && !engine->arguments->children.empty()) + { + ASTs & engine_args = 
engine->arguments->children; + config = DatabaseS3::parseArguments(engine_args, context); + } + + return std::make_shared(database_name, config, context); + } +#endif + +#if USE_HDFS + else if (engine_name == "HDFS") + { + const ASTFunction * engine = engine_define->engine; + + /// If source_url is empty, then table name must contain full url + std::string source_url; + + if (engine->arguments && !engine->arguments->children.empty()) + { + if (engine->arguments->children.size() != 1) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS database requires at most 1 argument: source_url"); + + const auto & arguments = engine->arguments->children; + source_url = safeGetLiteralValue(arguments[0], engine_name); + } + + return std::make_shared(database_name, source_url, context); + } +#endif + throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", engine_name); } diff --git a/src/Databases/DatabaseFilesystem.cpp b/src/Databases/DatabaseFilesystem.cpp new file mode 100644 index 00000000000..7eaf474eea0 --- /dev/null +++ b/src/Databases/DatabaseFilesystem.cpp @@ -0,0 +1,245 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace fs = std::filesystem; + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int UNKNOWN_TABLE; + extern const int PATH_ACCESS_DENIED; + extern const int BAD_ARGUMENTS; + extern const int FILE_DOESNT_EXIST; +} + +DatabaseFilesystem::DatabaseFilesystem(const String & name_, const String & path_, ContextPtr context_) + : IDatabase(name_), WithContext(context_->getGlobalContext()), path(path_), log(&Poco::Logger::get("DatabaseFileSystem(" + name_ + ")")) +{ + bool is_local = context_->getApplicationType() == Context::ApplicationType::LOCAL; + fs::path user_files_path = is_local ? "" : fs::canonical(getContext()->getUserFilesPath()); + + if (fs::path(path).is_relative()) + { + path = user_files_path / path; + } + else if (!is_local && !pathStartsWith(fs::path(path), user_files_path)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Path must be inside user-files path: {}", user_files_path.string()); + } + + path = fs::absolute(path).lexically_normal(); + if (!fs::exists(path)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path does not exist: {}", path); +} + +std::string DatabaseFilesystem::getTablePath(const std::string & table_name) const +{ + fs::path table_path = fs::path(path) / table_name; + return table_path.lexically_normal().string(); +} + +void DatabaseFilesystem::addTable(const std::string & table_name, StoragePtr table_storage) const +{ + std::lock_guard lock(mutex); + auto [_, inserted] = loaded_tables.emplace(table_name, table_storage); + if (!inserted) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Table with name `{}` already exists in database `{}` (engine {})", + table_name, getDatabaseName(), getEngineName()); +} + +bool DatabaseFilesystem::checkTableFilePath(const std::string & table_path, ContextPtr context_, bool throw_on_error) const +{ + /// If run in Local mode, no need for path checking. + bool check_path = context_->getApplicationType() != Context::ApplicationType::LOCAL; + const auto & user_files_path = context_->getUserFilesPath(); + + /// Check access for file before checking its existence. 
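+    /// (Illustrative note: getTablePath() returns a lexically normalized path, so a
+    /// table name such as "../secret.csv" resolves to a path outside user_files and
+    /// fails the prefix check below instead of escaping the sandbox.)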
+ if (check_path && !fileOrSymlinkPathStartsWith(table_path, user_files_path)) + { + if (throw_on_error) + throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File is not inside {}", user_files_path); + else + return false; + } + + /// Check if the corresponding file exists. + if (!fs::exists(table_path)) + { + if (throw_on_error) + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File does not exist: {}", table_path); + else + return false; + } + + if (!fs::is_regular_file(table_path)) + { + if (throw_on_error) + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, + "File is directory, but expected a file: {}", table_path); + else + return false; + } + + return true; +} + +StoragePtr DatabaseFilesystem::tryGetTableFromCache(const std::string & name) const +{ + StoragePtr table = nullptr; + { + std::lock_guard lock(mutex); + auto it = loaded_tables.find(name); + if (it != loaded_tables.end()) + table = it->second; + } + + /// Invalidate cache if file no longer exists. + if (table && !fs::exists(getTablePath(name))) + { + std::lock_guard lock(mutex); + loaded_tables.erase(name); + return nullptr; + } + + return table; +} + +bool DatabaseFilesystem::isTableExist(const String & name, ContextPtr context_) const +{ + if (tryGetTableFromCache(name)) + return true; + + return checkTableFilePath(getTablePath(name), context_, /* throw_on_error */false); +} + +StoragePtr DatabaseFilesystem::getTableImpl(const String & name, ContextPtr context_) const +{ + /// Check if table exists in loaded tables map. + if (auto table = tryGetTableFromCache(name)) + return table; + + auto table_path = getTablePath(name); + checkTableFilePath(table_path, context_, /* throw_on_error */true); + + /// If the file exists, create a new table using TableFunctionFile and return it. + auto args = makeASTFunction("file", std::make_shared(table_path)); + + auto table_function = TableFunctionFactory::instance().get(args, context_); + if (!table_function) + return nullptr; + + /// TableFunctionFile throws exceptions, if table cannot be created. + auto table_storage = table_function->execute(args, context_, name); + if (table_storage) + addTable(name, table_storage); + + return table_storage; +} + +StoragePtr DatabaseFilesystem::getTable(const String & name, ContextPtr context_) const +{ + /// getTableImpl can throw exceptions, do not catch them to show correct error to user. + if (auto storage = getTableImpl(name, context_)) + return storage; + + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", + backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name)); +} + +StoragePtr DatabaseFilesystem::tryGetTable(const String & name, ContextPtr context_) const +{ + try + { + return getTableImpl(name, context_); + } + catch (const Exception & e) + { + /// Ignore exceptions thrown by TableFunctionFile, which indicate that there is no table + /// see tests/02722_database_filesystem.sh for more details. 
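+        /// (Only the missing-file case is treated as "no such table" here; any other
+        /// error code is rethrown below so that real failures are not hidden.)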
+ if (e.code() == ErrorCodes::FILE_DOESNT_EXIST) + { + return nullptr; + } + throw; + } +} + +bool DatabaseFilesystem::empty() const +{ + std::lock_guard lock(mutex); + return loaded_tables.empty(); +} + +ASTPtr DatabaseFilesystem::getCreateDatabaseQuery() const +{ + const auto & settings = getContext()->getSettingsRef(); + const String query = fmt::format("CREATE DATABASE {} ENGINE = Filesystem('{}')", backQuoteIfNeed(getDatabaseName()), path); + + ParserCreateQuery parser; + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth); + + if (const auto database_comment = getDatabaseComment(); !database_comment.empty()) + { + auto & ast_create_query = ast->as<ASTCreateQuery &>(); + ast_create_query.set(ast_create_query.comment, std::make_shared<ASTLiteral>(database_comment)); + } + + return ast; +} + +void DatabaseFilesystem::shutdown() +{ + Tables tables_snapshot; + { + std::lock_guard lock(mutex); + tables_snapshot = loaded_tables; + } + + for (const auto & kv : tables_snapshot) + { + auto table_id = kv.second->getStorageID(); + kv.second->flushAndShutdown(); + } + + std::lock_guard lock(mutex); + loaded_tables.clear(); +} + +/** + * Returns an empty vector because the database is read-only and no tables can be backed up. + */ +std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseFilesystem::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const +{ + return {}; +} + +/** + * Returns an empty iterator because the database does not have its own tables, + * but only caches them for quick access. + */ +DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, const FilterByNameFunction &) const +{ + return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName()); +} + +}
diff --git a/src/Databases/DatabaseFilesystem.h b/src/Databases/DatabaseFilesystem.h new file mode 100644 index 00000000000..7fe620401dc --- /dev/null +++ b/src/Databases/DatabaseFilesystem.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; + +/** + * DatabaseFilesystem allows interacting with files stored on the local filesystem. + * It uses TableFunctionFile to implicitly load a file when a user requests the table, + * and provides read-only access to the data in the file. + * Tables are cached inside the database for quick access. + * + * Used in clickhouse-local to access local files. + * For clickhouse-server, only files inside the user_files directory can be accessed. + */ +class DatabaseFilesystem : public IDatabase, protected WithContext +{ +public: + DatabaseFilesystem(const String & name, const String & path, ContextPtr context); + + String getEngineName() const override { return "Filesystem"; } + + bool isTableExist(const String & name, ContextPtr context) const override; + + StoragePtr getTable(const String & name, ContextPtr context) const override; + + StoragePtr tryGetTable(const String & name, ContextPtr context) const override; + + bool shouldBeEmptyOnDetach() const override { return false; } /// Contains only temporary tables. 
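+
+    /// Usage sketch (names and paths are illustrative, not taken from this patch;
+    /// see tests/02722_database_filesystem.sh for the real cases):
+    ///     CREATE DATABASE test_fs ENGINE = Filesystem('/var/lib/clickhouse/user_files');
+    ///     SELECT * FROM test_fs.`data.csv`;
+    /// The table name is resolved as a file path relative to the database root.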
+ + bool empty() const override; + + bool isReadOnly() const override { return true; } + + ASTPtr getCreateDatabaseQuery() const override; + + void shutdown() override; + + std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override; + + DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override; + +protected: + StoragePtr getTableImpl(const String & name, ContextPtr context) const; + + StoragePtr tryGetTableFromCache(const std::string & name) const; + + std::string getTablePath(const std::string & table_name) const; + + void addTable(const std::string & table_name, StoragePtr table_storage) const; + + bool checkTableFilePath(const std::string & table_path, ContextPtr context_, bool throw_on_error) const; + +private: + String path; + mutable Tables loaded_tables TSA_GUARDED_BY(mutex); + Poco::Logger * log; +}; + +}
diff --git a/src/Databases/DatabaseHDFS.cpp b/src/Databases/DatabaseHDFS.cpp new file mode 100644 index 00000000000..1a0145b9015 --- /dev/null +++ b/src/Databases/DatabaseHDFS.cpp @@ -0,0 +1,234 @@ +#include "config.h" + +#if USE_HDFS + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace fs = std::filesystem; + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int UNKNOWN_TABLE; + extern const int BAD_ARGUMENTS; + extern const int FILE_DOESNT_EXIST; + extern const int UNACCEPTABLE_URL; + extern const int ACCESS_DENIED; + extern const int DATABASE_ACCESS_DENIED; + extern const int HDFS_ERROR; + extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; +} + +static constexpr std::string_view HDFS_HOST_REGEXP = "^hdfs://[^/]*"; + + +DatabaseHDFS::DatabaseHDFS(const String & name_, const String & source_url, ContextPtr context_) + : IDatabase(name_) + , WithContext(context_->getGlobalContext()) + , source(source_url) + , log(&Poco::Logger::get("DatabaseHDFS(" + name_ + ")")) +{ + if (!source.empty()) + { + if (!re2::RE2::FullMatch(source, std::string(HDFS_HOST_REGEXP))) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs host: {}. " "It should have structure 'hdfs://<host_name>:<port>'", source); + + context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI(source)); + } +} + +void DatabaseHDFS::addTable(const std::string & table_name, StoragePtr table_storage) const +{ + std::lock_guard lock(mutex); + auto [_, inserted] = loaded_tables.emplace(table_name, table_storage); + if (!inserted) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Table with name `{}` already exists in database `{}` (engine {})", + table_name, getDatabaseName(), getEngineName()); +} + +std::string DatabaseHDFS::getTablePath(const std::string & table_name) const +{ + if (table_name.starts_with("hdfs://")) + return table_name; + + if (source.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}. " "It should have structure 'hdfs://<host_name>:<port>/path'", table_name); + + return fs::path(source) / table_name; +} + +bool DatabaseHDFS::checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const +{ + try + { + checkHDFSURL(url); + context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI(url)); + } + catch (...) 
+ { + if (throw_on_error) + throw; + return false; + } + + return true; +} + +bool DatabaseHDFS::isTableExist(const String & name, ContextPtr context_) const +{ + std::lock_guard lock(mutex); + if (loaded_tables.find(name) != loaded_tables.end()) + return true; + + return checkUrl(name, context_, false); +} + +StoragePtr DatabaseHDFS::getTableImpl(const String & name, ContextPtr context_) const +{ + /// Check if the table exists in the loaded tables map. + { + std::lock_guard lock(mutex); + auto it = loaded_tables.find(name); + if (it != loaded_tables.end()) + return it->second; + } + + auto url = getTablePath(name); + + checkUrl(url, context_, true); + + auto args = makeASTFunction("hdfs", std::make_shared(url)); + + auto table_function = TableFunctionFactory::instance().get(args, context_); + if (!table_function) + return nullptr; + + /// TableFunctionHDFS throws exceptions, if table cannot be created. + auto table_storage = table_function->execute(args, context_, name); + if (table_storage) + addTable(name, table_storage); + + return table_storage; +} + +StoragePtr DatabaseHDFS::getTable(const String & name, ContextPtr context_) const +{ + /// Rethrow all exceptions from TableFunctionHDFS to show correct error to user. + if (auto storage = getTableImpl(name, context_)) + return storage; + + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", + backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name)); +} + +StoragePtr DatabaseHDFS::tryGetTable(const String & name, ContextPtr context_) const +{ + try + { + return getTableImpl(name, context_); + } + catch (const Exception & e) + { + // Ignore exceptions thrown by TableFunctionHDFS, which indicate that there is no table + if (e.code() == ErrorCodes::BAD_ARGUMENTS + || e.code() == ErrorCodes::ACCESS_DENIED + || e.code() == ErrorCodes::DATABASE_ACCESS_DENIED + || e.code() == ErrorCodes::FILE_DOESNT_EXIST + || e.code() == ErrorCodes::UNACCEPTABLE_URL + || e.code() == ErrorCodes::HDFS_ERROR + || e.code() == ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE) + { + return nullptr; + } + throw; + } + catch (const Poco::URISyntaxException &) + { + return nullptr; + } +} + +bool DatabaseHDFS::empty() const +{ + std::lock_guard lock(mutex); + return loaded_tables.empty(); +} + +ASTPtr DatabaseHDFS::getCreateDatabaseQuery() const +{ + const auto & settings = getContext()->getSettingsRef(); + ParserCreateQuery parser; + + const String query = fmt::format("CREATE DATABASE {} ENGINE = HDFS('{}')", backQuoteIfNeed(getDatabaseName()), source); + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth); + + if (const auto database_comment = getDatabaseComment(); !database_comment.empty()) + { + auto & ast_create_query = ast->as(); + ast_create_query.set(ast_create_query.comment, std::make_shared(database_comment)); + } + + return ast; +} + +void DatabaseHDFS::shutdown() +{ + Tables tables_snapshot; + { + std::lock_guard lock(mutex); + tables_snapshot = loaded_tables; + } + + for (const auto & kv : tables_snapshot) + { + auto table_id = kv.second->getStorageID(); + kv.second->flushAndShutdown(); + } + + std::lock_guard lock(mutex); + loaded_tables.clear(); +} + +/** + * Returns an empty vector because the database is read-only and no tables can be backed up + */ +std::vector> DatabaseHDFS::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const +{ + return {}; +} + +/** + * + * Returns an empty iterator because the database does not have its own tables + * But only caches 
them for quick access. + */ +DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const FilterByNameFunction &) const +{ + return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName()); +} + +} // DB + +#endif
diff --git a/src/Databases/DatabaseHDFS.h b/src/Databases/DatabaseHDFS.h new file mode 100644 index 00000000000..957b2080135 --- /dev/null +++ b/src/Databases/DatabaseHDFS.h @@ -0,0 +1,68 @@ +#pragma once + +#include "config.h" + +#if USE_HDFS + +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; + +/** + * DatabaseHDFS allows interacting with files stored in HDFS. + * It uses TableFunctionHDFS to implicitly load a file when a user requests the table, + * and provides read-only access to the data in the file. + * Tables are cached inside the database for quick access. + */ +class DatabaseHDFS : public IDatabase, protected WithContext +{ +public: + DatabaseHDFS(const String & name, const String & source_url, ContextPtr context); + + String getEngineName() const override { return "HDFS"; } + + bool isTableExist(const String & name, ContextPtr context) const override; + + StoragePtr getTable(const String & name, ContextPtr context) const override; + + StoragePtr tryGetTable(const String & name, ContextPtr context) const override; + + bool shouldBeEmptyOnDetach() const override { return false; } /// Contains only temporary tables. + + bool empty() const override; + + bool isReadOnly() const override { return true; } + + ASTPtr getCreateDatabaseQuery() const override; + + void shutdown() override; + + std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override; + DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override; + +protected: + StoragePtr getTableImpl(const String & name, ContextPtr context) const; + + void addTable(const std::string & table_name, StoragePtr table_storage) const; + + bool checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const; + + std::string getTablePath(const std::string & table_name) const; + +private: + const String source; + + mutable Tables loaded_tables TSA_GUARDED_BY(mutex); + Poco::Logger * log; +}; + +} + +#endif
diff --git a/src/Databases/DatabaseS3.cpp b/src/Databases/DatabaseS3.cpp new file mode 100644 index 00000000000..11655f5f100 --- /dev/null +++ b/src/Databases/DatabaseS3.cpp @@ -0,0 +1,312 @@ +#include "config.h" + +#if USE_AWS_S3 + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace fs = std::filesystem; + +namespace DB +{ + +static const std::unordered_set optional_configuration_keys = { + "url", + "access_key_id", + "secret_access_key", + "no_sign_request" +}; + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int UNKNOWN_TABLE; + extern const int BAD_ARGUMENTS; + extern const int FILE_DOESNT_EXIST; + extern const int UNACCEPTABLE_URL; + extern const int S3_ERROR; + + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +DatabaseS3::DatabaseS3(const String & name_, const Configuration& config_, ContextPtr context_) + : IDatabase(name_) + , WithContext(context_->getGlobalContext()) + , config(config_) + , log(&Poco::Logger::get("DatabaseS3(" + name_ + ")")) +{ +} + +void DatabaseS3::addTable(const std::string & table_name, StoragePtr table_storage) const +{ + std::lock_guard lock(mutex); + auto [_, inserted] = loaded_tables.emplace(table_name, 
table_storage); + if (!inserted) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Table with name `{}` already exists in database `{}` (engine {})", + table_name, getDatabaseName(), getEngineName()); +} + +std::string DatabaseS3::getFullUrl(const std::string & name) const +{ + if (!config.url_prefix.empty()) + return fs::path(config.url_prefix) / name; + + return name; +} + +bool DatabaseS3::checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const +{ + try + { + S3::URI uri(url); + context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri.uri); + } + catch (...) + { + if (throw_on_error) + throw; + return false; + } + return true; +} + +bool DatabaseS3::isTableExist(const String & name, ContextPtr context_) const +{ + std::lock_guard lock(mutex); + if (loaded_tables.find(name) != loaded_tables.end()) + return true; + + return checkUrl(getFullUrl(name), context_, false); +} + +StoragePtr DatabaseS3::getTableImpl(const String & name, ContextPtr context_) const +{ + /// Check if the table exists in the loaded tables map. + { + std::lock_guard lock(mutex); + auto it = loaded_tables.find(name); + if (it != loaded_tables.end()) + return it->second; + } + + auto url = getFullUrl(name); + checkUrl(url, context_, /* throw_on_error */true); + + auto function = std::make_shared(); + function->name = "s3"; + function->arguments = std::make_shared(); + function->children.push_back(function->arguments); + + function->arguments->children.push_back(std::make_shared(url)); + if (config.no_sign_request) + { + function->arguments->children.push_back(std::make_shared("NOSIGN")); + } + else if (config.access_key_id.has_value() && config.secret_access_key.has_value()) + { + function->arguments->children.push_back(std::make_shared(config.access_key_id.value())); + function->arguments->children.push_back(std::make_shared(config.secret_access_key.value())); + } + + auto table_function = TableFunctionFactory::instance().get(function, context_); + if (!table_function) + return nullptr; + + /// TableFunctionS3 throws exceptions, if table cannot be created. + auto table_storage = table_function->execute(function, context_, name); + if (table_storage) + addTable(name, table_storage); + + return table_storage; +} + +StoragePtr DatabaseS3::getTable(const String & name, ContextPtr context_) const +{ + /// Rethrow all exceptions from TableFunctionS3 to show correct error to user. + if (auto storage = getTableImpl(name, context_)) + return storage; + + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", + backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name)); +} + +StoragePtr DatabaseS3::tryGetTable(const String & name, ContextPtr context_) const +{ + try + { + return getTableImpl(name, context_); + } + catch (const Exception & e) + { + /// Ignore exceptions thrown by TableFunctionS3, which indicate that there is no table. 
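+        /// (Codes outside the list below are rethrown, so genuine S3 failures are not
+        /// silently reported as a missing table.)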
+ if (e.code() == ErrorCodes::BAD_ARGUMENTS + || e.code() == ErrorCodes::S3_ERROR + || e.code() == ErrorCodes::FILE_DOESNT_EXIST + || e.code() == ErrorCodes::UNACCEPTABLE_URL) + { + return nullptr; + } + throw; + } + catch (const Poco::URISyntaxException &) + { + return nullptr; + } +} + +bool DatabaseS3::empty() const +{ + std::lock_guard lock(mutex); + return loaded_tables.empty(); +} + +ASTPtr DatabaseS3::getCreateDatabaseQuery() const +{ + const auto & settings = getContext()->getSettingsRef(); + ParserCreateQuery parser; + + std::string creation_args; + creation_args += fmt::format("'{}'", config.url_prefix); + if (config.no_sign_request) + creation_args += ", 'NOSIGN'"; + else if (config.access_key_id.has_value() && config.secret_access_key.has_value()) + creation_args += fmt::format(", '{}', '{}'", config.access_key_id.value(), config.secret_access_key.value()); + + const String query = fmt::format("CREATE DATABASE {} ENGINE = S3({})", backQuoteIfNeed(getDatabaseName()), creation_args); + ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth); + + if (const auto database_comment = getDatabaseComment(); !database_comment.empty()) + { + auto & ast_create_query = ast->as(); + ast_create_query.set(ast_create_query.comment, std::make_shared(database_comment)); + } + + return ast; +} + +void DatabaseS3::shutdown() +{ + Tables tables_snapshot; + { + std::lock_guard lock(mutex); + tables_snapshot = loaded_tables; + } + + for (const auto & kv : tables_snapshot) + { + auto table_id = kv.second->getStorageID(); + kv.second->flushAndShutdown(); + } + + std::lock_guard lock(mutex); + loaded_tables.clear(); +} + +DatabaseS3::Configuration DatabaseS3::parseArguments(ASTs engine_args, ContextPtr context_) +{ + Configuration result; + + if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, context_)) + { + auto & collection = *named_collection; + + validateNamedCollection(collection, {}, optional_configuration_keys); + + result.url_prefix = collection.getOrDefault("url", ""); + result.no_sign_request = collection.getOrDefault("no_sign_request", false); + + auto key_id = collection.getOrDefault("access_key_id", ""); + auto secret_key = collection.getOrDefault("secret_access_key", ""); + + if (!key_id.empty()) + result.access_key_id = key_id; + + if (!secret_key.empty()) + result.secret_access_key = secret_key; + } + else + { + const std::string supported_signature = + " - S3()\n" + " - S3('url')\n" + " - S3('url', 'NOSIGN')\n" + " - S3('url', 'access_key_id', 'secret_access_key')\n"; + const auto error_message = + fmt::format("Engine DatabaseS3 must have the following arguments signature\n{}", supported_signature); + + for (auto & arg : engine_args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context_); + + if (engine_args.size() > 3) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message.c_str()); + + if (engine_args.empty()) + return result; + + result.url_prefix = checkAndGetLiteralArgument(engine_args[0], "url"); + + // url, NOSIGN + if (engine_args.size() == 2) + { + auto second_arg = checkAndGetLiteralArgument(engine_args[1], "NOSIGN"); + if (boost::iequals(second_arg, "NOSIGN")) + result.no_sign_request = true; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, error_message.c_str()); + } + + // url, access_key_id, secret_access_key + if (engine_args.size() == 3) + { + auto key_id = checkAndGetLiteralArgument(engine_args[1], "access_key_id"); + auto secret_key = 
checkAndGetLiteralArgument(engine_args[2], "secret_access_key"); + + if (key_id.empty() || secret_key.empty() || boost::iequals(key_id, "NOSIGN")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, error_message.c_str()); + + result.access_key_id = key_id; + result.secret_access_key = secret_key; + } + } + + return result; +} + +/** + * Returns an empty vector because the database is read-only and no tables can be backed up + */ +std::vector> DatabaseS3::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const +{ + return {}; +} + +/** + * + * Returns an empty iterator because the database does not have its own tables + * But only caches them for quick access + */ +DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const FilterByNameFunction &) const +{ + return std::make_unique(Tables{}, getDatabaseName()); +} + +} + +#endif diff --git a/src/Databases/DatabaseS3.h b/src/Databases/DatabaseS3.h new file mode 100644 index 00000000000..8297ae4e02d --- /dev/null +++ b/src/Databases/DatabaseS3.h @@ -0,0 +1,81 @@ +#pragma once + +#include "config.h" + +#if USE_AWS_S3 + +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; + +/** + * DatabaseS3 provides access to data stored in S3. + * Uses TableFunctionS3 to implicitly load file when a user requests the table, + * and provides read-only access to the data in the file. + * Tables are cached inside the database for quick access. + */ +class DatabaseS3 : public IDatabase, protected WithContext +{ +public: + struct Configuration + { + std::string url_prefix; + + bool no_sign_request = false; + + std::optional access_key_id; + std::optional secret_access_key; + }; + + DatabaseS3(const String & name, const Configuration& config, ContextPtr context); + + String getEngineName() const override { return "S3"; } + + bool isTableExist(const String & name, ContextPtr context) const override; + + StoragePtr getTable(const String & name, ContextPtr context) const override; + + StoragePtr tryGetTable(const String & name, ContextPtr context) const override; + + // Contains only temporary tables + bool shouldBeEmptyOnDetach() const override { return false; } + + bool empty() const override; + + bool isReadOnly() const override { return true; } + + ASTPtr getCreateDatabaseQuery() const override; + + void shutdown() override; + + std::vector> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override; + DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override; + + static Configuration parseArguments(ASTs engine_args, ContextPtr context); + +protected: + StoragePtr getTableImpl(const String & name, ContextPtr context) const; + + void addTable(const std::string & table_name, StoragePtr table_storage) const; + + bool checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const; + + std::string getFullUrl(const std::string & name) const; + +private: + const Configuration config; + + mutable Tables loaded_tables TSA_GUARDED_BY(mutex); + Poco::Logger * log; +}; + +} + +#endif diff --git a/src/Databases/DatabasesOverlay.cpp b/src/Databases/DatabasesOverlay.cpp new file mode 100644 index 00000000000..b44a9798072 --- /dev/null +++ b/src/Databases/DatabasesOverlay.cpp @@ -0,0 +1,266 @@ +#include + +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int CANNOT_GET_CREATE_TABLE_QUERY; +} + +DatabasesOverlay::DatabasesOverlay(const 
String & name_, ContextPtr context_) + : IDatabase(name_), WithContext(context_->getGlobalContext()), log(&Poco::Logger::get("DatabaseOverlay(" + name_ + ")")) +{ +} + +DatabasesOverlay & DatabasesOverlay::registerNextDatabase(DatabasePtr database) +{ + databases.push_back(std::move(database)); + return *this; +} + +bool DatabasesOverlay::isTableExist(const String & table_name, ContextPtr context_) const +{ + for (const auto & db : databases) + { + if (db->isTableExist(table_name, context_)) + return true; + } + return false; +} + +StoragePtr DatabasesOverlay::tryGetTable(const String & table_name, ContextPtr context_) const +{ + StoragePtr result = nullptr; + for (const auto & db : databases) + { + result = db->tryGetTable(table_name, context_); + if (result) + break; + } + return result; +} + +void DatabasesOverlay::createTable(ContextPtr context_, const String & table_name, const StoragePtr & table, const ASTPtr & query) +{ + for (auto & db : databases) + { + if (!db->isReadOnly()) + { + db->createTable(context_, table_name, table, query); + return; + } + } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "There is no databases for CREATE TABLE `{}` query in database `{}` (engine {})", + table_name, + getDatabaseName(), + getEngineName()); +} + +void DatabasesOverlay::dropTable(ContextPtr context_, const String & table_name, bool sync) +{ + for (auto & db : databases) + { + if (db->isTableExist(table_name, context_)) + { + db->dropTable(context_, table_name, sync); + return; + } + } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "There is no databases for DROP TABLE `{}` query in database `{}` (engine {})", + table_name, + getDatabaseName(), + getEngineName()); +} + +void DatabasesOverlay::attachTable( + ContextPtr context_, const String & table_name, const StoragePtr & table, const String & relative_table_path) +{ + for (auto & db : databases) + { + try + { + db->attachTable(context_, table_name, table, relative_table_path); + return; + } + catch (...) 
{ + continue; + } + } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "There is no databases for ATTACH TABLE `{}` query in database `{}` (engine {})", + table_name, + getDatabaseName(), + getEngineName()); +} + +StoragePtr DatabasesOverlay::detachTable(ContextPtr context_, const String & table_name) +{ + StoragePtr result = nullptr; + for (auto & db : databases) + { + if (db->isTableExist(table_name, context_)) + return db->detachTable(context_, table_name); + } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "There is no databases for DETACH TABLE `{}` query in database `{}` (engine {})", + table_name, + getDatabaseName(), + getEngineName()); +} + +ASTPtr DatabasesOverlay::getCreateTableQueryImpl(const String & name, ContextPtr context_, bool throw_on_error) const +{ + ASTPtr result = nullptr; + for (const auto & db : databases) + { + result = db->tryGetCreateTableQuery(name, context_); + if (result) + break; + } + if (!result && throw_on_error) + throw Exception( + ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, + "There is no metadata of table `{}` in database `{}` (engine {})", + name, + getDatabaseName(), + getEngineName()); + return result; +} + +/* + * DatabaseOverlay cannot be constructed by a "CREATE DATABASE" query, as it is not a traditional ClickHouse database. + * To use DatabaseOverlay, it must be constructed programmatically in code. + */ +ASTPtr DatabasesOverlay::getCreateDatabaseQuery() const +{ + return std::make_shared<ASTCreateQuery>(); +} + +String DatabasesOverlay::getTableDataPath(const String & table_name) const +{ + String result; + for (const auto & db : databases) + { + result = db->getTableDataPath(table_name); + if (!result.empty()) + break; + } + return result; +} + +String DatabasesOverlay::getTableDataPath(const ASTCreateQuery & query) const +{ + String result; + for (const auto & db : databases) + { + result = db->getTableDataPath(query); + if (!result.empty()) + break; + } + return result; +} + +UUID DatabasesOverlay::tryGetTableUUID(const String & table_name) const +{ + UUID result = UUIDHelpers::Nil; + for (const auto & db : databases) + { + result = db->tryGetTableUUID(table_name); + if (result != UUIDHelpers::Nil) + break; + } + return result; +} + +void DatabasesOverlay::drop(ContextPtr context_) +{ + for (auto & db : databases) + db->drop(context_); +} + +void DatabasesOverlay::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) +{ + for (auto & db : databases) + { + if (!db->isReadOnly() && db->isTableExist(table_id.table_name, local_context)) + { + db->alterTable(local_context, table_id, metadata); + return; + } + } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "There is no databases for ALTER TABLE `{}` query in database `{}` (engine {})", + table_id.table_name, + getDatabaseName(), + getEngineName()); +} + +std::vector<std::pair<ASTPtr, StoragePtr>> +DatabasesOverlay::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const +{ + std::vector<std::pair<ASTPtr, StoragePtr>> result; + for (const auto & db : databases) + { + auto db_backup = db->getTablesForBackup(filter, local_context); + result.insert(result.end(), std::make_move_iterator(db_backup.begin()), std::make_move_iterator(db_backup.end())); + } + return result; +} + +void DatabasesOverlay::createTableRestoredFromBackup( + const ASTPtr & create_table_query, + ContextMutablePtr local_context, + std::shared_ptr<IRestoreCoordination> /*restore_coordination*/, + UInt64 /*timeout_ms*/) +{ + /// Creates the table by executing a "CREATE TABLE" query. 
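+    /// (The interpreter dispatches back into DatabasesOverlay::createTable above, so the
+    /// restored table should end up in the first writable database of the overlay.)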
+ InterpreterCreateQuery interpreter{create_table_query, local_context}; + interpreter.setInternal(true); + interpreter.execute(); +} + +bool DatabasesOverlay::empty() const +{ + for (const auto & db : databases) + { + if (!db->empty()) + return false; + } + return true; +} + +void DatabasesOverlay::shutdown() +{ + for (auto & db : databases) + db->shutdown(); +} + +DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name) const +{ + Tables tables; + for (const auto & db : databases) + { + for (auto table_it = db->getTablesIterator(context_, filter_by_table_name); table_it->isValid(); table_it->next()) + tables.insert({table_it->name(), table_it->table()}); + } + return std::make_unique(std::move(tables), getDatabaseName()); +} + +} diff --git a/src/Databases/DatabasesOverlay.h b/src/Databases/DatabasesOverlay.h new file mode 100644 index 00000000000..0f31bbd6a47 --- /dev/null +++ b/src/Databases/DatabasesOverlay.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/** + * Implements the IDatabase interface and combines multiple other databases + * Searches for tables in each database in order until found, and delegates operations to the appropriate database + * Useful for combining databases + * + * Used in clickhouse-local to combine DatabaseFileSystem and DatabaseMemory + */ +class DatabasesOverlay : public IDatabase, protected WithContext +{ +public: + DatabasesOverlay(const String & name_, ContextPtr context_); + + /// Not thread-safe. Use only as factory to initialize database + DatabasesOverlay & registerNextDatabase(DatabasePtr database); + + String getEngineName() const override { return "Overlay"; } + +public: + bool isTableExist(const String & table_name, ContextPtr context) const override; + + StoragePtr tryGetTable(const String & table_name, ContextPtr context) const override; + + void createTable(ContextPtr context, const String & table_name, const StoragePtr & table, const ASTPtr & query) override; + + void dropTable(ContextPtr context, const String & table_name, bool sync) override; + + void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override; + + StoragePtr detachTable(ContextPtr context, const String & table_name) override; + + ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override; + ASTPtr getCreateDatabaseQuery() const override; + + String getTableDataPath(const String & table_name) const override; + String getTableDataPath(const ASTCreateQuery & query) const override; + + UUID tryGetTableUUID(const String & table_name) const override; + + void drop(ContextPtr context) override; + + void alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override; + + std::vector> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override; + + void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr restore_coordination, UInt64 timeout_ms) override; + + DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override; + + bool empty() const override; + + void shutdown() override; + +protected: + std::vector databases; + Poco::Logger * log; +}; + +} diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index 
aadae3e2491..a9577dfc84a 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -170,7 +170,7 @@ public: /// Get the table for work. Return nullptr if there is no table. virtual StoragePtr tryGetTable(const String & name, ContextPtr context) const = 0; - StoragePtr getTable(const String & name, ContextPtr context) const; + virtual StoragePtr getTable(const String & name, ContextPtr context) const; virtual UUID tryGetTableUUID(const String & /*table_name*/) const { return UUIDHelpers::Nil; } @@ -183,6 +183,8 @@ public: /// Is the database empty. virtual bool empty() const = 0; + virtual bool isReadOnly() const { return false; } + /// Add the table to the database. Record its presence in the metadata. virtual void createTable( ContextPtr /*context*/, diff --git a/src/Dictionaries/RegExpTreeDictionary.cpp b/src/Dictionaries/RegExpTreeDictionary.cpp index 074b179c48e..a9846dc06e9 100644 --- a/src/Dictionaries/RegExpTreeDictionary.cpp +++ b/src/Dictionaries/RegExpTreeDictionary.cpp @@ -30,8 +30,6 @@ #include #include -#include - #include "config.h" #if USE_VECTORSCAN @@ -469,10 +467,9 @@ public: std::pair processBackRefs(const String & data, const re2_st::RE2 & searcher, const std::vector & pieces) { - re2_st::StringPiece haystack(data.data(), data.size()); - re2_st::StringPiece matches[10]; + std::string_view matches[10]; String result; - searcher.Match(haystack, 0, data.size(), re2_st::RE2::Anchor::UNANCHORED, matches, 10); + searcher.Match({data.data(), data.size()}, 0, data.size(), re2_st::RE2::Anchor::UNANCHORED, matches, 10); /// if the pattern is a single '$1' but fails to match, we would use the default value. if (pieces.size() == 1 && pieces[0].ref_num >= 0 && pieces[0].ref_num < 10 && matches[pieces[0].ref_num].empty()) return std::make_pair(result, true); diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 81aa29639ac..15b6a9211de 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -74,19 +74,22 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile( } void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog( - const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type) + const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type) { if (!cache_log) return; + const auto range = file_segment.range(); FilesystemCacheLogElement elem { .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), .query_id = query_id, .source_file_path = source_file_path, - .file_segment_range = { file_segment_range.left, file_segment_range.right }, + .file_segment_range = { range.left, range.right }, .requested_range = { first_offset, read_until_position }, - .file_segment_size = file_segment_range.size(), + .file_segment_key = file_segment.key().toString(), + .file_segment_offset = file_segment.offset(), + .file_segment_size = range.size(), .read_from_cache_attempted = true, .read_buffer_id = current_buffer_id, .profile_counters = std::make_shared( @@ -495,7 +498,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext() auto completed_range = current_file_segment->range(); if (cache_log) - appendFilesystemCacheLog(completed_range, read_type); + appendFilesystemCacheLog(*current_file_segment, read_type); chassert(file_offset_of_buffer_end > completed_range.right); @@ -518,7 +521,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile() 
{ if (cache_log && file_segments && !file_segments->empty()) { - appendFilesystemCacheLog(file_segments->front().range(), read_type); + appendFilesystemCacheLog(file_segments->front(), read_type); } } diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h index b4e7701de75..36cf8a54183 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h @@ -90,7 +90,7 @@ private: bool completeFileSegmentAndGetNext(); - void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type); + void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type); bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment); diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 16c1def7b11..2cd90731f1d 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -90,6 +90,8 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo() .source_file_path = current_object.remote_path, .file_segment_range = { 0, current_object.bytes_size }, .cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE, + .file_segment_key = {}, + .file_segment_offset = {}, .file_segment_size = current_object.bytes_size, .read_from_cache_attempted = false, }; diff --git a/src/Functions/ReplaceRegexpImpl.h b/src/Functions/ReplaceRegexpImpl.h index 7e3af1e62d9..9395489dac3 100644 --- a/src/Functions/ReplaceRegexpImpl.h +++ b/src/Functions/ReplaceRegexpImpl.h @@ -99,8 +99,8 @@ struct ReplaceRegexpImpl int num_captures, const Instructions & instructions) { - re2_st::StringPiece haystack(haystack_data, haystack_length); - re2_st::StringPiece matches[max_captures]; + std::string_view haystack(haystack_data, haystack_length); + std::string_view matches[max_captures]; size_t copy_pos = 0; size_t match_pos = 0; diff --git a/src/Functions/checkHyperscanRegexp.cpp b/src/Functions/checkHyperscanRegexp.cpp index 441e35cc5db..0dd4c5740c3 100644 --- a/src/Functions/checkHyperscanRegexp.cpp +++ b/src/Functions/checkHyperscanRegexp.cpp @@ -45,8 +45,8 @@ bool isLargerThanFifty(std::string_view str) /// Check for sub-patterns of the form x{n} or x{n,} can be expensive. Ignore spaces before/after n and m. bool SlowWithHyperscanChecker::isSlowOneRepeat(std::string_view regexp) { - re2_st::StringPiece haystack(regexp.data(), regexp.size()); - re2_st::StringPiece matches[2]; + std::string_view haystack(regexp.data(), regexp.size()); + std::string_view matches[2]; size_t start_pos = 0; while (start_pos < haystack.size()) { @@ -67,8 +67,8 @@ bool SlowWithHyperscanChecker::isSlowOneRepeat(std::string_view regexp) /// Check if sub-patterns of the form x{n,m} can be expensive. Ignore spaces before/after n and m. 
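Every `StringPiece` hunk in this patch follows the same mechanical recipe: `re2_st::StringPiece` buffers become `std::string_view`, and explicit `StringPiece` constructions become brace-initialized views. A minimal sketch of the post-migration call shape, assuming a re2 build whose `StringPiece` is an alias for `std::string_view` (as in the vendored `re2_st` after this change); the function and its use of the first capture group are illustrative only:

``` cpp
#include <re2/re2.h>
#include <string_view>

/// Returns the first capturing group of `regexp` inside `haystack`, or an empty view.
/// matches[0] is the whole match, matches[1] the first group -- the same layout
/// the processBackRefs and ReplaceRegexpImpl hunks rely on.
std::string_view firstCaptureGroup(std::string_view haystack, const re2::RE2 & regexp)
{
    std::string_view matches[2];
    if (!regexp.Match({haystack.data(), haystack.size()}, 0, haystack.size(),
                      re2::RE2::UNANCHORED, matches, 2))
        return {};
    return matches[1];
}
```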
bool SlowWithHyperscanChecker::isSlowTwoRepeats(std::string_view regexp)
 {
-    re2_st::StringPiece haystack(regexp.data(), regexp.size());
-    re2_st::StringPiece matches[3];
+    std::string_view haystack(regexp.data(), regexp.size());
+    std::string_view matches[3];
     size_t start_pos = 0;
     while (start_pos < haystack.size())
     {
diff --git a/src/Functions/extractAllGroups.h b/src/Functions/extractAllGroups.h
index faee25aa0ab..3a7987be93e 100644
--- a/src/Functions/extractAllGroups.h
+++ b/src/Functions/extractAllGroups.h
@@ -94,7 +94,6 @@ public:
         if (needle.empty())
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Length of 'needle' argument must be greater than 0.");
 
-        using StringPiece = typename Regexps::Regexp::StringPieceType;
         const Regexps::Regexp holder = Regexps::createRegexp(needle);
         const auto & regexp = holder.getRE2();
 
@@ -111,7 +110,7 @@ public:
                 groups_count, std::to_string(MAX_GROUPS_COUNT - 1));
 
         // Including 0-group, which is the whole regexp.
-        PODArrayWithStackMemory<StringPiece, 128> matched_groups(groups_count + 1);
+        PODArrayWithStackMemory<std::string_view, 128> matched_groups(groups_count + 1);
 
         ColumnArray::ColumnOffsets::MutablePtr root_offsets_col = ColumnArray::ColumnOffsets::create();
         ColumnArray::ColumnOffsets::MutablePtr nested_offsets_col = ColumnArray::ColumnOffsets::create();
@@ -160,7 +159,7 @@ public:
         /// Additional limit to fail fast on supposedly incorrect usage.
         const auto max_matches_per_row = context->getSettingsRef().regexp_max_matches_per_row;
 
-        PODArray<StringPiece, 0> all_matches;
+        PODArray<std::string_view, 0> all_matches;
         /// Number of times RE matched on each row of haystack column.
         PODArray<size_t, 0> number_of_matches_per_row;
diff --git a/src/Functions/extractGroups.cpp b/src/Functions/extractGroups.cpp
index 6744edda922..21b8a68fc10 100644
--- a/src/Functions/extractGroups.cpp
+++ b/src/Functions/extractGroups.cpp
@@ -75,7 +75,7 @@ public:
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "There are no groups in regexp: {}", needle);
 
         // Including 0-group, which is the whole regexp.
-        PODArrayWithStackMemory<re2_st::StringPiece, 128> matched_groups(groups_count + 1);
+        PODArrayWithStackMemory<std::string_view, 128> matched_groups(groups_count + 1);
 
         ColumnArray::ColumnOffsets::MutablePtr offsets_col = ColumnArray::ColumnOffsets::create();
         ColumnString::MutablePtr data_col = ColumnString::create();
@@ -89,7 +89,7 @@ public:
         {
             std::string_view current_row = column_haystack->getDataAt(i).toView();
 
-            if (re2->Match(re2_st::StringPiece(current_row.data(), current_row.size()),
+            if (re2->Match({current_row.data(), current_row.size()},
                            0, current_row.size(), re2_st::RE2::UNANCHORED, matched_groups.data(),
                            static_cast<int>(matched_groups.size())))
             {
diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp
index 91d1c63e832..de8ae33433a 100644
--- a/src/Interpreters/Cache/FileCache.cpp
+++ b/src/Interpreters/Cache/FileCache.cpp
@@ -806,6 +806,13 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
     return true;
 }
 
+void FileCache::removeKey(const Key & key)
+{
+    assertInitialized();
+    auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
+    locked_key->removeAll();
+}
+
 void FileCache::removeKeyIfExists(const Key & key)
 {
     assertInitialized();
@@ -818,7 +825,14 @@ void FileCache::removeKeyIfExists(const Key & key)
     /// But if we have multiple replicated zero-copy tables on the same server
     /// it became possible to start removing something from cache when it is used
     /// by other "zero-copy" tables. That is why it's not an error.
-    locked_key->removeAllReleasable();
+    locked_key->removeAll(/* if_releasable */true);
+}
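Together with the declarations added to `FileCache.h` below, the removal surface now has three strictness levels. A hypothetical usage sketch (the cache name and the 32-hex-digit key are placeholders; the `FileCacheFactory` lookup mirrors the one in `InterpreterSystemQuery` later in this patch, and the three calls are shown independently, not as a sequence):

``` cpp
/// Hedged sketch: "cache_name" and the key string are placeholders.
auto cache = FileCacheFactory::instance().getByName("cache_name").cache;
auto key = FileCacheKey::fromKeyString("b3cbd6e9f4c5d3f2a8a2e0a4c1bd9f2e");

cache->removeFileSegment(key, 0); /// one segment at (key, offset); throws if it does not exist
cache->removeKey(key);            /// every segment of the key; throws if the key is unknown
cache->removeKeyIfExists(key);    /// like removeKey, but a silent no-op for missing keys
```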
+
+void FileCache::removeFileSegment(const Key & key, size_t offset)
+{
+    assertInitialized();
+    auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
+    locked_key->removeFileSegment(offset);
 }
 
 void FileCache::removePathIfExists(const String & path)
@@ -830,22 +844,12 @@ void FileCache::removeAllReleasable()
 {
     assertInitialized();
 
-    auto lock = lockCache();
-
-    main_priority->iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
-    {
-        if (segment_metadata->releasable())
-        {
-            auto file_segment = segment_metadata->file_segment;
-            locked_key.removeFileSegment(file_segment->offset(), file_segment->lock());
-            return PriorityIterationResult::REMOVE_AND_CONTINUE;
-        }
-        return PriorityIterationResult::CONTINUE;
-    }, lock);
+    metadata.iterate([](LockedKey & locked_key) { locked_key.removeAll(/* if_releasable */true); });
 
     if (stash)
     {
         /// Remove all access information.
+        auto lock = lockCache();
         stash->records.clear();
         stash->queue->removeAll(lock);
     }
@@ -915,7 +919,7 @@ void FileCache::loadMetadata()
             continue;
         }
 
-        const auto key = Key(unhexUInt<UInt128>(key_directory.filename().string().data()));
+        const auto key = Key::fromKeyString(key_directory.filename().string());
         auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
 
         for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)
@@ -1070,7 +1074,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
 
 FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
 {
     FileSegments file_segments;
-    auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
+    auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
     for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
         file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
     return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h
index 0e3b17baa2f..2e6a5094758 100644
--- a/src/Interpreters/Cache/FileCache.h
+++ b/src/Interpreters/Cache/FileCache.h
@@ -83,13 +83,19 @@ public:
     FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
 
-    /// Remove files by `key`. Removes files which might be used at the moment.
+    /// Remove file segment by `key` and `offset`. Throws if file segment does not exist.
+    void removeFileSegment(const Key & key, size_t offset);
+
+    /// Remove files by `key`. Throws if key does not exist.
+    void removeKey(const Key & key);
+
+    /// Remove files by `key`.
     void removeKeyIfExists(const Key & key);
 
-    /// Removes files by `path`. Removes files which might be used at the moment.
+    /// Removes files by `path`.
     void removePathIfExists(const String & path);
 
-    /// Remove files by `key`. Will not remove files which are used at the moment.
+    /// Remove all releasable cached files.
     void removeAllReleasable();
 
     std::vector<String> tryGetCachePaths(const Key & key);
diff --git a/src/Interpreters/Cache/FileCacheKey.cpp b/src/Interpreters/Cache/FileCacheKey.cpp
index f97cdc058aa..772fcd600bf 100644
--- a/src/Interpreters/Cache/FileCacheKey.cpp
+++ b/src/Interpreters/Cache/FileCacheKey.cpp
@@ -28,4 +28,9 @@ FileCacheKey FileCacheKey::random()
     return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
 }
 
+FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
+{
+    return FileCacheKey(unhexUInt<UInt128>(key_str.data()));
+}
+
 }
diff --git a/src/Interpreters/Cache/FileCacheKey.h b/src/Interpreters/Cache/FileCacheKey.h
index bab8359732c..e788cd5e7cd 100644
--- a/src/Interpreters/Cache/FileCacheKey.h
+++ b/src/Interpreters/Cache/FileCacheKey.h
@@ -21,6 +21,8 @@ struct FileCacheKey
     static FileCacheKey random();
 
     bool operator==(const FileCacheKey & other) const { return key == other.key; }
+
+    static FileCacheKey fromKeyString(const std::string & key_str);
 };
 
 using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;
diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp
index decc69bb81f..0a2d58432e4 100644
--- a/src/Interpreters/Cache/Metadata.cpp
+++ b/src/Interpreters/Cache/Metadata.cpp
@@ -25,6 +25,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int BAD_ARGUMENTS;
 }
 
 FileSegmentMetadata::FileSegmentMetadata(FileSegmentPtr && file_segment_)
@@ -191,6 +192,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
     if (it == end())
     {
         if (key_not_found_policy == KeyNotFoundPolicy::THROW)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
+        else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
             throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
         else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
             return nullptr;
@@ -215,6 +218,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
         return locked_metadata;
 
     if (key_not_found_policy == KeyNotFoundPolicy::THROW)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
+    else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
 
     if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
@@ -333,11 +338,11 @@ class DownloadQueue
 {
 friend struct CacheMetadata;
 public:
-    void add(std::weak_ptr<FileSegment> file_segment)
+    void add(FileSegmentPtr file_segment)
     {
         {
             std::lock_guard lock(mutex);
-            queue.push(file_segment);
+            queue.emplace(file_segment->key(), file_segment->offset(), file_segment);
         }
 
         CurrentMetrics::add(CurrentMetrics::FilesystemCacheDownloadQueueElements);
@@ -356,8 +361,19 @@ private:
     std::mutex mutex;
     std::condition_variable cv;
-    std::queue<std::weak_ptr<FileSegment>> queue;
     bool cancelled = false;
+
+    struct DownloadInfo
+    {
+        CacheMetadata::Key key;
+        size_t offset;
+        /// We keep a weak pointer to the file segment
+        /// instead of just getting it from file_segment_metadata,
+        /// because the file segment at key:offset could be removed and added back to the metadata
+        /// before we actually start the background download.
+        std::weak_ptr<FileSegment> file_segment;
+    };
+    std::queue<DownloadInfo> queue;
 };
 
 void CacheMetadata::downloadThreadFunc()
@@ -365,6 +381,8 @@ void CacheMetadata::downloadThreadFunc()
     std::optional<Memory<>> memory;
     while (true)
     {
+        Key key;
+        size_t offset;
         std::weak_ptr<FileSegment> file_segment_weak;
 
         {
@@ -379,7 +397,11 @@ void CacheMetadata::downloadThreadFunc()
                 continue;
             }
 
-            file_segment_weak = download_queue->queue.front();
+            auto entry = download_queue->queue.front();
+            key = entry.key;
+            offset = entry.offset;
+            file_segment_weak = entry.file_segment;
+
             download_queue->queue.pop();
         }
 
@@ -389,19 +411,21 @@ void CacheMetadata::downloadThreadFunc()
         try
         {
             {
-                auto file_segment = file_segment_weak.lock();
-                if (!file_segment
-                    || file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
-                    continue;
-
-                auto locked_key = lockKeyMetadata(file_segment->key(), KeyNotFoundPolicy::RETURN_NULL);
+                auto locked_key = lockKeyMetadata(key, KeyNotFoundPolicy::RETURN_NULL);
                 if (!locked_key)
                     continue;
 
-                auto file_segment_metadata = locked_key->tryGetByOffset(file_segment->offset());
+                auto file_segment_metadata = locked_key->tryGetByOffset(offset);
                 if (!file_segment_metadata || file_segment_metadata->evicting())
                     continue;
 
+                auto file_segment = file_segment_weak.lock();
+
+                if (!file_segment
+                    || file_segment != file_segment_metadata->file_segment
+                    || file_segment->state() != FileSegment::State::PARTIALLY_DOWNLOADED)
+                    continue;
+
                 holder = std::make_unique<FileSegmentsHolder>(FileSegments{file_segment});
             }
 
@@ -539,11 +563,11 @@ bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const
     return file_segment_metadata->file_segment.use_count() == 2;
 }
 
-void LockedKey::removeAllReleasable()
+void LockedKey::removeAll(bool if_releasable)
 {
     for (auto it = key_metadata->begin(); it != key_metadata->end();)
     {
-        if (!it->second->releasable())
+        if (if_releasable && !it->second->releasable())
         {
             ++it;
             continue;
         }
@@ -564,17 +588,32 @@ void LockedKey::removeAll(bool if_releasable)
     }
 }
 
+KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset)
+{
+    auto it = key_metadata->find(offset);
+    if (it == key_metadata->end())
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);
+
+    auto file_segment = it->second->file_segment;
+    return removeFileSegmentImpl(it, file_segment->lock());
+}
+
 KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
 {
     auto it = key_metadata->find(offset);
     if (it == key_metadata->end())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
 
+    return removeFileSegmentImpl(it, segment_lock);
+}
+
+KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock)
+{
     auto file_segment = it->second->file_segment;
 
     LOG_DEBUG(
        key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
-        getKey(), offset, file_segment->reserved_size);
+        getKey(), file_segment->offset(), file_segment->reserved_size);
 
     chassert(file_segment->assertCorrectnessUnlocked(segment_lock));
diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h
index 503c19f4150..42d74338e12 100644
--- a/src/Interpreters/Cache/Metadata.h
+++ b/src/Interpreters/Cache/Metadata.h
@@ -87,7 +87,7 @@ struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>,
 {
 public:
     using Key = FileCacheKey;
-    using IterateCacheMetadataFunc = std::function;
+    using IterateCacheMetadataFunc = std::function<void(LockedKey &)>;
 
     explicit CacheMetadata(const std::string & path_);
 
@@ -106,6 +106,7 @@ public:
     enum class KeyNotFoundPolicy
     {
         THROW,
+        THROW_LOGICAL,
         CREATE_EMPTY,
         RETURN_NULL,
     };
@@ -169,9 +170,10 @@ struct LockedKey : private boost::noncopyable
     std::shared_ptr<const KeyMetadata> getKeyMetadata() const { return key_metadata; }
     std::shared_ptr<KeyMetadata> getKeyMetadata() { return key_metadata; }
 
-    void removeAllReleasable();
+    void removeAll(bool if_releasable = true);
 
     KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
+    KeyMetadata::iterator removeFileSegment(size_t offset);
 
     void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
 
@@ -188,6 +190,8 @@ struct LockedKey : private boost::noncopyable
     std::string toString() const;
 
 private:
+    KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &);
+
     const std::shared_ptr<KeyMetadata> key_metadata;
     KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
 };
diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp
index 4cb2f6e3b3d..e0b6348ed3c 100644
--- a/src/Interpreters/DatabaseCatalog.cpp
+++ b/src/Interpreters/DatabaseCatalog.cpp
@@ -356,7 +356,8 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
 
         auto table = database->tryGetTable(table_id.table_name, context_);
         if (!table && exception)
-            exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs()));
+            exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs()));
+
         if (!table)
             database = nullptr;
 
diff --git a/src/Interpreters/FilesystemCacheLog.cpp b/src/Interpreters/FilesystemCacheLog.cpp
index 17f0fda71ec..b660db064d1 100644
--- a/src/Interpreters/FilesystemCacheLog.cpp
+++ b/src/Interpreters/FilesystemCacheLog.cpp
@@ -40,6 +40,8 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
         {"source_file_path", std::make_shared<DataTypeString>()},
         {"file_segment_range", std::make_shared<DataTypeTuple>(types)},
         {"total_requested_range", std::make_shared<DataTypeTuple>(types)},
+        {"key", std::make_shared<DataTypeString>()},
+        {"offset", std::make_shared<DataTypeUInt64>()},
         {"size", std::make_shared<DataTypeUInt64>()},
         {"read_type", std::make_shared<DataTypeString>()},
         {"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
@@ -60,6 +62,8 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
     columns[i++]->insert(source_file_path);
     columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
     columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
+    columns[i++]->insert(file_segment_key);
+    columns[i++]->insert(file_segment_offset);
     columns[i++]->insert(file_segment_size);
     columns[i++]->insert(typeToString(cache_type));
     columns[i++]->insert(read_from_cache_attempted);
diff --git a/src/Interpreters/FilesystemCacheLog.h b/src/Interpreters/FilesystemCacheLog.h
index 1b22d561c51..d6dd00e5463 100644
--- a/src/Interpreters/FilesystemCacheLog.h
+++ b/src/Interpreters/FilesystemCacheLog.h
@@ -39,6 +39,8 @@ struct FilesystemCacheLogElement
     std::pair<size_t, size_t> file_segment_range{};
     std::pair<size_t, size_t> requested_range{};
     CacheType cache_type{};
+    std::string file_segment_key;
+    size_t file_segment_offset;
     size_t file_segment_size;
     bool read_from_cache_attempted;
     String read_buffer_id;
diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp
index f2d011b12d1..e1ff8676bc7 100644
--- a/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/src/Interpreters/InterpreterSystemQuery.cpp
@@ -370,7 +370,18 @@ BlockIO InterpreterSystemQuery::execute()
             else
             {
                 auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
-                cache->removeAllReleasable();
+                if (query.delete_key.empty())
+                {
+                    cache->removeAllReleasable();
+                }
+                else
+                {
+                    auto key = FileCacheKey::fromKeyString(query.delete_key);
+                    if (query.delete_offset.has_value())
+                        cache->removeFileSegment(key, query.delete_offset.value());
+                    else
+                        cache->removeKey(key);
+                }
             }
             break;
         }
diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp
index a91449ff035..9c5e7bff61e 100644
--- a/src/Parsers/ASTSystemQuery.cpp
+++ b/src/Parsers/ASTSystemQuery.cpp
@@ -210,7 +210,15 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const
     else if (type == Type::DROP_FILESYSTEM_CACHE)
     {
         if (!filesystem_cache_name.empty())
+        {
             settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
+            if (!delete_key.empty())
+            {
+                settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << delete_key;
+                if (delete_offset.has_value())
+                    settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << delete_offset.value();
+            }
+        }
     }
     else if (type == Type::UNFREEZE)
     {
diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h
index ca4802d9a9b..ebc3e9cd430 100644
--- a/src/Parsers/ASTSystemQuery.h
+++ b/src/Parsers/ASTSystemQuery.h
@@ -107,6 +107,8 @@ public:
     UInt64 seconds{};
 
     String filesystem_cache_name;
+    std::string delete_key;
+    std::optional<UInt64> delete_offset;
 
     String backup_name;
 
diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp
index 48dbe60e241..ef71e994d56 100644
--- a/src/Parsers/ParserSystemQuery.cpp
+++ b/src/Parsers/ParserSystemQuery.cpp
@@ -405,7 +405,15 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
             ParserLiteral path_parser;
             ASTPtr ast;
             if (path_parser.parse(pos, ast, expected))
+            {
                 res->filesystem_cache_name = ast->as<ASTLiteral>()->value.safeGet<String>();
+                if (ParserKeyword{"KEY"}.ignore(pos, expected) && ParserIdentifier().parse(pos, ast, expected))
+                {
+                    res->delete_key = ast->as<ASTIdentifier>()->name();
+                    if (ParserKeyword{"OFFSET"}.ignore(pos, expected) && ParserLiteral().parse(pos, ast, expected))
+                        res->delete_offset = ast->as<ASTLiteral>()->value.safeGet<UInt64>();
+                }
+            }
             if (!parseQueryWithOnCluster(res, pos, expected))
                 return false;
             break;
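End to end, the parser, AST, and interpreter changes above add an optional `KEY` clause, with an optional nested `OFFSET`, to `SYSTEM DROP FILESYSTEM CACHE`; the new `key` and `offset` columns of `system.filesystem_cache_log` supply values for it. A condensed restatement of the dispatch (not self-contained; `cache` and `query` come from the interpreter hunk above):

``` cpp
// Accepted statement forms after this patch:
//   SYSTEM DROP FILESYSTEM CACHE 'cache_name'                      -- all releasable segments
//   SYSTEM DROP FILESYSTEM CACHE 'cache_name' KEY <key>            -- one key; throws if unknown
//   SYSTEM DROP FILESYSTEM CACHE 'cache_name' KEY <key> OFFSET <n> -- a single file segment
if (query.delete_key.empty())
    cache->removeAllReleasable();
else if (query.delete_offset.has_value())
    cache->removeFileSegment(FileCacheKey::fromKeyString(query.delete_key), *query.delete_offset);
else
    cache->removeKey(FileCacheKey::fromKeyString(query.delete_key));
```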
diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.h b/src/Processors/Formats/Impl/RegexpRowInputFormat.h
index d6696ffe751..2469774aaf9 100644
--- a/src/Processors/Formats/Impl/RegexpRowInputFormat.h
+++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.h
@@ -1,7 +1,6 @@
 #pragma once
 
 #include
-#include
 #include
 #include
 #include
@@ -28,14 +27,14 @@ public:
     /// Return true if row was successfully parsed and row fields were extracted.
     bool parseRow(PeekableReadBuffer & buf);
 
-    re2_st::StringPiece getField(size_t index) { return matched_fields[index]; }
+    std::string_view getField(size_t index) { return matched_fields[index]; }
     size_t getMatchedFieldsSize() const { return matched_fields.size(); }
     size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); }
 
 private:
     const re2_st::RE2 regexp;
     // The vector of fields extracted from line using regexp.
-    std::vector<re2_st::StringPiece> matched_fields;
+    std::vector<std::string_view> matched_fields;
     // These two vectors are needed to use RE2::FullMatchN (function for extracting fields).
     std::vector<re2_st::RE2::Arg> re2_arguments;
     std::vector<re2_st::RE2::Arg *> re2_arguments_ptrs;
diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp
index 43085690519..7f7f9058f1b 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.cpp
+++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp
@@ -282,7 +282,7 @@ Chain buildPushingToViewsChain(
             auto * original_thread = current_thread;
             SCOPE_EXIT({ current_thread = original_thread; });
 
-            std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
+            std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>(/*check_current_thread_on_destruction=*/ false);
             /// Copy of a ThreadStatus should be internal.
             view_thread_status_ptr->setInternalThread();
             view_thread_status_ptr->attachToGroup(running_group);
diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp
index fe98ae5f69e..42459340c57 100644
--- a/src/Server/HTTPHandler.cpp
+++ b/src/Server/HTTPHandler.cpp
@@ -44,6 +44,8 @@
 #include
 #include
 
+#include
+
 #include
 #include
 
@@ -1163,8 +1165,8 @@ void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, ContextMutablePtr context)
     {
         int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
 
-        re2::StringPiece matches[num_captures];
-        re2::StringPiece input(begin, end - begin);
+        std::string_view matches[num_captures];
+        std::string_view input(begin, end - begin);
         if (compiled_regex->Match(input, 0, end - begin, re2::RE2::Anchor::ANCHOR_BOTH, matches, num_captures))
         {
             for (const auto & [capturing_name, capturing_index] : compiled_regex->NamedCapturingGroups())
diff --git a/src/Server/HTTPHandlerRequestFilter.h b/src/Server/HTTPHandlerRequestFilter.h
index c6bcdb211e1..25cbb950871 100644
--- a/src/Server/HTTPHandlerRequestFilter.h
+++ b/src/Server/HTTPHandlerRequestFilter.h
@@ -6,7 +6,6 @@
 #include
 #include
-#include
 #include
 #include
 
@@ -26,9 +25,8 @@ static inline bool checkRegexExpression(std::string_view match_str, const CompiledRegexPtr & compiled_regex)
 {
     int num_captures = compiled_regex->NumberOfCapturingGroups() + 1;
 
-    re2::StringPiece matches[num_captures];
-    re2::StringPiece match_input(match_str.data(), match_str.size());
-    return compiled_regex->Match(match_input, 0, match_str.size(), re2::RE2::Anchor::ANCHOR_BOTH, matches, num_captures);
+    std::string_view matches[num_captures];
+    return compiled_regex->Match({match_str.data(), match_str.size()}, 0, match_str.size(), re2::RE2::Anchor::ANCHOR_BOTH, matches, num_captures);
 }
 
 static inline bool checkExpression(std::string_view match_str, const std::pair<String, CompiledRegexPtr> & expression)
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 3164c4ad081..318a704ce37 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4530,9 +4530,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
 }
 
 
-void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
+void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &)
 {
-    auto lock = lockParts();
     for (auto original_active_part : getDataPartsStateRange(DataPartState::Active)) // NOLINT (copy is intended)
     {
         if (part_copy->name == original_active_part->name)
         {
@@ -4588,6 +4587,12 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) const
     return getActiveContainingPart(part_info);
 }
 
+MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name, DataPartsLock & lock) const
+{
+    auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
+    return getActiveContainingPart(part_info, DataPartState::Active, lock);
+}
+
 MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const
 {
     return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id);
@@ -7192,7 +7197,10 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
     if (query_context->canUseParallelReplicasOnInitiator() && to_stage >= QueryProcessingStage::WithMergeableState)
     {
         if (!canUseParallelReplicasBasedOnPKAnalysis(query_context, storage_snapshot, query_info))
+        {
+            query_info.parallel_replicas_disabled = true;
             return QueryProcessingStage::Enum::FetchColumns;
+        }
 
         /// ReplicatedMergeTree
         if (supportsReplication())
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 91f3d698a7c..a54d38b3ecf 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -504,12 +504,13 @@ public:
 
     /// Returns a part in Active state with the given name or a part containing it. If there is no such part, returns nullptr.
     DataPartPtr getActiveContainingPart(const String & part_name) const;
+    DataPartPtr getActiveContainingPart(const String & part_name, DataPartsLock & lock) const;
     DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
     DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
 
     /// Swap part with its identical copy (possible with another path on another disk).
     /// If the original part is not active or doesn't exist, an exception will be thrown.
-    void swapActivePart(MergeTreeData::DataPartPtr part_copy);
+    void swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &);
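The unnamed `DataPartsLock &` parameter added here is a lock witness: the callee never touches it, but demanding it makes it impossible to call the lookup and the swap without holding `lockParts()`, which is exactly what the `MergeTreePartsMover` hunk below starts doing. A self-contained sketch of the idiom, with simplified, assumed types:

``` cpp
#include <mutex>
#include <set>
#include <string>

using DataPartsLock = std::unique_lock<std::mutex>;

class Parts
{
public:
    DataPartsLock lockParts() { return DataPartsLock(mutex); }

    /// The unnamed DataPartsLock & is never used; it proves at compile time
    /// that the caller already holds the lock returned by lockParts().
    bool isActive(const std::string & name, DataPartsLock &) const { return active.contains(name); }
    void swapActive(const std::string & name, DataPartsLock &) { active.insert(name); }

private:
    std::mutex mutex;
    std::set<std::string> active;
};

void lookupThenSwap(Parts & parts, const std::string & name)
{
    auto lock = parts.lockParts();    /// one critical section covers the lookup...
    if (!parts.isActive(name, lock))
        parts.swapActive(name, lock); /// ...and the swap, with no gap for a race
}
```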
 
     /// Returns all parts in specified partition
     DataPartsVector getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp
index 8fa4ac6c78a..a8f34ba4cec 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -263,7 +263,10 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
     if (moves_blocker.isCancelled())
         throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
 
-    auto active_part = data->getActiveContainingPart(cloned_part.part->name);
+    /// `getActiveContainingPart` and `swapActivePart` are called under the same lock
+    /// to prevent the part from becoming inactive between the two calls
+    auto part_lock = data->lockParts();
+    auto active_part = data->getActiveContainingPart(cloned_part.part->name, part_lock);
 
     /// It's ok, because we don't block moving parts for merges or mutations
     if (!active_part || active_part->name != cloned_part.part->name)
@@ -284,7 +287,7 @@ void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
     cloned_part.part->renameTo(active_part->name, false);
 
     /// TODO what happens if the server goes down here?
-    data->swapActivePart(cloned_part.part);
+    data->swapActivePart(cloned_part.part, part_lock);
 
     LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());
diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h
index 8fbc64b7a24..13d6909fd52 100644
--- a/src/Storages/SelectQueryInfo.h
+++ b/src/Storages/SelectQueryInfo.h
@@ -255,6 +255,8 @@ struct SelectQueryInfo
     Block minmax_count_projection_block;
     MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
 
+    bool parallel_replicas_disabled = false;
+
     bool is_parameterized_view = false;
     NameToNameMap parameterized_view_values;
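`parallel_replicas_disabled` is a one-way flag threaded through `SelectQueryInfo` so that a decision taken once on the initiator, in `MergeTreeData::getQueryProcessingStage` above, is respected again at read time in `StorageMergeTree::read` below. The shape of the decision, condensed from the two hunks (not self-contained):

``` cpp
/// Initiator: PK analysis says parallel replicas will not pay off, so remember that.
if (!canUseParallelReplicasBasedOnPKAnalysis(query_context, storage_snapshot, query_info))
{
    query_info.parallel_replicas_disabled = true;     /// sticks for the rest of the query
    return QueryProcessingStage::Enum::FetchColumns;  /// fall back to plain reading
}

/// Read path: both the initiator and the follower branches now consult the flag first.
const bool enable_parallel_reading =
    !query_info.parallel_replicas_disabled
    && local_context->canUseParallelReplicasOnFollower()
    && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
```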
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 645603993ff..281d0593c28 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -209,7 +209,9 @@ void StorageMergeTree::read(
     size_t max_block_size,
     size_t num_streams)
 {
-    if (local_context->canUseParallelReplicasOnInitiator() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
+    if (!query_info.parallel_replicas_disabled &&
+        local_context->canUseParallelReplicasOnInitiator() &&
+        local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree)
     {
         auto table_id = getStorageID();
 
@@ -240,7 +242,10 @@ void StorageMergeTree::read(
     }
     else
     {
-        const bool enable_parallel_reading = local_context->canUseParallelReplicasOnFollower() && local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
+        const bool enable_parallel_reading =
+            !query_info.parallel_replicas_disabled &&
+            local_context->canUseParallelReplicasOnFollower() &&
+            local_context->getSettingsRef().parallel_replicas_for_non_replicated_merge_tree;
 
         if (auto plan = reader.read(
             column_names, storage_snapshot, query_info,
@@ -922,43 +927,69 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
 
     SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
 
-    if (!canEnqueueBackgroundTask())
+    auto is_background_memory_usage_ok = [](String & disable_reason) -> bool
     {
-        out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
+        if (canEnqueueBackgroundTask())
+            return true;
+        disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
             formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
             formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
-    }
-    else if (partition_id.empty())
-    {
-        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
-        bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
+        return false;
+    };
 
-        /// TTL requirements is much more strict than for regular merge, so
-        /// if regular not possible, than merge with ttl is not also not
-        /// possible.
-        if (max_source_parts_size > 0)
+    if (partition_id.empty())
+    {
+        if (is_background_memory_usage_ok(out_disable_reason))
         {
-            select_decision = merger_mutator.selectPartsToMerge(
-                future_part,
-                aggressive,
-                max_source_parts_size,
-                can_merge,
-                merge_with_ttl_allowed,
-                txn,
-                out_disable_reason);
+            UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
+            bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
+
+            /// TTL requirements are much stricter than for a regular merge, so
+            /// if a regular merge is not possible, a merge with TTL is not
+            /// possible either.
+            if (max_source_parts_size > 0)
+            {
+                select_decision = merger_mutator.selectPartsToMerge(
+                    future_part,
+                    aggressive,
+                    max_source_parts_size,
+                    can_merge,
+                    merge_with_ttl_allowed,
+                    txn,
+                    out_disable_reason);
+            }
+            else
+                out_disable_reason = "Current value of max_source_parts_size is zero";
        }
-        else
-            out_disable_reason = "Current value of max_source_parts_size is zero";
     }
     else
     {
         while (true)
         {
-            select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
-                future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions);
             auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
             auto timeout = std::chrono::milliseconds(timeout_ms);
 
+            if (!is_background_memory_usage_ok(out_disable_reason))
+            {
+                constexpr auto poll_interval = std::chrono::seconds(1);
+                Int64 attempts = timeout / poll_interval;
+                bool ok = false;
+                for (Int64 i = 0; i < attempts; ++i)
+                {
+                    std::this_thread::sleep_for(poll_interval);
+                    if (is_background_memory_usage_ok(out_disable_reason))
+                    {
+                        ok = true;
+                        break;
+                    }
+                }
+                if (!ok)
+                    break;
+            }
+
+            select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
+                future_part, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions);
+
             /// If final - we will wait for currently processing merges to finish and continue.
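The retry loop added to `selectPartsToMerge` is plain fixed-interval polling: sleep, re-evaluate the memory predicate, and give up once the configured lock-acquire timeout is spent. The same shape in isolation (a sketch, not the production code; note that, as above, the predicate is checked after sleeping, so a caller may over-wait by at most one interval):

``` cpp
#include <chrono>
#include <functional>
#include <thread>

/// Polls `ok` every `poll_interval` until it returns true or roughly `timeout` elapses.
bool pollUntil(const std::function<bool()> & ok,
               std::chrono::milliseconds timeout,
               std::chrono::milliseconds poll_interval = std::chrono::seconds(1))
{
    const auto attempts = timeout / poll_interval; /// integral number of sleeps
    for (long long i = 0; i < attempts; ++i)
    {
        std::this_thread::sleep_for(poll_interval);
        if (ok())
            return true;
    }
    return false;
}
```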
if (final && select_decision != SelectPartsDecision::SELECTED diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt index 0872033aed0..f7cc13dd2e2 100644 --- a/tests/analyzer_tech_debt.txt +++ b/tests/analyzer_tech_debt.txt @@ -36,6 +36,7 @@ 01455_shard_leaf_max_rows_bytes_to_read 01495_subqueries_in_with_statement 01504_rocksdb +01526_client_start_and_exit 01527_dist_sharding_key_dictGet_reload 01528_allow_nondeterministic_optimize_skip_unused_shards 01540_verbatim_partition_pruning @@ -50,6 +51,7 @@ 01624_soft_constraints 01651_bugs_from_15889 01656_test_query_log_factories_info +01676_clickhouse_client_autocomplete 01681_bloom_filter_nullable_column 01700_system_zookeeper_path_in 01710_projection_additional_filters diff --git a/tests/ci/stress_tests.lib b/tests/ci/stress_tests.lib index 2b8ac77b952..190f3f39f9e 100644 --- a/tests/ci/stress_tests.lib +++ b/tests/ci/stress_tests.lib @@ -243,7 +243,7 @@ function check_logs_for_critical_errors() # Remove file fatal_messages.txt if it's empty [ -s /test_output/fatal_messages.txt ] || rm /test_output/fatal_messages.txt - rg -Fa "########################################" /test_output/* > /dev/null \ + rg -Faz "########################################" /test_output/* > /dev/null \ && echo -e "Killed by signal (output files)$FAIL" >> /test_output/test_results.tsv function get_gdb_log_context() diff --git a/tests/config/config.d/merge_tree_old_dirs_cleanup.xml b/tests/config/config.d/merge_tree_old_dirs_cleanup.xml index 2b8ea63b63d..e6b50724c97 100644 --- a/tests/config/config.d/merge_tree_old_dirs_cleanup.xml +++ b/tests/config/config.d/merge_tree_old_dirs_cleanup.xml @@ -5,4 +5,5 @@ 5 + true diff --git a/tests/config/config.d/named_collection.xml b/tests/config/config.d/named_collection.xml index 2e49c0c596f..5b716a7b8da 100644 --- a/tests/config/config.d/named_collection.xml +++ b/tests/config/config.d/named_collection.xml @@ -32,5 +32,10 @@ testtest auto + + http://localhost:11111/test/ + test + testtest + diff --git a/tests/integration/test_hedged_requests/test.py b/tests/integration/test_hedged_requests/test.py index be6cea80f87..18ea3e50619 100644 --- a/tests/integration/test_hedged_requests/test.py +++ b/tests/integration/test_hedged_requests/test.py @@ -203,6 +203,9 @@ def update_configs( def test_stuck_replica(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs() cluster.pause_container("node_1") @@ -233,6 +236,9 @@ def test_stuck_replica(started_cluster): def test_long_query(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs() # Restart to reset pool states. 
@@ -249,12 +255,18 @@ def test_long_query(started_cluster): def test_send_table_status_sleep(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs(node_1_sleep_in_send_tables_status=sleep_time) check_query(expected_replica="node_2") check_changing_replica_events(1) def test_send_table_status_sleep2(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_tables_status=sleep_time, node_2_sleep_in_send_tables_status=sleep_time, @@ -264,12 +276,18 @@ def test_send_table_status_sleep2(started_cluster): def test_send_data(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs(node_1_sleep_in_send_data=sleep_time) check_query(expected_replica="node_2") check_changing_replica_events(1) def test_send_data2(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time ) @@ -278,6 +296,9 @@ def test_send_data2(started_cluster): def test_combination1(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_tables_status=sleep_time, node_2_sleep_in_send_data=sleep_time, @@ -287,6 +308,9 @@ def test_combination1(started_cluster): def test_combination2(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_tables_status=sleep_time, @@ -296,6 +320,9 @@ def test_combination2(started_cluster): def test_combination3(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_tables_status=1000, @@ -306,6 +333,9 @@ def test_combination3(started_cluster): def test_combination4(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_tables_status=1000, node_1_sleep_in_send_data=sleep_time, @@ -317,6 +347,9 @@ def test_combination4(started_cluster): def test_receive_timeout1(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + # Check the situation when first two replicas get receive timeout # in establishing connection, but the third replica is ok. update_configs( @@ -329,6 +362,9 @@ def test_receive_timeout1(started_cluster): def test_receive_timeout2(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + # Check the situation when first replica get receive timeout # in packet receiving but there are replicas in process of # connection establishing. 
@@ -342,6 +378,9 @@ def test_receive_timeout2(started_cluster): def test_initial_receive_timeout(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + # Check the situation when replicas don't respond after # receiving query (so, no packets were send to initiator) update_configs( @@ -360,6 +399,9 @@ def test_initial_receive_timeout(started_cluster): def test_async_connect(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs() NODES["node"].restart_clickhouse() @@ -390,6 +432,9 @@ def test_async_connect(started_cluster): def test_async_query_sending(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_after_receiving_query=5000, node_2_sleep_after_receiving_query=5000, diff --git a/tests/integration/test_hedged_requests_parallel/test.py b/tests/integration/test_hedged_requests_parallel/test.py index 492b869614f..728697c690d 100644 --- a/tests/integration/test_hedged_requests_parallel/test.py +++ b/tests/integration/test_hedged_requests_parallel/test.py @@ -172,6 +172,9 @@ def update_configs( def test_send_table_status_sleep(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_tables_status=sleep_time, node_2_sleep_in_send_tables_status=sleep_time, @@ -181,6 +184,9 @@ def test_send_table_status_sleep(started_cluster): def test_send_data(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time ) @@ -189,6 +195,9 @@ def test_send_data(started_cluster): def test_combination1(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_tables_status=1000, node_2_sleep_in_send_tables_status=1000, @@ -199,6 +208,9 @@ def test_combination1(started_cluster): def test_combination2(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_tables_status=1000, @@ -210,6 +222,9 @@ def test_combination2(started_cluster): def test_query_with_no_data_to_sample(started_cluster): + if NODES["node"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + update_configs( node_1_sleep_in_send_data=sleep_time, node_2_sleep_in_send_data=sleep_time ) diff --git a/tests/integration/test_secure_socket/test.py b/tests/integration/test_secure_socket/test.py index 2dffbed03d6..123715e5f05 100644 --- a/tests/integration/test_secure_socket/test.py +++ b/tests/integration/test_secure_socket/test.py @@ -58,6 +58,9 @@ def test(started_cluster): config.format(sleep_in_send_data_ms=1000000), ) + if NODES["node1"].is_built_with_thread_sanitizer(): + pytest.skip("Hedged requests don't work under Thread Sanitizer") + attempts = 0 while attempts < 1000: setting = NODES["node2"].http_query( diff --git a/tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh 
b/tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh
index 687ddd8fb46..db6921bc1c8 100755
--- a/tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh
+++ b/tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh
@@ -111,6 +111,23 @@ cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
     <property>
         <name>dfs.datanode.http.address</name>
         <value>0.0.0.0:1006</value>
     </property>
+
+    <property>
+        <name>dfs.datanode.ipc.address</name>
+        <value>0.0.0.0:0</value>
+    </property>
+    <property>
+        <name>dfs.namenode.secondary.http-address</name>
+        <value>0.0.0.0:0</value>
+    </property>
+    <property>
+        <name>dfs.namenode.backup.address</name>
+        <value>0.0.0.0:0</value>
+    </property>
+    <property>
+        <name>dfs.namenode.backup.http-address</name>
+        <value>0.0.0.0:0</value>
+    </property>