polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-11-05 14:53:22 +03:00
parent 35b7363802
commit 7293841003
19 changed files with 431 additions and 373 deletions

View File

@ -344,7 +344,8 @@ void IMergeTreeDataPart::assertState(const std::initializer_list<IMergeTreeDataP
void IMergeTreeDataPart::assertOnDisk() const
{
if (!isStoredOnDisk())
throw Exception("Data part '" + name + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR);
throw Exception("Data part '" + name + "' with type '"
+ typeToString(getType()) + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -63,8 +63,10 @@ public:
virtual MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec_,
const WriterSettings & writer_settings) const = 0;
const WriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity = {}) const = 0;
virtual bool isStoredOnDisk() const = 0;

View File

@ -1,23 +1,15 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Poco/File.h>
namespace DB
{
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_)
: part_path(part_path_)
, storage(storage_)
, columns_list(columns_list_)
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, settings(settings_) {}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
namespace
{
// constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
void IMergeTreeDataPartWriter::ColumnStream::finalize()
{
@ -65,4 +57,273 @@ void IMergeTreeDataPartWriter::ColumnStream::addToChecksums(MergeTreeData::DataP
checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}
IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: part_path(part_path_)
, storage(storage_)
, columns_list(columns_list_)
, skip_indices(indices_to_recalc_)
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, settings(settings_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty)
, with_final_mark(storage.getSettings()->write_final_mark && can_use_adaptive_granularity)
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
Poco::File(part_path).createDirectories();
initPrimaryIndex();
initSkipIndices();
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
bool can_use_adaptive_index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (!can_use_adaptive_index_granularity)
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
if (blocks_are_granules)
index_granularity_for_block = rows_in_block;
else if (block_size_in_memory >= index_granularity_bytes)
{
size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
index_granularity_for_block = rows_in_block / granules_in_block;
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}
if (index_granularity_for_block == 0) /// very rare case when index granularity bytes less then single row
index_granularity_for_block = 1;
/// We should be less or equal than fixed index granularity
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
/// FIXME
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.appendMark(index_granularity_for_block);
}
void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
{
const auto storage_settings = storage.getSettings();
fillIndexGranularityImpl(
block,
storage_settings->index_granularity_bytes,
storage_settings->index_granularity,
settings.blocks_are_granules_size,
index_offset,
index_granularity,
settings.can_use_adaptive_granularity);
}
void IMergeTreeDataPartWriter::initPrimaryIndex()
{
if (storage.hasPrimaryKey())
{
index_file_stream = std::make_unique<WriteBufferFromFile>(
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
}
void IMergeTreeDataPartWriter::initSkipIndices()
{
for (const auto & index : skip_indices)
{
String stream_name = index->getFileName();
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::ColumnStream>(
stream_name,
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
default_codec, settings.max_compress_block_size,
0, settings.aio_threshold));
skip_indices_aggregators.push_back(index->createIndexAggregator());
skip_index_filling.push_back(0);
}
}
void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & primary_keys_block, size_t rows)
{
size_t primary_columns_num = primary_keys_block.columns();
if (index_columns.empty())
{
index_types = primary_keys_block.getDataTypes();
index_columns.resize(primary_columns_num);
last_index_row.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
index_columns[i] = primary_keys_block.getByPosition(i).column->cloneEmpty();
}
/** While filling index (index_columns), disable memory tracker.
* Because memory is allocated here (maybe in context of INSERT query),
* but then freed in completely different place (while merging parts), where query memory_tracker is not available.
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())
{
for (size_t j = 0; j < primary_columns_num; ++j)
{
const auto & primary_column = primary_keys_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, i);
primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
}
}
++current_mark;
if (current_mark < index_granularity.getMarksCount())
i += index_granularity.getMarkRows(current_mark);
else
break;
}
/// store last index row to write final mark at the end of column
for (size_t j = 0; j < primary_columns_num; ++j)
{
const IColumn & primary_column = *primary_keys_block.getByPosition(j).column.get();
primary_column.get(rows - 1, last_index_row[j]);
}
}
void IMergeTreeDataPartWriter::calculateAndSerializeSkipIndices(
const Block & skip_indexes_block, size_t rows)
{
size_t skip_index_current_data_mark = 0;
/// Filling and writing skip indices like in IMergeTreeDataPartWriter::writeColumn
for (size_t i = 0; i < skip_indices.size(); ++i)
{
const auto index = skip_indices[i];
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
skip_index_current_data_mark = skip_index_data_mark;
while (prev_pos < rows)
{
UInt64 limit = 0;
if (prev_pos == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually this numbers is redundant, but we have to store them
/// to be compatible with normal .mrk2 file format
if (settings.can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
}
/// this mark is aggregated, go to the next one
skip_index_current_data_mark++;
}
size_t pos = prev_pos;
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);
if (pos == prev_pos + limit)
{
++skip_index_filling[i];
/// write index if it is filled
if (skip_index_filling[i] == index->granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0;
}
}
prev_pos = pos;
}
}
skip_index_data_mark = skip_index_current_data_mark;
}
void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool write_final_mark)
{
if (index_stream)
{
if (write_final_mark)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
index_columns[j]->insert(last_index_row[j]);
index_types[j]->serializeBinary(last_index_row[j], *index_stream);
}
last_index_row.clear();
index_granularity.appendMark(0);
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_stream = nullptr;
}
}
void IMergeTreeDataPartWriter::finishSkipIndicesSerialization(
MergeTreeData::DataPart::Checksums & checksums)
{
for (size_t i = 0; i < skip_indices.size(); ++i)
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
}
for (auto & stream : skip_indices_streams)
{
stream->finalize();
stream->addToChecksums(checksums);
}
skip_indices_streams.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
}
void IMergeTreeDataPartWriter::next()
{
current_mark = next_mark;
index_offset = next_index_offset;
}
}

View File

@ -9,7 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/IBlockOutputStream.h>
// #include <Storages/MergeTree/MergeTreeData.h>
// #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
@ -62,9 +62,11 @@ public:
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const WriterSettings & settings);
const WriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
virtual MarkWithOffset write(
const Block & block, const IColumn::Permutation * permutation,
@ -73,10 +75,21 @@ public:
const Block & primary_key_block = {}, const Block & skip_indexes_block = {},
bool skip_offsets = false, const WrittenOffsetColumns & already_written_offset_columns = {}) = 0;
virtual void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) = 0;
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) = 0;
virtual ~IMergeTreeDataPartWriter();
/// Count index_granularity for block and store in `index_granularity`
void fillIndexGranularity(const Block & block);
void initSkipIndices();
void initPrimaryIndex();
void calculateAndSerializePrimaryIndex(const Block & primary_index_block, size_t rows);
void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, size_t rows);
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool write_final_mark);
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
void next();
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
@ -86,8 +99,38 @@ protected:
NamesAndTypesList columns_list;
const String marks_file_extension;
MergeTreeIndexGranularity index_granularity;
CompressionCodecPtr default_codec;
bool can_use_adaptive_granularity;
bool blocks_are_granules_size;
bool compute_granularity;
bool with_final_mark;
size_t current_mark = 0;
size_t index_offset = 0;
size_t next_mark = 0;
size_t next_index_offset = 0;
/// Number of mark in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
size_t skip_index_data_mark = 0;
std::vector<MergeTreeIndexPtr> skip_indices;
std::vector<std::unique_ptr<IMergeTreeDataPartWriter::ColumnStream>> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MutableColumns index_columns;
DataTypes index_types;
/// Index columns values from the last row from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method
Row last_index_row;
WriterSettings settings;
};

View File

@ -48,8 +48,6 @@ public:
const NamesAndTypesList & getColumns() const { return columns; }
MergeTreeData::DataPartPtr data_part;
size_t getFirstMarkToRead() const
{
return all_mark_ranges.back().begin;
@ -62,6 +60,8 @@ protected:
using LoadFunc = std::function<MarksPtr()>;
MarksPtr loadMarks(const String & mrk_path, const LoadFunc & load_func);
MergeTreeData::DataPartPtr data_part;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk

View File

@ -41,160 +41,6 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}
void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
bool can_use_adaptive_index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (!can_use_adaptive_index_granularity)
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
if (blocks_are_granules)
index_granularity_for_block = rows_in_block;
else if (block_size_in_memory >= index_granularity_bytes)
{
size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
index_granularity_for_block = rows_in_block / granules_in_block;
}
else
{
size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
}
}
if (index_granularity_for_block == 0) /// very rare case when index granularity bytes less then single row
index_granularity_for_block = 1;
/// We should be less or equal than fixed index granularity
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
UNUSED(index_offset);
UNUSED(index_granularity);
/// FIXME
// for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
// index_granularity.appendMark(index_granularity_for_block);
}
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
const auto storage_settings = storage.getSettings();
fillIndexGranularityImpl(
block,
storage_settings->index_granularity_bytes,
storage_settings->index_granularity,
blocks_are_granules_size,
index_offset,
index_granularity,
can_use_adaptive_granularity);
}
void IMergedBlockOutputStream::initSkipIndices()
{
for (const auto & index : skip_indices)
{
String stream_name = index->getFileName();
skip_indices_streams.emplace_back(
std::make_unique<IMergeTreeDataPartWriter::ColumnStream>(
stream_name,
part_path + stream_name, INDEX_FILE_EXTENSION,
part_path + stream_name, marks_file_extension,
codec, writer_settings.max_compress_block_size,
0, writer_settings.aio_threshold));
skip_indices_aggregators.push_back(index->createIndexAggregator());
skip_index_filling.push_back(0);
}
}
void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
const Block & skip_indexes_block, size_t rows)
{
size_t skip_index_current_data_mark = 0;
/// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
for (size_t i = 0; i < skip_indices.size(); ++i)
{
const auto index = skip_indices[i];
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
skip_index_current_data_mark = skip_index_data_mark;
while (prev_pos < rows)
{
UInt64 limit = 0;
if (prev_pos == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= writer_settings.min_compress_block_size)
stream.compressed.next();
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
/// Actually this numbers is redundant, but we have to store them
/// to be compatible with normal .mrk2 file format
if (can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
}
/// this mark is aggregated, go to the next one
skip_index_current_data_mark++;
}
size_t pos = prev_pos;
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);
if (pos == prev_pos + limit)
{
++skip_index_filling[i];
/// write index if it is filled
if (skip_index_filling[i] == index->granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0;
}
}
prev_pos = pos;
}
}
skip_index_data_mark = skip_index_current_data_mark;
}
void IMergedBlockOutputStream::finishSkipIndicesSerialization(
MergeTreeData::DataPart::Checksums & checksums)
{
for (size_t i = 0; i < skip_indices.size(); ++i)
{
auto & stream = *skip_indices_streams[i];
if (!skip_indices_aggregators[i]->empty())
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
}
for (auto & stream : skip_indices_streams)
{
stream->finalize();
stream->addToChecksums(checksums);
}
skip_indices_streams.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
}
/// Implementation of IMergedBlockOutputStream::ColumnStream.
}

View File

@ -32,20 +32,6 @@ protected:
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Count index_granularity for block and store in `index_granularity`
void fillIndexGranularity(const Block & block);
/// Write final mark to the end of column
void writeFinalMark(
const std::string & column_name,
const DataTypePtr column_type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
DB::IDataType::SubstreamPath & path);
void initSkipIndices();
void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, size_t rows);
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
protected:
const MergeTreeData & storage;
@ -53,29 +39,29 @@ protected:
String part_path;
/// The offset to the first row of the block for which you want to write the index.
size_t index_offset = 0;
// size_t index_offset = 0;
WriterSettings writer_settings;
size_t current_mark = 0;
// size_t current_mark = 0;
/// Number of mark in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
size_t skip_index_data_mark = 0;
// size_t skip_index_data_mark = 0;
const bool can_use_adaptive_granularity;
const std::string marks_file_extension;
const bool blocks_are_granules_size;
// const bool can_use_adaptive_granularity;
// const std::string marks_file_extension;
// const bool blocks_are_granules_size;
MergeTreeIndexGranularity index_granularity;
// MergeTreeIndexGranularity index_granularity;
const bool compute_granularity;
// const bool compute_granularity;
CompressionCodecPtr codec;
std::vector<MergeTreeIndexPtr> skip_indices;
std::vector<std::unique_ptr<IMergeTreeDataPartWriter::ColumnStream>> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
// std::vector<MergeTreeIndexPtr> skip_indices;
// std::vector<std::unique_ptr<IMergeTreeDataPartWriter::ColumnStream>> skip_indices_streams;
// MergeTreeIndexAggregators skip_indices_aggregators;
// std::vector<size_t> skip_index_filling;
MergeTreeWriterPtr writer;

View File

@ -989,7 +989,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_and_skip_indices_expr));
IMergeTreeDataPart::MinMaxIndex minmax_idx;
IMergeTreeDataPart::MinMaxIndex minmax_idx;
WriterSettings writer_settings;
auto part_writer = new_data_part->getWriter(all_columns, compression_codec, blocks_are_granules_size, )
MergedBlockOutputStream out(new_data_part, all_columns, compression_codec);
@ -1092,6 +1095,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->index_granularity = source_part->index_granularity;
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
new_data_part,
updated_header,

View File

@ -82,7 +82,8 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const
{
return std::make_unique<MergeTreeReaderCompact>(shared_from_this(), columns_to_read, uncompressed_cache,
return std::make_unique<MergeTreeReaderCompact>(
shared_from_this(), columns_to_read, uncompressed_cache,
mark_cache, mark_ranges, reader_settings, avg_value_size_hints);
}

View File

@ -158,19 +158,6 @@ String MergeTreeDataPartWide::getColumnNameWithMinumumCompressedSize() const
return *minimum_size_column;
}
UInt64 MergeTreeDataPartWide::calculateTotalSizeOnDisk(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
return cur.getSize();
std::vector<std::string> files;
cur.list(files);
UInt64 res = 0;
for (const auto & file : files)
res += calculateTotalSizeOnDisk(from + file);
return res;
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullPath();
@ -226,7 +213,6 @@ void MergeTreeDataPartWide::loadIndexGranularity()
// }
// bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDataType & type) const
// {
// bool res = true;

View File

@ -60,8 +60,10 @@ public:
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec_,
const WriterSettings & writer_settings) const override;
const WriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity = {}) const override;
bool isStoredOnDisk() const override { return true; }
@ -73,10 +75,6 @@ public:
Type getType() const override { return Type::WIDE; }
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const String & from);
private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
@ -86,7 +84,6 @@ private:
void checkConsistency(bool require_part_metadata);
};
// using MergeTreeDataPartState =IMergeTreeDataPart::State;
}

View File

@ -28,7 +28,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
part_path + DATA_FILE_NAME, marks_file_extension,
default_codec,
settings.max_compress_block_size,
0,
settings.estimated_size,
settings.aio_threshold);
}
@ -78,6 +78,7 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
// size_t rows_to_write = std::min(total_rows, index_granularity.getMarkRows(current_mark));
size_t rows_to_write = total_rows;
index_granularity.appendMark(total_rows);
// if (current_row == 0 && index_offset != 0)
// {
// rows_to_write = index_offset;
@ -112,6 +113,7 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
for (size_t i = 0; i < columns_to_write.size(); ++i)
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
}
current_row = next_row;
}
@ -120,10 +122,10 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
{
std::cerr << "(writeColumnSingleGranule) writing column: " << column.name << "\n";
std::cerr << "(writeColumnSingleGranule) from_row: " << from_row << "\n";
std::cerr << "(writeColumnSingleGranule) number_of_rows: " << number_of_rows << "\n";
IDataType::SerializeBinaryBulkStatePtr state;
IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -148,6 +150,8 @@ void MergeTreeDataPartWriterCompact::finalize(IMergeTreeDataPart::Checksums & ch
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
}
if (compute_granularity)
index_granularity.appendMark(0);
}
stream->finalize();

View File

@ -22,20 +22,9 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
default_codec_, settings_)
, can_use_adaptive_granularity(storage_.canUseAdaptiveGranularity())
{
size_t total_size = 0;
if (settings.aio_threshold > 0 && !merged_column_to_size.empty())
{
for (const auto & it : columns_list)
{
auto it2 = merged_column_to_size.find(it.name);
if (it2 != merged_column_to_size.end())
total_size += it2->second;
}
}
const auto & columns = storage.getColumns();
for (const auto & it : columns_list)
addStreams(it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec), total_size, false);
addStreams(it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec), settings.estimated_size, false);
}
void MergeTreeDataPartWriterWide::addStreams(
@ -100,13 +89,13 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterWide::write(cons
{
serialization_states.reserve(columns_list.size());
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings settings;
IDataType::SerializeBinaryBulkSettings serialize_settings;
for (const auto & col : columns_list)
{
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
serialize_settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
serialization_states.emplace_back(nullptr);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
col.type->serializeBinaryBulkStatePrefix(serialize_settings, serialization_states.back());
}
}
@ -174,7 +163,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (can_use_adaptive_granularity)
if (settings.can_use_adaptive_granularity)
writeIntBinary(number_of_rows, stream.marks);
}, path);
}
@ -290,10 +279,13 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
}
}, serialize_settings.path);
next_mark = current_column_mark;
next_index_offset = current_row - total_rows;
return std::make_pair(current_column_mark, current_row - total_rows);
}
void MergeTreeDataPartWriterWide::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync)
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync)
{
const auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;

View File

@ -24,7 +24,7 @@ public:
const Block & primary_key_block = {}, const Block & skip_indexes_block = {},
bool skip_offsets = false, const WrittenOffsetColumns & already_written_offset_columns = {}) override;
void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);

View File

@ -175,7 +175,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
IMergeTreeDataPart::MinMaxIndex minmax_idx;
IMergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);
MergeTreePartition partition(std::move(block_with_partition.partition));
@ -263,6 +263,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
MergedBlockOutputStream out(new_data_part, columns, compression_codec);
out.writePrefix();

View File

@ -3,19 +3,38 @@
namespace DB
{
struct ReaderSettings
struct ReaderSettings
{
size_t min_bytes_to_use_direct_io = 0;
size_t max_read_buffer_size = 0;
bool save_marks_in_cache = false;
};
struct WriterSettings
{
WriterSettings(const Settings & settings, bool can_use_adaptive_granularity_, bool blocks_are_granules_size_ = false)
: min_compress_block_size(settings.min_compress_block_size)
, max_compress_block_size(settings.max_compress_block_size)
, aio_threshold(settings.min_bytes_to_use_direct_io)
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, blocks_are_granules_size(blocks_are_granules_size_) {}
WriterSettings & setAdaptive(bool value)
{
size_t min_bytes_to_use_direct_io = 0;
size_t max_read_buffer_size = 0;
bool save_marks_in_cache = false;
};
struct WriterSettings
can_use_adaptive_granularity = value;
}
WriterSettings & setAioThreshHold(size_t value)
{
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
// String marks_file_extension;
};
}
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
bool can_use_adaptive_granularity;
bool blocks_are_granules_size;
size_t estimated_size = 0;
};
}

View File

@ -35,31 +35,43 @@ MergedBlockOutputStream::MergedBlockOutputStream(
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
, columns_list(columns_list_)
{
init();
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
{
const auto & global_settings = data_part->storage.global_context.getSettings();
writer = data_part_->getWriter(columns_list_, default_codec_, WriterSettings(global_settings));
}
can_use_adaptive_granularity
index_granularity
skip_indices
blocks_are_granule_size
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part_,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
const MergeTreeData::DataPart::ColumnToSize & /* merged_column_to_size_ */, // FIXME
size_t aio_threshold_,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold,
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
data_part_, default_codec_,
{
data_part_->storage.global_context.getSettings().min_compress_block_size,
data_part_->storage.global_context.getSettings().max_compress_block_size,
aio_threshold_
},
blocks_are_granules_size_,
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
, columns_list(columns_list_)
{
init();
WriterSettings writer_settings(data_part->storage.global_context.getSettings());
writer_settings.aio_threshold = aio_threshold;
if (aio_threshold > 0 && !merged_column_to_size.empty())
{
for (const auto & it : columns_list)
{
auto it2 = merged_column_to_size.find(it.name);
if (it2 != merged_column_to_size.end())
writer_settings.estimated_size += it2->second;
}
}
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
}
@ -101,34 +113,12 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
/// Finish columns serialization.
bool write_final_mark = (with_final_mark && rows_count != 0);
writer->finalize(checksums, write_final_mark);
if (write_final_mark)
index_granularity.appendMark(0); /// last mark
writer->finishPrimaryIndexSerialization(checksums, write_final_mark);
writer->finishSkipIndicesSerialization(checksums);
if (!total_column_list)
total_column_list = &columns_list;
if (index_stream)
{
if (with_final_mark && rows_count != 0)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
auto & column = *last_index_row[j].column;
index_columns[j]->insertFrom(column, 0); /// it has only one element
last_index_row[j].type->serializeBinary(column, 0, *index_stream);
}
last_index_row.clear();
}
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_stream = nullptr;
}
finishSkipIndicesSerialization(checksums);
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
new_part->partition.store(storage, part_path, checksums);
@ -179,16 +169,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
if (storage.hasPrimaryKey())
{
index_file_stream = std::make_unique<WriteBufferFromFile>(
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
initSkipIndices();
}
@ -242,66 +222,14 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
if (index_columns.empty())
{
index_columns.resize(primary_key_column_names.size());
last_index_row.resize(primary_key_column_names.size());
for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
{
last_index_row[i] = primary_key_block.getByPosition(i).cloneEmpty();
index_columns[i] = last_index_row[i].column->cloneEmpty();
}
}
writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block);
writer->calculateAndSerializeSkipIndices(skip_indexes_block, rows);
writer->calculateAndSerializePrimaryIndex(primary_index_block);
writer->next();
size_t new_index_offset = writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block).second;
rows_count += rows;
/// Should be written before index offset update, because we calculate,
/// indices of currently written granules
calculateAndSerializeSkipIndices(skip_indexes_block, rows);
{
/** While filling index (index_columns), disable memory tracker.
* Because memory is allocated here (maybe in context of INSERT query),
* but then freed in completely different place (while merging parts), where query memory_tracker is not available.
* And otherwise it will look like excessively growing memory consumption in context of query.
* (observed in long INSERT SELECTs)
*/
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows;)
{
if (storage.hasPrimaryKey())
{
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
const auto & primary_column = primary_key_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, i);
primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
}
}
++current_mark;
if (current_mark < index_granularity.getMarksCount())
i += index_granularity.getMarkRows(current_mark);
else
break;
}
}
/// store last index row to write final mark at the end of column
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
const IColumn & primary_column = *primary_key_block.getByPosition(j).column.get();
auto mutable_column = std::move(*last_index_row[j].column).mutate();
if (!mutable_column->empty())
mutable_column->popBack(1);
mutable_column->insertFrom(primary_column, rows - 1);
last_index_row[j].column = std::move(mutable_column);
}
index_offset = new_index_offset;
// index_offset = new_index_offset;
}
}

View File

@ -47,11 +47,6 @@ public:
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
const MergeTreeIndexGranularity & getIndexGranularity() const
{
return index_granularity;
}
private:
void init();
@ -64,13 +59,6 @@ private:
NamesAndTypesList columns_list;
size_t rows_count = 0;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MutableColumns index_columns;
/// Index columns values from the last row from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method
ColumnsWithTypeAndName last_index_row;
};
}

View File

@ -59,7 +59,8 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
UNUSED(skip_offsets);
UNUSED(already_written_offset_columns);
auto [new_current_mark, new_index_offset] = writer->write(block, nullptr, current_mark, index_offset, index_granularity);
auto [new_current_mark, new_index_offset] = writer->write(block, nullptr, current_mark, index_offset,
index_granularity, {}, {}, skip_offsets, already_written_offset_columns);
/// Should be written before index offset update, because we calculate,
/// indices of currently written granules
@ -81,8 +82,6 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
bool write_final_mark = with_final_mark && (index_offset != 0 || current_mark != 0);
writer->finalize(checksums, write_final_mark, sync);
finishSkipIndicesSerialization(checksums);
return checksums;
}