Add serializetion to some query plan steps.

This commit is contained in:
Nikolai Kochetov 2024-12-03 15:00:05 +00:00
parent e8dab58d2d
commit 6cc668bd88
65 changed files with 2030 additions and 212 deletions

View File

@ -15,6 +15,7 @@
#include <Common/logger_useful.h>
#include <base/phdr_cache.h>
#include <Common/ErrorHandlers.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <base/getMemoryAmount.h>
#include <base/getAvailableMemoryAmount.h>
#include <base/errnoToString.h>
@ -924,6 +925,8 @@ try
registerRemoteFileMetadatas();
registerSchedulerNodes();
QueryPlanStepRegistry::registerPlanSteps();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

View File

@ -393,7 +393,7 @@ QueryTreeNodePtr IdentifierResolver::wrapExpressionNodeInTupleElement(QueryTreeN
/// Resolve identifier functions implementation
/// Try resolve table identifier from database catalog
QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
std::shared_ptr<TableNode> IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
{
size_t parts_size = table_identifier.getPartsSize();
if (parts_size < 1 || parts_size > 2)

View File

@ -21,6 +21,7 @@ class QueryExpressionsAliasVisitor ;
class QueryNode;
class JoinNode;
class ColumnNode;
class TableNode;
using ProjectionName = String;
using ProjectionNames = std::vector<ProjectionName>;
@ -86,7 +87,7 @@ public:
/// Resolve identifier functions
static QueryTreeNodePtr tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
static std::shared_ptr<TableNode> tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
QueryTreeNodePtr tryResolveIdentifierFromCompoundExpression(const Identifier & expression_identifier,
size_t identifier_bind_size,

View File

@ -3470,11 +3470,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
auto set = std::make_shared<Set>(size_limits_for_set, 0, settings[Setting::transform_null_in]);
set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
set->insertFromBlock(result_block.getColumnsWithTypeAndName());
set->finishInsert();
auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
auto hash = function_arguments[1]->getTreeHash();
auto future_set = std::make_shared<FutureSetFromTuple>(hash, std::move(result_block), settings[Setting::transform_null_in], size_limits_for_set);
/// Create constant set column for constant folding

View File

@ -62,7 +62,7 @@ size_t getCompoundTypeDepth(const IDataType & type)
}
template <typename Collection>
Block createBlockFromCollection(const Collection & collection, const DataTypes& value_types, const DataTypes & block_types, bool transform_null_in)
ColumnsWithTypeAndName createBlockFromCollection(const Collection & collection, const DataTypes& value_types, const DataTypes & block_types, bool transform_null_in)
{
assert(collection.size() == value_types.size());
size_t columns_size = block_types.size();
@ -132,16 +132,19 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes&
columns[i]->insert(tuple_values[i]);
}
Block res;
ColumnsWithTypeAndName res(columns_size);
for (size_t i = 0; i < columns_size; ++i)
res.insert(ColumnWithTypeAndName{std::move(columns[i]), block_types[i], "argument_" + toString(i)});
{
res[i].type = block_types[i];
res[i].column = std::move(columns[i]);
}
return res;
}
}
Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
{
DataTypes set_element_types = {expression_type};
const auto * lhs_tuple_type = typeid_cast<const DataTypeTuple *>(expression_type.get());
@ -158,7 +161,7 @@ Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const
size_t lhs_type_depth = getCompoundTypeDepth(*expression_type);
size_t rhs_type_depth = getCompoundTypeDepth(*value_type);
Block result_block;
ColumnsWithTypeAndName result_block;
if (lhs_type_depth == rhs_type_depth)
{

View File

@ -19,6 +19,6 @@ using SetPtr = std::shared_ptr<Set>;
* Example: SELECT id FROM test_table WHERE id IN (1, 2, 3, 4);
* Example: SELECT id FROM test_table WHERE id IN ((1, 2), (3, 4));
*/
Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
}

View File

@ -198,6 +198,9 @@ public:
/// Create copy of this column, but with recursively_convert_result_to_full_column_if_low_cardinality = true
ColumnPtr recursivelyConvertResultToFullColumnIfLowCardinality() const;
const FunctionBasePtr & getFunction() const { return function; }
const ColumnsWithTypeAndName & getCapturedColumns() const { return captured_columns; }
private:
size_t elements_size;
FunctionBasePtr function;

View File

@ -145,6 +145,9 @@ public:
void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const;
void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT);
void writeChangedBinary(WriteBuffer & out) const;
void readBinary(ReadBuffer & in);
// A debugging aid.
std::string toString() const;
@ -479,6 +482,46 @@ void BaseSettings<TTraits>::write(WriteBuffer & out, SettingsWriteFormat format)
BaseSettingsHelpers::writeString(std::string_view{}, out);
}
template <typename TTraits>
void BaseSettings<TTraits>::writeChangedBinary(WriteBuffer & out) const
{
const auto & accessor = Traits::Accessor::instance();
size_t num_settings = 0;
for (auto it = this->begin(); it != this->end(); ++it)
++num_settings;
writeVarUInt(num_settings, out);
for (const auto & field : *this)
{
BaseSettingsHelpers::writeString(field.getName(), out);
using Flags = BaseSettingsHelpers::Flags;
Flags flags{0};
BaseSettingsHelpers::writeFlags(flags, out);
accessor.writeBinary(*this, field.index, out);
}
}
template <typename TTraits>
void BaseSettings<TTraits>::readBinary(ReadBuffer & in)
{
const auto & accessor = Traits::Accessor::instance();
size_t num_settings = 0;
readVarUInt(num_settings, in);
for (size_t i = 0; i < num_settings; ++i)
{
String read_name = BaseSettingsHelpers::readString(in);
std::string_view name = TTraits::resolveName(read_name);
size_t index = accessor.find(name);
std::ignore = BaseSettingsHelpers::readFlags(in);
accessor.readBinary(*this, index, in);
}
}
template <typename TTraits>
void BaseSettings<TTraits>::read(ReadBuffer & in, SettingsWriteFormat format)
{

View File

@ -1004,6 +1004,10 @@ void readQuoted(DecimalField<T> & x, ReadBuffer & buf);
void writeFieldText(const Field & x, WriteBuffer & buf);
void writeFieldBinary(const Field & x, WriteBuffer & buf);
Field readFieldBinary(ReadBuffer & buf);
String toString(const Field & x);
}

View File

@ -15,6 +15,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
{
bool first = true;
@ -209,4 +214,58 @@ JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description)
return json_array;
}
void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out)
{
writeVarUInt(sort_description.size(), out);
for (const auto & desc : sort_description)
{
writeStringBinary(desc.column_name, out);
UInt8 flags = 0;
if (desc.direction > 0)
flags |= 1;
if (desc.nulls_direction > 0)
flags |= 2;
if (desc.collator)
flags |= 4;
if (desc.with_fill)
flags |= 8;
writeIntBinary(flags, out);
if (desc.collator)
writeStringBinary(desc.collator->getLocale(), out);
if (desc.with_fill)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in serialized sort description");
}
}
void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in)
{
size_t size = 0;
readVarUInt(size, in);
sort_description.resize(size);
for (auto & desc : sort_description)
{
readStringBinary(desc.column_name, in);
UInt8 flags = 0;
readIntBinary(flags, in);
desc.direction = (flags & 1) ? 1 : -1;
desc.nulls_direction = (flags & 2) ? 1 : -1;
if (flags & 4)
{
String collator_locale;
readStringBinary(collator_locale, in);
if (!collator_locale.empty())
desc.collator = std::make_shared<Collator>(collator_locale);
}
if (flags & 8)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in deserialized sort description");
}
}
}

View File

@ -143,4 +143,11 @@ void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
std::string dumpSortDescription(const SortDescription & description);
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description);
class WriteBuffer;
class ReadBuffer;
void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out);
void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in);
}

View File

