Merge branch 'master' into patch-18

This commit is contained in:
Denis Zhuravlev 2020-03-06 21:22:51 -04:00 committed by GitHub
commit de1420ebd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
972 changed files with 14534 additions and 4735 deletions

View File

@ -52,7 +52,7 @@ IncludeCategories:
ReflowComments: false
AlignEscapedNewlinesLeft: false
AlignEscapedNewlines: DontAlign
AlignTrailingComments: true
AlignTrailingComments: false
# Not changed:
AccessModifierOffset: -4

2
.gitmodules vendored
View File

@ -140,7 +140,7 @@
url = https://github.com/ClickHouse-Extras/libc-headers.git
[submodule "contrib/replxx"]
path = contrib/replxx
url = https://github.com/AmokHuginnsson/replxx.git
url = https://github.com/ClickHouse-Extras/replxx.git
[submodule "contrib/ryu"]
path = contrib/ryu
url = https://github.com/ClickHouse-Extras/ryu.git

View File

@ -88,8 +88,7 @@ endif()
include (cmake/sanitize.cmake)
if (CMAKE_GENERATOR STREQUAL "Ninja")
if (CMAKE_GENERATOR STREQUAL "Ninja" AND NOT DISABLE_COLORED_BUILD)
# Turn on colored output. https://github.com/ninja-build/ninja/wiki/FAQ
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-color=always")
@ -215,6 +214,10 @@ if (COMPILER_CLANG)
# TODO investigate that
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-omit-frame-pointer")
if (OS_DARWIN)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wl,-U,_inside_main")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wl,-U,_inside_main")
endif()
endif ()
option (ENABLE_LIBRARIES "Enable all libraries (Global default switch)" ON)

View File

@ -12,6 +12,3 @@ ClickHouse is an open-source column-oriented database management system that all
* [Contacts](https://clickhouse.tech/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in Athens](https://www.meetup.com/Athens-Big-Data/events/268379195/) on March 5.

View File

@ -24,6 +24,11 @@ if (ENABLE_REPLXX)
ReplxxLineReader.cpp
ReplxxLineReader.h
)
elseif (ENABLE_READLINE)
set (SRCS ${SRCS}
ReadlineLineReader.cpp
ReadlineLineReader.h
)
endif ()
if (USE_DEBUG_HELPERS)
@ -57,6 +62,28 @@ endif()
target_link_libraries(common PUBLIC replxx)
# allow explicitly fallback to readline
if (NOT ENABLE_REPLXX AND ENABLE_READLINE)
message (STATUS "Attempt to fallback to readline explicitly")
set (READLINE_PATHS "/usr/local/opt/readline/lib")
# First try find custom lib for macos users (default lib without history support)
find_library (READLINE_LIB NAMES readline PATHS ${READLINE_PATHS} NO_DEFAULT_PATH)
if (NOT READLINE_LIB)
find_library (READLINE_LIB NAMES readline PATHS ${READLINE_PATHS})
endif ()
set(READLINE_INCLUDE_PATHS "/usr/local/opt/readline/include")
find_path (READLINE_INCLUDE_DIR NAMES readline/readline.h PATHS ${READLINE_INCLUDE_PATHS} NO_DEFAULT_PATH)
if (NOT READLINE_INCLUDE_DIR)
find_path (READLINE_INCLUDE_DIR NAMES readline/readline.h PATHS ${READLINE_INCLUDE_PATHS})
endif ()
if (READLINE_INCLUDE_DIR AND READLINE_LIB)
target_link_libraries(common PUBLIC ${READLINE_LIB})
target_compile_definitions(common PUBLIC USE_READLINE=1)
message (STATUS "Using readline: ${READLINE_INCLUDE_DIR} : ${READLINE_LIB}")
endif ()
endif ()
target_link_libraries (common
PUBLIC
${Poco_Util_LIBRARY}

View File

@ -53,11 +53,18 @@ LineReader::Suggest::WordsRange LineReader::Suggest::getCompletions(const String
/// last_word can be empty.
return std::equal_range(
words.begin(), words.end(), last_word, [prefix_length](std::string_view s, std::string_view prefix_searched)
{
return strncmp(s.data(), prefix_searched.data(), prefix_length) < 0;
});
if (case_insensitive)
return std::equal_range(
words.begin(), words.end(), last_word, [prefix_length](std::string_view s, std::string_view prefix_searched)
{
return strncasecmp(s.data(), prefix_searched.data(), prefix_length) < 0;
});
else
return std::equal_range(
words.begin(), words.end(), last_word, [prefix_length](std::string_view s, std::string_view prefix_searched)
{
return strncmp(s.data(), prefix_searched.data(), prefix_length) < 0;
});
}
LineReader::LineReader(const String & history_file_path_, char extender_, char delimiter_)

View File

@ -8,18 +8,19 @@
class LineReader
{
public:
class Suggest
struct Suggest
{
protected:
using Words = std::vector<std::string>;
using WordsRange = std::pair<Words::const_iterator, Words::const_iterator>;
Words words;
std::atomic<bool> ready{false};
public:
/// Get iterators for the matched range of words if any.
WordsRange getCompletions(const String & prefix, size_t prefix_length) const;
/// case sensitive suggestion
bool case_insensitive = false;
};
LineReader(const String & history_file_path, char extender, char delimiter = 0); /// if delimiter != 0, then it's multiline mode
@ -31,6 +32,13 @@ public:
/// Typical delimiter is ';' (semicolon) and typical extender is '\' (backslash).
String readLine(const String & first_prompt, const String & second_prompt);
/// When bracketed paste mode is set, pasted text is bracketed with control sequences so
/// that the program can differentiate pasted text from typed-in text. This helps
/// clickhouse-client so that without -m flag, one can still paste multiline queries, and
/// possibly get better pasting performance. See https://cirw.in/blog/bracketed-paste for
/// more details.
virtual void enableBracketedPaste() {}
protected:
enum InputStatus
{

View File

@ -0,0 +1,173 @@
#include <common/ReadlineLineReader.h>
#include <ext/scope_guard.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
namespace
{
/// Trim ending whitespace inplace
void trim(String & s)
{
s.erase(std::find_if(s.rbegin(), s.rend(), [](int ch) { return !std::isspace(ch); }).base(), s.end());
}
}
static const LineReader::Suggest * suggest;
/// Points to current word to suggest.
static LineReader::Suggest::Words::const_iterator pos;
/// Points after the last possible match.
static LineReader::Suggest::Words::const_iterator end;
/// Set iterators to the matched range of words if any.
static void findRange(const char * prefix, size_t prefix_length)
{
std::string prefix_str(prefix);
std::tie(pos, end) = suggest->getCompletions(prefix_str, prefix_length);
}
/// Iterates through matched range.
static char * nextMatch()
{
if (pos >= end)
return nullptr;
/// readline will free memory by itself.
char * word = strdup(pos->c_str());
++pos;
return word;
}
static char * generate(const char * text, int state)
{
if (!suggest->ready)
return nullptr;
if (state == 0)
findRange(text, strlen(text));
/// Do not append whitespace after word. For unknown reason, rl_completion_append_character = '\0' does not work.
rl_completion_suppress_append = 1;
return nextMatch();
};
ReadlineLineReader::ReadlineLineReader(const Suggest & suggest_, const String & history_file_path_, char extender_, char delimiter_)
: LineReader(history_file_path_, extender_, delimiter_)
{
suggest = &suggest_;
if (!history_file_path.empty())
{
int res = read_history(history_file_path.c_str());
if (res)
std::cerr << "Cannot read history from file " + history_file_path + ": "+ strerror(errno) << std::endl;
}
/// Added '.' to the default list. Because it is used to separate database and table.
rl_basic_word_break_characters = word_break_characters;
/// Not append whitespace after single suggestion. Because whitespace after function name is meaningless.
rl_completion_append_character = '\0';
rl_completion_entry_function = generate;
/// Install Ctrl+C signal handler that will be used in interactive mode.
if (rl_initialize())
throw std::runtime_error("Cannot initialize readline");
auto clear_prompt_or_exit = [](int)
{
/// This is signal safe.
ssize_t res = write(STDOUT_FILENO, "\n", 1);
/// Allow to quit client while query is in progress by pressing Ctrl+C twice.
/// (First press to Ctrl+C will try to cancel query by InterruptListener).
if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
{
rl_replace_line("", 0);
if (rl_forced_update_display())
_exit(0);
}
else
{
/// A little dirty, but we struggle to find better way to correctly
/// force readline to exit after returning from the signal handler.
_exit(0);
}
};
if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
throw std::runtime_error(std::string("Cannot set signal handler for readline: ") + strerror(errno));
}
ReadlineLineReader::~ReadlineLineReader()
{
}
LineReader::InputStatus ReadlineLineReader::readOneLine(const String & prompt)
{
input.clear();
const char* cinput = readline(prompt.c_str());
if (cinput == nullptr)
return (errno != EAGAIN) ? ABORT : RESET_LINE;
input = cinput;
trim(input);
return INPUT_LINE;
}
void ReadlineLineReader::addToHistory(const String & line)
{
add_history(line.c_str());
}
#if RL_VERSION_MAJOR >= 7
#define BRACK_PASTE_PREF "\033[200~"
#define BRACK_PASTE_SUFF "\033[201~"
#define BRACK_PASTE_LAST '~'
#define BRACK_PASTE_SLEN 6
/// This handler bypasses some unused macro/event checkings and remove trailing newlines before insertion.
static int clickhouse_rl_bracketed_paste_begin(int /* count */, int /* key */)
{
std::string buf;
buf.reserve(128);
RL_SETSTATE(RL_STATE_MOREINPUT);
SCOPE_EXIT(RL_UNSETSTATE(RL_STATE_MOREINPUT));
int c;
while ((c = rl_read_key()) >= 0)
{
if (c == '\r')
c = '\n';
buf.push_back(c);
if (buf.size() >= BRACK_PASTE_SLEN && c == BRACK_PASTE_LAST && buf.substr(buf.size() - BRACK_PASTE_SLEN) == BRACK_PASTE_SUFF)
{
buf.resize(buf.size() - BRACK_PASTE_SLEN);
break;
}
}
trim(buf);
return static_cast<size_t>(rl_insert_text(buf.c_str())) == buf.size() ? 0 : 1;
}
#endif
void ReadlineLineReader::enableBracketedPaste()
{
#if RL_VERSION_MAJOR >= 7
rl_variable_bind("enable-bracketed-paste", "on");
/// Use our bracketed paste handler to get better user experience. See comments above.
rl_bind_keyseq(BRACK_PASTE_PREF, clickhouse_rl_bracketed_paste_begin);
#endif
};

View File

@ -0,0 +1,19 @@
#pragma once
#include "LineReader.h"
#include <readline/readline.h>
#include <readline/history.h>
class ReadlineLineReader : public LineReader
{
public:
ReadlineLineReader(const Suggest & suggest, const String & history_file_path, char extender, char delimiter = 0);
~ReadlineLineReader() override;
void enableBracketedPaste() override;
private:
InputStatus readOneLine(const String & prompt) override;
void addToHistory(const String & line) override;
};

View File

@ -3,6 +3,7 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <functional>
namespace
{
@ -18,18 +19,31 @@ void trim(String & s)
ReplxxLineReader::ReplxxLineReader(const Suggest & suggest, const String & history_file_path_, char extender_, char delimiter_)
: LineReader(history_file_path_, extender_, delimiter_)
{
using namespace std::placeholders;
using Replxx = replxx::Replxx;
if (!history_file_path.empty())
rx.history_load(history_file_path);
auto callback = [&suggest] (const String & context, size_t context_size)
{
auto range = suggest.getCompletions(context, context_size);
return replxx::Replxx::completions_t(range.first, range.second);
return Replxx::completions_t(range.first, range.second);
};
rx.set_completion_callback(callback);
rx.set_complete_on_empty(false);
rx.set_word_break_characters(word_break_characters);
/// By default C-p/C-n binded to COMPLETE_NEXT/COMPLETE_PREV,
/// bind C-p/C-n to history-previous/history-next like readline.
rx.bind_key(Replxx::KEY::control('N'), std::bind(&Replxx::invoke, &rx, Replxx::ACTION::HISTORY_NEXT, _1));
rx.bind_key(Replxx::KEY::control('P'), std::bind(&Replxx::invoke, &rx, Replxx::ACTION::HISTORY_PREVIOUS, _1));
/// By default COMPLETE_NEXT/COMPLETE_PREV was binded to C-p/C-n, re-bind
/// to M-P/M-N (that was used for HISTORY_COMMON_PREFIX_SEARCH before, but
/// it also binded to M-p/M-n).
rx.bind_key(Replxx::KEY::meta('N'), std::bind(&Replxx::invoke, &rx, Replxx::ACTION::COMPLETE_NEXT, _1));
rx.bind_key(Replxx::KEY::meta('P'), std::bind(&Replxx::invoke, &rx, Replxx::ACTION::COMPLETE_PREVIOUS, _1));
}
ReplxxLineReader::~ReplxxLineReader()
@ -55,3 +69,8 @@ void ReplxxLineReader::addToHistory(const String & line)
{
rx.history_add(line);
}
void ReplxxLineReader::enableBracketedPaste()
{
rx.enable_bracketed_paste();
};

View File

@ -10,6 +10,8 @@ public:
ReplxxLineReader(const Suggest & suggest, const String & history_file_path, char extender, char delimiter = 0);
~ReplxxLineReader() override;
void enableBracketedPaste() override;
private:
InputStatus readOneLine(const String & prompt) override;
void addToHistory(const String & line) override;

View File

@ -3,6 +3,8 @@
#if OS_LINUX
#include <unistd.h>
#include <syscall.h>
#elif OS_FREEBSD
#include <pthread_np.h>
#else
#include <pthread.h>
#include <stdexcept>
@ -16,6 +18,8 @@ uint64_t getThreadId()
{
#if OS_LINUX
current_tid = syscall(SYS_gettid); /// This call is always successful. - man gettid
#elif OS_FREEBSD
current_tid = pthread_getthreadid_np();
#else
if (0 != pthread_threadid_np(nullptr, &current_tid))
throw std::logic_error("pthread_threadid_np returned error");

View File

@ -80,7 +80,6 @@ dumpImpl(Out & out, T && x)
}
/// Tuple, pair
template <size_t N, typename Out, typename T>
Out & dumpTupleImpl(Out & out, T && x)

View File

@ -88,10 +88,14 @@ using signal_function = void(int, siginfo_t*, void*);
static void writeSignalIDtoSignalPipe(int sig)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[buf_size];
DB::WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], buf_size, buf);
DB::writeBinary(sig, out);
out.next();
errno = saved_errno;
}
/** Signal handler for HUP / USR1 */
@ -110,6 +114,8 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co
*/
static void signalHandler(int sig, siginfo_t * info, void * context)
{
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
char buf[buf_size];
DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf);
@ -134,6 +140,8 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
::sleep(10);
call_default_signal_handler(sig);
}
errno = saved_errno;
}

