diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 67b4a4e986a..f2ef36b6981 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -450,7 +450,7 @@ void DatabaseOrdinary::alterTable( const NamesAndTypesList & materialized_columns, const NamesAndTypesList & alias_columns, const ColumnDefaults & column_defaults, - const ASTModifier & engine_modifier) + const ASTModifier & storage_modifier) { /// Read the definition of the table and replace the necessary parts with new ones. @@ -471,14 +471,10 @@ void DatabaseOrdinary::alterTable( ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast); ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns, materialized_columns, alias_columns, column_defaults); - auto it = std::find(ast_create_query.children.begin(), ast_create_query.children.end(), ast_create_query.columns); - if (it == ast_create_query.children.end()) - throw Exception("Logical error: cannot find columns child in ASTCreateQuery", ErrorCodes::LOGICAL_ERROR); - *it = new_columns; - ast_create_query.columns = new_columns; + ast_create_query.replace(ast_create_query.columns, new_columns); - if (engine_modifier) - engine_modifier(ast_create_query.storage); + if (storage_modifier) + storage_modifier(*ast_create_query.storage); statement = getTableDefinitionFromCreateQuery(ast); diff --git a/dbms/src/Databases/DatabasesCommon.cpp b/dbms/src/Databases/DatabasesCommon.cpp index 6e5945554db..849d8abc2c5 100644 --- a/dbms/src/Databases/DatabasesCommon.cpp +++ b/dbms/src/Databases/DatabasesCommon.cpp @@ -27,7 +27,7 @@ String getTableDefinitionFromCreateQuery(const ASTPtr & query) create.if_not_exists = false; create.is_populate = false; - String engine = typeid_cast<ASTFunction &>(*create.storage).name; + String engine = create.storage->engine->name; /// For engine VIEW it is necessary to save the SELECT query itself, for the rest - on the contrary if (engine != "View" && engine != "MaterializedView") @@ -59,7 +59,7 @@ std::pair<String, StoragePtr> createTableFromDefinition( /// - the database has not been created yet; /// - the code is simpler, since the query is already brought to a suitable form. - InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(ast_create_query.columns, context); + InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(*ast_create_query.columns, context); String storage_name; @@ -68,14 +68,14 @@ std::pair<String, StoragePtr> createTableFromDefinition( else if (ast_create_query.is_materialized_view) storage_name = "MaterializedView"; else - storage_name = typeid_cast<ASTFunction &>(*ast_create_query.storage).name; + storage_name = ast_create_query.storage->engine->name; return { ast_create_query.table, StorageFactory::instance().get( storage_name, database_data_path, ast_create_query.table, database_name, context, - context.getGlobalContext(), ast, columns_info.columns, + context.getGlobalContext(), ast_create_query, columns_info.columns, columns_info.materialized_columns, columns_info.alias_columns, columns_info.column_defaults, true, has_force_restore_data_flag) }; diff --git a/dbms/src/Databases/IDatabase.h b/dbms/src/Databases/IDatabase.h index 33f0b637224..a73f58bc29f 100644 --- a/dbms/src/Databases/IDatabase.h +++ b/dbms/src/Databases/IDatabase.h @@ -107,7 +107,7 @@ public: IDatabase & to_database, const String & to_name) = 0; - using ASTModifier = std::function<void (ASTPtr &)>; + using ASTModifier = std::function<void (IAST &)>; /// Change the table structure in metadata.
/// You must call under the TableStructureLock of the corresponding table. If engine_modifier is empty, then engine does not change. diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 6d570ad09d4..2233afebaf3 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -79,23 +79,26 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) if (!create.storage) { database_engine_name = "Ordinary"; /// Default database engine. - auto func = std::make_shared<ASTFunction>(); - func->name = database_engine_name; - create.storage = func; + auto engine = std::make_shared<ASTFunction>(); + engine->name = database_engine_name; + auto storage = std::make_shared<ASTStorage>(); + storage->set(storage->engine, engine); + create.set(create.storage, storage); } else { - const ASTFunction & engine_id = typeid_cast<const ASTFunction &>(*create.storage); - + const ASTStorage & storage = *create.storage; + const ASTFunction & engine = *storage.engine; /// Currently, there are no database engines, that support any arguments. - if (engine_id.arguments || engine_id.parameters) + if (engine.arguments || engine.parameters + || storage.partition_by || storage.order_by || storage.sample_by || storage.settings) { std::stringstream ostr; - formatAST(*create.storage, ostr, 0, false, false); + formatAST(storage, ostr, 0, false, false); throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE); } - database_engine_name = engine_id.name; + database_engine_name = engine.name; } String database_name_escaped = escapeForFileName(database_name); @@ -160,11 +163,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>; /// AST to the list of columns with types. Columns of Nested type are expanded into a list of real columns. -static ColumnsAndDefaults parseColumns( - ASTPtr expression_list, const Context & context) +static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast, const Context & context) { - auto & column_list_ast = typeid_cast<ASTExpressionList &>(*expression_list); - /// list of table columns in correct order NamesAndTypesList columns{}; ColumnDefaults defaults{}; @@ -177,7 +177,7 @@ static ColumnsAndDefaults parseColumns( ASTPtr default_expr_list = std::make_shared<ASTExpressionList>(); default_expr_list->children.reserve(column_list_ast.children.size()); - for (auto & ast : column_list_ast.children) + for (const auto & ast : column_list_ast.children) { auto & col_decl = typeid_cast<ASTColumnDeclaration &>(*ast); @@ -346,7 +346,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(NamesAndTypesList columns, InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::getColumnsInfo( - const ASTPtr & columns, const Context & context) + const ASTExpressionList & columns, const Context & context) { ColumnsInfo res; @@ -370,7 +370,7 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns( if (create.columns) { - res = getColumnsInfo(create.columns, context); + res = getColumnsInfo(*create.columns, context); } else if (!create.as_table.empty()) { @@ -391,16 +391,9 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns( /// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(*res.columns, res.materialized_columns, res.alias_columns, res.column_defaults); if (create.columns) - { - auto it = std::find(create.children.begin(), create.children.end(), create.columns); - if (it != create.children.end()) - *it = new_columns; - else - create.children.push_back(new_columns); - } + create.replace(create.columns, new_columns); else - create.children.push_back(new_columns); - create.columns = new_columns; + create.set(create.columns, new_columns); /// Check for duplicates std::set all_columns; @@ -421,22 +414,23 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns( } -String InterpreterCreateQuery::setEngine( - ASTCreateQuery & create, const StoragePtr & as_storage) const +String InterpreterCreateQuery::setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const { String storage_name; auto set_engine = [&](const char * engine) { storage_name = engine; - auto func = std::make_shared(); - func->name = engine; - create.storage = func; + auto engine_ast = std::make_shared(); + engine_ast->name = engine; + auto storage_ast = std::make_shared(); + storage_ast->set(storage_ast->engine, engine_ast); + create.set(create.storage, storage_ast); }; if (create.storage) { - storage_name = typeid_cast(*create.storage).name; + storage_name = create.storage->engine->name; } else if (!create.as_table.empty()) { @@ -445,17 +439,17 @@ String InterpreterCreateQuery::setEngine( String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database; String as_table_name = create.as_table; - auto as_create_ptr = context.getCreateQuery(as_database_name, as_table_name); - auto & as_create = typeid_cast(*as_create_ptr); + ASTPtr as_create_ptr = context.getCreateQuery(as_database_name, as_table_name); + const auto & as_create = typeid_cast(*as_create_ptr); if (!create.storage) { if (as_create.is_view || as_create.is_materialized_view) - create.storage = as_create.inner_storage; + create.set(create.storage, as_create.inner_storage->ptr()); else - create.storage = as_create.storage; + create.set(create.storage, as_create.storage->ptr()); - storage_name = typeid_cast(*create.storage).name; + storage_name = create.storage->engine->name; } else storage_name = as_storage->getName(); @@ -494,7 +488,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) /// For `view` type tables, you may need `sample_block` to get the columns. 
if (create.select && (!create.attach || (!create.columns && (create.is_view || create.is_materialized_view)))) { - interpreter_select = std::make_unique(create.select, context); + create.select->setDatabaseIfNeeded(database_name); + interpreter_select = std::make_unique(create.select->ptr(), context); as_select_sample = interpreter_select->getSampleBlock(); } @@ -543,7 +538,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) res = StorageFactory::instance().get( storage_name, data_path, table_name, database_name, context, - context.getGlobalContext(), query_ptr, columns.columns, + context.getGlobalContext(), create, columns.columns, columns.materialized_columns, columns.alias_columns, columns.column_defaults, create.attach, false); if (create.is_temporary) diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.h b/dbms/src/Interpreters/InterpreterCreateQuery.h index d35b69e3f26..82749f110eb 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.h +++ b/dbms/src/Interpreters/InterpreterCreateQuery.h @@ -11,6 +11,7 @@ namespace DB class Context; class ASTCreateQuery; +class ASTExpressionList; class IStorage; using StoragePtr = std::shared_ptr; @@ -52,7 +53,7 @@ public: }; /// Obtain information about columns, their types and default values, for case when columns in CREATE query is specified explicitly. - static ColumnsInfo getColumnsInfo(const ASTPtr & columns, const Context & context); + static ColumnsInfo getColumnsInfo(const ASTExpressionList & columns, const Context & context); private: BlockIO createDatabase(ASTCreateQuery & create); diff --git a/dbms/src/Interpreters/InterpreterSetQuery.h b/dbms/src/Interpreters/InterpreterSetQuery.h index 68ba1fe9815..7dbd444f6f1 100644 --- a/dbms/src/Interpreters/InterpreterSetQuery.h +++ b/dbms/src/Interpreters/InterpreterSetQuery.h @@ -12,7 +12,7 @@ class ASTSetQuery; using ASTPtr = std::shared_ptr; -/** Change one or several settings, for session or globally, or just for current context. +/** Change one or several settings for the session or just for the current context. */ class InterpreterSetQuery : public IInterpreter { @@ -20,7 +20,7 @@ public: InterpreterSetQuery(const ASTPtr & query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) {} - /** Usual SET query. Set setting for session or globally (if GLOBAL was specified). + /** Usual SET query. Set setting for the session. */ BlockIO execute() override; @@ -34,8 +34,6 @@ public: private: ASTPtr query_ptr; Context & context; - - void executeImpl(const ASTSetQuery & ast, Context & target); }; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index a0170eff295..e50fd5f984c 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -298,13 +298,11 @@ struct Settings /* Timeout for DDL query responses from all hosts in cluster. Negative value means infinite. */ \ M(SettingInt64, distributed_ddl_task_timeout, 120) \ \ - /** If true, and the date parameter of MergeTree engines is an expression (not a column name), \ - * it will be interpreted as the partitioning expression, allowing custom partitions. \ - * IMPORTANT: Don't use this setting just yet. \ - * It is for testing purposes, the syntax will likely change soon and the server will not be able \ - * to load the tables created this way. You have been warned. \ + /** If true, allow parameters of storage engines such as partitioning expression, primary key, etc. 
\ + * to be set not in the engine parameters but as separate clauses (PARTITION BY, ORDER BY...) \ + * Enable this setting to allow custom MergeTree partitions. \ */ \ - M(SettingBool, experimental_merge_tree_allow_custom_partitions, false) \ + M(SettingBool, experimental_allow_extended_storage_definition_syntax, false) \ /* Timeout for flushing data from streaming storages. */ \ M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS) \ /* Schema identifier (used by schema-based formats) */ \ diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h index 1df54449136..6933dad1c1b 100644 --- a/dbms/src/Interpreters/SystemLog.h +++ b/dbms/src/Interpreters/SystemLog.h @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include @@ -86,7 +86,7 @@ public: Context & context_, const String & database_name_, const String & table_name_, - const String & engine_, + const String & storage_def_, size_t flush_interval_milliseconds_); ~SystemLog(); @@ -105,8 +105,8 @@ private: Context & context; const String database_name; const String table_name; + const String storage_def; StoragePtr table; - const String engine; const size_t flush_interval_milliseconds; using QueueItem = std::pair; /// First element is shutdown flag for thread. @@ -142,10 +142,10 @@ template SystemLog::SystemLog(Context & context_, const String & database_name_, const String & table_name_, - const String & engine_, + const String & storage_def_, size_t flush_interval_milliseconds_) : context(context_), - database_name(database_name_), table_name(table_name_), engine(engine_), + database_name(database_name_), table_name(table_name_), storage_def(storage_def_), flush_interval_milliseconds(flush_interval_milliseconds_) { log = &Logger::get("SystemLog (" + database_name + "." 
+ table_name + ")"); @@ -328,11 +328,13 @@ void SystemLog::prepareTable() create->table = table_name; Block sample = LogElement::createBlock(); - create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList()); + create->set(create->columns, InterpreterCreateQuery::formatColumns(sample.getColumnsList())); - ParserFunction engine_parser; - - create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name()); + ParserStorage storage_parser; + ASTPtr storage_ast = parseQuery( + storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), + "Storage to create table for " + LogElement::name()); + create->set(create->storage, storage_ast); InterpreterCreateQuery(create, context).execute(); diff --git a/dbms/src/Interpreters/evaluateConstantExpression.cpp b/dbms/src/Interpreters/evaluateConstantExpression.cpp index 82fda244573..6c54b1eb6fb 100644 --- a/dbms/src/Interpreters/evaluateConstantExpression.cpp +++ b/dbms/src/Interpreters/evaluateConstantExpression.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -23,7 +22,7 @@ namespace ErrorCodes } -std::pair> evaluateConstantExpression(std::shared_ptr & node, const Context & context) +std::pair> evaluateConstantExpression(const ASTPtr & node, const Context & context) { ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer( node, context, nullptr, NamesAndTypesList{{ "_dummy", std::make_shared() }}).getConstActions(); @@ -51,7 +50,7 @@ std::pair> evaluateConstantExpression(std::sha } -ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & context) +ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context) { if (typeid_cast(node.get())) return node; @@ -61,7 +60,7 @@ ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & contex } -ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(ASTPtr & node, const Context & context) +ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context) { if (auto id = typeid_cast(node.get())) return std::make_shared(node->range, Field(id->name)); diff --git a/dbms/src/Interpreters/evaluateConstantExpression.h b/dbms/src/Interpreters/evaluateConstantExpression.h index f5833a1af18..ba8c7e23d6e 100644 --- a/dbms/src/Interpreters/evaluateConstantExpression.h +++ b/dbms/src/Interpreters/evaluateConstantExpression.h @@ -2,13 +2,13 @@ #include #include +#include #include namespace DB { -class IAST; class Context; class IDataType; @@ -17,13 +17,13 @@ class IDataType; * Used in rare cases - for elements of set for IN, for data to INSERT. * Quite suboptimal. */ -std::pair> evaluateConstantExpression(std::shared_ptr & node, const Context & context); +std::pair> evaluateConstantExpression(const ASTPtr & node, const Context & context); /** Evaluate constant expression * and returns ASTLiteral with its value. */ -std::shared_ptr evaluateConstantExpressionAsLiteral(std::shared_ptr & node, const Context & context); +ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context); /** Evaluate constant expression @@ -31,7 +31,7 @@ std::shared_ptr evaluateConstantExpressionAsLiteral(std::shared_ptr * Also, if AST is identifier, then return string literal with its name. * Useful in places where some name may be specified as identifier, or as result of a constant expression. 
*/ -std::shared_ptr evaluateConstantExpressionOrIdentifierAsLiteral(std::shared_ptr & node, const Context & context); +ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context); /** Parses a name of an object which could be written in 3 forms: * name, `name` or 'name' */ diff --git a/dbms/src/Parsers/ASTCreateQuery.h b/dbms/src/Parsers/ASTCreateQuery.h index 209c2fb9b0f..150c340f058 100644 --- a/dbms/src/Parsers/ASTCreateQuery.h +++ b/dbms/src/Parsers/ASTCreateQuery.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -9,6 +11,69 @@ namespace DB { +class ASTStorage : public IAST +{ +public: + ASTFunction * engine = nullptr; + IAST * partition_by = nullptr; + IAST * order_by = nullptr; + IAST * sample_by = nullptr; + ASTSetQuery * settings = nullptr; + + ASTStorage() = default; + ASTStorage(StringRange range_) : IAST(range_) {} + String getID() const override { return "Storage definition"; } + + ASTPtr clone() const override + { + auto res = std::make_shared(*this); + res->children.clear(); + + if (engine) + res->set(res->engine, engine->clone()); + if (partition_by) + res->set(res->partition_by, partition_by->clone()); + if (order_by) + res->set(res->order_by, order_by->clone()); + if (sample_by) + res->set(res->sample_by, sample_by->clone()); + if (settings) + res->set(res->settings, settings->clone()); + + return res; + } + + void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override + { + if (engine) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ENGINE" << (s.hilite ? hilite_none : "") << " = "; + engine->formatImpl(s, state, frame); + } + if (partition_by) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "PARTITION BY " << (s.hilite ? hilite_none : ""); + partition_by->formatImpl(s, state, frame); + } + if (order_by) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ORDER BY " << (s.hilite ? hilite_none : ""); + order_by->formatImpl(s, state, frame); + } + if (sample_by) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SAMPLE BY " << (s.hilite ? hilite_none : ""); + sample_by->formatImpl(s, state, frame); + } + if (settings) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SETTINGS " << (s.hilite ? 
hilite_none : ""); + settings->formatImpl(s, state, frame); + } + + } +}; + /// CREATE TABLE or ATTACH TABLE query class ASTCreateQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster @@ -22,12 +87,12 @@ public: bool is_temporary{false}; String database; String table; - ASTPtr columns; - ASTPtr storage; - ASTPtr inner_storage; /// Internal engine for the CREATE MATERIALIZED VIEW query + ASTExpressionList * columns = nullptr; + ASTStorage * storage = nullptr; + ASTStorage * inner_storage = nullptr; /// Internal engine for the CREATE MATERIALIZED VIEW query String as_database; String as_table; - ASTPtr select; + ASTSelectQuery * select = nullptr; ASTCreateQuery() = default; ASTCreateQuery(const StringRange range_) : ASTQueryWithOutput(range_) {} @@ -40,10 +105,14 @@ public: auto res = std::make_shared(*this); res->children.clear(); - if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } - if (storage) { res->storage = storage->clone(); res->children.push_back(res->storage); } - if (select) { res->select = select->clone(); res->children.push_back(res->select); } - if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); } + if (columns) + res->set(res->columns, columns->clone()); + if (storage) + res->set(res->storage, storage->clone()); + if (select) + res->set(res->select, select->clone()); + if (inner_storage) + res->set(res->inner_storage, inner_storage->clone()); cloneOutputOptions(*res); @@ -77,10 +146,7 @@ protected: formatOnCluster(settings); if (storage) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = "; storage->formatImpl(settings, state, frame); - } return; } @@ -119,16 +185,10 @@ protected: } if (storage && !is_materialized_view && !is_view) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = "; storage->formatImpl(settings, state, frame); - } if (inner_storage) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = "; inner_storage->formatImpl(settings, state, frame); - } if (is_populate) { diff --git a/dbms/src/Parsers/ASTQueryWithOutput.h b/dbms/src/Parsers/ASTQueryWithOutput.h index 587ca1ee174..5785ffa60cb 100644 --- a/dbms/src/Parsers/ASTQueryWithOutput.h +++ b/dbms/src/Parsers/ASTQueryWithOutput.h @@ -17,14 +17,14 @@ public: ASTQueryWithOutput() = default; explicit ASTQueryWithOutput(const StringRange range_) : IAST(range_) {} + void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final; + protected: /// NOTE: call this helper at the end of the clone() method of descendant class. void cloneOutputOptions(ASTQueryWithOutput & cloned) const; /// Format only the query part of the AST (without output options). virtual void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const = 0; - - void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final; }; diff --git a/dbms/src/Parsers/ASTSelectQuery.cpp b/dbms/src/Parsers/ASTSelectQuery.cpp index b4feb245224..17ae72a0780 100644 --- a/dbms/src/Parsers/ASTSelectQuery.cpp +++ b/dbms/src/Parsers/ASTSelectQuery.cpp @@ -315,15 +315,7 @@ void ASTSelectQuery::formatQueryImpl(const FormatSettings & s, FormatState & sta if (settings) { s.ostr << (s.hilite ? 
hilite_keyword : "") << s.nl_or_ws << indent_str << "SETTINGS " << (s.hilite ? hilite_none : ""); - - const ASTSetQuery & ast_set = typeid_cast(*settings); - for (ASTSetQuery::Changes::const_iterator it = ast_set.changes.begin(); it != ast_set.changes.end(); ++it) - { - if (it != ast_set.changes.begin()) - s.ostr << ", "; - - s.ostr << it->name << " = " << applyVisitor(FieldVisitorToString(), it->value); - } + settings->formatImpl(s, state, frame); } if (next_union_all) diff --git a/dbms/src/Parsers/ASTSetQuery.h b/dbms/src/Parsers/ASTSetQuery.h index c2acb2c3aff..d7cc8ec1c36 100644 --- a/dbms/src/Parsers/ASTSetQuery.h +++ b/dbms/src/Parsers/ASTSetQuery.h @@ -14,6 +14,8 @@ namespace DB class ASTSetQuery : public IAST { public: + bool is_standalone = true; /// If false, this AST is a part of another query, such as SELECT. + struct Change { String name; @@ -31,10 +33,10 @@ public: ASTPtr clone() const override { return std::make_shared(*this); } -protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override { - settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : ""); + if (is_standalone) + settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : ""); for (ASTSetQuery::Changes::const_iterator it = changes.begin(); it != changes.end(); ++it) { diff --git a/dbms/src/Parsers/IAST.h b/dbms/src/Parsers/IAST.h index 0cc861e6571..6d410fe93ef 100644 --- a/dbms/src/Parsers/IAST.h +++ b/dbms/src/Parsers/IAST.h @@ -20,6 +20,7 @@ namespace ErrorCodes extern const int NOT_A_COLUMN; extern const int UNKNOWN_TYPE_OF_AST_NODE; extern const int UNKNOWN_ELEMENT_IN_AST; + extern const int LOGICAL_ERROR; } using IdentifierNameSet = std::set; @@ -33,7 +34,7 @@ class WriteBuffer; /** Element of the syntax tree (hereinafter - directed acyclic graph with elements of semantics) */ -class IAST +class IAST : public std::enable_shared_from_this { public: ASTs children; @@ -66,6 +67,8 @@ public: /** Get the text that identifies this element. */ virtual String getID() const = 0; + ASTPtr ptr() { return shared_from_this(); } + /** Get a deep copy of the tree. */ virtual ASTPtr clone() const = 0; @@ -110,6 +113,42 @@ public: child->collectIdentifierNames(set); } + template + void set(T * & field, const ASTPtr & child) + { + if (!child) + return; + + T * casted = dynamic_cast(child.get()); + if (!casted) + throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR); + + children.push_back(child); + field = casted; + } + + template + void replace(T * & field, const ASTPtr & child) + { + if (!child) + throw Exception("Trying to replace AST subtree with nullptr", ErrorCodes::LOGICAL_ERROR); + + T * casted = dynamic_cast(child.get()); + if (!casted) + throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR); + + for (ASTPtr & current_child : children) + { + if (current_child.get() == field) + { + current_child = child; + field = casted; + return; + } + } + + throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR); + } /// Convert to a string. 
diff --git a/dbms/src/Parsers/ParserCreateQuery.cpp b/dbms/src/Parsers/ParserCreateQuery.cpp index 25037bd67b9..7e330d6eb68 100644 --- a/dbms/src/Parsers/ParserCreateQuery.cpp +++ b/dbms/src/Parsers/ParserCreateQuery.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -106,21 +107,78 @@ bool ParserColumnDeclarationList::parseImpl(Pos & pos, ASTPtr & node, Expected & } -bool ParserEngine::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserKeyword s_engine("ENGINE"); ParserToken s_eq(TokenType::Equals); - ParserIdentifierWithOptionalParameters storage_p; + ParserKeyword s_partition_by("PARTITION BY"); + ParserKeyword s_order_by("ORDER BY"); + ParserKeyword s_sample_by("SAMPLE BY"); + ParserKeyword s_settings("SETTINGS"); - if (s_engine.ignore(pos, expected)) + ParserIdentifierWithOptionalParameters ident_with_optional_params_p; + ParserExpression expression_p; + ParserSetQuery settings_p(/* parse_only_internals_ = */ true); + + Pos begin = pos; + + ASTPtr engine; + ASTPtr partition_by; + ASTPtr order_by; + ASTPtr sample_by; + ASTPtr settings; + + if (!s_engine.ignore(pos, expected)) + return false; + + s_eq.ignore(pos, expected); + + if (!ident_with_optional_params_p.parse(pos, engine, expected)) + return false; + + while (true) { - if (!s_eq.ignore(pos, expected)) - return false; + if (!partition_by && s_partition_by.ignore(pos, expected)) + { + if (expression_p.parse(pos, partition_by, expected)) + continue; + else + return false; + } - if (!storage_p.parse(pos, node, expected)) - return false; + if (!order_by && s_order_by.ignore(pos, expected)) + { + if (expression_p.parse(pos, order_by, expected)) + continue; + else + return false; + } + + if (!sample_by && s_sample_by.ignore(pos, expected)) + { + if (expression_p.parse(pos, sample_by, expected)) + continue; + else + return false; + } + + if (s_settings.ignore(pos, expected)) + { + if (!settings_p.parse(pos, settings, expected)) + return false; + } + + break; } + auto storage = std::make_shared(StringRange(begin, pos)); + storage->set(storage->engine, engine); + storage->set(storage->partition_by, partition_by); + storage->set(storage->order_by, order_by); + storage->set(storage->sample_by, sample_by); + storage->set(storage->settings, settings); + + node = storage; return true; } @@ -136,16 +194,16 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_database("DATABASE"); ParserKeyword s_if_not_exists("IF NOT EXISTS"); ParserKeyword s_as("AS"); - ParserKeyword s_select("SELECT"); ParserKeyword s_view("VIEW"); ParserKeyword s_materialized("MATERIALIZED"); ParserKeyword s_populate("POPULATE"); ParserToken s_dot(TokenType::Dot); ParserToken s_lparen(TokenType::OpeningRoundBracket); ParserToken s_rparen(TokenType::ClosingRoundBracket); - ParserEngine engine_p; + ParserStorage storage_p; ParserIdentifier name_p; ParserColumnDeclarationList columns_p; + ParserSelectQuery select_p; ASTPtr database; ASTPtr table; @@ -190,7 +248,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) return false; } - engine_p.parse(pos, storage, expected); + storage_p.parse(pos, storage, expected); } else if (s_table.ignore(pos, expected)) { @@ -222,39 +280,31 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (!s_rparen.ignore(pos, expected)) return false; - if (!engine_p.parse(pos, storage, expected)) + if (!storage_p.parse(pos, 
storage, expected) && !is_temporary) return false; - /// For engine VIEW, you also need to read AS SELECT - if (storage && (typeid_cast(*storage).name == "View" - || typeid_cast(*storage).name == "MaterializedView")) + if (storage) { - if (!s_as.ignore(pos, expected)) - return false; - Pos before_select = pos; - if (!s_select.ignore(pos, expected)) - return false; - pos = before_select; - ParserSelectQuery select_p; - select_p.parse(pos, select, expected); + const auto & storage_ast = typeid_cast(*storage); + /// For engine VIEW, you also need to read AS SELECT + if (storage_ast.engine->name == "View" || storage_ast.engine->name == "MaterializedView") + { + if (!s_as.ignore(pos, expected)) + return false; + + if (!select_p.parse(pos, select, expected)) + return false; + } } } else { - engine_p.parse(pos, storage, expected); + storage_p.parse(pos, storage, expected); if (!s_as.ignore(pos, expected)) return false; - /// AS SELECT ... - Pos before_select = pos; - if (s_select.ignore(pos, expected)) - { - pos = before_select; - ParserSelectQuery select_p; - select_p.parse(pos, select, expected); - } - else + if (!select_p.parse(pos, select, expected)) /// AS SELECT ... { /// AS [db.]table if (!name_p.parse(pos, as_table, expected)) @@ -268,7 +318,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } /// Optional - ENGINE can be specified. - engine_p.parse(pos, storage, expected); + storage_p.parse(pos, storage, expected); } } } @@ -315,7 +365,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } /// Optional - internal ENGINE for MATERIALIZED VIEW can be specified - engine_p.parse(pos, inner_storage, expected); + storage_p.parse(pos, inner_storage, expected); if (s_populate.ignore(pos, expected)) is_populate = true; @@ -344,25 +394,15 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (table) query->table = typeid_cast(*table).name; query->cluster = cluster_str; - if (inner_storage) - query->inner_storage = inner_storage; + query->set(query->inner_storage, inner_storage); - query->columns = columns; - query->storage = storage; + query->set(query->columns, columns); + query->set(query->storage, storage); if (as_database) query->as_database = typeid_cast(*as_database).name; if (as_table) query->as_table = typeid_cast(*as_table).name; - query->select = select; - - if (columns) - query->children.push_back(columns); - if (storage) - query->children.push_back(storage); - if (select) - query->children.push_back(select); - if (inner_storage) - query->children.push_back(inner_storage); + query->set(query->select, select); return true; } diff --git a/dbms/src/Parsers/ParserCreateQuery.h b/dbms/src/Parsers/ParserCreateQuery.h index 3aa78f9cc5c..b8fae5d57d7 100644 --- a/dbms/src/Parsers/ParserCreateQuery.h +++ b/dbms/src/Parsers/ParserCreateQuery.h @@ -187,11 +187,11 @@ protected: }; -/** ENGINE = name. */ -class ParserEngine : public IParserBase +/** ENGINE = name [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name = value, ...] 
*/ +class ParserStorage : public IParserBase { protected: - const char * getName() const { return "ENGINE"; } + const char * getName() const { return "storage definition"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected); }; diff --git a/dbms/src/Parsers/ParserSetQuery.cpp b/dbms/src/Parsers/ParserSetQuery.cpp index 6a5ee151707..109c4e5acc1 100644 --- a/dbms/src/Parsers/ParserSetQuery.cpp +++ b/dbms/src/Parsers/ParserSetQuery.cpp @@ -47,7 +47,6 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (!parse_only_internals) { ParserKeyword s_set("SET"); - ParserKeyword s_global("GLOBAL"); if (!s_set.ignore(pos, expected)) return false; @@ -69,6 +68,7 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) auto query = std::make_shared(StringRange(begin, pos)); node = query; + query->is_standalone = !parse_only_internals; query->changes = changes; return true; diff --git a/dbms/src/Parsers/ParserSetQuery.h b/dbms/src/Parsers/ParserSetQuery.h index d48ecd8ddab..13bc30acfce 100644 --- a/dbms/src/Parsers/ParserSetQuery.h +++ b/dbms/src/Parsers/ParserSetQuery.h @@ -8,7 +8,7 @@ namespace DB { /** Query like this: - * SET [GLOBAL] name1 = value1, name2 = value2, ... + * SET name1 = value1, name2 = value2, ... */ class ParserSetQuery : public IParserBase { @@ -19,7 +19,7 @@ protected: const char * getName() const override { return "SET query"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; - /// Parse the list `name = value` pairs, without SET [GLOBAL]. + /// Parse the list `name = value` pairs, without SET. bool parse_only_internals; }; diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index dcfef278189..26377bd4230 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -30,6 +30,7 @@ using BlockOutputStreamPtr = std::shared_ptr; using BlockInputStreamPtr = std::shared_ptr; using BlockInputStreams = std::vector; +class ASTCreateQuery; class IStorage; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 6d4452e8e60..25ed7334ae2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -83,7 +83,7 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor( const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block) : data_part(data_part_) { - number_of_rows_in_part = data_part->getExactSizeRows(); + number_of_rows_in_part = data_part->rows_count; /// Initialize with sample block untill update won't called. 
initialize(sample_block, columns); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index eaf6e50c107..44ddcc93860 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -86,7 +86,6 @@ MergeTreeData::MergeTreeData( const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, - size_t index_granularity_, const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, @@ -96,7 +95,7 @@ MergeTreeData::MergeTreeData( PartsCleanCallback parts_clean_callback_) : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_), sampling_expression(sampling_expression_), - index_granularity(index_granularity_), + index_granularity(settings_.index_granularity), merging_params(merging_params_), settings(settings_), primary_expr_ast(primary_expr_ast_), @@ -110,11 +109,16 @@ MergeTreeData::MergeTreeData( { merging_params.check(*columns); + if (primary_expr_ast && merging_params.mode == MergingParams::Unsorted) + throw Exception("Primary key cannot be set for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS); if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted) - throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS); + throw Exception("Primary key can be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS); initPrimaryKey(); + if (sampling_expression && (!primary_expr_ast || !primary_key_sample.has(sampling_expression->getColumnName()))) + throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS); + MergeTreeDataFormatVersion min_format_version(0); if (!date_column_name.empty()) { @@ -577,7 +581,7 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life time_t current_time = time(nullptr); ssize_t deadline = (custom_directories_lifetime_seconds >= 0) ? current_time - custom_directories_lifetime_seconds - : current_time - settings.temporary_directories_lifetime; + : current_time - settings.temporary_directories_lifetime.totalSeconds(); /// Delete temporary directories older than a day. Poco::DirectoryIterator end; @@ -622,7 +626,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() { if (it->unique() && /// After this ref_count cannot increase. 
(*it)->remove_time < now && - now - (*it)->remove_time > settings.old_parts_lifetime) + now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds()) { res.push_back(*it); all_data_parts.erase(it++); @@ -1087,7 +1091,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( else { const IDataType & type = *new_primary_key_sample.safeGetByPosition(i).type; - new_index[i] = type.createConstColumn(part->size, type.getDefault())->convertToFullColumnIfConst(); + new_index[i] = type.createConstColumn(part->marks_count, type.getDefault())->convertToFullColumnIfConst(); } } @@ -1098,7 +1102,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( WriteBufferFromFile index_file(index_tmp_path); HashingWriteBuffer index_stream(index_file); - for (size_t i = 0, size = part->size; i < size; ++i) + for (size_t i = 0, marks_count = part->marks_count; i < marks_count; ++i) for (size_t j = 0; j < new_key_size; ++j) new_primary_key_sample.getByPosition(j).type->serializeBinary(*new_index[j].get(), i, index_stream); @@ -1118,7 +1122,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart( /// Apply the expression and write the result to temporary files. if (expression) { - MarkRanges ranges{MarkRange(0, part->size)}; + MarkRanges ranges{MarkRange(0, part->marks_count)}; BlockInputStreamPtr part_in = std::make_shared( *this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges, false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index a01740e62a8..4c2697d09d1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -62,7 +62,8 @@ namespace ErrorCodes /// Part directory - / partiiton-id _ min-id _ max-id _ level / /// Inside the part directory: /// The same files as for month-partitioned tables, plus -/// partition.dat - contains the value of the partitioning expression +/// count.txt - contains total number of rows in this part. +/// partition.dat - contains the value of the partitioning expression. /// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression. /// /// Several modes are implemented. Modes determine additional actions during merge: @@ -236,7 +237,6 @@ public: /// primary_expr_ast - expression used for sorting; empty for UnsortedMergeTree. /// date_column_name - if not empty, the name of the Date column used for partitioning by month. /// Otherwise, partition_expr_ast is used for partitioning. - /// index_granularity - how many rows correspond to one primary key value. /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory. /// attach - whether the existing table is attached or the new table is created. MergeTreeData( const String & database_, const String & table_, @@ -249,7 +249,6 @@ public: const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported. 
- size_t index_granularity_, const MergingParams & merging_params_, const MergeTreeSettings & settings_, const String & log_name_, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index ad23021335a..280e1cc30a6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -499,7 +499,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart std::shared_lock part_lock(part->columns_lock); merge_entry->total_size_bytes_compressed += part->size_in_bytes; - merge_entry->total_size_marks += part->size; + merge_entry->total_size_marks += part->marks_count; } MergeTreeData::DataPart::ColumnToSize merged_column_to_size; @@ -557,7 +557,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart for (const auto & part : parts) { auto input = std::make_unique( - data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->size)), + data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false); input->setProgressCallback(MergeProgressCallback( @@ -691,7 +691,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart for (size_t part_num = 0; part_num < parts.size(); ++part_num) { auto column_part_stream = std::make_shared( - data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)}, + data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)}, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true); column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep( @@ -755,7 +755,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns); /// For convenience, even CollapsingSortedBlockInputStream can not return zero rows. - if (0 == to.marksCount()) + if (0 == to.getRowsCount()) throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR); return new_data_part; @@ -862,12 +862,16 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( /// Merge all parts of the partition. 
+ size_t total_input_rows = 0; + for (const MergeTreeData::DataPartPtr & part : parts) { + total_input_rows += part->rows_count; + std::shared_lock part_lock(part->columns_lock); merge_entry->total_size_bytes_compressed += part->size_in_bytes; - merge_entry->total_size_marks += part->size; + merge_entry->total_size_marks += part->marks_count; } MergeTreeData::DataPart::ColumnToSize merged_column_to_size; @@ -882,22 +886,18 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( BlockInputStreams src_streams; - size_t sum_rows_approx = 0; - - const auto rows_total = merge_entry->total_size_marks * data.index_granularity; - for (size_t i = 0; i < parts.size(); ++i) { - MarkRanges ranges(1, MarkRange(0, parts[i]->size)); + MarkRanges ranges(1, MarkRange(0, parts[i]->marks_count)); auto input = std::make_unique( data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names, ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false); - input->setProgressCallback([&merge_entry, rows_total] (const Progress & value) + input->setProgressCallback([&merge_entry, total_input_rows] (const Progress & value) { const auto new_rows_read = merge_entry->rows_read += value.rows; - merge_entry->progress = static_cast(new_rows_read) / rows_total; + merge_entry->progress = static_cast(new_rows_read) / total_input_rows; merge_entry->bytes_read_uncompressed += value.bytes; }); @@ -906,8 +906,6 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( std::make_shared(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression()))); else src_streams.emplace_back(std::move(input)); - - sum_rows_approx += parts[i]->size * data.index_granularity; } /// Sharding of merged blocks. @@ -1038,7 +1036,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes; if (disk_reservation) - disk_reservation->update(static_cast((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation)); + disk_reservation->update(static_cast((1 - std::min(1., 1. * rows_written / total_input_rows)) * initial_reservation)); } } @@ -1050,7 +1048,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition( abortReshardPartitionIfRequested(); MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no); - if (0 == output_stream->marksCount()) + if (0 == output_stream->getRowsCount()) { /// There was no data in this shard. Ignore. 
LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 66f8e05e38a..0503d02fb99 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -319,6 +319,7 @@ void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & storage, const HashingWriteBuffer out_hashing(out); type->serializeBinary(min_values[i], out_hashing); type->serializeBinary(max_values[i], out_hashing); + out_hashing.next(); checksums.files[file_name].file_size = out_hashing.count(); checksums.files[file_name].file_hash = out_hashing.getHash(); } @@ -426,43 +427,6 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const } -size_t MergeTreeDataPart::getExactSizeRows() const -{ - size_t rows_approx = storage.index_granularity * size; - - for (const NameAndTypePair & column : columns) - { - ColumnPtr column_col = column.type->createColumn(); - const auto checksum = tryGetBinChecksum(column.name); - - /// Should be fixed non-nullable column - if (!checksum || !column_col->isFixed() || column_col->isNullable()) - continue; - - size_t sizeof_field = column_col->sizeOfField(); - size_t rows = checksum->uncompressed_size / sizeof_field; - - if (checksum->uncompressed_size % sizeof_field != 0) - { - throw Exception( - "Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size) - + ", sizeof " + toString(sizeof_field), - ErrorCodes::LOGICAL_ERROR); - } - - if (!(rows_approx - storage.index_granularity < rows && rows <= rows_approx)) - { - throw Exception("Unexpected size of column " + column.name + ": " + toString(rows) + " rows", - ErrorCodes::LOGICAL_ERROR); - } - - return rows; - } - - throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR); -} - - String MergeTreeDataPart::getFullPath() const { if (relative_path.empty()) @@ -647,6 +611,7 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu loadColumns(require_columns_checksums); loadChecksums(require_columns_checksums); loadIndex(); + loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `marks_count`. loadPartitionAndMinMaxIndex(); if (check_consistency) checkConsistency(require_columns_checksums); @@ -655,13 +620,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu void MergeTreeDataPart::loadIndex() { - /// Size - in number of marks. 
- if (!size) + if (!marks_count) { if (columns.empty()) throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); - size = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk") + marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk") .getSize() / MERGE_TREE_MARK_SIZE; } @@ -675,20 +639,20 @@ void MergeTreeDataPart::loadIndex() for (size_t i = 0; i < key_size; ++i) { index[i] = storage.primary_key_data_types[i]->createColumn(); - index[i]->reserve(size); + index[i]->reserve(marks_count); } String index_path = getFullPath() + "primary.idx"; ReadBufferFromFile index_file = openForReading(index_path); - for (size_t i = 0; i < size; ++i) + for (size_t i = 0; i < marks_count; ++i) for (size_t j = 0; j < key_size; ++j) storage.primary_key_data_types[j]->deserializeBinary(*index[j].get(), index_file); for (size_t i = 0; i < key_size; ++i) - if (index[i]->size() != size) + if (index[i]->size() != marks_count) throw Exception("Cannot read all data from index file " + index_path - + "(expected size: " + toString(size) + ", read: " + toString(index[i]->size()) + ")", + + "(expected size: " + toString(marks_count) + ", read: " + toString(index[i]->size()) + ")", ErrorCodes::CANNOT_READ_ALL_DATA); if (!index_file.eof()) @@ -740,6 +704,54 @@ void MergeTreeDataPart::loadChecksums(bool require) assertEOF(file); } +void MergeTreeDataPart::loadRowsCount() +{ + if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + { + String path = getFullPath() + "count.txt"; + if (!Poco::File(path).exists()) + throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); + + ReadBufferFromFile file = openForReading(path); + readIntText(rows_count, file); + assertEOF(file); + } + else + { + size_t rows_approx = storage.index_granularity * marks_count; + + for (const NameAndTypePair & column : columns) + { + ColumnPtr column_col = column.type->createColumn(); + const auto checksum = tryGetBinChecksum(column.name); + + /// Should be fixed non-nullable column + if (!checksum || !column_col->isFixed() || column_col->isNullable()) + continue; + + size_t sizeof_field = column_col->sizeOfField(); + rows_count = checksum->uncompressed_size / sizeof_field; + + if (checksum->uncompressed_size % sizeof_field != 0) + { + throw Exception( + "Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size) + + ", sizeof " + toString(sizeof_field), + ErrorCodes::LOGICAL_ERROR); + } + + if (!(rows_count <= rows_approx && rows_approx < rows_count + storage.index_granularity)) + throw Exception( + "Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows", + ErrorCodes::LOGICAL_ERROR); + + return; + } + + throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR); + } +} + void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const { std::shared_lock part_lock(columns_lock); @@ -799,6 +811,9 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata) if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { + if (!checksums.files.count("count.txt")) + throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART); + if (storage.partition_expr && !checksums.files.count("partition.dat")) throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART); @@ -827,6 +842,8 @@ void 
MergeTreeDataPart::checkConsistency(bool require_part_metadata) if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { + check_file_not_empty(path + "count.txt"); + if (storage.partition_expr) check_file_not_empty(path + "partition.dat"); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index c7d94c4ccb8..7633cf42ab4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -108,9 +108,6 @@ struct MergeTreeDataPart /// If no checksums are present returns the name of the first physically existing column. String getColumnNameWithMinumumCompressedSize() const; - /// If part has column with fixed size, will return exact size of part (in rows) - size_t getExactSizeRows() const; - /// Returns full path to part dir String getFullPath() const; @@ -132,7 +129,8 @@ struct MergeTreeDataPart /// Examples: 'detached/tmp_fetch_', 'tmp_', '' mutable String relative_path; - size_t size = 0; /// in number of marks. + size_t rows_count = 0; + size_t marks_count = 0; std::atomic size_in_bytes {0}; /// size in bytes, 0 - if not counted; /// is used from several threads without locks (it is changed with ALTER). time_t modification_time = 0; @@ -239,9 +237,13 @@ private: /// If checksums.txt exists, reads files' checksums (and sizes) from it void loadChecksums(bool require); - /// Loads index file. Also calculates this->size if size=0 + /// Loads index file. Also calculates this->marks_count if marks_count = 0 void loadIndex(); + /// Load rows count for this part from disk (for the newer storage format version). + /// For the older format version calculates rows count from the size of a column with a fixed size. + void loadRowsCount(); + void loadPartitionAndMinMaxIndex(); void checkConsistency(bool require_part_metadata); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 1d17475d0ef..0ef03e01ca5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -526,7 +526,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings); else - ranges.ranges = MarkRanges{MarkRange{0, part->size}}; + ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}}; if (!ranges.ranges.empty()) { diff --git a/dbms/src/Storages/MergeTree/MergeTreePartition.cpp b/dbms/src/Storages/MergeTree/MergeTreePartition.cpp index e33896a3cdf..7031d188812 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartition.cpp @@ -125,6 +125,7 @@ void MergeTreePartition::store(const MergeTreeData & storage, const String & par HashingWriteBuffer out_hashing(out); for (size_t i = 0; i < value.size(); ++i) storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing); + out_hashing.next(); checksums.files["partition.dat"].file_size = out_hashing.count(); checksums.files["partition.dat"].file_hash = out_hashing.getHash(); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 5391561013f..1a4d8841c5a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -403,7 +403,7 @@ void 
MergeTreeReader::addStream(const String & name, const IDataType & type, con std::string filename = name + NULL_MAP_EXTENSION; streams.emplace(filename, std::make_unique( - path + escaped_column_name, NULL_MAP_EXTENSION, data_part->size, + path + escaped_column_name, NULL_MAP_EXTENSION, data_part->marks_count, all_mark_ranges, mark_cache, save_marks_in_cache, uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); @@ -425,7 +425,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con if (!streams.count(size_name)) streams.emplace(size_name, std::make_unique( - path + escaped_size_name, DATA_FILE_EXTENSION, data_part->size, + path + escaped_size_name, DATA_FILE_EXTENSION, data_part->marks_count, all_mark_ranges, mark_cache, save_marks_in_cache, uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); @@ -436,7 +436,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con } else streams.emplace(name, std::make_unique( - path + escaped_column_name, DATA_FILE_EXTENSION, data_part->size, + path + escaped_column_name, DATA_FILE_EXTENSION, data_part->marks_count, all_mark_ranges, mark_cache, save_marks_in_cache, uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp b/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp new file mode 100644 index 00000000000..4fa366277f2 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.cpp @@ -0,0 +1,75 @@ +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INVALID_CONFIG_PARAMETER; + extern const int BAD_ARGUMENTS; +} + +void MergeTreeSettings::loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) +{ + if (!config.has(config_elem)) + return; + + Poco::Util::AbstractConfiguration::Keys config_keys; + config.keys(config_elem, config_keys); + + for (const String & key : config_keys) + { + String value = config.getString(config_elem + "." 
+ key); + +#define SET(TYPE, NAME, DEFAULT) \ + else if (key == #NAME) NAME.set(value); + + if (false) {} + APPLY_FOR_MERGE_TREE_SETTINGS(SET) + else + throw Exception("Unknown MergeTree setting " + key + " in config", ErrorCodes::INVALID_CONFIG_PARAMETER); +#undef SET + } +} + +void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def) +{ + if (storage_def.settings) + { + for (const ASTSetQuery::Change & setting : storage_def.settings->changes) + { +#define SET(TYPE, NAME, DEFAULT) \ + else if (setting.name == #NAME) NAME.set(setting.value); + + if (false) {} + APPLY_FOR_MERGE_TREE_SETTINGS(SET) + else + throw Exception( + "Unknown setting " + setting.name + " for storage " + storage_def.engine->name, + ErrorCodes::BAD_ARGUMENTS); +#undef SET + } + } + else + { + auto settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + storage_def.set(storage_def.settings, settings_ast); + } + + ASTSetQuery::Changes & changes = storage_def.settings->changes; + +#define ADD_IF_ABSENT(NAME) \ + if (std::find_if(changes.begin(), changes.end(), \ + [](const ASTSetQuery::Change & c) { return c.name == #NAME; }) \ + == changes.end()) \ + changes.push_back(ASTSetQuery::Change{#NAME, NAME.value}); + + APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(ADD_IF_ABSENT); +#undef ADD_IF_ABSENT +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 80608b79101..38a5bd56764 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -3,173 +3,143 @@ #include #include #include -#include +#include namespace DB { -namespace ErrorCodes -{ - extern const int INVALID_CONFIG_PARAMETER; -} +class ASTStorage; -/** Advanced settings of MergeTree. - * Could be loaded from config. +/** Settings for the MergeTree family of engines. + * Could be loaded from config or from a CREATE TABLE query (SETTINGS clause). */ struct MergeTreeSettings { - /** Merge settings. */ - /// Maximum in total size of parts to merge, when there are maximum (minimum) free threads in background pool (or entries in replication queue). - size_t max_bytes_to_merge_at_max_space_in_pool = size_t(150) * 1024 * 1024 * 1024; - size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024; +#define APPLY_FOR_MERGE_TREE_SETTINGS(M) \ + /** How many rows correspond to one primary key value. */ \ + M(SettingUInt64, index_granularity, 8192) \ + \ + /** Merge settings. */ \ + \ + /** Maximum in total size of parts to merge, when there are maximum (minimum) free threads \ + * in background pool (or entries in replication queue). */ \ + M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024) \ + M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024) \ + \ + /** How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue. */ \ + M(SettingUInt64, max_replicated_merges_in_queue, 16) \ + \ + /** When there is less than specified number of free entries in pool (or replicated queue), \ + * start to lower maximum size of merge to process (or to put in queue). \ + * This is to allow small merges to process - not filling the pool with long running merges. */ \ + M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8) \ + \ + /** How many seconds to keep obsolete parts. */ \ + M(SettingSeconds, old_parts_lifetime, 8 * 60) \ + \ + /** How many seconds to keep tmp_-directories. 
*/ \ + M(SettingSeconds, temporary_directories_lifetime, 86400) \ + \ + /** Inserts settings. */ \ + \ + /** If table contains at least that many active parts, artificially slow down insert into table. */ \ + M(SettingUInt64, parts_to_delay_insert, 150) \ + \ + /** If there are more than this number of active parts, throw 'Too much parts ...' exception */ \ + M(SettingUInt64, parts_to_throw_insert, 300) \ + \ + /** Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts. */ \ + M(SettingUInt64, max_delay_to_insert, 1) \ + \ + /** Replication settings. */ \ + \ + /** How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted). */ \ + M(SettingUInt64, replicated_deduplication_window, 100) \ + /** Similar to the previous setting, but determines old blocks by their lifetime. \ + * Hash of an inserted block will be deleted (and the block will not be deduplicated after) \ + * if it is outside of one "window". You can set very big replicated_deduplication_window to avoid \ + * duplicating INSERTs during that period of time. */ \ + M(SettingUInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60) /** one week */ \ + \ + /** Keep about this number of last records in ZooKeeper log, even if they are obsolete. \ + * It doesn't affect the operation of tables: used only to diagnose ZooKeeper log before cleaning. */ \ + M(SettingUInt64, replicated_logs_to_keep, 100) \ + \ + /** After the specified amount of time has passed since replication log entry creation \ + * and the sum size of parts is greater than the threshold, \ + * prefer fetching the merged part from a replica instead of doing the merge locally. \ + * To speed up very long merges. */ \ + M(SettingSeconds, prefer_fetch_merged_part_time_threshold, 3600) \ + M(SettingUInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024) \ + \ + /** Max broken parts, if more - deny automatic deletion. */ \ + M(SettingUInt64, max_suspicious_broken_parts, 10) \ + \ + /** Do not apply ALTER if the number of files for modification (deletion, addition) is greater than this. */ \ + M(SettingUInt64, max_files_to_modify_in_alter_columns, 75) \ + /** Do not apply ALTER if the number of files for deletion is greater than this. */ \ + M(SettingUInt64, max_files_to_remove_in_alter_columns, 50) \ + \ + /** If the ratio of wrong parts to the total number of parts is less than this - allow to start. */ \ + M(SettingFloat, replicated_max_ratio_of_wrong_parts, 0.5) \ + \ + /** Limit parallel fetches */ \ + M(SettingUInt64, replicated_max_parallel_fetches, 0) \ + M(SettingUInt64, replicated_max_parallel_fetches_for_table, 0) \ + /** Limit parallel sends */ \ + M(SettingUInt64, replicated_max_parallel_sends, 0) \ + M(SettingUInt64, replicated_max_parallel_sends_for_table, 0) \ + \ + /** If true, replicas of Replicated tables on this node will try to acquire leadership. */ \ + M(SettingBool, replicated_can_become_leader, true) \ + \ + M(SettingSeconds, zookeeper_session_expiration_check_period, 60) \ + \ + /** Check delay of replicas settings. */ \ + \ + /** Period to check replication delay and compare with other replicas. */ \ + M(SettingUInt64, check_delay_period, 60) \ + \ + /** Period to clean old queue logs, block hashes and parts */ \ + M(SettingUInt64, cleanup_delay_period, 30) \ + \ + /** Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. */ \ + M(SettingUInt64, min_relative_delay_to_yield_leadership, 120) \ + \ + /** Minimal delay from other replicas to close, stop serving requests and not return Ok \ + * during status check.
*/ \ + M(SettingUInt64, min_relative_delay_to_close, 300) \ + \ + /** Minimal absolute delay to close, stop serving requests and not return Ok during status check. */ \ + M(SettingUInt64, min_absolute_delay_to_close, 0) \ + \ + /** Enable usage of Vertical merge algorithm. */ \ + M(SettingUInt64, enable_vertical_merge_algorithm, 1) \ + \ + /** Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm */ \ + M(SettingUInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * DEFAULT_MERGE_BLOCK_SIZE) \ + \ + /** Minimal amount of non-PK columns to activate Vertical merge algorithm */ \ + M(SettingUInt64, vertical_merge_algorithm_min_columns_to_activate, 11) - /// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue. - size_t max_replicated_merges_in_queue = 16; + /// Settings that should not change after the creation of a table. +#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \ + M(index_granularity) - /// When there is less than specified number of free entries in pool (or replicated queue), - /// start to lower maximum size of merge to process (or to put in queue). - /// This is to allow small merges to process - not filling the pool with long running merges. - size_t number_of_free_entries_in_pool_to_lower_max_size_of_merge = 8; +#define DECLARE(TYPE, NAME, DEFAULT) \ + TYPE NAME {DEFAULT}; - /// How many seconds to keep obsolete parts. - time_t old_parts_lifetime = 8 * 60; + APPLY_FOR_MERGE_TREE_SETTINGS(DECLARE) - /// How many seconds to keep tmp_-directories. - time_t temporary_directories_lifetime = 86400; +#undef DECLARE - /** Inserts settings. */ +public: + void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config); - /// If table contains at least that many active parts, artificially slow down insert into table. - size_t parts_to_delay_insert = 150; - - /// If more than this number active parts, throw 'Too much parts ...' exception - size_t parts_to_throw_insert = 300; - - /// Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts. - size_t max_delay_to_insert = 1; - - /** Replication settings. */ - - /// How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted). - size_t replicated_deduplication_window = 100; - /// Similar to previous, but determines old blocks by their lifetime. - /// Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one "window". - /// You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time. - size_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60; /// one week - - /// Keep about this number of last records in ZooKeeper log, even if they are obsolete. - /// It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning. - size_t replicated_logs_to_keep = 100; - - /// After specified amount of time passed after replication log entry creation - /// and sum size of parts is greater than threshold, - /// prefer fetching merged part from replica instead of doing merge locally. - /// To speed up very long merges. - time_t prefer_fetch_merged_part_time_threshold = 3600; - size_t prefer_fetch_merged_part_size_threshold = 10ULL * 1024 * 1024 * 1024; - - /// Max broken parts, if more - deny automatic deletion. - size_t max_suspicious_broken_parts = 10; - - /// Not apply ALTER if number of files for modification(deletion, addition) more than this. 
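For readers unfamiliar with this X-macro pattern: APPLY_FOR_MERGE_TREE_SETTINGS(DECLARE) expands every M(TYPE, NAME, DEFAULT) entry into a typed member with its default value, while the SET macros in MergeTreeSettings.cpp expand the same list into an if / else-if dispatch on the setting name. An abbreviated, purely illustrative expansion (not part of the patch):

struct MergeTreeSettings
{
    SettingUInt64 index_granularity {8192};
    SettingUInt64 max_bytes_to_merge_at_max_space_in_pool {150ULL * 1024 * 1024 * 1024};
    SettingUInt64 parts_to_delay_insert {150};
    // ... one member per M(TYPE, NAME, DEFAULT) entry ...
};

// and in loadFromQuery() the SET macro unrolls to roughly:
// if (false) {}
// else if (setting.name == "index_granularity") index_granularity.set(setting.value);
// else if (setting.name == "parts_to_delay_insert") parts_to_delay_insert.set(setting.value);
// /* ... */
// else throw Exception("Unknown setting " + setting.name + " for storage " + storage_def.engine->name, ErrorCodes::BAD_ARGUMENTS);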
- size_t max_files_to_modify_in_alter_columns = 75; - /// Not apply ALTER, if number of files for deletion more than this. - size_t max_files_to_remove_in_alter_columns = 50; - - /// If ratio of wrong parts to total number of parts is less than this - allow to start. - double replicated_max_ratio_of_wrong_parts = 0.5; - - /// Limit parallel fetches - size_t replicated_max_parallel_fetches = 0; - size_t replicated_max_parallel_fetches_for_table = 0; - /// Limit parallel sends - size_t replicated_max_parallel_sends = 0; - size_t replicated_max_parallel_sends_for_table = 0; - - /// If true, Replicated tables replicas on this node will try to acquire leadership. - bool replicated_can_become_leader = true; - - /// In seconds. - size_t zookeeper_session_expiration_check_period = 60; - - /** Check delay of replicas settings. */ - - /// Period to check replication delay and compare with other replicas. - size_t check_delay_period = 60; - - /// Period to clean old queue logs, blocks hashes and parts - size_t cleanup_delay_period = 30; - - /// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. - size_t min_relative_delay_to_yield_leadership = 120; - - /// Minimal delay from other replicas to close, stop serving requests and not return Ok during status check. - size_t min_relative_delay_to_close = 300; - - /// Minimal absolute delay to close, stop serving requests and not return Ok during status check. - size_t min_absolute_delay_to_close = 0; - - /// Enable usage of Vertical merge algorithm. - size_t enable_vertical_merge_algorithm = 1; - - /// Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm - size_t vertical_merge_algorithm_min_rows_to_activate = 16 * DEFAULT_MERGE_BLOCK_SIZE; - - /// Minimal amount of non-PK columns to activate Vertical merge algorithm - size_t vertical_merge_algorithm_min_columns_to_activate = 11; - - - void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) - { - #define SET(NAME, GETTER) \ - try \ - { \ - NAME = config.GETTER(config_elem + "." 
#NAME, NAME); \ - } \ - catch (const Poco::Exception & e) \ - { \ - throw Exception( \ - "Invalid config parameter: " + config_elem + "/" #NAME + ": " + e.message() + ".", \ - ErrorCodes::INVALID_CONFIG_PARAMETER); \ - } - - SET(max_bytes_to_merge_at_max_space_in_pool, getUInt64); - SET(max_bytes_to_merge_at_min_space_in_pool, getUInt64); - SET(max_replicated_merges_in_queue, getUInt64); - SET(number_of_free_entries_in_pool_to_lower_max_size_of_merge, getUInt64); - SET(old_parts_lifetime, getUInt64); - SET(temporary_directories_lifetime, getUInt64); - SET(parts_to_delay_insert, getUInt64); - SET(parts_to_throw_insert, getUInt64); - SET(max_delay_to_insert, getUInt64); - SET(replicated_deduplication_window, getUInt64); - SET(replicated_deduplication_window_seconds, getUInt64); - SET(replicated_logs_to_keep, getUInt64); - SET(prefer_fetch_merged_part_time_threshold, getUInt64); - SET(prefer_fetch_merged_part_size_threshold, getUInt64); - SET(max_suspicious_broken_parts, getUInt64); - SET(max_files_to_modify_in_alter_columns, getUInt64); - SET(max_files_to_remove_in_alter_columns, getUInt64); - SET(replicated_max_ratio_of_wrong_parts, getDouble); - SET(replicated_max_parallel_fetches, getUInt64); - SET(replicated_max_parallel_fetches_for_table, getUInt64); - SET(replicated_max_parallel_sends, getUInt64); - SET(replicated_max_parallel_sends_for_table, getUInt64); - SET(replicated_can_become_leader, getBool); - SET(zookeeper_session_expiration_check_period, getUInt64); - SET(check_delay_period, getUInt64); - SET(cleanup_delay_period, getUInt64); - SET(min_relative_delay_to_yield_leadership, getUInt64); - SET(min_relative_delay_to_close, getUInt64); - SET(min_absolute_delay_to_close, getUInt64); - SET(enable_vertical_merge_algorithm, getUInt64); - SET(vertical_merge_algorithm_min_rows_to_activate, getUInt64); - SET(vertical_merge_algorithm_min_columns_to_activate, getUInt64); - - #undef SET - } + /// NOTE: will rewrite the AST to add immutable settings. + void loadFromQuery(ASTStorage & storage_def); }; } diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 6fb7fb1c935..7e086cfe6b2 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -408,7 +408,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( column_streams.clear(); - if (marks_count == 0) + if (rows_count == 0) { /// A part is empty - all records are deleted. 
Poco::File(part_path).remove(true); @@ -419,6 +419,13 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( { new_part->partition.store(storage, part_path, checksums); new_part->minmax_idx.store(storage, part_path, checksums); + + WriteBufferFromFile count_out(part_path + "count.txt", 4096); + HashingWriteBuffer count_out_hashing(count_out); + writeIntText(rows_count, count_out_hashing); + count_out_hashing.next(); + checksums.files["count.txt"].file_size = count_out_hashing.count(); + checksums.files["count.txt"].file_hash = count_out_hashing.getHash(); } { @@ -433,7 +440,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( checksums.write(out); } - new_part->size = marks_count; + new_part->rows_count = rows_count; + new_part->marks_count = marks_count; new_part->modification_time = time(nullptr); new_part->columns = *total_column_list; new_part->index.swap(index_columns); @@ -441,11 +449,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( new_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part->getFullPath()); } -size_t MergedBlockOutputStream::marksCount() -{ - return marks_count; -} - void MergedBlockOutputStream::init() { Poco::File(part_path).createDirectories(); @@ -525,6 +528,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm } } + rows_count += rows; + { /** While filling index (index_columns), disable memory tracker. * Because memory is allocated here (maybe in context of INSERT query), diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h index 869570dfacd..c28d368b00e 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -130,8 +130,8 @@ public: const NamesAndTypesList * total_columns_list = nullptr, MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr); - /// How many marks are already written. - size_t marksCount(); + /// How many rows are already written. + size_t getRowsCount() const { return rows_count; } private: void init(); @@ -145,6 +145,7 @@ private: NamesAndTypesList columns_list; String part_path; + size_t rows_count = 0; size_t marks_count = 0; std::unique_ptr index_file_stream; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 96c416c3cbd..f445dfbfc7e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -85,7 +85,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() std::sort(entries.begin(), entries.end()); /// We will not touch the last `replicated_logs_to_keep` records. - entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end()); + entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep.value), entries.end()); /// We will not touch records that are no less than `min_pointer`. 
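The count.txt file written above is read back by the loadRowsCount() method declared earlier in this diff. A minimal sketch of such a reader for the new format, assuming the existing ReadBufferFromFile, readIntText and assertEOF helpers (the actual implementation in MergeTreeDataPart.cpp may differ):

void MergeTreeDataPart::loadRowsCount()
{
    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        /// New format: the row count is stored explicitly in count.txt.
        ReadBufferFromFile file(getFullPath() + "count.txt");
        readIntText(rows_count, file);
        assertEOF(file);
    }
    else
    {
        /// Old format: derive the row count from the on-disk size of a column with
        /// fixed-size values (this is what the removed getExactSizeRows() used to do).
    }
}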
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end()); @@ -164,7 +164,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() /// Virtual node, all nodes that are "greater" than this one will be deleted NodeWithStat block_threshold("", RequiredStat(time_threshold)); - size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window); + size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value); auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window; auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime); auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 16096e5e762..c6d0e0a9cce 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -50,7 +50,7 @@ void ReplicatedMergeTreeRestartingThread::run() constexpr auto retry_period_ms = 10 * 1000; /// The frequency of checking expiration of session in ZK. - Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period * 1000; + Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000; /// Periodicity of checking lag of replica. if (check_period_ms > static_cast(storage.data.settings.check_delay_period) * 1000) diff --git a/dbms/src/Storages/StorageDictionary.cpp b/dbms/src/Storages/StorageDictionary.cpp index 835b6f19f6e..2746b6a9861 100644 --- a/dbms/src/Storages/StorageDictionary.cpp +++ b/dbms/src/Storages/StorageDictionary.cpp @@ -16,19 +16,18 @@ namespace DB StoragePtr StorageDictionary::create( const String & table_name, Context & context, - ASTPtr & query, + const ASTCreateQuery & query, NamesAndTypesListPtr columns, const NamesAndTypesList & materialized_columns, const NamesAndTypesList & alias_columns, const ColumnDefaults & column_defaults) { - ASTCreateQuery & create = typeid_cast(*query); - const ASTFunction & function = typeid_cast (*create.storage); + const ASTFunction & engine = *query.storage->engine; String dictionary_name; - if (function.arguments) + if (engine.arguments) { std::stringstream iss; - function.arguments->format(IAST::FormatSettings(iss, false, false)); + engine.arguments->format(IAST::FormatSettings(iss, false, false)); dictionary_name = iss.str(); } diff --git a/dbms/src/Storages/StorageDictionary.h b/dbms/src/Storages/StorageDictionary.h index c3f28f31027..2e0a2c43d83 100644 --- a/dbms/src/Storages/StorageDictionary.h +++ b/dbms/src/Storages/StorageDictionary.h @@ -24,7 +24,7 @@ class StorageDictionary : private ext::shared_ptr_helper, pub public: static StoragePtr create(const String & table_name_, Context & context_, - ASTPtr & query_, + const ASTCreateQuery & query, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 83325bb56f2..1233333c8dd 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -64,9 
+64,9 @@ namespace ErrorCodes * It can be specified in the tuple: (CounterID, Date), * or as one column: CounterID. */ -static ASTPtr extractKeyExpressionList(const ASTPtr & node) +static ASTPtr extractKeyExpressionList(IAST & node) { - const ASTFunction * expr_func = typeid_cast(&*node); + const ASTFunction * expr_func = typeid_cast(&node); if (expr_func && expr_func->name == "tuple") { @@ -77,7 +77,7 @@ static ASTPtr extractKeyExpressionList(const ASTPtr & node) { /// Primary key consists of one column. auto res = std::make_shared(); - res->children.push_back(node); + res->children.push_back(node.ptr()); return res; } } @@ -245,6 +245,117 @@ static void checkAllTypesAreAllowedInTable(const NamesAndTypesList & names_and_t } + +static String getMergeTreeVerboseHelp(bool is_extended_syntax) +{ + String help = R"( + +MergeTree is a family of storage engines. + +MergeTrees are different in two ways: +- they may be replicated and non-replicated; +- they may do different actions on merge: nothing; sign collapse; sum; apply aggregate functions. + +So we have 14 combinations: + MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree + ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree + +In most cases, you need MergeTree or ReplicatedMergeTree. + +For replicated merge trees, you need to supply a path in ZooKeeper and a replica name as the first two parameters. +Path in ZooKeeper is like '/clickhouse/tables/01/' where /clickhouse/tables/ is a common prefix and 01 is a shard name. +Replica name is like 'mtstat01-1' - it may be the hostname or any suitable string identifying the replica. +You may use macro substitutions for these parameters. It's like ReplicatedMergeTree('/clickhouse/tables/{shard}/', '{replica}'... +Look at the corresponding section in the server configuration file. +)"; + + if (!is_extended_syntax) + help += R"( +Next parameter (which is the first for unreplicated tables and the third for replicated tables) is the name of the date column. +Date column must exist in the table and have type Date (not DateTime). +It is used for internal data partitioning and works like some kind of index. + +If your source data doesn't have a column of type Date, but has a DateTime column, you may add values for the Date column while loading, + or you may INSERT your source data to a table of type Log and then transform it with INSERT INTO t SELECT toDate(time) AS date, * FROM ... +If your source data doesn't have any date or time, you may just pass any constant for a date column while loading. + +Next parameter is optional sampling expression. Sampling expression is used to implement SAMPLE clause in query for approximate query execution. +If you don't need approximate query execution, simply omit this parameter. +Sample expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID). + +Next parameter is the primary key tuple. It's like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions in round brackets. If your primary key has just one element, you may omit round brackets. + +Careful choice of the primary key is extremely important for processing short-time queries. + +Next parameter is index (primary key) granularity.
Good value is 8192. You have no reasons to use any other value. +)"; + + help += R"( +For the Collapsing mode, the last parameter is the name of a sign column - a special column that is used to 'collapse' rows with the same primary key while merging. + +For the Summing mode, the optional last parameter is a list of columns to sum while merging. This list is passed in round brackets, like (PageViews, Cost). +If this parameter is omitted, the storage will sum all numeric columns except columns participating in the primary key. + +For the Replacing mode, the optional last parameter is the name of a 'version' column. While merging, for all rows with the same primary key, only one row is selected: the last row, if the version column was not specified, or the last row with the maximum version value, if specified. +)"; + + if (is_extended_syntax) + help += R"( +You can specify a partitioning expression in the PARTITION BY clause. It is optional but highly recommended. +A common partitioning expression is some function of the event date column e.g. PARTITION BY toYYYYMM(EventDate) will partition the table by month. +Rows with different partition expression values are never merged together. That allows manipulating partitions with ALTER commands. +Also it acts as a kind of index. + +Primary key is specified in the ORDER BY clause. It is mandatory for all MergeTree types except UnsortedMergeTree. +It is like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions in round brackets. +If your primary key has just one element, you may omit round brackets. + +Careful choice of the primary key is extremely important for processing short-time queries. + +Optional sampling expression can be specified in the SAMPLE BY clause. It is used to implement the SAMPLE clause in a SELECT query for approximate query execution. +Sampling expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID). + +Engine settings can be specified in the SETTINGS clause. Full list is in the source code in the 'dbms/src/Storages/MergeTree/MergeTreeSettings.h' file. +E.g. you can specify the index (primary key) granularity with SETTINGS index_granularity = 8192. 
+ +Examples: + +MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity = 8192 + +MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID) + +CollapsingMergeTree(Sign) PARTITION BY StartDate SAMPLE BY intHash32(UserID) ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID) + +SummingMergeTree PARTITION BY toMonday(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo) + +SummingMergeTree((Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth)) PARTITION BY toYYYYMM(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo) + +ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY EventDate ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID) +)"; + else + help += R"( +Examples: + +MergeTree(EventDate, (CounterID, EventDate), 8192) + +MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192) + +CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign) + +SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192) + +SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192, (Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth)) + +ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192) +)"; + + help += R"( +For further info please read the documentation: https://clickhouse.yandex/ +)"; + + return help; +} + + StoragePtr StorageFactory::get( const String & name, const String & data_path, @@ -252,7 +363,7 @@ StoragePtr StorageFactory::get( const String & database_name, Context & local_context, Context & context, - ASTPtr & query, + ASTCreateQuery & query, NamesAndTypesListPtr columns, const NamesAndTypesList & materialized_columns, const NamesAndTypesList & alias_columns, @@ -269,8 +380,32 @@ StoragePtr StorageFactory::get( checkAllTypesAreAllowedInTable(alias_columns); } + ASTStorage & storage_def = *query.storage; + const ASTFunction & engine_def = *storage_def.engine; + + if (engine_def.parameters) + throw Exception( + "Engine definition cannot take the form of a parametric function", ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS); + + if ((storage_def.partition_by || storage_def.order_by || storage_def.sample_by || storage_def.settings) + && !endsWith(name, "MergeTree")) + { + throw Exception( + "Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. 
" + "Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS); + } + + auto check_arguments_empty = [&] + { + if (engine_def.arguments) + throw Exception( + "Engine " + name + " doesn't support any arguments (" + toString(engine_def.arguments->children.size()) + " given)", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + }; + if (name == "Log") { + check_arguments_empty(); return StorageLog::create( data_path, table_name, columns, materialized_columns, alias_columns, column_defaults, @@ -278,12 +413,14 @@ StoragePtr StorageFactory::get( } else if (name == "View") { + check_arguments_empty(); return StorageView::create( table_name, database_name, context, query, columns, materialized_columns, alias_columns, column_defaults); } else if (name == "MaterializedView") { + check_arguments_empty(); return StorageMaterializedView::create( table_name, database_name, context, query, columns, materialized_columns, alias_columns, column_defaults, @@ -297,6 +434,7 @@ StoragePtr StorageFactory::get( } else if (name == "TinyLog") { + check_arguments_empty(); return StorageTinyLog::create( data_path, table_name, columns, materialized_columns, alias_columns, column_defaults, @@ -304,6 +442,7 @@ StoragePtr StorageFactory::get( } else if (name == "StripeLog") { + check_arguments_empty(); return StorageStripeLog::create( data_path, table_name, columns, materialized_columns, alias_columns, column_defaults, @@ -311,16 +450,12 @@ StoragePtr StorageFactory::get( } else if (name == "File") { - auto & func = typeid_cast(*typeid_cast(*query).storage); - auto & args = typeid_cast(*func.arguments).children; - - constexpr auto error_msg = "Storage File requires 1 or 2 arguments: name of used format and source."; - - if (func.parameters) - throw Exception(error_msg, ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS); + ASTs args = engine_def.arguments->children; if (args.empty() || args.size() > 2) - throw Exception(error_msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + throw Exception( + "Storage File requires 1 or 2 arguments: name of used format and source.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], local_context); String format_name = static_cast(*args[0]).value.safeGet(); @@ -365,6 +500,7 @@ StoragePtr StorageFactory::get( } else if (name == "Set") { + check_arguments_empty(); return StorageSet::create( data_path, table_name, columns, materialized_columns, alias_columns, column_defaults); @@ -373,17 +509,12 @@ StoragePtr StorageFactory::get( { /// Join(ANY, LEFT, k1, k2, ...) 
- ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; - - constexpr auto params_error_message = "Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...)."; - - if (args_func.size() != 1) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - ASTs & args = typeid_cast(*args_func.at(0)).children; + const ASTs & args = engine_def.arguments->children; if (args.size() < 3) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + throw Exception( + "Storage Join requires at least 3 parameters: Join(ANY|ALL, LEFT|INNER, keys...).", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); const ASTIdentifier * strictness_id = typeid_cast(&*args[0]); if (!strictness_id) @@ -433,6 +564,7 @@ StoragePtr StorageFactory::get( } else if (name == "Memory") { + check_arguments_empty(); return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults); } else if (name == "Null") @@ -444,14 +576,8 @@ StoragePtr StorageFactory::get( /** In query, the name of database is specified as table engine argument which contains source tables, * as well as regex for source-table names. */ - ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; - if (args_func.size() != 1) - throw Exception("Storage Merge requires exactly 2 parameters" - " - name of source database and regexp for table names.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - ASTs & args = typeid_cast(*args_func.at(0)).children; + ASTs args = engine_def.arguments->children; if (args.size() != 2) throw Exception("Storage Merge requires exactly 2 parameters" @@ -482,19 +608,13 @@ StoragePtr StorageFactory::get( * -- string literal as specific case; * - empty string means 'use default database from cluster'. */ - ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; - const auto params_error_message = "Storage Distributed requires 3 or 4 parameters" - " - name of configuration section with list of remote servers, name of remote database, name of remote table," - " sharding key expression (optional)."; - - if (args_func.size() != 1) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - ASTs & args = typeid_cast(*args_func.at(0)).children; + ASTs args = engine_def.arguments->children; if (args.size() != 3 && args.size() != 4) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + throw Exception("Storage Distributed requires 3 or 4 parameters" + " - name of configuration section with list of remote servers, name of remote database, name of remote table," + " sharding key expression (optional).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); String cluster_name = getClusterName(*args[0]); @@ -537,14 +657,7 @@ StoragePtr StorageFactory::get( * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer. 
*/ - ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; - - if (args_func.size() != 1) - throw Exception("Storage Buffer requires 9 parameters: " - " destination database, destination table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - ASTs & args = typeid_cast(*args_func.at(0)).children; + ASTs args = engine_def.arguments->children; if (args.size() != 9) throw Exception("Storage Buffer requires 9 parameters: " @@ -585,18 +698,14 @@ StoragePtr StorageFactory::get( * - Message format (string) * - Schema (optional, if the format supports it) */ - ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; - const auto params_error_message = "Storage Kafka requires 4 parameters" - " - Kafka broker list, list of topics to consume, consumer group ID, message format"; - - if (args_func.size() != 1) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - - ASTs & args = typeid_cast(*args_func.at(0)).children; + ASTs args = engine_def.arguments->children; if (args.size() != 4 && args.size() != 5) - throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + throw Exception( + "Storage Kafka requires 4 parameters" + " - Kafka broker list, list of topics to consume, consumer group ID, message format", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); String brokers; auto ast = typeid_cast(&*args[0]); @@ -647,7 +756,7 @@ StoragePtr StorageFactory::get( * - the name of the column with the date; * - (optional) expression for sampling * (the query with `SAMPLE x` will select rows that have a lower value in this column than `x * UINT32_MAX`); - * - an expression for sorting (either a scalar expression or a tuple from several); + * - an expression for sorting (either a scalar expression or a tuple of several); * - index_granularity; * - (for Collapsing) the name of Int8 column that contains `sign` type with the change of "visit" (taking values 1 and -1). * For example: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). @@ -663,72 +772,15 @@ StoragePtr StorageFactory::get( * ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column]) * GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element') * UnsortedMergeTree(date, index_granularity) TODO Add description below. + * + * Alternatively, if experimental_allow_extended_storage_definition_syntax setting is specified, + * you can specify: + * - Partitioning expression in the PARTITION BY clause; + * - Primary key in the ORDER BY clause; + * - Sampling expression in the SAMPLE BY clause; + * - Additional MergeTreeSettings in the SETTINGS clause; */ - const char * verbose_help = R"( - -MergeTree is family of storage engines. - -MergeTrees is different in two ways: -- it may be replicated and non-replicated; -- it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions. - -So we have 14 combinations: - MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree, GraphiteMergeTree - ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree, ReplicatedGraphiteMergeTree - -In most of cases, you need MergeTree or ReplicatedMergeTree. 
- -For replicated merge trees, you need to supply path in ZooKeeper and replica name as first two parameters. -Path in ZooKeeper is like '/clickhouse/tables/01/' where /clickhouse/tables/ is common prefix and 01 is shard name. -Replica name is like 'mtstat01-1' - it may be hostname or any suitable string identifying replica. -You may use macro substitutions for these parameters. It's like ReplicatedMergeTree('/clickhouse/tables/{shard}/', '{replica}'... -Look at section in server configuration file. - -Next parameter (which is first for unreplicated tables and third for replicated tables) is name of date column. -Date column must exist in table and have type Date (not DateTime). -It is used for internal data partitioning and works like some kind of index. - -If your source data doesn't have column of type Date, but have DateTime column, you may add values for Date column while loading, - or you may INSERT your source data to table of type Log and then transform it with INSERT INTO t SELECT toDate(time) AS date, * FROM ... -If your source data doesn't have any date or time, you may just pass any constant for date column while loading. - -Next parameter is optional sampling expression. Sampling expression is used to implement SAMPLE clause in query for approximate query execution. -If you don't need approximate query execution, simply omit this parameter. -Sample expression must be one of elements of primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID). - -Next parameter is primary key tuple. It's like (CounterID, EventDate, intHash64(UserID)) - list of column names or functional expressions in round brackets. If your primary key have just one element, you may omit round brackets. - -Careful choice of primary key is extremely important for processing short-time queries. - -Next parameter is index (primary key) granularity. Good value is 8192. You have no reasons to use any other value. - -For Collapsing mode, last parameter is name of sign column - special column that is used to 'collapse' rows with same primary key while merge. - -For Summing mode, last parameter is optional list of columns to sum while merge. List is passed in round brackets, like (PageViews, Cost). -If this parameter is omitted, storage will sum all numeric columns except columns participated in primary key. - -For Replacing mode, last parameter is optional name of 'version' column. While merging, for all rows with same primary key, only one row is selected: last row, if version column was not specified, or last row with maximum version value, if specified. 
- - -Examples: - -MergeTree(EventDate, (CounterID, EventDate), 8192) - -MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192) - -CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign) - -SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192) - -SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192, (Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth)) - -ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192) - - -For further info please read the documentation: https://clickhouse.yandex/ -)"; - String name_part = name.substr(0, name.size() - strlen("MergeTree")); bool replicated = startsWith(name_part, "Replicated"); @@ -738,6 +790,9 @@ For further info please read the documentation: https://clickhouse.yandex/ MergeTreeData::MergingParams merging_params; merging_params.mode = MergeTreeData::MergingParams::Ordinary; + const bool allow_extended_storage_def = + local_context.getSettingsRef().experimental_allow_extended_storage_definition_syntax; + if (name_part == "Collapsing") merging_params.mode = MergeTreeData::MergingParams::Collapsing; else if (name_part == "Summing") @@ -751,113 +806,138 @@ For further info please read the documentation: https://clickhouse.yandex/ else if (name_part == "Graphite") merging_params.mode = MergeTreeData::MergingParams::Graphite; else if (!name_part.empty()) - throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE); - - ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; + throw Exception( + "Unknown storage " + name + getMergeTreeVerboseHelp(allow_extended_storage_def), + ErrorCodes::UNKNOWN_STORAGE); ASTs args; - - if (args_func.size() == 1) - args = typeid_cast(*args_func.at(0)).children; + if (engine_def.arguments) + args = engine_def.arguments->children; /// NOTE Quite complicated. - size_t num_additional_params = (replicated ? 2 : 0) - + (merging_params.mode == MergeTreeData::MergingParams::Collapsing) - + (merging_params.mode == MergeTreeData::MergingParams::Graphite); - String params_for_replicated; + bool is_extended_storage_def = + storage_def.partition_by || storage_def.order_by || storage_def.sample_by || storage_def.settings; + + if (is_extended_storage_def && !allow_extended_storage_def) + throw Exception( + "Extended storage definition syntax (PARTITION BY, ORDER BY, SAMPLE BY and SETTINGS clauses) " + "is disabled. Enable it with experimental_allow_extended_storage_definition_syntax user setting"); + + size_t min_num_params = 0; + size_t max_num_params = 0; + String needed_params; + + auto add_mandatory_param = [&](const char * desc) + { + ++min_num_params; + ++max_num_params; + needed_params += needed_params.empty() ? "\n" : ",\n"; + needed_params += desc; + }; + auto add_optional_param = [&](const char * desc) + { + ++max_num_params; + needed_params += needed_params.empty() ? 
"\n" : ",\n["; + needed_params += desc; + needed_params += "]"; + }; + if (replicated) - params_for_replicated = - "\npath in ZooKeeper," - "\nreplica name,"; - - if (merging_params.mode == MergeTreeData::MergingParams::Unsorted - && args.size() != num_additional_params + 2) { - String params = - "\nname of column with date," - "\nindex granularity\n"; - - throw Exception("Storage " + name + " requires " - + toString(num_additional_params + 2) + " parameters: " + params_for_replicated + params + verbose_help, - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + add_mandatory_param("path in ZooKeeper"); + add_mandatory_param("replica name"); } - if (merging_params.mode != MergeTreeData::MergingParams::Summing - && merging_params.mode != MergeTreeData::MergingParams::Replacing - && merging_params.mode != MergeTreeData::MergingParams::Unsorted - && args.size() != num_additional_params + 3 - && args.size() != num_additional_params + 4) + if (!is_extended_storage_def) { - String params = - "\nname of column with date," - "\n[sampling element of primary key]," - "\nprimary key expression," - "\nindex granularity\n"; - - if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) - params += ", sign column\n"; - - if (merging_params.mode == MergeTreeData::MergingParams::Graphite) - params += ", 'config_element_for_graphite_schema'\n"; - - throw Exception("Storage " + name + " requires " - + toString(num_additional_params + 3) + " or " - + toString(num_additional_params + 4) + " parameters: " + params_for_replicated + params + verbose_help, - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - } - - if ((merging_params.mode == MergeTreeData::MergingParams::Summing - || merging_params.mode == MergeTreeData::MergingParams::Replacing) - && args.size() != num_additional_params + 3 - && args.size() != num_additional_params + 4 - && args.size() != num_additional_params + 5) - { - String params = - "\nname of column with date," - "\n[sampling element of primary key]," - "\nprimary key expression," - "\nindex granularity,"; - - if (merging_params.mode == MergeTreeData::MergingParams::Summing) - params += "\n[list of columns to sum]\n"; + if (merging_params.mode == MergeTreeData::MergingParams::Unsorted) + { + if (args.size() == min_num_params && allow_extended_storage_def) + is_extended_storage_def = true; + else + { + add_mandatory_param("name of column with date"); + add_mandatory_param("index granularity"); + } + } else - params += "\n[version]\n"; + { + add_mandatory_param("name of column with date"); + add_optional_param("sampling element of primary key"); + add_mandatory_param("primary key expression"); + add_mandatory_param("index granularity"); + } + } - throw Exception("Storage " + name + " requires " - + toString(num_additional_params + 3) + " or " - + toString(num_additional_params + 4) + " or " - + toString(num_additional_params + 5) + " parameters: " + params_for_replicated + params + verbose_help, - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + switch (merging_params.mode) + { + default: + break; + case MergeTreeData::MergingParams::Summing: + add_optional_param("list of columns to sum"); + break; + case MergeTreeData::MergingParams::Replacing: + add_optional_param("version"); + break; + case MergeTreeData::MergingParams::Collapsing: + add_mandatory_param("sign column"); + break; + case MergeTreeData::MergingParams::Graphite: + add_mandatory_param("'config_element_for_graphite_schema'"); + break; + } + + if (args.size() < min_num_params || args.size() > max_num_params) + { + String 
msg; + if (is_extended_storage_def) + msg += "With extended storage definition syntax storage " + name + " requires "; + else + msg += "Storage " + name + " requires "; + + if (max_num_params) + { + if (min_num_params == max_num_params) + msg += toString(min_num_params) + " parameters: "; + else + msg += toString(min_num_params) + " to " + toString(max_num_params) + " parameters: "; + msg += needed_params; + } + else + msg += "no parameters"; + + msg += getMergeTreeVerboseHelp(is_extended_storage_def); + + throw Exception(msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } /// For Replicated. String zookeeper_path; String replica_name; - /// For all. - String date_column_name; - ASTPtr partition_expr_ast; - ASTPtr primary_expr_list; - ASTPtr sampling_expression; - UInt64 index_granularity; - if (replicated) { auto ast = typeid_cast(&*args[0]); if (ast && ast->value.getType() == Field::Types::String) zookeeper_path = safeGet(ast->value); else - throw Exception(String("Path in ZooKeeper must be a string literal") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + throw Exception( + "Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); ast = typeid_cast(&*args[1]); if (ast && ast->value.getType() == Field::Types::String) replica_name = safeGet(ast->value); else - throw Exception(String("Replica name must be a string literal") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + throw Exception( + "Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); if (replica_name.empty()) - throw Exception(String("No replica name in config") + verbose_help, ErrorCodes::NO_REPLICA_NAME_GIVEN); + throw Exception( + "No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::NO_REPLICA_NAME_GIVEN); args.erase(args.begin(), args.begin() + 2); } @@ -867,27 +947,31 @@ For further info please read the documentation: https://clickhouse.yandex/ if (auto ast = typeid_cast(&*args.back())) merging_params.sign_column = ast->name; else - throw Exception(String("Sign column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + throw Exception( + "Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); args.pop_back(); } else if (merging_params.mode == MergeTreeData::MergingParams::Replacing) { - /// If the last element is not an index granularity (literal), then this is the name of the version column. - if (!typeid_cast(&*args.back())) + /// If the last element is not index_granularity or replica_name (a literal), then this is the name of the version column. + if (!args.empty() && !typeid_cast(&*args.back())) { if (auto ast = typeid_cast(&*args.back())) merging_params.version_column = ast->name; else - throw Exception(String("Version column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + throw Exception( + "Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); args.pop_back(); } } else if (merging_params.mode == MergeTreeData::MergingParams::Summing) { - /// If the last element is not an index granularity (literal), then this is a list of summable columns. - if (!typeid_cast(&*args.back())) + /// If the last element is not index_granularity or replica_name (a literal), then this is a list of summable columns. 
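To make the min_num_params / max_num_params bookkeeping above concrete, a worked example (illustrative; the exact wording is assembled at run time): for ReplicatedCollapsingMergeTree with the classic syntax, add_mandatory_param() is called for the path in ZooKeeper, the replica name, the date column, the primary key expression, the index granularity and the sign column, and add_optional_param() once for the sampling element, giving min_num_params = 6 and max_num_params = 7; a wrong argument count is then reported as roughly "Storage ReplicatedCollapsingMergeTree requires 6 to 7 parameters:" followed by that parameter list and the verbose help.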
+ if (!args.empty() && !typeid_cast(&*args.back())) { merging_params.columns_to_sum = extractColumnNames(args.back()); args.pop_back(); @@ -897,7 +981,7 @@ For further info please read the documentation: https://clickhouse.yandex/ { String graphite_config_name; String error_msg = "Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options"; - error_msg += verbose_help; + error_msg += getMergeTreeVerboseHelp(is_extended_storage_def); if (auto ast = typeid_cast(&*args.back())) { @@ -913,47 +997,69 @@ For further info please read the documentation: https://clickhouse.yandex/ setGraphitePatternsFromConfig(context, graphite_config_name, merging_params.graphite_params); } - /// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity) - if (args.size() == 4) + String date_column_name; + ASTPtr partition_expr_list; + ASTPtr primary_expr_list; + ASTPtr sampling_expression; + MergeTreeSettings storage_settings = context.getMergeTreeSettings(); + + if (is_extended_storage_def) { - sampling_expression = args[1]; - args.erase(args.begin() + 1); + if (storage_def.partition_by) + partition_expr_list = extractKeyExpressionList(*storage_def.partition_by); + + if (storage_def.order_by) + primary_expr_list = extractKeyExpressionList(*storage_def.order_by); + + if (storage_def.sample_by) + sampling_expression = storage_def.sample_by->ptr(); + + storage_settings.loadFromQuery(storage_def); } - - /// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity. - - if (auto ast = typeid_cast(args[0].get())) - date_column_name = ast->name; - else if (local_context.getSettingsRef().experimental_merge_tree_allow_custom_partitions) - partition_expr_ast = extractKeyExpressionList(args[0]); else - throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + { + /// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity) + if (args.size() == 4) + { + sampling_expression = args[1]; + args.erase(args.begin() + 1); + } - if (merging_params.mode != MergeTreeData::MergingParams::Unsorted) - primary_expr_list = extractKeyExpressionList(args[1]); + /// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity. 
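For instance (illustrative only), in the classic form MergeTree(EventDate, (CounterID, EventDate), 8192) those three remaining parameters are the date column EventDate, the primary key tuple (CounterID, EventDate) and the index granularity 8192, while the extended-syntax spelling from the help above, MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity = 8192, is handled by the preceding branch via extractKeyExpressionList() and MergeTreeSettings::loadFromQuery().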
- auto ast = typeid_cast(&*args.back()); - if (ast && ast->value.getType() == Field::Types::UInt64) - index_granularity = safeGet(ast->value); - else - throw Exception(String("Index granularity must be a positive integer") + verbose_help, ErrorCodes::BAD_ARGUMENTS); + if (auto ast = typeid_cast(args[0].get())) + date_column_name = ast->name; + else + throw Exception( + "Date column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); + + if (merging_params.mode != MergeTreeData::MergingParams::Unsorted) + primary_expr_list = extractKeyExpressionList(*args[1]); + + auto ast = typeid_cast(&*args.back()); + if (ast && ast->value.getType() == Field::Types::UInt64) + storage_settings.index_granularity = safeGet(ast->value); + else + throw Exception( + "Index granularity must be a positive integer" + getMergeTreeVerboseHelp(is_extended_storage_def), + ErrorCodes::BAD_ARGUMENTS); + } if (replicated) return StorageReplicatedMergeTree::create( zookeeper_path, replica_name, attach, data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, - context, primary_expr_list, date_column_name, partition_expr_ast, - sampling_expression, index_granularity, merging_params, - has_force_restore_data_flag, - context.getMergeTreeSettings()); + context, primary_expr_list, date_column_name, partition_expr_list, + sampling_expression, merging_params, storage_settings, + has_force_restore_data_flag); else return StorageMergeTree::create( data_path, database_name, table_name, columns, materialized_columns, alias_columns, column_defaults, attach, - context, primary_expr_list, date_column_name, partition_expr_ast, - sampling_expression, index_granularity, merging_params, - has_force_restore_data_flag, - context.getMergeTreeSettings()); + context, primary_expr_list, date_column_name, partition_expr_list, + sampling_expression, merging_params, storage_settings, + has_force_restore_data_flag); } else throw Exception("Unknown storage " + name, ErrorCodes::UNKNOWN_STORAGE); diff --git a/dbms/src/Storages/StorageFactory.h b/dbms/src/Storages/StorageFactory.h index a63860ae56f..b19402d29fe 100644 --- a/dbms/src/Storages/StorageFactory.h +++ b/dbms/src/Storages/StorageFactory.h @@ -24,7 +24,7 @@ public: const String & database_name, Context & local_context, Context & context, - ASTPtr & query, + ASTCreateQuery & query, NamesAndTypesListPtr columns, const NamesAndTypesList & materialized_columns, const NamesAndTypesList & alias_columns, diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index c59b84125f8..3e95b75a5c4 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -57,7 +57,7 @@ StorageMaterializedView::StorageMaterializedView( const String & table_name_, const String & database_name_, Context & context_, - ASTPtr & query_, + const ASTCreateQuery & query, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, @@ -66,20 +66,13 @@ StorageMaterializedView::StorageMaterializedView( : IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_), database_name(database_name_), context(context_), columns(columns_) { - ASTCreateQuery & create = typeid_cast(*query_); - - if (!create.select) + if (!query.select) throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); - if 
(!create.inner_storage) + if (!query.inner_storage) throw Exception("ENGINE of MaterializedView should be specified explicitly", ErrorCodes::INCORRECT_QUERY); - ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select); - - /// If the internal query does not specify a database, retrieve it from the context and write it to the query. - select.setDatabaseIfNeeded(database_name); - - extractDependentTable(select, select_database_name, select_table_name); + extractDependentTable(*query.select, select_database_name, select_table_name); if (!select_table_name.empty()) context.getGlobalContext().addDependency( @@ -87,7 +80,7 @@ StorageMaterializedView::StorageMaterializedView( DatabaseAndTableName(database_name, table_name)); String inner_table_name = getInnerTableName(); - inner_query = create.select; + inner_query = query.select->ptr(); /// If there is an ATTACH request, then the internal table must already be connected. if (!attach_) @@ -96,10 +89,8 @@ StorageMaterializedView::StorageMaterializedView( auto manual_create_query = std::make_shared<ASTCreateQuery>(); manual_create_query->database = database_name; manual_create_query->table = inner_table_name; - manual_create_query->columns = create.columns; - manual_create_query->children.push_back(manual_create_query->columns); - manual_create_query->storage = create.inner_storage; - manual_create_query->children.push_back(manual_create_query->storage); + manual_create_query->set(manual_create_query->columns, query.columns->ptr()); + manual_create_query->set(manual_create_query->storage, query.inner_storage->ptr()); /// Execute the query. try @@ -176,5 +167,4 @@ StoragePtr StorageMaterializedView::getInnerTable() const return context.getTable(database_name, getInnerTableName()); } - } diff --git a/dbms/src/Storages/StorageMaterializedView.h b/dbms/src/Storages/StorageMaterializedView.h index efc6927e26e..788a6e66150 100644 --- a/dbms/src/Storages/StorageMaterializedView.h +++ b/dbms/src/Storages/StorageMaterializedView.h @@ -58,7 +58,7 @@ private: const String & table_name_, const String & database_name_, Context & context_, - ASTPtr & query_, + const ASTCreateQuery & query, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 88e3ee10051..122e01bc515 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -44,10 +44,9 @@ StorageMergeTree::StorageMergeTree( const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported. - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag, - const MergeTreeSettings & settings_) + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), context(context_), background_pool(context_.getBackgroundPool()), @@ -55,7 +54,7 @@ StorageMergeTree::StorageMergeTree( full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name, partition_expr_ast_, - sampling_expression_, index_granularity_, merging_params_, + sampling_expression_, merging_params_, settings_, database_name_ + "."
+ table_name, false, attach), reader(data), writer(data), merger(data, context.getBackgroundPool()), log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)")) @@ -198,9 +197,9 @@ void StorageMergeTree::alter( auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__); - IDatabase::ASTModifier engine_modifier; + IDatabase::ASTModifier storage_modifier; if (primary_key_is_modified) - engine_modifier = [&new_primary_key_ast] (ASTPtr & engine_ast) + storage_modifier = [&new_primary_key_ast] (IAST & ast) { auto tuple = std::make_shared<ASTFunction>(new_primary_key_ast->range); tuple->name = "tuple"; @@ -209,13 +208,14 @@ void StorageMergeTree::alter( /// Primary key is in the second place in table engine description and can be represented as a tuple. /// TODO: Not always in second place. If there is a sampling key, then the third one. Fix it. - typeid_cast<ASTExpressionList &>(*typeid_cast<ASTFunction &>(*engine_ast).arguments).children.at(1) = tuple; + auto & storage_ast = typeid_cast<ASTStorage &>(ast); + typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments).children.at(1) = tuple; }; context.getDatabase(database_name)->alterTable( context, table_name, new_columns, new_materialized_columns, new_alias_columns, new_column_defaults, - engine_modifier); + storage_modifier); materialized_columns = new_materialized_columns; alias_columns = new_alias_columns; diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 9fdcd1339df..f98425c12e5 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -119,7 +119,6 @@ private: * primary_expr_ast - expression for sorting; * date_column_name - if not empty, the name of the column with the date used for partitioning by month; otherwise, partition_expr_ast is used as the partitioning expression; - * index_granularity - fow how many rows one index value is written. */ StorageMergeTree( const String & path_, @@ -135,10 +134,9 @@ private: const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported. - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag, - const MergeTreeSettings & settings_); + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag); /** Determines what parts should be merged and merges it. * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
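-- Illustration only (not from the patch): a hedged SQL sketch of the two MergeTree definition styles
-- the factory code above has to parse. The database and table names are invented for the example.
-- In the classic form the date column, optional sampling key, primary key and index_granularity are
-- positional engine arguments; with the extended syntax (enabled by the setting below) they become
-- PARTITION BY / ORDER BY / SAMPLE BY / SETTINGS clauses, and index_granularity is carried in
-- MergeTreeSettings instead of a constructor parameter.
CREATE TABLE example.visits_classic(d Date, id UInt64, s String)
    ENGINE = MergeTree(d, intHash32(id), (id, intHash32(id)), 8192);

SET experimental_allow_extended_storage_definition_syntax = 1;
CREATE TABLE example.visits_extended(d Date, id UInt64, s String)
    ENGINE = MergeTree
    PARTITION BY toYYYYMM(d)
    ORDER BY (id, intHash32(id))
    SAMPLE BY intHash32(id)
    SETTINGS index_granularity = 8192;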
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 4ef15042226..a1f81341b72 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -188,10 +188,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag, - const MergeTreeSettings & settings_) + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), current_zookeeper(context.getZooKeeper()), database_name(database_name_), table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), @@ -201,7 +200,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name, partition_expr_ast_, - sampling_expression_, index_granularity_, merging_params_, + sampling_expression_, merging_params_, settings_, database_name_ + "." + table_name, true, attach, [this] (const std::string & name) { enqueuePartForCheck(name); }, [this] () { clearOldPartsAndRemoveFromZK(); }), @@ -311,18 +310,17 @@ StoragePtr StorageReplicatedMergeTree::create( const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag_, - const MergeTreeSettings & settings_) + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag_) { auto res = make_shared( zookeeper_path_, replica_name_, attach, path_, database_name_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name, partition_expr_ast_, - sampling_expression_, index_granularity_, - merging_params_, has_force_restore_data_flag_, settings_); + sampling_expression_, merging_params_, settings_, + has_force_restore_data_flag_); StoragePtr res_ptr = res; auto get_endpoint_holder = [&res](InterserverIOEndpointPtr endpoint) @@ -1096,7 +1094,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) do_fetch = true; LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead"); } - else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold <= time(nullptr)) + else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) { /// If entry is old enough, and have enough size, and part are exists in any replica, /// then prefer fetching of merged part from replica. 
@@ -1239,7 +1237,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) static std::atomic_uint total_fetches {0}; if (data.settings.replicated_max_parallel_fetches && total_fetches >= data.settings.replicated_max_parallel_fetches) { - throw Exception("Too much total fetches from replicas, maximum: " + toString(data.settings.replicated_max_parallel_fetches), + throw Exception("Too much total fetches from replicas, maximum: " + data.settings.replicated_max_parallel_fetches.toString(), ErrorCodes::TOO_MUCH_FETCHES); } @@ -1248,7 +1246,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) if (data.settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= data.settings.replicated_max_parallel_fetches_for_table) { - throw Exception("Too much fetches from replicas for table, maximum: " + toString(data.settings.replicated_max_parallel_fetches_for_table), + throw Exception("Too much fetches from replicas for table, maximum: " + data.settings.replicated_max_parallel_fetches_for_table.toString(), ErrorCodes::TOO_MUCH_FETCHES); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 33b8658a8d6..f01a379a8a6 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -91,10 +91,9 @@ public: const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported. - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag, - const MergeTreeSettings & settings_); + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag); void startup() override; void shutdown() override; @@ -339,10 +338,9 @@ private: const String & date_column_name, const ASTPtr & partition_expr_ast_, const ASTPtr & sampling_expression_, - size_t index_granularity_, const MergeTreeData::MergingParams & merging_params_, - bool has_force_restore_data_flag, - const MergeTreeSettings & settings_); + const MergeTreeSettings & settings_, + bool has_force_restore_data_flag); /// Initialization. diff --git a/dbms/src/Storages/StorageView.cpp b/dbms/src/Storages/StorageView.cpp index c232fa6d709..3cf5b392098 100644 --- a/dbms/src/Storages/StorageView.cpp +++ b/dbms/src/Storages/StorageView.cpp @@ -12,7 +12,7 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; + extern const int INCORRECT_QUERY; } @@ -20,7 +20,7 @@ StorageView::StorageView( const String & table_name_, const String & database_name_, Context & context_, - ASTPtr & query_, + const ASTCreateQuery & query, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, @@ -28,13 +28,10 @@ StorageView::StorageView( : IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_), database_name(database_name_), context(context_), columns(columns_) { - ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_); - ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select); + if (!query.select) + throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); - /// If the internal query does not specify a database, retrieve it from the context and write it to the query.
- select.setDatabaseIfNeeded(database_name); - - inner_query = create.select; + inner_query = query.select->ptr(); } diff --git a/dbms/src/Storages/StorageView.h b/dbms/src/Storages/StorageView.h index 31b332da495..cffac728aff 100644 --- a/dbms/src/Storages/StorageView.h +++ b/dbms/src/Storages/StorageView.h @@ -47,7 +47,7 @@ private: const String & table_name_, const String & database_name_, Context & context_, - ASTPtr & query_, + const ASTCreateQuery & query, NamesAndTypesListPtr columns_, const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & alias_columns_, diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index e1e568db1d1..30211f6a691 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -209,7 +209,7 @@ BlockInputStreams StorageSystemParts::read( } block.getByPosition(i++).column->insert(part->name); block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part))); - block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size)); + block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count)); size_t marks_size = 0; for (const NameAndTypePair & it : part->columns) @@ -221,7 +221,7 @@ BlockInputStreams StorageSystemParts::read( } block.getByPosition(i++).column->insert(static_cast<UInt64>(marks_size)); - block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getExactSizeRows())); + block.getByPosition(i++).column->insert(static_cast<UInt64>(part->rows_count)); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size_in_bytes)); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->modification_time)); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time)); diff --git a/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.reference b/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.reference index 57de310b212..9efd2d8a965 100644 --- a/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.reference +++ b/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.reference @@ -48,3 +48,15 @@ Sum before DROP PARTITION: 15 Sum after DROP PARTITION: 8 +*** Table without columns with fixed size *** +Parts: +1 1_1_1_0 2 +2 2_2_2_0 2 +Before DROP PARTITION: +a +aa +b +cc +After DROP PARTITION: +aa +cc diff --git a/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.sql b/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.sql index 81c4b0171ee..61882a6a3b5 100644 --- a/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.sql +++ b/dbms/tests/queries/0_stateless/00502_custom_partitioning_local.sql @@ -1,12 +1,9 @@ --- IMPORTANT: Don't use this setting just yet. --- It is for testing purposes, the syntax will likely change soon and the server will not be able --- to load the tables created this way. You have been warned.
-SET experimental_merge_tree_allow_custom_partitions = 1; +SET experimental_allow_extended_storage_definition_syntax = 1; SELECT '*** Not partitioned ***'; DROP TABLE IF EXISTS test.not_partitioned; -CREATE TABLE test.not_partitioned(x UInt8) ENGINE = MergeTree(tuple(), x, 8192); +CREATE TABLE test.not_partitioned(x UInt8) ENGINE MergeTree ORDER BY x; INSERT INTO test.not_partitioned VALUES (1), (2), (3); INSERT INTO test.not_partitioned VALUES (4), (5); @@ -28,7 +25,7 @@ DROP TABLE test.not_partitioned; SELECT '*** Partitioned by week ***'; DROP TABLE IF EXISTS test.partitioned_by_week; -CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree(toMonday(d), x, 8192); +CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree PARTITION BY toMonday(d) ORDER BY x; -- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02 INSERT INTO test.partitioned_by_week VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3); @@ -51,7 +48,7 @@ DROP TABLE test.partitioned_by_week; SELECT '*** Partitioned by a (Date, UInt8) tuple ***'; DROP TABLE IF EXISTS test.partitioned_by_tuple; -CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE = MergeTree((d, x), x, 8192); +CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE MergeTree ORDER BY x PARTITION BY (d, x); INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3); INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5); @@ -74,7 +71,7 @@ DROP TABLE test.partitioned_by_tuple; SELECT '*** Partitioned by String ***'; DROP TABLE IF EXISTS test.partitioned_by_string; -CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree(tuple(s), x, 8192); +CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree PARTITION BY s ORDER BY x; INSERT INTO test.partitioned_by_string VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3); INSERT INTO test.partitioned_by_string VALUES ('bbb', 4), ('aaa', 5); @@ -92,3 +89,21 @@ SELECT 'Sum after DROP PARTITION:'; SELECT sum(x) FROM test.partitioned_by_string; DROP TABLE test.partitioned_by_string; + +SELECT '*** Table without columns with fixed size ***'; + +DROP TABLE IF EXISTS test.without_fixed_size_columns; +CREATE TABLE test.without_fixed_size_columns(s String) ENGINE MergeTree PARTITION BY length(s) ORDER BY s; + +INSERT INTO test.without_fixed_size_columns VALUES ('a'), ('aa'), ('b'), ('cc'); + +SELECT 'Parts:'; +SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns' AND active ORDER BY name; + +SELECT 'Before DROP PARTITION:'; +SELECT * FROM test.without_fixed_size_columns ORDER BY s; +ALTER TABLE test.without_fixed_size_columns DROP PARTITION 1; +SELECT 'After DROP PARTITION:'; +SELECT * FROM test.without_fixed_size_columns ORDER BY s; + +DROP TABLE test.without_fixed_size_columns; diff --git a/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.reference b/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.reference index c4ba61ce205..ce412811db2 100644 --- a/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.reference +++ b/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.reference @@ -48,3 +48,15 @@ Sum before DROP PARTITION: 15 Sum after DROP PARTITION: 8 +*** Table without columns with fixed size *** +Parts: +1 1_0_0_1 2 +2 2_0_0_0 2 +Before DROP 
PARTITION: +a +aa +b +cc +After DROP PARTITION: +aa +cc diff --git a/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.sql b/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.sql index 022b7047cec..e301dcde0d3 100644 --- a/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.sql +++ b/dbms/tests/queries/0_stateless/00502_custom_partitioning_replicated_zookeeper.sql @@ -1,7 +1,4 @@ --- IMPORTANT: Don't use this setting just yet. --- It is for testing purposes, the syntax will likely change soon and the server will not be able --- to load the tables created this way. You have been warned. -SET experimental_merge_tree_allow_custom_partitions = 1; +SET experimental_allow_extended_storage_definition_syntax = 1; SET replication_alter_partitions_sync = 2; @@ -9,8 +6,8 @@ SELECT '*** Not partitioned ***'; DROP TABLE IF EXISTS test.not_partitioned_replica1; DROP TABLE IF EXISTS test.not_partitioned_replica2; -CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1', tuple(), x, 8192); -CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2', tuple(), x, 8192); +CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1') ORDER BY x; +CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2') ORDER BY x; INSERT INTO test.not_partitioned_replica1 VALUES (1), (2), (3); INSERT INTO test.not_partitioned_replica1 VALUES (4), (5); @@ -34,8 +31,8 @@ SELECT '*** Partitioned by week ***'; DROP TABLE IF EXISTS test.partitioned_by_week_replica1; DROP TABLE IF EXISTS test.partitioned_by_week_replica2; -CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1', toMonday(d), x, 8192); -CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2', toMonday(d), x, 8192); +CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1') PARTITION BY toMonday(d) ORDER BY x; +CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2') PARTITION BY toMonday(d) ORDER BY x; -- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02 INSERT INTO test.partitioned_by_week_replica1 VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3); @@ -60,8 +57,8 @@ SELECT '*** Partitioned by a (Date, UInt8) tuple ***'; DROP TABLE IF EXISTS test.partitioned_by_tuple_replica1; DROP TABLE IF EXISTS test.partitioned_by_tuple_replica2; -CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1', (d, x), x, 8192); -CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2', (d, x), x, 8192); +CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1') ORDER BY x PARTITION BY (d, x); +CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) 
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2') ORDER BY x PARTITION BY (d, x); INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3); INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5); @@ -86,8 +83,8 @@ SELECT '*** Partitioned by String ***'; DROP TABLE IF EXISTS test.partitioned_by_string_replica1; DROP TABLE IF EXISTS test.partitioned_by_string_replica2; -CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1', tuple(s), x, 8192); -CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2', tuple(s), x, 8192); +CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1') PARTITION BY s ORDER BY x; +CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2') PARTITION BY s ORDER BY x; INSERT INTO test.partitioned_by_string_replica1 VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3); INSERT INTO test.partitioned_by_string_replica1 VALUES ('bbb', 4), ('aaa', 5); @@ -106,3 +103,26 @@ SELECT sum(x) FROM test.partitioned_by_string_replica2; DROP TABLE test.partitioned_by_string_replica1; DROP TABLE test.partitioned_by_string_replica2; + +SELECT '*** Table without columns with fixed size ***'; + +DROP TABLE IF EXISTS test.without_fixed_size_columns_replica1; +DROP TABLE IF EXISTS test.without_fixed_size_columns_replica2; +CREATE TABLE test.without_fixed_size_columns_replica1(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '1') PARTITION BY length(s) ORDER BY s; +CREATE TABLE test.without_fixed_size_columns_replica2(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '2') PARTITION BY length(s) ORDER BY s; + +INSERT INTO test.without_fixed_size_columns_replica1 VALUES ('a'), ('aa'), ('b'), ('cc'); + +OPTIMIZE TABLE test.without_fixed_size_columns_replica2 PARTITION 1 FINAL; -- Wait for replication. 
+ +SELECT 'Parts:'; +SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns_replica2' AND active ORDER BY name; + +SELECT 'Before DROP PARTITION:'; +SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s; +ALTER TABLE test.without_fixed_size_columns_replica1 DROP PARTITION 1; +SELECT 'After DROP PARTITION:'; +SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s; + +DROP TABLE test.without_fixed_size_columns_replica1; +DROP TABLE test.without_fixed_size_columns_replica2; diff --git a/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.reference b/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.reference new file mode 100644 index 00000000000..0f8b6de9166 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.reference @@ -0,0 +1,12 @@ +*** Without PARTITION BY and ORDER BY *** +1 +2 +*** Replicated with sampling *** +1 +*** Replacing with implicit version *** +2017-10-23 1 c +*** Replicated Collapsing *** +2017-10-23 2 1 +*** Table definition with SETTINGS *** +0 +0 diff --git a/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql b/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql new file mode 100644 index 00000000000..a72b2b6984a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql @@ -0,0 +1,73 @@ +SET experimental_allow_extended_storage_definition_syntax = 1; + +SELECT '*** Without PARTITION BY and ORDER BY ***'; + +DROP TABLE IF EXISTS test.unsorted; + +CREATE TABLE test.unsorted(x UInt32) ENGINE UnsortedMergeTree; +INSERT INTO test.unsorted VALUES (1), (2); +SELECT * FROM test.unsorted; + +DROP TABLE test.unsorted; + +SELECT '*** Replicated with sampling ***'; + +DROP TABLE IF EXISTS test.replicated_with_sampling; + +CREATE TABLE test.replicated_with_sampling(x UInt8) + ENGINE ReplicatedMergeTree('/clickhouse/tables/test/replicated_with_sampling', 'r1') + ORDER BY x + SAMPLE BY x; + +INSERT INTO test.replicated_with_sampling VALUES (1), (128); +SELECT sum(x) FROM test.replicated_with_sampling SAMPLE 1/2; + +DROP TABLE test.replicated_with_sampling; + +SELECT '*** Replacing with implicit version ***'; + +DROP TABLE IF EXISTS test.replacing; + +CREATE TABLE test.replacing(d Date, x UInt32, s String) ENGINE = ReplacingMergeTree ORDER BY x PARTITION BY d; + +INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'a'); +INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'b'); +INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'c'); + +OPTIMIZE TABLE test.replacing PARTITION '2017-10-23' FINAL; + +SELECT * FROM test.replacing; + +DROP TABLE test.replacing; + +SELECT '*** Replicated Collapsing ***'; + +DROP TABLE IF EXISTS test.replicated_collapsing; + +CREATE TABLE test.replicated_collapsing(d Date, x UInt32, sign Int8) + ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/test/replicated_collapsing', 'r1', sign) + PARTITION BY toYYYYMM(d) ORDER BY d; + +INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, 1); +INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, -1), ('2017-10-23', 2, 1); + +OPTIMIZE TABLE test.replicated_collapsing PARTITION 201710 FINAL; + +SELECT * FROM test.replicated_collapsing; + +DROP TABLE test.replicated_collapsing; + +SELECT '*** Table definition with SETTINGS ***'; + +DROP TABLE IF EXISTS test.with_settings; + +CREATE TABLE 
test.with_settings(x UInt32) + ENGINE ReplicatedMergeTree('/clickhouse/tables/test/with_settings', 'r1') + ORDER BY x + SETTINGS replicated_can_become_leader = 0; + +SELECT sleep(1); -- If replicated_can_become_leader were true, this replica would become the leader after 1 second. + +SELECT is_leader FROM system.replicas WHERE database = 'test' AND table = 'with_settings'; + +DROP TABLE test.with_settings;
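-- Illustration only (not from the patch): a hypothetical replicated table written in both syntaxes,
-- mirroring the with_settings test above. ZooKeeper paths and table names are invented. With the
-- extended definition the ZooKeeper path and replica name stay as engine arguments, while
-- partitioning, the sorting key and per-table MergeTree settings such as index_granularity or
-- replicated_can_become_leader move into their own clauses.
CREATE TABLE example.events_classic(d Date, x UInt32)
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/example/events_classic', 'r1', d, x, 8192);

CREATE TABLE example.events_extended(d Date, x UInt32)
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/example/events_extended', 'r1')
    PARTITION BY toYYYYMM(d)
    ORDER BY x
    SETTINGS index_granularity = 8192, replicated_can_become_leader = 0;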