@ -1,13 +1,13 @@
#pragma once
#include <Functions/IFunctionAdaptors.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypeFunction.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Columns/ColumnFunction.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFunction.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionAdaptors.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
@ -18,6 +18,18 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct LambdaCapture
{
Names captured_names;
DataTypes captured_types;
NamesAndTypesList lambda_arguments;
String return_name;
DataTypePtr return_type;
bool allow_constant_folding;
};
using LambdaCapturePtr = std::shared_ptr<LambdaCapture>;
class ExecutableFunctionExpression : public IExecutableFunction
{
public:
@ -30,14 +42,20 @@ public:
using SignaturePtr = std::shared_ptr<Signature>;
ExecutableFunctionExpression(ExpressionActionsPtr expression_actions_, SignaturePtr signature_)
: expression_actions(std::move(expression_actions_))
, signature(std::move(signature_))
{}
: expression_actions(std::move(expression_actions_)), signature(std::move(signature_))
{
}
String getName() const override { return "FunctionExpression"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
if (input_rows_count == 0)
return result_type->createColumn();
if (!expression_actions)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No actions were passed to FunctionExpression");
DB::Block expr_columns;
for (size_t i = 0; i < arguments.size(); ++i)
{
@ -48,7 +66,7 @@ public:
expression_actions->execute(expr_columns);
return expr_columns.getByName(signature->return_name).column;
return expr_columns.getByName(signature->return_name).column;
}
bool useDefaultImplementationForNulls() const override { return false; }
@ -71,13 +89,27 @@ public:
using Signature = ExecutableFunctionExpression::Signature;
using SignaturePtr = ExecutableFunctionExpression::SignaturePtr;
FunctionExpression(ExpressionActionsPtr expression_actions_,
DataTypes argument_types_, const Names & argument_names_,
DataTypePtr return_type_, const std::string & return_name_)
: expression_actions(std::move(expression_actions_))
, signature(std::make_shared<Signature>(Signature{argument_names_, return_name_}))
, argument_types(std::move(argument_types_)), return_type(std::move(return_type_))
FunctionExpression(LambdaCapturePtr capture_, ExpressionActionsPtr expression_actions_)
: expression_actions(std::move(expression_actions_))
, capture(std::move(capture_))
{
Names names;
DataTypes types;
names.reserve(capture->captured_names.size() + capture->lambda_arguments.size());
names.insert(names.end(), capture->captured_names.begin(), capture->captured_names.end());
types.reserve(capture->captured_types.size() + capture->lambda_arguments.size());
types.insert(types.end(), capture->captured_types.begin(), capture->captured_types.end());
for (const auto & lambda_argument : capture->lambda_arguments)
{
names.push_back(lambda_argument.name);
types.push_back(lambda_argument.type);
}
argument_types = std::move(types);
signature = std::make_shared<Signature>(Signature{names, capture->return_name});
}
String getName() const override { return "FunctionExpression"; }
@ -85,7 +117,10 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getResultType() const override { return return_type; }
const DataTypePtr & getResultType() const override { return capture->return_type; }
const LambdaCapture & getCapture() const { return *capture; }
const ActionsDAG & getAcionsDAG() const { return expression_actions->getActionsDAG(); }
ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
{
@ -94,9 +129,11 @@ public:
private:
ExpressionActionsPtr expression_actions;
LambdaCapturePtr capture;
/// This is redundant and is built from capture.
SignaturePtr signature;
DataTypes argument_types;
DataTypePtr return_type;
};
/// Captures columns which are used by lambda function but not in argument list.
@ -106,20 +143,10 @@ private:
class ExecutableFunctionCapture : public IExecutableFunction
{
public:
struct Capture
ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, LambdaCapturePtr capture_)
: expression_actions(std::move(expression_actions_)), capture(std::move(capture_))
{
Names captured_names;
DataTypes captured_types;
NamesAndTypesList lambda_arguments;
String return_name;
DataTypePtr return_type;
bool allow_constant_folding;
};
using CapturePtr = std::shared_ptr<Capture>;
ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, CapturePtr capture_)
: expression_actions(std::move(expression_actions_)), capture(std::move(capture_)) {}
}
String getName() const override { return "FunctionCapture"; }
@ -148,8 +175,7 @@ public:
types.push_back(lambda_argument.type);
}
auto function = std::make_unique<FunctionExpression>(expression_actions, types, names,
capture->return_type, capture->return_name);
auto function = std::make_unique<FunctionExpression>(capture, expression_actions);
/// If all the captured arguments are constant, let's also return ColumnConst (with ColumnFunction inside it).
/// Consequently, it allows to treat higher order functions with constant arrays and constant captured columns
@ -175,17 +201,15 @@ public:
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
};
class FunctionCapture : public IFunctionBase
{
public:
using CapturePtr = ExecutableFunctionCapture::CapturePtr;
FunctionCapture(
ExpressionActionsPtr expression_actions_,
CapturePtr capture_,
LambdaCapturePtr capture_,
DataTypePtr return_type_,
String name_)
: expression_actions(std::move(expression_actions_))
@ -207,9 +231,12 @@ public:
return std::make_unique<ExecutableFunctionCapture>(expression_actions, capture);
}
const LambdaCapture & getCapture() const { return *capture; }
const ActionsDAG & getAcionsDAG() const { return expression_actions->getActionsDAG(); }
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
DataTypePtr return_type;
String name;
};
@ -217,28 +244,23 @@ private:
class FunctionCaptureOverloadResolver : public IFunctionOverloadResolver
{
public:
using Capture = ExecutableFunctionCapture::Capture;
using CapturePtr = ExecutableFunctionCapture::CapturePtr;
FunctionCaptureOverloadResolver(
ExpressionActionsPtr expression_actions_,
const Names & captured_names,
const NamesAndTypesList & lambda_arguments,
const DataTypePtr & function_return_type,
const String & expression_return_name,
bool allow_constant_folding)
: expression_actions(std::move(expression_actions_))
ActionsDAG actions_dag,
const ExpressionActionsSettings & actions_settings,
const Names & captured_names,
const NamesAndTypesList & lambda_arguments,
const DataTypePtr & function_return_type,
const String & expression_return_name,
bool allow_constant_folding)
{
/// Check that expression does not contain unusual actions that will break columns structure.
for (const auto & action : expression_actions->getActions())
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression with arrayJoin or other unusual action cannot be captured");
if (actions_dag.hasArrayJoin())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression with arrayJoin or other unusual action cannot be captured");
std::unordered_map<std::string, DataTypePtr> arguments_map;
const auto & all_arguments = expression_actions->getRequiredColumnsWithTypes();
for (const auto & arg : all_arguments)
arguments_map[arg.name] = arg.type;
for (const auto * input : actions_dag.getInputs())
arguments_map[input->result_name] = input->result_type;
DataTypes captured_types;
captured_types.reserve(captured_names.size());
@ -263,14 +285,16 @@ public:
name = "Capture[" + toString(captured_types) + "](" + toString(argument_types) + ") -> "
+ function_return_type->getName();
capture = std::make_shared<Capture>(Capture{
.captured_names = captured_names,
.captured_types = std::move(captured_types),
.lambda_arguments = lambda_arguments,
.return_name = expression_return_name,
.return_type = function_return_type,
.allow_constant_folding = allow_constant_folding,
capture = std::make_shared<LambdaCapture>(LambdaCapture{
.captured_names = captured_names,
.captured_types = std::move(captured_types),
.lambda_arguments = lambda_arguments,
.return_name = expression_return_name,
.return_type = function_return_type,
.allow_constant_folding = allow_constant_folding,
});
expression_actions = std::make_shared<ExpressionActions>(std::move(actions_dag), actions_settings);
}
String getName() const override { return name; }
@ -288,7 +312,7 @@ public:
private:
ExpressionActionsPtr expression_actions;
CapturePtr capture;
LambdaCapturePtr capture;
DataTypePtr return_type;
String name;

View File

@ -3,15 +3,21 @@
#include <Analyzer/FunctionNode.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesBinaryEncoding.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/materialize.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/CastOverloadResolver.h>
#include <Functions/indexHint.h>
#include <Interpreters/Context.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/SetSerialization.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Core/SortDescription.h>
@ -20,6 +26,7 @@
#include <stack>
#include <base/sort.h>
#include <Common/JSONBuilder.h>
#include <DataTypes/DataTypeSet.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
@ -38,6 +45,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
}
namespace
@ -3205,4 +3213,430 @@ const ActionsDAG::Node * FindOriginalNodeForOutputName::find(const String & outp
return it->second;
}
static void serializeCapture(const LambdaCapture & capture, WriteBuffer & out)
{
writeStringBinary(capture.return_name, out);
encodeDataType(capture.return_type, out);
writeVarUInt(capture.captured_names.size(), out);
for (const auto & name : capture.captured_names)
writeStringBinary(name, out);
writeVarUInt(capture.captured_types.size(), out);
for (const auto & type : capture.captured_types)
encodeDataType(type, out);
writeVarUInt(capture.lambda_arguments.size(), out);
for (const auto & item : capture.lambda_arguments)
{
writeStringBinary(item.name, out);
encodeDataType(item.type, out);
}
}
static void deserializeCapture(LambdaCapture & capture, ReadBuffer & in)
{
readStringBinary(capture.return_name, in);
capture.return_type = decodeDataType(in);
UInt64 num_names;
readVarUInt(num_names, in);
capture.captured_names.resize(num_names);
for (auto & name : capture.captured_names)
readStringBinary(name, in);
UInt64 num_types;
readVarUInt(num_types, in);
capture.captured_types.resize(num_types);
for (auto & type : capture.captured_types)
type = decodeDataType(in);
UInt64 num_args;
readVarUInt(num_args, in);
capture.lambda_arguments.clear();
for (size_t i = 0; i < num_args; ++i)
{
NameAndTypePair name_and_type;
readStringBinary(name_and_type.name, in);
name_and_type.type = decodeDataType(in);
capture.lambda_arguments.push_back(std::move(name_and_type));
}
}
static void serialzieConstant(
const IDataType & type,
const IColumn & value,
WriteBuffer & out,
SerializedSetsRegistry & registry)
{
if (WhichDataType(type).isSet())
{
const IColumn * maybe_set = &value;
if (const auto * column_const = typeid_cast<const ColumnConst *>(maybe_set))
maybe_set = &column_const->getDataColumn();
const auto * column_set = typeid_cast<const ColumnSet *>(maybe_set);
if (!column_set)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ColumnSet is expected for DataTypeSet. Got {}", value.getName());
auto hash = column_set->getData()->getHash();
writeBinary(hash, out);
registry.sets.emplace(hash, column_set->getData());
return;
}
if (WhichDataType(type).isFunction())
{
const IColumn * maybe_function = &value;
if (const auto * column_const = typeid_cast<const ColumnConst *>(maybe_function))
maybe_function = &column_const->getDataColumn();
const auto * column_function = typeid_cast<const ColumnFunction *>(maybe_function);
if (!column_function)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ColumnSet is expected for DataTypeSet. Got {}", value.getName());
auto function = column_function->getFunction();
const auto * function_expression = typeid_cast<const FunctionExpression *>(function.get());
if (!function_expression)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected FunctionExpression for ColumnFunction. Got {}", function->getName());
serializeCapture(function_expression->getCapture(), out);
function_expression->getAcionsDAG().serialize(out, registry);
const auto & captured_columns = column_function->getCapturedColumns();
writeVarUInt(captured_columns.size(), out);
for (const auto & captured_column : captured_columns)
{
encodeDataType(captured_column.type, out);
serialzieConstant(*captured_column.type, *captured_column.column, out, registry);
}
return;
}
const auto * const_column = typeid_cast<const ColumnConst *>(&value);
if (!const_column)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot serialize non-constant column {}", value.getName());
const auto & data = const_column->getDataColumn();
type.getDefaultSerialization()->serializeBinary(data, 0, out, FormatSettings{});
}
static MutableColumnPtr deserializeConstant(
const IDataType & type,
ReadBuffer & in,
DeserializedSetsRegistry & registry,
const ContextPtr & context)
{
if (WhichDataType(type).isSet())
{
FutureSet::Hash hash;
readBinary(hash, in);
auto column_set = ColumnSet::create(0, nullptr);
registry.sets[hash].push_back(column_set.get());
return column_set;
}
if (WhichDataType(type).isFunction())
{
LambdaCapture capture;
deserializeCapture(capture, in);
auto capture_dag = ActionsDAG::deserialize(in, registry, context);
UInt64 num_captured_columns;
readVarUInt(num_captured_columns, in);
ColumnsWithTypeAndName captured_columns(num_captured_columns);
for (auto & captured_column : captured_columns)
{
captured_column.type = decodeDataType(in);
captured_column.column = deserializeConstant(*captured_column.type, in, registry, context);
}
auto function_expression = std::make_shared<FunctionExpression>(
std::make_shared<LambdaCapture>(std::move(capture)),
std::make_shared<ExpressionActions>(
std::move(capture_dag),
ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
return ColumnFunction::create(1, std::move(function_expression), std::move(captured_columns));
}
auto column = type.createColumn();
type.getDefaultSerialization()->deserializeBinary(*column, in, FormatSettings{});
return ColumnConst::create(std::move(column), 0);
}
void ActionsDAG::serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const
{
size_t nodes_size = nodes.size();
writeVarUInt(nodes_size, out);
std::unordered_map<const Node *, size_t> node_to_id;
for (const auto & node : nodes)
node_to_id.emplace(&node, node_to_id.size());
if (nodes_size != node_to_id.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate nodes in ActionsDAG");
for (const auto & node : nodes)
{
writeIntBinary(static_cast<UInt8>(node.type), out);
writeStringBinary(node.result_name, out);
encodeDataType(node.result_type, out);
writeVarUInt(node.children.size(), out);
for (const auto * child : node.children)
writeVarUInt(node_to_id.at(child), out);
/// Serialize column if it is present
const bool has_column = node.column != nullptr;
UInt8 column_flags = 0;
if (has_column)
{
column_flags |= 1;
if (node.is_deterministic_constant)
column_flags |= 2;
}
const auto * function_capture = typeid_cast<const FunctionCapture *>(node.function_base.get());
if (function_capture)
column_flags |= 4;
writeIntBinary(column_flags, out);
if (has_column)
serialzieConstant(*node.result_type, *node.column, out, registry);
if (node.type == ActionType::INPUT)
{
/// nothing to serialize
}
else if (node.type == ActionType::COLUMN)
{
/// nothing to serialize, column is already serialized
}
else if (node.type == ActionType::ALIAS)
{
/// nothing to serialize
}
else if (node.type == ActionType::FUNCTION)
{
writeStringBinary(node.function_base->getName(), out);
if (function_capture)
{
serializeCapture(function_capture->getCapture(), out);
function_capture->getAcionsDAG().serialize(out, registry);
}
}
else if (node.type == ActionType::ARRAY_JOIN)
{
/// nothing to serialize
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(node.type));
}
}
writeVarUInt(inputs.size(), out);
for (const auto * input : inputs)
writeVarUInt(node_to_id.at(input), out);
writeVarUInt(outputs.size(), out);
for (const auto * output : outputs)
writeVarUInt(node_to_id.at(output), out);
}
ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context)
{
size_t nodes_size;
readVarUInt(nodes_size, in);
std::list<Node> nodes;
std::unordered_map<size_t, Node *> id_to_node;
for (size_t i = 0; i < nodes_size; ++i)
id_to_node.emplace(i, &nodes.emplace_back(Node{}));
for (size_t i = 0; i < nodes_size; ++i)
{
Node & node = *id_to_node.at(i);
UInt8 action_type{0};
readIntBinary(action_type, in);
if (action_type > static_cast<UInt8>(ActionType::FUNCTION))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type {}", size_t(action_type));
node.type = static_cast<ActionType>(action_type);
readStringBinary(node.result_name, in);
node.result_type = decodeDataType(in);
size_t children_size;
readVarUInt(children_size, in);
for (size_t j = 0; j < children_size; ++j)
{
size_t child_id;
readVarUInt(child_id, in);
node.children.push_back(id_to_node.at(child_id));
}
UInt8 column_flags = 0;
readIntBinary(column_flags, in);
/// Deserialize column if it is present
if (column_flags & 1)
{
if ((column_flags & 2) == 0)
node.is_deterministic_constant = false;
node.column = deserializeConstant(*node.result_type, in, registry, context);
}
if (node.type == ActionType::INPUT)
{
/// nothing to deserialize
if (!node.children.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized input can't have children");
}
else if (node.type == ActionType::COLUMN)
{
/// Column is already deserialized
if (!node.children.empty())
throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized column can't have children");
}
else if (node.type == ActionType::ALIAS)
{
/// nothing to deserialize
if (node.children.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized alias must have one children");
}
else if (node.type == ActionType::FUNCTION)
{
String function_name;
readStringBinary(function_name, in);
ColumnsWithTypeAndName arguments;
arguments.reserve(node.children.size());
for (const auto * child : node.children)
{
ColumnWithTypeAndName argument;
argument.column = child->column;
argument.type = child->result_type;
argument.name = child->result_name;
arguments.emplace_back(std::move(argument));
}
if (column_flags & 4)
{
LambdaCapture capture;
deserializeCapture(capture, in);
auto capture_dag = ActionsDAG::deserialize(in, registry, context);
node.function_base = std::make_shared<FunctionCapture>(
std::make_shared<ExpressionActions>(
std::move(capture_dag),
ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)),
std::make_shared<LambdaCapture>(std::move(capture)),
node.result_type,
function_name);
node.function = node.function_base->prepare(arguments);
node.is_function_compiled = false;
}
else
{
auto function = FunctionFactory::instance().get(function_name, context);
node.function_base = function->build(arguments);
node.function = node.function_base->prepare(arguments);
node.is_function_compiled = false;
auto lhs_type = node.result_type;
if (const auto * lhs_tuple = typeid_cast<const DataTypeTuple *>(lhs_type.get()))
lhs_type = std::make_shared<DataTypeTuple>(lhs_tuple->getElements());
auto rhs_type = node.function_base->getResultType();
if (const auto * rhs_tuple = typeid_cast<const DataTypeTuple *>(rhs_type.get()))
rhs_type = std::make_shared<DataTypeTuple>(rhs_tuple->getElements());
if (!lhs_type->equals(*lhs_type))
throw Exception(ErrorCodes::INCORRECT_DATA,
"Deserialized function {} has invalid type. Expected {}, deserialized {}.",
function_name,
rhs_type->getName(),
lhs_type->getName());
}
}
else if (node.type == ActionType::ARRAY_JOIN)
{
/// nothing to deserialize
if (node.children.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized array join must have one children");
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(node.type));
}
}
size_t inputs_size;
readVarUInt(inputs_size, in);
std::vector<const Node *> inputs;
std::unordered_set<const Node *> inputs_set;
inputs.reserve(inputs_size);
for (size_t i = 0; i < inputs_size; ++i)
{
size_t input_id;
readVarUInt(input_id, in);
const auto * input = id_to_node.at(input_id);
if (input->type != ActionType::INPUT)
throw Exception(ErrorCodes::INCORRECT_DATA,
"Deserialized input {} has type {}",
input->result_name, magic_enum::enum_name(input->type));
if (!inputs_set.emplace(input).second)
throw Exception(ErrorCodes::INCORRECT_DATA,
"Duplicate input {}", input->result_name);
inputs.push_back(input);
}
size_t outputs_size;
readVarUInt(outputs_size, in);
std::vector<const Node *> outputs;
outputs.reserve(outputs_size);
for (size_t i = 0; i < outputs_size; ++i)
{
size_t output_id;
readVarUInt(output_id, in);
outputs.push_back(id_to_node.at(output_id));
}
for (const auto & node : nodes)
if (node.type == ActionType::INPUT && !inputs_set.contains(&node))
throw Exception(ErrorCodes::INCORRECT_DATA,
"Deserialized input {} is not in the inputs list",
node.result_name);
ActionsDAG dag;
dag.nodes = std::move(nodes);
dag.inputs = std::move(inputs);
dag.outputs = std::move(outputs);
return dag;
}
}

View File

@ -35,6 +35,9 @@ namespace JSONBuilder
class SortDescription;
struct SerializedSetsRegistry;
struct DeserializedSetsRegistry;
/// Directed acyclic graph of expressions.
/// This is an intermediate representation of actions which is usually built from expression list AST.
/// Node of DAG describe calculation of a single column with known type, name, and constant value (if applicable).
@ -130,6 +133,9 @@ public:
std::string dumpNames() const;
std::string dumpDAG() const;
void serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const;
static ActionsDAG deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context);
const Node & addInput(std::string name, DataTypePtr type);
const Node & addInput(ColumnWithTypeAndName column);
const Node & addColumn(ColumnWithTypeAndName column);

View File

@ -111,7 +111,7 @@ static size_t getTypeDepth(const DataTypePtr & type)
/// 33.33 in the set is converted to 33.3, but it is not equal to 33.3 in the column, so the result should still be empty.
/// We can not include values that don't represent any possible value from the type of filtered column to the set.
template<typename Collection>
static Block createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & types, bool transform_null_in)
static ColumnsWithTypeAndName createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & types, bool transform_null_in)
{
size_t columns_num = types.size();
MutableColumns columns(columns_num);
@ -169,9 +169,13 @@ static Block createBlockFromCollection(const Collection & collection, const Data
}
}
Block res;
ColumnsWithTypeAndName res(columns_num);
for (size_t i = 0; i < columns_num; ++i)
res.insert(ColumnWithTypeAndName{std::move(columns[i]), types[i], "_" + toString(i)});
{
res[i].type = types[i];
res[i].column = std::move(columns[i]);
}
return res;
}
@ -189,16 +193,14 @@ static Field extractValueFromNode(const ASTPtr & node, const IDataType & type, C
throw Exception(ErrorCodes::INCORRECT_ELEMENT_OF_SET, "Incorrect element of set. Must be literal or constant expression.");
}
static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, ContextPtr context)
static ColumnsWithTypeAndName createBlockFromAST(const ASTPtr & node, const DataTypes & types, ContextPtr context)
{
/// Will form a block with values from the set.
Block header;
size_t num_columns = types.size();
MutableColumns columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
header.insert(ColumnWithTypeAndName(types[i]->createColumn(), types[i], "_" + toString(i)));
MutableColumns columns = header.cloneEmptyColumns();
columns[i] = types[i]->createColumn();
DataTypePtr tuple_type;
Row tuple_values;
@ -290,7 +292,14 @@ static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, Co
throw Exception(ErrorCodes::INCORRECT_ELEMENT_OF_SET, "Incorrect element of set");
}
return header.cloneWithColumns(std::move(columns));
ColumnsWithTypeAndName res(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
res[i].type = types[i];
res[i].column = std::move(columns[i]);
}
return res;
}
@ -304,7 +313,7 @@ namespace
* We need special implementation for ASTFunction, because in case, when we interpret
* large tuple or array as function, `evaluateConstantExpression` works extremely slow.
*/
Block createBlockForSet(
ColumnsWithTypeAndName createBlockForSet(
const DataTypePtr & left_arg_type,
const ASTPtr & right_arg,
const DataTypes & set_element_types,
@ -321,7 +330,7 @@ Block createBlockForSet(
type->getName());
};
Block block;
ColumnsWithTypeAndName block;
bool tranform_null_in = context->getSettingsRef()[Setting::transform_null_in];
/// 1 in 1; (1, 2) in (1, 2); identity(tuple(tuple(tuple(1)))) in tuple(tuple(tuple(1))); etc.
@ -360,7 +369,7 @@ Block createBlockForSet(
* 'set_element_types' - types of what are on the left hand side of IN.
* 'right_arg' - Literal - Tuple or Array.
*/
Block createBlockForSet(
ColumnsWithTypeAndName createBlockForSet(
const DataTypePtr & left_arg_type,
const std::shared_ptr<ASTFunction> & right_arg,
const DataTypes & set_element_types,
@ -442,14 +451,14 @@ FutureSetPtr makeExplicitSet(
if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(element_type.get()))
element_type = low_cardinality_type->getDictionaryType();
Block block;
ColumnsWithTypeAndName block;
const auto & right_arg_func = std::dynamic_pointer_cast<ASTFunction>(right_arg);
if (right_arg_func && (right_arg_func->name == "tuple" || right_arg_func->name == "array"))
block = createBlockForSet(left_arg_type, right_arg_func, set_element_types, context);
else
block = createBlockForSet(left_arg_type, right_arg, set_element_types, context);
return prepared_sets.addFromTuple(set_key, block, context->getSettingsRef());
return prepared_sets.addFromTuple(set_key, std::move(block), context->getSettingsRef());
}
class ScopeStack::Index
@ -1291,14 +1300,10 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
String result_name = lambda->arguments->children.at(1)->getColumnName();
lambda_dag.removeUnusedActions(Names(1, result_name));
auto lambda_actions = std::make_shared<ExpressionActions>(
std::move(lambda_dag),
ExpressionActionsSettings::fromContext(data.getContext(), CompileExpressions::yes));
DataTypePtr result_type = lambda_actions->getSampleBlock().getByName(result_name).type;
DataTypePtr result_type = lambda_dag.findInOutputs(result_name).result_type;
Names captured;
Names required = lambda_actions->getRequiredColumns();
Names required = lambda_dag.getRequiredColumnsNames();
for (const auto & required_arg : required)
if (findColumn(required_arg, lambda_arguments) == lambda_arguments.end())
captured.push_back(required_arg);
@ -1307,8 +1312,9 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
/// because it does not uniquely define the expression (the types of arguments can be different).
String lambda_name = data.getUniqueName("__lambda");
auto actions_settings = ExpressionActionsSettings::fromContext(data.getContext(), CompileExpressions::yes);
auto function_capture = std::make_shared<FunctionCaptureOverloadResolver>(
lambda_actions, captured, lambda_arguments, result_type, result_name, false);
std::move(lambda_dag), actions_settings, captured, lambda_arguments, result_type, result_name, false);
data.addFunction(function_capture, captured, lambda_name);
argument_types[i] = std::make_shared<DataTypeFunction>(lambda_type->getArgumentTypes(), result_type);

View File

@ -1,13 +1,21 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <IO/Operators.h>
#include <Interpreters/AggregateDescription.h>
#include <Common/FieldVisitorToString.h>
#include <Common/JSONBuilder.h>
#include <DataTypes/DataTypesBinaryEncoding.h>
#include <Parsers/NullsAction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void AggregateDescription::explain(WriteBuffer & out, size_t indent) const
{
String prefix(indent, ' ');
@ -121,4 +129,77 @@ void AggregateDescription::explain(JSONBuilder::JSONMap & map) const
map.add("Arguments", std::move(args_array));
}
void serializeAggregateDescriptions(const AggregateDescriptions & aggregates, WriteBuffer & out)
{
writeVarUInt(aggregates.size(), out);
for (const auto & aggregate : aggregates)
{
writeStringBinary(aggregate.column_name, out);
UInt64 num_args = aggregate.argument_names.size();
const auto & argument_types = aggregate.function->getArgumentTypes();
if (argument_types.size() != num_args)
{
WriteBufferFromOwnString buf;
aggregate.explain(buf, 0);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid number of for aggregate function. Expected {}, got {}. Description:\n{}",
argument_types.size(), num_args, buf.str());
}
writeVarUInt(num_args, out);
for (size_t i = 0; i < num_args; ++i)
{
writeStringBinary(aggregate.argument_names[i], out);
encodeDataType(argument_types[i], out);
}
writeStringBinary(aggregate.function->getName(), out);
writeVarUInt(aggregate.parameters.size(), out);
for (const auto & param : aggregate.parameters)
writeFieldBinary(param, out);
}
}
void deserializeAggregateDescriptions(AggregateDescriptions & aggregates, ReadBuffer & in)
{
UInt64 num_aggregates;
readVarUInt(num_aggregates, in);
aggregates.resize(num_aggregates);
for (auto & aggregate : aggregates)
{
readStringBinary(aggregate.column_name, in);
UInt64 num_args;
readVarUInt(num_args, in);
aggregate.argument_names.resize(num_args);
DataTypes argument_types;
argument_types.reserve(num_args);
for (auto & arg_name : aggregate.argument_names)
{
readStringBinary(arg_name, in);
argument_types.emplace_back(decodeDataType(in));
}
String function_name;
readStringBinary(function_name, in);
UInt64 num_params;
readVarUInt(num_params, in);
aggregate.parameters.resize(num_params);
for (auto & param : aggregate.parameters)
param = readFieldBinary(in);
auto action = NullsAction::EMPTY; /// As I understand, it should be resolved to function name.
AggregateFunctionProperties properties;
aggregate.function = AggregateFunctionFactory::instance().get(
function_name, action, argument_types, aggregate.parameters, properties);
}
}
}

