Merge branch 'ClickHouse:master' into fix_full_text_with_multi_col

siyuan 2024-08-27 14:07:21 +08:00 committed by GitHub
commit 3fee996750
56 changed files with 940 additions and 263 deletions

View File

@ -112,3 +112,5 @@ wadllib==1.3.6
websocket-client==0.59.0
wheel==0.37.1
zipp==1.0.0
deltalake==0.16.0

View File

@ -1389,7 +1389,7 @@ DESC format(JSONEachRow, '{"id" : 1, "age" : 25, "name" : "Josh", "status" : nul
#### schema_inference_make_columns_nullable
Controls making inferred types `Nullable` in schema inference for formats without information about nullability.
If the setting is enabled, all inferred types will be `Nullable`; if disabled, the inferred type will be `Nullable` only if `input_format_null_as_default` is disabled and the column contains `NULL` in a sample that is parsed during schema inference.
If the setting is enabled, all inferred types will be `Nullable`; if disabled, the inferred type will never be `Nullable`; if set to `auto`, the inferred type will be `Nullable` only if the column contains `NULL` in a sample that is parsed during schema inference or the file metadata contains information about column nullability.
Enabled by default.
@ -1412,15 +1412,13 @@ DESC format(JSONEachRow, $$
└─────────┴─────────────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
```
```sql
SET schema_inference_make_columns_nullable = 0;
SET input_format_null_as_default = 0;
SET schema_inference_make_columns_nullable = 'auto';
DESC format(JSONEachRow, $$
{"id" : 1, "age" : 25, "name" : "Josh", "status" : null, "hobbies" : ["football", "cooking"]}
{"id" : 2, "age" : 19, "name" : "Alan", "status" : "married", "hobbies" : ["tennis", "art"]}
$$)
```
```response
┌─name────┬─type─────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ id │ Int64 │ │ │ │ │ │
│ age │ Int64 │ │ │ │ │ │
@ -1432,7 +1430,6 @@ DESC format(JSONEachRow, $$
```sql
SET schema_inference_make_columns_nullable = 0;
SET input_format_null_as_default = 1;
DESC format(JSONEachRow, $$
{"id" : 1, "age" : 25, "name" : "Josh", "status" : null, "hobbies" : ["football", "cooking"]}
{"id" : 2, "age" : 19, "name" : "Alan", "status" : "married", "hobbies" : ["tennis", "art"]}

View File

@ -171,8 +171,8 @@ If the `schema_inference_hints` is not formatted properly, or if there is a typo
## schema_inference_make_columns_nullable {#schema_inference_make_columns_nullable}
Controls making inferred types `Nullable` in schema inference for formats without information about nullability.
If the setting is enabled, the inferred type will be `Nullable` only if the column contains `NULL` in a sample that is parsed during schema inference.
Controls making inferred types `Nullable` in schema inference.
If the setting is enabled, all inferred types will be `Nullable`; if disabled, the inferred type will never be `Nullable`; if set to `auto`, the inferred type will be `Nullable` only if the column contains `NULL` in a sample that is parsed during schema inference or the file metadata contains information about column nullability.
Default value: `true`.
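A minimal sketch of the three modes (the inline JSON sample is hypothetical):
```sql
-- 1 (or true): every inferred column becomes Nullable
SET schema_inference_make_columns_nullable = 1;
DESC format(JSONEachRow, '{"id" : 1, "status" : null}');

-- 0 (or false): inferred columns are never Nullable
SET schema_inference_make_columns_nullable = 0;

-- 'auto': Nullable only when the sample contains NULL or the file
-- metadata (e.g. Parquet) declares the column nullable
SET schema_inference_make_columns_nullable = 'auto';
```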

View File

@ -688,6 +688,40 @@ SELECT kostikConsistentHash(16045690984833335023, 2);
└───────────────────────────────────────────────┘
```
## ripeMD160
Produces the [RIPEMD-160](https://en.wikipedia.org/wiki/RIPEMD) hash value.
**Syntax**
```sql
ripeMD160(input)
```
**Parameters**
- `input`: Input string. [String](../data-types/string.md)
**Returned value**
- A [UInt256](../data-types/int-uint.md) hash value where the 160-bit RIPEMD-160 hash is stored in the first 20 bytes. The remaining 12 bytes are zero-padded.
**Example**
Use the [hex](../functions/encoding-functions.md#hex) function to represent the result as a hex-encoded string.
Query:
```sql
SELECT hex(ripeMD160('The quick brown fox jumps over the lazy dog'));
```
```response
┌─hex(ripeMD160('The quick brown fox jumps over the lazy dog'))─┐
│ 37F332F68DB77BD9D7EDD4969571AD671CF9DD3B │
└───────────────────────────────────────────────────────────────┘
```
## murmurHash2_32, murmurHash2_64
Produces a [MurmurHash2](https://github.com/aappleby/smhasher) hash value.

View File

@ -124,6 +124,40 @@ SELECT hex(sipHash128('foo', '\x01', 3));
└──────────────────────────────────┘
```
## ripeMD160
Produces the [RIPEMD-160](https://en.wikipedia.org/wiki/RIPEMD) hash of a string.
**Syntax**
```sql
ripeMD160(input)
```
**Arguments**
- `input`: Input string. [String](../data-types/string.md)
**Returned value**
- A [UInt256](../data-types/int-uint.md) value where the 160-bit RIPEMD-160 hash is stored in the first 20 bytes. The remaining 12 bytes are zero-padded.
**Example**
Use the [hex](../functions/encoding-functions.md#hex) function to represent the result as a hex-encoded string.
Query:
```sql
SELECT hex(ripeMD160('The quick brown fox jumps over the lazy dog'));
```
Result:
```response
┌─hex(ripeMD160('The quick brown fox jumps over the lazy dog'))─┐
│ 37F332F68DB77BD9D7EDD4969571AD671CF9DD3B │
└───────────────────────────────────────────────────────────────┘
```
## cityHash64 {#cityhash64}
Produces a 64-bit [CityHash](https://github.com/google/cityhash) value.

View File

@ -1120,7 +1120,7 @@ class IColumn;
M(String, column_names_for_schema_inference, "", "The list of column names to use in schema inference for formats without column names. The format: 'column1,column2,column3,...'", 0) \
M(String, schema_inference_hints, "", "The list of column names and types to use in schema inference for formats without column names. The format: 'column_name1 column_type1, column_name2 column_type2, ...'", 0) \
M(SchemaInferenceMode, schema_inference_mode, "default", "Mode of schema inference. 'default' - assume that all files have the same schema and schema can be inferred from any file, 'union' - files can have different schemas and the resulting schema should be a union of schemas of all files", 0) \
M(Bool, schema_inference_make_columns_nullable, true, "If set to true, all inferred types will be Nullable in schema inference for formats without information about nullability.", 0) \
M(UInt64Auto, schema_inference_make_columns_nullable, 1, "If set to true, all inferred types will be Nullable in schema inference. When set to false, no columns will be converted to Nullable. When set to 'auto', ClickHouse will use information about nullability from the data.", 0) \
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_json_read_bools_as_strings, true, "Allow to parse bools as strings in JSON input formats", 0) \
M(Bool, input_format_json_try_infer_numbers_from_strings, false, "Try to infer numbers from string fields while schema inference", 0) \

View File

@ -257,7 +257,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.max_bytes_to_read_for_schema_inference = settings.input_format_max_bytes_to_read_for_schema_inference;
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.schema_inference_hints = settings.schema_inference_hints;
format_settings.schema_inference_make_columns_nullable = settings.schema_inference_make_columns_nullable;
format_settings.schema_inference_make_columns_nullable = settings.schema_inference_make_columns_nullable.valueOr(2);
format_settings.mysql_dump.table_name = settings.input_format_mysql_dump_table_name;
format_settings.mysql_dump.map_column_names = settings.input_format_mysql_dump_map_column_names;
format_settings.sql_insert.max_batch_size = settings.output_format_sql_insert_max_batch_size;
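A hedged note on the mapping: when the setting is `'auto'`, `valueOr(2)` stores the sentinel `2` into `FormatSettings`, so downstream code can distinguish `0` (never `Nullable`), `1` (always `Nullable`) and `2` (decide from data or file metadata). Assuming the `UInt64Auto` setting type also accepts a plain integer, these two statements should behave identically:
```sql
SET schema_inference_make_columns_nullable = 'auto';
SET schema_inference_make_columns_nullable = 2; -- assumed equivalent to 'auto'
```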

View File

@ -77,7 +77,7 @@ struct FormatSettings
Raw
};
bool schema_inference_make_columns_nullable = true;
UInt64 schema_inference_make_columns_nullable = 1;
DateTimeOutputFormat date_time_output_format = DateTimeOutputFormat::Simple;

View File

@ -1344,7 +1344,11 @@ namespace
if (checkCharCaseInsensitive('n', buf))
{
if (checkStringCaseInsensitive("ull", buf))
return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeNothing>());
{
if (settings.schema_inference_make_columns_nullable == 0)
return std::make_shared<DataTypeNothing>();
return makeNullable(std::make_shared<DataTypeNothing>());
}
else if (checkStringCaseInsensitive("an", buf))
return std::make_shared<DataTypeFloat64>();
}
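With the setting disabled, a bare JSON `null` now infers as plain `Nothing` instead of `Nullable(Nothing)`, so merging with later non-null rows yields a non-`Nullable` type. A hedged probe (inline sample hypothetical):
```sql
SET schema_inference_make_columns_nullable = 0;
DESC format(JSONEachRow, '{"x" : null} {"x" : 1}');
-- x is expected to infer as Int64 rather than Nullable(Int64)
```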

View File

@ -19,7 +19,9 @@
#include <Common/HashTable/Hash.h>
#if USE_SSL
# include <openssl/evp.h>
# include <openssl/md5.h>
# include <openssl/ripemd.h>
#endif
#include <bit>
@ -196,6 +198,34 @@ T combineHashesFunc(T t1, T t2)
return HashFunction::apply(reinterpret_cast<const char *>(hashes), sizeof(hashes));
}
#if USE_SSL
struct RipeMD160Impl
{
static constexpr auto name = "ripeMD160";
using ReturnType = UInt256;
static UInt256 apply(const char * begin, size_t size)
{
UInt8 digest[RIPEMD160_DIGEST_LENGTH];
RIPEMD160(reinterpret_cast<const unsigned char *>(begin), size, reinterpret_cast<unsigned char *>(digest));
std::reverse(digest, digest + RIPEMD160_DIGEST_LENGTH);
UInt256 res = 0;
std::memcpy(&res, digest, RIPEMD160_DIGEST_LENGTH);
return res;
}
static UInt256 combineHashes(UInt256 h1, UInt256 h2)
{
return combineHashesFunc<UInt256, RipeMD160Impl>(h1, h2);
}
static constexpr bool use_int_hash_for_pods = false;
};
#endif
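Note the byte reversal: on little-endian targets the `memcpy` fills the low 20 bytes of the `UInt256`, so reversing first makes `hex()` print the canonical big-endian digest. A quick sanity check against the standard RIPEMD-160 test vector for the empty string (expected value from the RIPEMD-160 specification, not from this diff):
```sql
SELECT hex(ripeMD160(''));
-- expected: 9C1185A5C5E9FC54612808977EE8F548B2258D31
```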
struct SipHash64Impl
{
@ -1624,6 +1654,7 @@ using FunctionIntHash32 = FunctionIntHash<IntHash32Impl, NameIntHash32>;
using FunctionIntHash64 = FunctionIntHash<IntHash64Impl, NameIntHash64>;
#if USE_SSL
using FunctionHalfMD5 = FunctionAnyHash<HalfMD5Impl>;
using FunctionRipeMD160Hash = FunctionAnyHash<RipeMD160Impl>;
#endif
using FunctionSipHash128 = FunctionAnyHash<SipHash128Impl>;
using FunctionSipHash128Keyed = FunctionAnyHash<SipHash128KeyedImpl, true, SipHash128KeyedImpl::Key, SipHash128KeyedImpl::KeyColumns>;
@ -1652,6 +1683,7 @@ using FunctionXxHash64 = FunctionAnyHash<ImplXxHash64>;
using FunctionXXH3 = FunctionAnyHash<ImplXXH3>;
using FunctionWyHash64 = FunctionAnyHash<ImplWyHash64>;
}
#pragma clang diagnostic pop

View File

@ -0,0 +1,23 @@
#include "FunctionsHashing.h"
#include <Functions/FunctionFactory.h>
/// FunctionsHashing instantiations are separated into files FunctionsHashing*.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
#if USE_SSL
REGISTER_FUNCTION(HashingRipe)
{
factory.registerFunction<FunctionRipeMD160Hash>(FunctionDocumentation{
.description = "RIPEMD-160 hash function, primarily used in Bitcoin address generation.",
.examples{{"", "SELECT hex(ripeMD160('The quick brown fox jumps over the lazy dog'));", R"(
hex(ripeMD160('The quick brown fox jumps over the lazy dog'))
37F332F68DB77BD9D7EDD4969571AD671CF9DD3B
)"}},
.categories{"Hash"}});
}
#endif
}

View File

@ -787,7 +787,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
/// EC2MetadataService delay is in order of seconds so it only makes sense to retry after a couple of seconds.
/// But the connection timeout should be small because there is the case when there is no IMDS at all,
/// like outside of the cloud, on your own machines.
aws_client_configuration.connectTimeoutMs = 10;
aws_client_configuration.connectTimeoutMs = 50;
aws_client_configuration.requestTimeoutMs = 1000;
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);

View File

@ -54,13 +54,8 @@ void checkFinalInferredType(
type = default_type;
}
if (settings.schema_inference_make_columns_nullable)
if (settings.schema_inference_make_columns_nullable == 1)
type = makeNullableRecursively(type);
/// In case when data for some column could contain nulls and regular values,
/// resulting inferred type is Nullable.
/// If input_format_null_as_default is enabled, we should remove Nullable type.
else if (settings.null_as_default)
type = removeNullable(type);
}
void ISchemaReader::transformTypesIfNeeded(DB::DataTypePtr & type, DB::DataTypePtr & new_type)

View File

@ -204,8 +204,11 @@ NamesAndTypesList ArrowSchemaReader::readSchema()
schema = file_reader->schema();
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, stream ? "ArrowStream" : "Arrow", format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
*schema,
stream ? "ArrowStream" : "Arrow",
format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference,
format_settings.schema_inference_make_columns_nullable != 0);
if (format_settings.schema_inference_make_columns_nullable == 1)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}

View File

@ -727,6 +727,7 @@ struct ReadColumnFromArrowColumnSettings
FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior;
bool allow_arrow_null_type;
bool skip_columns_with_unsupported_types;
bool allow_inferring_nullable_columns;
};
static ColumnWithTypeAndName readColumnFromArrowColumn(
@ -1109,7 +1110,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
bool is_map_nested_column,
const ReadColumnFromArrowColumnSettings & settings)
{
bool read_as_nullable_column = arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable());
bool read_as_nullable_column = (arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable())) && settings.allow_inferring_nullable_columns;
if (read_as_nullable_column &&
arrow_column->type()->id() != arrow::Type::LIST &&
arrow_column->type()->id() != arrow::Type::LARGE_LIST &&
@ -1173,14 +1174,16 @@ static std::shared_ptr<arrow::ChunkedArray> createArrowColumn(const std::shared_
Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types)
bool skip_columns_with_unsupported_types,
bool allow_inferring_nullable_columns)
{
ReadColumnFromArrowColumnSettings settings
{
.format_name = format_name,
.date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore,
.allow_arrow_null_type = false,
.skip_columns_with_unsupported_types = skip_columns_with_unsupported_types
.skip_columns_with_unsupported_types = skip_columns_with_unsupported_types,
.allow_inferring_nullable_columns = allow_inferring_nullable_columns,
};
ColumnsWithTypeAndName sample_columns;
@ -1254,7 +1257,8 @@ Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & nam
.format_name = format_name,
.date_time_overflow_behavior = date_time_overflow_behavior,
.allow_arrow_null_type = true,
.skip_columns_with_unsupported_types = false
.skip_columns_with_unsupported_types = false,
.allow_inferring_nullable_columns = true
};
Columns columns;

View File

@ -34,7 +34,8 @@ public:
static Block arrowSchemaToCHHeader(
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types = false);
bool skip_columns_with_unsupported_types = false,
bool allow_inferring_nullable_columns = true);
struct DictionaryInfo
{

View File

@ -15,8 +15,8 @@ namespace ErrorCodes
}
template <bool with_defaults>
BinaryRowInputFormat<with_defaults>::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(
BinaryRowInputFormat<with_defaults>::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, IRowInputFormat::Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes<BinaryFormatReader<with_defaults>>(
header,
in_,
params_,

View File

@ -10,13 +10,16 @@ namespace DB
class ReadBuffer;
template <bool>
class BinaryFormatReader;
/** A stream for inputting data in a binary line-by-line format.
*/
template <bool with_defaults = false>
class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes
class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes<BinaryFormatReader<with_defaults>>
{
public:
BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
BinaryRowInputFormat(ReadBuffer & in_, const Block & header, IRowInputFormat::Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
String getName() const override { return "BinaryRowInputFormat"; }

View File

@ -61,7 +61,7 @@ CSVRowInputFormat::CSVRowInputFormat(
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_)
std::unique_ptr<CSVFormatReader> format_reader_)
: RowInputFormatWithNamesAndTypes(
header_,
*in_,

View File

@ -1,7 +1,6 @@
#pragma once
#include <optional>
#include <unordered_map>
#include <Core/Block.h>
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
@ -13,10 +12,12 @@
namespace DB
{
class CSVFormatReader;
/** A stream for inputting data in csv format.
* Does not conform with https://tools.ietf.org/html/rfc4180 because it skips spaces and tabs between values.
*/
class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes
class CSVRowInputFormat : public RowInputFormatWithNamesAndTypes<CSVFormatReader>
{
public:
/** with_names - in the first line the header with column names
@ -32,7 +33,7 @@ public:
protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_, std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_);
bool with_names_, bool with_types_, const FormatSettings & format_settings_, std::unique_ptr<CSVFormatReader> format_reader_);
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_buf_, const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_);

View File

@ -9,7 +9,8 @@
namespace DB
{
class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
class CustomSeparatedFormatReader;
class CustomSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes<CustomSeparatedFormatReader>
{
public:
CustomSeparatedRowInputFormat(

View File

@ -11,7 +11,7 @@ namespace DB
{
class ReadBuffer;
class JSONCompactEachRowFormatReader;
/** A stream for reading data in a bunch of formats:
* - JSONCompactEachRow
@ -20,7 +20,7 @@ class ReadBuffer;
* - JSONCompactStringsEachRowWithNamesAndTypes
*
*/
class JSONCompactEachRowRowInputFormat final : public RowInputFormatWithNamesAndTypes
class JSONCompactEachRowRowInputFormat final : public RowInputFormatWithNamesAndTypes<JSONCompactEachRowFormatReader>
{
public:
JSONCompactEachRowRowInputFormat(

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
JSONCompactRowInputFormat::JSONCompactRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes(
: RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>(
header_, in_, params_, false, false, false, format_settings_, std::make_unique<JSONCompactFormatReader>(in_, format_settings_))
{
}

View File

@ -5,8 +5,8 @@
namespace DB
{
class JSONCompactRowInputFormat final : public RowInputFormatWithNamesAndTypes
class JSONCompactFormatReader;
class JSONCompactRowInputFormat final : public RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>
{
public:
JSONCompactRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);

View File

@ -1002,7 +1002,7 @@ NamesAndTypesList NativeORCSchemaReader::readSchema()
header.insert(ColumnWithTypeAndName{type, name});
}
if (format_settings.schema_inference_make_columns_nullable)
if (format_settings.schema_inference_make_columns_nullable == 1)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}

View File

@ -160,8 +160,11 @@ NamesAndTypesList ORCSchemaReader::readSchema()
{
initializeIfNeeded();
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "ORC", format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
*schema,
"ORC",
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference,
format_settings.schema_inference_make_columns_nullable != 0);
if (format_settings.schema_inference_make_columns_nullable == 1)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}

View File

@ -869,8 +869,11 @@ NamesAndTypesList ParquetSchemaReader::readSchema()
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "Parquet", format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
*schema,
"Parquet",
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference,
format_settings.schema_inference_make_columns_nullable != 0);
if (format_settings.schema_inference_make_columns_nullable == 1)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}
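Net effect for Parquet, and analogously for the Arrow and ORC readers above: `1` still makes every column recursively `Nullable`, while `'auto'` keeps a column `Nullable` only when the file's own schema marks it as such. A hedged sketch (file name hypothetical):
```sql
SET schema_inference_make_columns_nullable = 'auto';
DESC file('data.parquet'); -- nullability follows the Parquet schema metadata
```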

View File

@ -10,9 +10,11 @@
namespace DB
{
class TabSeparatedFormatReader;
/** A stream to input data in tsv format.
*/
class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes
class TabSeparatedRowInputFormat final : public RowInputFormatWithNamesAndTypes<TabSeparatedFormatReader>
{
public:
/** with_names - the first line is the header with the names of the columns

View File

@ -1,14 +1,20 @@
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
#include <Processors/Formats/ISchemaReader.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/PeekableReadBuffer.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <Formats/EscapingRuleUtils.h>
#include <IO/Operators.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/Formats/Impl/BinaryRowInputFormat.h>
#include <Processors/Formats/Impl/CSVRowInputFormat.h>
#include <Processors/Formats/Impl/CustomSeparatedRowInputFormat.h>
#include <Processors/Formats/Impl/HiveTextRowInputFormat.h>
#include <Processors/Formats/Impl/JSONCompactRowInputFormat.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <Processors/Formats/RowInputFormatWithNamesAndTypes.h>
namespace DB
@ -44,7 +50,8 @@ namespace
}
}
RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
template <typename FormatReaderImpl>
RowInputFormatWithNamesAndTypes<FormatReaderImpl>::RowInputFormatWithNamesAndTypes(
const Block & header_,
ReadBuffer & in_,
const Params & params_,
@ -52,7 +59,7 @@ RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_,
std::unique_ptr<FormatReaderImpl> format_reader_,
bool try_detect_header_)
: RowInputFormatWithDiagnosticInfo(header_, in_, params_)
, format_settings(format_settings_)
@ -66,7 +73,8 @@ RowInputFormatWithNamesAndTypes::RowInputFormatWithNamesAndTypes(
column_indexes_by_names = getPort().getHeader().getNamesToIndexesMap();
}
void RowInputFormatWithNamesAndTypes::readPrefix()
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::readPrefix()
{
/// Search and remove BOM only in textual formats (CSV, TSV etc), not in binary ones (RowBinary*).
/// Also, we assume that column name or type cannot contain BOM, so, if format has header,
@ -138,7 +146,8 @@ void RowInputFormatWithNamesAndTypes::readPrefix()
}
}
void RowInputFormatWithNamesAndTypes::tryDetectHeader(std::vector<String> & column_names_out, std::vector<String> & type_names_out)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::tryDetectHeader(std::vector<String> & column_names_out, std::vector<String> & type_names_out)
{
auto & read_buf = getReadBuffer();
PeekableReadBuffer * peekable_buf = dynamic_cast<PeekableReadBuffer *>(&read_buf);
@ -201,7 +210,8 @@ void RowInputFormatWithNamesAndTypes::tryDetectHeader(std::vector<String> & colu
peekable_buf->dropCheckpoint();
}
bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadExtension & ext)
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::readRow(MutableColumns & columns, RowReadExtension & ext)
{
if (unlikely(end_of_stream))
return false;
@ -280,7 +290,8 @@ bool RowInputFormatWithNamesAndTypes::readRow(MutableColumns & columns, RowReadE
return true;
}
size_t RowInputFormatWithNamesAndTypes::countRows(size_t max_block_size)
template <typename FormatReaderImpl>
size_t RowInputFormatWithNamesAndTypes<FormatReaderImpl>::countRows(size_t max_block_size)
{
if (unlikely(end_of_stream))
return 0;
@ -304,7 +315,8 @@ size_t RowInputFormatWithNamesAndTypes::countRows(size_t max_block_size)
return num_rows;
}
void RowInputFormatWithNamesAndTypes::resetParser()
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_mapping->column_indexes_for_input_fields.clear();
@ -313,7 +325,8 @@ void RowInputFormatWithNamesAndTypes::resetParser()
end_of_stream = false;
}
void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
{
const auto & index = column_mapping->column_indexes_for_input_fields[file_column];
if (index)
@ -328,7 +341,8 @@ void RowInputFormatWithNamesAndTypes::tryDeserializeField(const DataTypePtr & ty
}
}
bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
if (in->eof())
{
@ -374,12 +388,14 @@ bool RowInputFormatWithNamesAndTypes::parseRowAndPrintDiagnosticInfo(MutableColu
return format_reader->parseRowEndWithDiagnosticInfo(out);
}
bool RowInputFormatWithNamesAndTypes::isGarbageAfterField(size_t index, ReadBuffer::Position pos)
template <typename FormatReaderImpl>
bool RowInputFormatWithNamesAndTypes<FormatReaderImpl>::isGarbageAfterField(size_t index, ReadBuffer::Position pos)
{
return format_reader->isGarbageAfterField(index, pos);
}
void RowInputFormatWithNamesAndTypes::setReadBuffer(ReadBuffer & in_)
template <typename FormatReaderImpl>
void RowInputFormatWithNamesAndTypes<FormatReaderImpl>::setReadBuffer(ReadBuffer & in_)
{
format_reader->setReadBuffer(in_);
IInputFormat::setReadBuffer(in_);
@ -582,5 +598,12 @@ void FormatWithNamesAndTypesSchemaReader::transformTypesIfNeeded(DB::DataTypePtr
transformInferredTypesIfNeeded(type, new_type, format_settings);
}
template class RowInputFormatWithNamesAndTypes<JSONCompactFormatReader>;
template class RowInputFormatWithNamesAndTypes<JSONCompactEachRowFormatReader>;
template class RowInputFormatWithNamesAndTypes<TabSeparatedFormatReader>;
template class RowInputFormatWithNamesAndTypes<CSVFormatReader>;
template class RowInputFormatWithNamesAndTypes<CustomSeparatedFormatReader>;
template class RowInputFormatWithNamesAndTypes<BinaryFormatReader<true>>;
template class RowInputFormatWithNamesAndTypes<BinaryFormatReader<false>>;
}

View File

@ -26,6 +26,7 @@ class FormatWithNamesAndTypesReader;
/// will be compared types from header.
/// It's important that firstly this class reads/skips names and only
/// then reads/skips types. So you can rely on this invariant.
template <typename FormatReaderImpl>
class RowInputFormatWithNamesAndTypes : public RowInputFormatWithDiagnosticInfo
{
protected:
@ -41,7 +42,7 @@ protected:
bool with_names_,
bool with_types_,
const FormatSettings & format_settings_,
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader_,
std::unique_ptr<FormatReaderImpl> format_reader_,
bool try_detect_header_ = false);
void resetParser() override;
@ -70,7 +71,7 @@ private:
bool is_header_detected = false;
protected:
std::unique_ptr<FormatWithNamesAndTypesReader> format_reader;
std::unique_ptr<FormatReaderImpl> format_reader;
Block::NameMap column_indexes_by_names;
};

View File

@ -255,7 +255,7 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional<ActionsDAG> & dag, Fi
/// Add more functions to fixed columns.
/// Functions result is fixed if all arguments are fixed or constants.
void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
{
struct Frame
{
@ -300,20 +300,20 @@ void enreachFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
{
if (frame.node->function_base->isDeterministicInScopeOfQuery())
{
//std::cerr << "*** enreachFixedColumns check " << frame.node->result_name << std::endl;
//std::cerr << "*** enrichFixedColumns check " << frame.node->result_name << std::endl;
bool all_args_fixed_or_const = true;
for (const auto * child : frame.node->children)
{
if (!child->column && !fixed_columns.contains(child))
{
//std::cerr << "*** enreachFixedColumns fail " << child->result_name << ' ' << static_cast<const void *>(child) << std::endl;
//std::cerr << "*** enrichFixedColumns fail " << child->result_name << ' ' << static_cast<const void *>(child) << std::endl;
all_args_fixed_or_const = false;
}
}
if (all_args_fixed_or_const)
{
//std::cerr << "*** enreachFixedColumns add " << frame.node->result_name << ' ' << static_cast<const void *>(frame.node) << std::endl;
//std::cerr << "*** enrichFixedColumns add " << frame.node->result_name << ' ' << static_cast<const void *>(frame.node) << std::endl;
fixed_columns.insert(frame.node);
}
}
@ -357,7 +357,7 @@ InputOrderInfoPtr buildInputOrderInfo(
}
}
enreachFixedColumns(sorting_key_dag, fixed_key_columns);
enrichFixedColumns(sorting_key_dag, fixed_key_columns);
}
/// This is a result direction we will read from MergeTree
@ -530,7 +530,7 @@ AggregationInputOrder buildInputOrderInfo(
}
}
enreachFixedColumns(sorting_key_dag, fixed_key_columns);
enrichFixedColumns(sorting_key_dag, fixed_key_columns);
for (const auto * output : dag->getOutputs())
{
@ -804,7 +804,7 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
buildSortingDAG(node, dag, fixed_columns, limit);
if (dag && !fixed_columns.empty())
enreachFixedColumns(*dag, fixed_columns);
enrichFixedColumns(*dag, fixed_columns);
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{
@ -858,7 +858,7 @@ AggregationInputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPl
buildSortingDAG(node, dag, fixed_columns, limit);
if (dag && !fixed_columns.empty())
enreachFixedColumns(*dag, fixed_columns);
enrichFixedColumns(*dag, fixed_columns);
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{

View File

@ -0,0 +1,95 @@
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Common/TransactionID.h>
#include <Storages/MergeTree/MergeList.h>
namespace DB
{
bool MergeProjectionPartsTask::executeStep()
{
auto & current_level_parts = level_parts[current_level];
auto & next_level_parts = level_parts[next_level];
MergeTreeData::MutableDataPartsVector selected_parts;
while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
{
selected_parts.push_back(std::move(current_level_parts.back()));
current_level_parts.pop_back();
}
if (selected_parts.empty())
{
if (next_level_parts.empty())
{
LOG_WARNING(log, "There is no projection parts merged");
/// Task is finished
return false;
}
current_level = next_level;
++next_level;
}
else if (selected_parts.size() == 1)
{
if (next_level_parts.empty())
{
LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
selected_parts[0]->renameTo(projection.name + ".proj", true);
selected_parts[0]->setName(projection.name);
selected_parts[0]->is_temp = false;
new_data_part->addProjectionPart(name, std::move(selected_parts[0]));
/// Task is finished
return false;
}
else
{
LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
next_level_parts.push_back(std::move(selected_parts[0]));
}
}
else if (selected_parts.size() > 1)
{
// Generate a unique part name
++block_num;
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
MergeTreeData::DataPartsVector const_selected_parts(
std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
projection_future_part->assign(std::move(const_selected_parts));
projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
projection_future_part->part_info = {"all", 0, 0, 0};
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (projection.type == ProjectionDescription::Type::Aggregate)
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
auto tmp_part_merge_task = mutator->mergePartsToTemporaryPart(
projection_future_part,
projection.metadata,
merge_entry,
std::make_unique<MergeListElement>((*merge_entry)->table_id, projection_future_part, context),
*table_lock_holder,
time_of_merge,
context,
space_reservation,
false, // TODO Do we need deduplicate for projections
{},
false, // no cleanup
projection_merging_params,
NO_TRANSACTION_PTR,
/* need_prefix */ true,
new_data_part.get(),
".tmp_proj");
next_level_parts.push_back(executeHere(tmp_part_merge_task));
next_level_parts.back()->is_temp = true;
}
/// Need execute again
return true;
}
}

View File

@ -0,0 +1,84 @@
#pragma once
#include <Interpreters/StorageID.h>
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/ProjectionsDescription.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class MergeProjectionPartsTask : public IExecutableTask
{
public:
MergeProjectionPartsTask(
String name_,
MergeTreeData::MutableDataPartsVector && parts_,
const ProjectionDescription & projection_,
size_t & block_num_,
ContextPtr context_,
TableLockHolder * table_lock_holder_,
MergeTreeDataMergerMutator * mutator_,
MergeListEntry * merge_entry_,
time_t time_of_merge_,
MergeTreeData::MutableDataPartPtr new_data_part_,
ReservationSharedPtr space_reservation_)
: name(std::move(name_))
, parts(std::move(parts_))
, projection(projection_)
, block_num(block_num_)
, context(context_)
, table_lock_holder(table_lock_holder_)
, mutator(mutator_)
, merge_entry(merge_entry_)
, time_of_merge(time_of_merge_)
, new_data_part(new_data_part_)
, space_reservation(space_reservation_)
, log(getLogger("MergeProjectionPartsTask"))
{
LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
level_parts[current_level] = std::move(parts);
}
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool executeStep() override;
private:
String name;
MergeTreeData::MutableDataPartsVector parts;
const ProjectionDescription & projection;
size_t & block_num;
ContextPtr context;
TableLockHolder * table_lock_holder;
MergeTreeDataMergerMutator * mutator;
MergeListEntry * merge_entry;
time_t time_of_merge;
MergeTreeData::MutableDataPartPtr new_data_part;
ReservationSharedPtr space_reservation;
LoggerPtr log;
std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
size_t current_level = 0;
size_t next_level = 1;
/// TODO(nikitamikhaylov): make this constant a setting
static constexpr size_t max_parts_to_merge_in_one_level = 10;
};
}

View File

@ -21,6 +21,8 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/FilterTransform.h>
@ -63,6 +65,7 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
static ColumnsStatistics getStatisticsForColumns(
const NamesAndTypesList & columns_to_read,
const StorageMetadataPtr & metadata_snapshot)
@ -155,6 +158,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
}
}
for (const auto * projection : global_ctx->projections_to_rebuild)
{
Names projection_columns_vec = projection->getRequiredColumns();
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
std::inserter(key_columns, key_columns.end()));
}
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (const auto & column : global_ctx->storage_columns)
@ -254,6 +264,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
extendObjectColumns(global_ctx->storage_columns, object_columns, false);
global_ctx->storage_snapshot = std::make_shared<StorageSnapshot>(*global_ctx->data, global_ctx->metadata_snapshot, std::move(object_columns));
prepareProjectionsToMergeAndRebuild();
extractMergingAndGatheringColumns();
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
@ -517,6 +529,148 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
}
void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
{
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
/// users might have projections before this change.
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
return;
/// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
const bool merge_may_reduce_rows =
global_ctx->cleanup ||
global_ctx->deduplicate ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
const auto & projections = global_ctx->metadata_snapshot->getProjections();
for (const auto & projection : projections)
{
if (merge_may_reduce_rows)
{
global_ctx->projections_to_rebuild.push_back(&projection);
continue;
}
MergeTreeData::DataPartsVector projection_parts;
for (const auto & part : global_ctx->future_part->parts)
{
auto it = part->getProjectionParts().find(projection.name);
if (it != part->getProjectionParts().end() && !it->second->is_broken)
projection_parts.push_back(it->second);
}
if (projection_parts.size() == global_ctx->future_part->parts.size())
{
global_ctx->projections_to_merge.push_back(&projection);
global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end());
}
else
{
chassert(projection_parts.size() < global_ctx->future_part->parts.size());
LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name);
continue;
}
}
const auto & settings = global_ctx->context->getSettingsRef();
for (const auto * projection : global_ctx->projections_to_rebuild)
ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(),
settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
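In user-visible terms: merges that may drop rows (cleanup, deduplication, Collapsing/Replacing/VersionedCollapsing) rebuild projections from the merged result, while row-preserving merges merge the existing projection parts directly. A hedged sketch of a table that would take the rebuild path (schema hypothetical):
```sql
CREATE TABLE events
(
    id UInt64,
    user String,
    PROJECTION p_user (SELECT user, count() GROUP BY user)
)
ENGINE = ReplacingMergeTree
ORDER BY id
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

-- Replacing merges may collapse duplicate ids, so p_user is rebuilt
-- from the merged rows instead of merging old projection parts
OPTIMIZE TABLE events FINAL;
```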
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
{
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
{
const auto & projection = *global_ctx->projections_to_rebuild[i];
Block block_to_squash = projection.calculate(block, global_ctx->context);
auto & projection_squash_plan = ctx->projection_squashes[i];
projection_squash_plan.setHeader(block_to_squash.cloneEmpty());
Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()}));
if (squashed_chunk)
{
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
tmp_part.finalize();
tmp_part.part->getDataPartStorage().commitTransaction();
ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
}
}
}
void MergeTask::ExecuteAndFinalizeHorizontalPart::finalizeProjections() const
{
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
{
const auto & projection = *global_ctx->projections_to_rebuild[i];
auto & projection_squash_plan = ctx->projection_squashes[i];
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
if (squashed_chunk)
{
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part));
}
}
ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
constructTaskForProjectionPartsMerge();
}
void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const
{
auto && [name, parts] = *ctx->projection_parts_iterator;
const auto & projection = global_ctx->metadata_snapshot->projections.get(name);
ctx->merge_projection_parts_task_ptr = std::make_unique<MergeProjectionPartsTask>
(
name,
std::move(parts),
projection,
ctx->projection_block_num,
global_ctx->context,
global_ctx->holder,
global_ctx->mutator,
global_ctx->merge_entry,
global_ctx->time_of_merge,
global_ctx->new_data_part,
global_ctx->space_reservation
);
}
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT
{
/// If there are no projections, we did not construct a task
if (!ctx->merge_projection_parts_task_ptr)
return false;
if (ctx->merge_projection_parts_task_ptr->executeStep())
return true;
++ctx->projection_parts_iterator;
if (ctx->projection_parts_iterator == std::make_move_iterator(ctx->projection_parts.end()))
return false;
constructTaskForProjectionPartsMerge();
return true;
}
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -535,6 +689,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
global_ctx->rows_written += block.rows();
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
calculateProjections(block);
UInt64 result_rows = 0;
UInt64 result_bytes = 0;
global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes);
@ -558,8 +714,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
return true;
}
void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const
{
finalizeProjections();
global_ctx->merging_executor.reset();
global_ctx->merged_pipeline.reset();
@ -847,35 +1005,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds));
}
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
/// users might have projections before this change.
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
for (const auto & projection : global_ctx->projections_to_merge)
{
ctx->projections_iterator = ctx->tasks_for_projections.begin();
return false;
}
const auto & projections = global_ctx->metadata_snapshot->getProjections();
for (const auto & projection : projections)
{
MergeTreeData::DataPartsVector projection_parts;
for (const auto & part : global_ctx->future_part->parts)
{
auto actual_projection_parts = part->getProjectionParts();
auto it = actual_projection_parts.find(projection.name);
if (it != actual_projection_parts.end() && !it->second->is_broken)
projection_parts.push_back(it->second);
}
if (projection_parts.size() < global_ctx->future_part->parts.size())
{
LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name);
continue;
}
MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name];
LOG_DEBUG(
ctx->log,
"Selected {} projection_parts from {} to {}",
@ -885,7 +1017,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
projection_future_part->assign(std::move(projection_parts));
projection_future_part->name = projection.name;
projection_future_part->name = projection->name;
// TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts.
// Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection.
// projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
@ -893,16 +1025,17 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (projection.type == ProjectionDescription::Type::Aggregate)
if (projection->type == ProjectionDescription::Type::Aggregate)
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
ctx->tasks_for_projections.emplace_back(std::make_shared<MergeTask>(
projection_future_part,
projection.metadata,
projection->metadata,
global_ctx->merge_entry,
std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part, global_ctx->context),
global_ctx->time_of_merge,
global_ctx->context,
*global_ctx->holder,
global_ctx->space_reservation,
global_ctx->deduplicate,
global_ctx->deduplicate_by_columns,

View File

@ -9,6 +9,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Interpreters/Squashing.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@ -72,6 +73,7 @@ public:
std::unique_ptr<MergeListElement> projection_merge_list_element_,
time_t time_of_merge_,
ContextPtr context_,
TableLockHolder & holder,
ReservationSharedPtr space_reservation_,
bool deduplicate_,
Names deduplicate_by_columns_,
@ -96,6 +98,7 @@ public:
= global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr();
global_ctx->time_of_merge = std::move(time_of_merge_);
global_ctx->context = std::move(context_);
global_ctx->holder = &holder;
global_ctx->space_reservation = std::move(space_reservation_);
global_ctx->deduplicate = std::move(deduplicate_);
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
@ -151,6 +154,7 @@ private:
/// Proper initialization is responsibility of the author
struct GlobalRuntimeContext : public IStageRuntimeContext
{
TableLockHolder * holder;
MergeList::Entry * merge_entry{nullptr};
/// If not null, use this instead of the global MergeList::Entry. This is for merging projections.
std::unique_ptr<MergeListElement> projection_merge_list_element;
@ -181,6 +185,10 @@ private:
MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided};
std::vector<ProjectionDescriptionRawPtr> projections_to_rebuild{};
std::vector<ProjectionDescriptionRawPtr> projections_to_merge{};
std::map<String, MergeTreeData::DataPartsVector> projections_to_merge_parts{};
std::unique_ptr<MergeStageProgress> horizontal_stage_progress{nullptr};
std::unique_ptr<MergeStageProgress> column_progress{nullptr};
@ -228,6 +236,14 @@ private:
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
std::optional<ColumnSizeEstimator> column_sizes{};
/// For projections to rebuild
using ProjectionNameToItsBlocks = std::map<String, MergeTreeData::MutableDataPartsVector>;
ProjectionNameToItsBlocks projection_parts;
std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
std::vector<Squashing> projection_squashes;
size_t projection_block_num = 0;
ExecutableTaskPtr merge_projection_parts_task_ptr;
size_t initial_reservation{0};
bool read_with_direct_io{false};
@ -257,16 +273,23 @@ private:
void finalize() const;
/// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>;
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 3>;
const ExecuteAndFinalizeHorizontalPartSubtasks subtasks
{
&ExecuteAndFinalizeHorizontalPart::prepare,
&ExecuteAndFinalizeHorizontalPart::executeImpl
&ExecuteAndFinalizeHorizontalPart::executeImpl,
&ExecuteAndFinalizeHorizontalPart::executeMergeProjections
};
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
void prepareProjectionsToMergeAndRebuild() const;
void calculateProjections(const Block & block) const;
void finalizeProjections() const;
void constructTaskForProjectionPartsMerge() const;
bool executeMergeProjections();
MergeAlgorithm chooseMergeAlgorithm() const;
void createMergedStream();
void extractMergingAndGatheringColumns() const;

View File

@ -671,7 +671,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const StorageMetadataPtr & metadata_snapshot,
MergeList::Entry * merge_entry,
std::unique_ptr<MergeListElement> projection_merge_list_element,
TableLockHolder,
TableLockHolder & holder,
time_t time_of_merge,
ContextPtr context,
ReservationSharedPtr space_reservation,
@ -691,6 +691,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
std::move(projection_merge_list_element),
time_of_merge,
context,
holder,
space_reservation,
deduplicate,
deduplicate_by_columns,

View File

@ -159,7 +159,7 @@ public:
const StorageMetadataPtr & metadata_snapshot,
MergeListEntry * merge_entry,
std::unique_ptr<MergeListElement> projection_merge_list_element,
TableLockHolder table_lock_holder,
TableLockHolder & table_lock_holder,
time_t time_of_merge,
ContextPtr context,
ReservationSharedPtr space_reservation,

View File

@ -24,6 +24,7 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeIndexFullText.h>
@ -1058,136 +1059,6 @@ struct MutationContext
using MutationContextPtr = std::shared_ptr<MutationContext>;
class MergeProjectionPartsTask : public IExecutableTask
{
public:
MergeProjectionPartsTask(
String name_,
MergeTreeData::MutableDataPartsVector && parts_,
const ProjectionDescription & projection_,
size_t & block_num_,
MutationContextPtr ctx_)
: name(std::move(name_))
, parts(std::move(parts_))
, projection(projection_)
, block_num(block_num_)
, ctx(ctx_)
, log(getLogger("MergeProjectionPartsTask"))
{
LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
level_parts[current_level] = std::move(parts);
}
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool executeStep() override
{
auto & current_level_parts = level_parts[current_level];
auto & next_level_parts = level_parts[next_level];
MergeTreeData::MutableDataPartsVector selected_parts;
while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
{
selected_parts.push_back(std::move(current_level_parts.back()));
current_level_parts.pop_back();
}
if (selected_parts.empty())
{
if (next_level_parts.empty())
{
LOG_WARNING(log, "There is no projection parts merged");
/// Task is finished
return false;
}
current_level = next_level;
++next_level;
}
else if (selected_parts.size() == 1)
{
if (next_level_parts.empty())
{
LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
selected_parts[0]->renameTo(projection.name + ".proj", true);
selected_parts[0]->setName(projection.name);
selected_parts[0]->is_temp = false;
ctx->new_data_part->addProjectionPart(name, std::move(selected_parts[0]));
/// Task is finished
return false;
}
else
{
LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
next_level_parts.push_back(std::move(selected_parts[0]));
}
}
else if (selected_parts.size() > 1)
{
// Generate a unique part name
++block_num;
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
MergeTreeData::DataPartsVector const_selected_parts(
std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
projection_future_part->assign(std::move(const_selected_parts));
projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
projection_future_part->part_info = {"all", 0, 0, 0};
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (projection.type == ProjectionDescription::Type::Aggregate)
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart(
projection_future_part,
projection.metadata,
ctx->mutate_entry,
std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part, ctx->context),
*ctx->holder,
ctx->time_of_mutation,
ctx->context,
ctx->space_reservation,
false, // TODO Do we need deduplicate for projections
{},
false, // no cleanup
projection_merging_params,
NO_TRANSACTION_PTR,
/* need_prefix */ true,
ctx->new_data_part.get(),
".tmp_proj");
next_level_parts.push_back(executeHere(tmp_part_merge_task));
next_level_parts.back()->is_temp = true;
}
/// Need execute again
return true;
}
private:
String name;
MergeTreeData::MutableDataPartsVector parts;
const ProjectionDescription & projection;
size_t & block_num;
MutationContextPtr ctx;
LoggerPtr log;
std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
size_t current_level = 0;
size_t next_level = 1;
/// TODO(nikitamikhaylov): make this constant a setting
static constexpr size_t max_parts_to_merge_in_one_level = 10;
};
// This class is responsible for:
// 1. Getting the projection pipeline and a sink to write parts.
// 2. Building an executor that writes blocks to the input stream (we can write through it to generate as many parts as needed).
@ -1406,7 +1277,13 @@ void PartMergerWriter::constructTaskForProjectionPartsMerge()
std::move(parts),
projection,
block_num,
ctx
ctx->context,
ctx->holder,
ctx->mutator,
ctx->mutate_entry,
ctx->time_of_mutation,
ctx->new_data_part,
ctx->space_reservation
);
}
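For orientation while reading the diff above: the task merges projection parts level by level, taking up to max_parts_to_merge_in_one_level parts per step and promoting each merge result to the next level until a single part remains. A minimal Python sketch of that scheduling loop; `merge` is a stand-in for mergePartsToTemporaryPart plus executeHere, not the actual ClickHouse API:

# Minimal sketch of the level-by-level merge scheduling above.
MAX_PARTS_PER_MERGE = 10  # mirrors max_parts_to_merge_in_one_level

def merge_levels(parts, merge):
    level_parts = {0: list(parts)}
    current, nxt = 0, 1
    while True:
        cur = level_parts.setdefault(current, [])
        upper = level_parts.setdefault(nxt, [])
        selected = cur[-MAX_PARTS_PER_MERGE:]
        level_parts[current] = cur[:-MAX_PARTS_PER_MERGE]
        if not selected:
            if not upper:
                return None                 # nothing was merged at all
            current, nxt = nxt, nxt + 1     # advance to the next level
        elif len(selected) == 1:
            if not upper:
                return selected[0]          # done: a single merged part remains
            upper.append(selected[0])       # forward the lone part upward
        else:
            upper.append(merge(selected))   # merge up to 10 parts, promote result

# e.g. merge_levels([f"p{i}" for i in range(25)], lambda ps: "+".join(ps))
# performs three level-0 merges and one level-1 merge before returning.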

View File

@ -425,8 +425,9 @@ struct DeltaLakeMetadataImpl
{
auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field, "type", required));
auto is_nullable = field->getValue<bool>("nullable");
element_types.push_back(getFieldType(field, "type", is_nullable));
}
return std::make_shared<DataTypeTuple>(element_types, element_names);
@ -434,16 +435,16 @@ struct DeltaLakeMetadataImpl
if (type_name == "array")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto element_type = getFieldType(type, "elementType", is_nullable);
bool element_nullable = type->getValue<bool>("containsNull");
auto element_type = getFieldType(type, "elementType", element_nullable);
return std::make_shared<DataTypeArray>(element_type);
}
if (type_name == "map")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto key_type = getFieldType(type, "keyType", /* is_nullable */false);
auto value_type = getFieldType(type, "valueType", is_nullable);
bool value_nullable = type->getValue<bool>("valueContainsNull");
auto value_type = getFieldType(type, "valueType", value_nullable);
return std::make_shared<DataTypeMap>(key_type, value_type);
}

View File

@ -200,7 +200,6 @@ def test_distributed_replica_max_ignored_errors():
"connect_timeout": 2,
"receive_timeout": 2,
"send_timeout": 2,
"idle_connection_timeout": 2,
"tcp_keep_alive_timeout": 2,
"distributed_replica_max_ignored_errors": 0,
"distributed_replica_error_half_life": 60,

View File

@ -29,6 +29,9 @@ from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from minio.deleteobjects import DeleteObject
import pyarrow as pa
import pyarrow.parquet as pq
from deltalake.writer import write_deltalake
from helpers.s3_tools import (
prepare_s3_bucket,
@ -728,3 +731,96 @@ SELECT * FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.mini
)
== 1
)
def test_complex_types(started_cluster):
node = started_cluster.instances["node1"]
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
schema = pa.schema(
[
("id", pa.int32()),
("name", pa.string()),
(
"address",
pa.struct(
[
("street", pa.string()),
("city", pa.string()),
("state", pa.string()),
]
),
),
("interests", pa.list_(pa.string())),
(
"metadata",
pa.map_(
pa.string(), pa.string()
), # Map with string keys and string values
),
]
)
# Create sample data
data = [
pa.array([1, 2, 3], type=pa.int32()),
pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()),
pa.array(
[
{"street": "123 Elm St", "city": "Springfield", "state": "IL"},
{"street": "456 Maple St", "city": "Shelbyville", "state": "IL"},
{"street": "789 Oak St", "city": "Ogdenville", "state": "IL"},
],
type=schema.field("address").type,
),
pa.array(
[
pa.array(["dancing", "coding", "hiking"]),
pa.array(["dancing", "coding", "hiking"]),
pa.array(["dancing", "coding", "hiking"]),
],
type=schema.field("interests").type,
),
pa.array(
[
{"key1": "value1", "key2": "value2"},
{"key1": "value3", "key2": "value4"},
{"key1": "value5", "key2": "value6"},
],
type=schema.field("metadata").type,
),
]
endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
aws_access_key_id = "minio"
aws_secret_access_key = "minio123"
table_name = randomize_table_name("test_complex_types")
storage_options = {
"AWS_ENDPOINT_URL": endpoint_url,
"AWS_ACCESS_KEY_ID": aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
path = f"s3://root/{table_name}"
table = pa.Table.from_arrays(data, schema=schema)
write_deltalake(path, table, storage_options=storage_options)
assert "1\n2\n3\n" in node.query(
f"SELECT id FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
assert (
"('123 Elm St','Springfield','IL')\n('456 Maple St','Shelbyville','IL')\n('789 Oak St','Ogdenville','IL')"
in node.query(
f"SELECT address FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
)
assert (
"{'key1':'value1','key2':'value2'}\n{'key1':'value3','key2':'value4'}\n{'key1':'value5','key2':'value6'}"
in node.query(
f"SELECT metadata FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
)
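As an extra sanity check outside ClickHouse, the table written above can be read back through delta-rs itself; a hedged sketch reusing `path`, `storage_options`, and `schema` from the test (DeltaTable and to_pyarrow_table come from the deltalake package the test already depends on):

from deltalake import DeltaTable

# Hedged sanity check: confirm the nested schema and row count survive
# the round trip through delta-rs.
dt = DeltaTable(path, storage_options=storage_options)
round_tripped = dt.to_pyarrow_table()
assert round_tripped.num_rows == 3
assert round_tripped.schema.field("address").type == schema.field("address").type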

View File

@ -6,8 +6,8 @@ CREATE DATABASE {CLICKHOUSE_DATABASE:Identifier};
CREATE TABLE {CLICKHOUSE_DATABASE:Identifier}.A (A UInt8) ENGINE = TinyLog;
CREATE TABLE {CLICKHOUSE_DATABASE:Identifier}.B (A UInt8) ENGINE = TinyLog;
SHOW TABLES from {CLICKHOUSE_DATABASE:Identifier};
SHOW TABLES in system where engine like '%System%' and name in ('numbers', 'one');
SHOW TABLES FROM {CLICKHOUSE_DATABASE:Identifier};
SHOW TABLES IN system WHERE engine LIKE '%System%' AND name IN ('numbers', 'one') AND database = 'system';
SELECT name, toUInt32(metadata_modification_time) > 0, engine_full, create_table_query FROM system.tables WHERE database = currentDatabase() ORDER BY name FORMAT TSVRaw;
@ -16,7 +16,7 @@ SELECT name FROM system.tables WHERE is_temporary = 1 AND name = 'test_temporary
CREATE TABLE {CLICKHOUSE_DATABASE:Identifier}.test_log(id UInt64) ENGINE = Log;
CREATE MATERIALIZED VIEW {CLICKHOUSE_DATABASE:Identifier}.test_materialized ENGINE = Log AS SELECT * FROM {CLICKHOUSE_DATABASE:Identifier}.test_log;
SELECT dependencies_database, dependencies_table FROM system.tables WHERE name = 'test_log' and database=currentDatabase();
SELECT dependencies_database, dependencies_table FROM system.tables WHERE name = 'test_log' AND database=currentDatabase();
DROP DATABASE {CLICKHOUSE_DATABASE:Identifier};

View File

@ -18,7 +18,7 @@ desc format(JSONEachRow, '{"x" : [[], [null], [1, 2, 3]]}');
desc format(JSONEachRow, '{"x" : [{"a" : null}, {"b" : 1}]}');
desc format(JSONEachRow, '{"x" : [["2020-01-01", null, "1234"], ["abcd"]]}');
set schema_inference_make_columns_nullable=0;
set schema_inference_make_columns_nullable='auto';
desc format(JSONEachRow, '{"x" : [1, 2]}');
desc format(JSONEachRow, '{"x" : [null, 1]}');
desc format(JSONEachRow, '{"x" : [1, 2]}, {"x" : [3]}');
@ -40,7 +40,7 @@ desc format(JSONCompactEachRow, '[[[], [null], [1, 2, 3]]]');
desc format(JSONCompactEachRow, '[[{"a" : null}, {"b" : 1}]]');
desc format(JSONCompactEachRow, '[[["2020-01-01", null, "1234"], ["abcd"]]]');
set schema_inference_make_columns_nullable=0;
set schema_inference_make_columns_nullable='auto';
desc format(JSONCompactEachRow, '[[1, 2]]');
desc format(JSONCompactEachRow, '[[null, 1]]');
desc format(JSONCompactEachRow, '[[1, 2]], [[3]]');
@ -59,7 +59,7 @@ desc format(CSV, '"[[], [null], [1, 2, 3]]"');
desc format(CSV, '"[{\'a\' : null}, {\'b\' : 1}]"');
desc format(CSV, '"[[\'2020-01-01\', null, \'1234\'], [\'abcd\']]"');
set schema_inference_make_columns_nullable=0;
set schema_inference_make_columns_nullable='auto';
desc format(CSV, '"[1,2]"');
desc format(CSV, '"[NULL, 1]"');
desc format(CSV, '"[1, 2]"\n"[3]"');

View File

@ -1,7 +1,7 @@
desc format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=1;
select * from format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=1;
desc format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=0, input_format_null_as_default=0;
select * from format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=0, input_format_null_as_default=0;
desc format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable='auto', input_format_null_as_default=0;
select * from format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable='auto', input_format_null_as_default=0;
desc format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=0, input_format_null_as_default=1;
select * from format(JSONEachRow, '{"x" : null}, {"x" : 42}') settings schema_inference_make_columns_nullable=0, input_format_null_as_default=1;
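For a side-by-side view of the three modes exercised above, a hedged sketch driving clickhouse-local from Python (assumes the binary is on PATH; with 0 the NULL parses as the type's default because input_format_null_as_default defaults to 1):

import subprocess

# Hedged sketch: compare the three values of schema_inference_make_columns_nullable
# on a column whose sample contains a NULL.
BASE = ("desc format(JSONEachRow, '{\"x\" : null}, {\"x\" : 42}') "
        "settings schema_inference_make_columns_nullable = ")

for mode in ("1", "0", "'auto'"):
    out = subprocess.run(
        ["clickhouse-local", "-q", BASE + mode],
        capture_output=True, text=True, check=True,
    ).stdout
    # DESC output is TSV: name, type, ...
    print(mode, "->", out.split("\t")[1])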

View File

@ -0,0 +1,28 @@
ReplacingMergeTree
0 2
1 2
2 2
0 2
1 2
2 2
CollapsingMergeTree
0 2
1 2
2 2
0 2
1 2
2 2
VersionedCollapsingMergeTree
0 2
1 2
2 2
0 2
1 2
2 2
DEDUPLICATE ON MergeTree
0 1
1 1
2 1
0 1
1 1
2 1

View File

@ -0,0 +1,116 @@
SELECT 'ReplacingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
`type` Int32,
`eventcnt` UInt64,
PROJECTION p
(
SELECT type,sum(eventcnt)
GROUP BY type
)
)
ENGINE = ReplacingMergeTree
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number%3, 1 FROM numbers(3);
INSERT INTO tp SELECT number%3, 2 FROM numbers(3);
OPTIMIZE TABLE tp FINAL;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;
SELECT 'CollapsingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
`type` Int32,
`eventcnt` UInt64,
`sign` Int8,
PROJECTION p
(
SELECT type,sum(eventcnt)
GROUP BY type
)
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2, 1 FROM numbers(3);
OPTIMIZE TABLE tp FINAL;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;
-- We don't actually need to test all three engines (Replacing/Collapsing/VersionedCollapsing),
-- because they share the same 'reduce the number of rows during merges' logic.
SELECT 'VersionedCollapsingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
`type` Int32,
`eventcnt` UInt64,
`sign` Int8,
`version` UInt8,
PROJECTION p
(
SELECT type,sum(eventcnt)
GROUP BY type
)
)
ENGINE = VersionedCollapsingMergeTree(sign,version)
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 1, 1, 0 FROM numbers(3);
OPTIMIZE TABLE tp FINAL;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;
SELECT 'DEDUPLICATE ON MergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
`type` Int32,
`eventcnt` UInt64,
PROJECTION p
(
SELECT type,sum(eventcnt)
GROUP BY type
)
)
ENGINE = MergeTree
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2 FROM numbers(3);
OPTIMIZE TABLE tp FINAL DEDUPLICATE BY type;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;
SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;

View File

@ -4,6 +4,8 @@ set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set allow_experimental_dynamic_type = 1;
set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken
drop table if exists test;
create table test (id UInt64, d Dynamic(max_types=2)) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;

View File

@ -5,6 +5,8 @@ set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set allow_experimental_dynamic_type = 1;
set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken
drop table if exists test;
create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;

View File

@ -1,40 +1,66 @@
Parquet
a UInt64
a_nullable Nullable(UInt64)
a UInt64
a_nullable UInt64
Arrow
a UInt64
a_nullable Nullable(UInt64)
a UInt64
a_nullable UInt64
Parquet
b Array(UInt64)
b_nullable Array(Nullable(UInt64))
b Array(UInt64)
b_nullable Array(UInt64)
Arrow
b Array(Nullable(UInt64))
b_nullable Array(Nullable(UInt64))
b Array(UInt64)
b_nullable Array(UInt64)
Parquet
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a UInt64,\n b String)
Arrow
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a UInt64,\n b String)
Parquet
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String)))
Arrow
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String)))
Parquet
e Map(UInt64, String)
e_nullable Map(UInt64, Nullable(String))
e Map(UInt64, String)
e_nullable Map(UInt64, String)
Arrow
e Map(UInt64, Nullable(String))
e_nullable Map(UInt64, Nullable(String))
e Map(UInt64, String)
e_nullable Map(UInt64, String)
Parquet
f Map(UInt64, Map(UInt64, String))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
f Map(UInt64, Map(UInt64, String))
f_nullables Map(UInt64, Map(UInt64, String))
Arrow
f Map(UInt64, Map(UInt64, Nullable(String)))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
f Map(UInt64, Map(UInt64, String))
f_nullables Map(UInt64, Map(UInt64, String))
Parquet
g String
g_nullable Nullable(String)
g String
g_nullable String
Arrow
g LowCardinality(String)
g_nullable LowCardinality(String)
g LowCardinality(String)
g_nullable LowCardinality(String)

View File

@ -14,6 +14,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('a UInt64, a_nullable Nullable(UInt64)', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -21,6 +22,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('b Array(UInt64), b_nullable Array(Nullable(UInt64))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -28,6 +30,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('c Tuple(a UInt64, b String), c_nullable Tuple(a Nullable(UInt64), b Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -35,6 +38,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('d Tuple(a UInt64, b Tuple(a UInt64, b String), d_nullable Tuple(a UInt64, b Tuple(a Nullable(UInt64), b Nullable(String))))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -42,6 +46,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('e Map(UInt64, String), e_nullable Map(UInt64, Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -49,6 +54,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('f Map(UInt64, Map(UInt64, String)), f_nullables Map(UInt64, Map(UInt64, Nullable(String)))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
@ -56,6 +62,7 @@ for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('g LowCardinality(String), g_nullable LowCardinality(Nullable(String))', 42) limit 10 settings output_format_arrow_low_cardinality_as_dictionary=1, allow_suspicious_low_cardinality_types=1 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 'auto'"
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done

View File

@ -5,6 +5,8 @@ set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set session_timezone = 'UTC';
set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken
drop table if exists test;
create table test (id UInt64, json JSON(max_dynamic_paths=2, a.b.c UInt32)) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;

View File

@ -4,6 +4,8 @@ set allow_experimental_json_type = 1;
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set min_bytes_to_use_direct_io = 0; -- min_bytes_to_use_direct_io > 0 is broken
create table test (id UInt64, json JSON(max_dynamic_paths=8, a.b Array(JSON))) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;
insert into test select number, '{}' from numbers(10000);

View File

@ -1,2 +1,2 @@
x Nullable(Int64)
schema_inference_hints=, max_rows_to_read_for_schema_inference=25000, max_bytes_to_read_for_schema_inference=1000, schema_inference_make_columns_nullable=true, try_infer_integers=true, try_infer_dates=true, try_infer_datetimes=true, try_infer_datetimes_only_datetime64=false, try_infer_numbers_from_strings=false, read_bools_as_numbers=true, read_bools_as_strings=true, read_objects_as_strings=true, read_numbers_as_strings=true, read_arrays_as_strings=true, try_infer_objects_as_tuples=true, infer_incomplete_types_as_strings=true, try_infer_objects=false, use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects=false
schema_inference_hints=, max_rows_to_read_for_schema_inference=25000, max_bytes_to_read_for_schema_inference=1000, schema_inference_make_columns_nullable=1, try_infer_integers=true, try_infer_dates=true, try_infer_datetimes=true, try_infer_datetimes_only_datetime64=false, try_infer_numbers_from_strings=false, read_bools_as_numbers=true, read_bools_as_strings=true, read_objects_as_strings=true, read_numbers_as_strings=true, read_arrays_as_strings=true, try_infer_objects_as_tuples=true, infer_incomplete_types_as_strings=true, try_infer_objects=false, use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects=false

View File

@ -0,0 +1,5 @@
37F332F68DB77BD9D7EDD4969571AD671CF9DD3B
132072DF690933835EB8B6AD0B77E7B6F14ACAD7
9C1185A5C5E9FC54612808977EE8F548B2258D31
13920F39C93D503A0AC02EAB9AA8F672BC523ADA
3FEDF0C212CCFA54C0EBA676C8A8A2A10BC218BE

View File

@ -0,0 +1,12 @@
-- Tags: no-fasttest
-- Output can be verified using: https://emn178.github.io/online-tools/ripemd-160/
SELECT hex(ripeMD160('The quick brown fox jumps over the lazy dog'));
SELECT hex(ripeMD160('The quick brown fox jumps over the lazy cog'));
SELECT hex(ripeMD160(''));
SELECT hex(ripeMD160('CheREpaha1512'));
SELECT hex(ripeMD160('A-very-long-string-that-should-be-hashed-using-ripeMD160'));
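The first three digests are standard RIPEMD-160 test vectors, so they can be cross-checked in Python, assuming the local OpenSSL build still exposes the legacy ripemd160 algorithm to hashlib:

import hashlib

# Hedged cross-check of the standard test vectors above.
# hashlib.new("ripemd160") needs OpenSSL to provide the legacy algorithm
# and may raise ValueError on builds where it is disabled.
for msg in (b"The quick brown fox jumps over the lazy dog",
            b"The quick brown fox jumps over the lazy cog",
            b""):
    print(hashlib.new("ripemd160", msg).hexdigest().upper())
# 37F332F68DB77BD9D7EDD4969571AD671CF9DD3B
# 132072DF690933835EB8B6AD0B77E7B6F14ACAD7
# 9C1185A5C5E9FC54612808977EE8F548B2258D31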

View File

@ -1,4 +1,4 @@
personal_ws-1.1 en 2942
personal_ws-1.1 en 2983
AArch
ACLs
ALTERs
@ -975,6 +975,7 @@ ThreadPoolRemoteFSReaderThreads
ThreadPoolRemoteFSReaderThreadsActive
ThreadsActive
ThreadsInOvercommitTracker
TimeSeries
Timeunit
TinyLog
Tkachenko
@ -1116,12 +1117,12 @@ addressToLineWithInlines
addressToSymbol
adviced
agg
aggThrow
aggregatefunction
aggregatingmergetree
aggregatio
aggretate
aggthrow
aggThrow
aiochclient
allocator
alphaTokens
@ -1895,8 +1896,8 @@ joinGet
joinGetOrNull
json
jsonMergePatch
jsonasstring
jsonasobject
jsonasstring
jsoncolumns
jsoncolumnsmonoblock
jsoncompact
@ -1937,8 +1938,8 @@ kurtSamp
kurtosis
kurtpop
kurtsamp
laion
lagInFrame
laion
lang
laravel
largestTriangleThreeBuckets
@ -2040,7 +2041,6 @@ maxMap
maxintersections
maxintersectionsposition
maxmap
minMappedArrays
maxmind
mdadm
meanZTest
@ -2237,8 +2237,8 @@ parseReadableSizeOrZero
parseTimeDelta
parseable
parsers
partitionId
partitionID
partitionId
pathFull
pclmulqdq
pcre
@ -2467,6 +2467,7 @@ rewritable
rightPad
rightPadUTF
rightUTF
ripeMD
risc
riscv
ro
@ -2718,7 +2719,6 @@ themself
threadpool
throwIf
timeDiff
TimeSeries
timeSeriesData
timeSeriesMetrics
timeSeriesTags