diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index b453b73c4cb..a245cf66449 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -73,6 +74,7 @@ namespace ErrorCodes extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int TOO_MANY_ROWS; extern const int UNABLE_TO_SKIP_UNUSED_SHARDS; + extern const int LOGICAL_ERROR; } namespace ActionLocks @@ -378,8 +380,54 @@ StoragePtr StorageDistributed::createWithOwnCluster( } -QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const +bool StorageDistributed::canForceGroupByNoMerge(const Context &context, const ASTPtr & query_ptr) const { + const auto & settings = context.getSettingsRef(); + + if (settings.distributed_group_by_no_merge) + return true; + if (!settings.optimize_skip_unused_shards) + return false; + if (!has_sharding_key) + return false; + + const auto & select = query_ptr->as(); + + if (select.orderBy()) + return false; + if (select.distinct) + return false; + + // This can use distributed_group_by_no_merge but in this case limit stage + // should be done later (which is not the case right now). + if (select.limitBy() || select.limitLength()) + return false; + + const ASTPtr group_by = select.groupBy(); + if (!group_by) + return false; + + // injective functions are optimized out in optimizeGroupBy() + // hence all we need to check is that column in GROUP BY matches sharding expression + auto & group_exprs = group_by->children; + if (!group_exprs.size()) + throw Exception("No ASTExpressionList in GROUP BY", ErrorCodes::LOGICAL_ERROR); + + auto id = group_exprs[0]->as(); + if (!id) + return false; + if (!sharding_key_expr->getSampleBlock().has(id->name)) + return false; + + LOG_DEBUG(log, "Force distributed_group_by_no_merge for GROUP BY " << backQuote(serializeAST(*group_by, true)) << " (injective)"); + return true; +} + +QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const +{ + if (canForceGroupByNoMerge(context, query_ptr)) + return QueryProcessingStage::Complete; + auto cluster = getOptimizedCluster(context, query_ptr); return getQueryProcessingStageImpl(context, to_stage, cluster); } diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 81c6b54a63e..05b8b9fb55c 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -67,6 +67,8 @@ public: bool isRemote() const override { return true; } + /// Return true if distributed_group_by_no_merge may be applied. + bool canForceGroupByNoMerge(const Context &, const ASTPtr &) const; QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override; Pipes read(