Merge branch 'METR-23305' of git://github.com/ludv1x/ClickHouse into ludv1x-METR-23305

Conflicts:
	contrib/CMakeLists.txt
This commit is contained in:
Alexey Milovidov 2016-12-02 23:22:11 +03:00
commit 4110b6865f
30 changed files with 835 additions and 114 deletions

View File

@ -184,6 +184,12 @@ find_library (BOOST_THREAD_LIB libboost_thread.a HINTS ${BOOST_HINTS})
set (LTDL_HINTS "/usr/local/opt/libtool/lib")
find_library (LTDL_LIB libltdl.a HINTS ${LTDL_HINTS})
# 5. tcmalloc
if(NOT DEFINED DISABLE_LIBTCMALLOC)
set(DISABLE_LIBTCMALLOC $ENV{DISABLE_LIBTCMALLOC} CACHE STRING "Don't use libtcmalloc" FORCE)
set(DEBUG_LIBTCMALLOC $ENV{DEBUG_LIBTCMALLOC} CACHE STRING "Use debug version of libtcmalloc" FORCE)
endif()
# Directory for Yandex specific files
SET(CLICKHOUSE_PRIVATE_DIR ${ClickHouse_SOURCE_DIR}/private/)

View File

@ -10,7 +10,7 @@ add_subdirectory (libpoco)
add_subdirectory (libre2)
add_subdirectory (libzookeeper)
if (NOT DEFINED $ENV{DISABLE_LIBTCMALLOC})
if (NOT DISABLE_LIBTCMALLOC)
add_subdirectory (libtcmalloc)
endif()

View File

