Merge pull request #1391 from yandex/arbitrary-partitioning

Custom partitioning 3
alexey-milovidov 2017-10-24 21:18:34 +03:00 committed by GitHub
commit c8d66bfed0
54 changed files with 1148 additions and 723 deletions

View File

@ -450,7 +450,7 @@ void DatabaseOrdinary::alterTable(
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier)
const ASTModifier & storage_modifier)
{
/// Read the definition of the table and replace the necessary parts with new ones.
@ -471,14 +471,10 @@ void DatabaseOrdinary::alterTable(
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns, materialized_columns, alias_columns, column_defaults);
auto it = std::find(ast_create_query.children.begin(), ast_create_query.children.end(), ast_create_query.columns);
if (it == ast_create_query.children.end())
throw Exception("Logical error: cannot find columns child in ASTCreateQuery", ErrorCodes::LOGICAL_ERROR);
*it = new_columns;
ast_create_query.columns = new_columns;
ast_create_query.replace(ast_create_query.columns, new_columns);
if (engine_modifier)
engine_modifier(ast_create_query.storage);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);
statement = getTableDefinitionFromCreateQuery(ast);

View File

@ -27,7 +27,7 @@ String getTableDefinitionFromCreateQuery(const ASTPtr & query)
create.if_not_exists = false;
create.is_populate = false;
String engine = typeid_cast<ASTFunction &>(*create.storage).name;
String engine = create.storage->engine->name;
/// For the View engine it is necessary to save the SELECT query itself; for other engines, it is the opposite
if (engine != "View" && engine != "MaterializedView")
@ -59,7 +59,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.
InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(ast_create_query.columns, context);
InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(*ast_create_query.columns, context);
String storage_name;
@ -68,14 +68,14 @@ std::pair<String, StoragePtr> createTableFromDefinition(
else if (ast_create_query.is_materialized_view)
storage_name = "MaterializedView";
else
storage_name = typeid_cast<ASTFunction &>(*ast_create_query.storage).name;
storage_name = ast_create_query.storage->engine->name;
return
{
ast_create_query.table,
StorageFactory::instance().get(
storage_name, database_data_path, ast_create_query.table, database_name, context,
context.getGlobalContext(), ast, columns_info.columns,
context.getGlobalContext(), ast_create_query, columns_info.columns,
columns_info.materialized_columns, columns_info.alias_columns, columns_info.column_defaults,
true, has_force_restore_data_flag)
};

View File

@ -107,7 +107,7 @@ public:
IDatabase & to_database,
const String & to_name) = 0;
using ASTModifier = std::function<void(ASTPtr &)>;
using ASTModifier = std::function<void(IAST &)>;
/// Change the table structure in metadata.
/// You must call it under the TableStructureLock of the corresponding table. If engine_modifier is empty, then the engine does not change.
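For illustration, a hypothetical storage_modifier matching the new ASTModifier = std::function<void(IAST &)> signature might look like the sketch below. The helper name, the chosen setting and the include paths are assumptions for the sketch, not part of this diff.

#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Core/Field.h>
#include <Common/typeid_cast.h>
#include <functional>

namespace DB
{

/// Hypothetical modifier: the callback now receives the storage subtree by reference
/// (IAST &) instead of an ASTPtr &, so it can edit the typed ASTStorage in place.
std::function<void(IAST &)> makeExampleStorageModifier()
{
    return [](IAST & ast)
    {
        auto & storage = typeid_cast<ASTStorage &>(ast);

        auto new_settings = std::make_shared<ASTSetQuery>();
        new_settings->is_standalone = false;    /// formatted as a SETTINGS clause, without the leading "SET"
        new_settings->changes.push_back(ASTSetQuery::Change{"index_granularity", Field(UInt64(8192))});

        if (storage.settings)
            storage.replace(storage.settings, new_settings);
        else
            storage.set(storage.settings, new_settings);
    };
}

}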

View File

@ -79,23 +79,26 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (!create.storage)
{
database_engine_name = "Ordinary"; /// Default database engine.
auto func = std::make_shared<ASTFunction>();
func->name = database_engine_name;
create.storage = func;
auto engine = std::make_shared<ASTFunction>();
engine->name = database_engine_name;
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
else
{
const ASTFunction & engine_id = typeid_cast<const ASTFunction &>(*create.storage);
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines that support any arguments.
if (engine_id.arguments || engine_id.parameters)
if (engine.arguments || engine.parameters
|| storage.partition_by || storage.order_by || storage.sample_by || storage.settings)
{
std::stringstream ostr;
formatAST(*create.storage, ostr, 0, false, false);
formatAST(storage, ostr, 0, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
database_engine_name = engine_id.name;
database_engine_name = engine.name;
}
String database_name_escaped = escapeForFileName(database_name);
@ -160,11 +163,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;
/// Convert the AST of the column list to a list of columns with types. Columns of Nested type are expanded into a list of real columns.
static ColumnsAndDefaults parseColumns(
ASTPtr expression_list, const Context & context)
static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast, const Context & context)
{
auto & column_list_ast = typeid_cast<ASTExpressionList &>(*expression_list);
/// list of table columns in correct order
NamesAndTypesList columns{};
ColumnDefaults defaults{};
@ -177,7 +177,7 @@ static ColumnsAndDefaults parseColumns(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
default_expr_list->children.reserve(column_list_ast.children.size());
for (auto & ast : column_list_ast.children)
for (const auto & ast : column_list_ast.children)
{
auto & col_decl = typeid_cast<ASTColumnDeclaration &>(*ast);
@ -346,7 +346,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(NamesAndTypesList columns,
InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::getColumnsInfo(
const ASTPtr & columns, const Context & context)
const ASTExpressionList & columns, const Context & context)
{
ColumnsInfo res;
@ -370,7 +370,7 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
if (create.columns)
{
res = getColumnsInfo(create.columns, context);
res = getColumnsInfo(*create.columns, context);
}
else if (!create.as_table.empty())
{
@ -391,16 +391,9 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(*res.columns, res.materialized_columns, res.alias_columns, res.column_defaults);
if (create.columns)
{
auto it = std::find(create.children.begin(), create.children.end(), create.columns);
if (it != create.children.end())
*it = new_columns;
else
create.children.push_back(new_columns);
}
create.replace(create.columns, new_columns);
else
create.children.push_back(new_columns);
create.columns = new_columns;
create.set(create.columns, new_columns);
/// Check for duplicates
std::set<String> all_columns;
@ -421,22 +414,23 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
}
String InterpreterCreateQuery::setEngine(
ASTCreateQuery & create, const StoragePtr & as_storage) const
String InterpreterCreateQuery::setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const
{
String storage_name;
auto set_engine = [&](const char * engine)
{
storage_name = engine;
auto func = std::make_shared<ASTFunction>();
func->name = engine;
create.storage = func;
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = engine;
auto storage_ast = std::make_shared<ASTStorage>();
storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
};
if (create.storage)
{
storage_name = typeid_cast<ASTFunction &>(*create.storage).name;
storage_name = create.storage->engine->name;
}
else if (!create.as_table.empty())
{
@ -445,17 +439,17 @@ String InterpreterCreateQuery::setEngine(
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
String as_table_name = create.as_table;
auto as_create_ptr = context.getCreateQuery(as_database_name, as_table_name);
auto & as_create = typeid_cast<const ASTCreateQuery &>(*as_create_ptr);
ASTPtr as_create_ptr = context.getCreateQuery(as_database_name, as_table_name);
const auto & as_create = typeid_cast<const ASTCreateQuery &>(*as_create_ptr);
if (!create.storage)
{
if (as_create.is_view || as_create.is_materialized_view)
create.storage = as_create.inner_storage;
create.set(create.storage, as_create.inner_storage->ptr());
else
create.storage = as_create.storage;
create.set(create.storage, as_create.storage->ptr());
storage_name = typeid_cast<const ASTFunction &>(*create.storage).name;
storage_name = create.storage->engine->name;
}
else
storage_name = as_storage->getName();
@ -494,7 +488,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// For `view` type tables, you may need `sample_block` to get the columns.
if (create.select && (!create.attach || (!create.columns && (create.is_view || create.is_materialized_view))))
{
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select, context);
create.select->setDatabaseIfNeeded(database_name);
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select->ptr(), context);
as_select_sample = interpreter_select->getSampleBlock();
}
@ -543,7 +538,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns.columns,
context.getGlobalContext(), create, columns.columns,
columns.materialized_columns, columns.alias_columns, columns.column_defaults, create.attach, false);
if (create.is_temporary)

View File

@ -11,6 +11,7 @@ namespace DB
class Context;
class ASTCreateQuery;
class ASTExpressionList;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
@ -52,7 +53,7 @@ public:
};
/// Obtain information about columns, their types and default values, for the case when columns in the CREATE query are specified explicitly.
static ColumnsInfo getColumnsInfo(const ASTPtr & columns, const Context & context);
static ColumnsInfo getColumnsInfo(const ASTExpressionList & columns, const Context & context);
private:
BlockIO createDatabase(ASTCreateQuery & create);

View File

@ -12,7 +12,7 @@ class ASTSetQuery;
using ASTPtr = std::shared_ptr<IAST>;
/** Change one or several settings, for session or globally, or just for current context.
/** Change one or several settings for the session or just for the current context.
*/
class InterpreterSetQuery : public IInterpreter
{
@ -20,7 +20,7 @@ public:
InterpreterSetQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {}
/** Usual SET query. Set setting for session or globally (if GLOBAL was specified).
/** Usual SET query. Set setting for the session.
*/
BlockIO execute() override;
@ -34,8 +34,6 @@ public:
private:
ASTPtr query_ptr;
Context & context;
void executeImpl(const ASTSetQuery & ast, Context & target);
};

View File

@ -298,13 +298,11 @@ struct Settings
/* Timeout for DDL query responses from all hosts in cluster. Negative value means infinite. */ \
M(SettingInt64, distributed_ddl_task_timeout, 120) \
\
/** If true, and the date parameter of MergeTree engines is an expression (not a column name), \
* it will be interpreted as the partitioning expression, allowing custom partitions. \
* IMPORTANT: Don't use this setting just yet. \
* It is for testing purposes, the syntax will likely change soon and the server will not be able \
* to load the tables created this way. You have been warned. \
/** If true, allow parameters of storage engines, such as the partitioning expression, primary key, etc., \
* to be specified not as engine arguments but as separate clauses (PARTITION BY, ORDER BY, ...). \
* Enable this setting to allow custom MergeTree partitions. \
*/ \
M(SettingBool, experimental_merge_tree_allow_custom_partitions, false) \
M(SettingBool, experimental_allow_extended_storage_definition_syntax, false) \
/* Timeout for flushing data from streaming storages. */ \
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS) \
/* Schema identifier (used by schema-based formats) */ \
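As a rough illustration of what the new experimental_allow_extended_storage_definition_syntax setting permits, the sketch below (hypothetical table and column names) parses an extended definition with the classes changed in this PR; per the setting's description, a server would only accept creating such a MergeTree table with the setting enabled.

#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/typeid_cast.h>
#include <cassert>

namespace DB
{

/// Hypothetical example of the extended storage definition syntax.
void exampleExtendedSyntax()
{
    String query =
        "CREATE TABLE hits (event_date Date, user_id UInt64) ENGINE = MergeTree "
        "PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, user_id) "
        "SETTINGS index_granularity = 8192";

    ParserCreateQuery parser;
    ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "CREATE query");

    const auto & create = typeid_cast<const ASTCreateQuery &>(*ast);
    /// The clauses become separate typed children of ASTStorage rather than
    /// positional MergeTree(...) engine arguments.
    assert(create.storage->engine->name == "MergeTree");
    assert(create.storage->partition_by && create.storage->order_by && create.storage->settings);
}

}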

View File

@ -10,7 +10,7 @@
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTInsertQuery.h>
@ -86,7 +86,7 @@ public:
Context & context_,
const String & database_name_,
const String & table_name_,
const String & engine_,
const String & storage_def_,
size_t flush_interval_milliseconds_);
~SystemLog();
@ -105,8 +105,8 @@ private:
Context & context;
const String database_name;
const String table_name;
const String storage_def;
StoragePtr table;
const String engine;
const size_t flush_interval_milliseconds;
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
@ -142,10 +142,10 @@ template <typename LogElement>
SystemLog<LogElement>::SystemLog(Context & context_,
const String & database_name_,
const String & table_name_,
const String & engine_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: context(context_),
database_name(database_name_), table_name(table_name_), engine(engine_),
database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
flush_interval_milliseconds(flush_interval_milliseconds_)
{
log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
@ -328,11 +328,13 @@ void SystemLog<LogElement>::prepareTable()
create->table = table_name;
Block sample = LogElement::createBlock();
create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());
create->set(create->columns, InterpreterCreateQuery::formatColumns(sample.getColumnsList()));
ParserFunction engine_parser;
create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name());
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name());
create->set(create->storage, storage_ast);
InterpreterCreateQuery(create, context).execute();

View File

@ -1,7 +1,6 @@
#include <Core/Block.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ExpressionElementParsers.h>
@ -23,7 +22,7 @@ namespace ErrorCodes
}
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::shared_ptr<IAST> & node, const Context & context)
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context)
{
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(
node, context, nullptr, NamesAndTypesList{{ "_dummy", std::make_shared<DataTypeUInt8>() }}).getConstActions();
@ -51,7 +50,7 @@ std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::sha
}
ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & context)
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
@ -61,7 +60,7 @@ ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & contex
}
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(ASTPtr & node, const Context & context)
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
{
if (auto id = typeid_cast<const ASTIdentifier *>(node.get()))
return std::make_shared<ASTLiteral>(node->range, Field(id->name));

View File

@ -2,13 +2,13 @@
#include <memory>
#include <Core/Field.h>
#include <Parsers/IAST.h>
#include <Parsers/IParser.h>
namespace DB
{
class IAST;
class Context;
class IDataType;
@ -17,13 +17,13 @@ class IDataType;
* Used in rare cases - for elements of a set for IN, for data to INSERT.
* Quite suboptimal.
*/
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::shared_ptr<IAST> & node, const Context & context);
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context);
/** Evaluate constant expression
* and returns ASTLiteral with its value.
*/
std::shared_ptr<IAST> evaluateConstantExpressionAsLiteral(std::shared_ptr<IAST> & node, const Context & context);
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context);
/** Evaluate constant expression
@ -31,7 +31,7 @@ std::shared_ptr<IAST> evaluateConstantExpressionAsLiteral(std::shared_ptr<IAST>
* Also, if the AST is an identifier, then return a string literal with its name.
* Useful in places where a name may be specified either as an identifier or as the result of a constant expression.
*/
std::shared_ptr<IAST> evaluateConstantExpressionOrIdentifierAsLiteral(std::shared_ptr<IAST> & node, const Context & context);
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context);
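A minimal sketch of how the new const-ASTPtr overload above might be used; the helper name, its placement and the availability of a Context are assumptions for illustration.

#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>

namespace DB
{

/// Hypothetical helper: read an engine argument as a string, whether it was written
/// as a bare identifier, a quoted string or a constant expression evaluating to one.
static String engineArgumentToString(const ASTPtr & node, const Context & context)
{
    ASTPtr literal = evaluateConstantExpressionOrIdentifierAsLiteral(node, context);
    return typeid_cast<const ASTLiteral &>(*literal).value.safeGet<String>();
}

}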
/** Parses a name of an object which could be written in 3 forms:
* name, `name` or 'name' */

View File

@ -2,6 +2,8 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -9,6 +11,69 @@
namespace DB
{
class ASTStorage : public IAST
{
public:
ASTFunction * engine = nullptr;
IAST * partition_by = nullptr;
IAST * order_by = nullptr;
IAST * sample_by = nullptr;
ASTSetQuery * settings = nullptr;
ASTStorage() = default;
ASTStorage(StringRange range_) : IAST(range_) {}
String getID() const override { return "Storage definition"; }
ASTPtr clone() const override
{
auto res = std::make_shared<ASTStorage>(*this);
res->children.clear();
if (engine)
res->set(res->engine, engine->clone());
if (partition_by)
res->set(res->partition_by, partition_by->clone());
if (order_by)
res->set(res->order_by, order_by->clone());
if (sample_by)
res->set(res->sample_by, sample_by->clone());
if (settings)
res->set(res->settings, settings->clone());
return res;
}
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
{
if (engine)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ENGINE" << (s.hilite ? hilite_none : "") << " = ";
engine->formatImpl(s, state, frame);
}
if (partition_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "PARTITION BY " << (s.hilite ? hilite_none : "");
partition_by->formatImpl(s, state, frame);
}
if (order_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ORDER BY " << (s.hilite ? hilite_none : "");
order_by->formatImpl(s, state, frame);
}
if (sample_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SAMPLE BY " << (s.hilite ? hilite_none : "");
sample_by->formatImpl(s, state, frame);
}
if (settings)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SETTINGS " << (s.hilite ? hilite_none : "");
settings->formatImpl(s, state, frame);
}
}
};
/// CREATE TABLE or ATTACH TABLE query
class ASTCreateQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
@ -22,12 +87,12 @@ public:
bool is_temporary{false};
String database;
String table;
ASTPtr columns;
ASTPtr storage;
ASTPtr inner_storage; /// Internal engine for the CREATE MATERIALIZED VIEW query
ASTExpressionList * columns = nullptr;
ASTStorage * storage = nullptr;
ASTStorage * inner_storage = nullptr; /// Internal engine for the CREATE MATERIALIZED VIEW query
String as_database;
String as_table;
ASTPtr select;
ASTSelectQuery * select = nullptr;
ASTCreateQuery() = default;
ASTCreateQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
@ -40,10 +105,14 @@ public:
auto res = std::make_shared<ASTCreateQuery>(*this);
res->children.clear();
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (storage) { res->storage = storage->clone(); res->children.push_back(res->storage); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); }
if (columns)
res->set(res->columns, columns->clone());
if (storage)
res->set(res->storage, storage->clone());
if (select)
res->set(res->select, select->clone());
if (inner_storage)
res->set(res->inner_storage, inner_storage->clone());
cloneOutputOptions(*res);
@ -77,10 +146,7 @@ protected:
formatOnCluster(settings);
if (storage)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
storage->formatImpl(settings, state, frame);
}
return;
}
@ -119,16 +185,10 @@ protected:
}
if (storage && !is_materialized_view && !is_view)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
storage->formatImpl(settings, state, frame);
}
if (inner_storage)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
inner_storage->formatImpl(settings, state, frame);
}
if (is_populate)
{

View File

@ -17,14 +17,14 @@ public:
ASTQueryWithOutput() = default;
explicit ASTQueryWithOutput(const StringRange range_) : IAST(range_) {}
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final;
protected:
/// NOTE: call this helper at the end of the clone() method of a descendant class.
void cloneOutputOptions(ASTQueryWithOutput & cloned) const;
/// Format only the query part of the AST (without output options).
virtual void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const = 0;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final;
};

View File

@ -315,15 +315,7 @@ void ASTSelectQuery::formatQueryImpl(const FormatSettings & s, FormatState & sta
if (settings)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "SETTINGS " << (s.hilite ? hilite_none : "");
const ASTSetQuery & ast_set = typeid_cast<const ASTSetQuery &>(*settings);
for (ASTSetQuery::Changes::const_iterator it = ast_set.changes.begin(); it != ast_set.changes.end(); ++it)
{
if (it != ast_set.changes.begin())
s.ostr << ", ";
s.ostr << it->name << " = " << applyVisitor(FieldVisitorToString(), it->value);
}
settings->formatImpl(s, state, frame);
}
if (next_union_all)

View File

@ -14,6 +14,8 @@ namespace DB
class ASTSetQuery : public IAST
{
public:
bool is_standalone = true; /// If false, this AST is a part of another query, such as SELECT.
struct Change
{
String name;
@ -31,10 +33,10 @@ public:
ASTPtr clone() const override { return std::make_shared<ASTSetQuery>(*this); }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : "");
if (is_standalone)
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : "");
for (ASTSetQuery::Changes::const_iterator it = changes.begin(); it != changes.end(); ++it)
{

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
extern const int NOT_A_COLUMN;
extern const int UNKNOWN_TYPE_OF_AST_NODE;
extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int LOGICAL_ERROR;
}
using IdentifierNameSet = std::set<String>;
@ -33,7 +34,7 @@ class WriteBuffer;
/** Element of the syntax tree (hereinafter - directed acyclic graph with elements of semantics)
*/
class IAST
class IAST : public std::enable_shared_from_this<IAST>
{
public:
ASTs children;
@ -66,6 +67,8 @@ public:
/** Get the text that identifies this element. */
virtual String getID() const = 0;
ASTPtr ptr() { return shared_from_this(); }
/** Get a deep copy of the tree. */
virtual ASTPtr clone() const = 0;
@ -110,6 +113,42 @@ public:
child->collectIdentifierNames(set);
}
template <typename T>
void set(T * & field, const ASTPtr & child)
{
if (!child)
return;
T * casted = dynamic_cast<T *>(child.get());
if (!casted)
throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR);
children.push_back(child);
field = casted;
}
template <typename T>
void replace(T * & field, const ASTPtr & child)
{
if (!child)
throw Exception("Trying to replace AST subtree with nullptr", ErrorCodes::LOGICAL_ERROR);
T * casted = dynamic_cast<T *>(child.get());
if (!casted)
throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR);
for (ASTPtr & current_child : children)
{
if (current_child.get() == field)
{
current_child = child;
field = casted;
return;
}
}
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
/// Convert to a string.
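A hypothetical illustration of the new set()/replace() helpers above: set() attaches a child and keeps the typed convenience pointer and the generic children vector consistent, while replace() swaps an existing child in place. The constructed query is made up for the example.

#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>

namespace DB
{

void exampleSetAndReplace()
{
    auto create = std::make_shared<ASTCreateQuery>();

    auto columns = std::make_shared<ASTExpressionList>();
    create->set(create->columns, columns);            /// create->columns now points into create->children

    auto storage = std::make_shared<ASTStorage>();
    create->set(create->storage, storage);

    auto new_columns = std::make_shared<ASTExpressionList>();
    create->replace(create->columns, new_columns);    /// same child slot, typed pointer updated to the new subtree
}

}

This replaces the older two-step pattern (assign the ASTPtr member, then push it into children by hand) that the removed code in ParserCreateQuery and InterpreterCreateQuery used, so the typed field and the children vector can no longer drift apart.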

View File

@ -5,6 +5,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ParserSetQuery.h>
namespace DB
@ -106,21 +107,78 @@ bool ParserColumnDeclarationList::parseImpl(Pos & pos, ASTPtr & node, Expected &
}
bool ParserEngine::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_engine("ENGINE");
ParserToken s_eq(TokenType::Equals);
ParserIdentifierWithOptionalParameters storage_p;
ParserKeyword s_partition_by("PARTITION BY");
ParserKeyword s_order_by("ORDER BY");
ParserKeyword s_sample_by("SAMPLE BY");
ParserKeyword s_settings("SETTINGS");
if (s_engine.ignore(pos, expected))
ParserIdentifierWithOptionalParameters ident_with_optional_params_p;
ParserExpression expression_p;
ParserSetQuery settings_p(/* parse_only_internals_ = */ true);
Pos begin = pos;
ASTPtr engine;
ASTPtr partition_by;
ASTPtr order_by;
ASTPtr sample_by;
ASTPtr settings;
if (!s_engine.ignore(pos, expected))
return false;
s_eq.ignore(pos, expected);
if (!ident_with_optional_params_p.parse(pos, engine, expected))
return false;
while (true)
{
if (!s_eq.ignore(pos, expected))
return false;
if (!partition_by && s_partition_by.ignore(pos, expected))
{
if (expression_p.parse(pos, partition_by, expected))
continue;
else
return false;
}
if (!storage_p.parse(pos, node, expected))
return false;
if (!order_by && s_order_by.ignore(pos, expected))
{
if (expression_p.parse(pos, order_by, expected))
continue;
else
return false;
}
if (!sample_by && s_sample_by.ignore(pos, expected))
{
if (expression_p.parse(pos, sample_by, expected))
continue;
else
return false;
}
if (s_settings.ignore(pos, expected))
{
if (!settings_p.parse(pos, settings, expected))
return false;
}
break;
}
auto storage = std::make_shared<ASTStorage>(StringRange(begin, pos));
storage->set(storage->engine, engine);
storage->set(storage->partition_by, partition_by);
storage->set(storage->order_by, order_by);
storage->set(storage->sample_by, sample_by);
storage->set(storage->settings, settings);
node = storage;
return true;
}
@ -136,16 +194,16 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_database("DATABASE");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_as("AS");
ParserKeyword s_select("SELECT");
ParserKeyword s_view("VIEW");
ParserKeyword s_materialized("MATERIALIZED");
ParserKeyword s_populate("POPULATE");
ParserToken s_dot(TokenType::Dot);
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
ParserEngine engine_p;
ParserStorage storage_p;
ParserIdentifier name_p;
ParserColumnDeclarationList columns_p;
ParserSelectQuery select_p;
ASTPtr database;
ASTPtr table;
@ -190,7 +248,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
}
else if (s_table.ignore(pos, expected))
{
@ -222,39 +280,31 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!s_rparen.ignore(pos, expected))
return false;
if (!engine_p.parse(pos, storage, expected))
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
return false;
/// For engine VIEW, you also need to read AS SELECT
if (storage && (typeid_cast<ASTFunction &>(*storage).name == "View"
|| typeid_cast<ASTFunction &>(*storage).name == "MaterializedView"))
if (storage)
{
if (!s_as.ignore(pos, expected))
return false;
Pos before_select = pos;
if (!s_select.ignore(pos, expected))
return false;
pos = before_select;
ParserSelectQuery select_p;
select_p.parse(pos, select, expected);
const auto & storage_ast = typeid_cast<const ASTStorage &>(*storage);
/// For engine VIEW, you also need to read AS SELECT
if (storage_ast.engine->name == "View" || storage_ast.engine->name == "MaterializedView")
{
if (!s_as.ignore(pos, expected))
return false;
if (!select_p.parse(pos, select, expected))
return false;
}
}
}
else
{
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
if (!s_as.ignore(pos, expected))
return false;
/// AS SELECT ...
Pos before_select = pos;
if (s_select.ignore(pos, expected))
{
pos = before_select;
ParserSelectQuery select_p;
select_p.parse(pos, select, expected);
}
else
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
@ -268,7 +318,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
/// Optional - ENGINE can be specified.
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
}
}
}
@ -315,7 +365,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
/// Optional - internal ENGINE for MATERIALIZED VIEW can be specified
engine_p.parse(pos, inner_storage, expected);
storage_p.parse(pos, inner_storage, expected);
if (s_populate.ignore(pos, expected))
is_populate = true;
@ -344,25 +394,15 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (table)
query->table = typeid_cast<ASTIdentifier &>(*table).name;
query->cluster = cluster_str;
if (inner_storage)
query->inner_storage = inner_storage;
query->set(query->inner_storage, inner_storage);
query->columns = columns;
query->storage = storage;
query->set(query->columns, columns);
query->set(query->storage, storage);
if (as_database)
query->as_database = typeid_cast<ASTIdentifier &>(*as_database).name;
if (as_table)
query->as_table = typeid_cast<ASTIdentifier &>(*as_table).name;
query->select = select;
if (columns)
query->children.push_back(columns);
if (storage)
query->children.push_back(storage);
if (select)
query->children.push_back(select);
if (inner_storage)
query->children.push_back(inner_storage);
query->set(query->select, select);
return true;
}

View File

@ -187,11 +187,11 @@ protected:
};
/** ENGINE = name. */
class ParserEngine : public IParserBase
/** ENGINE = name [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name = value, ...] */
class ParserStorage : public IParserBase
{
protected:
const char * getName() const { return "ENGINE"; }
const char * getName() const { return "storage definition"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};

View File

@ -47,7 +47,6 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parse_only_internals)
{
ParserKeyword s_set("SET");
ParserKeyword s_global("GLOBAL");
if (!s_set.ignore(pos, expected))
return false;
@ -69,6 +68,7 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTSetQuery>(StringRange(begin, pos));
node = query;
query->is_standalone = !parse_only_internals;
query->changes = changes;
return true;

View File

@ -8,7 +8,7 @@ namespace DB
{
/** Query like this:
* SET [GLOBAL] name1 = value1, name2 = value2, ...
* SET name1 = value1, name2 = value2, ...
*/
class ParserSetQuery : public IParserBase
{
@ -19,7 +19,7 @@ protected:
const char * getName() const override { return "SET query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
/// Parse the list `name = value` pairs, without SET [GLOBAL].
/// Parse the list of `name = value` pairs, without SET.
bool parse_only_internals;
};

View File

@ -30,6 +30,7 @@ using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class ASTCreateQuery;
class IStorage;

View File

@ -83,7 +83,7 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block)
: data_part(data_part_)
{
number_of_rows_in_part = data_part->getExactSizeRows();
number_of_rows_in_part = data_part->rows_count;
/// Initialize with the sample block until update() is called.
initialize(sample_block, columns);
}

View File

@ -86,7 +86,6 @@ MergeTreeData::MergeTreeData(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
@ -96,7 +95,7 @@ MergeTreeData::MergeTreeData(
PartsCleanCallback parts_clean_callback_)
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
index_granularity(settings_.index_granularity),
merging_params(merging_params_),
settings(settings_),
primary_expr_ast(primary_expr_ast_),
@ -110,11 +109,16 @@ MergeTreeData::MergeTreeData(
{
merging_params.check(*columns);
if (primary_expr_ast && merging_params.mode == MergingParams::Unsorted)
throw Exception("Primary key cannot be set for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted)
throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Primary key can be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
initPrimaryKey();
if (sampling_expression && (!primary_expr_ast || !primary_key_sample.has(sampling_expression->getColumnName())))
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeDataFormatVersion min_format_version(0);
if (!date_column_name.empty())
{
@ -577,7 +581,7 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
time_t current_time = time(nullptr);
ssize_t deadline = (custom_directories_lifetime_seconds >= 0)
? current_time - custom_directories_lifetime_seconds
: current_time - settings.temporary_directories_lifetime;
: current_time - settings.temporary_directories_lifetime.totalSeconds();
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
@ -622,7 +626,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
if (it->unique() && /// After this ref_count cannot increase.
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime)
now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds())
{
res.push_back(*it);
all_data_parts.erase(it++);
@ -1087,7 +1091,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
else
{
const IDataType & type = *new_primary_key_sample.safeGetByPosition(i).type;
new_index[i] = type.createConstColumn(part->size, type.getDefault())->convertToFullColumnIfConst();
new_index[i] = type.createConstColumn(part->marks_count, type.getDefault())->convertToFullColumnIfConst();
}
}
@ -1098,7 +1102,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
WriteBufferFromFile index_file(index_tmp_path);
HashingWriteBuffer index_stream(index_file);
for (size_t i = 0, size = part->size; i < size; ++i)
for (size_t i = 0, marks_count = part->marks_count; i < marks_count; ++i)
for (size_t j = 0; j < new_key_size; ++j)
new_primary_key_sample.getByPosition(j).type->serializeBinary(*new_index[j].get(), i, index_stream);
@ -1118,7 +1122,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
/// Apply the expression and write the result to temporary files.
if (expression)
{
MarkRanges ranges{MarkRange(0, part->size)};
MarkRanges ranges{MarkRange(0, part->marks_count)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);

View File

@ -62,7 +62,8 @@ namespace ErrorCodes
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// partition.dat - contains the value of the partitioning expression
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
@ -236,7 +237,6 @@ public:
/// primary_expr_ast - expression used for sorting; empty for UnsortedMergeTree.
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
/// Otherwise, partition_expr_ast is used for partitioning.
/// index_granularity - how many rows correspond to one primary key value.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData( const String & database_, const String & table_,
@ -249,7 +249,6 @@ public:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,

View File

@ -499,7 +499,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
merge_entry->total_size_marks += part->marks_count;
}
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@ -557,7 +557,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->size)),
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback(
@ -691,7 +691,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
@ -755,7 +755,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns);
/// For convenience, even CollapsingSortedBlockInputStream can not return zero rows.
if (0 == to.marksCount())
if (0 == to.getRowsCount())
throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);
return new_data_part;
@ -862,12 +862,16 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// Merge all parts of the partition.
size_t total_input_rows = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
{
total_input_rows += part->rows_count;
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
merge_entry->total_size_marks += part->marks_count;
}
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
@ -882,22 +886,18 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
MarkRanges ranges(1, MarkRange(0, parts[i]->marks_count));
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names,
ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
input->setProgressCallback([&merge_entry, total_input_rows] (const Progress & value)
{
const auto new_rows_read = merge_entry->rows_read += value.rows;
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
merge_entry->progress = static_cast<Float64>(new_rows_read) / total_input_rows;
merge_entry->bytes_read_uncompressed += value.bytes;
});
@ -906,8 +906,6 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
src_streams.emplace_back(std::move(input));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// Sharding of merged blocks.
@ -1038,7 +1036,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / total_input_rows)) * initial_reservation));
}
}
@ -1050,7 +1048,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
abortReshardPartitionIfRequested();
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
if (0 == output_stream->marksCount())
if (0 == output_stream->getRowsCount())
{
/// There was no data in this shard. Ignore.
LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first);

View File

@ -319,6 +319,7 @@ void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & storage, const
HashingWriteBuffer out_hashing(out);
type->serializeBinary(min_values[i], out_hashing);
type->serializeBinary(max_values[i], out_hashing);
out_hashing.next();
checksums.files[file_name].file_size = out_hashing.count();
checksums.files[file_name].file_hash = out_hashing.getHash();
}
@ -426,43 +427,6 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
}
size_t MergeTreeDataPart::getExactSizeRows() const
{
size_t rows_approx = storage.index_granularity * size;
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn();
const auto checksum = tryGetBinChecksum(column.name);
/// Should be fixed non-nullable column
if (!checksum || !column_col->isFixed() || column_col->isNullable())
continue;
size_t sizeof_field = column_col->sizeOfField();
size_t rows = checksum->uncompressed_size / sizeof_field;
if (checksum->uncompressed_size % sizeof_field != 0)
{
throw Exception(
"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
+ ", sizeof " + toString(sizeof_field),
ErrorCodes::LOGICAL_ERROR);
}
if (!(rows_approx - storage.index_granularity < rows && rows <= rows_approx))
{
throw Exception("Unexpected size of column " + column.name + ": " + toString(rows) + " rows",
ErrorCodes::LOGICAL_ERROR);
}
return rows;
}
throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
}
String MergeTreeDataPart::getFullPath() const
{
if (relative_path.empty())
@ -647,6 +611,7 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
loadIndex();
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `marks_count`.
loadPartitionAndMinMaxIndex();
if (check_consistency)
checkConsistency(require_columns_checksums);
@ -655,13 +620,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
void MergeTreeDataPart::loadIndex()
{
/// Size - in number of marks.
if (!size)
if (!marks_count)
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
size = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
}
@ -675,20 +639,20 @@ void MergeTreeDataPart::loadIndex()
for (size_t i = 0; i < key_size; ++i)
{
index[i] = storage.primary_key_data_types[i]->createColumn();
index[i]->reserve(size);
index[i]->reserve(marks_count);
}
String index_path = getFullPath() + "primary.idx";
ReadBufferFromFile index_file = openForReading(index_path);
for (size_t i = 0; i < size; ++i)
for (size_t i = 0; i < marks_count; ++i)
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*index[j].get(), index_file);
for (size_t i = 0; i < key_size; ++i)
if (index[i]->size() != size)
if (index[i]->size() != marks_count)
throw Exception("Cannot read all data from index file " + index_path
+ "(expected size: " + toString(size) + ", read: " + toString(index[i]->size()) + ")",
+ "(expected size: " + toString(marks_count) + ", read: " + toString(index[i]->size()) + ")",
ErrorCodes::CANNOT_READ_ALL_DATA);
if (!index_file.eof())
@ -740,6 +704,54 @@ void MergeTreeDataPart::loadChecksums(bool require)
assertEOF(file);
}
void MergeTreeDataPart::loadRowsCount()
{
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
String path = getFullPath() + "count.txt";
if (!Poco::File(path).exists())
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
ReadBufferFromFile file = openForReading(path);
readIntText(rows_count, file);
assertEOF(file);
}
else
{
size_t rows_approx = storage.index_granularity * marks_count;
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn();
const auto checksum = tryGetBinChecksum(column.name);
/// Should be fixed non-nullable column
if (!checksum || !column_col->isFixed() || column_col->isNullable())
continue;
size_t sizeof_field = column_col->sizeOfField();
rows_count = checksum->uncompressed_size / sizeof_field;
if (checksum->uncompressed_size % sizeof_field != 0)
{
throw Exception(
"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
+ ", sizeof " + toString(sizeof_field),
ErrorCodes::LOGICAL_ERROR);
}
if (!(rows_count <= rows_approx && rows_approx < rows_count + storage.index_granularity))
throw Exception(
"Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows",
ErrorCodes::LOGICAL_ERROR);
return;
}
throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
}
}
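The old-format branch above recovers the exact row count from a fixed-size column; the arithmetic can be checked in isolation with a small self-contained sketch (plain standard C++, hypothetical numbers).

#include <cassert>
#include <cstddef>

int main()
{
    const size_t index_granularity = 8192;
    const size_t marks_count = 2;            /// number of marks in the part
    const size_t sizeof_field = 2;           /// e.g. a Date column stores 2 bytes per row
    const size_t uncompressed_size = 24576;  /// taken from the column's checksum entry

    const size_t rows_approx = index_granularity * marks_count;   /// 16384
    assert(uncompressed_size % sizeof_field == 0);
    const size_t rows_count = uncompressed_size / sizeof_field;   /// 12288

    /// Same consistency check as in loadRowsCount(): the exact count must fall
    /// within the last (possibly partial) granule of the approximation.
    assert(rows_count <= rows_approx && rows_approx < rows_count + index_granularity);
    return 0;
}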
void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
std::shared_lock<std::shared_mutex> part_lock(columns_lock);
@ -799,6 +811,9 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (!checksums.files.count("count.txt"))
throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.partition_expr && !checksums.files.count("partition.dat"))
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
@ -827,6 +842,8 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(path + "count.txt");
if (storage.partition_expr)
check_file_not_empty(path + "partition.dat");

View File

@ -108,9 +108,6 @@ struct MergeTreeDataPart
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
/// If part has column with fixed size, will return exact size of part (in rows)
size_t getExactSizeRows() const;
/// Returns full path to part dir
String getFullPath() const;
@ -132,7 +129,8 @@ struct MergeTreeDataPart
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
mutable String relative_path;
size_t size = 0; /// in number of marks.
size_t rows_count = 0;
size_t marks_count = 0;
std::atomic<size_t> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
/// is used from several threads without locks (it is changed with ALTER).
time_t modification_time = 0;
@ -239,9 +237,13 @@ private:
/// If checksums.txt exists, reads files' checksums (and sizes) from it
void loadChecksums(bool require);
/// Loads index file. Also calculates this->size if size=0
/// Loads index file. Also calculates this->marks_count if marks_count = 0
void loadIndex();
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version, calculates the rows count from the size of a column with a fixed size.
void loadRowsCount();
void loadPartitionAndMinMaxIndex();
void checkConsistency(bool require_part_metadata);

View File

@ -526,7 +526,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->size}};
ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};
if (!ranges.ranges.empty())
{

View File

@ -125,6 +125,7 @@ void MergeTreePartition::store(const MergeTreeData & storage, const String & par
HashingWriteBuffer out_hashing(out);
for (size_t i = 0; i < value.size(); ++i)
storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
}

View File

@ -403,7 +403,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
std::string filename = name + NULL_MAP_EXTENSION;
streams.emplace(filename, std::make_unique<Stream>(
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->size,
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
@ -425,7 +425,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
if (!streams.count(size_name))
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->size,
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
@ -436,7 +436,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
}
else
streams.emplace(name, std::make_unique<Stream>(
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->size,
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
}

View File

@ -0,0 +1,75 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
}
void MergeTreeSettings::loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
if (!config.has(config_elem))
return;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
for (const String & key : config_keys)
{
String value = config.getString(config_elem + "." + key);
#define SET(TYPE, NAME, DEFAULT) \
else if (key == #NAME) NAME.set(value);
if (false) {}
APPLY_FOR_MERGE_TREE_SETTINGS(SET)
else
throw Exception("Unknown MergeTree setting " + key + " in config", ErrorCodes::INVALID_CONFIG_PARAMETER);
#undef SET
}
}
void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
for (const ASTSetQuery::Change & setting : storage_def.settings->changes)
{
#define SET(TYPE, NAME, DEFAULT) \
else if (setting.name == #NAME) NAME.set(setting.value);
if (false) {}
APPLY_FOR_MERGE_TREE_SETTINGS(SET)
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + storage_def.engine->name,
ErrorCodes::BAD_ARGUMENTS);
#undef SET
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
ASTSetQuery::Changes & changes = storage_def.settings->changes;
#define ADD_IF_ABSENT(NAME) \
if (std::find_if(changes.begin(), changes.end(), \
[](const ASTSetQuery::Change & c) { return c.name == #NAME; }) \
== changes.end()) \
changes.push_back(ASTSetQuery::Change{#NAME, NAME.value});
APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(ADD_IF_ABSENT);
#undef ADD_IF_ABSENT
}
}

View File

@ -3,173 +3,143 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Interpreters/SettingsCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
class ASTStorage;
/** Advanced settings of MergeTree.
* Could be loaded from config.
/** Settings for the MergeTree family of engines.
* Could be loaded from config or from a CREATE TABLE query (SETTINGS clause).
*/
struct MergeTreeSettings
{
/** Merge settings. */
/// Maximum in total size of parts to merge, when there are maximum (minimum) free threads in background pool (or entries in replication queue).
size_t max_bytes_to_merge_at_max_space_in_pool = size_t(150) * 1024 * 1024 * 1024;
size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024;
#define APPLY_FOR_MERGE_TREE_SETTINGS(M) \
/** How many rows correspond to one primary key value. */ \
M(SettingUInt64, index_granularity, 8192) \
\
/** Merge settings. */ \
\
/** Maximum total size of parts to merge, when there is the maximum (minimum) number of free threads \
* in the background pool (or entries in the replication queue). */ \
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024) \
M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024) \
\
/** How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue. */ \
M(SettingUInt64, max_replicated_merges_in_queue, 16) \
\
/** When there are fewer than the specified number of free entries in the pool (or replicated queue), \
* start to lower the maximum size of a merge to process (or to put in the queue). \
* This is to allow small merges to proceed, not filling the pool with long-running merges. */ \
M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8) \
\
/** How many seconds to keep obsolete parts. */ \
M(SettingSeconds, old_parts_lifetime, 8 * 60) \
\
/** How many seconds to keep tmp_-directories. */ \
M(SettingSeconds, temporary_directories_lifetime, 86400) \
\
/** Inserts settings. */ \
\
/** If table contains at least that many active parts, artificially slow down insert into table. */ \
M(SettingUInt64, parts_to_delay_insert, 150) \
\
/** If there are more than this number of active parts, throw 'Too much parts ...' exception. */ \
M(SettingUInt64, parts_to_throw_insert, 300) \
\
/** Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts. */ \
M(SettingUInt64, max_delay_to_insert, 1) \
\
/** Replication settings. */ \
\
/** How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted). */ \
M(SettingUInt64, replicated_deduplication_window, 100) \
/** Similar to previous, but determines old blocks by their lifetime. \
* Hash of an inserted block will be deleted (and the block will not be deduplicated after) \
* if it is outside of one "window". You can set a very big replicated_deduplication_window to avoid \
* duplicating INSERTs during that period of time. */ \
M(SettingUInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60) /** one week */ \
\
/** Keep about this number of last records in ZooKeeper log, even if they are obsolete. \
* It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning. */ \
M(SettingUInt64, replicated_logs_to_keep, 100) \
\
/** If the specified amount of time has passed since the replication log entry was created \
* and the total size of the parts is greater than the threshold, \
* prefer fetching the merged part from a replica instead of doing the merge locally. \
* This speeds up very long merges. */ \
M(SettingSeconds, prefer_fetch_merged_part_time_threshold, 3600) \
M(SettingUInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024) \
\
/** Maximum number of broken parts; if there are more, deny automatic deletion. */ \
M(SettingUInt64, max_suspicious_broken_parts, 10) \
\
/** Do not apply ALTER if the number of files for modification (deletion, addition) is greater than this. */ \
M(SettingUInt64, max_files_to_modify_in_alter_columns, 75) \
/** Do not apply ALTER if the number of files for deletion is greater than this. */ \
M(SettingUInt64, max_files_to_remove_in_alter_columns, 50) \
\
/** If the ratio of wrong parts to the total number of parts is less than this, allow the table to start. */ \
M(SettingFloat, replicated_max_ratio_of_wrong_parts, 0.5) \
\
/** Limit parallel fetches */ \
M(SettingUInt64, replicated_max_parallel_fetches, 0) \
M(SettingUInt64, replicated_max_parallel_fetches_for_table, 0) \
/** Limit parallel sends */ \
M(SettingUInt64, replicated_max_parallel_sends, 0) \
M(SettingUInt64, replicated_max_parallel_sends_for_table, 0) \
\
/** If true, replicas of Replicated tables on this node will try to acquire leadership. */ \
M(SettingBool, replicated_can_become_leader, true) \
\
M(SettingSeconds, zookeeper_session_expiration_check_period, 60) \
\
/** Check delay of replicas settings. */ \
\
/** Period to check replication delay and compare with other replicas. */ \
M(SettingUInt64, check_delay_period, 60) \
\
/** Period to clean up old queue logs, block hashes and parts. */ \
M(SettingUInt64, cleanup_delay_period, 30) \
\
/** Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. */ \
M(SettingUInt64, min_relative_delay_to_yield_leadership, 120) \
\
/** Minimal delay from other replicas to close, stop serving requests and not return Ok \
* during status check. */ \
M(SettingUInt64, min_relative_delay_to_close, 300) \
\
/** Minimal absolute delay to close, stop serving requests and not return Ok during status check. */ \
M(SettingUInt64, min_absolute_delay_to_close, 0) \
\
/** Enable usage of Vertical merge algorithm. */ \
M(SettingUInt64, enable_vertical_merge_algorithm, 1) \
\
/** Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm */ \
M(SettingUInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * DEFAULT_MERGE_BLOCK_SIZE) \
\
/** Minimal amount of non-PK columns to activate Vertical merge algorithm */ \
M(SettingUInt64, vertical_merge_algorithm_min_columns_to_activate, 11)
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 16;
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
M(index_granularity)
/// When there is less than specified number of free entries in pool (or replicated queue),
/// start to lower maximum size of merge to process (or to put in queue).
/// This is to allow small merges to process - not filling the pool with long running merges.
size_t number_of_free_entries_in_pool_to_lower_max_size_of_merge = 8;
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;
APPLY_FOR_MERGE_TREE_SETTINGS(DECLARE)
/// How many seconds to keep tmp_-directories.
time_t temporary_directories_lifetime = 86400;
#undef DECLARE
/** Inserts settings. */
public:
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
/// If table contains at least that many active parts, artificially slow down insert into table.
size_t parts_to_delay_insert = 150;
/// If more than this number active parts, throw 'Too much parts ...' exception
size_t parts_to_throw_insert = 300;
/// Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.
size_t max_delay_to_insert = 1;
/** Replication settings. */
/// How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).
size_t replicated_deduplication_window = 100;
/// Similar to previous, but determines old blocks by their lifetime.
/// Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one "window".
/// You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.
size_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60; /// one week
/// Keep about this number of last records in ZooKeeper log, even if they are obsolete.
/// It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.
size_t replicated_logs_to_keep = 100;
/// After specified amount of time passed after replication log entry creation
/// and sum size of parts is greater than threshold,
/// prefer fetching merged part from replica instead of doing merge locally.
/// To speed up very long merges.
time_t prefer_fetch_merged_part_time_threshold = 3600;
size_t prefer_fetch_merged_part_size_threshold = 10ULL * 1024 * 1024 * 1024;
/// Max broken parts, if more - deny automatic deletion.
size_t max_suspicious_broken_parts = 10;
/// Not apply ALTER if number of files for modification(deletion, addition) more than this.
size_t max_files_to_modify_in_alter_columns = 75;
/// Not apply ALTER, if number of files for deletion more than this.
size_t max_files_to_remove_in_alter_columns = 50;
/// If ratio of wrong parts to total number of parts is less than this - allow to start.
double replicated_max_ratio_of_wrong_parts = 0.5;
/// Limit parallel fetches
size_t replicated_max_parallel_fetches = 0;
size_t replicated_max_parallel_fetches_for_table = 0;
/// Limit parallel sends
size_t replicated_max_parallel_sends = 0;
size_t replicated_max_parallel_sends_for_table = 0;
/// If true, Replicated tables replicas on this node will try to acquire leadership.
bool replicated_can_become_leader = true;
/// In seconds.
size_t zookeeper_session_expiration_check_period = 60;
/** Check delay of replicas settings. */
/// Period to check replication delay and compare with other replicas.
size_t check_delay_period = 60;
/// Period to clean old queue logs, blocks hashes and parts
size_t cleanup_delay_period = 30;
/// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited.
size_t min_relative_delay_to_yield_leadership = 120;
/// Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.
size_t min_relative_delay_to_close = 300;
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 1;
/// Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_rows_to_activate = 16 * DEFAULT_MERGE_BLOCK_SIZE;
/// Minimal amount of non-PK columns to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_columns_to_activate = 11;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
#define SET(NAME, GETTER) \
try \
{ \
NAME = config.GETTER(config_elem + "." #NAME, NAME); \
} \
catch (const Poco::Exception & e) \
{ \
throw Exception( \
"Invalid config parameter: " + config_elem + "/" #NAME + ": " + e.message() + ".", \
ErrorCodes::INVALID_CONFIG_PARAMETER); \
}
SET(max_bytes_to_merge_at_max_space_in_pool, getUInt64);
SET(max_bytes_to_merge_at_min_space_in_pool, getUInt64);
SET(max_replicated_merges_in_queue, getUInt64);
SET(number_of_free_entries_in_pool_to_lower_max_size_of_merge, getUInt64);
SET(old_parts_lifetime, getUInt64);
SET(temporary_directories_lifetime, getUInt64);
SET(parts_to_delay_insert, getUInt64);
SET(parts_to_throw_insert, getUInt64);
SET(max_delay_to_insert, getUInt64);
SET(replicated_deduplication_window, getUInt64);
SET(replicated_deduplication_window_seconds, getUInt64);
SET(replicated_logs_to_keep, getUInt64);
SET(prefer_fetch_merged_part_time_threshold, getUInt64);
SET(prefer_fetch_merged_part_size_threshold, getUInt64);
SET(max_suspicious_broken_parts, getUInt64);
SET(max_files_to_modify_in_alter_columns, getUInt64);
SET(max_files_to_remove_in_alter_columns, getUInt64);
SET(replicated_max_ratio_of_wrong_parts, getDouble);
SET(replicated_max_parallel_fetches, getUInt64);
SET(replicated_max_parallel_fetches_for_table, getUInt64);
SET(replicated_max_parallel_sends, getUInt64);
SET(replicated_max_parallel_sends_for_table, getUInt64);
SET(replicated_can_become_leader, getBool);
SET(zookeeper_session_expiration_check_period, getUInt64);
SET(check_delay_period, getUInt64);
SET(cleanup_delay_period, getUInt64);
SET(min_relative_delay_to_yield_leadership, getUInt64);
SET(min_relative_delay_to_close, getUInt64);
SET(min_absolute_delay_to_close, getUInt64);
SET(enable_vertical_merge_algorithm, getUInt64);
SET(vertical_merge_algorithm_min_rows_to_activate, getUInt64);
SET(vertical_merge_algorithm_min_columns_to_activate, getUInt64);
#undef SET
}
/// NOTE: will rewrite the AST to add immutable settings.
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -408,7 +408,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
column_streams.clear();
if (marks_count == 0)
if (rows_count == 0)
{
/// A part is empty - all records are deleted.
Poco::File(part_path).remove(true);
@ -419,6 +419,13 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
{
new_part->partition.store(storage, part_path, checksums);
new_part->minmax_idx.store(storage, part_path, checksums);
WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
}
{
@ -433,7 +440,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
checksums.write(out);
}
new_part->size = marks_count;
new_part->rows_count = rows_count;
new_part->marks_count = marks_count;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
new_part->index.swap(index_columns);
@ -441,11 +449,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part->getFullPath());
}
size_t MergedBlockOutputStream::marksCount()
{
return marks_count;
}
void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
@ -525,6 +528,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
rows_count += rows;
{
/** While filling index (index_columns), disable memory tracker.
* Because memory is allocated here (maybe in context of INSERT query),
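
The hunks above make the part's row count a first-class piece of metadata: rows are accumulated in rows_count while writing, and a count.txt file with that number (plus its checksum) is stored next to the part, so the count can be read back without touching the data. A toy sketch of the idea with plain streams (the real code uses WriteBufferFromFile and HashingWriteBuffer and also records the file in the part's checksums):

#include <cstddef>
#include <fstream>
#include <iostream>
#include <string>

/// Persist the number of rows of a data part as plain text (toy version of count.txt).
void writeCountFile(const std::string & part_path, size_t rows_count)
{
    std::ofstream out(part_path + "count.txt");
    out << rows_count;
}

/// Read it back without scanning the part's data.
size_t readCountFile(const std::string & part_path)
{
    std::ifstream in(part_path + "count.txt");
    size_t rows = 0;
    in >> rows;
    return rows;
}

int main()
{
    writeCountFile("./", 8192);
    std::cout << readCountFile("./") << '\n';  /// 8192
}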

View File

@ -130,8 +130,8 @@ public:
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
/// How many marks are already written.
size_t marksCount();
/// How many rows are already written.
size_t getRowsCount() const { return rows_count; }
private:
void init();
@ -145,6 +145,7 @@ private:
NamesAndTypesList columns_list;
String part_path;
size_t rows_count = 0;
size_t marks_count = 0;
std::unique_ptr<WriteBufferFromFile> index_file_stream;

View File

@ -85,7 +85,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std::sort(entries.begin(), entries.end());
/// We will not touch the last `replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end());
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
@ -164,7 +164,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold("", RequiredStat(time_threshold));
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window);
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

View File

@ -50,7 +50,7 @@ void ReplicatedMergeTreeRestartingThread::run()
constexpr auto retry_period_ms = 10 * 1000;
/// The frequency of checking expiration of session in ZK.
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period * 1000;
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
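
These call-site changes follow from the header rewrite above: the settings are no longer plain size_t/time_t fields but Setting* wrappers generated by the macro, so code that needs a raw number reads .value, converts with .totalSeconds() for SettingSeconds, or formats with .toString(). A toy stand-in showing why such a wrapper needs an explicit accessor (not the real SettingUInt64 from Interpreters/SettingsCommon.h):

#include <cstdint>
#include <iostream>
#include <string>

/// Toy stand-in for a SettingUInt64-like wrapper: the number plus a "was it changed" flag.
struct ToySettingUInt64
{
    uint64_t value = 0;
    bool changed = false;

    explicit ToySettingUInt64(uint64_t default_value) : value(default_value) {}

    void set(uint64_t x) { value = x; changed = true; }
    void set(const std::string & s) { set(std::stoull(s)); }

    std::string toString() const { return std::to_string(value); }
};

int main()
{
    ToySettingUInt64 replicated_logs_to_keep{100};
    replicated_logs_to_keep.set("250");

    /// Call sites that need the plain integer read .value explicitly, e.g.
    /// std::min(entries.size(), settings.replicated_logs_to_keep.value) in the cleanup thread.
    std::cout << replicated_logs_to_keep.value << ' '
              << replicated_logs_to_keep.toString() << '\n';  /// 250 250
}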

View File

@ -16,19 +16,18 @@ namespace DB
StoragePtr StorageDictionary::create(
const String & table_name,
Context & context,
ASTPtr & query,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage);
const ASTFunction & engine = *query.storage->engine;
String dictionary_name;
if (function.arguments)
if (engine.arguments)
{
std::stringstream iss;
function.arguments->format(IAST::FormatSettings(iss, false, false));
engine.arguments->format(IAST::FormatSettings(iss, false, false));
dictionary_name = iss.str();
}

View File

@ -24,7 +24,7 @@ class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, pub
public:
static StoragePtr create(const String & table_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,

View File

@ -64,9 +64,9 @@ namespace ErrorCodes
* It can be specified in the tuple: (CounterID, Date),
* or as one column: CounterID.
*/
static ASTPtr extractKeyExpressionList(const ASTPtr & node)
static ASTPtr extractKeyExpressionList(IAST & node)
{
const ASTFunction * expr_func = typeid_cast<const ASTFunction *>(&*node);
const ASTFunction * expr_func = typeid_cast<const ASTFunction *>(&node);
if (expr_func && expr_func->name == "tuple")
{
@ -77,7 +77,7 @@ static ASTPtr extractKeyExpressionList(const ASTPtr & node)
{
/// Primary key consists of one column.
auto res = std::make_shared<ASTExpressionList>();
res->children.push_back(node);
res->children.push_back(node.ptr());
return res;
}
}
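
extractKeyExpressionList normalizes a key specification into a flat expression list: a tuple such as (CounterID, EventDate) yields its elements, while anything else (a single column or expression) becomes a one-element list. A toy model of the same behaviour, with a hypothetical Node type instead of the real AST classes:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

/// Toy AST node: an identifier/expression name plus children (arguments of a function such as tuple).
struct Node
{
    std::string name;
    std::vector<std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;

/// Same idea as extractKeyExpressionList: a tuple yields its elements, anything else a one-element list.
std::vector<NodePtr> extractKeyExpressionList(const NodePtr & node)
{
    if (node->name == "tuple")
        return node->children;
    return {node};
}

int main()
{
    auto counter = std::make_shared<Node>(Node{"CounterID", {}});
    auto date = std::make_shared<Node>(Node{"EventDate", {}});
    auto key_tuple = std::make_shared<Node>(Node{"tuple", {counter, date}});

    std::cout << extractKeyExpressionList(key_tuple).size() << '\n';  /// 2 - ORDER BY (CounterID, EventDate)
    std::cout << extractKeyExpressionList(counter).size() << '\n';    /// 1 - ORDER BY CounterID
}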
@ -245,6 +245,117 @@ static void checkAllTypesAreAllowedInTable(const NamesAndTypesList & names_and_t
}
static String getMergeTreeVerboseHelp(bool is_extended_syntax)
{
String help = R"(
MergeTree is a family of storage engines.
MergeTrees are different in two ways:
- they may be replicated and non-replicated;
- they may do different actions on merge: nothing; sign collapse; sum; apply aggregate functions.
So we have 14 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree
In most cases, you need MergeTree or ReplicatedMergeTree.
For replicated merge trees, you need to supply a path in ZooKeeper and a replica name as the first two parameters.
Path in ZooKeeper is like '/clickhouse/tables/01/' where /clickhouse/tables/ is a common prefix and 01 is a shard name.
Replica name is like 'mtstat01-1' - it may be the hostname or any suitable string identifying replica.
You may use macro substitutions for these parameters. It's like ReplicatedMergeTree('/clickhouse/tables/{shard}/', '{replica}'...
Look at the <macros> section in server configuration file.
)";
if (!is_extended_syntax)
help += R"(
Next parameter (which is the first for unreplicated tables and the third for replicated tables) is the name of date column.
Date column must exist in the table and have type Date (not DateTime).
It is used for internal data partitioning and works like some kind of index.
If your source data doesn't have a column of type Date, but has a DateTime column, you may add values for the Date column while loading,
or you may INSERT your source data to a table of type Log and then transform it with INSERT INTO t SELECT toDate(time) AS date, * FROM ...
If your source data doesn't have any date or time, you may just pass any constant for a date column while loading.
Next parameter is optional sampling expression. Sampling expression is used to implement SAMPLE clause in query for approximate query execution.
If you don't need approximate query execution, simply omit this parameter.
Sample expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID).
Next parameter is the primary key tuple. It's like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions in round brackets. If your primary key has just one element, you may omit round brackets.
Careful choice of the primary key is extremely important for processing short-time queries.
Next parameter is index (primary key) granularity. Good value is 8192. You have no reasons to use any other value.
)";
help += R"(
For the Collapsing mode, the last parameter is the name of a sign column - a special column that is used to 'collapse' rows with the same primary key while merging.
For the Summing mode, the optional last parameter is a list of columns to sum while merging. This list is passed in round brackets, like (PageViews, Cost).
If this parameter is omitted, the storage will sum all numeric columns except columns participating in the primary key.
For the Replacing mode, the optional last parameter is the name of a 'version' column. While merging, for all rows with the same primary key, only one row is selected: the last row, if the version column was not specified, or the last row with the maximum version value, if specified.
)";
if (is_extended_syntax)
help += R"(
You can specify a partitioning expression in the PARTITION BY clause. It is optional but highly recommended.
A common partitioning expression is some function of the event date column e.g. PARTITION BY toYYYYMM(EventDate) will partition the table by month.
Rows with different partition expression values are never merged together. That allows manipulating partitions with ALTER commands.
Also it acts as a kind of index.
Primary key is specified in the ORDER BY clause. It is mandatory for all MergeTree types except UnsortedMergeTree.
It is like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions in round brackets.
If your primary key has just one element, you may omit round brackets.
Careful choice of the primary key is extremely important for processing short-time queries.
Optional sampling expression can be specified in the SAMPLE BY clause. It is used to implement the SAMPLE clause in a SELECT query for approximate query execution.
Sampling expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID).
Engine settings can be specified in the SETTINGS clause. Full list is in the source code in the 'dbms/src/Storages/MergeTree/MergeTreeSettings.h' file.
E.g. you can specify the index (primary key) granularity with SETTINGS index_granularity = 8192.
Examples:
MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity = 8192
MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID)
CollapsingMergeTree(Sign) PARTITION BY StartDate SAMPLE BY intHash32(UserID) ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)
SummingMergeTree PARTITION BY toMonday(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo)
SummingMergeTree((Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth)) PARTITION BY toYYYYMM(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo)
ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY EventDate ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID)
)";
else
help += R"(
Examples:
MergeTree(EventDate, (CounterID, EventDate), 8192)
MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)
CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)
SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192)
SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192, (Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth))
ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)
)";
help += R"(
For further info please read the documentation: https://clickhouse.yandex/
)";
return help;
}
StoragePtr StorageFactory::get(
const String & name,
const String & data_path,
@ -252,7 +363,7 @@ StoragePtr StorageFactory::get(
const String & database_name,
Context & local_context,
Context & context,
ASTPtr & query,
ASTCreateQuery & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
@ -269,8 +380,32 @@ StoragePtr StorageFactory::get(
checkAllTypesAreAllowedInTable(alias_columns);
}
ASTStorage & storage_def = *query.storage;
const ASTFunction & engine_def = *storage_def.engine;
if (engine_def.parameters)
throw Exception(
"Engine definition cannot take the form of a parametric function", ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS);
if ((storage_def.partition_by || storage_def.order_by || storage_def.sample_by || storage_def.settings)
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}
auto check_arguments_empty = [&]
{
if (engine_def.arguments)
throw Exception(
"Engine " + name + " doesn't support any arguments (" + toString(engine_def.arguments->children.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
};
if (name == "Log")
{
check_arguments_empty();
return StorageLog::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults,
@ -278,12 +413,14 @@ StoragePtr StorageFactory::get(
}
else if (name == "View")
{
check_arguments_empty();
return StorageView::create(
table_name, database_name, context, query, columns,
materialized_columns, alias_columns, column_defaults);
}
else if (name == "MaterializedView")
{
check_arguments_empty();
return StorageMaterializedView::create(
table_name, database_name, context, query, columns,
materialized_columns, alias_columns, column_defaults,
@ -297,6 +434,7 @@ StoragePtr StorageFactory::get(
}
else if (name == "TinyLog")
{
check_arguments_empty();
return StorageTinyLog::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults,
@ -304,6 +442,7 @@ StoragePtr StorageFactory::get(
}
else if (name == "StripeLog")
{
check_arguments_empty();
return StorageStripeLog::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults,
@ -311,16 +450,12 @@ StoragePtr StorageFactory::get(
}
else if (name == "File")
{
auto & func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage);
auto & args = typeid_cast<ASTExpressionList &>(*func.arguments).children;
constexpr auto error_msg = "Storage File requires 1 or 2 arguments: name of used format and source.";
if (func.parameters)
throw Exception(error_msg, ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS);
ASTs args = engine_def.arguments->children;
if (args.empty() || args.size() > 2)
throw Exception(error_msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(
"Storage File requires 1 or 2 arguments: name of used format and source.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], local_context);
String format_name = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
@ -365,6 +500,7 @@ StoragePtr StorageFactory::get(
}
else if (name == "Set")
{
check_arguments_empty();
return StorageSet::create(
data_path, table_name, columns,
materialized_columns, alias_columns, column_defaults);
@ -373,17 +509,12 @@ StoragePtr StorageFactory::get(
{
/// Join(ANY, LEFT, k1, k2, ...)
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
constexpr auto params_error_message = "Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...).";
if (args_func.size() != 1)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
const ASTs & args = engine_def.arguments->children;
if (args.size() < 3)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(
"Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const ASTIdentifier * strictness_id = typeid_cast<ASTIdentifier *>(&*args[0]);
if (!strictness_id)
@ -433,6 +564,7 @@ StoragePtr StorageFactory::get(
}
else if (name == "Memory")
{
check_arguments_empty();
return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults);
}
else if (name == "Null")
@ -444,14 +576,8 @@ StoragePtr StorageFactory::get(
/** In query, the name of database is specified as table engine argument which contains source tables,
* as well as regex for source-table names.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage Merge requires exactly 2 parameters"
" - name of source database and regexp for table names.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
ASTs args = engine_def.arguments->children;
if (args.size() != 2)
throw Exception("Storage Merge requires exactly 2 parameters"
@ -482,19 +608,13 @@ StoragePtr StorageFactory::get(
* -- string literal as specific case;
* - empty string means 'use default database from cluster'.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
const auto params_error_message = "Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).";
if (args_func.size() != 1)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
ASTs args = engine_def.arguments->children;
if (args.size() != 3 && args.size() != 4)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name = getClusterName(*args[0]);
@ -537,14 +657,7 @@ StoragePtr StorageFactory::get(
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage Buffer requires 9 parameters: "
" destination database, destination table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
ASTs args = engine_def.arguments->children;
if (args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
@ -585,18 +698,14 @@ StoragePtr StorageFactory::get(
* - Message format (string)
* - Schema (optional, if the format supports it)
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
const auto params_error_message = "Storage Kafka requires 4 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format";
if (args_func.size() != 1)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
ASTs args = engine_def.arguments->children;
if (args.size() != 4 && args.size() != 5)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(
"Storage Kafka requires 4 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String brokers;
auto ast = typeid_cast<ASTLiteral *>(&*args[0]);
@ -647,7 +756,7 @@ StoragePtr StorageFactory::get(
* - the name of the column with the date;
* - (optional) expression for sampling
* (the query with `SAMPLE x` will select rows that have a lower value in this column than `x * UINT32_MAX`);
* - an expression for sorting (either a scalar expression or a tuple from several);
* - an expression for sorting (either a scalar expression or a tuple of several);
* - index_granularity;
* - (for Collapsing) the name of Int8 column that contains `sign` type with the change of "visit" (taking values 1 and -1).
* For example: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
@ -663,72 +772,15 @@ StoragePtr StorageFactory::get(
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element')
* UnsortedMergeTree(date, index_granularity) TODO Add description below.
*
* Alternatively, if experimental_allow_extended_storage_definition_syntax setting is specified,
* you can specify:
* - Partitioning expression in the PARTITION BY clause;
* - Primary key in the ORDER BY clause;
* - Sampling expression in the SAMPLE BY clause;
* - Additional MergeTreeSettings in the SETTINGS clause;
*/
const char * verbose_help = R"(
MergeTree is family of storage engines.
MergeTrees is different in two ways:
- it may be replicated and non-replicated;
- it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions.
So we have 14 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree
In most of cases, you need MergeTree or ReplicatedMergeTree.
For replicated merge trees, you need to supply path in ZooKeeper and replica name as first two parameters.
Path in ZooKeeper is like '/clickhouse/tables/01/' where /clickhouse/tables/ is common prefix and 01 is shard name.
Replica name is like 'mtstat01-1' - it may be hostname or any suitable string identifying replica.
You may use macro substitutions for these parameters. It's like ReplicatedMergeTree('/clickhouse/tables/{shard}/', '{replica}'...
Look at <macros> section in server configuration file.
Next parameter (which is first for unreplicated tables and third for replicated tables) is name of date column.
Date column must exist in table and have type Date (not DateTime).
It is used for internal data partitioning and works like some kind of index.
If your source data doesn't have column of type Date, but have DateTime column, you may add values for Date column while loading,
or you may INSERT your source data to table of type Log and then transform it with INSERT INTO t SELECT toDate(time) AS date, * FROM ...
If your source data doesn't have any date or time, you may just pass any constant for date column while loading.
Next parameter is optional sampling expression. Sampling expression is used to implement SAMPLE clause in query for approximate query execution.
If you don't need approximate query execution, simply omit this parameter.
Sample expression must be one of elements of primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID).
Next parameter is primary key tuple. It's like (CounterID, EventDate, intHash64(UserID)) - list of column names or functional expressions in round brackets. If your primary key have just one element, you may omit round brackets.
Careful choice of primary key is extremely important for processing short-time queries.
Next parameter is index (primary key) granularity. Good value is 8192. You have no reasons to use any other value.
For Collapsing mode, last parameter is name of sign column - special column that is used to 'collapse' rows with same primary key while merge.
For Summing mode, last parameter is optional list of columns to sum while merge. List is passed in round brackets, like (PageViews, Cost).
If this parameter is omitted, storage will sum all numeric columns except columns participated in primary key.
For Replacing mode, last parameter is optional name of 'version' column. While merging, for all rows with same primary key, only one row is selected: last row, if version column was not specified, or last row with maximum version value, if specified.
Examples:
MergeTree(EventDate, (CounterID, EventDate), 8192)
MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)
CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)
SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192)
SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192, (Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth))
ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)
For further info please read the documentation: https://clickhouse.yandex/
)";
String name_part = name.substr(0, name.size() - strlen("MergeTree"));
bool replicated = startsWith(name_part, "Replicated");
@ -738,6 +790,9 @@ For further info please read the documentation: https://clickhouse.yandex/
MergeTreeData::MergingParams merging_params;
merging_params.mode = MergeTreeData::MergingParams::Ordinary;
const bool allow_extended_storage_def =
local_context.getSettingsRef().experimental_allow_extended_storage_definition_syntax;
if (name_part == "Collapsing")
merging_params.mode = MergeTreeData::MergingParams::Collapsing;
else if (name_part == "Summing")
@ -751,113 +806,138 @@ For further info please read the documentation: https://clickhouse.yandex/
else if (name_part == "Graphite")
merging_params.mode = MergeTreeData::MergingParams::Graphite;
else if (!name_part.empty())
throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE);
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
throw Exception(
"Unknown storage " + name + getMergeTreeVerboseHelp(allow_extended_storage_def),
ErrorCodes::UNKNOWN_STORAGE);
ASTs args;
if (args_func.size() == 1)
args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (engine_def.arguments)
args = engine_def.arguments->children;
/// NOTE Quite complicated.
size_t num_additional_params = (replicated ? 2 : 0)
+ (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
+ (merging_params.mode == MergeTreeData::MergingParams::Graphite);
String params_for_replicated;
bool is_extended_storage_def =
storage_def.partition_by || storage_def.order_by || storage_def.sample_by || storage_def.settings;
if (is_extended_storage_def && !allow_extended_storage_def)
throw Exception(
"Extended storage definition syntax (PARTITION BY, ORDER BY, SAMPLE BY and SETTINGS clauses) "
"is disabled. Enable it with experimental_allow_extended_storage_definition_syntax user setting");
size_t min_num_params = 0;
size_t max_num_params = 0;
String needed_params;
auto add_mandatory_param = [&](const char * desc)
{
++min_num_params;
++max_num_params;
needed_params += needed_params.empty() ? "\n" : ",\n";
needed_params += desc;
};
auto add_optional_param = [&](const char * desc)
{
++max_num_params;
needed_params += needed_params.empty() ? "\n" : ",\n[";
needed_params += desc;
needed_params += "]";
};
if (replicated)
params_for_replicated =
"\npath in ZooKeeper,"
"\nreplica name,";
if (merging_params.mode == MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 2)
{
String params =
"\nname of column with date,"
"\nindex granularity\n";
throw Exception("Storage " + name + " requires "
+ toString(num_additional_params + 2) + " parameters: " + params_for_replicated + params + verbose_help,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
add_mandatory_param("path in ZooKeeper");
add_mandatory_param("replica name");
}
if (merging_params.mode != MergeTreeData::MergingParams::Summing
&& merging_params.mode != MergeTreeData::MergingParams::Replacing
&& merging_params.mode != MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4)
if (!is_extended_storage_def)
{
String params =
"\nname of column with date,"
"\n[sampling element of primary key],"
"\nprimary key expression,"
"\nindex granularity\n";
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
params += ", sign column\n";
if (merging_params.mode == MergeTreeData::MergingParams::Graphite)
params += ", 'config_element_for_graphite_schema'\n";
throw Exception("Storage " + name + " requires "
+ toString(num_additional_params + 3) + " or "
+ toString(num_additional_params + 4) + " parameters: " + params_for_replicated + params + verbose_help,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if ((merging_params.mode == MergeTreeData::MergingParams::Summing
|| merging_params.mode == MergeTreeData::MergingParams::Replacing)
&& args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4
&& args.size() != num_additional_params + 5)
{
String params =
"\nname of column with date,"
"\n[sampling element of primary key],"
"\nprimary key expression,"
"\nindex granularity,";
if (merging_params.mode == MergeTreeData::MergingParams::Summing)
params += "\n[list of columns to sum]\n";
if (merging_params.mode == MergeTreeData::MergingParams::Unsorted)
{
if (args.size() == min_num_params && allow_extended_storage_def)
is_extended_storage_def = true;
else
{
add_mandatory_param("name of column with date");
add_mandatory_param("index granularity");
}
}
else
params += "\n[version]\n";
{
add_mandatory_param("name of column with date");
add_optional_param("sampling element of primary key");
add_mandatory_param("primary key expression");
add_mandatory_param("index granularity");
}
}
throw Exception("Storage " + name + " requires "
+ toString(num_additional_params + 3) + " or "
+ toString(num_additional_params + 4) + " or "
+ toString(num_additional_params + 5) + " parameters: " + params_for_replicated + params + verbose_help,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
switch (merging_params.mode)
{
default:
break;
case MergeTreeData::MergingParams::Summing:
add_optional_param("list of columns to sum");
break;
case MergeTreeData::MergingParams::Replacing:
add_optional_param("version");
break;
case MergeTreeData::MergingParams::Collapsing:
add_mandatory_param("sign column");
break;
case MergeTreeData::MergingParams::Graphite:
add_mandatory_param("'config_element_for_graphite_schema'");
break;
}
if (args.size() < min_num_params || args.size() > max_num_params)
{
String msg;
if (is_extended_storage_def)
msg += "With extended storage definition syntax storage " + name + " requires ";
else
msg += "Storage " + name + " requires ";
if (max_num_params)
{
if (min_num_params == max_num_params)
msg += toString(min_num_params) + " parameters: ";
else
msg += toString(min_num_params) + " to " + toString(max_num_params) + " parameters: ";
msg += needed_params;
}
else
msg += "no parameters";
msg += getMergeTreeVerboseHelp(is_extended_storage_def);
throw Exception(msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
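
Worked through for ReplicatedCollapsingMergeTree with the classic (non-extended) syntax, the accumulated limits are six mandatory parameters (ZooKeeper path, replica name, date column, primary key, index granularity, sign column) plus one optional sampling expression, i.e. 6 to 7. A standalone sketch that reproduces just this counting, with the two lambdas copied in spirit from the code above (no ClickHouse types involved):

#include <cstddef>
#include <iostream>
#include <string>

int main()
{
    size_t min_num_params = 0;
    size_t max_num_params = 0;
    std::string needed_params;

    auto add_mandatory_param = [&](const char * desc)
    {
        ++min_num_params;
        ++max_num_params;
        needed_params += needed_params.empty() ? "\n" : ",\n";
        needed_params += desc;
    };
    auto add_optional_param = [&](const char * desc)
    {
        ++max_num_params;
        needed_params += needed_params.empty() ? "\n" : ",\n[";
        needed_params += desc;
        needed_params += "]";
    };

    /// ReplicatedCollapsingMergeTree, classic syntax:
    add_mandatory_param("path in ZooKeeper");               /// replicated
    add_mandatory_param("replica name");                    /// replicated
    add_mandatory_param("name of column with date");        /// classic syntax
    add_optional_param("sampling element of primary key");  /// classic syntax
    add_mandatory_param("primary key expression");          /// classic syntax
    add_mandatory_param("index granularity");               /// classic syntax
    add_mandatory_param("sign column");                     /// Collapsing mode

    std::cout << min_num_params << " to " << max_num_params << " parameters:" << needed_params << '\n';
    /// prints "6 to 7 parameters:" followed by the list
}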
/// For Replicated.
String zookeeper_path;
String replica_name;
/// For all.
String date_column_name;
ASTPtr partition_expr_ast;
ASTPtr primary_expr_list;
ASTPtr sampling_expression;
UInt64 index_granularity;
if (replicated)
{
auto ast = typeid_cast<ASTLiteral *>(&*args[0]);
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception(String("Path in ZooKeeper must be a string literal") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
throw Exception(
"Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
ast = typeid_cast<ASTLiteral *>(&*args[1]);
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
throw Exception(String("Replica name must be a string literal") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
throw Exception(
"Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
if (replica_name.empty())
throw Exception(String("No replica name in config") + verbose_help, ErrorCodes::NO_REPLICA_NAME_GIVEN);
throw Exception(
"No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::NO_REPLICA_NAME_GIVEN);
args.erase(args.begin(), args.begin() + 2);
}
@ -867,27 +947,31 @@ For further info please read the documentation: https://clickhouse.yandex/
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
merging_params.sign_column = ast->name;
else
throw Exception(String("Sign column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
args.pop_back();
}
else if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
/// If the last element is not an index granularity (literal), then this is the name of the version column.
if (!typeid_cast<const ASTLiteral *>(&*args.back()))
/// If the last element is not index_granularity or replica_name (a literal), then this is the name of the version column.
if (!args.empty() && !typeid_cast<const ASTLiteral *>(&*args.back()))
{
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
merging_params.version_column = ast->name;
else
throw Exception(String("Version column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
args.pop_back();
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::Summing)
{
/// If the last element is not an index granularity (literal), then this is a list of summable columns.
if (!typeid_cast<const ASTLiteral *>(&*args.back()))
/// If the last element is not index_granularity or replica_name (a literal), then this is a list of summable columns.
if (!args.empty() && !typeid_cast<const ASTLiteral *>(&*args.back()))
{
merging_params.columns_to_sum = extractColumnNames(args.back());
args.pop_back();
@ -897,7 +981,7 @@ For further info please read the documentation: https://clickhouse.yandex/
{
String graphite_config_name;
String error_msg = "Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options";
error_msg += verbose_help;
error_msg += getMergeTreeVerboseHelp(is_extended_storage_def);
if (auto ast = typeid_cast<ASTLiteral *>(&*args.back()))
{
@ -913,47 +997,69 @@ For further info please read the documentation: https://clickhouse.yandex/
setGraphitePatternsFromConfig(context, graphite_config_name, merging_params.graphite_params);
}
/// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity)
if (args.size() == 4)
String date_column_name;
ASTPtr partition_expr_list;
ASTPtr primary_expr_list;
ASTPtr sampling_expression;
MergeTreeSettings storage_settings = context.getMergeTreeSettings();
if (is_extended_storage_def)
{
sampling_expression = args[1];
args.erase(args.begin() + 1);
if (storage_def.partition_by)
partition_expr_list = extractKeyExpressionList(*storage_def.partition_by);
if (storage_def.order_by)
primary_expr_list = extractKeyExpressionList(*storage_def.order_by);
if (storage_def.sample_by)
sampling_expression = storage_def.sample_by->ptr();
storage_settings.loadFromQuery(storage_def);
}
/// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity.
if (auto ast = typeid_cast<ASTIdentifier *>(args[0].get()))
date_column_name = ast->name;
else if (local_context.getSettingsRef().experimental_merge_tree_allow_custom_partitions)
partition_expr_ast = extractKeyExpressionList(args[0]);
else
throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
{
/// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity)
if (args.size() == 4)
{
sampling_expression = args[1];
args.erase(args.begin() + 1);
}
if (merging_params.mode != MergeTreeData::MergingParams::Unsorted)
primary_expr_list = extractKeyExpressionList(args[1]);
/// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity.
auto ast = typeid_cast<ASTLiteral *>(&*args.back());
if (ast && ast->value.getType() == Field::Types::UInt64)
index_granularity = safeGet<UInt64>(ast->value);
else
throw Exception(String("Index granularity must be a positive integer") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
if (auto ast = typeid_cast<ASTIdentifier *>(args[0].get()))
date_column_name = ast->name;
else
throw Exception(
"Date column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
if (merging_params.mode != MergeTreeData::MergingParams::Unsorted)
primary_expr_list = extractKeyExpressionList(*args[1]);
auto ast = typeid_cast<ASTLiteral *>(&*args.back());
if (ast && ast->value.getType() == Field::Types::UInt64)
storage_settings.index_granularity = safeGet<UInt64>(ast->value);
else
throw Exception(
"Index granularity must be a positive integer" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
}
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, attach, data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name, partition_expr_ast,
sampling_expression, index_granularity, merging_params,
has_force_restore_data_flag,
context.getMergeTreeSettings());
context, primary_expr_list, date_column_name, partition_expr_list,
sampling_expression, merging_params, storage_settings,
has_force_restore_data_flag);
else
return StorageMergeTree::create(
data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults, attach,
context, primary_expr_list, date_column_name, partition_expr_ast,
sampling_expression, index_granularity, merging_params,
has_force_restore_data_flag,
context.getMergeTreeSettings());
context, primary_expr_list, date_column_name, partition_expr_list,
sampling_expression, merging_params, storage_settings,
has_force_restore_data_flag);
}
else
throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE);

View File

@ -24,7 +24,7 @@ public:
const String & database_name,
Context & local_context,
Context & context,
ASTPtr & query,
ASTCreateQuery & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,

View File

@ -57,7 +57,7 @@ StorageMaterializedView::StorageMaterializedView(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
@ -66,20 +66,13 @@ StorageMaterializedView::StorageMaterializedView(
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
if (!create.select)
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
if (!create.inner_storage)
if (!query.inner_storage)
throw Exception("ENGINE of MaterializedView should be specified explicitly", ErrorCodes::INCORRECT_QUERY);
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select);
/// If the internal query does not specify a database, retrieve it from the context and write it to the query.
select.setDatabaseIfNeeded(database_name);
extractDependentTable(select, select_database_name, select_table_name);
extractDependentTable(*query.select, select_database_name, select_table_name);
if (!select_table_name.empty())
context.getGlobalContext().addDependency(
@ -87,7 +80,7 @@ StorageMaterializedView::StorageMaterializedView(
DatabaseAndTableName(database_name, table_name));
String inner_table_name = getInnerTableName();
inner_query = create.select;
inner_query = query.select->ptr();
/// If there is an ATTACH request, then the internal table must already be connected.
if (!attach_)
@ -96,10 +89,8 @@ StorageMaterializedView::StorageMaterializedView(
auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = database_name;
manual_create_query->table = inner_table_name;
manual_create_query->columns = create.columns;
manual_create_query->children.push_back(manual_create_query->columns);
manual_create_query->storage = create.inner_storage;
manual_create_query->children.push_back(manual_create_query->storage);
manual_create_query->set(manual_create_query->columns, query.columns->ptr());
manual_create_query->set(manual_create_query->storage, query.inner_storage->ptr());
/// Execute the query.
try
@ -176,5 +167,4 @@ StoragePtr StorageMaterializedView::getInnerTable() const
return context.getTable(database_name, getInnerTableName());
}
}

View File

@ -58,7 +58,7 @@ private:
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,

View File

@ -44,10 +44,9 @@ StorageMergeTree::StorageMergeTree(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()),
@ -55,7 +54,7 @@ StorageMergeTree::StorageMergeTree(
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_, merging_params_,
sampling_expression_, merging_params_,
settings_, database_name_ + "." + table_name, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
@ -198,9 +197,9 @@ void StorageMergeTree::alter(
auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__);
IDatabase::ASTModifier engine_modifier;
IDatabase::ASTModifier storage_modifier;
if (primary_key_is_modified)
engine_modifier = [&new_primary_key_ast] (ASTPtr & engine_ast)
storage_modifier = [&new_primary_key_ast] (IAST & ast)
{
auto tuple = std::make_shared<ASTFunction>(new_primary_key_ast->range);
tuple->name = "tuple";
@ -209,13 +208,14 @@ void StorageMergeTree::alter(
/// Primary key is in the second place in table engine description and can be represented as a tuple.
/// TODO: Not always in second place. If there is a sampling key, then the third one. Fix it.
typeid_cast<ASTExpressionList &>(*typeid_cast<ASTFunction &>(*engine_ast).arguments).children.at(1) = tuple;
auto & storage_ast = typeid_cast<ASTStorage &>(ast);
typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments).children.at(1) = tuple;
};
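
The fragility mentioned in the TODO above is positional: with the classic syntax the primary key tuple is the second engine argument only when no sampling expression is given, otherwise it is the third. A tiny worked example over the textual argument lists (plain strings here, not the real AST):

#include <cassert>
#include <string>
#include <vector>

int main()
{
    /// MergeTree(EventDate, (CounterID, EventDate), 8192) - the primary key really is at index 1.
    std::vector<std::string> args_no_sampling = {"EventDate", "(CounterID, EventDate)", "8192"};
    assert(args_no_sampling.at(1) == "(CounterID, EventDate)");

    /// MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
    /// - with a sampling key the primary key moves to index 2, which is what the TODO is about.
    std::vector<std::string> args_with_sampling =
        {"EventDate", "intHash32(UserID)", "(CounterID, EventDate, intHash32(UserID))", "8192"};
    assert(args_with_sampling.at(2) == "(CounterID, EventDate, intHash32(UserID))");
}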
context.getDatabase(database_name)->alterTable(
context, table_name,
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults,
engine_modifier);
storage_modifier);
materialized_columns = new_materialized_columns;
alias_columns = new_alias_columns;

View File

@ -119,7 +119,6 @@ private:
* primary_expr_ast - expression for sorting;
* date_column_name - if not empty, the name of the column with the date used for partitioning by month;
otherwise, partition_expr_ast is used as the partitioning expression;
* index_granularity - fow how many rows one index value is written.
*/
StorageMergeTree(
const String & path_,
@ -135,10 +134,9 @@ private:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
/** Determines which parts should be merged and merges them.
* If aggressive - parts are selected without taking their size ratio and novelty into account (used for the OPTIMIZE query).
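
As the constructor changes above suggest, index_granularity is no longer a separate argument, and partitioning is described either by a Date column (month partitioning) or by an arbitrary partition expression. At the SQL level the two paths look roughly as follows; the extended syntax requires the experimental setting used by the new tests in this changeset, and passing index_granularity through the SETTINGS clause is an assumption for illustration:

-- Classic month partitioning: the first engine argument is the Date column.
CREATE TABLE test.by_month(d Date, x UInt8) ENGINE = MergeTree(d, x, 8192);

-- Arbitrary partitioning with the extended storage definition syntax.
SET experimental_allow_extended_storage_definition_syntax = 1;
CREATE TABLE test.by_week(d Date, x UInt8)
ENGINE = MergeTree PARTITION BY toMonday(d) ORDER BY x
SETTINGS index_granularity = 8192;  -- assumed to be settable here after this change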

View File

@ -188,10 +188,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
@ -201,7 +200,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_, merging_params_,
sampling_expression_, merging_params_,
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }),
@ -311,18 +310,17 @@ StoragePtr StorageReplicatedMergeTree::create(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag_,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag_)
{
auto res = make_shared(
zookeeper_path_, replica_name_, attach,
path_, database_name_, name_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_,
merging_params_, has_force_restore_data_flag_, settings_);
sampling_expression_, merging_params_, settings_,
has_force_restore_data_flag_);
StoragePtr res_ptr = res;
auto get_endpoint_holder = [&res](InterserverIOEndpointPtr endpoint)
@ -1096,7 +1094,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
}
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold <= time(nullptr))
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
/// If the entry is old enough, has enough size, and the part exists on any replica,
/// then prefer fetching the merged part from a replica.
@ -1239,7 +1237,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
static std::atomic_uint total_fetches {0};
if (data.settings.replicated_max_parallel_fetches && total_fetches >= data.settings.replicated_max_parallel_fetches)
{
throw Exception("Too much total fetches from replicas, maximum: " + toString(data.settings.replicated_max_parallel_fetches),
throw Exception("Too much total fetches from replicas, maximum: " + data.settings.replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MUCH_FETCHES);
}
@ -1248,7 +1246,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (data.settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= data.settings.replicated_max_parallel_fetches_for_table)
{
throw Exception("Too much fetches from replicas for table, maximum: " + toString(data.settings.replicated_max_parallel_fetches_for_table),
throw Exception("Too much fetches from replicas for table, maximum: " + data.settings.replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MUCH_FETCHES);
}
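
The hunks above read prefer_fetch_merged_part_time_threshold and the replicated_max_parallel_fetches* limits from MergeTreeSettings, so they should be adjustable per table. A hypothetical override using the SETTINGS clause demonstrated by the new 00503 test later in this changeset (the values are arbitrary):

SET experimental_allow_extended_storage_definition_syntax = 1;
-- Hypothetical per-table override of the fetch throttling settings.
CREATE TABLE test.throttled(x UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/throttled', 'r1')
ORDER BY x
SETTINGS replicated_max_parallel_fetches_for_table = 4,
         prefer_fetch_merged_part_time_threshold = 3600;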

View File

@ -91,10 +91,9 @@ public:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
void startup() override;
void shutdown() override;
@ -339,10 +338,9 @@ private:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
/// Initialization.

View File

@ -12,7 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
@ -20,7 +20,7 @@ StorageView::StorageView(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
@ -28,13 +28,10 @@ StorageView::StorageView(
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
/// If the internal query does not specify a database, retrieve it from the context and write it to the query.
select.setDatabaseIfNeeded(database_name);
inner_query = create.select;
inner_query = query.select->ptr();
}
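
The hunk above keeps the rule stated in the comment: if the view's inner SELECT does not name a database, the view's own database is written into it. An illustrative example (names are made up):

CREATE TABLE test.src(x UInt64) ENGINE = TinyLog;

-- The inner query references `src` without a database...
CREATE VIEW test.v AS SELECT x FROM src;

-- ...and is stored as `SELECT x FROM test.src`, so the view keeps working
-- regardless of the session's current database.
SELECT * FROM test.v;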

View File

@ -47,7 +47,7 @@ private:
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,

View File

@ -209,7 +209,7 @@ BlockInputStreams StorageSystemParts::read(
}
block.getByPosition(i++).column->insert(part->name);
block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part)));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));
size_t marks_size = 0;
for (const NameAndTypePair & it : part->columns)
@ -221,7 +221,7 @@ BlockInputStreams StorageSystemParts::read(
}
block.getByPosition(i++).column->insert(static_cast<UInt64>(marks_size));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getExactSizeRows()));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->rows_count));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size_in_bytes));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->modification_time));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
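
With the change above, system.parts takes the mark count and row count straight from the part (marks_count / rows_count) rather than from part->size and getExactSizeRows(). The new tests in this changeset read these values roughly like this (the `marks` column name is an assumption; `rows` is taken from the test itself):

SELECT partition, name, rows, marks
FROM system.parts
WHERE database = 'test' AND table = 'without_fixed_size_columns' AND active
ORDER BY name;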

View File

@ -48,3 +48,15 @@ Sum before DROP PARTITION:
15
Sum after DROP PARTITION:
8
*** Table without columns with fixed size ***
Parts:
1 1_1_1_0 2
2 2_2_2_0 2
Before DROP PARTITION:
a
aa
b
cc
After DROP PARTITION:
aa
cc

View File

@ -1,12 +1,9 @@
-- IMPORTANT: Don't use this setting just yet.
-- It is for testing purposes, the syntax will likely change soon and the server will not be able
-- to load the tables created this way. You have been warned.
SET experimental_merge_tree_allow_custom_partitions = 1;
SET experimental_allow_extended_storage_definition_syntax = 1;
SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS test.not_partitioned;
CREATE TABLE test.not_partitioned(x UInt8) ENGINE = MergeTree(tuple(), x, 8192);
CREATE TABLE test.not_partitioned(x UInt8) ENGINE MergeTree ORDER BY x;
INSERT INTO test.not_partitioned VALUES (1), (2), (3);
INSERT INTO test.not_partitioned VALUES (4), (5);
@ -28,7 +25,7 @@ DROP TABLE test.not_partitioned;
SELECT '*** Partitioned by week ***';
DROP TABLE IF EXISTS test.partitioned_by_week;
CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree(toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree PARTITION BY toMonday(d) ORDER BY x;
-- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02
INSERT INTO test.partitioned_by_week VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3);
@ -51,7 +48,7 @@ DROP TABLE test.partitioned_by_week;
SELECT '*** Partitioned by a (Date, UInt8) tuple ***';
DROP TABLE IF EXISTS test.partitioned_by_tuple;
CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE = MergeTree((d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE MergeTree ORDER BY x PARTITION BY (d, x);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5);
@ -74,7 +71,7 @@ DROP TABLE test.partitioned_by_tuple;
SELECT '*** Partitioned by String ***';
DROP TABLE IF EXISTS test.partitioned_by_string;
CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree(tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree PARTITION BY s ORDER BY x;
INSERT INTO test.partitioned_by_string VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3);
INSERT INTO test.partitioned_by_string VALUES ('bbb', 4), ('aaa', 5);
@ -92,3 +89,21 @@ SELECT 'Sum after DROP PARTITION:';
SELECT sum(x) FROM test.partitioned_by_string;
DROP TABLE test.partitioned_by_string;
SELECT '*** Table without columns with fixed size ***';
DROP TABLE IF EXISTS test.without_fixed_size_columns;
CREATE TABLE test.without_fixed_size_columns(s String) ENGINE MergeTree PARTITION BY length(s) ORDER BY s;
INSERT INTO test.without_fixed_size_columns VALUES ('a'), ('aa'), ('b'), ('cc');
SELECT 'Parts:';
SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns' AND active ORDER BY name;
SELECT 'Before DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns ORDER BY s;
ALTER TABLE test.without_fixed_size_columns DROP PARTITION 1;
SELECT 'After DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns ORDER BY s;
DROP TABLE test.without_fixed_size_columns;

View File

@ -48,3 +48,15 @@ Sum before DROP PARTITION:
15
Sum after DROP PARTITION:
8
*** Table without columns with fixed size ***
Parts:
1 1_0_0_1 2
2 2_0_0_0 2
Before DROP PARTITION:
a
aa
b
cc
After DROP PARTITION:
aa
cc

View File

@ -1,7 +1,4 @@
-- IMPORTANT: Don't use this setting just yet.
-- It is for testing purposes, the syntax will likely change soon and the server will not be able
-- to load the tables created this way. You have been warned.
SET experimental_merge_tree_allow_custom_partitions = 1;
SET experimental_allow_extended_storage_definition_syntax = 1;
SET replication_alter_partitions_sync = 2;
@ -9,8 +6,8 @@ SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS test.not_partitioned_replica1;
DROP TABLE IF EXISTS test.not_partitioned_replica2;
CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1', tuple(), x, 8192);
CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2', tuple(), x, 8192);
CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1') ORDER BY x;
CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2') ORDER BY x;
INSERT INTO test.not_partitioned_replica1 VALUES (1), (2), (3);
INSERT INTO test.not_partitioned_replica1 VALUES (4), (5);
@ -34,8 +31,8 @@ SELECT '*** Partitioned by week ***';
DROP TABLE IF EXISTS test.partitioned_by_week_replica1;
DROP TABLE IF EXISTS test.partitioned_by_week_replica2;
CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1', toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2', toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1') PARTITION BY toMonday(d) ORDER BY x;
CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2') PARTITION BY toMonday(d) ORDER BY x;
-- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02
INSERT INTO test.partitioned_by_week_replica1 VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3);
@ -60,8 +57,8 @@ SELECT '*** Partitioned by a (Date, UInt8) tuple ***';
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica1;
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica2;
CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1', (d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2', (d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1') ORDER BY x PARTITION BY (d, x);
CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2') ORDER BY x PARTITION BY (d, x);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5);
@ -86,8 +83,8 @@ SELECT '*** Partitioned by String ***';
DROP TABLE IF EXISTS test.partitioned_by_string_replica1;
DROP TABLE IF EXISTS test.partitioned_by_string_replica2;
CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1', tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2', tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1') PARTITION BY s ORDER BY x;
CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2') PARTITION BY s ORDER BY x;
INSERT INTO test.partitioned_by_string_replica1 VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3);
INSERT INTO test.partitioned_by_string_replica1 VALUES ('bbb', 4), ('aaa', 5);
@ -106,3 +103,26 @@ SELECT sum(x) FROM test.partitioned_by_string_replica2;
DROP TABLE test.partitioned_by_string_replica1;
DROP TABLE test.partitioned_by_string_replica2;
SELECT '*** Table without columns with fixed size ***';
DROP TABLE IF EXISTS test.without_fixed_size_columns_replica1;
DROP TABLE IF EXISTS test.without_fixed_size_columns_replica2;
CREATE TABLE test.without_fixed_size_columns_replica1(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '1') PARTITION BY length(s) ORDER BY s;
CREATE TABLE test.without_fixed_size_columns_replica2(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '2') PARTITION BY length(s) ORDER BY s;
INSERT INTO test.without_fixed_size_columns_replica1 VALUES ('a'), ('aa'), ('b'), ('cc');
OPTIMIZE TABLE test.without_fixed_size_columns_replica2 PARTITION 1 FINAL; -- Wait for replication.
SELECT 'Parts:';
SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns_replica2' AND active ORDER BY name;
SELECT 'Before DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
ALTER TABLE test.without_fixed_size_columns_replica1 DROP PARTITION 1;
SELECT 'After DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
DROP TABLE test.without_fixed_size_columns_replica1;
DROP TABLE test.without_fixed_size_columns_replica2;

View File

@ -0,0 +1,12 @@
*** Without PARTITION BY and ORDER BY ***
1
2
*** Replicated with sampling ***
1
*** Replacing with implicit version ***
2017-10-23 1 c
*** Replicated Collapsing ***
2017-10-23 2 1
*** Table definition with SETTINGS ***
0
0

View File

@ -0,0 +1,73 @@
SET experimental_allow_extended_storage_definition_syntax = 1;
SELECT '*** Without PARTITION BY and ORDER BY ***';
DROP TABLE IF EXISTS test.unsorted;
CREATE TABLE test.unsorted(x UInt32) ENGINE UnsortedMergeTree;
INSERT INTO test.unsorted VALUES (1), (2);
SELECT * FROM test.unsorted;
DROP TABLE test.unsorted;
SELECT '*** Replicated with sampling ***';
DROP TABLE IF EXISTS test.replicated_with_sampling;
CREATE TABLE test.replicated_with_sampling(x UInt8)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/replicated_with_sampling', 'r1')
ORDER BY x
SAMPLE BY x;
INSERT INTO test.replicated_with_sampling VALUES (1), (128);
SELECT sum(x) FROM test.replicated_with_sampling SAMPLE 1/2;
DROP TABLE test.replicated_with_sampling;
SELECT '*** Replacing with implicit version ***';
DROP TABLE IF EXISTS test.replacing;
CREATE TABLE test.replacing(d Date, x UInt32, s String) ENGINE = ReplacingMergeTree ORDER BY x PARTITION BY d;
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'a');
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'b');
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'c');
OPTIMIZE TABLE test.replacing PARTITION '2017-10-23' FINAL;
SELECT * FROM test.replacing;
DROP TABLE test.replacing;
SELECT '*** Replicated Collapsing ***';
DROP TABLE IF EXISTS test.replicated_collapsing;
CREATE TABLE test.replicated_collapsing(d Date, x UInt32, sign Int8)
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/test/replicated_collapsing', 'r1', sign)
PARTITION BY toYYYYMM(d) ORDER BY d;
INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, 1);
INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, -1), ('2017-10-23', 2, 1);
OPTIMIZE TABLE test.replicated_collapsing PARTITION 201710 FINAL;
SELECT * FROM test.replicated_collapsing;
DROP TABLE test.replicated_collapsing;
SELECT '*** Table definition with SETTINGS ***';
DROP TABLE IF EXISTS test.with_settings;
CREATE TABLE test.with_settings(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/with_settings', 'r1')
ORDER BY x
SETTINGS replicated_can_become_leader = 0;
SELECT sleep(1); -- If replicated_can_become_leader were true, this replica would become the leader after 1 second.
SELECT is_leader FROM system.replicas WHERE database = 'test' AND table = 'with_settings';
DROP TABLE test.with_settings;