View File

@ -1,44 +0,0 @@
#pragma once
#include <memory>
namespace ext
{
/** Thread-unsafe singleton. It works simply like a global variable.
* Supports deinitialization.
*
* In most of the cases, you don't need this class.
* Use "Meyers Singleton" instead: static T & instance() { static T x; return x; }
*/
template <class T>
class Singleton
{
public:
Singleton()
{
if (!instance)
instance = std::make_unique<T>();
}
T * operator->()
{
return instance.get();
}
static bool isInitialized()
{
return !!instance;
}
static void reset()
{
instance.reset();
}
private:
inline static std::unique_ptr<T> instance{};
};
}

View File

@ -2,7 +2,6 @@
#include <stdint.h>
#include <time.h>
#include "atomic.h"
#include "musl_features.h"
#include "syscall.h"
#ifdef VDSO_CGT_SYM
@ -54,7 +53,7 @@ static void *volatile vdso_func = (void *)cgt_init;
#endif
int __clock_gettime(clockid_t clk, struct timespec *ts)
int clock_gettime(clockid_t clk, struct timespec *ts)
{
int r;
@ -104,5 +103,3 @@ int __clock_gettime(clockid_t clk, struct timespec *ts)
return __syscall_ret(r);
#endif
}
weak_alias(__clock_gettime, clock_gettime);

View File

@ -1,10 +1,9 @@
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include "musl_features.h"
#include "syscall.h"
int __clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, struct timespec * rem)
int clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, struct timespec * rem)
{
if (clk == CLOCK_THREAD_CPUTIME_ID)
return EINVAL;
@ -23,5 +22,3 @@ int __clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, str
pthread_setcanceltype(old_cancel_type, NULL);
return status;
}
weak_alias(__clock_nanosleep, clock_nanosleep);

View File

