Merge branch 'master' into sql-insert-format

commit bed1f68c74
mergify[bot] 2022-06-28 14:20:31 +00:00, committed by GitHub
69 changed files with 968 additions and 462 deletions


@ -143,6 +143,8 @@ jobs:
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # For a proper version and performance artifacts
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
@ -188,6 +190,8 @@ jobs:
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
fetch-depth: 0 # For a proper version and performance artifacts
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync --recursive


@ -16,3 +16,4 @@ ClickHouse® is an open-source column-oriented database management system that a
## Upcoming events
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/286304312/) Please join us for an evening of talks (in English), food, and discussion. Featuring talks on ClickHouse in production and at least one on the deep internals of ClickHouse itself.
* [v22.7 Release Webinar](https://clickhouse.com/company/events/v22-7-release-webinar/) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share his vision of what is coming on the roadmap.


@ -215,7 +215,7 @@ start
clickhouse-client --query "SELECT 'Server successfully started', 'OK'" >> /test_output/test_results.tsv \
|| (echo -e 'Server failed to start (see application_errors.txt and clickhouse-server.clean.log)\tFAIL' >> /test_output/test_results.tsv \
&& grep -Fa "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
&& grep -a "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
[ -f /var/log/clickhouse-server/clickhouse-server.log ] || echo -e "Server log does not exist\tFAIL"
[ -f /var/log/clickhouse-server/stderr.log ] || echo -e "Stderr log does not exist\tFAIL"
@ -313,7 +313,7 @@ then
start 500
clickhouse-client --query "SELECT 'Backward compatibility check: Server successfully started', 'OK'" >> /test_output/test_results.tsv \
|| (echo -e 'Backward compatibility check: Server failed to start\tFAIL' >> /test_output/test_results.tsv \
&& grep -Fa "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt)
&& grep -a "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt)
clickhouse-client --query="SELECT 'Server version: ', version()"
@ -343,7 +343,7 @@ then
-e "UNFINISHED" \
-e "Renaming unexpected part" \
-e "PART_IS_TEMPORARILY_LOCKED" \
-e "and a merge is impossible: we didn't find smaller parts" \
-e "and a merge is impossible: we didn't find" \
-e "found in queue and some source parts for it was lost" \
-e "is lost forever." \
/var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \


@ -186,7 +186,6 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us
| - | - |
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
|libhdfs3\_conf | "" |
### Limitations {#limitations}
@ -200,8 +199,7 @@ Note that due to libhdfs3 limitations only the old-fashioned approach is supported;
datanode communications are not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of this
security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` for reference.
If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` is specified, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. `kinit` tool and krb5 configuration files are required.
If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_security_kerberos_ticket_cache_path` are specified, Kerberos authentication will be used. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case.
## HDFS Namenode HA support {#namenode-ha}
libhdfs3 support HDFS namenode HA.


@ -168,7 +168,7 @@ For a list of possible configuration options, see the [librdkafka configuration
### Kerberos support {#kafka-kerberos-support}
To work with Kerberos-aware Kafka, add a `security_protocol` child element with the value `sasl_plaintext`. It is enough if a Kerberos ticket-granting ticket is obtained and cached by OS facilities.
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements.
Example:


@ -183,7 +183,6 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9
| - | - |
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
### Limitations {#limitations}
* `hadoop_security_kerberos_ticket_cache_path` and `libhdfs3_conf` can be defined only at the global level, not per user
@ -196,7 +195,7 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9
communication with data nodes is not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of this
security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` as a configuration reference.
If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` is specified in the settings, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. The `kinit` tool and krb5 configuration files also need to be installed.
If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_security_kerberos_ticket_cache_path` are specified in the settings, Kerberos authentication will be used. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case.
## Virtual Columns {#virtual-columns}


@ -167,7 +167,7 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
### Kerberos support {#kafka-kerberos-support}
To work with Kerberos-aware Kafka, add a `security_protocol` child element with the value `sasl_plaintext`. It is enough if a Kerberos ticket-granting ticket is obtained and cached by OS facilities.
ClickHouse can maintain Kerberos credentials using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
ClickHouse can maintain Kerberos credentials using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements.
Example:


@ -0,0 +1,3 @@
if (ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()

src/Access/KerberosInit.cpp (new file, 230 lines)

@ -0,0 +1,230 @@
#include <Access/KerberosInit.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h>
#include <Loggers/Loggers.h>
#include <filesystem>
#include <boost/core/noncopyable.hpp>
#include <fmt/format.h>
#if USE_KRB5
#include <krb5.h>
#include <mutex>
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int KERBEROS_ERROR;
}
}
namespace
{
struct K5Data
{
krb5_context ctx;
krb5_ccache out_cc;
krb5_principal me;
char * name;
krb5_boolean switch_to_cache;
};
/**
* A programmatic implementation of kinit.
*/
class KerberosInit : boost::noncopyable
{
public:
void init(const String & keytab_file, const String & principal, const String & cache_name = "");
~KerberosInit();
private:
struct K5Data k5 {};
krb5_ccache defcache = nullptr;
krb5_get_init_creds_opt * options = nullptr;
// Credentials structure including ticket, session key, and lifetime info.
krb5_creds my_creds;
krb5_keytab keytab = nullptr;
krb5_principal defcache_princ = nullptr;
String fmtError(krb5_error_code code) const;
};
}
String KerberosInit::fmtError(krb5_error_code code) const
{
const char * msg = krb5_get_error_message(k5.ctx, code);
String fmt_error = fmt::format(" ({}, {})", code, msg);
krb5_free_error_message(k5.ctx, msg);
return fmt_error;
}
void KerberosInit::init(const String & keytab_file, const String & principal, const String & cache_name)
{
auto * log = &Poco::Logger::get("KerberosInit");
LOG_TRACE(log,"Trying to authenticate with Kerberos v5");
krb5_error_code ret;
const char *deftype = nullptr;
if (!std::filesystem::exists(keytab_file))
throw Exception("Keytab file does not exist", ErrorCodes::KERBEROS_ERROR);
ret = krb5_init_context(&k5.ctx);
if (ret)
throw Exception(ErrorCodes::KERBEROS_ERROR, "Error while initializing Kerberos 5 library ({})", ret);
if (!cache_name.empty())
{
ret = krb5_cc_resolve(k5.ctx, cache_name.c_str(), &k5.out_cc);
if (ret)
throw Exception("Error in resolving cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Resolved cache");
}
else
{
// Resolve the default cache and get its type and default principal (if it is initialized).
ret = krb5_cc_default(k5.ctx, &defcache);
if (ret)
throw Exception("Error while getting default cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Resolved default cache");
deftype = krb5_cc_get_type(k5.ctx, defcache);
if (krb5_cc_get_principal(k5.ctx, defcache, &defcache_princ) != 0)
defcache_princ = nullptr;
}
// Use the specified principal name.
ret = krb5_parse_name_flags(k5.ctx, principal.c_str(), 0, &k5.me);
if (ret)
throw Exception("Error when parsing principal name " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Cache-related commands
if (k5.out_cc == nullptr && krb5_cc_support_switch(k5.ctx, deftype))
{
// Use an existing cache for the client principal if we can.
ret = krb5_cc_cache_match(k5.ctx, k5.me, &k5.out_cc);
if (ret && ret != KRB5_CC_NOTFOUND)
throw Exception("Error while searching for cache for " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
if (0 == ret)
{
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
k5.switch_to_cache = 1;
}
else if (defcache_princ != nullptr)
{
// Create a new cache to avoid overwriting the initialized default cache.
ret = krb5_cc_new_unique(k5.ctx, deftype, nullptr, &k5.out_cc);
if (ret)
throw Exception("Error while generating new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
k5.switch_to_cache = 1;
}
}
// Use the default cache if we haven't picked one yet.
if (k5.out_cc == nullptr)
{
k5.out_cc = defcache;
defcache = nullptr;
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
}
ret = krb5_unparse_name(k5.ctx, k5.me, &k5.name);
if (ret)
throw Exception("Error when unparsing name" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using principal: {}", k5.name);
// Allocate a new initial credential options structure.
ret = krb5_get_init_creds_opt_alloc(k5.ctx, &options);
if (ret)
throw Exception("Error in options allocation" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Resolve keytab
ret = krb5_kt_resolve(k5.ctx, keytab_file.c_str(), &keytab);
if (ret)
throw Exception("Error in resolving keytab "+keytab_file + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using keytab: {}", keytab_file);
// Set an output credential cache in initial credential options.
ret = krb5_get_init_creds_opt_set_out_ccache(k5.ctx, options, k5.out_cc);
if (ret)
throw Exception("Error in setting output credential cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Action: init or renew
LOG_TRACE(log,"Trying to renew credentials");
memset(&my_creds, 0, sizeof(my_creds));
// Get renewed credential from KDC using an existing credential from output cache.
ret = krb5_get_renewed_creds(k5.ctx, &my_creds, k5.me, k5.out_cc, nullptr);
if (ret)
{
LOG_TRACE(log,"Renew failed {}", fmtError(ret));
LOG_TRACE(log,"Trying to get initial credentials");
// Request KDC for an initial credentials using keytab.
ret = krb5_get_init_creds_keytab(k5.ctx, &my_creds, k5.me, keytab, 0, nullptr, options);
if (ret)
throw Exception("Error in getting initial credentials" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
else
LOG_TRACE(log,"Got initial credentials");
}
else
{
LOG_TRACE(log,"Successful renewal");
// Initialize a credential cache. Destroy any existing contents of cache and initialize it for the default principal.
ret = krb5_cc_initialize(k5.ctx, k5.out_cc, k5.me);
if (ret)
throw Exception("Error when initializing cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Initialized cache");
// Store credentials in a credential cache.
ret = krb5_cc_store_cred(k5.ctx, k5.out_cc, &my_creds);
if (ret)
LOG_TRACE(log,"Error while storing credentials");
else
LOG_TRACE(log,"Stored credentials");
}
if (k5.switch_to_cache)
{
// Make a credential cache the primary cache for its collection.
ret = krb5_cc_switch(k5.ctx, k5.out_cc);
if (ret)
throw Exception("Error while switching to new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
}
LOG_TRACE(log,"Authenticated to Kerberos v5");
}
KerberosInit::~KerberosInit()
{
if (k5.ctx)
{
if (defcache)
krb5_cc_close(k5.ctx, defcache);
krb5_free_principal(k5.ctx, defcache_princ);
if (options)
krb5_get_init_creds_opt_free(k5.ctx, options);
if (my_creds.client == k5.me)
my_creds.client = nullptr;
krb5_free_cred_contents(k5.ctx, &my_creds);
if (keytab)
krb5_kt_close(k5.ctx, keytab);
krb5_free_unparsed_name(k5.ctx, k5.name);
krb5_free_principal(k5.ctx, k5.me);
if (k5.out_cc != nullptr)
krb5_cc_close(k5.ctx, k5.out_cc);
krb5_free_context(k5.ctx);
}
}
void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name)
{
// Use a mutex to prevent corruption of the credential cache file
static std::mutex kinit_mtx;
std::unique_lock<std::mutex> lck(kinit_mtx);
KerberosInit k_init;
k_init.init(keytab_file, principal, cache_name);
}
#endif // USE_KRB5
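For orientation, the init() above boils down to the following krb5 call skeleton. This is a hedged sketch, not a drop-in replacement: error handling, logging, and the default-cache matching logic (krb5_cc_cache_match / krb5_cc_new_unique / krb5_cc_switch) are omitted, and kinitSkeleton is a hypothetical name.

```cpp
#include <cstring>
#include <krb5.h>

void kinitSkeleton(const char * keytab_path, const char * principal_name)
{
    krb5_context ctx;
    krb5_ccache cc;
    krb5_principal me;
    krb5_keytab keytab;
    krb5_get_init_creds_opt * options;
    krb5_creds creds;
    memset(&creds, 0, sizeof(creds));

    krb5_init_context(&ctx);
    krb5_cc_default(ctx, &cc);                            // or krb5_cc_resolve() for a named cache
    krb5_parse_name_flags(ctx, principal_name, 0, &me);
    krb5_kt_resolve(ctx, keytab_path, &keytab);
    krb5_get_init_creds_opt_alloc(ctx, &options);
    krb5_get_init_creds_opt_set_out_ccache(ctx, options, cc);

    // First try to renew an existing ticket from the cache ...
    if (krb5_get_renewed_creds(ctx, &creds, me, cc, nullptr) == 0)
    {
        krb5_cc_initialize(ctx, cc, me);      // renewal succeeded: reinitialize the cache
        krb5_cc_store_cred(ctx, cc, &creds);  // ... and store the renewed ticket
    }
    else
    {
        // ... otherwise request initial credentials with the keytab; the
        // out_ccache option set above stores them into the cache for us.
        krb5_get_init_creds_keytab(ctx, &creds, me, keytab, 0, nullptr, options);
    }

    krb5_free_cred_contents(ctx, &creds);
    krb5_get_init_creds_opt_free(ctx, options);
    krb5_kt_close(ctx, keytab);
    krb5_free_principal(ctx, me);
    krb5_cc_close(ctx, cc);
    krb5_free_context(ctx);
}
```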

src/Access/KerberosInit.h (new file, 11 lines)

@ -0,0 +1,11 @@
#pragma once
#include "config_core.h"
#include <base/types.h>
#if USE_KRB5
void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name = "");
#endif // USE_KRB5


@ -0,0 +1,4 @@
if (TARGET ch_contrib::krb5)
add_executable (kerberos_init kerberos_init.cpp)
target_link_libraries (kerberos_init PRIVATE dbms ch_contrib::krb5)
endif()


@ -0,0 +1,47 @@
#include <iostream>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
#include <Common/Exception.h>
#include <Access/KerberosInit.h>
/** This example demonstrates using the kerberosInit function to obtain and cache a Kerberos ticket-granting ticket.
* The first argument specifies the keytab file. The second argument specifies the principal name.
* The third argument is optional. It specifies the credentials cache location.
* After a successful run of kerberos_init, it is useful to call the klist command to list the cached Kerberos tickets.
* It is also useful to run kdestroy to destroy the Kerberos tickets if needed.
*/
using namespace DB;
int main(int argc, char ** argv)
{
std::cout << "Kerberos Init" << "\n";
if (argc < 3)
{
std::cout << "kerberos_init obtains and caches an initial ticket-granting ticket for principal." << "\n\n";
std::cout << "Usage:" << "\n" << " kerberos_init keytab principal [cache]" << "\n";
return 0;
}
String cache_name = "";
if (argc == 4)
cache_name = argv[3];
Poco::AutoPtr<Poco::ConsoleChannel> app_channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(app_channel);
Poco::Logger::root().setLevel("trace");
try
{
kerberosInit(argv[1], argv[2], cache_name);
}
catch (const Exception & e)
{
std::cout << "KerberosInit failure: " << getExceptionMessage(e, false) << "\n";
return -1;
}
std::cout << "Done" << "\n";
return 0;
}


@ -338,11 +338,8 @@ void MemoryTracker::free(Int64 size)
accounted_size += new_amount;
}
}
if (!OvercommitTrackerBlockerInThread::isBlocked())
{
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr)
overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);
}
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed))
overcommit_tracker_ptr->tryContinueQueryExecutionAfterFree(accounted_size);
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);


@ -20,11 +20,32 @@ OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
, global_mutex(global_mutex_)
, freed_memory(0)
, required_memory(0)
, next_id(0)
, id_to_release(0)
, allow_release(true)
{}
#define LOG_DEBUG_SAFE(...) \
do { \
OvercommitTrackerBlockerInThread blocker; \
try \
{ \
ALLOW_ALLOCATIONS_IN_SCOPE; \
LOG_DEBUG(__VA_ARGS__); \
} \
catch (...) \
{ \
if (fprintf(stderr, "Allocation failed during writing to log in OvercommitTracker\n") != -1) \
; /* the result is checked only to silence -Wunused-result */ \
} \
} while (false)
OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
{
DENY_ALLOCATIONS_IN_SCOPE;
if (OvercommitTrackerBlockerInThread::isBlocked())
return OvercommitResult::NONE;
// NOTE: Do not change the order of locks
//
// global_mutex must be acquired before overcommit_m, because
@ -34,6 +55,8 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
std::unique_lock<std::mutex> global_lock(global_mutex);
std::unique_lock<std::mutex> lk(overcommit_m);
size_t id = next_id++;
auto max_wait_time = tracker->getOvercommitWaitingTime();
if (max_wait_time == ZERO_MICROSEC)
@ -65,23 +88,21 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
allow_release = true;
required_memory += amount;
required_per_thread[tracker] = amount;
auto wait_start_time = std::chrono::system_clock::now();
bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]()
bool timeout = !cv.wait_for(lk, max_wait_time, [this, id]()
{
return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE;
return id < id_to_release || cancellation_state == QueryCancellationState::NONE;
});
auto wait_end_time = std::chrono::system_clock::now();
ProfileEvents::increment(ProfileEvents::MemoryOvercommitWaitTimeMicroseconds, (wait_end_time - wait_start_time) / 1us);
LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
LOG_DEBUG_SAFE(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
required_memory -= amount;
Int64 still_need = required_per_thread[tracker]; // If enough memory is freed it will be 0
required_per_thread.erase(tracker);
bool still_need = !(id < id_to_release); // True if thread wasn't released
// If threads were not released since the last call of this method,
// we can release them now.
if (allow_release && required_memory <= freed_memory && still_need != 0)
if (allow_release && required_memory <= freed_memory && still_need)
releaseThreads();
// The whole required amount of memory is free now, and the query selected to stop doesn't know about it.
@ -90,7 +111,7 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
reset();
if (timeout)
return OvercommitResult::TIMEOUTED;
if (still_need != 0)
if (still_need)
return OvercommitResult::NOT_ENOUGH_FREED;
else
return OvercommitResult::MEMORY_FREED;
@ -98,6 +119,11 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)
{
DENY_ALLOCATIONS_IN_SCOPE;
if (OvercommitTrackerBlockerInThread::isBlocked())
return;
std::lock_guard guard(overcommit_m);
if (cancellation_state != QueryCancellationState::NONE)
{
@ -109,10 +135,12 @@ void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)
void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
{
DENY_ALLOCATIONS_IN_SCOPE;
std::unique_lock<std::mutex> lk(overcommit_m);
if (picked_tracker == tracker)
{
LOG_DEBUG(getLogger(), "Picked query stopped");
LOG_DEBUG_SAFE(getLogger(), "Picked query stopped");
reset();
cv.notify_all();
@ -121,8 +149,7 @@ void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
void OvercommitTracker::releaseThreads()
{
for (auto & required : required_per_thread)
required.second = 0;
id_to_release = next_id;
freed_memory = 0;
allow_release = false; // To avoid a repeated call of this method in OvercommitTracker::needToStopQuery
cv.notify_all();
@ -140,7 +167,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
// At this moment query list must be read only.
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
auto & queries = user_process_list->queries;
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", queries.size());
LOG_DEBUG_SAFE(logger, "Trying to choose query to stop from {} queries", queries.size());
for (auto const & query : queries)
{
if (query.second->isKilled())
@ -151,14 +178,14 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
continue;
auto ratio = memory_tracker->getOvercommitRatio();
LOG_DEBUG(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
LOG_DEBUG_SAFE(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
if (ratio.soft_limit != 0 && current_ratio < ratio)
{
query_tracker = memory_tracker;
current_ratio = ratio;
}
}
LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}",
LOG_DEBUG_SAFE(logger, "Selected to stop query with overcommit ratio {}/{}",
current_ratio.committed, current_ratio.soft_limit);
picked_tracker = query_tracker;
}
@ -174,7 +201,7 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only.
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", process_list->size());
LOG_DEBUG_SAFE(logger, "Trying to choose query to stop from {} queries", process_list->size());
for (auto const & query : process_list->processes)
{
if (query.isKilled())
@ -190,14 +217,14 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
if (!memory_tracker)
continue;
auto ratio = memory_tracker->getOvercommitRatio(user_soft_limit);
LOG_DEBUG(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
LOG_DEBUG_SAFE(logger, "Query has ratio {}/{}", ratio.committed, ratio.soft_limit);
if (current_ratio < ratio)
{
query_tracker = memory_tracker;
current_ratio = ratio;
}
}
LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}",
LOG_DEBUG_SAFE(logger, "Selected to stop query with overcommit ratio {}/{}",
current_ratio.committed, current_ratio.soft_limit);
picked_tracker = query_tracker;
}
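The heart of this refactoring is the ticket pair (next_id, id_to_release) that replaces the per-tracker map required_per_thread: each waiter takes an increasing id, and releaseThreads() just advances id_to_release and broadcasts. Below is a minimal self-contained sketch of the same pattern with the allocation guards and memory accounting stripped away; TicketGate is an invented name, not a ClickHouse type.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

class TicketGate
{
public:
    /// Block until released or until the timeout expires; returns true if released.
    template <typename Duration>
    bool wait(Duration timeout)
    {
        std::unique_lock<std::mutex> lk(m);
        size_t id = next_id++;  // every waiter gets a monotonically increasing ticket
        return cv.wait_for(lk, timeout, [this, id] { return id < id_to_release; });
    }

    /// Release every waiter admitted so far.
    void releaseAll()
    {
        std::lock_guard<std::mutex> lk(m);
        id_to_release = next_id;
        cv.notify_all();
    }

private:
    std::mutex m;
    std::condition_variable cv;
    size_t next_id = 0;        // ticket handed to the next waiter
    size_t id_to_release = 0;  // tickets below this value may pass
};
```

Compared to zeroing map entries per tracker, releasing is now O(1) bookkeeping plus one broadcast, and each waiter only compares its own ticket against the shared counter.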


@ -104,6 +104,10 @@ private:
picked_tracker = nullptr;
cancellation_state = QueryCancellationState::NONE;
freed_memory = 0;
next_id = 0;
id_to_release = 0;
allow_release = true;
}
@ -111,8 +115,6 @@ private:
QueryCancellationState cancellation_state;
std::unordered_map<MemoryTracker *, Int64> required_per_thread;
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
@ -122,6 +124,9 @@ private:
Int64 freed_memory;
Int64 required_memory;
size_t next_id; // Id handed out to the next thread entering OvercommitTracker
size_t id_to_release; // We can release all threads with id smaller than this
bool allow_release;
};


@ -476,7 +476,7 @@ class MultiVolnitskyBase
{
private:
/// needles and their offsets
const std::vector<StringRef> & needles;
const std::vector<std::string_view> & needles;
/// fallback searchers
@ -502,7 +502,7 @@ private:
static constexpr size_t small_limit = VolnitskyTraits::hash_size / 8;
public:
explicit MultiVolnitskyBase(const std::vector<StringRef> & needles_) : needles{needles_}, step{0}, last{0}
explicit MultiVolnitskyBase(const std::vector<std::string_view> & needles_) : needles{needles_}, step{0}, last{0}
{
fallback_searchers.reserve(needles.size());
hash = std::unique_ptr<OffsetId[]>(new OffsetId[VolnitskyTraits::hash_size]); /// No zero initialization, it will be done later.
@ -535,8 +535,8 @@ public:
for (; last < size; ++last)
{
const char * cur_needle_data = needles[last].data;
const size_t cur_needle_size = needles[last].size;
const char * cur_needle_data = needles[last].data();
const size_t cur_needle_size = needles[last].size();
/// save the indices of fallback searchers
if (VolnitskyTraits::isFallbackNeedle(cur_needle_size))
@ -593,7 +593,7 @@ public:
{
const auto res = pos - (hash[cell_num].off - 1);
const size_t ind = hash[cell_num].id;
if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
return true;
}
}
@ -625,7 +625,7 @@ public:
{
const auto res = pos - (hash[cell_num].off - 1);
const size_t ind = hash[cell_num].id;
if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
answer = std::min(answer, ind);
}
}
@ -663,7 +663,7 @@ public:
{
const auto res = pos - (hash[cell_num].off - 1);
const size_t ind = hash[cell_num].id;
if (res + needles[ind].size <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
if (res + needles[ind].size() <= haystack_end && fallback_searchers[ind].compare(haystack, haystack_end, res))
answer = std::min<UInt64>(answer, res - haystack);
}
}
@ -699,7 +699,7 @@ public:
const auto * res = pos - (hash[cell_num].off - 1);
const size_t ind = hash[cell_num].id;
if (answer[ind] == 0
&& res + needles[ind].size <= haystack_end
&& res + needles[ind].size() <= haystack_end
&& fallback_searchers[ind].compare(haystack, haystack_end, res))
answer[ind] = count_chars(haystack, res);
}
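The mechanical part of this change is that StringRef exposes data/size as plain members while std::string_view exposes them through accessors, which is why the call sites above gain parentheses. A hedged sketch follows; the StringRef below is a simplified stand-in for the one in base/StringRef.h.

```cpp
#include <cstddef>
#include <string_view>

struct StringRef                  // simplified stand-in for base/StringRef.h
{
    const char * data = nullptr;  // plain members ...
    size_t size = 0;
};

size_t lenOld(const StringRef & s) { return s.size; }    // member access
size_t lenNew(std::string_view s)  { return s.size(); }  // accessor call
```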


@ -32,7 +32,7 @@ struct ConnectionInfo
class Connection : private boost::noncopyable
{
public:
Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
explicit Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
void execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec);


@ -25,13 +25,13 @@ public:
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover(
explicit PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
explicit PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
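The PostgreSQL constructors above are callable with a single argument once the default arguments kick in, so without explicit they would double as implicit conversions. A minimal illustration with invented types (Config and Pool are not the real classes):

```cpp
#include <cstddef>

struct Config {};

struct Pool
{
    explicit Pool(const Config & /*cfg*/, size_t /*pool_size*/ = 5) {}
};

void use(const Pool &) {}

int main()
{
    Config cfg;
    use(Pool(cfg));   // fine: the conversion is spelled out
    // use(cfg);      // compiles without 'explicit'; rejected with it
}
```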


@ -591,6 +591,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, use_local_cache_for_remote_storage, true, "Use local cache for remote storage like HDFS or S3, it's used for remote table engine only", 0) \
\
M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \
M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \
M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
@ -601,6 +603,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, implicit_transaction, false, "If enabled and not already inside a transaction, wraps the query inside a full transaction (begin + commit or rollback)", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
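Each M(type, name, default, description, flags) row above is one entry in an X-macro list that gets expanded in several places to generate the settings machinery. A generic, self-contained sketch of the technique follows; the list, names, and expansions are invented for illustration and are not ClickHouse's actual code.

```cpp
#include <iostream>

// X-macro list: one row per setting.
#define APPLY_FOR_SETTINGS(M) \
    M(bool, allow_experimental_stuff, false, "Enable experimental stuff") \
    M(int,  max_retries,              3,     "How many times to retry")

// Expansion 1: struct members with defaults.
struct Settings
{
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    APPLY_FOR_SETTINGS(DECLARE)
#undef DECLARE
};

// Expansion 2: documentation generated from the same list.
void printDocs()
{
#define DOC(TYPE, NAME, DEFAULT, DESCRIPTION) std::cout << #NAME << ": " << DESCRIPTION << '\n';
    APPLY_FOR_SETTINGS(DOC)
#undef DOC
}

int main()
{
    Settings settings;
    printDocs();
    return settings.max_retries == 3 ? 0 : 1;
}
```

Because every consumer expands the same list, adding one row adds the member, its default, and its documentation at once.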


@ -8,11 +8,6 @@
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Redis
{
class Client;


@ -113,7 +113,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
}
void AzureObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
auto client_ptr = client.get();


@ -73,7 +73,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exist or it's a directory.
void removeObject(const std::string & path) override;


@ -51,7 +51,7 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', buf);
storage_objects[i].relative_path = remote_fs_object_path;
storage_objects[i].path = remote_fs_object_path;
storage_objects[i].bytes_size = remote_fs_object_size;
}


@ -12,17 +12,6 @@ namespace DB
struct DiskObjectStorageMetadata
{
private:
struct RelativePathWithSize
{
String relative_path;
size_t bytes_size;
RelativePathWithSize() = default;
RelativePathWithSize(const String & relative_path_, size_t bytes_size_)
: relative_path(relative_path_), bytes_size(bytes_size_) {}
};
/// Metadata file version.
static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1;
static constexpr uint32_t VERSION_RELATIVE_PATHS = 2;
@ -31,7 +20,7 @@ private:
const std::string & common_metadata_path;
/// Relative paths of blobs.
std::vector<RelativePathWithSize> storage_objects;
RelativePathsWithSize storage_objects;
/// URI
const std::string & remote_fs_root_path;
@ -71,7 +60,7 @@ public:
return remote_fs_root_path;
}
std::vector<RelativePathWithSize> getBlobsRelativePaths() const
RelativePathsWithSize getBlobsRelativePaths() const
{
return storage_objects;
}


@ -1,5 +1,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -379,7 +380,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
return true;
};
PathsWithSize children;
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path, children);
restore_files(children);
@ -523,7 +524,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
return true;
};
PathsWithSize children;
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
restore_file_operations(children);


@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
}
void HDFSObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
const size_t begin_of_path = path.find('/', path.find("//") + 2);
int32_t num_entries;


@ -75,7 +75,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exist or it's a directory.
void removeObject(const std::string & path) override;


@ -25,8 +25,7 @@ class WriteBufferFromFileBase;
using ObjectAttributes = std::map<std::string, std::string>;
/// Path to a file and its size.
/// Path can be either relative or absolute - according to the context of use.
/// Path to a file (always absolute) and its size.
struct PathWithSize
{
std::string path;
@ -42,6 +41,7 @@ struct PathWithSize
/// List of paths with their sizes
using PathsWithSize = std::vector<PathWithSize>;
using RelativePathsWithSize = PathsWithSize;
struct ObjectMetadata
{
@ -66,7 +66,7 @@ public:
virtual bool exists(const std::string & path) const = 0;
/// List objects under the given prefix, return children (relative paths) with their sizes.
virtual void listPrefix(const std::string & path, PathsWithSize & children) const = 0;
virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0;
/// Get object metadata if supported. It should be possible to receive
/// at least size of object
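With the alias above, RelativePathsWithSize is the same type as PathsWithSize, so existing callers keep working while the listPrefix signature now states that the returned children are relative paths. A small sketch against stand-ins mirroring the declarations above; totalSize is a hypothetical helper.

```cpp
#include <cstddef>
#include <string>
#include <vector>

struct PathWithSize
{
    std::string path;
    size_t bytes_size;
};

using PathsWithSize = std::vector<PathWithSize>;
using RelativePathsWithSize = PathsWithSize;  // alias, as in the header above

/// Sum the sizes of all children returned by a listPrefix()-style call.
size_t totalSize(const RelativePathsWithSize & children)
{
    size_t total = 0;
    for (const auto & child : children)
        total += child.bytes_size;
    return total;
}
```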


@ -195,7 +195,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
}
void S3ObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();


@ -80,7 +80,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exist or it's a directory.
void removeObject(const std::string & path) override;


@ -205,7 +205,7 @@ struct ConvertImpl
if constexpr (std::is_same_v<FromDataType, DataTypeUUID> != std::is_same_v<ToDataType, DataTypeUUID>)
{
throw Exception("Conversion between numeric types and UUID is not supported", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Conversion between numeric types and UUID is not supported. Probably the passed UUID is unquoted", ErrorCodes::NOT_IMPLEMENTED);
}
else
{


@ -10,10 +10,8 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/hyperscanRegexpChecker.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <base/StringRef.h>
#include <optional>
@ -23,35 +21,28 @@ namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int FUNCTION_NOT_ALLOWED;
}
template <typename Impl, size_t LimitArgs>
template <typename Impl>
class FunctionsMultiStringFuzzySearch : public IFunction
{
static_assert(LimitArgs > 0);
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(ContextPtr context)
{
if (Impl::is_using_hyperscan && !context->getSettingsRef().allow_hyperscan)
throw Exception(
"Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0", ErrorCodes::FUNCTION_NOT_ALLOWED);
return std::make_shared<FunctionsMultiStringFuzzySearch>(
context->getSettingsRef().max_hyperscan_regexp_length, context->getSettingsRef().max_hyperscan_regexp_total_length);
const auto & settings = context->getSettingsRef();
return std::make_shared<FunctionsMultiStringFuzzySearch>(settings.allow_hyperscan, settings.max_hyperscan_regexp_length, settings.max_hyperscan_regexp_total_length);
}
FunctionsMultiStringFuzzySearch(size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_)
: max_hyperscan_regexp_length(max_hyperscan_regexp_length_), max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_)
{
}
FunctionsMultiStringFuzzySearch(bool allow_hyperscan_, size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_)
: allow_hyperscan(allow_hyperscan_)
, max_hyperscan_regexp_length(max_hyperscan_regexp_length_)
, max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_)
{}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
@ -60,82 +51,53 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[0]->getName(), getName());
if (!isUnsignedInteger(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[1]->getName(), getName());
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[2].get());
if (!array_type || !checkAndGetDataType<DataTypeString>(array_type->getNestedType().get()))
throw Exception(
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[2]->getName(), getName());
return Impl::getReturnType();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
using ResultType = typename Impl::ResultType;
const ColumnPtr & column_haystack = arguments[0].column;
const ColumnPtr & num_ptr = arguments[1].column;
const ColumnPtr & arr_ptr = arguments[2].column;
const ColumnString * col_haystack_vector = checkAndGetColumn<ColumnString>(&*column_haystack);
assert(col_haystack_vector); // getReturnTypeImpl() checks the data type
const ColumnPtr & num_ptr = arguments[1].column;
const ColumnConst * col_const_num = nullptr;
UInt32 edit_distance = 0;
if ((col_const_num = checkAndGetColumnConst<ColumnUInt8>(num_ptr.get())))
edit_distance = col_const_num->getValue<UInt8>();
else if ((col_const_num = checkAndGetColumnConst<ColumnUInt16>(num_ptr.get())))
edit_distance = col_const_num->getValue<UInt16>();
else if ((col_const_num = checkAndGetColumnConst<ColumnUInt32>(num_ptr.get())))
edit_distance = col_const_num->getValue<UInt32>();
if (const auto * col_const_uint8 = checkAndGetColumnConst<ColumnUInt8>(num_ptr.get()))
edit_distance = col_const_uint8->getValue<UInt8>();
else if (const auto * col_const_uint16 = checkAndGetColumnConst<ColumnUInt16>(num_ptr.get()))
edit_distance = col_const_uint16->getValue<UInt16>();
else if (const auto * col_const_uint32 = checkAndGetColumnConst<ColumnUInt32>(num_ptr.get()))
edit_distance = col_const_uint32->getValue<UInt32>();
else
throw Exception(
"Illegal column " + arguments[1].column->getName()
+ ". The number is not const or does not fit in UInt32",
ErrorCodes::ILLEGAL_COLUMN);
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The number is not const or does not fit in UInt32", arguments[1].column->getName());
const ColumnPtr & arr_ptr = arguments[2].column;
const ColumnConst * col_const_arr = checkAndGetColumnConst<ColumnArray>(arr_ptr.get());
if (!col_const_arr)
throw Exception(
"Illegal column " + arguments[2].column->getName() + ". The array is not const",
ErrorCodes::ILLEGAL_COLUMN);
Array src_arr = col_const_arr->getValue<Array>();
if (src_arr.size() > LimitArgs)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + std::to_string(src_arr.size())
+ ", should be at most " + std::to_string(LimitArgs),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
std::vector<StringRef> refs;
refs.reserve(src_arr.size());
for (const auto & el : src_arr)
refs.emplace_back(el.get<String>());
if (Impl::is_using_hyperscan)
checkRegexp(refs, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The array is not const", arguments[2].column->getName());
using ResultType = typename Impl::ResultType;
auto col_res = ColumnVector<ResultType>::create();
auto col_offsets = ColumnArray::ColumnOffsets::create();
auto & vec_res = col_res->getData();
auto & offsets_res = col_offsets->getData();
// the implementations are responsible for resizing the output column
/// The callee is responsible for resizing the output columns.
if (col_haystack_vector)
Impl::vectorConstant(
col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), refs, vec_res, offsets_res, edit_distance);
else
throw Exception("Illegal column " + arguments[0].column->getName(), ErrorCodes::ILLEGAL_COLUMN);
Array needles_arr = col_const_arr->getValue<Array>();
Impl::vectorConstant(
col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), needles_arr, vec_res, offsets_res, edit_distance,
allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
if constexpr (Impl::is_column_array)
return ColumnArray::create(std::move(col_res), std::move(col_offsets));
@ -144,8 +106,9 @@ public:
}
private:
size_t max_hyperscan_regexp_length;
size_t max_hyperscan_regexp_total_length;
const bool allow_hyperscan;
const size_t max_hyperscan_regexp_length;
const size_t max_hyperscan_regexp_total_length;
};
}


@ -98,7 +98,8 @@ public:
+ ", should be at most 255",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
std::vector<StringRef> refs;
std::vector<std::string_view> refs;
refs.reserve(src_arr.size());
for (const auto & el : src_arr)
refs.emplace_back(el.get<String>());


@ -10,15 +10,17 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/hyperscanRegexpChecker.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <base/StringRef.h>
namespace DB
{
/**
* multiMatchAny(haystack, [pattern_1, pattern_2, ..., pattern_n])
* multiMatchAnyIndex(haystack, [pattern_1, pattern_2, ..., pattern_n])
* multiMatchAllIndices(haystack, [pattern_1, pattern_2, ..., pattern_n])
*
* multiSearchAny(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- find any of the const patterns inside haystack and return 0 or 1
* multiSearchAnyUTF8(haystack, [pattern_1, pattern_2, ..., pattern_n])
* multiSearchAnyCaseInsensitive(haystack, [pattern_1, pattern_2, ..., pattern_n])
@ -34,37 +36,28 @@ namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int FUNCTION_NOT_ALLOWED;
}
/// The argument limit arises from the Volnitsky searcher -- it is performance-critical to store the pattern number in only one byte.
/// But some other searchers use this function, for example multiMatchAny -- hyperscan does not have such restrictions
template <typename Impl, size_t LimitArgs = std::numeric_limits<UInt8>::max()>
template <typename Impl>
class FunctionsMultiStringSearch : public IFunction
{
static_assert(LimitArgs > 0);
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(ContextPtr context)
{
if (Impl::is_using_hyperscan && !context->getSettingsRef().allow_hyperscan)
throw Exception(
"Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0", ErrorCodes::FUNCTION_NOT_ALLOWED);
return std::make_shared<FunctionsMultiStringSearch>(
context->getSettingsRef().max_hyperscan_regexp_length, context->getSettingsRef().max_hyperscan_regexp_total_length);
const auto & settings = context->getSettingsRef();
return std::make_shared<FunctionsMultiStringSearch>(settings.allow_hyperscan, settings.max_hyperscan_regexp_length, settings.max_hyperscan_regexp_total_length);
}
FunctionsMultiStringSearch(size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_)
: max_hyperscan_regexp_length(max_hyperscan_regexp_length_), max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_)
{
}
FunctionsMultiStringSearch(bool allow_hyperscan_, size_t max_hyperscan_regexp_length_, size_t max_hyperscan_regexp_total_length_)
: allow_hyperscan(allow_hyperscan_)
, max_hyperscan_regexp_length(max_hyperscan_regexp_length_)
, max_hyperscan_regexp_total_length(max_hyperscan_regexp_total_length_)
{}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
@ -73,60 +66,39 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[0]->getName(), getName());
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
if (!array_type || !checkAndGetDataType<DataTypeString>(array_type->getNestedType().get()))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}", arguments[1]->getName(), getName());
return Impl::getReturnType();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
using ResultType = typename Impl::ResultType;
const ColumnPtr & column_haystack = arguments[0].column;
const ColumnPtr & arr_ptr = arguments[1].column;
const ColumnString * col_haystack_vector = checkAndGetColumn<ColumnString>(&*column_haystack);
assert(col_haystack_vector); // getReturnTypeImpl() checks the data type
const ColumnPtr & arr_ptr = arguments[1].column;
const ColumnConst * col_const_arr = checkAndGetColumnConst<ColumnArray>(arr_ptr.get());
if (!col_const_arr)
throw Exception(
"Illegal column " + arguments[1].column->getName() + ". The array is not const",
ErrorCodes::ILLEGAL_COLUMN);
Array src_arr = col_const_arr->getValue<Array>();
if (src_arr.size() > LimitArgs)
throw Exception(
"Number of arguments for function " + getName() + " doesn't match: passed " + std::to_string(src_arr.size())
+ ", should be at most " + std::to_string(LimitArgs),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
std::vector<StringRef> refs;
refs.reserve(src_arr.size());
for (const auto & el : src_arr)
refs.emplace_back(el.get<String>());
if (Impl::is_using_hyperscan)
checkRegexp(refs, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {}. The array is not const", arguments[1].column->getName());
using ResultType = typename Impl::ResultType;
auto col_res = ColumnVector<ResultType>::create();
auto col_offsets = ColumnArray::ColumnOffsets::create();
auto & vec_res = col_res->getData();
auto & offsets_res = col_offsets->getData();
// the implementations are responsible for resizing the output column
/// The callee is responsible for resizing the output columns.
if (col_haystack_vector)
Impl::vectorConstant(col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), refs, vec_res, offsets_res);
else
throw Exception("Illegal column " + arguments[0].column->getName(), ErrorCodes::ILLEGAL_COLUMN);
Array needles_arr = col_const_arr->getValue<Array>();
Impl::vectorConstant(
col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), needles_arr, vec_res, offsets_res,
allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
if constexpr (Impl::is_column_array)
return ColumnArray::create(std::move(col_res), std::move(col_offsets));
@ -135,8 +107,9 @@ public:
}
private:
size_t max_hyperscan_regexp_length;
size_t max_hyperscan_regexp_total_length;
const bool allow_hyperscan;
const size_t max_hyperscan_regexp_length;
const size_t max_hyperscan_regexp_total_length;
};
}


@ -1,9 +1,11 @@
#pragma once
#include <base/types.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <Functions/checkHyperscanRegexp.h>
#include "Regexps.h"
#include "config_functions.h"
@ -19,18 +21,19 @@ namespace DB
namespace ErrorCodes
{
extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int FUNCTION_NOT_ALLOWED;
extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
extern const int NOT_IMPLEMENTED;
extern const int TOO_MANY_BYTES;
}
template <typename Name, typename Type, bool MultiSearchDistance>
template <typename Name, typename ResultType_, bool WithEditDistance>
struct MultiMatchAllIndicesImpl
{
using ResultType = Type;
static constexpr bool is_using_hyperscan = true;
using ResultType = ResultType_;
/// Indicates whether offsets are used for the output; in effect, whether
/// the function returns a ColumnVector or a ColumnArray.
static constexpr bool is_column_array = true;
@ -44,24 +47,39 @@ struct MultiMatchAllIndicesImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
PaddedPODArray<Type> & res,
PaddedPODArray<UInt64> & offsets)
const Array & needles_arr,
PaddedPODArray<ResultType> & res,
PaddedPODArray<UInt64> & offsets,
bool allow_hyperscan,
size_t max_hyperscan_regexp_length,
size_t max_hyperscan_regexp_total_length)
{
vectorConstant(haystack_data, haystack_offsets, needles, res, offsets, std::nullopt);
vectorConstant(haystack_data, haystack_offsets, needles_arr, res, offsets, std::nullopt, allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
}
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
PaddedPODArray<Type> & res,
PaddedPODArray<UInt64> & offsets,
[[maybe_unused]] std::optional<UInt32> edit_distance)
[[maybe_unused]] const ColumnString::Chars & haystack_data,
[[maybe_unused]] const ColumnString::Offsets & haystack_offsets,
[[maybe_unused]] const Array & needles_arr,
[[maybe_unused]] PaddedPODArray<ResultType> & res,
[[maybe_unused]] PaddedPODArray<UInt64> & offsets,
[[maybe_unused]] std::optional<UInt32> edit_distance,
bool allow_hyperscan,
[[maybe_unused]] size_t max_hyperscan_regexp_length,
[[maybe_unused]] size_t max_hyperscan_regexp_total_length)
{
offsets.resize(haystack_offsets.size());
if (!allow_hyperscan)
throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0");
#if USE_VECTORSCAN
const auto & hyperscan_regex = MultiRegexps::get</*SaveIndices=*/true, MultiSearchDistance>(needles, edit_distance);
std::vector<std::string_view> needles;
needles.reserve(needles_arr.size());
for (const auto & needle : needles_arr)
needles.emplace_back(needle.get<String>());
checkHyperscanRegexp(needles, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
offsets.resize(haystack_offsets.size());
const auto & hyperscan_regex = MultiRegexps::get</*SaveIndices=*/true, WithEditDistance>(needles, edit_distance);
hs_scratch_t * scratch = nullptr;
hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch);
@ -76,7 +94,7 @@ struct MultiMatchAllIndicesImpl
unsigned int /* flags */,
void * context) -> int
{
static_cast<PaddedPODArray<Type>*>(context)->push_back(id);
static_cast<PaddedPODArray<ResultType>*>(context)->push_back(id);
return 0;
};
const size_t haystack_offsets_size = haystack_offsets.size();
@ -102,11 +120,6 @@ struct MultiMatchAllIndicesImpl
offset = haystack_offsets[i];
}
#else
(void)haystack_data;
(void)haystack_offsets;
(void)needles;
(void)res;
(void)offsets;
throw Exception(
"multi-search all indices is not implemented when vectorscan is off",
ErrorCodes::NOT_IMPLEMENTED);


@ -1,8 +1,10 @@
#pragma once
#include <base/types.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/checkHyperscanRegexp.h>
#include "Regexps.h"
#include "config_functions.h"
@ -20,19 +22,31 @@ namespace DB
namespace ErrorCodes
{
extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int FUNCTION_NOT_ALLOWED;
extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
extern const int NOT_IMPLEMENTED;
extern const int TOO_MANY_BYTES;
}
// For more readable instantiations of MultiMatchAnyImpl<>
struct MultiMatchTraits
{
enum class Find
{
Any,
AnyIndex
};
};
template <typename Name, typename Type, bool FindAny, bool FindAnyIndex, bool MultiSearchDistance>
template <typename Name, typename ResultType_, MultiMatchTraits::Find Find, bool WithEditDistance>
struct MultiMatchAnyImpl
{
static_assert(static_cast<int>(FindAny) + static_cast<int>(FindAnyIndex) == 1);
using ResultType = Type;
static constexpr bool is_using_hyperscan = true;
using ResultType = ResultType_;
static constexpr bool FindAny = (Find == MultiMatchTraits::Find::Any);
static constexpr bool FindAnyIndex = (Find == MultiMatchTraits::Find::AnyIndex);
/// Indicates whether offsets are used for the output; in effect, whether
/// the function returns a ColumnVector or a ColumnArray.
static constexpr bool is_column_array = false;
@ -46,26 +60,40 @@ struct MultiMatchAnyImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
PaddedPODArray<Type> & res,
PaddedPODArray<UInt64> & offsets)
const Array & needles_arr,
PaddedPODArray<ResultType> & res,
PaddedPODArray<UInt64> & offsets,
bool allow_hyperscan,
size_t max_hyperscan_regexp_length,
size_t max_hyperscan_regexp_total_length)
{
vectorConstant(haystack_data, haystack_offsets, needles, res, offsets, std::nullopt);
vectorConstant(haystack_data, haystack_offsets, needles_arr, res, offsets, std::nullopt, allow_hyperscan, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
}
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
PaddedPODArray<Type> & res,
const Array & needles_arr,
PaddedPODArray<ResultType> & res,
[[maybe_unused]] PaddedPODArray<UInt64> & offsets,
[[maybe_unused]] std::optional<UInt32> edit_distance)
[[maybe_unused]] std::optional<UInt32> edit_distance,
bool allow_hyperscan,
size_t max_hyperscan_regexp_length,
size_t max_hyperscan_regexp_total_length)
{
(void)FindAny;
(void)FindAnyIndex;
if (!allow_hyperscan)
throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Hyperscan functions are disabled, because setting 'allow_hyperscan' is set to 0");
std::vector<std::string_view> needles;
needles.reserve(needles_arr.size());
for (const auto & needle : needles_arr)
needles.emplace_back(needle.get<String>());
checkHyperscanRegexp(needles, max_hyperscan_regexp_length, max_hyperscan_regexp_total_length);
res.resize(haystack_offsets.size());
#if USE_VECTORSCAN
const auto & hyperscan_regex = MultiRegexps::get<FindAnyIndex, MultiSearchDistance>(needles, edit_distance);
const auto & hyperscan_regex = MultiRegexps::get<FindAnyIndex, WithEditDistance>(needles, edit_distance);
hs_scratch_t * scratch = nullptr;
hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch);
@ -81,9 +109,9 @@ struct MultiMatchAnyImpl
void * context) -> int
{
if constexpr (FindAnyIndex)
*reinterpret_cast<Type *>(context) = id;
*reinterpret_cast<ResultType *>(context) = id;
else if constexpr (FindAny)
*reinterpret_cast<Type *>(context) = 1;
*reinterpret_cast<ResultType *>(context) = 1;
/// Once we hit the callback, there is no need to search for others.
return 1;
};
@ -110,8 +138,8 @@ struct MultiMatchAnyImpl
offset = haystack_offsets[i];
}
#else
/// Fallback if do not use vectorscan
if constexpr (MultiSearchDistance)
// fallback if vectorscan is not compiled
if constexpr (WithEditDistance)
throw Exception(
"Edit distance multi-search is not implemented when vectorscan is off",
ErrorCodes::NOT_IMPLEMENTED);
@ -120,7 +148,7 @@ struct MultiMatchAnyImpl
memset(accum.data(), 0, accum.size());
for (size_t j = 0; j < needles.size(); ++j)
{
MatchImpl<Name, MatchTraits::Syntax::Re2, MatchTraits::Case::Sensitive, MatchTraits::Result::DontNegate>::vectorConstant(haystack_data, haystack_offsets, needles[j].toString(), nullptr, accum);
MatchImpl<Name, MatchTraits::Syntax::Re2, MatchTraits::Case::Sensitive, MatchTraits::Result::DontNegate>::vectorConstant(haystack_data, haystack_offsets, std::string(needles[j].data(), needles[j].size()), nullptr, accum);
for (size_t i = 0; i < res.size(); ++i)
{
if constexpr (FindAny)

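Both match-callback hunks above lean on the same Vectorscan/Hyperscan C API pattern: compile a pattern set once, obtain scratch space, and let the match handler write results through the user-supplied context pointer. A minimal self-contained sketch of that pattern (the pattern set, ids, and haystack are illustrative; error handling is trimmed):

// Minimal sketch of the hs_scan callback-with-context pattern used above.
#include <hs/hs.h>
#include <cstdio>
#include <vector>

int main()
{
    const char * patterns[] = {"foo", "bar"};
    unsigned flags[] = {HS_FLAG_DOTALL | HS_FLAG_SINGLEMATCH, HS_FLAG_DOTALL | HS_FLAG_SINGLEMATCH};
    unsigned ids[] = {1, 2};

    hs_database_t * db = nullptr;
    hs_compile_error_t * compile_err = nullptr;
    if (hs_compile_multi(patterns, flags, ids, 2, HS_MODE_BLOCK, nullptr, &db, &compile_err) != HS_SUCCESS)
        return 1;

    hs_scratch_t * scratch = nullptr;
    if (hs_alloc_scratch(db, &scratch) != HS_SUCCESS)
        return 1;

    /// The handler receives the user-supplied context; collecting ids mirrors the
    /// PaddedPODArray push_back above. Returning 0 continues the scan; the
    /// FindAny/FindAnyIndex path returns 1 to stop after the first hit.
    auto on_match = [](unsigned id, unsigned long long /*from*/, unsigned long long /*to*/,
                       unsigned /*flags*/, void * context) -> int
    {
        static_cast<std::vector<unsigned> *>(context)->push_back(id);
        return 0;
    };

    std::vector<unsigned> found;
    const char haystack[] = "a bar of foo";
    hs_scan(db, haystack, sizeof(haystack) - 1, 0, scratch, on_match, &found);

    for (unsigned id : found)
        printf("matched pattern id %u\n", id);

    hs_free_scratch(scratch);
    hs_free_database(db);
}
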
View File

@ -15,7 +15,7 @@ struct MultiSearchAllPositionsImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
const std::vector<std::string_view> & needles,
PaddedPODArray<UInt64> & res)
{
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64

View File

@ -1,17 +1,22 @@
#pragma once
#include <vector>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Name, typename Impl>
struct MultiSearchFirstIndexImpl
{
using ResultType = UInt64;
static constexpr bool is_using_hyperscan = false;
/// Indicates whether offsets are used for the output, i.e. whether the
/// function returns a ColumnVector or a ColumnArray.
static constexpr bool is_column_array = false;
@ -22,10 +27,24 @@ struct MultiSearchFirstIndexImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
const Array & needles_arr,
PaddedPODArray<UInt64> & res,
[[maybe_unused]] PaddedPODArray<UInt64> & offsets)
[[maybe_unused]] PaddedPODArray<UInt64> & offsets,
bool /*allow_hyperscan*/,
size_t /*max_hyperscan_regexp_length*/,
size_t /*max_hyperscan_regexp_total_length*/)
{
// For the performance of Volnitsky search it is crucial that the pattern number fits into a single byte.
if (needles_arr.size() > std::numeric_limits<UInt8>::max())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at most {}",
name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
std::vector<std::string_view> needles;
needles.reserve(needles_arr.size());
for (const auto & needle : needles_arr)
needles.emplace_back(needle.get<String>());
auto searcher = Impl::createMultiSearcherInBigHaystack(needles);
const size_t haystack_string_size = haystack_offsets.size();
res.resize(haystack_string_size);

View File

@ -1,17 +1,22 @@
#pragma once
#include <vector>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Name, typename Impl>
struct MultiSearchFirstPositionImpl
{
using ResultType = UInt64;
static constexpr bool is_using_hyperscan = false;
/// Indicates whether offsets are used for the output, i.e. whether the
/// function returns a ColumnVector or a ColumnArray.
static constexpr bool is_column_array = false;
@ -22,10 +27,24 @@ struct MultiSearchFirstPositionImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
const Array & needles_arr,
PaddedPODArray<UInt64> & res,
[[maybe_unused]] PaddedPODArray<UInt64> & offsets)
[[maybe_unused]] PaddedPODArray<UInt64> & offsets,
bool /*allow_hyperscan*/,
size_t /*max_hyperscan_regexp_length*/,
size_t /*max_hyperscan_regexp_total_length*/)
{
// For the performance of Volnitsky search it is crucial that the pattern number fits into a single byte.
if (needles_arr.size() > std::numeric_limits<UInt8>::max())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at most {}",
name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
std::vector<std::string_view> needles;
needles.reserve(needles_arr.size());
for (const auto & needle : needles_arr)
needles.emplace_back(needle.get<String>());
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64
{
return 1 + Impl::countChars(reinterpret_cast<const char *>(start), reinterpret_cast<const char *>(end));

View File

@ -1,17 +1,22 @@
#pragma once
#include <vector>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Name, typename Impl>
struct MultiSearchImpl
{
using ResultType = UInt8;
static constexpr bool is_using_hyperscan = false;
/// Indicates whether offsets are used for the output, i.e. whether the
/// function returns a ColumnVector or a ColumnArray.
static constexpr bool is_column_array = false;
@ -22,10 +27,24 @@ struct MultiSearchImpl
static void vectorConstant(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const std::vector<StringRef> & needles,
const Array & needles_arr,
PaddedPODArray<UInt8> & res,
[[maybe_unused]] PaddedPODArray<UInt64> & offsets)
[[maybe_unused]] PaddedPODArray<UInt64> & offsets,
bool /*allow_hyperscan*/,
size_t /*max_hyperscan_regexp_length*/,
size_t /*max_hyperscan_regexp_total_length*/)
{
// For the performance of Volnitsky search it is crucial that the pattern number fits into a single byte.
if (needles_arr.size() > std::numeric_limits<UInt8>::max())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at most {}",
name, std::to_string(needles_arr.size()), std::to_string(std::numeric_limits<UInt8>::max()));
std::vector<std::string_view> needles;
needles.reserve(needles_arr.size());
for (const auto & needle : needles_arr)
needles.emplace_back(needle.get<String>());
auto searcher = Impl::createMultiSearcherInBigHaystack(needles);
const size_t haystack_string_size = haystack_offsets.size();
res.resize(haystack_string_size);

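The one-byte constraint repeated in the three hunks above (MultiSearchFirstIndexImpl, MultiSearchFirstPositionImpl, MultiSearchImpl) exists because the Volnitsky-style multi-searcher packs the needle number into a single byte next to the sampled offset, which keeps its hash cells small and cache-friendly. A purely illustrative sketch of the idea, not the real searcher layout:

#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>

/// Illustrative cell layout: with one byte for the pattern index, at most
/// std::numeric_limits<uint8_t>::max() == 255 needles can be encoded.
struct MultiSearcherCellSketch
{
    uint8_t pattern_index;      /// which needle this bigram sample belongs to
    uint16_t offset_in_pattern; /// where the sampled bigram sits inside that needle
};

inline void checkNeedleCount(size_t needle_count)
{
    if (needle_count > std::numeric_limits<uint8_t>::max())
        throw std::length_error("too many needles for one-byte pattern numbers");
}
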
View File

@ -38,7 +38,7 @@ struct PositionCaseSensitiveASCII
return SearcherInSmallHaystack(needle_data, needle_size);
}
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
{
return MultiSearcherInBigHaystack(needles);
}
@ -74,7 +74,7 @@ struct PositionCaseInsensitiveASCII
return SearcherInSmallHaystack(needle_data, needle_size);
}
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
{
return MultiSearcherInBigHaystack(needles);
}
@ -106,7 +106,7 @@ struct PositionCaseSensitiveUTF8
return SearcherInSmallHaystack(needle_data, needle_size);
}
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
{
return MultiSearcherInBigHaystack(needles);
}
@ -154,7 +154,7 @@ struct PositionCaseInsensitiveUTF8
return SearcherInSmallHaystack(needle_data, needle_size);
}
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<StringRef> & needles)
static MultiSearcherInBigHaystack createMultiSearcherInBigHaystack(const std::vector<std::string_view> & needles)
{
return MultiSearcherInBigHaystack(needles);
}

View File

@ -145,7 +145,7 @@ public:
Regexps * operator()()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (regexp)
return &*regexp;
regexp = constructor();
@ -166,10 +166,9 @@ struct Pool
std::map<std::pair<std::vector<String>, std::optional<UInt32>>, RegexpsConstructor> storage;
};
template <bool save_indices, bool CompileForEditDistance>
inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
template <bool save_indices, bool WithEditDistance>
inline Regexps constructRegexps(const std::vector<String> & str_patterns, [[maybe_unused]] std::optional<UInt32> edit_distance)
{
(void)edit_distance;
/// Common pointers
std::vector<const char *> patterns;
std::vector<unsigned int> flags;
@ -181,7 +180,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::o
patterns.reserve(str_patterns.size());
flags.reserve(str_patterns.size());
if constexpr (CompileForEditDistance)
if constexpr (WithEditDistance)
{
ext_exprs.reserve(str_patterns.size());
ext_exprs_ptrs.reserve(str_patterns.size());
@ -199,7 +198,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::o
* as stated in the Hyperscan documentation: https://intel.github.io/hyperscan/dev-reference/performance.html#single-match-flag
*/
flags.push_back(HS_FLAG_DOTALL | HS_FLAG_SINGLEMATCH | HS_FLAG_ALLOWEMPTY | HS_FLAG_UTF8);
if constexpr (CompileForEditDistance)
if constexpr (WithEditDistance)
{
/// Hyperscan currently does not support UTF8 matching with edit distance.
flags.back() &= ~HS_FLAG_UTF8;
@ -224,7 +223,7 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::o
}
hs_error_t err;
if constexpr (!CompileForEditDistance)
if constexpr (!WithEditDistance)
err = hs_compile_multi(
patterns.data(),
flags.data(),
@ -270,23 +269,22 @@ inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::o
if (err != HS_SUCCESS)
throw Exception("Could not allocate scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
return Regexps{db, scratch};
return {db, scratch};
}
/// If CompileForEditDistance is False, edit_distance must be nullopt
/// If WithEditDistance is False, edit_distance must be nullopt
/// Also, we use templates here because each instantiation of a function
/// template has its own copy of function-local static variables, which must
/// be distinct for different hyperscan compilations.
template <bool save_indices, bool CompileForEditDistance>
inline Regexps * get(const std::vector<StringRef> & patterns, std::optional<UInt32> edit_distance)
template <bool save_indices, bool WithEditDistance>
inline Regexps * get(const std::vector<std::string_view> & patterns, std::optional<UInt32> edit_distance)
{
/// C++11 has thread-safe function-local static on most modern compilers.
static Pool known_regexps; /// Different variables for different pattern parameters.
static Pool known_regexps; /// Different variables for different pattern parameters, thread-safe in C++11
std::vector<String> str_patterns;
str_patterns.reserve(patterns.size());
for (const StringRef & ref : patterns)
str_patterns.push_back(ref.toString());
for (const auto & pattern : patterns)
str_patterns.emplace_back(std::string(pattern.data(), pattern.size()));
/// Get the lock for finding database.
std::unique_lock lock(known_regexps.mutex);
@ -301,7 +299,7 @@ inline Regexps * get(const std::vector<StringRef> & patterns, std::optional<UInt
.first;
it->second.setConstructor([&str_patterns = it->first.first, edit_distance]()
{
return constructRegexps<save_indices, CompileForEditDistance>(str_patterns, edit_distance);
return constructRegexps<save_indices, WithEditDistance>(str_patterns, edit_distance);
});
}

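The per-instantiation property the comment above relies on is easy to verify in isolation: every distinct set of template arguments instantiates its own function, and with it its own function-local static. A minimal sketch (names are mine, not from the repository):

#include <cassert>

template <bool save_indices, bool with_edit_distance>
int & instantiationCounter()
{
    static int value = 0; /// one distinct `value` per <save_indices, with_edit_distance> pair
    return value;
}

int main()
{
    ++instantiationCounter<true, false>();
    ++instantiationCounter<true, false>();
    ++instantiationCounter<false, false>();
    assert(instantiationCounter<true, false>() == 2); /// independent of the line below
    assert(instantiationCounter<false, false>() == 1);
}
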
View File

@ -1,4 +1,4 @@
#include <Functions/hyperscanRegexpChecker.h>
#include <Functions/checkHyperscanRegexp.h>
#include <Common/Exception.h>
@ -9,16 +9,16 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void checkRegexp(const std::vector<StringRef> & refs, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length)
void checkHyperscanRegexp(const std::vector<std::string_view> & regexps, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length)
{
if (max_hyperscan_regexp_length > 0 || max_hyperscan_regexp_total_length > 0)
{
size_t total_regexp_length = 0;
for (const auto & pattern : refs)
for (const auto & regexp : regexps)
{
if (max_hyperscan_regexp_length > 0 && pattern.size > max_hyperscan_regexp_length)
if (max_hyperscan_regexp_length > 0 && regexp.size() > max_hyperscan_regexp_length)
throw Exception("Regexp length too large", ErrorCodes::BAD_ARGUMENTS);
total_regexp_length += pattern.size;
total_regexp_length += regexp.size();
}
if (max_hyperscan_regexp_total_length > 0 && total_regexp_length > max_hyperscan_regexp_total_length)

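The hunk is cut off right at the total-length check; below is a self-contained restatement of the whole validation, assuming the truncated tail throws an analogous "Total regexp lengths too large" error (std exceptions stand in for DB::Exception):

#include <cstddef>
#include <stdexcept>
#include <string_view>
#include <vector>

void checkHyperscanRegexpSketch(const std::vector<std::string_view> & regexps,
                                size_t max_hyperscan_regexp_length,
                                size_t max_hyperscan_regexp_total_length)
{
    if (max_hyperscan_regexp_length == 0 && max_hyperscan_regexp_total_length == 0)
        return; /// both limits disabled

    size_t total_regexp_length = 0;
    for (const auto & regexp : regexps)
    {
        if (max_hyperscan_regexp_length > 0 && regexp.size() > max_hyperscan_regexp_length)
            throw std::invalid_argument("Regexp length too large");
        total_regexp_length += regexp.size();
    }
    if (max_hyperscan_regexp_total_length > 0 && total_regexp_length > max_hyperscan_regexp_total_length)
        throw std::invalid_argument("Total regexp lengths too large"); /// assumed message for the truncated tail
}
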
View File

@ -0,0 +1,10 @@
#pragma once
#include <cstddef>
#include <string_view>
#include <vector>
namespace DB
{
void checkHyperscanRegexp(const std::vector<std::string_view> & regexps, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length);
}

View File

@ -1,10 +0,0 @@
#pragma once
#include <base/StringRef.h>
namespace DB
{
void checkRegexp(const std::vector<StringRef> & refs, size_t max_hyperscan_regexp_length, size_t max_hyperscan_regexp_total_length);
}

View File

@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAllIndices
static constexpr auto name = "multiFuzzyMatchAllIndices";
};
using FunctionMultiFuzzyMatchAllIndices = FunctionsMultiStringFuzzySearch<
MultiMatchAllIndicesImpl<NameMultiFuzzyMatchAllIndices, UInt64, true>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiFuzzyMatchAllIndices = FunctionsMultiStringFuzzySearch<MultiMatchAllIndicesImpl<NameMultiFuzzyMatchAllIndices, /*ResultType*/ UInt64, /*WithEditDistance*/ true>>;
}

View File

@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAny
static constexpr auto name = "multiFuzzyMatchAny";
};
using FunctionMultiFuzzyMatchAny = FunctionsMultiStringFuzzySearch<
MultiMatchAnyImpl<NameMultiFuzzyMatchAny, UInt8, true, false, true>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiFuzzyMatchAny = FunctionsMultiStringFuzzySearch<MultiMatchAnyImpl<NameMultiFuzzyMatchAny, /*ResultType*/ UInt8, MultiMatchTraits::Find::Any, /*WithEditDistance*/ true>>;
}

View File

@ -13,9 +13,7 @@ struct NameMultiFuzzyMatchAnyIndex
static constexpr auto name = "multiFuzzyMatchAnyIndex";
};
using FunctionMultiFuzzyMatchAnyIndex = FunctionsMultiStringFuzzySearch<
MultiMatchAnyImpl<NameMultiFuzzyMatchAnyIndex, UInt64, false, true, true>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiFuzzyMatchAnyIndex = FunctionsMultiStringFuzzySearch<MultiMatchAnyImpl<NameMultiFuzzyMatchAnyIndex, /*ResultType*/ UInt64, MultiMatchTraits::Find::AnyIndex, /*WithEditDistance*/ true>>;
}

View File

@ -13,9 +13,7 @@ struct NameMultiMatchAllIndices
static constexpr auto name = "multiMatchAllIndices";
};
using FunctionMultiMatchAllIndices = FunctionsMultiStringSearch<
MultiMatchAllIndicesImpl<NameMultiMatchAllIndices, UInt64, false>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiMatchAllIndices = FunctionsMultiStringSearch<MultiMatchAllIndicesImpl<NameMultiMatchAllIndices, /*ResultType*/ UInt64, /*WithEditDistance*/ false>>;
}

View File

@ -13,9 +13,7 @@ struct NameMultiMatchAny
static constexpr auto name = "multiMatchAny";
};
using FunctionMultiMatchAny = FunctionsMultiStringSearch<
MultiMatchAnyImpl<NameMultiMatchAny, UInt8, true, false, false>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiMatchAny = FunctionsMultiStringSearch<MultiMatchAnyImpl<NameMultiMatchAny, /*ResultType*/ UInt8, MultiMatchTraits::Find::Any, /*WithEditDistance*/ false>>;
}

View File

@ -13,9 +13,7 @@ struct NameMultiMatchAnyIndex
static constexpr auto name = "multiMatchAnyIndex";
};
using FunctionMultiMatchAnyIndex = FunctionsMultiStringSearch<
MultiMatchAnyImpl<NameMultiMatchAnyIndex, UInt64, false, true, false>,
std::numeric_limits<UInt32>::max()>;
using FunctionMultiMatchAnyIndex = FunctionsMultiStringSearch<MultiMatchAnyImpl<NameMultiMatchAnyIndex, /*ResultType*/ UInt64, MultiMatchTraits::Find::AnyIndex, /*WithEditDistance*/ false>>;
}

View File

@ -13,8 +13,7 @@ struct NameMultiSearchAnyCaseInsensitive
{
static constexpr auto name = "multiSearchAnyCaseInsensitive";
};
using FunctionMultiSearchCaseInsensitive
= FunctionsMultiStringSearch<MultiSearchImpl<NameMultiSearchAnyCaseInsensitive, PositionCaseInsensitiveASCII>>;
using FunctionMultiSearchCaseInsensitive = FunctionsMultiStringSearch<MultiSearchImpl<NameMultiSearchAnyCaseInsensitive, PositionCaseInsensitiveASCII>>;
}

View File

@ -14,8 +14,7 @@ struct NameMultiSearchFirstIndex
static constexpr auto name = "multiSearchFirstIndex";
};
using FunctionMultiSearchFirstIndex
= FunctionsMultiStringSearch<MultiSearchFirstIndexImpl<NameMultiSearchFirstIndex, PositionCaseSensitiveASCII>>;
using FunctionMultiSearchFirstIndex = FunctionsMultiStringSearch<MultiSearchFirstIndexImpl<NameMultiSearchFirstIndex, PositionCaseSensitiveASCII>>;
}

View File

@ -38,7 +38,6 @@ MutableColumns InternalTextLogsQueue::getSampleColumns()
void InternalTextLogsQueue::pushBlock(Block && log_block)
{
OvercommitTrackerBlockerInThread blocker;
static Block sample_block = getSampleBlock();
if (blocksHaveEqualStructure(sample_block, log_block))

View File

@ -18,62 +18,6 @@ public:
static Block getSampleBlock();
static MutableColumns getSampleColumns();
template <typename... Args>
bool push(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::push(std::forward<Args>(args)...);
}
template <typename... Args>
bool emplace(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::emplace(std::forward<Args>(args)...);
}
template <typename... Args>
bool pop(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::pop(std::forward<Args>(args)...);
}
template <typename... Args>
bool tryPush(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::tryPush(std::forward<Args>(args)...);
}
template <typename... Args>
bool tryEmplace(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::tryEmplace(std::forward<Args>(args)...);
}
template <typename... Args>
bool tryPop(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::tryPop(std::forward<Args>(args)...);
}
template <typename... Args>
void clear(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::clear(std::forward<Args>(args)...);
}
template <typename... Args>
void clearAndFinish(Args &&... args)
{
OvercommitTrackerBlockerInThread blocker;
return ConcurrentBoundedQueue::clearAndFinish(std::forward<Args>(args)...);
}
/// Used to pass a block from the remote server to the client
void pushBlock(Block && log_block);

View File

@ -20,7 +20,6 @@ public:
bool ignoreLimits() const override { return true; }
bool supportsTransactions() const override { return true; }
private:
BlockIO executeBegin(ContextMutablePtr session_context);
BlockIO executeCommit(ContextMutablePtr session_context);
static BlockIO executeRollback(ContextMutablePtr session_context);

View File

@ -42,18 +42,19 @@
#include <Interpreters/ApplyWithGlobalVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/InterpreterTransactionControlQuery.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/SelectIntersectExceptQueryVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/executeQuery.h>
#include <Common/ProfileEvents.h>
#include <Common/SensitiveDataMasker.h>
@ -68,6 +69,7 @@
#include <base/EnumReflection.h>
#include <base/demangle.h>
#include <memory>
#include <random>
@ -416,7 +418,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
chassert(txn->getState() != MergeTreeTransaction::COMMITTING);
chassert(txn->getState() != MergeTreeTransaction::COMMITTED);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as<ASTTransactionControl>() && !ast->as<ASTExplainQuery>())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query because current transaction failed. Expecting ROLLBACK statement.");
throw Exception(
ErrorCodes::INVALID_TRANSACTION,
"Cannot execute query because current transaction failed. Expecting ROLLBACK statement");
}
/// Interpret SETTINGS clauses as early as possible (before invoking the corresponding interpreter),
@ -498,6 +502,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
BlockIO res;
String query_for_logging;
std::shared_ptr<InterpreterTransactionControlQuery> implicit_txn_control{};
try
{
@ -621,11 +626,37 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!table_id.empty())
context->setInsertionTable(table_id);
if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
if (context->getCurrentTransaction() && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts inside transactions are not supported");
if (settings.implicit_transaction && settings.throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Async inserts with 'implicit_transaction' are not supported");
}
else
{
/// We need to start the (implicit) transaction before getting the interpreter, as the interpreter captures references to the latest snapshots
if (!context->getCurrentTransaction() && settings.implicit_transaction && !ast->as<ASTTransactionControl>())
{
try
{
if (context->isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot create transactions");
/// If there is no session (which is the default for the HTTP handler), set one up
/// just for this query, since a session is needed to control the transaction lifetime
if (!context->hasSessionContext())
context->makeSessionContext();
auto tc = std::make_shared<InterpreterTransactionControlQuery>(ast, context);
tc->executeBegin(context->getSessionContext());
implicit_txn_control = std::move(tc);
}
catch (Exception & e)
{
e.addMessage("while starting a transaction with 'implicit_transaction'");
throw;
}
}
interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
if (context->getCurrentTransaction() && !interpreter->supportsTransactions() &&
@ -813,15 +844,16 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
};
/// Also make possible for caller to log successful query finish and exception during execution.
auto finish_callback = [elem, context, ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
pulling_pipeline = pipeline.pulling()
]
(QueryPipeline & query_pipeline) mutable
auto finish_callback = [elem,
context,
ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
implicit_txn_control,
pulling_pipeline = pipeline.pulling()](QueryPipeline & query_pipeline) mutable
{
QueryStatus * process_list_elem = context->getProcessListElement();
@ -942,15 +974,40 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
opentelemetry_span_log->add(span);
}
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
};
auto exception_callback = [elem, context, ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
quota(quota), status_info_to_query_log] () mutable
auto exception_callback = [elem,
context,
ast,
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
quota(quota),
status_info_to_query_log,
implicit_txn_control]() mutable
{
if (auto txn = context->getCurrentTransaction())
if (implicit_txn_control)
{
implicit_txn_control->executeRollback(context->getSessionContext());
implicit_txn_control.reset();
}
else if (auto txn = context->getCurrentTransaction())
txn->onException();
if (quota)
@ -1000,7 +1057,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
}
};
res.finish_callback = std::move(finish_callback);
@ -1009,8 +1065,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
catch (...)
{
if (auto txn = context->getCurrentTransaction())
if (implicit_txn_control)
{
implicit_txn_control->executeRollback(context->getSessionContext());
implicit_txn_control.reset();
}
else if (auto txn = context->getCurrentTransaction())
{
txn->onException();
}
if (!internal)
{

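Condensed, the implicit-transaction plumbing added to executeQueryImpl boils down to: begin before the interpreter is built (so it sees the right snapshots), commit in the finish callback, roll back in the exception callback and in the surrounding catch. A runnable restatement with stand-in types (TxnControl and runQuery are mine, not ClickHouse API):

#include <cstdio>
#include <functional>
#include <memory>
#include <stdexcept>

struct TxnControl
{
    void begin()    { puts("BEGIN (implicit)"); }
    void commit()   { puts("COMMIT"); } /// may itself throw in the real code (e.g. ZooKeeper is down)
    void rollback() { puts("ROLLBACK"); }
};

void runQuery(bool implicit_transaction, const std::function<void()> & query)
{
    std::shared_ptr<TxnControl> implicit_txn_control;
    try
    {
        if (implicit_transaction)
        {
            implicit_txn_control = std::make_shared<TxnControl>();
            implicit_txn_control->begin(); /// before the interpreter grabs snapshots
        }
        query();
        if (implicit_txn_control)
            implicit_txn_control->commit(); /// success path: commit in the finish callback
    }
    catch (...)
    {
        if (implicit_txn_control)
            implicit_txn_control->rollback(); /// failure path: roll back and rethrow
        throw;
    }
}

int main()
{
    runQuery(true, [] { puts("SELECT 1"); });
    try
    {
        runQuery(true, [] { throw std::runtime_error("query failed"); });
    }
    catch (const std::exception & e)
    {
        printf("caught: %s\n", e.what());
    }
}
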
View File

@ -10,7 +10,9 @@
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
#if USE_KRB5
#include <Access/KerberosInit.h>
#endif // USE_KRB5
namespace DB
{
@ -18,8 +20,10 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
#if USE_KRB5
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int KERBEROS_ERROR;
#endif // USE_KRB5
}
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
@ -40,25 +44,28 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
String key_name;
if (key == "hadoop_kerberos_keytab")
{
#if USE_KRB5
need_kinit = true;
hadoop_kerberos_keytab = config.getString(key_path);
#else // USE_KRB5
LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_keytab parameter is ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
continue;
}
else if (key == "hadoop_kerberos_principal")
{
#if USE_KRB5
need_kinit = true;
hadoop_kerberos_principal = config.getString(key_path);
hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
continue;
}
else if (key == "hadoop_kerberos_kinit_command")
{
need_kinit = true;
hadoop_kerberos_kinit_command = config.getString(key_path);
#else // USE_KRB5
LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_principal parameter is ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
continue;
}
else if (key == "hadoop_security_kerberos_ticket_cache_path")
{
#if USE_KRB5
if (isUser)
{
throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",
@ -67,6 +74,9 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
// standard param - pass further
#else // USE_KRB5
LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop.security.kerberos.ticket.cache.path parameter is ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
}
key_name = boost::replace_all_copy(key, "_", ".");
@ -76,44 +86,21 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
}
}
String HDFSBuilderWrapper::getKinitCmd()
{
if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
{
throw Exception("Not enough parameters to run kinit",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
WriteBufferFromOwnString ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
String() :
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
// command to run looks like
// kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
ss << hadoop_kerberos_kinit_command << cache_name <<
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
return ss.str();
}
#if USE_KRB5
void HDFSBuilderWrapper::runKinit()
{
String cmd = getKinitCmd();
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
std::unique_lock<std::mutex> lck(kinit_mtx);
auto command = ShellCommand::execute(cmd);
auto status = command->tryWait();
if (status)
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Running KerberosInit");
try
{
throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
kerberosInit(hadoop_kerberos_keytab, hadoop_kerberos_principal, hadoop_security_kerberos_ticket_cache_path);
}
catch (const DB::Exception & e)
{
throw Exception("KerberosInit failure: "+ getExceptionMessage(e, false), ErrorCodes::KERBEROS_ERROR);
}
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Finished KerberosInit");
}
#endif // USE_KRB5
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
{
@ -184,16 +171,16 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
}
}
#if USE_KRB5
if (builder.need_kinit)
{
builder.runKinit();
}
#endif // USE_KRB5
return builder;
}
std::mutex HDFSBuilderWrapper::kinit_mtx;
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));

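The commit replaces the shell kinit invocation with an in-process kerberosInit helper. From the two call shapes in this commit (three arguments in HDFS above, two in StorageKafka below) one can infer an interface along these lines; the stub is a guess for orientation, mirroring src/Access/KerberosInit.h in name only:

// Self-contained stub illustrating the inferred interface; the real function
// drives libkrb5 (acquire/renew a TGT into the given or default credential
// cache), here we only trace the arguments.
#include <iostream>
#include <string>

void kerberosInit(const std::string & keytab,
                  const std::string & principal,
                  const std::string & ticket_cache_path = "") /// empty -> default cache (assumed)
{
    std::cout << "kinit-equivalent for " << principal << " with keytab " << keytab
              << (ticket_cache_path.empty() ? "" : " into cache " + ticket_cache_path)
              << '\n';
}

int main()
{
    /// HDFS-style call: explicit ticket cache path (principal from the old kinit comment)
    kerberosInit("/keytab_dir/clickhouse.keytab", "somebody@TEST.CLICKHOUSE.TECH", "/tmp/krb5cc_clickhouse");
    /// Kafka-style call: cache path omitted
    kerberosInit("/keytab_dir/kafka.keytab", "somebody@TEST.CLICKHOUSE.TECH");
}
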
View File

@ -9,7 +9,6 @@
#include <hdfs/hdfs.h>
#include <base/types.h>
#include <mutex>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -69,10 +68,6 @@ public:
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);
String getKinitCmd();
void runKinit();
// hdfs builder relies on an external config data storage
std::pair<String, String>& keep(const String & k, const String & v)
{
@ -80,14 +75,15 @@ private:
}
hdfsBuilder * hdfs_builder;
std::vector<std::pair<String, String>> config_stor;
#if USE_KRB5
void runKinit();
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
std::vector<std::pair<String, String>> config_stor;
bool need_kinit{false};
#endif // USE_KRB5
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;

View File

@ -45,7 +45,9 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#if USE_KRB5
#include <Access/KerberosInit.h>
#endif // USE_KRB5
namespace CurrentMetrics
{
@ -517,6 +519,33 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
if (config.has(config_prefix))
loadFromConfig(conf, config, config_prefix);
#if USE_KRB5
if (conf.has_property("sasl.kerberos.kinit.cmd"))
LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
conf.set("sasl.kerberos.kinit.cmd","");
conf.set("sasl.kerberos.min.time.before.relogin","0");
if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
{
String keytab = conf.get("sasl.kerberos.keytab");
String principal = conf.get("sasl.kerberos.principal");
LOG_DEBUG(log, "Running KerberosInit");
try
{
kerberosInit(keytab, principal);
}
catch (const Exception & e)
{
LOG_ERROR(log, "KerberosInit failure: {}", getExceptionMessage(e, false));
}
LOG_DEBUG(log, "Finished KerberosInit");
}
#else // USE_KRB5
if (conf.has_property("sasl.kerberos.keytab") || conf.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Kerberos-related parameters are ignored because ClickHouse was built without support of krb5 library.");
#endif // USE_KRB5
// Update consumer topic-specific configuration
for (const auto & topic : topics)
{

View File

@ -151,6 +151,8 @@ def main():
needs_data = json.load(file_handler)
required_builds = len(needs_data)
logging.info("The next builds are required: %s", ", ".join(needs_data))
gh = Github(get_best_robot_token())
pr_info = PRInfo()
rerun_helper = RerunHelper(gh, pr_info, build_check_name)
@ -159,7 +161,6 @@ def main():
sys.exit(0)
builds_for_check = CI_CONFIG["builds_report_config"][build_check_name]
logging.info("My reports list %s", builds_for_check)
required_builds = required_builds or len(builds_for_check)
# Collect reports from json artifacts

View File

@ -5,6 +5,7 @@
# pylint: disable=too-many-lines
import enum
from queue import Full
import shutil
import sys
import os
@ -1581,15 +1582,18 @@ def do_run_tests(jobs, test_suite: TestSuite, parallel):
for _ in range(jobs):
parallel_tests_array.append((None, batch_size, test_suite))
with closing(multiprocessing.Pool(processes=jobs)) as pool:
pool.map_async(run_tests_array, parallel_tests_array)
try:
with closing(multiprocessing.Pool(processes=jobs)) as pool:
pool.map_async(run_tests_array, parallel_tests_array)
for suit in test_suite.parallel_tests:
queue.put(suit, timeout=args.timeout * 1.1)
for suit in test_suite.parallel_tests:
queue.put(suit, timeout=args.timeout * 1.1)
for _ in range(jobs):
queue.put(None, timeout=args.timeout * 1.1)
for _ in range(jobs):
queue.put(None, timeout=args.timeout * 1.1)
queue.close()
except Full:
queue.close()
pool.join()

View File

@ -1,6 +1,6 @@
<clickhouse>
<database_catalog_unused_dir_hide_timeout_sec>0</database_catalog_unused_dir_hide_timeout_sec>
<database_catalog_unused_dir_rm_timeout_sec>15</database_catalog_unused_dir_rm_timeout_sec>
<database_catalog_unused_dir_rm_timeout_sec>60</database_catalog_unused_dir_rm_timeout_sec>
<database_catalog_unused_dir_cleanup_period_sec>1</database_catalog_unused_dir_cleanup_period_sec>
<!-- We don't really need [Zoo]Keeper for this test.

View File

@ -213,9 +213,11 @@ def test_store_cleanup(started_cluster):
node1.query("DETACH DATABASE db2")
node1.query("DETACH TABLE db3.log")
node1.wait_for_log_line("Removing access rights for unused directory")
time.sleep(1)
node1.wait_for_log_line("testgarbage")
node1.wait_for_log_line(
"Removing access rights for unused directory",
timeout=60,
look_behind_lines=1000,
)
node1.wait_for_log_line("directories from store")
store = node1.exec_in_container(["ls", f"{path_to_data}/store"])
@ -268,8 +270,9 @@ def test_store_cleanup(started_cluster):
["ls", "-l", f"{path_to_data}/store/456"]
)
node1.wait_for_log_line("Removing unused directory")
time.sleep(1)
node1.wait_for_log_line(
"Removing unused directory", timeout=90, look_behind_lines=1000
)
node1.wait_for_log_line("directories from store")
store = node1.exec_in_container(["ls", f"{path_to_data}/store"])

View File

@ -113,7 +113,7 @@ def test_read_table_expired(started_cluster):
)
assert False, "Exception have to be thrown"
except Exception as ex:
assert "DB::Exception: kinit failure:" in str(ex)
assert "DB::Exception: KerberosInit failure:" in str(ex)
started_cluster.unpause_container("hdfskerberos")

View File

@ -128,6 +128,48 @@ def test_kafka_json_as_string(kafka_cluster):
)
def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster):
# The ticket should expire after the wait time.
# When the SELECT query runs, a new ticket should be requested and the query should succeed.
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)
instance.query(
"""
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
time.sleep(45) # wait for ticket expiration
result = instance.query("SELECT * FROM test.kafka;")
expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
)
def test_kafka_json_as_string_no_kdc(kafka_cluster):
# When the test is run alone (not preceded by any other kerberized kafka test),
# we need a ticket to
@ -182,7 +224,7 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
assert instance.contains_in_log("Kerberos ticket refresh failed")
assert instance.contains_in_log("KerberosInit failure:")
if __name__ == "__main__":

View File

@ -1,4 +1,4 @@
-- Tags: zookeeper
-- Tags: zookeeper, no-backward-compatibility-check
DROP TABLE IF EXISTS table_rename_with_ttl;

View File

@ -0,0 +1,14 @@
no_transaction_landing 10000
no_transaction_target 0
after_transaction_landing 0
after_transaction_target 0
after_implicit_txn_in_query_settings_landing 0
after_implicit_txn_in_query_settings_target 0
after_implicit_txn_in_session_landing 0
after_implicit_txn_in_session_target 0
inside_txn_and_implicit 1
inside_txn_and_implicit 1
in_transaction 10000
out_transaction 0
{"'implicit_True'":"implicit_True","all":"2","is_empty":0}
{"'implicit_False'":"implicit_False","all":"2","is_empty":1}

View File

@ -0,0 +1,92 @@
CREATE TABLE landing (n Int64) engine=MergeTree order by n;
CREATE TABLE target (n Int64) engine=MergeTree order by n;
CREATE MATERIALIZED VIEW landing_to_target TO target AS
SELECT n + throwIf(n == 3333)
FROM landing;
INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError 395 }
SELECT 'no_transaction_landing', count() FROM landing;
SELECT 'no_transaction_target', count() FROM target;
TRUNCATE TABLE landing;
TRUNCATE TABLE target;
BEGIN TRANSACTION;
INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError 395 }
ROLLBACK;
SELECT 'after_transaction_landing', count() FROM landing;
SELECT 'after_transaction_target', count() FROM target;
-- Same but using implicit_transaction
INSERT INTO landing SETTINGS implicit_transaction=True SELECT * FROM numbers(10000); -- { serverError 395 }
SELECT 'after_implicit_txn_in_query_settings_landing', count() FROM landing;
SELECT 'after_implicit_txn_in_query_settings_target', count() FROM target;
-- Same but using implicit_transaction in a session
SET implicit_transaction=True;
INSERT INTO landing SELECT * FROM numbers(10000); -- { serverError 395 }
SET implicit_transaction=False;
SELECT 'after_implicit_txn_in_session_landing', count() FROM landing;
SELECT 'after_implicit_txn_in_session_target', count() FROM target;
-- Reading from incompatible sources with implicit_transaction works the same way as with normal transactions:
-- Currently reading from system tables inside a transaction is Not implemented:
SELECT name, value, changed FROM system.settings where name = 'implicit_transaction' SETTINGS implicit_transaction=True; -- { serverError 48 }
-- Verify that you don't have to manually close transactions with implicit_transaction
SET implicit_transaction=True;
SELECT throwIf(number == 0) FROM numbers(100); -- { serverError 395 }
SELECT throwIf(number == 0) FROM numbers(100); -- { serverError 395 }
SELECT throwIf(number == 0) FROM numbers(100); -- { serverError 395 }
SELECT throwIf(number == 0) FROM numbers(100); -- { serverError 395 }
SET implicit_transaction=False;
-- implicit_transaction is ignored when inside a transaction (no recursive transaction error)
BEGIN TRANSACTION;
SELECT 'inside_txn_and_implicit', 1 SETTINGS implicit_transaction=True;
SELECT throwIf(number == 0) FROM numbers(100) SETTINGS implicit_transaction=True; -- { serverError 395 }
ROLLBACK;
SELECT 'inside_txn_and_implicit', 1 SETTINGS implicit_transaction=True;
-- You can work with transactions even if `implicit_transaction=True` is set
SET implicit_transaction=True;
BEGIN TRANSACTION;
INSERT INTO target SELECT * FROM numbers(10000);
SELECT 'in_transaction', count() FROM target;
ROLLBACK;
SELECT 'out_transaction', count() FROM target;
SET implicit_transaction=False;
-- Verify that the transaction_id column is populated correctly
SELECT 'Looking_at_transaction_id_True' FORMAT Null SETTINGS implicit_transaction=1;
-- Verify that the transaction_id column is NOT populated without transaction
SELECT 'Looking_at_transaction_id_False' FORMAT Null SETTINGS implicit_transaction=0;
SYSTEM FLUSH LOGS;
SELECT
'implicit_True',
count() as all,
transaction_id = (0,0,'00000000-0000-0000-0000-000000000000') as is_empty
FROM system.query_log
WHERE
current_database = currentDatabase() AND
event_date >= yesterday() AND
query LIKE '-- Verify that the transaction_id column is populated correctly%'
GROUP BY transaction_id
FORMAT JSONEachRow;
SELECT
'implicit_False',
count() as all,
transaction_id = (0,0,'00000000-0000-0000-0000-000000000000') as is_empty
FROM system.query_log
WHERE
current_database = currentDatabase() AND
event_date >= yesterday() AND
query LIKE '-- Verify that the transaction_id column is NOT populated without transaction%'
GROUP BY transaction_id
FORMAT JSONEachRow;