Better scalar cache handling

- Fixes global CTE scalar cache.
- Adds MVs back (views dependent on the source are cached locally and others globally
This commit is contained in:
Raúl Marín 2022-01-17 19:32:55 +01:00
parent 662444fe13
commit 4b5ab80e3b
17 changed files with 133 additions and 50 deletions

View File

@ -280,6 +280,10 @@
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
\
M(MainConfigLoads, "Number of times the main configuration was reloaded.") \
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
namespace ProfileEvents
{

View File

@ -867,6 +867,9 @@ const Block & Context::getScalar(const String & name) const
auto it = scalars.find(name);
if (scalars.end() == it)
{
it = local_scalars.find(name);
if (it != local_scalars.end())
return it->second;
// This should be a logical error, but it fails the sql_fuzz test too
// often, so 'bad arguments' for now.
throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::BAD_ARGUMENTS);
@ -962,7 +965,7 @@ bool Context::hasScalar(const String & name) const
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars");
return scalars.count(name);
return scalars.count(name) || local_scalars.count(name);
}

View File

@ -217,6 +217,8 @@ private:
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
TemporaryTablesMapping external_tables_mapping;
Scalars scalars;
/// Includes special scalars (_shard_num and _shard_count) but also scalars that aren't cacheable between queries / contexts
/// because they use storage views (like in MVs)
Scalars local_scalars;
/// Used in s3Cluster table function. With this callback, a worker node could ask an initiator

View File

@ -1,9 +1,9 @@
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -18,7 +18,14 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ScalarSubqueriesGlobalCacheHit;
extern const Event ScalarSubqueriesLocalCacheHit;
extern const Event ScalarSubqueriesCacheMiss;
}
namespace DB
{
@ -77,22 +84,39 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
auto hash = subquery.getTreeHash();
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
bool is_local = false;
Block scalar;
if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str))
{
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
}
else if (data.local_scalars.count(scalar_query_hash_str))
{
scalar = data.local_scalars[scalar_query_hash_str];
is_local = true;
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit);
}
else if (data.scalars.count(scalar_query_hash_str))
{
scalar = data.scalars[scalar_query_hash_str];
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
}
else
{
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
auto subquery_context = Context::createCopy(data.getContext());
Settings subquery_settings = data.getContext()->getSettings();
subquery_settings.max_result_rows = 1;
subquery_settings.extremes = false;
subquery_context->setSettings(subquery_settings);
if (auto context = subquery_context->getQueryContext())
{
for (const auto & it : data.scalars)
context->addScalar(it.first, it.second);
for (const auto & it : data.local_scalars)
context->addScalar(it.first, it.second);
}
ASTPtr subquery_select = subquery.children.at(0);
@ -218,7 +242,10 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
ast = std::move(func);
}
data.scalars[scalar_query_hash_str] = std::move(scalar);
if (is_local)
data.local_scalars[scalar_query_hash_str] = std::move(scalar);
else
data.scalars[scalar_query_hash_str] = std::move(scalar);
}
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)

View File

@ -37,6 +37,7 @@ public:
{
size_t subquery_depth;
Scalars & scalars;
Scalars & local_scalars;
bool only_analyze;
};

View File

@ -40,6 +40,8 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
bool usesViewSource() { return uses_view_source; }
protected:
ASTPtr query_ptr;
ContextMutablePtr context;
@ -48,6 +50,7 @@ protected:
size_t max_streams = 1;
bool settings_limit_offset_needed = false;
bool settings_limit_offset_done = false;
bool uses_view_source = false;
};
}

View File

@ -68,7 +68,10 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
nested_interpreters.resize(num_children);
for (size_t i = 0; i < num_children; ++i)
{
nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i));
uses_view_source |= nested_interpreters[i]->usesViewSource();
}
Blocks headers(num_children);
for (size_t query_num = 0; query_num < num_children; ++query_num)

View File