View File

@ -25,4 +25,8 @@ struct AggregateDescription
};
using AggregateDescriptions = std::vector<AggregateDescription>;
void serializeAggregateDescriptions(const AggregateDescriptions & aggregates, WriteBuffer & out);
void deserializeAggregateDescriptions(AggregateDescriptions & aggregates, ReadBuffer & in);
}

View File

@ -76,22 +76,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace Setting
{
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsDouble max_bytes_ratio_before_external_group_by;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
};
}
namespace
@ -195,45 +179,59 @@ Block Aggregator::getHeader(bool final) const
}
Aggregator::Params::Params(
const Settings & settings,
const Names & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_rows_to_group_by_,
OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
TemporaryDataOnDiskScopePtr tmp_data_scope_,
size_t max_threads_,
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
float min_hit_rate_to_use_consecutive_keys_optimization_,
const StatsCollectingParams & stats_collecting_params_)
: keys(keys_)
, keys_size(keys.size())
, aggregates(aggregates_)
, aggregates_size(aggregates.size())
, overflow_row(overflow_row_)
, max_rows_to_group_by(settings[Setting::max_rows_to_group_by])
, group_by_overflow_mode(settings[Setting::group_by_overflow_mode])
, max_rows_to_group_by(max_rows_to_group_by_)
, group_by_overflow_mode(group_by_overflow_mode_)
, group_by_two_level_threshold(group_by_two_level_threshold_)
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(settings[Setting::max_bytes_before_external_group_by])
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
, tmp_data_scope(std::move(tmp_data_scope_))
, max_threads(settings[Setting::max_threads])
, min_free_disk_space(settings[Setting::min_free_disk_space_for_temporary_data])
, compile_aggregate_expressions(settings[Setting::compile_aggregate_expressions])
, min_count_to_compile_aggregate_expression(settings[Setting::min_count_to_compile_aggregate_expression])
, max_block_size(settings[Setting::max_block_size])
, max_threads(max_threads_)
, min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_)
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, enable_prefetch(settings[Setting::enable_software_prefetch_in_aggregation])
, optimize_group_by_constant_keys(settings[Setting::optimize_group_by_constant_keys])
, min_hit_rate_to_use_consecutive_keys_optimization(settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization])
, enable_prefetch(enable_prefetch_)
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
, stats_collecting_params(stats_collecting_params_)
{
if (settings[Setting::max_bytes_ratio_before_external_group_by] != 0.)
}
size_t Aggregator::Params::getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by)
{
if (max_bytes_ratio_before_external_group_by != 0.)
{
if (settings[Setting::max_bytes_before_external_group_by] > 0)
if (max_bytes_before_external_group_by > 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings max_bytes_ratio_before_external_group_by and max_bytes_before_external_group_by cannot be set simultaneously");
double ratio = settings[Setting::max_bytes_ratio_before_external_group_by];
double ratio = max_bytes_ratio_before_external_group_by;
if (ratio < 0 || ratio >= 1.)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting max_bytes_ratio_before_external_group_by should be >= 0 and < 1 ({})", ratio);
@ -251,6 +249,8 @@ Aggregator::Params::Params(
LOG_WARNING(getLogger("Aggregator"), "No system memory limits configured. Ignoring max_bytes_ratio_before_external_group_by");
}
}
return max_bytes_before_external_group_by;
}
Aggregator::Params::Params(
@ -259,7 +259,7 @@ Aggregator::Params::Params(
bool overflow_row_,
size_t max_threads_,
size_t max_block_size_,
double min_hit_rate_to_use_consecutive_keys_optimization_)
float min_hit_rate_to_use_consecutive_keys_optimization_)
: keys(keys_)
, keys_size(keys.size())
, aggregates(aggregates_)

View File

@ -131,19 +131,31 @@ public:
bool only_merge = false;
bool enable_prefetch = false;
bool optimize_group_by_constant_keys = false;
const double min_hit_rate_to_use_consecutive_keys_optimization = 0.;
const float min_hit_rate_to_use_consecutive_keys_optimization = 0.;
StatsCollectingParams stats_collecting_params;
static size_t getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by);
Params(
const Settings & settings,
const Names & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_rows_to_group_by_,
OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
TemporaryDataOnDiskScopePtr tmp_data_scope_,
size_t max_threads_,
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
float min_hit_rate_to_use_consecutive_keys_optimization_,
const StatsCollectingParams & stats_collecting_params_);
/// Only parameters that matter during merge.
@ -153,7 +165,7 @@ public:
bool overflow_row_,
size_t max_threads_,
size_t max_block_size_,
double min_hit_rate_to_use_consecutive_keys_optimization_);
float min_hit_rate_to_use_consecutive_keys_optimization_);
Params cloneWithKeys(const Names & keys_, bool only_merge_ = false)
{

View File

@ -131,7 +131,6 @@ namespace Setting
extern const SettingsBool extremes;
extern const SettingsBool final;
extern const SettingsBool force_aggregation_in_order;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 group_by_two_level_threshold;
extern const SettingsUInt64 group_by_two_level_threshold_bytes;
extern const SettingsBool group_by_use_nulls;
@ -149,14 +148,12 @@ namespace Setting
extern const SettingsUInt64 max_result_rows;
extern const SettingsUInt64 max_rows_in_distinct;
extern const SettingsUInt64 max_rows_in_set_to_optimize_join;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsUInt64 max_rows_to_read;
extern const SettingsUInt64 max_size_to_preallocate_for_aggregation;
extern const SettingsFloat max_streams_to_max_threads_ratio;
extern const SettingsUInt64 max_subquery_depth;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_count_to_compile_sort_description;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsBool multiple_joins_try_to_keep_original_names;
extern const SettingsBool optimize_aggregation_in_order;
extern const SettingsBool optimize_move_to_prewhere;
@ -178,6 +175,16 @@ namespace Setting
extern const SettingsTotalsMode totals_mode;
extern const SettingsBool use_concurrency_control;
extern const SettingsBool use_with_fill_by_sorting_prefix;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsDouble max_bytes_ratio_before_external_group_by;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
}
namespace ServerSetting
@ -2751,16 +2758,29 @@ static Aggregator::Params getAggregatorParams(
return Aggregator::Params
{
settings,
keys,
aggregates,
overflow_row,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty() && query_analyzer.hasConstAggregationKeys()),
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty()
&& query_analyzer.hasConstAggregationKeys()),
context.getTempDataOnDisk(),
/* only_merge_= */ false,
stats_collecting_params};
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params
};
}
void InterpreterSelectQuery::executeAggregation(

View File

@ -38,7 +38,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static SizeLimits getSizeLimitsForSet(const Settings & settings)
SizeLimits PreparedSets::getSizeLimitsForSet(const Settings & settings)
{
return SizeLimits(settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]);
}
@ -59,8 +59,9 @@ static bool equals(const DataTypes & lhs, const DataTypes & rhs)
}
FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}
FutureSetFromStorage::FutureSetFromStorage(Hash hash_, SetPtr set_) : hash(hash_), set(std::move(set_)) {}
SetPtr FutureSetFromStorage::get() const { return set; }
FutureSet::Hash FutureSetFromStorage::getHash() const { return hash; }
DataTypes FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
@ -69,24 +70,41 @@ SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
}
FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
FutureSetFromTuple::FutureSetFromTuple(
Hash hash_, ColumnsWithTypeAndName block,
bool transform_null_in, SizeLimits size_limits)
: hash(hash_)
{
auto size_limits = getSizeLimitsForSet(settings);
set = std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
set->setHeader(block.cloneEmpty().getColumnsWithTypeAndName());
ColumnsWithTypeAndName header = block;
for (auto & elem : header)
elem.column = elem.column->cloneEmpty();
set = std::make_shared<Set>(size_limits, 0, transform_null_in);
set->setHeader(header);
Columns columns;
columns.reserve(block.columns());
columns.reserve(block.size());
size_t num_rows = 0;
for (const auto & column : block)
{
columns.emplace_back(column.column);
num_rows = column.column->size();
}
set_key_columns.filter = ColumnUInt8::create(block.rows());
set_key_columns.filter = ColumnUInt8::create(num_rows);
set->insertFromColumns(columns, set_key_columns);
set->finishInsert();
for (const auto & type : set->getElementsTypes())
{
auto name = type->getName();
hash = CityHash_v1_0_2::CityHash128WithSeed(name.data(), name.size(), hash);
}
}
DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
FutureSet::Hash FutureSetFromTuple::getHash() const { return hash; }
SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
{
@ -107,34 +125,33 @@ SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
FutureSetFromSubquery::FutureSetFromSubquery(
String key,
Hash hash_,
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings)
: external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
bool transform_null_in,
SizeLimits size_limits,
size_t max_size_for_index)
: hash(hash_), external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
{
set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key);
set_and_key->key = PreparedSets::toString(hash_, {});
auto size_limits = getSizeLimitsForSet(settings);
set_and_key->set
= std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
set_and_key->set = std::make_shared<Set>(size_limits, max_size_for_index, transform_null_in);
set_and_key->set->setHeader(source->getCurrentHeader().getColumnsWithTypeAndName());
}
FutureSetFromSubquery::FutureSetFromSubquery(
String key,
Hash hash_,
QueryTreeNodePtr query_tree_,
const Settings & settings)
: query_tree(std::move(query_tree_))
bool transform_null_in,
SizeLimits size_limits,
size_t max_size_for_index)
: hash(hash_), query_tree(std::move(query_tree_))
{
set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key);
auto size_limits = getSizeLimitsForSet(settings);
set_and_key->set
= std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
set_and_key->key = PreparedSets::toString(hash_, {});
set_and_key->set = std::make_shared<Set>(size_limits, max_size_for_index, transform_null_in);
}
FutureSetFromSubquery::~FutureSetFromSubquery() = default;
@ -158,6 +175,8 @@ DataTypes FutureSetFromSubquery::getTypes() const
return set_and_key->set->getElementsTypes();
}
FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; }
std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const ContextPtr & context)
{
if (set_and_key->set->isCreated())
@ -266,9 +285,12 @@ String PreparedSets::toString(const PreparedSets::Hash & key, const DataTypes &
return buf.str();
}
FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, Block block, const Settings & settings)
FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, ColumnsWithTypeAndName block, const Settings & settings)
{
auto from_tuple = std::make_shared<FutureSetFromTuple>(std::move(block), settings);
auto size_limits = getSizeLimitsForSet(settings);
auto from_tuple = std::make_shared<FutureSetFromTuple>(
key, std::move(block),
settings[Setting::transform_null_in], size_limits);
const auto & set_types = from_tuple->getTypes();
auto & sets_by_hash = sets_from_tuple[key];
@ -282,7 +304,7 @@ FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, Block block,
FutureSetFromStoragePtr PreparedSets::addFromStorage(const Hash & key, SetPtr set_)
{
auto from_storage = std::make_shared<FutureSetFromStorage>(std::move(set_));
auto from_storage = std::make_shared<FutureSetFromStorage>(key, std::move(set_));
auto [it, inserted] = sets_from_storage.emplace(key, from_storage);
if (!inserted)
@ -298,8 +320,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings)
{
auto size_limits = getSizeLimitsForSet(settings);
auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), settings);
key, std::move(source), std::move(external_table), std::move(external_table_set),
settings[Setting::transform_null_in], size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values]);
auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);
@ -314,10 +338,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
QueryTreeNodePtr query_tree,
const Settings & settings)
{
auto size_limits = getSizeLimitsForSet(settings);
auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}),
std::move(query_tree),
settings);
key, std::move(query_tree),
settings[Setting::transform_null_in], size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values]);
auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);