@ -10,6 +10,8 @@ add_definitions(
include_directories (include src)
message(STATUS "Building: tcmalloc_minimal_internal")
add_library (tcmalloc_minimal_internal
./src/malloc_hook.cc
./src/base/spinlock_internal.cc

View File

@ -22,7 +22,7 @@ else()
set (LINK_MONGOCLIENT libmongoclient.a ${OPENSSL_LIBS} ${BOOST_THREAD_LIB})
endif()
if ($ENV{DISABLE_LIBTCMALLOC})
if (DISABLE_LIBTCMALLOC)
add_definitions(-D NO_TCMALLOC)
endif()
@ -298,6 +298,7 @@ add_library (dbms
include/DB/DataStreams/SquashingTransform.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataStreams/SquashingBlockOutputStream.h
include/DB/DataStreams/ColumnGathererStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
include/DB/DataTypes/DataTypeSet.h
@ -808,6 +809,7 @@ add_library (dbms
src/DataStreams/SquashingTransform.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataStreams/SquashingBlockOutputStream.cpp
src/DataStreams/ColumnGathererStream.cpp
src/DataTypes/DataTypeString.cpp
src/DataTypes/DataTypeFixedString.cpp

View File

@ -35,9 +35,9 @@ struct SortColumnDescription
using SortDescription = std::vector<SortColumnDescription>;
/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках.
* Курсор двигается по одному блоку.
* Для использования в priority queue.
/** Cursor allows to compare rows in different blocks (and parts).
* Cursor moves inside single block.
* It is used in priority queue.
*/
struct SortCursorImpl
{
@ -48,14 +48,17 @@ struct SortCursorImpl
size_t pos = 0;
size_t rows = 0;
/** Порядок (что сравнивается), если сравниваемые столбцы равны.
* Даёт возможность предпочитать строки из нужного курсора.
/** Determines order if comparing columns are equal.
* Order is determined by number of cursor.
*
* Cursor number (always?) equals to number of merging part.
* Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
*/
size_t order;
using NeedCollationFlags = std::vector<UInt8>;
/** Нужно ли использовать Collator для сортировки столбца */
/** Should we use Collator to sort a column? */
NeedCollationFlags need_collation;
/** Есть ли хотя бы один столбец с Collator. */

View File

@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_),
sign_column(sign_column_)
{
}
@ -62,7 +62,7 @@ private:
/// Прочитали до конца.
bool finished = false;
RowRef current_key; /// Текущий первичный ключ.
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
RowRef first_negative; /// Первая отрицательная строка для текущего первичного ключа.
@ -77,6 +77,12 @@ private:
size_t blocks_written = 0;
/// Fields specific for VERTICAL merge algorithm
size_t current_pos = 0; /// Global row number of current key
size_t first_negative_pos = 0; /// Global row number of first_negative
size_t last_positive_pos = 0; /// Global row number of last_positive
size_t last_negative_pos = 0; /// Global row number of last_negative
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/

View File

@ -0,0 +1,104 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Storages/IStorage.h>
#include <DB/Common/PODArray.h>
namespace DB
{
/// Tiny struct, stores number of a Part from which current row was fetched, and insertion flag.
struct RowSourcePart
{
RowSourcePart() = default;
RowSourcePart(size_t source_num, bool flag = false)
{
static_assert(sizeof(*this) == 1, "Size of RowSourcePart is too big due to compiler settings");
setSourceNum(source_num);
setSkipFlag(flag);
}
/// is equal to getSourceNum() if flag is false
size_t getData() const { return data; }
size_t getSourceNum()const { return data & MASK_NUMBER; }
/// In CollapsingMergeTree case flag means "skip this rows"
bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
void setSourceNum(size_t source_num)
{
data = (data & MASK_FLAG) | (static_cast<UInt8>(source_num) & MASK_NUMBER);
}
void setSkipFlag(bool flag)
{
data = flag ? data | MASK_FLAG : data & ~MASK_FLAG;
}
static constexpr size_t MAX_PARTS = 0x7F;
static constexpr UInt8 MASK_NUMBER = 0x7F;
static constexpr UInt8 MASK_FLAG = 0x80;
private:
UInt8 data;
};
using MergedRowSources = PODArray<RowSourcePart>;
/** Gather single stream from multiple streams according to streams mask.
* Stream mask maps row number to index of source stream.
* Streams should conatin exactly one column.
*/
class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_ = DEFAULT_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
String getID() const override;
Block readImpl() override;
private:
String name;
ColumnWithTypeAndName column;
const MergedRowSources & pos_to_source_idx;
/// Cache required fileds
struct Source
{
const IColumn * column;
size_t pos;
size_t size;
Block block;
Source(Block && block_, const String & name) : block(std::move(block_))
{
update(name);
}
void update(const String & name)
{
column = block.getByName(name).column.get();
size = block.rows();
pos = 0;
}
};
std::vector<Source> sources;
size_t pos_global = 0;
size_t block_size;
Logger * log = &Logger::get("ColumnGathererStream");
};
}

View File

@ -9,6 +9,7 @@
#include <DB/Core/SortDescription.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
@ -56,10 +57,12 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке.
MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
/// limit - if isn't 0, then we can produce only first limit rows in sorted order.
/// out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each readed row (and needed flag)
MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
: description(description_), max_block_size(max_block_size_), limit(limit_),
source_blocks(inputs_.size()), cursors(inputs_.size())
source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -158,6 +161,10 @@ protected:
using QueueWithCollation = std::priority_queue<SortCursorWithCollation>;
QueueWithCollation queue_with_collation;
/// Used in Vertical merge algorithm to gather non-PK columns (on next step)
/// If it is not nullptr then it should be populated during execution
MergedRowSources * out_row_sources = nullptr;
/// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах.

View File

@ -32,10 +32,21 @@ struct MergeInfo
UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{};
std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> bytes_written_uncompressed{};
/// Updated only for Horizontal algorithm
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> rows_written{};
/// Updated only for Vertical algorithm
/// mutually exclusive with rows_read and rows_written, updated either rows_written either columns_written
std::atomic<UInt64> columns_written{};
/// Updated in both cases
/// Number of rows for which primary key columns have been written
std::atomic<UInt64> rows_with_key_columns_read{};
std::atomic<UInt64> rows_with_key_columns_written{};
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
: database{database}, table{table}, result_part_name{result_part_name}
@ -52,9 +63,12 @@ struct MergeInfo
total_size_bytes_compressed(other.total_size_bytes_compressed),
total_size_marks(other.total_size_marks),
bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed))
rows_read(other.rows_read.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed)),
columns_written(other.columns_written.load(std::memory_order_relaxed)),
rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
{
}
};

View File

@ -24,7 +24,7 @@ public:
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_);
bool save_marks_in_cache_, bool quiet = false);
~MergeTreeBlockInputStream() override;

View File

@ -55,11 +55,11 @@ namespace ErrorCodes
* Структура файлов:
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
* Внутри директории с куском:
* checksums.txt - список файлов с их размерами и контрольными суммами.
* columns.txt - список столбцов с их типами.
* checksums.txt - содержит список файлов с их размерами и контрольными суммами.
* columns.txt - содержит список столбцов с их типами.
* primary.idx - индексный файл.
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
* [Column].bin - данные столбца
* [Column].mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
*
* Имеется несколько режимов работы, определяющих, что делать при мердже:
* - Ordinary - ничего дополнительно не делать;

View File

