Merge branch 'master' into martijn-asof-nomap

This commit is contained in:
Martijn Bakker 2019-03-30 02:02:48 +00:00
commit 7c5febb6aa
70 changed files with 807 additions and 260 deletions

View File

@ -74,6 +74,26 @@
<!-- Quota for user. -->
<quota>default</quota>
<!-- For testing the table filters -->
<databases>
<test>
<!-- Simple expression filter -->
<filtered_table1>
<filter>a = 1</filter>
</filtered_table1>
<!-- Complex expression filter -->
<filtered_table2>
<filter>a + b &lt; 1 or c - d &gt; 5</filter>
</filtered_table2>
<!-- Filter with ALIAS column -->
<filtered_table3>
<filter>c = 1</filter>
</filtered_table3>
</test>
</databases>
</default>
<!-- Example of user with readonly access. -->

View File

@ -824,7 +824,7 @@ ColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const
size_t prev_src_string_offset_local = prev_src_string_offset;
for (size_t k = 0; k < value_size; ++k)
{
/// Size of one row.
/// Size of one string.
size_t chars_size = src_string_offsets[k + prev_src_offset] - prev_src_string_offset_local;
current_res_string_offset += chars_size;
@ -835,7 +835,7 @@ ColumnPtr ColumnArray::replicateString(const Offsets & replicate_offsets) const
if (sum_chars_size)
{
/// Copies the characters of the array of rows.
/// Copies the characters of the array of strings.
res_chars.resize(res_chars.size() + sum_chars_size);
memcpySmallAllowReadWriteOverflow15(
&res_chars[res_chars.size() - sum_chars_size], &src_chars[prev_src_string_offset], sum_chars_size);

View File

@ -129,19 +129,6 @@ std::vector<MutableColumnPtr> ColumnFunction::scatter(IColumn::ColumnIndex num_c
return columns;
}
void ColumnFunction::insertDefault()
{
for (auto & column : captured_columns)
column.column->assumeMutableRef().insertDefault();
++size_;
}
void ColumnFunction::popBack(size_t n)
{
for (auto & column : captured_columns)
column.column->assumeMutableRef().popBack(n);
size_ -= n;
}
size_t ColumnFunction::byteSize() const
{
size_t total_size = 0;

View File

@ -34,8 +34,7 @@ public:
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void insertDefault() override;
void popBack(size_t n) override;
std::vector<MutableColumnPtr> scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector & selector) const override;
@ -64,7 +63,12 @@ public:
void insert(const Field &) override
{
throw Exception("Cannot get insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Cannot insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertDefault() override
{
throw Exception("Cannot insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void insertRangeFrom(const IColumn &, size_t, size_t) override
@ -92,6 +96,11 @@ public:
throw Exception("updateHashWithValue is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void popBack(size_t) override
{
throw Exception("popBack is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
int compareAt(size_t, size_t, const IColumn &, int) const override
{
throw Exception("compareAt is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);

View File

@ -306,21 +306,11 @@ void ColumnLowCardinality::setSharedDictionary(const ColumnPtr & column_unique)
dictionary.setShared(column_unique);
}
ColumnLowCardinality::MutablePtr ColumnLowCardinality::compact()
{
auto positions = idx.getPositions();
/// Create column with new indexes and old dictionary.
auto column = ColumnLowCardinality::create(getDictionary().assumeMutable(), (*std::move(positions)).mutate());
/// Will create new dictionary.
column->compactInplace();
return column;
}
ColumnLowCardinality::MutablePtr ColumnLowCardinality::cutAndCompact(size_t start, size_t length) const
{
auto sub_positions = (*idx.getPositions()->cut(start, length)).mutate();
/// Create column with new indexes and old dictionary.
/// Dictionary is shared, but will be recreated after compactInplace call.
auto column = ColumnLowCardinality::create(getDictionary().assumeMutable(), std::move(sub_positions));
/// Will create new dictionary.
column->compactInplace();

View File

@ -177,10 +177,8 @@ public:
void setSharedDictionary(const ColumnPtr & column_unique);
bool isSharedDictionary() const { return dictionary.isShared(); }
/// Create column new dictionary with only keys that are mentioned in index.
MutablePtr compact();
/// Cut + compact.
/// Create column with new dictionary from column part.
/// Dictionary will have only keys that are mentioned in index.
MutablePtr cutAndCompact(size_t start, size_t length) const;
struct DictionaryEncodedColumn

View File

@ -216,7 +216,7 @@ protected:
chameleon_ptr(std::initializer_list<U> && arg) : value(std::forward<std::initializer_list<U>>(arg)) {}
const T * get() const { return value.get(); }
T * get() { return value->assumeMutable().get(); }
T * get() { return &value->assumeMutableRef(); }
const T * operator->() const { return get(); }
T * operator->() { return get(); }

View File

@ -308,6 +308,8 @@ struct Settings
M(SettingBool, allow_experimental_data_skipping_indices, false, "If it is set to true, data skipping indices can be used in CREATE TABLE/ALTER TABLE queries.") \
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
\
M(SettingBool, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

View File

@ -1,11 +0,0 @@
#include <Interpreters/ProcessList.h>
#include <DataStreams/BlockIO.h>
namespace DB
{
BlockIO::~BlockIO() = default;
BlockIO::BlockIO() = default;
BlockIO::BlockIO(const BlockIO &) = default;
}

View File

@ -11,15 +11,19 @@ class ProcessListEntry;
struct BlockIO
{
BlockIO() = default;
BlockIO(const BlockIO &) = default;
~BlockIO() = default;
BlockOutputStreamPtr out;
BlockInputStreamPtr in;
/** process_list_entry should be destroyed after in and after out,
* since in and out contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
* which could be used before destroying of in and out.
*/
std::shared_ptr<ProcessListEntry> process_list_entry;
BlockInputStreamPtr in;
BlockOutputStreamPtr out;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
@ -37,17 +41,11 @@ struct BlockIO
exception_callback();
}
/// We provide the correct order of destruction.
void reset()
BlockIO & operator= (const BlockIO & rhs)
{
out.reset();
in.reset();
process_list_entry.reset();
}
BlockIO & operator= (const BlockIO & rhs)
{
reset();
process_list_entry = rhs.process_list_entry;
in = rhs.in;
@ -58,10 +56,6 @@ struct BlockIO
return *this;
}
~BlockIO();
BlockIO();
BlockIO(const BlockIO &);
};
}

View File

@ -18,7 +18,7 @@ namespace ErrorCodes
InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context)
const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context)
{
const auto * ast_insert_query = ast->as<ASTInsertQuery>();
@ -28,8 +28,6 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
String format = ast_insert_query->format;
if (format.empty())
format = "Values";
if (ast_insert_query->settings_ast)
InterpreterSetQuery(ast_insert_query->settings_ast, context).executeForCurrentContext();
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.

View File

@ -19,7 +19,7 @@ class Context;
class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, Context & context);
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context);
Block readImpl() override { return res_stream->read(); }
void readPrefixImpl() override { return res_stream->readPrefix(); }

View File

@ -195,6 +195,12 @@ struct DeserializeStateLowCardinality : public IDataType::DeserializeBinaryBulkS
ColumnPtr null_map;
UInt64 num_pending_rows = 0;
/// If dictionary should be updated.
/// Can happen is some granules was skipped while reading from MergeTree.
/// We should store this flag in State because
/// in case of long block of empty arrays we may not need read dictionary at first reading.
bool need_update_dictionary = false;
explicit DeserializeStateLowCardinality(UInt64 key_version) : key_version(key_version) {}
};
@ -686,7 +692,12 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
if (!settings.continuous_reading)
low_cardinality_state->num_pending_rows = 0;
bool first_dictionary = true;
if (!settings.continuous_reading)
{
/// Remember in state that some granules were skipped and we need to update dictionary.
low_cardinality_state->need_update_dictionary = true;
}
while (limit)
{
if (low_cardinality_state->num_pending_rows == 0)
@ -699,10 +710,12 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
index_type.deserialize(*indexes_stream);
if (index_type.need_global_dictionary && (!global_dictionary || index_type.need_update_dictionary || (first_dictionary && !settings.continuous_reading)))
bool need_update_dictionary =
!global_dictionary || index_type.need_update_dictionary || low_cardinality_state->need_update_dictionary;
if (index_type.need_global_dictionary && need_update_dictionary)
{
readDictionary();
first_dictionary = false;
low_cardinality_state->need_update_dictionary = false;
}
if (low_cardinality_state->index_type.has_additional_keys)

View File

@ -32,7 +32,7 @@ HTTPDictionarySource::HTTPDictionarySource(
, format{config.getString(config_prefix + ".format")}
, sample_block{sample_block}
, context(context)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
}
@ -45,7 +45,7 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
, format{other.format}
, sample_block{other.sample_block}
, context(other.context)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
}

View File

@ -84,7 +84,7 @@ XDBCDictionarySource::XDBCDictionarySource(
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
, bridge_helper{bridge_}
, timeouts{ConnectionTimeouts::getHTTPTimeouts(context_.getSettingsRef())}
, timeouts{ConnectionTimeouts::getHTTPTimeouts(context_)}
, global_context(context_)
{
bridge_url = bridge_helper->getMainURI();

View File

@ -3,6 +3,9 @@
#include <Poco/Timespan.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
@ -12,6 +15,7 @@ struct ConnectionTimeouts
Poco::Timespan send_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan tcp_keep_alive_timeout;
Poco::Timespan http_keep_alive_timeout;
ConnectionTimeouts() = default;
@ -21,7 +25,8 @@ struct ConnectionTimeouts
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(0)
tcp_keep_alive_timeout(0),
http_keep_alive_timeout(0)
{
}
@ -32,9 +37,23 @@ struct ConnectionTimeouts
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(0)
{
}
ConnectionTimeouts(const Poco::Timespan & connection_timeout_,
const Poco::Timespan & send_timeout_,
const Poco::Timespan & receive_timeout_,
const Poco::Timespan & tcp_keep_alive_timeout_,
const Poco::Timespan & http_keep_alive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_)
{
}
static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit)
{
@ -49,7 +68,8 @@ struct ConnectionTimeouts
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit));
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit));
}
/// Timeouts for the case when we have just single attempt to connect.
@ -64,9 +84,12 @@ struct ConnectionTimeouts
return ConnectionTimeouts(settings.connect_timeout_with_failover_ms, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings)
static ConnectionTimeouts getHTTPTimeouts(const Context & context)
{
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout);
const auto & settings = context.getSettingsRef();
const auto & config = context.getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, http_keep_alive_timeout);
}
};

