Merge branch 'master' into feature/mergetree-checksum-big-endian-support

This commit is contained in:
ltrk2 2023-07-19 16:22:58 -04:00 committed by GitHub
commit a753c3c6ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 131 additions and 17 deletions

View File

@ -76,6 +76,7 @@ The supported formats are:
| [RowBinary](#rowbinary) | ✔ | ✔ | | [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ | | [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ | | [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithDefaults](#rowbinarywithdefaults) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ | | [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ | | [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ | | [XML](#xml) | ✗ | ✔ |
@ -1515,6 +1516,23 @@ If setting [input_format_with_types_use_header](/docs/en/operations/settings/set
the types from input data will be compared with the types of the corresponding columns from the table. Otherwise, the second row will be skipped. the types from input data will be compared with the types of the corresponding columns from the table. Otherwise, the second row will be skipped.
::: :::
## RowBinaryWithDefaults {#rowbinarywithdefaults}
Similar to [RowBinary](#rowbinary), but with an extra byte before each column that indicates if default value should be used.
Examples:
```sql
:) select * from format('RowBinaryWithDefaults', 'x UInt32 default 42, y UInt32', x'010001000000')
┌──x─┬─y─┐
│ 42 │ 1 │
└────┴───┘
```
For column `x` there is only one byte `01` that indicates that default value should be used and no other data after this byte is provided.
For column `y` data starts with byte `00` that indicates that column has actual value that should be read from the subsequent data `01000000`.
## RowBinary format settings {#row-binary-format-settings} ## RowBinary format settings {#row-binary-format-settings}
- [format_binary_max_string_size](/docs/en/operations/settings/settings-formats.md/#format_binary_max_string_size) - The maximum allowed size for String in RowBinary format. Default value - `1GiB`. - [format_binary_max_string_size](/docs/en/operations/settings/settings-formats.md/#format_binary_max_string_size) - The maximum allowed size for String in RowBinary format. Default value - `1GiB`.

View File

@ -118,7 +118,10 @@ bool PredicateExpressionsOptimizer::tryRewritePredicatesToTables(ASTs & tables_e
if (table_element->table_join && isLeft(table_element->table_join->as<ASTTableJoin>()->kind)) if (table_element->table_join && isLeft(table_element->table_join->as<ASTTableJoin>()->kind))
continue; /// Skip right table optimization continue; /// Skip right table optimization
if (table_element->table_join && isFull(table_element->table_join->as<ASTTableJoin>()->kind)) if (table_element->table_join && (
isFull(table_element->table_join->as<ASTTableJoin>()->kind)
|| table_element->table_join->as<ASTTableJoin>()->strictness == JoinStrictness::Asof
|| table_element->table_join->as<ASTTableJoin>()->strictness == JoinStrictness::Anti))
break; /// Skip left and right table optimization break; /// Skip left and right table optimization
is_rewrite_tables |= tryRewritePredicatesToTable(tables_element[table_pos], tables_predicates[table_pos], is_rewrite_tables |= tryRewritePredicatesToTable(tables_element[table_pos], tables_predicates[table_pos],

View File

@ -13,7 +13,8 @@ namespace ErrorCodes
extern const int CANNOT_SKIP_UNKNOWN_FIELD; extern const int CANNOT_SKIP_UNKNOWN_FIELD;
} }
BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_) template <bool with_defaults>
BinaryRowInputFormat<with_defaults>::BinaryRowInputFormat(ReadBuffer & in_, const Block & header, Params params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: RowInputFormatWithNamesAndTypes( : RowInputFormatWithNamesAndTypes(
header, header,
in_, in_,
@ -22,16 +23,17 @@ BinaryRowInputFormat::BinaryRowInputFormat(ReadBuffer & in_, const Block & heade
with_names_, with_names_,
with_types_, with_types_,
format_settings_, format_settings_,
std::make_unique<BinaryFormatReader>(in_, format_settings_)) std::make_unique<BinaryFormatReader<with_defaults>>(in_, format_settings_))
{ {
} }
template <bool with_defaults>
BinaryFormatReader::BinaryFormatReader(ReadBuffer & in_, const FormatSettings & format_settings_) : FormatWithNamesAndTypesReader(in_, format_settings_) BinaryFormatReader<with_defaults>::BinaryFormatReader(ReadBuffer & in_, const FormatSettings & format_settings_) : FormatWithNamesAndTypesReader(in_, format_settings_)
{ {
} }
std::vector<String> BinaryFormatReader::readHeaderRow() template <bool with_defaults>
std::vector<String> BinaryFormatReader<with_defaults>::readHeaderRow()
{ {
std::vector<String> fields; std::vector<String> fields;
String field; String field;
@ -43,13 +45,15 @@ std::vector<String> BinaryFormatReader::readHeaderRow()
return fields; return fields;
} }
std::vector<String> BinaryFormatReader::readNames() template <bool with_defaults>
std::vector<String> BinaryFormatReader<with_defaults>::readNames()
{ {
readVarUInt(read_columns, *in); readVarUInt(read_columns, *in);
return readHeaderRow(); return readHeaderRow();
} }
std::vector<String> BinaryFormatReader::readTypes() template <bool with_defaults>
std::vector<String> BinaryFormatReader<with_defaults>::readTypes()
{ {
auto types = readHeaderRow(); auto types = readHeaderRow();
for (const auto & type_name : types) for (const auto & type_name : types)
@ -57,26 +61,40 @@ std::vector<String> BinaryFormatReader::readTypes()
return types; return types;
} }
bool BinaryFormatReader::readField(IColumn & column, const DataTypePtr & /*type*/, const SerializationPtr & serialization, bool /*is_last_file_column*/, const String & /*column_name*/) template <bool with_defaults>
bool BinaryFormatReader<with_defaults>::readField(IColumn & column, const DataTypePtr & /*type*/, const SerializationPtr & serialization, bool /*is_last_file_column*/, const String & /*column_name*/)
{ {
if constexpr (with_defaults)
{
UInt8 is_default;
readBinary(is_default, *in);
if (is_default)
{
column.insertDefault();
return false;
}
}
serialization->deserializeBinary(column, *in, format_settings); serialization->deserializeBinary(column, *in, format_settings);
return true; return true;
} }
void BinaryFormatReader::skipHeaderRow() template <bool with_defaults>
void BinaryFormatReader<with_defaults>::skipHeaderRow()
{ {
String tmp; String tmp;
for (size_t i = 0; i < read_columns; ++i) for (size_t i = 0; i < read_columns; ++i)
readStringBinary(tmp, *in); readStringBinary(tmp, *in);
} }
void BinaryFormatReader::skipNames() template <bool with_defaults>
void BinaryFormatReader<with_defaults>::skipNames()
{ {
readVarUInt(read_columns, *in); readVarUInt(read_columns, *in);
skipHeaderRow(); skipHeaderRow();
} }
void BinaryFormatReader::skipTypes() template <bool with_defaults>
void BinaryFormatReader<with_defaults>::skipTypes()
{ {
if (read_columns == 0) if (read_columns == 0)
{ {
@ -87,7 +105,8 @@ void BinaryFormatReader::skipTypes()
skipHeaderRow(); skipHeaderRow();
} }
void BinaryFormatReader::skipField(size_t file_column) template <bool with_defaults>
void BinaryFormatReader<with_defaults>::skipField(size_t file_column)
{ {
if (file_column >= read_data_types.size()) if (file_column >= read_data_types.size())
throw Exception(ErrorCodes::CANNOT_SKIP_UNKNOWN_FIELD, throw Exception(ErrorCodes::CANNOT_SKIP_UNKNOWN_FIELD,
@ -111,12 +130,21 @@ void registerInputFormatRowBinary(FormatFactory & factory)
const IRowInputFormat::Params & params, const IRowInputFormat::Params & params,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<BinaryRowInputFormat>(buf, sample, params, with_names, with_types, settings); return std::make_shared<BinaryRowInputFormat<false>>(buf, sample, params, with_names, with_types, settings);
}); });
}; };
registerWithNamesAndTypes("RowBinary", register_func); registerWithNamesAndTypes("RowBinary", register_func);
factory.registerFileExtension("bin", "RowBinary"); factory.registerFileExtension("bin", "RowBinary");
factory.registerInputFormat("RowBinaryWithDefaults", [](
ReadBuffer & buf,
const Block & sample,
const IRowInputFormat::Params & params,
const FormatSettings & settings)
{
return std::make_shared<BinaryRowInputFormat<true>>(buf, sample, params, false, false, settings);
});
} }
void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory) void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory)
@ -125,6 +153,8 @@ void registerRowBinaryWithNamesAndTypesSchemaReader(FormatFactory & factory)
{ {
return std::make_shared<BinaryWithNamesAndTypesSchemaReader>(buf, settings); return std::make_shared<BinaryWithNamesAndTypesSchemaReader>(buf, settings);
}); });
} }

