mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
Merge branch 'master' into fix-url-globs
This commit is contained in:
commit
538830931b
@ -281,6 +281,10 @@
|
||||
M(ExternalDataSourceLocalCacheReadBytes, "Bytes read from local cache buffer in RemoteReadBufferCache")\
|
||||
\
|
||||
M(MainConfigLoads, "Number of times the main configuration was reloaded.") \
|
||||
\
|
||||
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
|
||||
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
|
||||
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -1,9 +1,9 @@
|
||||
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
|
||||
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
|
||||
@ -18,7 +18,14 @@
|
||||
#include <Parsers/ASTWithElement.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event ScalarSubqueriesGlobalCacheHit;
|
||||
extern const Event ScalarSubqueriesLocalCacheHit;
|
||||
extern const Event ScalarSubqueriesCacheMiss;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -72,40 +79,95 @@ static bool worthConvertingToLiteral(const Block & scalar)
|
||||
return !useless_literal_types.count(scalar_type_name);
|
||||
}
|
||||
|
||||
static auto getQueryInterpreter(const ASTSubquery & subquery, ExecuteScalarSubqueriesMatcher::Data & data)
|
||||
{
|
||||
auto subquery_context = Context::createCopy(data.getContext());
|
||||
Settings subquery_settings = data.getContext()->getSettings();
|
||||
subquery_settings.max_result_rows = 1;
|
||||
subquery_settings.extremes = false;
|
||||
subquery_context->setSettings(subquery_settings);
|
||||
if (!data.only_analyze && subquery_context->hasQueryContext())
|
||||
{
|
||||
/// Save current cached scalars in the context before analyzing the query
|
||||
/// This is specially helpful when analyzing CTE scalars
|
||||
auto context = subquery_context->getQueryContext();
|
||||
for (const auto & it : data.scalars)
|
||||
context->addScalar(it.first, it.second);
|
||||
}
|
||||
|
||||
ASTPtr subquery_select = subquery.children.at(0);
|
||||
|
||||
auto options = SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1, true);
|
||||
options.analyze(data.only_analyze);
|
||||
|
||||
return std::make_unique<InterpreterSelectWithUnionQuery>(subquery_select, subquery_context, options);
|
||||
}
|
||||
|
||||
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
|
||||
{
|
||||
auto hash = subquery.getTreeHash();
|
||||
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
|
||||
|
||||
std::unique_ptr<InterpreterSelectWithUnionQuery> interpreter = nullptr;
|
||||
bool hit = false;
|
||||
bool is_local = false;
|
||||
|
||||
Block scalar;
|
||||
if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str))
|
||||
if (data.local_scalars.count(scalar_query_hash_str))
|
||||
{
|
||||
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
|
||||
hit = true;
|
||||
scalar = data.local_scalars[scalar_query_hash_str];
|
||||
is_local = true;
|
||||
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesLocalCacheHit);
|
||||
}
|
||||
else if (data.scalars.count(scalar_query_hash_str))
|
||||
{
|
||||
hit = true;
|
||||
scalar = data.scalars[scalar_query_hash_str];
|
||||
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto subquery_context = Context::createCopy(data.getContext());
|
||||
Settings subquery_settings = data.getContext()->getSettings();
|
||||
subquery_settings.max_result_rows = 1;
|
||||
subquery_settings.extremes = false;
|
||||
subquery_context->setSettings(subquery_settings);
|
||||
if (data.getContext()->hasQueryContext() && data.getContext()->getQueryContext()->hasScalar(scalar_query_hash_str))
|
||||
{
|
||||
if (!data.getContext()->getViewSource())
|
||||
{
|
||||
/// We aren't using storage views so we can safely use the context cache
|
||||
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
|
||||
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
|
||||
hit = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// If we are under a context that uses views that means that the cache might contain values that reference
|
||||
/// the original table and not the view, so in order to be able to check the global cache we need to first
|
||||
/// make sure that the query doesn't use the view
|
||||
/// Note in any case the scalar will end up cached in *data* so this won't be repeated inside this context
|
||||
interpreter = getQueryInterpreter(subquery, data);
|
||||
if (!interpreter->usesViewSource())
|
||||
{
|
||||
scalar = data.getContext()->getQueryContext()->getScalar(scalar_query_hash_str);
|
||||
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesGlobalCacheHit);
|
||||
hit = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ASTPtr subquery_select = subquery.children.at(0);
|
||||
if (!hit)
|
||||
{
|
||||
if (!interpreter)
|
||||
interpreter = getQueryInterpreter(subquery, data);
|
||||
|
||||
auto options = SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1, true);
|
||||
options.analyze(data.only_analyze);
|
||||
ProfileEvents::increment(ProfileEvents::ScalarSubqueriesCacheMiss);
|
||||
is_local = interpreter->usesViewSource();
|
||||
|
||||
auto interpreter = InterpreterSelectWithUnionQuery(subquery_select, subquery_context, options);
|
||||
Block block;
|
||||
|
||||
if (data.only_analyze)
|
||||
{
|
||||
/// If query is only analyzed, then constants are not correct.
|
||||
block = interpreter.getSampleBlock();
|
||||
block = interpreter->getSampleBlock();
|
||||
for (auto & column : block)
|
||||
{
|
||||
if (column.column->empty())
|
||||
@ -118,14 +180,14 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
|
||||
}
|
||||
else
|
||||
{
|
||||
auto io = interpreter.execute();
|
||||
auto io = interpreter->execute();
|
||||
|
||||
PullingAsyncPipelineExecutor executor(io.pipeline);
|
||||
while (block.rows() == 0 && executor.pull(block));
|
||||
|
||||
if (block.rows() == 0)
|
||||
{
|
||||
auto types = interpreter.getSampleBlock().getDataTypes();
|
||||
auto types = interpreter->getSampleBlock().getDataTypes();
|
||||
if (types.size() != 1)
|
||||
types = {std::make_shared<DataTypeTuple>(types)};
|
||||
|
||||
@ -218,7 +280,10 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
|
||||
ast = std::move(func);
|
||||
}
|
||||
|
||||
data.scalars[scalar_query_hash_str] = std::move(scalar);
|
||||
if (is_local)
|
||||
data.local_scalars[scalar_query_hash_str] = std::move(scalar);
|
||||
else
|
||||
data.scalars[scalar_query_hash_str] = std::move(scalar);
|
||||
}
|
||||
|
||||
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
|
||||
|
@ -19,11 +19,8 @@ struct ASTTableExpression;
|
||||
*
|
||||
* Features
|
||||
*
|
||||
* A replacement occurs during query analysis, and not during the main runtime.
|
||||
* This means that the progress indicator will not work during the execution of these requests,
|
||||
* and also such queries can not be aborted.
|
||||
*
|
||||
* But the query result can be used for the index in the table.
|
||||
* A replacement occurs during query analysis, and not during the main runtime, so
|
||||
* the query result can be used for the index in the table.
|
||||
*
|
||||
* Scalar subqueries are executed on the request-initializer server.
|
||||
* The request is sent to remote servers with already substituted constants.
|
||||
@ -37,6 +34,7 @@ public:
|
||||
{
|
||||
size_t subquery_depth;
|
||||
Scalars & scalars;
|
||||
Scalars & local_scalars;
|
||||
bool only_analyze;
|
||||
};
|
||||
|
||||
|
@ -40,6 +40,15 @@ public:
|
||||
|
||||
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
|
||||
|
||||
/// Returns whether the query uses the view source from the Context
|
||||
/// The view source is a virtual storage that currently only materialized views use to replace the source table
|
||||
/// with the incoming block only
|
||||
/// This flag is useful to know for how long we can cache scalars generated by this query: If it doesn't use the virtual storage
|
||||
/// then we can cache the scalars forever (for any query that doesn't use the virtual storage either), but if it does use the virtual
|
||||
/// storage then we can only keep the scalar result around while we are working with that source block
|
||||
/// You can find more details about this under ExecuteScalarSubqueriesMatcher::visit
|
||||
bool usesViewSource() { return uses_view_source; }
|
||||
|
||||
protected:
|
||||
ASTPtr query_ptr;
|
||||
ContextMutablePtr context;
|
||||
@ -48,6 +57,7 @@ protected:
|
||||
size_t max_streams = 1;
|
||||
bool settings_limit_offset_needed = false;
|
||||
bool settings_limit_offset_done = false;
|
||||
bool uses_view_source = false;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,10 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
|
||||
nested_interpreters.resize(num_children);
|
||||
|
||||
for (size_t i = 0; i < num_children; ++i)
|
||||
{
|
||||
nested_interpreters[i] = buildCurrentChildInterpreter(children.at(i));
|
||||
uses_view_source |= nested_interpreters[i]->usesViewSource();
|
||||
}
|
||||
|
||||
Blocks headers(num_children);
|
||||
for (size_t query_num = 0; query_num < num_children; ++query_num)
|
||||
|
@ -64,8 +64,9 @@
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/Transforms/FilterTransform.h>
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
|
||||
#include <Storages/StorageValues.h>
|
||||
#include <Storages/StorageView.h>
|
||||
|
||||
#include <Functions/IFunction.h>
|
||||
@ -315,6 +316,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
if (!has_input && !storage)
|
||||
{
|
||||
storage = joined_tables.getLeftTableStorage();
|
||||
// Mark uses_view_source if the returned storage is the same as the one saved in viewSource
|
||||
uses_view_source |= storage && storage == context->getViewSource();
|
||||
got_storage_from_query = true;
|
||||
}
|
||||
|
||||
@ -336,6 +339,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
|
||||
joined_tables.reset(getSelectQuery());
|
||||
joined_tables.resolveTables();
|
||||
if (auto view_source = context->getViewSource())
|
||||
{
|
||||
// If we are using a virtual block view to replace a table and that table is used
|
||||
// inside the JOIN then we need to update uses_view_source accordingly so we avoid propagating scalars that we can't cache
|
||||
const auto & storage_values = static_cast<const StorageValues &>(*view_source);
|
||||
auto tmp_table_id = storage_values.getStorageID();
|
||||
for (const auto & t : joined_tables.tablesWithColumns())
|
||||
uses_view_source |= (t.table.database == tmp_table_id.database_name && t.table.table == tmp_table_id.table_name);
|
||||
}
|
||||
|
||||
if (storage && joined_tables.isLeftTableSubquery())
|
||||
{
|
||||
@ -351,7 +363,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
{
|
||||
interpreter_subquery = joined_tables.makeLeftTableSubquery(options.subquery());
|
||||
if (interpreter_subquery)
|
||||
{
|
||||
source_header = interpreter_subquery->getSampleBlock();
|
||||
uses_view_source |= interpreter_subquery->usesViewSource();
|
||||
}
|
||||
}
|
||||
|
||||
joined_tables.rewriteDistributedInAndJoins(query_ptr);
|
||||
@ -389,9 +404,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
query.setFinal();
|
||||
|
||||
/// Save scalar sub queries's results in the query context
|
||||
/// But discard them if the Storage has been modified
|
||||
/// In an ideal situation we would only discard the scalars affected by the storage change
|
||||
if (!options.only_analyze && context->hasQueryContext() && !context->getViewSource())
|
||||
/// Note that we are only saving scalars and not local_scalars since the latter can't be safely shared across contexts
|
||||
if (!options.only_analyze && context->hasQueryContext())
|
||||
for (const auto & it : syntax_analyzer_result->getScalars())
|
||||
context->getQueryContext()->addScalar(it.first, it.second);
|
||||
|
||||
@ -479,6 +493,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
/// If there is an aggregation in the outer query, WITH TOTALS is ignored in the subquery.
|
||||
if (query_analyzer->hasAggregation())
|
||||
interpreter_subquery->ignoreWithTotals();
|
||||
uses_view_source |= interpreter_subquery->usesViewSource();
|
||||
}
|
||||
|
||||
required_columns = syntax_analyzer_result->requiredSourceColumns();
|
||||
|
@ -138,6 +138,9 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
|
||||
|
||||
nested_interpreters.emplace_back(
|
||||
buildCurrentChildInterpreter(ast->list_of_selects->children.at(query_num), require_full_header ? Names() : current_required_result_column_names));
|
||||
// We need to propagate the uses_view_source flag from children to the (self) parent since, if one of the children uses
|
||||
// a view source that means that the parent uses it too and can be cached globally
|
||||
uses_view_source |= nested_interpreters.back()->usesViewSource();
|
||||
}
|
||||
|
||||
/// Determine structure of the result.
|
||||
|
@ -479,10 +479,11 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
|
||||
}
|
||||
|
||||
/// Replacing scalar subqueries with constant values.
|
||||
void executeScalarSubqueries(ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, bool only_analyze)
|
||||
void executeScalarSubqueries(
|
||||
ASTPtr & query, ContextPtr context, size_t subquery_depth, Scalars & scalars, Scalars & local_scalars, bool only_analyze)
|
||||
{
|
||||
LogAST log;
|
||||
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, only_analyze};
|
||||
ExecuteScalarSubqueriesVisitor::Data visitor_data{WithContext{context}, subquery_depth, scalars, local_scalars, only_analyze};
|
||||
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
|
||||
}
|
||||
|
||||
@ -1158,7 +1159,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
|
||||
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
|
||||
|
||||
/// Executing scalar subqueries - replacing them with constant values.
|
||||
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze);
|
||||
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, result.local_scalars, select_options.only_analyze);
|
||||
|
||||
if (settings.legacy_column_name_of_tuple_literal)
|
||||
markTupleLiteralsAsLegacy(query);
|
||||
@ -1248,7 +1249,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
|
||||
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
|
||||
|
||||
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
|
||||
executeScalarSubqueries(query, getContext(), 0, result.scalars, !execute_scalar_subqueries);
|
||||
executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries);
|
||||
|
||||
if (settings.legacy_column_name_of_tuple_literal)
|
||||
markTupleLiteralsAsLegacy(query);
|
||||
|
@ -75,6 +75,7 @@ struct TreeRewriterResult
|
||||
|
||||
/// Results of scalar sub queries
|
||||
Scalars scalars;
|
||||
Scalars local_scalars;
|
||||
|
||||
explicit TreeRewriterResult(
|
||||
const NamesAndTypesList & source_columns_,
|
||||
|
@ -22,38 +22,20 @@ namespace ErrorCodes
|
||||
ASTPtr ASTSelectQuery::clone() const
|
||||
{
|
||||
auto res = std::make_shared<ASTSelectQuery>(*this);
|
||||
|
||||
/** NOTE Members must clone exactly in the same order in which they were inserted into `children` in ParserSelectQuery.
|
||||
* This is important because the AST hash depends on the children order and this hash is used for multiple things,
|
||||
* like the column identifiers in the case of subqueries in the IN statement or caching scalar queries (reused in CTEs so it's
|
||||
* important for them to have the same hash).
|
||||
* For distributed query processing, in case one of the servers is localhost and the other one is not, localhost query is executed
|
||||
* within the process and is cloned, and the request is sent to the remote server in text form via TCP.
|
||||
* And if the cloning order does not match the parsing order then different servers will get different identifiers.
|
||||
*
|
||||
* Since the positions map uses <key, position> we can copy it as is and ensure the new children array is created / pushed
|
||||
* in the same order as the existing one */
|
||||
res->children.clear();
|
||||
res->positions.clear();
|
||||
|
||||
#define CLONE(expr) res->setExpression(expr, getExpression(expr, true))
|
||||
|
||||
/** NOTE Members must clone exactly in the same order,
|
||||
* in which they were inserted into `children` in ParserSelectQuery.
|
||||
* This is important because of the children's names the identifier (getTreeHash) is compiled,
|
||||
* which can be used for column identifiers in the case of subqueries in the IN statement.
|
||||
* For distributed query processing, in case one of the servers is localhost and the other one is not,
|
||||
* localhost query is executed within the process and is cloned,
|
||||
* and the request is sent to the remote server in text form via TCP.
|
||||
* And if the cloning order does not match the parsing order,
|
||||
* then different servers will get different identifiers.
|
||||
*/
|
||||
CLONE(Expression::WITH);
|
||||
CLONE(Expression::SELECT);
|
||||
CLONE(Expression::TABLES);
|
||||
CLONE(Expression::PREWHERE);
|
||||
CLONE(Expression::WHERE);
|
||||
CLONE(Expression::GROUP_BY);
|
||||
CLONE(Expression::HAVING);
|
||||
CLONE(Expression::WINDOW);
|
||||
CLONE(Expression::ORDER_BY);
|
||||
CLONE(Expression::LIMIT_BY_OFFSET);
|
||||
CLONE(Expression::LIMIT_BY_LENGTH);
|
||||
CLONE(Expression::LIMIT_BY);
|
||||
CLONE(Expression::LIMIT_OFFSET);
|
||||
CLONE(Expression::LIMIT_LENGTH);
|
||||
CLONE(Expression::SETTINGS);
|
||||
|
||||
#undef CLONE
|
||||
for (const auto & child : children)
|
||||
res->children.push_back(child->clone());
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -24,7 +24,9 @@ RemoteInserter::RemoteInserter(
|
||||
const String & query_,
|
||||
const Settings & settings_,
|
||||
const ClientInfo & client_info_)
|
||||
: connection(connection_), query(query_)
|
||||
: connection(connection_)
|
||||
, query(query_)
|
||||
, server_revision(connection.getServerRevision(timeouts))
|
||||
{
|
||||
ClientInfo modified_client_info = client_info_;
|
||||
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
|
||||
|
@ -35,12 +35,14 @@ public:
|
||||
~RemoteInserter();
|
||||
|
||||
const Block & getHeader() const { return header; }
|
||||
UInt64 getServerRevision() const { return server_revision; }
|
||||
|
||||
private:
|
||||
Connection & connection;
|
||||
String query;
|
||||
Block header;
|
||||
bool finished = false;
|
||||
UInt64 server_revision;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -132,6 +132,7 @@ namespace
|
||||
|
||||
struct DistributedHeader
|
||||
{
|
||||
UInt64 revision = 0;
|
||||
Settings insert_settings;
|
||||
std::string insert_query;
|
||||
ClientInfo client_info;
|
||||
@ -166,9 +167,8 @@ namespace
|
||||
/// Read the parts of the header.
|
||||
ReadBufferFromString header_buf(header_data);
|
||||
|
||||
UInt64 initiator_revision;
|
||||
readVarUInt(initiator_revision, header_buf);
|
||||
if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision)
|
||||
readVarUInt(distributed_header.revision, header_buf);
|
||||
if (DBMS_TCP_PROTOCOL_VERSION < distributed_header.revision)
|
||||
{
|
||||
LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
|
||||
}
|
||||
@ -177,7 +177,7 @@ namespace
|
||||
distributed_header.insert_settings.read(header_buf);
|
||||
|
||||
if (header_buf.hasPendingData())
|
||||
distributed_header.client_info.read(header_buf, initiator_revision);
|
||||
distributed_header.client_info.read(header_buf, distributed_header.revision);
|
||||
|
||||
if (header_buf.hasPendingData())
|
||||
{
|
||||
@ -188,10 +188,12 @@ namespace
|
||||
|
||||
if (header_buf.hasPendingData())
|
||||
{
|
||||
NativeReader header_block_in(header_buf, DBMS_TCP_PROTOCOL_VERSION);
|
||||
NativeReader header_block_in(header_buf, distributed_header.revision);
|
||||
distributed_header.block_header = header_block_in.read();
|
||||
if (!distributed_header.block_header)
|
||||
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
|
||||
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
|
||||
"Cannot read header from the {} batch. Data was written with protocol version {}, current version: {}",
|
||||
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
|
||||
}
|
||||
|
||||
/// Add handling new data here, for example:
|
||||
@ -264,10 +266,10 @@ namespace
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void writeAndConvert(RemoteInserter & remote, ReadBufferFromFile & in)
|
||||
void writeAndConvert(RemoteInserter & remote, const DistributedHeader & distributed_header, ReadBufferFromFile & in)
|
||||
{
|
||||
CompressedReadBuffer decompressing_in(in);
|
||||
NativeReader block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
|
||||
NativeReader block_in(decompressing_in, distributed_header.revision);
|
||||
|
||||
while (Block block = block_in.read())
|
||||
{
|
||||
@ -304,7 +306,7 @@ namespace
|
||||
{
|
||||
LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
|
||||
|
||||
writeAndConvert(remote, in);
|
||||
writeAndConvert(remote, distributed_header, in);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -314,14 +316,20 @@ namespace
|
||||
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
|
||||
remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
|
||||
|
||||
writeAndConvert(remote, in);
|
||||
writeAndConvert(remote, distributed_header, in);
|
||||
return;
|
||||
}
|
||||
|
||||
/// If connection does not use compression, we have to uncompress the data.
|
||||
if (!compression_expected)
|
||||
{
|
||||
writeAndConvert(remote, in);
|
||||
writeAndConvert(remote, distributed_header, in);
|
||||
return;
|
||||
}
|
||||
|
||||
if (distributed_header.revision != remote.getServerRevision())
|
||||
{
|
||||
writeAndConvert(remote, distributed_header, in);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -915,10 +923,10 @@ public:
|
||||
{
|
||||
in = std::make_unique<ReadBufferFromFile>(file_name);
|
||||
decompressing_in = std::make_unique<CompressedReadBuffer>(*in);
|
||||
block_in = std::make_unique<NativeReader>(*decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
|
||||
log = &Poco::Logger::get("DirectoryMonitorSource");
|
||||
|
||||
readDistributedHeader(*in, log);
|
||||
auto distributed_header = readDistributedHeader(*in, log);
|
||||
block_in = std::make_unique<NativeReader>(*decompressing_in, distributed_header.revision);
|
||||
|
||||
first_block = block_in->read();
|
||||
}
|
||||
@ -1040,7 +1048,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
|
||||
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
|
||||
|
||||
CompressedReadBuffer decompressing_in(in);
|
||||
NativeReader block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
|
||||
NativeReader block_in(decompressing_in, distributed_header.revision);
|
||||
|
||||
while (Block block = block_in.read())
|
||||
{
|
||||
|
@ -0,0 +1 @@
|
||||
|
@ -0,0 +1,12 @@
|
||||
<clickhouse>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
@ -0,0 +1,39 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node_shard = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
|
||||
|
||||
node_dist = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], image='yandex/clickhouse-server',
|
||||
tag='21.11.9.1', stay_alive=True, with_installed_binary=True)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
node_shard.query("CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id")
|
||||
node_dist.query("CREATE TABLE local_table(id UInt32, val String) ENGINE = MergeTree ORDER BY id")
|
||||
node_dist.query("CREATE TABLE dist_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table, rand())")
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_distributed_in_tuple(started_cluster):
|
||||
node_dist.query("SYSTEM STOP DISTRIBUTED SENDS dist_table")
|
||||
|
||||
node_dist.query("INSERT INTO dist_table VALUES (1, 'foo')")
|
||||
assert node_dist.query("SELECT count() FROM dist_table") == "0\n"
|
||||
assert node_shard.query("SELECT count() FROM local_table") == "0\n"
|
||||
|
||||
node_dist.restart_with_latest_version(signal=9)
|
||||
node_dist.query("SYSTEM FLUSH DISTRIBUTED dist_table")
|
||||
|
||||
assert node_dist.query("SELECT count() FROM dist_table") == "1\n"
|
||||
assert node_shard.query("SELECT count() FROM local_table") == "1\n"
|
@ -0,0 +1,24 @@
|
||||
0 Value
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "dictGet(02154_test_dictionary, 'value', toUInt64(0))",
|
||||
"type": "String"
|
||||
},
|
||||
{
|
||||
"name": "dictGet(02154_test_dictionary, 'value', toUInt64(1))",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
[
|
||||
{
|
||||
"dictGet(02154_test_dictionary, 'value', toUInt64(0))": "Value",
|
||||
"dictGet(02154_test_dictionary, 'value', toUInt64(1))": ""
|
||||
}
|
||||
],
|
||||
|
||||
"rows": 1
|
||||
}
|
39
tests/queries/0_stateless/02154_dictionary_get_http_json.sh
Executable file
39
tests/queries/0_stateless/02154_dictionary_get_http_json.sh
Executable file
@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 02154_test_source_table"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q """
|
||||
CREATE TABLE 02154_test_source_table
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE=TinyLog;
|
||||
"""
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "INSERT INTO 02154_test_source_table VALUES (0, 'Value')"
|
||||
$CLICKHOUSE_CLIENT -q "SELECT * FROM 02154_test_source_table"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY IF EXISTS 02154_test_dictionary"
|
||||
$CLICKHOUSE_CLIENT -q """
|
||||
CREATE DICTIONARY 02154_test_dictionary
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
)
|
||||
PRIMARY KEY id
|
||||
LAYOUT(HASHED())
|
||||
LIFETIME(0)
|
||||
SOURCE(CLICKHOUSE(TABLE '02154_test_source_table'))
|
||||
"""
|
||||
|
||||
echo """
|
||||
SELECT dictGet(02154_test_dictionary, 'value', toUInt64(0)), dictGet(02154_test_dictionary, 'value', toUInt64(1))
|
||||
FORMAT JSON
|
||||
""" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&wait_end_of_query=1&output_format_write_statistics=0" -d @-
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY 02154_test_dictionary"
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE 02154_test_source_table"
|
@ -0,0 +1,2 @@
|
||||
02177_CTE_GLOBAL_ON 5 500 11 0 5
|
||||
02177_CTE_GLOBAL_OFF 1 100 5 0 1
|
48
tests/queries/0_stateless/02174_cte_scalar_cache.sql
Normal file
48
tests/queries/0_stateless/02174_cte_scalar_cache.sql
Normal file
@ -0,0 +1,48 @@
|
||||
WITH
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a1,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a2,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a3,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a4,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a5
|
||||
SELECT '02177_CTE_GLOBAL_ON', a5 FROM system.numbers LIMIT 100
|
||||
FORMAT Null
|
||||
SETTINGS enable_global_with_statement = 1;
|
||||
|
||||
WITH
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a1,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a2,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a3,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a4,
|
||||
( SELECT sleep(0.0001) FROM system.one ) as a5
|
||||
SELECT '02177_CTE_GLOBAL_OFF', a5 FROM system.numbers LIMIT 100
|
||||
FORMAT Null
|
||||
SETTINGS enable_global_with_statement = 0;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
SELECT
|
||||
'02177_CTE_GLOBAL_ON',
|
||||
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
|
||||
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
|
||||
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
|
||||
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
|
||||
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
|
||||
FROM system.query_log
|
||||
WHERE
|
||||
current_database = currentDatabase()
|
||||
AND type = 'QueryFinish'
|
||||
AND query LIKE '%SELECT ''02177_CTE_GLOBAL_ON%'
|
||||
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
|
||||
|
||||
SELECT
|
||||
'02177_CTE_GLOBAL_OFF',
|
||||
ProfileEvents['SleepFunctionCalls'] as sleep_calls,
|
||||
ProfileEvents['SleepFunctionMicroseconds'] as sleep_microseconds,
|
||||
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
|
||||
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
|
||||
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
|
||||
FROM system.query_log
|
||||
WHERE
|
||||
current_database = currentDatabase()
|
||||
AND type = 'QueryFinish'
|
||||
AND query LIKE '%02177_CTE_GLOBAL_OFF%'
|
||||
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
|
@ -0,0 +1,63 @@
|
||||
4 4 4 4 5
|
||||
9 9 9 9 5
|
||||
14 14 14 14 5
|
||||
19 19 19 19 5
|
||||
24 24 24 24 5
|
||||
29 29 29 29 5
|
||||
34 34 34 34 5
|
||||
39 39 39 39 5
|
||||
44 44 44 44 5
|
||||
49 49 49 49 5
|
||||
54 54 54 54 5
|
||||
59 59 59 59 5
|
||||
64 64 64 64 5
|
||||
69 69 69 69 5
|
||||
74 74 74 74 5
|
||||
79 79 79 79 5
|
||||
84 84 84 84 5
|
||||
89 89 89 89 5
|
||||
94 94 94 94 5
|
||||
99 99 99 99 5
|
||||
02177_MV 7 80 22
|
||||
10
|
||||
40
|
||||
70
|
||||
100
|
||||
130
|
||||
160
|
||||
190
|
||||
220
|
||||
250
|
||||
280
|
||||
310
|
||||
340
|
||||
370
|
||||
400
|
||||
430
|
||||
460
|
||||
490
|
||||
520
|
||||
550
|
||||
580
|
||||
02177_MV_2 0 0 21
|
||||
8
|
||||
18
|
||||
28
|
||||
38
|
||||
48
|
||||
58
|
||||
68
|
||||
78
|
||||
88
|
||||
98
|
||||
108
|
||||
118
|
||||
128
|
||||
138
|
||||
148
|
||||
158
|
||||
168
|
||||
178
|
||||
188
|
||||
198
|
||||
02177_MV_3 19 0 2
|
133
tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql
Normal file
133
tests/queries/0_stateless/02174_cte_scalar_cache_mv.sql
Normal file
@ -0,0 +1,133 @@
|
||||
-- TEST CACHE
|
||||
CREATE TABLE t1 (i Int64, j Int64) ENGINE = Memory;
|
||||
INSERT INTO t1 SELECT number, number FROM system.numbers LIMIT 100;
|
||||
CREATE TABLE t2 (k Int64, l Int64, m Int64, n Int64) ENGINE = Memory;
|
||||
|
||||
CREATE MATERIALIZED VIEW mv1 TO t2 AS
|
||||
WITH
|
||||
(SELECT max(i) FROM t1) AS t1
|
||||
SELECT
|
||||
t1 as k, -- Using local cache x 4
|
||||
t1 as l,
|
||||
t1 as m,
|
||||
t1 as n
|
||||
FROM t1
|
||||
LIMIT 5;
|
||||
|
||||
-- FIRST INSERT
|
||||
INSERT INTO t1
|
||||
WITH
|
||||
(SELECT max(i) FROM t1) AS t1
|
||||
SELECT
|
||||
number as i,
|
||||
t1 + t1 + t1 AS j -- Using global cache
|
||||
FROM system.numbers
|
||||
LIMIT 100
|
||||
SETTINGS
|
||||
min_insert_block_size_rows=5,
|
||||
max_insert_block_size=5,
|
||||
min_insert_block_size_rows_for_materialized_views=5,
|
||||
max_block_size=5,
|
||||
max_threads=1;
|
||||
|
||||
SELECT k, l, m, n, count()
|
||||
FROM t2
|
||||
GROUP BY k, l, m, n
|
||||
ORDER BY k, l, m, n;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
-- The main query should have a cache miss and 3 global hits
|
||||
-- The MV is executed 20 times (100 / 5) and each run does 1 miss and 4 hits to the LOCAL cache
|
||||
-- In addition to this, to prepare the MV, there is an extra preparation to get the list of columns via
|
||||
-- InterpreterSelectQuery, which adds 1 miss and 4 global hits (since it uses the global cache)
|
||||
-- So in total we have:
|
||||
-- Main query: 1 miss, 3 global
|
||||
-- Preparation: 1 miss, 4 global
|
||||
-- Blocks (20): 20 miss, 0 global, 80 local hits
|
||||
|
||||
-- TOTAL: 22 miss, 7 global, 80 local
|
||||
SELECT
|
||||
'02177_MV',
|
||||
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
|
||||
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
|
||||
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
|
||||
FROM system.query_log
|
||||
WHERE
|
||||
current_database = currentDatabase()
|
||||
AND type = 'QueryFinish'
|
||||
AND query LIKE '-- FIRST INSERT\nINSERT INTO t1\n%'
|
||||
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
|
||||
|
||||
DROP TABLE mv1;
|
||||
|
||||
CREATE TABLE t3 (z Int64) ENGINE = Memory;
|
||||
CREATE MATERIALIZED VIEW mv2 TO t3 AS
|
||||
SELECT
|
||||
-- This includes an unnecessarily complex query to verify that the local cache is used (since it uses t1)
|
||||
sum(i) + sum(j) + (SELECT * FROM (SELECT min(i) + min(j) FROM (SELECT * FROM system.one _a, t1 _b))) AS z
|
||||
FROM t1;
|
||||
|
||||
-- SECOND INSERT
|
||||
INSERT INTO t1
|
||||
SELECT 0 as i, number as j from numbers(100)
|
||||
SETTINGS
|
||||
min_insert_block_size_rows=5,
|
||||
max_insert_block_size=5,
|
||||
min_insert_block_size_rows_for_materialized_views=5,
|
||||
max_block_size=5,
|
||||
max_threads=1;
|
||||
|
||||
SELECT * FROM t3 ORDER BY z ASC;
|
||||
SYSTEM FLUSH LOGS;
|
||||
SELECT
|
||||
'02177_MV_2',
|
||||
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
|
||||
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
|
||||
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
|
||||
FROM system.query_log
|
||||
WHERE
|
||||
current_database = currentDatabase()
|
||||
AND type = 'QueryFinish'
|
||||
AND query LIKE '-- SECOND INSERT\nINSERT INTO t1%'
|
||||
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
|
||||
|
||||
DROP TABLE mv2;
|
||||
|
||||
|
||||
CREATE TABLE t4 (z Int64) ENGINE = Memory;
|
||||
CREATE MATERIALIZED VIEW mv3 TO t4 AS
|
||||
SELECT
|
||||
-- This includes an unnecessarily complex query but now it uses t2 so it can be cached
|
||||
min(i) + min(j) + (SELECT * FROM (SELECT min(k) + min(l) FROM (SELECT * FROM system.one _a, t2 _b))) AS z
|
||||
FROM t1;
|
||||
|
||||
-- THIRD INSERT
|
||||
INSERT INTO t1
|
||||
SELECT number as i, number as j from numbers(100)
|
||||
SETTINGS
|
||||
min_insert_block_size_rows=5,
|
||||
max_insert_block_size=5,
|
||||
min_insert_block_size_rows_for_materialized_views=5,
|
||||
max_block_size=5,
|
||||
max_threads=1;
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SELECT * FROM t4 ORDER BY z ASC;
|
||||
|
||||
SELECT
|
||||
'02177_MV_3',
|
||||
ProfileEvents['ScalarSubqueriesGlobalCacheHit'] as scalar_cache_global_hit,
|
||||
ProfileEvents['ScalarSubqueriesLocalCacheHit'] as scalar_cache_local_hit,
|
||||
ProfileEvents['ScalarSubqueriesCacheMiss'] as scalar_cache_miss
|
||||
FROM system.query_log
|
||||
WHERE
|
||||
current_database = currentDatabase()
|
||||
AND type = 'QueryFinish'
|
||||
AND query LIKE '-- THIRD INSERT\nINSERT INTO t1%'
|
||||
AND event_date >= yesterday() AND event_time > now() - interval 10 minute;
|
||||
|
||||
DROP TABLE mv3;
|
||||
DROP TABLE t1;
|
||||
DROP TABLE t2;
|
||||
DROP TABLE t3;
|
||||
DROP TABLE t4;
|
Loading…
Reference in New Issue
Block a user