View File

@ -45,13 +45,14 @@ namespace ErrorCodes
namespace
{
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
#if defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION >= 0x02000000
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session.setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout}));
#endif
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
bool isHTTPS(const Poco::URI & uri)
@ -99,7 +100,6 @@ namespace
const UInt16 port;
bool https;
using Base = PoolBase<Poco::Net::HTTPClientSession>;
ObjectPtr allocObject() override
{
return makeHTTPSessionImpl(host, port, https, true);
@ -140,7 +140,10 @@ namespace
HTTPSessionPool() = default;
public:
Entry getSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t max_connections_per_endpoint)
Entry getSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint)
{
std::unique_lock lock(mutex);
const std::string & host = uri.getHost();
@ -156,6 +159,7 @@ namespace
auto session = pool_ptr->second->get(retry_timeout);
setTimeouts(*session, timeouts);
return session;
}
};

View File

@ -26,7 +26,7 @@
#include <Core/Settings.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/RuntimeComponentsFactory.h>
#include <Interpreters/ISecurityManager.h>
#include <Interpreters/IUsersManager.h>
#include <Interpreters/Quota.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionaries.h>
@ -129,7 +129,7 @@ struct ContextShared
mutable std::optional<ExternalModels> external_models;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
std::unique_ptr<ISecurityManager> security_manager; /// Known users.
std::unique_ptr<IUsersManager> users_manager; /// Known users.
Quotas quotas; /// Known quotas for resource use.
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@ -291,7 +291,7 @@ struct ContextShared
private:
void initialize()
{
security_manager = runtime_components_factory->createSecurityManager();
users_manager = runtime_components_factory->createUsersManager();
}
};
@ -571,7 +571,7 @@ void Context::setUsersConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->users_config = config;
shared->security_manager->loadFromConfig(*shared->users_config);
shared->users_manager->loadFromConfig(*shared->users_config);
shared->quotas.loadFromConfig(*shared->users_config);
}
@ -581,11 +581,39 @@ ConfigurationPtr Context::getUsersConfig()
return shared->users_config;
}
bool Context::hasUserProperty(const String & database, const String & table, const String & name) const
{
auto lock = getLock();
// No user - no properties.
if (client_info.current_user.empty())
return false;
const auto & props = shared->users_manager->getUser(client_info.current_user)->table_props;
auto db = props.find(database);
if (db == props.end())
return false;
auto table_props = db->second.find(table);
if (table_props == db->second.end())
return false;
return !!table_props->second.count(name);
}
const String & Context::getUserProperty(const String & database, const String & table, const String & name) const
{
auto lock = getLock();
const auto & props = shared->users_manager->getUser(client_info.current_user)->table_props;
return props.at(database).at(table).at(name);
}
void Context::calculateUserSettings()
{
auto lock = getLock();
String profile = shared->security_manager->getUser(client_info.current_user)->profile;
String profile = shared->users_manager->getUser(client_info.current_user)->profile;
/// 1) Set default settings (hardcoded values)
/// NOTE: we ignore global_context settings (from which it is usually copied)
@ -606,7 +634,7 @@ void Context::setUser(const String & name, const String & password, const Poco::
{
auto lock = getLock();
auto user_props = shared->security_manager->authorizeAndGetUser(name, password, address.host());
auto user_props = shared->users_manager->authorizeAndGetUser(name, password, address.host());
client_info.current_user = name;
client_info.current_address = address;
@ -644,7 +672,7 @@ bool Context::hasDatabaseAccessRights(const String & database_name) const
{
auto lock = getLock();
return client_info.current_user.empty() || (database_name == "system") ||
shared->security_manager->hasAccessToDatabase(client_info.current_user, database_name);
shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name);
}
void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const
@ -655,7 +683,7 @@ void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) c
/// All users have access to the database system.
return;
}
if (!shared->security_manager->hasAccessToDatabase(client_info.current_user, database_name))
if (!shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name))
throw Exception("Access denied to database " + database_name + " for user " + client_info.current_user , ErrorCodes::DATABASE_ACCESS_DENIED);
}

View File

@ -188,6 +188,10 @@ public:
void setUsersConfig(const ConfigurationPtr & config);
ConfigurationPtr getUsersConfig();
// User property is a key-value pair from the configuration entry: users.<username>.databases.<db_name>.<table_name>.<key_name>
bool hasUserProperty(const String & database, const String & table, const String & name) const;
const String & getUserProperty(const String & database, const String & table, const String & name) const;
/// Must be called before getClientInfo.
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key);
/// Compute and set actual user settings, client_info.current_user should be set

View File

@ -2,7 +2,7 @@
#include <Dictionaries/Embedded/IGeoDictionariesLoader.h>
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <Interpreters/ISecurityManager.h>
#include <Interpreters/IUsersManager.h>
#include <memory>
@ -16,7 +16,9 @@ namespace DB
class IRuntimeComponentsFactory
{
public:
virtual std::unique_ptr<ISecurityManager> createSecurityManager() = 0;
virtual ~IRuntimeComponentsFactory() = default;
virtual std::unique_ptr<IUsersManager> createUsersManager() = 0;
virtual std::unique_ptr<IGeoDictionariesLoader> createGeoDictionariesLoader() = 0;
@ -24,8 +26,6 @@ public:
virtual std::unique_ptr<IExternalLoaderConfigRepository> createExternalDictionariesConfigRepository() = 0;
virtual std::unique_ptr<IExternalLoaderConfigRepository> createExternalModelsConfigRepository() = 0;
virtual ~IRuntimeComponentsFactory() {}
};
}

View File

@ -5,16 +5,18 @@
namespace DB
{
/** Duties of security manager:
/** Duties of users manager:
* 1) Authenticate users
* 2) Provide user settings (profile, quota, ACLs)
* 3) Grant access to databases
*/
class ISecurityManager
class IUsersManager
{
public:
using UserPtr = std::shared_ptr<const User>;
virtual ~IUsersManager() = default;
virtual void loadFromConfig(const Poco::Util::AbstractConfiguration & config) = 0;
/// Find user and make authorize checks
@ -28,8 +30,6 @@ public:
/// Check if the user has access to the database.
virtual bool hasAccessToDatabase(const String & user_name, const String & database_name) const = 0;
virtual ~ISecurityManager() {}
};
}

View File

@ -36,7 +36,7 @@ namespace ErrorCodes
InterpreterInsertQuery::InterpreterInsertQuery(
const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_)
const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_)
: query_ptr(query_ptr_), context(context_), allow_materialized(allow_materialized_)
{
}