@ -2,7 +2,4 @@
#define weak __attribute__((__weak__))
#define hidden __attribute__((__visibility__("hidden")))
#define weak_alias(old, new) \
extern __typeof(old) new __attribute__((__weak__, __alias__(#old)))
#define predict_false(x) __builtin_expect(x, 0)

View File

@ -2,6 +2,7 @@
.hidden __syscall
.type __syscall,@function
__syscall:
.cfi_startproc
movq %rdi,%rax
movq %rsi,%rdi
movq %rdx,%rsi
@ -11,3 +12,4 @@ __syscall:
movq 8(%rsp),%r9
syscall
ret
.cfi_endproc

View File

@ -39,7 +39,6 @@ typedef __attribute__((__aligned__(1))) uint32_t uint32_unaligned_t;
typedef __attribute__((__aligned__(1))) uint64_t uint64_unaligned_t;
//---------------------------------------------------------------------
// fast copy for different sizes
//---------------------------------------------------------------------
@ -694,4 +693,3 @@ static INLINE void* memcpy_fast(void *destination, const void *source, size_t si
#endif

View File

@ -8,6 +8,7 @@ add_library (mysqlxx
src/Row.cpp
src/Value.cpp
src/Pool.cpp
src/PoolFactory.cpp
src/PoolWithFailover.cpp
include/mysqlxx/Connection.h
@ -15,6 +16,7 @@ add_library (mysqlxx
include/mysqlxx/mysqlxx.h
include/mysqlxx/Null.h
include/mysqlxx/Pool.h
include/mysqlxx/PoolFactory.h
include/mysqlxx/PoolWithFailover.h
include/mysqlxx/Query.h
include/mysqlxx/ResultBase.h

View File

@ -198,6 +198,8 @@ public:
return description;
}
void removeConnection(Connection * data);
protected:
/// Number of MySQL connections which are created at launch.
unsigned default_connections;

View File

@ -0,0 +1,55 @@
#pragma once
#include <mutex>
#include <memory>
#include <boost/noncopyable.hpp>
#include <mysqlxx/PoolWithFailover.h>
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
namespace mysqlxx
{
/*
* PoolFactory.h
* This class is a helper singleton to mutualize connections to MySQL.
*/
class PoolFactory final : private boost::noncopyable
{
public:
static PoolFactory & instance();
PoolFactory(const PoolFactory &) = delete;
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover Get(const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
/** Allocates a PoolWithFailover to connect to MySQL. */
PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name,
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
void reset();
~PoolFactory() = default;
PoolFactory& operator=(const PoolFactory &) = delete;
private:
PoolFactory();
struct Impl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -77,6 +77,10 @@ namespace mysqlxx
size_t max_tries;
/// Mutex for set of replicas.
std::mutex mutex;
std::string config_name;
/// Can the Pool be shared
bool shareable;
public:
using Entry = Pool::Entry;
@ -100,8 +104,6 @@ namespace mysqlxx
PoolWithFailover(const PoolWithFailover & other);
PoolWithFailover & operator=(const PoolWithFailover &) = delete;
/** Allocates a connection to use. */
Entry Get();
};

View File

@ -23,26 +23,26 @@ namespace mysqlxx
class ResultBase;
/** Представляет одно значение, считанное из MySQL.
* Объект сам не хранит данные, а является всего лишь обёрткой над парой (const char *, size_t).
* Если уничтожить UseQueryResult/StoreQueryResult или Connection,
* или считать следующий Row при использовании UseQueryResult, то объект станет некорректным.
* Позволяет преобразовать значение (распарсить) в различные типы данных:
* - с помощью функций вида getUInt(), getString(), ... (рекомендуется);
* - с помощью шаблонной функции get<Type>(), которая специализирована для многих типов (для шаблонного кода);
* - шаблонная функция get<Type> работает также для всех типов, у которых есть конструктор из Value
* (это сделано для возможности расширения);
* - с помощью operator Type() - но этот метод реализован лишь для совместимости и не рекомендуется
* к использованию, так как неудобен (часто возникают неоднозначности).
/** Represents a single value read from MySQL.
* It doesn't owns the value. It's just a wrapper of a pair (const char *, size_t).
* If the UseQueryResult/StoreQueryResult or Connection is destroyed,
* or you have read the next Row while using UseQueryResult, then the object is invalidated.
* Allows to transform (parse) the value to various data types:
* - with getUInt(), getString(), ... (recommended);
* - with template function get<Type>() that is specialized for multiple data types;
* - the template function get<Type> also works for all types that can be constructed from Value
* (it is an extension point);
* - with operator Type() - this is done for compatibility and not recommended because ambiguities possible.
*
* При ошибке парсинга, выкидывается исключение.
* При попытке достать значение, которое равно nullptr, выкидывается исключение
* - используйте метод isNull() для проверки.
* On parsing error, exception is thrown.
* When trying to extract a value that is nullptr, exception is thrown
* - use isNull() method to check.
*
* Во всех распространённых системах, time_t - это всего лишь typedef от Int64 или Int32.
* Для того, чтобы можно было писать row[0].get<time_t>(), ожидая, что значение вида '2011-01-01 00:00:00'
* корректно распарсится согласно текущей тайм-зоне, сделано так, что метод getUInt и соответствующие методы get<>()
* также умеют парсить дату и дату-время.
* As time_t is just an alias for integer data type
* to allow to write row[0].get<time_t>(), and expect that the values like '2011-01-01 00:00:00'
* will be successfully parsed according to the current time zone,
* the getUInt method and the corresponding get<>() methods
* are capable of parsing Date and DateTime.
*/
class Value
{
@ -166,7 +166,7 @@ private:
else
throwException("Cannot parse DateTime");
return 0; /// чтобы не было warning-а.
return 0; /// avoid warning.
}
@ -184,7 +184,7 @@ private:
else
throwException("Cannot parse Date");
return 0; /// чтобы не было warning-а.
return 0; /// avoid warning.
}
@ -231,7 +231,7 @@ private:
double readFloatText(const char * buf, size_t length) const;
/// Выкинуть исключение с подробной информацией
void throwException(const char * text) const;
[[noreturn]] void throwException(const char * text) const;
};

View File

@ -22,15 +22,20 @@ void Pool::Entry::incrementRefCount()
if (!data)
return;
++data->ref_count;
mysql_thread_init();
if (data->ref_count == 1)
mysql_thread_init();
}
void Pool::Entry::decrementRefCount()
{
if (!data)
return;
--data->ref_count;
mysql_thread_end();
if (data->ref_count > 0)
{
--data->ref_count;
if (data->ref_count == 0)
mysql_thread_end();
}
}
@ -169,14 +174,24 @@ Pool::Entry Pool::tryGet()
return Entry();
}
void Pool::removeConnection(Connection* connection)
{
std::lock_guard<std::mutex> lock(mutex);
if (connection)
{
if (connection->ref_count > 0)
{
connection->conn.disconnect();
connection->ref_count = 0;
}
connections.remove(connection);
}
}
void Pool::Entry::disconnect()
{
if (data)
{
decrementRefCount();
data->conn.disconnect();
}
pool->removeConnection(data);
}

View File

@ -0,0 +1,122 @@
#include <mysqlxx/PoolFactory.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace mysqlxx
{
struct PoolFactory::Impl
{
// Cache of already affected pools identified by their config name
std::map<std::string, std::shared_ptr<PoolWithFailover>> pools;
// Cache of Pool ID (host + port + user +...) cibling already established shareable pool
std::map<std::string, std::string> pools_by_ids;
/// Protect pools and pools_by_ids caches
std::mutex mutex;
};
PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections,
unsigned max_connections, size_t max_tries)
{
return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
}
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name)
{
bool shared = config.getBool(config_name + ".share_connection", false);
// Not shared no need to generate a name the pool won't be stored
if (!shared)
return "";
std::string entry_name = "";
std::string host = config.getString(config_name + ".host", "");
std::string port = config.getString(config_name + ".port", "");
std::string user = config.getString(config_name + ".user", "");
std::string db = config.getString(config_name + ".db", "");
std::string table = config.getString(config_name + ".table", "");
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
if (config.has(config_name + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
config.keys(config_name, replica_keys);
for (const auto & replica_config_key : replica_keys)
{
/// There could be another elements in the same level in configuration file, like "user", "port"...
if (startsWith(replica_config_key, "replica"))
{
std::string replica_name = config_name + "." + replica_config_key;
std::string tmp_host = config.getString(replica_name + ".host", host);
std::string tmp_port = config.getString(replica_name + ".port", port);
std::string tmp_user = config.getString(replica_name + ".user", user);
entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db;
}
}
}
else
{
entry_name = user + "@" + host + ":" + port + "/" + db;
}
return entry_name;
}
PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
{
std::lock_guard<std::mutex> lock(impl->mutex);
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
{
return *(entry->second.get());
}
else
{
std::string entry_name = getPoolEntryName(config, config_name);
if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end())
{
entry = impl->pools.find(id->second);
std::shared_ptr<PoolWithFailover> pool = entry->second;
impl->pools.insert_or_assign(config_name, pool);
return *pool;
}
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
// Check the pool will be shared
if (!entry_name.empty())
{
// Store shared pool
impl->pools.insert_or_assign(config_name, pool);
impl->pools_by_ids.insert_or_assign(entry_name, config_name);
}
return *(pool.get());
}
}
void PoolFactory::reset()
{
std::lock_guard<std::mutex> lock(impl->mutex);
impl->pools.clear();
impl->pools_by_ids.clear();
}
PoolFactory::PoolFactory() : impl(std::make_unique<PoolFactory::Impl>()) {}
PoolFactory & PoolFactory::instance()
{
static PoolFactory ret;
return ret;
}
}

View File

@ -15,6 +15,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
const unsigned max_connections, const size_t max_tries)
: max_tries(max_tries)
{
shareable = cfg.getBool(config_name + ".share_connection", false);
if (cfg.has(config_name + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
@ -48,15 +49,22 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign
{}
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}
: max_tries{other.max_tries}, config_name{other.config_name}, shareable{other.shareable}
{
for (const auto & priority_replicas : other.replicas_by_priority)
if (shareable)
{
Replicas replicas;
replicas.reserve(priority_replicas.second.size());
for (const auto & pool : priority_replicas.second)
replicas.emplace_back(std::make_shared<Pool>(*pool));
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
replicas_by_priority = other.replicas_by_priority;
}
else
{
for (const auto & priority_replicas : other.replicas_by_priority)
{
Replicas replicas;
replicas.reserve(priority_replicas.second.size());
for (const auto & pool : priority_replicas.second)
replicas.emplace_back(std::make_shared<Pool>(*pool));
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
}
}
}
@ -81,7 +89,7 @@ PoolWithFailover::Entry PoolWithFailover::Get()
try
{
Entry entry = pool->tryGet();
Entry entry = shareable ? pool->Get() : pool->tryGet();
if (!entry.isNull())
{

View File

@ -15,9 +15,8 @@ if (USE_INTERNAL_H3_LIBRARY)
set (H3_LIBRARY h3)
set (H3_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/h3/src/h3lib/include)
elseif (NOT MISSING_INTERNAL_H3_LIBRARY)
set (H3_INCLUDE_PATHS /usr/local/include/h3)
find_library (H3_LIBRARY h3)
find_path (H3_INCLUDE_DIR NAMES h3api.h PATHS ${H3_INCLUDE_PATHS})
find_path (H3_INCLUDE_DIR NAMES h3/h3api.h PATHS ${H3_INCLUDE_PATHS})
endif ()
if (H3_LIBRARY AND H3_INCLUDE_DIR)

View File

@ -29,11 +29,7 @@ if (NOT ZLIB_FOUND AND NOT MISSING_INTERNAL_ZLIB_LIBRARY)
set (ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR}) # for poco
set (ZLIB_INCLUDE_DIRECTORIES ${ZLIB_INCLUDE_DIR}) # for protobuf
set (ZLIB_FOUND 1) # for poco
if (USE_STATIC_LIBRARIES)
set (ZLIB_LIBRARIES zlibstatic CACHE INTERNAL "")
else ()
set (ZLIB_LIBRARIES zlib CACHE INTERNAL "")
endif ()
set (ZLIB_LIBRARIES zlib CACHE INTERNAL "")
endif ()
message (STATUS "Using ${INTERNAL_ZLIB_NAME}: ${ZLIB_INCLUDE_DIR} : ${ZLIB_LIBRARIES}")

View File

@ -32,8 +32,9 @@ else ()
find_program (GOLD_PATH NAMES "ld.gold" "gold")
endif ()
if (NOT OS_FREEBSD)
# We prefer LLD linker over Gold or BFD.
if (NOT LINKER_NAME)
if (NOT LINKER_NAME)
if (LLD_PATH)
if (COMPILER_GCC)
# GCC driver requires one of supported linker names like "lld".
@ -43,9 +44,9 @@ if (NOT LINKER_NAME)
set (LINKER_NAME ${LLD_PATH})
endif ()
endif ()
endif ()
endif ()
if (NOT LINKER_NAME)
if (NOT LINKER_NAME)
if (GOLD_PATH)
if (COMPILER_GCC)
set (LINKER_NAME "gold")
@ -53,6 +54,7 @@ if (NOT LINKER_NAME)
set (LINKER_NAME ${GOLD_PATH})
endif ()
endif ()
endif ()
endif ()
if (LINKER_NAME)

View File

@ -49,7 +49,6 @@ if (USE_INTERNAL_BTRIE_LIBRARY)
endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
unset (BUILD_SHARED_LIBS CACHE)
set (ZLIB_ENABLE_TESTS 0 CACHE INTERNAL "")
set (SKIP_INSTALL_ALL 1 CACHE INTERNAL "")
set (ZLIB_COMPAT 1 CACHE INTERNAL "") # also enables WITH_GZFILEOP
@ -65,10 +64,14 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# We should use same defines when including zlib.h as used when zlib compiled
target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
endif ()
if (ARCH_AMD64 OR ARCH_AARCH64)
target_compile_definitions (zlib PUBLIC X86_64 UNALIGNED_OK)
target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
target_compile_definitions (zlib PUBLIC X86_64 UNALIGNED_OK)
if (TARGET zlibstatic)
target_compile_definitions (zlibstatic PUBLIC X86_64 UNALIGNED_OK)
endif ()
endif ()
endif ()
@ -111,7 +114,7 @@ function(mysql_support)
endif()
if (USE_INTERNAL_ZLIB_LIBRARY)
set(ZLIB_FOUND ON)
set(ZLIB_LIBRARY zlibstatic)
set(ZLIB_LIBRARY ${ZLIB_LIBRARIES})
set(WITH_EXTERNAL_ZLIB ON)
endif()
add_subdirectory (mariadb-connector-c)
@ -143,9 +146,6 @@ endif ()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
@ -254,6 +254,7 @@ if (USE_EMBEDDED_COMPILER AND USE_INTERNAL_LLVM_LIBRARY)
set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "")
set (LLVM_TARGETS_TO_BUILD "X86;AArch64" CACHE STRING "")
add_subdirectory (llvm/llvm)
target_include_directories(LLVMSupport SYSTEM BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
endif ()
if (USE_INTERNAL_LIBGSASL_LIBRARY)

View File

@ -348,6 +348,7 @@ if (ARROW_WITH_ZLIB)
endif ()
if (ARROW_WITH_ZSTD)
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${ZSTD_LIBRARY})
target_include_directories(${ARROW_LIBRARY} SYSTEM BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
endif ()
target_include_directories(${ARROW_LIBRARY} PRIVATE SYSTEM ${ORC_INCLUDE_DIR})

2
contrib/base64 vendored

@ -1 +1 @@
Subproject commit 5257626d2be17a3eb23f79be17fe55ebba394ad2
Subproject commit 95ba56a9b041f9933f5cd2bbb2ee4e083468c20a

View File

@ -41,4 +41,5 @@ endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_boost_lib(iostreams)
target_link_libraries(boost_iostreams_internal PUBLIC ${ZLIB_LIBRARIES})
target_include_directories(boost_iostreams_internal SYSTEM BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
endif()

View File

@ -52,11 +52,11 @@ set(SRCS
)
add_library(libxml2 ${SRCS})
target_link_libraries(libxml2 PRIVATE ${ZLIB_LIBRARIES} ${CMAKE_DL_LIBS})
target_link_libraries(libxml2 PRIVATE ${ZLIB_LIBRARIES})
if(M_LIBRARY)
target_link_libraries(libxml2 PRIVATE ${M_LIBRARY})
endif()
target_include_directories(libxml2 PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/include)
target_include_directories(libxml2 PUBLIC ${LIBXML2_SOURCE_DIR}/include)
target_include_directories(libxml2 PRIVATE ${ZLIB_INCLUDE_DIR}/include)
target_include_directories(libxml2 SYSTEM BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})

2
contrib/llvm vendored

@ -1 +1 @@
Subproject commit 778c297395b4a2dfd60e13969a0f9488bf2c16cf
Subproject commit 5dab18f4861677548b8f7f6815f49384480ecead

@ -1 +1 @@
Subproject commit 18016300b00825a3fcbc6fb2aa37ac3e51416f71
Subproject commit 3f512fedf0ba0f769a1b4852b4bac542d92c5b20

2
contrib/openssl vendored

@ -1 +1 @@
Subproject commit c74e7895eb0d219d4007775eec134dd2bcd9d1ae
Subproject commit debbae80cb44de55fd8040fdfbe4b506601ff2a6

View File

@ -91,6 +91,7 @@ elseif (ARCH_AARCH64)
perl_generate_asm(${OPENSSL_SOURCE_DIR}/crypto/sha/asm/sha1-armv8.pl ${OPENSSL_BINARY_DIR}/crypto/sha/sha1-armv8.S)
perl_generate_asm(${OPENSSL_SOURCE_DIR}/crypto/sha/asm/sha512-armv8.pl ${OPENSSL_BINARY_DIR}/crypto/sha/sha256-armv8.S) # This is not a mistake
perl_generate_asm(${OPENSSL_SOURCE_DIR}/crypto/sha/asm/sha512-armv8.pl ${OPENSSL_BINARY_DIR}/crypto/sha/sha512-armv8.S)
perl_generate_asm(${OPENSSL_SOURCE_DIR}/crypto/modes/asm/aes-gcm-armv8_64.pl ${OPENSSL_BINARY_DIR}/crypto/modes/asm/aes-gcm-armv8_64.S)
endif ()
@ -188,6 +189,7 @@ ${OPENSSL_SOURCE_DIR}/crypto/bio/bf_buff.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bf_lbuf.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bf_nbio.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bf_null.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bf_prefix.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bio_cb.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bio_err.c
${OPENSSL_SOURCE_DIR}/crypto/bio/bio_lib.c
@ -320,6 +322,7 @@ ${OPENSSL_SOURCE_DIR}/crypto/dh/dh_check.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_depr.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_err.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_gen.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_group_params.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_kdf.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_key.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_lib.c
@ -327,7 +330,7 @@ ${OPENSSL_SOURCE_DIR}/crypto/dh/dh_meth.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_pmeth.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_prn.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_rfc5114.c
${OPENSSL_SOURCE_DIR}/crypto/dh/dh_rfc7919.c
${OPENSSL_SOURCE_DIR}/crypto/dsa/dsa_aid.c
${OPENSSL_SOURCE_DIR}/crypto/dsa/dsa_ameth.c
${OPENSSL_SOURCE_DIR}/crypto/dsa/dsa_asn1.c
${OPENSSL_SOURCE_DIR}/crypto/dsa/dsa_depr.c
@ -464,10 +467,10 @@ ${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_md5.c
${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_md5_sha1.c
${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_mdc2.c
${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_sha.c
${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_ripemd.c
${OPENSSL_SOURCE_DIR}/crypto/evp/legacy_wp.c
${OPENSSL_SOURCE_DIR}/crypto/evp/m_null.c
${OPENSSL_SOURCE_DIR}/crypto/evp/m_ripemd.c
${OPENSSL_SOURCE_DIR}/crypto/evp/m_sigver.c
${OPENSSL_SOURCE_DIR}/crypto/evp/m_wp.c
${OPENSSL_SOURCE_DIR}/crypto/evp/mac_lib.c
${OPENSSL_SOURCE_DIR}/crypto/evp/mac_meth.c
${OPENSSL_SOURCE_DIR}/crypto/evp/names.c
@ -486,6 +489,8 @@ ${OPENSSL_SOURCE_DIR}/crypto/evp/pkey_mac.c
${OPENSSL_SOURCE_DIR}/crypto/evp/pmeth_fn.c
${OPENSSL_SOURCE_DIR}/crypto/evp/pmeth_gn.c
${OPENSSL_SOURCE_DIR}/crypto/evp/pmeth_lib.c
${OPENSSL_SOURCE_DIR}/crypto/evp/signature.c
${OPENSSL_SOURCE_DIR}/crypto/ffc/ffc_params.c
${OPENSSL_SOURCE_DIR}/crypto/hmac/hm_ameth.c
${OPENSSL_SOURCE_DIR}/crypto/hmac/hmac.c
${OPENSSL_SOURCE_DIR}/crypto/idea/i_cbc.c
@ -529,6 +534,7 @@ ${OPENSSL_SOURCE_DIR}/crypto/provider_conf.c
${OPENSSL_SOURCE_DIR}/crypto/provider_core.c
${OPENSSL_SOURCE_DIR}/crypto/provider_predefined.c
${OPENSSL_SOURCE_DIR}/crypto/sparse_array.c
${OPENSSL_SOURCE_DIR}/crypto/self_test_core.c
${OPENSSL_SOURCE_DIR}/crypto/threads_none.c
${OPENSSL_SOURCE_DIR}/crypto/threads_pthread.c
${OPENSSL_SOURCE_DIR}/crypto/threads_win.c
@ -673,8 +679,8 @@ ${OPENSSL_SOURCE_DIR}/crypto/sm2/sm2_crypt.c
${OPENSSL_SOURCE_DIR}/crypto/sm2/sm2_err.c
${OPENSSL_SOURCE_DIR}/crypto/sm2/sm2_pmeth.c
${OPENSSL_SOURCE_DIR}/crypto/sm2/sm2_sign.c
${OPENSSL_SOURCE_DIR}/crypto/sm3/m_sm3.c
${OPENSSL_SOURCE_DIR}/crypto/sm3/sm3.c
${OPENSSL_SOURCE_DIR}/crypto/sm3/legacy_sm3.c
${OPENSSL_SOURCE_DIR}/crypto/sm4/sm4.c
${OPENSSL_SOURCE_DIR}/crypto/srp/srp_lib.c
${OPENSSL_SOURCE_DIR}/crypto/srp/srp_vfy.c
@ -779,7 +785,11 @@ ${OPENSSL_SOURCE_DIR}/crypto/x509/x_x509.c
${OPENSSL_SOURCE_DIR}/crypto/x509/x_x509a.c
${OPENSSL_SOURCE_DIR}/providers/implementations/asymciphers/rsa_enc.c
${OPENSSL_SOURCE_DIR}/providers/defltprov.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_null.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_cbc_hmac_sha.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_cbc_hmac_sha1_hw.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_cbc_hmac_sha256_hw.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_ccm.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_ccm_hw.c
${OPENSSL_SOURCE_DIR}/providers/implementations/ciphers/cipher_aes_gcm.c
@ -944,7 +954,8 @@ elseif (ARCH_AARCH64)
${OPENSSL_BINARY_DIR}/crypto/sha/keccak1600-armv8.S
${OPENSSL_BINARY_DIR}/crypto/sha/sha1-armv8.S
${OPENSSL_BINARY_DIR}/crypto/sha/sha256-armv8.S
${OPENSSL_BINARY_DIR}/crypto/sha/sha512-armv8.S)
${OPENSSL_BINARY_DIR}/crypto/sha/sha512-armv8.S
${OPENSSL_BINARY_DIR}/crypto/modes/asm/aes-gcm-armv8_64.S)
endif ()
set(SSL_SRCS

View File

@ -27,12 +27,7 @@ extern "C" {
# define OPENSSL_CONFIGURED_API 30000
/// This fragment was edited to avoid dependency on "getrandom" function that is not available on old libc and old Linux kernels.
/// The DEVRANDOM method is also good.
//# ifndef OPENSSL_RAND_SEED_OS
//# define OPENSSL_RAND_SEED_OS
//# endif
#define OPENSSL_RAND_SEED_OS
#define OPENSSL_RAND_SEED_DEVRANDOM
# ifndef OPENSSL_THREADS

View File

@ -27,12 +27,7 @@ extern "C" {
# define OPENSSL_CONFIGURED_API 30000
/// This fragment was edited to avoid dependency on "getrandom" function that is not available on old libc and old Linux kernels.
/// The DEVRANDOM method is also good.
//# ifndef OPENSSL_RAND_SEED_OS
//# define OPENSSL_RAND_SEED_OS
//# endif
#define OPENSSL_RAND_SEED_OS
#define OPENSSL_RAND_SEED_DEVRANDOM
# ifndef OPENSSL_THREADS

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit d805cf5ca4cf8bdc642261cfcbe7a0a241cb7298
Subproject commit 860574c93980d887a89df141edd9ca2fb0024fa3

2
contrib/replxx vendored

@ -1 +1 @@
Subproject commit 37582f0bb8c52513c6c6b76797c02d852d701dad
Subproject commit 07cbfbec550133b88c91c4073fa5af2ae2ae6a9a

View File

@ -45,7 +45,10 @@ if (ENABLE_REPLXX)
endif ()
endif ()
target_compile_options(replxx PUBLIC -Wno-documentation)
if (NOT (COMPILER_GCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9))
target_compile_options(replxx PUBLIC -Wno-documentation)
endif ()
target_compile_definitions(replxx PUBLIC USE_REPLXX=1)
message (STATUS "Using replxx")

2
contrib/simdjson vendored

@ -1 +1 @@
Subproject commit 60916318f76432b5d04814c2af50d04ec15664ad
Subproject commit 560f0742cc0895d00d78359dbdeb82064a24adb8

View File

@ -1,14 +1,13 @@
set(SIMDJSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/simdjson/include")
set(SIMDJSON_SRC_DIR "${SIMDJSON_INCLUDE_DIR}/../src")
set(SIMDJSON_SRC
${SIMDJSON_SRC_DIR}/document.cpp
${SIMDJSON_SRC_DIR}/error.cpp
${SIMDJSON_SRC_DIR}/implementation.cpp
${SIMDJSON_SRC_DIR}/jsonioutil.cpp
${SIMDJSON_SRC_DIR}/jsonminifier.cpp
${SIMDJSON_SRC_DIR}/jsonparser.cpp
${SIMDJSON_SRC_DIR}/stage1_find_marks.cpp
${SIMDJSON_SRC_DIR}/stage2_build_tape.cpp
${SIMDJSON_SRC_DIR}/parsedjson.cpp
${SIMDJSON_SRC_DIR}/parsedjsoniterator.cpp
${SIMDJSON_SRC_DIR}/simdjson.cpp
)
add_library(${SIMDJSON_LIBRARY} ${SIMDJSON_SRC})

View File

@ -32,7 +32,6 @@ target_include_directories(ltdl PUBLIC ${ODBC_SOURCE_DIR}/libltdl/libltdl)
target_compile_definitions(ltdl PRIVATE -DHAVE_CONFIG_H -DLTDL -DLTDLOPEN=libltdlc)
target_compile_options(ltdl PRIVATE -Wno-constant-logical-operand -Wno-unknown-warning-option -O2)
target_link_libraries(ltdl PRIVATE ${CMAKE_DL_LIBS})
set(SRCS

View File

@ -176,8 +176,12 @@ elseif (COMPILER_GCC)
add_cxx_compile_options(-Wsizeof-array-argument)
# Warn for suspicious length parameters to certain string and memory built-in functions if the argument uses sizeof
add_cxx_compile_options(-Wsizeof-pointer-memaccess)
# Warn about overriding virtual functions that are not marked with the override keyword
add_cxx_compile_options(-Wsuggest-override)
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 9)
# Warn about overriding virtual functions that are not marked with the override keyword
add_cxx_compile_options(-Wsuggest-override)
endif ()
# Warn whenever a switch statement has an index of boolean type and the case values are outside the range of a boolean type
add_cxx_compile_options(-Wswitch-bool)
# Warn if a self-comparison always evaluates to true or false

View File

@ -47,6 +47,7 @@ using Ports = std::vector<UInt16>;
namespace ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
extern const int BAD_ARGUMENTS;
extern const int EMPTY_DATA_PASSED;
}
@ -58,11 +59,11 @@ public:
bool cumulative_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, size_t confidence_, const Settings & settings_)
const String & json_path_, size_t confidence_, const String & query_id_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
confidence(confidence_), json_path(json_path_), settings(settings_),
json_path(json_path_), confidence(confidence_), query_id(query_id_), settings(settings_),
global_context(Context::createGlobal()), pool(concurrency)
{
const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
@ -144,8 +145,9 @@ private:
bool cumulative;
size_t max_iterations;
double max_time;
size_t confidence;
String json_path;
size_t confidence;
std::string query_id;
Settings settings;
Context global_context;
QueryProcessingStage::Enum query_processing_stage;
@ -366,6 +368,8 @@ private:
RemoteBlockInputStream stream(
*(*connection_entries[connection_index]),
query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage);
if (!query_id.empty())
stream.setQueryId(query_id);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
@ -534,6 +538,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("database", value<std::string>()->default_value("default"), "")
("stacktrace", "print stack traces of exceptions")
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
("query_id", value<std::string>()->default_value(""), "")
;
Settings settings;
@ -572,6 +577,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["timelimit"].as<double>(),
options["json"].as<std::string>(),
options["confidence"].as<size_t>(),
options["query_id"].as<std::string>(),
settings);
return benchmark.run();
}

View File

@ -4,6 +4,8 @@
#if USE_REPLXX
# include <common/ReplxxLineReader.h>
#elif USE_READLINE
# include <common/ReadlineLineReader.h>
#else
# include <common/LineReader.h>
#endif
@ -99,14 +101,11 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int NO_DATA_TO_INSERT;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_READ_HISTORY;
extern const int CANNOT_APPEND_HISTORY;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNEXPECTED_PACKET_FROM_SERVER;
extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED;
extern const int CANNOT_SET_SIGNAL_HANDLER;
extern const int SYSTEM_ERROR;
extern const int INVALID_USAGE_OF_INPUT;
extern const int DEADLOCK_AVOIDED;
}
@ -484,8 +483,12 @@ private:
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
if (server_revision >= Suggest::MIN_SERVER_REVISION && !config().getBool("disable_suggestion", false))
{
if (config().has("case_insensitive_suggestion"))
Suggest::instance().setCaseInsensitive();
/// Load suggestion data from the server.
Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
}
/// Load command history if present.
if (config().has("history_file"))
@ -504,10 +507,18 @@ private:
#if USE_REPLXX
ReplxxLineReader lr(Suggest::instance(), history_file, '\\', config().has("multiline") ? ';' : 0);
#elif USE_READLINE
ReadlineLineReader lr(Suggest::instance(), history_file, '\\', config().has("multiline") ? ';' : 0);
#else
LineReader lr(history_file, '\\', config().has("multiline") ? ';' : 0);
#endif
/// Enable bracketed-paste-mode only when multiquery is enabled and multiline is
/// disabled, so that we are able to paste and execute multiline queries in a whole
/// instead of erroring out, while be less intrusive.
if (config().has("multiquery") && !config().has("multiline"))
lr.enableBracketedPaste();
do
{
auto input = lr.readLine(prompt(), ":-] ");
@ -896,9 +907,34 @@ private:
query = serializeAST(*parsed_query);
}
connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
sendExternalTables();
receiveResult();
static constexpr size_t max_retries = 10;
for (size_t retry = 0; retry < max_retries; ++retry)
{
try
{
connection->sendQuery(
connection_parameters.timeouts,
query,
query_id,
QueryProcessingStage::Complete,
&context.getSettingsRef(),
nullptr,
true);
sendExternalTables();
receiveResult();
break;
}
catch (const Exception & e)
{
/// Retry when the server said "Client should retry" and no rows has been received yet.
if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries)
continue;
throw;
}
}
}
@ -1678,6 +1714,7 @@ public:
("always_load_suggestion_data", "Load suggestion data even if clickhouse-client is run in non-interactive mode. Used for testing.")
("suggestion_limit", po::value<int>()->default_value(10000),
"Suggestion limit for how many databases, tables and columns to fetch.")
("case_insensitive_suggestion", "Case sensitive suggestions.")
("multiline,m", "multiline")
("multiquery,n", "multiquery")
("format,f", po::value<std::string>(), "default output format")

View File

@ -5,33 +5,62 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DEADLOCK_AVOIDED;
}
void Suggest::load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
{
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
{
try
for (size_t retry = 0; retry < 10; ++retry)
{
Connection connection(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"client",
connection_parameters.compression,
connection_parameters.security);
try
{
Connection connection(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"client",
connection_parameters.compression,
connection_parameters.security);
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
}
catch (...)
{
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
}
catch (const Exception & e)
{
/// Retry when the server said "Client should retry".
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
continue;
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
}
catch (...)
{
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
}
break;
}
/// Note that keyword suggestions are available even if we cannot load data from server.
std::sort(words.begin(), words.end());
if (case_insensitive)
std::sort(words.begin(), words.end(), [](const std::string & str1, const std::string & str2)
{
return std::lexicographical_compare(begin(str1), end(str1), begin(str2), end(str2), [](const char char1, const char char2)
{
return std::tolower(char1) < std::tolower(char2);
});
});
else
std::sort(words.begin(), words.end());
ready = true;
});
}

View File

@ -12,7 +12,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
}
class Suggest : public LineReader::Suggest, boost::noncopyable
@ -24,6 +23,9 @@ public:
return instance;
}
/// Need to set before load
void setCaseInsensitive() { case_insensitive = true; }
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit);
/// Older server versions cannot execute the query above.

