Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-22 15:42:02 +00:00)

Merge pull request #3851 from abyss7/issue-3592

Refactor constant folding

This commit is contained in: commit 3cefb27e56
.gitignore (vendored)

@ -248,3 +248,6 @@ website/package-lock.json
 
 # Ignore files for locally disabled tests
 /dbms/tests/queries/**/*.disabled
+
+# cquery cache
+/.cquery-cache
@ -187,6 +187,20 @@
         </replica>
     </shard>
 </test_shard_localhost_secure>
+<test_unavailable_shard>
+    <shard>
+        <replica>
+            <host>localhost</host>
+            <port>9000</port>
+        </replica>
+    </shard>
+    <shard>
+        <replica>
+            <host>localhost</host>
+            <port>1</port>
+        </replica>
+    </shard>
+</test_unavailable_shard>
 </remote_servers>
 
 
@ -97,8 +97,8 @@ public:
     /// Approximate number of allocated bytes in memory - for profiling and limits.
     size_t allocatedBytes() const;
 
-    operator bool() const { return !data.empty(); }
-    bool operator!() const { return data.empty(); }
+    operator bool() const { return !!columns(); }
+    bool operator!() const { return !this->operator bool(); }
 
     /** Get a list of column names separated by commas. */
     std::string dumpNames() const;
@ -397,14 +397,24 @@ void Cluster::initMisc()
 
 std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
 {
-    return std::unique_ptr<Cluster>{ new Cluster(*this, index) };
+    return std::unique_ptr<Cluster>{ new Cluster(*this, {index}) };
 }
 
-Cluster::Cluster(const Cluster & from, size_t index)
-    : shards_info{from.shards_info[index]}
+std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector<size_t> & indices) const
 {
-    if (!from.addresses_with_failover.empty())
-        addresses_with_failover.emplace_back(from.addresses_with_failover[index]);
+    return std::unique_ptr<Cluster>{ new Cluster(*this, indices) };
+}
+
+Cluster::Cluster(const Cluster & from, const std::vector<size_t> & indices)
+    : shards_info{}
+{
+    for (size_t index : indices)
+    {
+        shards_info.emplace_back(from.shards_info.at(index));
+
+        if (!from.addresses_with_failover.empty())
+            addresses_with_failover.emplace_back(from.addresses_with_failover.at(index));
+    }
 
     initMisc();
 }
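For illustration only: the new constructor builds the subcluster by copying just the selected entries, with bounds-checked access so a bad shard index throws instead of reading out of range. A minimal standalone C++ sketch of that pattern (hypothetical names, not the ClickHouse Cluster API):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Keep only the entries selected by `indices`; at() throws std::out_of_range on a bad index.
std::vector<std::string> subsetByIndices(const std::vector<std::string> & shards,
                                         const std::vector<size_t> & indices)
{
    std::vector<std::string> result;
    for (size_t index : indices)
        result.push_back(shards.at(index));
    return result;
}

int main()
{
    const std::vector<std::string> shards{"shard0", "shard1", "shard2", "shard3"};
    for (const auto & name : subsetByIndices(shards, {0, 2}))
        std::cout << name << '\n';   // prints shard0 then shard2
}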
@ -143,6 +143,9 @@ public:
     /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster.
     std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
 
+    /// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster.
+    std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
+
 private:
     using SlotToShard = std::vector<UInt64>;
     SlotToShard slot_to_shard;

@ -153,8 +156,8 @@ public:
 private:
     void initMisc();
 
-    /// For getClusterWithSingleShard implementation.
-    Cluster(const Cluster & from, size_t index);
+    /// For getClusterWithMultipleShards implementation.
+    Cluster(const Cluster & from, const std::vector<size_t> & indices);
 
     String hash_of_addresses;
     /// Description of the cluster shards.
@ -89,6 +89,7 @@ struct Settings
     M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \
     \
     M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
+    M(SettingBool, distributed_optimize_skip_select_on_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
     \
     M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.") \
     M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.") \
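Each M(...) row above is one entry in an X-macro table: the surrounding header expands the same list several times so the member, its default and its description stay on a single line. A small self-contained sketch of that expansion pattern; the two setting rows are copied from the table above, while the macro, struct and function names are made up and not the real Settings machinery:

#include <iostream>

// M(type, name, default_value, description), in the spirit of the table above.
#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(bool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \
    M(bool, distributed_optimize_skip_select_on_unused_shards, false, "Skip shards that cannot match the filter on the sharding key.")

struct ExampleSettings
{
// First expansion: declare a member with its default value.
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    APPLY_FOR_EXAMPLE_SETTINGS(DECLARE)
#undef DECLARE
};

// Second expansion of the same list: a readable dump of every setting.
void dumpSettings(const ExampleSettings & settings)
{
#define PRINT(TYPE, NAME, DEFAULT, DESCRIPTION) \
    std::cout << #NAME << " = " << settings.NAME << "  // " << DESCRIPTION << '\n';
    APPLY_FOR_EXAMPLE_SETTINGS(PRINT)
#undef PRINT
}

int main()
{
    ExampleSettings settings;
    settings.distributed_optimize_skip_select_on_unused_shards = true;
    dumpSettings(settings);
}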
@ -1,18 +1,20 @@
-#include <Core/Block.h>
+#include <Interpreters/evaluateConstantExpression.h>
 
 #include <Columns/ColumnConst.h>
 #include <Columns/ColumnsNumber.h>
-#include <Parsers/ASTIdentifier.h>
-#include <Parsers/ASTLiteral.h>
-#include <Parsers/ASTFunction.h>
-#include <Parsers/ExpressionElementParsers.h>
+#include <Core/Block.h>
 #include <DataTypes/DataTypesNumber.h>
 #include <Interpreters/Context.h>
-#include <Interpreters/SyntaxAnalyzer.h>
-#include <Interpreters/ExpressionAnalyzer.h>
+#include <Interpreters/convertFieldToType.h>
 #include <Interpreters/ExpressionActions.h>
-#include <Interpreters/evaluateConstantExpression.h>
-#include <Common/typeid_cast.h>
+#include <Interpreters/ExpressionAnalyzer.h>
+#include <Interpreters/SyntaxAnalyzer.h>
+#include <Parsers/ASTFunction.h>
+#include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTLiteral.h>
+#include <Parsers/ExpressionElementParsers.h>
 #include <TableFunctions/TableFunctionFactory.h>
+#include <Common/typeid_cast.h>
 
 
 namespace DB
@ -77,4 +79,236 @@ ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, cons
     return evaluateConstantExpressionAsLiteral(node, context);
 }
 
+namespace
+{
+    using Conjunction = ColumnsWithTypeAndName;
+    using Disjunction = std::vector<Conjunction>;
+
+    Disjunction analyzeEquals(const ASTIdentifier * identifier, const ASTLiteral * literal, const ExpressionActionsPtr & expr)
+    {
+        if (!identifier || !literal)
+        {
+            return {};
+        }
+
+        for (const auto & name_and_type : expr->getRequiredColumnsWithTypes())
+        {
+            const auto & name = name_and_type.name;
+            const auto & type = name_and_type.type;
+
+            if (name == identifier->name)
+            {
+                ColumnWithTypeAndName column;
+                // FIXME: what to do if field is not convertable?
+                column.column = type->createColumnConst(1, convertFieldToType(literal->value, *type));
+                column.name = name;
+                column.type = type;
+                return {{std::move(column)}};
+            }
+        }
+
+        return {};
+    }
+
+    Disjunction andDNF(const Disjunction & left, const Disjunction & right)
+    {
+        if (left.empty())
+        {
+            return right;
+        }
+
+        Disjunction result;
+
+        for (const auto & conjunct1 : left)
+        {
+            for (const auto & conjunct2 : right)
+            {
+                Conjunction new_conjunct{conjunct1};
+                new_conjunct.insert(new_conjunct.end(), conjunct2.begin(), conjunct2.end());
+                result.emplace_back(new_conjunct);
+            }
+        }
+
+        return result;
+    }
+
+    Disjunction analyzeFunction(const ASTFunction * fn, const ExpressionActionsPtr & expr)
+    {
+        if (!fn)
+        {
+            return {};
+        }
+
+        // TODO: enumerate all possible function names!
+
+        if (fn->name == "equals")
+        {
+            const auto * left = fn->arguments->children.front().get();
+            const auto * right = fn->arguments->children.back().get();
+            const auto * identifier = typeid_cast<const ASTIdentifier *>(left) ? typeid_cast<const ASTIdentifier *>(left)
+                                                                               : typeid_cast<const ASTIdentifier *>(right);
+            const auto * literal = typeid_cast<const ASTLiteral *>(left) ? typeid_cast<const ASTLiteral *>(left)
+                                                                         : typeid_cast<const ASTLiteral *>(right);
+
+            return analyzeEquals(identifier, literal, expr);
+        }
+        else if (fn->name == "in")
+        {
+            const auto * left = fn->arguments->children.front().get();
+            const auto * right = fn->arguments->children.back().get();
+            const auto * identifier = typeid_cast<const ASTIdentifier *>(left);
+            const auto * inner_fn = typeid_cast<const ASTFunction *>(right);
+
+            if (!inner_fn)
+            {
+                return {};
+            }
+
+            const auto * tuple = typeid_cast<const ASTExpressionList *>(inner_fn->children.front().get());
+
+            if (!tuple)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & child : tuple->children)
+            {
+                const auto * literal = typeid_cast<const ASTLiteral *>(child.get());
+                const auto dnf = analyzeEquals(identifier, literal, expr);
+
+                if (dnf.empty())
+                {
+                    return {};
+                }
+
+                result.insert(result.end(), dnf.begin(), dnf.end());
+            }
+
+            return result;
+        }
+        else if (fn->name == "or")
+        {
+            const auto * args = typeid_cast<const ASTExpressionList *>(fn->children.front().get());
+
+            if (!args)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & arg : args->children)
+            {
+                const auto dnf = analyzeFunction(typeid_cast<const ASTFunction *>(arg.get()), expr);
+
+                if (dnf.empty())
+                {
+                    return {};
+                }
+
+                result.insert(result.end(), dnf.begin(), dnf.end());
+            }
+
+            return result;
+        }
+        else if (fn->name == "and")
+        {
+            const auto * args = typeid_cast<const ASTExpressionList *>(fn->children.front().get());
+
+            if (!args)
+            {
+                return {};
+            }
+
+            Disjunction result;
+
+            for (const auto & arg : args->children)
+            {
+                const auto dnf = analyzeFunction(typeid_cast<const ASTFunction *>(arg.get()), expr);
+
+                if (dnf.empty())
+                {
+                    continue;
+                }
+
+                result = andDNF(result, dnf);
+            }
+
+            return result;
+        }
+
+        return {};
+    }
+}
+
+std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & node, const ExpressionActionsPtr & target_expr)
+{
+    Blocks result;
+
+    // TODO: `node` may be always-false literal.
+
+    if (const auto fn = typeid_cast<const ASTFunction *>(node.get()))
+    {
+        const auto dnf = analyzeFunction(fn, target_expr);
+
+        if (dnf.empty())
+        {
+            return {};
+        }
+
+        auto hasRequiredColumns = [&target_expr](const Block & block) -> bool
+        {
+            for (const auto & name : target_expr->getRequiredColumns())
+            {
+                bool hasColumn = false;
+                for (const auto & column_name : block.getNames())
+                {
+                    if (column_name == name)
+                    {
+                        hasColumn = true;
+                        break;
+                    }
+                }
+
+                if (!hasColumn)
+                    return false;
+            }
+
+            return true;
+        };
+
+        for (const auto & conjunct : dnf)
+        {
+            Block block(conjunct);
+
+            // Block should contain all required columns from `target_expr`
+            if (!hasRequiredColumns(block))
+            {
+                return {};
+            }
+
+            target_expr->execute(block);
+
+            if (block.rows() == 1)
+            {
+                result.push_back(block);
+            }
+            else if (block.rows() == 0)
+            {
+                // filter out cases like "WHERE a = 1 AND a = 2"
+                continue;
+            }
+            else
+            {
+                // FIXME: shouldn't happen
+                return {};
+            }
+        }
+    }
+
+    return {result};
+}
+
 }
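The helpers above rewrite the WHERE condition into disjunctive normal form: equals and IN produce single-column conjunctions, "or" concatenates disjunctions, and "and" combines them through the pairwise product in andDNF. A standalone C++ sketch of just that product step, using plain strings instead of columns, for the hypothetical condition (a = 0 OR a = 1) AND (b = 0 OR b = 1):

#include <iostream>
#include <string>
#include <vector>

// A conjunction is a set of fixed column values; a disjunction is a list of alternatives.
using Conjunction = std::vector<std::string>;
using Disjunction = std::vector<Conjunction>;

// Same shape as the andDNF helper above: pairwise concatenation of conjunctions.
Disjunction andDNF(const Disjunction & left, const Disjunction & right)
{
    if (left.empty())
        return right;

    Disjunction result;
    for (const auto & conjunct1 : left)
        for (const auto & conjunct2 : right)
        {
            Conjunction merged = conjunct1;
            merged.insert(merged.end(), conjunct2.begin(), conjunct2.end());
            result.push_back(merged);
        }
    return result;
}

int main()
{
    // (a = 0 OR a = 1) AND (b = 0 OR b = 1)
    Disjunction a{{"a=0"}, {"a=1"}};
    Disjunction b{{"b=0"}, {"b=1"}};

    for (const auto & conjunct : andDNF(a, b))
    {
        for (const auto & atom : conjunct)
            std::cout << atom << ' ';
        std::cout << '\n';
    }
    // Four candidate rows: a=0 b=0, a=0 b=1, a=1 b=0, a=1 b=1.
    // Each one later becomes a single-row block fed into the sharding key expression.
}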
@ -1,17 +1,22 @@
 #pragma once
 
-#include <memory>
+#include <Core/Block.h>
 #include <Core/Field.h>
 #include <Parsers/IAST.h>
 #include <Parsers/IParser.h>
 
+#include <memory>
+#include <optional>
+
 
 namespace DB
 {
 
 class Context;
+class ExpressionActions;
 class IDataType;
 
+using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
+
 /** Evaluate constant expression and its type.
   * Used in rare cases - for elements of set for IN, for data to INSERT.

@ -20,17 +25,24 @@ class IDataType;
 std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context);
 
 
-/** Evaluate constant expression
-  * and returns ASTLiteral with its value.
+/** Evaluate constant expression and returns ASTLiteral with its value.
   */
 ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context);
 
 
-/** Evaluate constant expression
-  * and returns ASTLiteral with its value.
+/** Evaluate constant expression and returns ASTLiteral with its value.
   * Also, if AST is identifier, then return string literal with its name.
   * Useful in places where some name may be specified as identifier, or as result of a constant expression.
   */
 ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context);
 
+/** Try to fold condition to countable set of constant values.
+  * @param condition a condition that we try to fold.
+  * @param target_expr expression evaluated over a set of constants.
+  * @return optional blocks each with a single row and a single column for target expression,
+  *         or empty blocks if condition is always false,
+  *         or nothing if condition can't be folded to a set of constants.
+  */
+std::optional<Blocks> evaluateExpressionOverConstantCondition(const ASTPtr & condition, const ExpressionActionsPtr & target_expr);
+
 }
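The documented result has three distinct outcomes: a non-empty list of single-row blocks when the condition folds to concrete values, an empty list when it folds to something always false, and an empty optional when nothing can be concluded. A small standalone sketch of how a caller can tell them apart (stand-in types, not the real Blocks or the real API):

#include <iostream>
#include <optional>
#include <vector>

// Stand-in for the real result: each int represents one folded single-row block.
using FoldedBlocks = std::vector<int>;

void reactToFolding(const std::optional<FoldedBlocks> & folded)
{
    if (!folded)
    {
        // Condition could not be folded: keep querying every shard.
        std::cout << "query all shards\n";
    }
    else if (folded->empty())
    {
        // Condition folded to something always false: no shard can match.
        std::cout << "query no shards\n";
    }
    else
    {
        // Query only the shards selected by the folded values.
        std::cout << "query " << folded->size() << " candidate value(s)\n";
    }
}

int main()
{
    reactToFolding(std::nullopt);         // can't fold, e.g. a condition on a non-key column
    reactToFolding(FoldedBlocks{});       // always false, e.g. WHERE a = 1 AND a = 2
    reactToFolding(FoldedBlocks{1, 2});   // folded to two candidate rows
}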
@ -313,7 +313,7 @@ bool KeyCondition::addCondition(const String & column, const Range & range)
     return true;
 }
 
-/** Computes value of constant expression and it data type.
+/** Computes value of constant expression and its data type.
   * Returns false, if expression isn't constant.
   */
 static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & out_value, DataTypePtr & out_type)
@ -253,7 +253,7 @@ public:
     /// Get the maximum number of the key element used in the condition.
     size_t getMaxKeyColumn() const;
 
-    /// Impose an additional condition: the value in the column column must be in the `range` range.
+    /// Impose an additional condition: the value in the column `column` must be in the range `range`.
     /// Returns whether there is such a column in the key.
     bool addCondition(const String & column, const Range & range);
 
@ -1,38 +1,41 @@
+#include <Storages/StorageDistributed.h>
+
 #include <DataStreams/OneBlockInputStream.h>
 #include <DataStreams/materializeBlock.h>
 
 #include <Databases/IDatabase.h>
 
 #include <DataTypes/DataTypeFactory.h>
+#include <DataTypes/DataTypesNumber.h>
 
-#include <Storages/StorageDistributed.h>
-#include <Storages/Distributed/DistributedBlockOutputStream.h>
 #include <Storages/Distributed/DirectoryMonitor.h>
+#include <Storages/Distributed/DistributedBlockOutputStream.h>
 #include <Storages/StorageFactory.h>
 
 #include <Common/Macros.h>
 #include <Common/escapeForFileName.h>
 #include <Common/typeid_cast.h>
 
-#include <Parsers/ASTInsertQuery.h>
-#include <Parsers/ASTSelectQuery.h>
-#include <Parsers/TablePropertiesQueriesASTs.h>
-#include <Parsers/ParserAlterQuery.h>
-#include <Parsers/parseQuery.h>
-#include <Parsers/ASTLiteral.h>
-#include <Parsers/ASTExpressionList.h>
-#include <Parsers/ASTTablesInSelectQuery.h>
 #include <Parsers/ASTDropQuery.h>
+#include <Parsers/ASTExpressionList.h>
 #include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTInsertQuery.h>
+#include <Parsers/ASTLiteral.h>
+#include <Parsers/ASTSelectQuery.h>
+#include <Parsers/ASTTablesInSelectQuery.h>
+#include <Parsers/ParserAlterQuery.h>
+#include <Parsers/TablePropertiesQueriesASTs.h>
+#include <Parsers/parseQuery.h>
 
-#include <Interpreters/InterpreterSelectQuery.h>
+#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
+#include <Interpreters/ClusterProxy/executeQuery.h>
+#include <Interpreters/ExpressionAnalyzer.h>
 #include <Interpreters/InterpreterAlterQuery.h>
 #include <Interpreters/InterpreterDescribeQuery.h>
+#include <Interpreters/InterpreterSelectQuery.h>
 #include <Interpreters/SyntaxAnalyzer.h>
-#include <Interpreters/ExpressionAnalyzer.h>
+#include <Interpreters/createBlockSelector.h>
 #include <Interpreters/evaluateConstantExpression.h>
-#include <Interpreters/ClusterProxy/executeQuery.h>
-#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
 #include <Interpreters/getClusterName.h>
 
 #include <Core/Field.h>

@ -58,6 +61,7 @@ namespace ErrorCodes
     extern const int INFINITE_LOOP;
     extern const int TYPE_MISMATCH;
     extern const int NO_SUCH_COLUMN_IN_TABLE;
+    extern const int TOO_MANY_ROWS;
 }
 
 
@ -133,6 +137,29 @@ void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & in
     increment.set(getMaximumFileNumber(path));
 }
 
+/// the same as DistributedBlockOutputStream::createSelector, should it be static?
+IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
+{
+    const auto & slot_to_shard = cluster->getSlotToShard();
+
+#define CREATE_FOR_TYPE(TYPE) \
+    if (typeid_cast<const DataType##TYPE *>(result.type.get())) \
+        return createBlockSelector<TYPE>(*result.column, slot_to_shard);
+
+    CREATE_FOR_TYPE(UInt8)
+    CREATE_FOR_TYPE(UInt16)
+    CREATE_FOR_TYPE(UInt32)
+    CREATE_FOR_TYPE(UInt64)
+    CREATE_FOR_TYPE(Int8)
+    CREATE_FOR_TYPE(Int16)
+    CREATE_FOR_TYPE(Int32)
+    CREATE_FOR_TYPE(Int64)
+
+#undef CREATE_FOR_TYPE
+
+    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
+}
+
 }
 
 
@ -267,6 +294,14 @@ BlockInputStreams StorageDistributed::read(
         : ClusterProxy::SelectStreamFactory(
             header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
 
+    if (settings.distributed_optimize_skip_select_on_unused_shards)
+    {
+        auto smaller_cluster = skipUnusedShards(cluster, query_info);
+
+        if (smaller_cluster)
+            cluster = smaller_cluster;
+    }
+
     return ClusterProxy::executeQuery(
         select_stream_factory, cluster, modified_query_ast, context, settings);
 }
@ -425,6 +460,41 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
     directory_monitor->shutdownAndDropAllData();
 }
 
+/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
+/// using constraints from "WHERE" condition, otherwise returns `nullptr`
+ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
+{
+    const auto & select = typeid_cast<ASTSelectQuery &>(*query_info.query);
+
+    if (!select.where_expression)
+    {
+        return nullptr;
+    }
+
+    const auto & blocks = evaluateExpressionOverConstantCondition(select.where_expression, sharding_key_expr);
+
+    // Can't get definite answer if we can skip any shards
+    if (!blocks)
+    {
+        return nullptr;
+    }
+
+    std::set<int> shards;
+
+    for (const auto & block : *blocks)
+    {
+        if (!block.has(sharding_key_column_name))
+            throw Exception("sharding_key_expr should evaluate as a single row", ErrorCodes::TOO_MANY_ROWS);
+
+        const auto result = block.getByName(sharding_key_column_name);
+        const auto selector = createSelector(cluster, result);
+
+        shards.insert(selector.begin(), selector.end());
+    }
+
+    return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
+}
+
+
 void registerStorageDistributed(StorageFactory & factory)
 {

@ -166,6 +166,8 @@ protected:
         const ASTPtr & sharding_key_,
         const String & data_path_,
         bool attach);
 
+    ClusterPtr skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info);
+
 };
 
 }
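skipUnusedShards above evaluates the sharding key expression over every folded row and unions the shard numbers those values select, then asks for a subcluster with only those shards. A standalone C++ sketch of the value-to-shard step under a simplifying assumption: a plain modulo lookup into a slot_to_shard table, whereas the real createBlockSelector also dispatches on the column type; the names and data here are made up.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

// Simplified slot mapping: value -> slot -> shard number.
// Assumption: the slot_to_shard table already encodes shard weights (one entry per slot).
size_t pickShard(std::uint64_t sharding_key_value, const std::vector<size_t> & slot_to_shard)
{
    return slot_to_shard[sharding_key_value % slot_to_shard.size()];
}

int main()
{
    // Two shards with weights 2 and 1, hence three slots.
    const std::vector<size_t> slot_to_shard{0, 0, 1};

    // Hypothetical sharding key values produced by the constant-folded WHERE clause.
    const std::vector<std::uint64_t> folded_values{0, 1, 5};

    std::set<size_t> used_shards;
    for (auto value : folded_values)
        used_shards.insert(pickShard(value, slot_to_shard));

    // Only the shards collected here need to be queried (0 and 1 in this example).
    for (auto shard : used_shards)
        std::cout << "shard " << shard << '\n';
}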
@ -0,0 +1,16 @@
+OK
+OK
+1
+OK
+0
+4
+2
+1
+1
+1
+4
+OK
+OK
+OK
+OK
+OK

@ -0,0 +1,105 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. $CURDIR/../shell_config.sh
+
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.mergetree;"
+${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.distributed;"
+
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.mergetree (a Int64, b Int64, c Int64) ENGINE = MergeTree ORDER BY (a, b);"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE test.distributed AS test.mergetree ENGINE = Distributed(test_unavailable_shard, test, mergetree, jumpConsistentHash(a+b, 2));"
+
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (0, 0, 0);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (1, 0, 0);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (0, 1, 1);"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO test.mergetree VALUES (1, 1, 1);"
+
+# Should fail because second shard is unavailable
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM test.distributed;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should fail without setting `distributed_optimize_skip_select_on_unused_shards`
+${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0;" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Should pass now
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0;
+"
+
+# Should still fail because of matching unavailable shard
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 2 AND b = 2;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+# Try more complex expressions for constant folding - all should pass.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 1 AND a = 0 AND b = 0;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a IN (0, 1) AND b IN (0, 1);
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR a = 1 AND b = 1;
+"
+
+# TODO: should pass one day.
+#${CLICKHOUSE_CLIENT} -n --query="
+#    SET distributed_optimize_skip_select_on_unused_shards = 1;
+#    SELECT count(*) FROM test.distributed WHERE a = 0 AND b >= 0 AND b <= 1;
+#"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND c = 0;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND c != 10;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 AND (a+b)*b != 12;
+"
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE (a = 0 OR a = 1) AND (b = 0 OR b = 1);
+"
+
+# These ones should fail.
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b <= 1;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND c = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 OR a = 1 AND b = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR a = 2 AND b = 2;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
+
+${CLICKHOUSE_CLIENT} -n --query="
+    SET distributed_optimize_skip_select_on_unused_shards = 1;
+    SELECT count(*) FROM test.distributed WHERE a = 0 AND b = 0 OR c = 0;
+" 2>&1 \
+| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
@ -53,6 +53,20 @@
     <timezone>Europe/Moscow</timezone>
     <remote_servers incl="clickhouse_remote_servers" >
         <!-- Test only shard config for testing distributed storage -->
+        <test_unavailable_shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>59000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>1</port>
+                </replica>
+            </shard>
+        </test_unavailable_shard>
         <test_shard_localhost>
             <shard>
                 <replica>