View File

@ -15,7 +15,7 @@ namespace DB
class InterpreterInsertQuery : public IInterpreter
{
public:
InterpreterInsertQuery(const ASTPtr & query_ptr_, Context & context_, bool allow_materialized_ = false);
InterpreterInsertQuery(const ASTPtr & query_ptr_, const Context & context_, bool allow_materialized_ = false);
/** Prepare a request for execution. Return block streams
* - the stream into which you can write data to execute the query, if INSERT;
@ -32,7 +32,7 @@ private:
void checkAccess(const ASTInsertQuery & query);
ASTPtr query_ptr;
Context & context;
const Context & context;
bool allow_materialized;
};

View File

@ -23,12 +23,13 @@
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -75,6 +76,60 @@ namespace ErrorCodes
extern const int INVALID_LIMIT_EXPRESSION;
}
namespace
{
/// Assumes `storage` is set and the table filter is not empty.
String generateFilterActions(ExpressionActionsPtr & actions, const StoragePtr & storage, const Context & context, const Names & prerequisite_columns = {})
{
const auto & db_name = storage->getDatabaseName();
const auto & table_name = storage->getTableName();
const auto & filter_str = context.getUserProperty(db_name, table_name, "filter");
/// TODO: implement some AST builders for this kind of stuff
ASTPtr query_ast = std::make_shared<ASTSelectQuery>();
auto * select_ast = query_ast->as<ASTSelectQuery>();
auto expr_list = std::make_shared<ASTExpressionList>();
select_ast->children.push_back(expr_list);
select_ast->select_expression_list = select_ast->children.back();
auto parseExpression = [] (const String & expr)
{
ParserExpression expr_parser;
return parseQuery(expr_parser, expr, 0);
};
// The first column is our filter expression.
expr_list->children.push_back(parseExpression(filter_str));
/// Keep columns that are required after the filter actions.
for (const auto & column_str : prerequisite_columns)
expr_list->children.push_back(parseExpression(column_str));
auto tables = std::make_shared<ASTTablesInSelectQuery>();
auto tables_elem = std::make_shared<ASTTablesInSelectQueryElement>();
auto table_expr = std::make_shared<ASTTableExpression>();
select_ast->children.push_back(tables);
select_ast->tables = select_ast->children.back();
tables->children.push_back(tables_elem);
tables_elem->table_expression = table_expr;
tables_elem->children.push_back(table_expr);
table_expr->database_and_table_name = createTableIdentifier(db_name, table_name);
table_expr->children.push_back(table_expr->database_and_table_name);
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = SyntaxAnalyzer(context).analyze(query_ast, storage->getColumns().getAllPhysical());
ExpressionAnalyzer analyzer(query_ast, syntax_result, context);
ExpressionActionsChain new_chain(context);
analyzer.appendSelect(new_chain, false);
actions = new_chain.getLastActions();
return expr_list->children.at(0)->getColumnName();
}
} // namespace
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const Context & context_,
@ -302,7 +357,8 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
return pipeline.streams;
}
InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run)
InterpreterSelectQuery::AnalysisResult
InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info)
{
AnalysisResult res;
@ -318,6 +374,7 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
* throw out unnecessary columns based on the entire query. In unnecessary parts of the query, we will not execute subqueries.
*/
bool has_filter = false;
bool has_prewhere = false;
bool has_where = false;
size_t where_step_num;
@ -350,10 +407,15 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
res.columns_to_remove_after_prewhere = std::move(columns_to_remove);
}
else if (has_filter)
{
/// Can't have prewhere and filter set simultaneously
res.filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
}
if (has_where)
res.remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
has_prewhere = has_where = false;
has_filter = has_prewhere = has_where = false;
chain.clear();
};
@ -378,6 +440,26 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
columns_for_final.begin(), columns_for_final.end());
}
if (storage && context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
{
has_filter = true;
/// XXX: aggregated copy-paste from ExpressionAnalyzer::appendSmth()
if (chain.steps.empty())
{
chain.steps.emplace_back(std::make_shared<ExpressionActions>(source_columns, context));
}
ExpressionActionsChain::Step & step = chain.steps.back();
// FIXME: assert(filter_info);
res.filter_info = filter_info;
step.actions = filter_info->actions;
step.required_output.push_back(res.filter_info->column_name);
step.can_remove_required_output = {true};
chain.addStep();
}
if (query_analyzer->appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
{
has_prewhere = true;
@ -445,6 +527,8 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
}
/// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
if (res.filter_info)
res.filter_info->actions->prependProjectInput();
if (res.has_where)
res.before_where->prependProjectInput();
if (res.has_having)
@ -491,7 +575,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
/// PREWHERE optimization
if (storage)
/// Turn off, if the table filter is applied.
if (storage && !context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
{
if (!dry_run)
from_stage = storage->getQueryProcessingStage(context);
@ -517,11 +602,23 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
}
AnalysisResult expressions;
FilterInfoPtr filter_info;
/// We need proper `source_header` for `NullBlockInputStream` in dry-run.
if (storage && context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
{
filter_info = std::make_shared<FilterInfo>();
filter_info->column_name = generateFilterActions(filter_info->actions, storage, context, required_columns);
source_header = storage->getSampleBlockForColumns(filter_info->actions->getRequiredColumns());
}
if (dry_run)
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true);
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true, filter_info);
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
if (expressions.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
@ -533,12 +630,15 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
if (prepared_input)
pipeline.streams.push_back(prepared_input);
expressions = analyzeExpressions(from_stage, false);
expressions = analyzeExpressions(from_stage, false, filter_info);
if (from_stage == QueryProcessingStage::WithMergeableState &&
options.to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
@ -563,6 +663,18 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
if (expressions.first_stage)
{
if (expressions.filter_info)
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(
stream,
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
});
}
if (expressions.hasJoin())
{
const auto & join = query.join()->table_join->as<ASTTableJoin &>();
@ -788,11 +900,26 @@ void InterpreterSelectQuery::executeFetchColumns(
/// Actions to calculate ALIAS if required.
ExpressionActionsPtr alias_actions;
/// Are ALIAS columns required for query execution?
auto alias_columns_required = false;
if (storage)
{
/// Append columns from the table filter to required
if (context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
{
auto initial_required_columns = required_columns;
ExpressionActionsPtr actions;
generateFilterActions(actions, storage, context, initial_required_columns);
auto required_columns_from_filter = actions->getRequiredColumns();
for (const auto & column : required_columns_from_filter)
{
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
}
}
/// Detect, if ALIAS columns are required for query execution
auto alias_columns_required = false;
const ColumnsDescription & storage_columns = storage->getColumns();
for (const auto & column_name : required_columns)
{
@ -804,25 +931,33 @@ void InterpreterSelectQuery::executeFetchColumns(
}
}
/// There are multiple sources of required columns:
/// - raw required columns,
/// - columns deduced from ALIAS columns,
/// - raw required columns from PREWHERE,
/// - columns deduced from ALIAS columns from PREWHERE.
/// PREWHERE is a special case, since we need to resolve it and pass directly to `IStorage::read()`
/// before any other executions.
if (alias_columns_required)
{
/// Columns required for prewhere actions.
NameSet required_prewhere_columns;
/// Columns required for prewhere actions which are aliases in storage.
NameSet required_prewhere_aliases;
Block prewhere_actions_result;
NameSet required_columns_from_prewhere; /// Set of all (including ALIAS) required columns for PREWHERE
NameSet required_aliases_from_prewhere; /// Set of ALIAS required columns for PREWHERE
if (prewhere_info)
{
/// Get some columns directly from PREWHERE expression actions
auto prewhere_required_columns = prewhere_info->prewhere_actions->getRequiredColumns();
required_prewhere_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
required_columns_from_prewhere.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
/// We will create an expression to return all the requested columns, with the calculation of the required ALIAS columns.
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
/// Separate expression for columns used in prewhere.
ASTPtr required_prewhere_columns_expr_list = std::make_shared<ASTExpressionList>();
/// Expression, that contains all raw required columns
ASTPtr required_columns_all_expr = std::make_shared<ASTExpressionList>();
/// Expression, that contains raw required columns for PREWHERE
ASTPtr required_columns_from_prewhere_expr = std::make_shared<ASTExpressionList>();
/// Sort out already known required columns between expressions,
/// also populate `required_aliases_from_prewhere`.
for (const auto & column : required_columns)
{
ASTPtr column_expr;
@ -833,36 +968,47 @@ void InterpreterSelectQuery::executeFetchColumns(
else
column_expr = std::make_shared<ASTIdentifier>(column);
if (required_prewhere_columns.count(column))
if (required_columns_from_prewhere.count(column))
{
required_prewhere_columns_expr_list->children.emplace_back(std::move(column_expr));
required_columns_from_prewhere_expr->children.emplace_back(std::move(column_expr));
if (is_alias)
required_prewhere_aliases.insert(column);
required_aliases_from_prewhere.insert(column);
}
else
required_columns_expr_list->children.emplace_back(std::move(column_expr));
required_columns_all_expr->children.emplace_back(std::move(column_expr));
}
/// Columns which we will get after prewhere execution.
NamesAndTypesList additional_source_columns;
/// Add columns which will be added by prewhere (otherwise we will remove them in project action).
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
for (const auto & column : prewhere_actions_result)
/// Columns, which we will get after prewhere and filter executions.
NamesAndTypesList required_columns_after_prewhere;
NameSet required_columns_after_prewhere_set;
/// Collect required columns from prewhere expression actions.
if (prewhere_info)
{
if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
continue;
NameSet columns_to_remove(columns_to_remove_after_prewhere.begin(), columns_to_remove_after_prewhere.end());
Block prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
if (columns_to_remove.count(column.name))
continue;
/// Populate required columns with the columns, added by PREWHERE actions and not removed afterwards.
/// XXX: looks hacky that we already know which columns after PREWHERE we won't need for sure.
for (const auto & column : prewhere_actions_result)
{
if (prewhere_info->remove_prewhere_column && column.name == prewhere_info->prewhere_column_name)
continue;
required_columns_expr_list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
additional_source_columns.emplace_back(column.name, column.type);
if (columns_to_remove.count(column.name))
continue;
required_columns_all_expr->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
required_columns_after_prewhere.emplace_back(column.name, column.type);
}
required_columns_after_prewhere_set
= ext::map<NameSet>(required_columns_after_prewhere, [](const auto & it) { return it.name; });
}
auto additional_source_columns_set = ext::map<NameSet>(additional_source_columns, [] (const auto & it) { return it.name; });
auto syntax_result = SyntaxAnalyzer(context).analyze(required_columns_expr_list, additional_source_columns, {}, storage);
alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActions(true);
auto syntax_result = SyntaxAnalyzer(context).analyze(required_columns_all_expr, required_columns_after_prewhere, {}, storage);
alias_actions = ExpressionAnalyzer(required_columns_all_expr, syntax_result, context).getActions(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
required_columns = alias_actions->getRequiredColumns();
@ -874,17 +1020,10 @@ void InterpreterSelectQuery::executeFetchColumns(
prewhere_info->remove_prewhere_column = false;
/// Remove columns which will be added by prewhere.
size_t next_req_column_pos = 0;
for (size_t i = 0; i < required_columns.size(); ++i)
required_columns.erase(std::remove_if(required_columns.begin(), required_columns.end(), [&](const String & name)
{
if (!additional_source_columns_set.count(required_columns[i]))
{
if (next_req_column_pos < i)
std::swap(required_columns[i], required_columns[next_req_column_pos]);
++next_req_column_pos;
}
}
required_columns.resize(next_req_column_pos);
return !!required_columns_after_prewhere_set.count(name);
}), required_columns.end());
if (prewhere_info)
{
@ -898,21 +1037,22 @@ void InterpreterSelectQuery::executeFetchColumns(
}
prewhere_info->prewhere_actions = std::move(new_actions);
auto analyzed_result = SyntaxAnalyzer(context).analyze(required_prewhere_columns_expr_list, storage->getColumns().getAllPhysical());
prewhere_info->alias_actions =
ExpressionAnalyzer(required_prewhere_columns_expr_list, analyzed_result, context)
.getActions(true, false);
auto analyzed_result
= SyntaxAnalyzer(context).analyze(required_columns_from_prewhere_expr, storage->getColumns().getAllPhysical());
prewhere_info->alias_actions
= ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, context).getActions(true, false);
/// Add columns required by alias actions.
auto required_aliased_columns = prewhere_info->alias_actions->getRequiredColumns();
for (auto & column : required_aliased_columns)
/// Add (physical?) columns required by alias actions.
auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions->getSampleBlock();
for (auto & column : required_columns_from_alias)
if (!prewhere_actions_result.has(column))
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
/// Add columns required by prewhere actions.
for (const auto & column : required_prewhere_columns)
if (required_prewhere_aliases.count(column) == 0)
/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
if (required_aliases_from_prewhere.count(column) == 0)
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column))
required_columns.push_back(column);
}
@ -1013,12 +1153,17 @@ void InterpreterSelectQuery::executeFetchColumns(
if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
pipeline.streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))};
if (query_info.prewhere_info)
pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
pipeline.streams.back(), prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column);
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FilterBlockInputStream>(
stream,
prewhere_info->prewhere_actions,
prewhere_info->prewhere_column_name,
prewhere_info->remove_prewhere_column);
});
}
pipeline.transform([&](auto & stream)
@ -1434,6 +1579,7 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
}
// TODO: move to anonymous namespace
bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)
{
if (query.group_by_with_totals)

View File

@ -104,13 +104,13 @@ private:
BlockInputStreamPtr & firstStream() { return streams.at(0); }
template <typename Transform>
void transform(Transform && transform)
void transform(Transform && transformation)
{
for (auto & stream : streams)
transform(stream);
transformation(stream);
if (stream_with_non_joined_data)
transform(stream_with_non_joined_data);
transformation(stream_with_non_joined_data);
}
bool hasMoreThanOneStream() const
@ -154,9 +154,10 @@ private:
SubqueriesForSets subqueries_for_sets;
PrewhereInfoPtr prewhere_info;
FilterInfoPtr filter_info;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run);
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info);
/** From which table to read. With JOIN, the "left" table is returned.

View File

@ -1279,8 +1279,12 @@ public:
makeResultSampleBlock(left_sample_block, right_sample_block, columns_added_by_join,
key_positions_left, is_left_key, left_to_right_key_map);
auto nullability_changes = getNullabilityChanges(parent.sample_block_with_keys, result_sample_block,
key_positions_left, left_to_right_key_map);
column_indices_left.reserve(left_sample_block.columns() - key_names_left.size());
column_indices_keys_and_right.reserve(key_names_left.size() + right_sample_block.columns());
key_nullability_changes.reserve(key_positions_left.size());
/// Use right key columns if present. @note left & right key columns could have different nullability.
for (size_t key_pos : key_positions_left)
@ -1293,11 +1297,12 @@ public:
auto it = left_to_right_key_map.find(key_pos);
if (it != left_to_right_key_map.end())
{
column_indices_keys_and_right.push_back(it->second);
column_indices_left.push_back(key_pos);
key_pos = it->second;
}
else
column_indices_keys_and_right.push_back(key_pos);
column_indices_keys_and_right.push_back(key_pos);
key_nullability_changes.push_back(nullability_changes.count(key_pos));
}
for (size_t i = 0; i < left_sample_block.columns(); ++i)
@ -1324,7 +1329,7 @@ protected:
if (parent.dispatch([&](auto, auto strictness, auto & map) { block = createBlock<strictness>(map); }))
;
else
throw Exception("Logical error: unknown JOIN strictness (must be ANY or ALL)", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: unknown JOIN strictness (must be on of: ANY, ALL, ASOF)", ErrorCodes::LOGICAL_ERROR);
return block;
}
@ -1333,11 +1338,13 @@ private:
UInt64 max_block_size;
Block result_sample_block;
/// Indices of columns in result_sample_block that come from the left-side table (except key columns).
/// Indices of columns in result_sample_block that come from the left-side table (except shared right+left key columns).
ColumnNumbers column_indices_left;
/// Indices of key columns in result_sample_block or columns that come from the right-side table.
/// Order is significant: it is the same as the order of columns in the blocks of the right-side table that are saved in parent.blocks.
ColumnNumbers column_indices_keys_and_right;
/// Which key columns need change nullability (right is nullable and left is not or vice versa)
std::vector<bool> key_nullability_changes;
std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
@ -1393,6 +1400,9 @@ private:
MutableColumns columns_left = columnsForIndex(result_sample_block, column_indices_left);
MutableColumns columns_keys_and_right = columnsForIndex(result_sample_block, column_indices_keys_and_right);
/// Temporary change destination key columns' nullability according to mapped block
changeNullability(columns_keys_and_right, key_nullability_changes);
size_t rows_added = 0;
switch (parent.type)
@ -1411,6 +1421,9 @@ private:
if (!rows_added)
return {};
/// Revert columns nullability
changeNullability(columns_keys_and_right, key_nullability_changes);
Block res = result_sample_block.cloneEmpty();
/// @note it's possible to make ColumnConst here and materialize it later
@ -1468,6 +1481,47 @@ private:
return rows_added;
}
static std::unordered_set<size_t> getNullabilityChanges(const Block & sample_block_with_keys, const Block & out_block,
const std::vector<size_t> & key_positions,
const std::unordered_map<size_t, size_t> & left_to_right_key_map)
{
std::unordered_set<size_t> nullability_changes;
for (size_t i = 0; i < key_positions.size(); ++i)
{
size_t key_pos = key_positions[i];
auto it = left_to_right_key_map.find(key_pos);
if (it != left_to_right_key_map.end())
key_pos = it->second;
const auto & dst = out_block.getByPosition(key_pos).column;
const auto & src = sample_block_with_keys.getByPosition(i).column;
if (dst->isColumnNullable() != src->isColumnNullable())
nullability_changes.insert(key_pos);
}
return nullability_changes;
}
static void changeNullability(MutableColumns & columns, const std::vector<bool> & changes_bitmap)
{
/// @note changes_bitmap.size() <= columns.size()
for (size_t i = 0; i < changes_bitmap.size(); ++i)
{
if (changes_bitmap[i])
{
ColumnPtr column = std::move(columns[i]);
if (column->isColumnNullable())
column = static_cast<const ColumnNullable &>(*column).getNestedColumnPtr();
else
column = makeNullable(column);
columns[i] = (*std::move(column)).mutate();
}
}
}
};

View File

@ -1,9 +1,9 @@
#pragma once
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/IRuntimeComponentsFactory.h>
#include <Interpreters/ExternalLoaderConfigRepository.h>
#include <Interpreters/SecurityManager.h>
#include <Interpreters/IRuntimeComponentsFactory.h>
#include <Interpreters/UsersManager.h>
namespace DB
{
@ -14,9 +14,9 @@ namespace DB
class RuntimeComponentsFactory : public IRuntimeComponentsFactory
{
public:
std::unique_ptr<ISecurityManager> createSecurityManager() override
std::unique_ptr<IUsersManager> createUsersManager() override
{
return std::make_unique<SecurityManager>();
return std::make_unique<UsersManager>();
}
std::unique_ptr<IGeoDictionariesLoader> createGeoDictionariesLoader() override

View File

@ -197,7 +197,7 @@ bool Set::insertFromBlock(const Block & block)
if (set_elements[i]->empty())
set_elements[i] = filtered_column;
else
set_elements[i]->assumeMutableRef().insertRangeFrom(*filtered_column, 0, filtered_column->size());
set_elements[i]->insertRangeFrom(*filtered_column, 0, filtered_column->size());
}
}

View File

@ -68,7 +68,7 @@ public:
const DataTypes & getDataTypes() const { return data_types; }
bool hasExplicitSetElements() const { return fill_set_elements; }
const Columns & getSetElements() const { return set_elements; }
Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
private:
size_t keys_size = 0;
@ -113,7 +113,7 @@ private:
/// Collected elements of `Set`.
/// It is necessary for the index to work on the primary key in the IN statement.
Columns set_elements;
std::vector<IColumn::WrappedPtr> set_elements;
/** Protects work with the set in the functions `insertFromBlock` and `execute`.
* These functions can be called simultaneously from different threads only when using StorageSet,

View File

@ -315,6 +315,34 @@ User::User(const String & name_, const String & config_elem, const Poco::Util::A
databases.insert(database_name);
}
}
/// Read properties per "database.table"
/// Only tables are expected to have properties, so that all the keys inside "database" are table names.
const auto config_databases = config_elem + ".databases";
if (config.has(config_databases))
{
Poco::Util::AbstractConfiguration::Keys database_names;
config.keys(config_databases, database_names);
/// Read tables within databases
for (const auto & database : database_names)
{
const auto config_database = config_databases + "." + database;
Poco::Util::AbstractConfiguration::Keys table_names;
config.keys(config_database, table_names);
/// Read table properties
for (const auto & table : table_names)
{
const auto config_filter = config_database + "." + table + ".filter";
if (config.has(config_filter))
{
const auto filter_query = config.getString(config_filter);
table_props[database][table]["filter"] = filter_query;
}
}
}
}
}

View File

@ -2,9 +2,10 @@
#include <Core/Types.h>
#include <vector>
#include <unordered_set>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace Poco
@ -65,6 +66,12 @@ struct User
using DatabaseSet = std::unordered_set<std::string>;
DatabaseSet databases;
/// Table properties.
using PropertyMap = std::unordered_map<std::string /* name */, std::string /* value */>;
using TableMap = std::unordered_map<std::string /* table */, PropertyMap /* properties */>;
using DatabaseMap = std::unordered_map<std::string /* database */, TableMap /* tables */>;
DatabaseMap table_props;
User(const String & name_, const String & config_elem, const Poco::Util::AbstractConfiguration & config);
};

View File

@ -1,4 +1,5 @@
#include "SecurityManager.h"
#include <Interpreters/UsersManager.h>
#include <Poco/Net/IPAddress.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/String.h>
@ -28,9 +29,9 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
using UserPtr = SecurityManager::UserPtr;
using UserPtr = UsersManager::UserPtr;
void SecurityManager::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
void UsersManager::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
{
Container new_users;
@ -46,7 +47,7 @@ void SecurityManager::loadFromConfig(const Poco::Util::AbstractConfiguration & c
users = std::move(new_users);
}
UserPtr SecurityManager::authorizeAndGetUser(
UserPtr UsersManager::authorizeAndGetUser(
const String & user_name,
const String & password,
const Poco::Net::IPAddress & address) const
@ -100,7 +101,7 @@ UserPtr SecurityManager::authorizeAndGetUser(
return it->second;
}
UserPtr SecurityManager::getUser(const String & user_name) const
UserPtr UsersManager::getUser(const String & user_name) const
{
auto it = users.find(user_name);
@ -110,7 +111,7 @@ UserPtr SecurityManager::getUser(const String & user_name) const
return it->second;
}
bool SecurityManager::hasAccessToDatabase(const std::string & user_name, const std::string & database_name) const
bool UsersManager::hasAccessToDatabase(const std::string & user_name, const std::string & database_name) const
{
auto it = users.find(user_name);

View File

@ -1,21 +1,17 @@
#pragma once
#include <Interpreters/ISecurityManager.h>
#include <Interpreters/IUsersManager.h>
#include <map>
namespace DB
{
/** Default implementation of security manager used by native server application.
/** Default implementation of users manager used by native server application.
* Manages fixed set of users listed in 'Users' configuration file.
*/
class SecurityManager : public ISecurityManager
class UsersManager : public IUsersManager
{
private:
using Container = std::map<String, UserPtr>;
Container users;
public:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config) override;
@ -27,6 +23,10 @@ public:
UserPtr getUser(const String & user_name) const override;
bool hasAccessToDatabase(const String & user_name, const String & database_name) const override;
private:
using Container = std::map<String, UserPtr>;
Container users;
};
}

View File

@ -170,6 +170,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ast = parseQuery(parser, begin, end, "", max_query_size);
auto * insert_query = ast->as<ASTInsertQuery>();
if (insert_query && insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
if (insert_query && insert_query->data)
{
query_end = insert_query->data;

View File

@ -1,5 +1,5 @@
#include <Common/Config/ConfigProcessor.h>
#include <Interpreters/SecurityManager.h>
#include <Interpreters/UsersManager.h>
#include <boost/filesystem.hpp>
#include <vector>
#include <string>
@ -197,11 +197,11 @@ void runOneTest(const TestDescriptor & test_descriptor)
throw std::runtime_error(os.str());
}
DB::SecurityManager security_manager;
DB::UsersManager users_manager;
try
{
security_manager.loadFromConfig(*config);
users_manager.loadFromConfig(*config);
}
catch (const Poco::Exception & ex)
{
@ -216,7 +216,7 @@ void runOneTest(const TestDescriptor & test_descriptor)
try
{
res = security_manager.hasAccessToDatabase(entry.user_name, entry.database_name);
res = users_manager.hasAccessToDatabase(entry.user_name, entry.database_name);
}
catch (const Poco::Exception &)
{

View File

@ -61,9 +61,9 @@ public:
/// The main name of the table type (for example, StorageMergeTree).
virtual std::string getName() const = 0;
/** The name of the table.
*/
/// The name of the table.
virtual std::string getTableName() const = 0;
virtual std::string getDatabaseName() const { return {}; } // FIXME: should be abstract method.
/** Returns true if the storage receives data from a remote server or servers. */
virtual bool isRemote() const { return false; }

View File

@ -27,7 +27,7 @@ friend class KafkaBlockOutputStream;
public:
std::string getName() const override { return "Kafka"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const { return database_name; }
std::string getDatabaseName() const override { return database_name; }
void startup() override;
void shutdown() override;

View File

@ -16,7 +16,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.data.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{
Stopwatch watch;

View File

@ -13,14 +13,15 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_)
: storage(storage_) {}
MergeTreeBlockOutputStream(StorageMergeTree & storage_, size_t max_parts_per_block)
: storage(storage_), max_parts_per_block(max_parts_per_block) {}
Block getHeader() const override;
void write(const Block & block) override;
private:
StorageMergeTree & storage;
size_t max_parts_per_block;
};
}

View File

@ -457,7 +457,7 @@ bool BloomFilterCondition::tryPrepareSetBloomFilter(
std::vector<std::vector<StringBloomFilter>> bloom_filters;
std::vector<size_t> key_position;
const auto & columns = prepared_set->getSetElements();
Columns columns = prepared_set->getSetElements();
for (size_t col = 0; col < key_tuple_mapping.size(); ++col)
{
bloom_filters.emplace_back();

View File

@ -465,6 +465,7 @@ public:
/// Delete all directories which names begin with "tmp"
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
/// Must be called with locked lockStructureForShare().
void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
/// After the call to dropAllData() no method can be called.

View File

@ -1,9 +1,10 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -22,6 +23,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_PARTS;
}
namespace
@ -30,7 +32,8 @@ namespace
void buildScatterSelector(
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector)
IColumn::Selector & selector,
size_t max_parts)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
@ -47,6 +50,9 @@ void buildScatterSelector(
if (inserted)
{
if (max_parts && partitions_count >= max_parts)
throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).", ErrorCodes::TOO_MANY_PARTS);
partition_num_to_first_row.push_back(i);
it->getSecond() = partitions_count;
@ -67,7 +73,7 @@ void buildScatterSelector(
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -92,7 +98,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
PODArray<size_t> partition_num_to_first_row;
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector);
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);

View File

@ -41,7 +41,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
BlocksWithPartition splitBlockIntoParts(const Block & block);
BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.

View File

@ -33,8 +33,8 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), deduplicate(deduplicate_),
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block), deduplicate(deduplicate_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
@ -122,7 +122,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
checkQuorumPrecondition(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
{

View File

@ -22,8 +22,9 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_,
bool deduplicate_);
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block,
bool deduplicate_);
Block getHeader() const override;
void writePrefix() override;
@ -56,6 +57,7 @@ private:
StorageReplicatedMergeTree & storage;
size_t quorum;
size_t quorum_timeout_ms;
size_t max_parts_per_block;
bool deduplicate = true;
bool last_block_is_duplicate = false;

View File

@ -53,7 +53,12 @@ void ReplicatedMergeTreeCleanupThread::run()
void ReplicatedMergeTreeCleanupThread::iterate()
{
storage.clearOldPartsAndRemoveFromZK();
storage.data.clearOldTemporaryDirectories();
{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(false, "");
storage.data.clearOldTemporaryDirectories();
}
/// This is loose condition: no problem if we actually had lost leadership at this moment
/// and two replicas will try to do cleanup simultaneously.

View File

@ -25,7 +25,16 @@ struct PrewhereInfo
: prewhere_actions(std::move(prewhere_actions_)), prewhere_column_name(std::move(prewhere_column_name_)) {}
};
/// Helper struct to store all the information about the filter expression.
struct FilterInfo
{
ExpressionActionsPtr actions;
String column_name;
bool do_remove_column = false;
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;

View File

@ -126,9 +126,9 @@ BlockInputStreams StorageMergeTree::read(
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & /*context*/)
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
}
void StorageMergeTree::checkTableCanBeDropped() const
@ -690,7 +690,11 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
data.clearOldPartsFromFilesystem();
data.clearOldTemporaryDirectories();
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
data.clearOldTemporaryDirectories();
}
clearOldMutations();
}

View File

@ -27,12 +27,9 @@ public:
void shutdown() override;
~StorageMergeTree() override;
std::string getName() const override
{
return data.merging_params.getModeName() + "MergeTree";
}
std::string getName() const override { return data.merging_params.getModeName() + "MergeTree"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const override { return database_name; }
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }

View File

@ -1786,7 +1786,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context.getSettingsRef());
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto [user, password] = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
@ -2741,7 +2741,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
else
{
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context.getSettingsRef());
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
@ -2959,7 +2959,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
bool deduplicate = data.settings.replicated_deduplication_window != 0 && settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), deduplicate);
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds(), settings.max_partitions_per_insert_block, deduplicate);
}
@ -3577,7 +3577,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
}
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
for (auto & part : loaded_parts)
{
String old_name = part->name;

View File

@ -79,12 +79,10 @@ public:
void shutdown() override;
~StorageReplicatedMergeTree() override;
std::string getName() const override
{
return "Replicated" + data.merging_params.getModeName() + "MergeTree";
}
std::string getName() const override { return "Replicated" + data.merging_params.getModeName() + "MergeTree"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const override { return database_name; }
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }

View File

@ -173,7 +173,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
getHeaderBlock(column_names),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()));
ConnectionTimeouts::getHTTPTimeouts(context));
auto column_defaults = getColumns().getDefaults();
@ -187,7 +187,7 @@ void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<StorageURLBlockOutputStream>(
uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global.getSettingsRef()));
uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
}
void registerStorageURL(StorageFactory & factory)