View File

@ -6,9 +6,17 @@
#include <Common/ZooKeeper/KeeperException.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int UNFINISHED;
extern const int BAD_ARGUMENTS;
}
void ClusterCopier::init()
{
auto zookeeper = context.getZooKeeper();

View File

@ -2,6 +2,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
ConfigurationPtr getConfigurationFromXMLString(const std::string & xml_data)
{

View File

@ -74,11 +74,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NO_ZOOKEEPER;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
extern const int UNFINISHED;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
@ -135,7 +131,6 @@ struct TaskStateWithOwner
};
struct ShardPriority
{
UInt8 is_remote = 1;

View File

@ -4,6 +4,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
struct TaskCluster
{

View File

@ -6,6 +6,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
struct TaskShard;

View File

@ -298,7 +298,7 @@ void LocalServer::processQueries()
try
{
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {}, {});
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {});
}
catch (...)
{

View File

@ -60,5 +60,4 @@ void StopConditionsSet::report(UInt64 value, StopConditionsSet::StopCondition &
}
}

View File

@ -41,9 +41,8 @@ namespace DB
namespace ErrorCodes
{
extern const int READONLY;
extern const int UNKNOWN_COMPRESSION_METHOD;
extern const int LOGICAL_ERROR;
extern const int CANNOT_PARSE_TEXT;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
extern const int CANNOT_PARSE_QUOTED_STRING;
@ -274,7 +273,7 @@ void HTTPHandler::processQuery(
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
std::shared_ptr<Context> session;
std::shared_ptr<NamedSession> session;
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
@ -286,15 +285,15 @@ void HTTPHandler::processQuery(
session_timeout = parseSessionTimeout(config, params);
std::string session_check = params.get("session_check", "");
session = context.acquireSession(session_id, session_timeout, session_check == "1");
session = context.acquireNamedSession(session_id, session_timeout, session_check == "1");
context = *session;
context.setSessionContext(*session);
context = session->context;
context.setSessionContext(session->context);
}
SCOPE_EXIT({
if (session_is_set)
session->releaseSession(session_id, session_timeout);
if (session)
session->release();
});
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
@ -593,12 +592,14 @@ void HTTPHandler::processQuery(
customizeContext(context);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type, const String & format)
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
{
response.setContentType(content_type);
response.add("X-ClickHouse-Query-Id", current_query_id);
response.add("X-ClickHouse-Format", format);
},
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
response.add("X-ClickHouse-Timezone", timezone);
}
);
if (used_output.hasDelayed())
{
@ -706,7 +707,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() &&
!request.hasContentLength())
{
throw Exception("There is neither Transfer-Encoding header nor Content-Length header", ErrorCodes::HTTP_LENGTH_REQUIRED);
throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED);
}
processQuery(request, params, response, used_output);

View File

@ -36,8 +36,9 @@ using Poco::Net::SSLManager;
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
extern const int OPENSSL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
@ -281,14 +282,9 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
}
else
{
bool with_output = false;
std::function<void(const String &, const String &)> set_content_type_and_format = [&with_output](const String &, const String &) -> void
{
with_output = true;
};
String replacement_query = "select ''";
bool should_replace = false;
bool with_output = false;
// Translate query from MySQL to ClickHouse.
// This is a temporary workaround until ClickHouse supports the syntax "@@var_name".
@ -306,7 +302,13 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
ReadBufferFromString replacement(replacement_query);
Context query_context = connection_context;
executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type_and_format, {});
executeQuery(should_replace ? replacement : payload, *out, true, query_context,
[&with_output](const String &, const String &, const String &, const String &)
{
with_output = true;
}
);
if (!with_output)
packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);

View File

@ -19,7 +19,6 @@ namespace ErrorCodes
extern const int CANNOT_OPEN_FILE;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int OPENSSL_ERROR;
extern const int SYSTEM_ERROR;
}
MySQLHandlerFactory::MySQLHandlerFactory(IServer & server_)

View File

@ -44,6 +44,7 @@
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/SystemLog.cpp>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Access/AccessControlManager.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -59,6 +60,7 @@
#include "TCPHandlerFactory.h"
#include "Common/config_version.h"
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadFuzzer.h>
#include "MySQLHandlerFactory.h"
#if defined(OS_LINUX)
@ -116,7 +118,6 @@ namespace ErrorCodes
extern const int FAILED_TO_GETPWUID;
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
extern const int NETWORK_ERROR;
extern const int PATH_ACCESS_DENIED;
}
@ -219,6 +220,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
if (ThreadFuzzer::instance().isEffective())
LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
@ -466,6 +470,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config->has("max_partition_size_to_drop"))
global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));
global_context->updateStorageConfiguration(*config);
},
/* already_loaded = */ true);
@ -494,6 +500,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
users_config_reloader->reload();
});
/// Sets a local directory storing information about access control.
std::string access_control_local_path = config().getString("access_control_path", "");
if (!access_control_local_path.empty())
global_context->getAccessControlManager().setLocalDirectory(access_control_local_path);
/// Limit on total number of concurrently executed queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@ -897,6 +908,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->enableNamedSessions();
for (auto & server : servers)
server->start();
@ -1009,8 +1022,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getConfigRef(), graphite_key, async_metrics));
}
SessionCleaner session_cleaner(*global_context);
waitForTerminationRequest();
}

View File

