Merge pull request #66912 from ClickHouse/fix-distributed-analyzer

Fix views over distributed tables with Analyzer
This commit is contained in:
Alexey Milovidov 2024-07-24 11:56:52 +00:00 committed by GitHub
commit eb5400b604
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 53 additions and 56 deletions

View File

@ -49,7 +49,7 @@ enum class QueryTreeNodeType : uint8_t
/// Convert query tree node type to string
const char * toString(QueryTreeNodeType type);
/** Query tree is semantical representation of query.
/** Query tree is a semantic representation of query.
* Query tree node represent node in query tree.
* IQueryTreeNode is base class for all query tree nodes.
*

View File

@ -33,7 +33,7 @@ size_t toMilliseconds(auto duration)
return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
}
const auto epsilon = 500us;
const auto epsilon = 1ms;
class ResolvePoolMock : public DB::HostResolver
{
@ -358,53 +358,59 @@ void check_no_failed_address(size_t iteration, auto & resolver, auto & addresses
TEST_F(ResolvePoolTest, BannedForConsiquenceFail)
{
auto history = 5ms;
auto history = 10ms;
auto resolver = make_resolver(toMilliseconds(history));
auto failed_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*failed_addr));
auto start_at = now();
failed_addr.setFail();
auto start_at = now();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
check_no_failed_address(1, resolver, addresses, failed_addr, metrics, start_at + history - epsilon);
sleep_until(start_at + history + epsilon);
start_at = now();
resolver->update();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
failed_addr.setFail();
start_at = now();
check_no_failed_address(2, resolver, addresses, failed_addr, metrics, start_at + history - epsilon);
sleep_until(start_at + history + epsilon);
start_at = now();
resolver->update();
// too much time has passed
if (now() > start_at + 2*history - epsilon)
return;
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
// ip still banned adter history_ms + update, because it was his second consiquent fail
check_no_failed_address(2, resolver, addresses, failed_addr, metrics, start_at + history - epsilon);
check_no_failed_address(2, resolver, addresses, failed_addr, metrics, start_at + 2*history - epsilon);
}
TEST_F(ResolvePoolTest, NoAditionalBannForConcurrentFail)
{
auto history = 5ms;
auto history = 10ms;
auto resolver = make_resolver(toMilliseconds(history));
auto failed_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*failed_addr));
auto start_at = now();
failed_addr.setFail();
failed_addr.setFail();
failed_addr.setFail();
failed_addr.setFail();
failed_addr.setFail();
failed_addr.setFail();
auto start_at = now();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
@ -413,6 +419,7 @@ TEST_F(ResolvePoolTest, NoAditionalBannForConcurrentFail)
sleep_until(start_at + history + epsilon);
resolver->update();
// ip is cleared after just 1 history_ms interval.
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));

View File

@ -17,13 +17,19 @@
namespace DB
{
IInterpreterUnionOrSelectQuery::IInterpreterUnionOrSelectQuery(const DB::ASTPtr& query_ptr_,
const DB::ContextMutablePtr& context_, const DB::SelectQueryOptions& options_)
: query_ptr(query_ptr_)
, context(context_)
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
IInterpreterUnionOrSelectQuery::IInterpreterUnionOrSelectQuery(const ASTPtr & query_ptr_,
const ContextMutablePtr & context_, const SelectQueryOptions & options_)
: query_ptr(query_ptr_)
, context(context_)
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
{
/// FIXME All code here will work with the old analyzer, however for views over Distributed tables
/// it's possible that new analyzer will be enabled in ::getQueryProcessingStage method
/// of the underlying storage when all other parts of infrastructure are not ready for it
/// (built with old analyzer).
context->setSetting("allow_experimental_analyzer", false);
if (options.shard_num)
context->addSpecialScalar(
"_shard_num",

View File

@ -75,7 +75,6 @@
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMerge.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageView.h>
@ -214,11 +213,11 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
Pipe input_pipe_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(query_ptr_, context_, std::move(input_pipe_), nullptr, options_.copy().noSubquery())
const ASTPtr & query_ptr_,
const ContextPtr & context_,
Pipe input_pipe_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(query_ptr_, context_, std::move(input_pipe_), nullptr, options_.copy().noSubquery())
{}
InterpreterSelectQuery::InterpreterSelectQuery(
@ -227,18 +226,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const StoragePtr & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const SelectQueryOptions & options_)
: InterpreterSelectQuery(
query_ptr_, context_, std::nullopt, storage_, options_.copy().noSubquery(), {}, metadata_snapshot_)
{
}
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, storage_, options_.copy().noSubquery(), {}, metadata_snapshot_)
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
const ContextPtr & context_,
const SelectQueryOptions & options_,
PreparedSetsPtr prepared_sets_)
: InterpreterSelectQuery(
query_ptr_, context_, std::nullopt, nullptr, options_, {}, {}, prepared_sets_)
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, nullptr, options_, {}, {}, prepared_sets_)
{}
InterpreterSelectQuery::~InterpreterSelectQuery() = default;

View File

@ -26,7 +26,6 @@ class Logger;
namespace DB
{
class SubqueryForSet;
class InterpreterSelectWithUnionQuery;
class Context;
class QueryPlan;

View File

@ -43,7 +43,6 @@
#include <Parsers/parseQuery.h>
#include <Parsers/IAST.h>
#include <Analyzer/Utils.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableNode.h>
@ -61,26 +60,20 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/Context.h>
#include <Interpreters/createBlockSelector.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/getCustomKeyFilterForParallelReplicas.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -90,7 +83,6 @@
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
@ -496,7 +488,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
}
std::optional<QueryProcessingStage::Enum> optimized_stage;
if (settings.allow_experimental_analyzer)
if (query_info.query_tree)
optimized_stage = getOptimizedQueryProcessingStageAnalyzer(query_info, settings);
else
optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
@ -860,31 +852,28 @@ void StorageDistributed::read(
modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed);
modified_query_info.query_tree = std::move(query_tree_distributed);
/// Return directly (with correct header) if no shard to query.
if (modified_query_info.getCluster()->getShardsInfo().empty())
return;
}
else
{
header = InterpreterSelectQuery(modified_query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
if (!settings.allow_experimental_analyzer)
{
modified_query_info.query = ClusterProxy::rewriteSelectQuery(
local_context, modified_query_info.query,
remote_database, remote_table, remote_table_function_ptr);
}
/// Return directly (with correct header) if no shard to query.
if (modified_query_info.getCluster()->getShardsInfo().empty())
{
if (settings.allow_experimental_analyzer)
if (modified_query_info.getCluster()->getShardsInfo().empty())
{
Pipe pipe(std::make_shared<NullSource>(header));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource (Distributed)");
query_plan.addStep(std::move(read_from_pipe));
return;
Pipe pipe(std::make_shared<NullSource>(header));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
read_from_pipe->setStepDescription("Read from NullSource (Distributed)");
query_plan.addStep(std::move(read_from_pipe));
return;
}
}
const auto & snapshot_data = assert_cast<const SnapshotData &>(*storage_snapshot->data);

View File

@ -172,7 +172,7 @@ static ExpressionAndSets buildExpressionAndSets(ASTPtr & ast, const NamesAndType
/// with subqueries it's possible that new analyzer will be enabled in ::read method
/// of underlying storage when all other parts of infra are not ready for it
/// (built with old analyzer).
context_copy->setSetting("allow_experimental_analyzer", Field{0});
context_copy->setSetting("allow_experimental_analyzer", false);
auto syntax_analyzer_result = TreeRewriter(context_copy).analyze(ast, columns);
ExpressionAnalyzer analyzer(ast, syntax_analyzer_result, context_copy);
auto dag = analyzer.getActionsDAG(false);