Merge branch 'master' into mvcc_prototype

This commit is contained in:
Alexander Tokmakov 2022-04-05 11:14:42 +02:00
commit da00beaf7f
98 changed files with 658 additions and 419 deletions

View File

@ -144,6 +144,7 @@ Checks: '-*,
clang-analyzer-cplusplus.SelfAssignment,
clang-analyzer-deadcode.DeadStores,
clang-analyzer-cplusplus.Move,
clang-analyzer-optin.cplusplus.UninitializedObject,
clang-analyzer-optin.cplusplus.VirtualCall,
clang-analyzer-security.insecureAPI.UncheckedReturn,
clang-analyzer-security.insecureAPI.bcmp,

View File

@ -1,4 +1,4 @@
set (ENABLE_KRB5_DEFAULT 1)
set (ENABLE_KRB5_DEFAULT ${ENABLE_LIBRARIES})
if (NOT CMAKE_SYSTEM_NAME MATCHES "Linux" AND NOT (CMAKE_SYSTEM_NAME MATCHES "Darwin" AND NOT CMAKE_CROSSCOMPILING))
message (WARNING "krb5 disabled in non-Linux and non-native-Darwin environments")
set (ENABLE_KRB5_DEFAULT 0)

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 520a90e02e3e5cb90afeae1846d161dbc508a6f1
Subproject commit 008b16469471d55b176db181756c94e3f14dd2dc

View File

@ -94,8 +94,9 @@ RUN arch=${TARGETARCH:-amd64} \
&& apt-get update \
&& apt-get --yes -o "Dpkg::Options::=--force-confdef" -o "Dpkg::Options::=--force-confold" upgrade \
&& for package in ${PACKAGES}; do \
apt-get install --allow-unauthenticated --yes --no-install-recommends "${package}=${VERSION}" || exit 1 \
packages="${packages} ${package}=${VERSION}" \
; done \
&& apt-get install --allow-unauthenticated --yes --no-install-recommends ${packages} || exit 1 \
; fi \
&& clickhouse-local -q 'SELECT * FROM system.build_options' \
&& rm -rf \

View File

@ -77,7 +77,7 @@ A function configuration contains the following settings:
- `argument` - argument description with the `type`, and optional `name` of an argument. Each argument is described in a separate setting. Specifying name is necessary if argument names are part of serialization for user defined function format like [Native](../../interfaces/formats.md#native) or [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Default argument name value is `c` + argument_number.
- `format` - a [format](../../interfaces/formats.md) in which arguments are passed to the command.
- `return_type` - the type of a returned value.
- `return_name` - name of retuned value. Specifying return name is necessary if return name is part of serialization for user defined function format like [Native](../../interfaces/formats.md#native) or [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Optional. Default value is `result`.
- `return_name` - name of returned value. Specifying return name is necessary if return name is part of serialization for user defined function format like [Native](../../interfaces/formats.md#native) or [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Optional. Default value is `result`.
- `type` - an executable type. If `type` is set to `executable` then single command is started. If it is set to `executable_pool` then a pool of commands is created.
- `max_command_execution_time` - maximum execution time in seconds for processing block of data. This setting is valid for `executable_pool` commands only. Optional. Default value is `10`.
- `command_termination_timeout` - time in seconds during which a command should finish after its pipe is closed. After that time `SIGTERM` is sent to the process executing the command. Optional. Default value is `10`.

View File

@ -125,7 +125,7 @@ class FindResultImpl : public FindResultImplBase, public FindResultImplOffsetBas
public:
FindResultImpl()
: FindResultImplBase(false), FindResultImplOffsetBase<need_offset>(0)
: FindResultImplBase(false), FindResultImplOffsetBase<need_offset>(0) // NOLINT(clang-analyzer-optin.cplusplus.UninitializedObject) intentionally allow uninitialized value here
{}
FindResultImpl(Mapped * value_, bool found_, size_t off)

View File

@ -214,6 +214,9 @@ private:
/// offset in bits to the next to the rightmost bit at that byte; or zero if the rightmost bit is the rightmost bit in that byte.
offset_r = (l + content_width) % 8;
content_l = nullptr;
content_r = nullptr;
}
UInt8 ALWAYS_INLINE read(UInt8 value_l) const

View File

@ -61,7 +61,7 @@ private:
class JSONBool : public IItem
{
public:
explicit JSONBool(bool value_) : value(std::move(value_)) {}
explicit JSONBool(bool value_) : value(value_) {}
void format(const FormatSettings & settings, FormatContext & context) override;
private:
@ -74,7 +74,7 @@ public:
void add(ItemPtr value) { values.push_back(std::move(value)); }
void add(std::string value) { add(std::make_unique<JSONString>(std::move(value))); }
void add(const char * value) { add(std::make_unique<JSONString>(value)); }
void add(bool value) { add(std::make_unique<JSONBool>(std::move(value))); }
void add(bool value) { add(std::make_unique<JSONBool>(value)); }
template <typename T>
requires std::is_arithmetic_v<T>
@ -99,7 +99,7 @@ public:
void add(std::string key, std::string value) { add(std::move(key), std::make_unique<JSONString>(std::move(value))); }
void add(std::string key, const char * value) { add(std::move(key), std::make_unique<JSONString>(value)); }
void add(std::string key, std::string_view value) { add(std::move(key), std::make_unique<JSONString>(value)); }
void add(std::string key, bool value) { add(std::move(key), std::make_unique<JSONBool>(std::move(value))); }
void add(std::string key, bool value) { add(std::move(key), std::make_unique<JSONBool>(value)); }
template <typename T>
requires std::is_arithmetic_v<T>

View File

@ -0,0 +1,15 @@
#include <IO/WriteHelpers.h>
#include <Common/NamePrompter.h>
namespace DB::detail
{
void appendHintsMessageImpl(String & message, const std::vector<String> & hints)
{
if (hints.empty())
{
return;
}
message += ". Maybe you meant: " + toString(hints);
}
}

View File

@ -90,6 +90,10 @@ private:
}
};
namespace detail
{
void appendHintsMessageImpl(String & message, const std::vector<String> & hints);
}
template <size_t MaxNumHints, typename Self>
class IHints
@ -102,6 +106,12 @@ public:
return prompter.getHints(name, getAllRegisteredNames());
}
void appendHintsMessage(String & message, const String & name) const
{
auto hints = getHints(name);
detail::appendHintsMessageImpl(message, hints);
}
IHints() = default;
IHints(const IHints &) = default;
@ -114,5 +124,4 @@ public:
private:
NamePrompter<MaxNumHints> prompter;
};
}

View File

@ -46,7 +46,8 @@ static ReturnType checkColumnStructure(const ColumnWithTypeAndName & actual, con
return onError<ReturnType>("Block structure mismatch in " + std::string(context_description) + " stream: different names of columns:\n"
+ actual.dumpStructure() + "\n" + expected.dumpStructure(), code);
if (!actual.type->equals(*expected.type))
if ((actual.type && !expected.type) || (!actual.type && expected.type)
|| (actual.type && expected.type && !actual.type->equals(*expected.type)))
return onError<ReturnType>("Block structure mismatch in " + std::string(context_description) + " stream: different types:\n"
+ actual.dumpStructure() + "\n" + expected.dumpStructure(), code);

View File

@ -15,10 +15,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** Cursor allows to compare rows in different blocks (and parts).
* Cursor moves inside single block.
@ -61,25 +57,21 @@ struct SortCursorImpl
reset(block, perm);
}
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
SortCursorImpl(
const Block & header,
const Columns & columns,
const SortDescription & desc_,
size_t order_ = 0,
IColumn::Permutation * perm = nullptr)
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
{
for (auto & column_desc : desc)
{
if (!column_desc.column_name.empty())
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
ErrorCodes::LOGICAL_ERROR);
}
reset(columns, {}, perm);
reset(columns, header, perm);
}
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
void reset(const Block & block, IColumn::Permutation * perm = nullptr)
{
reset(block.getColumns(), block, perm);
}
void reset(const Block & block, IColumn::Permutation * perm = nullptr) { reset(block.getColumns(), block, perm); }
/// Set the cursor to the beginning of the new block.
void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
@ -95,9 +87,7 @@ struct SortCursorImpl
for (size_t j = 0, size = desc.size(); j < size; ++j)
{
auto & column_desc = desc[j];
size_t column_number = !column_desc.column_name.empty()
? block.getPositionByName(column_desc.column_name)
: column_desc.column_number;
size_t column_number = block.getPositionByName(column_desc.column_name);
sort_columns.push_back(columns[column_number].get());
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
@ -367,12 +357,12 @@ private:
};
template <typename TLeftColumns, typename TRightColumns>
bool less(const TLeftColumns & lhs, const TRightColumns & rhs, size_t i, size_t j, const SortDescription & descr)
bool less(const TLeftColumns & lhs, const TRightColumns & rhs, size_t i, size_t j, const SortDescriptionWithPositions & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
int res = elem.base.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.base.nulls_direction);
if (res < 0)
return true;
else if (res > 0)

View File