@ -39,12 +39,13 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CLIENT_HAS_CONNECTED_TO_WRONG_PORT;
extern const int UNKNOWN_DATABASE;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_PACKET_FROM_CLIENT;
extern const int POCO_EXCEPTION;
extern const int STD_EXCEPTION;
extern const int SOCKET_TIMEOUT;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
}
@ -502,7 +503,7 @@ void TCPHandler::processOrdinaryQuery()
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
/// Some time passed.
after_send_progress.restart();
sendProgress();
}
@ -538,6 +539,8 @@ void TCPHandler::processOrdinaryQuery()
}
state.io.onFinish();
sendProgress();
}
@ -545,8 +548,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
auto & pipeline = state.io.pipeline;
if (pipeline.getMaxThreads())
num_threads = std::min(num_threads, pipeline.getMaxThreads());
/// Reduce the number of threads to recommended value.
num_threads = std::min(num_threads, pipeline.getNumThreads());
/// Send header-block, to allow client to prepare output format for data to send.
{
@ -657,6 +660,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
}
state.io.onFinish();
sendProgress();
}
@ -874,48 +879,55 @@ void TCPHandler::receiveQuery()
query_context->setCurrentQueryId(state.query_id);
/// Client info
ClientInfo & client_info = query_context->getClientInfo();
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
client_info.read(*in, client_revision);
/// For better support of old clients, that does not send ClientInfo.
if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
{
ClientInfo & client_info = query_context->getClientInfo();
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
client_info.read(*in, client_revision);
/// For better support of old clients, that does not send ClientInfo.
if (client_info.query_kind == ClientInfo::QueryKind::NO_QUERY)
{
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.client_name = client_name;
client_info.client_version_major = client_version_major;
client_info.client_version_minor = client_version_minor;
client_info.client_version_patch = client_version_patch;
client_info.client_revision = client_revision;
}
/// Set fields, that are known apriori.
client_info.interface = ClientInfo::Interface::TCP;
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// 'Current' fields was set at receiveHello.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
else
{
query_context->setInitialRowPolicy();
}
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.client_name = client_name;
client_info.client_version_major = client_version_major;
client_info.client_version_minor = client_version_minor;
client_info.client_version_patch = client_version_patch;
client_info.client_revision = client_revision;
}
/// Per query settings.
Settings custom_settings{};
/// Set fields, that are known apriori.
client_info.interface = ClientInfo::Interface::TCP;
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// 'Current' fields was set at receiveHello.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
else
{
query_context->setInitialRowPolicy();
}
/// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_revision >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsBinaryFormat::STRINGS
: SettingsBinaryFormat::OLD;
custom_settings.deserialize(*in, settings_format);
auto settings_changes = custom_settings.changes();
query_context->checkSettingsConstraints(settings_changes);
Settings passed_settings;
passed_settings.deserialize(*in, settings_format);
auto settings_changes = passed_settings.changes();
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
/// Throw an exception if the passed settings violate the constraints.
query_context->checkSettingsConstraints(settings_changes);
}
else
{
/// Quietly clamp to the constraints if it's not an initial query.
query_context->clampToSettingsConstraints(settings_changes);
}
query_context->applySettingsChanges(settings_changes);
Settings & settings = query_context->getSettingsRef();
const Settings & settings = query_context->getSettingsRef();
/// Sync timeouts on client and server during current query to avoid dangling queries on server
/// NOTE: We use settings.send_timeout for the receive timeout and vice versa (change arguments ordering in TimeoutSetter),

View File

@ -0,0 +1,3 @@
<yandex>
<listen_host>::</listen_host>
</yandex>

View File

@ -0,0 +1,9 @@
<yandex>
<https_port>8443</https_port>
<tcp_port_secure>9440</tcp_port_secure>
<openSSL>
<server>
<dhParamsFile remove="remove"/>
</server>
</openSSL>
</yandex>

View File

@ -3,25 +3,6 @@
NOTE: User and query level settings are set up in "users.xml" file.
-->
<yandex>
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
If this section is not present in configuration, all hosts are allowed.
-->
<remote_url_allow_hosts>
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
If port is explicitly specified in URL, the host:port is checked as a whole.
If host specified here without port, any port with this host allowed.
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
-->
<!-- Regular expression can be specified. RE2 engine is used for regexps.
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
(forgetting to do so is a common source of error).
-->
</remote_url_allow_hosts>
<logger>
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
<level>trace</level>
@ -250,6 +231,24 @@
</test_unavailable_shard>
</remote_servers>
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
If this section is not present in configuration, all hosts are allowed.
-->
<remote_url_allow_hosts>
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
If port is explicitly specified in URL, the host:port is checked as a whole.
If host specified here without port, any port with this host allowed.
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
-->
<!-- Regular expression can be specified. RE2 engine is used for regexps.
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
(forgetting to do so is a common source of error).
-->
</remote_url_allow_hosts>
<!-- If element has 'incl' attribute, then for it's value will be used corresponding substitution from another file.
By default, path to file with substitutions is /etc/metrika.xml. It could be changed in config in 'include_from' element.

View File

@ -2,6 +2,7 @@
#include <Access/MultipleAccessStorage.h>
#include <Access/MemoryAccessStorage.h>
#include <Access/UsersConfigAccessStorage.h>
#include <Access/DiskAccessStorage.h>
#include <Access/AccessRightsContextFactory.h>
#include <Access/RoleContextFactory.h>
#include <Access/RowPolicyContextFactory.h>
@ -15,10 +16,14 @@ namespace
std::vector<std::unique_ptr<IAccessStorage>> createStorages()
{
std::vector<std::unique_ptr<IAccessStorage>> list;
list.emplace_back(std::make_unique<MemoryAccessStorage>());
list.emplace_back(std::make_unique<DiskAccessStorage>());
list.emplace_back(std::make_unique<UsersConfigAccessStorage>());
list.emplace_back(std::make_unique<MemoryAccessStorage>());
return list;
}
constexpr size_t DISK_ACCESS_STORAGE_INDEX = 0;
constexpr size_t USERS_CONFIG_ACCESS_STORAGE_INDEX = 1;
}
@ -37,10 +42,17 @@ AccessControlManager::~AccessControlManager()
}
void AccessControlManager::loadFromConfig(const Poco::Util::AbstractConfiguration & users_config)
void AccessControlManager::setLocalDirectory(const String & directory_path)
{
auto & users_config_access_storage = dynamic_cast<UsersConfigAccessStorage &>(getStorageByIndex(1));
users_config_access_storage.loadFromConfig(users_config);
auto & disk_access_storage = dynamic_cast<DiskAccessStorage &>(getStorageByIndex(DISK_ACCESS_STORAGE_INDEX));
disk_access_storage.setDirectory(directory_path);
}
void AccessControlManager::setUsersConfig(const Poco::Util::AbstractConfiguration & users_config)
{
auto & users_config_access_storage = dynamic_cast<UsersConfigAccessStorage &>(getStorageByIndex(USERS_CONFIG_ACCESS_STORAGE_INDEX));
users_config_access_storage.setConfiguration(users_config);
}

View File

@ -45,7 +45,8 @@ public:
AccessControlManager();
~AccessControlManager();
void loadFromConfig(const Poco::Util::AbstractConfiguration & users_config);
void setLocalDirectory(const String & directory);
void setUsersConfig(const Poco::Util::AbstractConfiguration & users_config);
AccessRightsContextPtr getAccessRightsContext(
const UUID & user_id,

View File

@ -46,6 +46,13 @@ namespace
const AccessFlags create_table_flag = AccessType::CREATE_TABLE;
const AccessFlags create_temporary_table_flag = AccessType::CREATE_TEMPORARY_TABLE;
};
std::string_view checkCurrentDatabase(const std::string_view & current_database)
{
if (current_database.empty())
throw Exception("No current database", ErrorCodes::LOGICAL_ERROR);
return current_database;
}
}
@ -521,21 +528,21 @@ void AccessRights::grantImpl(const AccessRightsElement & element, std::string_vi
else if (element.any_table)
{
if (element.database.empty())
grantImpl(element.access_flags, current_database);
grantImpl(element.access_flags, checkCurrentDatabase(current_database));
else
grantImpl(element.access_flags, element.database);
}
else if (element.any_column)
{
if (element.database.empty())
grantImpl(element.access_flags, current_database, element.table);
grantImpl(element.access_flags, checkCurrentDatabase(current_database), element.table);
else
grantImpl(element.access_flags, element.database, element.table);
}
else
{
if (element.database.empty())
grantImpl(element.access_flags, current_database, element.table, element.columns);
grantImpl(element.access_flags, checkCurrentDatabase(current_database), element.table, element.columns);
else
grantImpl(element.access_flags, element.database, element.table, element.columns);
}
@ -576,21 +583,21 @@ void AccessRights::revokeImpl(const AccessRightsElement & element, std::string_v
else if (element.any_table)
{
if (element.database.empty())
revokeImpl<mode>(element.access_flags, current_database);
revokeImpl<mode>(element.access_flags, checkCurrentDatabase(current_database));
else
revokeImpl<mode>(element.access_flags, element.database);
}
else if (element.any_column)
{
if (element.database.empty())
revokeImpl<mode>(element.access_flags, current_database, element.table);
revokeImpl<mode>(element.access_flags, checkCurrentDatabase(current_database), element.table);
else
revokeImpl<mode>(element.access_flags, element.database, element.table);
}
else
{
if (element.database.empty())
revokeImpl<mode>(element.access_flags, current_database, element.table, element.columns);
revokeImpl<mode>(element.access_flags, checkCurrentDatabase(current_database), element.table, element.columns);
else
revokeImpl<mode>(element.access_flags, element.database, element.table, element.columns);
}
@ -711,21 +718,21 @@ bool AccessRights::isGrantedImpl(const AccessRightsElement & element, std::strin
else if (element.any_table)
{
if (element.database.empty())
return isGrantedImpl(element.access_flags, current_database);
return isGrantedImpl(element.access_flags, checkCurrentDatabase(current_database));
else
return isGrantedImpl(element.access_flags, element.database);
}
else if (element.any_column)
{
if (element.database.empty())
return isGrantedImpl(element.access_flags, current_database, element.table);
return isGrantedImpl(element.access_flags, checkCurrentDatabase(current_database), element.table);
else
return isGrantedImpl(element.access_flags, element.database, element.table);
}
else
{
if (element.database.empty())
return isGrantedImpl(element.access_flags, current_database, element.table, element.columns);
return isGrantedImpl(element.access_flags, checkCurrentDatabase(current_database), element.table, element.columns);
else
return isGrantedImpl(element.access_flags, element.database, element.table, element.columns);
}

View File

@ -186,20 +186,20 @@ void AccessRightsContext::setRolesInfo(const CurrentRolesInfoPtr & roles_info_)
}
void AccessRightsContext::checkPassword(const String & password) const
bool AccessRightsContext::isCorrectPassword(const String & password) const
{
std::lock_guard lock{mutex};
if (!user)
throw Exception(user_name + ": User has been dropped", ErrorCodes::UNKNOWN_USER);
user->authentication.checkPassword(password, user_name);
return false;
return user->authentication.isCorrectPassword(password);
}
void AccessRightsContext::checkHostIsAllowed() const
bool AccessRightsContext::isClientHostAllowed() const
{
std::lock_guard lock{mutex};
if (!user)
throw Exception(user_name + ": User has been dropped", ErrorCodes::UNKNOWN_USER);
user->allowed_client_hosts.checkContains(params.address, user_name);
return false;
return user->allowed_client_hosts.contains(params.address);
}

View File

@ -60,8 +60,8 @@ public:
UserPtr getUser() const;
String getUserName() const;
void checkPassword(const String & password) const;
void checkHostIsAllowed() const;
bool isCorrectPassword(const String & password) const;
bool isClientHostAllowed() const;
CurrentRolesInfoPtr getRolesInfo() const;
std::vector<UUID> getCurrentRoles() const;

View File

@ -15,7 +15,6 @@ namespace DB
namespace ErrorCodes
{
extern const int DNS_ERROR;
extern const int IP_ADDRESS_NOT_ALLOWED;
}
namespace
@ -367,16 +366,4 @@ bool AllowedClientHosts::contains(const IPAddress & client_address) const
return false;
}
void AllowedClientHosts::checkContains(const IPAddress & address, const String & user_name) const
{
if (!contains(address))
{
if (user_name.empty())
throw Exception("It's not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
else
throw Exception("User " + user_name + " is not allowed to connect from address " + address.toString(), ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
}
}
}

View File

@ -71,12 +71,12 @@ public:
/// For example, 312.234.1.1/255.255.255.0 or 2a02:6b8::3/64
void addSubnet(const IPSubnet & subnet);
void addSubnet(const String & subnet) { addSubnet(IPSubnet{subnet}); }
void addSubnet(const IPAddress & prefix, const IPAddress & mask) { addSubnet({prefix, mask}); }
void addSubnet(const IPAddress & prefix, size_t num_prefix_bits) { addSubnet({prefix, num_prefix_bits}); }
void addSubnet(const IPAddress & prefix, const IPAddress & mask) { addSubnet(IPSubnet{prefix, mask}); }
void addSubnet(const IPAddress & prefix, size_t num_prefix_bits) { addSubnet(IPSubnet{prefix, num_prefix_bits}); }
void removeSubnet(const IPSubnet & subnet);
void removeSubnet(const String & subnet) { removeSubnet(IPSubnet{subnet}); }
void removeSubnet(const IPAddress & prefix, const IPAddress & mask) { removeSubnet({prefix, mask}); }
void removeSubnet(const IPAddress & prefix, size_t num_prefix_bits) { removeSubnet({prefix, num_prefix_bits}); }
void removeSubnet(const IPAddress & prefix, const IPAddress & mask) { removeSubnet(IPSubnet{prefix, mask}); }
void removeSubnet(const IPAddress & prefix, size_t num_prefix_bits) { removeSubnet(IPSubnet{prefix, num_prefix_bits}); }
const std::vector<IPSubnet> & getSubnets() const { return subnets; }
/// Allows an exact host name. The `contains()` function will check that the provided address equals to one of that host's addresses.
@ -111,10 +111,6 @@ public:
/// Checks if the provided address is in the list. Returns false if not.
bool contains(const IPAddress & address) const;
/// Checks if the provided address is in the list. Throws an exception if not.
/// `username` is only used for generating an error message if the address isn't in the list.
void checkContains(const IPAddress & address, const String & user_name = String()) const;
friend bool operator ==(const AllowedClientHosts & lhs, const AllowedClientHosts & rhs);
friend bool operator !=(const AllowedClientHosts & lhs, const AllowedClientHosts & rhs) { return !(lhs == rhs); }

View File

@ -7,8 +7,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int REQUIRED_PASSWORD;
extern const int WRONG_PASSWORD;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -75,15 +75,4 @@ bool Authentication::isCorrectPassword(const String & password_) const
throw Exception("Unknown authentication type: " + std::to_string(static_cast<int>(type)), ErrorCodes::LOGICAL_ERROR);
}
void Authentication::checkPassword(const String & password_, const String & user_name) const
{
if (isCorrectPassword(password_))
return;
auto info_about_user_name = [&user_name]() { return user_name.empty() ? String() : " for user " + user_name; };
if (password_.empty() && (type != NO_PASSWORD))
throw Exception("Password required" + info_about_user_name(), ErrorCodes::REQUIRED_PASSWORD);
throw Exception("Wrong password" + info_about_user_name(), ErrorCodes::WRONG_PASSWORD);
}
}