View File

@ -9,6 +9,8 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/SetKeys.h>
#include <QueryPipeline/SizeLimits.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace DB
{
@ -50,6 +52,9 @@ public:
virtual DataTypes getTypes() const = 0;
/// If possible, return set with stored elements useful for PK analysis.
virtual SetPtr buildOrderedSetInplace(const ContextPtr & context) = 0;
using Hash = CityHash_v1_0_2::uint128;
virtual Hash getHash() const = 0;
};
using FutureSetPtr = std::shared_ptr<FutureSet>;
@ -59,13 +64,15 @@ using FutureSetPtr = std::shared_ptr<FutureSet>;
class FutureSetFromStorage final : public FutureSet
{
public:
explicit FutureSetFromStorage(SetPtr set_);
explicit FutureSetFromStorage(Hash hash_, SetPtr set_);
SetPtr get() const override;
DataTypes getTypes() const override;
SetPtr buildOrderedSetInplace(const ContextPtr &) override;
Hash getHash() const override;
private:
Hash hash;
SetPtr set;
};
@ -76,14 +83,16 @@ using FutureSetFromStoragePtr = std::shared_ptr<FutureSetFromStorage>;
class FutureSetFromTuple final : public FutureSet
{
public:
FutureSetFromTuple(Block block, const Settings & settings);
FutureSetFromTuple(Hash hash_, ColumnsWithTypeAndName block, bool transform_null_in, SizeLimits size_limits);
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
DataTypes getTypes() const override;
Hash getHash() const override;
private:
Hash hash;
SetPtr set;
SetKeyColumns set_key_columns;
};
@ -106,21 +115,26 @@ class FutureSetFromSubquery final : public FutureSet
{
public:
FutureSetFromSubquery(
String key,
Hash hash_,
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings);
bool transform_null_in,
SizeLimits size_limits,
size_t max_size_for_index);
FutureSetFromSubquery(
String key,
Hash hash_,
QueryTreeNodePtr query_tree_,
const Settings & settings);
bool transform_null_in,
SizeLimits size_limits,
size_t max_size_for_index);
~FutureSetFromSubquery() override;
SetPtr get() const override;
DataTypes getTypes() const override;
Hash getHash() const override;
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
std::unique_ptr<QueryPlan> build(const ContextPtr & context);
@ -130,6 +144,7 @@ public:
void setQueryPlan(std::unique_ptr<QueryPlan> source_);
private:
Hash hash;
SetAndKeyPtr set_and_key;
StoragePtr external_table;
std::shared_ptr<FutureSetFromSubquery> external_table_set;
@ -156,7 +171,7 @@ public:
using SetsFromSubqueries = std::unordered_map<Hash, FutureSetFromSubqueryPtr, Hashing>;
FutureSetFromStoragePtr addFromStorage(const Hash & key, SetPtr set_);
FutureSetFromTuplePtr addFromTuple(const Hash & key, Block block, const Settings & settings);
FutureSetFromTuplePtr addFromTuple(const Hash & key, ColumnsWithTypeAndName block, const Settings & settings);
FutureSetFromSubqueryPtr addFromSubquery(
const Hash & key,
@ -183,6 +198,7 @@ public:
// const SetsFromSubqueries & getSetsFromSubquery() const { return sets_from_subqueries; }
static String toString(const Hash & key, const DataTypes & types);
static SizeLimits getSizeLimitsForSet(const Settings & settings);
private:
SetsFromTuple sets_from_tuple;

View File

@ -0,0 +1,32 @@
#pragma once
#include <Interpreters/PreparedSets.h>
namespace DB
{
class FutureSet;
using FutureSetPtr = std::shared_ptr<FutureSet>;
struct SerializedSetsRegistry
{
struct Hashing
{
UInt64 operator()(const FutureSet::Hash & key) const { return key.low64 ^ key.high64; }
};
std::unordered_map<FutureSet::Hash, FutureSetPtr, Hashing> sets;
};
class ColumnSet;
struct DeserializedSetsRegistry
{
struct Hashing
{
UInt64 operator()(const FutureSet::Hash & key) const { return key.low64 ^ key.high64; }
};
std::unordered_map<FutureSet::Hash, std::list<ColumnSet *>, Hashing> sets;
};
}

View File

@ -106,7 +106,6 @@ namespace Setting
extern const SettingsBool exact_rows_before_limit;
extern const SettingsBool extremes;
extern const SettingsBool force_aggregation_in_order;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 group_by_two_level_threshold;
extern const SettingsUInt64 group_by_two_level_threshold_bytes;
extern const SettingsBool group_by_use_nulls;
@ -115,9 +114,7 @@ namespace Setting
extern const SettingsUInt64 max_size_to_preallocate_for_aggregation;
extern const SettingsUInt64 max_subquery_depth;
extern const SettingsUInt64 max_rows_in_distinct;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsMaxThreads max_threads;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsBool parallel_replicas_allow_in_with_subquery;
extern const SettingsString parallel_replicas_custom_key;
extern const SettingsUInt64 parallel_replicas_min_number_of_rows_per_replica;
@ -126,6 +123,16 @@ namespace Setting
extern const SettingsFloat totals_auto_threshold;
extern const SettingsTotalsMode totals_mode;
extern const SettingsBool use_with_fill_by_sorting_prefix;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsDouble max_bytes_ratio_before_external_group_by;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
}
namespace ServerSetting
@ -429,15 +436,27 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
}
Aggregator::Params aggregator_params = Aggregator::Params(
settings,
aggregation_analysis_result.aggregation_keys,
aggregate_descriptions,
query_analysis_result.aggregate_overflow_row,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
settings[Setting::group_by_two_level_threshold],
settings[Setting::group_by_two_level_threshold_bytes],
settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty() && aggregation_analysis_result.group_by_with_constant_keys),
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty()
&& aggregation_analysis_result.group_by_with_constant_keys),
query_context->getTempDataOnDisk(),
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params);
return aggregator_params;