@ -9,6 +9,7 @@ namespace DB
{
class MergeListEntry;
class MergeProgressCallback;
struct ReshardingJob;
@ -125,6 +126,19 @@ public:
bool isCancelled() const { return cancelled > 0; }
public:
enum class MergeAlgorithm
{
Horizontal, /// per-row merge of all columns
Vertical /// per-row merge of PK columns, per-column gather for non-PK columns
};
private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const;
private:
MergeTreeData & data;
const BackgroundProcessingPool & pool;

View File

@ -46,6 +46,8 @@ struct MergeTreeDataPartChecksums
void addFile(const String & file_name, size_t file_size, uint128 file_hash);
void add(MergeTreeDataPartChecksums && rhs_checksums);
/// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение.
/// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов.
void checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const;

View File

@ -62,6 +62,7 @@ private:
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
size_t max_mark_range;
bool is_empty = false;
Stream(
const String & path_prefix_, UncompressedCache * uncompressed_cache,
@ -69,9 +70,14 @@ private:
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
static std::unique_ptr<Stream> createEmptyPtr();
void loadMarks(MarkCache * cache, bool save_in_cache);
void seekToMark(size_t index);
private:
Stream() = default;
};
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;

View File

@ -90,6 +90,9 @@ struct MergeTreeSettings
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 0;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@ -124,6 +127,7 @@ struct MergeTreeSettings
SET_SIZE_T(min_relative_delay_to_yield_leadership);
SET_SIZE_T(min_relative_delay_to_close);
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
#undef SET_SIZE_T
#undef SET_DOUBLE

View File

@ -11,6 +11,8 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Columns/ColumnArray.h>
namespace DB
{
@ -33,6 +35,7 @@ public:
{
}
protected:
using OffsetColumns = std::set<std::string>;
@ -94,7 +97,8 @@ protected:
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size = 0, size_t level = 0, String filename = "")
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size = 0,
size_t level = 0, String filename = "", bool skip_offsets = false)
{
String escaped_column_name;
if (filename.size())
@ -105,23 +109,27 @@ protected:
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!skip_offsets)
{
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
column_streams[size_name] = std::make_unique<ColumnStream>(
escaped_size_name,
path + escaped_size_name + ".bin",
path + escaped_size_name + ".mrk",
max_compress_block_size,
compression_method,
estimated_size,
aio_threshold);
column_streams[size_name] = std::make_unique<ColumnStream>(
escaped_size_name,
path + escaped_size_name + ".bin",
path + escaped_size_name + ".mrk",
max_compress_block_size,
compression_method,
estimated_size,
aio_threshold);
}
addStream(path, name, *type_arr->getNestedType(), estimated_size, level + 1);
}
else
{
column_streams[name] = std::make_unique<ColumnStream>(
escaped_column_name,
path + escaped_column_name + ".bin",
@ -130,16 +138,19 @@ protected:
compression_method,
estimated_size,
aio_threshold);
}
}
/// Записать данные одного столбца.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level = 0)
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
size_t level = 0, bool skip_offsets = false)
{
size_t size = column.size();
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type);
if (!skip_offsets && type_arr)
{
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
@ -306,11 +317,16 @@ public:
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(
const NamesAndTypesList & total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr)
{
/// Заканчиваем запись и достаем чексуммы.
MergeTreeData::DataPart::Checksums checksums;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{
index_stream->next();
@ -319,10 +335,10 @@ public:
index_stream = nullptr;
}
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
for (auto & column_stream : column_streams)
{
it->second->finalize();
it->second->addToChecksums(checksums);
column_stream.second->finalize();
column_stream.second->addToChecksums(checksums);
}
column_streams.clear();
@ -338,7 +354,7 @@ public:
{
/// Записываем файл с описанием столбцов.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
columns_list.writeText(out);
total_column_list.writeText(out);
}
{
@ -350,6 +366,11 @@ public:
return checksums;
}
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
return writeSuffixAndGetChecksums(columns_list, nullptr);
}
MergeTreeData::DataPart::Index & getIndex()
{
return index_columns;
@ -488,12 +509,13 @@ private:
class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
{
public:
MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method)
MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method,
bool skip_offsets_ = false)
: IMergedBlockOutputStream(
storage_, storage_.context.getSettings().min_compress_block_size,
storage_.context.getSettings().max_compress_block_size, compression_method,
storage_.context.getSettings().min_bytes_to_use_direct_io),
part_path(part_path_), sync(sync_)
part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_)
{
}
@ -505,7 +527,7 @@ public:
for (size_t i = 0; i < block.columns(); ++i)
{
addStream(part_path, block.getByPosition(i).name,
*block.getByPosition(i).type, 0, 0, block.getByPosition(i).name);
*block.getByPosition(i).type, 0, 0, block.getByPosition(i).name, skip_offsets);
}
initialized = true;
}
@ -516,7 +538,7 @@ public:
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.getByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns);
writeData(column.name, *column.type, *column.column, offset_columns, 0, skip_offsets);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
@ -537,8 +559,7 @@ public:
column_stream.second->finalize();
if (sync)
column_stream.second->sync();
std::string column = escapeForFileName(column_stream.first);
column_stream.second->addToChecksums(checksums, column);
column_stream.second->addToChecksums(checksums);
}
column_streams.clear();
@ -552,6 +573,7 @@ private:
bool initialized = false;
bool sync;
bool skip_offsets;
};
}