@ -1,12 +1,12 @@
#include <Core/SortDescription.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
namespace DB
{
void dumpSortDescription(const SortDescription & description, const Block & header, WriteBuffer & out)
void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
{
bool first = true;
@ -16,17 +16,7 @@ void dumpSortDescription(const SortDescription & description, const Block & head
out << ", ";
first = false;
if (!desc.column_name.empty())
out << desc.column_name;
else
{
if (desc.column_number < header.columns())
out << header.getByPosition(desc.column_number).name;
else
out << "?";
out << " (pos " << desc.column_number << ")";
}
out << desc.column_name;
if (desc.direction > 0)
out << " ASC";
@ -38,18 +28,9 @@ void dumpSortDescription(const SortDescription & description, const Block & head
}
}
void SortColumnDescription::explain(JSONBuilder::JSONMap & map, const Block & header) const
void SortColumnDescription::explain(JSONBuilder::JSONMap & map) const
{
if (!column_name.empty())
map.add("Column", column_name);
else
{
if (column_number < header.columns())
map.add("Column", header.getByPosition(column_number).name);
map.add("Position", column_number);
}
map.add("Column", column_name);
map.add("Ascending", direction > 0);
map.add("With Fill", with_fill);
}
@ -57,17 +38,17 @@ void SortColumnDescription::explain(JSONBuilder::JSONMap & map, const Block & he
std::string dumpSortDescription(const SortDescription & description)
{
WriteBufferFromOwnString wb;
dumpSortDescription(description, Block{}, wb);
dumpSortDescription(description, wb);
return wb.str();
}
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description, const Block & header)
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description)
{
auto json_array = std::make_unique<JSONBuilder::JSONArray>();
for (const auto & descr : description)
{
auto json_map = std::make_unique<JSONBuilder::JSONMap>();
descr.explain(*json_map, header);
descr.explain(*json_map);
json_array->add(std::move(json_map));
}

View File

@ -39,7 +39,6 @@ struct FillColumnDescription
struct SortColumnDescription
{
std::string column_name; /// The name of the column.
size_t column_number; /// Column number (used if no name is given).
int direction; /// 1 - ascending, -1 - descending.
int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
/// To achieve NULLS LAST, set it equal to direction, to achieve NULLS FIRST, set it opposite.
@ -48,23 +47,24 @@ struct SortColumnDescription
FillColumnDescription fill_description;
explicit SortColumnDescription(
size_t column_number_, int direction_ = 1, int nulls_direction_ = 1,
const std::shared_ptr<Collator> & collator_ = nullptr,
bool with_fill_ = false, const FillColumnDescription & fill_description_ = {})
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_)
, with_fill(with_fill_), fill_description(fill_description_) {}
explicit SortColumnDescription(
const std::string & column_name_, int direction_ = 1, int nulls_direction_ = 1,
const std::shared_ptr<Collator> & collator_ = nullptr,
bool with_fill_ = false, const FillColumnDescription & fill_description_ = {})
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_)
, collator(collator_), with_fill(with_fill_), fill_description(fill_description_) {}
const std::string & column_name_,
int direction_ = 1,
int nulls_direction_ = 1,
const std::shared_ptr<Collator> & collator_ = nullptr,
bool with_fill_ = false,
const FillColumnDescription & fill_description_ = {})
: column_name(column_name_)
, direction(direction_)
, nulls_direction(nulls_direction_)
, collator(collator_)
, with_fill(with_fill_)
, fill_description(fill_description_)
{
}
bool operator == (const SortColumnDescription & other) const
{
return column_name == other.column_name && column_number == other.column_number
&& direction == other.direction && nulls_direction == other.nulls_direction;
return column_name == other.column_name && direction == other.direction && nulls_direction == other.nulls_direction;
}
bool operator != (const SortColumnDescription & other) const
@ -72,22 +72,30 @@ struct SortColumnDescription
return !(*this == other);
}
std::string dump() const
{
return fmt::format("{}:{}:dir {}nulls ", column_name, column_number, direction, nulls_direction);
}
std::string dump() const { return fmt::format("{}:dir {}nulls {}", column_name, direction, nulls_direction); }
void explain(JSONBuilder::JSONMap & map, const Block & header) const;
void explain(JSONBuilder::JSONMap & map) const;
};
struct SortColumnDescriptionWithColumnIndex
{
SortColumnDescription base;
size_t column_number;
SortColumnDescriptionWithColumnIndex(SortColumnDescription description_, size_t column_number_)
: base(std::move(description_)), column_number(column_number_)
{
}
};
/// Description of the sorting rule for several columns.
using SortDescription = std::vector<SortColumnDescription>;
using SortDescriptionWithPositions = std::vector<SortColumnDescriptionWithColumnIndex>;
/// Outputs user-readable description into `out`.
void dumpSortDescription(const SortDescription & description, const Block & header, WriteBuffer & out);
void dumpSortDescription(const SortDescription & description, WriteBuffer & out);
std::string dumpSortDescription(const SortDescription & description);
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description, const Block & header);
JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description);
}

View File

@ -128,22 +128,21 @@ static auto extractVector(const std::vector<Tuple> & vec)
return res;
}
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns)
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns)
{
std::unordered_map<String, DataTypePtr> storage_columns_map;
for (const auto & [name, type] : extended_storage_columns)
storage_columns_map[name] = type;
for (auto & name_type : columns_list)
for (auto & column : block)
{
if (!isObject(name_type.type))
if (!isObject(column.type))
continue;
auto & column = block.getByName(name_type.name);
if (!isObject(column.type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Type for column '{}' mismatch in columns list and in block. In list: {}, in block: {}",
name_type.name, name_type.type->getName(), column.type->getName());
column.name, column.type->getName(), column.type->getName());
const auto & column_object = assert_cast<const ColumnObject &>(*column.column);
const auto & subcolumns = column_object.getSubcolumns();
@ -151,7 +150,7 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con
if (!column_object.isFinalized())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot convert to tuple column '{}' from type {}. Column should be finalized first",
name_type.name, name_type.type->getName());
column.name, column.type->getName());
PathsInData tuple_paths;
DataTypes tuple_types;
@ -164,12 +163,11 @@ void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, con
tuple_columns.emplace_back(entry->data.getFinalizedColumnPtr());
}
auto it = storage_columns_map.find(name_type.name);
auto it = storage_columns_map.find(column.name);
if (it == storage_columns_map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", name_type.name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column '{}' not found in storage", column.name);
std::tie(column.column, column.type) = unflattenTuple(tuple_paths, tuple_types, tuple_columns);
name_type.type = column.type;
/// Check that constructed Tuple type and type in storage are compatible.
getLeastCommonTypeForObject({column.type, it->second}, true);

View File

@ -38,7 +38,7 @@ DataTypePtr getDataTypeByColumn(const IColumn & column);
/// Converts Object types and columns to Tuples in @columns_list and @block
/// and checks that types are consistent with types in @extended_storage_columns.
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns);
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns);
/// Checks that each path is not the prefix of any other path.
void checkObjectHasNoAmbiguosPaths(const PathsInData & paths);

View File

@ -39,10 +39,6 @@ public:
{
}
virtual ~ReadBufferFromFileDescriptor() override
{
}
int getFD() const
{
return fd;
@ -84,9 +80,6 @@ public:
{
use_pread = true;
}
virtual ~ReadBufferFromFileDescriptorPRead() override
{
}
};
}

View File

