Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-23 08:02:02 +00:00)
Merge branch 'master' into new-nav

Commit: ecf6f28c77
@ -16,7 +16,7 @@ ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY COMMENT 'Comment'

**Examples**

Creating a table with comment (for more information, see the [COMMENT] clause(../../../sql-reference/statements/create/table.md#comment-table)):
Creating a table with comment (for more information, see the [COMMENT](../../../sql-reference/statements/create/table.md#comment-table) clause):

``` sql
CREATE TABLE table_with_comment
@ -393,15 +393,15 @@ These codecs are designed to make compression more effective by using specific f

#### DoubleDelta

`DoubleDelta` — Calculates delta of deltas and writes it in compact binary form. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
`DoubleDelta(bytes_size)` — Calculates delta of deltas and writes it in compact binary form. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1. Optimal compression rates are achieved for monotonic sequences with a constant stride, such as time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-bit deltas: 5-bit prefixes instead of 4-bit prefixes. For additional information, see Compressing Time Stamps in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
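
For illustration only (this example is not part of the change above, and the table and column names are hypothetical), a column with a near-constant stride, such as a timestamp or sequence number, is the typical target, and the `bytes_size` argument can be given explicitly:

``` sql
-- Hypothetical example: DoubleDelta suits columns with a near-constant stride,
-- such as timestamps; bytes_size is set explicitly for the UInt64 column.
CREATE TABLE example_double_delta
(
    `ts` DateTime CODEC(DoubleDelta, LZ4),
    `seq` UInt64 CODEC(DoubleDelta(8), LZ4)
)
ENGINE = MergeTree
ORDER BY ts;
```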

#### Gorilla

`Gorilla` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
`Gorilla(bytes_size)` — Calculates XOR between current and previous floating point value and writes it in compact binary form. The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Possible `bytes_size` values: 1, 2, 4, 8, the default value is `sizeof(type)` if equal to 1, 2, 4, or 8. In all other cases, it’s 1. For additional information, see section 4.1 in [Gorilla: A Fast, Scalable, In-Memory Time Series Database](https://doi.org/10.14778/2824032.2824078).
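
As an illustrative sketch (not part of the change above; names are hypothetical), `Gorilla` is usually applied to slowly changing floating point metrics, and the explicit `bytes_size` here matches `sizeof(Float64)`:

``` sql
-- Hypothetical example: XOR-based Gorilla compression on a slowly changing
-- Float64 metric, chained with a general-purpose codec.
CREATE TABLE example_gorilla
(
    `ts` DateTime,
    `temperature` Float64 CODEC(Gorilla(8), ZSTD)
)
ENGINE = MergeTree
ORDER BY ts;
```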

#### FPC

`FPC` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
`FPC(level, float_size)` - Repeatedly predicts the next floating point value in the sequence using the better of two predictors, then XORs the actual with the predicted value, and leading-zero compresses the result. Similar to Gorilla, this is efficient when storing a series of floating point values that change slowly. For 64-bit values (double), FPC is faster than Gorilla, for 32-bit values your mileage may vary. Possible `level` values: 1-28, the default value is 12. Possible `float_size` values: 4, 8, the default value is `sizeof(type)` if type is Float. In all other cases, it’s 4. For a detailed description of the algorithm see [High Throughput Compression of Double-Precision Floating-Point Data](https://userweb.cs.txstate.edu/~burtscher/papers/dcc07a.pdf).
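
A minimal sketch (not part of the change above; names are hypothetical) with both parameters spelled out, using the default level 12 and a float size of 8 for a `Float64` column:

``` sql
-- Hypothetical example: FPC with an explicit compression level and float size.
CREATE TABLE example_fpc
(
    `reading` Float64 CODEC(FPC(12, 8))
)
ENGINE = MergeTree
ORDER BY tuple();
```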

#### T64

@ -66,6 +66,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
|
||||
using namespace DB;
|
||||
namespace po = boost::program_options;
|
||||
|
||||
bool print_stacktrace = false;
|
||||
try
|
||||
{
|
||||
po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
|
||||
@ -84,6 +85,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
|
||||
("level", po::value<int>(), "compression level for codecs specified via flags")
|
||||
("none", "use no compression instead of LZ4")
|
||||
("stat", "print block statistics of compressed data")
|
||||
("stacktrace", "print stacktrace of exception")
|
||||
;
|
||||
|
||||
po::positional_options_description positional_desc;
|
||||
@ -107,6 +109,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
|
||||
bool use_deflate_qpl = options.count("deflate_qpl");
|
||||
bool stat_mode = options.count("stat");
|
||||
bool use_none = options.count("none");
|
||||
print_stacktrace = options.count("stacktrace");
|
||||
unsigned block_size = options["block-size"].as<unsigned>();
|
||||
std::vector<std::string> codecs;
|
||||
if (options.count("codec"))
|
||||
@ -188,11 +191,12 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
|
||||
/// Compression
|
||||
CompressedWriteBuffer to(*wb, codec, block_size);
|
||||
copyData(*rb, to);
|
||||
to.finalize();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << getCurrentExceptionMessage(true) << '\n';
|
||||
std::cerr << getCurrentExceptionMessage(print_stacktrace) << '\n';
|
||||
return getCurrentExceptionCode();
|
||||
}
|
||||
|
||||
|
@ -193,7 +193,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
|
||||
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Delta);
|
||||
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
|
||||
{
|
||||
UInt8 delta_bytes_size = 0;
|
||||
/// Default bytes size is 1.
|
||||
UInt8 delta_bytes_size = 1;
|
||||
|
||||
if (arguments && !arguments->children.empty())
|
||||
{
|
||||
@ -202,8 +203,8 @@ void registerCodecDelta(CompressionCodecFactory & factory)
|
||||
|
||||
const auto children = arguments->children;
|
||||
const auto * literal = children[0]->as<ASTLiteral>();
|
||||
if (!literal)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be integer");
|
||||
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Delta codec argument must be unsigned integer");
|
||||
|
||||
size_t user_bytes_size = literal->value.safeGet<UInt64>();
|
||||
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <base/unaligned.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/BitHelpers.h>
|
||||
@ -31,7 +31,7 @@ namespace DB
|
||||
/** DoubleDelta column codec implementation.
|
||||
*
|
||||
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
|
||||
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
|
||||
* to support 64bit types. The drawback is 1 extra bit for 32-bit wide deltas: 5-bit prefix
|
||||
* instead of 4-bit prefix.
|
||||
*
|
||||
* This codec is best used against monotonic integer sequences with constant (or almost constant)
|
||||
@ -145,6 +145,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_COMPRESS;
|
||||
extern const int CANNOT_DECOMPRESS;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
|
||||
extern const int ILLEGAL_CODEC_PARAMETER;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -549,10 +551,28 @@ void registerCodecDoubleDelta(CompressionCodecFactory & factory)
|
||||
factory.registerCompressionCodecWithType("DoubleDelta", method_code,
|
||||
[&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
|
||||
{
|
||||
if (arguments)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec DoubleDelta does not accept any arguments");
|
||||
/// Default bytes size is 1.
|
||||
UInt8 data_bytes_size = 1;
|
||||
if (arguments && !arguments->children.empty())
|
||||
{
|
||||
if (arguments->children.size() > 1)
|
||||
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "DoubleDelta codec must have 1 parameter, given {}", arguments->children.size());
|
||||
|
||||
const auto children = arguments->children;
|
||||
const auto * literal = children[0]->as<ASTLiteral>();
|
||||
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "DoubleDelta codec argument must be unsigned integer");
|
||||
|
||||
size_t user_bytes_size = literal->value.safeGet<UInt64>();
|
||||
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for DoubleDelta codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
|
||||
data_bytes_size = static_cast<UInt8>(user_bytes_size);
|
||||
}
|
||||
else if (column_type)
|
||||
{
|
||||
data_bytes_size = getDataBytesSize(column_type);
|
||||
}
|
||||
|
||||
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
|
||||
return std::make_shared<CompressionCodecDoubleDelta>(data_bytes_size);
|
||||
});
|
||||
}
|
||||
|
@ -109,28 +109,42 @@ void registerCodecFPC(CompressionCodecFactory & factory)
|
||||
auto method_code = static_cast<UInt8>(CompressionMethodByte::FPC);
|
||||
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
|
||||
{
|
||||
UInt8 float_width = 0;
|
||||
/// Set default float width to 4.
|
||||
UInt8 float_width = 4;
|
||||
if (column_type != nullptr)
|
||||
float_width = getFloatBytesSize(*column_type);
|
||||
|
||||
UInt8 level = CompressionCodecFPC::DEFAULT_COMPRESSION_LEVEL;
|
||||
if (arguments && !arguments->children.empty())
|
||||
{
|
||||
if (arguments->children.size() > 1)
|
||||
if (arguments->children.size() > 2)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE,
|
||||
"FPC codec must have 1 parameter, given {}", arguments->children.size());
|
||||
"FPC codec must have from 0 to 2 parameters, given {}", arguments->children.size());
|
||||
}
|
||||
|
||||
const auto * literal = arguments->children.front()->as<ASTLiteral>();
|
||||
if (!literal)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be integer");
|
||||
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
|
||||
|
||||
level = literal->value.safeGet<UInt8>();
|
||||
if (level < 1 || level > CompressionCodecFPC::MAX_COMPRESSION_LEVEL)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec level must be between {} and {}",
|
||||
1, static_cast<int>(CompressionCodecFPC::MAX_COMPRESSION_LEVEL));
|
||||
|
||||
if (arguments->children.size() == 2)
|
||||
{
|
||||
literal = arguments->children[1]->as<ASTLiteral>();
|
||||
if (!literal || !isInt64OrUInt64FieldType(literal->value.getType()))
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "FPC codec argument must be unsigned integer");
|
||||
|
||||
size_t user_float_width = literal->value.safeGet<UInt64>();
|
||||
if (user_float_width != 4 && user_float_width != 8)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Float size for FPC codec can be 4 or 8, given {}", user_float_width);
|
||||
float_width = static_cast<UInt8>(user_float_width);
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_shared<CompressionCodecFPC>(float_width, level);
|
||||
};
|
||||
factory.registerCompressionCodecWithType("FPC", method_code, codec_builder);
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Compression/CompressionFactory.h>
|
||||
#include <base/unaligned.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/BitHelpers.h>
|
||||
@ -134,6 +135,8 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_COMPRESS;
|
||||
extern const int CANNOT_DECOMPRESS;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
|
||||
extern const int ILLEGAL_CODEC_PARAMETER;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -445,10 +448,28 @@ void registerCodecGorilla(CompressionCodecFactory & factory)
|
||||
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Gorilla);
|
||||
auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
|
||||
{
|
||||
if (arguments)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Codec Gorilla does not accept any arguments");
|
||||
/// Default bytes size is 1
|
||||
UInt8 data_bytes_size = 1;
|
||||
if (arguments && !arguments->children.empty())
|
||||
{
|
||||
if (arguments->children.size() > 1)
|
||||
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "Gorilla codec must have 1 parameter, given {}", arguments->children.size());
|
||||
|
||||
const auto children = arguments->children;
|
||||
const auto * literal = children[0]->as<ASTLiteral>();
|
||||
if (!literal || literal->value.getType() != Field::Types::Which::UInt64)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Gorilla codec argument must be unsigned integer");
|
||||
|
||||
size_t user_bytes_size = literal->value.safeGet<UInt64>();
|
||||
if (user_bytes_size != 1 && user_bytes_size != 2 && user_bytes_size != 4 && user_bytes_size != 8)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Argument value for Gorilla codec can be 1, 2, 4 or 8, given {}", user_bytes_size);
|
||||
data_bytes_size = static_cast<UInt8>(user_bytes_size);
|
||||
}
|
||||
else if (column_type)
|
||||
{
|
||||
data_bytes_size = getDataBytesSize(column_type);
|
||||
}
|
||||
|
||||
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
|
||||
return std::make_shared<CompressionCodecGorilla>(data_bytes_size);
|
||||
};
|
||||
factory.registerCompressionCodecWithType("Gorilla", method_code, codec_builder);
|
||||
|
@ -33,7 +33,8 @@ public:
|
||||
Bit
|
||||
};
|
||||
|
||||
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
|
||||
// type_idx_ is required for compression, but not for decompression.
|
||||
CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_);
|
||||
|
||||
uint8_t getMethodByte() const override;
|
||||
|
||||
@ -53,7 +54,7 @@ protected:
|
||||
bool isGenericCompression() const override { return false; }
|
||||
|
||||
private:
|
||||
TypeIndex type_idx;
|
||||
std::optional<TypeIndex> type_idx;
|
||||
Variant variant;
|
||||
};
|
||||
|
||||
@ -91,9 +92,12 @@ enum class MagicNumber : uint8_t
|
||||
IPv4 = 21,
|
||||
};
|
||||
|
||||
MagicNumber serializeTypeId(TypeIndex type_id)
|
||||
MagicNumber serializeTypeId(std::optional<TypeIndex> type_id)
|
||||
{
|
||||
switch (type_id)
|
||||
if (!type_id)
|
||||
throw Exception(ErrorCodes::CANNOT_COMPRESS, "T64 codec doesn't support compression without information about column type");
|
||||
|
||||
switch (*type_id)
|
||||
{
|
||||
case TypeIndex::UInt8: return MagicNumber::UInt8;
|
||||
case TypeIndex::UInt16: return MagicNumber::UInt16;
|
||||
@ -115,7 +119,7 @@ MagicNumber serializeTypeId(TypeIndex type_id)
|
||||
break;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(type_id));
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Type is not supported by T64 codec: {}", static_cast<UInt32>(*type_id));
|
||||
}
|
||||
|
||||
TypeIndex deserializeTypeId(uint8_t serialized_type_id)
|
||||
@ -632,7 +636,7 @@ UInt32 CompressionCodecT64::doCompressData(const char * src, UInt32 src_size, ch
|
||||
memcpy(dst, &cookie, 1);
|
||||
dst += 1;
|
||||
|
||||
switch (baseType(type_idx))
|
||||
switch (baseType(*type_idx))
|
||||
{
|
||||
case TypeIndex::Int8:
|
||||
return 1 + compressData<Int8>(src, src_size, dst, variant);
|
||||
@ -699,7 +703,7 @@ uint8_t CompressionCodecT64::getMethodByte() const
|
||||
return codecId();
|
||||
}
|
||||
|
||||
CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
|
||||
CompressionCodecT64::CompressionCodecT64(std::optional<TypeIndex> type_idx_, Variant variant_)
|
||||
: type_idx(type_idx_)
|
||||
, variant(variant_)
|
||||
{
|
||||
@ -712,7 +716,7 @@ CompressionCodecT64::CompressionCodecT64(TypeIndex type_idx_, Variant variant_)
|
||||
void CompressionCodecT64::updateHash(SipHash & hash) const
|
||||
{
|
||||
getCodecDesc()->updateTreeHash(hash);
|
||||
hash.update(type_idx);
|
||||
hash.update(type_idx.value_or(TypeIndex::Nothing));
|
||||
hash.update(variant);
|
||||
}
|
||||
|
||||
@ -742,9 +746,14 @@ void registerCodecT64(CompressionCodecFactory & factory)
|
||||
throw Exception(ErrorCodes::ILLEGAL_CODEC_PARAMETER, "Wrong modification for T64: {}", name);
|
||||
}
|
||||
|
||||
auto type_idx = typeIdx(type);
|
||||
if (type && type_idx == TypeIndex::Nothing)
|
||||
throw Exception(ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
|
||||
std::optional<TypeIndex> type_idx;
|
||||
if (type)
|
||||
{
|
||||
type_idx = typeIdx(type);
|
||||
if (type_idx == TypeIndex::Nothing)
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE, "T64 codec is not supported for specified type {}", type->getName());
|
||||
}
|
||||
return std::make_shared<CompressionCodecT64>(type_idx, variant);
|
||||
};
|
||||
|
||||
|
@ -848,6 +848,23 @@ std::string ExpressionActions::dumpActions() const
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
void ExpressionActions::describeActions(WriteBuffer & out, std::string_view prefix) const
|
||||
{
|
||||
bool first = true;
|
||||
|
||||
for (const auto & action : actions)
|
||||
{
|
||||
out << prefix << (first ? "Actions: " : " ");
|
||||
out << action.toString() << '\n';
|
||||
first = false;
|
||||
}
|
||||
|
||||
out << prefix << "Positions:";
|
||||
for (const auto & pos : result_positions)
|
||||
out << ' ' << pos;
|
||||
out << '\n';
|
||||
}
|
||||
|
||||
JSONBuilder::ItemPtr ExpressionActions::toTree() const
|
||||
{
|
||||
auto inputs_array = std::make_unique<JSONBuilder::JSONArray>();
|
||||
|
@ -109,6 +109,9 @@ public:
|
||||
const Block & getSampleBlock() const { return sample_block; }
|
||||
|
||||
std::string dumpActions() const;
|
||||
|
||||
void describeActions(WriteBuffer & out, std::string_view prefix) const;
|
||||
|
||||
JSONBuilder::ItemPtr toTree() const;
|
||||
|
||||
static NameAndTypePair getSmallestColumn(const NamesAndTypesList & columns);
|
||||
|
@ -38,7 +38,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
|
||||
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = false,
|
||||
|
@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = false,
|
||||
|
@ -40,7 +40,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -21,7 +21,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false,
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = false,
|
||||
@ -32,9 +31,6 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
|
||||
, final(final_)
|
||||
, use_nulls(use_nulls_)
|
||||
{
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
}
|
||||
|
||||
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
|
||||
@ -89,9 +85,5 @@ void CubeStep::updateOutputStream()
|
||||
{
|
||||
output_stream = createOutputStream(
|
||||
input_streams.front(), generateOutputHeader(params.getHeader(input_streams.front().header, final), params.keys, use_nulls), getDataStreamTraits());
|
||||
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
}
|
||||
}
|
||||
|
@ -10,28 +10,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names)
|
||||
{
|
||||
if (distinct_names.empty())
|
||||
return false;
|
||||
|
||||
/// Now we need to check that distinct_names is a subset of columns.
|
||||
std::unordered_set<std::string_view> columns_set(columns.begin(), columns.end());
|
||||
for (const auto & name : distinct_names)
|
||||
if (!columns_set.contains(name))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static ITransformingStep::Traits getTraits(bool pre_distinct, bool already_distinct_columns)
|
||||
static ITransformingStep::Traits getTraits(bool pre_distinct)
|
||||
{
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
|
||||
.returns_single_stream = !pre_distinct && !already_distinct_columns,
|
||||
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
|
||||
.returns_single_stream = !pre_distinct,
|
||||
.preserves_number_of_streams = pre_distinct,
|
||||
.preserves_sorting = true, /// Sorting is preserved indeed because of implementation.
|
||||
},
|
||||
{
|
||||
@ -62,34 +47,23 @@ DistinctStep::DistinctStep(
|
||||
: ITransformingStep(
|
||||
input_stream_,
|
||||
input_stream_.header,
|
||||
getTraits(pre_distinct_, checkColumnsAlreadyDistinct(columns_, input_stream_.distinct_columns)))
|
||||
getTraits(pre_distinct_))
|
||||
, set_size_limits(set_size_limits_)
|
||||
, limit_hint(limit_hint_)
|
||||
, columns(columns_)
|
||||
, pre_distinct(pre_distinct_)
|
||||
, optimize_distinct_in_order(optimize_distinct_in_order_)
|
||||
{
|
||||
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
|
||||
&& (!pre_distinct /// Main distinct
|
||||
|| input_stream_.has_single_port)) /// pre_distinct for single port works as usual one
|
||||
{
|
||||
/// Build distinct set.
|
||||
for (const auto & name : columns)
|
||||
output_stream->distinct_columns.insert(name);
|
||||
}
|
||||
}
|
||||
|
||||
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
const auto & input_stream = input_streams.back();
|
||||
if (checkColumnsAlreadyDistinct(columns, input_stream.distinct_columns))
|
||||
return;
|
||||
|
||||
if (!pre_distinct)
|
||||
pipeline.resize(1);
|
||||
|
||||
if (optimize_distinct_in_order)
|
||||
{
|
||||
const auto & input_stream = input_streams.back();
|
||||
const SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);
|
||||
if (!distinct_sort_desc.empty())
|
||||
{
|
||||
@ -197,16 +171,7 @@ void DistinctStep::updateOutputStream()
|
||||
output_stream = createOutputStream(
|
||||
input_streams.front(),
|
||||
input_streams.front().header,
|
||||
getTraits(pre_distinct, checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns)).data_stream_traits);
|
||||
|
||||
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
|
||||
&& (!pre_distinct /// Main distinct
|
||||
|| input_streams.front().has_single_port)) /// pre_distinct for single port works as usual one
|
||||
{
|
||||
/// Build distinct set.
|
||||
for (const auto & name : columns)
|
||||
output_stream->distinct_columns.insert(name);
|
||||
}
|
||||
getTraits(pre_distinct).data_stream_traits);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions, const
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = !actions->hasArrayJoin(),
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = actions->isSortingPreserved(header, sort_description),
|
||||
@ -33,8 +32,6 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, const ActionsDA
|
||||
getTraits(actions_dag_, input_stream_.header, input_stream_.sort_description))
|
||||
, actions_dag(actions_dag_)
|
||||
{
|
||||
/// Some columns may be removed by expression.
|
||||
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
|
||||
}
|
||||
|
||||
void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
|
||||
@ -63,22 +60,9 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu
|
||||
|
||||
void ExpressionStep::describeActions(FormatSettings & settings) const
|
||||
{
|
||||
String prefix(settings.offset, ' ');
|
||||
bool first = true;
|
||||
|
||||
String prefix(settings.offset, settings.indent_char);
|
||||
auto expression = std::make_shared<ExpressionActions>(actions_dag);
|
||||
for (const auto & action : expression->getActions())
|
||||
{
|
||||
settings.out << prefix << (first ? "Actions: "
|
||||
: " ");
|
||||
first = false;
|
||||
settings.out << action.toString() << '\n';
|
||||
}
|
||||
|
||||
settings.out << prefix << "Positions:";
|
||||
for (const auto & pos : expression->getResultPositions())
|
||||
settings.out << ' ' << pos;
|
||||
settings.out << '\n';
|
||||
expression->describeActions(settings.out, prefix);
|
||||
}
|
||||
|
||||
void ExpressionStep::describeActions(JSONBuilder::JSONMap & map) const
|
||||
|
@ -9,7 +9,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -17,7 +17,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later.
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -23,7 +23,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & expression, con
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = !expression->hasArrayJoin(), /// I suppose it actually never happens
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = preserves_sorting,
|
||||
@ -51,8 +50,6 @@ FilterStep::FilterStep(
|
||||
, filter_column_name(std::move(filter_column_name_))
|
||||
, remove_filter_column(remove_filter_column_)
|
||||
{
|
||||
/// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name.
|
||||
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
|
||||
}
|
||||
|
||||
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
|
||||
@ -82,27 +79,15 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
|
||||
|
||||
void FilterStep::describeActions(FormatSettings & settings) const
|
||||
{
|
||||
String prefix(settings.offset, ' ');
|
||||
String prefix(settings.offset, settings.indent_char);
|
||||
settings.out << prefix << "Filter column: " << filter_column_name;
|
||||
|
||||
if (remove_filter_column)
|
||||
settings.out << " (removed)";
|
||||
settings.out << '\n';
|
||||
|
||||
bool first = true;
|
||||
auto expression = std::make_shared<ExpressionActions>(actions_dag);
|
||||
for (const auto & action : expression->getActions())
|
||||
{
|
||||
settings.out << prefix << (first ? "Actions: "
|
||||
: " ");
|
||||
first = false;
|
||||
settings.out << action.toString() << '\n';
|
||||
}
|
||||
|
||||
settings.out << prefix << "Positions:";
|
||||
for (const auto & pos : expression->getResultPositions())
|
||||
settings.out << ' ' << pos;
|
||||
settings.out << '\n';
|
||||
expression->describeActions(settings.out, prefix);
|
||||
}
|
||||
|
||||
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
|
||||
|
@ -23,11 +23,6 @@ class DataStream
|
||||
public:
|
||||
Block header;
|
||||
|
||||
/// Tuples with those columns are distinct.
|
||||
/// It doesn't mean that columns are distinct separately.
|
||||
/// Removing any column from this list breaks this invariant.
|
||||
NameSet distinct_columns = {};
|
||||
|
||||
/// QueryPipeline has single port. Totals or extremes ports are not counted.
|
||||
bool has_single_port = false;
|
||||
|
||||
@ -51,8 +46,7 @@ public:
|
||||
|
||||
bool hasEqualPropertiesWith(const DataStream & other) const
|
||||
{
|
||||
return distinct_columns == other.distinct_columns
|
||||
&& has_single_port == other.has_single_port
|
||||
return has_single_port == other.has_single_port
|
||||
&& sort_description == other.sort_description
|
||||
&& (sort_description.empty() || sort_scope == other.sort_scope);
|
||||
}
|
||||
|
@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
|
||||
{
|
||||
DataStream output_stream{.header = std::move(output_header)};
|
||||
|
||||
if (stream_traits.preserves_distinct_columns)
|
||||
output_stream.distinct_columns = input_stream.distinct_columns;
|
||||
|
||||
output_stream.has_single_port = stream_traits.returns_single_stream
|
||||
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
|
||||
|
||||
@ -50,21 +47,6 @@ QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders
|
||||
return std::move(pipelines.front());
|
||||
}
|
||||
|
||||
void ITransformingStep::updateDistinctColumns(const Block & res_header, NameSet & distinct_columns)
|
||||
{
|
||||
if (distinct_columns.empty())
|
||||
return;
|
||||
|
||||
for (const auto & column : distinct_columns)
|
||||
{
|
||||
if (!res_header.has(column))
|
||||
{
|
||||
distinct_columns.clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ITransformingStep::describePipeline(FormatSettings & settings) const
|
||||
{
|
||||
IQueryPlanStep::describePipeline(processors, settings);
|
||||
|
@ -18,11 +18,6 @@ public:
|
||||
/// They are specified in constructor and cannot be changed.
|
||||
struct DataStreamTraits
|
||||
{
|
||||
/// Keep distinct_columns unchanged.
|
||||
/// Examples: true for LimitStep, false for ExpressionStep with ARRAY JOIN
|
||||
/// It some columns may be removed from result header, call updateDistinctColumns
|
||||
bool preserves_distinct_columns;
|
||||
|
||||
/// True if pipeline has single output port after this step.
|
||||
/// Examples: MergeSortingStep, AggregatingStep
|
||||
bool returns_single_stream;
|
||||
@ -69,8 +64,6 @@ public:
|
||||
input_streams.emplace_back(std::move(input_stream));
|
||||
|
||||
updateOutputStream();
|
||||
|
||||
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
|
||||
}
|
||||
|
||||
void describePipeline(FormatSettings & settings) const override;
|
||||
@ -83,9 +76,6 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Clear distinct_columns if res_header doesn't contain all of them.
|
||||
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);
|
||||
|
||||
/// Create output stream from header and traits.
|
||||
static DataStream createOutputStream(
|
||||
const DataStream & input_stream,
|
||||
|
@ -83,7 +83,6 @@ static ITransformingStep::Traits getStorageJoinTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = false,
|
||||
|
@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = true,
|
||||
|
@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -24,7 +24,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false,
|
||||
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = false,
|
||||
@ -62,10 +61,6 @@ MergingAggregatedStep::MergingAggregatedStep(
|
||||
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
|
||||
, memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)
|
||||
{
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
|
||||
if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)
|
||||
{
|
||||
output_stream->sort_description = group_by_sort_description;
|
||||
@ -157,10 +152,6 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
|
||||
void MergingAggregatedStep::updateOutputStream()
|
||||
{
|
||||
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
|
||||
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
}
|
||||
|
||||
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const
|
||||
|
@ -27,6 +27,7 @@ public:
|
||||
bool memory_bound_merging_of_aggregation_results_enabled_);
|
||||
|
||||
String getName() const override { return "MergingAggregated"; }
|
||||
const Aggregator::Params & getParams() const { return params; }
|
||||
|
||||
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
|
@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Processors/QueryPlan/JoinStep.h>
|
||||
#include <Processors/QueryPlan/LimitByStep.h>
|
||||
#include <Processors/QueryPlan/LimitStep.h>
|
||||
#include <Processors/QueryPlan/MergingAggregatedStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/RollupStep.h>
|
||||
#include <Processors/QueryPlan/SortingStep.h>
|
||||
@ -100,24 +101,29 @@ namespace
|
||||
logDebug("aggregation_keys", aggregation_keys);
|
||||
logDebug("aggregation_keys size", aggregation_keys.size());
|
||||
logDebug("distinct_columns size", distinct_columns.size());
|
||||
if (aggregation_keys.size() != distinct_columns.size())
|
||||
return false;
|
||||
|
||||
/// compare columns of two DISTINCTs
|
||||
std::set<std::string_view> original_distinct_columns;
|
||||
for (const auto & column : distinct_columns)
|
||||
{
|
||||
logDebug("distinct column name", column);
|
||||
const auto * alias_node = getOriginalNodeForOutputAlias(path_actions, String(column));
|
||||
if (!alias_node)
|
||||
{
|
||||
logDebug("original name for alias is not found for", column);
|
||||
return false;
|
||||
logDebug("original name for alias is not found", column);
|
||||
original_distinct_columns.insert(column);
|
||||
}
|
||||
|
||||
logDebug("alias result name", alias_node->result_name);
|
||||
if (std::find(cbegin(aggregation_keys), cend(aggregation_keys), alias_node->result_name) == aggregation_keys.cend())
|
||||
else
|
||||
{
|
||||
logDebug("alias result name is not found in aggregation keys", alias_node->result_name);
|
||||
logDebug("alias result name", alias_node->result_name);
|
||||
original_distinct_columns.insert(alias_node->result_name);
|
||||
}
|
||||
}
|
||||
/// if aggregation keys are part of distinct columns then rows already distinct
|
||||
for (const auto & key : aggregation_keys)
|
||||
{
|
||||
if (!original_distinct_columns.contains(key))
|
||||
{
|
||||
logDebug("aggregation key NOT found: {}", key);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -176,7 +182,7 @@ namespace
|
||||
while (!node->children.empty())
|
||||
{
|
||||
const IQueryPlanStep * current_step = node->step.get();
|
||||
if (typeid_cast<const AggregatingStep *>(current_step))
|
||||
if (typeid_cast<const AggregatingStep *>(current_step) || typeid_cast<const MergingAggregatedStep *>(current_step))
|
||||
{
|
||||
aggregation_before_distinct = current_step;
|
||||
break;
|
||||
@ -208,6 +214,9 @@ namespace
|
||||
|
||||
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
|
||||
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
|
||||
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
|
||||
merging_aggregated_step)
|
||||
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -340,57 +340,55 @@ Pipe ReadFromMergeTree::readFromPool(
|
||||
/ max_block_size * max_block_size / fixed_index_granularity;
|
||||
}
|
||||
|
||||
bool all_parts_are_remote = true;
|
||||
bool all_parts_are_local = true;
|
||||
for (const auto & part : parts_with_range)
|
||||
{
|
||||
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
|
||||
all_parts_are_local &= !is_remote;
|
||||
all_parts_are_remote &= is_remote;
|
||||
}
|
||||
bool all_parts_are_remote = true;
|
||||
bool all_parts_are_local = true;
|
||||
for (const auto & part : parts_with_range)
|
||||
{
|
||||
const bool is_remote = part.data_part->isStoredOnRemoteDisk();
|
||||
all_parts_are_local &= !is_remote;
|
||||
all_parts_are_remote &= is_remote;
|
||||
}
|
||||
|
||||
MergeTreeReadPoolPtr pool;
|
||||
MergeTreeReadPoolPtr pool;
|
||||
|
||||
if ((all_parts_are_remote
|
||||
&& settings.allow_prefetched_read_pool_for_remote_filesystem
|
||||
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|
||||
|| (all_parts_are_local
|
||||
&& settings.allow_prefetched_read_pool_for_local_filesystem
|
||||
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method)))
|
||||
{
|
||||
pool = std::make_shared<MergeTreePrefetchedReadPool>(
|
||||
max_streams,
|
||||
sum_marks,
|
||||
min_marks_for_concurrent_read,
|
||||
std::move(parts_with_range),
|
||||
storage_snapshot,
|
||||
prewhere_info,
|
||||
actions_settings,
|
||||
required_columns,
|
||||
virt_column_names,
|
||||
settings.preferred_block_size_bytes,
|
||||
reader_settings,
|
||||
context,
|
||||
use_uncompressed_cache,
|
||||
all_parts_are_remote,
|
||||
*data.getSettings());
|
||||
}
|
||||
else
|
||||
{
|
||||
pool = std::make_shared<MergeTreeReadPool>(
|
||||
max_streams,
|
||||
sum_marks,
|
||||
min_marks_for_concurrent_read,
|
||||
std::move(parts_with_range),
|
||||
storage_snapshot,
|
||||
prewhere_info,
|
||||
actions_settings,
|
||||
reader_settings,
|
||||
required_columns,
|
||||
virt_column_names,
|
||||
context,
|
||||
false);
|
||||
}
|
||||
if ((all_parts_are_remote && settings.allow_prefetched_read_pool_for_remote_filesystem
|
||||
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.remote_fs_method))
|
||||
|| (all_parts_are_local && settings.allow_prefetched_read_pool_for_local_filesystem
|
||||
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method)))
|
||||
{
|
||||
pool = std::make_shared<MergeTreePrefetchedReadPool>(
|
||||
max_streams,
|
||||
sum_marks,
|
||||
min_marks_for_concurrent_read,
|
||||
std::move(parts_with_range),
|
||||
storage_snapshot,
|
||||
prewhere_info,
|
||||
actions_settings,
|
||||
required_columns,
|
||||
virt_column_names,
|
||||
settings.preferred_block_size_bytes,
|
||||
reader_settings,
|
||||
context,
|
||||
use_uncompressed_cache,
|
||||
all_parts_are_remote,
|
||||
*data.getSettings());
|
||||
}
|
||||
else
|
||||
{
|
||||
pool = std::make_shared<MergeTreeReadPool>(
|
||||
max_streams,
|
||||
sum_marks,
|
||||
min_marks_for_concurrent_read,
|
||||
std::move(parts_with_range),
|
||||
storage_snapshot,
|
||||
prewhere_info,
|
||||
actions_settings,
|
||||
reader_settings,
|
||||
required_columns,
|
||||
virt_column_names,
|
||||
context,
|
||||
false);
|
||||
}
|
||||
|
||||
auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)");
|
||||
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, max_streams);
|
||||
@ -1732,6 +1730,36 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
|
||||
format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n';
|
||||
format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n';
|
||||
}
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
format_settings.out << prefix << "Prewhere info" << '\n';
|
||||
format_settings.out << prefix << "Need filter: " << prewhere_info->need_filter << '\n';
|
||||
|
||||
prefix.push_back(format_settings.indent_char);
|
||||
prefix.push_back(format_settings.indent_char);
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
{
|
||||
format_settings.out << prefix << "Prewhere filter" << '\n';
|
||||
format_settings.out << prefix << "Prewhere filter column: " << prewhere_info->prewhere_column_name;
|
||||
if (prewhere_info->remove_prewhere_column)
|
||||
format_settings.out << " (removed)";
|
||||
format_settings.out << '\n';
|
||||
|
||||
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
|
||||
expression->describeActions(format_settings.out, prefix);
|
||||
}
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
format_settings.out << prefix << "Row level filter" << '\n';
|
||||
format_settings.out << prefix << "Row level filter column: " << prewhere_info->row_level_column_name << '\n';
|
||||
|
||||
auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
|
||||
expression->describeActions(format_settings.out, prefix);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
|
||||
@ -1743,6 +1771,35 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
|
||||
map.add("Parts", result.index_stats.back().num_parts_after);
|
||||
map.add("Granules", result.index_stats.back().num_granules_after);
|
||||
}
|
||||
|
||||
if (prewhere_info)
|
||||
{
|
||||
std::unique_ptr<JSONBuilder::JSONMap> prewhere_info_map = std::make_unique<JSONBuilder::JSONMap>();
|
||||
prewhere_info_map->add("Need filter", prewhere_info->need_filter);
|
||||
|
||||
if (prewhere_info->prewhere_actions)
|
||||
{
|
||||
std::unique_ptr<JSONBuilder::JSONMap> prewhere_filter_map = std::make_unique<JSONBuilder::JSONMap>();
|
||||
prewhere_filter_map->add("Prewhere filter column", prewhere_info->prewhere_column_name);
|
||||
prewhere_filter_map->add("Prewhere filter remove filter column", prewhere_info->remove_prewhere_column);
|
||||
auto expression = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions);
|
||||
prewhere_filter_map->add("Prewhere filter expression", expression->toTree());
|
||||
|
||||
prewhere_info_map->add("Prewhere filter", std::move(prewhere_filter_map));
|
||||
}
|
||||
|
||||
if (prewhere_info->row_level_filter)
|
||||
{
|
||||
std::unique_ptr<JSONBuilder::JSONMap> row_level_filter_map = std::make_unique<JSONBuilder::JSONMap>();
|
||||
row_level_filter_map->add("Row level filter column", prewhere_info->row_level_column_name);
|
||||
auto expression = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter);
|
||||
row_level_filter_map->add("Row level filter expression", expression->toTree());
|
||||
|
||||
prewhere_info_map->add("Row level filter", std::move(row_level_filter_map));
|
||||
}
|
||||
|
||||
map.add("Prewhere info", std::move(prewhere_info_map));
|
||||
}
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
|
||||
|
@ -11,7 +11,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = false,
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = false,
|
||||
@ -29,9 +28,6 @@ RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params para
|
||||
, final(final_)
|
||||
, use_nulls(use_nulls_)
|
||||
{
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
}
|
||||
|
||||
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
|
||||
@ -54,10 +50,6 @@ void RollupStep::updateOutputStream()
|
||||
{
|
||||
output_stream = createOutputStream(
|
||||
input_streams.front(), appendGroupingSetColumn(params.getHeader(input_streams.front().header, final)), getDataStreamTraits());
|
||||
|
||||
/// Aggregation keys are distinct
|
||||
for (const auto & key : params.keys)
|
||||
output_stream->distinct_columns.insert(key);
|
||||
}
|
||||
|
||||
|
||||
|
@ -45,7 +45,6 @@ static ITransformingStep::Traits getTraits(size_t limit)
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = false,
|
||||
|
@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits(bool has_filter)
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = true,
|
||||
.preserves_number_of_streams = false,
|
||||
.preserves_sorting = true,
|
||||
|
@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits()
|
||||
return ITransformingStep::Traits
|
||||
{
|
||||
{
|
||||
.preserves_distinct_columns = true,
|
||||
.returns_single_stream = false,
|
||||
.preserves_number_of_streams = true,
|
||||
.preserves_sorting = true,
|
||||
|
@ -2,5 +2,5 @@ CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Delta, ZSTD(22)))
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(DoubleDelta, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` Decimal(38, 10) CODEC(Gorilla, ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }

CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(DoubleDelta(3), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(Gorilla('hello, world'), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError 36 }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(DoubleDelta(3), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError ILLEGAL_CODEC_PARAMETER }
CREATE TABLE delta_codec_synthetic (`id` UInt64 CODEC(Gorilla('hello, world'), ZSTD(22))) ENGINE = MergeTree() ORDER BY tuple(); -- { serverError ILLEGAL_CODEC_PARAMETER }
@ -464,3 +464,16 @@ Expression ((Projection + (Before ORDER BY + (Projection + Before ORDER BY))))
|
||||
1
|
||||
|
||||
0
|
||||
-- DISTINCT COUNT() with GROUP BY => do _not_ remove DISTINCT
|
||||
-- query
|
||||
select distinct count() from numbers(10) group by number
|
||||
-- explain
|
||||
Expression (Projection)
|
||||
Distinct
|
||||
Distinct (Preliminary DISTINCT)
|
||||
Expression (Before ORDER BY)
|
||||
Aggregating
|
||||
Expression (Before GROUP BY)
|
||||
ReadFromStorage (SystemNumbers)
|
||||
-- execute
|
||||
1
|
||||
|
@ -256,3 +256,7 @@ FROM
|
||||
GROUP BY a WITH TOTALS
|
||||
)"
|
||||
run_query "$query"
|
||||
|
||||
echo "-- DISTINCT COUNT() with GROUP BY => do _not_ remove DISTINCT"
|
||||
query="select distinct count() from numbers(10) group by number"
|
||||
run_query "$query"
|
||||
|
@ -0,0 +1,9 @@
1
1
1
1
1
1
1
1
1
34
tests/queries/0_stateless/02584_compressor_codecs.sh
Executable file
@ -0,0 +1,34 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

echo "Hello, World!" > 02584_test_data

$CLICKHOUSE_COMPRESSOR --codec 'Delta' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out'
$CLICKHOUSE_COMPRESSOR --codec 'Delta(5)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'Delta([1,2])' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'Delta(4)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';

$CLICKHOUSE_COMPRESSOR --codec 'DoubleDelta' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out'
$CLICKHOUSE_COMPRESSOR --codec 'DoubleDelta(5)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'DoubleDelta([1,2])' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'DoubleDelta(4)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';

$CLICKHOUSE_COMPRESSOR --codec 'Gorilla' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out'
$CLICKHOUSE_COMPRESSOR --codec 'Gorilla(5)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'Gorilla([1,2])' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'Gorilla(4)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';

$CLICKHOUSE_COMPRESSOR --codec 'FPC' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';
$CLICKHOUSE_COMPRESSOR --codec 'FPC(5)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';
$CLICKHOUSE_COMPRESSOR --codec 'FPC(5, 1)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'FPC([1,2,3])' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "ILLEGAL_CODEC_PARAMETER";
$CLICKHOUSE_COMPRESSOR --codec 'FPC(5, 4)' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out';


$CLICKHOUSE_COMPRESSOR --codec 'T64' --codec 'LZ4' --input '02584_test_data' --output '02584_test_out' 2>&1 | grep -c "CANNOT_COMPRESS";

rm 02584_test_data 02584_test_out
@ -0,0 +1,26 @@
|
||||
Expression ((Projection + Before ORDER BY))
|
||||
Header: id UInt64
|
||||
value String
|
||||
Actions: INPUT :: 0 -> id UInt64 : 0
|
||||
INPUT :: 1 -> value String : 1
|
||||
Positions: 0 1
|
||||
ReadFromMergeTree (default.test_table)
|
||||
Header: id UInt64
|
||||
value String
|
||||
ReadType: Default
|
||||
Parts: 0
|
||||
Granules: 0
|
||||
Prewhere info
|
||||
Need filter: 1
|
||||
Prewhere filter
|
||||
Prewhere filter column: equals(id, 5) (removed)
|
||||
Actions: INPUT : 0 -> id UInt64 : 0
|
||||
COLUMN Const(UInt8) -> 5 UInt8 : 1
|
||||
FUNCTION equals(id : 0, 5 :: 1) -> equals(id, 5) UInt8 : 2
|
||||
Positions: 2 0
|
||||
Row level filter
|
||||
Row level filter column: greaterOrEquals(id, 5)
|
||||
Actions: INPUT : 0 -> id UInt64 : 0
|
||||
COLUMN Const(UInt8) -> 5 UInt8 : 1
|
||||
FUNCTION greaterOrEquals(id : 0, 5 :: 1) -> greaterOrEquals(id, 5) UInt8 : 2
|
||||
Positions: 2 0
|
@ -0,0 +1,16 @@
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
    id UInt64,
    value String
) ENGINE=MergeTree ORDER BY id;

INSERT INTO test_table VALUES (0, 'Value');

DROP ROW POLICY IF EXISTS test_row_policy ON test_table;
CREATE ROW POLICY test_row_policy ON test_table USING id >= 5 TO ALL;

EXPLAIN header = 1, actions = 1 SELECT id, value FROM test_table PREWHERE id = 5;

DROP ROW POLICY test_row_policy ON test_table;
DROP TABLE test_table;