View File

@ -70,10 +70,6 @@ public:
/// Checks if the provided password is correct. Returns false if not.
bool isCorrectPassword(const String & password) const;
/// Checks if the provided password is correct. Throws an exception if not.
/// `user_name` is only used for generating an error message if the password is incorrect.
void checkPassword(const String & password, const String & user_name = String()) const;
friend bool operator ==(const Authentication & lhs, const Authentication & rhs) { return (lhs.type == rhs.type) && (lhs.password_hash == rhs.password_hash); }
friend bool operator !=(const Authentication & lhs, const Authentication & rhs) { return !(lhs == rhs); }

View File

@ -0,0 +1,775 @@
#include <Access/DiskAccessStorage.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <Access/User.h>
#include <Access/Role.h>
#include <Access/RowPolicy.h>
#include <Access/Quota.h>
#include <Parsers/ASTCreateUserQuery.h>
#include <Parsers/ASTCreateRoleQuery.h>
#include <Parsers/ASTCreateRowPolicyQuery.h>
#include <Parsers/ASTCreateQuotaQuery.h>
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/ParserCreateUserQuery.h>
#include <Parsers/ParserCreateRoleQuery.h>
#include <Parsers/ParserCreateRowPolicyQuery.h>
#include <Parsers/ParserCreateQuotaQuery.h>
#include <Parsers/ParserGrantQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateUserQuery.h>
#include <Interpreters/InterpreterCreateRoleQuery.h>
#include <Interpreters/InterpreterCreateRowPolicyQuery.h>
#include <Interpreters/InterpreterCreateQuotaQuery.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/InterpreterShowCreateAccessEntityQuery.h>
#include <Interpreters/InterpreterShowGrantsQuery.h>
#include <Common/quoteString.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <filesystem>
#include <fstream>
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_DOESNT_EXIST;
extern const int FILE_DOESNT_EXIST;
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Special parser for the 'ATTACH access entity' queries.
class ParserAttachAccessEntity : public IParserBase
{
protected:
const char * getName() const override { return "ATTACH access entity query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
{
if (ParserCreateUserQuery{}.enableAttachMode(true).parse(pos, node, expected))
return true;
if (ParserCreateRoleQuery{}.enableAttachMode(true).parse(pos, node, expected))
return true;
if (ParserCreateRowPolicyQuery{}.enableAttachMode(true).parse(pos, node, expected))
return true;
if (ParserCreateQuotaQuery{}.enableAttachMode(true).parse(pos, node, expected))
return true;
if (ParserGrantQuery{}.enableAttachMode(true).parse(pos, node, expected))
return true;
return false;
}
};
/// Reads a file containing ATTACH queries and then parses it to build an access entity.
AccessEntityPtr readAccessEntityFile(const std::filesystem::path & file_path)
{
/// Read the file.
ReadBufferFromFile in{file_path};
String file_contents;
readStringUntilEOF(file_contents, in);
/// Parse the file contents.
ASTs queries;
ParserAttachAccessEntity parser;
const char * begin = file_contents.data(); /// begin of current query
const char * pos = begin; /// parser moves pos from begin to the end of current query
const char * end = begin + file_contents.size();
while (pos < end)
{
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0));
while (isWhitespaceASCII(*pos) || *pos == ';')
++pos;
}
/// Interpret the AST to build an access entity.
std::shared_ptr<User> user;
std::shared_ptr<Role> role;
std::shared_ptr<RowPolicy> policy;
std::shared_ptr<Quota> quota;
AccessEntityPtr res;
for (const auto & query : queries)
{
if (auto create_user_query = query->as<ASTCreateUserQuery>())
{
if (res)
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = user = std::make_unique<User>();
InterpreterCreateUserQuery::updateUserFromQuery(*user, *create_user_query);
}
else if (auto create_role_query = query->as<ASTCreateRoleQuery>())
{
if (res)
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = role = std::make_unique<Role>();
InterpreterCreateRoleQuery::updateRoleFromQuery(*role, *create_role_query);
}
else if (auto create_policy_query = query->as<ASTCreateRowPolicyQuery>())
{
if (res)
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = policy = std::make_unique<RowPolicy>();
InterpreterCreateRowPolicyQuery::updateRowPolicyFromQuery(*policy, *create_policy_query);
}
else if (auto create_quota_query = query->as<ASTCreateQuotaQuery>())
{
if (res)
throw Exception("Two access entities are attached in the same file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
res = quota = std::make_unique<Quota>();
InterpreterCreateQuotaQuery::updateQuotaFromQuery(*quota, *create_quota_query);
}
else if (auto grant_query = query->as<ASTGrantQuery>())
{
if (!user && !role)
throw Exception("A user or role should be attached before grant in file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
if (user)
InterpreterGrantQuery::updateUserFromQuery(*user, *grant_query);
else
InterpreterGrantQuery::updateRoleFromQuery(*role, *grant_query);
}
else
throw Exception("Two access entities are attached in the same file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
}
if (!res)
throw Exception("No access entities attached in file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
return res;
}
/// Writes ATTACH queries for building a specified access entity to a file.
void writeAccessEntityFile(const std::filesystem::path & file_path, const IAccessEntity & entity)
{
/// Build list of ATTACH queries.
ASTs queries;
queries.push_back(InterpreterShowCreateAccessEntityQuery::getAttachQuery(entity));
if (entity.getType() == typeid(User) || entity.getType() == typeid(Role))
boost::range::push_back(queries, InterpreterShowGrantsQuery::getAttachGrantQueries(entity));
/// Serialize the list of ATTACH queries to a string.
std::stringstream ss;
for (const ASTPtr & query : queries)
ss << *query << ";\n";
String file_contents = std::move(ss).str();
/// First we save *.tmp file and then we rename if everything's ok.
auto tmp_file_path = std::filesystem::path{file_path}.replace_extension(".tmp");
bool succeeded = false;
SCOPE_EXIT(
{
if (!succeeded)
std::filesystem::remove(tmp_file_path);
});
/// Write the file.
WriteBufferFromFile out{tmp_file_path.string()};
out.write(file_contents.data(), file_contents.size());
/// Rename.
std::filesystem::rename(tmp_file_path, file_path);
succeeded = true;
}
/// Calculates the path to a file named <id>.sql for saving an access entity.
std::filesystem::path getAccessEntityFilePath(const String & directory_path, const UUID & id)
{
return std::filesystem::path(directory_path).append(toString(id)).replace_extension(".sql");
}
/// Reads a map of name of access entity to UUID for access entities of some type from a file.
std::unordered_map<String, UUID> readListFile(const std::filesystem::path & file_path)
{
ReadBufferFromFile in(file_path);
size_t num;
readVarUInt(num, in);
std::unordered_map<String, UUID> res;
res.reserve(num);
for (size_t i = 0; i != num; ++i)
{
String name;
readStringBinary(name, in);
UUID id;
readUUIDText(id, in);
res[name] = id;
}
return res;
}
/// Writes a map of name of access entity to UUID for access entities of some type to a file.
void writeListFile(const std::filesystem::path & file_path, const std::unordered_map<String, UUID> & map)
{
WriteBufferFromFile out(file_path);
writeVarUInt(map.size(), out);
for (const auto & [name, id] : map)
{
writeStringBinary(name, out);
writeUUIDText(id, out);
}
}
/// Calculates the path for storing a map of name of access entity to UUID for access entities of some type.
std::filesystem::path getListFilePath(const String & directory_path, std::type_index type)
{
std::string_view file_name;
if (type == typeid(User))
file_name = "users";
else if (type == typeid(Role))
file_name = "roles";
else if (type == typeid(Quota))
file_name = "quotas";
else if (type == typeid(RowPolicy))
file_name = "row_policies";
else
throw Exception("Unexpected type of access entity: " + IAccessEntity::getTypeName(type),
ErrorCodes::LOGICAL_ERROR);
return std::filesystem::path(directory_path).append(file_name).replace_extension(".list");
}
/// Calculates the path to a temporary file which existence means that list files are corrupted
/// and need to be rebuild.
std::filesystem::path getNeedRebuildListsMarkFilePath(const String & directory_path)
{
return std::filesystem::path(directory_path).append("need_rebuild_lists.mark");
}
static const std::vector<std::type_index> & getAllAccessEntityTypes()
{
static const std::vector<std::type_index> res = {typeid(User), typeid(Role), typeid(RowPolicy), typeid(Quota)};
return res;
}
bool tryParseUUID(const String & str, UUID & id)
{
try
{
id = parseFromString<UUID>(str);
return true;
}
catch (...)
{
return false;
}
}
}
DiskAccessStorage::DiskAccessStorage()
: IAccessStorage("disk")
{
for (const auto & type : getAllAccessEntityTypes())
name_to_id_maps[type];
}
DiskAccessStorage::~DiskAccessStorage()
{
stopListsWritingThread();
writeLists();
}
void DiskAccessStorage::setDirectory(const String & directory_path_)
{
Notifications notifications;
SCOPE_EXIT({ notify(notifications); });
std::lock_guard lock{mutex};
initialize(directory_path_, notifications);
}
void DiskAccessStorage::initialize(const String & directory_path_, Notifications & notifications)
{
auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path_);
if (initialized)
{
if (directory_path == canonical_directory_path)
return;
throw Exception("Storage " + getStorageName() + " already initialized with another directory", ErrorCodes::LOGICAL_ERROR);
}
std::filesystem::create_directories(canonical_directory_path);
if (!std::filesystem::exists(canonical_directory_path) || !std::filesystem::is_directory(canonical_directory_path))
throw Exception("Couldn't create directory " + canonical_directory_path.string(), ErrorCodes::DIRECTORY_DOESNT_EXIST);
directory_path = canonical_directory_path;
initialized = true;
bool should_rebuild_lists = std::filesystem::exists(getNeedRebuildListsMarkFilePath(directory_path));
if (!should_rebuild_lists)
{
if (!readLists())
should_rebuild_lists = true;
}
if (should_rebuild_lists)
{
rebuildLists();
writeLists();
}
for (const auto & [id, entry] : id_to_entry_map)
prepareNotifications(id, entry, false, notifications);
}
bool DiskAccessStorage::readLists()
{
assert(id_to_entry_map.empty());
assert(name_to_id_maps.size() == getAllAccessEntityTypes().size());
bool ok = true;
for (auto & [type, name_to_id_map] : name_to_id_maps)
{
auto file_path = getListFilePath(directory_path, type);
if (!std::filesystem::exists(file_path))
{
LOG_WARNING(getLogger(), "File " + file_path.string() + " doesn't exist");
ok = false;
break;
}
try
{
name_to_id_map = readListFile(file_path);
}
catch (...)
{
tryLogCurrentException(getLogger(), "Could not read " + file_path.string());
ok = false;
break;
}
for (const auto & [name, id] : name_to_id_map)
id_to_entry_map.emplace(id, Entry{name, type});
}
if (!ok)
{
id_to_entry_map.clear();
for (auto & name_to_id_map : name_to_id_maps | boost::adaptors::map_values)
name_to_id_map.clear();
}
return ok;
}
void DiskAccessStorage::writeLists()
{
if (failed_to_write_lists || types_of_lists_to_write.empty())
return; /// We don't try to write list files after the first fail.
/// The next restart of the server will invoke rebuilding of the list files.
for (const auto & type : types_of_lists_to_write)
{
const auto & name_to_id_map = name_to_id_maps.at(type);
auto file_path = getListFilePath(directory_path, type);
try
{
writeListFile(file_path, name_to_id_map);
}
catch (...)
{
tryLogCurrentException(getLogger(), "Could not write " + file_path.string());
failed_to_write_lists = true;
types_of_lists_to_write.clear();
return;
}
}
/// The list files was successfully written, we don't need the 'need_rebuild_lists.mark' file any longer.
std::filesystem::remove(getNeedRebuildListsMarkFilePath(directory_path));
types_of_lists_to_write.clear();
}
void DiskAccessStorage::scheduleWriteLists(std::type_index type)
{
if (failed_to_write_lists)
return;
bool already_scheduled = !types_of_lists_to_write.empty();
types_of_lists_to_write.insert(type);
if (already_scheduled)
return;
/// Create the 'need_rebuild_lists.mark' file.
/// This file will be used later to find out if writing lists is successful or not.
std::ofstream{getNeedRebuildListsMarkFilePath(directory_path)};
startListsWritingThread();
}
void DiskAccessStorage::startListsWritingThread()
{
if (lists_writing_thread.joinable())
{
if (!lists_writing_thread_exited)
return;
lists_writing_thread.detach();
}
lists_writing_thread_exited = false;
lists_writing_thread = ThreadFromGlobalPool{&DiskAccessStorage::listsWritingThreadFunc, this};
}
void DiskAccessStorage::stopListsWritingThread()
{
if (lists_writing_thread.joinable())
{
lists_writing_thread_should_exit.notify_one();
lists_writing_thread.join();
}
}
void DiskAccessStorage::listsWritingThreadFunc()
{
std::unique_lock lock{mutex};
SCOPE_EXIT({ lists_writing_thread_exited = true; });
/// It's better not to write the lists files too often, that's why we need
/// the following timeout.
const auto timeout = std::chrono::minutes(1);
if (lists_writing_thread_should_exit.wait_for(lock, timeout) != std::cv_status::timeout)
return; /// The destructor requires us to exit.
writeLists();
}
/// Reads and parses all the "<id>.sql" files from a specified directory
/// and then saves the files "users.list", "roles.list", etc. to the same directory.
void DiskAccessStorage::rebuildLists()
{
LOG_WARNING(getLogger(), "Recovering lists in directory " + directory_path);
assert(id_to_entry_map.empty());
for (const auto & directory_entry : std::filesystem::directory_iterator(directory_path))
{
if (!directory_entry.is_regular_file())
continue;
const auto & path = directory_entry.path();
if (path.extension() != ".sql")
continue;
UUID id;
if (!tryParseUUID(path.stem(), id))
continue;
auto entity = readAccessEntityFile(getAccessEntityFilePath(directory_path, id));
auto type = entity->getType();
auto & name_to_id_map = name_to_id_maps[type];
auto it_by_name = name_to_id_map.emplace(entity->getFullName(), id).first;
id_to_entry_map.emplace(id, Entry{it_by_name->first, type});
}
boost::range::copy(getAllAccessEntityTypes(), std::inserter(types_of_lists_to_write, types_of_lists_to_write.end()));
}
std::optional<UUID> DiskAccessStorage::findImpl(std::type_index type, const String & name) const
{
std::lock_guard lock{mutex};
const auto & name_to_id_map = name_to_id_maps.at(type);
auto it = name_to_id_map.find(name);
if (it == name_to_id_map.end())
return {};
return it->second;
}
std::vector<UUID> DiskAccessStorage::findAllImpl(std::type_index type) const
{
std::lock_guard lock{mutex};
const auto & name_to_id_map = name_to_id_maps.at(type);
std::vector<UUID> res;
res.reserve(name_to_id_map.size());
boost::range::copy(name_to_id_map | boost::adaptors::map_values, std::back_inserter(res));
return res;
}
bool DiskAccessStorage::existsImpl(const UUID & id) const
{
std::lock_guard lock{mutex};
return id_to_entry_map.contains(id);
}
AccessEntityPtr DiskAccessStorage::readImpl(const UUID & id) const
{
std::lock_guard lock{mutex};
auto it = id_to_entry_map.find(id);
if (it == id_to_entry_map.end())
throwNotFound(id);
auto & entry = it->second;
if (!entry.entity)
entry.entity = readAccessEntityFromDisk(id);
return entry.entity;
}
String DiskAccessStorage::readNameImpl(const UUID & id) const
{
std::lock_guard lock{mutex};
auto it = id_to_entry_map.find(id);
if (it == id_to_entry_map.end())
throwNotFound(id);
return String{it->second.name};
}
bool DiskAccessStorage::canInsertImpl(const AccessEntityPtr &) const
{
return initialized;
}
UUID DiskAccessStorage::insertImpl(const AccessEntityPtr & new_entity, bool replace_if_exists)
{
Notifications notifications;
SCOPE_EXIT({ notify(notifications); });
UUID id = generateRandomID();
std::lock_guard lock{mutex};
insertNoLock(generateRandomID(), new_entity, replace_if_exists, notifications);
return id;
}
void DiskAccessStorage::insertNoLock(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, Notifications & notifications)
{
const String & name = new_entity->getFullName();
std::type_index type = new_entity->getType();
if (!initialized)
throw Exception(
"Cannot insert " + new_entity->getTypeName() + " " + backQuote(name) + " to " + getStorageName()
+ " because the output directory is not set",
ErrorCodes::LOGICAL_ERROR);
/// Check that we can insert.
auto it_by_id = id_to_entry_map.find(id);
if (it_by_id != id_to_entry_map.end())
{
const auto & existing_entry = it_by_id->second;
throwIDCollisionCannotInsert(id, type, name, existing_entry.entity->getType(), existing_entry.entity->getFullName());
}
auto & name_to_id_map = name_to_id_maps.at(type);
auto it_by_name = name_to_id_map.find(name);
bool name_collision = (it_by_name != name_to_id_map.end());
if (name_collision && !replace_if_exists)
throwNameCollisionCannotInsert(type, name);
scheduleWriteLists(type);
writeAccessEntityToDisk(id, *new_entity);
if (name_collision && replace_if_exists)
removeNoLock(it_by_name->second, notifications);
/// Do insertion.
it_by_name = name_to_id_map.emplace(name, id).first;
it_by_id = id_to_entry_map.emplace(id, Entry{it_by_name->first, type}).first;
auto & entry = it_by_id->second;
entry.entity = new_entity;
prepareNotifications(id, entry, false, notifications);
}
void DiskAccessStorage::removeImpl(const UUID & id)
{
Notifications notifications;
SCOPE_EXIT({ notify(notifications); });
std::lock_guard lock{mutex};
removeNoLock(id, notifications);
}
void DiskAccessStorage::removeNoLock(const UUID & id, Notifications & notifications)
{
auto it = id_to_entry_map.find(id);
if (it == id_to_entry_map.end())
throwNotFound(id);
Entry & entry = it->second;
String name{it->second.name};
std::type_index type = it->second.type;
scheduleWriteLists(type);
deleteAccessEntityOnDisk(id);
/// Do removing.
prepareNotifications(id, entry, true, notifications);
id_to_entry_map.erase(it);
auto & name_to_id_map = name_to_id_maps.at(type);
name_to_id_map.erase(name);
}
void DiskAccessStorage::updateImpl(const UUID & id, const UpdateFunc & update_func)
{
Notifications notifications;
SCOPE_EXIT({ notify(notifications); });
std::lock_guard lock{mutex};
updateNoLock(id, update_func, notifications);
}
void DiskAccessStorage::updateNoLock(const UUID & id, const UpdateFunc & update_func, Notifications & notifications)
{
auto it = id_to_entry_map.find(id);
if (it == id_to_entry_map.end())
throwNotFound(id);
Entry & entry = it->second;
if (!entry.entity)
entry.entity = readAccessEntityFromDisk(id);
auto old_entity = entry.entity;
auto new_entity = update_func(old_entity);
if (*new_entity == *old_entity)
return;
String new_name = new_entity->getFullName();
auto old_name = entry.name;
const std::type_index type = entry.type;
bool name_changed = (new_name != old_name);
if (name_changed)
{
const auto & name_to_id_map = name_to_id_maps.at(type);
if (name_to_id_map.contains(new_name))
throwNameCollisionCannotRename(type, String{old_name}, new_name);
scheduleWriteLists(type);
}
writeAccessEntityToDisk(id, *new_entity);
entry.entity = new_entity;
if (name_changed)
{
auto & name_to_id_map = name_to_id_maps.at(type);
name_to_id_map.erase(String{old_name});
auto it_by_name = name_to_id_map.emplace(new_name, id).first;
entry.name = it_by_name->first;
}
prepareNotifications(id, entry, false, notifications);
}
AccessEntityPtr DiskAccessStorage::readAccessEntityFromDisk(const UUID & id) const
{
return readAccessEntityFile(getAccessEntityFilePath(directory_path, id));
}
void DiskAccessStorage::writeAccessEntityToDisk(const UUID & id, const IAccessEntity & entity) const
{
writeAccessEntityFile(getAccessEntityFilePath(directory_path, id), entity);
}
void DiskAccessStorage::deleteAccessEntityOnDisk(const UUID & id) const
{
auto file_path = getAccessEntityFilePath(directory_path, id);
if (!std::filesystem::remove(file_path))
throw Exception("Couldn't delete " + file_path.string(), ErrorCodes::FILE_DOESNT_EXIST);
}
void DiskAccessStorage::prepareNotifications(const UUID & id, const Entry & entry, bool remove, Notifications & notifications) const
{
if (!remove && !entry.entity)
return;
const AccessEntityPtr entity = remove ? nullptr : entry.entity;
for (const auto & handler : entry.handlers_by_id)
notifications.push_back({handler, id, entity});
auto range = handlers_by_type.equal_range(entry.type);
for (auto it = range.first; it != range.second; ++it)
notifications.push_back({it->second, id, entity});
}
ext::scope_guard DiskAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
{
std::lock_guard lock{mutex};
auto it = id_to_entry_map.find(id);
if (it == id_to_entry_map.end())
return {};
const Entry & entry = it->second;
auto handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler);
return [this, id, handler_it]
{
std::lock_guard lock2{mutex};
auto it2 = id_to_entry_map.find(id);
if (it2 != id_to_entry_map.end())
{
const Entry & entry2 = it2->second;
entry2.handlers_by_id.erase(handler_it);
}
};
}
ext::scope_guard DiskAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
{
std::lock_guard lock{mutex};
auto handler_it = handlers_by_type.emplace(type, handler);
return [this, handler_it]
{
std::lock_guard lock2{mutex};
handlers_by_type.erase(handler_it);
};
}
bool DiskAccessStorage::hasSubscriptionImpl(const UUID & id) const
{
std::lock_guard lock{mutex};
auto it = id_to_entry_map.find(id);
if (it != id_to_entry_map.end())
{
const Entry & entry = it->second;
return !entry.handlers_by_id.empty();
}
return false;
}
bool DiskAccessStorage::hasSubscriptionImpl(std::type_index type) const
{
std::lock_guard lock{mutex};
auto range = handlers_by_type.equal_range(type);
return range.first != range.second;
}
}