View File

@ -778,12 +778,9 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
lambda_actions_dag.getOutputs().push_back(actions_stack.back().getNodeOrThrow(lambda_expression_node_name));
lambda_actions_dag.removeUnusedActions(Names(1, lambda_expression_node_name));
auto expression_actions_settings = ExpressionActionsSettings::fromContext(planner_context->getQueryContext(), CompileExpressions::yes);
auto lambda_actions = std::make_shared<ExpressionActions>(std::move(lambda_actions_dag), expression_actions_settings);
Names captured_column_names;
ActionsDAG::NodeRawConstPtrs lambda_children;
Names required_column_names = lambda_actions->getRequiredColumns();
Names required_column_names = lambda_actions_dag.getRequiredColumnsNames();
actions_stack.pop_back();
levels.reset(actions_stack.size());
@ -802,9 +799,10 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
}
}
auto expression_actions_settings = ExpressionActionsSettings::fromContext(planner_context->getQueryContext(), CompileExpressions::yes);
auto lambda_node_name = calculateActionNodeName(node, *planner_context);
auto function_capture = std::make_shared<FunctionCaptureOverloadResolver>(
lambda_actions, captured_column_names, lambda_arguments_names_and_types, lambda_node.getExpression()->getResultType(), lambda_expression_node_name, true);
std::move(lambda_actions_dag), expression_actions_settings, captured_column_names, lambda_arguments_names_and_types, lambda_node.getExpression()->getResultType(), lambda_expression_node_name, true);
// TODO: Pass IFunctionBase here not FunctionCaptureOverloadResolver.
const auto * actions_node = actions_stack[level].addFunctionIfNecessary(lambda_node_name, std::move(lambda_children), function_capture);

View File