View File

@ -55,6 +55,13 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
if (out_row_sources)
{
/// true flag value means "skip row"
out_row_sources->data()[last_positive_pos].setSkipFlag(false);
out_row_sources->data()[last_negative_pos].setSkipFlag(false);
}
}
return;
}
@ -64,6 +71,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
if (out_row_sources)
out_row_sources->data()[first_negative_pos].setSkipFlag(false);
}
if (count_positive >= count_negative)
@ -71,6 +81,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
if (out_row_sources)
out_row_sources->data()[last_positive_pos].setSkipFlag(false);
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
@ -123,7 +136,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
size_t merged_rows = 0;
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
for (; !queue.empty(); ++current_pos)
{
TSortCursor current = queue.top();
@ -149,6 +162,10 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
queue.pop();
/// Initially, skip all rows. On insert, unskip "corner" rows.
if (out_row_sources)
out_row_sources->emplace_back(current.impl->order, true);
if (key_differs)
{
/// Запишем данные для предыдущего первичного ключа.
@ -166,13 +183,21 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
last_is_positive = true;
setRowRef(last_positive, current);
last_positive_pos = current_pos;
}
else if (sign == -1)
{
if (!count_negative)
{
setRowRef(first_negative, current);
first_negative_pos = current_pos;
}
if (!blocks_written && !merged_rows)
{
setRowRef(last_negative, current);
last_negative_pos = current_pos;
}
++count_negative;
last_is_positive = false;

View File

@ -0,0 +1,111 @@
#include <DB/DataStreams/ColumnGathererStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & pos_to_source_idx_, size_t block_size_)
: name(column_name_), pos_to_source_idx(pos_to_source_idx_), block_size(block_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
sources.reserve(children.size());
for (size_t i = 0; i < children.size(); i++)
{
sources.emplace_back(children[i]->read(), name);
Block & block = sources.back().block;
/// Sometimes MergeTreeReader injects additional column with partitioning key
if (block.columns() > 2 || !block.has(name))
throw Exception("Block should have 1 or 2 columns and contain column with requested name", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (i == 0)
{
column.name = name;
column.type = block.getByName(name).type->clone();
column.column = column.type->createColumn();
}
if (block.getByName(name).column->getName() != column.column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
String ColumnGathererStream::getID() const
{
std::stringstream res;
res << getName() << "(";
for (size_t i = 0; i < children.size(); i++)
res << (i == 0 ? "" : ", " ) << children[i]->getID();
res << ")";
return res.str();
}
Block ColumnGathererStream::readImpl()
{
if (children.size() == 1)
return children[0]->read();
if (pos_global >= pos_to_source_idx.size())
return Block();
Block block_res{column.cloneEmpty()};
IColumn & column_res = *block_res.unsafeGetByPosition(0).column;
size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size());
column_res.reserve(pos_finish - pos_global);
for (size_t pos = pos_global; pos < pos_finish; ++pos)
{
auto source_id = pos_to_source_idx[pos].getSourceNum();
bool skip = pos_to_source_idx[pos].getSkipFlag();
Source & source = sources[source_id];
if (source.pos >= source.size) /// Fetch new block
{
try
{
source.block = children[source_id]->read();
source.update(name);
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_id]->getID() + ", part " + toString(source_id),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
if (!skip)
column_res.insertFrom(*source.column, source.pos); //TODO: vectorize
++source.pos;
}
pos_global = pos_finish;
return block_res;
}
}

View File

@ -195,13 +195,10 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
return;
}
size_t source_num = 0;
size_t size = cursors.size();
for (; source_num < size; ++source_num)
if (&cursors[source_num] == current.impl)
break;
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
if (source_num == size)
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
@ -224,6 +221,9 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
finished = true;
}
if (out_row_sources)
out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num));
// std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
@ -236,6 +236,12 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
if (out_row_sources)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
out_row_sources->emplace_back(current.impl->order);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";

View File

@ -21,7 +21,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
bool save_marks_in_cache_)
bool save_marks_in_cache_, bool quiet)
:
path(path_), block_size(block_size_),
storage(storage_), owned_data_part(owned_data_part_),
@ -97,6 +97,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, ///
total_rows += range.end - range.begin;
total_rows *= storage.index_granularity;
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1

View File

@ -16,11 +16,13 @@
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Common/Increment.h>
#include <DB/Common/interpolate.h>
#include <cmath>
#include <numeric>
namespace ProfileEvents
@ -42,6 +44,10 @@ namespace ErrorCodes
extern const int ABORTED;
}
using MergeAlgorithm = MergeTreeDataMerger::MergeAlgorithm;
namespace
{
@ -283,7 +289,159 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition(
}
/// parts должны быть отсортированы.
/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions,
const MergeTreeData::MergingParams & merging_params,
NamesAndTypesList & gathering_columns, Names & gathering_column_names,
NamesAndTypesList & merging_columns, Names & merging_column_names
)
{
Names key_columns_dup = primary_key_expressions->getRequiredColumns();
std::set<String> key_columns(key_columns_dup.cbegin(), key_columns_dup.cend());
/// Force sign column for Collapsing mode
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
key_columns.emplace(merging_params.sign_column);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (auto & column : all_columns)
{
auto it = std::find(key_columns.cbegin(), key_columns.cend(), column.name);
if (key_columns.end() == it)
{
gathering_columns.emplace_back(column);
gathering_column_names.emplace_back(column.name);
}
else
{
merging_columns.emplace_back(column);
merging_column_names.emplace_back(column.name);
}
}
}
/* Allow to compute more accurate progress statistics */
class ColumnSizeEstimator
{
MergeTreeData::DataPart::ColumnToSize map;
public:
/// Stores approximate size of columns in bytes
/// Exact values are not required since it used for relative values estimation (progress).
size_t sum_total = 0;
size_t sum_index_columns = 0;
size_t sum_ordinary_columns = 0;
ColumnSizeEstimator(const MergeTreeData::DataPart::ColumnToSize & map_, const Names & key_columns, const Names & ordinary_columns)
: map(map_)
{
for (const auto & name : key_columns)
if (!map.count(name)) map[name] = 0;
for (const auto & name : ordinary_columns)
if (!map.count(name)) map[name] = 0;
for (const auto & name : key_columns)
sum_index_columns += map.at(name);
for (const auto & name : ordinary_columns)
sum_ordinary_columns += map.at(name);
sum_total = std::max(1UL, sum_index_columns + sum_ordinary_columns);
}
/// Approximate size of num_rows column elements if column contains num_total_rows elements
Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(map.at(column)) / num_total_rows * num_rows;
}
/// Relative size of num_rows column elements (in comparison with overall size of all columns) if column contains num_total_rows elements
Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const
{
return columnSize(column, num_rows, num_total_rows) / sum_total;
}
/// Like columnSize, but takes into account only PK columns
Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const
{
return static_cast<Float64>(sum_index_columns) / num_total_rows * num_rows;
}
/// Like columnProgress, but takes into account only PK columns
Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const
{
return keyColumnsSize(num_rows, num_total_rows) / sum_total;
}
};
class MergeProgressCallback : public ProgressCallback
{
public:
MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {}
MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows,
const ColumnSizeEstimator & column_sizes)
: merge_entry(merge_entry_), merge_alg(merge_alg_)
{
if (merge_alg == MergeAlgorithm::Horizontal)
average_elem_progress = 1.0 / num_total_rows;
else
average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
}
MergeList::Entry & merge_entry;
const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical};
Float64 average_elem_progress;
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
merge_entry->bytes_read_uncompressed += value.bytes;
merge_entry->rows_with_key_columns_read += value.rows;
if (merge_alg == MergeAlgorithm::Horizontal)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
merge_entry->rows_read += value.rows;
merge_entry->progress = average_elem_progress * merge_entry->rows_read;
}
else
{
merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read;
}
};
};
class MergeProgressCallbackVerticalStep : public MergeProgressCallback
{
public:
MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
const ColumnSizeEstimator & column_sizes, const String & column_name)
: MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress)
{
average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
}
Float64 initial_progress;
/// NOTE: not thread safe (to be copyable). It is OK in current single thread use case
size_t rows_read_internal{0};
void operator() (const Progress & value)
{
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
rows_read_internal += value.rows;
Float64 local_progress = average_elem_progress * rows_read_internal;
merge_entry->progress = initial_progress + local_progress;
};
};
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
@ -308,55 +466,62 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
if (aio_threshold > 0)
{
for (const MergeTreeData::DataPartPtr & part : parts)
part->accumulateColumnSizes(merged_column_to_size);
}
for (const MergeTreeData::DataPartPtr & part : parts)
part->accumulateColumnSizes(merged_column_to_size);
Names column_names = data.getColumnNamesList();
NamesAndTypesList column_names_and_types = data.getColumnsList();
Names all_column_names = data.getColumnNamesList();
NamesAndTypesList all_columns = data.getColumnsList();
SortDescription sort_desc = data.getSortDescription();
NamesAndTypesList gathering_columns, merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(all_columns, data.getPrimaryExpression(), data.merging_params,
gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
new_data_part->name = "tmp_" + merged_name;
new_data_part->is_temp = true;
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
MergedRowSources merged_rows_sources;
MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
if (merge_alg != MergeAlgorithm::Vertical)
{
merged_rows_sources_ptr = nullptr;
merging_columns = all_columns;
merging_column_names = all_column_names;
gathering_columns.clear();
gathering_column_names.clear();
}
ColumnSizeEstimator column_sizes(merged_column_to_size, merging_column_names, gathering_column_names);
/** Читаем из всех кусков, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
*/
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges{{0, parts[i]->size}};
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
part_path, DEFAULT_MERGE_BLOCK_SIZE, merging_column_names, data, parts[i],
MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
{
const auto new_rows_read = merge_entry->rows_read += value.rows;
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
merge_entry->bytes_read_uncompressed += value.bytes;
ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
});
input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes});
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
src_streams.emplace_back(std::move(input));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
@ -368,32 +533,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr);
break;
case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Graphite:
merged_stream = std::make_unique<GraphiteRollupSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
data.merging_params.graphite_params, time_of_merge);
break;
@ -412,7 +577,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
data, new_part_tmp_path, merging_columns, compression_method, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -426,19 +591,94 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
rows_written += block.rows();
to.write(block);
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
if (merge_alg == MergeAlgorithm::Horizontal)
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
/// This update is unactual for VERTICAL algorithm sicne it requires more accurate per-column updates
/// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements
if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
{
Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
disk_reservation->update(static_cast<size_t>((1. - relative_rows_written) * initial_reservation));
}
}
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
/// Gather ordinary columns
if (merge_alg == MergeAlgorithm::Vertical)
{
size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
merge_entry->columns_written = merging_column_names.size();
merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
BlockInputStreams column_part_streams(parts.size());
NameSet offset_columns_written;
auto it_name_and_type = gathering_columns.cbegin();
for (size_t column_num = 0; column_num < gathering_column_names.size(); ++column_num, it_name_and_type++)
{
const String & column_name = it_name_and_type->name;
const DataTypePtr & column_type = it_name_and_type->type;
const String offset_column_name = DataTypeNested::extractNestedTableName(column_name);
Names column_name_(1, column_name);
NamesAndTypesList column_name_and_type_(1, *it_name_and_type);
Float64 progress_before = merge_entry->progress;
bool offset_written = offset_columns_written.count(offset_column_name);
LOG_TRACE(log, "Gathering column " << column_name << " " << column_type->getName());
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
String part_path = data.getFullPath() + parts[part_num]->name + '/';
/// TODO: test perfomance with more accurate settings
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
false, true);
column_part_stream->setProgressCallback(
MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name});
column_part_streams[part_num] = std::move(column_part_stream);
}
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method, offset_written);
column_to.writePrefix();
while ((block = column_gathered_stream.read()))
{
column_to.write(block);
}
/// NOTE: nested column contains duplicates checksums (and files)
checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
if (typeid_cast<const DataTypeArray *>(column_type.get()))
offset_columns_written.emplace(offset_column_name);
merge_entry->columns_written = merging_column_names.size() + column_num;
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
if (isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
}
merged_stream->readSuffix();
new_data_part->columns = column_names_and_types;
new_data_part->checksums = to.writeSuffixAndGetChecksums();
new_data_part->columns = all_columns;
if (merge_alg != MergeAlgorithm::Vertical)
new_data_part->checksums = to.writeSuffixAndGetChecksums();
else
new_data_part->checksums = to.writeSuffixAndGetChecksums(all_columns, &checksums_ordinary_columns);
new_data_part->index.swap(to.getIndex());
/// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк.
@ -454,6 +694,43 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
}
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
{
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
bool is_supported_storage =
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE;
bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
if (merge_alg == MergeAlgorithm::Vertical)
{
try
{
rows_sources_to_alloc.reserve(sum_rows_upper_bound);
}
catch (...)
{
/// Not enough memory for VERTICAL merge algorithm, make sense for very large tables
merge_alg = MergeAlgorithm::Horizontal;
}
}
return merge_alg;
}
MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart(
MergeTreeData::DataPartsVector & parts,
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -222,6 +222,14 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s
files[file_name] = Checksum(file_size, file_hash);
}
void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums)
{
for (auto & checksum : rhs_checksums.files)
files[std::move(checksum.first)] = std::move(checksum.second);
rhs_checksums.files.clear();
}
/// Контрольная сумма от множества контрольных сумм .bin файлов.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
{

View File

@ -215,6 +215,13 @@ MergeTreeReader::Stream::Stream(
}
}
std::unique_ptr<MergeTreeReader::Stream> MergeTreeReader::Stream::createEmptyPtr()
{
std::unique_ptr<Stream> res(new Stream);
res->is_empty = true;
return res;
}
void MergeTreeReader::Stream::loadMarks(MarkCache * cache, bool save_in_cache)
{
@ -285,26 +292,39 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
{
String escaped_column_name = escapeForFileName(name);
/** Если файла с данными нет - то не будем пытаться открыть его.
* Это нужно, чтобы можно было добавлять новые столбцы к структуре таблицы без создания файлов для старых кусков.
*/
if (!Poco::File(path + escaped_column_name + ".bin").exists())
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type);
bool data_file_exists = Poco::File(path + escaped_column_name + ".bin").exists();
bool is_column_of_nested_type = type_arr && level == 0 && DataTypeNested::extractNestedTableName(name) != name;
/** If data file is missing then we will not try to open it.
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
* But we should try to load offset data for array columns of Nested subtable (their data will be filled by default value).
*/
if (!data_file_exists && !is_column_of_nested_type)
return;
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type_arr)
{
String size_name = DataTypeNested::extractNestedTableName(name)
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_path = path + escaped_size_name + ".bin";
/// We don't have neither offsets neither data -> skipping, default values will be filled after
if (!data_file_exists && !Poco::File(size_path).exists())
return;
if (!streams.count(size_name))
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, uncompressed_cache, mark_cache, save_marks_in_cache,
all_mark_ranges, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1);
if (data_file_exists)
addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1);
else
streams.emplace(name, Stream::createEmptyPtr());
}
else
streams.emplace(name, std::make_unique<Stream>(
@ -344,10 +364,11 @@ void MergeTreeReader::readData(const String & name, const IDataType & type, ICol
required_internal_size - array.getData().size(),
level + 1);
size_t read_internal_size = array.getData().size();
/** Исправление для ошибочно записанных пустых файлов с данными массива.
* Такое бывает после ALTER с добавлением новых столбцов во вложенную структуру данных.
*/
size_t read_internal_size = array.getData().size();
if (required_internal_size != read_internal_size)
{
if (read_internal_size != 0)
@ -369,6 +390,11 @@ void MergeTreeReader::readData(const String & name, const IDataType & type, ICol
else
{
Stream & stream = *streams[name];
/// It means that data column of array column will be empty, and it will be replaced by const data column
if (stream.is_empty)
return;
double & avg_value_size_hint = avg_value_size_hints[name];
stream.seekToMark(from_mark);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);

View File

@ -691,7 +691,7 @@ BlockInputStreams StorageLog::read(
max_block_size,
column_names,
*this,
0, std::numeric_limits<size_t>::max(),
0, marksCount() ? std::numeric_limits<size_t>::max() : 0,
settings.max_read_buffer_size));
}
else

View File

@ -24,7 +24,10 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name)
{ "bytes_read_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() },
{ "rows_written", std::make_shared<DataTypeUInt64>() }
{ "rows_written", std::make_shared<DataTypeUInt64>() },
{ "columns_written", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() },
{ "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }
}
{
}
@ -58,13 +61,16 @@ BlockInputStreams StorageSystemMerges::read(
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"};
ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"};
ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"};
ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"};
ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"};
for (const auto & merge : context.getMergeList().get())
{
col_database.column->insert(merge.database);
col_table.column->insert(merge.table);
col_elapsed.column->insert(merge.watch.elapsedSeconds());
col_progress.column->insert(merge.progress);
col_progress.column->insert(std::min(1., merge.progress)); /// little cheat
col_num_parts.column->insert(merge.num_parts);
col_result_part_name.column->insert(merge.result_part_name);
col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed);
@ -73,6 +79,9 @@ BlockInputStreams StorageSystemMerges::read(
col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed));
col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed));
col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed));
col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed));
col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed));
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed));
}
Block block{
@ -87,7 +96,10 @@ BlockInputStreams StorageSystemMerges::read(
col_bytes_read_uncompressed,
col_rows_read,
col_bytes_written_uncompressed,
col_rows_written
col_rows_written,
col_columns_written,
col_rows_with_key_columns_read,
col_rows_with_key_columns_written
};
return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)};

View File

@ -33,3 +33,5 @@ target_link_libraries (merge_selector dbms)
add_executable (merge_selector2 merge_selector2.cpp)
target_link_libraries (merge_selector2 dbms)
add_executable (row_source_bitwise_compatibility row_source_bitwise_test.cpp)

View File

@ -0,0 +1,34 @@
#include <cstdlib>
#include <DB/DataStreams/ColumnGathererStream.h>
using DB::RowSourcePart;
static void check(const RowSourcePart & s, size_t num, bool flag)
{
if ((s.getSourceNum() != num || s.getSkipFlag() != flag) || (!flag && s.getData() != num))
{
printf("FAIL");
std::exit(-1);
}
}
int main(int, char **)
{
check(RowSourcePart(0, false), 0, false);
check(RowSourcePart(0, true), 0, true);
check(RowSourcePart(1, false), 1, false);
check(RowSourcePart(1, true), 1, true);
check(RowSourcePart(RowSourcePart::MAX_PARTS, false), RowSourcePart::MAX_PARTS, false);
check(RowSourcePart(RowSourcePart::MAX_PARTS, true), RowSourcePart::MAX_PARTS, true);
RowSourcePart p{80, false};
check(p, 80, false);
p.setSkipFlag(true);
check(p, 80, true);
p.setSkipFlag(false);
check(p, 80, false);
p.setSourceNum(RowSourcePart::MAX_PARTS);
check(p, RowSourcePart::MAX_PARTS, false);
printf("PASSED");
return 0;
}

View File

@ -0,0 +1,9 @@
[0] [1] ['1']
[0,0] [1,2] ['1','12']
[0,0,0] [1,2,3] ['1','12','123']
[0]
[0,0]
[0,0,0]
[0]
[0,0]
[0,0,0]

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS test.alter;
CREATE TABLE test.alter (d Date, k UInt64, i32 Int32, n Nested(ui8 UInt8, s String)) ENGINE=MergeTree(d, k, 8192);
INSERT INTO test.alter VALUES ('2015-01-01', 3, 30, [1,2,3], ['1','12','123']);
INSERT INTO test.alter VALUES ('2015-01-01', 2, 20, [1,2], ['1','12']);
INSERT INTO test.alter VALUES ('2015-01-01', 1, 10, [1], ['1']);
ALTER TABLE test.alter ADD COLUMN `n.i8` Array(Int8) AFTER i32;
SELECT `n.i8`, `n.ui8`, `n.s` FROM test.alter ORDER BY k;
SELECT `n.i8` FROM test.alter ORDER BY k;
OPTIMIZE TABLE test.alter;
SELECT `n.i8` FROM test.alter ORDER BY k;
DROP TABLE IF EXISTS test.alter;

View File

@ -60,17 +60,18 @@ add_library (common
)
# TESTIRT-3687 DISABLE_LIBTCMALLOC - when testing for memory leaks, disable libtcmalloc
IF($ENV{DISABLE_LIBTCMALLOC})
IF(DISABLE_LIBTCMALLOC)
message(STATUS "Disabling libtcmalloc for valgrind better analysis")
ELSE($ENV{DISABLE_LIBTCMALLOC})
IF($ENV{DEBUG_LIBTCMALLOC})
message(STATUS "Link libtcmalloc_minimal_debug for testing")
SET(MALLOC_LIBRARIES libtcmalloc_minimal_debug.a)
ELSE($ENV{DEBUG_LIBTCMALLOC})
ELSE(DISABLE_LIBTCMALLOC)
IF(DEBUG_LIBTCMALLOC)
find_library(LIBTCMALLOC_DEBUG libtcmalloc_minimal_debug.a tcmalloc_minimal_debug) # debug version of tcmalloc from package
message(STATUS "Link libtcmalloc_minimal_debug for testing from ${LIBTCMALLOC_DEBUG}")
SET(MALLOC_LIBRARIES ${LIBTCMALLOC_DEBUG})
ELSE(DEBUG_LIBTCMALLOC)
message(STATUS "Link libtcmalloc_minimal")
SET(MALLOC_LIBRARIES tcmalloc_minimal_internal)
ENDIF($ENV{DEBUG_LIBTCMALLOC})
ENDIF($ENV{DISABLE_LIBTCMALLOC})
ENDIF(DEBUG_LIBTCMALLOC)
ENDIF(DISABLE_LIBTCMALLOC)
if (APPLE)
SET(RT_LIBRARIES "apple_rt")