View File

@ -0,0 +1,76 @@
#pragma once
#include <Access/MemoryAccessStorage.h>
#include <Common/ThreadPool.h>
#include <boost/container/flat_set.hpp>
namespace DB
{
/// Loads and saves access entities on a local disk to a specified directory.
class DiskAccessStorage : public IAccessStorage
{
public:
DiskAccessStorage();
~DiskAccessStorage() override;
void setDirectory(const String & directory_path_);
private:
std::optional<UUID> findImpl(std::type_index type, const String & name) const override;
std::vector<UUID> findAllImpl(std::type_index type) const override;
bool existsImpl(const UUID & id) const override;
AccessEntityPtr readImpl(const UUID & id) const override;
String readNameImpl(const UUID & id) const override;
bool canInsertImpl(const AccessEntityPtr & entity) const override;
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
ext::scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
bool hasSubscriptionImpl(const UUID & id) const override;
bool hasSubscriptionImpl(std::type_index type) const override;
void initialize(const String & directory_path_, Notifications & notifications);
bool readLists();
void writeLists();
void scheduleWriteLists(std::type_index type);
void rebuildLists();
void startListsWritingThread();
void stopListsWritingThread();
void listsWritingThreadFunc();
void insertNoLock(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, Notifications & notifications);
void removeNoLock(const UUID & id, Notifications & notifications);
void updateNoLock(const UUID & id, const UpdateFunc & update_func, Notifications & notifications);
AccessEntityPtr readAccessEntityFromDisk(const UUID & id) const;
void writeAccessEntityToDisk(const UUID & id, const IAccessEntity & entity) const;
void deleteAccessEntityOnDisk(const UUID & id) const;
using NameToIDMap = std::unordered_map<String, UUID>;
struct Entry
{
Entry(const std::string_view & name_, std::type_index type_) : name(name_), type(type_) {}
std::string_view name; /// view points to a string in `name_to_id_maps`.
std::type_index type;
mutable AccessEntityPtr entity; /// may be nullptr, if the entity hasn't been loaded yet.
mutable std::list<OnChangedHandler> handlers_by_id;
};
void prepareNotifications(const UUID & id, const Entry & entry, bool remove, Notifications & notifications) const;
String directory_path;
bool initialized = false;
std::unordered_map<std::type_index, NameToIDMap> name_to_id_maps;
std::unordered_map<UUID, Entry> id_to_entry_map;
boost::container::flat_set<std::type_index> types_of_lists_to_write;
bool failed_to_write_lists = false; /// Whether writing of the list files has been failed since the recent restart of the server.
ThreadFromGlobalPool lists_writing_thread; /// List files are written in a separate thread.
std::condition_variable lists_writing_thread_should_exit; /// Signals `lists_writing_thread` to exit.
std::atomic<bool> lists_writing_thread_exited = false;
mutable std::unordered_multimap<std::type_index, OnChangedHandler> handlers_by_type;
mutable std::mutex mutex;
};
}

View File

@ -4,6 +4,8 @@
#include <Access/Role.h>
#include <Parsers/ASTGenericRoleSet.h>
#include <Parsers/formatAST.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/range/algorithm/set_algorithm.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
@ -11,6 +13,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
GenericRoleSet::GenericRoleSet() = default;
GenericRoleSet::GenericRoleSet(const GenericRoleSet & src) = default;
GenericRoleSet & GenericRoleSet::operator =(const GenericRoleSet & src) = default;
@ -41,26 +47,51 @@ GenericRoleSet::GenericRoleSet(const boost::container::flat_set<UUID> & ids_)
}
GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager, const std::optional<UUID> & current_user_id)
GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast)
{
init(ast, nullptr, nullptr);
}
GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const UUID & current_user_id)
{
init(ast, nullptr, &current_user_id);
}
GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager)
{
init(ast, &manager, nullptr);
}
GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager, const UUID & current_user_id)
{
init(ast, &manager, &current_user_id);
}
void GenericRoleSet::init(const ASTGenericRoleSet & ast, const AccessControlManager * manager, const UUID * current_user_id)
{
all = ast.all;
auto name_to_id = [id_mode{ast.id_mode}, manager](const String & name) -> UUID
{
if (id_mode)
return parse<UUID>(name);
assert(manager);
auto id = manager->find<User>(name);
if (id)
return *id;
return manager->getID<Role>(name);
};
if (!ast.names.empty() && !all)
{
ids.reserve(ast.names.size());
for (const String & name : ast.names)
{
auto id = manager.find<User>(name);
if (!id)
id = manager.getID<Role>(name);
ids.insert(*id);
}
ids.insert(name_to_id(name));
}
if (ast.current_user && !all)
{
if (!current_user_id)
throw Exception("Current user is unknown", ErrorCodes::LOGICAL_ERROR);
assert(current_user_id);
ids.insert(*current_user_id);
}
@ -68,18 +99,12 @@ GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const AccessContro
{
except_ids.reserve(ast.except_names.size());
for (const String & except_name : ast.except_names)
{
auto except_id = manager.find<User>(except_name);
if (!except_id)
except_id = manager.getID<Role>(except_name);
except_ids.insert(*except_id);
}
except_ids.insert(name_to_id(except_name));
}
if (ast.except_current_user)
{
if (!current_user_id)
throw Exception("Current user is unknown", ErrorCodes::LOGICAL_ERROR);
assert(current_user_id);
except_ids.insert(*current_user_id);
}
@ -87,7 +112,52 @@ GenericRoleSet::GenericRoleSet(const ASTGenericRoleSet & ast, const AccessContro
ids.erase(except_id);
}
std::shared_ptr<ASTGenericRoleSet> GenericRoleSet::toAST(const AccessControlManager & manager) const
std::shared_ptr<ASTGenericRoleSet> GenericRoleSet::toAST() const
{
auto ast = std::make_shared<ASTGenericRoleSet>();
ast->id_mode = true;
ast->all = all;
if (!ids.empty())
{
ast->names.reserve(ids.size());
for (const UUID & id : ids)
ast->names.emplace_back(::DB::toString(id));
}
if (!except_ids.empty())
{
ast->except_names.reserve(except_ids.size());
for (const UUID & except_id : except_ids)
ast->except_names.emplace_back(::DB::toString(except_id));
}
return ast;
}
String GenericRoleSet::toString() const
{
auto ast = toAST();
return serializeAST(*ast);
}
Strings GenericRoleSet::toStrings() const
{
if (all || !except_ids.empty())
return {toString()};
Strings names;
names.reserve(ids.size());
for (const UUID & id : ids)
names.emplace_back(::DB::toString(id));
return names;
}
std::shared_ptr<ASTGenericRoleSet> GenericRoleSet::toASTWithNames(const AccessControlManager & manager) const
{
auto ast = std::make_shared<ASTGenericRoleSet>();
ast->all = all;
@ -120,17 +190,17 @@ std::shared_ptr<ASTGenericRoleSet> GenericRoleSet::toAST(const AccessControlMana
}
String GenericRoleSet::toString(const AccessControlManager & manager) const
String GenericRoleSet::toStringWithNames(const AccessControlManager & manager) const
{
auto ast = toAST(manager);
auto ast = toASTWithNames(manager);
return serializeAST(*ast);
}
Strings GenericRoleSet::toStrings(const AccessControlManager & manager) const
Strings GenericRoleSet::toStringsWithNames(const AccessControlManager & manager) const
{
if (all || !except_ids.empty())
return {toString(manager)};
return {toStringWithNames(manager)};
Strings names;
names.reserve(ids.size());

View File

@ -30,11 +30,19 @@ struct GenericRoleSet
GenericRoleSet(const std::vector<UUID> & ids_);
GenericRoleSet(const boost::container::flat_set<UUID> & ids_);
GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager, const std::optional<UUID> & current_user_id = {});
std::shared_ptr<ASTGenericRoleSet> toAST(const AccessControlManager & manager) const;
/// The constructor from AST requires the AccessControlManager if `ast.id_mode == false`.
GenericRoleSet(const ASTGenericRoleSet & ast);
GenericRoleSet(const ASTGenericRoleSet & ast, const UUID & current_user_id);
GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager);
GenericRoleSet(const ASTGenericRoleSet & ast, const AccessControlManager & manager, const UUID & current_user_id);
String toString(const AccessControlManager & manager) const;
Strings toStrings(const AccessControlManager & manager) const;
std::shared_ptr<ASTGenericRoleSet> toAST() const;
String toString() const;
Strings toStrings() const;
std::shared_ptr<ASTGenericRoleSet> toASTWithNames(const AccessControlManager & manager) const;
String toStringWithNames(const AccessControlManager & manager) const;
Strings toStringsWithNames(const AccessControlManager & manager) const;
bool empty() const;
void clear();
@ -61,6 +69,9 @@ struct GenericRoleSet
boost::container::flat_set<UUID> ids;
bool all = false;
boost::container::flat_set<UUID> except_ids;
private:
void init(const ASTGenericRoleSet & ast, const AccessControlManager * manager = nullptr, const UUID * current_user_id = nullptr);
};
}