@ -313,7 +313,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
bool got_storage_from_query = false;
if (!has_input && !storage)
{
storage = joined_tables.getLeftTableStorage();
std::tie(uses_view_source, storage) = joined_tables.getLeftTableStorage();
got_storage_from_query = true;
}
@ -388,9 +388,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query.setFinal();
/// Save scalar sub queries's results in the query context
/// But discard them if the Storage has been modified
/// In an ideal situation we would only discard the scalars affected by the storage change
if (!options.only_analyze && context->hasQueryContext() && !context->getViewSource())
/// Note that we are only saving scalars and not local_scalars since the latter can't be safely shared across contexts
if (!options.only_analyze && context->hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())
context->getQueryContext()->addScalar(it.first, it.second);

View File

@ -138,6 +138,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
nested_interpreters.emplace_back(
buildCurrentChildInterpreter(ast->list_of_selects->children.at(query_num), require_full_header ? Names() : current_required_result_column_names));
uses_view_source |= nested_interpreters.back()->usesViewSource();
}
/// Determine structure of the result.

View File

@ -186,13 +186,13 @@ std::unique_ptr<InterpreterSelectWithUnionQuery> JoinedTables::makeLeftTableSubq
return std::make_unique<InterpreterSelectWithUnionQuery>(left_table_expression, context, select_options);
}
StoragePtr JoinedTables::getLeftTableStorage()
JoinedTables::storage_is_view_source JoinedTables::getLeftTableStorage()
{
if (isLeftTableSubquery())
return {};
if (isLeftTableFunction())
return context->getQueryContext()->executeTableFunction(left_table_expression);
return {false, context->getQueryContext()->executeTableFunction(left_table_expression)};
StorageID table_id = StorageID::createEmpty();
if (left_db_and_table)
@ -204,19 +204,20 @@ StoragePtr JoinedTables::getLeftTableStorage()
table_id = StorageID("system", "one");
}
if (auto view_source = context->getViewSource())
auto view_source = context->getViewSource();
if (view_source)
{
const auto & storage_values = static_cast<const StorageValues &>(*view_source);
auto tmp_table_id = storage_values.getStorageID();
if (tmp_table_id.database_name == table_id.database_name && tmp_table_id.table_name == table_id.table_name)
{
/// Read from view source.
return context->getViewSource();
return {true, view_source};
}
}
/// Read from table. Even without table expression (implicit SELECT ... FROM system.one).
return DatabaseCatalog::instance().getTable(table_id, context);
return {false, DatabaseCatalog::instance().getTable(table_id, context)};
}
bool JoinedTables::resolveTables()

View File

@ -7,6 +7,8 @@
#include <Interpreters/StorageID.h>
#include <Storages/IStorage_fwd.h>
#include <tuple>
namespace DB
{
@ -22,11 +24,13 @@ using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
class JoinedTables
{
public:
using storage_is_view_source = std::pair<bool, StoragePtr>;
JoinedTables(ContextPtr context, const ASTSelectQuery & select_query, bool include_all_columns_ = false);
void reset(const ASTSelectQuery & select_query);
StoragePtr getLeftTableStorage();
JoinedTables::storage_is_view_source getLeftTableStorage();
bool resolveTables();
/// Make fake tables_with_columns[0] in case we have predefined input in InterpreterSelectQuery

View File

@ -479,10 +479,11 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
}
/// Replacing scalar subqueries with constant values.
void executeScalarSubqueries(ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, bool only_analyze)
void executeScalarSubqueries(
ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, Scalars & local_scalars, bool only_analyze)
{
LogAST log;
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, only_analyze};
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, local_scalars, only_analyze};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
}
@ -1112,7 +1113,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze);
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, result.local_scalars, select_options.only_analyze);
if (settings.legacy_column_name_of_tuple_literal)
markTupleLiteralsAsLegacy(query);
@ -1195,7 +1196,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, getContext(), 0, result.scalars, !execute_scalar_subqueries);
executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries);
if (settings.legacy_column_name_of_tuple_literal)
markTupleLiteralsAsLegacy(query);

View File