@ -13,7 +13,9 @@
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
@ -30,6 +32,8 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_DATA;
}
static bool memoryBoundMergingWillBeUsed(
@ -147,6 +151,25 @@ const SortDescription & AggregatingStep::getSortDescription() const
return IQueryPlanStep::getSortDescription();
}
static void updateThreadsValues(
size_t & new_merge_threads,
size_t & new_temporary_data_merge_threads,
Aggregator::Params & params,
const BuildQueryPipelineSettings & settings)
{
/// Update values from settings if plan was deserialized.
if (new_merge_threads == 0)
new_merge_threads = settings.max_threads;
if (new_temporary_data_merge_threads == 0)
new_temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads;
if (new_temporary_data_merge_threads == 0)
new_temporary_data_merge_threads = new_merge_threads;
if (params.max_threads == 0)
params.max_threads = settings.max_threads;
}
ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
@ -204,6 +227,10 @@ ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
size_t new_merge_threads = merge_threads;
size_t new_temporary_data_merge_threads = temporary_data_merge_threads;
updateThreadsValues(new_merge_threads, new_temporary_data_merge_threads, params, settings);
QueryPipelineProcessorsCollector collector(pipeline, this);
/// Forget about current totals and extremes. They will be calculated again after aggregation if needed.
@ -279,8 +306,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params_for_set,
many_data,
j,
merge_threads,
temporary_data_merge_threads,
new_merge_threads,
new_temporary_data_merge_threads,
should_produce_results_in_order_of_bucket_number,
skip_merging);
// For each input stream we have `grouping_sets_size` copies, so port index
@ -373,7 +400,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
return std::make_shared<AggregatingInOrderTransform>(
header, transform_params,
sort_description_for_merging, group_by_sort_description,
max_block_size, aggregation_in_order_max_block_bytes / merge_threads,
max_block_size, aggregation_in_order_max_block_bytes / new_merge_threads,
many_data, counter++);
});
@ -399,7 +426,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
pipeline.addTransform(std::move(transform));
/// Do merge of aggregated data in parallel.
pipeline.resize(merge_threads);
pipeline.resize(new_merge_threads);
const auto & required_sort_description = memoryBoundMergingWillBeUsed() ? group_by_sort_description : SortDescription{};
pipeline.addSimpleTransform(
@ -455,8 +482,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params,
many_data,
counter++,
merge_threads,
temporary_data_merge_threads,
new_merge_threads,
new_temporary_data_merge_threads,
should_produce_results_in_order_of_bucket_number,
skip_merging);
});
@ -484,6 +511,7 @@ void AggregatingStep::describeActions(FormatSettings & settings) const
settings.out << prefix << "Order: " << dumpSortDescription(sort_description_for_merging) << '\n';
}
settings.out << prefix << "Skip merging: " << skip_merging << '\n';
// settings.out << prefix << "Memory bound merging: " << memory_bound_merging_of_aggregation_results_enabled << '\n';
}
void AggregatingStep::describeActions(JSONBuilder::JSONMap & map) const
@ -588,8 +616,12 @@ void AggregatingProjectionStep::updateOutputHeader()
QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline(
QueryPipelineBuilders pipelines,
const BuildQueryPipelineSettings &)
const BuildQueryPipelineSettings & settings)
{
size_t new_merge_threads = merge_threads;
size_t new_temporary_data_merge_threads = temporary_data_merge_threads;
updateThreadsValues(new_merge_threads, new_temporary_data_merge_threads, params, settings);
auto & normal_parts_pipeline = pipelines.front();
auto & projection_parts_pipeline = pipelines.back();
@ -624,7 +656,7 @@ QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline(
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
header, transform_params, many_data, counter++, new_merge_threads, new_temporary_data_merge_threads);
});
};
@ -641,4 +673,173 @@ QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline(
return pipeline;
}
void AggregatingStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
settings.max_block_size = max_block_size;
settings.aggregation_in_order_max_block_bytes = aggregation_in_order_max_block_bytes;
settings.aggregation_in_order_memory_bound_merging = should_produce_results_in_order_of_bucket_number;
settings.aggregation_sort_result_by_bucket_number = memory_bound_merging_of_aggregation_results_enabled;
settings.max_rows_to_group_by = params.max_rows_to_group_by;
settings.group_by_overflow_mode = params.group_by_overflow_mode;
settings.group_by_two_level_threshold = params.group_by_two_level_threshold;
settings.group_by_two_level_threshold_bytes = params.group_by_two_level_threshold_bytes;
settings.max_bytes_before_external_group_by = params.max_bytes_before_external_group_by;
settings.empty_result_for_aggregation_by_empty_set = params.empty_result_for_aggregation_by_empty_set;
settings.min_free_disk_space_for_temporary_data = params.min_free_disk_space;
settings.compile_aggregate_expressions = params.compile_aggregate_expressions;
settings.min_count_to_compile_aggregate_expression = params.min_count_to_compile_aggregate_expression;
settings.enable_software_prefetch_in_aggregation = params.enable_prefetch;
settings.optimize_group_by_constant_keys = params.optimize_group_by_constant_keys;
settings.min_hit_rate_to_use_consecutive_keys_optimization = params.min_hit_rate_to_use_consecutive_keys_optimization;
settings.collect_hash_table_stats_during_aggregation = params.stats_collecting_params.isCollectionAndUseEnabled();
settings.max_entries_for_hash_table_stats = params.stats_collecting_params.max_entries_for_hash_table_stats;
settings.max_size_to_preallocate_for_aggregation = params.stats_collecting_params.max_size_to_preallocate;
}
void AggregatingStep::serialize(Serialization & ctx) const
{
if (!sort_description_for_merging.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep optimized for in-order is not supported.");
if (!grouping_sets_params.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep with grouping sets is not supported.");
if (explicit_sorting_required_for_aggregation_in_order)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep explicit_sorting_required_for_aggregation_in_order is not supported.");
/// If you wonder why something is serialized using settings, and other is serialized using flags, considerations are following:
/// * flags are something that may change data format returning from the step
/// * settings are something which already was in Settings.h and, usually, is passed to Aggregator unchanged
/// Flags `final` and `group_by_use_nulls` change types, and `overflow_row` appends additional block to results.
/// Settings like `max_rows_to_group_by` or `empty_result_for_aggregation_by_empty_set` affect the result,
/// but does not change data format.
/// Overall, the rule is not strict.
UInt8 flags = 0;
if (final)
flags |= 1;
if (params.overflow_row)
flags |= 2;
if (group_by_use_nulls)
flags |= 4;
if (!grouping_sets_params.empty())
flags |= 8;
/// Ideally, key should be calculated from QueryPlan on the follower.
/// So, let's have a flag to disable sending/reading pre-calculated value.
if (params.stats_collecting_params.isCollectionAndUseEnabled())
flags |= 16;
writeIntBinary(flags, ctx.out);
if (explicit_sorting_required_for_aggregation_in_order)
serializeSortDescription(group_by_sort_description, ctx.out);
writeVarUInt(params.keys.size(), ctx.out);
for (const auto & key : params.keys)
writeStringBinary(key, ctx.out);
serializeAggregateDescriptions(params.aggregates, ctx.out);
if (params.stats_collecting_params.isCollectionAndUseEnabled())
writeIntBinary(params.stats_collecting_params.key, ctx.out);
}
std::unique_ptr<IQueryPlanStep> AggregatingStep::deserialize(Deserialization & ctx)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "AggregatingStep must have one input stream");
UInt8 flags;
readIntBinary(flags, ctx.in);
bool final = bool(flags & 1);
bool overflow_row = bool(flags & 2);
bool group_by_use_nulls = bool(flags & 4);
bool has_grouping_sets = bool(flags & 8);
bool has_stats_key = bool(flags & 16);
if (has_grouping_sets)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep with grouping sets is not supported.");
UInt64 num_keys;
readVarUInt(num_keys, ctx.in);
Names keys(num_keys);
for (auto & key : keys)
readStringBinary(key, ctx.in);
AggregateDescriptions aggregates;
deserializeAggregateDescriptions(aggregates, ctx.in);
UInt64 stats_key = 0;
if (has_stats_key)
readIntBinary(stats_key, ctx.in);
StatsCollectingParams stats_collecting_params(
stats_key,
ctx.settings.collect_hash_table_stats_during_aggregation,
ctx.settings.max_entries_for_hash_table_stats,
ctx.settings.max_size_to_preallocate_for_aggregation);
Aggregator::Params params
{
keys,
aggregates,
overflow_row,
ctx.settings.max_rows_to_group_by,
ctx.settings.group_by_overflow_mode,
ctx.settings.group_by_two_level_threshold,
ctx.settings.group_by_two_level_threshold_bytes,
ctx.settings.max_bytes_before_external_group_by,
ctx.settings.empty_result_for_aggregation_by_empty_set,
Context::getGlobalContextInstance()->getTempDataOnDisk(),
0, //settings.max_threads,
ctx.settings.min_free_disk_space_for_temporary_data,
ctx.settings.compile_aggregate_expressions,
ctx.settings.min_count_to_compile_aggregate_expression,
ctx.settings.max_block_size,
ctx.settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
ctx.settings.optimize_group_by_constant_keys,
ctx.settings.min_hit_rate_to_use_consecutive_keys_optimization,
stats_collecting_params
};
SortDescription sort_description_for_merging;
GroupingSetsParamsList grouping_sets_params;
auto aggregating_step = std::make_unique<AggregatingStep>(
ctx.input_headers.front(),
std::move(params),
std::move(grouping_sets_params),
final,
ctx.settings.max_block_size,
ctx.settings.aggregation_in_order_max_block_bytes,
0, //merge_threads,
0, //temporary_data_merge_threads,
false, // storage_has_evenly_distributed_read, TODO: later
group_by_use_nulls,
std::move(sort_description_for_merging),
SortDescription{},
ctx.settings.aggregation_in_order_memory_bound_merging,
ctx.settings.aggregation_sort_result_by_bucket_number,
false);
return aggregating_step;
}
void registerAggregatingStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Aggregating", AggregatingStep::deserialize);
}
}

View File

@ -74,6 +74,13 @@ public:
UInt64 group,
bool group_by_use_nulls);
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
void enableMemoryBoundMerging() { memory_bound_merging_of_aggregation_results_enabled = true; }
private:
void updateOutputHeader() override;
@ -98,7 +105,7 @@ private:
/// These settings are used to determine if we should resize pipeline to 1 at the end.
const bool should_produce_results_in_order_of_bucket_number;
const bool memory_bound_merging_of_aggregation_results_enabled;
bool memory_bound_merging_of_aggregation_results_enabled;
bool explicit_sorting_required_for_aggregation_in_order;
Processors aggregating_in_order;
@ -120,7 +127,7 @@ public:
);
String getName() const override { return "AggregatingProjection"; }
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
private:
void updateOutputHeader() override;

View File

@ -1,4 +1,7 @@
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/ArrayJoinTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -79,4 +82,50 @@ void ArrayJoinStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Columns", std::move(columns_array));
}
void ArrayJoinStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
settings.max_block_size = max_block_size;
}
void ArrayJoinStep::serialize(Serialization & ctx) const
{
UInt8 flags = 0;
if (array_join.is_left)
flags |= 1;
if (is_unaligned)
flags |= 2;
writeIntBinary(flags, ctx.out);
writeVarUInt(array_join.columns.size(), ctx.out);
for (const auto & column : array_join.columns)
writeStringBinary(column, ctx.out);
}
std::unique_ptr<IQueryPlanStep> ArrayJoinStep::deserialize(Deserialization & ctx)
{
UInt8 flags;
readIntBinary(flags, ctx.in);
bool is_left = bool(flags & 1);
bool is_unaligned = bool(flags & 2);
UInt64 num_columns;
readVarUInt(num_columns, ctx.in);
ArrayJoin array_join;
array_join.is_left = is_left;
array_join.columns.resize(num_columns);
for (auto & column : array_join.columns)
readStringBinary(column, ctx.in);
return std::make_unique<ArrayJoinStep>(ctx.input_headers.front(), std::move(array_join), is_unaligned, ctx.settings.max_block_size);
}
void registerArrayJoinStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("ArrayJoin", ArrayJoinStep::deserialize);
}
}

View File

@ -22,6 +22,11 @@ public:
const Names & getColumns() const { return array_join.columns; }
bool isLeft() const { return array_join.is_left; }
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override;

View File

@ -9,6 +9,8 @@ namespace DB
namespace Setting
{
extern const SettingsBool query_plan_merge_filters;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 aggregation_memory_efficient_merge_threads;
}
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
@ -19,6 +21,9 @@ BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr fr
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();
settings.max_threads = from->getSettingsRef()[Setting::max_threads];
settings.aggregation_memory_efficient_merge_threads = from->getSettingsRef()[Setting::aggregation_memory_efficient_merge_threads];
/// Setting query_plan_merge_filters is enabled by default.
/// But it can brake short-circuit without splitting filter step into smaller steps.
/// So, enable and disable this optimizations together.

View File

@ -24,6 +24,9 @@ struct BuildQueryPipelineSettings
ProgressCallback progress_callback = nullptr;
TemporaryFileLookupPtr temporary_file_lookup;
size_t max_threads;
size_t aggregation_memory_efficient_merge_threads;
const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; }
static BuildQueryPipelineSettings fromContext(ContextPtr from);
};

View File

@ -66,6 +66,8 @@ public:
ContextPtr getContext() const { return context; }
PreparedSets::Subqueries detachSets() { return std::move(subqueries); }
void serialize(Serialization &) const override {}
private:
void updateOutputHeader() override { output_header = getInputHeaders().front(); }

View File

@ -1,4 +1,7 @@
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/DistinctSortedStreamTransform.h>
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
@ -13,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
}
static ITransformingStep::Traits getTraits(bool pre_distinct)
@ -48,6 +52,16 @@ DistinctStep::DistinctStep(
{
}
void DistinctStep::updateLimitHint(UInt64 hint)
{
if (hint && limit_hint)
/// Both limits are set - take the min
limit_hint = std::min(hint, limit_hint);
else
/// Some limit is not set - take the other one
limit_hint = std::max(hint, limit_hint);
}
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!pre_distinct)
@ -158,4 +172,58 @@ void DistinctStep::updateOutputHeader()
output_header = input_headers.front();
}
void DistinctStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
settings.max_rows_in_distinct = set_size_limits.max_rows;
settings.max_bytes_in_distinct = set_size_limits.max_bytes;
settings.distinct_overflow_mode = set_size_limits.overflow_mode;
}
void DistinctStep::serialize(Serialization & ctx) const
{
/// Let's not serialize limit_hint.
/// Ideally, we can get if from a query plan optimization on the follower.
writeVarUInt(columns.size(), ctx.out);
for (const auto & column : columns)
writeStringBinary(column, ctx.out);
}
std::unique_ptr<IQueryPlanStep> DistinctStep::deserialize(Deserialization & ctx, bool pre_distinct_)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "DistinctStep must have one input stream");
size_t columns_size;
readVarUInt(columns_size, ctx.in);
Names column_names(columns_size);
for (size_t i = 0; i < columns_size; ++i)
readStringBinary(column_names[i], ctx.in);
SizeLimits size_limits;
size_limits.max_rows = ctx.settings.max_rows_in_distinct;
size_limits.max_bytes = ctx.settings.max_bytes_in_distinct;
size_limits.overflow_mode = ctx.settings.distinct_overflow_mode;
return std::make_unique<DistinctStep>(
ctx.input_headers.front(), size_limits, 0, column_names, pre_distinct_);
}
std::unique_ptr<IQueryPlanStep> DistinctStep::deserializeNormal(Deserialization & ctx)
{
return DistinctStep::deserialize(ctx, false);
}
std::unique_ptr<IQueryPlanStep> DistinctStep::deserializePre(Deserialization & ctx)
{
return DistinctStep::deserialize(ctx, true);
}
void registerDistinctStep(QueryPlanStepRegistry & registry)
{
/// Preliminary distinct probably can be a query plan optimization.
/// It's easier to serialize it using different names, so that pre-distinct can be potentially removed later.
registry.registerStep("Distinct", DistinctStep::deserializeNormal);
registry.registerStep("PreDistinct", DistinctStep::deserializePre);
}
}

