Merge remote-tracking branch 'origin/master' into HEAD

This commit is contained in:
Alexander Kuzmenkov 2020-11-19 15:28:37 +03:00
commit 24293ccb30
47 changed files with 934 additions and 49 deletions

View File

@ -20,7 +20,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
The structure of an external dictionary consists of attributes. Dictionary attributes are specified similarly to table columns. The only required attribute property is its type; all other properties may have default values.
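As an illustration of the corrected syntax, here is a minimal sketch of both accepted LIFETIME forms (the dictionary, table, and database names are hypothetical). ClickHouse picks a uniformly random update time within the [MIN, MAX] range:

```sql
-- Range form: the dictionary is reloaded at a random moment
-- between 300 and 360 seconds after the previous load.
CREATE DICTIONARY db.example_dict (id UInt64, value String)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'example_table' DB 'db'))
LAYOUT(FLAT())
LIFETIME(MIN 300 MAX 360);

-- Single-value form: LIFETIME(300); LIFETIME(0) disables updates.
```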

View File

@ -291,7 +291,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Creates an [external dictionary](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) with the given [structure](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [layout](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) and [lifetime](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).

View File

@ -291,7 +291,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Creates an [external dictionary](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) with the given [structure](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [layout](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) and [lifetime](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).

View File

@ -291,7 +291,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Creates an [external dictionary](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) with the given [structure](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [layout](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) and [lifetime](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).

View File

@ -291,7 +291,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Creates an [external dictionary](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) with the given [structure](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [layout](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) and [lifetime](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).

View File

@ -16,7 +16,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Creates an [external dictionary](../../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) with the given [structure](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [source](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [memory layout](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) and [update period](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).
@ -27,5 +27,5 @@ LIFETIME([MIN val1] MAX val2)
See [External Dictionaries](../../../sql-reference/dictionaries/external-dictionaries/external-dicts.md).
[Original article](https://clickhouse.tech/docs/ru/sql-reference/statements/create/dictionary)
<!--hide-->

View File

@ -291,7 +291,7 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```
Oluşturuyor [dış sözlük](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md) verilen ile [yapılı](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md), [kaynaklı](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md), [düzen](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md) ve [ömür](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-lifetime.md).

View File

@ -259,5 +259,5 @@ CREATE DICTIONARY [IF NOT EXISTS] [db.]dictionary_name [ON CLUSTER cluster]
PRIMARY KEY key1, key2
SOURCE(SOURCE_NAME([param1 value1 ... paramN valueN]))
LAYOUT(LAYOUT_NAME([param_name param_value]))
LIFETIME([MIN val1] MAX val2)
LIFETIME({MIN min_val MAX max_val | max_val})
```

View File

@ -138,4 +138,12 @@ void ColumnConst::updateWeakHash32(WeakHash32 & hash) const
value = intHashCRC32(data_hash, value);
}
void ColumnConst::compareColumn(
const IColumn & rhs, size_t, PaddedPODArray<UInt64> *, PaddedPODArray<Int8> & compare_results, int, int nan_direction_hint)
const
{
Int8 res = compareAt(1, 1, rhs, nan_direction_hint);
std::fill(compare_results.begin(), compare_results.end(), res);
}
}

View File

@ -199,11 +199,7 @@ public:
void compareColumn(const IColumn & rhs, size_t rhs_row_num,
PaddedPODArray<UInt64> * row_indexes, PaddedPODArray<Int8> & compare_results,
int direction, int nan_direction_hint) const override
{
return data->compareColumn(rhs, rhs_row_num, row_indexes,
compare_results, direction, nan_direction_hint);
}
int direction, int nan_direction_hint) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;

View File

@ -442,6 +442,7 @@ class IColumn;
\
M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Don't confuse with backslashes that are always escaped.", 0) \
M(Bool, output_format_json_named_tuples_as_objects, false, "Serialize named tuple columns as JSON objects.", 0) \
M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \
\
M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
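A usage sketch of the new output_format_json_array_of_rows setting, mirroring the functional test added at the end of this commit (UInt64 values are quoted because output_format_json_quote_64bit_integers defaults to 1):

```sql
SET output_format_json_array_of_rows = 1;
SELECT number AS a, number * 2 AS b FROM numbers(3) FORMAT JSONEachRow;
-- [
-- {"a":"0","b":"0"},
-- {"a":"1","b":"2"},
-- {"a":"2","b":"4"}
-- ]
```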

View File

@ -17,13 +17,14 @@
namespace DB
{
namespace ErrorCodes
{
}
static const size_t MAX_CONNECTIONS = 16;
inline static UInt16 getPortFromContext(const Context & context, bool secure)
{
return secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort();
}
static ConnectionPoolWithFailoverPtr createPool(
const std::string & host,
UInt16 port,
@ -59,10 +60,10 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
const std::string & default_database)
: update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, host{config.getString(config_prefix + ".host")}
, port(config.getInt(config_prefix + ".port"))
, secure(config.getBool(config_prefix + ".secure", false))
, user{config.getString(config_prefix + ".user", "")}
, host{config.getString(config_prefix + ".host", "localhost")}
, port(config.getInt(config_prefix + ".port", getPortFromContext(context_, secure)))
, user{config.getString(config_prefix + ".user", "default")}
, password{config.getString(config_prefix + ".password", "")}
, db{config.getString(config_prefix + ".db", default_database)}
, table{config.getString(config_prefix + ".table")}
@ -72,7 +73,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(context_)
, is_local{isLocalAddress({host, port}, secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort())}
, is_local{isLocalAddress({host, port}, getPortFromContext(context_, secure))}
, pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
, load_all_query{query_builder.composeLoadAllQuery()}
{
@ -92,9 +93,9 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: update_time{other.update_time}
, dict_struct{other.dict_struct}
, secure{other.secure}
, host{other.host}
, port{other.port}
, secure{other.secure}
, user{other.user}
, password{other.password}
, db{other.db}
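The net effect of these new defaults, mirrored by the test added below: a SOURCE(CLICKHOUSE(...)) clause may now omit host, port, and user, which fall back to localhost, the server's own TCP port, and the default user:

```sql
-- host, port, and user are omitted and resolved from the local server.
CREATE DICTIONARY database_for_dict.dict1
(
    key_column UInt64 DEFAULT 0,
    second_column UInt8 DEFAULT 1
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(TABLE 'table_for_dict' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
```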

View File

@ -61,9 +61,9 @@ private:
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const bool secure;
const std::string host;
const UInt16 port;
const bool secure;
const std::string user;
const std::string password;
const std::string db;

View File

@ -78,6 +78,7 @@ FormatSettings getFormatSettings(const Context & context,
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.json.array_of_rows = settings.output_format_json_array_of_rows;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.named_tuples_as_objects = settings.output_format_json_named_tuples_as_objects;
format_settings.json.quote_64bit_integers = settings.output_format_json_quote_64bit_integers;

View File

@ -86,6 +86,7 @@ struct FormatSettings
struct
{
bool array_of_rows = false;
bool quote_64bit_integers = true;
bool quote_denormals = true;
bool escape_forward_slashes = true;

src/Functions/acosh.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct AcoshName
{
static constexpr auto name = "acosh";
};
using FunctionAcosh = FunctionMathUnary<UnaryFunctionVectorized<AcoshName, acosh>>;
}
void registerFunctionAcosh(FunctionFactory & factory)
{
factory.registerFunction<FunctionAcosh>();
}
}

src/Functions/asinh.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct AsinhName
{
static constexpr auto name = "asinh";
};
using FunctionAsinh = FunctionMathUnary<UnaryFunctionVectorized<AsinhName, asinh>>;
}
void registerFunctionAsinh(FunctionFactory & factory)
{
factory.registerFunction<FunctionAsinh>();
}
}

src/Functions/atan2.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathBinaryFloat64.h>
namespace DB
{
namespace
{
struct Atan2Name
{
static constexpr auto name = "atan2";
};
using FunctionAtan2 = FunctionMathBinaryFloat64<BinaryFunctionVectorized<Atan2Name, atan2>>;
}
void registerFunctionAtan2(FunctionFactory & factory)
{
factory.registerFunction<FunctionAtan2>(FunctionFactory::CaseInsensitive);
}
}

src/Functions/atanh.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct AtanhName
{
static constexpr auto name = "atanh";
};
using FunctionAtanh = FunctionMathUnary<UnaryFunctionVectorized<AtanhName, atanh>>;
}
void registerFunctionAtanh(FunctionFactory & factory)
{
factory.registerFunction<FunctionAtanh>();
}
}

src/Functions/cosh.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct CoshName
{
static constexpr auto name = "cosh";
};
using FunctionCosh = FunctionMathUnary<UnaryFunctionVectorized<CoshName, cosh>>;
}
void registerFunctionCosh(FunctionFactory & factory)
{
factory.registerFunction<FunctionCosh>();
}
}

src/Functions/hypot.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathBinaryFloat64.h>
namespace DB
{
namespace
{
struct HypotName
{
static constexpr auto name = "hypot";
};
using FunctionHypot = FunctionMathBinaryFloat64<BinaryFunctionVectorized<HypotName, hypot>>;
}
void registerFunctionHypot(FunctionFactory & factory)
{
factory.registerFunction<FunctionHypot>(FunctionFactory::CaseInsensitive);
}
}

src/Functions/log1p.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct Log1pName
{
static constexpr auto name = "log1p";
};
using FunctionLog1p = FunctionMathUnary<UnaryFunctionVectorized<Log1pName, log1p>>;
}
void registerFunctionLog1p(FunctionFactory & factory)
{
factory.registerFunction<FunctionLog1p>();
}
}

View File

@ -1,6 +1,5 @@
namespace DB
{
class FunctionFactory;
void registerFunctionE(FunctionFactory & factory);
@ -9,6 +8,7 @@ void registerFunctionExp(FunctionFactory & factory);
void registerFunctionLog(FunctionFactory & factory);
void registerFunctionExp2(FunctionFactory & factory);
void registerFunctionLog2(FunctionFactory & factory);
void registerFunctionLog1p(FunctionFactory & factory);
void registerFunctionExp10(FunctionFactory & factory);
void registerFunctionLog10(FunctionFactory & factory);
void registerFunctionSqrt(FunctionFactory & factory);
@ -23,8 +23,15 @@ void registerFunctionTan(FunctionFactory & factory);
void registerFunctionAsin(FunctionFactory & factory);
void registerFunctionAcos(FunctionFactory & factory);
void registerFunctionAtan(FunctionFactory & factory);
void registerFunctionAtan2(FunctionFactory & factory);
void registerFunctionSigmoid(FunctionFactory & factory);
void registerFunctionHypot(FunctionFactory & factory);
void registerFunctionSinh(FunctionFactory & factory);
void registerFunctionCosh(FunctionFactory & factory);
void registerFunctionTanh(FunctionFactory & factory);
void registerFunctionAsinh(FunctionFactory & factory);
void registerFunctionAcosh(FunctionFactory & factory);
void registerFunctionAtanh(FunctionFactory & factory);
void registerFunctionPow(FunctionFactory & factory);
@ -36,6 +43,7 @@ void registerFunctionsMath(FunctionFactory & factory)
registerFunctionLog(factory);
registerFunctionExp2(factory);
registerFunctionLog2(factory);
registerFunctionLog1p(factory);
registerFunctionExp10(factory);
registerFunctionLog10(factory);
registerFunctionSqrt(factory);
@ -50,8 +58,15 @@ void registerFunctionsMath(FunctionFactory & factory)
registerFunctionAsin(factory);
registerFunctionAcos(factory);
registerFunctionAtan(factory);
registerFunctionAtan2(factory);
registerFunctionSigmoid(factory);
registerFunctionHypot(factory);
registerFunctionSinh(factory);
registerFunctionCosh(factory);
registerFunctionTanh(factory);
registerFunctionAsinh(factory);
registerFunctionAcosh(factory);
registerFunctionAtanh(factory);
registerFunctionPow(factory);
}
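The newly registered functions can be exercised directly. A quick sanity check built from the identities used in the tests added later in this commit (each expression should return 1):

```sql
SELECT
    atan2(1, 1) = pi() / 4,
    hypot(3, 4) = 5,
    log1p(0) = 0,
    sinh(0) = 0,
    cosh(0) = 1,
    asinh(0) = 0,
    acosh(1) = 0,
    atanh(0) = 0;
```

Note that atan2 and hypot are registered with FunctionFactory::CaseInsensitive, so ATAN2 and HYPOT work as well.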

src/Functions/sinh.cpp (new file, 21 lines)
View File

@ -0,0 +1,21 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionMathUnary.h>
namespace DB
{
namespace
{
struct SinhName
{
static constexpr auto name = "sinh";
};
using FunctionSinh = FunctionMathUnary<UnaryFunctionVectorized<SinhName, sinh>>;
}
void registerFunctionSinh(FunctionFactory & factory)
{
factory.registerFunction<FunctionSinh>();
}
}

View File

@ -102,6 +102,7 @@ SRCS(
URL/topLevelDomain.cpp
abs.cpp
acos.cpp
acosh.cpp
addDays.cpp
addHours.cpp
addMinutes.cpp
@ -170,8 +171,11 @@ SRCS(
array/range.cpp
array/registerFunctionsArray.cpp
asin.cpp
asinh.cpp
assumeNotNull.cpp
atan.cpp
atan2.cpp
atanh.cpp
bar.cpp
base64Decode.cpp
base64Encode.cpp
@ -202,6 +206,7 @@ SRCS(
concat.cpp
convertCharset.cpp
cos.cpp
cosh.cpp
countDigits.cpp
currentDatabase.cpp
currentUser.cpp
@ -273,6 +278,7 @@ SRCS(
hasToken.cpp
hasTokenCaseInsensitive.cpp
hostName.cpp
hypot.cpp
identity.cpp
if.cpp
ifNotFinite.cpp
@ -304,6 +310,7 @@ SRCS(
like.cpp
log.cpp
log10.cpp
log1p.cpp
log2.cpp
logTrace.cpp
lowCardinalityIndices.cpp
@ -409,6 +416,7 @@ SRCS(
runningDifferenceStartingWithFirstValue.cpp
sigmoid.cpp
sin.cpp
sinh.cpp
sleep.cpp
sleepEachRow.cpp
sqrt.cpp

View File

@ -12,9 +12,9 @@ JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_), yield_strings(yield_strings_)
const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_),
settings(settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@ -33,7 +33,7 @@ void JSONEachRowRowOutputFormat::writeField(const IColumn & column, const IDataT
writeString(fields[field_number], out);
writeChar(':', out);
if (yield_strings)
if (settings.json.serialize_as_strings)
{
WriteBufferFromOwnString buf;
@ -61,29 +61,94 @@ void JSONEachRowRowOutputFormat::writeRowStartDelimiter()
void JSONEachRowRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}\n", out);
// Why do we need this weird `if`?
//
// The reason is the formatRow function that is broken with respect to
// row-between delimiters. It should not write them, but it does, and then
// hacks around it by having a special formatRowNoNewline version, which, as
// you guessed, removes the newline from the end of row. But the row-between
// delimiter goes into a second row, so it turns out to be in the beginning
// of the line, and the removal doesn't work. There is also a second bug --
// the row-between delimiter in this format is written incorrectly. In fact,
// it is not written at all, and the newline is written in a row-end
// delimiter ("}\n" instead of the correct "}"). With these two bugs
// combined, the test 01420_format_row works perfectly.
//
// A proper implementation of formatRow would use IRowOutputFormat directly,
// and not write row-between delimiters, instead of using IOutputFormat
// processor and its crutch row callback. This would require exposing
// IRowOutputFormat, which we don't do now, but which can be generally useful
// for other cases such as parallel formatting, that also require a control
// flow different from the usual IOutputFormat.
//
// I just don't have time or energy to redo all of this, but I need to
// support JSON array output here, which requires proper ",\n" row-between
// delimiters. For compatibility, I preserve the bug in case of non-array
// output.
if (settings.json.array_of_rows)
{
writeCString("}", out);
}
else
{
writeCString("}\n", out);
}
field_number = 0;
}
void JSONEachRowRowOutputFormat::writeRowBetweenDelimiter()
{
// We preserve an existing bug here for compatibility. See the comment above.
if (settings.json.array_of_rows)
{
writeCString(",\n", out);
}
}
void JSONEachRowRowOutputFormat::writePrefix()
{
if (settings.json.array_of_rows)
{
writeCString("[\n", out);
}
}
void JSONEachRowRowOutputFormat::writeSuffix()
{
if (settings.json.array_of_rows)
{
writeCString("\n]\n", out);
}
}
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
const FormatSettings & _format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, false);
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
const FormatSettings & _format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, true);
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params,
settings);
});
}

View File

@ -19,8 +19,7 @@ public:
WriteBuffer & out_,
const Block & header_,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
const FormatSettings & settings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; }
@ -28,6 +27,9 @@ public:
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
void writePrefix() override;
void writeSuffix() override;
protected:
/// No totals and extremes.
@ -40,9 +42,6 @@ private:
Names fields;
FormatSettings settings;
protected:
bool yield_strings;
};
}

View File

@ -34,18 +34,24 @@ void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factor
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
const FormatSettings & _format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, false);
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = false;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
});
factory.registerOutputFormatProcessor("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
const FormatSettings & _format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, true);
FormatSettings settings = _format_settings;
settings.json.serialize_as_strings = true;
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf,
sample, params, settings);
});
}

View File

@ -62,6 +62,7 @@ struct Settings;
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(Seconds, execute_merges_on_single_replica_time_threshold, 0, "When greater than zero, only a single replica starts the merge immediately, while the others wait up to that amount of time to download the result instead of doing merges locally. If the chosen replica doesn't finish the merge during that amount of time, it falls back to the standard behavior.", 0) \
M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slow in most cases, so we don't start merge with recompression until this timeout and trying to fetch recompressed part from replica which assigned this merge with recompression.", 0) \
M(Bool, always_fetch_merged_part, 0, "If true, replica never merge parts and always download merged parts from other replicas.", 0) \
M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \
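A sketch of how the new setting is applied (the table and ZooKeeper path here are hypothetical; the functional test at the end of this commit uses the same pattern):

```sql
-- Only the replica picked from the log entry hash merges immediately;
-- the other waits up to 10 seconds to fetch the merged part instead.
CREATE TABLE t_r1 (x UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', 'r1')
ORDER BY tuple()
SETTINGS execute_merges_on_single_replica_time_threshold = 10;

-- The feature can be disabled later without recreating the table:
ALTER TABLE t_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold = 0;
```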

View File

@ -0,0 +1,151 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <common/types.h>
#include <optional>
#include <mutex>
#include <city.h>
#include <algorithm>
#include <atomic>
namespace DB
{
/// minimum interval (seconds) between checks whether the chosen replica finished the merge.
static const auto RECHECK_MERGE_READYNESS_INTERVAL_SECONDS = 1;
/// don't refresh state too often (to limit number of zookeeper ops)
static const auto REFRESH_STATE_MINIMUM_INTERVAL_SECONDS = 3;
/// refresh the state automatically if it was not refreshed for a long time
static const auto REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS = 30;
ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_)
: storage(storage_)
{}
bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
{
/// those have only seconds resolution, so recheck period is quite rough
auto reference_timestamp = entry.last_postpone_time;
if (reference_timestamp == 0)
reference_timestamp = entry.create_time;
/// we don't want to check zookeeper too frequently
if (time(nullptr) - reference_timestamp >= RECHECK_MERGE_READYNESS_INTERVAL_SECONDS)
{
return storage.checkReplicaHavePart(replica, entry.new_part_name);
}
return false;
}
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const
{
time_t threshold = execute_merges_on_single_replica_time_threshold;
return (
threshold > 0 /// feature turned on
&& entry.type == ReplicatedMergeTreeLogEntry::MERGE_PARTS /// it is a merge log entry
&& entry.create_time + threshold > time(nullptr) /// not too much time waited
);
}
/// This will return the same replica name for a ReplicatedMergeTreeLogEntry on all the replicas (if the replica set is the same);
/// that way each replica knows who is responsible for doing a certain merge.
/// In some corner cases (added / removed / deactivated replica)
/// nodes can pick different replicas to execute the merge and wait for it (or execute the same merge together),
/// but that doesn't have a significant impact (in one case a node will wait for execute_merges_on_single_replica_time_threshold,
/// in the other just 2 replicas will do the merge)
std::optional<String> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry)
{
/// last state refresh was too long ago, need to sync up the replicas list
if (time(nullptr) - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS)
refreshState();
auto hash = getEntryHash(entry);
std::lock_guard lock(mutex);
auto num_replicas = active_replicas.size();
if (num_replicas == 0)
return std::nullopt;
auto replica_index = static_cast<int>(hash % num_replicas);
if (replica_index == current_replica_index)
return std::nullopt;
return active_replicas.at(replica_index);
}
void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
{
auto threshold = storage.getSettings()->execute_merges_on_single_replica_time_threshold.totalSeconds();
if (threshold == 0)
{
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = threshold;
return;
}
auto now = time(nullptr);
/// the setting was already enabled, and last state refresh was done recently
if (execute_merges_on_single_replica_time_threshold != 0
&& now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
return;
auto zookeeper = storage.getZooKeeper();
auto all_replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
std::sort(all_replicas.begin(), all_replicas.end());
std::vector<String> active_replicas_tmp;
int current_replica_index_tmp = -1;
for (const String & replica : all_replicas)
{
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
{
active_replicas_tmp.push_back(replica);
if (replica == storage.replica_name)
{
current_replica_index_tmp = active_replicas_tmp.size() - 1;
}
}
}
if (current_replica_index_tmp < 0 || active_replicas_tmp.size() < 2)
{
LOG_WARNING(storage.log, "Can't find current replica in the active replicas list, or too few active replicas to use execute_merges_on_single_replica_time_threshold!");
/// we can reset the settings w/o lock (it's atomic)
execute_merges_on_single_replica_time_threshold = 0;
return;
}
std::lock_guard lock(mutex);
execute_merges_on_single_replica_time_threshold = threshold;
last_refresh_time = now;
current_replica_index = current_replica_index_tmp;
active_replicas = active_replicas_tmp;
}
uint64_t ReplicatedMergeTreeMergeStrategyPicker::getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const
{
auto hash_data = storage.zookeeper_path + entry.new_part_name;
return CityHash_v1_0_2::CityHash64(hash_data.c_str(), hash_data.length());
}
}

View File

@ -0,0 +1,82 @@
#pragma once
#include <common/types.h>
#include <optional>
#include <mutex>
#include <vector>
#include <atomic>
#include <boost/noncopyable.hpp>
namespace DB
{
class StorageReplicatedMergeTree;
struct ReplicatedMergeTreeLogEntryData;
/// In some use cases merging can be more expensive than fetching
/// (so instead of doing exactly the same merge cluster-wise you can do the merge once and fetch the ready part).
/// Fetches may also be desirable for other operational reasons (e.g. a backup replica without a lot of CPU resources).
///
/// This class allows taking a decision about the preferred strategy for a concrete merge.
///
/// Since that code is used in shouldExecuteLogEntry we need to be able to:
/// 1) make decision fast
/// 2) avoid excessive zookeeper operations
///
/// Because of that we need to cache some important things,
/// like the list of active replicas (to limit the number of zookeeper operations)
///
/// That means we need to refresh the state of that object regularly
///
/// NOTE: This class currently supports only a single feature (execute_merges_on_single_replica_time_threshold),
/// may be extended to postpone merges in some other scenarios, namely
/// * always_fetch_merged_part
/// * try_fetch_recompressed_part_timeout
/// * (maybe, not for postpone) prefer_fetch_merged_part_time_threshold
///
/// NOTE: execute_merges_on_single_replica_time_threshold feature doesn't provide any strict guarantees.
/// When some replicas are added / removed we may execute some merges on more than one replica,
/// or not execute the merge on any replica during the execute_merges_on_single_replica_time_threshold interval
/// (so it may be a bad idea to set that threshold to high values).
///
class ReplicatedMergeTreeMergeStrategyPicker: public boost::noncopyable
{
public:
ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_);
/// triggers refreshing the cached state (list of replicas etc.)
/// used when we get a new merge event from the zookeeper queue (see queueUpdatingTask() etc.)
void refreshState();
/// return true if execute_merges_on_single_replica_time_threshold feature is active
/// and we may need to do a fetch (or postpone) instead of merge
bool shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const;
/// returns the name of the replica that should execute the merge
/// (when that replica is not the current one)
/// used in shouldExecuteLogEntry and in tryExecuteMerge
std::optional<String> pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry);
/// checks (in zookeeper) if the picked replica finished the merge
bool isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry);
private:
StorageReplicatedMergeTree & storage;
/// calculate entry hash based on zookeeper path and new part name
/// ATTENTION: it's not a general-purpose hash, it just allows selecting replicas consistently
uint64_t getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const;
std::atomic<time_t> execute_merges_on_single_replica_time_threshold = 0;
std::atomic<time_t> last_refresh_time = 0;
std::mutex mutex;
/// these 2 members are accessed under the mutex, and only when
/// execute_merges_on_single_replica_time_threshold is enabled
int current_replica_index = -1;
std::vector<String> active_replicas;
};
}
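While a replica is waiting for the chosen one to finish, the postponement is observable through system.replication_queue; the functional test below checks exactly this. A query sketch (the LIKE pattern matches the reason string produced in shouldExecuteLogEntry):

```sql
SELECT table, type, new_part_name, postpone_reason
FROM system.replication_queue
WHERE type = 'MERGE_PARTS'
  AND postpone_reason LIKE 'Not executing merge%';
```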

View File

@ -5,6 +5,7 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Common/StringUtils/StringUtils.h>
@ -20,8 +21,9 @@ namespace ErrorCodes
}
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_)
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_)
: storage(storage_)
, merge_strategy_picker(merge_strategy_picker_)
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
@ -120,6 +122,8 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
merge_strategy_picker.refreshState();
LOG_TRACE(log, "Loaded queue");
return updated;
}
@ -587,7 +591,10 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
}
if (!copied_entries.empty())
{
LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size());
merge_strategy_picker.refreshState();
}
}
storage.background_executor.triggerTask();
@ -1088,6 +1095,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
return false;
}
}
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
: merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),

View File

@ -22,6 +22,7 @@ class StorageReplicatedMergeTree;
class MergeTreeDataMergerMutator;
class ReplicatedMergeTreeMergePredicate;
class ReplicatedMergeTreeMergeStrategyPicker;
class ReplicatedMergeTreeQueue
@ -57,6 +58,7 @@ private:
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
StorageReplicatedMergeTree & storage;
ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker;
MergeTreeDataFormatVersion format_version;
String zookeeper_path;
@ -275,7 +277,7 @@ private:
size_t current_multi_batch_size = 1;
public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
~ReplicatedMergeTreeQueue();

View File

@ -139,7 +139,6 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000;
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard lock(current_zookeeper_mutex);
@ -202,7 +201,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
, queue(*this)
, merge_strategy_picker(*this)
, queue(*this, merge_strategy_picker)
, fetcher(*this)
, background_executor(*this, global_context)
, background_moves_executor(*this, global_context)
@ -1361,6 +1361,20 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
return false;
}
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merge tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log, "Prefer fetching part {} from replica {} due execute_merges_on_single_replica_time_threshold", entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}
DataPartsVector parts;
bool have_all_parts = true;
for (const String & name : entry.source_parts)
@ -3011,6 +3025,11 @@ void StorageReplicatedMergeTree::exitLeaderElection()
leader_election = nullptr;
}
bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name)
{
auto zookeeper = getZooKeeper();
return zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name);
}
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
@ -3026,7 +3045,7 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
if (replica == replica_name)
continue;
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
if (checkReplicaHavePart(replica, part_name) &&
(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
return replica;
@ -4016,6 +4035,8 @@ void StorageReplicatedMergeTree::alter(
StorageInMemoryMetadata future_metadata = getInMemoryMetadata();
commands.apply(future_metadata, query_context);
merge_strategy_picker.refreshState();
changeSettings(future_metadata.settings_changes, table_lock_holder);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(query_context, table_id, future_metadata);

View File

@ -13,6 +13,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
@ -210,7 +211,6 @@ public:
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
private:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
@ -222,11 +222,13 @@ private:
friend class ReplicatedMergeTreeCleanupThread;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeMergeStrategyPicker;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class MergeTreeData;
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
@ -262,6 +264,8 @@ private:
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
MergeStrategyPicker merge_strategy_picker;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
*/
@ -478,6 +482,8 @@ private:
*/
String findReplicaHavingPart(const String & part_name, bool active);
bool checkReplicaHavePart(const String & replica, const String & part_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.

View File

@ -94,6 +94,7 @@ SRCS(
MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
MergeTree/ReplicatedMergeTreeCleanupThread.cpp
MergeTree/ReplicatedMergeTreeLogEntry.cpp
MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp
MergeTree/ReplicatedMergeTreeMutationEntry.cpp
MergeTree/ReplicatedMergeTreePartCheckThread.cpp
MergeTree/ReplicatedMergeTreePartHeader.cpp

View File

@ -72,3 +72,43 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -56,6 +56,12 @@ select log2(2) = 1;
select log2(4) = 2;
select sum(abs(log2(exp2(x)) - x) < 1.0e-9) / count() from system.one array join range(1000) as x;
select log1p(-1) = -inf;
select log1p(0) = 0;
select abs(log1p(exp(2) - 1) - 2) < 1e-8;
select abs(log1p(exp(3) - 1) - 3) < 1e-8;
select sum(abs(log1p(exp(x) - 1) - x) < 1e-8) / count() from system.one array join range(100) as x;
select sin(0) = 0;
select sin(pi() / 4) = 1 / sqrt(2);
select sin(pi() / 2) = 1;
@ -82,6 +88,48 @@ select acos(-1) = pi();
select atan(0) = 0;
select atan(1) = pi() / 4;
select atan2(0, 1) = 0;
select atan2(0, 2) = 0;
select atan2(1, 0) = pi() / 2;
select atan2(1, 1) = pi() / 4;
select atan2(-1, -1) = -3 * pi() / 4;
select hypot(0, 1) = 1;
select hypot(1, 0) = 1;
select hypot(1, 1) = sqrt(2);
select hypot(-1, 1) = sqrt(2);
select hypot(3, 4) = 5;
select sinh(0) = 0;
select sinh(1) = -sinh(-1);
select abs(sinh(1) - 0.5 * (e() - exp(-1))) < 1e-6;
select abs(sinh(2) - 0.5 * (exp(2) - exp(-2))) < 1e-6;
select sum(abs(sinh(x) - 0.5 * (exp(x) - exp(-x))) < 1e-6) / count() from system.one array join range(10) as x;
select cosh(0) = 1;
select cosh(1) = cosh(-1);
select abs(cosh(1) - 0.5 * (e() + exp(-1))) < 1e-6;
select abs(pow(cosh(1), 2) - pow(sinh(1), 2) - 1) < 1e-6;
select sum(abs(cosh(x) * cosh(x) - sinh(x) * sinh(x) - 1) < 1e-6) / count() from system.one array join range(10) as x;
select asinh(0) = 0;
select asinh(1) = -asinh(-1);
select abs(asinh(1) - ln(1 + sqrt(2))) < 1e-9;
select abs(asinh(sinh(1)) - 1) < 1e-9;
select sum(abs(asinh(sinh(x)) - x) < 1e-9) / count() from system.one array join range(100) as x;
select acosh(1) = 0;
select abs(acosh(2) - ln(2 + sqrt(3))) < 1e-9;
select abs(acosh(cosh(2)) - 2) < 1e-9;
select abs(acosh(cosh(3)) - 3) < 1e-9;
select sum(abs(acosh(cosh(x)) - x) < 1e-9) / count() from system.one array join range(1, 101) as x;
select atanh(0) = 0;
select atanh(0.5) = -atanh(-0.5);
select abs(atanh(0.9) - 0.5 * ln(19)) < 1e-5;
select abs(atanh(tanh(1)) - 1) < 1e-5;
select sum(abs(atanh(tanh(x)) - x) < 1e-5) / count() from system.one array join range(10) as x;
select erf(0) = 0;
select erf(-10) = -1;
select erf(10) = 1;

View File

@ -7,6 +7,8 @@
17
11
11
17
11
7
11
6

View File

@ -51,6 +51,27 @@ DROP DICTIONARY database_for_dict.dict1;
SELECT dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(11)); -- {serverError 36}
-- SOURCE(CLICKHOUSE(...)) uses default params if not specified
DROP DICTIONARY IF EXISTS database_for_dict.dict1;
CREATE DICTIONARY database_for_dict.dict1
(
key_column UInt64 DEFAULT 0,
second_column UInt8 DEFAULT 1,
third_column String DEFAULT 'qqq',
fourth_column Float64 DEFAULT 42.0
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(TABLE 'table_for_dict' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
SELECT dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(11));
SELECT count(distinct(dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(number)))) from numbers(100);
DROP DICTIONARY database_for_dict.dict1;
CREATE DICTIONARY database_for_dict.dict1
(
key_column UInt64 DEFAULT 0,

View File

@ -0,0 +1,11 @@
[
{"a":"0","b":"0"},
{"a":"1","b":"2"},
{"a":"2","b":"4"}
]
[
{"number":"0"}
]
[
]

View File

@ -0,0 +1,4 @@
set output_format_json_array_of_rows = 1;
select number a, number * 2 b from numbers(3) format JSONEachRow;
select * from numbers(1) format JSONEachRow;
select * from numbers(1) where null format JSONEachRow;

View File

@ -0,0 +1,77 @@
############################
### emulate normal feature operation - merges are distributed between replicas
############################
### emulate execute_merges_on_single_replica_time_threshold timeout
############################
### timeout not exceeded, r1 waits for r2
Row 1:
──────
table: execute_on_single_replica_r1
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 1
postpone_reason: Not executing merge for the part all_0_0_5, waiting for r2 to execute merge.
Row 2:
──────
table: execute_on_single_replica_r2
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 0
postpone_reason:
############################
### timeout exceeded, r1 failed to get the merged part from r2 and did the merge on its own
Row 1:
──────
table: execute_on_single_replica_r2
type: MERGE_PARTS
new_part_name: all_0_0_5
has_postpones: 0
postpone_reason:
############################
### queue unfreeze
############################
### disable the feature
############################
### part_log
Row 1:
──────
part_name: all_0_0_1
mergers: ['execute_on_single_replica_r1']
fetchers: ['execute_on_single_replica_r2']
Row 2:
──────
part_name: all_0_0_2
mergers: ['execute_on_single_replica_r1']
fetchers: ['execute_on_single_replica_r2']
Row 3:
──────
part_name: all_0_0_3
mergers: ['execute_on_single_replica_r2']
fetchers: ['execute_on_single_replica_r1']
Row 4:
──────
part_name: all_0_0_4
mergers: ['execute_on_single_replica_r2']
fetchers: ['execute_on_single_replica_r1']
Row 5:
──────
part_name: all_0_0_5
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []
Row 6:
──────
part_name: all_0_0_6
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []
Row 7:
──────
part_name: all_0_0_7
mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2']
fetchers: []

View File

@ -0,0 +1,127 @@
DROP TABLE IF EXISTS execute_on_single_replica_r1 NO DELAY;
DROP TABLE IF EXISTS execute_on_single_replica_r2 NO DELAY;
/* this test requires a fixed zookeeper path */
CREATE TABLE execute_on_single_replica_r1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r1') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
CREATE TABLE execute_on_single_replica_r2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r2') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
INSERT INTO execute_on_single_replica_r1 VALUES (1);
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SET optimize_throw_if_noop=1;
SELECT '############################';
SELECT '### emulate normal feature operation - merges are distributed between replicas';
/* all_0_0_1 - will be merged by r1, and downloaded by r2 */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
/* all_0_0_2 - will be merged by r1, and downloaded by r2 */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
/* all_0_0_3 - will be merged by r2, and downloaded by r1 */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
/* all_0_0_4 - will be merged by r2, and downloaded by r1 */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
SELECT '############################';
SELECT '### emulate execute_merges_on_single_replica_time_threshold timeout';
SYSTEM STOP REPLICATION QUEUES execute_on_single_replica_r2;
/* all_0_0_5 - should be merged by r2, but it has the replication queue stopped, so r1 does the merge */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL SETTINGS replication_alter_partitions_sync=0;
/* if we check immediately we may find the log entry still unchecked */
SELECT * FROM numbers(4) where sleepEachRow(1);
SELECT '############################';
SELECT '### timeout not exceeded, r1 waits for r2';
/* we can now check that r1 waits for r2 */
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
/* now execute_merges_on_single_replica_time_threshold is exceeded */
SELECT * FROM numbers(10) where sleepEachRow(1);
SELECT '############################';
SELECT '### timeout exceeded, r1 failed to get the merged part from r2 and did the merge on its own';
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
SYSTEM START REPLICATION QUEUES execute_on_single_replica_r2;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SELECT '############################';
SELECT '### queue unfreeze';
SELECT
table,
type,
new_part_name,
num_postponed > 0 AS has_postpones,
postpone_reason
FROM system.replication_queue
WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%'
AND database = currentDatabase()
ORDER BY table
FORMAT Vertical;
SELECT '############################';
SELECT '### disable the feature';
ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;
/* all_0_0_6 - we disabled the feature, both replicas will merge */
OPTIMIZE TABLE execute_on_single_replica_r2 FINAL;
/* all_0_0_7 - same */
OPTIMIZE TABLE execute_on_single_replica_r1 FINAL;
SYSTEM SYNC REPLICA execute_on_single_replica_r1;
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SYSTEM FLUSH LOGS;
SELECT '############################';
SELECT '### part_log';
SELECT
part_name,
arraySort(groupArrayIf(table, event_type = 'MergeParts')) AS mergers,
arraySort(groupArrayIf(table, event_type = 'DownloadPart')) AS fetchers
FROM system.part_log
WHERE (event_time > (now() - 40))
AND (table LIKE 'execute\\_on\\_single\\_replica\\_r%')
AND (part_name NOT LIKE '%\\_0')
AND (database = currentDatabase())
GROUP BY part_name
ORDER BY part_name
FORMAT Vertical;
DROP TABLE execute_on_single_replica_r1 NO DELAY;
DROP TABLE execute_on_single_replica_r2 NO DELAY;

View File

@ -0,0 +1 @@
select '1111' as name from system.numbers_mt order by name limit 10000 format Null;

View File

@ -15,6 +15,7 @@ v20.8.2.3-stable 2020-09-08
v20.7.4.11-stable 2020-10-09
v20.7.3.7-stable 2020-09-18
v20.7.2.30-stable 2020-08-31
v20.6.10.2-stable 2020-11-19
v20.6.9.1-stable 2020-11-10
v20.6.8.5-stable 2020-10-12
v20.6.7.4-stable 2020-09-18