View File

@ -12,6 +12,7 @@ class ReadBuffer;
/** A stream for inputting data in a binary line-by-line format. /** A stream for inputting data in a binary line-by-line format.
*/ */
template <bool with_defaults = false>
class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes class BinaryRowInputFormat final : public RowInputFormatWithNamesAndTypes
{ {
public: public:
@ -25,6 +26,7 @@ public:
std::string getDiagnosticInfo() override { return {}; } std::string getDiagnosticInfo() override { return {}; }
}; };
template <bool with_defaults = false>
class BinaryFormatReader final : public FormatWithNamesAndTypesReader class BinaryFormatReader final : public FormatWithNamesAndTypesReader
{ {
public: public:
@ -54,7 +56,7 @@ public:
BinaryWithNamesAndTypesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_); BinaryWithNamesAndTypesSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
private: private:
BinaryFormatReader reader; BinaryFormatReader<false> reader;
}; };
} }

View File

@ -10,6 +10,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
@ -72,7 +73,17 @@ Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr cont
auto read_buf = std::make_unique<ReadBufferFromString>(data); auto read_buf = std::make_unique<ReadBufferFromString>(data);
auto input_format = context->getInputFormat(format, *read_buf, block, context->getSettingsRef().max_block_size); auto input_format = context->getInputFormat(format, *read_buf, block, context->getSettingsRef().max_block_size);
auto pipeline = std::make_unique<QueryPipeline>(input_format); QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
if (columns.hasDefaults())
{
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingDefaultsTransform>(header, columns, *input_format, context);
});
}
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline); auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
std::vector<Block> blocks; std::vector<Block> blocks;