@ -2265,10 +2265,6 @@ static bool windowDescriptionComparator(const WindowDescription * _left, const W
return true;
else if (left[i].column_name > right[i].column_name)
return false;
else if (left[i].column_number < right[i].column_number)
return true;
else if (left[i].column_number > right[i].column_number)
return false;
else if (left[i].direction < right[i].direction)
return true;
else if (left[i].direction > right[i].direction)

View File

@ -1025,7 +1025,7 @@ std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIf
for (size_t i = 0; i < sort_columns_size; ++i)
{
if (header.has(sort_columns[i]))
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
sort_description.emplace_back(sort_columns[i], 1, 1);
else
return {};
}

View File

@ -430,8 +430,8 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
SortDescription sort_description;
for (size_t i = 0; i < tuple_size; ++i)
{
block_to_sort.insert({ ordered_set[i], nullptr, "_" + toString(i) });
sort_description.emplace_back(i, 1, 1);
block_to_sort.insert({ordered_set[i], nullptr, ordered_set[i]->getName()});
sort_description.emplace_back(ordered_set[i]->getName(), 1, 1);
}
sortBlock(block_to_sort, sort_description);

View File

@ -384,7 +384,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
for (const auto & name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(log_element_columns);
Block block(std::move(log_element_columns));
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)

View File

@ -512,14 +512,6 @@ TableJoin::createConvertingActions(const ColumnsWithTypeAndName & left_sample_co
template <typename LeftNamesAndTypes, typename RightNamesAndTypes>
void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const RightNamesAndTypes & right, bool allow_right)
{
if (strictness() == ASTTableJoin::Strictness::Asof)
{
if (clauses.size() != 1)
throw DB::Exception("ASOF join over multiple keys is not supported", ErrorCodes::NOT_IMPLEMENTED);
if (right.back().type->isNullable())
throw DB::Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
if (!left_type_map.empty() || !right_type_map.empty())
return;
@ -531,6 +523,15 @@ void TableJoin::inferJoinKeyCommonType(const LeftNamesAndTypes & left, const Rig
for (const auto & col : right)
right_types[renamedRightColumnName(col.name)] = col.type;
if (strictness() == ASTTableJoin::Strictness::Asof)
{
if (clauses.size() != 1)
throw DB::Exception("ASOF join over multiple keys is not supported", ErrorCodes::NOT_IMPLEMENTED);
auto asof_key_type = right_types.find(clauses.back().key_names_right.back());
if (asof_key_type != right_types.end() && asof_key_type->second->isNullable())
throw DB::Exception("ASOF join over right table Nullable column is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
forAllKeys(clauses, [&](const auto & left_key_name, const auto & right_key_name)
{

View File

@ -98,9 +98,7 @@ ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, c
{
const auto & sort_column_description = description[i];
const IColumn * column = !sort_column_description.column_name.empty()
? block.getByName(sort_column_description.column_name).column.get()
: block.safeGetByPosition(sort_column_description.column_number).column.get();
const IColumn * column = block.getByName(sort_column_description.column_name).column.get();
if (isCollationRequired(sort_column_description))
{

View File

@ -81,9 +81,17 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
if (infile)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM INFILE " << (settings.hilite ? hilite_none : "") << infile->as<ASTLiteral &>().value.safeGet<std::string>();
settings.ostr
<< (settings.hilite ? hilite_keyword : "")
<< " FROM INFILE "
<< (settings.hilite ? hilite_none : "")
<< quoteString(infile->as<ASTLiteral &>().value.safeGet<std::string>());
if (compression)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COMPRESSION " << (settings.hilite ? hilite_none : "") << compression->as<ASTLiteral &>().value.safeGet<std::string>();
settings.ostr
<< (settings.hilite ? hilite_keyword : "")
<< " COMPRESSION "
<< (settings.hilite ? hilite_none : "")
<< quoteString(compression->as<ASTLiteral &>().value.safeGet<std::string>());
}
if (select)

View File

@ -38,12 +38,7 @@ LimitTransform::LimitTransform(
}
for (const auto & desc : description)
{
if (!desc.column_name.empty())
sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
else
sort_column_positions.push_back(desc.column_number);
}
sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
}
Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, UInt64 row) const

View File

@ -104,7 +104,7 @@ static AggregatingSortedAlgorithm::ColumnsDefinition defineColumns(
/// Included into PK?
auto it = description.begin();
for (; it != description.end(); ++it)
if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i))
if (it->column_name == column.name)
break;
if (it != description.end())
@ -290,11 +290,10 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::initAggregateDescription
AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingAlgorithmWithDelayedChunk(num_inputs, description_)
, columns_definition(defineColumns(header, description_))
, merged_data(getMergedColumns(header, columns_definition), max_block_size, columns_definition)
const Block & header_, size_t num_inputs, SortDescription description_, size_t max_block_size)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, description_)
, columns_definition(defineColumns(header_, description_))
, merged_data(getMergedColumns(header_, columns_definition), max_block_size, columns_definition)
{
}

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
}
CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
const Block & header,
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & sign_column,
@ -30,9 +30,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header.getPositionByName(sign_column))
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header_.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, log(log_)
{

View File

@ -14,11 +14,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
FinishAggregatingInOrderAlgorithm::State::State(
const Chunk & chunk, const SortDescription & desc, Int64 total_bytes_)
: all_columns(chunk.getColumns())
, num_rows(chunk.getNumRows())
, total_bytes(total_bytes_)
FinishAggregatingInOrderAlgorithm::State::State(const Chunk & chunk, const SortDescriptionWithPositions & desc, Int64 total_bytes_)
: all_columns(chunk.getColumns()), num_rows(chunk.getNumRows()), total_bytes(total_bytes_)
{
if (!chunk)
return;
@ -32,25 +29,13 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_)
: header(header_)
, num_inputs(num_inputs_)
, params(params_)
, description(std::move(description_))
, max_block_size(max_block_size_)
, max_block_bytes(max_block_bytes_)
: header(header_), num_inputs(num_inputs_), params(params_), max_block_size(max_block_size_), max_block_bytes(max_block_bytes_)
{
/// Replace column names in description to positions.
for (auto & column_description : description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = header_.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
for (const auto & column_description : description_)
description.emplace_back(column_description, header_.getPositionByName(column_description.column_name));
}
void FinishAggregatingInOrderAlgorithm::initialize(Inputs inputs)

View File

@ -41,7 +41,7 @@ public:
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_);
@ -69,7 +69,7 @@ private:
/// Number of bytes in all columns + number of bytes in arena, related to current chunk.
size_t total_bytes = 0;
State(const Chunk & chunk, const SortDescription & description, Int64 total_bytes_);
State(const Chunk & chunk, const SortDescriptionWithPositions & description, Int64 total_bytes_);
State() = default;
bool isValid() const { return current_row < num_rows; }
@ -78,7 +78,7 @@ private:
Block header;
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescription description;
SortDescriptionWithPositions description;
size_t max_block_size;
size_t max_block_bytes;

View File

@ -30,12 +30,16 @@ static GraphiteRollupSortedAlgorithm::ColumnsDefinition defineColumns(
}
GraphiteRollupSortedAlgorithm::GraphiteRollupSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), nullptr, max_row_refs)
, merged_data(header.cloneEmptyColumns(), false, max_block_size)
, params(std::move(params_)), time_of_merge(time_of_merge_)
const Block & header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
Graphite::Params params_,
time_t time_of_merge_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), nullptr, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), false, max_block_size)
, params(std::move(params_))
, time_of_merge(time_of_merge_)
{
size_t max_size_of_aggregate_state = 0;
size_t max_alignment_of_aggregate_state = 1;
@ -50,7 +54,7 @@ GraphiteRollupSortedAlgorithm::GraphiteRollupSortedAlgorithm(
}
merged_data.allocMemForAggregates(max_size_of_aggregate_state, max_alignment_of_aggregate_state);
columns_definition = defineColumns(header, params);
columns_definition = defineColumns(header_, params);
}
UInt32 GraphiteRollupSortedAlgorithm::selectPrecision(const Graphite::Retentions & retentions, time_t time) const

View File

@ -4,12 +4,8 @@
namespace DB
{
IMergingAlgorithmWithDelayedChunk::IMergingAlgorithmWithDelayedChunk(
size_t num_inputs,
SortDescription description_)
: description(std::move(description_))
, current_inputs(num_inputs)
, cursors(num_inputs)
IMergingAlgorithmWithDelayedChunk::IMergingAlgorithmWithDelayedChunk(Block header_, size_t num_inputs, SortDescription description_)
: description(std::move(description_)), header(std::move(header_)), current_inputs(num_inputs), cursors(num_inputs)
{
}
@ -22,7 +18,8 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
if (!current_inputs[source_num].chunk)
continue;
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
cursors[source_num] = SortCursorImpl(
header, current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap<SortCursor>(cursors);
@ -37,7 +34,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
current_input.swap(input);
cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);
cursors[source_num].reset(current_input.chunk.getColumns(), header, current_input.permutation);
queue.push(cursors[source_num]);
}

View File

@ -10,9 +10,7 @@ namespace DB
class IMergingAlgorithmWithDelayedChunk : public IMergingAlgorithm
{
public:
IMergingAlgorithmWithDelayedChunk(
size_t num_inputs,
SortDescription description_);
IMergingAlgorithmWithDelayedChunk(Block header_, size_t num_inputs, SortDescription description_);
protected:
SortingHeap<SortCursor> queue;
@ -28,6 +26,8 @@ protected:
bool skipLastRowFor(size_t input_number) const { return current_inputs[input_number].skip_last_row; }
private:
Block header;
/// Inputs currently being merged.
Inputs current_inputs;
SortCursorImpls cursors;

View File

@ -4,11 +4,9 @@ namespace DB
{
IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
size_t num_inputs,
SortDescription description_,
WriteBuffer * out_row_sources_buf_,
size_t max_row_refs)
: description(std::move(description_))
Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs)
: header(std::move(header_))
, description(std::move(description_))
, chunk_allocator(num_inputs + max_row_refs)
, cursors(num_inputs)
, sources(num_inputs)
@ -39,7 +37,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.skip_last_row = inputs[source_num].skip_last_row;
source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
cursors[source_num] = SortCursorImpl(header, source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;
@ -55,7 +53,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num
auto & source = sources[source_num];
source.skip_last_row = input.skip_last_row;
source.chunk = chunk_allocator.alloc(input.chunk);
cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);
cursors[source_num].reset(source.chunk->getColumns(), header, input.permutation);
source.chunk->all_columns = cursors[source_num].all_columns;
source.chunk->sort_columns = cursors[source_num].sort_columns;

View File

@ -10,15 +10,13 @@ class IMergingAlgorithmWithSharedChunks : public IMergingAlgorithm
{
public:
IMergingAlgorithmWithSharedChunks(
size_t num_inputs,
SortDescription description_,
WriteBuffer * out_row_sources_buf_,
size_t max_row_refs);
Block header_, size_t num_inputs, SortDescription description_, WriteBuffer * out_row_sources_buf_, size_t max_row_refs);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
private:
Block header;
SortDescription description;
/// Allocator must be destroyed after source_chunks.

View File

