diff --git a/.github/workflows/backport_branches.yml b/.github/workflows/backport_branches.yml index b93c1b61ffd..e88a121b5d3 100644 --- a/.github/workflows/backport_branches.yml +++ b/.github/workflows/backport_branches.yml @@ -143,6 +143,8 @@ jobs: sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE" - name: Check out repository code uses: actions/checkout@v2 + with: + fetch-depth: 0 # For a proper version and performance artifacts - name: Build run: | git -C "$GITHUB_WORKSPACE" submodule sync --recursive @@ -188,6 +190,8 @@ jobs: sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE" - name: Check out repository code uses: actions/checkout@v2 + with: + fetch-depth: 0 # For a proper version and performance artifacts - name: Build run: | git -C "$GITHUB_WORKSPACE" submodule sync --recursive diff --git a/README.md b/README.md index e07a701d7c7..153a0d5ce11 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,4 @@ ClickHouse® is an open-source column-oriented database management system that a ## Upcoming events * [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/286304312/) Please join us for an evening of talks (in English), food and discussion. Featuring talks of ClickHouse in production and at least one on the deep internals of ClickHouse itself. +* [v22.7 Release Webinar](https://clickhouse.com/company/events/v22-7-release-webinar/) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share vision into what is coming in the roadmap. diff --git a/docker/test/stress/run.sh b/docker/test/stress/run.sh index c73784c4ef1..f0f5d21c0b8 100755 --- a/docker/test/stress/run.sh +++ b/docker/test/stress/run.sh @@ -215,7 +215,7 @@ start clickhouse-client --query "SELECT 'Server successfully started', 'OK'" >> /test_output/test_results.tsv \ || (echo -e 'Server failed to start (see application_errors.txt and clickhouse-server.clean.log)\tFAIL' >> /test_output/test_results.tsv \ - && grep -Fa ".*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt) + && grep -a ".*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt) [ -f /var/log/clickhouse-server/clickhouse-server.log ] || echo -e "Server log does not exist\tFAIL" [ -f /var/log/clickhouse-server/stderr.log ] || echo -e "Stderr log does not exist\tFAIL" @@ -313,7 +313,7 @@ then start 500 clickhouse-client --query "SELECT 'Backward compatibility check: Server successfully started', 'OK'" >> /test_output/test_results.tsv \ || (echo -e 'Backward compatibility check: Server failed to start\tFAIL' >> /test_output/test_results.tsv \ - && grep -Fa ".*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt) + && grep -a ".*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt) clickhouse-client --query="SELECT 'Server version: ', version()" @@ -343,7 +343,7 @@ then -e "UNFINISHED" \ -e "Renaming unexpected part" \ -e "PART_IS_TEMPORARILY_LOCKED" \ - -e "and a merge is impossible: we didn't find smaller parts" \ + -e "and a merge is impossible: we didn't find" \ -e "found in queue and some source parts for it was lost" \ -e "is lost forever." 
\ /var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "" > /test_output/bc_check_error_messages.txt \ diff --git a/docs/en/engines/table-engines/integrations/hdfs.md b/docs/en/engines/table-engines/integrations/hdfs.md index ae32f5dd80f..9796fd73b1b 100644 --- a/docs/en/engines/table-engines/integrations/hdfs.md +++ b/docs/en/engines/table-engines/integrations/hdfs.md @@ -186,7 +186,6 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us | - | - | |hadoop\_kerberos\_keytab | "" | |hadoop\_kerberos\_principal | "" | -|hadoop\_kerberos\_kinit\_command | kinit | |libhdfs3\_conf | "" | ### Limitations {#limitations} @@ -200,8 +199,7 @@ Note that due to libhdfs3 limitations only old-fashioned approach is supported, datanode communications are not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of such security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` for reference. -If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` is specified, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. `kinit` tool and krb5 configuration files are required. - +If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_security_kerberos_ticket_cache_path` are specified, Kerberos authentication will be used. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. ## HDFS Namenode HA support {#namenode-ha} libhdfs3 support HDFS namenode HA. diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md index 2de931c9e51..47a0e022841 100644 --- a/docs/en/engines/table-engines/integrations/kafka.md +++ b/docs/en/engines/table-engines/integrations/kafka.md @@ -168,7 +168,7 @@ For a list of possible configuration options, see the [librdkafka configuration ### Kerberos support {#kafka-kerberos-support} To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities. -ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements. +ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements. Example: diff --git a/docs/ru/engines/table-engines/integrations/hdfs.md b/docs/ru/engines/table-engines/integrations/hdfs.md index 0857359e987..84f31c0afcc 100644 --- a/docs/ru/engines/table-engines/integrations/hdfs.md +++ b/docs/ru/engines/table-engines/integrations/hdfs.md @@ -183,7 +183,6 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9 | - | - | |hadoop\_kerberos\_keytab | "" | |hadoop\_kerberos\_principal | "" | -|hadoop\_kerberos\_kinit\_command | kinit | ### Ограничения {#limitations} * `hadoop_security_kerberos_ticket_cache_path` и `libhdfs3_conf` могут быть определены только на глобальном, а не на пользовательском уровне @@ -196,7 +195,7 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9 коммуникация с узлами данных не защищена SASL (`HADOOP_SECURE_DN_USER` надежный показатель такого подхода к безопасности). 
Используйте `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` для примера настроек. -Если `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` или `hadoop_kerberos_kinit_command` указаны в настройках, `kinit` будет вызван. `hadoop_kerberos_keytab` и `hadoop_kerberos_principal` обязательны в этом случае. Необходимо также будет установить `kinit` и файлы конфигурации krb5. +Если `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` или `hadoop_security_kerberos_ticket_cache_path` указаны в настройках, будет использоваться аутентификация с помощью Kerberos. `hadoop_kerberos_keytab` и `hadoop_kerberos_principal` обязательны в этом случае. ## Виртуальные столбцы {#virtual-columns} diff --git a/docs/ru/engines/table-engines/integrations/kafka.md b/docs/ru/engines/table-engines/integrations/kafka.md index b24a096015d..b51a0113302 100644 --- a/docs/ru/engines/table-engines/integrations/kafka.md +++ b/docs/ru/engines/table-engines/integrations/kafka.md @@ -167,7 +167,7 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format ### Поддержка Kerberos {#kafka-kerberos-support} Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент `security_protocol` со значением `sasl_plaintext`. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС. -ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` и `sasl.kerberos.kinit.cmd`. +ClickHouse может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы `sasl_kerberos_service_name`, `sasl_kerberos_keytab` и `sasl_kerberos_principal`. Пример: diff --git a/src/Access/CMakeLists.txt b/src/Access/CMakeLists.txt index e69de29bb2d..83bbe418246 100644 --- a/src/Access/CMakeLists.txt +++ b/src/Access/CMakeLists.txt @@ -0,0 +1,3 @@ +if (ENABLE_EXAMPLES) + add_subdirectory(examples) +endif() diff --git a/src/Access/KerberosInit.cpp b/src/Access/KerberosInit.cpp new file mode 100644 index 00000000000..ace03a5e0b5 --- /dev/null +++ b/src/Access/KerberosInit.cpp @@ -0,0 +1,230 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#if USE_KRB5 +#include +#include + +using namespace DB; + +namespace DB +{ +namespace ErrorCodes +{ + extern const int KERBEROS_ERROR; +} +} + +namespace +{ +struct K5Data +{ + krb5_context ctx; + krb5_ccache out_cc; + krb5_principal me; + char * name; + krb5_boolean switch_to_cache; +}; + +/** + * This class implements programmatic implementation of kinit. + */ +class KerberosInit : boost::noncopyable +{ +public: + void init(const String & keytab_file, const String & principal, const String & cache_name = ""); + ~KerberosInit(); +private: + struct K5Data k5 {}; + krb5_ccache defcache = nullptr; + krb5_get_init_creds_opt * options = nullptr; + // Credentials structure including ticket, session key, and lifetime info. 
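+    // (Note: my_creds below is zero-initialized with memset() in init() and
+    // released via krb5_free_cred_contents() in the destructor.)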
+ krb5_creds my_creds; + krb5_keytab keytab = nullptr; + krb5_principal defcache_princ = nullptr; + String fmtError(krb5_error_code code) const; +}; +} + + +String KerberosInit::fmtError(krb5_error_code code) const +{ + const char *msg; + msg = krb5_get_error_message(k5.ctx, code); + String fmt_error = fmt::format(" ({}, {})", code, msg); + krb5_free_error_message(k5.ctx, msg); + return fmt_error; +} + +void KerberosInit::init(const String & keytab_file, const String & principal, const String & cache_name) +{ + auto * log = &Poco::Logger::get("KerberosInit"); + LOG_TRACE(log,"Trying to authenticate with Kerberos v5"); + + krb5_error_code ret; + + const char *deftype = nullptr; + + if (!std::filesystem::exists(keytab_file)) + throw Exception("Keytab file does not exist", ErrorCodes::KERBEROS_ERROR); + + ret = krb5_init_context(&k5.ctx); + if (ret) + throw Exception(ErrorCodes::KERBEROS_ERROR, "Error while initializing Kerberos 5 library ({})", ret); + + if (!cache_name.empty()) + { + ret = krb5_cc_resolve(k5.ctx, cache_name.c_str(), &k5.out_cc); + if (ret) + throw Exception("Error in resolving cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Resolved cache"); + } + else + { + // Resolve the default cache and get its type and default principal (if it is initialized). + ret = krb5_cc_default(k5.ctx, &defcache); + if (ret) + throw Exception("Error while getting default cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Resolved default cache"); + deftype = krb5_cc_get_type(k5.ctx, defcache); + if (krb5_cc_get_principal(k5.ctx, defcache, &defcache_princ) != 0) + defcache_princ = nullptr; + } + + // Use the specified principal name. + ret = krb5_parse_name_flags(k5.ctx, principal.c_str(), 0, &k5.me); + if (ret) + throw Exception("Error when parsing principal name " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + + // Cache related commands + if (k5.out_cc == nullptr && krb5_cc_support_switch(k5.ctx, deftype)) + { + // Use an existing cache for the client principal if we can. + ret = krb5_cc_cache_match(k5.ctx, k5.me, &k5.out_cc); + if (ret && ret != KRB5_CC_NOTFOUND) + throw Exception("Error while searching for cache for " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + if (0 == ret) + { + LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc)); + k5.switch_to_cache = 1; + } + else if (defcache_princ != nullptr) + { + // Create a new cache to avoid overwriting the initialized default cache. + ret = krb5_cc_new_unique(k5.ctx, deftype, nullptr, &k5.out_cc); + if (ret) + throw Exception("Error while generating new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc)); + k5.switch_to_cache = 1; + } + } + + // Use the default cache if we haven't picked one yet. + if (k5.out_cc == nullptr) + { + k5.out_cc = defcache; + defcache = nullptr; + LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc)); + } + + ret = krb5_unparse_name(k5.ctx, k5.me, &k5.name); + if (ret) + throw Exception("Error when unparsing name" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Using principal: {}", k5.name); + + // Allocate a new initial credential options structure. 
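+    // (options is released with krb5_get_init_creds_opt_free() in the destructor.)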
+ ret = krb5_get_init_creds_opt_alloc(k5.ctx, &options); + if (ret) + throw Exception("Error in options allocation" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + + // Resolve keytab + ret = krb5_kt_resolve(k5.ctx, keytab_file.c_str(), &keytab); + if (ret) + throw Exception("Error in resolving keytab "+keytab_file + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Using keytab: {}", keytab_file); + + // Set an output credential cache in initial credential options. + ret = krb5_get_init_creds_opt_set_out_ccache(k5.ctx, options, k5.out_cc); + if (ret) + throw Exception("Error in setting output credential cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + + // Action: init or renew + LOG_TRACE(log,"Trying to renew credentials"); + memset(&my_creds, 0, sizeof(my_creds)); + // Get renewed credential from KDC using an existing credential from output cache. + ret = krb5_get_renewed_creds(k5.ctx, &my_creds, k5.me, k5.out_cc, nullptr); + if (ret) + { + LOG_TRACE(log,"Renew failed {}", fmtError(ret)); + LOG_TRACE(log,"Trying to get initial credentials"); + // Request KDC for an initial credentials using keytab. + ret = krb5_get_init_creds_keytab(k5.ctx, &my_creds, k5.me, keytab, 0, nullptr, options); + if (ret) + throw Exception("Error in getting initial credentials" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + else + LOG_TRACE(log,"Got initial credentials"); + } + else + { + LOG_TRACE(log,"Successful renewal"); + // Initialize a credential cache. Destroy any existing contents of cache and initialize it for the default principal. + ret = krb5_cc_initialize(k5.ctx, k5.out_cc, k5.me); + if (ret) + throw Exception("Error when initializing cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + LOG_TRACE(log,"Initialized cache"); + // Store credentials in a credential cache. + ret = krb5_cc_store_cred(k5.ctx, k5.out_cc, &my_creds); + if (ret) + LOG_TRACE(log,"Error while storing credentials"); + LOG_TRACE(log,"Stored credentials"); + } + + if (k5.switch_to_cache) + { + // Make a credential cache the primary cache for its collection. 
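+    // (This branch runs only when k5.switch_to_cache was set above, i.e. when a
+    // collection-enabled cache was matched or newly created for the principal.)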
+ ret = krb5_cc_switch(k5.ctx, k5.out_cc); + if (ret) + throw Exception("Error while switching to new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR); + } + + LOG_TRACE(log,"Authenticated to Kerberos v5"); +} + +KerberosInit::~KerberosInit() +{ + if (k5.ctx) + { + if (defcache) + krb5_cc_close(k5.ctx, defcache); + krb5_free_principal(k5.ctx, defcache_princ); + + if (options) + krb5_get_init_creds_opt_free(k5.ctx, options); + if (my_creds.client == k5.me) + my_creds.client = nullptr; + krb5_free_cred_contents(k5.ctx, &my_creds); + if (keytab) + krb5_kt_close(k5.ctx, keytab); + + krb5_free_unparsed_name(k5.ctx, k5.name); + krb5_free_principal(k5.ctx, k5.me); + if (k5.out_cc != nullptr) + krb5_cc_close(k5.ctx, k5.out_cc); + krb5_free_context(k5.ctx); + } +} + +void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name) +{ + // Using mutex to prevent cache file corruptions + static std::mutex kinit_mtx; + std::unique_lock lck(kinit_mtx); + KerberosInit k_init; + k_init.init(keytab_file, principal, cache_name); +} +#endif // USE_KRB5 diff --git a/src/Access/KerberosInit.h b/src/Access/KerberosInit.h new file mode 100644 index 00000000000..5a11a275529 --- /dev/null +++ b/src/Access/KerberosInit.h @@ -0,0 +1,11 @@ +#pragma once + +#include "config_core.h" + +#include + +#if USE_KRB5 + +void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name = ""); + +#endif // USE_KRB5 diff --git a/src/Access/examples/CMakeLists.txt b/src/Access/examples/CMakeLists.txt new file mode 100644 index 00000000000..07f75ca0b47 --- /dev/null +++ b/src/Access/examples/CMakeLists.txt @@ -0,0 +1,4 @@ +if (TARGET ch_contrib::krb5) + add_executable (kerberos_init kerberos_init.cpp) + target_link_libraries (kerberos_init PRIVATE dbms ch_contrib::krb5) +endif() diff --git a/src/Access/examples/kerberos_init.cpp b/src/Access/examples/kerberos_init.cpp new file mode 100644 index 00000000000..5dbe92a5b57 --- /dev/null +++ b/src/Access/examples/kerberos_init.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include +#include +#include + +/** The example demonstrates using of kerberosInit function to obtain and cache Kerberos ticket-granting ticket. + * The first argument specifies keytab file. The second argument specifies principal name. + * The third argument is optional. It specifies credentials cache location. + * After successful run of kerberos_init it is useful to call klist command to list cached Kerberos tickets. + * It is also useful to run kdestroy to destroy Kerberos tickets if needed. + */ + +using namespace DB; + +int main(int argc, char ** argv) +{ + std::cout << "Kerberos Init" << "\n"; + + if (argc < 3) + { + std::cout << "kerberos_init obtains and caches an initial ticket-granting ticket for principal." 
<< "\n\n"; + std::cout << "Usage:" << "\n" << " kerberos_init keytab principal [cache]" << "\n"; + return 0; + } + + String cache_name = ""; + if (argc == 4) + cache_name = argv[3]; + + Poco::AutoPtr app_channel(new Poco::ConsoleChannel(std::cerr)); + Poco::Logger::root().setChannel(app_channel); + Poco::Logger::root().setLevel("trace"); + + try + { + kerberosInit(argv[1], argv[2], cache_name); + } + catch (const Exception & e) + { + std::cout << "KerberosInit failure: " << getExceptionMessage(e, false) << "\n"; + return -1; + } + std::cout << "Done" << "\n"; + return 0; +} diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 51f4c83dc23..a9aef7d8465 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -338,11 +338,8 @@ void MemoryTracker::free(Int64 size) accounted_size += new_amount; } } - if (!OvercommitTrackerBlockerInThread::isBlocked()) - { - if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr) - overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size); - } + if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed)) + overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size); if (auto * loaded_next = parent.load(std::memory_order_relaxed)) loaded_next->free(size); diff --git a/src/Common/OvercommitTracker.cpp b/src/Common/OvercommitTracker.cpp index 3cef72eb8b4..4faed833428 100644 --- a/src/Common/OvercommitTracker.cpp +++ b/src/Common/OvercommitTracker.cpp @@ -20,11 +20,32 @@ OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_) , global_mutex(global_mutex_) , freed_memory(0) , required_memory(0) + , next_id(0) + , id_to_release(0) , allow_release(true) {} +#define LOG_DEBUG_SAFE(...) \ + do { \ + OvercommitTrackerBlockerInThread blocker; \ + try \ + { \ + ALLOW_ALLOCATIONS_IN_SCOPE; \ + LOG_DEBUG(__VA_ARGS__); \ + } \ + catch (...) \ + { \ + if (fprintf(stderr, "Allocation failed during writing to log in OvercommitTracker\n") != -1) \ + ; \ + } \ + } while (false) + OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount) { + DENY_ALLOCATIONS_IN_SCOPE; + + if (OvercommitTrackerBlockerInThread::isBlocked()) + return OvercommitResult::NONE; // NOTE: Do not change the order of locks // // global_mutex must be acquired before overcommit_m, because @@ -34,6 +55,8 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int std::unique_lock global_lock(global_mutex); std::unique_lock lk(overcommit_m); + size_t id = next_id++; + auto max_wait_time = tracker->getOvercommitWaitingTime(); if (max_wait_time == ZERO_MICROSEC) @@ -65,23 +88,21 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int allow_release = true; required_memory += amount; - required_per_thread[tracker] = amount; auto wait_start_time = std::chrono::system_clock::now(); - bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]() + bool timeout = !cv.wait_for(lk, max_wait_time, [this, id]() { - return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE; + return id < id_to_release || cancellation_state == QueryCancellationState::NONE; }); auto wait_end_time = std::chrono::system_clock::now(); ProfileEvents::increment(ProfileEvents::MemoryOvercommitWaitTimeMicroseconds, (wait_end_time - wait_start_time) / 1us); - LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? 
" not" : "")); + LOG_DEBUG_SAFE(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : "")); required_memory -= amount; - Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0 - required_per_thread.erase(tracker); + bool still_need = !(id < id_to_release); // True if thread wasn't released // If threads where not released since last call of this method, // we can release them now. - if (allow_release && required_memory <= freed_memory && still_need != 0) + if (allow_release && required_memory <= freed_memory && still_need) releaseThreads(); // All required amount of memory is free now and selected query to stop doesn't know about it. @@ -90,7 +111,7 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int reset(); if (timeout) return OvercommitResult::TIMEOUTED; - if (still_need != 0) + if (still_need) return OvercommitResult::NOT_ENOUGH_FREED; else return OvercommitResult::MEMORY_FREED; @@ -98,6 +119,11 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount) { + DENY_ALLOCATIONS_IN_SCOPE; + + if (OvercommitTrackerBlockerInThread::isBlocked()) + return; + std::lock_guard guard(overcommit_m); if (cancellation_state != QueryCancellationState::NONE) { @@ -109,10 +135,12 @@ void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount) void OvercommitTracker::onQueryStop(MemoryTracker * tracker) { + DENY_ALLOCATIONS_IN_SCOPE; + std::unique_lock lk(overcommit_m); if (picked_tracker == tracker) { - LOG_DEBUG(getLogger(), "Picked query stopped"); + LOG_DEBUG_SAFE(getLogger(), "Picked query stopped"); reset(); cv.notify_all(); @@ -121,8 +149,7 @@ void OvercommitTracker::onQueryStop(MemoryTracker * tracker) void OvercommitTracker::releaseThreads() { - for (auto & required : required_per_thread) - required.second = 0; + id_to_release = next_id; freed_memory = 0; allow_release = false; // To avoid repeating call of this method in OvercommitTracker::needToStopQuery cv.notify_all(); @@ -140,7 +167,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl() // At this moment query list must be read only. // This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery. auto & queries = user_process_list->queries; - LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", queries.size()); + LOG_DEBUG_SAFE(logger, "Trying to choose query to stop from {} queries", queries.size()); for (auto const & query : queries) { if (query.second->isKilled()) @@ -151,14 +178,14 @@ void UserOvercommitTracker::pickQueryToExcludeImpl() continue; auto ratio = memory_tracker->getOvercommitRatio(); - LOG_DEBUG(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit); + LOG_DEBUG_SAFE(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit); if (ratio.soft_limit != 0 && current_ratio < ratio) { query_tracker = memory_tracker; current_ratio = ratio; } } - LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}", + LOG_DEBUG_SAFE(logger, "Selected to stop query with overcommit ratio {}/{}", current_ratio.committed, current_ratio.soft_limit); picked_tracker = query_tracker; } @@ -174,7 +201,7 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl() OvercommitRatio current_ratio{0, 0}; // At this moment query list must be read only. // This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery. 
- LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", process_list->size()); + LOG_DEBUG_SAFE(logger, "Trying to choose query to stop from {} queries", process_list->size()); for (auto const & query : process_list->processes) { if (query.isKilled()) @@ -190,14 +217,14 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl() if (!memory_tracker) continue; auto ratio = memory_tracker->getOvercommitRatio(user_soft_limit); - LOG_DEBUG(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit); + LOG_DEBUG_SAFE(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit); if (current_ratio < ratio) { query_tracker = memory_tracker; current_ratio = ratio; } } - LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}", + LOG_DEBUG_SAFE(logger, "Selected to stop query with overcommit ratio {}/{}", current_ratio.committed, current_ratio.soft_limit); picked_tracker = query_tracker; } diff --git a/src/Common/OvercommitTracker.h b/src/Common/OvercommitTracker.h index 80aaed68e37..6f6788493a9 100644 --- a/src/Common/OvercommitTracker.h +++ b/src/Common/OvercommitTracker.h @@ -104,6 +104,10 @@ private: picked_tracker = nullptr; cancellation_state = QueryCancellationState::NONE; freed_memory = 0; + + next_id = 0; + id_to_release = 0; + allow_release = true; } @@ -111,8 +115,6 @@ private: QueryCancellationState cancellation_state; - std::unordered_map required_per_thread; - // Global mutex which is used in ProcessList to synchronize // insertion and deletion of queries. // OvercommitTracker::pickQueryToExcludeImpl() implementations @@ -122,6 +124,9 @@ private: Int64 freed_memory; Int64 required_memory; + size_t next_id; // Id provided to the next thread to come in OvercommitTracker + size_t id_to_release; // We can release all threads with id smaller than this + bool allow_release; }; diff --git a/src/Common/Volnitsky.h b/src/Common/Volnitsky.h index 7eca0c0fe53..a6aef293ac1 100644 --- a/src/Common/Volnitsky.h +++ b/src/Common/Volnitsky.h @@ -476,7 +476,7 @@ class MultiVolnitskyBase { private: /// needles and their offsets - const std::vector & needles; + const std::vector & needles; /// fallback searchers @@ -502,7 +502,7 @@ private: static constexpr size_t small_limit = VolnitskyTraits::hash_size / 8; public: - explicit MultiVolnitskyBase(const std::vector & needles_) : needles{needles_}, step{0}, last{0} + explicit MultiVolnitskyBase(const std::vector & needles_) : needles{needles_}, step{0}, last{0} { fallback_searchers.reserve(needles.size()); hash = std::unique_ptr(new OffsetId[VolnitskyTraits::hash_size]); /// No zero initialization, it will be done later. 
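The Volnitsky hunks above and below switch the needle container from `StringRef` (plain `data`/`size` fields) to `std::string_view` (accessor methods). A minimal standalone sketch of why the call syntax changes, using a simplified stand-in type rather than the real `StringRef`:

#include <cstddef>
#include <string_view>

// Simplified stand-in for ClickHouse's StringRef (illustrative only):
// data and size are plain fields, so the old code wrote needle.data / needle.size.
struct StringRefLike
{
    const char * data;
    std::size_t size;
};

int main()
{
    StringRefLike ref{"needle", 6};
    std::string_view view{"needle"};
    // std::string_view exposes accessors instead, hence the .data() / .size()
    // rewrites in the surrounding hunks.
    return (ref.size == view.size() && ref.data[0] == view.data()[0]) ? 0 : 1;
}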
@@ -535,8 +535,8 @@ public: for (; last < size; ++last) { - const char * cur_needle_data = needles[last].data; - const size_t cur_needle_size = needles[last].size; + const char * cur_needle_data = needles[last].data(); + const size_t cur_needle_size = needles[last].size(); /// save the indices of fallback searchers if (VolnitskyTraits::isFallbackNeedle(cur_needle_size)) @@ -593,7 +593,7 @@ public: { const auto res = pos - (hash[cell_num].off - 1); const size_t ind = hash[cell_num].id; - if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) + if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) return true; } } @@ -625,7 +625,7 @@ public: { const auto res = pos - (hash[cell_num].off - 1); const size_t ind = hash[cell_num].id; - if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) + if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) answer = std::min(answer, ind); } } @@ -663,7 +663,7 @@ public: { const auto res = pos - (hash[cell_num].off - 1); const size_t ind = hash[cell_num].id; - if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) + if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) answer = std::min(answer, res - haystack); } } @@ -699,7 +699,7 @@ public: const auto * res = pos - (hash[cell_num].off - 1); const size_t ind = hash[cell_num].id; if (answer[ind] == 0 - && res + needles[ind].size <= haystack_end + && res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res)) answer[ind] = count_chars(haystack, res); } diff --git a/src/Core/PostgreSQL/Connection.h b/src/Core/PostgreSQL/Connection.h index 8c5609dc66b..97ce3c152d5 100644 --- a/src/Core/PostgreSQL/Connection.h +++ b/src/Core/PostgreSQL/Connection.h @@ -32,7 +32,7 @@ struct ConnectionInfo class Connection : private boost::noncopyable { public: - Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3); + explicit Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3); void execWithRetry(const std::function & exec); diff --git a/src/Core/PostgreSQL/PoolWithFailover.h b/src/Core/PostgreSQL/PoolWithFailover.h index 600e12fb53a..4e3a17b5e9c 100644 --- a/src/Core/PostgreSQL/PoolWithFailover.h +++ b/src/Core/PostgreSQL/PoolWithFailover.h @@ -25,13 +25,13 @@ public: static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000; static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5; - PoolWithFailover( + explicit PoolWithFailover( const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority, size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); - PoolWithFailover( + explicit PoolWithFailover( const DB::StoragePostgreSQLConfiguration & configuration, size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE, size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT, diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e16ad65880b..ec6c600b630 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -591,6 +591,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, 
use_local_cache_for_remote_storage, true, "Use local cache for remote storage like HDFS or S3, it's used for remote table engine only", 0) \ \ M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \ + M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \ + M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \ \ /** Experimental functions */ \ M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \ @@ -601,6 +603,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \ M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \ M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \ + M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \ M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \ M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \ M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \ diff --git a/src/Dictionaries/RedisDictionarySource.h b/src/Dictionaries/RedisDictionarySource.h index bf745a7bb41..26f5ab2a613 100644 --- a/src/Dictionaries/RedisDictionarySource.h +++ b/src/Dictionaries/RedisDictionarySource.h @@ -8,11 +8,6 @@ namespace Poco { - namespace Util - { - class AbstractConfiguration; - } - namespace Redis { class Client; diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index 32fd285dcdb..c25d1d7470c 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -113,7 +113,7 @@ std::unique_ptr AzureObjectStorage::writeObject( /// NO return std::make_unique(std::move(buffer), std::move(finalize_callback), path); } -void AzureObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const +void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const { auto client_ptr = client.get(); diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index 559be0ad257..ab7d2b28508 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -73,7 +73,7 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, const WriteSettings & write_settings = {}) override; - void listPrefix(const std::string & path, PathsWithSize & children) const override; + void listPrefix(const std::string & path, RelativePathsWithSize & children) const override; /// Remove 
file. Throws exception if file doesn't exists or it's a directory. void removeObject(const std::string & path) override; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp index 4564e84316d..a2c0c8abd36 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp @@ -51,7 +51,7 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf) remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size()); } assertChar('\n', buf); - storage_objects[i].relative_path = remote_fs_object_path; + storage_objects[i].path = remote_fs_object_path; storage_objects[i].bytes_size = remote_fs_object_size; } diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h index adebbcb952d..0f2a2a5507d 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h @@ -12,17 +12,6 @@ namespace DB struct DiskObjectStorageMetadata { private: - struct RelativePathWithSize - { - String relative_path; - size_t bytes_size; - - RelativePathWithSize() = default; - - RelativePathWithSize(const String & relative_path_, size_t bytes_size_) - : relative_path(relative_path_), bytes_size(bytes_size_) {} - }; - /// Metadata file version. static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1; static constexpr uint32_t VERSION_RELATIVE_PATHS = 2; @@ -31,7 +20,7 @@ private: const std::string & common_metadata_path; /// Relative paths of blobs. - std::vector storage_objects; + RelativePathsWithSize storage_objects; /// URI const std::string & remote_fs_root_path; @@ -71,7 +60,7 @@ public: return remote_fs_root_path; } - std::vector getBlobsRelativePaths() const + RelativePathsWithSize getBlobsRelativePaths() const { return storage_objects; } diff --git a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp index a8140e8954e..c9dbb5de078 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -379,7 +380,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage * return true; }; - PathsWithSize children; + RelativePathsWithSize children; source_object_storage->listPrefix(restore_information.source_path, children); restore_files(children); @@ -523,7 +524,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject return true; }; - PathsWithSize children; + RelativePathsWithSize children; source_object_storage->listPrefix(restore_information.source_path + "operations/", children); restore_file_operations(children); diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp index 82c700e1a63..bedd1a83df1 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp @@ -81,7 +81,7 @@ std::unique_ptr HDFSObjectStorage::writeObject( /// NOL } -void HDFSObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const +void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const { const size_t begin_of_path = path.find('/', path.find("//") + 2); int32_t 
num_entries; diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h index 28f553906ea..69878568548 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h @@ -75,7 +75,7 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, const WriteSettings & write_settings = {}) override; - void listPrefix(const std::string & path, PathsWithSize & children) const override; + void listPrefix(const std::string & path, RelativePathsWithSize & children) const override; /// Remove file. Throws exception if file doesn't exists or it's a directory. void removeObject(const std::string & path) override; diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 64022ec046d..7532f2a3267 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -25,8 +25,7 @@ class WriteBufferFromFileBase; using ObjectAttributes = std::map; -/// Path to a file and its size. -/// Path can be either relative or absolute - according to the context of use. +/// Path to a file (always absolute) and its size. struct PathWithSize { std::string path; @@ -42,6 +41,7 @@ struct PathWithSize /// List of paths with their sizes using PathsWithSize = std::vector; +using RelativePathsWithSize = PathsWithSize; struct ObjectMetadata { @@ -66,7 +66,7 @@ public: virtual bool exists(const std::string & path) const = 0; /// List on prefix, return children (relative paths) with their sizes. - virtual void listPrefix(const std::string & path, PathsWithSize & children) const = 0; + virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0; /// Get object metadata if supported. It should be possible to receive /// at least size of object diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 58bd29d2d73..9236cde6e93 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -195,7 +195,7 @@ std::unique_ptr S3ObjectStorage::writeObject( /// NOLIN return std::make_unique(std::move(s3_buffer), std::move(finalize_callback), path); } -void S3ObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const +void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const { auto settings_ptr = s3_settings.get(); auto client_ptr = client.get(); diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index 5c53ea1f894..5d4300bffd3 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -80,7 +80,7 @@ public: size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, const WriteSettings & write_settings = {}) override; - void listPrefix(const std::string & path, PathsWithSize & children) const override; + void listPrefix(const std::string & path, RelativePathsWithSize & children) const override; /// Remove file. Throws exception if file doesn't exist or it's a directory. 
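    /// (listPrefix() above now takes RelativePathsWithSize, an alias of
    /// PathsWithSize introduced in IObjectStorage.h to document that the
    /// children returned here are relative paths, while PathWithSize itself
    /// is reserved for absolute paths.)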
void removeObject(const std::string & path) override; diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h index 038bb9bf3df..e0c42401207 100644 --- a/src/Functions/FunctionsConversion.h +++ b/src/Functions/FunctionsConversion.h @@ -205,7 +205,7 @@ struct ConvertImpl if constexpr (std::is_same_v != std::is_same_v) { - throw Exception("Conversion between numeric types and UUID is not supported", ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Conversion between numeric types and UUID is not supported. Probably the passed UUID is unquoted", ErrorCodes::NOT_IMPLEMENTED); } else { diff --git a/src/Functions/FunctionsMultiStringFuzzySearch.h b/src/Functions/FunctionsMultiStringFuzzySearch.h index e082cfbec2f..865a5d182c8 100644 --- a/src/Functions/FunctionsMultiStringFuzzySearch.h +++ b/src/Functions/FunctionsMultiStringFuzzySearch.h @@ -10,10 +10,8 @@ #include #include #include -#include #include #include -#include #include @@ -23,35 +21,28 @@ namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_COLUMN; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int FUNCTION_NOT_ALLOWED; } -template +template class FunctionsMultiStringFuzzySearch : public IFunction { - static_assert(LimitArgs > 0); - public: static constexpr auto name = Impl::name; + static FunctionPtr create(ContextPtr context) { - if (Impl::is_using_hyperscan && !context->getSettingsRef().allow_hyperscan) - throw Exception( - "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0", ErrorCodes::FUNCTION_NOT_ALLOWED); - - return std::make_shared( - context->getSettingsRef().max_hyperscan_regexp_length, context->getSettingsRef().max_hyperscan_regexp_total_length); + const auto & settings = context->getSettingsRef(); + return std::make_shared(settings.allow_hyperscan, settings.max_hyperscan_regexp_length, settings.max_hyperscan_regexp_total_length); } - FunctionsMultiStringFuzzySearch(size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_) - : max_hyperscan_regexp_length(max_hyperscan_regexp_length_), max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_) - { - } + FunctionsMultiStringFuzzySearch(bool allow_hyperscan_, size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_) + : allow_hyperscan(allow_hyperscan_) + , max_hyperscan_regexp_length(max_hyperscan_regexp_length_) + , max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_) + {} String getName() const override { return name; } - size_t getNumberOfArguments() const override { return 3; } bool useDefaultImplementationForConstants() const override { return true; } bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; } @@ -60,82 +51,53 @@ public: DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override { if (!isString(arguments[0])) - throw Exception( - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[0]->getName(), getName()); if (!isUnsignedInteger(arguments[1])) - throw Exception( - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", 
arguments[1]->getName(), getName()); const DataTypeArray * array_type = checkAndGetDataType(arguments[2].get()); if (!array_type || !checkAndGetDataType(array_type->getNestedType().get())) - throw Exception( - "Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[2]->getName(), getName()); + return Impl::getReturnType(); } ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override { - using ResultType = typename Impl::ResultType; - const ColumnPtr & column_haystack = arguments[0].column; + const ColumnPtr & num_ptr = arguments[1].column; + const ColumnPtr & arr_ptr = arguments[2].column; const ColumnString * col_haystack_vector = checkAndGetColumn(&*column_haystack); + assert(col_haystack_vector); // getReturnTypeImpl() checks the data type - const ColumnPtr & num_ptr = arguments[1].column; - const ColumnConst * col_const_num = nullptr; UInt32 edit_distance = 0; - - if ((col_const_num = checkAndGetColumnConst(num_ptr.get()))) - edit_distance = col_const_num->getValue(); - else if ((col_const_num = checkAndGetColumnConst(num_ptr.get()))) - edit_distance = col_const_num->getValue(); - else if ((col_const_num = checkAndGetColumnConst(num_ptr.get()))) - edit_distance = col_const_num->getValue(); + if (const auto * col_const_uint8 = checkAndGetColumnConst(num_ptr.get())) + edit_distance = col_const_uint8->getValue(); + else if (const auto * col_const_uint16 = checkAndGetColumnConst(num_ptr.get())) + edit_distance = col_const_uint16->getValue(); + else if (const auto * col_const_uint32 = checkAndGetColumnConst(num_ptr.get())) + edit_distance = col_const_uint32->getValue(); else - throw Exception( - "Illegal column " + arguments[1].column->getName() - + ". The number is not const or does not fit in UInt32", - ErrorCodes::ILLEGAL_COLUMN); + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The number is not const or does not fit in UInt32", arguments[1].column->getName()); - - const ColumnPtr & arr_ptr = arguments[2].column; const ColumnConst * col_const_arr = checkAndGetColumnConst(arr_ptr.get()); - if (!col_const_arr) - throw Exception( - "Illegal column " + arguments[2].column->getName() + ". The array is not const", - ErrorCodes::ILLEGAL_COLUMN); - - Array src_arr = col_const_arr->getValue(); - - if (src_arr.size() > LimitArgs) - throw Exception( - "Number of arguments for function " + getName() + " doesn't match: passed " + std::to_string(src_arr.size()) - + ", should be at most " + std::to_string(LimitArgs), - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - std::vector refs; - refs.reserve(src_arr.size()); - - for (const auto & el : src_arr) - refs.emplace_back(el.get()); - - if (Impl::is_using_hyperscan) - checkRegexp(refs, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The array is not const", arguments[2].column->getName()); + using ResultType = typename Impl::ResultType; auto col_res = ColumnVector::create(); auto col_offsets = ColumnArray::ColumnOffsets::create(); auto & vec_res = col_res->getData(); auto & offsets_res = col_offsets->getData(); + // the implementations are responsible for resizing the output column - /// The blame for resizing output is for the callee. 
- if (col_haystack_vector) - Impl::vectorConstant( - col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), refs, vec_res, offsets_res, edit_distance); - else - throw Exception("Illegal column " + arguments[0].column->getName(), ErrorCodes::ILLEGAL_COLUMN); + Array needles_arr = col_const_arr->getValue(); + Impl::vectorConstant( + col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), needles_arr, vec_res, offsets_res, edit_distance, + allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); if constexpr (Impl::is_column_array) return ColumnArray::create(std::move(col_res), std::move(col_offsets)); @@ -144,8 +106,9 @@ public: } private: - size_t max_hyperscan_regexp_length; - size_t max_hyperscan_regexp_total_length; + const bool allow_hyperscan; + const size_t max_hyperscan_regexp_length; + const size_t max_hyperscan_regexp_total_length; }; } diff --git a/src/Functions/FunctionsMultiStringPosition.h b/src/Functions/FunctionsMultiStringPosition.h index 357606f4042..855b5448b87 100644 --- a/src/Functions/FunctionsMultiStringPosition.h +++ b/src/Functions/FunctionsMultiStringPosition.h @@ -98,7 +98,8 @@ public: + ", should be at most 255", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - std::vector refs; + std::vector refs; + refs.reserve(src_arr.size()); for (const auto & el : src_arr) refs.emplace_back(el.get()); diff --git a/src/Functions/FunctionsMultiStringSearch.h b/src/Functions/FunctionsMultiStringSearch.h index 4bc8af3f214..04235e0a97a 100644 --- a/src/Functions/FunctionsMultiStringSearch.h +++ b/src/Functions/FunctionsMultiStringSearch.h @@ -10,15 +10,17 @@ #include #include #include -#include #include #include -#include namespace DB { /** + * multiMatchAny(haystack, [pattern_1, pattern_2, ..., pattern_n]) + * multiMatchAnyIndex(haystack, [pattern_1, pattern_2, ..., pattern_n]) + * multiMatchAllIndices(haystack, [pattern_1, pattern_2, ..., pattern_n]) + * * multiSearchAny(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- find any of the const patterns inside haystack and return 0 or 1 * multiSearchAnyUTF8(haystack, [pattern_1, pattern_2, ..., pattern_n]) * multiSearchAnyCaseInsensitive(haystack, [pattern_1, pattern_2, ..., pattern_n]) @@ -34,37 +36,28 @@ namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_COLUMN; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; - extern const int FUNCTION_NOT_ALLOWED; } -/// The argument limiting raises from Volnitsky searcher -- it is performance crucial to save only one byte for pattern number. 
-/// But some other searchers use this function, for example, multiMatchAny -- hyperscan does not have such restrictions -template ::max()> +template class FunctionsMultiStringSearch : public IFunction { - static_assert(LimitArgs > 0); - public: static constexpr auto name = Impl::name; + static FunctionPtr create(ContextPtr context) { - if (Impl::is_using_hyperscan && !context->getSettingsRef().allow_hyperscan) - throw Exception( - "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0", ErrorCodes::FUNCTION_NOT_ALLOWED); - - return std::make_shared( - context->getSettingsRef().max_hyperscan_regexp_length, context->getSettingsRef().max_hyperscan_regexp_total_length); + const auto & settings = context->getSettingsRef(); + return std::make_shared(settings.allow_hyperscan, settings.max_hyperscan_regexp_length, settings.max_hyperscan_regexp_total_length); } - FunctionsMultiStringSearch(size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_) - : max_hyperscan_regexp_length(max_hyperscan_regexp_length_), max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_) - { - } + FunctionsMultiStringSearch(bool allow_hyperscan_, size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_) + : allow_hyperscan(allow_hyperscan_) + , max_hyperscan_regexp_length(max_hyperscan_regexp_length_) + , max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_) + {} String getName() const override { return name; } - size_t getNumberOfArguments() const override { return 2; } bool useDefaultImplementationForConstants() const override { return true; } bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; } @@ -73,60 +66,39 @@ public: DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override { if (!isString(arguments[0])) - throw Exception( - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[0]->getName(), getName()); const DataTypeArray * array_type = checkAndGetDataType(arguments[1].get()); if (!array_type || !checkAndGetDataType(array_type->getNestedType().get())) - throw Exception( - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[1]->getName(), getName()); + return Impl::getReturnType(); } ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override { - using ResultType = typename Impl::ResultType; - const ColumnPtr & column_haystack = arguments[0].column; + const ColumnPtr & arr_ptr = arguments[1].column; const ColumnString * col_haystack_vector = checkAndGetColumn(&*column_haystack); + assert(col_haystack_vector); // getReturnTypeImpl() checks the data type - const ColumnPtr & arr_ptr = arguments[1].column; const ColumnConst * col_const_arr = checkAndGetColumnConst(arr_ptr.get()); - if (!col_const_arr) - throw Exception( - "Illegal column " + arguments[1].column->getName() + ". 
The array is not const", - ErrorCodes::ILLEGAL_COLUMN); - - Array src_arr = col_const_arr->getValue(); - - if (src_arr.size() > LimitArgs) - throw Exception( - "Number of arguments for function " + getName() + " doesn't match: passed " + std::to_string(src_arr.size()) - + ", should be at most " + std::to_string(LimitArgs), - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - std::vector refs; - refs.reserve(src_arr.size()); - - for (const auto & el : src_arr) - refs.emplace_back(el.get()); - - if (Impl::is_using_hyperscan) - checkRegexp(refs, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The array is not const", arguments[1].column->getName()); + using ResultType = typename Impl::ResultType; auto col_res = ColumnVector::create(); auto col_offsets = ColumnArray::ColumnOffsets::create(); auto & vec_res = col_res->getData(); auto & offsets_res = col_offsets->getData(); + // the implementations are responsible for resizing the output column - /// The blame for resizing output is for the callee. - if (col_haystack_vector) - Impl::vectorConstant(col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), refs, vec_res, offsets_res); - else - throw Exception("Illegal column " + arguments[0].column->getName(), ErrorCodes::ILLEGAL_COLUMN); + Array needles_arr = col_const_arr->getValue(); + Impl::vectorConstant( + col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), needles_arr, vec_res, offsets_res, + allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); if constexpr (Impl::is_column_array) return ColumnArray::create(std::move(col_res), std::move(col_offsets)); @@ -135,8 +107,9 @@ public: } private: - size_t max_hyperscan_regexp_length; - size_t max_hyperscan_regexp_total_length; + const bool allow_hyperscan; + const size_t max_hyperscan_regexp_length; + const size_t max_hyperscan_regexp_total_length; }; } diff --git a/src/Functions/MultiMatchAllIndicesImpl.h b/src/Functions/MultiMatchAllIndicesImpl.h index 80a71548deb..9c60dbffe91 100644 --- a/src/Functions/MultiMatchAllIndicesImpl.h +++ b/src/Functions/MultiMatchAllIndicesImpl.h @@ -1,9 +1,11 @@ #pragma once #include +#include #include #include #include +#include #include "Regexps.h" #include "config_functions.h" @@ -19,18 +21,19 @@ namespace DB namespace ErrorCodes { - extern const int HYPERSCAN_CANNOT_SCAN_TEXT; extern const int CANNOT_ALLOCATE_MEMORY; + extern const int FUNCTION_NOT_ALLOWED; + extern const int HYPERSCAN_CANNOT_SCAN_TEXT; extern const int NOT_IMPLEMENTED; extern const int TOO_MANY_BYTES; } -template +template struct MultiMatchAllIndicesImpl { - using ResultType = Type; - static constexpr bool is_using_hyperscan = true; + using ResultType = ResultType_; + /// Variable for understanding, if we used offsets for the output, most /// likely to determine whether the function returns ColumnVector of ColumnArray. 
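    /// (true for multiMatchAllIndices: each row yields an Array of matching
    /// pattern indices, so executeImpl() wraps the result in a ColumnArray.)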
static constexpr bool is_column_array = true; @@ -44,24 +47,39 @@ struct MultiMatchAllIndicesImpl static void vectorConstant( const ColumnString::Chars & haystack_data, const ColumnString::Offsets & haystack_offsets, - const std::vector & needles, - PaddedPODArray & res, - PaddedPODArray & offsets) + const Array & needles_arr, + PaddedPODArray & res, + PaddedPODArray & offsets, + bool allow_hyperscan, + size_t max_hyperscan_regexp_length, + size_t max_hyperscan_regexp_total_length) { - vectorConstant(haystack_data, haystack_offsets, needles, res, offsets, std::nullopt); + vectorConstant(haystack_data, haystack_offsets, needles_arr, res, offsets, std::nullopt, allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); } static void vectorConstant( - const ColumnString::Chars & haystack_data, - const ColumnString::Offsets & haystack_offsets, - const std::vector & needles, - PaddedPODArray & res, - PaddedPODArray & offsets, - [[maybe_unused]] std::optional edit_distance) + [[maybe_unused]] const ColumnString::Chars & haystack_data, + [[maybe_unused]] const ColumnString::Offsets & haystack_offsets, + [[maybe_unused]] const Array & needles_arr, + [[maybe_unused]] PaddedPODArray & res, + [[maybe_unused]] PaddedPODArray & offsets, + [[maybe_unused]] std::optional edit_distance, + bool allow_hyperscan, + [[maybe_unused]] size_t max_hyperscan_regexp_length, + [[maybe_unused]] size_t max_hyperscan_regexp_total_length) { - offsets.resize(haystack_offsets.size()); + if (!allow_hyperscan) + throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0"); #if USE_VECTORSCAN - const auto & hyperscan_regex = MultiRegexps::get(needles, edit_distance); + std::vector needles; + needles.reserve(needles_arr.size()); + for (const auto & needle : needles_arr) + needles.emplace_back(needle.get()); + + checkHyperscanRegexp(needles, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); + + offsets.resize(haystack_offsets.size()); + const auto & hyperscan_regex = MultiRegexps::get(needles, edit_distance); hs_scratch_t * scratch = nullptr; hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch); @@ -76,7 +94,7 @@ struct MultiMatchAllIndicesImpl unsigned int /* flags */, void * context) -> int { - static_cast*>(context)->push_back(id); + static_cast*>(context)->push_back(id); return 0; }; const size_t haystack_offsets_size = haystack_offsets.size(); @@ -102,11 +120,6 @@ struct MultiMatchAllIndicesImpl offset = haystack_offsets[i]; } #else - (void)haystack_data; - (void)haystack_offsets; - (void)needles; - (void)res; - (void)offsets; throw Exception( "multi-search all indices is not implemented when vectorscan is off", ErrorCodes::NOT_IMPLEMENTED); diff --git a/src/Functions/MultiMatchAnyImpl.h b/src/Functions/MultiMatchAnyImpl.h index fbbefe7be1d..0752e87e8af 100644 --- a/src/Functions/MultiMatchAnyImpl.h +++ b/src/Functions/MultiMatchAnyImpl.h @@ -1,8 +1,10 @@ #pragma once #include +#include #include #include +#include #include "Regexps.h" #include "config_functions.h" @@ -20,19 +22,31 @@ namespace DB namespace ErrorCodes { - extern const int HYPERSCAN_CANNOT_SCAN_TEXT; extern const int CANNOT_ALLOCATE_MEMORY; + extern const int FUNCTION_NOT_ALLOWED; + extern const int HYPERSCAN_CANNOT_SCAN_TEXT; extern const int NOT_IMPLEMENTED; extern const int TOO_MANY_BYTES; } +// For more readable instantiations of MultiMatchAnyImpl<> +struct MultiMatchTraits +{ +enum class Find +{ + Any, + AnyIndex +}; +}; 
-template +template struct MultiMatchAnyImpl { - static_assert(static_cast(FindAny) + static_cast(FindAnyIndex) == 1); - using ResultType = Type; - static constexpr bool is_using_hyperscan = true; + using ResultType = ResultType_; + + static constexpr bool FindAny = (Find == MultiMatchTraits::Find::Any); + static constexpr bool FindAnyIndex = (Find == MultiMatchTraits::Find::AnyIndex); + /// Variable for understanding, if we used offsets for the output, most /// likely to determine whether the function returns ColumnVector of ColumnArray. static constexpr bool is_column_array = false; @@ -46,26 +60,40 @@ struct MultiMatchAnyImpl static void vectorConstant( const ColumnString::Chars & haystack_data, const ColumnString::Offsets & haystack_offsets, - const std::vector & needles, - PaddedPODArray & res, - PaddedPODArray & offsets) + const Array & needles_arr, + PaddedPODArray & res, + PaddedPODArray & offsets, + bool allow_hyperscan, + size_t max_hyperscan_regexp_length, + size_t max_hyperscan_regexp_total_length) { - vectorConstant(haystack_data, haystack_offsets, needles, res, offsets, std::nullopt); + vectorConstant(haystack_data, haystack_offsets, needles_arr, res, offsets, std::nullopt, allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); } static void vectorConstant( const ColumnString::Chars & haystack_data, const ColumnString::Offsets & haystack_offsets, - const std::vector & needles, - PaddedPODArray & res, + const Array & needles_arr, + PaddedPODArray & res, [[maybe_unused]] PaddedPODArray & offsets, - [[maybe_unused]] std::optional edit_distance) + [[maybe_unused]] std::optional edit_distance, + bool allow_hyperscan, + size_t max_hyperscan_regexp_length, + size_t max_hyperscan_regexp_total_length) { - (void)FindAny; - (void)FindAnyIndex; + if (!allow_hyperscan) + throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0"); + + std::vector needles; + needles.reserve(needles_arr.size()); + for (const auto & needle : needles_arr) + needles.emplace_back(needle.get()); + + checkHyperscanRegexp(needles, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length); + res.resize(haystack_offsets.size()); #if USE_VECTORSCAN - const auto & hyperscan_regex = MultiRegexps::get(needles, edit_distance); + const auto & hyperscan_regex = MultiRegexps::get(needles, edit_distance); hs_scratch_t * scratch = nullptr; hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch); @@ -81,9 +109,9 @@ struct MultiMatchAnyImpl void * context) -> int { if constexpr (FindAnyIndex) - *reinterpret_cast(context) = id; + *reinterpret_cast(context) = id; else if constexpr (FindAny) - *reinterpret_cast(context) = 1; + *reinterpret_cast(context) = 1; /// Once we hit the callback, there is no need to search for others. 
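MultiMatchAllIndicesImpl above collects results through vectorscan's C callback, which only hands back an opaque void * context; the implementation smuggles the typed output column through that pointer. A self-contained sketch of the round trip (no vectorscan dependency; in the real code hs_scan() invokes the handler):

    #include <cstdint>
    #include <vector>

    // Shape of a vectorscan-style match callback. Returning 0 asks the
    // engine to keep scanning; non-zero stops scanning this haystack.
    using MatchHandler = int (*)(unsigned id, unsigned long long from,
                                 unsigned long long to, unsigned flags, void * context);

    static int on_match(unsigned id, unsigned long long /*from*/, unsigned long long /*to*/,
                        unsigned /*flags*/, void * context)
    {
        // Recover the typed output buffer from the opaque pointer and record the pattern id.
        static_cast<std::vector<uint64_t> *>(context)->push_back(id);
        return 0;
    }

    int main()
    {
        std::vector<uint64_t> matched_ids;
        MatchHandler handler = on_match;
        // A real call site passes `on_match` and `&matched_ids` to hs_scan();
        // here we invoke the handler directly to show the mechanism.
        handler(42, 0, 3, 0, &matched_ids);
        return matched_ids.size() == 1 ? 0 : 1;
    }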
diff --git a/src/Functions/MultiMatchAnyImpl.h b/src/Functions/MultiMatchAnyImpl.h
index fbbefe7be1d..0752e87e8af 100644
--- a/src/Functions/MultiMatchAnyImpl.h
+++ b/src/Functions/MultiMatchAnyImpl.h
@@ -1,8 +1,10 @@
 #pragma once

 #include <Columns/ColumnString.h>
+#include <Core/Field.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <base/types.h>
+#include <Functions/checkHyperscanRegexp.h>
 #include "Regexps.h"

 #include "config_functions.h"
@@ -20,19 +22,31 @@ namespace DB

 namespace ErrorCodes
 {
-    extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
     extern const int CANNOT_ALLOCATE_MEMORY;
+    extern const int FUNCTION_NOT_ALLOWED;
+    extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
     extern const int NOT_IMPLEMENTED;
     extern const int TOO_MANY_BYTES;
 }

+// For more readable instantiations of MultiMatchAnyImpl<>
+struct MultiMatchTraits
+{
+enum class Find
+{
+    Any,
+    AnyIndex
+};
+};
+
-template <typename Type, bool FindAny, bool FindAnyIndex, bool MultiSearchDistance>
+template <typename ResultType_, MultiMatchTraits::Find Find, bool WithEditDistance>
 struct MultiMatchAnyImpl
 {
-    static_assert(static_cast<int>(FindAny) + static_cast<int>(FindAnyIndex) == 1);
-    using ResultType = Type;
-    static constexpr bool is_using_hyperscan = true;
+    using ResultType = ResultType_;
+
+    static constexpr bool FindAny = (Find == MultiMatchTraits::Find::Any);
+    static constexpr bool FindAnyIndex = (Find == MultiMatchTraits::Find::AnyIndex);
+
     /// Variable for understanding, if we used offsets for the output, most
     /// likely to determine whether the function returns ColumnVector of ColumnArray.
     static constexpr bool is_column_array = false;
@@ -46,26 +60,40 @@ struct MultiMatchAnyImpl
     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
-        PaddedPODArray<ResultType> & res,
-        PaddedPODArray<UInt64> & offsets)
+        const Array & needles_arr,
+        PaddedPODArray<ResultType> & res,
+        PaddedPODArray<UInt64> & offsets,
+        bool allow_hyperscan,
+        size_t max_hyperscan_regexp_length,
+        size_t max_hyperscan_regexp_total_length)
     {
-        vectorConstant(haystack_data, haystack_offsets, needles, res, offsets, std::nullopt);
+        vectorConstant(haystack_data, haystack_offsets, needles_arr, res, offsets, std::nullopt, allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
     }

     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
-        PaddedPODArray<ResultType> & res,
+        const Array & needles_arr,
+        PaddedPODArray<ResultType> & res,
         [[maybe_unused]] PaddedPODArray<UInt64> & offsets,
-        [[maybe_unused]] std::optional<UInt32> edit_distance)
+        [[maybe_unused]] std::optional<UInt32> edit_distance,
+        bool allow_hyperscan,
+        size_t max_hyperscan_regexp_length,
+        size_t max_hyperscan_regexp_total_length)
     {
-        (void)FindAny;
-        (void)FindAnyIndex;
+        if (!allow_hyperscan)
+            throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0");
+
+        std::vector<std::string_view> needles;
+        needles.reserve(needles_arr.size());
+        for (const auto & needle : needles_arr)
+            needles.emplace_back(needle.get<String>());
+
+        checkHyperscanRegexp(needles, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
+
         res.resize(haystack_offsets.size());
 #if USE_VECTORSCAN
-        const auto & hyperscan_regex = MultiRegexps::get<FindAnyIndex, MultiSearchDistance>(needles, edit_distance);
+        const auto & hyperscan_regex = MultiRegexps::get<FindAnyIndex, WithEditDistance>(needles, edit_distance);
         hs_scratch_t * scratch = nullptr;
         hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch);
@@ -81,9 +109,9 @@ struct MultiMatchAnyImpl
             void * context) -> int
         {
             if constexpr (FindAnyIndex)
-                *reinterpret_cast<Type *>(context) = id;
+                *reinterpret_cast<ResultType *>(context) = id;
             else if constexpr (FindAny)
-                *reinterpret_cast<Type *>(context) = 1;
+                *reinterpret_cast<ResultType *>(context) = 1;
             /// Once we hit the callback, there is no need to search for others.
             return 1;
         };
@@ -110,8 +138,8 @@ struct MultiMatchAnyImpl
             offset = haystack_offsets[i];
         }
 #else
-    /// Fallback if do not use vectorscan
-    if constexpr (MultiSearchDistance)
+    // fallback if vectorscan is not compiled
+    if constexpr (WithEditDistance)
         throw Exception(
             "Edit distance multi-search is not implemented when vectorscan is off",
             ErrorCodes::NOT_IMPLEMENTED);
@@ -120,7 +148,7 @@ struct MultiMatchAnyImpl
         memset(accum.data(), 0, accum.size());
         for (size_t j = 0; j < needles.size(); ++j)
         {
-            MatchImpl::vectorConstant(haystack_data, haystack_offsets, needles[j].toString(), nullptr, accum);
+            MatchImpl::vectorConstant(haystack_data, haystack_offsets, std::string(needles[j].data(), needles[j].size()), nullptr, accum);
             for (size_t i = 0; i < res.size(); ++i)
             {
                 if constexpr (FindAny)
diff --git a/src/Functions/MultiSearchAllPositionsImpl.h b/src/Functions/MultiSearchAllPositionsImpl.h
index f54fe41f20c..4356d6110f1 100644
--- a/src/Functions/MultiSearchAllPositionsImpl.h
+++ b/src/Functions/MultiSearchAllPositionsImpl.h
@@ -15,7 +15,7 @@ struct MultiSearchAllPositionsImpl
     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
+        const std::vector<std::string_view> & needles,
         PaddedPODArray<UInt64> & res)
     {
         auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64
diff --git a/src/Functions/MultiSearchFirstIndexImpl.h b/src/Functions/MultiSearchFirstIndexImpl.h
index 26709119f6e..f69a3edbf8b 100644
--- a/src/Functions/MultiSearchFirstIndexImpl.h
+++ b/src/Functions/MultiSearchFirstIndexImpl.h
@@ -1,17 +1,22 @@
 #pragma once

 #include <Columns/ColumnString.h>
+#include <Core/Field.h>
 #include <base/types.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
 template <typename Name, typename Impl>
 struct MultiSearchFirstIndexImpl
 {
     using ResultType = UInt64;
-    static constexpr bool is_using_hyperscan = false;
     /// Variable for understanding, if we used offsets for the output, most
     /// likely to determine whether the function returns ColumnVector of ColumnArray.
     static constexpr bool is_column_array = false;
@@ -22,10 +27,24 @@ struct MultiSearchFirstIndexImpl
     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
+        const Array & needles_arr,
         PaddedPODArray<UInt64> & res,
-        [[maybe_unused]] PaddedPODArray<UInt64> & offsets)
+        [[maybe_unused]] PaddedPODArray<UInt64> & offsets,
+        bool /*allow_hyperscan*/,
+        size_t /*max_hyperscan_regexp_length*/,
+        size_t /*max_hyperscan_regexp_total_length*/)
     {
+        // For performance of Volnitsky search, it is crucial to save only one byte for pattern number.
+        if (needles_arr.size() > std::numeric_limits<UInt8>::max())
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                "Number of arguments for function {} doesn't match: passed {}, should be at most {}",
+                name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
+
+        std::vector<std::string_view> needles;
+        needles.reserve(needles_arr.size());
+        for (const auto & needle : needles_arr)
+            needles.emplace_back(needle.get<String>());
+
         auto searcher = Impl::createMultiSearcherInBigHaystack(needles);
         const size_t haystack_string_size = haystack_offsets.size();
         res.resize(haystack_string_size);
diff --git a/src/Functions/MultiSearchFirstPositionImpl.h b/src/Functions/MultiSearchFirstPositionImpl.h
index 1db8dcbde83..21d558a6d58 100644
--- a/src/Functions/MultiSearchFirstPositionImpl.h
+++ b/src/Functions/MultiSearchFirstPositionImpl.h
@@ -1,17 +1,22 @@
 #pragma once

 #include <Columns/ColumnString.h>
+#include <Core/Field.h>
 #include <base/types.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
 template <typename Name, typename Impl>
 struct MultiSearchFirstPositionImpl
 {
     using ResultType = UInt64;
-    static constexpr bool is_using_hyperscan = false;
     /// Variable for understanding, if we used offsets for the output, most
     /// likely to determine whether the function returns ColumnVector of ColumnArray.
     static constexpr bool is_column_array = false;
@@ -22,10 +27,24 @@ struct MultiSearchFirstPositionImpl
     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
+        const Array & needles_arr,
         PaddedPODArray<UInt64> & res,
-        [[maybe_unused]] PaddedPODArray<UInt64> & offsets)
+        [[maybe_unused]] PaddedPODArray<UInt64> & offsets,
+        bool /*allow_hyperscan*/,
+        size_t /*max_hyperscan_regexp_length*/,
+        size_t /*max_hyperscan_regexp_total_length*/)
     {
+        // For performance of Volnitsky search, it is crucial to save only one byte for pattern number.
+        if (needles_arr.size() > std::numeric_limits<UInt8>::max())
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                "Number of arguments for function {} doesn't match: passed {}, should be at most {}",
+                name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
+
+        std::vector<std::string_view> needles;
+        needles.reserve(needles_arr.size());
+        for (const auto & needle : needles_arr)
+            needles.emplace_back(needle.get<String>());
+
         auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64
         {
             return 1 + Impl::countChars(reinterpret_cast<const char *>(start), reinterpret_cast<const char *>(end));
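The guard that the first-index/first-position implementations gained above (and MultiSearchImpl gains below) exists because the Volnitsky multi-searcher stores the pattern number in a single byte, so at most 255 needles are supported. A sketch of the check with standard types instead of ClickHouse's:

    #include <cstdint>
    #include <limits>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // The Volnitsky hash table keeps one byte per pattern index, hence the cap.
    void check_needle_count(const std::vector<std::string> & needles)
    {
        constexpr size_t limit = std::numeric_limits<uint8_t>::max();
        if (needles.size() > limit)
            throw std::invalid_argument(
                "passed " + std::to_string(needles.size())
                + " needles, should be at most " + std::to_string(limit));
    }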
diff --git a/src/Functions/MultiSearchImpl.h b/src/Functions/MultiSearchImpl.h
index 7cb0cefe580..1124184f58c 100644
--- a/src/Functions/MultiSearchImpl.h
+++ b/src/Functions/MultiSearchImpl.h
@@ -1,17 +1,22 @@
 #pragma once

 #include <Columns/ColumnString.h>
+#include <Core/Field.h>
 #include <base/types.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
 template <typename Name, typename Impl>
 struct MultiSearchImpl
 {
     using ResultType = UInt8;
-    static constexpr bool is_using_hyperscan = false;
     /// Variable for understanding, if we used offsets for the output, most
     /// likely to determine whether the function returns ColumnVector of ColumnArray.
     static constexpr bool is_column_array = false;
@@ -22,10 +27,24 @@ struct MultiSearchImpl
     static void vectorConstant(
         const ColumnString::Chars & haystack_data,
         const ColumnString::Offsets & haystack_offsets,
-        const std::vector<StringRef> & needles,
+        const Array & needles_arr,
         PaddedPODArray<UInt8> & res,
-        [[maybe_unused]] PaddedPODArray<UInt64> & offsets)
+        [[maybe_unused]] PaddedPODArray<UInt64> & offsets,
+        bool /*allow_hyperscan*/,
+        size_t /*max_hyperscan_regexp_length*/,
+        size_t /*max_hyperscan_regexp_total_length*/)
     {
+        // For performance of Volnitsky search, it is crucial to save only one byte for pattern number.
+        if (needles_arr.size() > std::numeric_limits<UInt8>::max())
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                "Number of arguments for function {} doesn't match: passed {}, should be at most {}",
+                name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
+
+        std::vector<std::string_view> needles;
+        needles.reserve(needles_arr.size());
+        for (const auto & needle : needles_arr)
+            needles.emplace_back(needle.get<String>());
+
         auto searcher = Impl::createMultiSearcherInBigHaystack(needles);
         const size_t haystack_string_size = haystack_offsets.size();
         res.resize(haystack_string_size);
diff --git a/src/Functions/PositionImpl.h b/src/Functions/PositionImpl.h
index 82e58cdc643..5380fcc36d9 100644
--- a/src/Functions/PositionImpl.h
+++ b/src/Functions/PositionImpl.h
@@ -38,7 +38,7 @@ struct PositionCaseSensitiveASCII
         return SearcherInSmallHaystack(needle_data, needle_size);
     }

-    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
+    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
     {
         return MultiSearcherInBigHaystack(needles);
     }
@@ -74,7 +74,7 @@ struct PositionCaseInsensitiveASCII
         return SearcherInSmallHaystack(needle_data, needle_size);
     }

-    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
+    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
     {
         return MultiSearcherInBigHaystack(needles);
     }
@@ -106,7 +106,7 @@ struct PositionCaseSensitiveUTF8
         return SearcherInSmallHaystack(needle_data, needle_size);
     }

-    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
+    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
     {
         return MultiSearcherInBigHaystack(needles);
     }
@@ -154,7 +154,7 @@ struct PositionCaseInsensitiveUTF8
         return SearcherInSmallHaystack(needle_data, needle_size);
     }

-    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
+    static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
     {
         return MultiSearcherInBigHaystack(needles);
     }
diff --git a/src/Functions/Regexps.h b/src/Functions/Regexps.h
index ac37875f91e..b932b14a6a9 100644
--- a/src/Functions/Regexps.h
+++ b/src/Functions/Regexps.h
@@ -145,7 +145,7 @@ public:
     Regexps * operator()()
     {
-        std::unique_lock lock(mutex);
+        std::lock_guard lock(mutex);
         if (regexp)
             return &*regexp;
         regexp = constructor();
@@ -166,10 +166,9 @@ struct Pool
     std::map<std::pair<std::vector<String>, std::optional<UInt32>>, RegexpsConstructor> storage;
 };

-template <bool CompileForEditDistance>
-inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
+template <bool WithEditDistance>
+inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[maybe_unused]] std::optional<UInt32> edit_distance)
 {
-    (void)edit_distance;
     /// Common pointers
     std::vector<const char *> patterns;
     std::vector<unsigned int> flags;
@@ -181,7 +180,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
     patterns.reserve(str_patterns.size());
     flags.reserve(str_patterns.size());

-    if constexpr (CompileForEditDistance)
+    if constexpr (WithEditDistance)
     {
         ext_exprs.reserve(str_patterns.size());
         ext_exprs_ptrs.reserve(str_patterns.size());
@@ -199,7 +198,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
          * as it is said in the Hyperscan documentation. https://intel.github.io/hyperscan/dev-reference/performance.html#single-match-flag
          */
         flags.push_back(HS_FLAG_DOTALL | HS_FLAG_SINGLEMATCH | HS_FLAG_ALLOWEMPTY | HS_FLAG_UTF8);
-        if constexpr (CompileForEditDistance)
+        if constexpr (WithEditDistance)
         {
             /// Hyperscan currently does not support UTF8 matching with edit distance.
             flags.back() &= ~HS_FLAG_UTF8;
@@ -224,7 +223,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
     }

     hs_error_t err;
-    if constexpr (!CompileForEditDistance)
+    if constexpr (!WithEditDistance)
         err = hs_compile_multi(
             patterns.data(),
             flags.data(),
@@ -270,23 +269,22 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
     if (err != HS_SUCCESS)
         throw Exception("Could not allocate scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);

-    return Regexps{db, scratch};
+    return {db, scratch};
 }

-/// If CompileForEditDistance is False, edit_distance must be nullopt
+/// If WithEditDistance is False, edit_distance must be nullopt
 /// Also, we use templates here because each instantiation of function
 /// template has its own copy of local static variables which must not be the same
 /// for different hyperscan compilations.
-template <bool SaveIndices, bool CompileForEditDistance>
-inline Regexps * get(const std::vector<StringRef> & patterns, std::optional<UInt32> edit_distance)
+template <bool SaveIndices, bool WithEditDistance>
+inline Regexps * get(const std::vector<std::string_view> & patterns, std::optional<UInt32> edit_distance)
 {
-    /// C++11 has thread-safe function-local static on most modern compilers.
-    static Pool known_regexps; /// Different variables for different pattern parameters.
+    static Pool known_regexps; /// Different variables for different pattern parameters, thread-safe in C++11

     std::vector<String> str_patterns;
     str_patterns.reserve(patterns.size());
-    for (const StringRef & ref : patterns)
-        str_patterns.push_back(ref.toString());
+    for (const auto & pattern : patterns)
+        str_patterns.emplace_back(std::string(pattern.data(), pattern.size()));

     /// Get the lock for finding database.
     std::unique_lock lock(known_regexps.mutex);
@@ -301,7 +299,7 @@ inline Regexps * get(const std::vector<std::string_view> & patterns, std::optional<UInt32> edit_distance)
         it->second.setConstructor([&str_patterns = it->first.first, edit_distance]()
         {
-            return constructRegexps<CompileForEditDistance>(str_patterns, edit_distance);
+            return constructRegexps<WithEditDistance>(str_patterns, edit_distance);
         });
     }
diff --git a/src/Functions/hyperscanRegexpChecker.cpp b/src/Functions/checkHyperscanRegexp.cpp
similarity index 59%
rename from src/Functions/hyperscanRegexpChecker.cpp
rename to src/Functions/checkHyperscanRegexp.cpp
index b3c46e34daa..ba9705208d5 100644
--- a/src/Functions/hyperscanRegexpChecker.cpp
+++ b/src/Functions/checkHyperscanRegexp.cpp
@@ -1,4 +1,4 @@
-#include <Functions/hyperscanRegexpChecker.h>
+#include <Functions/checkHyperscanRegexp.h>

 #include <Common/Exception.h>

@@ -9,16 +9,16 @@ namespace ErrorCodes
     extern const int BAD_ARGUMENTS;
 }

-void checkRegexp(const std::vector<StringRef> & refs, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length)
+void checkHyperscanRegexp(const std::vector<std::string_view> & regexps, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length)
 {
     if (max_hyperscan_regexp_length > 0 || max_hyperscan_regexp_total_length > 0)
     {
         size_t total_regexp_length = 0;
-        for (const auto & pattern : refs)
+        for (const auto & regexp : regexps)
         {
-            if (max_hyperscan_regexp_length > 0 && pattern.size > max_hyperscan_regexp_length)
+            if (max_hyperscan_regexp_length > 0 && regexp.size() > max_hyperscan_regexp_length)
                 throw Exception("Regexp length too large", ErrorCodes::BAD_ARGUMENTS);
-            total_regexp_length += pattern.size;
+            total_regexp_length += regexp.size();
         }

         if (max_hyperscan_regexp_total_length > 0 && total_regexp_length > max_hyperscan_regexp_total_length)
diff --git a/src/Functions/checkHyperscanRegexp.h b/src/Functions/checkHyperscanRegexp.h
new file mode 100644
index 00000000000..2aac44115fc
--- /dev/null
+++ b/src/Functions/checkHyperscanRegexp.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <base/types.h>
+
+namespace DB
+{
+
+void checkHyperscanRegexp(const std::vector<std::string_view> & regexps, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length);
+
+}
diff --git a/src/Functions/hyperscanRegexpChecker.h b/src/Functions/hyperscanRegexpChecker.h
deleted file mode 100644
index 1eea53722c1..00000000000
--- a/src/Functions/hyperscanRegexpChecker.h
+++ /dev/null
@@ -1,10 +0,0 @@
-#pragma once
-
-#include <base/StringRef.h>
-
-namespace DB
-{
-
-void checkRegexp(const std::vector<StringRef> & refs, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length);
-
-}
diff --git a/src/Functions/multiFuzzyMatchAllIndices.cpp b/src/Functions/multiFuzzyMatchAllIndices.cpp
index d0121ee3981..93ffb936dc1 100644
--- a/src/Functions/multiFuzzyMatchAllIndices.cpp
+++ b/src/Functions/multiFuzzyMatchAllIndices.cpp
@@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAllIndices
     static constexpr auto name = "multiFuzzyMatchAllIndices";
 };

-using FunctionMultiFuzzyMatchAllIndices = FunctionsMultiStringFuzzySearch<
-    MultiMatchAllIndicesImpl<UInt64, true>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiFuzzyMatchAllIndices = FunctionsMultiStringFuzzySearch<MultiMatchAllIndicesImpl<UInt64, true>>;

 }
diff --git a/src/Functions/multiFuzzyMatchAny.cpp b/src/Functions/multiFuzzyMatchAny.cpp
index 640e93a23b0..a627030d7af 100644
--- a/src/Functions/multiFuzzyMatchAny.cpp
+++ b/src/Functions/multiFuzzyMatchAny.cpp
@@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAny
     static constexpr auto name = "multiFuzzyMatchAny";
 };

-using FunctionMultiFuzzyMatchAny = FunctionsMultiStringFuzzySearch<
-    MultiMatchAnyImpl<UInt8, true, false, true>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiFuzzyMatchAny = FunctionsMultiStringFuzzySearch<MultiMatchAnyImpl<UInt8, MultiMatchTraits::Find::Any, true>>;

 }
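The Regexps.h comment above explains why get() is a template: each <SaveIndices, WithEditDistance> combination instantiates its own copy of the function and therefore its own function-local static pool, so differently-compiled hyperscan databases never share a cache. A minimal sketch of that trick under assumed, simplified types (CompiledPatterns stands in for MultiRegexps::Regexps):

    #include <map>
    #include <mutex>
    #include <optional>
    #include <string>
    #include <vector>

    struct CompiledPatterns {}; // stand-in for the compiled hyperscan database

    // Each template instantiation gets its own `cache` and `mutex`.
    // Function-local statics are initialized thread-safely since C++11;
    // the mutex guards subsequent lookups and insertions.
    template <bool SaveIndices, bool WithEditDistance>
    CompiledPatterns * get_compiled(const std::vector<std::string> & patterns,
                                    std::optional<unsigned> edit_distance)
    {
        static std::map<std::pair<std::vector<std::string>, std::optional<unsigned>>, CompiledPatterns> cache;
        static std::mutex mutex;

        std::lock_guard lock(mutex);
        auto [it, inserted] = cache.try_emplace({patterns, edit_distance});
        if (inserted)
        {
            // a real implementation would compile the patterns here
        }
        return &it->second;
    }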
diff --git a/src/Functions/multiFuzzyMatchAnyIndex.cpp b/src/Functions/multiFuzzyMatchAnyIndex.cpp
index f8bad1bc461..4b24a06e171 100644
--- a/src/Functions/multiFuzzyMatchAnyIndex.cpp
+++ b/src/Functions/multiFuzzyMatchAnyIndex.cpp
@@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAnyIndex
     static constexpr auto name = "multiFuzzyMatchAnyIndex";
 };

-using FunctionMultiFuzzyMatchAnyIndex = FunctionsMultiStringFuzzySearch<
-    MultiMatchAnyImpl<UInt64, false, true, true>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiFuzzyMatchAnyIndex = FunctionsMultiStringFuzzySearch<MultiMatchAnyImpl<UInt64, MultiMatchTraits::Find::AnyIndex, true>>;

 }
diff --git a/src/Functions/multiMatchAllIndices.cpp b/src/Functions/multiMatchAllIndices.cpp
index 940c9e7e3bf..47bd57029e2 100644
--- a/src/Functions/multiMatchAllIndices.cpp
+++ b/src/Functions/multiMatchAllIndices.cpp
@@ -13,9 +13,7 @@ struct NameMultiMatchAllIndices
     static constexpr auto name = "multiMatchAllIndices";
 };

-using FunctionMultiMatchAllIndices = FunctionsMultiStringSearch<
-    MultiMatchAllIndicesImpl<UInt64, false>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiMatchAllIndices = FunctionsMultiStringSearch<MultiMatchAllIndicesImpl<UInt64, false>>;

 }
diff --git a/src/Functions/multiMatchAny.cpp b/src/Functions/multiMatchAny.cpp
index 47510e0ecc2..324e435de26 100644
--- a/src/Functions/multiMatchAny.cpp
+++ b/src/Functions/multiMatchAny.cpp
@@ -13,9 +13,7 @@ struct NameMultiMatchAny
     static constexpr auto name = "multiMatchAny";
 };

-using FunctionMultiMatchAny = FunctionsMultiStringSearch<
-    MultiMatchAnyImpl<UInt8, true, false, false>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiMatchAny = FunctionsMultiStringSearch<MultiMatchAnyImpl<UInt8, MultiMatchTraits::Find::Any, false>>;

 }
diff --git a/src/Functions/multiMatchAnyIndex.cpp b/src/Functions/multiMatchAnyIndex.cpp
index a56d41dc95b..6a11fa4eb35 100644
--- a/src/Functions/multiMatchAnyIndex.cpp
+++ b/src/Functions/multiMatchAnyIndex.cpp
@@ -13,9 +13,7 @@ struct NameMultiMatchAnyIndex
     static constexpr auto name = "multiMatchAnyIndex";
 };

-using FunctionMultiMatchAnyIndex = FunctionsMultiStringSearch<
-    MultiMatchAnyImpl<UInt64, false, true, false>,
-    std::numeric_limits<UInt8>::max()>;
+using FunctionMultiMatchAnyIndex = FunctionsMultiStringSearch<MultiMatchAnyImpl<UInt64, MultiMatchTraits::Find::AnyIndex, false>>;

 }
diff --git a/src/Functions/multiSearchAnyCaseInsensitive.cpp b/src/Functions/multiSearchAnyCaseInsensitive.cpp
index 9bc950c0d3d..af463805ea5 100644
--- a/src/Functions/multiSearchAnyCaseInsensitive.cpp
+++ b/src/Functions/multiSearchAnyCaseInsensitive.cpp
@@ -13,8 +13,7 @@ struct NameMultiSearchAnyCaseInsensitive
 {
     static constexpr auto name = "multiSearchAnyCaseInsensitive";
 };
-using FunctionMultiSearchCaseInsensitive
-    = FunctionsMultiStringSearch<MultiSearchImpl<NameMultiSearchAnyCaseInsensitive, PositionCaseInsensitiveASCII>>;
+using FunctionMultiSearchCaseInsensitive = FunctionsMultiStringSearch<MultiSearchImpl<NameMultiSearchAnyCaseInsensitive, PositionCaseInsensitiveASCII>>;

 }
diff --git a/src/Functions/multiSearchFirstIndex.cpp b/src/Functions/multiSearchFirstIndex.cpp
index a96ebed029c..a5fbec2dc36 100644
--- a/src/Functions/multiSearchFirstIndex.cpp
+++ b/src/Functions/multiSearchFirstIndex.cpp
@@ -14,8 +14,7 @@ struct NameMultiSearchFirstIndex
     static constexpr auto name = "multiSearchFirstIndex";
 };

-using FunctionMultiSearchFirstIndex
-    = FunctionsMultiStringSearch<MultiSearchFirstIndexImpl<NameMultiSearchFirstIndex, PositionCaseSensitiveASCII>>;
+using FunctionMultiSearchFirstIndex = FunctionsMultiStringSearch<MultiSearchFirstIndexImpl<NameMultiSearchFirstIndex, PositionCaseSensitiveASCII>>;

 }
diff --git a/src/Interpreters/InternalTextLogsQueue.cpp b/src/Interpreters/InternalTextLogsQueue.cpp
index 6176e3cc865..2172a6f4261 100644
--- a/src/Interpreters/InternalTextLogsQueue.cpp
+++ b/src/Interpreters/InternalTextLogsQueue.cpp
@@ -38,7 +38,6 @@ MutableColumns InternalTextLogsQueue::getSampleColumns()

 void InternalTextLogsQueue::pushBlock(Block && log_block)
 {
-    OvercommitTrackerBlockerInThread blocker;
     static Block sample_block = getSampleBlock();

     if (blocksHaveEqualStructure(sample_block, log_block))
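The InternalTextLogsQueue changes above and below remove wrappers whose only job was to hold an RAII guard around each queue operation. The guard pattern itself is a thread-local flag flipped in the constructor and restored in the destructor; here is a hypothetical reimplementation of that idea (not ClickHouse's actual OvercommitTrackerBlockerInThread, and deliberately non-reentrant):

    #include <cassert>

    struct OvercommitBlockerGuard
    {
        static thread_local bool blocked;
        OvercommitBlockerGuard() { blocked = true; }   // memory-limit checks would be skipped while set
        ~OvercommitBlockerGuard() { blocked = false; }
    };
    thread_local bool OvercommitBlockerGuard::blocked = false;

    int main()
    {
        assert(!OvercommitBlockerGuard::blocked);
        {
            OvercommitBlockerGuard guard;
            assert(OvercommitBlockerGuard::blocked);
        }
        assert(!OvercommitBlockerGuard::blocked); // restored on scope exit
    }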
diff --git a/src/Interpreters/InternalTextLogsQueue.h b/src/Interpreters/InternalTextLogsQueue.h
index a7193a55178..53710fa3bd2 100644
--- a/src/Interpreters/InternalTextLogsQueue.h
+++ b/src/Interpreters/InternalTextLogsQueue.h
@@ -18,62 +18,6 @@ public:
     static Block getSampleBlock();
     static MutableColumns getSampleColumns();

-    template <typename... Args>
-    bool push(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::push(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    bool emplace(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::emplace(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    bool pop(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::pop(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    bool tryPush(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::tryPush(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    bool tryEmplace(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::tryEmplace(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    bool tryPop(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::tryPop(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    void clear(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::clear(std::forward<Args>(args)...);
-    }
-
-    template <typename... Args>
-    void clearAndFinish(Args &&... args)
-    {
-        OvercommitTrackerBlockerInThread blocker;
-        return ConcurrentBoundedQueue::clearAndFinish(std::forward<Args>(args)...);
-    }
-
     /// Is used to pass block from remote server to the client
     void pushBlock(Block && log_block);
diff --git a/src/Interpreters/InterpreterTransactionControlQuery.h b/src/Interpreters/InterpreterTransactionControlQuery.h
index bf2dc7891a7..a66a740ce0c 100644
--- a/src/Interpreters/InterpreterTransactionControlQuery.h
+++ b/src/Interpreters/InterpreterTransactionControlQuery.h
@@ -20,7 +20,6 @@ public:
     bool ignoreLimits() const override { return true; }
     bool supportsTransactions() const override { return true; }

-private:
     BlockIO executeBegin(ContextMutablePtr session_context);
     BlockIO executeCommit(ContextMutablePtr session_context);
     static BlockIO executeRollback(ContextMutablePtr session_context);
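Making executeBegin/executeCommit/executeRollback public above is what allows executeQuery.cpp below to drive an implicit transaction from outside the interpreter. A sketch of the resulting control flow, with ClickHouse-like stand-in names rather than the real classes:

    struct TransactionControl
    {
        void executeBegin() {}
        void executeCommit() {}
        void executeRollback() {}
    };

    // Begin before interpreting, commit on success, roll back on any failure
    // and rethrow -- mirroring the three paths added in executeQuery.cpp.
    template <typename RunQuery>
    void run_in_implicit_transaction(TransactionControl & tc, RunQuery && run)
    {
        tc.executeBegin();
        try
        {
            run(); // the actual query
            tc.executeCommit();
        }
        catch (...)
        {
            tc.executeRollback();
            throw;
        }
    }

    int main()
    {
        TransactionControl tc;
        run_in_implicit_transaction(tc, [] { /* execute the query */ });
    }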
Expecting ROLLBACK statement"); } /// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter), @@ -498,6 +502,7 @@ static std::tuple executeQueryImpl( BlockIO res; String query_for_logging; + std::shared_ptr implicit_txn_control{}; try { @@ -621,11 +626,37 @@ static std::tuple executeQueryImpl( if (!table_id.empty()) context->setInsertionTable(table_id); - if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction) + if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported"); + if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported"); } else { + /// We need to start the (implicit) transaction before getting the interpreter as this will get links to the latest snapshots + if (!context->getCurrentTransaction() && settings.implicit_transaction && !ast->as()) + { + try + { + if (context->isGlobalContext()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot create transactions"); + + /// If there is no session (which is the default for the HTTP Handler), set up one just for this as it is necessary + /// to control the transaction lifetime + if (!context->hasSessionContext()) + context->makeSessionContext(); + + auto tc = std::make_shared(ast, context); + tc->executeBegin(context->getSessionContext()); + implicit_txn_control = std::move(tc); + } + catch (Exception & e) + { + e.addMessage("while starting a transaction with 'implicit_transaction'"); + throw; + } + } + interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal)); if (context->getCurrentTransaction() && !interpreter->supportsTransactions() && @@ -813,15 +844,16 @@ static std::tuple executeQueryImpl( }; /// Also make possible for caller to log successful query finish and exception during execution. - auto finish_callback = [elem, context, ast, - log_queries, - log_queries_min_type = settings.log_queries_min_type, - log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(), - log_processors_profiles = settings.log_processors_profiles, - status_info_to_query_log, - pulling_pipeline = pipeline.pulling() - ] - (QueryPipeline & query_pipeline) mutable + auto finish_callback = [elem, + context, + ast, + log_queries, + log_queries_min_type = settings.log_queries_min_type, + log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(), + log_processors_profiles = settings.log_processors_profiles, + status_info_to_query_log, + implicit_txn_control, + pulling_pipeline = pipeline.pulling()](QueryPipeline & query_pipeline) mutable { QueryStatus * process_list_elem = context->getProcessListElement(); @@ -942,15 +974,40 @@ static std::tuple executeQueryImpl( opentelemetry_span_log->add(span); } + + if (implicit_txn_control) + { + try + { + implicit_txn_control->executeCommit(context->getSessionContext()); + implicit_txn_control.reset(); + } + catch (const Exception &) + { + /// An exception might happen when trying to commit the transaction. 
For example we might get an immediate exception + /// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN + implicit_txn_control.reset(); + throw; + } + } }; - auto exception_callback = [elem, context, ast, - log_queries, - log_queries_min_type = settings.log_queries_min_type, - log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(), - quota(quota), status_info_to_query_log] () mutable + auto exception_callback = [elem, + context, + ast, + log_queries, + log_queries_min_type = settings.log_queries_min_type, + log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(), + quota(quota), + status_info_to_query_log, + implicit_txn_control]() mutable { - if (auto txn = context->getCurrentTransaction()) + if (implicit_txn_control) + { + implicit_txn_control->executeRollback(context->getSessionContext()); + implicit_txn_control.reset(); + } + else if (auto txn = context->getCurrentTransaction()) txn->onException(); if (quota) @@ -1000,7 +1057,6 @@ static std::tuple executeQueryImpl( { ProfileEvents::increment(ProfileEvents::FailedInsertQuery); } - }; res.finish_callback = std::move(finish_callback); @@ -1009,8 +1065,15 @@ static std::tuple executeQueryImpl( } catch (...) { - if (auto txn = context->getCurrentTransaction()) + if (implicit_txn_control) + { + implicit_txn_control->executeRollback(context->getSessionContext()); + implicit_txn_control.reset(); + } + else if (auto txn = context->getCurrentTransaction()) + { txn->onException(); + } if (!internal) { diff --git a/src/Storages/HDFS/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp index 2f7b03790ee..3ac9f50231d 100644 --- a/src/Storages/HDFS/HDFSCommon.cpp +++ b/src/Storages/HDFS/HDFSCommon.cpp @@ -10,7 +10,9 @@ #include #include #include - +#if USE_KRB5 +#include +#endif // USE_KRB5 namespace DB { @@ -18,8 +20,10 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int NETWORK_ERROR; + #if USE_KRB5 extern const int EXCESSIVE_ELEMENT_IN_CONFIG; - extern const int NO_ELEMENTS_IN_CONFIG; + extern const int KERBEROS_ERROR; + #endif // USE_KRB5 } const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs"; @@ -40,25 +44,28 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration String key_name; if (key == "hadoop_kerberos_keytab") { + #if USE_KRB5 need_kinit = true; hadoop_kerberos_keytab = config.getString(key_path); + #else // USE_KRB5 + LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_keytab parameter is ignored because ClickHouse was built without support of krb5 library."); + #endif // USE_KRB5 continue; } else if (key == "hadoop_kerberos_principal") { + #if USE_KRB5 need_kinit = true; hadoop_kerberos_principal = config.getString(key_path); hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str()); - continue; - } - else if (key == "hadoop_kerberos_kinit_command") - { - need_kinit = true; - hadoop_kerberos_kinit_command = config.getString(key_path); + #else // USE_KRB5 + LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_principal parameter is ignored because ClickHouse was built without support of krb5 library."); + #endif // USE_KRB5 continue; } else if (key == "hadoop_security_kerberos_ticket_cache_path") { + #if USE_KRB5 if (isUser) { throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user", @@ -67,6 +74,9 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration 
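Note how implicit_txn_control is threaded through the hunks above: both finish_callback and exception_callback capture the shared_ptr by value, so whichever one eventually runs can still reach the transaction object, and reset() releases it as soon as the transaction is finished. A self-contained sketch of that ownership pattern (names are illustrative):

    #include <functional>
    #include <memory>

    struct TxnControl
    {
        void commit() {}
        void rollback() {}
    };

    int main()
    {
        auto txn = std::make_shared<TxnControl>();

        // Each lambda holds its own reference; use_count stays >= 2 until reset().
        std::function<void()> on_finish = [txn]() mutable { txn->commit(); txn.reset(); };
        std::function<void()> on_exception = [txn]() mutable { txn->rollback(); txn.reset(); };

        bool failed = false; // exactly one of the two callbacks fires per query
        if (failed)
            on_exception();
        else
            on_finish();
    }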
diff --git a/src/Storages/HDFS/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp
index 2f7b03790ee..3ac9f50231d 100644
--- a/src/Storages/HDFS/HDFSCommon.cpp
+++ b/src/Storages/HDFS/HDFSCommon.cpp
@@ -10,7 +10,9 @@
 #include
 #include
 #include
-
+#if USE_KRB5
+#include <Access/KerberosInit.h>
+#endif // USE_KRB5

 namespace DB
 {
@@ -18,8 +20,10 @@ namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
     extern const int NETWORK_ERROR;
+    #if USE_KRB5
     extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
-    extern const int NO_ELEMENTS_IN_CONFIG;
+    extern const int KERBEROS_ERROR;
+    #endif // USE_KRB5
 }

 const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
@@ -40,25 +44,28 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
         String key_name;
         if (key == "hadoop_kerberos_keytab")
         {
+            #if USE_KRB5
             need_kinit = true;
             hadoop_kerberos_keytab = config.getString(key_path);
+            #else // USE_KRB5
+            LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_keytab parameter is ignored because ClickHouse was built without support of krb5 library.");
+            #endif // USE_KRB5
             continue;
         }
         else if (key == "hadoop_kerberos_principal")
         {
+            #if USE_KRB5
             need_kinit = true;
             hadoop_kerberos_principal = config.getString(key_path);
             hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
-            continue;
-        }
-        else if (key == "hadoop_kerberos_kinit_command")
-        {
-            need_kinit = true;
-            hadoop_kerberos_kinit_command = config.getString(key_path);
+            #else // USE_KRB5
+            LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_principal parameter is ignored because ClickHouse was built without support of krb5 library.");
+            #endif // USE_KRB5
             continue;
         }
         else if (key == "hadoop_security_kerberos_ticket_cache_path")
         {
+            #if USE_KRB5
             if (isUser)
             {
                 throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",
@@ -67,6 +74,9 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
             hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
             // standard param - pass further
+            #else // USE_KRB5
+            LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop.security.kerberos.ticket.cache.path parameter is ignored because ClickHouse was built without support of krb5 library.");
+            #endif // USE_KRB5
         }

         key_name = boost::replace_all_copy(key, "_", ".");
@@ -76,44 +86,21 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
     }
 }

-String HDFSBuilderWrapper::getKinitCmd()
-{
-
-    if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
-    {
-        throw Exception("Not enough parameters to run kinit",
-                        ErrorCodes::NO_ELEMENTS_IN_CONFIG);
-    }
-
-    WriteBufferFromOwnString ss;
-
-    String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
-        String() :
-        (String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
-
-    // command to run looks like
-    // kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
-    ss << hadoop_kerberos_kinit_command << cache_name <<
-        " -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
-        "|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
-        hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
-    return ss.str();
-}
-
+#if USE_KRB5
 void HDFSBuilderWrapper::runKinit()
 {
-    String cmd = getKinitCmd();
-    LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
-
-    std::unique_lock lck(kinit_mtx);
-
-    auto command = ShellCommand::execute(cmd);
-    auto status = command->tryWait();
-    if (status)
+    LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Running KerberosInit");
+    try
     {
-        throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
+        kerberosInit(hadoop_kerberos_keytab,hadoop_kerberos_principal,hadoop_security_kerberos_ticket_cache_path);
     }
+    catch (const DB::Exception & e)
+    {
+        throw Exception("KerberosInit failure: "+ getExceptionMessage(e, false), ErrorCodes::KERBEROS_ERROR);
+    }
+    LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Finished KerberosInit");
 }
+#endif // USE_KRB5

 HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
 {
@@ -184,16 +171,16 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
         }
     }

+    #if USE_KRB5
     if (builder.need_kinit)
     {
         builder.runKinit();
     }
+    #endif // USE_KRB5

     return builder;
 }

-std::mutex HDFSBuilderWrapper::kinit_mtx;
-
 HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
 {
     HDFSFSPtr fs(hdfsBuilderConnect(builder));
diff --git a/src/Storages/HDFS/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h
index 0523849abe5..9eb2dfd3e46 100644
--- a/src/Storages/HDFS/HDFSCommon.h
+++ b/src/Storages/HDFS/HDFSCommon.h
@@ -9,7 +9,6 @@
 #include
 #include
-#include
 #include
 #include
@@ -69,10 +68,6 @@ public:
 private:
     void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);

-    String getKinitCmd();
-
-    void runKinit();
-
     // hdfs builder relies on an external config data storage
     std::pair<String, String> & keep(const String & k, const String & v)
     {
@@ -80,14 +75,15 @@ private:
     }

     hdfsBuilder * hdfs_builder;
+    std::vector<std::pair<String, String>> config_stor;
+
+    #if USE_KRB5
+    void runKinit();
     String hadoop_kerberos_keytab;
     String hadoop_kerberos_principal;
-    String hadoop_kerberos_kinit_command = "kinit";
     String hadoop_security_kerberos_ticket_cache_path;
-
-    static std::mutex kinit_mtx;
-    std::vector<std::pair<String, String>> config_stor;
     bool need_kinit{false};
+    #endif // USE_KRB5
 };

 using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index d03db010a1f..058cc5ff7de 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -45,7 +45,9 @@
 #include
 #include
-
+#if USE_KRB5
+#include <Access/KerberosInit.h>
+#endif // USE_KRB5

 namespace CurrentMetrics
 {
@@ -517,6 +519,33 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
     if (config.has(config_prefix))
         loadFromConfig(conf, config, config_prefix);

+    #if USE_KRB5
+    if (conf.has_property("sasl.kerberos.kinit.cmd"))
+        LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
+
+    conf.set("sasl.kerberos.kinit.cmd","");
+    conf.set("sasl.kerberos.min.time.before.relogin","0");
+
+    if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
+    {
+        String keytab = conf.get("sasl.kerberos.keytab");
+        String principal = conf.get("sasl.kerberos.principal");
+        LOG_DEBUG(log, "Running KerberosInit");
+        try
+        {
+            kerberosInit(keytab,principal);
+        }
+        catch (const Exception & e)
+        {
+            LOG_ERROR(log, "KerberosInit failure: {}", getExceptionMessage(e, false));
+        }
+        LOG_DEBUG(log, "Finished KerberosInit");
+    }
+    #else // USE_KRB5
+    if (conf.has_property("sasl.kerberos.keytab") || conf.has_property("sasl.kerberos.principal"))
+        LOG_WARNING(log, "Kerberos-related parameters are ignored because ClickHouse was built without support of krb5 library.");
+    #endif // USE_KRB5
+
     // Update consumer topic-specific configuration
     for (const auto & topic : topics)
     {
diff --git a/tests/ci/build_report_check.py b/tests/ci/build_report_check.py
index 1ba91a38a60..dbf5adfe174 100644
--- a/tests/ci/build_report_check.py
+++ b/tests/ci/build_report_check.py
@@ -151,6 +151,8 @@ def main():
         needs_data = json.load(file_handler)
         required_builds = len(needs_data)

+    logging.info("The next builds are required: %s", ", ".join(needs_data))
+
     gh = Github(get_best_robot_token())
     pr_info = PRInfo()
     rerun_helper = RerunHelper(gh, pr_info, build_check_name)
@@ -159,7 +161,6 @@ def main():
         sys.exit(0)

     builds_for_check = CI_CONFIG["builds_report_config"][build_check_name]
-    logging.info("My reports list %s", builds_for_check)
     required_builds = required_builds or len(builds_for_check)

     # Collect reports from json artifacts
diff --git a/tests/clickhouse-test b/tests/clickhouse-test
index 75159053f26..22c6816eec2 100755
--- a/tests/clickhouse-test
+++ b/tests/clickhouse-test
@@ -5,6 +5,7 @@
 # pylint: disable=too-many-lines

 import enum
+from queue import Full
 import shutil
 import sys
 import os
@@ -1581,15 +1582,18 @@ def do_run_tests(jobs, test_suite: TestSuite, parallel):
         for _ in range(jobs):
             parallel_tests_array.append((None, batch_size, test_suite))

-        with closing(multiprocessing.Pool(processes=jobs)) as pool:
-            pool.map_async(run_tests_array, parallel_tests_array)
+        try:
+            with closing(multiprocessing.Pool(processes=jobs)) as pool:
+                pool.map_async(run_tests_array, parallel_tests_array)

-            for suit in test_suite.parallel_tests:
-                queue.put(suit, timeout=args.timeout * 1.1)
+                for suit in test_suite.parallel_tests:
+                    queue.put(suit, timeout=args.timeout * 1.1)

-            for _ in range(jobs):
-                queue.put(None, timeout=args.timeout * 1.1)
+                for _ in range(jobs):
+                    queue.put(None, timeout=args.timeout * 1.1)

+                queue.close()
+        except Full:
+            queue.close()

         pool.join()
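The HDFS and Kafka hunks above share one structural idea: all Kerberos handling is compiled conditionally behind USE_KRB5, so builds without the krb5 library still compile and merely log a warning when Kerberos parameters are configured. A minimal sketch of the gating pattern (in ClickHouse the macro is set by the build system; it is hardcoded here only for illustration):

    #include <cstdio>

    #define USE_KRB5 0 // assumption: normally defined by the build configuration

    void configure_kerberos(const char * keytab, const char * principal)
    {
    #if USE_KRB5
        // a krb5-enabled build would call kerberosInit(keytab, principal) here
        std::printf("obtaining ticket for %s from %s\n", principal, keytab);
    #else
        (void)keytab;
        (void)principal;
        std::printf("warning: Kerberos parameters are ignored, built without krb5\n");
    #endif
    }

    int main() { configure_kerberos("/etc/clickhouse.keytab", "user@EXAMPLE.COM"); }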
diff --git a/tests/integration/test_broken_detached_part_clean_up/configs/store_cleanup.xml b/tests/integration/test_broken_detached_part_clean_up/configs/store_cleanup.xml
index 3b0260dd07a..5fbe87cce00 100644
--- a/tests/integration/test_broken_detached_part_clean_up/configs/store_cleanup.xml
+++ b/tests/integration/test_broken_detached_part_clean_up/configs/store_cleanup.xml
@@ -1,6 +1,6 @@
 <clickhouse>
     <database_catalog_unused_dir_hide_timeout_sec>0</database_catalog_unused_dir_hide_timeout_sec>
-    <database_catalog_unused_dir_rm_timeout_sec>15</database_catalog_unused_dir_rm_timeout_sec>
+    <database_catalog_unused_dir_rm_timeout_sec>60</database_catalog_unused_dir_rm_timeout_sec>
     <database_catalog_unused_dir_cleanup_period_sec>1</database_catalog_unused_dir_cleanup_period_sec>
 </clickhouse>