mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 13:32:13 +00:00
Attempt to fix regression: missing support for remote tables in IN section when querying Distributed tables [#CLICKHOUSE-2]
This commit is contained in:
parent
597160443a
commit
efd493b323
@ -61,6 +61,8 @@
|
|||||||
#include <DataTypes/DataTypeFunction.h>
|
#include <DataTypes/DataTypeFunction.h>
|
||||||
#include <Functions/FunctionsMiscellaneous.h>
|
#include <Functions/FunctionsMiscellaneous.h>
|
||||||
|
|
||||||
|
#include <Core/iostream_debug_helpers.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -2438,6 +2440,9 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
|
|||||||
initChain(chain, source_columns);
|
initChain(chain, source_columns);
|
||||||
ExpressionActionsChain::Step & step = chain.steps.back();
|
ExpressionActionsChain::Step & step = chain.steps.back();
|
||||||
|
|
||||||
|
DUMP(only_types);
|
||||||
|
DUMP(StackTrace().toString());
|
||||||
|
|
||||||
step.required_output.push_back(select_query->where_expression->getColumnName());
|
step.required_output.push_back(select_query->where_expression->getColumnName());
|
||||||
getRootActions(select_query->where_expression, only_types, false, step.actions);
|
getRootActions(select_query->where_expression, only_types, false, step.actions);
|
||||||
|
|
||||||
|
@ -676,7 +676,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
|
|||||||
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
|
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
|
||||||
|
|
||||||
if (pipeline.streams.empty())
|
if (pipeline.streams.empty())
|
||||||
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
|
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(
|
||||||
|
storage->analyze(required_columns, query_info, context, from_stage)));
|
||||||
|
|
||||||
pipeline.transform([&](auto & stream)
|
pipeline.transform([&](auto & stream)
|
||||||
{
|
{
|
||||||
|
@ -180,6 +180,20 @@ public:
|
|||||||
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Without executing a query determine,
|
||||||
|
* what structure of result and what query processed stage
|
||||||
|
* we will get if we will call 'read' method.
|
||||||
|
*/
|
||||||
|
virtual Block analyze(
|
||||||
|
const Names & column_names,
|
||||||
|
const SelectQueryInfo & /*query_info*/,
|
||||||
|
const Context & /*context*/,
|
||||||
|
QueryProcessingStage::Enum & processed_stage)
|
||||||
|
{
|
||||||
|
processed_stage = QueryProcessingStage::FetchColumns;
|
||||||
|
return getSampleBlockForColumns(column_names);
|
||||||
|
}
|
||||||
|
|
||||||
/** Writes the data to a table.
|
/** Writes the data to a table.
|
||||||
* Receives a description of the query, which can contain information about the data write method.
|
* Receives a description of the query, which can contain information about the data write method.
|
||||||
* Returns an object by which you can write data sequentially.
|
* Returns an object by which you can write data sequentially.
|
||||||
|
@ -197,6 +197,26 @@ BlockInputStreams StorageDistributed::read(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Block StorageDistributed::analyze(
|
||||||
|
const Names & /*column_names*/,
|
||||||
|
const SelectQueryInfo & query_info,
|
||||||
|
const Context & context,
|
||||||
|
QueryProcessingStage::Enum & processed_stage)
|
||||||
|
{
|
||||||
|
auto cluster = getCluster();
|
||||||
|
|
||||||
|
const Settings & settings = context.getSettingsRef();
|
||||||
|
|
||||||
|
size_t result_size = (cluster->getRemoteShardCount() * settings.max_parallel_replicas) + cluster->getLocalShardCount();
|
||||||
|
|
||||||
|
processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
|
||||||
|
? QueryProcessingStage::Complete
|
||||||
|
: QueryProcessingStage::WithMergeableState;
|
||||||
|
|
||||||
|
return materializeBlock(InterpreterSelectQuery(query_info.query, context, {}, processed_stage).getSampleBlock());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
|
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Settings & settings)
|
||||||
{
|
{
|
||||||
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
|
auto cluster = (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
|
||||||
|
@ -60,6 +60,12 @@ public:
|
|||||||
size_t max_block_size,
|
size_t max_block_size,
|
||||||
unsigned num_streams) override;
|
unsigned num_streams) override;
|
||||||
|
|
||||||
|
Block analyze(
|
||||||
|
const Names & column_names,
|
||||||
|
const SelectQueryInfo & query_info,
|
||||||
|
const Context & context,
|
||||||
|
QueryProcessingStage::Enum & processed_stage) override;
|
||||||
|
|
||||||
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
|
BlockOutputStreamPtr write(const ASTPtr & query, const Settings & settings) override;
|
||||||
|
|
||||||
void drop() override {}
|
void drop() override {}
|
||||||
|
Loading…
Reference in New Issue
Block a user