diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index ea6c782ebb4..f1d8355cc5b 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -280,6 +280,10 @@ M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\ \ M(MainConfigLoads, "Number of times the main configuration was reloaded.") \ + \ + M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \ + M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \ + M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") namespace ProfileEvents { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 1c71ab2cd6f..0e1d40d16a1 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -867,6 +867,9 @@ const Block & Context::getScalar(const String & name) const auto it = scalars.find(name); if (scalars.end() == it) { + it = local_scalars.find(name); + if (it != local_scalars.end()) + return it->second; // This should be a logical error, but it fails the sql_fuzz test too // often, so 'bad arguments' for now. throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::BAD_ARGUMENTS); @@ -962,7 +965,7 @@ bool Context::hasScalar(const String & name) const if (isGlobalContext()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars"); - return scalars.count(name); + return scalars.count(name) || local_scalars.count(name); } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 6b0a4671efb..abcc07a6229 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -217,6 +217,8 @@ private: /// Thus, used in HTTP interface. If not specified - then some globally default format is used. TemporaryTablesMapping external_tables_mapping; Scalars scalars; + /// Includes special scalars (_shard_num and _shard_count) but also scalars that aren't cacheable between queries / contexts + /// because they use storage views (like in MVs) Scalars local_scalars; /// Used in s3Cluster table function. With this callback, a worker node could ask an initiator diff --git a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp index 2117eec0063..03e8aec1c41 100644 --- a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp +++ b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp @@ -1,9 +1,9 @@ #include -#include #include -#include +#include #include +#include #include #include #include @@ -18,7 +18,14 @@ #include #include #include +#include +namespace ProfileEvents +{ +extern const Event ScalarSubqueriesGlobalCacheHit; +extern const Event ScalarSubqueriesLocalCacheHit; +extern const Event ScalarSubqueriesCacheMiss; +} namespace DB { @@ -77,22 +84,39 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr auto hash = subquery.getTreeHash(); auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second); + bool is_local = false; Block scalar; if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str)) { scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str); + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); + } + else if (data.local_scalars.count(scalar_query_hash_str)) + { + scalar = data.local_scalars[scalar_query_hash_str]; + is_local = true; + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit); } else if (data.scalars.count(scalar_query_hash_str)) { scalar = data.scalars[scalar_query_hash_str]; + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit); } else { + ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss); auto subquery_context = Context::createCopy(data.getContext()); Settings subquery_settings = data.getContext()->getSettings(); subquery_settings.max_result_rows = 1; subquery_settings.extremes = false; subquery_context->setSettings(subquery_settings); + if (auto context = subquery_context->getQueryContext()) + { + for (const auto & it : data.scalars) + context->addScalar(it.first, it.second); + for (const auto & it : data.local_scalars) + context->addScalar(it.first, it.second); + } ASTPtr subquery_select = subquery.children.at(0); @@ -218,7 +242,10 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr ast = std::move(func); } - data.scalars[scalar_query_hash_str] = std::move(scalar); + if (is_local) + data.local_scalars[scalar_query_hash_str] = std::move(scalar); + else + data.scalars[scalar_query_hash_str] = std::move(scalar); } void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data) diff --git a/src/Interpreters/ExecuteScalarSubqueriesVisitor.h b/src/Interpreters/ExecuteScalarSubqueriesVisitor.h index c230f346779..f42d3834c19 100644 --- a/src/Interpreters/ExecuteScalarSubqueriesVisitor.h +++ b/src/Interpreters/ExecuteScalarSubqueriesVisitor.h @@ -37,6 +37,7 @@ public: { size_t subquery_depth; Scalars & scalars; + Scalars & local_scalars; bool only_analyze; }; diff --git a/src/Interpreters/IInterpreterUnionOrSelectQuery.h b/src/Interpreters/IInterpreterUnionOrSelectQuery.h index db9cc086e35..1265a52d370 100644 --- a/src/Interpreters/IInterpreterUnionOrSelectQuery.h +++ b/src/Interpreters/IInterpreterUnionOrSelectQuery.h @@ -40,6 +40,8 @@ public: void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override; + bool usesViewSource() { return uses_view_source; } + protected: ASTPtr query_ptr; ContextMutablePtr context; @@ -48,6 +50,7 @@ protected: size_t max_streams = 1; bool settings_limit_offset_needed = false; bool settings_limit_offset_done = false; + bool uses_view_source = false; }; } diff --git a/src/Interpreters/InterpreterSelectIntersectExceptQuery.cpp b/src/Interpreters/InterpreterSelectIntersectExceptQuery.cpp index 3bb78b57702..cad570ab420 100644 --- a/src/Interpreters/InterpreterSelectIntersectExceptQuery.cpp +++ b/src/Interpreters/InterpreterSelectIntersectExceptQuery.cpp @@ -68,7 +68,10 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery( nested_interpreters.resize(num_children); for (size_t i = 0; i < num_children; ++i) + { nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i)); + uses_view_source |= nested_interpreters[i]->usesViewSource(); + } Blocks headers(num_children); for (size_t query_num = 0; query_num < num_children; ++query_num) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 8e0f73f0b31..593fc8ccd35 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -313,7 +313,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( bool got_storage_from_query = false; if (!has_input && !storage) { - storage = joined_tables.getLeftTableStorage(); + std::tie(uses_view_source, storage) = joined_tables.getLeftTableStorage(); got_storage_from_query = true; } @@ -388,9 +388,8 @@ InterpreterSelectQuery::InterpreterSelectQuery( query.setFinal(); /// Save scalar sub queries's results in the query context - /// But discard them if the Storage has been modified - /// In an ideal situation we would only discard the scalars affected by the storage change - if (!options.only_analyze && context->hasQueryContext() && !context->getViewSource()) + /// Note that we are only saving scalars and not local_scalars since the latter can't be safely shared across contexts + if (!options.only_analyze && context->hasQueryContext()) for (const auto & it : syntax_analyzer_result->getScalars()) context->getQueryContext()->addScalar(it.first, it.second); diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index e4b3e62c358..e0fc15771f9 100644 --- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -138,6 +138,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery( nested_interpreters.emplace_back( buildCurrentChildInterpreter(ast->list_of_selects->children.at(query_num), require_full_header ? Names() : current_required_result_column_names)); + uses_view_source |= nested_interpreters.back()->usesViewSource(); } /// Determine structure of the result. diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp index 3aae3982758..743aba91571 100644 --- a/src/Interpreters/JoinedTables.cpp +++ b/src/Interpreters/JoinedTables.cpp @@ -186,13 +186,13 @@ std::unique_ptr JoinedTables::makeLeftTableSubq return std::make_unique(left_table_expression, context, select_options); } -StoragePtr JoinedTables::getLeftTableStorage() +JoinedTables::storage_is_view_source JoinedTables::getLeftTableStorage() { if (isLeftTableSubquery()) return {}; if (isLeftTableFunction()) - return context->getQueryContext()->executeTableFunction(left_table_expression); + return {false, context->getQueryContext()->executeTableFunction(left_table_expression)}; StorageID table_id = StorageID::createEmpty(); if (left_db_and_table) @@ -204,19 +204,20 @@ StoragePtr JoinedTables::getLeftTableStorage() table_id = StorageID("system", "one"); } - if (auto view_source = context->getViewSource()) + auto view_source = context->getViewSource(); + if (view_source) { const auto & storage_values = static_cast(*view_source); auto tmp_table_id = storage_values.getStorageID(); if (tmp_table_id.database_name == table_id.database_name && tmp_table_id.table_name == table_id.table_name) { /// Read from view source. - return context->getViewSource(); + return {true, view_source}; } } /// Read from table. Even without table expression (implicit SELECT ... FROM system.one). - return DatabaseCatalog::instance().getTable(table_id, context); + return {false, DatabaseCatalog::instance().getTable(table_id, context)}; } bool JoinedTables::resolveTables() diff --git a/src/Interpreters/JoinedTables.h b/src/Interpreters/JoinedTables.h index 9d01c081e9f..0c2119f0016 100644 --- a/src/Interpreters/JoinedTables.h +++ b/src/Interpreters/JoinedTables.h @@ -7,6 +7,8 @@ #include #include +#include + namespace DB { @@ -22,11 +24,13 @@ using StorageMetadataPtr = std::shared_ptr; class JoinedTables { public: + using storage_is_view_source = std::pair; + JoinedTables(ContextPtr context, const ASTSelectQuery & select_query, bool include_all_columns_ = false); void reset(const ASTSelectQuery & select_query); - StoragePtr getLeftTableStorage(); + JoinedTables::storage_is_view_source getLeftTableStorage(); bool resolveTables(); /// Make fake tables_with_columns[0] in case we have predefined input in InterpreterSelectQuery diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index 0285bdf333c..3f5f21cc9f1 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -479,10 +479,11 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, } /// Replacing scalar subqueries with constant values. -void executeScalarSubqueries(ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, bool only_analyze) +void executeScalarSubqueries( + ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, Scalars & local_scalars, bool only_analyze) { LogAST log; - ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, only_analyze}; + ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, local_scalars, only_analyze}; ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query); } @@ -1112,7 +1113,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect( removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates); /// Executing scalar subqueries - replacing them with constant values. - executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze); + executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, result.local_scalars, select_options.only_analyze); if (settings.legacy_column_name_of_tuple_literal) markTupleLiteralsAsLegacy(query); @@ -1195,7 +1196,7 @@ TreeRewriterResultPtr TreeRewriter::analyze( normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases); /// Executing scalar subqueries. Column defaults could be a scalar subquery. - executeScalarSubqueries(query, getContext(), 0, result.scalars, !execute_scalar_subqueries); + executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries); if (settings.legacy_column_name_of_tuple_literal) markTupleLiteralsAsLegacy(query); diff --git a/src/Interpreters/TreeRewriter.h b/src/Interpreters/TreeRewriter.h index 52c62cc4cec..7becd3f94bc 100644 --- a/src/Interpreters/TreeRewriter.h +++ b/src/Interpreters/TreeRewriter.h @@ -72,6 +72,7 @@ struct TreeRewriterResult /// Results of scalar sub queries Scalars scalars; + Scalars local_scalars; explicit TreeRewriterResult( const NamesAndTypesList & source_columns_, diff --git a/src/Parsers/ASTSelectQuery.cpp b/src/Parsers/ASTSelectQuery.cpp index 1c5a4310f1b..7b473812915 100644 --- a/src/Parsers/ASTSelectQuery.cpp +++ b/src/Parsers/ASTSelectQuery.cpp @@ -22,38 +22,20 @@ namespace ErrorCodes ASTPtr ASTSelectQuery::clone() const { auto res = std::make_shared(*this); + + /** NOTE Members must clone exactly in the same order in which they were inserted into `children` in ParserSelectQuery. + * This is important because the AST hash depends on the children order and this hash is used for multiple things, + * like the column identifiers in the case of subqueries in the IN statement or caching scalar queries (reused in CTEs so it's + * important for them to have the same hash). + * For distributed query processing, in case one of the servers is localhost and the other one is not, localhost query is executed + * within the process and is cloned, and the request is sent to the remote server in text form via TCP. + * And if the cloning order does not match the parsing order then different servers will get different identifiers. + * + * Since the positions map uses we can copy it as is and ensure the new children array is created / pushed + * in the same order as the existing one */ res->children.clear(); - res->positions.clear(); - -#define CLONE(expr) res->setExpression(expr, getExpression(expr, true)) - - /** NOTE Members must clone exactly in the same order, - * in which they were inserted into `children` in ParserSelectQuery. - * This is important because of the children's names the identifier (getTreeHash) is compiled, - * which can be used for column identifiers in the case of subqueries in the IN statement. - * For distributed query processing, in case one of the servers is localhost and the other one is not, - * localhost query is executed within the process and is cloned, - * and the request is sent to the remote server in text form via TCP. - * And if the cloning order does not match the parsing order, - * then different servers will get different identifiers. - */ - CLONE(Expression::WITH); - CLONE(Expression::SELECT); - CLONE(Expression::TABLES); - CLONE(Expression::PREWHERE); - CLONE(Expression::WHERE); - CLONE(Expression::GROUP_BY); - CLONE(Expression::HAVING); - CLONE(Expression::WINDOW); - CLONE(Expression::ORDER_BY); - CLONE(Expression::LIMIT_BY_OFFSET); - CLONE(Expression::LIMIT_BY_LENGTH); - CLONE(Expression::LIMIT_BY); - CLONE(Expression::LIMIT_OFFSET); - CLONE(Expression::LIMIT_LENGTH); - CLONE(Expression::SETTINGS); - -#undef CLONE + for (auto & child : children) + res->children.push_back(child->clone()); return res; } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index bcb12cc86b0..6efb5f3daf9 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -721,7 +721,8 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer if (joined_tables.tablesCount() == 1) { - storage_src = std::dynamic_pointer_cast(joined_tables.getLeftTableStorage()); + auto [ignore, storage] = joined_tables.getLeftTableStorage(); + storage_src = std::dynamic_pointer_cast(storage); if (storage_src) { const auto select_with_union_query = std::make_shared(); diff --git a/tests/queries/0_stateless/02177_cte_scalar_cache.reference b/tests/queries/0_stateless/02177_cte_scalar_cache.reference new file mode 100644 index 00000000000..88456b1e7ea --- /dev/null +++ b/tests/queries/0_stateless/02177_cte_scalar_cache.reference @@ -0,0 +1,2 @@ +02177_CTE_GLOBAL_ON 5 500 11 0 5 +02177_CTE_GLOBAL_OFF 1 100 5 0 1 diff --git a/tests/queries/0_stateless/02177_cte_scalar_cache.sql b/tests/queries/0_stateless/02177_cte_scalar_cache.sql new file mode 100644 index 00000000000..39a1e0d965a --- /dev/null +++ b/tests/queries/0_stateless/02177_cte_scalar_cache.sql @@ -0,0 +1,48 @@ +WITH + ( SELECT sleep(0.0001) FROM system.one ) as a1, + ( SELECT sleep(0.0001) FROM system.one ) as a2, + ( SELECT sleep(0.0001) FROM system.one ) as a3, + ( SELECT sleep(0.0001) FROM system.one ) as a4, + ( SELECT sleep(0.0001) FROM system.one ) as a5 +SELECT '02177_CTE_GLOBAL_ON', a5 +FORMAT Null +SETTINGS enable_global_with_statement = 1; + +WITH + ( SELECT sleep(0.0001) FROM system.one ) as a1, + ( SELECT sleep(0.0001) FROM system.one ) as a2, + ( SELECT sleep(0.0001) FROM system.one ) as a3, + ( SELECT sleep(0.0001) FROM system.one ) as a4, + ( SELECT sleep(0.0001) FROM system.one ) as a5 +SELECT '02177_CTE_GLOBAL_OFF', a5 + FORMAT Null +SETTINGS enable_global_with_statement = 0; + +SYSTEM FLUSH LOGS; +SELECT + '02177_CTE_GLOBAL_ON', + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds, + ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit, + ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit, + ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss +FROM system.query_log +WHERE + current_database = currentDatabase() + AND type = 'QueryFinish' + AND query LIKE '%SELECT ''02177_CTE_GLOBAL_ON%' + AND event_date >= yesterday() AND event_time > now() - interval 10 minute; + +SELECT + '02177_CTE_GLOBAL_OFF', + ProfileEvents['SleepFunctionCalls'] as sleep_calls, + ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds, + ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit, + ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit, + ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss +FROM system.query_log +WHERE + current_database = currentDatabase() + AND type = 'QueryFinish' + AND query LIKE '%02177_CTE_GLOBAL_OFF%' + AND event_date >= yesterday() AND event_time > now() - interval 10 minute;