@ -72,6 +72,7 @@ struct TreeRewriterResult
/// Results of scalar sub queries
Scalars scalars;
Scalars local_scalars;
explicit TreeRewriterResult(
const NamesAndTypesList & source_columns_,

View File

@ -22,38 +22,20 @@ namespace ErrorCodes
ASTPtr ASTSelectQuery::clone() const
{
auto res = std::make_shared<ASTSelectQuery>(*this);
/** NOTE Members must clone exactly in the same order in which they were inserted into `children` in ParserSelectQuery.
* This is important because the AST hash depends on the children order and this hash is used for multiple things,
* like the column identifiers in the case of subqueries in the IN statement or caching scalar queries (reused in CTEs so it's
* important for them to have the same hash).
* For distributed query processing, in case one of the servers is localhost and the other one is not, localhost query is executed
* within the process and is cloned, and the request is sent to the remote server in text form via TCP.
* And if the cloning order does not match the parsing order then different servers will get different identifiers.
*
* Since the positions map uses <key, position> we can copy it as is and ensure the new children array is created / pushed
* in the same order as the existing one */
res->children.clear();
res->positions.clear();
#define CLONE(expr) res->setExpression(expr, getExpression(expr, true))
/** NOTE Members must clone exactly in the same order,
* in which they were inserted into `children` in ParserSelectQuery.
* This is important because of the children's names the identifier (getTreeHash) is compiled,
* which can be used for column identifiers in the case of subqueries in the IN statement.
* For distributed query processing, in case one of the servers is localhost and the other one is not,
* localhost query is executed within the process and is cloned,
* and the request is sent to the remote server in text form via TCP.
* And if the cloning order does not match the parsing order,
* then different servers will get different identifiers.
*/
CLONE(Expression::WITH);
CLONE(Expression::SELECT);
CLONE(Expression::TABLES);
CLONE(Expression::PREWHERE);
CLONE(Expression::WHERE);
CLONE(Expression::GROUP_BY);
CLONE(Expression::HAVING);
CLONE(Expression::WINDOW);
CLONE(Expression::ORDER_BY);
CLONE(Expression::LIMIT_BY_OFFSET);
CLONE(Expression::LIMIT_BY_LENGTH);
CLONE(Expression::LIMIT_BY);
CLONE(Expression::LIMIT_OFFSET);
CLONE(Expression::LIMIT_LENGTH);
CLONE(Expression::SETTINGS);
#undef CLONE
for (auto & child : children)
res->children.push_back(child->clone());
return res;
}

View File

@ -721,7 +721,8 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
if (joined_tables.tablesCount() == 1)
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
auto [ignore, storage] = joined_tables.getLeftTableStorage();
storage_src = std::dynamic_pointer_cast<StorageDistributed>(storage);
if (storage_src)
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();

View File

@ -0,0 +1,2 @@
02177_CTE_GLOBAL_ON 5 500 11 0 5
02177_CTE_GLOBAL_OFF 1 100 5 0 1

View File

@ -0,0 +1,48 @@
WITH
( SELECT sleep(0.0001) FROM system.one ) as a1,
( SELECT sleep(0.0001) FROM system.one ) as a2,
( SELECT sleep(0.0001) FROM system.one ) as a3,
( SELECT sleep(0.0001) FROM system.one ) as a4,
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_GLOBAL_ON', a5
FORMAT Null
SETTINGS enable_global_with_statement = 1;
WITH
( SELECT sleep(0.0001) FROM system.one ) as a1,
( SELECT sleep(0.0001) FROM system.one ) as a2,
( SELECT sleep(0.0001) FROM system.one ) as a3,
( SELECT sleep(0.0001) FROM system.one ) as a4,
( SELECT sleep(0.0001) FROM system.one ) as a5
SELECT '02177_CTE_GLOBAL_OFF', a5
FORMAT Null
SETTINGS enable_global_with_statement = 0;
SYSTEM FLUSH LOGS;
SELECT
'02177_CTE_GLOBAL_ON',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%SELECT ''02177_CTE_GLOBAL_ON%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
SELECT
'02177_CTE_GLOBAL_OFF',
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
FROM system.query_log
WHERE
current_database = currentDatabase()
AND type = 'QueryFinish'
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;