View File

@ -20,6 +20,8 @@ public:
String getName() const override { return "Distinct"; }
const Names & getColumnNames() const { return columns; }
String getSerializationName() const override { return pre_distinct ? "PreDistinct" : "Distinct"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
@ -28,6 +30,14 @@ public:
bool isPreliminary() const { return pre_distinct; }
UInt64 getLimitHint() const { return limit_hint; }
void updateLimitHint(UInt64 hint);
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx, bool pre_distinct_);
static std::unique_ptr<IQueryPlanStep> deserializeNormal(Deserialization & ctx);
static std::unique_ptr<IQueryPlanStep> deserializePre(Deserialization & ctx);
const SizeLimits & getSetSizeLimits() const { return set_size_limits; }

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
@ -10,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
static ITransformingStep::Traits getTraits(const ActionsDAG & actions)
{
return ITransformingStep::Traits
@ -109,4 +116,23 @@ void ExpressionStep::updateOutputHeader()
output_header = ExpressionTransform::transformHeader(input_headers.front(), actions_dag);
}
void ExpressionStep::serialize(Serialization & ctx) const
{
actions_dag.serialize(ctx.out, ctx.registry);
}
std::unique_ptr<IQueryPlanStep> ExpressionStep::deserialize(Deserialization & ctx)
{
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "ExpressionStep must have one input stream");
return std::make_unique<ExpressionStep>(ctx.input_headers.front(), std::move(actions_dag));
}
void registerExpressionStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Expression", ExpressionStep::deserialize);
}
}

View File

@ -25,6 +25,9 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override;

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/ExtremesStep.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
@ -29,4 +31,19 @@ void ExtremesStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
pipeline.addExtremesTransform();
}
void ExtremesStep::serialize(Serialization & ctx) const
{
(void)ctx;
}
std::unique_ptr<IQueryPlanStep> ExtremesStep::deserialize(Deserialization & ctx)
{
return std::make_unique<ExtremesStep>(ctx.input_headers.front());
}
void registerExtremesStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Extremes", ExtremesStep::deserialize);
}
}

View File

@ -13,6 +13,9 @@ public:
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override
{

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/FilterTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/ExpressionTransform.h>
@ -16,6 +18,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
@ -240,4 +248,39 @@ void FilterStep::updateOutputHeader()
return;
}
void FilterStep::serialize(Serialization & ctx) const
{
UInt8 flags = 0;
if (remove_filter_column)
flags |= 1;
writeIntBinary(flags, ctx.out);
writeStringBinary(filter_column_name, ctx.out);
actions_dag.serialize(ctx.out, ctx.registry);
}
std::unique_ptr<IQueryPlanStep> FilterStep::deserialize(Deserialization & ctx)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "FilterStep must have one input stream");
UInt8 flags;
readIntBinary(flags, ctx.in);
bool remove_filter_column = bool(flags & 1);
String filter_column_name;
readStringBinary(filter_column_name, ctx.in);
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
return std::make_unique<FilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name), remove_filter_column);
}
void registerFilterStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Filter", FilterStep::deserialize);
}
}

View File

@ -26,6 +26,10 @@ public:
const String & getFilterColumnName() const { return filter_column_name; }
bool removesFilterColumn() const { return remove_filter_column; }
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override;

View File

@ -8,6 +8,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
IQueryPlanStep::IQueryPlanStep()
@ -146,4 +147,11 @@ void IQueryPlanStep::appendExtraProcessors(const Processors & extra_processors)
processors.insert(processors.end(), extra_processors.begin(), extra_processors.end());
}
void IQueryPlanStep::serialize(Serialization & /*ctx*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serialize is not implemented for {}", getName());
}
void IQueryPlanStep::updateOutputHeader() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); }
}

View File

@ -24,6 +24,8 @@ namespace JSONBuilder { class JSONMap; }
class QueryPlan;
using QueryPlanRawPtrs = std::list<QueryPlan *>;
struct QueryPlanSerializationSettings;
using Header = Block;
using Headers = std::vector<Header>;
@ -36,6 +38,7 @@ public:
virtual ~IQueryPlanStep() = default;
virtual String getName() const = 0;
virtual String getSerializationName() const { return getName(); }
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
@ -54,6 +57,11 @@ public:
const std::string & getStepDescription() const { return step_description; }
void setStepDescription(std::string description) { step_description = std::move(description); }
struct Serialization;
struct Deserialization;
virtual void serializeSettings(QueryPlanSerializationSettings & /*settings*/) const {}
virtual void serialize(Serialization & /*ctx*/) const;
virtual const SortDescription & getSortDescription() const;
struct FormatSettings

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/LimitByTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
@ -24,11 +26,11 @@ static ITransformingStep::Traits getTraits()
LimitByStep::LimitByStep(
const Header & input_header_,
size_t group_length_, size_t group_offset_, const Names & columns_)
size_t group_length_, size_t group_offset_, Names columns_)
: ITransformingStep(input_header_, input_header_, getTraits())
, group_length(group_length_)
, group_offset(group_offset_)
, columns(columns_)
, columns(std::move(columns_))
{
}
@ -83,4 +85,37 @@ void LimitByStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Offset", group_offset);
}
void LimitByStep::serialize(Serialization & ctx) const
{
writeVarUInt(group_length, ctx.out);
writeVarUInt(group_offset, ctx.out);
writeVarUInt(columns.size(), ctx.out);
for (const auto & column : columns)
writeStringBinary(column, ctx.out);
}
std::unique_ptr<IQueryPlanStep> LimitByStep::deserialize(Deserialization & ctx)
{
UInt64 group_length;
UInt64 group_offset;
readVarUInt(group_length, ctx.in);
readVarUInt(group_offset, ctx.in);
UInt64 num_columns;
readVarUInt(num_columns, ctx.in);
Names columns(num_columns);
for (auto & column : columns)
readStringBinary(column, ctx.in);
return std::make_unique<LimitByStep>(ctx.input_headers.front(), group_length, group_offset, std::move(columns));
}
void registerLimitByStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("LimitBy", LimitByStep::deserialize);
}
}

View File

@ -10,7 +10,7 @@ class LimitByStep : public ITransformingStep
public:
explicit LimitByStep(
const Header & input_header_,
size_t group_length_, size_t group_offset_, const Names & columns_);
size_t group_length_, size_t group_offset_, Names columns_);
String getName() const override { return "LimitBy"; }
@ -19,6 +19,10 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override
{

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/LimitTransform.h>
#include <IO/Operators.h>
@ -76,4 +78,47 @@ void LimitStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Reads All Data", always_read_till_end);
}
void LimitStep::serialize(Serialization & ctx) const
{
UInt8 flags = 0;
if (always_read_till_end)
flags |= 1;
if (with_ties)
flags |= 2;
writeIntBinary(flags, ctx.out);
writeVarUInt(limit, ctx.out);
writeVarUInt(offset, ctx.out);
if (with_ties)
serializeSortDescription(description, ctx.out);
}
std::unique_ptr<IQueryPlanStep> LimitStep::deserialize(Deserialization & ctx)
{
UInt8 flags;
readIntBinary(flags, ctx.in);
bool always_read_till_end = bool(flags & 1);
bool with_ties = bool(flags & 2);
UInt64 limit;
UInt64 offset;
readVarUInt(limit, ctx.in);
readVarUInt(offset, ctx.in);
SortDescription description;
if (with_ties)
deserializeSortDescription(description, ctx.in);
return std::make_unique<LimitStep>(ctx.input_headers.front(), limit, offset, always_read_till_end, with_ties, std::move(description));
}
void registerLimitStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Limit", LimitStep::deserialize);
}
}

View File

@ -33,6 +33,10 @@ public:
bool withTies() const { return with_ties; }
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override
{

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/OffsetStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/OffsetTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
@ -46,4 +48,22 @@ void OffsetStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Offset", offset);
}
void OffsetStep::serialize(Serialization & ctx) const
{
writeVarUInt(offset, ctx.out);
}
std::unique_ptr<IQueryPlanStep> OffsetStep::deserialize(Deserialization & ctx)
{
UInt64 offset;
readVarUInt(offset, ctx.in);
return std::make_unique<OffsetStep>(ctx.input_headers.front(), offset);
}
void registerOffsetStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Offset", OffsetStep::deserialize);
}
}

View File

@ -18,6 +18,10 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override
{

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Common/typeid_cast.h>
namespace DB::QueryPlanOptimizations
@ -63,6 +64,12 @@ size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting()))
return 0;
if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
{
distinct->updateLimitHint(limit->getLimitForSorting());
return 0;
}
if (typeid_cast<const SortingStep *>(child.get()))
return 0;

View File

@ -0,0 +1,8 @@
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
namespace DB
{
IMPLEMENT_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS);
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
namespace DB
{
#define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \
M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size in rows for reading", 0) \
\
M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \
M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \
M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, prefer_external_sort_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging.", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge does not reduced by this ratio, remerge will be disabled.", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. Lower block size allows to parallelize more final merge stage of aggregation.", 0) \
M(Bool, aggregation_in_order_memory_bound_merging, true, "Enable memory bound merging strategy when in-order is applied.", 0) \
M(Bool, aggregation_sort_result_by_bucket_number, true, "Send intermediate aggregation result in order of bucket number.", 0) \
\
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than the specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \
M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \
M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "If memory usage during GROUP BY operation is exceeding this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \
M(Float, min_hit_rate_to_use_consecutive_keys_optimization, 0.5, "Minimal hit rate of a cache which is used for consecutive keys optimization in aggregation to keep it enabled", 0) \
M(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
M(UInt64, max_size_to_preallocate_for_aggregation, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
\
M(TotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = any are present.", IMPORTANT) \
M(Float, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.", 0) \
\
DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS)
struct QueryPlanSerializationSettings : public BaseSettings<QueryPlanSerializationSettingsTraits>
{
QueryPlanSerializationSettings() = default;
};
}

View File

