Hotfix for pushdown with StorageMerge (#13679)

This commit is contained in:
Artem Zuikov 2020-08-14 12:38:18 +03:00 committed by GitHub
parent 09a72d0c64
commit 89e9673336
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 72 additions and 5 deletions

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
{
extern const int ALIAS_REQUIRED;
extern const int AMBIGUOUS_COLUMN_NAME;
extern const int LOGICAL_ERROR;
}
namespace
@ -187,7 +188,8 @@ StoragePtr JoinedTables::getLeftTableStorage()
bool JoinedTables::resolveTables()
{
tables_with_columns = getDatabaseAndTablesWithColumns(table_expressions, context);
assert(tables_with_columns.size() == table_expressions.size());
if (tables_with_columns.size() != table_expressions.size())
throw Exception("Unexpected tables count", ErrorCodes::LOGICAL_ERROR);
const auto & settings = context.getSettingsRef();
if (settings.joined_subquery_requires_alias && tables_with_columns.size() > 1)

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
PredicateExpressionsOptimizer::PredicateExpressionsOptimizer(
@ -111,6 +112,10 @@ bool PredicateExpressionsOptimizer::tryRewritePredicatesToTables(ASTs & tables_e
{
bool is_rewrite_tables = false;
if (tables_element.size() != tables_predicates.size())
throw Exception("Unexpected elements count in predicate push down: `set enable_optimize_predicate_expression = 0` to disable",
ErrorCodes::LOGICAL_ERROR);
for (size_t index = tables_element.size(); index > 0; --index)
{
size_t table_pos = index - 1;

View File

@ -37,6 +37,27 @@ namespace ErrorCodes
extern const int SAMPLING_NOT_SUPPORTED;
}
namespace
{
/// Drop the JOIN part of a SELECT query so that only its first table element remains.
/// The query is left untouched when it has no tables node, when that node has fewer than
/// two children, or when the second child carries no `table_join`.
/// Note: `select` is taken by const reference, but the children of the node returned by
/// `select.tables()` are truncated in place.
void removeJoin(const ASTSelectQuery & select)
{
    const auto & table_list = select.tables();
    const bool has_second_element = table_list && table_list->children.size() >= 2;
    if (!has_second_element)
        return;

    const auto & second_element = table_list->children[1]->as<ASTTablesInSelectQueryElement &>();
    if (second_element.table_join)
    {
        /// Simplest temporary approach: keep only the first table of the query.
        /// TODO: joined columns and the functions that use them should be removed as well
        /// (taking into account aliases, if any).
        table_list->children.resize(1);
    }
}
}
StorageMerge::StorageMerge(
const StorageID & table_id_,
@ -243,6 +264,9 @@ Pipe StorageMerge::createSources(
SelectQueryInfo modified_query_info = query_info;
modified_query_info.query = query_info.query->clone();
/// Original query could contain JOIN but we need only the first joined table and its columns.
removeJoin(*modified_query_info.query->as<ASTSelectQuery>());
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name);
Pipe pipe;
@ -428,9 +452,14 @@ Block StorageMerge::getQueryHeader(
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())),
SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
{
auto query = query_info.query->clone();
removeJoin(*query->as<ASTSelectQuery>());
auto stream = std::make_shared<OneBlockInputStream>(
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()));
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -26,7 +26,6 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.
/// Returns true if at least one row is discarded.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// Extract from the input stream a set of `name` column values

View File

@ -0,0 +1,30 @@
-- Test script: SELECT with JOIN from a Merge table while predicate push down is enabled.
-- Two layouts are exercised: Merge over a Distributed table, and Merge directly over a MergeTree table.

-- Start from a clean state.
DROP TABLE IF EXISTS test1;
DROP TABLE IF EXISTS test1_distributed;
DROP TABLE IF EXISTS test_merge;
-- Explicitly turn on the predicate push down optimization for the queries below.
SET enable_optimize_predicate_expression = 1;
-- Case 1: test_merge -> test1_distributed (Distributed) -> test1 (MergeTree).
CREATE TABLE test1 (id Int64, name String) ENGINE MergeTree PARTITION BY (id) ORDER BY (id);
CREATE TABLE test1_distributed AS test1 ENGINE = Distributed(test_cluster_two_shards_localhost, default, test1);
CREATE TABLE test_merge AS test1 ENGINE = Merge('default', 'test1_distributed');
-- JOIN with a one-row subquery plus a WHERE filter on a column of the left (Merge) table.
SELECT count() FROM test_merge
JOIN (SELECT 'anystring' AS name) AS n
USING name
WHERE id = 1;
-- Recreate test1 and test_merge for the second case; test1_distributed is kept until the end.
DROP TABLE test1;
DROP TABLE test_merge;
-- Case 2: test_merge reads directly from test1 (MergeTree), no Distributed layer.
CREATE TABLE test1 (id Int64, name String) ENGINE MergeTree PARTITION BY (id) ORDER BY (id);
CREATE TABLE test_merge AS test1 ENGINE = Merge('default', 'test1');
-- Same query as in case 1.
SELECT count() FROM test_merge
JOIN (SELECT 'anystring' AS name) AS n
USING name
WHERE id = 1;
-- Final cleanup.
DROP TABLE test1;
DROP TABLE test_merge;
DROP TABLE test1_distributed;