Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-17 13:13:36 +00:00)

Commit fe880d73e7 (parent 1780e6c1d9):
Added method "getHeader" in IBlockInputStream [#CLICKHOUSE-2]
@@ -66,7 +66,7 @@ std::ostream & operator<<(std::ostream & stream, const IFunction & what)
 std::ostream & operator<<(std::ostream & stream, const Block & what)
 {
     stream << "Block("
-        << "size = " << what.columns()
+        << "num_columns = " << what.columns()
         << "){" << what.dumpStructure() << "}";
     return stream;
 }
@@ -34,7 +34,7 @@ public:
     Block getHeader() override
     {
         Block res = children.back()->getHeader();
-        res.insert({nullptr, data_type, column_name});
+        res.insert({data_type->createColumn(), data_type, column_name});
         return res;
     }

@@ -33,7 +33,9 @@ const Block & ExpressionBlockInputStream::getTotals()

 Block ExpressionBlockInputStream::getHeader()
 {
-    return expression->getSampleBlock();
+    Block res = children.back()->getHeader();
+    expression->execute(res);
+    return res;
 }

 Block ExpressionBlockInputStream::readImpl()
@@ -59,7 +59,8 @@ const Block & FilterBlockInputStream::getTotals()

 Block FilterBlockInputStream::getHeader()
 {
-    Block res = expression->getSampleBlock();
+    Block res = children.back()->getHeader();
+    expression->execute(res);

     /// Isn't the filter already constant?
     ColumnPtr column = res.safeGetByPosition(filter_column).column;
@@ -49,8 +49,7 @@ public:
     IBlockInputStream() {}

     /** Get data structure of the stream in a form of "header" block (it is also called "sample block").
-      * Header block contains column names, data types, and constant columns.
-      * If the column is not constant (and not dummy), then the header block will contain nullptr instead of column.
+      * Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
       * It is guaranteed that method "read" returns blocks of exactly that structure.
       */
    virtual Block getHeader() = 0;
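The comment rewrite above is the heart of the commit: a header now carries an empty column of the correct type for every ordinary column, instead of a nullptr placeholder, while constant columns keep their values. A minimal sketch of building such a header by hand (illustrative only, not part of the commit; it assumes the Block/IDataType/ColumnConst API of this era of the codebase, so exact names may differ slightly):

    #include <Core/Block.h>
    #include <Core/Field.h>
    #include <Columns/ColumnConst.h>
    #include <DataTypes/DataTypesNumber.h>
    #include <DataTypes/DataTypeString.h>

    using namespace DB;

    /// Header for a stream with an ordinary UInt64 column "x"
    /// and a constant String column "tag" with value "hello".
    Block makeExampleHeader()
    {
        Block header;

        /// Ordinary column: an empty column of the right type (previously a nullptr placeholder).
        auto x_type = std::make_shared<DataTypeUInt64>();
        header.insert({ x_type->createColumn(), x_type, "x" });

        /// Constant column: must keep its value in the header (size stays 0).
        auto tag_type = std::make_shared<DataTypeString>();
        auto tag_nested = tag_type->createColumn();
        tag_nested->insert(Field(String("hello")));
        header.insert({ ColumnConst::create(std::move(tag_nested), 0), tag_type, "tag" });

        return header;
    }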
@@ -36,6 +36,8 @@ public:

     Block getHeader() override
     {
+        std::cerr << "LazyBlockInputStream::getHeader()\n";
+
         init();
         if (!input)
             return {};
@@ -67,12 +69,16 @@ private:
         if (initialized)
             return;

+        std::cerr << "LazyBlockInputStream::init()\n";
+
         input = generator();
         initialized = true;

         if (!input)
             return;

+        std::cerr << "!\n";
+
         auto * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get());

         if (p_input)
@@ -24,14 +24,7 @@ String MaterializingBlockInputStream::getID() const

 Block MaterializingBlockInputStream::getHeader()
 {
-    Block res = children.back()->getHeader();
-
-    /// Constant columns become non constant.
-    for (auto & elem : res)
-        if (!elem.column->isColumnConst())
-            elem.column = nullptr;
-
-    return res;
+    return materializeBlock(children.back()->getHeader());
 }

 Block MaterializingBlockInputStream::readImpl()
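The hand-written loop is replaced by a single call to materializeBlock. The commit only shows the call site; as a rough sketch of what such a helper plausibly does (an assumption for illustration, not the actual implementation):

    #include <Core/Block.h>

    using namespace DB;

    /// Sketch: replace every constant column with a full (materialized) column
    /// holding the same values, leaving ordinary columns untouched.
    Block materializeBlockSketch(Block block)
    {
        for (auto & elem : block)
            if (elem.column)
                elem.column = elem.column->convertToFullColumnIfConst();
        return block;
    }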
@@ -59,7 +59,10 @@ Block NativeBlockInputStream::getHeader()

     Block res;
     for (const auto & column : index_block_it->columns)
-        res.insert({ nullptr, DataTypeFactory::instance().get(column.type), column.name });
+    {
+        auto type = DataTypeFactory::instance().get(column.type);
+        res.insert({ type->createColumn(), type, column.name });
+    }
     return res;
 }

@@ -20,7 +20,7 @@ public:
     {
         Block res;
         for (const auto & elem : block)
-            res.insert({ elem.column->isColumnConst() ? elem.column->cloneEmpty() : nullptr, elem.type, elem.name });
+            res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
         return res;
     }

@@ -169,7 +169,6 @@ Block RemoteBlockInputStream::receiveBlock()
     switch (packet.type)
     {
         case Protocol::Server::Data:
-            LOG_INFO(log, "received " << packet.block.rows() << ": " << packet.block.dumpStructure());
             return packet.block;

         case Protocol::Server::Exception:
@@ -85,9 +85,11 @@ const Block & TotalsHavingBlockInputStream::getTotals()

 Block TotalsHavingBlockInputStream::getHeader()
 {
-    unfinalized_header = children.at(0)->getHeader();
-    auto res = unfinalized_header;
+    Block res = children.at(0)->getHeader();
+    unfinalized_header = res;
     finalize(res);
+    if (expression)
+        expression->execute(res);
     return res;
 }

@@ -19,6 +19,7 @@ private:
     using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

 public:
+    /// expression may be nullptr
     TotalsHavingBlockInputStream(
         const BlockInputStreamPtr & input_,
         bool overflow_row_, const ExpressionActionsPtr & expression_,
@@ -28,13 +28,7 @@ Block DictionaryBlockInputStreamBase::readImpl()

 Block DictionaryBlockInputStreamBase::getHeader()
 {
-    Block block = getBlock(0, 0);
-
-    /// Columns are non constant. Reset them for header block.
-    for (auto & elem : block)
-        elem.column = nullptr;
-
-    return block;
+    return getBlock(0, 0);
 }

 }
@@ -6,6 +6,7 @@
 #include <Common/setThreadName.h>

 #include <DataTypes/DataTypeAggregateFunction.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Columns/ColumnsNumber.h>
 #include <Columns/ColumnArray.h>
 #include <Columns/ColumnTuple.h>
@@ -99,20 +100,18 @@ Block Aggregator::getHeader(bool final) const

         for (size_t i = 0; i < params.aggregates_size; ++i)
         {
-            ColumnWithTypeAndName col;
-            col.name = params.aggregates[i].column_name;
-
             size_t arguments_size = params.aggregates[i].arguments.size();
             DataTypes argument_types(arguments_size);
             for (size_t j = 0; j < arguments_size; ++j)
                 argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;

+            DataTypePtr type;
             if (final)
-                col.type = params.aggregates[i].function->getReturnType();
+                type = params.aggregates[i].function->getReturnType();
             else
-                col.type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
+                type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);

-            res.insert(std::move(col));
+            res.insert({ type->createColumn(), type, params.aggregates[i].column_name });
         }
     }
     else if (params.intermediate_header)
@@ -120,13 +119,16 @@ Block Aggregator::getHeader(bool final) const
         res = params.intermediate_header.cloneEmpty();

         if (final)
         {
             for (size_t i = 0; i < params.aggregates_size; ++i)
-                res.safeGetByPosition(params.keys_size + i).type = params.aggregates[i].function->getReturnType();
-        }
-
-        /// All columns are non-constant in result.
-        for (auto & elem : res)
-            elem.column = nullptr;
+            {
+                auto & elem = res.getByPosition(params.keys_size + i);
+
+                elem.type = params.aggregates[i].function->getReturnType();
+                elem.column = elem.type->createColumn();
+            }
+        }
     }

     return res;
 }
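Both branches of Aggregator::getHeader now insert real empty columns instead of nullptr placeholders. The practical difference between the two modes can be shown with a toy model (hypothetical, for intuition only; the type strings correspond to `sum(x)` over a UInt64 argument):

    #include <iostream>
    #include <string>

    /// Toy model: the header type of an aggregate column depends on whether the
    /// aggregation is final (finished values) or intermediate (serialized states).
    std::string aggregateHeaderType(bool final)
    {
        return final
            ? "UInt64"                            /// function->getReturnType()
            : "AggregateFunction(sum, UInt64)";   /// wrapped in DataTypeAggregateFunction
    }

    int main()
    {
        std::cout << "final header:        sum(x) " << aggregateHeaderType(true) << '\n';
        std::cout << "intermediate header: sum(x) " << aggregateHeaderType(false) << '\n';
    }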
@@ -349,102 +351,70 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
 }


-AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const
+AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(Sizes & key_sizes) const
 {
+    /// If no keys. All aggregating to single row.
+    if (params.keys_size == 0)
+        return AggregatedDataVariants::Type::without_key;
+
-    /// Check if at least one of the specified keys is nullable.
-    /// Create a set of nested key columns from the corresponding key columns.
-    /// Here "nested" means that, if a key column is nullable, we take its nested
-    /// column; otherwise we take the key column as is.
-    ColumnRawPtrs nested_key_columns;
-    nested_key_columns.reserve(key_columns.size());
+    DataTypes types_removed_nullable;
+    types_removed_nullable.reserve(params.keys.size());
     bool has_nullable_key = false;

-    for (const auto & col : key_columns)
+    for (const auto & pos : params.keys)
     {
-        if (col->isColumnNullable())
+        const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
+
+        if (type->isNullable())
         {
-            const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*col);
-            nested_key_columns.push_back(&nullable_col.getNestedColumn());
             has_nullable_key = true;
+            types_removed_nullable.push_back(removeNullable(type));
         }
         else
-            nested_key_columns.push_back(col);
+            types_removed_nullable.push_back(type);
     }

     /** Returns ordinary (not two-level) methods, because we start from them.
       * Later, during aggregation process, data may be converted (partitioned) to two-level structure, if cardinality is high.
       */

-    bool all_fixed = true;
     size_t keys_bytes = 0;
-
-    size_t num_array_keys = 0;
-    bool has_arrays_of_non_fixed_elems = false;
-    bool all_non_array_keys_are_fixed = true;
-    bool has_tuples = false;
-    bool has_arrays_of_nullable = false;
+    size_t num_contiguous_keys = 0;
+    size_t num_fixed_contiguous_keys = 0;
+    size_t num_string_keys = 0;

     key_sizes.resize(params.keys_size);
     for (size_t j = 0; j < params.keys_size; ++j)
     {
-        if (nested_key_columns[j]->isFixedAndContiguous())
+        if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
         {
-            key_sizes[j] = nested_key_columns[j]->sizeOfValueIfFixed();
-            keys_bytes += key_sizes[j];
-        }
-        else
-        {
-            all_fixed = false;
+            ++num_contiguous_keys;

-            if (const ColumnArray * arr = typeid_cast<const ColumnArray *>(nested_key_columns[j]))
+            if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
             {
-                ++num_array_keys;
-
-                if (arr->getData().isColumnNullable())
-                    has_arrays_of_nullable = true;
-
-                if (!arr->getData().isFixedAndContiguous())
-                    has_arrays_of_non_fixed_elems = true;
+                ++num_fixed_contiguous_keys;
+                key_sizes[j] = types_removed_nullable[j]->getSizeOfValueInMemory();
+                keys_bytes += key_sizes[j];
             }
-            else
-            {
-                all_non_array_keys_are_fixed = false;

-                if (typeid_cast<const ColumnTuple *>(nested_key_columns[j]))
-                    has_tuples = true;
+            if (types_removed_nullable[j]->isString())
+            {
+                ++num_string_keys;
             }
         }
     }

-    /// If no keys. All aggregating to single row.
-    if (params.keys_size == 0)
-        return AggregatedDataVariants::Type::without_key;
-
-    if (has_nullable_key || has_arrays_of_nullable)
+    if (has_nullable_key)
     {
         /// At least one key is nullable. Therefore we choose an aggregation method
         /// that takes into account this fact.
-        if ((params.keys_size == 1) && (nested_key_columns[0]->isNumeric()))
-        {
-            /// We have exactly one key and it is nullable. We shall add it a tag
-            /// which specifies whether its value is null or not.
-            size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
-            if ((size_of_field == 1) || (size_of_field == 2) || (size_of_field == 4) || (size_of_field == 8) || (size_of_field == 16))
-                return AggregatedDataVariants::Type::nullable_keys128;
-            else
-                throw Exception{"Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.",
-                    ErrorCodes::LOGICAL_ERROR};
-        }
-
-        if (all_fixed)
+        if (params.keys_size == num_fixed_contiguous_keys)
         {
             /// Pack if possible all the keys along with information about which key values are nulls
             /// into a fixed 16- or 32-byte blob.
             if (keys_bytes > (std::numeric_limits<size_t>::max() - std::tuple_size<KeysNullMap<UInt128>>::value))
                 throw Exception{"Aggregator: keys sizes overflow", ErrorCodes::LOGICAL_ERROR};
-            if ((std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes) <= 16)
+            if (std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes <= 16)
                 return AggregatedDataVariants::Type::nullable_keys128;
-            if ((std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes) <= 32)
+            if (std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes <= 32)
                 return AggregatedDataVariants::Type::nullable_keys256;
         }

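The rewritten chooseAggregationMethod derives everything from data types rather than from sample columns. A condensed, standalone sketch of the core dispatch (invented names; it omits the nullable, string, and 'concat' branches shown in the diff):

    #include <cstddef>

    enum class Method { without_key, key8, key16, key32, key64, keys128, keys256, serialized };

    /// Simplified model: a single small fixed-size key selects a direct keyN table;
    /// several fixed-size keys are packed into one 128- or 256-bit blob; anything
    /// else falls back to serialized keys.
    Method chooseMethodSketch(size_t num_keys, size_t num_fixed_contiguous_keys, size_t keys_bytes)
    {
        if (num_keys == 0)
            return Method::without_key;

        if (num_keys == 1 && num_fixed_contiguous_keys == 1)
        {
            if (keys_bytes == 1) return Method::key8;
            if (keys_bytes == 2) return Method::key16;
            if (keys_bytes == 4) return Method::key32;
            if (keys_bytes == 8) return Method::key64;
        }

        if (num_keys == num_fixed_contiguous_keys)
        {
            if (keys_bytes <= 16) return Method::keys128;
            if (keys_bytes <= 32) return Method::keys256;
        }

        return Method::serialized;
    }

For example, two UInt32 keys give keys_bytes == 8 and land in keys128.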
@@ -455,9 +425,9 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
     /// No key has been found to be nullable.

     /// Single numeric key.
-    if ((params.keys_size == 1) && nested_key_columns[0]->isNumeric())
+    if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
     {
-        size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
+        size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
         if (size_of_field == 1)
             return AggregatedDataVariants::Type::key8;
         if (size_of_field == 2)
@@ -472,23 +442,24 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
     }

     /// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key.
-    if (all_fixed && keys_bytes <= 16)
-        return AggregatedDataVariants::Type::keys128;
-    if (all_fixed && keys_bytes <= 32)
-        return AggregatedDataVariants::Type::keys256;
+    if (params.keys_size == num_fixed_contiguous_keys)
+    {
+        if (keys_bytes <= 16)
+            return AggregatedDataVariants::Type::keys128;
+        if (keys_bytes <= 32)
+            return AggregatedDataVariants::Type::keys256;
+    }

     /// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena.
-    if (params.keys_size == 1 && typeid_cast<const ColumnString *>(nested_key_columns[0]))
+    if (params.keys_size == 1 && types_removed_nullable[0]->isString())
         return AggregatedDataVariants::Type::key_string;

-    if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(nested_key_columns[0]))
+    if (params.keys_size == 1 && types_removed_nullable[0]->isFixedString())
         return AggregatedDataVariants::Type::key_fixed_string;

-    /** If some keys are arrays.
-      * If there is no more than one key that is array, and it is array of fixed-size elements, and all other keys are fixed-size,
-      * then it is possible to use 'concat' method (due to one-to-one correspondense). Otherwise the method will be 'serialized'.
+    /** If it is possible to use 'concat' method due to one-to-one correspondense. Otherwise the method will be 'serialized'.
       */
-    if (num_array_keys == 1 && !has_arrays_of_non_fixed_elems && all_non_array_keys_are_fixed)
+    if (params.keys_size == num_contiguous_keys && num_fixed_contiguous_keys + 1 >= num_contiguous_keys)
         return AggregatedDataVariants::Type::concat;

     /** For case with multiple strings, we use 'concat' method despite the fact, that correspondense is not one-to-one.
@@ -496,11 +467,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
      * But if strings contains zero bytes in between, different keys may clash.
      * For example, keys ('a\0b', 'c') and ('a', 'b\0c') will be aggregated as one key.
      * This is documented behaviour. It may be avoided by just switching to 'serialized' method, which is less efficient.
-     *
-     * Some of aggregation keys may be tuples. In most cases, tuples are flattened in expression analyzer and not passed here.
-     * But in rare cases, they are not flattened. Will fallback to 'serialized' method for simplicity.
      */
-    if (num_array_keys == 0 && !has_tuples)
+    if (params.keys_size == num_fixed_contiguous_keys + num_string_keys)
         return AggregatedDataVariants::Type::concat;

     return AggregatedDataVariants::Type::serialized;
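The zero-byte clash documented in the comment above can be reproduced in isolation. The sketch below assumes, as a simplification of the real column layout, that 'concat' joins string values including each value's terminating zero byte:

    #include <cassert>
    #include <string>

    using namespace std::string_literals;  /// ""s literals preserve embedded zero bytes

    int main()
    {
        /// keys ('a\0b', 'c')  ->  a \0 b \0 c \0
        std::string blob1 = "a\0b"s + "\0"s + "c"s + "\0"s;

        /// keys ('a', 'b\0c')  ->  a \0 b \0 c \0
        std::string blob2 = "a"s + "\0"s + "b\0c"s + "\0"s;

        assert(blob1 == blob2);  /// Different key tuples, identical concatenated key.
    }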
@@ -739,7 +707,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
     /// How to perform the aggregation?
     if (result.empty())
     {
-        result.init(chooseAggregationMethod(key_columns, key_sizes));
+        result.init(chooseAggregationMethod(key_sizes));
         result.keys_size = params.keys_size;
         result.key_sizes = key_sizes;
         LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
@@ -1995,9 +1963,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
     if (isCancelled())
         return;

-    StringRefs key(params.keys_size);
-    ColumnRawPtrs key_columns(params.keys_size);
-
     /** If the remote servers used a two-level aggregation method,
       * then blocks will contain information about the number of the bucket.
       * Then the calculations can be parallelized by buckets.
@@ -2026,14 +1991,8 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
     if (bucket_to_blocks.empty())
         return;

-    Block header = getHeader(true);
-
-    /// How to perform the aggregation?
-    for (size_t i = 0; i < params.keys_size; ++i)
-        key_columns[i] = header.safeGetByPosition(i).column.get();
-
     Sizes key_sizes;
-    AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
+    AggregatedDataVariants::Type method = chooseAggregationMethod(key_sizes);

     /** `minus one` means the absence of information about the bucket
       * - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
@@ -2178,17 +2137,8 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
     LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
     Stopwatch watch;

-    StringRefs key(params.keys_size);
-    ColumnRawPtrs key_columns(params.keys_size);
-
-    Block header = getHeader(true);
-
-    /// How to perform the aggregation?
-    for (size_t i = 0; i < params.keys_size; ++i)
-        key_columns[i] = header.safeGetByPosition(i).column.get();
-
     Sizes key_sizes;
-    AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
+    AggregatedDataVariants::Type method = chooseAggregationMethod(key_sizes);

     /** If possible, change 'method' to some_hash64. Otherwise, leave as is.
       * Better hash function is needed because during external aggregation,
|
||||
*/
|
||||
|
||||
#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key_fixed_string) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
|
||||
#define M(NAME) \
|
||||
if (method == AggregatedDataVariants::Type::NAME) \
|
||||
@@ -2350,7 +2300,7 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
     for (size_t i = 0; i < params.keys_size; ++i)
         key_columns[i] = block.safeGetByPosition(i).column.get();

-    AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
+    AggregatedDataVariants::Type type = chooseAggregationMethod(key_sizes);
     data.keys_size = params.keys_size;
     data.key_sizes = key_sizes;

@@ -900,25 +900,25 @@ struct AggregatedDataVariants : private boost::noncopyable
 #define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
     M(key32) \
     M(key64) \
-    M(key_string) \
-    M(key_fixed_string) \
-    M(keys128) \
-    M(keys256) \
-    M(hashed) \
-    M(concat) \
-    M(serialized) \
-    M(nullable_keys128) \
-    M(nullable_keys256) \
+    M(key_string) \
+    M(key_fixed_string) \
+    M(keys128) \
+    M(keys256) \
+    M(hashed) \
+    M(concat) \
+    M(serialized) \
+    M(nullable_keys128) \
+    M(nullable_keys256) \

 #define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
-    M(key8) \
+    M(key8) \
     M(key16) \
     M(key64_hash64) \
-    M(key_string_hash64) \
+    M(key_string_hash64)\
     M(key_fixed_string_hash64) \
-    M(keys128_hash64) \
-    M(keys256_hash64) \
-    M(concat_hash64) \
+    M(keys128_hash64) \
+    M(keys256_hash64) \
+    M(concat_hash64) \
     M(serialized_hash64) \

 #define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
@@ -943,16 +943,16 @@ struct AggregatedDataVariants : private boost::noncopyable
     void convertToTwoLevel();

 #define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
-    M(key32_two_level) \
-    M(key64_two_level) \
-    M(key_string_two_level) \
-    M(key_fixed_string_two_level) \
-    M(keys128_two_level) \
-    M(keys256_two_level) \
-    M(hashed_two_level) \
-    M(concat_two_level) \
-    M(serialized_two_level) \
-    M(nullable_keys128_two_level) \
+    M(key32_two_level) \
+    M(key64_two_level) \
+    M(key_string_two_level) \
+    M(key_fixed_string_two_level) \
+    M(keys128_two_level) \
+    M(keys256_two_level) \
+    M(hashed_two_level) \
+    M(concat_two_level) \
+    M(serialized_two_level) \
+    M(nullable_keys128_two_level) \
     M(nullable_keys256_two_level)
 };

@@ -1182,7 +1182,7 @@ protected:
     TemporaryFiles temporary_files;

     /** Select the aggregation method based on the number and types of keys. */
-    AggregatedDataVariants::Type chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const;
+    AggregatedDataVariants::Type chooseAggregationMethod(Sizes & key_sizes) const;

     /** Create states of aggregate functions for one key.
       */

@@ -1621,10 +1621,10 @@ void ExpressionAnalyzer::makeSet(const ASTFunction * node, const Block & sample_

             /** Why is LazyBlockInputStream used?
               *
-              * The fact is that when processing a request of the form
+              * The fact is that when processing a query of the form
               *  SELECT ... FROM remote_test WHERE column GLOBAL IN (subquery),
               *  if the distributed remote_test table contains localhost as one of the servers,
-              *  the request will be interpreted locally again (and not sent over TCP, as in the case of a remote server).
+              *  the query will be interpreted locally again (and not sent over TCP, as in the case of a remote server).
               *
               * The query execution pipeline will be:
               * CreatingSets
@@ -1633,7 +1633,7 @@ void ExpressionAnalyzer::makeSet(const ASTFunction * node, const Block & sample_
               *  reading from the table _data1, creating the set (2)
               *  read from the table subordinate to remote_test.
               *
-              * (The second part of the pipeline under CreateSets is a reinterpretation of the request inside StorageDistributed,
+              * (The second part of the pipeline under CreateSets is a reinterpretation of the query inside StorageDistributed,
               *  the query differs in that the database name and tables are replaced with subordinates, and the subquery is replaced with _data1.)
               *
               * But when creating the pipeline, when creating the source (2), it will be found that the _data1 table is empty
@@ -907,7 +907,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
 }


-void InterpreterSelectQuery::executeWhere(ExpressionActionsPtr expression)
+void InterpreterSelectQuery::executeWhere(const ExpressionActionsPtr & expression)
 {
     transformStreams([&](auto & stream)
     {
@@ -916,7 +916,7 @@ void InterpreterSelectQuery::executeWhere(ExpressionActionsPtr expression)
 }


-void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, bool overflow_row, bool final)
+void InterpreterSelectQuery::executeAggregation(const ExpressionActionsPtr & expression, bool overflow_row, bool final)
 {
     transformStreams([&](auto & stream)
     {
@@ -1034,7 +1034,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
 }


-void InterpreterSelectQuery::executeHaving(ExpressionActionsPtr expression)
+void InterpreterSelectQuery::executeHaving(const ExpressionActionsPtr & expression)
 {
     transformStreams([&](auto & stream)
     {
@@ -1043,7 +1043,7 @@ void InterpreterSelectQuery::executeHaving(ExpressionActionsPtr expression)
 }


-void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, ExpressionActionsPtr expression, bool overflow_row)
+void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, const ExpressionActionsPtr & expression, bool overflow_row)
 {
     executeUnion();

@@ -1055,7 +1055,7 @@ void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, ExpressionA
 }


-void InterpreterSelectQuery::executeExpression(ExpressionActionsPtr expression)
+void InterpreterSelectQuery::executeExpression(const ExpressionActionsPtr & expression)
 {
     transformStreams([&](auto & stream)
     {
@@ -1156,7 +1156,7 @@ void InterpreterSelectQuery::executeMergeSorted()
 }


-void InterpreterSelectQuery::executeProjection(ExpressionActionsPtr expression)
+void InterpreterSelectQuery::executeProjection(const ExpressionActionsPtr & expression)
 {
     transformStreams([&](auto & stream)
     {
@@ -138,19 +138,19 @@ private:
     /// Fetch data from the table. Returns the stage to which the query was processed in Storage.
     QueryProcessingStage::Enum executeFetchColumns();

-    void executeWhere(ExpressionActionsPtr expression);
-    void executeAggregation(ExpressionActionsPtr expression, bool overflow_row, bool final);
+    void executeWhere(const ExpressionActionsPtr & expression);
+    void executeAggregation(const ExpressionActionsPtr & expression, bool overflow_row, bool final);
     void executeMergeAggregated(bool overflow_row, bool final);
-    void executeTotalsAndHaving(bool has_having, ExpressionActionsPtr expression, bool overflow_row);
-    void executeHaving(ExpressionActionsPtr expression);
-    void executeExpression(ExpressionActionsPtr expression);
+    void executeTotalsAndHaving(bool has_having, const ExpressionActionsPtr & expression, bool overflow_row);
+    void executeHaving(const ExpressionActionsPtr & expression);
+    void executeExpression(const ExpressionActionsPtr & expression);
     void executeOrder();
     void executeMergeSorted();
     void executePreLimit();
     void executeUnion();
     void executeLimitBy();
     void executeLimit();
-    void executeProjection(ExpressionActionsPtr expression);
+    void executeProjection(const ExpressionActionsPtr & expression);
     void executeDistinct(bool before_order, Names columns);
     void executeSubqueriesInSetsAndJoins(std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);

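The signature changes above swap pass-by-value shared_ptrs for const references, presumably to avoid a refcount copy per call. A standalone sketch of the difference:

    #include <memory>

    struct ExpressionActions {};
    using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

    /// By value: every call copies the shared_ptr and bumps the atomic refcount.
    void executeByValue(ExpressionActionsPtr expression) { (void)expression; }

    /// By const reference: no copy, no refcount traffic; the callee can still
    /// take its own copy if it needs to keep the actions alive.
    void executeByConstRef(const ExpressionActionsPtr & expression) { (void)expression; }

    int main()
    {
        auto actions = std::make_shared<ExpressionActions>();
        executeByValue(actions);
        executeByConstRef(actions);
    }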
@@ -20,7 +20,7 @@ public:

     Block getHeader() override
     {
-        return { ColumnWithTypeAndName(nullptr, std::make_shared<DataTypeUInt64>(), "number") };
+        return { ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number") };
     }

 protected: