Merge remote-tracking branch 'upstream/master' into METR-23466

This commit is contained in:
proller 2016-11-24 21:25:56 +03:00
commit e855c16f4a
27 changed files with 697 additions and 163 deletions

View File

@ -298,6 +298,7 @@ add_library (dbms
include/DB/DataStreams/SquashingTransform.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataStreams/SquashingBlockOutputStream.h
include/DB/DataStreams/ColumnGathererStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
include/DB/DataTypes/DataTypeSet.h
@ -813,6 +814,7 @@ add_library (dbms
src/DataStreams/SquashingTransform.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataStreams/SquashingBlockOutputStream.cpp
src/DataStreams/ColumnGathererStream.cpp
src/DataTypes/DataTypeString.cpp
src/DataTypes/DataTypeFixedString.cpp

View File

@ -35,9 +35,9 @@ struct SortColumnDescription
using SortDescription = std::vector<SortColumnDescription>;
/** A cursor that allows comparing the corresponding rows in different blocks.
* The cursor moves within a single block.
* For use in a priority queue.
/** Cursor allows comparing rows in different blocks (and parts).
* Cursor moves inside a single block.
* It is used in a priority queue.
*/
struct SortCursorImpl
{
@ -48,14 +48,17 @@ struct SortCursorImpl
size_t pos = 0;
size_t rows = 0;
/** The order (what is compared) when the compared columns are equal.
* Makes it possible to prefer rows from the desired cursor.
/** Determines the order if the compared columns are equal.
* The order is determined by the cursor number.
*
* The cursor number (always?) equals the number of the part being merged.
* Therefore this field can be used to determine the part number of the current row (see ColumnGathererStream).
*/
size_t order;
using NeedCollationFlags = std::vector<UInt8>;
/** Should we use a Collator to sort the column? */
/** Should we use Collator to sort a column? */
NeedCollationFlags need_collation;
/** Is there at least one column with a Collator? */

View File

@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_),
sign_column(sign_column_)
{
}
@ -62,7 +62,7 @@ private:
/// We have read to the end.
bool finished = false;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
RowRef first_negative; /// The first negative row for the current primary key.
@ -77,6 +77,12 @@ private:
size_t blocks_written = 0;
/// Fields specific for VERTICAL merge algorithm
size_t current_pos = 0; /// Global row number of current key
size_t first_negative_pos = 0; /// Global row number of first_negative
size_t last_positive_pos = 0; /// Global row number of last_positive
size_t last_negative_pos = 0; /// Global row number of last_negative
/** We support two different kinds of cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursors and virtual function calls.
*/

View File

@ -0,0 +1,81 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/PODArray.h>
namespace DB
{
struct __attribute__((__packed__)) RowSourcePart
{
/// The order of members matters: it allows treating RowSourcePart * as UInt8 * when flag = false
UInt8 source_id: 7;
UInt8 flag: 1;
RowSourcePart() = default;
RowSourcePart(unsigned source_id_, bool flag_ = false)
{
source_id = source_id_;
flag = flag_;
}
static constexpr size_t MAX_PARTS = 127;
};
using MergedRowSources = PODArray<RowSourcePart>;
/** Gathers a single stream from multiple streams according to a stream mask.
* The stream mask maps a row number to the index of its source stream.
* Streams should contain exactly one column.
*/
class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources& pos_to_source_idx_,
size_t block_size_ = DEFAULT_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
String getID() const override;
Block readImpl() override;
private:
const MergedRowSources & pos_to_source_idx;
/// Cache the required fields
struct Source
{
const IColumn * column;
size_t pos;
size_t size;
Block block;
Source(Block && block_) : block(std::move(block_))
{
update();
}
void update()
{
column = block.getByPosition(0).column.get();
size = block.rowsInFirstColumn();
pos = 0;
}
};
std::vector<Source> sources;
size_t pos_global = 0;
size_t block_size;
Logger * log = &Logger::get("ColumnGathererStream");
};
}
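To make the RowSourcePart mask concrete, here is a toy, self-contained sketch of the gathering idea using plain std::vector stand-ins (none of this is the actual DB API; it only mirrors the source_id/flag semantics declared above):

#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-in for DB::RowSourcePart.
struct RowSourcePart
{
    uint8_t source_id : 7;  // index of the source part (up to 127 parts)
    uint8_t flag      : 1;  // set = the row was collapsed away and must be skipped
};

// Gather one "column" (one vector per part) into the merged order
// described by the mask, skipping flagged rows.
std::vector<int> gather(const std::vector<std::vector<int>> & parts,
                        const std::vector<RowSourcePart> & mask)
{
    std::vector<size_t> pos(parts.size(), 0);  // read cursor per part
    std::vector<int> result;
    for (const auto & src : mask)
    {
        int value = parts[src.source_id][pos[src.source_id]++];
        if (!src.flag)
            result.push_back(value);
    }
    return result;
}

int main()
{
    // Rows of part 0 and part 1; the mask says: p0, p1, p1 (skip), p0.
    std::vector<std::vector<int>> parts{{10, 20}, {30, 40}};
    std::vector<RowSourcePart> mask{{0, 0}, {1, 0}, {1, 1}, {0, 0}};
    for (int v : gather(parts, mask))
        std::cout << v << ' ';  // prints: 10 30 20
    std::cout << '\n';
}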

View File

@ -9,6 +9,7 @@
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
@ -56,10 +57,12 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - if not 0, then only the first limit rows can be produced, in sorted order.
MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
/// limit - if it isn't 0, then we can produce only the first `limit` rows in sorted order.
/// out_row_sources - if it isn't nullptr, then at the end of execution it should contain the part number of each row read (and the skip flag)
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size())
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -158,6 +161,10 @@ protected:
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
QueueWithCollation queue_with_collation;
/// Used in the Vertical merge algorithm to gather non-PK columns (in the next step)
/// If it is not nullptr, then it must be populated during execution
MergedRowSources * out_row_sources = nullptr;
/// These methods are used in the Collapsing/Summing/Aggregating... SortedBlockInputStreams.
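For intuition about how out_row_sources is populated, here is a minimal two-way merge that records which input every output row came from (toy code, not the actual stream classes; on equal keys it prefers the lower stream number, mirroring the cursor 'order' field):

#include <cstdint>
#include <iostream>
#include <vector>

struct RowSourcePart { uint8_t source_id; };  // simplified: no skip flag

// Merge two sorted runs; alongside the merged values, record the
// source of each row - the analogue of out_row_sources.
void mergeWithSources(const std::vector<int> & a, const std::vector<int> & b,
                      std::vector<int> & merged, std::vector<RowSourcePart> & sources)
{
    size_t i = 0, j = 0;
    while (i < a.size() || j < b.size())
    {
        bool take_a = j == b.size() || (i < a.size() && a[i] <= b[j]);
        merged.push_back(take_a ? a[i++] : b[j++]);
        sources.push_back({static_cast<uint8_t>(take_a ? 0 : 1)});
    }
}

int main()
{
    std::vector<int> merged;
    std::vector<RowSourcePart> sources;
    mergeWithSources({1, 3, 5}, {2, 3, 6}, merged, sources);
    for (size_t k = 0; k < merged.size(); ++k)
        std::cout << merged[k] << ':' << int(sources[k].source_id) << ' ';
    std::cout << '\n';  // prints: 1:0 2:1 3:0 3:1 5:0 6:1
}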

View File

@ -283,7 +283,7 @@ public:
* node - a list of values: 1, 2, 3, or a list of tuples: (1, 2), (3, 4), (5, 6).
* create_ordered_set - whether to create a vector of ordered elements. Needed for the index to work.
*/
void createFromAST(DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set);
void createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set);
// Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять.
bool insertFromBlock(const Block & block, bool create_ordered_set = false);

View File

@ -32,10 +32,21 @@ struct MergeInfo
UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{};
std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> bytes_written_uncompressed{};
/// Updated only for Horizontal algorithm
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> rows_written{};
/// Updated only for Vertical algorithm
/// Mutually exclusive with rows_read and rows_written: either rows_written or columns_written is updated
std::atomic<UInt64> columns_written{};
/// Updated in both cases
/// Number of rows for which primary key columns have been written
std::atomic<UInt64> rows_with_key_columns_read{};
std::atomic<UInt64> rows_with_key_columns_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
: database{database}, table{table}, result_part_name{result_part_name}
@ -52,9 +63,12 @@ struct MergeInfo
total_size_bytes_compressed(other.total_size_bytes_compressed),
total_size_marks(other.total_size_marks),
bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed))
rows_read(other.rows_read.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed)),
columns_written(other.columns_written.load(std::memory_order_relaxed)),
rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
{
}
};

View File

@ -24,7 +24,7 @@ public:
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_);
bool save_marks_in_cache_, bool quiet = false);
~MergeTreeBlockInputStream() override;

View File

@ -55,11 +55,11 @@ namespace ErrorCodes
* File structure:
* / min-date _ max-date _ min-id _ max-id _ level / - the directory with the part.
* Inside the part directory:
* checksums.txt - a list of files with their sizes and checksums.
* columns.txt - a list of columns with their types.
* checksums.txt - contains a list of files with their sizes and checksums.
* columns.txt - contains a list of columns with their types.
* primary.idx - the index file.
* Column.bin - the column data
* Column.mrk - marks that indicate where to start reading in order to skip n * k rows.
* [Column].bin - the column data
* [Column].mrk - marks that indicate where to start reading in order to skip n * k rows.
*
* There are several operating modes that determine what to do during a merge:
* - Ordinary - do nothing extra;

View File

@ -9,6 +9,7 @@ namespace DB
{
class MergeListEntry;
class MergeProgressCallback;
struct ReshardingJob;
@ -120,6 +121,19 @@ public:
bool isCancelled() const { return cancelled > 0; }
public:
enum class MergeAlgorithm
{
Horizontal, /// per-row merge of all columns
Vertical /// per-row merge of PK columns, per-column gather for non-PK columns
};
private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const;
private:
MergeTreeData & data;
const BackgroundProcessingPool & pool;

View File

@ -46,6 +46,8 @@ struct MergeTreeDataPartChecksums
void addFile(const String & file_name, size_t file_size, uint128 file_hash);
void add(MergeTreeDataPartChecksums && rhs_checksums);
/// Checks that the set of columns and their checksums match. If not, throws an exception.
/// If have_uncompressed, compares the checksums of the uncompressed data for compressed files. Otherwise compares only the file checksums.
void checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const;

View File

@ -90,6 +90,9 @@ struct MergeTreeSettings
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 0;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@ -124,6 +127,7 @@ struct MergeTreeSettings
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
#undef SET_SIZE_T
#undef SET_DOUBLE
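For reference, a sketch of how the new setting would be picked up, assuming the conventional "merge_tree" config element and a Poco XML configuration (both are assumptions here; only loadFromConfig() itself comes from this header):

#include <string>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

// Hypothetical config.xml:
//   <yandex>
//       <merge_tree>
//           <enable_vertical_merge_algorithm>1</enable_vertical_merge_algorithm>
//       </merge_tree>
//   </yandex>
MergeTreeSettings loadMergeTreeSettings(const std::string & path)
{
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(path);
    MergeTreeSettings settings;
    settings.loadFromConfig("merge_tree", *config);  // enable_vertical_merge_algorithm becomes 1
    return settings;
}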

View File

@ -33,6 +33,7 @@ public:
{
}
protected:
using OffsetColumns = std::set<std::string>;
@ -306,11 +307,16 @@ public:
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(
const NamesAndTypesList & total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr)
{
/// Finish writing and fetch the checksums.
MergeTreeData::DataPart::Checksums checksums;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{
index_stream->next();
@ -319,10 +325,10 @@ public:
index_stream = nullptr;
}
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
for (auto & column_stream : column_streams)
{
it->second->finalize();
it->second->addToChecksums(checksums);
column_stream.second->finalize();
column_stream.second->addToChecksums(checksums);
}
column_streams.clear();
@ -338,7 +344,7 @@ public:
{
/// Write the file with the description of the columns.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
columns_list.writeText(out);
total_column_list.writeText(out);
}
{
@ -350,6 +356,11 @@ public:
return checksums;
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
return writeSuffixAndGetChecksums(columns_list, nullptr);
}
MergeTreeData::DataPart::Index & getIndex()
{
return index_columns;

View File

@ -117,6 +117,8 @@ DayNum_t stringToDate(const String & s)
DayNum_t date{};
readDateText(date, in);
if (!in.eof())
throw Exception("String is too long for Date: " + s);
return date;
}

View File

@ -55,6 +55,13 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
if (out_row_sources)
{
/// true flag value means "skip row"
out_row_sources->data()[last_positive_pos].flag = false;
out_row_sources->data()[last_negative_pos].flag = false;
}
}
return;
}
@ -64,6 +71,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
if (out_row_sources)
out_row_sources->data()[first_negative_pos].flag = false;
}
if (count_positive >= count_negative)
@ -71,6 +81,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
if (out_row_sources)
out_row_sources->data()[last_positive_pos].flag = false;
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
@ -123,7 +136,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
size_t merged_rows = 0;
/// Take out rows in the required order and put them into merged_block, while the number of rows is no more than max_block_size
while (!queue.empty())
for (; !queue.empty(); ++current_pos)
{
TSortCursor current = queue.top();
@ -149,6 +162,10 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
queue.pop();
/// Initially, skip all rows. On insert, unskip "corner" rows.
if (out_row_sources)
out_row_sources->emplace_back(current.impl->order, true);
if (key_differs)
{
/// Write the data for the previous primary key.
@ -166,13 +183,21 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
last_is_positive = true;
setRowRef(last_positive, current);
last_positive_pos = current_pos;
}
else if (sign == -1)
{
if (!count_negative)
{
setRowRef(first_negative, current);
first_negative_pos = current_pos;
}
if (!blocks_written && !merged_rows)
{
setRowRef(last_negative, current);
last_negative_pos = current_pos;
}
++count_negative;
last_is_positive = false;

View File

@ -0,0 +1,102 @@
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources & pos_to_source_idx_, size_t block_size_)
: pos_to_source_idx(pos_to_source_idx_), block_size(block_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
sources.reserve(children.size());
for (size_t i = 0; i < children.size(); i++)
{
sources.emplace_back(children[i]->read());
Block & block = sources.back().block;
if (block.columns() != 1)
throw Exception("Stream should contain exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (block.getByPosition(0).column->getName() != sources[0].block.getByPosition(0).column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
String ColumnGathererStream::getID() const
{
std::stringstream res;
res << getName() << "(";
for (size_t i = 0; i < children.size(); i++)
res << (i == 0 ? "" : ", " ) << children[i]->getID();
res << ")";
return res.str();
}
Block ColumnGathererStream::readImpl()
{
if (children.size() == 1)
return children[0]->read();
if (pos_global >= pos_to_source_idx.size())
return Block();
Block block_res = sources[0].block.cloneEmpty();
IColumn & column_res = *block_res.getByPosition(0).column;
size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size());
column_res.reserve(pos_finish - pos_global);
for (size_t pos = pos_global; pos < pos_finish; ++pos)
{
auto source_id = pos_to_source_idx[pos].source_id;
bool skip = pos_to_source_idx[pos].flag;
Source & source = sources[source_id];
if (source.pos >= source.size) /// Fetch new block
{
try
{
source.block = children[source_id]->read();
source.update();
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_id]->getID() + ", part " + toString(source_id),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
if (!skip)
column_res.insertFrom(*source.column, source.pos); //TODO: vectorize
++source.pos;
}
pos_global = pos_finish;
return block_res;
}
}

View File

@ -195,13 +195,10 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
return;
}
size_t source_num = 0;
size_t size = cursors.size();
for (; source_num < size; ++source_num)
if (&cursors[source_num] == current.impl)
break;
/// Actually, current.impl->order stores the source number (i.e. &cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
if (source_num == size)
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
@ -224,6 +221,9 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
finished = true;
}
if (out_row_sources)
out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num));
// std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
@ -236,6 +236,12 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (out_row_sources)
{
/// Actually, current.impl->order stores the source number (i.e. &cursors[current.impl->order] == current.impl)
out_row_sources->emplace_back(current.impl->order);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";

View File

@ -279,7 +279,7 @@ static Field extractValueFromNode(ASTPtr & node, const IDataType & type, const C
}
void Set::createFromAST(DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set)
void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set)
{
data_types = types;

View File

@ -9,6 +9,8 @@
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/Core/FieldVisitors.h>
#include <DB/Interpreters/convertFieldToType.h>
@ -74,19 +76,12 @@ Field convertFieldToType(const Field & src, const IDataType & type)
const bool is_date = typeid_cast<const DataTypeDate *>(&type);
bool is_datetime = false;
bool is_enum8 = false;
bool is_enum16 = false;
bool is_enum = false;
if (!is_date)
if (!(is_datetime = typeid_cast<const DataTypeDateTime *>(&type)))
if (!(is_enum8 = typeid_cast<const DataTypeEnum8 *>(&type)))
if (!(is_enum16 = typeid_cast<const DataTypeEnum16 *>(&type)))
throw Exception{
"Logical error: unknown numeric type " + type.getName(),
ErrorCodes::LOGICAL_ERROR
};
const auto is_enum = is_enum8 || is_enum16;
if (!(is_enum = dynamic_cast<const IDataTypeEnum *>(&type)))
throw Exception{"Logical error: unknown numeric type " + type.getName(), ErrorCodes::LOGICAL_ERROR};
/// Numeric values for Enums should not be used directly in IN section
if (src.getType() == Field::Types::UInt64 && !is_enum)
@ -94,32 +89,21 @@ Field convertFieldToType(const Field & src, const IDataType & type)
if (src.getType() == Field::Types::String)
{
/// The ability to compare dates and dates-with-time with a string.
const String & str = src.get<const String &>();
ReadBufferFromString in(str);
if (is_date)
{
DayNum_t date{};
readDateText(date, in);
if (!in.eof())
throw Exception("String is too long for Date: " + str);
return Field(UInt64(date));
/// Convert 'YYYY-MM-DD' Strings to Date
return UInt64(stringToDate(src.get<const String &>()));
}
else if (is_datetime)
{
time_t date_time{};
readDateTimeText(date_time, in);
if (!in.eof())
throw Exception("String is too long for DateTime: " + str);
return Field(UInt64(date_time));
/// Convert 'YYYY-MM-DD hh:mm:ss' Strings to DateTime
return stringToDateTime(src.get<const String &>());
}
else if (is_enum)
{
/// Convert String to Enum's value
return dynamic_cast<const IDataTypeEnum &>(type).castToValue(src);
}
else if (is_enum8)
return Field(UInt64(static_cast<const DataTypeEnum8 &>(type).getValue(str)));
else if (is_enum16)
return Field(UInt64(static_cast<const DataTypeEnum16 &>(type).getValue(str)));
}
throw Exception("Type mismatch in IN or VALUES section: " + type.getName() + " expected, "

View File

@ -21,7 +21,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_)
bool save_marks_in_cache_, bool quiet)
:
path(path_), block_size(block_size_),
storage(storage_), owned_data_part(owned_data_part_),
@ -97,6 +97,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
total_rows += range.end - range.begin;
total_rows *= storage.index_granularity;
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1

View File

@ -16,11 +16,13 @@
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Common/Increment.h>
#include <DB/Common/interpolate.h>
#include <cmath>
#include <numeric>
namespace ProfileEvents
@ -42,6 +44,10 @@ namespace ErrorCodes
extern const int ABORTED;
}
using MergeAlgorithm = MergeTreeDataMerger::MergeAlgorithm;
namespace
{
@ -274,8 +280,157 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
return parts_from_partition;
}
static void extractOrdinaryAndKeyColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions,
NamesAndTypesList & ordinary_column_names_and_types, Names & ordinary_column_names,
NamesAndTypesList & key_column_names_and_types, Names & key_column_names
)
{
Names key_columns_dup = primary_key_expressions->getRequiredColumns();
std::set<String> key_columns(key_columns_dup.cbegin(), key_columns_dup.cend());
/// parts must be sorted.
for (auto & column : all_columns)
{
auto it = std::find(key_columns.cbegin(), key_columns.cend(), column.name);
if (key_columns.end() == it)
{
ordinary_column_names_and_types.emplace_back(column);
ordinary_column_names.emplace_back(column.name);
}
else
{
key_column_names_and_types.emplace_back(column);
key_column_names.emplace_back(column.name);
}
}
}
/* Allows computing more accurate progress statistics */
class ColumnSizeEstimator
{
std::unordered_map<String, size_t> map;
public:
/// Stores the approximate size of columns, in bytes.
/// Exact values are not required, since they are only used for estimating relative values (progress).
size_t sum_total = 0;
size_t sum_index_columns = 0;
size_t sum_ordinary_columns = 0;
ColumnSizeEstimator(MergeTreeData::DataPartsVector & parts, const Names & key_columns, const Names & ordinary_columns)
{
if (parts.empty())
return;
for (const auto & name_and_type : parts.front()->columns)
map[name_and_type.name] = 0;
for (const auto & part : parts)
{
for (const auto & name_and_type : parts.front()->columns)
map.at(name_and_type.name) += part->getColumnSize(name_and_type.name);
}
for (const auto & name : key_columns)
sum_index_columns += map.at(name);
for (const auto & name : ordinary_columns)
sum_ordinary_columns += map.at(name);
sum_total = sum_index_columns + sum_ordinary_columns;
}
/// Approximate size of num_rows elements of the column, if the column contains num_total_rows elements
Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(map.at(column)) / num_total_rows * num_rows;
}
/// Relative size of num_rows elements of the column (compared with the overall size of all columns), if the column contains num_total_rows elements
Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const
{
return columnSize(column, num_rows, num_total_rows) / sum_total;
}
/// Like columnSize, but takes into account only PK columns
Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(sum_index_columns) / num_total_rows * num_rows;
}
/// Like columnProgress, but takes into account only PK columns
Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const
{
return keyColumnsSize(num_rows, num_total_rows) / sum_total;
}
};
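A toy computation of the estimator's math (illustrative byte counts; plain C++, not the DB types):

#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    // Approximate per-column sizes across all merged parts, in bytes.
    std::unordered_map<std::string, size_t> map{{"key", 100}, {"value", 900}};
    const size_t sum_total = 1000;    // sum of all column sizes
    const size_t num_total_rows = 50;

    // columnSize("value", 10, 50) = 900 / 50 * 10 = 180 bytes
    double column_size = static_cast<double>(map.at("value")) / num_total_rows * 10;

    // columnProgress = 180 / 1000 = 0.18 of the whole merge
    std::cout << column_size / sum_total << '\n';  // prints 0.18
}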
class MergeProgressCallback : public ProgressCallback
{
public:
MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {}
MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows,
const ColumnSizeEstimator & column_sizes)
: merge_entry(merge_entry_), merge_alg(merge_alg_)
{
if (merge_alg == MergeAlgorithm::Horizontal)
average_elem_progress = 1.0 / num_total_rows;
else
average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
}
MergeList::Entry & merge_entry;
const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical};
Float64 average_elem_progress;
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_with_key_columns_read += value.rows;
if (merge_alg == MergeAlgorithm::Horizontal)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
}
else
{
merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read;
}
};
};
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name)
: MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress)
{
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
}
Float64 initial_progress;
/// NOTE: not thread-safe (so that it remains copyable). That is OK in the current single-threaded use case
size_t rows_read_internal{0};
void operator() (const Progress & value)
{
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal;
merge_entry->progress = initial_progress + local_progress;
};
};
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
@ -306,49 +461,58 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
part->accumulateColumnSizes(merged_column_to_size);
}
Names column_names = data.getColumnNamesList();
NamesAndTypesList column_names_and_types = data.getColumnsList();
Names all_column_names = data.getColumnNamesList();
NamesAndTypesList all_column_names_and_types = data.getColumnsList();
SortDescription sort_desc = data.getSortDescription();
NamesAndTypesList ordinary_column_names_and_types, key_column_names_and_types;
Names ordinary_column_names, key_column_names;
extractOrdinaryAndKeyColumns(all_column_names_and_types, data.getPrimaryExpression(),
ordinary_column_names_and_types, ordinary_column_names,
key_column_names_and_types, key_column_names
);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
new_data_part->name = "tmp_" + merged_name;
new_data_part->is_temp = true;
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
MergedRowSources merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical)
? &merged_rows_sources : nullptr;
Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical)
? key_column_names : all_column_names;
NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlgorithm::Vertical)
? key_column_names_and_types : all_column_names_and_types;
ColumnSizeEstimator column_sizes(parts, key_column_names, ordinary_column_names);
/** Read from all the parts, merge them, and write into a new one.
* Along the way, compute the expression for sorting.
*/
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges{{0, parts[i]->size}};
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i],
MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
{
const auto new_rows_read = merge_entry->rows_read += value.rows;
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
});
input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes});
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
src_streams.emplace_back(std::move(input));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// The order of the streams is important: when keys match, elements go in the order of the source stream number.
@ -360,32 +524,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr);
break;
case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time_of_merge);
break;
@ -404,7 +568,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
data, new_part_tmp_path, main_column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -418,19 +582,86 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_written += block.rows();
to.write(block);
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
if (merge_alg == MergeAlgorithm::Horizontal)
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
/// This update is not accurate for the VERTICAL algorithm, since it requires more precise per-column updates.
/// Reservation updates are not performed yet; during the merge this may lead to higher free-space requirements.
if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
{
Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
disk_reservation->update(static_cast<size_t>((1. - relative_rows_written) * initial_reservation));
}
}
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
merge_entry->columns_written = key_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
BlockInputStreams column_part_streams(parts.size());
auto it_name_and_type = ordinary_column_names_and_types.cbegin();
for (size_t column_num = 0; column_num < ordinary_column_names.size(); ++column_num)
{
const String & column_name = ordinary_column_names[column_num];
Names column_name_(1, column_name);
NamesAndTypesList column_name_and_type_(1, *it_name_and_type++);
Float64 progress_before = merge_entry->progress;
LOG_TRACE(log, "Gathering column " << column_name << " " << column_name_and_type_.front().type->getName());
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
String part_path = data.getFullPath() + parts[part_num]->name + '/';
/// TODO: test performance with more accurate settings
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
false, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name});
column_part_streams[part_num] = std::move(column_part_stream);
}
ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method);
column_to.writePrefix();
while ((block = column_gathered_stream.read()))
{
column_to.write(block);
}
/// NOTE: a nested column contains duplicate checksums (and files)
checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
merge_entry->columns_written = key_column_names.size() + column_num;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
}
merged_stream->readSuffix();
new_data_part->columns = column_names_and_types;
new_data_part->checksums = to.writeSuffixAndGetChecksums();
new_data_part->columns = all_column_names_and_types;
if (merge_alg != MergeAlgorithm::Vertical)
new_data_part->checksums = to.writeSuffixAndGetChecksums();
else
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_column_names_and_types, &checksums_ordinary_columns);
new_data_part->index.swap(to.getIndex());
/// For convenience: even CollapsingSortedBlockInputStream cannot produce zero rows.
@ -446,6 +677,43 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
{
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
if (merge_alg == MergeAlgorithm::Vertical)
{
try
{
rows_sources_to_alloc.reserve(sum_rows_upper_bound);
}
catch (...)
{
/// Not enough memory for the VERTICAL merge algorithm; this makes sense for very large tables
merge_alg = MergeAlgorithm::Horizontal;
}
}
return merge_alg;
}
MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart(
MergeTreeData::DataPartsVector & parts,
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -222,6 +222,14 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s
files[file_name] = Checksum(file_size, file_hash);
}
void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums)
{
for (auto & checksum : rhs_checksums.files)
files[std::move(checksum.first)] = std::move(checksum.second);
rhs_checksums.files.clear();
}
/// A checksum over the set of checksums of the .bin files.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
{

View File

@ -11,6 +11,7 @@
#include <DB/Parsers/ASTSet.h>
#include <DB/Functions/FunctionFactory.h>
#include <DB/Core/FieldVisitors.h>
#include <DB/Interpreters/convertFieldToType.h>
namespace DB
@ -131,7 +132,7 @@ const PKCondition::AtomMap PKCondition::atom_map{
},
{
"in",
[] (RPNElement & out, const Field & value, ASTPtr & node)
[] (RPNElement & out, const Field &, ASTPtr & node)
{
out.function = RPNElement::FUNCTION_IN_SET;
out.in_function = node;
@ -140,7 +141,7 @@ const PKCondition::AtomMap PKCondition::atom_map{
},
{
"notIn",
[] (RPNElement & out, const Field & value, ASTPtr & node)
[] (RPNElement & out, const Field &, ASTPtr & node)
{
out.function = RPNElement::FUNCTION_NOT_IN_SET;
out.in_function = node;
@ -239,23 +240,26 @@ bool PKCondition::addCondition(const String & column, const Range & range)
return true;
}
/** Get the value of a constant expression.
* Return false if the expression is not constant.
/** Computes the value of a constant expression and its data type.
* Returns false if the expression isn't constant.
*/
static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & value)
static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & out_value, DataTypePtr & out_type)
{
String column_name = expr->getColumnName();
if (const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(&*expr))
if (const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(expr.get()))
{
/// literal
value = lit->value;
/// Simple literal
out_value = lit->value;
out_type = block_with_constants.getByName(column_name).type;
return true;
}
else if (block_with_constants.has(column_name) && block_with_constants.getByName(column_name).column->isConst())
{
/// an expression that evaluated to a constant
value = (*block_with_constants.getByName(column_name).column)[0];
/// An expression which is dependent on constants only
const auto & expr_info = block_with_constants.getByName(column_name);
out_value = (*expr_info.column)[0];
out_type = expr_info.type;
return true;
}
else
@ -362,46 +366,23 @@ bool PKCondition::isPrimaryKeyPossiblyWrappedByMonotonicFunctionsImpl(
}
/// NOTE: Keep in mind that such behavior could be incompatible inside an ordinary expression.
/// TODO: Use common methods for type conversions.
static bool tryCastValueToType(const DataTypePtr & desired_type, const DataTypePtr & src_type, Field & src_value)
static void castValueToType(const DataTypePtr & desired_type, Field & src_value, const DataTypePtr & src_type, const ASTPtr & node)
{
if (desired_type->getName() == src_type->getName())
return true;
return;
/// Try to correct the type of the constant for a correct comparison
try
{
/// Convert String to Enum's value
if (auto data_type_enum = dynamic_cast<const IDataTypeEnum *>(desired_type.get()))
{
src_value = data_type_enum->castToValue(src_value);
}
/// Convert 'YYYY-MM-DD' Strings to Date
else if (typeid_cast<const DataTypeDate *>(desired_type.get()) && typeid_cast<const DataTypeString *>(src_type.get()))
{
src_value = UInt64(stringToDate(src_value.safeGet<String>()));
}
/// Convert 'YYYY-MM-DD hh:mm:ss' Strings to DateTime
else if (typeid_cast<const DataTypeDateTime *>(desired_type.get()) && typeid_cast<const DataTypeString *>(src_type.get()))
{
src_value = stringToDateTime(src_value.safeGet<String>());
}
else if (desired_type->behavesAsNumber() && src_type->behavesAsNumber())
{
/// Ok, numeric types are almost mutually convertible
}
else
{
return false;
}
/// NOTE: We don't need accurate info about src_type at this moment
src_value = convertFieldToType(src_value, *desired_type);
}
catch (...)
{
return false;
throw Exception("Primary key expression contains comparison between inconvertible types: " +
desired_type->getName() + " and " + src_type->getName() +
" inside " + DB::toString(node->range),
ErrorCodes::BAD_TYPE_OF_FIELD);
}
return true;
}
@ -411,8 +392,9 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
* or the same thing wrapped in a chain of possibly-monotonic functions,
* or a constant expression - a number.
*/
Field value;
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(&*node))
Field const_value;
DataTypePtr const_type;
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get()))
{
const ASTs & args = typeid_cast<const ASTExpressionList &>(*func->arguments).children;
@ -423,13 +405,14 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
size_t key_arg_pos; /// Position of argument with primary key column (non-const argument)
size_t key_column_num; /// Number of a primary key column (inside sort_descr array)
RPNElement::MonotonicFunctionsChain chain;
bool is_set_const = false;
if (getConstant(args[1], block_with_constants, value)
if (getConstant(args[1], block_with_constants, const_value, const_type)
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 0;
}
else if (getConstant(args[0], block_with_constants, value)
else if (getConstant(args[0], block_with_constants, const_value, const_type)
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 1;
@ -438,6 +421,7 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
&& isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 0;
is_set_const = true;
}
else
return false;
@ -469,26 +453,19 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl
if (atom_it == std::end(atom_map))
return false;
const DataTypePtr & const_type = block_with_constants.getByName(args[1 - key_arg_pos]->getColumnName()).type;
if (!is_set_const) /// Set args are already cast inside Set::createFromAST
castValueToType(key_expr_type, const_value, const_type, node);
if (!tryCastValueToType(key_expr_type, const_type, value))
{
throw Exception("Primary key expression contains comparison between inconvertible types: " +
key_expr_type->getName() + " and " + const_type->getName() +
" inside " + DB::toString(func->range),
ErrorCodes::BAD_TYPE_OF_FIELD);
}
return atom_it->second(out, value, node);
return atom_it->second(out, const_value, node);
}
else if (getConstant(node, block_with_constants, const_value, const_type)) /// For cases when, for example, it is written: WHERE 0 AND something
else if (getConstant(node, block_with_constants, const_value, const_type)) /// Для случаев, когда написано, например, WHERE 0 AND something
{
if (value.getType() == Field::Types::UInt64
|| value.getType() == Field::Types::Int64
|| value.getType() == Field::Types::Float64)
if (const_value.getType() == Field::Types::UInt64
|| const_value.getType() == Field::Types::Int64
|| const_value.getType() == Field::Types::Float64)
{
/// Zero is represented in memory in all types the same way as in UInt64.
out.function = value.get<UInt64>()
out.function = const_value.get<UInt64>()
? RPNElement::ALWAYS_TRUE
: RPNElement::ALWAYS_FALSE;

View File

@ -691,7 +691,7 @@ BlockInputStreams StorageLog::read(
max_block_size,
column_names,
*this,
0, std::numeric_limits<size_t>::max(),
0, marksCount() ? std::numeric_limits<size_t>::max() : 0,
settings.max_read_buffer_size));
}
else

View File

@ -24,7 +24,10 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "bytes_read_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() }
{ "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
}
{
}
@ -58,13 +61,16 @@ BlockInputStreams StorageSystemMerges::read(
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
for (const auto & merge : context.getMergeList().get())
{
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_elapsed.column->insert(merge.watch.elapsedSeconds());
col_progress.column->insert(merge.progress);
col_progress.column->insert(std::min(1., merge.progress)); /// a little cheat
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
@ -73,6 +79,9 @@ BlockInputStreams StorageSystemMerges::read(
col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed));
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
}
Block block{
@ -87,7 +96,10 @@ BlockInputStreams StorageSystemMerges::read(
col_bytes_read_uncompressed,
col_rows_read,
col_bytes_written_uncompressed,
col_rows_written
col_rows_written,
col_columns_written,
col_rows_with_key_columns_read,
col_rows_with_key_columns_written
};
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};

View File

@ -20,5 +20,7 @@
3447905173014179293
3051197876967004596
3051197876967004596
3051197876967004596
3051197876967004596
463667963421364848
463667963421364848

View File

@ -35,5 +35,8 @@ SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE 1 = 1;
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x = '0' OR x = '1');
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d = '0' OR d = '1');
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x IN ('0', '1');
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d IN ('0', '1');
SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x != '0' AND x != '1');
SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d != '0' AND d != '1');