View File

@ -501,6 +501,10 @@ class ClickHouseInstance:
raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
return output
def contains_in_log(self, substring):
result = self.exec_in_container(["bash", "-c", "grep '{}' /var/log/clickhouse-server/clickhouse-server.log || true".format(substring)])
return len(result) > 0
def copy_file_to_container(self, local_path, dest_path):
with open(local_path, 'r') as fdata:
data = fdata.read()

View File

@ -52,6 +52,22 @@ def test_single_endpoint_connections_count(start_small_cluster):
assert node2.query("SELECT value FROM system.events where event='CreatedHTTPConnections'") == '1\n'
def test_keepalive_timeout(start_small_cluster):
current_count = int(node1.query("select count() from test_table").strip())
node1.query("insert into test_table values ('2017-06-16', 777, 0)")
assert_eq_with_retry(node2, "select count() from test_table", str(current_count + 1))
# Server keepAliveTimeout is 3 seconds, default client session timeout is 8
# lets sleep in that interval
time.sleep(4)
node1.query("insert into test_table values ('2017-06-16', 888, 0)")
time.sleep(3)
assert_eq_with_retry(node2, "select count() from test_table", str(current_count + 2))
assert not node2.contains_in_log("No message received"), "Found 'No message received' in clickhouse-server.log"
node3 = cluster.add_instance('node3', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/log_conf.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/log_conf.xml'], with_zookeeper=True)
node5 = cluster.add_instance('node5', config_dir="configs", main_configs=['configs/remote_servers.xml', 'configs/log_conf.xml'], with_zookeeper=True)

View File

@ -0,0 +1,16 @@
foo \N 2 0 Nullable(String) Nullable(String)
bar bar 1 2 Nullable(String) Nullable(String)
\N 0 1 Nullable(String) Nullable(String)
\N test 0 1 Nullable(String) Nullable(String)
foo \N 2 0 Nullable(String) Nullable(String)
bar bar 1 2 Nullable(String) Nullable(String)
\N 0 1 Nullable(String) Nullable(String)
\N test 0 1 Nullable(String) Nullable(String)
foo 2 0 String Nullable(String)
bar bar 1 2 String Nullable(String)
test 0 1 String Nullable(String)
bar bar 1 2 String Nullable(String)
test 0 1 String Nullable(String)
foo 2 0 String
bar 1 2 String
test 0 1 String

View File

@ -0,0 +1,57 @@
USE test;
DROP TABLE IF EXISTS table_a;
DROP TABLE IF EXISTS table_b;
CREATE TABLE table_a (
event_id UInt64,
something String,
other Nullable(String)
) ENGINE = MergeTree ORDER BY (event_id);
CREATE TABLE table_b (
event_id UInt64,
something Nullable(String),
other String
) ENGINE = MergeTree ORDER BY (event_id);
INSERT INTO table_a VALUES (1, 'foo', 'foo'), (2, 'foo', 'foo'), (3, 'bar', 'bar');
INSERT INTO table_b VALUES (1, 'bar', 'bar'), (2, 'bar', 'bar'), (3, 'test', 'test'), (4, NULL, '');
SELECT s1.other, s2.other, count_a, count_b, toTypeName(s1.other), toTypeName(s2.other) FROM
( SELECT other, count() AS count_a FROM table_a GROUP BY other ) s1
ALL FULL JOIN
( SELECT other, count() AS count_b FROM table_b GROUP BY other ) s2
ON s1.other = s2.other
ORDER BY count_a DESC;
SELECT s1.other, s2.other, count_a, count_b, toTypeName(s1.other), toTypeName(s2.other) FROM
( SELECT other, count() AS count_a FROM table_a GROUP BY other ) s1
ALL FULL JOIN
( SELECT other, count() AS count_b FROM table_b GROUP BY other ) s2
USING other
ORDER BY count_a DESC;
SELECT s1.something, s2.something, count_a, count_b, toTypeName(s1.something), toTypeName(s2.something) FROM
( SELECT something, count() AS count_a FROM table_a GROUP BY something ) s1
ALL FULL JOIN
( SELECT something, count() AS count_b FROM table_b GROUP BY something ) s2
ON s1.something = s2.something
ORDER BY count_a DESC;
SELECT s1.something, s2.something, count_a, count_b, toTypeName(s1.something), toTypeName(s2.something) FROM
( SELECT something, count() AS count_a FROM table_a GROUP BY something ) s1
ALL RIGHT JOIN
( SELECT something, count() AS count_b FROM table_b GROUP BY something ) s2
USING (something)
ORDER BY count_a DESC;
SELECT something, count_a, count_b, toTypeName(something) FROM
( SELECT something, count() AS count_a FROM table_a GROUP BY something )
ALL FULL JOIN
( SELECT something, count() AS count_b FROM table_b GROUP BY something )
USING (something)
ORDER BY count_a DESC;
DROP TABLE table_a;
DROP TABLE table_b;

View File

@ -0,0 +1,32 @@
-- PREWHERE should fail
1 0
1 1
0 0 0 0
0 0 6 0
0 1
1 0
1
1
0
1
1
1
1
1
0
1
1
0
1
1
1
1
0
1
1
1
1
1
1 0 1 1
1 1 1 1
1 1 1 0

View File

@ -0,0 +1,44 @@
DROP TABLE IF EXISTS filtered_table1;
DROP TABLE IF EXISTS filtered_table2;
DROP TABLE IF EXISTS filtered_table3;
-- Filter: a = 1, values: (1, 0), (1, 1)
CREATE TABLE test.filtered_table1 (a UInt8, b UInt8) ENGINE MergeTree ORDER BY a;
INSERT INTO test.filtered_table1 values (0, 0), (0, 1), (1, 0), (1, 1);
-- Filter: a + b < 1 or c - d > 5, values: (0, 0, 0, 0), (0, 0, 6, 0)
CREATE TABLE test.filtered_table2 (a UInt8, b UInt8, c UInt8, d UInt8) ENGINE MergeTree ORDER BY a;
INSERT INTO test.filtered_table2 values (0, 0, 0, 0), (1, 2, 3, 4), (4, 3, 2, 1), (0, 0, 6, 0);
-- Filter: c = 1, values: (0, 1), (1, 0)
CREATE TABLE test.filtered_table3 (a UInt8, b UInt8, c UInt16 ALIAS a + b) ENGINE MergeTree ORDER BY a;
INSERT INTO test.filtered_table3 values (0, 0), (0, 1), (1, 0), (1, 1);
SELECT '-- PREWHERE should fail';
SELECT * FROM test.filtered_table1 PREWHERE 1; -- { serverError 182 }
SELECT * FROM test.filtered_table2 PREWHERE 1; -- { serverError 182 }
SELECT * FROM test.filtered_table3 PREWHERE 1; -- { serverError 182 }
SELECT * FROM test.filtered_table1;
SELECT * FROM test.filtered_table2;
SELECT * FROM test.filtered_table3;
SELECT a FROM test.filtered_table1;
SELECT b FROM test.filtered_table1;
SELECT a FROM test.filtered_table1 WHERE a = 1;
SELECT a = 1 FROM test.filtered_table1;
SELECT a FROM test.filtered_table3;
SELECT b FROM test.filtered_table3;
SELECT c FROM test.filtered_table3;
SELECT a + b FROM test.filtered_table3;
SELECT a FROM test.filtered_table3 WHERE c = 1;
SELECT c = 1 FROM test.filtered_table3;
SELECT a + b = 1 FROM test.filtered_table3;
SELECT * FROM test.filtered_table1 as t1 ANY LEFT JOIN test.filtered_table1 as t2 ON t1.a = t2.b;
SELECT * FROM test.filtered_table1 as t2 ANY RIGHT JOIN test.filtered_table1 as t1 ON t2.b = t1.a;
DROP TABLE test.filtered_table1;
DROP TABLE test.filtered_table2;
DROP TABLE test.filtered_table3;

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.partitions;
CREATE TABLE test.partitions (x UInt64) ENGINE = MergeTree ORDER BY x PARTITION BY x;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 100;
SELECT count() FROM system.parts WHERE database = 'test' AND table = 'partitions';
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 100;
SELECT count() FROM system.parts WHERE database = 'test' AND table = 'partitions';
SET max_partitions_per_insert_block = 1;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 1;
INSERT INTO test.partitions SELECT * FROM system.numbers LIMIT 2; -- { serverError 252 }
DROP TABLE test.partitions;

View File

@ -0,0 +1,7 @@
drop table if exists test.lc;
create table test.lc (key UInt64, value Array(LowCardinality(String))) engine = MergeTree order by key;
insert into test.lc select number, if(number < 10000 or number > 100000, [toString(number)], emptyArrayString()) from system.numbers limit 200000;
select * from test.lc where (key < 100 or key > 50000) and not has(value, toString(key)) and length(value) == 1 limit 10 settings max_block_size = 8192, max_threads = 1;
drop table if exists test.lc;

View File

@ -0,0 +1,6 @@
DROP TABLE IF EXISTS test.test;
CREATE TABLE test.test (a UInt8, b UInt8, c UInt16 ALIAS a + b) ENGINE = MergeTree ORDER BY a;
SELECT b FROM test.test PREWHERE c = 1;
DROP TABLE test;

View File

@ -5,7 +5,7 @@ RUN apt-get update && apt-get -y install tzdata python llvm-6.0 llvm-6.0-dev
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
CMD echo "TSAN_OPTIONS='halt_on_error=1'" >> /etc/environment;
CMD echo "TSAN_OPTIONS='halt_on_error=1 history_size=7'" >> /etc/environment;
CMD echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment;
CMD echo "ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment;
CMD echo "UBSAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment;

View File

@ -16,7 +16,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
ln -s /usr/lib/llvm-8/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
echo "TSAN_OPTIONS='halt_on_error=1'" >> /etc/environment; \
echo "TSAN_OPTIONS='halt_on_error=1 history_size=7'" >> /etc/environment; \
echo "TSAN_SYMBOLIZER_PATH=/usr/lib/llvm-8/bin/llvm-symbolizer" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
echo "ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \

View File

@ -41,7 +41,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
ln -s /usr/lib/llvm-8/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
echo "TSAN_OPTIONS='halt_on_error=1'" >> /etc/environment; \
echo "TSAN_OPTIONS='halt_on_error=1 history_size=7'" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
echo "ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \
echo "UBSAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \

View File

@ -30,7 +30,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
ln -s /usr/lib/llvm-8/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
echo "TSAN_OPTIONS='halt_on_error=1'" >> /etc/environment; \
echo "TSAN_OPTIONS='halt_on_error=1 history_size=7'" >> /etc/environment; \
echo "TSAN_SYMBOLIZER_PATH=/usr/lib/llvm-8/bin/llvm-symbolizer" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
echo "ASAN_SYMBOLIZER_PATH=/usr/lib/llvm-6.0/bin/llvm-symbolizer" >> /etc/environment; \

View File

@ -30,17 +30,26 @@ user String — The name of the user for connecting to the server.
## system.columns
Contains information about the columns in all tables.
You can use this table to get information similar to `DESCRIBE TABLE`, but for multiple tables at once.
Contains information about the columns in all the tables.
```
database String — The name of the database the table is in.
table String Table name.
name String — Column name.
type String — Column type.
default_type String — Expression type (DEFAULT, MATERIALIZED, ALIAS) for the default value, or an empty string if it is not defined.
default_expression String — Expression for the default value, or an empty string if it is not defined.
```
You can use this table to get information similar to the [DESCRIBE TABLE](../query_language/misc.md#misc-describe-table) query, but for multiple tables at once.
The `system.columns` table contains the following columns (the type of the corresponding column is shown in brackets):
- `database` (String) — Database name.
- `table` (String) — Table name.
- `name` (String) — Column name.
- `type` (String) — Column type.
- `default_kind` (String) — Expression type (`DEFAULT`, `MATERIALIZED`, `ALIAS`) for the default value, or an empty string if it is not defined.
- `default_expression` (String) — Expression for the default value, or an empty string if it is not defined.
- `data_compressed_bytes` (UInt64) — The size of compressed data, in bytes.
- `data_uncompressed_bytes` (UInt64) — The size of decompressed data, in bytes.
- `marks_bytes` (UInt64) — The size of marks, in bytes.
- `comment` (String) — The comment about column, or an empty string if it is not defined.
- `is_in_partition_key` (UInt8) — Flag that indicates whether the column is in partition expression.
- `is_in_sorting_key` (UInt8) — Flag that indicates whether the column is in sorting key expression.
- `is_in_primary_key` (UInt8) — Flag that indicates whether the column is in primary key expression.
- `is_in_sampling_key` (UInt8) — Flag that indicates whether the column is in sampling key expression.
## system.databases
@ -374,10 +383,27 @@ WHERE changed
## system.tables
This table contains the String columns 'database', 'name', and 'engine'.
The table also contains three virtual columns: metadata_modification_time (DateTime type), create_table_query, and engine_full (String type).
Each table that the server knows about is entered in the 'system.tables' table.
This system table is used for implementing SHOW TABLES queries.
Contains metadata of each table that the server knows about. Detached tables are not shown in `system.tables`.
This table contains the following columns (the type of the corresponding column is shown in brackets):
- `database` (String) — The name of database the table is in.
- `name` (String) — Table name.
- `engine` (String) — Table engine name (without parameters).
- `is_temporary` (UInt8) - Flag that indicates whether the table is temporary.
- `data_path` (String) - Path to the table data in the file system.
- `metadata_path` (String) - Path to the table metadata in the file system.
- `metadata_modification_time` (DateTime) - Time of latest modification of the table metadata.
- `dependencies_database` (Array(String)) - Database dependencies.
- `dependencies_table` (Array(String)) - Table dependencies ([MaterializedView](table_engines/materializedview.md) tables based on the current table).
- `create_table_query` (String) - The query that was used to create the table.
- `engine_full` (String) - Parameters of the table engine.
- `partition_key` (String) - The partition key expression specified in the table.
- `sorting_key` (String) - The sorting key expression specified in the table.
- `primary_key` (String) - The primary key expression specified in the table.
- `sampling_key` (String) - The sampling key expression specified in the table.
The `system.tables` is used in `SHOW TABLES` query implementation.
## system.zookeeper

View File

@ -51,7 +51,7 @@ If the table is corrupted, you can copy the non-corrupted data to another table.
3. Execute the query `INSERT INTO <new_table_name> SELECT * FROM <damaged_table_name>`. This request copies the non-corrupted data from the damaged table to another table. Only the data before the corrupted part will be copied.
4. Restart the `clickhouse-client` to reset the `max_threads` value.
## DESCRIBE TABLE
## DESCRIBE TABLE {#misc-describe-table}
``` sql
DESC|DESCRIBE TABLE [db.]table [INTO OUTFILE filename] [FORMAT format]

View File

@ -51,18 +51,18 @@ The `SAMPLE` clause allows for approximated query processing. Approximated query
The features of data sampling are listed below:
- Data sampling is a deterministic mechanism. The result of the same `SELECT .. SAMPLE` query is always the same.
- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This allows using the sample in subqueries in the `IN` clause, as well as for manually correlating results of different queries with samples.
- Sampling allows reading less data from a disk. Note that for this you must specify the sampling key correctly. For more details see [Creating a MergeTree Table](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
- Sampling works consistently for different tables. For tables with a single sampling key, a sample with the same coefficient always selects the same subset of possible data. For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This means that you can use the sample in subqueries in the `IN` clause, as well as manually correlate results of different queries with samples.
- Sampling allows reading less data from a disk. Note that you must specify the sampling key correctly. For more information, see [Creating a MergeTree Table](../operations/table_engines/mergetree.md#table_engine-mergetree-creating-a-table).
The `SAMPLE` clause can be specified in several ways:
For the `SAMPLE` clause the following syntax is supported:
- `SAMPLE k`, where `k` is a decimal number from 0 to 1. The query is executed on `k` percent of data. For example, `SAMPLE 0.1` runs the query on 10% of data. [Read more](#select-sample-k)
- `SAMPLE k`, where `k` is a decimal number from 0 to 1. The query is executed on `k` fraction of data. For example, `SAMPLE 0.1` runs the query on 10% of data. [Read more](#select-sample-k)
- `SAMPLE n`, where `n` is a sufficiently large integer. The query is executed on a sample of at least `n` rows (but not significantly more than this). For example, `SAMPLE 10000000` runs the query on a minimum of 10,000,000 rows. [Read more](#select-sample-n)
- `SAMPLE k OFFSET m` where `k` and `m` are numbers from 0 to 1. The query is executed on a sample of `k` percent of the data. The data used for the sample is offset by `m` percent. [Read more](#select-sample-offset)
#### SAMPLE k {#select-sample-k}
In a `SAMPLE k` clause, `k` is a percent amount of data that the sample is taken from. The example is shown below:
In a `SAMPLE k` clause, the sample is taken from the `k` fraction of data. The example is shown below:
``` sql
SELECT
@ -84,9 +84,9 @@ In this case, the query is executed on a sample of at least `n` rows, where `n`
Since the minimum unit for data reading is one granule (its size is set by the `index_granularity` setting), it makes sense to set a sample that is much larger than the size of the granule.
When using the `SAMPLE n` clause, the relative coefficient is calculated dynamically. Since you do not know which relative percent of data was processed, you do not know the coefficient the aggregate functions should be multiplied by (for example, you do not know if the `SAMPLE 1000000` was taken from a set of 10,000,000 rows or from a set of 1,000,000,000 rows). In this case, use the `_sample_factor` column to get the approximate result.
When using the `SAMPLE n` clause, the relative coefficient is calculated dynamically. Since you do not know which relative percent of data was processed, you do not know the coefficient the aggregate functions should be multiplied by (for example, you do not know if `SAMPLE 1000000` was taken from a set of 10,000,000 rows or from a set of 1,000,000,000 rows). In this case, use the `_sample_factor` virtual column to get the approximate result.
The `_sample_factor` is a virtual column that ClickHouse stores relative coefficients in. This column is created automatically when you create a table with the specified sampling key. The usage example is shown below:
The `_sample_factor` column is where ClickHouse stores relative coefficients. This column is created automatically when you create a table with the specified sampling key. The usage example is shown below:
``` sql
SELECT sum(Duration * _sample_factor)
@ -94,7 +94,7 @@ FROM visits
SAMPLE 10000000
```
If you need to get the approximate count of rows in a `SELECT .. SAMPLE n` query, get the sum() of `_sample_factor` column instead of counting `count(column * _sample_factor)` value. For example:
If you need to get the approximate count of rows in a `SELECT .. SAMPLE n` query, get the sum() of the `_sample_factor` column instead of counting the `count(*) * _sample_factor` value. For example:
``` sql
SELECT sum(_sample_factor)
@ -102,7 +102,7 @@ FROM visits
SAMPLE 10000000
```
Note that to calculate the average in a `SELECT .. SAMPLE n` query, you do not need to use `_sample_factor` column:
Note that to calculate the average in a `SELECT .. SAMPLE n` query, you do not need to use the `_sample_factor` column:
``` sql
SELECT avg(Duration)
@ -120,7 +120,7 @@ Example 1.
SAMPLE 1/10
```
In this example, the sample is the 1/10th of all data:
In this example, the sample is 1/10th of all data:
`[++------------------]`
@ -130,7 +130,7 @@ Example 2.
SAMPLE 1/10 OFFSET 1/2
```
Here, the sample of 10% is taken from the second half of data.
Here, a sample of 10% is taken from the second half of data.
`[----------++--------]`