mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 12:32:04 +00:00
Merge branch 'master' of github.com:ClickHouse/ClickHouse into clickhouse-copier
This commit is contained in:
commit
9794c1cf58
@ -52,7 +52,7 @@ IncludeCategories:
|
||||
ReflowComments: false
|
||||
AlignEscapedNewlinesLeft: false
|
||||
AlignEscapedNewlines: DontAlign
|
||||
AlignTrailingComments: true
|
||||
AlignTrailingComments: false
|
||||
|
||||
# Not changed:
|
||||
AccessModifierOffset: -4
|
||||
|
@ -12,6 +12,3 @@ ClickHouse is an open-source column-oriented database management system that all
|
||||
* [Contacts](https://clickhouse.tech/#contacts) can help to get your questions answered if there are any.
|
||||
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
|
||||
|
||||
## Upcoming Events
|
||||
|
||||
* [ClickHouse Meetup in Athens](https://www.meetup.com/Athens-Big-Data/events/268379195/) on March 5.
|
||||
|
@ -80,7 +80,6 @@ dumpImpl(Out & out, T && x)
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Tuple, pair
|
||||
template <size_t N, typename Out, typename T>
|
||||
Out & dumpTupleImpl(Out & out, T && x)
|
||||
|
@ -1,44 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace ext
|
||||
{
|
||||
|
||||
/** Thread-unsafe singleton. It works simply like a global variable.
|
||||
* Supports deinitialization.
|
||||
*
|
||||
* In most of the cases, you don't need this class.
|
||||
* Use "Meyers Singleton" instead: static T & instance() { static T x; return x; }
|
||||
*/
|
||||
|
||||
template <class T>
|
||||
class Singleton
|
||||
{
|
||||
public:
|
||||
Singleton()
|
||||
{
|
||||
if (!instance)
|
||||
instance = std::make_unique<T>();
|
||||
}
|
||||
|
||||
T * operator->()
|
||||
{
|
||||
return instance.get();
|
||||
}
|
||||
|
||||
static bool isInitialized()
|
||||
{
|
||||
return !!instance;
|
||||
}
|
||||
|
||||
static void reset()
|
||||
{
|
||||
instance.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
inline static std::unique_ptr<T> instance{};
|
||||
};
|
||||
|
||||
}
|
@ -2,7 +2,6 @@
|
||||
#include <stdint.h>
|
||||
#include <time.h>
|
||||
#include "atomic.h"
|
||||
#include "musl_features.h"
|
||||
#include "syscall.h"
|
||||
|
||||
#ifdef VDSO_CGT_SYM
|
||||
@ -54,7 +53,7 @@ static void *volatile vdso_func = (void *)cgt_init;
|
||||
|
||||
#endif
|
||||
|
||||
int __clock_gettime(clockid_t clk, struct timespec *ts)
|
||||
int clock_gettime(clockid_t clk, struct timespec *ts)
|
||||
{
|
||||
int r;
|
||||
|
||||
@ -104,5 +103,3 @@ int __clock_gettime(clockid_t clk, struct timespec *ts)
|
||||
return __syscall_ret(r);
|
||||
#endif
|
||||
}
|
||||
|
||||
weak_alias(__clock_gettime, clock_gettime);
|
||||
|
@ -1,10 +1,9 @@
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
#include "musl_features.h"
|
||||
#include "syscall.h"
|
||||
|
||||
int __clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, struct timespec * rem)
|
||||
int clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, struct timespec * rem)
|
||||
{
|
||||
if (clk == CLOCK_THREAD_CPUTIME_ID)
|
||||
return EINVAL;
|
||||
@ -23,5 +22,3 @@ int __clock_nanosleep(clockid_t clk, int flags, const struct timespec * req, str
|
||||
pthread_setcanceltype(old_cancel_type, NULL);
|
||||
return status;
|
||||
}
|
||||
|
||||
weak_alias(__clock_nanosleep, clock_nanosleep);
|
||||
|
@ -2,7 +2,4 @@
|
||||
|
||||
#define weak __attribute__((__weak__))
|
||||
#define hidden __attribute__((__visibility__("hidden")))
|
||||
#define weak_alias(old, new) \
|
||||
extern __typeof(old) new __attribute__((__weak__, __alias__(#old)))
|
||||
|
||||
#define predict_false(x) __builtin_expect(x, 0)
|
||||
|
@ -2,6 +2,7 @@
|
||||
.hidden __syscall
|
||||
.type __syscall,@function
|
||||
__syscall:
|
||||
.cfi_startproc
|
||||
movq %rdi,%rax
|
||||
movq %rsi,%rdi
|
||||
movq %rdx,%rsi
|
||||
@ -11,3 +12,4 @@ __syscall:
|
||||
movq 8(%rsp),%r9
|
||||
syscall
|
||||
ret
|
||||
.cfi_endproc
|
||||
|
@ -39,7 +39,6 @@ typedef __attribute__((__aligned__(1))) uint32_t uint32_unaligned_t;
|
||||
typedef __attribute__((__aligned__(1))) uint64_t uint64_unaligned_t;
|
||||
|
||||
|
||||
|
||||
//---------------------------------------------------------------------
|
||||
// fast copy for different sizes
|
||||
//---------------------------------------------------------------------
|
||||
@ -694,4 +693,3 @@ static INLINE void* memcpy_fast(void *destination, const void *source, size_t si
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
@ -8,6 +8,7 @@ add_library (mysqlxx
|
||||
src/Row.cpp
|
||||
src/Value.cpp
|
||||
src/Pool.cpp
|
||||
src/PoolFactory.cpp
|
||||
src/PoolWithFailover.cpp
|
||||
|
||||
include/mysqlxx/Connection.h
|
||||
@ -15,6 +16,7 @@ add_library (mysqlxx
|
||||
include/mysqlxx/mysqlxx.h
|
||||
include/mysqlxx/Null.h
|
||||
include/mysqlxx/Pool.h
|
||||
include/mysqlxx/PoolFactory.h
|
||||
include/mysqlxx/PoolWithFailover.h
|
||||
include/mysqlxx/Query.h
|
||||
include/mysqlxx/ResultBase.h
|
||||
|
@ -198,6 +198,8 @@ public:
|
||||
return description;
|
||||
}
|
||||
|
||||
void removeConnection(Connection * data);
|
||||
|
||||
protected:
|
||||
/// Number of MySQL connections which are created at launch.
|
||||
unsigned default_connections;
|
||||
|
55
base/mysqlxx/include/mysqlxx/PoolFactory.h
Normal file
55
base/mysqlxx/include/mysqlxx/PoolFactory.h
Normal file
@ -0,0 +1,55 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
||||
#include <mysqlxx/PoolWithFailover.h>
|
||||
|
||||
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
|
||||
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
|
||||
|
||||
|
||||
namespace mysqlxx
|
||||
{
|
||||
|
||||
/*
|
||||
* PoolFactory.h
|
||||
* This class is a helper singleton to mutualize connections to MySQL.
|
||||
*/
|
||||
class PoolFactory final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static PoolFactory & instance();
|
||||
|
||||
PoolFactory(const PoolFactory &) = delete;
|
||||
|
||||
/** Allocates a PoolWithFailover to connect to MySQL. */
|
||||
PoolWithFailover Get(const std::string & config_name,
|
||||
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
|
||||
/** Allocates a PoolWithFailover to connect to MySQL. */
|
||||
PoolWithFailover Get(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name,
|
||||
unsigned default_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
|
||||
void reset();
|
||||
|
||||
|
||||
~PoolFactory() = default;
|
||||
PoolFactory& operator=(const PoolFactory &) = delete;
|
||||
|
||||
private:
|
||||
PoolFactory();
|
||||
|
||||
struct Impl;
|
||||
std::unique_ptr<Impl> impl;
|
||||
};
|
||||
|
||||
}
|
@ -77,6 +77,10 @@ namespace mysqlxx
|
||||
size_t max_tries;
|
||||
/// Mutex for set of replicas.
|
||||
std::mutex mutex;
|
||||
std::string config_name;
|
||||
|
||||
/// Can the Pool be shared
|
||||
bool shareable;
|
||||
|
||||
public:
|
||||
using Entry = Pool::Entry;
|
||||
@ -100,8 +104,6 @@ namespace mysqlxx
|
||||
|
||||
PoolWithFailover(const PoolWithFailover & other);
|
||||
|
||||
PoolWithFailover & operator=(const PoolWithFailover &) = delete;
|
||||
|
||||
/** Allocates a connection to use. */
|
||||
Entry Get();
|
||||
};
|
||||
|
@ -23,26 +23,26 @@ namespace mysqlxx
|
||||
class ResultBase;
|
||||
|
||||
|
||||
/** Представляет одно значение, считанное из MySQL.
|
||||
* Объект сам не хранит данные, а является всего лишь обёрткой над парой (const char *, size_t).
|
||||
* Если уничтожить UseQueryResult/StoreQueryResult или Connection,
|
||||
* или считать следующий Row при использовании UseQueryResult, то объект станет некорректным.
|
||||
* Позволяет преобразовать значение (распарсить) в различные типы данных:
|
||||
* - с помощью функций вида getUInt(), getString(), ... (рекомендуется);
|
||||
* - с помощью шаблонной функции get<Type>(), которая специализирована для многих типов (для шаблонного кода);
|
||||
* - шаблонная функция get<Type> работает также для всех типов, у которых есть конструктор из Value
|
||||
* (это сделано для возможности расширения);
|
||||
* - с помощью operator Type() - но этот метод реализован лишь для совместимости и не рекомендуется
|
||||
* к использованию, так как неудобен (часто возникают неоднозначности).
|
||||
/** Represents a single value read from MySQL.
|
||||
* It doesn't owns the value. It's just a wrapper of a pair (const char *, size_t).
|
||||
* If the UseQueryResult/StoreQueryResult or Connection is destroyed,
|
||||
* or you have read the next Row while using UseQueryResult, then the object is invalidated.
|
||||
* Allows to transform (parse) the value to various data types:
|
||||
* - with getUInt(), getString(), ... (recommended);
|
||||
* - with template function get<Type>() that is specialized for multiple data types;
|
||||
* - the template function get<Type> also works for all types that can be constructed from Value
|
||||
* (it is an extension point);
|
||||
* - with operator Type() - this is done for compatibility and not recommended because ambiguities possible.
|
||||
*
|
||||
* При ошибке парсинга, выкидывается исключение.
|
||||
* При попытке достать значение, которое равно nullptr, выкидывается исключение
|
||||
* - используйте метод isNull() для проверки.
|
||||
* On parsing error, exception is thrown.
|
||||
* When trying to extract a value that is nullptr, exception is thrown
|
||||
* - use isNull() method to check.
|
||||
*
|
||||
* Во всех распространённых системах, time_t - это всего лишь typedef от Int64 или Int32.
|
||||
* Для того, чтобы можно было писать row[0].get<time_t>(), ожидая, что значение вида '2011-01-01 00:00:00'
|
||||
* корректно распарсится согласно текущей тайм-зоне, сделано так, что метод getUInt и соответствующие методы get<>()
|
||||
* также умеют парсить дату и дату-время.
|
||||
* As time_t is just an alias for integer data type
|
||||
* to allow to write row[0].get<time_t>(), and expect that the values like '2011-01-01 00:00:00'
|
||||
* will be successfully parsed according to the current time zone,
|
||||
* the getUInt method and the corresponding get<>() methods
|
||||
* are capable of parsing Date and DateTime.
|
||||
*/
|
||||
class Value
|
||||
{
|
||||
@ -166,7 +166,7 @@ private:
|
||||
else
|
||||
throwException("Cannot parse DateTime");
|
||||
|
||||
return 0; /// чтобы не было warning-а.
|
||||
return 0; /// avoid warning.
|
||||
}
|
||||
|
||||
|
||||
@ -184,7 +184,7 @@ private:
|
||||
else
|
||||
throwException("Cannot parse Date");
|
||||
|
||||
return 0; /// чтобы не было warning-а.
|
||||
return 0; /// avoid warning.
|
||||
}
|
||||
|
||||
|
||||
@ -231,7 +231,7 @@ private:
|
||||
double readFloatText(const char * buf, size_t length) const;
|
||||
|
||||
/// Выкинуть исключение с подробной информацией
|
||||
void throwException(const char * text) const;
|
||||
[[noreturn]] void throwException(const char * text) const;
|
||||
};
|
||||
|
||||
|
||||
|
@ -22,15 +22,20 @@ void Pool::Entry::incrementRefCount()
|
||||
if (!data)
|
||||
return;
|
||||
++data->ref_count;
|
||||
mysql_thread_init();
|
||||
if (data->ref_count == 1)
|
||||
mysql_thread_init();
|
||||
}
|
||||
|
||||
void Pool::Entry::decrementRefCount()
|
||||
{
|
||||
if (!data)
|
||||
return;
|
||||
--data->ref_count;
|
||||
mysql_thread_end();
|
||||
if (data->ref_count > 0)
|
||||
{
|
||||
--data->ref_count;
|
||||
if (data->ref_count == 0)
|
||||
mysql_thread_end();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -169,14 +174,24 @@ Pool::Entry Pool::tryGet()
|
||||
return Entry();
|
||||
}
|
||||
|
||||
void Pool::removeConnection(Connection* connection)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
if (connection)
|
||||
{
|
||||
if (connection->ref_count > 0)
|
||||
{
|
||||
connection->conn.disconnect();
|
||||
connection->ref_count = 0;
|
||||
}
|
||||
connections.remove(connection);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Pool::Entry::disconnect()
|
||||
{
|
||||
if (data)
|
||||
{
|
||||
decrementRefCount();
|
||||
data->conn.disconnect();
|
||||
}
|
||||
pool->removeConnection(data);
|
||||
}
|
||||
|
||||
|
||||
|
122
base/mysqlxx/src/PoolFactory.cpp
Normal file
122
base/mysqlxx/src/PoolFactory.cpp
Normal file
@ -0,0 +1,122 @@
|
||||
#include <mysqlxx/PoolFactory.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
|
||||
namespace mysqlxx
|
||||
{
|
||||
|
||||
struct PoolFactory::Impl
|
||||
{
|
||||
// Cache of already affected pools identified by their config name
|
||||
std::map<std::string, std::shared_ptr<PoolWithFailover>> pools;
|
||||
|
||||
// Cache of Pool ID (host + port + user +...) cibling already established shareable pool
|
||||
std::map<std::string, std::string> pools_by_ids;
|
||||
|
||||
/// Protect pools and pools_by_ids caches
|
||||
std::mutex mutex;
|
||||
};
|
||||
|
||||
PoolWithFailover PoolFactory::Get(const std::string & config_name, unsigned default_connections,
|
||||
unsigned max_connections, size_t max_tries)
|
||||
{
|
||||
return Get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
|
||||
}
|
||||
|
||||
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
|
||||
static bool startsWith(const std::string & s, const char * prefix)
|
||||
{
|
||||
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
|
||||
}
|
||||
|
||||
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name)
|
||||
{
|
||||
bool shared = config.getBool(config_name + ".share_connection", false);
|
||||
|
||||
// Not shared no need to generate a name the pool won't be stored
|
||||
if (!shared)
|
||||
return "";
|
||||
|
||||
std::string entry_name = "";
|
||||
std::string host = config.getString(config_name + ".host", "");
|
||||
std::string port = config.getString(config_name + ".port", "");
|
||||
std::string user = config.getString(config_name + ".user", "");
|
||||
std::string db = config.getString(config_name + ".db", "");
|
||||
std::string table = config.getString(config_name + ".table", "");
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
|
||||
if (config.has(config_name + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
config.keys(config_name, replica_keys);
|
||||
for (const auto & replica_config_key : replica_keys)
|
||||
{
|
||||
/// There could be another elements in the same level in configuration file, like "user", "port"...
|
||||
if (startsWith(replica_config_key, "replica"))
|
||||
{
|
||||
std::string replica_name = config_name + "." + replica_config_key;
|
||||
std::string tmp_host = config.getString(replica_name + ".host", host);
|
||||
std::string tmp_port = config.getString(replica_name + ".port", port);
|
||||
std::string tmp_user = config.getString(replica_name + ".user", user);
|
||||
entry_name += (entry_name.empty() ? "" : "|") + tmp_user + "@" + tmp_host + ":" + tmp_port + "/" + db;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
entry_name = user + "@" + host + ":" + port + "/" + db;
|
||||
}
|
||||
return entry_name;
|
||||
}
|
||||
|
||||
PoolWithFailover PoolFactory::Get(const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_name, unsigned default_connections, unsigned max_connections, size_t max_tries)
|
||||
{
|
||||
|
||||
std::lock_guard<std::mutex> lock(impl->mutex);
|
||||
if (auto entry = impl->pools.find(config_name); entry != impl->pools.end())
|
||||
{
|
||||
return *(entry->second.get());
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string entry_name = getPoolEntryName(config, config_name);
|
||||
if (auto id = impl->pools_by_ids.find(entry_name); id != impl->pools_by_ids.end())
|
||||
{
|
||||
entry = impl->pools.find(id->second);
|
||||
std::shared_ptr<PoolWithFailover> pool = entry->second;
|
||||
impl->pools.insert_or_assign(config_name, pool);
|
||||
return *pool;
|
||||
}
|
||||
|
||||
auto pool = std::make_shared<PoolWithFailover>(config, config_name, default_connections, max_connections, max_tries);
|
||||
// Check the pool will be shared
|
||||
if (!entry_name.empty())
|
||||
{
|
||||
// Store shared pool
|
||||
impl->pools.insert_or_assign(config_name, pool);
|
||||
impl->pools_by_ids.insert_or_assign(entry_name, config_name);
|
||||
}
|
||||
return *(pool.get());
|
||||
}
|
||||
}
|
||||
|
||||
void PoolFactory::reset()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(impl->mutex);
|
||||
impl->pools.clear();
|
||||
impl->pools_by_ids.clear();
|
||||
}
|
||||
|
||||
PoolFactory::PoolFactory() : impl(std::make_unique<PoolFactory::Impl>()) {}
|
||||
|
||||
PoolFactory & PoolFactory::instance()
|
||||
{
|
||||
static PoolFactory ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
@ -15,6 +15,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
|
||||
const unsigned max_connections, const size_t max_tries)
|
||||
: max_tries(max_tries)
|
||||
{
|
||||
shareable = cfg.getBool(config_name + ".share_connection", false);
|
||||
if (cfg.has(config_name + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
@ -48,15 +49,22 @@ PoolWithFailover::PoolWithFailover(const std::string & config_name, const unsign
|
||||
{}
|
||||
|
||||
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
|
||||
: max_tries{other.max_tries}
|
||||
: max_tries{other.max_tries}, config_name{other.config_name}, shareable{other.shareable}
|
||||
{
|
||||
for (const auto & priority_replicas : other.replicas_by_priority)
|
||||
if (shareable)
|
||||
{
|
||||
Replicas replicas;
|
||||
replicas.reserve(priority_replicas.second.size());
|
||||
for (const auto & pool : priority_replicas.second)
|
||||
replicas.emplace_back(std::make_shared<Pool>(*pool));
|
||||
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
|
||||
replicas_by_priority = other.replicas_by_priority;
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & priority_replicas : other.replicas_by_priority)
|
||||
{
|
||||
Replicas replicas;
|
||||
replicas.reserve(priority_replicas.second.size());
|
||||
for (const auto & pool : priority_replicas.second)
|
||||
replicas.emplace_back(std::make_shared<Pool>(*pool));
|
||||
replicas_by_priority.emplace(priority_replicas.first, std::move(replicas));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,7 +89,7 @@ PoolWithFailover::Entry PoolWithFailover::Get()
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = pool->tryGet();
|
||||
Entry entry = shareable ? pool->Get() : pool->tryGet();
|
||||
|
||||
if (!entry.isNull())
|
||||
{
|
||||
|
2
contrib/base64
vendored
2
contrib/base64
vendored
@ -1 +1 @@
|
||||
Subproject commit 5257626d2be17a3eb23f79be17fe55ebba394ad2
|
||||
Subproject commit 95ba56a9b041f9933f5cd2bbb2ee4e083468c20a
|
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit 1f3e4638f250ad4d028a2499af20d4185463e07d
|
||||
Subproject commit 860574c93980d887a89df141edd9ca2fb0024fa3
|
@ -105,6 +105,7 @@ namespace ErrorCodes
|
||||
extern const int UNEXPECTED_PACKET_FROM_SERVER;
|
||||
extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED;
|
||||
extern const int INVALID_USAGE_OF_INPUT;
|
||||
extern const int DEADLOCK_AVOIDED;
|
||||
}
|
||||
|
||||
|
||||
@ -906,9 +907,34 @@ private:
|
||||
query = serializeAST(*parsed_query);
|
||||
}
|
||||
|
||||
connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
||||
sendExternalTables();
|
||||
receiveResult();
|
||||
static constexpr size_t max_retries = 10;
|
||||
for (size_t retry = 0; retry < max_retries; ++retry)
|
||||
{
|
||||
try
|
||||
{
|
||||
connection->sendQuery(
|
||||
connection_parameters.timeouts,
|
||||
query,
|
||||
query_id,
|
||||
QueryProcessingStage::Complete,
|
||||
&context.getSettingsRef(),
|
||||
nullptr,
|
||||
true);
|
||||
|
||||
sendExternalTables();
|
||||
receiveResult();
|
||||
|
||||
break;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// Retry when the server said "Client should retry" and no rows has been received yet.
|
||||
if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && retry + 1 < max_retries)
|
||||
continue;
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -9,29 +9,43 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNKNOWN_PACKET_FROM_SERVER;
|
||||
extern const int DEADLOCK_AVOIDED;
|
||||
}
|
||||
|
||||
void Suggest::load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
|
||||
{
|
||||
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
|
||||
{
|
||||
try
|
||||
for (size_t retry = 0; retry < 10; ++retry)
|
||||
{
|
||||
Connection connection(
|
||||
connection_parameters.host,
|
||||
connection_parameters.port,
|
||||
connection_parameters.default_database,
|
||||
connection_parameters.user,
|
||||
connection_parameters.password,
|
||||
"client",
|
||||
connection_parameters.compression,
|
||||
connection_parameters.security);
|
||||
try
|
||||
{
|
||||
Connection connection(
|
||||
connection_parameters.host,
|
||||
connection_parameters.port,
|
||||
connection_parameters.default_database,
|
||||
connection_parameters.user,
|
||||
connection_parameters.password,
|
||||
"client",
|
||||
connection_parameters.compression,
|
||||
connection_parameters.security);
|
||||
|
||||
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// Retry when the server said "Client should retry".
|
||||
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
|
||||
continue;
|
||||
|
||||
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/// Note that keyword suggestions are available even if we cannot load data from server.
|
||||
|
@ -133,7 +133,6 @@ struct TaskStateWithOwner
|
||||
};
|
||||
|
||||
|
||||
|
||||
struct ShardPriority
|
||||
{
|
||||
UInt8 is_remote = 1;
|
||||
|
@ -298,7 +298,7 @@ void LocalServer::processQueries()
|
||||
|
||||
try
|
||||
{
|
||||
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {}, {});
|
||||
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {});
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -60,5 +60,4 @@ void StopConditionsSet::report(UInt64 value, StopConditionsSet::StopCondition &
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -592,12 +592,14 @@ void HTTPHandler::processQuery(
|
||||
customizeContext(context);
|
||||
|
||||
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
|
||||
[&response] (const String & content_type, const String & format)
|
||||
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
|
||||
{
|
||||
response.setContentType(content_type);
|
||||
response.add("X-ClickHouse-Query-Id", current_query_id);
|
||||
response.add("X-ClickHouse-Format", format);
|
||||
},
|
||||
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
|
||||
response.add("X-ClickHouse-Timezone", timezone);
|
||||
}
|
||||
);
|
||||
|
||||
if (used_output.hasDelayed())
|
||||
{
|
||||
|
@ -282,14 +282,9 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
|
||||
}
|
||||
else
|
||||
{
|
||||
bool with_output = false;
|
||||
std::function<void(const String &, const String &)> set_content_type_and_format = [&with_output](const String &, const String &) -> void
|
||||
{
|
||||
with_output = true;
|
||||
};
|
||||
|
||||
String replacement_query = "select ''";
|
||||
bool should_replace = false;
|
||||
bool with_output = false;
|
||||
|
||||
// Translate query from MySQL to ClickHouse.
|
||||
// This is a temporary workaround until ClickHouse supports the syntax "@@var_name".
|
||||
@ -307,7 +302,13 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
|
||||
ReadBufferFromString replacement(replacement_query);
|
||||
|
||||
Context query_context = connection_context;
|
||||
executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type_and_format, {});
|
||||
|
||||
executeQuery(should_replace ? replacement : payload, *out, true, query_context,
|
||||
[&with_output](const String &, const String &, const String &, const String &)
|
||||
{
|
||||
with_output = true;
|
||||
}
|
||||
);
|
||||
|
||||
if (!with_output)
|
||||
packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);
|
||||
|
@ -60,6 +60,7 @@
|
||||
#include "TCPHandlerFactory.h"
|
||||
#include "Common/config_version.h"
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/ThreadFuzzer.h>
|
||||
#include "MySQLHandlerFactory.h"
|
||||
|
||||
#if defined(OS_LINUX)
|
||||
@ -219,6 +220,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::get());
|
||||
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
|
||||
|
||||
if (ThreadFuzzer::instance().isEffective())
|
||||
LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");
|
||||
|
||||
/** Context contains all that query execution is dependent:
|
||||
* settings, available functions, data types, aggregate functions, databases...
|
||||
*/
|
||||
@ -466,6 +470,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
if (config->has("max_partition_size_to_drop"))
|
||||
global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));
|
||||
|
||||
global_context->updateStorageConfiguration(*config);
|
||||
},
|
||||
/* already_loaded = */ true);
|
||||
|
||||
|
@ -503,7 +503,7 @@ void TCPHandler::processOrdinaryQuery()
|
||||
|
||||
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
|
||||
{
|
||||
/// Some time passed and there is a progress.
|
||||
/// Some time passed.
|
||||
after_send_progress.restart();
|
||||
sendProgress();
|
||||
}
|
||||
@ -539,6 +539,8 @@ void TCPHandler::processOrdinaryQuery()
|
||||
}
|
||||
|
||||
state.io.onFinish();
|
||||
|
||||
sendProgress();
|
||||
}
|
||||
|
||||
|
||||
@ -546,8 +548,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
|
||||
{
|
||||
auto & pipeline = state.io.pipeline;
|
||||
|
||||
if (pipeline.getMaxThreads())
|
||||
num_threads = std::min(num_threads, pipeline.getMaxThreads());
|
||||
/// Reduce the number of threads to recommended value.
|
||||
num_threads = std::min(num_threads, pipeline.getNumThreads());
|
||||
|
||||
/// Send header-block, to allow client to prepare output format for data to send.
|
||||
{
|
||||
@ -658,6 +660,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
|
||||
}
|
||||
|
||||
state.io.onFinish();
|
||||
|
||||
sendProgress();
|
||||
}
|
||||
|
||||
|
||||
|
3
dbms/programs/server/config.d/listen.xml.disabled
Normal file
3
dbms/programs/server/config.d/listen.xml.disabled
Normal file
@ -0,0 +1,3 @@
|
||||
<yandex>
|
||||
<listen_host>::</listen_host>
|
||||
</yandex>
|
9
dbms/programs/server/config.d/tls.xml.disabled
Normal file
9
dbms/programs/server/config.d/tls.xml.disabled
Normal file
@ -0,0 +1,9 @@
|
||||
<yandex>
|
||||
<https_port>8443</https_port>
|
||||
<tcp_port_secure>9440</tcp_port_secure>
|
||||
<openSSL>
|
||||
<server>
|
||||
<dhParamsFile remove="remove"/>
|
||||
</server>
|
||||
</openSSL>
|
||||
</yandex>
|
@ -3,25 +3,6 @@
|
||||
NOTE: User and query level settings are set up in "users.xml" file.
|
||||
-->
|
||||
<yandex>
|
||||
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
|
||||
If this section is not present in configuration, all hosts are allowed.
|
||||
-->
|
||||
<remote_url_allow_hosts>
|
||||
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
|
||||
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
|
||||
If port is explicitly specified in URL, the host:port is checked as a whole.
|
||||
If host specified here without port, any port with this host allowed.
|
||||
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
|
||||
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
|
||||
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
|
||||
-->
|
||||
|
||||
<!-- Regular expression can be specified. RE2 engine is used for regexps.
|
||||
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
|
||||
(forgetting to do so is a common source of error).
|
||||
-->
|
||||
</remote_url_allow_hosts>
|
||||
|
||||
<logger>
|
||||
<!-- Possible levels: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105 -->
|
||||
<level>trace</level>
|
||||
@ -250,6 +231,24 @@
|
||||
</test_unavailable_shard>
|
||||
</remote_servers>
|
||||
|
||||
<!-- The list of hosts allowed to use in URL-related storage engines and table functions.
|
||||
If this section is not present in configuration, all hosts are allowed.
|
||||
-->
|
||||
<remote_url_allow_hosts>
|
||||
<!-- Host should be specified exactly as in URL. The name is checked before DNS resolution.
|
||||
Example: "yandex.ru", "yandex.ru." and "www.yandex.ru" are different hosts.
|
||||
If port is explicitly specified in URL, the host:port is checked as a whole.
|
||||
If host specified here without port, any port with this host allowed.
|
||||
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
|
||||
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
|
||||
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
|
||||
-->
|
||||
|
||||
<!-- Regular expression can be specified. RE2 engine is used for regexps.
|
||||
Regexps are not aligned: don't forget to add ^ and $. Also don't forget to escape dot (.) metacharacter
|
||||
(forgetting to do so is a common source of error).
|
||||
-->
|
||||
</remote_url_allow_hosts>
|
||||
|
||||
<!-- If element has 'incl' attribute, then for it's value will be used corresponding substitution from another file.
|
||||
By default, path to file with substitutions is /etc/metrika.xml. It could be changed in config in 'include_from' element.
|
||||
|
@ -72,7 +72,6 @@ bool IAccessStorage::exists(const UUID & id) const
|
||||
}
|
||||
|
||||
|
||||
|
||||
AccessEntityPtr IAccessStorage::tryReadBase(const UUID & id) const
|
||||
{
|
||||
try
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include <chrono>
|
||||
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/** Quota for resources consumption for specific interval.
|
||||
|
@ -86,7 +86,6 @@ struct MovingAvgData
|
||||
};
|
||||
|
||||
|
||||
|
||||
template <typename T, typename Tlimit_num_elems, typename Data>
|
||||
class MovingImpl final
|
||||
: public IAggregateFunctionDataHelper<Data, MovingImpl<T, Tlimit_num_elems, Data>>
|
||||
|
@ -158,6 +158,21 @@ void ColumnAggregateFunction::ensureOwnership()
|
||||
}
|
||||
|
||||
|
||||
bool ColumnAggregateFunction::structureEquals(const IColumn & to) const
|
||||
{
|
||||
const auto * to_concrete = typeid_cast<const ColumnAggregateFunction *>(&to);
|
||||
if (!to_concrete)
|
||||
return false;
|
||||
|
||||
/// AggregateFunctions must be the same.
|
||||
|
||||
const IAggregateFunction & func_this = *func;
|
||||
const IAggregateFunction & func_to = *to_concrete->func;
|
||||
|
||||
return typeid(func_this) == typeid(func_to);
|
||||
}
|
||||
|
||||
|
||||
void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start, size_t length)
|
||||
{
|
||||
const ColumnAggregateFunction & from_concrete = assert_cast<const ColumnAggregateFunction &>(from);
|
||||
|
@ -204,6 +204,8 @@ public:
|
||||
}
|
||||
|
||||
void getExtremes(Field & min, Field & max) const override;
|
||||
|
||||
bool structureEquals(const IColumn &) const override;
|
||||
};
|
||||
|
||||
|
||||
|
@ -287,5 +287,4 @@ private:
|
||||
};
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ ColumnPtr ColumnVector<T>::index(const IColumn & indexes, size_t limit) const
|
||||
template <typename T>
|
||||
ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
|
||||
{
|
||||
size_t size = data.size();
|
||||
const size_t size = data.size();
|
||||
if (size != offsets.size())
|
||||
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
@ -352,7 +352,7 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
const auto span_end = res->getData().begin() + offsets[i];
|
||||
for (; it < span_end; ++it)
|
||||
for (; it != span_end; ++it)
|
||||
*it = data[i];
|
||||
}
|
||||
|
||||
|
@ -488,6 +488,7 @@ namespace ErrorCodes
|
||||
extern const int ACCESS_STORAGE_FOR_INSERTION_NOT_FOUND = 514;
|
||||
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION = 515;
|
||||
extern const int AUTHENTICATION_FAILED = 516;
|
||||
extern const int CANNOT_ASSIGN_ALTER = 517;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <map>
|
||||
|
||||
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
namespace Util
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <Common/formatReadable.h>
|
||||
#include <common/likely.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <ext/singleton.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <cmath>
|
||||
@ -71,14 +70,13 @@ static void logMemoryUsage(Int64 amount)
|
||||
}
|
||||
|
||||
|
||||
|
||||
void MemoryTracker::alloc(Int64 size)
|
||||
{
|
||||
if (blocker.isCancelled())
|
||||
return;
|
||||
|
||||
/** Using memory_order_relaxed means that if allocations are done simultaneously,
|
||||
* we allow exception about memory limit exceeded to be thrown only on next allocation.
|
||||
* we allow exception about memory limit exceeded to be thrown only on next allocation.
|
||||
* So, we allow over-allocations.
|
||||
*/
|
||||
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
|
||||
@ -112,8 +110,8 @@ void MemoryTracker::alloc(Int64 size)
|
||||
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
|
||||
{
|
||||
auto no_track = blocker.cancel();
|
||||
ext::Singleton<DB::TraceCollector>()->collect(size);
|
||||
setOrRaiseProfilerLimit(current_profiler_limit + Int64(std::ceil((will_be - current_profiler_limit) / profiler_step)) * profiler_step);
|
||||
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
|
||||
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
|
||||
}
|
||||
|
||||
if (unlikely(current_hard_limit && will_be > current_hard_limit))
|
||||
@ -212,7 +210,6 @@ void MemoryTracker::setOrRaiseHardLimit(Int64 value)
|
||||
|
||||
void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
|
||||
{
|
||||
/// This is just atomic set to maximum.
|
||||
Int64 old_value = profiler_limit.load(std::memory_order_relaxed);
|
||||
while (old_value < value && !profiler_limit.compare_exchange_weak(old_value, value))
|
||||
;
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <memory>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <boost/iterator_adaptors.hpp>
|
||||
|
||||
#include <common/likely.h>
|
||||
#include <common/strong_typedef.h>
|
||||
@ -275,18 +274,11 @@ protected:
|
||||
public:
|
||||
using value_type = T;
|
||||
|
||||
/// You can not just use `typedef`, because there is ambiguity for the constructors and `assign` functions.
|
||||
struct iterator : public boost::iterator_adaptor<iterator, T*>
|
||||
{
|
||||
iterator() {}
|
||||
iterator(T * ptr_) : iterator::iterator_adaptor_(ptr_) {}
|
||||
};
|
||||
/// We cannot use boost::iterator_adaptor, because it defeats loop vectorization,
|
||||
/// see https://github.com/ClickHouse/ClickHouse/pull/9442
|
||||
|
||||
struct const_iterator : public boost::iterator_adaptor<const_iterator, const T*>
|
||||
{
|
||||
const_iterator() {}
|
||||
const_iterator(const T * ptr_) : const_iterator::iterator_adaptor_(ptr_) {}
|
||||
};
|
||||
using iterator = T *;
|
||||
using const_iterator = const T *;
|
||||
|
||||
|
||||
PODArray() {}
|
||||
|
@ -9,24 +9,48 @@
|
||||
#include <common/config_common.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <common/phdr_cache.h>
|
||||
#include <ext/singleton.h>
|
||||
|
||||
#include <random>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event QueryProfilerSignalOverruns;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
thread_local size_t write_trace_iteration = 0;
|
||||
|
||||
void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
|
||||
{
|
||||
auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
|
||||
|
||||
int overrun_count = 0;
|
||||
#if defined(OS_LINUX)
|
||||
if (info)
|
||||
overrun_count = info->si_overrun;
|
||||
{
|
||||
int overrun_count = info->si_overrun;
|
||||
|
||||
/// Quickly drop if signal handler is called too frequently.
|
||||
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
|
||||
++write_trace_iteration;
|
||||
if (overrun_count)
|
||||
{
|
||||
/// But pass with some frequency to avoid drop of all traces.
|
||||
if (write_trace_iteration % overrun_count == 0)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
UNUSED(info);
|
||||
#endif
|
||||
@ -34,12 +58,12 @@ namespace
|
||||
const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
|
||||
const StackTrace stack_trace(signal_context);
|
||||
|
||||
ext::Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
|
||||
TraceCollector::collect(trace_type, stack_trace, 0);
|
||||
|
||||
errno = saved_errno;
|
||||
}
|
||||
|
||||
[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;
|
||||
[[maybe_unused]] constexpr UInt32 TIMER_PRECISION = 1e9;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -81,7 +105,7 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
|
||||
|
||||
try
|
||||
{
|
||||
struct sigevent sev;
|
||||
struct sigevent sev {};
|
||||
sev.sigev_notify = SIGEV_THREAD_ID;
|
||||
sev.sigev_signo = pause_signal;
|
||||
|
||||
@ -156,7 +180,7 @@ QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period
|
||||
|
||||
void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
|
||||
{
|
||||
writeTraceInfo(TraceType::REAL_TIME, sig, info, context);
|
||||
writeTraceInfo(TraceType::Real, sig, info, context);
|
||||
}
|
||||
|
||||
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
|
||||
@ -165,7 +189,7 @@ QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
|
||||
|
||||
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
|
||||
{
|
||||
writeTraceInfo(TraceType::CPU_TIME, sig, info, context);
|
||||
writeTraceInfo(TraceType::CPU, sig, info, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -109,7 +109,6 @@ struct RadixSortIdentityTransform
|
||||
};
|
||||
|
||||
|
||||
|
||||
template <typename TElement>
|
||||
struct RadixSortUIntTraits
|
||||
{
|
||||
|
133
dbms/src/Common/ThreadFuzzer.cpp
Normal file
133
dbms/src/Common/ThreadFuzzer.cpp
Normal file
@ -0,0 +1,133 @@
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#if OS_LINUX
|
||||
#include <sys/sysinfo.h>
|
||||
#endif
|
||||
#include <sched.h>
|
||||
|
||||
#include <random>
|
||||
|
||||
#include <common/sleep.h>
|
||||
#include <common/getThreadId.h>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
|
||||
#include <Common/ThreadFuzzer.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_MANIPULATE_SIGSET;
|
||||
extern const int CANNOT_SET_SIGNAL_HANDLER;
|
||||
extern const int CANNOT_CREATE_TIMER;
|
||||
}
|
||||
|
||||
|
||||
ThreadFuzzer::ThreadFuzzer()
|
||||
{
|
||||
initConfiguration();
|
||||
if (!isEffective())
|
||||
return;
|
||||
setup();
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
static void initFromEnv(T & what, const char * name)
|
||||
{
|
||||
const char * env = getenv(name);
|
||||
if (!env)
|
||||
return;
|
||||
what = parse<T>(env);
|
||||
}
|
||||
|
||||
void ThreadFuzzer::initConfiguration()
|
||||
{
|
||||
#if OS_LINUX
|
||||
num_cpus = get_nprocs();
|
||||
#else
|
||||
(void)num_cpus;
|
||||
#endif
|
||||
|
||||
initFromEnv(cpu_time_period_us, "THREAD_FUZZER_CPU_TIME_PERIOD_US");
|
||||
if (!cpu_time_period_us)
|
||||
return;
|
||||
initFromEnv(yield_probability, "THREAD_FUZZER_YIELD_PROBABILITY");
|
||||
initFromEnv(migrate_probability, "THREAD_FUZZER_MIGRATE_PROBABILITY");
|
||||
initFromEnv(sleep_probability, "THREAD_FUZZER_SLEEP_PROBABILITY");
|
||||
initFromEnv(chaos_sleep_time_us, "THREAD_FUZZER_SLEEP_TIME_US");
|
||||
}
|
||||
|
||||
void ThreadFuzzer::signalHandler(int)
|
||||
{
|
||||
auto saved_errno = errno;
|
||||
|
||||
auto & fuzzer = ThreadFuzzer::instance();
|
||||
|
||||
if (fuzzer.yield_probability > 0
|
||||
&& std::bernoulli_distribution(fuzzer.yield_probability)(thread_local_rng))
|
||||
{
|
||||
sched_yield();
|
||||
}
|
||||
|
||||
#if OS_LINUX
|
||||
if (fuzzer.num_cpus > 0
|
||||
&& fuzzer.migrate_probability > 0
|
||||
&& std::bernoulli_distribution(fuzzer.migrate_probability)(thread_local_rng))
|
||||
{
|
||||
int migrate_to = std::uniform_int_distribution<>(0, fuzzer.num_cpus - 1)(thread_local_rng);
|
||||
|
||||
cpu_set_t set;
|
||||
CPU_ZERO(&set);
|
||||
CPU_SET(migrate_to, &set);
|
||||
|
||||
(void)sched_setaffinity(0, sizeof(set), &set);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (fuzzer.sleep_probability > 0
|
||||
&& fuzzer.chaos_sleep_time_us > 0
|
||||
&& std::bernoulli_distribution(fuzzer.sleep_probability)(thread_local_rng))
|
||||
{
|
||||
sleepForNanoseconds(fuzzer.chaos_sleep_time_us * 1000);
|
||||
}
|
||||
|
||||
errno = saved_errno;
|
||||
}
|
||||
|
||||
void ThreadFuzzer::setup()
|
||||
{
|
||||
struct sigaction sa{};
|
||||
sa.sa_handler = signalHandler;
|
||||
sa.sa_flags = SA_RESTART;
|
||||
|
||||
if (sigemptyset(&sa.sa_mask))
|
||||
throwFromErrno("Failed to clean signal mask for thread fuzzer", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
|
||||
|
||||
if (sigaddset(&sa.sa_mask, SIGPROF))
|
||||
throwFromErrno("Failed to add signal to mask for thread fuzzer", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
|
||||
|
||||
if (sigaction(SIGPROF, &sa, nullptr))
|
||||
throwFromErrno("Failed to setup signal handler for thread fuzzer", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
|
||||
|
||||
static constexpr UInt32 TIMER_PRECISION = 1000000;
|
||||
|
||||
struct timeval interval;
|
||||
interval.tv_sec = cpu_time_period_us / TIMER_PRECISION;
|
||||
interval.tv_usec = cpu_time_period_us % TIMER_PRECISION;
|
||||
|
||||
struct itimerval timer = {.it_interval = interval, .it_value = interval};
|
||||
|
||||
if (0 != setitimer(ITIMER_PROF, &timer, nullptr))
|
||||
throwFromErrno("Failed to create profiling timer", ErrorCodes::CANNOT_CREATE_TIMER);
|
||||
}
|
||||
|
||||
|
||||
}
|
74
dbms/src/Common/ThreadFuzzer.h
Normal file
74
dbms/src/Common/ThreadFuzzer.h
Normal file
@ -0,0 +1,74 @@
|
||||
#include <cstdint>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Allows to randomize thread scheduling and insert various glitches across whole program for testing purposes.
|
||||
* It is done by setting up a timer that will send PROF signal to every thread when certain amount of CPU time has passed.
|
||||
*
|
||||
* To initialize ThreadFuzzer, call ThreadFuzzer::instance().
|
||||
* The behaviour is controlled by environment variables:
|
||||
*
|
||||
* THREAD_FUZZER_CPU_TIME_PERIOD_US - period of signals in microseconds.
|
||||
* THREAD_FUZZER_YIELD_PROBABILITY - probability to do 'sched_yield'.
|
||||
* THREAD_FUZZER_MIGRATE_PROBABILITY - probability to set CPU affinity to random CPU core.
|
||||
* THREAD_FUZZER_SLEEP_PROBABILITY - probability to sleep.
|
||||
* THREAD_FUZZER_SLEEP_TIME_US - amount of time to sleep in microseconds.
|
||||
*
|
||||
* ThreadFuzzer will do nothing if environment variables are not set accordingly.
|
||||
*
|
||||
* The intention is to reproduce thread synchronization bugs (race conditions and deadlocks) more frequently in tests.
|
||||
* We already have tests with TSan. But TSan only covers "physical" synchronization bugs, but not "logical" ones,
|
||||
* where all data is protected by synchronization primitives, but we still have race conditions.
|
||||
* Obviously, TSan cannot debug distributed synchronization bugs.
|
||||
*
|
||||
* The motivation for this tool is an evidence, that concurrency bugs are more likely to reproduce
|
||||
* on bad unstable virtual machines in a dirty environments.
|
||||
*
|
||||
* The idea is not new, see also:
|
||||
* https://channel9.msdn.com/blogs/peli/concurrency-fuzzing-with-cuzz
|
||||
*
|
||||
* Notes:
|
||||
* - it can be also implemented with instrumentation (example: LLVM Xray) instead of signals.
|
||||
* - it's also reasonable to insert glitches around interesting functions (example: mutex lock/unlock, starting of threads, etc.),
|
||||
* it is doable with wrapping these functions (todo).
|
||||
* - we should also make the sleep time random.
|
||||
* - sleep obviously helps, but the effect of yield and migration is unclear.
|
||||
*/
|
||||
class ThreadFuzzer
|
||||
{
|
||||
public:
|
||||
static ThreadFuzzer & instance()
|
||||
{
|
||||
static ThreadFuzzer res;
|
||||
return res;
|
||||
}
|
||||
|
||||
bool isEffective() const
|
||||
{
|
||||
return cpu_time_period_us != 0
|
||||
&& (yield_probability > 0
|
||||
|| migrate_probability > 0
|
||||
|| (sleep_probability > 0 && chaos_sleep_time_us > 0));
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t cpu_time_period_us = 0;
|
||||
double yield_probability = 0;
|
||||
double migrate_probability = 0;
|
||||
double sleep_probability = 0;
|
||||
double chaos_sleep_time_us = 0;
|
||||
|
||||
int num_cpus = 0;
|
||||
|
||||
|
||||
ThreadFuzzer();
|
||||
|
||||
void initConfiguration();
|
||||
void setup();
|
||||
|
||||
static void signalHandler(int);
|
||||
};
|
||||
|
||||
}
|
@ -17,11 +17,6 @@
|
||||
#include <fcntl.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event QueryProfilerSignalOverruns;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -30,15 +25,13 @@ namespace
|
||||
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
|
||||
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
|
||||
constexpr size_t QUERY_ID_MAX_LEN = 1024;
|
||||
|
||||
thread_local size_t write_trace_iteration = 0;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
}
|
||||
LazyPipeFDs pipe;
|
||||
|
||||
TraceCollector::TraceCollector()
|
||||
|
||||
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> trace_log_)
|
||||
: trace_log(std::move(trace_log_))
|
||||
{
|
||||
pipe.open();
|
||||
|
||||
@ -51,38 +44,20 @@ TraceCollector::TraceCollector()
|
||||
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
|
||||
}
|
||||
|
||||
|
||||
TraceCollector::~TraceCollector()
|
||||
{
|
||||
if (!thread.joinable())
|
||||
LOG_ERROR(&Poco::Logger::get("TraceCollector"), "TraceCollector thread is malformed and cannot be joined");
|
||||
else
|
||||
{
|
||||
stop();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
pipe.close();
|
||||
}
|
||||
|
||||
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
|
||||
{
|
||||
/// Quickly drop if signal handler is called too frequently.
|
||||
/// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
|
||||
++write_trace_iteration;
|
||||
if (overrun_count)
|
||||
{
|
||||
/// But pass with some frequency to avoid drop of all traces.
|
||||
if (write_trace_iteration % overrun_count == 0)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
|
||||
}
|
||||
else
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, UInt64 size)
|
||||
{
|
||||
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
|
||||
8 * sizeof(char) + // maximum VarUInt length for string size
|
||||
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
|
||||
@ -99,7 +74,7 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
|
||||
|
||||
auto thread_id = CurrentThread::get().thread_id;
|
||||
|
||||
writeChar(false, out);
|
||||
writeChar(false, out); /// true if requested to stop the collecting thread.
|
||||
writeStringBinary(query_id, out);
|
||||
|
||||
size_t stack_trace_size = stack_trace.getSize();
|
||||
@ -110,64 +85,27 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
|
||||
|
||||
writePODBinary(trace_type, out);
|
||||
writePODBinary(thread_id, out);
|
||||
writePODBinary(UInt64(0), out);
|
||||
|
||||
out.next();
|
||||
}
|
||||
|
||||
void TraceCollector::collect(UInt64 size)
|
||||
{
|
||||
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
|
||||
8 * sizeof(char) + // maximum VarUInt length for string size
|
||||
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
|
||||
sizeof(UInt8) + // number of stack frames
|
||||
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
|
||||
sizeof(TraceType) + // trace type
|
||||
sizeof(UInt64) + // thread_id
|
||||
sizeof(UInt64); // size
|
||||
char buffer[buf_size];
|
||||
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
|
||||
|
||||
StringRef query_id = CurrentThread::getQueryId();
|
||||
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
|
||||
|
||||
auto thread_id = CurrentThread::get().thread_id;
|
||||
|
||||
writeChar(false, out);
|
||||
writeStringBinary(query_id, out);
|
||||
|
||||
const auto & stack_trace = StackTrace();
|
||||
|
||||
size_t stack_trace_size = stack_trace.getSize();
|
||||
size_t stack_trace_offset = stack_trace.getOffset();
|
||||
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
|
||||
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
|
||||
writePODBinary(stack_trace.getFrames()[i], out);
|
||||
|
||||
writePODBinary(TraceType::MEMORY, out);
|
||||
writePODBinary(thread_id, out);
|
||||
writePODBinary(size, out);
|
||||
|
||||
out.next();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends TraceCollector stop message
|
||||
|
||||
/** Sends TraceCollector stop message
|
||||
*
|
||||
* Each sequence of data for TraceCollector thread starts with a boolean flag.
|
||||
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
|
||||
* This function sends flag with a true value to stop TraceCollector gracefully.
|
||||
*
|
||||
* NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
|
||||
* before stop message.
|
||||
*/
|
||||
void TraceCollector::stop()
|
||||
{
|
||||
WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
|
||||
writeChar(true, out);
|
||||
out.next();
|
||||
thread.join();
|
||||
}
|
||||
|
||||
|
||||
void TraceCollector::run()
|
||||
{
|
||||
ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
|
||||
|
@ -15,28 +15,27 @@ namespace DB
|
||||
|
||||
class TraceLog;
|
||||
|
||||
enum class TraceType : UInt8
|
||||
enum class TraceType : uint8_t
|
||||
{
|
||||
REAL_TIME,
|
||||
CPU_TIME,
|
||||
MEMORY,
|
||||
Real,
|
||||
CPU,
|
||||
Memory,
|
||||
};
|
||||
|
||||
class TraceCollector
|
||||
{
|
||||
public:
|
||||
TraceCollector();
|
||||
TraceCollector(std::shared_ptr<TraceLog> trace_log_);
|
||||
~TraceCollector();
|
||||
|
||||
void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
|
||||
|
||||
void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
|
||||
void collect(UInt64 size);
|
||||
/// Collect a stack trace. This method is signal safe.
|
||||
/// Precondition: the TraceCollector object must be created.
|
||||
/// size - for memory tracing is the amount of memory allocated; for other trace types it is 0.
|
||||
static void collect(TraceType trace_type, const StackTrace & stack_trace, UInt64 size);
|
||||
|
||||
private:
|
||||
std::shared_ptr<TraceLog> trace_log;
|
||||
ThreadFromGlobalPool thread;
|
||||
LazyPipeFDs pipe;
|
||||
|
||||
void run();
|
||||
void stop();
|
||||
|
@ -109,7 +109,6 @@ struct UInt128TrivialHash
|
||||
};
|
||||
|
||||
|
||||
|
||||
/** Used for aggregation, for putting a large number of constant-length keys in a hash table.
|
||||
*/
|
||||
struct UInt256
|
||||
|
@ -88,7 +88,6 @@ using namespace DB;
|
||||
struct ZooKeeperRequest;
|
||||
|
||||
|
||||
|
||||
/** Usage scenario: look at the documentation for IKeeper class.
|
||||
*/
|
||||
class ZooKeeper : public IKeeper
|
||||
|
@ -200,4 +200,3 @@ TEST(zkutil, multi_create_sequential)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -10,7 +10,6 @@
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
unsigned getNumberOfPhysicalCPUCores()
|
||||
{
|
||||
#if USE_CPUID
|
||||
|
@ -50,7 +50,6 @@ static bool parseNumber(const String & description, size_t l, size_t r, size_t &
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* Parse a string that generates shards and replicas. Separator - one of two characters | or ,
|
||||
* depending on whether shards or replicas are generated.
|
||||
* For example:
|
||||
|
@ -75,3 +75,6 @@ target_link_libraries (stopwatch PRIVATE clickhouse_common_io)
|
||||
|
||||
add_executable (symbol_index symbol_index.cpp)
|
||||
target_link_libraries (symbol_index PRIVATE clickhouse_common_io)
|
||||
|
||||
add_executable (chaos_sanitizer chaos_sanitizer.cpp)
|
||||
target_link_libraries (chaos_sanitizer PRIVATE clickhouse_common_io)
|
||||
|
56
dbms/src/Common/tests/chaos_sanitizer.cpp
Normal file
56
dbms/src/Common/tests/chaos_sanitizer.cpp
Normal file
@ -0,0 +1,56 @@
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
||||
#include <common/sleep.h>
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadFuzzer.h>
|
||||
|
||||
|
||||
/** Prooves that ThreadFuzzer helps to find concurrency bugs.
|
||||
*
|
||||
* for i in {1..10}; do ./chaos_sanitizer 1000000; done
|
||||
* for i in {1..10}; do THREAD_FUZZER_CPU_TIME_PERIOD_US=1000 THREAD_FUZZER_SLEEP_PROBABILITY=0.1 THREAD_FUZZER_SLEEP_TIME_US=100000 ./chaos_sanitizer 1000000; done
|
||||
*/
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
const size_t num_iterations = argc >= 2 ? DB::parse<size_t>(argv[1]) : 1000000000;
|
||||
|
||||
std::cerr << (DB::ThreadFuzzer::instance().isEffective() ? "ThreadFuzzer is enabled.\n" : "ThreadFuzzer is not enabled.\n");
|
||||
|
||||
volatile size_t counter1 = 0;
|
||||
volatile size_t counter2 = 0;
|
||||
|
||||
/// These threads are synchronized by sleep (that's intentionally incorrect).
|
||||
|
||||
std::thread t1([&]
|
||||
{
|
||||
for (size_t i = 0; i < num_iterations; ++i)
|
||||
++counter1;
|
||||
|
||||
sleepForNanoseconds(100000000);
|
||||
|
||||
for (size_t i = 0; i < num_iterations; ++i)
|
||||
++counter2;
|
||||
});
|
||||
|
||||
std::thread t2([&]
|
||||
{
|
||||
for (size_t i = 0; i < num_iterations; ++i)
|
||||
++counter2;
|
||||
|
||||
sleepForNanoseconds(100000000);
|
||||
|
||||
for (size_t i = 0; i < num_iterations; ++i)
|
||||
++counter1;
|
||||
});
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
std::cerr << "Result: " << counter1 << ", " << counter2 << "\n";
|
||||
|
||||
return 0;
|
||||
}
|
@ -7,7 +7,6 @@
|
||||
#include <Common/HashTable/HashSet.h>
|
||||
|
||||
|
||||
|
||||
int main(int, char **)
|
||||
{
|
||||
{
|
||||
|
@ -22,7 +22,6 @@ using Key = UInt64;
|
||||
using Value = UInt64;
|
||||
|
||||
|
||||
|
||||
/// Various hash functions to test
|
||||
|
||||
namespace Hashes
|
||||
@ -336,7 +335,6 @@ static void NO_INLINE testForEachMapAndHash(const Key * data, size_t size)
|
||||
}
|
||||
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
if (argc < 2)
|
||||
|
@ -244,7 +244,6 @@ void aggregate5(Map & local_map, MapSmallLocks & global_map, Source::const_itera
|
||||
}*/
|
||||
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
size_t n = atoi(argv[1]);
|
||||
|
@ -283,7 +283,6 @@ struct Merger
|
||||
};
|
||||
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
size_t n = atoi(argv[1]);
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <Common/HashTable/SmallTable.h>
|
||||
|
||||
|
||||
|
||||
int main(int, char **)
|
||||
{
|
||||
{
|
||||
|
@ -358,7 +358,6 @@ void Block::setColumns(const Columns & columns)
|
||||
}
|
||||
|
||||
|
||||
|
||||
Block Block::cloneWithColumns(MutableColumns && columns) const
|
||||
{
|
||||
Block res;
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -74,4 +74,13 @@ const BlockMissingValues::RowsBitMask & BlockMissingValues::getDefaultsBitmask(s
|
||||
return none;
|
||||
}
|
||||
|
||||
bool BlockMissingValues::hasDefaultBits(size_t column_idx) const
|
||||
{
|
||||
auto it = rows_mask_by_column_id.find(column_idx);
|
||||
if (it == rows_mask_by_column_id.end())
|
||||
return false;
|
||||
|
||||
const auto & col_mask = it->second;
|
||||
return std::find(col_mask.begin(), col_mask.end(), true) != col_mask.end();
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,10 @@ class BlockMissingValues
|
||||
public:
|
||||
using RowsBitMask = std::vector<bool>; /// a bit per row for a column
|
||||
|
||||
/// Get mask for column, column_idx is index inside corresponding block
|
||||
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
|
||||
/// Check that we have to replace default value at least in one of columns
|
||||
bool hasDefaultBits(size_t column_idx) const;
|
||||
void setBit(size_t column_idx, size_t row_idx);
|
||||
bool empty() const { return rows_mask_by_column_id.empty(); }
|
||||
size_t size() const { return rows_mask_by_column_id.size(); }
|
||||
|
@ -165,4 +165,13 @@ bool NamesAndTypesList::contains(const String & name) const
|
||||
return false;
|
||||
}
|
||||
|
||||
std::optional<NameAndTypePair> NamesAndTypesList::tryGetByName(const std::string & name) const
|
||||
{
|
||||
for (const NameAndTypePair & column : *this)
|
||||
{
|
||||
if (column.name == name)
|
||||
return column;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
@ -73,7 +73,11 @@ public:
|
||||
/// Unlike `filter`, returns columns in the order in which they go in `names`.
|
||||
NamesAndTypesList addTypes(const Names & names) const;
|
||||
|
||||
/// Check that column contains in list
|
||||
bool contains(const String & name) const;
|
||||
|
||||
/// Try to get column by name, return empty optional if column not found
|
||||
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.", 0) \
|
||||
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
|
||||
M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
|
||||
M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. By default, it is determined automatically.", 0) \
|
||||
M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
|
||||
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
|
||||
M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \
|
||||
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
|
||||
@ -333,7 +333,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
|
||||
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
|
||||
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
|
||||
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will dump the allocating stacktrace. Zero means disabled memory profiler.", 0) \
|
||||
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will collect the allocating stack trace. The minimal effective step is 4 MiB (less values will work as clamped to 4 MiB). Zero means disabled memory profiler.", 0) \
|
||||
\
|
||||
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
|
||||
M(SettingUInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \
|
||||
|
@ -391,7 +391,6 @@ void SettingEnum<EnumType, Tag>::set(const Field & x)
|
||||
}
|
||||
|
||||
|
||||
|
||||
String SettingURI::toString() const
|
||||
{
|
||||
return value.toString();
|
||||
|
@ -15,7 +15,6 @@ struct TypePair
|
||||
};
|
||||
|
||||
|
||||
|
||||
template <typename T, bool _int, bool _float, bool _decimal, bool _datetime, typename F>
|
||||
bool callOnBasicType(TypeIndex number, F && f)
|
||||
{
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/evaluateMissingDefaults.h>
|
||||
#include <Interpreters/inplaceBlockConversions.h>
|
||||
#include <DataStreams/AddingDefaultsBlockInputStream.h>
|
||||
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
@ -56,11 +56,20 @@ Block AddingDefaultsBlockInputStream::readImpl()
|
||||
if (block_missing_values.empty())
|
||||
return res;
|
||||
|
||||
/// res block alredy has all columns values, with default value for type
|
||||
/// (not value specified in table). We identify which columns we need to
|
||||
/// recalculate with help of block_missing_values.
|
||||
Block evaluate_block{res};
|
||||
/// remove columns for recalculation
|
||||
for (const auto & column : column_defaults)
|
||||
{
|
||||
if (evaluate_block.has(column.first))
|
||||
evaluate_block.erase(column.first);
|
||||
{
|
||||
size_t column_idx = res.getPositionByName(column.first);
|
||||
if (block_missing_values.hasDefaultBits(column_idx))
|
||||
evaluate_block.erase(column.first);
|
||||
}
|
||||
}
|
||||
|
||||
if (!evaluate_block.columns())
|
||||
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <DataStreams/TTLBlockInputStream.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <Interpreters/evaluateMissingDefaults.h>
|
||||
#include <Interpreters/inplaceBlockConversions.h>
|
||||
#include <Interpreters/SyntaxAnalyzer.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
|
@ -25,7 +25,6 @@ VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputSt
|
||||
}
|
||||
|
||||
|
||||
|
||||
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
|
||||
{
|
||||
if constexpr (sizeof(RowSourcePart) == 1)
|
||||
|
@ -12,7 +12,7 @@ namespace DB
|
||||
* Mostly the same as Int64.
|
||||
* But also tagged with interval kind.
|
||||
*
|
||||
* Intended isage is for temporary elements in expressions,
|
||||
* Intended usage is for temporary elements in expressions,
|
||||
* not for storing values in tables.
|
||||
*/
|
||||
class DataTypeInterval final : public DataTypeNumberBase<Int64>
|
||||
|
@ -257,7 +257,7 @@ template class DataTypeNumberBase<UInt8>;
|
||||
template class DataTypeNumberBase<UInt16>;
|
||||
template class DataTypeNumberBase<UInt32>;
|
||||
template class DataTypeNumberBase<UInt64>;
|
||||
template class DataTypeNumberBase<UInt128>;
|
||||
template class DataTypeNumberBase<UInt128>; // used only in UUID
|
||||
template class DataTypeNumberBase<Int8>;
|
||||
template class DataTypeNumberBase<Int16>;
|
||||
template class DataTypeNumberBase<Int32>;
|
||||
|
@ -66,7 +66,6 @@ DataTypeTuple::DataTypeTuple(const DataTypes & elems_, const Strings & names_)
|
||||
}
|
||||
|
||||
|
||||
|
||||
std::string DataTypeTuple::doGetName() const
|
||||
{
|
||||
size_t size = elems.size();
|
||||
|
@ -42,7 +42,6 @@ std::string DataTypeDecimal<T>::doGetName() const
|
||||
}
|
||||
|
||||
|
||||
|
||||
template <typename T>
|
||||
bool DataTypeDecimal<T>::equals(const IDataType & rhs) const
|
||||
{
|
||||
|
@ -27,7 +27,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
|
||||
DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, const Context & context_)
|
||||
: DatabaseOnDisk(name_, metadata_path_, "DatabaseLazy (" + name_ + ")")
|
||||
, expiration_time(expiration_time_)
|
||||
|
@ -25,7 +25,6 @@
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -19,7 +19,6 @@
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -51,6 +51,7 @@ namespace
|
||||
const ASTCreateQuery & query,
|
||||
DatabaseOrdinary & database,
|
||||
const String & database_name,
|
||||
const String & metadata_path,
|
||||
bool has_force_restore_data_flag)
|
||||
{
|
||||
assert(!query.is_dictionary);
|
||||
@ -64,7 +65,9 @@ namespace
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("Cannot attach table '" + backQuote(query.table) + "' from query " + serializeAST(query));
|
||||
e.addMessage("Cannot attach table " + backQuote(database_name) + "." + backQuote(query.table)
|
||||
+ " from metadata file " + metadata_path
|
||||
+ " from query " + serializeAST(query));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@ -110,7 +113,6 @@ void DatabaseOrdinary::loadStoredObjects(
|
||||
Context & context,
|
||||
bool has_force_restore_data_flag)
|
||||
{
|
||||
|
||||
/** Tables load faster if they are loaded in sorted (by name) order.
|
||||
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
|
||||
* which does not correspond to order tables creation and does not correspond to order of their location on disk.
|
||||
@ -124,7 +126,7 @@ void DatabaseOrdinary::loadStoredObjects(
|
||||
String full_path = getMetadataPath() + file_name;
|
||||
try
|
||||
{
|
||||
auto ast = parseQueryFromMetadata(context, full_path, /*throw_on_error*/ true, /*remove_empty*/false);
|
||||
auto ast = parseQueryFromMetadata(context, full_path, /*throw_on_error*/ true, /*remove_empty*/ false);
|
||||
if (ast)
|
||||
{
|
||||
auto * create_query = ast->as<ASTCreateQuery>();
|
||||
@ -157,7 +159,7 @@ void DatabaseOrdinary::loadStoredObjects(
|
||||
if (!create_query.is_dictionary)
|
||||
pool.scheduleOrThrowOnError([&]()
|
||||
{
|
||||
tryAttachTable(context, create_query, *this, getDatabaseName(), has_force_restore_data_flag);
|
||||
tryAttachTable(context, create_query, *this, getDatabaseName(), getMetadataPath() + name_with_query.first, has_force_restore_data_flag);
|
||||
|
||||
/// Messages, so that it's not boring to wait for the server to load for a long time.
|
||||
logAboutProgress(log, ++tables_processed, total_tables, watch);
|
||||
|
@ -46,6 +46,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
|
||||
# include <common/logger_useful.h>
|
||||
# include <Formats/MySQLBlockInputStream.h>
|
||||
# include "readInvalidateQuery.h"
|
||||
# include <mysqlxx/PoolFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -66,11 +67,11 @@ MySQLDictionarySource::MySQLDictionarySource(
|
||||
, update_field{config.getString(config_prefix + ".update_field", "")}
|
||||
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
|
||||
, sample_block{sample_block_}
|
||||
, pool{config, config_prefix}
|
||||
, pool{mysqlxx::PoolFactory::instance().Get(config, config_prefix)}
|
||||
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
|
||||
, load_all_query{query_builder.composeLoadAllQuery()}
|
||||
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
|
||||
, close_connection{config.getBool(config_prefix + ".close_connection", false)}
|
||||
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
|
||||
{
|
||||
}
|
||||
|
||||
@ -114,19 +115,21 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadAll()
|
||||
{
|
||||
last_modification = getLastModification();
|
||||
auto connection = pool.Get();
|
||||
last_modification = getLastModification(connection, false);
|
||||
|
||||
LOG_TRACE(log, load_all_query);
|
||||
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, close_connection);
|
||||
return std::make_shared<MySQLBlockInputStream>(connection, load_all_query, sample_block, max_block_size, close_connection);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
|
||||
{
|
||||
last_modification = getLastModification();
|
||||
auto connection = pool.Get();
|
||||
last_modification = getLastModification(connection, false);
|
||||
|
||||
std::string load_update_query = getUpdateFieldAndDate();
|
||||
LOG_TRACE(log, load_update_query);
|
||||
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, close_connection);
|
||||
return std::make_shared<MySQLBlockInputStream>(connection, load_update_query, sample_block, max_block_size, close_connection);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
@ -158,8 +161,8 @@ bool MySQLDictionarySource::isModified() const
|
||||
|
||||
if (dont_check_update_time)
|
||||
return true;
|
||||
|
||||
return getLastModification() > last_modification;
|
||||
auto connection = pool.Get();
|
||||
return getLastModification(connection, true) > last_modification;
|
||||
}
|
||||
|
||||
bool MySQLDictionarySource::supportsSelectiveLoad() const
|
||||
@ -199,7 +202,7 @@ std::string MySQLDictionarySource::quoteForLike(const std::string s)
|
||||
return out.str();
|
||||
}
|
||||
|
||||
LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const
|
||||
{
|
||||
LocalDateTime modification_time{std::time(nullptr)};
|
||||
|
||||
@ -208,7 +211,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
|
||||
try
|
||||
{
|
||||
auto connection = pool.Get();
|
||||
auto query = connection->query("SHOW TABLE STATUS LIKE " + quoteForLike(table));
|
||||
|
||||
LOG_TRACE(log, query.str());
|
||||
@ -233,6 +235,11 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
++fetched_rows;
|
||||
}
|
||||
|
||||
if (close_connection && allow_connection_closure)
|
||||
{
|
||||
connection.disconnect();
|
||||
}
|
||||
|
||||
if (0 == fetched_rows)
|
||||
LOG_ERROR(log, "Cannot find table in SHOW TABLE STATUS result.");
|
||||
|
||||
@ -243,7 +250,6 @@ LocalDateTime MySQLDictionarySource::getLastModification() const
|
||||
{
|
||||
tryLogCurrentException("MySQLDictionarySource");
|
||||
}
|
||||
|
||||
/// we suppose failure to get modification time is not an error, therefore return current time
|
||||
return modification_time;
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ private:
|
||||
|
||||
static std::string quoteForLike(const std::string s);
|
||||
|
||||
LocalDateTime getLastModification() const;
|
||||
LocalDateTime getLastModification(mysqlxx::Pool::Entry & connection, bool allow_connection_closure) const;
|
||||
|
||||
// execute invalidate_query. expects single cell in result
|
||||
std::string doInvalidateQuery(const std::string & request) const;
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Common/quoteString.h>
|
||||
|
||||
#include <set>
|
||||
|
||||
#include <Poco/File.h>
|
||||
|
||||
|
||||
@ -15,6 +16,7 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
|
||||
extern const int UNKNOWN_DISK;
|
||||
extern const int UNKNOWN_POLICY;
|
||||
@ -48,7 +50,68 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
|
||||
}
|
||||
|
||||
|
||||
const DiskPtr & DiskSelector::operator[](const String & name) const
|
||||
DiskSelectorPtr DiskSelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_prefix, keys);
|
||||
|
||||
auto & factory = DiskFactory::instance();
|
||||
|
||||
std::shared_ptr<DiskSelector> result = std::make_shared<DiskSelector>(*this);
|
||||
|
||||
constexpr auto default_disk_name = "default";
|
||||
std::set<String> old_disks_minus_new_disks;
|
||||
for (const auto & [disk_name, _] : result->disks)
|
||||
{
|
||||
old_disks_minus_new_disks.insert(disk_name);
|
||||
}
|
||||
|
||||
for (const auto & disk_name : keys)
|
||||
{
|
||||
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
|
||||
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
|
||||
|
||||
if (result->disks.count(disk_name) == 0)
|
||||
{
|
||||
auto disk_config_prefix = config_prefix + "." + disk_name;
|
||||
result->disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
|
||||
}
|
||||
else
|
||||
{
|
||||
old_disks_minus_new_disks.erase(disk_name);
|
||||
|
||||
/// TODO: Ideally ClickHouse shall complain if disk has changed, but
|
||||
/// implementing that may appear as not trivial task.
|
||||
}
|
||||
}
|
||||
|
||||
old_disks_minus_new_disks.erase(default_disk_name);
|
||||
|
||||
if (!old_disks_minus_new_disks.empty())
|
||||
{
|
||||
WriteBufferFromOwnString warning;
|
||||
if (old_disks_minus_new_disks.size() == 1)
|
||||
writeString("Disk ", warning);
|
||||
else
|
||||
writeString("Disks ", warning);
|
||||
|
||||
int index = 0;
|
||||
for (const String & name : old_disks_minus_new_disks)
|
||||
{
|
||||
if (index++ > 0)
|
||||
writeString(", ", warning);
|
||||
writeBackQuotedString(name, warning);
|
||||
}
|
||||
|
||||
writeString(" disappeared from configuration, this change will be applied after restart of ClickHouse", warning);
|
||||
LOG_WARNING(&Logger::get("DiskSelector"), warning.str());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
DiskPtr DiskSelector::get(const String & name) const
|
||||
{
|
||||
auto it = disks.find(name);
|
||||
if (it == disks.end())
|
||||
@ -61,7 +124,7 @@ Volume::Volume(
|
||||
String name_,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
const DiskSelector & disk_selector)
|
||||
DiskSelectorPtr disk_selector)
|
||||
: name(std::move(name_))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
@ -74,7 +137,7 @@ Volume::Volume(
|
||||
if (startsWith(disk, "disk"))
|
||||
{
|
||||
auto disk_name = config.getString(config_prefix + "." + disk);
|
||||
disks.push_back(disk_selector[disk_name]);
|
||||
disks.push_back(disk_selector->get(disk_name));
|
||||
}
|
||||
}
|
||||
|
||||
@ -162,7 +225,7 @@ StoragePolicy::StoragePolicy(
|
||||
String name_,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
const DiskSelector & disks)
|
||||
DiskSelectorPtr disks)
|
||||
: name(std::move(name_))
|
||||
{
|
||||
String volumes_prefix = config_prefix + ".volumes";
|
||||
@ -330,6 +393,28 @@ ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
|
||||
}
|
||||
|
||||
|
||||
void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const
|
||||
{
|
||||
std::unordered_set<String> new_volume_names;
|
||||
for (const auto & volume : new_storage_policy->getVolumes())
|
||||
new_volume_names.insert(volume->getName());
|
||||
|
||||
for (const auto & volume : getVolumes())
|
||||
{
|
||||
if (new_volume_names.count(volume->getName()) == 0)
|
||||
throw Exception("New storage policy shall contain volumes of old one", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
std::unordered_set<String> new_disk_names;
|
||||
for (const auto & disk : new_storage_policy->getVolumeByName(volume->getName())->disks)
|
||||
new_disk_names.insert(disk->getName());
|
||||
|
||||
for (const auto & disk : volume->disks)
|
||||
if (new_disk_names.count(disk->getName()) == 0)
|
||||
throw Exception("New storage policy shall contain disks of old one", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
|
||||
{
|
||||
for (size_t i = 0; i < volumes.size(); ++i)
|
||||
@ -346,7 +431,7 @@ size_t StoragePolicy::getVolumeIndexByDisk(const DiskPtr & disk_ptr) const
|
||||
StoragePolicySelector::StoragePolicySelector(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
const DiskSelector & disks)
|
||||
DiskSelectorPtr disks)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_prefix, keys);
|
||||
@ -368,18 +453,41 @@ StoragePolicySelector::StoragePolicySelector(
|
||||
/// Add default policy if it's not specified explicetly
|
||||
if (policies.find(default_storage_policy_name) == policies.end())
|
||||
{
|
||||
auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks[default_disk_name]}, 0);
|
||||
auto default_volume = std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);
|
||||
|
||||
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
|
||||
policies.emplace(default_storage_policy_name, default_policy);
|
||||
}
|
||||
}
|
||||
|
||||
const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name) const
|
||||
|
||||
StoragePolicySelectorPtr StoragePolicySelector::updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_prefix, keys);
|
||||
|
||||
std::shared_ptr<StoragePolicySelector> result = std::make_shared<StoragePolicySelector>(config, config_prefix, disks);
|
||||
|
||||
constexpr auto default_storage_policy_name = "default";
|
||||
|
||||
for (const auto & [name, policy] : policies)
|
||||
{
|
||||
if (name != default_storage_policy_name && result->policies.count(name) == 0)
|
||||
throw Exception("Storage policy " + backQuote(name) + " is missing in new configuration", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
policy->checkCompatibleWith(result->policies[name]);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
StoragePolicyPtr StoragePolicySelector::get(const String & name) const
|
||||
{
|
||||
auto it = policies.find(name);
|
||||
if (it == policies.end())
|
||||
throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY);
|
||||
|
||||
return it->second;
|
||||
}
|
||||
|
||||
|
@ -17,15 +17,21 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class DiskSelector;
|
||||
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
|
||||
|
||||
/// Parse .xml configuration and store information about disks
|
||||
/// Mostly used for introspection.
|
||||
class DiskSelector
|
||||
{
|
||||
public:
|
||||
DiskSelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context);
|
||||
DiskSelector(const DiskSelector & from): disks(from.disks) {}
|
||||
|
||||
DiskSelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Context & context) const;
|
||||
|
||||
/// Get disk by name
|
||||
const DiskPtr & operator[](const String & name) const;
|
||||
DiskPtr get(const String & name) const;
|
||||
|
||||
/// Get all disks with names
|
||||
const auto & getDisksMap() const { return disks; }
|
||||
@ -54,7 +60,7 @@ public:
|
||||
String name_,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
const DiskSelector & disk_selector);
|
||||
DiskSelectorPtr disk_selector);
|
||||
|
||||
/// Next disk (round-robin)
|
||||
///
|
||||
@ -87,6 +93,8 @@ private:
|
||||
using VolumePtr = std::shared_ptr<Volume>;
|
||||
using Volumes = std::vector<VolumePtr>;
|
||||
|
||||
class StoragePolicy;
|
||||
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
|
||||
|
||||
/**
|
||||
* Contains all information about volumes configuration for Storage.
|
||||
@ -95,7 +103,7 @@ using Volumes = std::vector<VolumePtr>;
|
||||
class StoragePolicy
|
||||
{
|
||||
public:
|
||||
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
|
||||
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
|
||||
|
||||
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
|
||||
|
||||
@ -146,6 +154,9 @@ public:
|
||||
return getVolume(it->second);
|
||||
}
|
||||
|
||||
/// Checks if storage policy can be replaced by another one.
|
||||
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;
|
||||
|
||||
private:
|
||||
Volumes volumes;
|
||||
const String name;
|
||||
@ -158,17 +169,20 @@ private:
|
||||
};
|
||||
|
||||
|
||||
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
|
||||
class StoragePolicySelector;
|
||||
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
|
||||
|
||||
/// Parse .xml configuration and store information about policies
|
||||
/// Mostly used for introspection.
|
||||
class StoragePolicySelector
|
||||
{
|
||||
public:
|
||||
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DiskSelector & disks);
|
||||
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
|
||||
|
||||
StoragePolicySelectorPtr updateFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks) const;
|
||||
|
||||
/// Policy by name
|
||||
const StoragePolicyPtr & operator[](const String & name) const;
|
||||
StoragePolicyPtr get(const String & name) const;
|
||||
|
||||
/// All policies
|
||||
const std::map<String, StoragePolicyPtr> & getPoliciesMap() const { return policies; }
|
||||
|
@ -553,7 +553,6 @@ protected:
|
||||
};
|
||||
|
||||
|
||||
|
||||
class ProtobufReader::ConverterFromString : public ConverterBaseImpl
|
||||
{
|
||||
public:
|
||||
@ -864,7 +863,6 @@ PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS(google::protobuf::Fi
|
||||
#undef PROTOBUF_READER_CREATE_CONVERTER_SPECIALIZATION_FOR_NUMBERS
|
||||
|
||||
|
||||
|
||||
class ProtobufReader::ConverterFromBool : public ConverterBaseImpl
|
||||
{
|
||||
public:
|
||||
|
@ -70,4 +70,22 @@ struct DivideIntegralImpl
|
||||
#endif
|
||||
};
|
||||
|
||||
template <typename A, typename B>
|
||||
struct ModuloImpl
|
||||
{
|
||||
using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
|
||||
static const constexpr bool allow_fixed_string = false;
|
||||
|
||||
template <typename Result = ResultType>
|
||||
static inline Result apply(A a, B b)
|
||||
{
|
||||
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<A>::Type(a), typename NumberTraits::ToInteger<B>::Type(b));
|
||||
return typename NumberTraits::ToInteger<A>::Type(a) % typename NumberTraits::ToInteger<B>::Type(b);
|
||||
}
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
static constexpr bool compilable = false; /// don't know how to throw from LLVM IR
|
||||
#endif
|
||||
};
|
||||
|
||||
}
|
@ -22,7 +22,7 @@
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include "IFunctionImpl.h"
|
||||
#include "FunctionHelpers.h"
|
||||
#include "intDiv.h"
|
||||
#include "DivisionUtils.h"
|
||||
#include "castTypeToEither.h"
|
||||
#include "FunctionFactory.h"
|
||||
#include <Common/typeid_cast.h>
|
||||
@ -95,21 +95,69 @@ struct FixedStringOperationImpl
|
||||
c[i] = Op::template apply<UInt8>(a[i], b[i]);
|
||||
}
|
||||
|
||||
static void NO_INLINE vector_constant(const UInt8 * __restrict a, const UInt8 * __restrict b, UInt8 * __restrict c, size_t size, size_t N)
|
||||
template <bool inverted>
|
||||
static void NO_INLINE vector_constant_impl(const UInt8 * __restrict a, const UInt8 * __restrict b, UInt8 * __restrict c, size_t size, size_t N)
|
||||
{
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
c[i] = Op::template apply<UInt8>(a[i], b[i % N]);
|
||||
/// These complications are needed to avoid integer division in inner loop.
|
||||
|
||||
/// Create a pattern of repeated values of b with at least 16 bytes,
|
||||
/// so we can read 16 bytes of this repeated pattern starting from any offset inside b.
|
||||
///
|
||||
/// Example:
|
||||
///
|
||||
/// N = 6
|
||||
/// ------
|
||||
/// [abcdefabcdefabcdefabc]
|
||||
/// ^^^^^^^^^^^^^^^^
|
||||
/// 16 bytes starting from the last offset inside b.
|
||||
|
||||
const size_t b_repeated_size = N + 15;
|
||||
UInt8 b_repeated[b_repeated_size];
|
||||
for (size_t i = 0; i < b_repeated_size; ++i)
|
||||
b_repeated[i] = b[i % N];
|
||||
|
||||
size_t b_offset = 0;
|
||||
size_t b_increment = 16 % N;
|
||||
|
||||
/// Example:
|
||||
///
|
||||
/// At first iteration we copy 16 bytes at offset 0 from b_repeated:
|
||||
/// [abcdefabcdefabcdefabc]
|
||||
/// ^^^^^^^^^^^^^^^^
|
||||
/// At second iteration we copy 16 bytes at offset 4 = 16 % 6 from b_repeated:
|
||||
/// [abcdefabcdefabcdefabc]
|
||||
/// ^^^^^^^^^^^^^^^^
|
||||
/// At third iteration we copy 16 bytes at offset 2 = (16 * 2) % 6 from b_repeated:
|
||||
/// [abcdefabcdefabcdefabc]
|
||||
/// ^^^^^^^^^^^^^^^^
|
||||
|
||||
/// PaddedPODArray allows overflow for 15 bytes.
|
||||
for (size_t i = 0; i < size; i += 16)
|
||||
{
|
||||
/// This loop is formed in a way to be vectorized into two SIMD mov.
|
||||
for (size_t j = 0; j < 16; ++j)
|
||||
c[i + j] = inverted
|
||||
? Op::template apply<UInt8>(a[i + j], b_repeated[b_offset + j])
|
||||
: Op::template apply<UInt8>(b_repeated[b_offset + j], a[i + j]);
|
||||
|
||||
b_offset += b_increment;
|
||||
if (b_offset >= N) /// This condition is easily predictable.
|
||||
b_offset -= N;
|
||||
}
|
||||
}
|
||||
|
||||
static void NO_INLINE constant_vector(const UInt8 * __restrict a, const UInt8 * __restrict b, UInt8 * __restrict c, size_t size, size_t N)
|
||||
static void vector_constant(const UInt8 * __restrict a, const UInt8 * __restrict b, UInt8 * __restrict c, size_t size, size_t N)
|
||||
{
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
c[i] = Op::template apply<UInt8>(a[i % N], b[i]);
|
||||
vector_constant_impl<false>(a, b, c, size, N);
|
||||
}
|
||||
|
||||
static void constant_vector(const UInt8 * __restrict a, const UInt8 * __restrict b, UInt8 * __restrict c, size_t size, size_t N)
|
||||
{
|
||||
vector_constant_impl<true>(b, a, c, size, N);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
template <typename A, typename B, typename Op, typename ResultType = typename Op::ResultType>
|
||||
struct BinaryOperationImpl : BinaryOperationImplBase<A, B, Op, ResultType>
|
||||
{
|
||||
|
@ -18,7 +18,6 @@
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
/** Logical functions AND, OR, XOR and NOT support three-valued (or ternary) logic
|
||||
* https://en.wikibooks.org/wiki/Structured_Query_Language/NULLs_and_the_Three_Valued_Logic
|
||||
*
|
||||
|
@ -162,7 +162,6 @@ inline ALWAYS_INLINE void writeSlice(const NumericValueSlice<T> & slice, Generic
|
||||
}
|
||||
|
||||
|
||||
|
||||
template <typename SourceA, typename SourceB, typename Sink>
|
||||
void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
|
||||
{
|
||||
|
@ -124,5 +124,3 @@ void registerFunctionCaseWithExpression(FunctionFactory & factory)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -54,4 +54,3 @@ void registerFunctionGenerateUUIDv4(FunctionFactory & factory)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionBinaryArithmetic.h>
|
||||
#include <Functions/intDiv.h>
|
||||
|
||||
#ifdef __SSE2__
|
||||
#define LIBDIVIDE_USE_SSE2 1
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionBinaryArithmetic.h>
|
||||
|
||||
#include "intDiv.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -16,23 +16,7 @@ namespace ErrorCodes
|
||||
extern const int ILLEGAL_DIVISION;
|
||||
}
|
||||
|
||||
template <typename A, typename B>
|
||||
struct ModuloImpl
|
||||
{
|
||||
using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
|
||||
static const constexpr bool allow_fixed_string = false;
|
||||
|
||||
template <typename Result = ResultType>
|
||||
static inline Result apply(A a, B b)
|
||||
{
|
||||
throwIfDivisionLeadsToFPE(typename NumberTraits::ToInteger<A>::Type(a), typename NumberTraits::ToInteger<B>::Type(b));
|
||||
return typename NumberTraits::ToInteger<A>::Type(a) % typename NumberTraits::ToInteger<B>::Type(b);
|
||||
}
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
static constexpr bool compilable = false; /// don't know how to throw from LLVM IR
|
||||
#endif
|
||||
};
|
||||
/// Optimizations for integer modulo by a constant.
|
||||
|
||||
template <typename A, typename B>
|
||||
struct ModuloByConstantImpl
|
||||
|
36
dbms/src/Functions/moduloOrZero.cpp
Normal file
36
dbms/src/Functions/moduloOrZero.cpp
Normal file
@ -0,0 +1,36 @@
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionBinaryArithmetic.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
template <typename A, typename B>
|
||||
struct ModuloOrZeroImpl
|
||||
{
|
||||
using ResultType = typename NumberTraits::ResultOfModulo<A, B>::Type;
|
||||
static const constexpr bool allow_fixed_string = false;
|
||||
|
||||
template <typename Result = ResultType>
|
||||
static inline Result apply(A a, B b)
|
||||
{
|
||||
if (unlikely(divisionLeadsToFPE(a, b)))
|
||||
return 0;
|
||||
|
||||
return ModuloImpl<A, B>::template apply<Result>(a, b);
|
||||
}
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
static constexpr bool compilable = false; /// TODO implement the checks
|
||||
#endif
|
||||
};
|
||||
|
||||
struct NameModuloOrZero { static constexpr auto name = "moduloOrZero"; };
|
||||
using FunctionModuloOrZero = FunctionBinaryArithmetic<ModuloOrZeroImpl, NameModuloOrZero>;
|
||||
|
||||
void registerFunctionModuloOrZero(FunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<FunctionModuloOrZero>();
|
||||
}
|
||||
|
||||
}
|
@ -242,4 +242,3 @@ void registerFunctionMultiIf(FunctionFactory & factory)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -10,6 +10,7 @@ void registerFunctionDivide(FunctionFactory & factory);
|
||||
void registerFunctionIntDiv(FunctionFactory & factory);
|
||||
void registerFunctionIntDivOrZero(FunctionFactory & factory);
|
||||
void registerFunctionModulo(FunctionFactory & factory);
|
||||
void registerFunctionModuloOrZero(FunctionFactory & factory);
|
||||
void registerFunctionNegate(FunctionFactory & factory);
|
||||
void registerFunctionAbs(FunctionFactory & factory);
|
||||
void registerFunctionBitAnd(FunctionFactory & factory);
|
||||
@ -49,6 +50,7 @@ void registerFunctionsArithmetic(FunctionFactory & factory)
|
||||
registerFunctionIntDiv(factory);
|
||||
registerFunctionIntDivOrZero(factory);
|
||||
registerFunctionModulo(factory);
|
||||
registerFunctionModuloOrZero(factory);
|
||||
registerFunctionNegate(factory);
|
||||
registerFunctionAbs(factory);
|
||||
registerFunctionBitAnd(factory);
|
||||
|
@ -152,4 +152,3 @@ void registerFunctionsReinterpretStringAs(FunctionFactory & factory)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -18,4 +18,3 @@ void registerFunctionTimeSlot(FunctionFactory & factory)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -62,4 +62,43 @@ void writeException(const Exception & e, WriteBuffer & buf, bool with_stack_trac
|
||||
bool has_nested = false;
|
||||
writeBinary(has_nested, buf);
|
||||
}
|
||||
|
||||
|
||||
/// The same, but quotes apply only if there are characters that do not match the identifier without quotes
|
||||
template <typename F>
|
||||
static inline void writeProbablyQuotedStringImpl(const StringRef & s, WriteBuffer & buf, F && write_quoted_string)
|
||||
{
|
||||
if (!s.size || !isValidIdentifierBegin(s.data[0]))
|
||||
{
|
||||
write_quoted_string(s, buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
const char * pos = s.data + 1;
|
||||
const char * end = s.data + s.size;
|
||||
for (; pos < end; ++pos)
|
||||
if (!isWordCharASCII(*pos))
|
||||
break;
|
||||
if (pos != end)
|
||||
write_quoted_string(s, buf);
|
||||
else
|
||||
writeString(s, buf);
|
||||
}
|
||||
}
|
||||
|
||||
void writeProbablyBackQuotedString(const StringRef & s, WriteBuffer & buf)
|
||||
{
|
||||
writeProbablyQuotedStringImpl(s, buf, [](const StringRef & s_, WriteBuffer & buf_) { return writeBackQuotedString(s_, buf_); });
|
||||
}
|
||||
|
||||
void writeProbablyDoubleQuotedString(const StringRef & s, WriteBuffer & buf)
|
||||
{
|
||||
writeProbablyQuotedStringImpl(s, buf, [](const StringRef & s_, WriteBuffer & buf_) { return writeDoubleQuotedString(s_, buf_); });
|
||||
}
|
||||
|
||||
void writeProbablyBackQuotedStringMySQL(const StringRef & s, WriteBuffer & buf)
|
||||
{
|
||||
writeProbablyQuotedStringImpl(s, buf, [](const StringRef & s_, WriteBuffer & buf_) { return writeBackQuotedStringMySQL(s_, buf_); });
|
||||
}
|
||||
|
||||
}
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user