@ -11,30 +11,22 @@ namespace ErrorCodes
}
MergingSortedAlgorithm::MergingSortedAlgorithm(
const Block & header,
Block header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
: header(std::move(header_))
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, description(std::move(description_))
, limit(limit_)
, has_collation(std::any_of(description.begin(), description.end(), [](const auto & descr) { return descr.collator != nullptr; }))
, out_row_sources_buf(out_row_sources_buf_)
, current_inputs(num_inputs)
, cursors(num_inputs)
{
/// Replace column names in description to positions.
for (auto & column_description : description)
{
has_collation |= column_description.collator != nullptr;
if (!column_description.column_name.empty())
{
column_description.column_number = header.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
}
void MergingSortedAlgorithm::addInput()
@ -65,7 +57,7 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
continue;
prepareChunk(chunk);
cursors[source_num] = SortCursorImpl(chunk.getColumns(), description, source_num);
cursors[source_num] = SortCursorImpl(header, chunk.getColumns(), description, source_num);
}
if (has_collation)
@ -78,7 +70,7 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
{
prepareChunk(input.chunk);
current_inputs[source_num].swap(input);
cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), {});
cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), header);
if (has_collation)
queue_with_collation.push(cursors[source_num]);

View File

@ -14,7 +14,7 @@ class MergingSortedAlgorithm final : public IMergingAlgorithm
{
public:
MergingSortedAlgorithm(
const Block & header,
Block header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
@ -31,6 +31,8 @@ public:
const MergedData & getMergedData() const { return merged_data; }
private:
Block header;
MergedData merged_data;
/// Settings

View File

@ -5,16 +5,18 @@ namespace DB
{
ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & version_column,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
{
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
version_column_number = header_.getPositionByName(version_column);
}
void ReplacingSortedAlgorithm::insertRow()

View File

@ -101,10 +101,10 @@ struct SummingSortedAlgorithm::AggregateDescription
};
static bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
static bool isInPrimaryKey(const SortDescription & description, const std::string & name)
{
for (const auto & desc : description)
if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number))
if (desc.column_name == name)
return true;
return false;
@ -251,7 +251,7 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
}
/// Are they inside the primary key or partition key?
if (isInPrimaryKey(description, column.name, i) || isInPartitionKey(column.name, partition_key_columns))
if (isInPrimaryKey(description, column.name) || isInPartitionKey(column.name, partition_key_columns))
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
@ -307,7 +307,7 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name))
break;
if (column_num_it != map.second.end())
{
@ -687,14 +687,15 @@ Chunk SummingSortedAlgorithm::SummingMergedData::pull()
SummingSortedAlgorithm::SummingSortedAlgorithm(
const Block & header, size_t num_inputs,
const Block & header_,
size_t num_inputs,
SortDescription description_,
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
: IMergingAlgorithmWithDelayedChunk(num_inputs, std::move(description_))
, columns_definition(defineColumns(header, description, column_names_to_sum, partition_key_columns))
, merged_data(getMergedDataColumns(header, columns_definition), max_block_size, columns_definition)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, std::move(description_))
, columns_definition(defineColumns(header_, description, column_names_to_sum, partition_key_columns))
, merged_data(getMergedDataColumns(header_, columns_definition), max_block_size, columns_definition)
{
}

View File

@ -8,19 +8,20 @@ namespace DB
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
const Block & header_,
size_t num_inputs,
SortDescription description_,
const String & sign_column_,
size_t max_block_size,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(
num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
/// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, current_keys(max_rows_in_queue)
{
sign_column_number = header.getPositionByName(sign_column_);
sign_column_number = header_.getPositionByName(sign_column_);
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)

View File

@ -48,13 +48,13 @@ void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
void FillingStep::describeActions(FormatSettings & settings) const
{
settings.out << String(settings.offset, ' ');
dumpSortDescription(sort_description, input_streams.front().header, settings.out);
dumpSortDescription(sort_description, settings.out);
settings.out << '\n';
}
void FillingStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add("Sort Description", explainSortDescription(sort_description, input_streams.front().header));
map.add("Sort Description", explainSortDescription(sort_description));
}
}

View File

@ -70,4 +70,9 @@ void ITransformingStep::describePipeline(FormatSettings & settings) const
IQueryPlanStep::describePipeline(processors, settings);
}
void ITransformingStep::appendExtraProcessors(const Processors & extra_processors)
{
processors.insert(processors.end(), extra_processors.begin(), extra_processors.end());
}
}

View File

@ -57,6 +57,9 @@ public:
void describePipeline(FormatSettings & settings) const override;
/// Append extra processors for this step.
void appendExtraProcessors(const Processors & extra_processors);
protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);

View File