@ -0,0 +1,70 @@
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int LOGICAL_ERROR;
}
QueryPlanStepRegistry & QueryPlanStepRegistry::instance()
{
static QueryPlanStepRegistry registry;
return registry;
}
void QueryPlanStepRegistry::registerStep(const std::string & name, StepCreateFunction && create_function)
{
if (steps.contains(name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan step '{}' is already registered", name);
steps[name] = std::move(create_function);
}
QueryPlanStepPtr QueryPlanStepRegistry::createStep(
const std::string & name,
IQueryPlanStep::Deserialization & ctx) const
{
StepCreateFunction create_function;
{
auto it = steps.find(name);
if (it == steps.end())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown query plan step: {}", name);
create_function = it->second;
}
return create_function(ctx);
}
void registerExpressionStep(QueryPlanStepRegistry & registry);
void registerUnionStep(QueryPlanStepRegistry & registry);
void registerDistinctStep(QueryPlanStepRegistry & registry);
void registerSortingStep(QueryPlanStepRegistry & registry);
void registerAggregatingStep(QueryPlanStepRegistry & registry);
void registerArrayJoinStep(QueryPlanStepRegistry & registry);
void registerLimitByStep(QueryPlanStepRegistry & registry);
void registerLimitStep(QueryPlanStepRegistry & registry);
void registerOffsetStep(QueryPlanStepRegistry & registry);
void registerFilterStep(QueryPlanStepRegistry & registry);
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
void registerExtremesStep(QueryPlanStepRegistry & registry);
void QueryPlanStepRegistry::registerPlanSteps()
{
QueryPlanStepRegistry & registry = QueryPlanStepRegistry::instance();
registerExpressionStep(registry);
registerUnionStep(registry);
registerDistinctStep(registry);
registerSortingStep(registry);
registerAggregatingStep(registry);
registerArrayJoinStep(registry);
registerLimitByStep(registry);
registerLimitStep(registry);
registerOffsetStep(registry);
registerFilterStep(registry);
registerTotalsHavingStep(registry);
registerExtremesStep(registry);
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class QueryPlanStepRegistry
{
public:
using StepCreateFunction = std::function<QueryPlanStepPtr(IQueryPlanStep::Deserialization &)>;
QueryPlanStepRegistry() = default;
QueryPlanStepRegistry(const QueryPlanStepRegistry &) = delete;
QueryPlanStepRegistry & operator=(const QueryPlanStepRegistry &) = delete;
static QueryPlanStepRegistry & instance();
static void registerPlanSteps();
void registerStep(const std::string & name, StepCreateFunction && create_function);
QueryPlanStepPtr createStep(
const std::string & name,
IQueryPlanStep::Deserialization & ctx) const;
private:
std::unordered_map<std::string, StepCreateFunction> steps;
};
}

View File

@ -1959,9 +1959,10 @@ bool ReadFromMergeTree::isQueryWithSampling() const
if (context->getSettingsRef()[Setting::parallel_replicas_count] > 1 && data.supportsSampling())
return true;
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->getSampleSizeRatio() != std::nullopt;
const auto & select = query_info.query->as<ASTSelectQuery &>();
return select.sampleSize() != nullptr;
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
struct SerializedSetsRegistry;
struct DeserializedSetsRegistry;
struct IQueryPlanStep::Serialization
{
WriteBuffer & out;
SerializedSetsRegistry & registry;
};
struct SerializedSetsRegistry;
struct IQueryPlanStep::Deserialization
{
ReadBuffer & in;
DeserializedSetsRegistry & registry;
const ContextPtr & context;
const Headers & input_headers;
const Header * output_header;
const QueryPlanSerializationSettings & settings;
};
}

View File

@ -2,6 +2,9 @@
#include <Interpreters/Context.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
@ -43,6 +46,8 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int INCORRECT_DATA;
}
SortingStep::Settings::Settings(const Context & context)
@ -88,6 +93,33 @@ SortingStep::Settings::Settings(size_t max_block_size_)
max_block_size = max_block_size_;
}
SortingStep::Settings::Settings(const QueryPlanSerializationSettings & settings)
{
max_block_size = settings.max_block_size;
size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);
max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;
max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
tmp_data = Context::getGlobalContextInstance()->getTempDataOnDisk();
min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
max_block_bytes = settings.prefer_external_sort_block_bytes;
read_in_order_use_buffering = false; //settings.read_in_order_use_buffering;
}
void SortingStep::Settings::updatePlanSettings(QueryPlanSerializationSettings & settings) const
{
settings.max_block_size = max_block_size;
settings.max_rows_to_sort = size_limits.max_rows;
settings.max_bytes_to_sort = size_limits.max_bytes;
settings.sort_overflow_mode = size_limits.overflow_mode;
settings.max_bytes_before_remerge_sort = max_bytes_before_remerge;
settings.remerge_sort_lowered_memory_bytes_ratio = remerge_lowered_memory_bytes_ratio;
settings.max_bytes_before_external_sort = max_bytes_before_external_sort;
settings.min_free_disk_space_for_temporary_data = min_free_disk_space;
settings.prefer_external_sort_block_bytes = max_block_bytes;
}
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
@ -452,4 +484,52 @@ void SortingStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Limit", limit);
}
void SortingStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
sort_settings.updatePlanSettings(settings);
}
void SortingStep::serialize(Serialization & ctx) const
{
if (type != Type::Full)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of SortingStep is implemented only for Full sorting");
/// Do not serialize type here; Later we can use different names if needed.\
/// Do not serialize limit for now; it is expected to be pushed down from plan optimization.
serializeSortDescription(result_description, ctx.out);
/// Later
if (!partition_by_description.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of partitioned sorting is not implemented for SortingStep");
writeVarUInt(partition_by_description.size(), ctx.out);
}
std::unique_ptr<IQueryPlanStep> SortingStep::deserialize(Deserialization & ctx)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "SortingStep must have one input stream");
SortingStep::Settings sort_settings(ctx.settings);
SortDescription result_description;
deserializeSortDescription(result_description, ctx.in);
UInt64 partition_desc_size;
readVarUInt(partition_desc_size, ctx.in);
if (partition_desc_size)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization of partitioned sorting is not implemented for SortingStep");
return std::make_unique<SortingStep>(
ctx.input_headers.front(), std::move(result_description), 0, std::move(sort_settings));
}
void registerSortingStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Sorting", SortingStep::deserialize);
}
}

View File

@ -23,7 +23,7 @@ public:
size_t max_block_size;
SizeLimits size_limits;
size_t max_bytes_before_remerge = 0;
double remerge_lowered_memory_bytes_ratio = 0;
float remerge_lowered_memory_bytes_ratio = 0;
size_t max_bytes_before_external_sort = 0;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
@ -32,6 +32,9 @@ public:
explicit Settings(const Context & context);
explicit Settings(size_t max_block_size_);
explicit Settings(const QueryPlanSerializationSettings & settings);
void updatePlanSettings(QueryPlanSerializationSettings & settings) const;
};
/// Full
@ -96,6 +99,11 @@ public:
UInt64 limit_,
bool skip_partial_sort = false);
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline);
void updateOutputHeader() override;

View File

@ -53,6 +53,9 @@ public:
void setLimit(size_t limit_value)
{
if (limit)
limit_value = std::min(limit_value, *limit);
limit = limit_value;
}

View File

@ -1,4 +1,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -10,6 +13,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
static ITransformingStep::Traits getTraits(bool has_filter)
{
return ITransformingStep::Traits
@ -33,7 +41,7 @@ TotalsHavingStep::TotalsHavingStep(
const std::string & filter_column_,
bool remove_filter_,
TotalsMode totals_mode_,
double auto_include_threshold_,
float auto_include_threshold_,
bool final_)
: ITransformingStep(
input_header_,
@ -141,5 +149,75 @@ void TotalsHavingStep::updateOutputHeader()
getAggregatesMask(input_headers.front(), aggregates));
}
void TotalsHavingStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
settings.totals_mode = totals_mode;
settings.totals_auto_threshold = auto_include_threshold;
}
void TotalsHavingStep::serialize(Serialization & ctx) const
{
UInt8 flags = 0;
if (final)
flags |= 1;
if (overflow_row)
flags |= 2;
if (actions_dag)
flags |= 4;
if (actions_dag && remove_filter)
flags |= 8;
writeIntBinary(flags, ctx.out);
serializeAggregateDescriptions(aggregates, ctx.out);
if (actions_dag)
{
writeStringBinary(filter_column_name, ctx.out);
actions_dag->serialize(ctx.out, ctx.registry);
}
}
std::unique_ptr<IQueryPlanStep> TotalsHavingStep::deserialize(Deserialization & ctx)
{
if (ctx.input_headers.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "TotalsHaving must have one input stream");
UInt8 flags;
readIntBinary(flags, ctx.in);
bool final = bool(flags & 1);
bool overflow_row = bool(flags & 2);
bool has_actions_dag = bool(flags & 4);
bool remove_filter_column = bool(flags & 8);
AggregateDescriptions aggregates;
deserializeAggregateDescriptions(aggregates, ctx.in);
std::optional<ActionsDAG> actions_dag;
String filter_column_name;
if (has_actions_dag)
{
readStringBinary(filter_column_name, ctx.in);
actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
}
return std::make_unique<TotalsHavingStep>(
ctx.input_headers.front(),
std::move(aggregates),
overflow_row,
std::move(actions_dag),
std::move(filter_column_name),
remove_filter_column,
ctx.settings.totals_mode,
ctx.settings.totals_auto_threshold,
final);
}
void registerTotalsHavingStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("TotalsHaving", TotalsHavingStep::deserialize);
}
}

View File

@ -20,7 +20,7 @@ public:
const std::string & filter_column_,
bool remove_filter_,
TotalsMode totals_mode_,
double auto_include_threshold_,
float auto_include_threshold_,
bool final_);
String getName() const override { return "TotalsHaving"; }
@ -32,6 +32,11 @@ public:
const ActionsDAG * getActions() const { return actions_dag ? &*actions_dag : nullptr; }
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override;
@ -42,7 +47,7 @@ private:
String filter_column_name;
bool remove_filter;
TotalsMode totals_mode;
double auto_include_threshold;
float auto_include_threshold;
bool final;
};

View File

@ -1,6 +1,8 @@
#include <type_traits>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/Serialization.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -37,7 +39,7 @@ void UnionStep::updateOutputHeader()
output_header = checkHeaders(input_headers);
}
QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings)
{
auto pipeline = std::make_unique<QueryPipelineBuilder>();
@ -49,6 +51,8 @@ QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipeline
return pipeline;
}
size_t new_max_threads = max_threads ? max_threads : settings.max_threads;
for (auto & cur_pipeline : pipelines)
{
#if !defined(NDEBUG)
@ -75,7 +79,7 @@ QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipeline
}
}
*pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors);
*pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), new_max_threads, &processors);
return pipeline;
}
@ -84,4 +88,19 @@ void UnionStep::describePipeline(FormatSettings & settings) const
IQueryPlanStep::describePipeline(processors, settings);
}
void UnionStep::serialize(Serialization & ctx) const
{
(void)ctx;
}
std::unique_ptr<IQueryPlanStep> UnionStep::deserialize(Deserialization & ctx)
{
return std::make_unique<UnionStep>(ctx.input_headers);
}
void registerUnionStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Union", &UnionStep::deserialize);
}
}

View File

@ -13,12 +13,15 @@ public:
String getName() const override { return "Union"; }
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override;
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
void describePipeline(FormatSettings & settings) const override;
size_t getMaxThreads() const { return max_threads; }
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
private:
void updateOutputHeader() override;

View File

@ -43,15 +43,25 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
const Settings & settings = storage_.getContext()->getSettingsRef();
Aggregator::Params params(
settings,
keys,
aggregates,
/*overflow_row_=*/ false,
/*group_by_two_level_threshold_=*/ 0,
/*group_by_two_level_threshold_bytes_=*/ 0,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
/*group_by_two_level_threshold*/ 0,
/*group_by_two_level_threshold_bytes*/ 0,
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
settings[Setting::empty_result_for_aggregation_by_empty_set],
storage_.getContext()->getTempDataOnDisk(),
/*only_merge_=*/false,
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/*only_merge=*/false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_chunk_bytes_for_parallel_parsing],
/*stats_collecting_params_=*/{});
aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -2,6 +2,8 @@
#include <Common/formatReadable.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
@ -14,6 +16,12 @@ namespace ProfileEvents
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
}
bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_many_rows_exception_code, int too_many_bytes_exception_code) const
{
if (overflow_mode == OverflowMode::THROW)
@ -64,4 +72,25 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti
return check(rows, bytes, what, exception_code, exception_code);
}
static void checkAllowedOwerflowMode(OverflowMode mode, int code)
{
if (!(mode == OverflowMode::BREAK || mode == OverflowMode::THROW))
throw Exception(code, "Unexpected overflow mode {}", mode);
}
void SizeLimits::serialize(WriteBuffer & out) const
{
checkAllowedOwerflowMode(overflow_mode, ErrorCodes::LOGICAL_ERROR);
writeVarUInt(max_rows, out);
writeVarUInt(max_bytes, out);
writeIntBinary(overflow_mode, out);
}
void SizeLimits::deserialize(ReadBuffer & in)
{
checkAllowedOwerflowMode(overflow_mode, ErrorCodes::INCORRECT_DATA);
readVarUInt(max_rows, in);
readVarUInt(max_bytes, in);
readIntBinary(overflow_mode, in);
}
}

View File

@ -18,6 +18,9 @@ enum class OverflowMode : uint8_t
ANY = 2,
};
class WriteBuffer;
class ReadBuffer;
struct SizeLimits
{
@ -38,6 +41,9 @@ struct SizeLimits
bool softCheck(UInt64 rows, UInt64 bytes) const;
bool hasLimits() const { return max_rows || max_bytes; }
void serialize(WriteBuffer & out) const;
void deserialize(ReadBuffer & in);
};
}