View File

@ -36,7 +36,7 @@ ${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r1"
${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r2" ${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r2"
# Should be empty, but in case of problems we will see some diagnostics # Should be empty, but in case of problems we will see some diagnostics
${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.replication_queue WHERE table like 'kill_mutation_r%'" ${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.replication_queue WHERE database = '$CLICKHOUSE_DATABASE' AND table like 'kill_mutation_r%'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE kill_mutation_r1 DELETE WHERE toUInt32(s) = 1" ${CLICKHOUSE_CLIENT} --query="ALTER TABLE kill_mutation_r1 DELETE WHERE toUInt32(s) = 1"
@ -57,6 +57,14 @@ $CLICKHOUSE_CLIENT --query="SELECT count() FROM system.mutations WHERE database
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = '$CLICKHOUSE_DATABASE' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'" ${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = '$CLICKHOUSE_DATABASE' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
# Wait for the 1st mutation to be actually killed and the 2nd to finish
query_result=$($CLICKHOUSE_CLIENT --query="$check_query1" 2>&1)
while [ "$query_result" != "0" ]
do
query_result=$($CLICKHOUSE_CLIENT --query="$check_query1" 2>&1)
sleep 0.5
done
${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r1" ${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r1"
${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r2" ${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA kill_mutation_r2"

View File

@ -33,3 +33,4 @@
1 3 1 4 1 3 1 4
2 1 2 3 2 1 2 3
2 2 2 3 2 2 2 3
1 2 1 2

View File

@ -23,5 +23,10 @@ SELECT count() FROM A ASOF JOIN B ON A.a == B.b AND A.t != B.t; -- { serverError
SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t OR A.a == B.b + 1 ORDER BY (A.a, A.t); -- { serverError 48 } SELECT A.a, A.t, B.b, B.t FROM A ASOF JOIN B ON A.a == B.b AND A.t < B.t OR A.a == B.b + 1 ORDER BY (A.a, A.t); -- { serverError 48 }
SELECT A.a, A.t, B.b, B.t FROM A
ASOF INNER JOIN (SELECT * FROM B UNION ALL SELECT 1, 3) AS B ON B.t <= A.t AND A.a == B.b
WHERE B.t != 3 ORDER BY (A.a, A.t)
;
DROP TABLE A; DROP TABLE A;
DROP TABLE B; DROP TABLE B;

View File

@ -0,0 +1,3 @@
{"dd":"2023-06-24 00:00:00"}
{"dd":"2023-06-24 00:00:00"} 2023-06-24 00:00:00
{"result_date":"2023-08-24"}

View File

@ -0,0 +1,18 @@
with c as ( select 1 ID, toDate('2023-06-24') dt, 0 p ) select multiIf(t.ID = 1, formatRowNoNewline('JSONEachRow', dd), '') AS params from (select ID, case when p = 0 then toString(date_add(hour, p, dt)) else '2022-01-01' end as dd from c) t;
with c as ( select 1 ID, toDate('2023-06-24') dt, 0 p ) select multiIf(t.ID = 1, formatRowNoNewline('JSONEachRow', dd), '') AS params, dd from (select ID, case when p = 0 then toString(date_add(hour, p, dt)) else '2022-01-01' end as dd from c) t;
select
if(
outer_table.condition_value = 1,
formatRowNoNewline('JSONEachRow', outer_table.result_date),
''
) as json
from (
select
1 as condition_value,
date_add(month, inner_table.offset, toDate('2023-06-24')) as result_date
from (
select
2 as offset
) inner_table
) outer_table;

View File

@ -0,0 +1,6 @@
42
1
42
1
\N
[(42,42)]

View File

@ -0,0 +1,7 @@
select * from format('RowBinaryWithDefaults', 'x UInt32 default 42', x'01');
select * from format('RowBinaryWithDefaults', 'x UInt32 default 42', x'0001000000');
select * from format('RowBinaryWithDefaults', 'x Nullable(UInt32) default 42', x'01');
select * from format('RowBinaryWithDefaults', 'x Nullable(UInt32) default 42', x'000001000000');
select * from format('RowBinaryWithDefaults', 'x Nullable(UInt32) default 42', x'0001');
select * from format('RowBinaryWithDefaults', 'x Array(Tuple(UInt32, UInt32)) default [(42, 42)]', x'01');

View File

@ -762,6 +762,7 @@ Rollup
RowBinary RowBinary
RowBinaryWithNames RowBinaryWithNames
RowBinaryWithNamesAndTypes RowBinaryWithNamesAndTypes
RowBinaryWithDefaults
Runtime Runtime
SATA SATA
SELECTs SELECTs
@ -2125,6 +2126,7 @@ rowNumberInBlock
rowbinary rowbinary
rowbinarywithnames rowbinarywithnames
rowbinarywithnamesandtypes rowbinarywithnamesandtypes
rowbinarywithdefaults
rsync rsync
rsyslog rsyslog
runnable runnable