mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge
This commit is contained in:
commit
775812734d
@ -12,8 +12,6 @@
|
|||||||
#include <DB/Core/Protocol.h>
|
#include <DB/Core/Protocol.h>
|
||||||
#include <DB/Core/QueryProcessingStage.h>
|
#include <DB/Core/QueryProcessingStage.h>
|
||||||
|
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
|
|
||||||
#include <DB/DataStreams/IBlockInputStream.h>
|
#include <DB/DataStreams/IBlockInputStream.h>
|
||||||
#include <DB/DataStreams/IBlockOutputStream.h>
|
#include <DB/DataStreams/IBlockOutputStream.h>
|
||||||
#include <DB/DataStreams/BlockStreamProfileInfo.h>
|
#include <DB/DataStreams/BlockStreamProfileInfo.h>
|
||||||
@ -50,7 +48,6 @@ class Connection : private boost::noncopyable
|
|||||||
public:
|
public:
|
||||||
Connection(const String & host_, UInt16 port_, const String & default_database_,
|
Connection(const String & host_, UInt16 port_, const String & default_database_,
|
||||||
const String & user_, const String & password_,
|
const String & user_, const String & password_,
|
||||||
const DataTypeFactory & data_type_factory_,
|
|
||||||
const String & client_name_ = "client",
|
const String & client_name_ = "client",
|
||||||
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
|
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
|
||||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||||
@ -61,7 +58,7 @@ public:
|
|||||||
host(host_), port(port_), default_database(default_database_),
|
host(host_), port(port_), default_database(default_database_),
|
||||||
user(user_), password(password_),
|
user(user_), password(password_),
|
||||||
client_name(client_name_),
|
client_name(client_name_),
|
||||||
compression(compression_), data_type_factory(data_type_factory_),
|
compression(compression_),
|
||||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
|
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
|
||||||
ping_timeout(ping_timeout_),
|
ping_timeout(ping_timeout_),
|
||||||
log_wrapper(host, port)
|
log_wrapper(host, port)
|
||||||
@ -172,8 +169,6 @@ private:
|
|||||||
/// каким алгоритмом сжимать данные при INSERT и данные внешних таблиц
|
/// каким алгоритмом сжимать данные при INSERT и данные внешних таблиц
|
||||||
CompressionMethod network_compression_method = CompressionMethod::LZ4;
|
CompressionMethod network_compression_method = CompressionMethod::LZ4;
|
||||||
|
|
||||||
const DataTypeFactory & data_type_factory;
|
|
||||||
|
|
||||||
/** Если не nullptr, то используется, чтобы ограничить сетевой трафик.
|
/** Если не nullptr, то используется, чтобы ограничить сетевой трафик.
|
||||||
* Учитывается только трафик при передаче блоков. Другие пакеты не учитываются.
|
* Учитывается только трафик при передаче блоков. Другие пакеты не учитываются.
|
||||||
*/
|
*/
|
||||||
|
@ -56,7 +56,6 @@ public:
|
|||||||
ConnectionPool(unsigned max_connections_,
|
ConnectionPool(unsigned max_connections_,
|
||||||
const String & host_, UInt16 port_, const String & default_database_,
|
const String & host_, UInt16 port_, const String & default_database_,
|
||||||
const String & user_, const String & password_,
|
const String & user_, const String & password_,
|
||||||
const DataTypeFactory & data_type_factory_,
|
|
||||||
const String & client_name_ = "client",
|
const String & client_name_ = "client",
|
||||||
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
|
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
|
||||||
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
|
||||||
@ -65,7 +64,7 @@ public:
|
|||||||
: Base(max_connections_, &Logger::get("ConnectionPool (" + Poco::Net::SocketAddress(host_, port_).toString() + ")")),
|
: Base(max_connections_, &Logger::get("ConnectionPool (" + Poco::Net::SocketAddress(host_, port_).toString() + ")")),
|
||||||
host(host_), port(port_), default_database(default_database_),
|
host(host_), port(port_), default_database(default_database_),
|
||||||
user(user_), password(password_),
|
user(user_), password(password_),
|
||||||
client_name(client_name_), compression(compression_), data_type_factory(data_type_factory_),
|
client_name(client_name_), compression(compression_),
|
||||||
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
|
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -91,7 +90,7 @@ protected:
|
|||||||
{
|
{
|
||||||
return new Connection(
|
return new Connection(
|
||||||
host, port, default_database, user, password,
|
host, port, default_database, user, password,
|
||||||
data_type_factory, client_name, compression,
|
client_name, compression,
|
||||||
connect_timeout, receive_timeout, send_timeout);
|
connect_timeout, receive_timeout, send_timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,8 +104,6 @@ private:
|
|||||||
String client_name;
|
String client_name;
|
||||||
Protocol::Compression::Enum compression; /// Сжимать ли данные при взаимодействии с сервером.
|
Protocol::Compression::Enum compression; /// Сжимать ли данные при взаимодействии с сервером.
|
||||||
|
|
||||||
const DataTypeFactory & data_type_factory;
|
|
||||||
|
|
||||||
Poco::Timespan connect_timeout;
|
Poco::Timespan connect_timeout;
|
||||||
Poco::Timespan receive_timeout;
|
Poco::Timespan receive_timeout;
|
||||||
Poco::Timespan send_timeout;
|
Poco::Timespan send_timeout;
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
|
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
|
||||||
#include <DB/DataStreams/FormatFactory.h>
|
#include <DB/DataStreams/FormatFactory.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
#include <DB/Interpreters/Context.h>
|
#include <DB/Interpreters/Context.h>
|
||||||
#include <DB/IO/copyData.h>
|
#include <DB/IO/copyData.h>
|
||||||
#include <DB/IO/ReadBufferFromIStream.h>
|
#include <DB/IO/ReadBufferFromIStream.h>
|
||||||
@ -42,11 +43,13 @@ public:
|
|||||||
/// Инициализировать sample_block по структуре таблицы сохраненной в structure
|
/// Инициализировать sample_block по структуре таблицы сохраненной в structure
|
||||||
virtual void initSampleBlock(const Context & context)
|
virtual void initSampleBlock(const Context & context)
|
||||||
{
|
{
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
for (size_t i = 0; i < structure.size(); ++i)
|
for (size_t i = 0; i < structure.size(); ++i)
|
||||||
{
|
{
|
||||||
ColumnWithNameAndType column;
|
ColumnWithNameAndType column;
|
||||||
column.name = structure[i].first;
|
column.name = structure[i].first;
|
||||||
column.type = context.getDataTypeFactory().get(structure[i].second);
|
column.type = data_type_factory.get(structure[i].second);
|
||||||
column.column = column.type->createColumn();
|
column.column = column.type->createColumn();
|
||||||
sample_block.insert(column);
|
sample_block.insert(column);
|
||||||
}
|
}
|
||||||
@ -58,7 +61,7 @@ public:
|
|||||||
initReadBuffer();
|
initReadBuffer();
|
||||||
initSampleBlock(context);
|
initSampleBlock(context);
|
||||||
ExternalTableData res = std::make_pair(new AsynchronousBlockInputStream(context.getFormatFactory().getInput(
|
ExternalTableData res = std::make_pair(new AsynchronousBlockInputStream(context.getFormatFactory().getInput(
|
||||||
format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE, context.getDataTypeFactory())), name);
|
format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE)), name);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,6 +192,8 @@ public:
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
T * data() { return t_start(); }
|
||||||
|
const T * data() const { return t_start(); }
|
||||||
|
|
||||||
size_t size() const { return t_end() - t_start(); }
|
size_t size() const { return t_end() - t_start(); }
|
||||||
bool empty() const { return t_end() == t_start(); }
|
bool empty() const { return t_end() == t_start(); }
|
||||||
|
@ -285,6 +285,7 @@ namespace ErrorCodes
|
|||||||
LEADERSHIP_LOST = 278,
|
LEADERSHIP_LOST = 278,
|
||||||
ALL_CONNECTION_TRIES_FAILED = 279,
|
ALL_CONNECTION_TRIES_FAILED = 279,
|
||||||
|
|
||||||
|
KEEPER_EXCEPTION = 999,
|
||||||
POCO_EXCEPTION = 1000,
|
POCO_EXCEPTION = 1000,
|
||||||
STD_EXCEPTION = 1001,
|
STD_EXCEPTION = 1001,
|
||||||
UNKNOWN_EXCEPTION = 1002,
|
UNKNOWN_EXCEPTION = 1002,
|
||||||
|
@ -9,7 +9,6 @@
|
|||||||
#include <sparsehash/dense_hash_map>
|
#include <sparsehash/dense_hash_map>
|
||||||
|
|
||||||
#include <DB/DataTypes/IDataType.h>
|
#include <DB/DataTypes/IDataType.h>
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
#include <DB/IO/ReadBufferFromString.h>
|
#include <DB/IO/ReadBufferFromString.h>
|
||||||
#include "Names.h"
|
#include "Names.h"
|
||||||
|
|
||||||
@ -45,11 +44,11 @@ class NamesAndTypesList : public std::list<NameAndTypePair>
|
|||||||
public:
|
public:
|
||||||
using std::list<NameAndTypePair>::list;
|
using std::list<NameAndTypePair>::list;
|
||||||
|
|
||||||
void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory);
|
void readText(ReadBuffer & buf);
|
||||||
void writeText(WriteBuffer & buf) const;
|
void writeText(WriteBuffer & buf) const;
|
||||||
|
|
||||||
String toString() const;
|
String toString() const;
|
||||||
static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory);
|
static NamesAndTypesList parse(const String & s);
|
||||||
|
|
||||||
/// Все элементы rhs должны быть различны.
|
/// Все элементы rhs должны быть различны.
|
||||||
bool isSubsetOf(const NamesAndTypesList & rhs) const;
|
bool isSubsetOf(const NamesAndTypesList & rhs) const;
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
|
|
||||||
#include <DB/DataStreams/IBlockInputStream.h>
|
#include <DB/DataStreams/IBlockInputStream.h>
|
||||||
#include <DB/DataStreams/IBlockOutputStream.h>
|
#include <DB/DataStreams/IBlockOutputStream.h>
|
||||||
|
|
||||||
@ -16,7 +14,7 @@ class FormatFactory
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
|
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
|
||||||
Block & sample, size_t max_block_size, const DataTypeFactory & data_type_factory) const;
|
Block & sample, size_t max_block_size) const;
|
||||||
|
|
||||||
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
|
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
|
||||||
Block & sample) const;
|
Block & sample) const;
|
||||||
|
@ -66,9 +66,9 @@ public:
|
|||||||
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
|
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
|
||||||
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_,
|
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_,
|
||||||
size_t max_merged_block_size_, size_t limit_,
|
size_t max_merged_block_size_, size_t limit_,
|
||||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, const DataTypeFactory & data_type_factory_)
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
|
||||||
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
||||||
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_), data_type_factory(data_type_factory_)
|
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
|
||||||
{
|
{
|
||||||
children.push_back(input_);
|
children.push_back(input_);
|
||||||
}
|
}
|
||||||
@ -97,7 +97,6 @@ private:
|
|||||||
|
|
||||||
size_t max_bytes_before_external_sort;
|
size_t max_bytes_before_external_sort;
|
||||||
const std::string tmp_path;
|
const std::string tmp_path;
|
||||||
const DataTypeFactory & data_type_factory;
|
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
||||||
|
|
||||||
@ -115,8 +114,8 @@ private:
|
|||||||
CompressedReadBuffer compressed_in;
|
CompressedReadBuffer compressed_in;
|
||||||
BlockInputStreamPtr block_in;
|
BlockInputStreamPtr block_in;
|
||||||
|
|
||||||
TemporaryFileStream(const std::string & path, const DataTypeFactory & data_type_factory)
|
TemporaryFileStream(const std::string & path)
|
||||||
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, data_type_factory)) {}
|
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in)) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||||
|
|
||||||
|
|
||||||
@ -16,8 +15,8 @@ public:
|
|||||||
/** В случае указания ненулевой server_revision, может ожидаться и считываться дополнительная информация о блоке,
|
/** В случае указания ненулевой server_revision, может ожидаться и считываться дополнительная информация о блоке,
|
||||||
* в зависимости от поддерживаемой для указанной ревизии.
|
* в зависимости от поддерживаемой для указанной ревизии.
|
||||||
*/
|
*/
|
||||||
NativeBlockInputStream(ReadBuffer & istr_, const DataTypeFactory & data_type_factory_, UInt64 server_revision_ = 0)
|
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_ = 0)
|
||||||
: istr(istr_), data_type_factory(data_type_factory_), server_revision(server_revision_) {}
|
: istr(istr_), server_revision(server_revision_) {}
|
||||||
|
|
||||||
String getName() const override { return "NativeBlockInputStream"; }
|
String getName() const override { return "NativeBlockInputStream"; }
|
||||||
|
|
||||||
@ -35,7 +34,6 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
ReadBuffer & istr;
|
ReadBuffer & istr;
|
||||||
const DataTypeFactory & data_type_factory;
|
|
||||||
UInt64 server_revision;
|
UInt64 server_revision;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -20,10 +20,12 @@ const auto max_connections = 16;
|
|||||||
class ClickHouseDictionarySource final : public IDictionarySource
|
class ClickHouseDictionarySource final : public IDictionarySource
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ClickHouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
|
ClickHouseDictionarySource(const DictionaryStructure & dict_struct,
|
||||||
|
const Poco::Util::AbstractConfiguration & config,
|
||||||
const std::string & config_prefix,
|
const std::string & config_prefix,
|
||||||
Block & sample_block, Context & context)
|
Block & sample_block, Context & context)
|
||||||
: host{config.getString(config_prefix + ".host")},
|
: dict_struct{dict_struct},
|
||||||
|
host{config.getString(config_prefix + ".host")},
|
||||||
port(config.getInt(config_prefix + ".port")),
|
port(config.getInt(config_prefix + ".port")),
|
||||||
user{config.getString(config_prefix + ".user", "")},
|
user{config.getString(config_prefix + ".user", "")},
|
||||||
password{config.getString(config_prefix + ".password", "")},
|
password{config.getString(config_prefix + ".password", "")},
|
||||||
@ -33,7 +35,7 @@ public:
|
|||||||
sample_block{sample_block}, context(context),
|
sample_block{sample_block}, context(context),
|
||||||
is_local{isLocalAddress({ host, port })},
|
is_local{isLocalAddress({ host, port })},
|
||||||
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
|
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
|
||||||
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
|
max_connections, host, port, db, user, password,
|
||||||
"ClickHouseDictionarySource")
|
"ClickHouseDictionarySource")
|
||||||
},
|
},
|
||||||
load_all_query{composeLoadAllQuery()}
|
load_all_query{composeLoadAllQuery()}
|
||||||
@ -41,13 +43,14 @@ public:
|
|||||||
|
|
||||||
/// copy-constructor is provided in order to support cloneability
|
/// copy-constructor is provided in order to support cloneability
|
||||||
ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
|
ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
|
||||||
: host{other.host}, port{other.port}, user{other.user}, password{other.password},
|
: dict_struct{other.dict_struct},
|
||||||
|
host{other.host}, port{other.port}, user{other.user}, password{other.password},
|
||||||
db{other.db}, table{other.table},
|
db{other.db}, table{other.table},
|
||||||
where{other.where},
|
where{other.where},
|
||||||
sample_block{other.sample_block}, context(other.context),
|
sample_block{other.sample_block}, context(other.context),
|
||||||
is_local{other.is_local},
|
is_local{other.is_local},
|
||||||
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
|
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
|
||||||
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
|
max_connections, host, port, db, user, password,
|
||||||
"ClickHouseDictionarySource")},
|
"ClickHouseDictionarySource")},
|
||||||
load_all_query{other.load_all_query}
|
load_all_query{other.load_all_query}
|
||||||
{}
|
{}
|
||||||
@ -90,14 +93,19 @@ private:
|
|||||||
WriteBufferFromString out{query};
|
WriteBufferFromString out{query};
|
||||||
writeString("SELECT ", out);
|
writeString("SELECT ", out);
|
||||||
|
|
||||||
auto first = true;
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
for (const auto idx : ext::range(0, sample_block.columns()))
|
|
||||||
{
|
|
||||||
if (!first)
|
|
||||||
writeString(", ", out);
|
|
||||||
|
|
||||||
writeString(sample_block.getByPosition(idx).name, out);
|
for (const auto & attr : dict_struct.attributes)
|
||||||
first = false;
|
{
|
||||||
|
writeString(", ", out);
|
||||||
|
|
||||||
|
if (!attr.expression.empty())
|
||||||
|
{
|
||||||
|
writeString(attr.expression, out);
|
||||||
|
writeString(" AS ", out);
|
||||||
|
}
|
||||||
|
|
||||||
|
writeProbablyBackQuotedString(attr.name, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
writeString(" FROM ", out);
|
writeString(" FROM ", out);
|
||||||
@ -128,17 +136,21 @@ private:
|
|||||||
WriteBufferFromString out{query};
|
WriteBufferFromString out{query};
|
||||||
writeString("SELECT ", out);
|
writeString("SELECT ", out);
|
||||||
|
|
||||||
auto first = true;
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
for (const auto idx : ext::range(0, sample_block.columns()))
|
|
||||||
{
|
|
||||||
if (!first)
|
|
||||||
writeString(", ", out);
|
|
||||||
|
|
||||||
writeString(sample_block.getByPosition(idx).name, out);
|
for (const auto & attr : dict_struct.attributes)
|
||||||
first = false;
|
{
|
||||||
|
writeString(", ", out);
|
||||||
|
|
||||||
|
if (!attr.expression.empty())
|
||||||
|
{
|
||||||
|
writeString(attr.expression, out);
|
||||||
|
writeString(" AS ", out);
|
||||||
|
}
|
||||||
|
|
||||||
|
writeProbablyBackQuotedString(attr.name, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto & id_column_name = sample_block.getByPosition(0).name;
|
|
||||||
writeString(" FROM ", out);
|
writeString(" FROM ", out);
|
||||||
if (!db.empty())
|
if (!db.empty())
|
||||||
{
|
{
|
||||||
@ -155,10 +167,10 @@ private:
|
|||||||
writeString(" AND ", out);
|
writeString(" AND ", out);
|
||||||
}
|
}
|
||||||
|
|
||||||
writeProbablyBackQuotedString(id_column_name, out);
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
writeString(" IN (", out);
|
writeString(" IN (", out);
|
||||||
|
|
||||||
first = true;
|
auto first = true;
|
||||||
for (const auto id : ids)
|
for (const auto id : ids)
|
||||||
{
|
{
|
||||||
if (!first)
|
if (!first)
|
||||||
@ -174,6 +186,7 @@ private:
|
|||||||
return query;
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DictionaryStructure dict_struct;
|
||||||
const std::string host;
|
const std::string host;
|
||||||
const UInt16 port;
|
const UInt16 port;
|
||||||
const std::string user;
|
const std::string user;
|
||||||
|
@ -64,11 +64,11 @@ public:
|
|||||||
}
|
}
|
||||||
else if ("mysql" == source_type)
|
else if ("mysql" == source_type)
|
||||||
{
|
{
|
||||||
return std::make_unique<MySQLDictionarySource>(config, config_prefix + ".mysql", sample_block);
|
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block);
|
||||||
}
|
}
|
||||||
else if ("clickhouse" == source_type)
|
else if ("clickhouse" == source_type)
|
||||||
{
|
{
|
||||||
return std::make_unique<ClickHouseDictionarySource>(config, config_prefix + ".clickhouse",
|
return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse",
|
||||||
sample_block, context);
|
sample_block, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ inline std::string toString(const AttributeUnderlyingType type)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Min and max lifetimes for a dictionary or it's entry
|
/// Min and max lifetimes for a dictionary or it's entry
|
||||||
struct DictionaryLifetime
|
struct DictionaryLifetime final
|
||||||
{
|
{
|
||||||
std::uint64_t min_sec;
|
std::uint64_t min_sec;
|
||||||
std::uint64_t max_sec;
|
std::uint64_t max_sec;
|
||||||
@ -101,18 +101,19 @@ struct DictionaryLifetime
|
|||||||
* - hierarchical, whether this attribute defines a hierarchy;
|
* - hierarchical, whether this attribute defines a hierarchy;
|
||||||
* - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?)
|
* - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?)
|
||||||
*/
|
*/
|
||||||
struct DictionaryAttribute
|
struct DictionaryAttribute final
|
||||||
{
|
{
|
||||||
std::string name;
|
const std::string name;
|
||||||
AttributeUnderlyingType underlying_type;
|
const AttributeUnderlyingType underlying_type;
|
||||||
DataTypePtr type;
|
const DataTypePtr type;
|
||||||
Field null_value;
|
const std::string expression;
|
||||||
bool hierarchical;
|
const Field null_value;
|
||||||
bool injective;
|
const bool hierarchical;
|
||||||
|
const bool injective;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Name of identifier plus list of attributes
|
/// Name of identifier plus list of attributes
|
||||||
struct DictionaryStructure
|
struct DictionaryStructure final
|
||||||
{
|
{
|
||||||
std::string id_name;
|
std::string id_name;
|
||||||
std::vector<DictionaryAttribute> attributes;
|
std::vector<DictionaryAttribute> attributes;
|
||||||
@ -142,6 +143,8 @@ struct DictionaryStructure
|
|||||||
const auto type = DataTypeFactory::instance().get(type_string);
|
const auto type = DataTypeFactory::instance().get(type_string);
|
||||||
const auto underlying_type = getAttributeUnderlyingType(type_string);
|
const auto underlying_type = getAttributeUnderlyingType(type_string);
|
||||||
|
|
||||||
|
const auto expression = config.getString(prefix + "expression", "");
|
||||||
|
|
||||||
const auto null_value_string = config.getString(prefix + "null_value");
|
const auto null_value_string = config.getString(prefix + "null_value");
|
||||||
Field null_value;
|
Field null_value;
|
||||||
try
|
try
|
||||||
@ -174,7 +177,7 @@ struct DictionaryStructure
|
|||||||
has_hierarchy = has_hierarchy || hierarchical;
|
has_hierarchy = has_hierarchy || hierarchical;
|
||||||
|
|
||||||
attributes.emplace_back(DictionaryAttribute{
|
attributes.emplace_back(DictionaryAttribute{
|
||||||
name, underlying_type, type, null_value, hierarchical, injective
|
name, underlying_type, type, expression, null_value, hierarchical, injective
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ public:
|
|||||||
{
|
{
|
||||||
auto in_ptr = std::make_unique<ReadBufferFromFile>(filename);
|
auto in_ptr = std::make_unique<ReadBufferFromFile>(filename);
|
||||||
auto stream = context.getFormatFactory().getInput(
|
auto stream = context.getFormatFactory().getInput(
|
||||||
format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory());
|
format, *in_ptr, sample_block, max_block_size);
|
||||||
last_modification = getLastModification();
|
last_modification = getLastModification();
|
||||||
|
|
||||||
return new OwningBufferBlockInputStream{stream, std::move(in_ptr)};
|
return new OwningBufferBlockInputStream{stream, std::move(in_ptr)};
|
||||||
|
@ -16,9 +16,11 @@ class MySQLDictionarySource final : public IDictionarySource
|
|||||||
static const auto max_block_size = 8192;
|
static const auto max_block_size = 8192;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
MySQLDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
|
MySQLDictionarySource(const DictionaryStructure & dict_struct,
|
||||||
|
const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
|
||||||
Block & sample_block)
|
Block & sample_block)
|
||||||
: db{config.getString(config_prefix + ".db", "")},
|
: dict_struct{dict_struct},
|
||||||
|
db{config.getString(config_prefix + ".db", "")},
|
||||||
table{config.getString(config_prefix + ".table")},
|
table{config.getString(config_prefix + ".table")},
|
||||||
where{config.getString(config_prefix + ".where", "")},
|
where{config.getString(config_prefix + ".where", "")},
|
||||||
sample_block{sample_block},
|
sample_block{sample_block},
|
||||||
@ -29,7 +31,8 @@ public:
|
|||||||
|
|
||||||
/// copy-constructor is provided in order to support cloneability
|
/// copy-constructor is provided in order to support cloneability
|
||||||
MySQLDictionarySource(const MySQLDictionarySource & other)
|
MySQLDictionarySource(const MySQLDictionarySource & other)
|
||||||
: db{other.db},
|
: dict_struct{other.dict_struct},
|
||||||
|
db{other.db},
|
||||||
table{other.table},
|
table{other.table},
|
||||||
where{other.where},
|
where{other.where},
|
||||||
sample_block{other.sample_block},
|
sample_block{other.sample_block},
|
||||||
@ -101,14 +104,19 @@ private:
|
|||||||
WriteBufferFromString out{query};
|
WriteBufferFromString out{query};
|
||||||
writeString("SELECT ", out);
|
writeString("SELECT ", out);
|
||||||
|
|
||||||
auto first = true;
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
for (const auto idx : ext::range(0, sample_block.columns()))
|
|
||||||
{
|
|
||||||
if (!first)
|
|
||||||
writeString(", ", out);
|
|
||||||
|
|
||||||
writeString(sample_block.getByPosition(idx).name, out);
|
for (const auto & attr : dict_struct.attributes)
|
||||||
first = false;
|
{
|
||||||
|
writeString(", ", out);
|
||||||
|
|
||||||
|
if (!attr.expression.empty())
|
||||||
|
{
|
||||||
|
writeString(attr.expression, out);
|
||||||
|
writeString(" AS ", out);
|
||||||
|
}
|
||||||
|
|
||||||
|
writeProbablyBackQuotedString(attr.name, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
writeString(" FROM ", out);
|
writeString(" FROM ", out);
|
||||||
@ -139,17 +147,21 @@ private:
|
|||||||
WriteBufferFromString out{query};
|
WriteBufferFromString out{query};
|
||||||
writeString("SELECT ", out);
|
writeString("SELECT ", out);
|
||||||
|
|
||||||
auto first = true;
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
for (const auto idx : ext::range(0, sample_block.columns()))
|
|
||||||
{
|
|
||||||
if (!first)
|
|
||||||
writeString(", ", out);
|
|
||||||
|
|
||||||
writeString(sample_block.getByPosition(idx).name, out);
|
for (const auto & attr : dict_struct.attributes)
|
||||||
first = false;
|
{
|
||||||
|
writeString(", ", out);
|
||||||
|
|
||||||
|
if (!attr.expression.empty())
|
||||||
|
{
|
||||||
|
writeString(attr.expression, out);
|
||||||
|
writeString(" AS ", out);
|
||||||
|
}
|
||||||
|
|
||||||
|
writeProbablyBackQuotedString(attr.name, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto & id_column_name = sample_block.getByPosition(0).name;
|
|
||||||
writeString(" FROM ", out);
|
writeString(" FROM ", out);
|
||||||
if (!db.empty())
|
if (!db.empty())
|
||||||
{
|
{
|
||||||
@ -166,10 +178,10 @@ private:
|
|||||||
writeString(" AND ", out);
|
writeString(" AND ", out);
|
||||||
}
|
}
|
||||||
|
|
||||||
writeProbablyBackQuotedString(id_column_name, out);
|
writeProbablyBackQuotedString(dict_struct.id_name, out);
|
||||||
writeString(" IN (", out);
|
writeString(" IN (", out);
|
||||||
|
|
||||||
first = true;
|
auto first = true;
|
||||||
for (const auto id : ids)
|
for (const auto id : ids)
|
||||||
{
|
{
|
||||||
if (!first)
|
if (!first)
|
||||||
@ -185,6 +197,7 @@ private:
|
|||||||
return query;
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DictionaryStructure dict_struct;
|
||||||
const std::string db;
|
const std::string db;
|
||||||
const std::string table;
|
const std::string table;
|
||||||
const std::string where;
|
const std::string where;
|
||||||
|
@ -97,6 +97,68 @@ namespace DB
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** Быстрое вычисление остатка от деления для применения к округлению целых чисел.
|
||||||
|
* Без проверки, потому что делитель всегда положительный.
|
||||||
|
*/
|
||||||
|
template<typename T, typename Enable = void>
|
||||||
|
struct FastModulo
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
struct FastModulo<T, typename std::enable_if<std::is_integral<T>::value>::type>
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
template<typename InputType, typename Enable = void>
|
||||||
|
struct Extend
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename InputType>
|
||||||
|
struct Extend<InputType,
|
||||||
|
typename std::enable_if<std::is_same<InputType, Int8>::value
|
||||||
|
|| std::is_same<InputType, Int16>::value>::type>
|
||||||
|
{
|
||||||
|
using Type = Int64;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename InputType>
|
||||||
|
struct Extend<InputType,
|
||||||
|
typename std::enable_if<std::is_same<InputType, UInt8>::value
|
||||||
|
|| std::is_same<InputType, UInt16>::value>::type>
|
||||||
|
{
|
||||||
|
using Type = UInt64;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename InputType>
|
||||||
|
struct Extend<InputType,
|
||||||
|
typename std::enable_if<std::is_integral<InputType>::value
|
||||||
|
&& (sizeof(InputType) >= 4)>::type>
|
||||||
|
{
|
||||||
|
using Type = InputType;
|
||||||
|
};
|
||||||
|
|
||||||
|
using U = typename Extend<T>::Type;
|
||||||
|
|
||||||
|
public:
|
||||||
|
using Divisor = std::pair<size_t, typename libdivide::divider<U> >;
|
||||||
|
|
||||||
|
static inline Divisor prepare(size_t b)
|
||||||
|
{
|
||||||
|
return std::make_pair(b, libdivide::divider<U>(b));
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline T compute(T a, const Divisor & divisor)
|
||||||
|
{
|
||||||
|
if (divisor.first == 1)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
U val = static_cast<U>(a);
|
||||||
|
U rem = val - (val / divisor.second) * static_cast<U>(divisor.first);
|
||||||
|
return static_cast<T>(rem);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/** Этот параметр контролирует поведение функций округления.
|
/** Этот параметр контролирует поведение функций округления.
|
||||||
*/
|
*/
|
||||||
enum ScaleMode
|
enum ScaleMode
|
||||||
@ -119,7 +181,14 @@ namespace DB
|
|||||||
typename std::enable_if<std::is_integral<T>::value
|
typename std::enable_if<std::is_integral<T>::value
|
||||||
&& ((scale_mode == PositiveScale) || (scale_mode == ZeroScale))>::type>
|
&& ((scale_mode == PositiveScale) || (scale_mode == ZeroScale))>::type>
|
||||||
{
|
{
|
||||||
static inline T compute(const T in, size_t scale)
|
using Divisor = int;
|
||||||
|
|
||||||
|
static inline Divisor prepare(size_t scale)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline T compute(T in, const Divisor & scale)
|
||||||
{
|
{
|
||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
@ -129,14 +198,26 @@ namespace DB
|
|||||||
struct IntegerRoundingComputation<T, _MM_FROUND_NINT, NegativeScale,
|
struct IntegerRoundingComputation<T, _MM_FROUND_NINT, NegativeScale,
|
||||||
typename std::enable_if<std::is_integral<T>::value>::type>
|
typename std::enable_if<std::is_integral<T>::value>::type>
|
||||||
{
|
{
|
||||||
static inline T compute(T in, size_t scale)
|
using Op = FastModulo<T>;
|
||||||
|
using Divisor = typename Op::Divisor;
|
||||||
|
|
||||||
|
static inline Divisor prepare(size_t scale)
|
||||||
{
|
{
|
||||||
T rem = in % scale;
|
return Op::prepare(scale);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline T compute(T in, const Divisor & scale)
|
||||||
|
{
|
||||||
|
T factor = (in < 0) ? -1 : 1;
|
||||||
|
in *= factor;
|
||||||
|
T rem = Op::compute(in, scale);
|
||||||
in -= rem;
|
in -= rem;
|
||||||
if (static_cast<size_t>(2 * rem) < scale)
|
T res;
|
||||||
return in;
|
if ((2 * rem) < static_cast<T>(scale.first))
|
||||||
|
res = in;
|
||||||
else
|
else
|
||||||
return in + scale;
|
res = in + scale.first;
|
||||||
|
return factor * res;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -144,10 +225,21 @@ namespace DB
|
|||||||
struct IntegerRoundingComputation<T, _MM_FROUND_CEIL, NegativeScale,
|
struct IntegerRoundingComputation<T, _MM_FROUND_CEIL, NegativeScale,
|
||||||
typename std::enable_if<std::is_integral<T>::value>::type>
|
typename std::enable_if<std::is_integral<T>::value>::type>
|
||||||
{
|
{
|
||||||
static inline T compute(const T in, size_t scale)
|
using Op = FastModulo<T>;
|
||||||
|
using Divisor = typename Op::Divisor;
|
||||||
|
|
||||||
|
static inline Divisor prepare(size_t scale)
|
||||||
{
|
{
|
||||||
T rem = in % scale;
|
return Op::prepare(scale);
|
||||||
return in - rem + scale;
|
}
|
||||||
|
|
||||||
|
static inline T compute(T in, const Divisor & scale)
|
||||||
|
{
|
||||||
|
T factor = (in < 0) ? -1 : 1;
|
||||||
|
in *= factor;
|
||||||
|
T rem = Op::compute(in, scale);
|
||||||
|
T res = in - rem + scale.first;
|
||||||
|
return factor * res;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -155,10 +247,21 @@ namespace DB
|
|||||||
struct IntegerRoundingComputation<T, _MM_FROUND_FLOOR, NegativeScale,
|
struct IntegerRoundingComputation<T, _MM_FROUND_FLOOR, NegativeScale,
|
||||||
typename std::enable_if<std::is_integral<T>::value>::type>
|
typename std::enable_if<std::is_integral<T>::value>::type>
|
||||||
{
|
{
|
||||||
static inline T compute(const T in, size_t scale)
|
using Op = FastModulo<T>;
|
||||||
|
using Divisor = typename Op::Divisor;
|
||||||
|
|
||||||
|
static inline Divisor prepare(size_t scale)
|
||||||
{
|
{
|
||||||
T rem = in % scale;
|
return Op::prepare(scale);
|
||||||
return in - rem;
|
}
|
||||||
|
|
||||||
|
static inline T compute(T in, const Divisor & scale)
|
||||||
|
{
|
||||||
|
T factor = (in < 0) ? -1 : 1;
|
||||||
|
in *= factor;
|
||||||
|
T rem = Op::compute(in, scale);
|
||||||
|
T res = in - rem;
|
||||||
|
return factor * res;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -192,13 +295,13 @@ namespace DB
|
|||||||
struct FloatRoundingComputation<Float32, rounding_mode, PositiveScale>
|
struct FloatRoundingComputation<Float32, rounding_mode, PositiveScale>
|
||||||
: public BaseFloatRoundingComputation<Float32>
|
: public BaseFloatRoundingComputation<Float32>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
Float32 fscale = static_cast<Float32>(scale);
|
Float32 fscale = static_cast<Float32>(scale);
|
||||||
mm_scale = _mm_load1_ps(&fscale);
|
mm_scale = _mm_load1_ps(&fscale);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float32 * in, const Scale & scale, Float32 * out)
|
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128 val = _mm_loadu_ps(in);
|
__m128 val = _mm_loadu_ps(in);
|
||||||
val = _mm_mul_ps(val, scale);
|
val = _mm_mul_ps(val, scale);
|
||||||
@ -212,20 +315,37 @@ namespace DB
|
|||||||
struct FloatRoundingComputation<Float32, rounding_mode, NegativeScale>
|
struct FloatRoundingComputation<Float32, rounding_mode, NegativeScale>
|
||||||
: public BaseFloatRoundingComputation<Float32>
|
: public BaseFloatRoundingComputation<Float32>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
Float32 fscale = static_cast<Float32>(scale);
|
Float32 fscale = static_cast<Float32>(scale);
|
||||||
mm_scale = _mm_load1_ps(&fscale);
|
mm_scale = _mm_load1_ps(&fscale);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float32 * in, const Scale & scale, Float32 * out)
|
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128 val = _mm_loadu_ps(in);
|
__m128 val = _mm_loadu_ps(in);
|
||||||
|
|
||||||
|
/// Превратить отрицательные значения в положительные.
|
||||||
|
__m128 factor = _mm_cmpge_ps(val, getZero());
|
||||||
|
factor = _mm_min_ps(factor, getTwo());
|
||||||
|
factor = _mm_sub_ps(factor, getOne());
|
||||||
|
val = _mm_mul_ps(val, factor);
|
||||||
|
|
||||||
|
/// Алгоритм округления.
|
||||||
val = _mm_div_ps(val, scale);
|
val = _mm_div_ps(val, scale);
|
||||||
__m128 res = _mm_cmpge_ps(val, getOneTenth());
|
__m128 res = _mm_cmpge_ps(val, getOneTenth());
|
||||||
val = _mm_round_ps(val, rounding_mode);
|
val = _mm_round_ps(val, rounding_mode);
|
||||||
val = _mm_mul_ps(val, scale);
|
val = _mm_mul_ps(val, scale);
|
||||||
val = _mm_and_ps(val, res);
|
val = _mm_and_ps(val, res);
|
||||||
|
|
||||||
|
/// Предотвратить появление отрицательных нолей определённых в стандарте IEEE-754.
|
||||||
|
__m128 check = _mm_cmpeq_ps(val, getZero());
|
||||||
|
check = _mm_min_ps(check, getOne());
|
||||||
|
factor = _mm_add_ps(factor, check);
|
||||||
|
|
||||||
|
/// Вернуть настоящие знаки всех значений.
|
||||||
|
val = _mm_mul_ps(val, factor);
|
||||||
|
|
||||||
_mm_storeu_ps(out, val);
|
_mm_storeu_ps(out, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,17 +355,35 @@ namespace DB
|
|||||||
static const __m128 one_tenth = _mm_set1_ps(0.1);
|
static const __m128 one_tenth = _mm_set1_ps(0.1);
|
||||||
return one_tenth;
|
return one_tenth;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline const __m128 & getZero()
|
||||||
|
{
|
||||||
|
static const __m128 zero = _mm_set1_ps(0.0);
|
||||||
|
return zero;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline const __m128 & getOne()
|
||||||
|
{
|
||||||
|
static const __m128 one = _mm_set1_ps(1.0);
|
||||||
|
return one;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline const __m128 & getTwo()
|
||||||
|
{
|
||||||
|
static const __m128 two = _mm_set1_ps(2.0);
|
||||||
|
return two;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template<int rounding_mode>
|
template<int rounding_mode>
|
||||||
struct FloatRoundingComputation<Float32, rounding_mode, ZeroScale>
|
struct FloatRoundingComputation<Float32, rounding_mode, ZeroScale>
|
||||||
: public BaseFloatRoundingComputation<Float32>
|
: public BaseFloatRoundingComputation<Float32>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float32 * in, const Scale & scale, Float32 * out)
|
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128 val = _mm_loadu_ps(in);
|
__m128 val = _mm_loadu_ps(in);
|
||||||
val = _mm_round_ps(val, rounding_mode);
|
val = _mm_round_ps(val, rounding_mode);
|
||||||
@ -257,13 +395,13 @@ namespace DB
|
|||||||
struct FloatRoundingComputation<Float64, rounding_mode, PositiveScale>
|
struct FloatRoundingComputation<Float64, rounding_mode, PositiveScale>
|
||||||
: public BaseFloatRoundingComputation<Float64>
|
: public BaseFloatRoundingComputation<Float64>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
Float64 fscale = static_cast<Float64>(scale);
|
Float64 fscale = static_cast<Float64>(scale);
|
||||||
mm_scale = _mm_load1_pd(&fscale);
|
mm_scale = _mm_load1_pd(&fscale);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float64 * in, const Scale & scale, Float64 * out)
|
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128d val = _mm_loadu_pd(in);
|
__m128d val = _mm_loadu_pd(in);
|
||||||
val = _mm_mul_pd(val, scale);
|
val = _mm_mul_pd(val, scale);
|
||||||
@ -277,20 +415,37 @@ namespace DB
|
|||||||
struct FloatRoundingComputation<Float64, rounding_mode, NegativeScale>
|
struct FloatRoundingComputation<Float64, rounding_mode, NegativeScale>
|
||||||
: public BaseFloatRoundingComputation<Float64>
|
: public BaseFloatRoundingComputation<Float64>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
Float64 fscale = static_cast<Float64>(scale);
|
Float64 fscale = static_cast<Float64>(scale);
|
||||||
mm_scale = _mm_load1_pd(&fscale);
|
mm_scale = _mm_load1_pd(&fscale);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float64 * in, const Scale & scale, Float64 * out)
|
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128d val = _mm_loadu_pd(in);
|
__m128d val = _mm_loadu_pd(in);
|
||||||
|
|
||||||
|
/// Превратить отрицательные значения в положительные.
|
||||||
|
__m128d factor = _mm_cmpge_pd(val, getZero());
|
||||||
|
factor = _mm_min_pd(factor, getTwo());
|
||||||
|
factor = _mm_sub_pd(factor, getOne());
|
||||||
|
val = _mm_mul_pd(val, factor);
|
||||||
|
|
||||||
|
/// Алгоритм округления.
|
||||||
val = _mm_div_pd(val, scale);
|
val = _mm_div_pd(val, scale);
|
||||||
__m128d res = _mm_cmpge_pd(val, getOneTenth());
|
__m128d res = _mm_cmpge_pd(val, getOneTenth());
|
||||||
val = _mm_round_pd(val, rounding_mode);
|
val = _mm_round_pd(val, rounding_mode);
|
||||||
val = _mm_mul_pd(val, scale);
|
val = _mm_mul_pd(val, scale);
|
||||||
val = _mm_and_pd(val, res);
|
val = _mm_and_pd(val, res);
|
||||||
|
|
||||||
|
/// Предотвратить появление отрицательных нолей определённых в стандарте IEEE-754.
|
||||||
|
__m128d check = _mm_cmpeq_pd(val, getZero());
|
||||||
|
check = _mm_min_pd(check, getOne());
|
||||||
|
factor = _mm_add_pd(factor, check);
|
||||||
|
|
||||||
|
/// Вернуть настоящие знаки всех значений.
|
||||||
|
val = _mm_mul_pd(val, factor);
|
||||||
|
|
||||||
_mm_storeu_pd(out, val);
|
_mm_storeu_pd(out, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -300,17 +455,35 @@ namespace DB
|
|||||||
static const __m128d one_tenth = _mm_set1_pd(0.1);
|
static const __m128d one_tenth = _mm_set1_pd(0.1);
|
||||||
return one_tenth;
|
return one_tenth;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline const __m128d & getZero()
|
||||||
|
{
|
||||||
|
static const __m128d zero = _mm_set1_pd(0.0);
|
||||||
|
return zero;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline const __m128d & getOne()
|
||||||
|
{
|
||||||
|
static const __m128d one = _mm_set1_pd(1.0);
|
||||||
|
return one;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline const __m128d & getTwo()
|
||||||
|
{
|
||||||
|
static const __m128d two = _mm_set1_pd(2.0);
|
||||||
|
return two;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template<int rounding_mode>
|
template<int rounding_mode>
|
||||||
struct FloatRoundingComputation<Float64, rounding_mode, ZeroScale>
|
struct FloatRoundingComputation<Float64, rounding_mode, ZeroScale>
|
||||||
: public BaseFloatRoundingComputation<Float64>
|
: public BaseFloatRoundingComputation<Float64>
|
||||||
{
|
{
|
||||||
static inline void prepareScale(size_t scale, Scale & mm_scale)
|
static inline void prepare(size_t scale, Scale & mm_scale)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void compute(const Float64 * in, const Scale & scale, Float64 * out)
|
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
|
||||||
{
|
{
|
||||||
__m128d val = _mm_loadu_pd(in);
|
__m128d val = _mm_loadu_pd(in);
|
||||||
val = _mm_round_pd(val, rounding_mode);
|
val = _mm_round_pd(val, rounding_mode);
|
||||||
@ -337,14 +510,23 @@ namespace DB
|
|||||||
public:
|
public:
|
||||||
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
|
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
|
||||||
{
|
{
|
||||||
size_t size = in.size();
|
auto divisor = Op::prepare(scale);
|
||||||
for (size_t i = 0; i < size; ++i)
|
|
||||||
out[i] = Op::compute(in[i], scale);
|
const T* begin_in = &in[0];
|
||||||
|
const T* end_in = begin_in + in.size();
|
||||||
|
|
||||||
|
T* __restrict p_out = &out[0];
|
||||||
|
for (const T* __restrict p_in = begin_in; p_in != end_in; ++p_in)
|
||||||
|
{
|
||||||
|
*p_out = Op::compute(*p_in, divisor);
|
||||||
|
++p_out;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline T apply(T val, size_t scale)
|
static inline T apply(T val, size_t scale)
|
||||||
{
|
{
|
||||||
return Op::compute(val, scale);
|
auto divisor = Op::prepare(scale);
|
||||||
|
return Op::compute(val, divisor);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -363,26 +545,49 @@ namespace DB
|
|||||||
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
|
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
|
||||||
{
|
{
|
||||||
Scale mm_scale;
|
Scale mm_scale;
|
||||||
Op::prepareScale(scale, mm_scale);
|
Op::prepare(scale, mm_scale);
|
||||||
|
|
||||||
const size_t size = in.size();
|
|
||||||
const size_t data_count = std::tuple_size<Data>();
|
const size_t data_count = std::tuple_size<Data>();
|
||||||
|
|
||||||
size_t i;
|
const T* begin_in = &in[0];
|
||||||
for (i = 0; i < (size - data_count + 1); i += data_count)
|
const T* end_in = begin_in + in.size();
|
||||||
Op::compute(reinterpret_cast<const T *>(&in[i]), mm_scale, reinterpret_cast<T *>(&out[i]));
|
|
||||||
|
|
||||||
if (i < size)
|
T* begin_out = &out[0];
|
||||||
|
const T* end_out = begin_out + out.size();
|
||||||
|
|
||||||
|
const T* limit = end_in - (data_count - 1);
|
||||||
|
|
||||||
|
const T* __restrict p_in = begin_in;
|
||||||
|
T* __restrict p_out = begin_out;
|
||||||
|
for (; p_in < limit; p_in += data_count)
|
||||||
|
{
|
||||||
|
Op::compute(p_in, mm_scale, p_out);
|
||||||
|
p_out += data_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p_in < end_in)
|
||||||
{
|
{
|
||||||
Data tmp{0};
|
Data tmp{0};
|
||||||
for (size_t j = 0; (j < data_count) && ((i + j) < size); ++j)
|
T* begin_tmp = &tmp[0];
|
||||||
tmp[j] = in[i + j];
|
const T* end_tmp = begin_tmp + data_count;
|
||||||
|
|
||||||
|
for (T* __restrict p_tmp = begin_tmp; (p_tmp != end_tmp) && (p_in != end_in); ++p_tmp)
|
||||||
|
{
|
||||||
|
*p_tmp = *p_in;
|
||||||
|
++p_in;
|
||||||
|
}
|
||||||
|
|
||||||
Data res;
|
Data res;
|
||||||
|
const T* begin_res = &res[0];
|
||||||
|
const T* end_res = begin_res + data_count;
|
||||||
|
|
||||||
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
|
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
|
||||||
|
|
||||||
for (size_t j = 0; (j < data_count) && ((i + j) < size); ++j)
|
for (const T* __restrict p_res = begin_res; (p_res != end_res) && (p_out != end_out); ++p_res)
|
||||||
out[i + j] = res[j];
|
{
|
||||||
|
*p_out = *p_res;
|
||||||
|
++p_out;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,7 +598,7 @@ namespace DB
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
Scale mm_scale;
|
Scale mm_scale;
|
||||||
Op::prepareScale(scale, mm_scale);
|
Op::prepare(scale, mm_scale);
|
||||||
|
|
||||||
Data tmp{0};
|
Data tmp{0};
|
||||||
tmp[0] = val;
|
tmp[0] = val;
|
||||||
|
@ -234,6 +234,172 @@ private:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <char not_case_lower_bound, char not_case_upper_bound>
|
||||||
|
struct LowerUpperImplVectorized
|
||||||
|
{
|
||||||
|
template <char, char, int(int)> friend class LowerUpperUTF8ImplVectorized;
|
||||||
|
|
||||||
|
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
|
||||||
|
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
res_offsets.assign(offsets);
|
||||||
|
array(data.data(), data.data() + data.size(), res_data.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vector_fixed(const ColumnString::Chars_t & data, size_t n,
|
||||||
|
ColumnString::Chars_t & res_data)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
array(data.data(), data.data() + data.size(), res_data.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void constant(const std::string & data, std::string & res_data)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
array(reinterpret_cast<const UInt8 *>(data.data()), reinterpret_cast<const UInt8 *>(data.data() + data.size()),
|
||||||
|
reinterpret_cast<UInt8 *>(&res_data[0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
|
||||||
|
{
|
||||||
|
const auto src_end_sse = src_end - (src_end - src) % 16;
|
||||||
|
|
||||||
|
const auto flip_case_mask = 1 << 5;
|
||||||
|
|
||||||
|
const auto v_not_case_lower_bound = _mm_set1_epi8(not_case_lower_bound - 1);
|
||||||
|
const auto v_not_case_upper_bound = _mm_set1_epi8(not_case_upper_bound + 1);
|
||||||
|
const auto v_flip_case_mask = _mm_set1_epi8(flip_case_mask);
|
||||||
|
|
||||||
|
for (; src < src_end_sse; src += 16, dst += 16)
|
||||||
|
{
|
||||||
|
/// load 16 sequential 8-bit characters
|
||||||
|
const auto chars = _mm_loadu_si128(reinterpret_cast<const __m128i *>(src));
|
||||||
|
|
||||||
|
/// find which 8-bit sequences belong to range [case_lower_bound, case_upper_bound]
|
||||||
|
const auto is_not_case = _mm_and_si128(_mm_cmpgt_epi8(chars, v_not_case_lower_bound),
|
||||||
|
_mm_cmplt_epi8(chars, v_not_case_upper_bound));
|
||||||
|
|
||||||
|
/// keep `flip_case_mask` only where necessary, zero out elsewhere
|
||||||
|
const auto xor_mask = _mm_and_si128(v_flip_case_mask, is_not_case);
|
||||||
|
|
||||||
|
/// flip case by applying calculated mask
|
||||||
|
const auto cased_chars = _mm_xor_si128(chars, xor_mask);
|
||||||
|
|
||||||
|
/// store result back to destination
|
||||||
|
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst), cased_chars);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; src < src_end; ++src, ++dst)
|
||||||
|
*dst = (*src >= not_case_lower_bound && *src <= not_case_upper_bound) ? *src ^ flip_case_mask : *src;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <char not_case_lower_bound, char not_case_upper_bound, int to_case(int)>
|
||||||
|
struct LowerUpperUTF8ImplVectorized
|
||||||
|
{
|
||||||
|
static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets,
|
||||||
|
ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
res_offsets.assign(offsets);
|
||||||
|
array(data.data(), data.data() + data.size(), res_data.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vector_fixed(const ColumnString::Chars_t & data, size_t n,
|
||||||
|
ColumnString::Chars_t & res_data)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
array(data.data(), data.data() + data.size(), res_data.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void constant(const std::string & data, std::string & res_data)
|
||||||
|
{
|
||||||
|
res_data.resize(data.size());
|
||||||
|
array(reinterpret_cast<const UInt8 *>(data.data()), reinterpret_cast<const UInt8 *>(data.data() + data.size()),
|
||||||
|
reinterpret_cast<UInt8 *>(&res_data[0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
|
||||||
|
{
|
||||||
|
auto is_ascii = false;
|
||||||
|
|
||||||
|
if (isCaseASCII(src, src_end, is_ascii))
|
||||||
|
std::copy(src, src_end, dst);
|
||||||
|
else if (is_ascii)
|
||||||
|
LowerUpperImplVectorized<not_case_lower_bound, not_case_upper_bound>::array(src, src_end, dst);
|
||||||
|
else
|
||||||
|
UTF8ToCase(src, src_end, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isCaseASCII(const UInt8 * src, const UInt8 * const src_end, bool & is_ascii)
|
||||||
|
{
|
||||||
|
const auto src_end_sse = src_end - (src_end - src) % 16;
|
||||||
|
|
||||||
|
const auto not_case_a_16 = _mm_set1_epi8('A' - 1);
|
||||||
|
const auto not_case_z_16 = _mm_set1_epi8('Z' + 1);
|
||||||
|
const auto zero_16 = _mm_setzero_si128();
|
||||||
|
|
||||||
|
auto is_case = true;
|
||||||
|
|
||||||
|
for (; src < src_end_sse; src += 16)
|
||||||
|
{
|
||||||
|
const auto chars = _mm_loadu_si128(reinterpret_cast<const __m128i *>(src));
|
||||||
|
|
||||||
|
/// check for ASCII and case
|
||||||
|
const auto is_not_ascii = _mm_cmplt_epi8(chars, zero_16);
|
||||||
|
const auto mask_is_not_ascii = _mm_movemask_epi8(is_not_ascii);
|
||||||
|
|
||||||
|
if (mask_is_not_ascii != 0)
|
||||||
|
{
|
||||||
|
is_ascii = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto is_not_case = _mm_and_si128(_mm_cmpgt_epi8(chars, not_case_a_16),
|
||||||
|
_mm_cmplt_epi8(chars, not_case_z_16));
|
||||||
|
const auto mask_is_not_case = _mm_movemask_epi8(is_not_case);
|
||||||
|
|
||||||
|
if (mask_is_not_case != 0)
|
||||||
|
is_case = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// handle remaining symbols
|
||||||
|
for (; src < src_end; ++src)
|
||||||
|
if (*src > '\x7f')
|
||||||
|
{
|
||||||
|
is_ascii = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (*src >= 'A' && *src <= 'Z')
|
||||||
|
is_case = false;
|
||||||
|
|
||||||
|
is_ascii = true;
|
||||||
|
return is_case;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void UTF8ToCase(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
|
||||||
|
{
|
||||||
|
static const Poco::UTF8Encoding utf8;
|
||||||
|
|
||||||
|
while (src < src_end)
|
||||||
|
{
|
||||||
|
if (const auto chars = utf8.convert(to_case(utf8.convert(src)), dst, src_end - src))
|
||||||
|
{
|
||||||
|
src += chars;
|
||||||
|
dst += chars;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
++src;
|
||||||
|
++dst;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/** Если строка содержит текст в кодировке UTF-8 - перевести его в нижний (верхний) регистр.
|
/** Если строка содержит текст в кодировке UTF-8 - перевести его в нижний (верхний) регистр.
|
||||||
* Замечание: предполагается, что после перевода символа в другой регистр,
|
* Замечание: предполагается, что после перевода символа в другой регистр,
|
||||||
@ -1424,6 +1590,11 @@ struct NameReverseUTF8 { static constexpr auto name = "reverseUTF8"; };
|
|||||||
struct NameSubstring { static constexpr auto name = "substring"; };
|
struct NameSubstring { static constexpr auto name = "substring"; };
|
||||||
struct NameSubstringUTF8 { static constexpr auto name = "substringUTF8"; };
|
struct NameSubstringUTF8 { static constexpr auto name = "substringUTF8"; };
|
||||||
|
|
||||||
|
struct NameSSELower { static constexpr auto name = "sse_lower"; };
|
||||||
|
struct NameSSEUpper { static constexpr auto name = "sse_upper"; };
|
||||||
|
struct NameSSELowerUTF8 { static constexpr auto name = "sse_lowerUTF8"; };
|
||||||
|
struct NameSSEUpperUTF8 { static constexpr auto name = "sse_upperUTF8"; };
|
||||||
|
|
||||||
typedef FunctionStringOrArrayToT<EmptyImpl<false>, NameEmpty, UInt8> FunctionEmpty;
|
typedef FunctionStringOrArrayToT<EmptyImpl<false>, NameEmpty, UInt8> FunctionEmpty;
|
||||||
typedef FunctionStringOrArrayToT<EmptyImpl<true>, NameNotEmpty, UInt8> FunctionNotEmpty;
|
typedef FunctionStringOrArrayToT<EmptyImpl<true>, NameNotEmpty, UInt8> FunctionNotEmpty;
|
||||||
typedef FunctionStringOrArrayToT<LengthImpl, NameLength, UInt64> FunctionLength;
|
typedef FunctionStringOrArrayToT<LengthImpl, NameLength, UInt64> FunctionLength;
|
||||||
@ -1437,5 +1608,10 @@ typedef FunctionStringToString<ReverseUTF8Impl, NameReverseUTF8> FunctionReve
|
|||||||
typedef FunctionStringNumNumToString<SubstringImpl, NameSubstring> FunctionSubstring;
|
typedef FunctionStringNumNumToString<SubstringImpl, NameSubstring> FunctionSubstring;
|
||||||
typedef FunctionStringNumNumToString<SubstringUTF8Impl, NameSubstringUTF8> FunctionSubstringUTF8;
|
typedef FunctionStringNumNumToString<SubstringUTF8Impl, NameSubstringUTF8> FunctionSubstringUTF8;
|
||||||
|
|
||||||
|
using FunctionSSELower = FunctionStringToString<LowerUpperImplVectorized<'A', 'Z'>, NameSSELower>;
|
||||||
|
using FunctionSSEUpper = FunctionStringToString<LowerUpperImplVectorized<'a', 'z'>, NameSSEUpper>;
|
||||||
|
using FunctionSSELowerUTF8 = FunctionStringToString<LowerUpperUTF8ImplVectorized<'A', 'Z', Poco::Unicode::toLower>, NameSSELowerUTF8>;
|
||||||
|
using FunctionSSEUpperUTF8 = FunctionStringToString<LowerUpperUTF8ImplVectorized<'a', 'z', Poco::Unicode::toUpper>, NameSSEUpperUTF8>;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <DB/Interpreters/Settings.h>
|
#include <DB/Interpreters/Settings.h>
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
#include <DB/Client/ConnectionPool.h>
|
#include <DB/Client/ConnectionPool.h>
|
||||||
#include <DB/Client/ConnectionPoolWithFailover.h>
|
#include <DB/Client/ConnectionPoolWithFailover.h>
|
||||||
#include <Poco/Net/SocketAddress.h>
|
#include <Poco/Net/SocketAddress.h>
|
||||||
@ -16,10 +15,10 @@ namespace DB
|
|||||||
class Cluster : private boost::noncopyable
|
class Cluster : private boost::noncopyable
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const String & cluster_name);
|
Cluster(const Settings & settings, const String & cluster_name);
|
||||||
|
|
||||||
/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
|
/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
|
||||||
Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, std::vector<std::vector<String>> names,
|
Cluster(const Settings & settings, std::vector<std::vector<String>> names,
|
||||||
const String & username, const String & password);
|
const String & username, const String & password);
|
||||||
|
|
||||||
/// количество узлов clickhouse сервера, расположенных локально
|
/// количество узлов clickhouse сервера, расположенных локально
|
||||||
@ -98,8 +97,7 @@ struct Clusters
|
|||||||
typedef std::map<String, Cluster> Impl;
|
typedef std::map<String, Cluster> Impl;
|
||||||
Impl impl;
|
Impl impl;
|
||||||
|
|
||||||
Clusters(const Settings & settings, const DataTypeFactory & data_type_factory,
|
Clusters(const Settings & settings, const String & config_name = "remote_servers");
|
||||||
const String & config_name = "remote_servers");
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,6 @@ public:
|
|||||||
|
|
||||||
const TableFunctionFactory & getTableFunctionFactory() const;
|
const TableFunctionFactory & getTableFunctionFactory() const;
|
||||||
const AggregateFunctionFactory & getAggregateFunctionFactory() const;
|
const AggregateFunctionFactory & getAggregateFunctionFactory() const;
|
||||||
const DataTypeFactory & getDataTypeFactory() const;
|
|
||||||
const FormatFactory & getFormatFactory() const;
|
const FormatFactory & getFormatFactory() const;
|
||||||
const Dictionaries & getDictionaries() const;
|
const Dictionaries & getDictionaries() const;
|
||||||
const ExternalDictionaries & getExternalDictionaries() const;
|
const ExternalDictionaries & getExternalDictionaries() const;
|
||||||
|
@ -22,6 +22,7 @@ struct SubqueryForSet
|
|||||||
{
|
{
|
||||||
/// Источник - получен с помощью InterpreterSelectQuery подзапроса.
|
/// Источник - получен с помощью InterpreterSelectQuery подзапроса.
|
||||||
BlockInputStreamPtr source;
|
BlockInputStreamPtr source;
|
||||||
|
Block source_sample;
|
||||||
|
|
||||||
/// Если задано - создать из результата Set.
|
/// Если задано - создать из результата Set.
|
||||||
SetPtr set;
|
SetPtr set;
|
||||||
|
@ -77,7 +77,7 @@ private:
|
|||||||
|
|
||||||
Context context;
|
Context context;
|
||||||
|
|
||||||
static void parseAlter(const ASTAlterQuery::ParameterContainer & params, const DataTypeFactory & data_type_factory,
|
static void parseAlter(const ASTAlterQuery::ParameterContainer & params,
|
||||||
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands);
|
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ struct ColumnsDescription
|
|||||||
|
|
||||||
String toString() const;
|
String toString() const;
|
||||||
|
|
||||||
static ColumnsDescription parse(const String & str, const DataTypeFactory & data_type_factory);
|
static ColumnsDescription parse(const String & str);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -126,7 +126,7 @@ private:
|
|||||||
const std::string & user, const std::string & password) {
|
const std::string & user, const std::string & password) {
|
||||||
return new ConnectionPool{
|
return new ConnectionPool{
|
||||||
1, host, port, "",
|
1, host, port, "",
|
||||||
user, password, storage.context.getDataTypeFactory(),
|
user, password,
|
||||||
storage.getName() + '_' + name};
|
storage.getName() + '_' + name};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -477,7 +477,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
|
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
|
||||||
columns.readText(file, storage.context.getDataTypeFactory());
|
columns.readText(file);
|
||||||
}
|
}
|
||||||
|
|
||||||
void checkNotBroken(bool require_part_metadata)
|
void checkNotBroken(bool require_part_metadata)
|
||||||
@ -853,13 +853,13 @@ public:
|
|||||||
const MergeTreeSettings settings;
|
const MergeTreeSettings settings;
|
||||||
|
|
||||||
const ASTPtr primary_expr_ast;
|
const ASTPtr primary_expr_ast;
|
||||||
|
Block primary_key_sample;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool require_part_metadata;
|
bool require_part_metadata;
|
||||||
|
|
||||||
ExpressionActionsPtr primary_expr;
|
ExpressionActionsPtr primary_expr;
|
||||||
SortDescription sort_descr;
|
SortDescription sort_descr;
|
||||||
Block primary_key_sample;
|
|
||||||
|
|
||||||
String full_path;
|
String full_path;
|
||||||
|
|
||||||
|
@ -28,8 +28,11 @@ public:
|
|||||||
* - Проверяет правильность засечек.
|
* - Проверяет правильность засечек.
|
||||||
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
|
* Бросает исключение, если кусок испорчен или если проверить не получилось (TODO: можно попробовать разделить эти случаи).
|
||||||
*/
|
*/
|
||||||
static void checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
|
static void checkDataPart(
|
||||||
MergeTreeData::DataPart::Checksums * out_checksums = nullptr);
|
String path,
|
||||||
|
const Settings & settings,
|
||||||
|
const Block & primary_key_sample, /// Проверять первичный ключ. Если не надо - передайте пустой Block.
|
||||||
|
MergeTreeData::DataPart::Checksums * out_checksums = nullptr);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ protected:
|
|||||||
void restore();
|
void restore();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory);
|
void restoreFromFile(const String & file_path);
|
||||||
|
|
||||||
/// Вставить блок в состояние.
|
/// Вставить блок в состояние.
|
||||||
virtual void insertBlock(const Block & block) = 0;
|
virtual void insertBlock(const Block & block) = 0;
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <DB/TableFunctions/ITableFunction.h>
|
#include <DB/TableFunctions/ITableFunction.h>
|
||||||
#include <DB/Storages/StorageDistributed.h>
|
#include <DB/Storages/StorageDistributed.h>
|
||||||
#include <DB/Parsers/ASTIdentifier.h>
|
#include <DB/Parsers/ASTIdentifier.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
#include <DB/DataStreams/RemoteBlockInputStream.h>
|
#include <DB/DataStreams/RemoteBlockInputStream.h>
|
||||||
#include <DB/Interpreters/reinterpretAsIdentifier.h>
|
#include <DB/Interpreters/reinterpretAsIdentifier.h>
|
||||||
#include <DB/Interpreters/Cluster.h>
|
#include <DB/Interpreters/Cluster.h>
|
||||||
@ -117,7 +118,7 @@ public:
|
|||||||
if (names.empty())
|
if (names.empty())
|
||||||
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
|
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
|
||||||
|
|
||||||
SharedPtr<Cluster> cluster = new Cluster(context.getSettings(), context.getDataTypeFactory(), names, username, password);
|
SharedPtr<Cluster> cluster = new Cluster(context.getSettings(), names, username, password);
|
||||||
|
|
||||||
return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context),
|
return StorageDistributed::create(getName(), chooseColumns(*cluster, remote_database, remote_table, context),
|
||||||
remote_database, remote_table, cluster, context);
|
remote_database, remote_table, cluster, context);
|
||||||
@ -140,6 +141,8 @@ private:
|
|||||||
};
|
};
|
||||||
input->readPrefix();
|
input->readPrefix();
|
||||||
|
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
Block current = input->read();
|
Block current = input->read();
|
||||||
@ -153,7 +156,7 @@ private:
|
|||||||
String column_name = (*name)[i].get<const String &>();
|
String column_name = (*name)[i].get<const String &>();
|
||||||
String data_type_name = (*type)[i].get<const String &>();
|
String data_type_name = (*type)[i].get<const String &>();
|
||||||
|
|
||||||
res.emplace_back(column_name, context.getDataTypeFactory().get(data_type_name));
|
res.emplace_back(column_name, data_type_factory.get(data_type_name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public:
|
|||||||
const String & host_, UInt16 port_, const String & default_database_,
|
const String & host_, UInt16 port_, const String & default_database_,
|
||||||
const String & user_, const String & password_, const Settings & settings_)
|
const String & user_, const String & password_, const Settings & settings_)
|
||||||
: concurrency(concurrency_), delay(delay_), queue(concurrency),
|
: concurrency(concurrency_), delay(delay_), queue(concurrency),
|
||||||
connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory),
|
connections(concurrency, host_, port_, default_database_, user_, password_),
|
||||||
settings(settings_), pool(concurrency)
|
settings(settings_), pool(concurrency)
|
||||||
{
|
{
|
||||||
std::cerr << std::fixed << std::setprecision(3);
|
std::cerr << std::fixed << std::setprecision(3);
|
||||||
@ -73,7 +73,6 @@ private:
|
|||||||
typedef ConcurrentBoundedQueue<Query> Queue;
|
typedef ConcurrentBoundedQueue<Query> Queue;
|
||||||
Queue queue;
|
Queue queue;
|
||||||
|
|
||||||
DataTypeFactory data_type_factory;
|
|
||||||
ConnectionPool connections;
|
ConnectionPool connections;
|
||||||
Settings settings;
|
Settings settings;
|
||||||
|
|
||||||
|
@ -336,7 +336,7 @@ private:
|
|||||||
<< (!user.empty() ? " as user " + user : "")
|
<< (!user.empty() ? " as user " + user : "")
|
||||||
<< "." << std::endl;
|
<< "." << std::endl;
|
||||||
|
|
||||||
connection = new Connection(host, port, default_database, user, password, context.getDataTypeFactory(), "client", compression,
|
connection = new Connection(host, port, default_database, user, password, "client", compression,
|
||||||
Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||||
Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||||
Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
|
||||||
@ -698,7 +698,7 @@ private:
|
|||||||
current_format = insert->format;
|
current_format = insert->format;
|
||||||
|
|
||||||
BlockInputStreamPtr block_input = context.getFormatFactory().getInput(
|
BlockInputStreamPtr block_input = context.getFormatFactory().getInput(
|
||||||
current_format, buf, sample, insert_format_max_block_size, context.getDataTypeFactory());
|
current_format, buf, sample, insert_format_max_block_size);
|
||||||
|
|
||||||
BlockInputStreamPtr async_block_input = new AsynchronousBlockInputStream(block_input);
|
BlockInputStreamPtr async_block_input = new AsynchronousBlockInputStream(block_input);
|
||||||
|
|
||||||
|
@ -494,7 +494,7 @@ void Connection::initBlockInput()
|
|||||||
else
|
else
|
||||||
maybe_compressed_in = in;
|
maybe_compressed_in = in;
|
||||||
|
|
||||||
block_in = new NativeBlockInputStream(*maybe_compressed_in, data_type_factory, server_revision);
|
block_in = new NativeBlockInputStream(*maybe_compressed_in, server_revision);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,10 +1,14 @@
|
|||||||
#include <DB/Core/NamesAndTypes.h>
|
#include <DB/Core/NamesAndTypes.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
void NamesAndTypesList::readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory)
|
void NamesAndTypesList::readText(ReadBuffer & buf)
|
||||||
{
|
{
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
DB::assertString("columns format version: 1\n", buf);
|
DB::assertString("columns format version: 1\n", buf);
|
||||||
size_t count;
|
size_t count;
|
||||||
DB::readText(count, buf);
|
DB::readText(count, buf);
|
||||||
@ -45,11 +49,11 @@ String NamesAndTypesList::toString() const
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
NamesAndTypesList NamesAndTypesList::parse(const String & s, const DataTypeFactory & data_type_factory)
|
NamesAndTypesList NamesAndTypesList::parse(const String & s)
|
||||||
{
|
{
|
||||||
ReadBufferFromString in(s);
|
ReadBufferFromString in(s);
|
||||||
NamesAndTypesList res;
|
NamesAndTypesList res;
|
||||||
res.readText(in, data_type_factory);
|
res.readText(in);
|
||||||
assertEOF(in);
|
assertEOF(in);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -25,10 +25,10 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
|
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
|
||||||
Block & sample, size_t max_block_size, const DataTypeFactory & data_type_factory) const
|
Block & sample, size_t max_block_size) const
|
||||||
{
|
{
|
||||||
if (name == "Native")
|
if (name == "Native")
|
||||||
return new NativeBlockInputStream(buf, data_type_factory);
|
return new NativeBlockInputStream(buf);
|
||||||
else if (name == "TabSeparated")
|
else if (name == "TabSeparated")
|
||||||
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size);
|
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size);
|
||||||
else if (name == "RowBinary")
|
else if (name == "RowBinary")
|
||||||
|
@ -65,7 +65,7 @@ Block MergeSortingBlockInputStream::readImpl()
|
|||||||
/// Сформируем сортированные потоки для слияния.
|
/// Сформируем сортированные потоки для слияния.
|
||||||
for (const auto & file : temporary_files)
|
for (const auto & file : temporary_files)
|
||||||
{
|
{
|
||||||
temporary_inputs.emplace_back(new TemporaryFileStream(file->path(), data_type_factory));
|
temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
|
||||||
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
|
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include <DB/Columns/ColumnArray.h>
|
#include <DB/Columns/ColumnArray.h>
|
||||||
#include <DB/DataTypes/DataTypeArray.h>
|
#include <DB/DataTypes/DataTypeArray.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
|
|
||||||
#include <DB/DataStreams/NativeBlockInputStream.h>
|
#include <DB/DataStreams/NativeBlockInputStream.h>
|
||||||
|
|
||||||
@ -44,6 +45,8 @@ Block NativeBlockInputStream::readImpl()
|
|||||||
{
|
{
|
||||||
Block res;
|
Block res;
|
||||||
|
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
if (istr.eof())
|
if (istr.eof())
|
||||||
return res;
|
return res;
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
#include <DB/DataTypes/DataTypeString.h>
|
#include <DB/DataTypes/DataTypeString.h>
|
||||||
#include <DB/DataTypes/DataTypeFixedString.h>
|
#include <DB/DataTypes/DataTypeFixedString.h>
|
||||||
#include <DB/DataTypes/DataTypeDateTime.h>
|
#include <DB/DataTypes/DataTypeDateTime.h>
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
|
|
||||||
#include <DB/DataStreams/NativeBlockInputStream.h>
|
#include <DB/DataStreams/NativeBlockInputStream.h>
|
||||||
#include <DB/DataStreams/NativeBlockOutputStream.h>
|
#include <DB/DataStreams/NativeBlockOutputStream.h>
|
||||||
@ -117,11 +116,9 @@ int main(int argc, char ** argv)
|
|||||||
/// читаем данные из native файла и одновременно пишем в таблицу
|
/// читаем данные из native файла и одновременно пишем в таблицу
|
||||||
if (argc == 2 && 0 == strcmp(argv[1], "write"))
|
if (argc == 2 && 0 == strcmp(argv[1], "write"))
|
||||||
{
|
{
|
||||||
DataTypeFactory factory;
|
|
||||||
|
|
||||||
ReadBufferFromFileDescriptor in1(STDIN_FILENO);
|
ReadBufferFromFileDescriptor in1(STDIN_FILENO);
|
||||||
CompressedReadBuffer in2(in1);
|
CompressedReadBuffer in2(in1);
|
||||||
NativeBlockInputStream in3(in2, factory, Revision::get());
|
NativeBlockInputStream in3(in2, Revision::get());
|
||||||
SharedPtr<IBlockOutputStream> out = table->write(0);
|
SharedPtr<IBlockOutputStream> out = table->write(0);
|
||||||
copyData(in3, *out);
|
copyData(in3, *out);
|
||||||
}
|
}
|
||||||
|
@ -148,11 +148,10 @@ int main(int argc, char ** argv)
|
|||||||
sort_columns.push_back(SortColumnDescription(3, 1));
|
sort_columns.push_back(SortColumnDescription(3, 1));
|
||||||
|
|
||||||
QueryProcessingStage::Enum stage;
|
QueryProcessingStage::Enum stage;
|
||||||
DataTypeFactory data_type_factory;
|
|
||||||
|
|
||||||
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
|
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
|
||||||
in = new PartialSortingBlockInputStream(in, sort_columns);
|
in = new PartialSortingBlockInputStream(in, sort_columns);
|
||||||
in = new MergeSortingBlockInputStream(in, sort_columns, DEFAULT_BLOCK_SIZE, 0, 0, "", data_type_factory);
|
in = new MergeSortingBlockInputStream(in, sort_columns, DEFAULT_BLOCK_SIZE, 0, 0, "");
|
||||||
//in = new LimitBlockInputStream(in, 10);
|
//in = new LimitBlockInputStream(in, 10);
|
||||||
|
|
||||||
WriteBufferFromOStream ob(std::cout);
|
WriteBufferFromOStream ob(std::cout);
|
||||||
|
@ -20,6 +20,10 @@ void registerFunctionsString(FunctionFactory & factory)
|
|||||||
factory.registerFunction<FunctionSubstring>();
|
factory.registerFunction<FunctionSubstring>();
|
||||||
factory.registerFunction<FunctionSubstringUTF8>();
|
factory.registerFunction<FunctionSubstringUTF8>();
|
||||||
factory.registerFunction<FunctionAppendTrailingCharIfAbsent>();
|
factory.registerFunction<FunctionAppendTrailingCharIfAbsent>();
|
||||||
|
factory.registerFunction<FunctionSSELower>();
|
||||||
|
factory.registerFunction<FunctionSSEUpper>();
|
||||||
|
factory.registerFunction<FunctionSSELowerUTF8>();
|
||||||
|
factory.registerFunction<FunctionSSEUpperUTF8>();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ namespace
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_factory, const String & config_name)
|
Clusters::Clusters(const Settings & settings, const String & config_name)
|
||||||
{
|
{
|
||||||
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
|
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
|
||||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||||
@ -56,11 +56,11 @@ Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_
|
|||||||
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
|
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
|
||||||
impl.emplace(std::piecewise_construct,
|
impl.emplace(std::piecewise_construct,
|
||||||
std::forward_as_tuple(*it),
|
std::forward_as_tuple(*it),
|
||||||
std::forward_as_tuple(settings, data_type_factory, config_name + "." + *it));
|
std::forward_as_tuple(settings, config_name + "." + *it));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const String & cluster_name)
|
Cluster::Cluster(const Settings & settings, const String & cluster_name)
|
||||||
{
|
{
|
||||||
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
|
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
|
||||||
Poco::Util::AbstractConfiguration::Keys config_keys;
|
Poco::Util::AbstractConfiguration::Keys config_keys;
|
||||||
@ -179,7 +179,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
|
|||||||
replicas.emplace_back(new ConnectionPool(
|
replicas.emplace_back(new ConnectionPool(
|
||||||
settings.distributed_connections_pool_size,
|
settings.distributed_connections_pool_size,
|
||||||
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
|
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
|
||||||
data_type_factory, "server", Protocol::Compression::Enable,
|
"server", Protocol::Compression::Enable,
|
||||||
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
||||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||||
@ -205,7 +205,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
|
|||||||
pools.emplace_back(new ConnectionPool(
|
pools.emplace_back(new ConnectionPool(
|
||||||
settings.distributed_connections_pool_size,
|
settings.distributed_connections_pool_size,
|
||||||
address.host_port.host().toString(), address.host_port.port(), "", address.user, address.password,
|
address.host_port.host().toString(), address.host_port.port(), "", address.user, address.password,
|
||||||
data_type_factory, "server", Protocol::Compression::Enable,
|
"server", Protocol::Compression::Enable,
|
||||||
saturate(settings.connect_timeout, settings.limits.max_execution_time),
|
saturate(settings.connect_timeout, settings.limits.max_execution_time),
|
||||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||||
@ -217,7 +217,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, std::vector<std::vector<String>> names,
|
Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
|
||||||
const String & username, const String & password)
|
const String & username, const String & password)
|
||||||
{
|
{
|
||||||
for (const auto & shard : names)
|
for (const auto & shard : names)
|
||||||
@ -238,7 +238,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
|
|||||||
replicas.emplace_back(new ConnectionPool(
|
replicas.emplace_back(new ConnectionPool(
|
||||||
settings.distributed_connections_pool_size,
|
settings.distributed_connections_pool_size,
|
||||||
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
|
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
|
||||||
data_type_factory, "server", Protocol::Compression::Enable,
|
"server", Protocol::Compression::Enable,
|
||||||
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
|
||||||
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
saturate(settings.receive_timeout, settings.limits.max_execution_time),
|
||||||
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
saturate(settings.send_timeout, settings.limits.max_execution_time)));
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
#include <DB/DataStreams/FormatFactory.h>
|
#include <DB/DataStreams/FormatFactory.h>
|
||||||
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
|
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
|
||||||
#include <DB/TableFunctions/TableFunctionFactory.h>
|
#include <DB/TableFunctions/TableFunctionFactory.h>
|
||||||
#include <DB/DataTypes/DataTypeFactory.h>
|
|
||||||
#include <DB/Storages/IStorage.h>
|
#include <DB/Storages/IStorage.h>
|
||||||
#include <DB/Storages/MarkCache.h>
|
#include <DB/Storages/MarkCache.h>
|
||||||
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
|
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
|
||||||
@ -72,7 +71,6 @@ struct ContextShared
|
|||||||
Databases databases; /// Список БД и таблиц в них.
|
Databases databases; /// Список БД и таблиц в них.
|
||||||
TableFunctionFactory table_function_factory; /// Табличные функции.
|
TableFunctionFactory table_function_factory; /// Табличные функции.
|
||||||
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
|
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
|
||||||
DataTypeFactory data_type_factory; /// Типы данных.
|
|
||||||
FormatFactory format_factory; /// Форматы.
|
FormatFactory format_factory; /// Форматы.
|
||||||
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
|
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
|
||||||
mutable SharedPtr<ExternalDictionaries> external_dictionaries;
|
mutable SharedPtr<ExternalDictionaries> external_dictionaries;
|
||||||
@ -155,7 +153,6 @@ Context::~Context() = default;
|
|||||||
|
|
||||||
const TableFunctionFactory & Context::getTableFunctionFactory() const { return shared->table_function_factory; }
|
const TableFunctionFactory & Context::getTableFunctionFactory() const { return shared->table_function_factory; }
|
||||||
const AggregateFunctionFactory & Context::getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
|
const AggregateFunctionFactory & Context::getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
|
||||||
const DataTypeFactory & Context::getDataTypeFactory() const { return shared->data_type_factory; }
|
|
||||||
const FormatFactory & Context::getFormatFactory() const { return shared->format_factory; }
|
const FormatFactory & Context::getFormatFactory() const { return shared->format_factory; }
|
||||||
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
|
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
|
||||||
Poco::Mutex & Context::getMutex() const { return shared->mutex; }
|
Poco::Mutex & Context::getMutex() const { return shared->mutex; }
|
||||||
@ -800,7 +797,7 @@ void Context::initClusters()
|
|||||||
{
|
{
|
||||||
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
|
||||||
if (!shared->clusters)
|
if (!shared->clusters)
|
||||||
shared->clusters = new Clusters(settings, shared->data_type_factory);
|
shared->clusters = new Clusters(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
Cluster & Context::getCluster(const std::string & cluster_name)
|
Cluster & Context::getCluster(const std::string & cluster_name)
|
||||||
|
@ -769,6 +769,7 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
|
|||||||
|
|
||||||
external_tables[external_table_name] = external_storage;
|
external_tables[external_table_name] = external_storage;
|
||||||
subqueries_for_sets[external_table_name].source = interpreter->execute();
|
subqueries_for_sets[external_table_name].source = interpreter->execute();
|
||||||
|
subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock();
|
||||||
subqueries_for_sets[external_table_name].table = external_storage;
|
subqueries_for_sets[external_table_name].table = external_storage;
|
||||||
|
|
||||||
/** NOTE Если было написано IN tmp_table - существующая временная (но не внешняя) таблица,
|
/** NOTE Если было написано IN tmp_table - существующая временная (но не внешняя) таблица,
|
||||||
@ -842,6 +843,7 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
|
|||||||
{
|
{
|
||||||
auto interpreter = interpretSubquery(arg, context, subquery_depth);
|
auto interpreter = interpretSubquery(arg, context, subquery_depth);
|
||||||
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
||||||
|
subquery_for_set.source_sample = interpreter->getSampleBlock();
|
||||||
|
|
||||||
/** Зачем используется LazyBlockInputStream?
|
/** Зачем используется LazyBlockInputStream?
|
||||||
*
|
*
|
||||||
@ -1591,10 +1593,12 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
|
|||||||
{
|
{
|
||||||
auto interpreter = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns);
|
auto interpreter = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns);
|
||||||
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
subquery_for_set.source = new LazyBlockInputStream([interpreter]() mutable { return interpreter->execute(); });
|
||||||
join->setSampleBlock(interpreter->getSampleBlock());
|
subquery_for_set.source_sample = interpreter->getSampleBlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO Это не нужно выставлять, когда JOIN нужен только на удалённых серверах.
|
||||||
subquery_for_set.join = join;
|
subquery_for_set.join = join;
|
||||||
|
subquery_for_set.join->setSampleBlock(subquery_for_set.source_sample);
|
||||||
}
|
}
|
||||||
|
|
||||||
addJoinAction(step.actions, false);
|
addJoinAction(step.actions, false);
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
#include <DB/IO/copyData.h>
|
#include <DB/IO/copyData.h>
|
||||||
#include <DB/IO/ReadBufferFromFile.h>
|
#include <DB/IO/ReadBufferFromFile.h>
|
||||||
#include <DB/Common/escapeForFileName.h>
|
#include <DB/Common/escapeForFileName.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
#include <DB/Parsers/formatAST.h>
|
#include <DB/Parsers/formatAST.h>
|
||||||
#include <DB/Parsers/parseQuery.h>
|
#include <DB/Parsers/parseQuery.h>
|
||||||
|
|
||||||
@ -35,7 +36,7 @@ void InterpreterAlterQuery::execute()
|
|||||||
|
|
||||||
AlterCommands alter_commands;
|
AlterCommands alter_commands;
|
||||||
PartitionCommands partition_commands;
|
PartitionCommands partition_commands;
|
||||||
parseAlter(alter.parameters, context.getDataTypeFactory(), alter_commands, partition_commands);
|
parseAlter(alter.parameters, alter_commands, partition_commands);
|
||||||
|
|
||||||
for (const PartitionCommand & command : partition_commands)
|
for (const PartitionCommand & command : partition_commands)
|
||||||
{
|
{
|
||||||
@ -71,9 +72,11 @@ void InterpreterAlterQuery::execute()
|
|||||||
}
|
}
|
||||||
|
|
||||||
void InterpreterAlterQuery::parseAlter(
|
void InterpreterAlterQuery::parseAlter(
|
||||||
const ASTAlterQuery::ParameterContainer & params_container, const DataTypeFactory & data_type_factory,
|
const ASTAlterQuery::ParameterContainer & params_container,
|
||||||
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands)
|
AlterCommands & out_alter_commands, PartitionCommands & out_partition_commands)
|
||||||
{
|
{
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
for (const auto & params : params_container)
|
for (const auto & params : params_container)
|
||||||
{
|
{
|
||||||
if (params.type == ASTAlterQuery::ADD_COLUMN)
|
if (params.type == ASTAlterQuery::ADD_COLUMN)
|
||||||
|
@ -25,9 +25,11 @@
|
|||||||
#include <DB/Interpreters/InterpreterSelectQuery.h>
|
#include <DB/Interpreters/InterpreterSelectQuery.h>
|
||||||
#include <DB/Interpreters/InterpreterCreateQuery.h>
|
#include <DB/Interpreters/InterpreterCreateQuery.h>
|
||||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||||
|
|
||||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||||
#include <DB/DataTypes/DataTypeNested.h>
|
#include <DB/DataTypes/DataTypeNested.h>
|
||||||
#include <DB/DataTypes/DataTypeFixedString.h>
|
#include <DB/DataTypes/DataTypeFixedString.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -272,6 +274,8 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
|
|||||||
ASTPtr default_expr_list{new ASTExpressionList};
|
ASTPtr default_expr_list{new ASTExpressionList};
|
||||||
default_expr_list->children.reserve(column_list_ast.children.size());
|
default_expr_list->children.reserve(column_list_ast.children.size());
|
||||||
|
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
for (auto & ast : column_list_ast.children)
|
for (auto & ast : column_list_ast.children)
|
||||||
{
|
{
|
||||||
auto & col_decl = typeid_cast<ASTColumnDeclaration &>(*ast);
|
auto & col_decl = typeid_cast<ASTColumnDeclaration &>(*ast);
|
||||||
@ -280,7 +284,7 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
|
|||||||
{
|
{
|
||||||
const auto & type_range = col_decl.type->range;
|
const auto & type_range = col_decl.type->range;
|
||||||
columns.emplace_back(col_decl.name,
|
columns.emplace_back(col_decl.name,
|
||||||
context.getDataTypeFactory().get({ type_range.first, type_range.second }));
|
data_type_factory.get({ type_range.first, type_range.second }));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
/// we're creating dummy DataTypeUInt8 in order to prevent the NullPointerException in ExpressionActions
|
/// we're creating dummy DataTypeUInt8 in order to prevent the NullPointerException in ExpressionActions
|
||||||
|
@ -114,8 +114,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
|
|||||||
|
|
||||||
BlockInputStreamPtr in{
|
BlockInputStreamPtr in{
|
||||||
context.getFormatFactory().getInput(
|
context.getFormatFactory().getInput(
|
||||||
format, istr, sample, context.getSettings().max_insert_block_size,
|
format, istr, sample, context.getSettings().max_insert_block_size)};
|
||||||
context.getDataTypeFactory())};
|
|
||||||
|
|
||||||
copyData(*in, *out);
|
copyData(*in, *out);
|
||||||
}
|
}
|
||||||
|
@ -887,7 +887,7 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
|
|||||||
/// Сливаем сортированные блоки.
|
/// Сливаем сортированные блоки.
|
||||||
stream = new MergeSortingBlockInputStream(
|
stream = new MergeSortingBlockInputStream(
|
||||||
stream, order_descr, settings.max_block_size, limit,
|
stream, order_descr, settings.max_block_size, limit,
|
||||||
settings.limits.max_bytes_before_external_sort, context.getTemporaryPath(), context.getDataTypeFactory());
|
settings.limits.max_bytes_before_external_sort, context.getTemporaryPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <DB/Parsers/formatAST.h>
|
#include <DB/Parsers/formatAST.h>
|
||||||
#include <DB/Parsers/parseQuery.h>
|
#include <DB/Parsers/parseQuery.h>
|
||||||
#include <DB/Parsers/ExpressionListParsers.h>
|
#include <DB/Parsers/ExpressionListParsers.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char ** argv)
|
int main(int argc, char ** argv)
|
||||||
@ -25,7 +26,7 @@ int main(int argc, char ** argv)
|
|||||||
{
|
{
|
||||||
NameAndTypePair col;
|
NameAndTypePair col;
|
||||||
col.name = argv[i];
|
col.name = argv[i];
|
||||||
col.type = context.getDataTypeFactory().get(argv[i + 1]);
|
col.type = DataTypeFactory::instance().get(argv[i + 1]);
|
||||||
columns.push_back(col);
|
columns.push_back(col);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,6 +375,37 @@ bool ParserPrefixUnaryOperatorExpression::parseImpl(Pos & pos, Pos end, ASTPtr &
|
|||||||
|
|
||||||
ws.ignore(pos, end);
|
ws.ignore(pos, end);
|
||||||
|
|
||||||
|
/// Позволяем парсить цепочки вида NOT NOT x. Это хак.
|
||||||
|
/** Так сделано, потому что среди унарных операторов есть только минус и NOT.
|
||||||
|
* Но для минуса цепочку из унарных операторов не требуется поддерживать.
|
||||||
|
*/
|
||||||
|
if (it[0] && 0 == strncmp(it[0], "NOT", 3))
|
||||||
|
{
|
||||||
|
/// Было ли чётное количество NOT.
|
||||||
|
bool even = false;
|
||||||
|
|
||||||
|
const char ** jt;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
for (jt = operators; *jt; jt += 2)
|
||||||
|
{
|
||||||
|
ParserString op(jt[0], true, true);
|
||||||
|
if (op.ignore(pos, end, max_parsed_pos, expected))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!*jt)
|
||||||
|
break;
|
||||||
|
|
||||||
|
even = !even;
|
||||||
|
|
||||||
|
ws.ignore(pos, end);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (even)
|
||||||
|
it = jt; /// Зануляем результат парсинга первого NOT. Получается, как будто цепочки NOT нет вообще.
|
||||||
|
}
|
||||||
|
|
||||||
ASTPtr elem;
|
ASTPtr elem;
|
||||||
if (!elem_parser->parse(pos, end, elem, max_parsed_pos, expected))
|
if (!elem_parser->parse(pos, end, elem, max_parsed_pos, expected))
|
||||||
return false;
|
return false;
|
||||||
|
@ -606,7 +606,6 @@ void TCPHandler::initBlockInput()
|
|||||||
|
|
||||||
state.block_in = new NativeBlockInputStream(
|
state.block_in = new NativeBlockInputStream(
|
||||||
*state.maybe_compressed_in,
|
*state.maybe_compressed_in,
|
||||||
query_context.getDataTypeFactory(),
|
|
||||||
client_revision);
|
client_revision);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#include <DB/Parsers/ExpressionListParsers.h>
|
#include <DB/Parsers/ExpressionListParsers.h>
|
||||||
#include <DB/IO/WriteBufferFromString.h>
|
#include <DB/IO/WriteBufferFromString.h>
|
||||||
#include <DB/Storages/ColumnsDescription.h>
|
#include <DB/Storages/ColumnsDescription.h>
|
||||||
|
#include <DB/DataTypes/DataTypeFactory.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -50,7 +51,7 @@ String ColumnsDescription<store>::toString() const
|
|||||||
|
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
ColumnsDescription<true> ColumnsDescription<true>::parse(const String & str, const DataTypeFactory & data_type_factory)
|
ColumnsDescription<true> ColumnsDescription<true>::parse(const String & str)
|
||||||
{
|
{
|
||||||
ReadBufferFromString buf{str};
|
ReadBufferFromString buf{str};
|
||||||
|
|
||||||
@ -60,6 +61,7 @@ ColumnsDescription<true> ColumnsDescription<true>::parse(const String & str, con
|
|||||||
assertString(" columns:\n", buf);
|
assertString(" columns:\n", buf);
|
||||||
|
|
||||||
ParserTernaryOperatorExpression expr_parser;
|
ParserTernaryOperatorExpression expr_parser;
|
||||||
|
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
|
||||||
|
|
||||||
ColumnsDescription<true> result{};
|
ColumnsDescription<true> result{};
|
||||||
for (size_t i = 0; i < count; ++i)
|
for (size_t i = 0; i < count; ++i)
|
||||||
|
@ -1037,7 +1037,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const St
|
|||||||
MergeTreePartChecker::Settings settings;
|
MergeTreePartChecker::Settings settings;
|
||||||
settings.setIndexGranularity(index_granularity);
|
settings.setIndexGranularity(index_granularity);
|
||||||
settings.setRequireColumnFiles(true);
|
settings.setRequireColumnFiles(true);
|
||||||
MergeTreePartChecker::checkDataPart(full_path + relative_path, settings, context.getDataTypeFactory(), &part->checksums);
|
MergeTreePartChecker::checkDataPart(full_path + relative_path, settings, primary_key_sample, &part->checksums);
|
||||||
|
|
||||||
{
|
{
|
||||||
WriteBufferFromFile out(full_path + relative_path + "/checksums.txt.tmp", 4096);
|
WriteBufferFromFile out(full_path + relative_path + "/checksums.txt.tmp", 4096);
|
||||||
|
@ -249,8 +249,11 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreePartChecker::checkDataPart(String path, const Settings & settings, const DataTypeFactory & data_type_factory,
|
void MergeTreePartChecker::checkDataPart(
|
||||||
MergeTreeData::DataPart::Checksums * out_checksums)
|
String path,
|
||||||
|
const Settings & settings,
|
||||||
|
const Block & primary_key_sample,
|
||||||
|
MergeTreeData::DataPart::Checksums * out_checksums)
|
||||||
{
|
{
|
||||||
if (!path.empty() && path.back() != '/')
|
if (!path.empty() && path.back() != '/')
|
||||||
path += "/";
|
path += "/";
|
||||||
@ -262,7 +265,7 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
|
|||||||
|
|
||||||
{
|
{
|
||||||
ReadBufferFromFile buf(path + "columns.txt");
|
ReadBufferFromFile buf(path + "columns.txt");
|
||||||
columns.readText(buf, data_type_factory);
|
columns.readText(buf);
|
||||||
assertEOF(buf);
|
assertEOF(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -275,12 +278,30 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
|
|||||||
|
|
||||||
/// Реальные чексуммы по содержимому данных. Их несоответствие checksums_txt будет говорить о битых данных.
|
/// Реальные чексуммы по содержимому данных. Их несоответствие checksums_txt будет говорить о битых данных.
|
||||||
MergeTreeData::DataPart::Checksums checksums_data;
|
MergeTreeData::DataPart::Checksums checksums_data;
|
||||||
size_t primary_idx_size;
|
|
||||||
|
|
||||||
|
size_t marks_in_primary_key = 0;
|
||||||
{
|
{
|
||||||
ReadBufferFromFile file_buf(path + "primary.idx");
|
ReadBufferFromFile file_buf(path + "primary.idx");
|
||||||
HashingReadBuffer hashing_buf(file_buf);
|
HashingReadBuffer hashing_buf(file_buf);
|
||||||
primary_idx_size = hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
|
|
||||||
|
if (primary_key_sample)
|
||||||
|
{
|
||||||
|
Field tmp_field;
|
||||||
|
size_t key_size = primary_key_sample.columns();
|
||||||
|
while (!hashing_buf.eof())
|
||||||
|
{
|
||||||
|
++marks_in_primary_key;
|
||||||
|
for (size_t j = 0; j < key_size; ++j)
|
||||||
|
primary_key_sample.unsafeGetByPosition(j).type->deserializeBinary(tmp_field, hashing_buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t primary_idx_size = hashing_buf.count();
|
||||||
|
|
||||||
checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
|
checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,9 +366,17 @@ void MergeTreePartChecker::checkDataPart(String path, const Settings & settings,
|
|||||||
if (rows == Stream::UNKNOWN)
|
if (rows == Stream::UNKNOWN)
|
||||||
throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
||||||
|
|
||||||
if (primary_idx_size % ((rows - 1) / settings.index_granularity + 1))
|
if (primary_key_sample)
|
||||||
throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
|
{
|
||||||
+ toString(rows) + "/" + toString(settings.index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
|
const size_t expected_marks = (rows - 1) / settings.index_granularity + 1;
|
||||||
|
if (expected_marks != marks_in_primary_key)
|
||||||
|
throw Exception("Size of primary key doesn't match expected number of marks."
|
||||||
|
" Number of rows in columns: " + toString(rows)
|
||||||
|
+ ", index_granularity: " + toString(settings.index_granularity)
|
||||||
|
+ ", expected number of marks: " + toString(expected_marks)
|
||||||
|
+ ", size of primary key: " + toString(marks_in_primary_key),
|
||||||
|
ErrorCodes::CORRUPTED_DATA);
|
||||||
|
}
|
||||||
|
|
||||||
if (settings.require_checksums || !checksums_txt.files.empty())
|
if (settings.require_checksums || !checksums_txt.files.empty())
|
||||||
checksums_txt.checkEqual(checksums_data, true);
|
checksums_txt.checkEqual(checksums_data, true);
|
||||||
|
@ -268,8 +268,7 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
|
|||||||
assertEOF(buf);
|
assertEOF(buf);
|
||||||
|
|
||||||
zkutil::Stat stat;
|
zkutil::Stat stat;
|
||||||
auto columns_desc = ColumnsDescription<true>::parse(
|
auto columns_desc = ColumnsDescription<true>::parse(zookeeper->get(zookeeper_path + "/columns", &stat));
|
||||||
zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
|
|
||||||
|
|
||||||
auto & columns = columns_desc.columns;
|
auto & columns = columns_desc.columns;
|
||||||
auto & materialized_columns = columns_desc.materialized;
|
auto & materialized_columns = columns_desc.materialized;
|
||||||
@ -1459,7 +1458,7 @@ void StorageReplicatedMergeTree::alterThread()
|
|||||||
|
|
||||||
zkutil::Stat stat;
|
zkutil::Stat stat;
|
||||||
const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
|
const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
|
||||||
auto columns_desc = ColumnsDescription<true>::parse(columns_str, context.getDataTypeFactory());
|
auto columns_desc = ColumnsDescription<true>::parse(columns_str);
|
||||||
|
|
||||||
auto & columns = columns_desc.columns;
|
auto & columns = columns_desc.columns;
|
||||||
auto & materialized_columns = columns_desc.materialized;
|
auto & materialized_columns = columns_desc.materialized;
|
||||||
@ -1809,7 +1808,7 @@ void StorageReplicatedMergeTree::partCheckThread()
|
|||||||
zk_checksums.checkEqual(part->checksums, true);
|
zk_checksums.checkEqual(part->checksums, true);
|
||||||
|
|
||||||
auto zk_columns = NamesAndTypesList::parse(
|
auto zk_columns = NamesAndTypesList::parse(
|
||||||
zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory());
|
zookeeper->get(replica_path + "/parts/" + part_name + "/columns"));
|
||||||
if (part->columns != zk_columns)
|
if (part->columns != zk_columns)
|
||||||
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
|
throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");
|
||||||
|
|
||||||
@ -1818,7 +1817,7 @@ void StorageReplicatedMergeTree::partCheckThread()
|
|||||||
settings.setRequireChecksums(true);
|
settings.setRequireChecksums(true);
|
||||||
settings.setRequireColumnFiles(true);
|
settings.setRequireColumnFiles(true);
|
||||||
MergeTreePartChecker::checkDataPart(
|
MergeTreePartChecker::checkDataPart(
|
||||||
data.getFullPath() + part_name, settings, context.getDataTypeFactory());
|
data.getFullPath() + part_name, settings, data.primary_key_sample);
|
||||||
|
|
||||||
LOG_INFO(log, "Part " << part_name << " looks good.");
|
LOG_INFO(log, "Part " << part_name << " looks good.");
|
||||||
}
|
}
|
||||||
|
@ -87,8 +87,6 @@ void StorageSetOrJoinBase::restore()
|
|||||||
constexpr auto file_suffix = ".bin";
|
constexpr auto file_suffix = ".bin";
|
||||||
constexpr auto file_suffix_size = strlen(file_suffix);
|
constexpr auto file_suffix_size = strlen(file_suffix);
|
||||||
|
|
||||||
DataTypeFactory data_type_factory;
|
|
||||||
|
|
||||||
Poco::DirectoryIterator dir_end;
|
Poco::DirectoryIterator dir_end;
|
||||||
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
|
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
|
||||||
{
|
{
|
||||||
@ -104,17 +102,17 @@ void StorageSetOrJoinBase::restore()
|
|||||||
if (file_num > increment)
|
if (file_num > increment)
|
||||||
increment = file_num;
|
increment = file_num;
|
||||||
|
|
||||||
restoreFromFile(dir_it->path(), data_type_factory);
|
restoreFromFile(dir_it->path());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StorageSetOrJoinBase::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory)
|
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
|
||||||
{
|
{
|
||||||
ReadBufferFromFile backup_buf(file_path);
|
ReadBufferFromFile backup_buf(file_path);
|
||||||
CompressedReadBuffer compressed_backup_buf(backup_buf);
|
CompressedReadBuffer compressed_backup_buf(backup_buf);
|
||||||
NativeBlockInputStream backup_stream(compressed_backup_buf, data_type_factory);
|
NativeBlockInputStream backup_stream(compressed_backup_buf);
|
||||||
|
|
||||||
backup_stream.readPrefix();
|
backup_stream.readPrefix();
|
||||||
while (Block block = backup_stream.read())
|
while (Block block = backup_stream.read())
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
|
|
||||||
int main(int argc, char ** argv)
|
int main(int argc, char ** argv)
|
||||||
{
|
{
|
||||||
|
using namespace DB;
|
||||||
|
|
||||||
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
|
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
|
||||||
Logger::root().setChannel(channel);
|
Logger::root().setChannel(channel);
|
||||||
Logger::root().setLevel("trace");
|
Logger::root().setLevel("trace");
|
||||||
@ -16,18 +18,18 @@ int main(int argc, char ** argv)
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
DB::MergeTreePartChecker::Settings settings;
|
MergeTreePartChecker::Settings settings;
|
||||||
if (argc == 4)
|
if (argc == 4)
|
||||||
settings.setIndexGranularity(DB::parse<size_t>(argv[3]));
|
settings.setIndexGranularity(parse<size_t>(argv[3]));
|
||||||
settings.setRequireChecksums(argv[2][0] == '1');
|
settings.setRequireChecksums(argv[2][0] == '1');
|
||||||
settings.setRequireColumnFiles(argv[2][0] == '1');
|
settings.setRequireColumnFiles(argv[2][0] == '1');
|
||||||
settings.setVerbose(true);
|
settings.setVerbose(true);
|
||||||
|
|
||||||
DB::MergeTreePartChecker::checkDataPart(argv[1], settings, DB::DataTypeFactory());
|
MergeTreePartChecker::checkDataPart(argv[1], settings, Block());
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,16 +458,16 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
0 0 0
|
0 0 0
|
||||||
0 0 0
|
0 0 0
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
@ -499,18 +499,18 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
@ -540,18 +540,18 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
@ -581,18 +581,18 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-16 -6 -16
|
-10 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-13 -6 -16
|
-13 -20 -10
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-16 84 -16
|
0 -100 0
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
@ -622,18 +622,18 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
-13 0 0
|
-13 -20 -10
|
||||||
-13 0 0
|
-13 -20 -10
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
@ -663,18 +663,18 @@
|
|||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
-13 -13 -13
|
-13 -13 -13
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
0 0 0
|
-10 -20 -10
|
||||||
-13 0 0
|
-13 -20 -10
|
||||||
-13 0 0
|
-13 -20 -10
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
0 0 0
|
0 -100 0
|
||||||
2.72 2.72 2.71
|
2.72 2.72 2.71
|
||||||
2.72 2.72 2.71
|
2.72 2.72 2.71
|
||||||
2.72 2.72 2.71
|
2.72 2.72 2.71
|
||||||
@ -745,12 +745,12 @@
|
|||||||
-3 -2 -3
|
-3 -2 -3
|
||||||
-3 -2 -3
|
-3 -2 -3
|
||||||
-3 -2 -3
|
-3 -2 -3
|
||||||
0 0 0
|
0 -10 0
|
||||||
0 0 0
|
0 -10 0
|
||||||
0 0 0
|
0 -10 0
|
||||||
0 0 0
|
0 -10 0
|
||||||
-2.7 0 0
|
-2.7 -10 0
|
||||||
-2.7 0 0
|
-2.7 -10 0
|
||||||
0 0 0
|
0 0 0
|
||||||
0 0 0
|
0 0 0
|
||||||
0 0 0
|
0 0 0
|
||||||
|
@ -0,0 +1,2 @@
|
|||||||
|
2 6 Hello
|
||||||
|
2 6 Hello
|
1
dbms/tests/queries/0_stateless/00162_global_join.sql
Normal file
1
dbms/tests/queries/0_stateless/00162_global_join.sql
Normal file
@ -0,0 +1 @@
|
|||||||
|
SELECT toFloat64(dummy + 2) AS n, j1, j2 FROM remote('127.0.0.{2,3}', system.one) GLOBAL ANY LEFT JOIN (SELECT number / 3 AS n, number AS j1, 'Hello' AS j2 FROM system.numbers LIMIT 10) USING n LIMIT 10;
|
@ -0,0 +1,20 @@
|
|||||||
|
0 0 0
|
||||||
|
1 0.5 0
|
||||||
|
2 1 0
|
||||||
|
3 1.5 0
|
||||||
|
4 2 0
|
||||||
|
5 2.5 0
|
||||||
|
6 3 0
|
||||||
|
7 3.5 0
|
||||||
|
8 4 0
|
||||||
|
9 4.5 0
|
||||||
|
0 0 0
|
||||||
|
1 0.5 0
|
||||||
|
2 1 0
|
||||||
|
3 1.5 0
|
||||||
|
4 2 0
|
||||||
|
5 2.5 0
|
||||||
|
6 3 0
|
||||||
|
7 3.5 0
|
||||||
|
8 4 0
|
||||||
|
9 4.5 0
|
@ -0,0 +1,4 @@
|
|||||||
|
SELECT number, number / 2 AS n, j1, j2 FROM remote('127.0.0.{2,3}', system.numbers) ANY LEFT JOIN (SELECT number / 3 AS n, number AS j1, 'Hello' AS j2 FROM system.numbers LIMIT 0) USING n LIMIT 10;
|
||||||
|
SELECT dummy + 2 AS number, number / 2 AS n, j1, j2 FROM remote('127.0.0.{2,3}', system.one) ANY INNER JOIN (SELECT number / 3 AS n, number AS j1, 'Hello' AS j2 FROM system.numbers LIMIT 0) USING n LIMIT 10;
|
||||||
|
SELECT number, number / 2 AS n, j1, j2 FROM remote('127.0.0.{2,3}', system.numbers) GLOBAL ANY LEFT JOIN (SELECT number / 3 AS n, number AS j1, 'Hello' AS j2 FROM system.numbers LIMIT 0) USING n LIMIT 10;
|
||||||
|
SELECT dummy + 2 AS number, number / 2 AS n, j1, j2 FROM remote('127.0.0.{2,3}', system.one) GLOBAL ANY INNER JOIN (SELECT number / 3 AS n, number AS j1, 'Hello' AS j2 FROM system.numbers LIMIT 0) USING n LIMIT 10;
|
5
dbms/tests/queries/0_stateless/00164_not_chain.reference
Normal file
5
dbms/tests/queries/0_stateless/00164_not_chain.reference
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
0
|
||||||
|
1
|
||||||
|
0
|
||||||
|
1
|
||||||
|
0
|
5
dbms/tests/queries/0_stateless/00164_not_chain.sql
Normal file
5
dbms/tests/queries/0_stateless/00164_not_chain.sql
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
SELECT NOT 1;
|
||||||
|
SELECT NOT NOT 1;
|
||||||
|
SELECT NOT NOT NOT 1;
|
||||||
|
SELECT NOT NOT NOT NOT 1 = 1;
|
||||||
|
SELECT NOT NOT not NoT NOT 1 = 1;
|
@ -2,21 +2,26 @@
|
|||||||
#include <statdaemons/Exception.h>
|
#include <statdaemons/Exception.h>
|
||||||
#include <zkutil/Types.h>
|
#include <zkutil/Types.h>
|
||||||
#include <DB/Common/ProfileEvents.h>
|
#include <DB/Common/ProfileEvents.h>
|
||||||
|
#include <DB/Core/ErrorCodes.h>
|
||||||
|
|
||||||
namespace zkutil
|
namespace zkutil
|
||||||
{
|
{
|
||||||
|
|
||||||
class KeeperException : public DB::Exception
|
class KeeperException : public DB::Exception
|
||||||
{
|
{
|
||||||
|
private:
|
||||||
|
/// delegate constructor, used to minimize repetition; last parameter used for overload resolution
|
||||||
|
KeeperException(const std::string & msg, const int32_t code, int)
|
||||||
|
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code) { incrementEventCounter(); }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
KeeperException(const std::string & msg) : DB::Exception(msg), code(ZOK) { incrementEventCounter(); }
|
KeeperException(const std::string & msg) : KeeperException(msg, ZOK, 0) {}
|
||||||
KeeperException(const std::string & msg, int32_t code_)
|
KeeperException(const std::string & msg, const int32_t code)
|
||||||
: DB::Exception(msg + " (" + zerror(code_) + ")"), code(code_) { incrementEventCounter(); }
|
: KeeperException(msg + " (" + zerror(code) + ")", code, 0) {}
|
||||||
KeeperException(int32_t code_)
|
KeeperException(const int32_t code) : KeeperException(zerror(code), code, 0) {}
|
||||||
: DB::Exception(zerror(code_)), code(code_) { incrementEventCounter(); }
|
KeeperException(const int32_t code, const std::string & path)
|
||||||
KeeperException(int32_t code_, const std::string & path_)
|
: KeeperException(std::string{zerror(code)} + ", path: " + path, code, 0) {}
|
||||||
: DB::Exception(std::string(zerror(code_)) + ", path: " + path_), code(code_) { incrementEventCounter(); }
|
|
||||||
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
|
KeeperException(const KeeperException & exc) : DB::Exception(exc), code(exc.code) { incrementEventCounter(); }
|
||||||
|
|
||||||
const char * name() const throw() { return "zkutil::KeeperException"; }
|
const char * name() const throw() { return "zkutil::KeeperException"; }
|
||||||
@ -26,12 +31,13 @@ public:
|
|||||||
/// при этих ошибках надо переинициализировать сессию с zookeeper
|
/// при этих ошибках надо переинициализировать сессию с zookeeper
|
||||||
bool isUnrecoverable() const
|
bool isUnrecoverable() const
|
||||||
{
|
{
|
||||||
return code == ZINVALIDSTATE || code == ZSESSIONEXPIRED;
|
return code == ZINVALIDSTATE || code == ZSESSIONEXPIRED || code == ZSESSIONMOVED;
|
||||||
}
|
}
|
||||||
int32_t code;
|
|
||||||
|
const int32_t code;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void incrementEventCounter()
|
static void incrementEventCounter()
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
|
ProfileEvents::increment(ProfileEvents::ZooKeeperExceptions);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user