@ -612,14 +612,8 @@ static void addMergingFinal(
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (const auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
key_columns.push_back(header.getPositionByName(desc.column_name));
pipe.addSimpleTransform([&](const Block & stream_header)
{
@ -774,9 +768,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
Names partition_key_columns = metadata_for_reading->getPartitionKey().column_names;
const auto & header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
sort_description.emplace_back(sort_columns[i], 1, 1);
addMergingFinal(
pipe,

View File

@ -206,17 +206,17 @@ void SortingStep::describeActions(FormatSettings & settings) const
if (!prefix_description.empty())
{
settings.out << prefix << "Prefix sort description: ";
dumpSortDescription(prefix_description, input_streams.front().header, settings.out);
dumpSortDescription(prefix_description, settings.out);
settings.out << '\n';
settings.out << prefix << "Result sort description: ";
dumpSortDescription(result_description, input_streams.front().header, settings.out);
dumpSortDescription(result_description, settings.out);
settings.out << '\n';
}
else
{
settings.out << prefix << "Sort description: ";
dumpSortDescription(result_description, input_streams.front().header, settings.out);
dumpSortDescription(result_description, settings.out);
settings.out << '\n';
}
@ -228,11 +228,11 @@ void SortingStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (!prefix_description.empty())
{
map.add("Prefix Sort Description", explainSortDescription(prefix_description, input_streams.front().header));
map.add("Result Sort Description", explainSortDescription(result_description, input_streams.front().header));
map.add("Prefix Sort Description", explainSortDescription(prefix_description));
map.add("Result Sort Description", explainSortDescription(result_description));
}
else
map.add("Sort Description", explainSortDescription(result_description, input_streams.front().header));
map.add("Sort Description", explainSortDescription(result_description));
if (limit)
map.add("Limit", limit);

View File

@ -129,7 +129,7 @@ void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
}
if (!window_description.order_by.empty())
map.add("Sort Description", explainSortDescription(window_description.order_by, {}));
map.add("Sort Description", explainSortDescription(window_description.order_by));
auto functions_array = std::make_unique<JSONBuilder::JSONArray>();
for (const auto & func : window_functions)

View File

@ -26,7 +26,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
, max_block_size(max_block_size_)
, max_block_bytes(max_block_bytes_)
, params(std::move(params_))
, group_by_description(group_by_description_)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
, variants(*many_data->variants[current_variant])
@ -34,15 +33,8 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
/// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform
res_header = params->getCustomHeader(false);
/// Replace column names to column position in description_sorted.
for (auto & column_description : group_by_description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = res_header.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
for (const auto & column_description : group_by_description_)
group_by_description.emplace_back(column_description, res_header.getPositionByName(column_description.column_name));
}
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;

View File

@ -51,7 +51,7 @@ private:
MutableColumns res_aggregate_columns;
AggregatingTransformParamsPtr params;
SortDescription group_by_description;
SortDescriptionWithPositions group_by_description;
Aggregator::AggregateColumns aggregate_columns;

View File

@ -12,33 +12,13 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
CheckSortedTransform::CheckSortedTransform(
const Block & header_,
const SortDescription & sort_description_)
: ISimpleTransform(header_, header_, false)
, sort_description_map(addPositionsToSortDescriptions(sort_description_))
CheckSortedTransform::CheckSortedTransform(const Block & header, const SortDescription & sort_description)
: ISimpleTransform(header, header, false)
{
for (const auto & column_description : sort_description)
sort_description_map.emplace_back(column_description, header.getPositionByName(column_description.column_name));
}
SortDescriptionsWithPositions
CheckSortedTransform::addPositionsToSortDescriptions(const SortDescription & sort_description)
{
SortDescriptionsWithPositions result;
result.reserve(sort_description.size());
const auto & header = getInputPort().getHeader();
for (SortColumnDescription description_copy : sort_description)
{
if (!description_copy.column_name.empty())
description_copy.column_number = header.getPositionByName(description_copy.column_name);
result.push_back(description_copy);
}
return result;
}
void CheckSortedTransform::transform(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
@ -54,7 +34,7 @@ void CheckSortedTransform::transform(Chunk & chunk)
const IColumn * left_col = left[column_number].get();
const IColumn * right_col = right[column_number].get();
int res = elem.direction * left_col->compareAt(left_index, right_index, *right_col, elem.nulls_direction);
int res = elem.base.direction * left_col->compareAt(left_index, right_index, *right_col, elem.base.nulls_direction);
if (res < 0)
{
return;

View File

@ -5,16 +5,12 @@
namespace DB
{
using SortDescriptionsWithPositions = std::vector<SortColumnDescription>;
/// Streams checks that flow of blocks is sorted in the sort_description order
/// Othrewise throws exception in readImpl function.
class CheckSortedTransform : public ISimpleTransform
{
public:
CheckSortedTransform(
const Block & header_,
const SortDescription & sort_description_);
CheckSortedTransform(const Block & header, const SortDescription & sort_description);
String getName() const override { return "CheckSortedTransform"; }
@ -23,10 +19,7 @@ protected:
void transform(Chunk & chunk) override;
private:
SortDescriptionsWithPositions sort_description_map;
SortDescriptionWithPositions sort_description_map;
Columns last_row;
/// Just checks, that all sort_descriptions has column_number
SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description);
};
}

View File

@ -9,8 +9,9 @@ namespace ErrorCodes
}
DistinctSortedTransform::DistinctSortedTransform(
const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
: ISimpleTransform(header, header, true)
Block header_, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns)
: ISimpleTransform(header_, header_, true)
, header(std::move(header_))
, description(std::move(sort_description))
, columns_names(columns)
, limit_hint(limit_hint_)
@ -24,7 +25,7 @@ void DistinctSortedTransform::transform(Chunk & chunk)
if (column_ptrs.empty())
return;
ColumnRawPtrs clearing_hint_columns(getClearingColumns(chunk, column_ptrs));
ColumnRawPtrs clearing_hint_columns(getClearingColumns(column_ptrs));
if (data.type == ClearableSetVariants::Type::EMPTY)
data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
@ -139,13 +140,13 @@ ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
return column_ptrs;
}
ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const
ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const ColumnRawPtrs & key_columns) const
{
ColumnRawPtrs clearing_hint_columns;
clearing_hint_columns.reserve(description.size());
for (const auto & sort_column_description : description)
{
const auto * sort_column_ptr = chunk.getColumns().at(sort_column_description.column_number).get();
const auto * sort_column_ptr = header.getByName(sort_column_description.column_name).column.get();
const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr);
if (it != key_columns.cend()) /// if found in key_columns
clearing_hint_columns.emplace_back(sort_column_ptr);

View File

@ -22,7 +22,8 @@ class DistinctSortedTransform : public ISimpleTransform
{
public:
/// Empty columns_ means all columns.
DistinctSortedTransform(const Block & header, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
DistinctSortedTransform(
Block header_, SortDescription sort_description, const SizeLimits & set_size_limits_, UInt64 limit_hint_, const Names & columns);
String getName() const override { return "DistinctSortedTransform"; }
@ -33,7 +34,7 @@ private:
ColumnRawPtrs getKeyColumns(const Chunk & chunk) const;
/// When clearing_columns changed, we can clean HashSet to memory optimization
/// clearing_columns is a left-prefix of SortDescription exists in key_columns
ColumnRawPtrs getClearingColumns(const Chunk & chunk, const ColumnRawPtrs & key_columns) const;
ColumnRawPtrs getClearingColumns(const ColumnRawPtrs & key_columns) const;
static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m);
/// return true if has new data
@ -46,6 +47,7 @@ private:
size_t rows,
ClearableSetVariants & variants) const;
Block header;
SortDescription description;
struct PreviousChunk

View File

@ -21,9 +21,11 @@ static bool isPrefix(const SortDescription & pref_descr, const SortDescription &
}
FinishSortingTransform::FinishSortingTransform(
const Block & header, const SortDescription & description_sorted_,
const Block & header,
const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, UInt64 limit_)
size_t max_merged_block_size_,
UInt64 limit_)
: SortingTransform(header, description_to_sort_, max_merged_block_size_, limit_)
{
/// Check for sanity non-modified descriptions
@ -34,7 +36,8 @@ FinishSortingTransform::FinishSortingTransform(
/// The target description is modified in SortingTransform constructor.
/// To avoid doing the same actions with description_sorted just copy it from prefix of target description.
size_t prefix_size = description_sorted_.size();
description_sorted.assign(description.begin(), description.begin() + prefix_size);
for (size_t i = 0; i < prefix_size; ++i)
description_with_positions.emplace_back(description[i], header_without_constants.getPositionByName(description[i].column_name));
}
void FinishSortingTransform::consume(Chunk chunk)
@ -62,7 +65,7 @@ void FinishSortingTransform::consume(Chunk chunk)
while (high - low > 1)
{
ssize_t mid = (low + high) / 2;
if (!less(last_chunk.getColumns(), chunk.getColumns(), last_chunk.getNumRows() - 1, mid, description_sorted))
if (!less(last_chunk.getColumns(), chunk.getColumns(), last_chunk.getNumRows() - 1, mid, description_with_positions))
low = mid;
else
high = mid;
@ -100,7 +103,8 @@ void FinishSortingTransform::generate()
{
if (!merge_sorter)
{
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
generated_prefix = true;
}

View File

@ -11,9 +11,12 @@ class FinishSortingTransform : public SortingTransform
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
FinishSortingTransform(const Block & header, const SortDescription & description_sorted_,
FinishSortingTransform(
const Block & header,
const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, UInt64 limit_);
size_t max_merged_block_size_,
UInt64 limit_);
String getName() const override { return "FinishSortingTransform"; }
@ -22,7 +25,7 @@ protected:
void generate() override;
private:
SortDescription description_sorted;
SortDescriptionWithPositions description_with_positions;
Chunk tail_chunk;
};

View File

@ -90,16 +90,21 @@ private:
MergeSortingTransform::MergeSortingTransform(
const Block & header,
const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_) {}
, max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_)
{
}
Processors MergeSortingTransform::expandPipeline()
{
@ -180,7 +185,8 @@ void MergeSortingTransform::consume(Chunk chunk)
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
processors.emplace_back(current_processor);
@ -223,7 +229,8 @@ void MergeSortingTransform::generate()
if (!generated_prefix)
{
if (temporary_files.empty())
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
@ -251,7 +258,7 @@ void MergeSortingTransform::remerge()
LOG_DEBUG(log, "Re-merging intermediate ORDER BY data ({} blocks with {} rows) to save memory consumption", chunks.size(), sum_rows_in_blocks);
/// NOTE Maybe concat all blocks and partial sort will be faster than merge?
MergeSorter remerge_sorter(std::move(chunks), description, max_merged_block_size, limit);
MergeSorter remerge_sorter(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
Chunks new_chunks;
size_t new_sum_rows_in_blocks = 0;

View File

@ -18,13 +18,16 @@ class MergeSortingTransform : public SortingTransform
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingTransform(const Block & header,
const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t min_free_disk_space_);
MergeSortingTransform(
const Block & header,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSortingTransform"; }

View File

@ -22,9 +22,7 @@ static ColumnRawPtrs extractColumns(const Block & block, const SortDescription &
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
const IColumn * column = block.getByName(description[i].column_name).column.get();
res.emplace_back(column);
}

View File

@ -22,7 +22,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
MergeSorter::MergeSorter(const Block & header, Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Chunks nonempty_chunks;
@ -36,7 +36,7 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t
/// which can be inefficient.
convertToFullIfSparse(chunk);
cursors.emplace_back(chunk.getColumns(), description);
cursors.emplace_back(header, chunk.getColumns(), description);
has_collation |= cursors.back().has_collation;
nonempty_chunks.emplace_back(std::move(chunk));
@ -139,16 +139,6 @@ SortingTransform::SortingTransform(
{
const auto & sample = inputs.front().getHeader();
/// Replace column names to column position in sort_description.
for (auto & column_description : description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = sample.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
/// Remove constants from header and map old indexes to new.
size_t num_columns = sample.columns();
ColumnNumbers map(num_columns, num_columns);
@ -169,13 +159,10 @@ SortingTransform::SortingTransform(
description_without_constants.reserve(description.size());
for (const auto & column_description : description)
{
auto old_pos = column_description.column_number;
auto old_pos = header.getPositionByName(column_description.column_name);
auto new_pos = map[old_pos];
if (new_pos < num_columns)
{
description_without_constants.push_back(column_description);
description_without_constants.back().column_number = new_pos;
}
}
description.swap(description_without_constants);

View File

@ -15,7 +15,7 @@ namespace DB
class MergeSorter
{
public:
MergeSorter(Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_);
MergeSorter(const Block & header, Chunks chunks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_);
Chunk read();
@ -45,8 +45,10 @@ private:
class MergeSorterSource : public ISource
{
public:
MergeSorterSource(Block header, Chunks chunks, SortDescription & description, size_t max_merged_block_size, UInt64 limit)
: ISource(std::move(header)), merge_sorter(std::move(chunks), description, max_merged_block_size, limit) {}
MergeSorterSource(const Block & header, Chunks chunks, SortDescription & description, size_t max_merged_block_size, UInt64 limit)
: ISource(header), merge_sorter(header, std::move(chunks), description, max_merged_block_size, limit)
{
}
String getName() const override { return "MergeSorterSource"; }

View File

@ -1,5 +1,6 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
@ -307,7 +308,15 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
right->pipe.dropExtremes();
left->pipe.collected_processors = collected_processors;
right->pipe.collected_processors = collected_processors;
/// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right);
/// Remember the last step of the right pipeline.
ExpressionStep* step = typeid_cast<ExpressionStep*>(right->pipe.processors.back()->getQueryPlanStep());
if (!step)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "The top step of the right pipeline should be ExpressionStep");
}
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;
@ -377,6 +386,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
left->pipe.processors.emplace_back(std::move(joining));
}
/// Move the collected processors to the last step in the right pipeline.
Processors processors = collector.detachProcessors();
step->appendExtraProcessors(processors);
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->pipe.holder = std::move(right->pipe.holder);
left->pipe.header = left->pipe.output_ports.front()->getHeader();

View File

@ -696,22 +696,24 @@ namespace
/// The function works for Arrays and Nullables of the same structure.
bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
{
if (from->equals(*to))
return true;
if (const auto * from_enum8 = typeid_cast<const DataTypeEnum8 *>(from))
auto is_compatible_enum_types_conversion = [](const IDataType * from_type, const IDataType * to_type)
{
if (const auto * to_enum8 = typeid_cast<const DataTypeEnum8 *>(to))
return to_enum8->contains(*from_enum8);
}
if (const auto * from_enum8 = typeid_cast<const DataTypeEnum8 *>(from_type))
{
if (const auto * to_enum8 = typeid_cast<const DataTypeEnum8 *>(to_type))
return to_enum8->contains(*from_enum8);
}
if (const auto * from_enum16 = typeid_cast<const DataTypeEnum16 *>(from))
{
if (const auto * to_enum16 = typeid_cast<const DataTypeEnum16 *>(to))
return to_enum16->contains(*from_enum16);
}
if (const auto * from_enum16 = typeid_cast<const DataTypeEnum16 *>(from_type))
{
if (const auto * to_enum16 = typeid_cast<const DataTypeEnum16 *>(to_type))
return to_enum16->contains(*from_enum16);
}
static const std::unordered_multimap<std::type_index, const std::type_info &> ALLOWED_CONVERSIONS =
return false;
};
static const std::unordered_multimap<std::type_index, const std::type_info &> allowed_conversions =
{
{ typeid(DataTypeEnum8), typeid(DataTypeInt8) },
{ typeid(DataTypeEnum16), typeid(DataTypeInt16) },
@ -721,12 +723,19 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
{ typeid(DataTypeUInt16), typeid(DataTypeDate) },
};
/// Unwrap some nested and check for valid conevrsions
while (true)
{
/// types are equal, obviously pure metadata alter
if (from->equals(*to))
return true;
auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from));
/// We just adding something to enum, nothing changed on disk
if (is_compatible_enum_types_conversion(from, to))
return true;
/// Types changed, but representation on disk didn't
auto it_range = allowed_conversions.equal_range(typeid(*from));
for (auto it = it_range.first; it != it_range.second; ++it)
{
if (it->second == typeid(*to))
@ -1046,8 +1055,12 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
if (!all_columns.has(column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(column_name) + " to modify",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK};
{
String exception_message = fmt::format("Wrong column. Cannot find column {} to modify", backQuote(column_name));
all_columns.appendHintsMessage(exception_message, column_name);
throw Exception{exception_message,
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK};
}
else
continue;
}
@ -1152,17 +1165,22 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
all_columns.remove(command.column_name);
}
else if (!command.if_exists)
throw Exception(
"Wrong column name. Cannot find column " + backQuote(command.column_name) + " to drop",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
{
String exception_message = fmt::format("Wrong column name. Cannot find column {} to drop", backQuote(command.column_name));
all_columns.appendHintsMessage(exception_message, command.column_name);
throw Exception(exception_message, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
else if (command.type == AlterCommand::COMMENT_COLUMN)
{
if (!all_columns.has(command.column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(command.column_name) + " to comment",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK};
{
String exception_message = fmt::format("Wrong column name. Cannot find column {} to comment", backQuote(command.column_name));
all_columns.appendHintsMessage(exception_message, command.column_name);
throw Exception(exception_message, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
}
else if (command.type == AlterCommand::MODIFY_SETTING || command.type == AlterCommand::RESET_SETTING)
@ -1196,8 +1214,11 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
if (!all_columns.has(command.column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(command.column_name) + " to rename",
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK};
{
String exception_message = fmt::format("Wrong column name. Cannot find column {} to rename", backQuote(command.column_name));
all_columns.appendHintsMessage(exception_message, command.column_name);
throw Exception(exception_message, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
else
continue;
}

View File

@ -122,7 +122,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
if (col_ast->default_expression)
{
default_desc.kind = columnDefaultKindFromString(col_ast->default_specifier);
default_desc.expression = col_ast->default_expression;
default_desc.expression = std::move(col_ast->default_expression);
}
if (col_ast->comment)
@ -230,8 +230,11 @@ void ColumnsDescription::remove(const String & column_name)
{
auto range = getNameRange(columns, column_name);
if (range.first == range.second)
throw Exception("There is no column " + column_name + " in table.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
{
String exception_message = fmt::format("There is no column {} in table", column_name);
appendHintsMessage(exception_message, column_name);
throw Exception(exception_message, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
for (auto list_it = range.first; list_it != range.second;)
{
@ -244,7 +247,11 @@ void ColumnsDescription::rename(const String & column_from, const String & colum
{
auto it = columns.get<1>().find(column_from);
if (it == columns.get<1>().end())
throw Exception("Cannot find column " + column_from + " in ColumnsDescription", ErrorCodes::LOGICAL_ERROR);
{
String exception_message = fmt::format("Cannot find column {} in ColumnsDescription", column_from);
appendHintsMessage(exception_message, column_from);
throw Exception(exception_message, ErrorCodes::LOGICAL_ERROR);
}
columns.get<1>().modify_key(it, [&column_to] (String & old_name)
{
@ -745,6 +752,18 @@ void ColumnsDescription::removeSubcolumns(const String & name_in_storage)
subcolumns.get<1>().erase(range.first, range.second);
}
std::vector<String> ColumnsDescription::getAllRegisteredNames() const
{
std::vector<String> names;
names.reserve(columns.size());
for (const auto & column : columns)
{
if (column.name.find('.') == std::string::npos)
names.push_back(column.name);
}
return names;
}
Block validateColumnsDefaultsAndGetSampleBlock(ASTPtr default_expr_list, const NamesAndTypesList & all_columns, ContextPtr context)
{
for (const auto & child : default_expr_list->children)

View File

@ -91,7 +91,7 @@ struct ColumnDescription
/// Description of multiple table columns (in CREATE TABLE for example).
class ColumnsDescription
class ColumnsDescription : public IHints<1, ColumnsDescription>
{
public:
ColumnsDescription() = default;
@ -149,7 +149,11 @@ public:
{
auto it = columns.get<1>().find(column_name);
if (it == columns.get<1>().end())
throw Exception("Cannot find column " + column_name + " in ColumnsDescription", ErrorCodes::LOGICAL_ERROR);
{
String exception_message = fmt::format("Cannot find column {} in ColumnsDescription", column_name);
appendHintsMessage(exception_message, column_name);
throw Exception(exception_message, ErrorCodes::LOGICAL_ERROR);
}
removeSubcolumns(it->name);
if (!columns.get<1>().modify(it, std::forward<F>(f)))
@ -196,6 +200,8 @@ public:
return columns.empty();
}
std::vector<String> getAllRegisteredNames() const override;
/// Keep the sequence of columns and allow to lookup by name.
using ColumnsContainer = boost::multi_index_container<
ColumnDescription,

View File

@ -74,7 +74,6 @@ struct IndicesDescription : public std::vector<IndexDescription>, IHints<1, Indi
/// Return common expression for all stored indices
ExpressionActionsPtr getSingleExpressionForIndices(const ColumnsDescription & columns, ContextPtr context) const;
public:
Names getAllRegisteredNames() const override;
};

View File

@ -779,11 +779,16 @@ void registerStorageKafka(StorageFactory & factory)
#undef CHECK_KAFKA_STORAGE_ARGUMENT
auto num_consumers = kafka_settings->kafka_num_consumers.value;
auto physical_cpu_cores = getNumberOfPhysicalCPUCores();
auto max_consumers = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
if (num_consumers > physical_cpu_cores)
if (num_consumers > max_consumers)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of consumers can not be bigger than {}", physical_cpu_cores);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The number of consumers can not be bigger than {}. "
"A single consumer can read any number of partitions. Extra consumers are relatively expensive, "
"and using a lot of them can lead to high memory and CPU usage. To achieve better performance "
"of getting data from Kafka, consider using a setting kafka_thread_per_consumer=1, "
"and ensure you have enough threads in MessageBrokerSchedulePool (background_message_broker_schedule_pool_size). "
"See also https://clickhouse.com/docs/integrations/kafka/kafka-table-engine#tuning-performance", max_consumers);
}
else if (num_consumers < 1)
{

View File

@ -784,7 +784,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
sort_description.emplace_back(sort_columns[i], 1, 1);
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,

View File

@ -2026,6 +2026,7 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
const auto & settings = local_context->getSettingsRef();
const auto & settings_from_storage = getSettings();
if (!settings.allow_non_metadata_alters)
{
@ -2216,6 +2217,14 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
dropped_columns.emplace(command.column_name);
}
else if (command.type == AlterCommand::RESET_SETTING)
{
for (const auto & reset_setting : command.settings_resets)
{
if (!settings_from_storage->has(reset_setting))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot reset setting '{}' because it doesn't exist for MergeTree engines family", reset_setting);
}
}
else if (command.isRequireMutationStage(getInMemoryMetadata()))
{
/// This alter will override data on disk. Let's check that it doesn't

View File

@ -81,7 +81,7 @@ struct MergeTreeDataPartTTLInfos
bool empty() const
{
/// part_min_ttl in minimum of rows, rows_where and group_by TTLs
return !part_min_ttl && moves_ttl.empty() && recompression_ttl.empty();
return !part_min_ttl && moves_ttl.empty() && recompression_ttl.empty() && columns_ttl.empty();
}
};

View File

@ -146,7 +146,7 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -283,16 +283,12 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
{
TemporaryPart temp_part;
Block & block = block_with_partition.block;
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context);
if (!storage_snapshot->object_columns.empty())
{
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(columns, block, extended_storage_columns);
}
for (auto & column : columns)
if (isObject(column.type))
column.type = block.getByName(column.name).type;
static const String TMP_PREFIX = "tmp_insert_";
@ -334,7 +330,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
sort_description.emplace_back(sort_columns[i], 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
@ -468,6 +464,16 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
return temp_part;
}
void MergeTreeDataWriter::deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
}
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name,
MergeTreeDataPartType part_type,
@ -523,7 +529,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
sort_description.emplace_back(sort_columns[i], 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterBlocks);

View File

@ -42,14 +42,12 @@ public:
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
static void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block);
/// This structure contains not completely written temporary part.
/// Some writes may happen asynchronously, e.g. for blob storages.
/// You should call finalize() to wait until all data is written.
struct TemporaryPart
{
MergeTreeData::MutableDataPartPtr part;
@ -65,6 +63,9 @@ public:
void finalize();
};
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/// For insertion.

View File

@ -50,7 +50,9 @@ struct MergeTreeSink::DelayedChunk
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, context);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;

View File

@ -150,7 +150,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
if (quorum)
checkQuorumPrecondition(zookeeper);
const Settings & settings = context->getSettingsRef();
auto storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, context);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
@ -158,6 +159,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
size_t streams = 0;
bool support_parallel_write = false;
const Settings & settings = context->getSettingsRef();
for (auto & current_block : part_blocks)
{

View File

@ -335,7 +335,11 @@ const ProjectionDescription & ProjectionsDescription::get(const String & project
{
auto it = map.find(projection_name);
if (it == map.end())
throw Exception("There is no projection " + projection_name + " in table", ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE);
{
String exception_message = fmt::format("There is no projection {} in table", projection_name);
appendHintsMessage(exception_message, projection_name);
throw Exception(exception_message, ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE);
}
return *(it->second);
}
@ -376,13 +380,25 @@ void ProjectionsDescription::remove(const String & projection_name, bool if_exis
{
if (if_exists)
return;
throw Exception("There is no projection " + projection_name + " in table.", ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE);
String exception_message = fmt::format("There is no projection {} in table", projection_name);
appendHintsMessage(exception_message, projection_name);
throw Exception(exception_message, ErrorCodes::NO_SUCH_PROJECTION_IN_TABLE);
}
projections.erase(it->second);
map.erase(it);
}
std::vector<String> ProjectionsDescription::getAllRegisteredNames() const
{
std::vector<String> names;
names.reserve(map.size());
for (const auto & pair : map)
names.push_back(pair.first);
return names;
}
ExpressionActionsPtr
ProjectionsDescription::getSingleExpressionForProjections(const ColumnsDescription & columns, ContextPtr query_context) const
{

View File

@ -106,7 +106,7 @@ struct ProjectionDescription
using ProjectionDescriptionRawPtr = const ProjectionDescription *;
/// All projections in storage
struct ProjectionsDescription
struct ProjectionsDescription : public IHints<1, ProjectionsDescription>
{
ProjectionsDescription() = default;
ProjectionsDescription(ProjectionsDescription && other) = default;
@ -138,6 +138,8 @@ struct ProjectionsDescription
add(ProjectionDescription && projection, const String & after_projection = String(), bool first = false, bool if_not_exists = false);
void remove(const String & projection_name, bool if_exists);
std::vector<String> getAllRegisteredNames() const override;
private:
/// Keep the sequence of columns and allow to lookup by name.
using Container = std::list<ProjectionDescription>;

View File

@ -44,7 +44,7 @@ class StorageDistributed final : public shared_ptr_helper<StorageDistributed>, p
friend class StorageSystemDistributionQueue;
public:
virtual ~StorageDistributed() override;
~StorageDistributed() override;
std::string getName() const override { return "Distributed"; }

View File

@ -138,11 +138,10 @@ public:
storage_snapshot->metadata->check(block, true);
if (!storage_snapshot->object_columns.empty())
{
auto columns = storage_snapshot->metadata->getColumns().getAllPhysical().filter(block.getNames());
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(columns, block, extended_storage_columns);
convertObjectsToTuples(block, extended_storage_columns);
}
if (storage.compress)

View File

@ -91,8 +91,8 @@ SHOW CREATE TABLE table_for_reset_setting;
ALTER TABLE table_for_reset_setting RESET SETTING index_granularity; -- { serverError 472 }
-- ignore undefined setting
ALTER TABLE table_for_reset_setting RESET SETTING merge_with_ttl_timeout, unknown_setting;
-- don't execute alter with incorrect setting
ALTER TABLE table_for_reset_setting RESET SETTING merge_with_ttl_timeout, unknown_setting; -- { serverError 36 }
ALTER TABLE table_for_reset_setting MODIFY SETTING merge_with_ttl_timeout = 300, max_concurrent_queries = 1;

View File

@ -108,8 +108,8 @@ ATTACH TABLE replicated_table_for_reset_setting1;
SHOW CREATE TABLE replicated_table_for_reset_setting1;
SHOW CREATE TABLE replicated_table_for_reset_setting2;
-- ignore undefined setting
ALTER TABLE replicated_table_for_reset_setting1 RESET SETTING check_delay_period, unknown_setting;
-- don't execute alter with incorrect setting
ALTER TABLE replicated_table_for_reset_setting1 RESET SETTING check_delay_period, unknown_setting; -- { serverError 36 }
ALTER TABLE replicated_table_for_reset_setting1 RESET SETTING merge_with_ttl_timeout;
ALTER TABLE replicated_table_for_reset_setting2 RESET SETTING merge_with_ttl_timeout;

View File

@ -109,3 +109,8 @@ FROM (SELECT toUInt8(number) > 0 as pk, toNullable(toUInt8(number)) as dt FROM n
ASOF JOIN (SELECT 1 as pk, toNullable(0) as dt) b
ON a.dt >= b.dt AND a.pk = b.pk
ORDER BY a.dt; -- { serverError 48 }
SELECT *
FROM (SELECT NULL AS y, 1 AS x, '2020-01-01 10:10:10' :: DateTime64 AS t) AS t1
ASOF LEFT JOIN (SELECT NULL AS y, 1 AS x, '2020-01-01 10:10:10' :: DateTime64 AS t) AS t2
ON t1.t <= t2.t AND t1.x == t2.x FORMAT Null;

View File

@ -0,0 +1,2 @@
{"id":1,"obj":{"k1":"v1","k2":""}}
{"id":2,"obj":{"k1":"","k2":"v2"}}

View File

@ -0,0 +1,15 @@
-- Tags: no-fasttest
DROP TABLE IF EXISTS t_json_partitions;
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
CREATE TABLE t_json_partitions (id UInt32, obj JSON)
ENGINE MergeTree ORDER BY id PARTITION BY id;
INSERT INTO t_json_partitions FORMAT JSONEachRow {"id": 1, "obj": {"k1": "v1"}} {"id": 2, "obj": {"k2": "v2"}};
SELECT * FROM t_json_partitions ORDER BY id FORMAT JSONEachRow;
DROP TABLE t_json_partitions;

View File

@ -1,5 +1,5 @@
INSERT INTO test FROM INFILE data.file SELECT x
INSERT INTO test FROM INFILE \'data.file\' SELECT x
FROM input(\'x UInt32\')
INSERT INTO test FROM INFILE data.file WITH number AS x
INSERT INTO test FROM INFILE \'data.file\' WITH number AS x
SELECT number
FROM input(\'number UInt32\')

View File

@ -0,0 +1,19 @@
(Expression)
ExpressionTransform
(Join)
JoiningTransform 2 → 1
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1
(Expression)
FillingRightJoinSide
ExpressionTransform
(SettingQuotaAndLimits)
(Limit)
Limit
(ReadFromStorage)
Numbers 0 → 1

View File

@ -0,0 +1,10 @@
EXPLAIN PIPELINE
SELECT * FROM
(
SELECT * FROM system.numbers LIMIT 10
) t1
ALL LEFT JOIN
(
SELECT * FROM system.numbers LIMIT 10
) t2
USING number;

View File

@ -0,0 +1,3 @@
OK
OK
OK

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS t"
$CLICKHOUSE_CLIENT --query="CREATE TABLE t (CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192)"
$CLICKHOUSE_CLIENT --query="ALTER TABLE t DROP COLUMN ToDro" 2>&1 | grep -q "Maybe you meant: \['ToDrop'\]" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --query="ALTER TABLE t MODIFY COLUMN ToDro UInt64" 2>&1 | grep -q "Maybe you meant: \['ToDrop'\]" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --query="ALTER TABLE t RENAME COLUMN ToDro to ToDropp" 2>&1 | grep -q "Maybe you meant: \['ToDrop'\]" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --query="DROP TABLE t"

View File

@ -0,0 +1 @@
OK

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS t"
$CLICKHOUSE_CLIENT --query="create table t (x Int32, y Int32, projection pToDrop (select x, y order by x)) engine = MergeTree order by y;"
$CLICKHOUSE_CLIENT --query="ALTER TABLE t DROP PROJECTION pToDro" 2>&1 | grep -q "Maybe you meant: \['pToDrop'\]" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT --query="DROP TABLE t"

View File

@ -0,0 +1,7 @@
1 ['Option2','Option1']
2 ['Option1']
3 ['Option1','Option3']
1 ['Option2','Option1']
2 ['Option1']
3 ['Option1','Option3']
0

View File

@ -0,0 +1,27 @@
DROP TABLE IF EXISTS alter_enum_array;
CREATE TABLE alter_enum_array(
Key UInt64,
Value Array(Enum8('Option1'=1, 'Option2'=2))
)
ENGINE=MergeTree()
ORDER BY tuple();
INSERT INTO alter_enum_array VALUES (1, ['Option2', 'Option1']), (2, ['Option1']);
ALTER TABLE alter_enum_array MODIFY COLUMN Value Array(Enum8('Option1'=1, 'Option2'=2, 'Option3'=3)) SETTINGS mutations_sync=2;
INSERT INTO alter_enum_array VALUES (3, ['Option1','Option3']);
SELECT * FROM alter_enum_array ORDER BY Key;
DETACH TABLE alter_enum_array;
ATTACH TABLE alter_enum_array;
SELECT * FROM alter_enum_array ORDER BY Key;
OPTIMIZE TABLE alter_enum_array FINAL;
SELECT COUNT() FROM system.mutations where table='alter_enum_array' and database=currentDatabase();
DROP TABLE IF EXISTS alter_enum_array;

View File

@ -0,0 +1,13 @@
DROP TABLE IF EXISTS most_ordinary_mt;
CREATE TABLE most_ordinary_mt
(
Key UInt64
)
ENGINE = MergeTree()
ORDER BY tuple();
ALTER TABLE most_ordinary_mt RESET SETTING ttl; --{serverError 36}
ALTER TABLE most_ordinary_mt RESET SETTING allow_remote_fs_zero_copy_replication, xxx; --{serverError 36}
DROP TABLE IF EXISTS most_ordinary_mt;

View File

@ -0,0 +1 @@
1 0

View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-ordinary-database
# ^^^^^^^^^^^
# Since the underlying view may disappears while flushing log, and leads to:
#
# DB::Exception: Table test_x449vo..inner_id.9c14fb82-e6b1-4d1a-85a6-935c3a2a2029 is dropped. (TABLE_IS_DROPPED)
#
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# regression test for columns TTLs
# note, that this should be written in .sh since we need $CLICKHOUSE_DATABASE
# not 'default' to catch text_log
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists ttl_02262;
drop table if exists this_text_log;
create table ttl_02262 (date Date, key Int, value String TTL date + interval 1 month) engine=MergeTree order by key;
insert into ttl_02262 values ('2010-01-01', 2010, 'foo');
optimize table ttl_02262 final;
detach table ttl_02262;
attach table ttl_02262;
-- create system.text_log
system flush logs;
"
ttl_02262_uuid=$($CLICKHOUSE_CLIENT -q "select uuid from system.tables where database = '$CLICKHOUSE_DATABASE' and name = 'ttl_02262'")
$CLICKHOUSE_CLIENT -nm -q "
-- OPTIMIZE TABLE x FINAL will be done in background
-- attach to it's log, via table UUID in query_id (see merger/mutator code).
create materialized view this_text_log engine=Memory() as
select * from system.text_log where query_id like '%${ttl_02262_uuid}%';
optimize table ttl_02262 final;
system flush logs;
-- If TTL will be applied again (during OPTIMIZE TABLE FINAL) it will produce the following message:
--
-- Some TTL values were not calculated for part 201701_487_641_3. Will calculate them forcefully during merge.
--
-- Let's ensure that this is not happen anymore:
select count()>0, countIf(message LIKE '%TTL%') from this_text_log;
drop table ttl_02262;
drop table this_text_log;
"

View File

@ -0,0 +1,3 @@
-- { echo }
EXPLAIN SYNTAX INSERT INTO foo FROM INFILE '/dev/null' COMPRESSION 'gz';
INSERT INTO foo FROM INFILE \'/dev/null\' COMPRESSION \'gz\'

View File

@ -0,0 +1,2 @@
-- { echo }
EXPLAIN SYNTAX INSERT INTO foo FROM INFILE '/dev/null' COMPRESSION 'gz';

View File

@ -0,0 +1,3 @@
-- { echo }
EXPLAIN SYNTAX INSERT INTO foo FROM INFILE '/dev/null';
INSERT INTO foo FROM INFILE \'/dev/null\'

View File

@ -0,0 +1,2 @@
-- { echo }
EXPLAIN SYNTAX INSERT INTO foo FROM INFILE '/dev/null';

View File

@ -67,22 +67,17 @@
});
}
(function (d, w, c) {
(w[c] = w[c] || []).push(function() {
var is_single_page = $('html').attr('data-single-page') === 'true';
if (!is_single_page) {
$('head').each(function(_, element) {
$(element).append(
'<script async src="https://www.googletagmanager.com/gtag/js?id=G-KF1LLRTQ5Q"></script><script>window.dataLayer = window.dataLayer || [];function gtag(){dataLayer.push(arguments);}gtag(\'js\', new Date());gtag(\'config\', \'G-KF1LLRTQ5Q\');</script>'
);
$(element).append(
'<script>!function(){var analytics=window.analytics=window.analytics||[];if(!analytics.initialize)if(analytics.invoked)window.console&&console.error&&console.error("Segment snippet included twice.");else{analytics.invoked=!0;analytics.methods=["trackSubmit","trackClick","trackLink","trackForm","pageview","identify","reset","group","track","ready","alias","debug","page","once","off","on","addSourceMiddleware","addIntegrationMiddleware","setAnonymousId","addDestinationMiddleware"];analytics.factory=function(e){return function(){var t=Array.prototype.slice.call(arguments);t.unshift(e);analytics.push(t);return analytics}};for(var e=0;e<analytics.methods.length;e++){var key=analytics.methods[e];analytics[key]=analytics.factory(key)}analytics.load=function(key,e){var t=document.createElement("script");t.type="text/javascript";t.async=!0;t.src="https://cdn.segment.com/analytics.js/v1/" + key + "/analytics.min.js";var n=document.getElementsByTagName("script")[0];n.parentNode.insertBefore(t,n);analytics._loadOptions=e};analytics._writeKey="dZuEnmCPmWqDuSEzCvLUSBBRt8Xrh2el";;analytics.SNIPPET_VERSION="4.15.3";analytics.load("dZuEnmCPmWqDuSEzCvLUSBBRt8Xrh2el");analytics.page();}}();</script>'
);
});
}
var is_single_page = $('html').attr('data-single-page') === 'true';
if (!is_single_page) {
$('head').each(function (_, element) {
$(element).append(
'<script async src="https://www.googletagmanager.com/gtag/js?id=G-KF1LLRTQ5Q"></script><script>window.dataLayer = window.dataLayer || [];function gtag(){dataLayer.push(arguments);}gtag(\'js\', new Date());gtag(\'config\', \'G-KF1LLRTQ5Q\');</script>'
);
$(element).append(
'<script>!function(){var analytics=window.analytics=window.analytics||[];if(!analytics.initialize)if(analytics.invoked)window.console&&console.error&&console.error("Segment snippet included twice.");else{analytics.invoked=!0;analytics.methods=["trackSubmit","trackClick","trackLink","trackForm","pageview","identify","reset","group","track","ready","alias","debug","page","once","off","on","addSourceMiddleware","addIntegrationMiddleware","setAnonymousId","addDestinationMiddleware"];analytics.factory=function(e){return function(){var t=Array.prototype.slice.call(arguments);t.unshift(e);analytics.push(t);return analytics}};for(var e=0;e<analytics.methods.length;e++){var key=analytics.methods[e];analytics[key]=analytics.factory(key)}analytics.load=function(key,e){var t=document.createElement("script");t.type="text/javascript";t.async=!0;t.src="https://cdn.segment.com/analytics.js/v1/" + key + "/analytics.min.js";var n=document.getElementsByTagName("script")[0];n.parentNode.insertBefore(t,n);analytics._loadOptions=e};analytics._writeKey="dZuEnmCPmWqDuSEzCvLUSBBRt8Xrh2el";;analytics.SNIPPET_VERSION="4.15.3";analytics.load("dZuEnmCPmWqDuSEzCvLUSBBRt8Xrh2el");analytics.page();}}();</script>'
);
});
})(document, window, "");
}
var beforePrint = function() {
var details = document.getElementsByTagName("details");