View File

@ -15,8 +15,7 @@ namespace ErrorCodes
extern const int BAD_CAST;
extern const int ACCESS_ENTITY_NOT_FOUND;
extern const int ACCESS_ENTITY_ALREADY_EXISTS;
extern const int ACCESS_ENTITY_FOUND_DUPLICATES;
extern const int ACCESS_ENTITY_STORAGE_READONLY;
extern const int ACCESS_STORAGE_READONLY;
extern const int UNKNOWN_USER;
extern const int UNKNOWN_ROLE;
}
@ -73,7 +72,6 @@ bool IAccessStorage::exists(const UUID & id) const
}
AccessEntityPtr IAccessStorage::tryReadBase(const UUID & id) const
{
try
@ -420,7 +418,7 @@ void IAccessStorage::throwReadonlyCannotInsert(std::type_index type, const Strin
{
throw Exception(
"Cannot insert " + getTypeName(type) + " " + backQuote(name) + " to " + getStorageName() + " because this storage is readonly",
ErrorCodes::ACCESS_ENTITY_STORAGE_READONLY);
ErrorCodes::ACCESS_STORAGE_READONLY);
}
@ -428,7 +426,7 @@ void IAccessStorage::throwReadonlyCannotUpdate(std::type_index type, const Strin
{
throw Exception(
"Cannot update " + getTypeName(type) + " " + backQuote(name) + " in " + getStorageName() + " because this storage is readonly",
ErrorCodes::ACCESS_ENTITY_STORAGE_READONLY);
ErrorCodes::ACCESS_STORAGE_READONLY);
}
@ -436,6 +434,6 @@ void IAccessStorage::throwReadonlyCannotRemove(std::type_index type, const Strin
{
throw Exception(
"Cannot remove " + getTypeName(type) + " " + backQuote(name) + " from " + getStorageName() + " because this storage is readonly",
ErrorCodes::ACCESS_ENTITY_STORAGE_READONLY);
ErrorCodes::ACCESS_STORAGE_READONLY);
}
}

View File

@ -74,6 +74,10 @@ public:
String readName(const UUID & id) const;
std::optional<String> tryReadName(const UUID & id) const;
/// Returns true if a specified entity can be inserted into this storage.
/// This function doesn't check whether there are no entities with such name in the storage.
bool canInsert(const AccessEntityPtr & entity) const { return canInsertImpl(entity); }
/// Inserts an entity to the storage. Returns ID of a new entry in the storage.
/// Throws an exception if the specified name already exists.
UUID insert(const AccessEntityPtr & entity);
@ -133,6 +137,7 @@ protected:
virtual bool existsImpl(const UUID & id) const = 0;
virtual AccessEntityPtr readImpl(const UUID & id) const = 0;
virtual String readNameImpl(const UUID & id) const = 0;
virtual bool canInsertImpl(const AccessEntityPtr & entity) const = 0;
virtual UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) = 0;
virtual void removeImpl(const UUID & id) = 0;
virtual void updateImpl(const UUID & id, const UpdateFunc & update_func) = 0;

View File

@ -293,6 +293,7 @@ ext::scope_guard MemoryAccessStorage::subscribeForChangesImpl(const UUID & id, c
bool MemoryAccessStorage::hasSubscriptionImpl(const UUID & id) const
{
std::lock_guard lock{mutex};
auto it = entries.find(id);
if (it != entries.end())
{
@ -305,6 +306,7 @@ bool MemoryAccessStorage::hasSubscriptionImpl(const UUID & id) const
bool MemoryAccessStorage::hasSubscriptionImpl(std::type_index type) const
{
std::lock_guard lock{mutex};
auto range = handlers_by_type.equal_range(type);
return range.first != range.second;
}

View File

@ -26,6 +26,7 @@ private:
bool existsImpl(const UUID & id) const override;
AccessEntityPtr readImpl(const UUID & id) const override;
String readNameImpl(const UUID & id) const override;
bool canInsertImpl(const AccessEntityPtr &) const override { return true; }
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;

View File

@ -7,8 +7,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int ACCESS_ENTITY_NOT_FOUND;
extern const int ACCESS_ENTITY_FOUND_DUPLICATES;
extern const int ACCESS_STORAGE_FOR_INSERTION_NOT_FOUND;
}
@ -30,10 +30,9 @@ namespace
MultipleAccessStorage::MultipleAccessStorage(
std::vector<std::unique_ptr<Storage>> nested_storages_, size_t index_of_nested_storage_for_insertion_)
std::vector<std::unique_ptr<Storage>> nested_storages_)
: IAccessStorage(joinStorageNames(nested_storages_))
, nested_storages(std::move(nested_storages_))
, nested_storage_for_insertion(nested_storages[index_of_nested_storage_for_insertion_].get())
, ids_cache(512 /* cache size */)
{
}
@ -162,13 +161,39 @@ String MultipleAccessStorage::readNameImpl(const UUID & id) const
}
bool MultipleAccessStorage::canInsertImpl(const AccessEntityPtr & entity) const
{
for (const auto & nested_storage : nested_storages)
{
if (nested_storage->canInsert(entity))
return true;
}
return false;
}
UUID MultipleAccessStorage::insertImpl(const AccessEntityPtr & entity, bool replace_if_exists)
{
auto id = replace_if_exists ? nested_storage_for_insertion->insertOrReplace(entity) : nested_storage_for_insertion->insert(entity);
IAccessStorage * nested_storage_for_insertion = nullptr;
for (const auto & nested_storage : nested_storages)
{
if (nested_storage->canInsert(entity))
{
nested_storage_for_insertion = nested_storage.get();
break;
}
}
if (!nested_storage_for_insertion)
{
throw Exception(
"Not found a storage to insert " + entity->getTypeName() + backQuote(entity->getName()),
ErrorCodes::ACCESS_STORAGE_FOR_INSERTION_NOT_FOUND);
}
auto id = replace_if_exists ? nested_storage_for_insertion->insertOrReplace(entity) : nested_storage_for_insertion->insert(entity);
std::lock_guard lock{ids_cache_mutex};
ids_cache.set(id, std::make_shared<Storage *>(nested_storage_for_insertion));
return id;
}

View File

@ -13,7 +13,7 @@ class MultipleAccessStorage : public IAccessStorage
public:
using Storage = IAccessStorage;
MultipleAccessStorage(std::vector<std::unique_ptr<Storage>> nested_storages_, size_t index_of_nested_storage_for_insertion_ = 0);
MultipleAccessStorage(std::vector<std::unique_ptr<Storage>> nested_storages_);
~MultipleAccessStorage() override;
std::vector<UUID> findMultiple(std::type_index type, const String & name) const;
@ -35,6 +35,7 @@ protected:
bool existsImpl(const UUID & id) const override;
AccessEntityPtr readImpl(const UUID & id) const override;
String readNameImpl(const UUID &id) const override;
bool canInsertImpl(const AccessEntityPtr & entity) const override;
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
@ -45,7 +46,6 @@ protected:
private:
std::vector<std::unique_ptr<Storage>> nested_storages;
IAccessStorage * nested_storage_for_insertion;
mutable LRUCache<UUID, Storage *> ids_cache;
mutable std::mutex ids_cache_mutex;
};

View File

@ -5,7 +5,6 @@
#include <chrono>
namespace DB
{
/** Quota for resources consumption for specific interval.

View File

@ -9,6 +9,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int READONLY;
extern const int QUERY_IS_PROHIBITED;
extern const int NO_ELEMENTS_IN_CONFIG;
@ -198,6 +199,77 @@ void SettingsConstraints::check(const Settings & current_settings, const Setting
}
void SettingsConstraints::clamp(const Settings & current_settings, SettingChange & change) const
{
const String & name = change.name;
size_t setting_index = Settings::findIndex(name);
if (setting_index == Settings::npos)
return;
Field new_value = Settings::valueToCorrespondingType(setting_index, change.value);
Field current_value = current_settings.get(setting_index);
/// Setting isn't checked if value wasn't changed.
if (current_value == new_value)
return;
if (!current_settings.allow_ddl && name == "allow_ddl")
{
change.value = current_value;
return;
}
/** The `readonly` value is understood as follows:
* 0 - everything allowed.
* 1 - only read queries can be made; you can not change the settings.
* 2 - You can only do read queries and you can change the settings, except for the `readonly` setting.
*/
if (current_settings.readonly == 1)
{
change.value = current_value;
return;
}
if (current_settings.readonly > 1 && name == "readonly")
{
change.value = current_value;
return;
}
const Constraint * constraint = tryGetConstraint(setting_index);
if (constraint)
{
if (constraint->read_only)
{
change.value = current_value;
return;
}
if (!constraint->min_value.isNull() && (new_value < constraint->min_value))
{
if (!constraint->max_value.isNull() && (constraint->min_value > constraint->max_value))
change.value = current_value;
else
change.value = constraint->min_value;
return;
}
if (!constraint->max_value.isNull() && (new_value > constraint->max_value))
{
change.value = constraint->max_value;
return;
}
}
}
void SettingsConstraints::clamp(const Settings & current_settings, SettingsChanges & changes) const
{
for (auto & change : changes)
clamp(current_settings, change);
}
SettingsConstraints::Constraint & SettingsConstraints::getConstraintRef(size_t index)
{
auto it = constraints_by_index.find(index);

View File

@ -85,9 +85,14 @@ public:
Infos getInfo() const;
/// Checks whether `change` violates these constraints and throws an exception if so.
void check(const Settings & current_settings, const SettingChange & change) const;
void check(const Settings & current_settings, const SettingsChanges & changes) const;
/// Checks whether `change` violates these and clamps the `change` if so.
void clamp(const Settings & current_settings, SettingChange & change) const;
void clamp(const Settings & current_settings, SettingsChanges & changes) const;
/** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings).
* The profile can also be set using the `set` functions, like the profile setting.
*/

View File

@ -342,7 +342,7 @@ UsersConfigAccessStorage::UsersConfigAccessStorage() : IAccessStorage("users.xml
UsersConfigAccessStorage::~UsersConfigAccessStorage() {}
void UsersConfigAccessStorage::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
void UsersConfigAccessStorage::setConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::vector<std::pair<UUID, AccessEntityPtr>> all_entities;
for (const auto & entity : parseUsers(config, getLogger()))

View File

@ -21,7 +21,7 @@ public:
UsersConfigAccessStorage();
~UsersConfigAccessStorage() override;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
void setConfiguration(const Poco::Util::AbstractConfiguration & config);
private:
std::optional<UUID> findImpl(std::type_index type, const String & name) const override;
@ -29,6 +29,7 @@ private:
bool existsImpl(const UUID & id) const override;
AccessEntityPtr readImpl(const UUID & id) const override;
String readNameImpl(const UUID & id) const override;
bool canInsertImpl(const AccessEntityPtr &) const override { return false; }
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;

View File

@ -6,6 +6,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{

View File

@ -14,7 +14,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename T, typename Denominator>

View File

@ -6,6 +6,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{

View File

@ -7,6 +7,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{

View File

@ -21,7 +21,6 @@ struct AggregateFunctionCountData
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

View File

@ -9,7 +9,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
class AggregateFunctionCombinatorForEach final : public IAggregateFunctionCombinator

View File

@ -16,7 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int PARAMETER_OUT_OF_BOUND;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}

View File

@ -27,7 +27,6 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
extern const int LOGICAL_ERROR;
}
enum class Sampler

Some files were not shown because too many files have changed in this diff Show More