polymorphic parts (development)

CurtizJ 2019-11-07 14:11:38 +03:00
parent 7293841003
commit 8cf6236936
19 changed files with 200 additions and 223 deletions

View File

@ -70,13 +70,13 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
: part_path(part_path_)
, storage(storage_)
, columns_list(columns_list_)
, skip_indices(indices_to_recalc_)
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, settings(settings_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
, with_final_mark(storage.getSettings()->write_final_mark && can_use_adaptive_granularity)
, default_codec(default_codec_)
, skip_indices(indices_to_recalc_)
, settings(settings_)
, compute_granularity(index_granularity.empty())
, with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
@ -314,8 +314,8 @@ void IMergeTreeDataPartWriter::finishSkipIndicesSerialization(
stream->finalize();
stream->addToChecksums(checksums);
}
skip_indices_streams.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
}
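For context: `compute_granularity` (set above when `index_granularity` arrives empty) makes the writer derive mark sizes from the incoming blocks instead of reusing a precomputed granularity. A minimal sketch of that derivation, assuming a fixed rows-per-mark setting (the real writer also supports byte-based adaptive granularity, and the `blocks_are_granules_size` mode is guarded by the exception above); the function name and `rows_per_mark` parameter are illustrative, not from this commit:

void fillIndexGranularitySketch(
    const Block & block,
    size_t rows_per_mark, /// hypothetical fixed setting, not a field of this class
    MergeTreeIndexGranularity & index_granularity)
{
    size_t rows_left = block.rows();
    while (rows_left > 0)
    {
        /// Each mark covers at most rows_per_mark rows of this block.
        size_t rows_in_mark = std::min(rows_left, rows_per_mark);
        index_granularity.appendMark(rows_in_mark);
        rows_left -= rows_in_mark;
    }
}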

View File

@ -68,12 +68,10 @@ public:
const WriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
virtual MarkWithOffset write(
virtual void write(
const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t offset, MergeTreeIndexGranularity & index_granularity,
/* Blocks with already sorted index columns */
const Block & primary_key_block = {}, const Block & skip_indexes_block = {},
bool skip_offsets = false, const WrittenOffsetColumns & already_written_offset_columns = {}) = 0;
const Block & primary_key_block = {}, const Block & skip_indexes_block = {}) = 0;
virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) = 0;
@ -82,6 +80,14 @@ public:
/// Compute index granularity for the block and store it in `index_granularity`
void fillIndexGranularity(const Block & block);
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
/// FIXME
MutableColumns && getIndexColumns()
{
return std::move(index_columns);
}
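/// Note: the `MutableColumns &&` return moves the accumulated primary index
/// columns out of the writer; writeSuffixAndFinalizePart (later in this diff)
/// consumes them exactly once when filling new_part->index. The FIXME above
/// presumably concerns making this one-shot ownership transfer harder to misuse.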
void initSkipIndices();
void initPrimaryIndex();
void calculateAndSerializePrimaryIndex(const Block & primary_index_block, size_t rows);
@ -92,7 +98,7 @@ public:
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
using SerializationStates = std::unordered_map<String, SerializationState>;
String part_path;
const MergeTreeData & storage;
@ -103,8 +109,11 @@ protected:
CompressionCodecPtr default_codec;
bool can_use_adaptive_granularity;
bool blocks_are_granules_size;
std::vector<MergeTreeIndexPtr> skip_indices;
WriterSettings settings;
bool compute_granularity;
bool with_final_mark;
@ -118,7 +127,6 @@ protected:
/// aggregation. I.e. it's a data mark number, not a skip indices mark.
size_t skip_index_data_mark = 0;
std::vector<MergeTreeIndexPtr> skip_indices;
std::vector<std::unique_ptr<IMergeTreeDataPartWriter::ColumnStream>> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
@ -131,7 +139,8 @@ protected:
/// It's written to index file in the `writeSuffixAndFinalizePart` method
Row last_index_row;
WriterSettings settings;
bool data_written = false;
};
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;
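Keying `SerializationStates` by column name instead of by position lets each state be created lazily on the first write of that column, rather than eagerly for the whole `columns_list`. The pattern, exactly as it appears in `MergeTreeDataPartWriterWide::writeColumn` later in this diff:

auto [it, inserted] = serialization_states.emplace(name, nullptr);
if (inserted)
{
    /// First write of this column: emit the serialization prefix.
    IDataType::SerializeBinaryBulkSettings serialize_settings;
    serialize_settings.getter = createStreamGetter(name, offset_columns, false);
    type.serializeBinaryBulkStatePrefix(serialize_settings, it->second);
}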

View File

@ -55,13 +55,13 @@ public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeData::DataPartPtr data_part;
protected:
using LoadFunc = std::function<MarksPtr()>;
MarksPtr loadMarks(const String & mrk_path, const LoadFunc & load_func);
MergeTreeData::DataPartPtr data_part;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk

View File

@ -14,31 +14,15 @@ namespace ErrorCodes
namespace
{
// constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto INDEX_FILE_EXTENSION = ".idx";
// constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
CompressionCodecPtr codec_,
const WriterSettings & writer_settings_,
bool blocks_are_granules_size_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
bool can_use_adaptive_granularity_)
const MergeTreeDataPartPtr & data_part)
: storage(data_part->storage)
, part_path(data_part->getFullPath())
, writer_settings(writer_settings_)
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, marks_file_extension(data_part->getMarksFileExtension())
, blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(data_part->index_granularity)
, compute_granularity(index_granularity.empty())
, codec(std::move(codec_))
, skip_indices(indices_to_recalc)
, with_final_mark(storage.getSettings()->write_final_mark && can_use_adaptive_granularity)
{
if (blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}
/// Implementation of IMergedBlockOutputStream::ColumnStream.
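Pieced together from the fragments above, the slimmed-down base constructor now only records where the part lives; codec, skip indices, and granularity bookkeeping all moved into the derived streams and the part writers:

IMergedBlockOutputStream::IMergedBlockOutputStream(const MergeTreeDataPartPtr & data_part)
    : storage(data_part->storage)
    , part_path(data_part->getFullPath())
{
}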

View File

@ -17,15 +17,15 @@ class IMergedBlockOutputStream : public IBlockOutputStream
{
public:
IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
CompressionCodecPtr codec_,
const WriterSettings & writer_settings_,
bool blocks_are_granules_size_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
bool can_use_adaptive_granularity_);
const MergeTreeDataPartPtr & data_part);
using WrittenOffsetColumns = std::set<std::string>;
const MergeTreeIndexGranularity & getIndexGranularity()
{
return writer->getIndexGranularity();
}
protected:
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::vector<SerializationState>;
@ -35,14 +35,11 @@ protected:
protected:
const MergeTreeData & storage;
SerializationStates serialization_states;
String part_path;
/// The offset to the first row of the block for which you want to write the index.
// size_t index_offset = 0;
WriterSettings writer_settings;
// size_t current_mark = 0;
/// Number of mark in data from which skip indices have to start
@ -56,7 +53,6 @@ protected:
// MergeTreeIndexGranularity index_granularity;
// const bool compute_granularity;
CompressionCodecPtr codec;
// std::vector<MergeTreeIndexPtr> skip_indices;
// std::vector<std::unique_ptr<IMergeTreeDataPartWriter::ColumnStream>> skip_indices_streams;
@ -65,7 +61,7 @@ protected:
MergeTreeWriterPtr writer;
const bool with_final_mark;
// const bool with_final_mark;
};
}

View File

@ -4,7 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/MergeTreeReaderSettings.h>
// #include <Storages/MergeTree/MergeTreeReaderSettings.h>
namespace DB

View File

@ -450,6 +450,11 @@ public:
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
std::vector<MergeTreeIndexPtr> getSkipIndices() const
{
return std::vector<MergeTreeIndexPtr>(std::begin(skip_indices), std::end(skip_indices));
}
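/// Note: this helper replaces the std::vector<MergeTreeIndexPtr>(std::begin(...),
/// std::end(...)) copies that the output streams previously built inline;
/// call sites in this diff now read:
///     writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);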
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;

View File

@ -991,10 +991,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
IMergeTreeDataPart::MinMaxIndex minmax_idx;
WriterSettings writer_settings;
auto part_writer = new_data_part->getWriter(all_columns, compression_codec, blocks_are_granules_size, )
MergedBlockOutputStream out(new_data_part, all_columns, compression_codec);
MergedBlockOutputStream out{
new_data_part,
all_columns,
compression_codec};
in->readPrefix();
out.writePrefix();

View File

@ -89,13 +89,15 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const
const WriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) const
{
return std::make_unique<MergeTreeDataPartWriterCompact>(
getFullPath(), storage, columns_list,
getFullPath(), storage, columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings);
default_codec, writer_settings, computed_index_granularity);
}
/// Takes into account the fact that several columns can, e.g., share their .size substreams.

View File

@ -60,8 +60,10 @@ public:
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const override;
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec_,
const WriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity = {}) const override;
bool isStoredOnDisk() const override { return true; }
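Together with the matching change in `MergeTreeDataPartWide::getWriter` below, this is the polymorphic seam the commit title refers to: callers obtain a part-format-specific writer through one virtual interface. A sketch of the intended call sequence, using names from this diff (`part` stands for any data part instance):

MergeTreeWriterPtr writer = part->getWriter(
    columns_list, indices_to_recalc, default_codec, writer_settings, computed_index_granularity);
writer->write(block, permutation, primary_key_block, skip_indexes_block);
writer->finishDataSerialization(checksums, write_final_mark);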

View File

@ -89,13 +89,15 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const
const WriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) const
{
return std::make_unique<MergeTreeDataPartWriterWide>(
getFullPath(), storage, columns_list,
getFullPath(), storage, columns_list, indices_to_recalc,
index_granularity_info.marks_file_extension,
default_codec, writer_settings);
default_codec, writer_settings, computed_index_granularity);
}

View File

@ -14,13 +14,15 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_)
const WriterSettings & settings_,
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(part_path_,
storage_, columns_list_,
marks_file_extension_,
default_codec_, settings_)
indices_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
stream = std::make_unique<ColumnStream>(
DATA_FILE_NAME,
@ -32,20 +34,20 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
settings.aio_threshold);
}
IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
void MergeTreeDataPartWriterCompact::write(
const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block,
bool skip_offsets, const WrittenOffsetColumns & already_written_offset_columns)
const Block & primary_key_block, const Block & skip_indexes_block)
{
UNUSED(skip_offsets);
UNUSED(already_written_offset_columns);
size_t total_rows = block.rows();
size_t current_mark = from_mark;
size_t from_mark = current_mark;
size_t current_row = 0;
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
fillIndexGranularity(block);
ColumnsWithTypeAndName columns_to_write(columns_list.size());
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
@ -68,8 +70,6 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
std::cerr << "(MergeTreeDataPartWriterCompact::write) total_rows: " << total_rows << "\n";
UNUSED(index_offset);
while (current_row < total_rows)
{
std::cerr << "(MergeTreeDataPartWriterCompact::write) current_row: " << current_row << "\n";
@ -79,6 +79,9 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
size_t rows_to_write = total_rows;
index_granularity.appendMark(total_rows);
if (rows_to_write)
data_written = true;
// if (current_row == 0 && index_offset != 0)
// {
// rows_to_write = index_offset;
@ -106,7 +109,7 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
writeIntBinary(stream->compressed.offset(), stream->marks);
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
}
++current_mark;
++from_mark;
}
else
{
@ -117,7 +120,8 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterCompact::write(
current_row = next_row;
}
return {current_mark, total_rows - current_row};
next_mark = from_mark;
next_index_offset = total_rows - current_row;
}
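/// Note: instead of returning a MarkWithOffset pair, write() now records its
/// position in the next_mark / next_index_offset members assigned above;
/// callers presumably read them back through accessors on the base writer
/// (not shown in this diff).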
size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
@ -140,9 +144,11 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith
return from_row + number_of_rows;
}
void MergeTreeDataPartWriterCompact::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync)
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync)
{
if (write_final_mark)
UNUSED(write_final_mark);
if (with_final_mark && data_written)
{
writeIntBinary(0ULL, stream->marks);
for (size_t i = 0; i < columns_list.size(); ++i)

View File

@ -10,16 +10,16 @@ public:
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const WriterSettings & settings);
const WriterSettings & settings,
const MergeTreeIndexGranularity & index_granularity);
MarkWithOffset write(const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset, MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block,
bool skip_offsets = false, const WrittenOffsetColumns & already_written_offset_columns = {}) override;
void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block) override;
void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) override;
private:
/// Write single granule of one column (rows between 2 marks)

View File

@ -12,14 +12,14 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_,
const ColumnToSize & merged_column_to_size)
const MergeTreeIndexGranularity & index_granularity_)
: IMergeTreeDataPartWriter(part_path_,
storage_, columns_list_,
marks_file_extension_,
default_codec_, settings_)
storage_, columns_list_, indices_to_recalc_,
marks_file_extension_, default_codec_, settings_, index_granularity_)
, can_use_adaptive_granularity(storage_.canUseAdaptiveGranularity())
{
const auto & columns = storage.getColumns();
@ -79,27 +79,31 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
};
}
IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterWide::write(const Block & block,
const IColumn::Permutation * permutation, size_t from_mark, size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block,
bool skip_offsets, const WrittenOffsetColumns & already_written_offset_columns)
void MergeTreeDataPartWriterWide::write(const Block & block,
const IColumn::Permutation * permutation,
const Block & primary_key_block, const Block & skip_indexes_block)
{
if (serialization_states.empty())
{
serialization_states.reserve(columns_list.size());
WrittenOffsetColumns tmp_offset_columns;
IDataType::SerializeBinaryBulkSettings serialize_settings;
// if (serialization_states.empty())
// {
// serialization_states.reserve(columns_list.size());
// WrittenOffsetColumns tmp_offset_columns;
// IDataType::SerializeBinaryBulkSettings serialize_settings;
for (const auto & col : columns_list)
{
serialize_settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
serialization_states.emplace_back(nullptr);
col.type->serializeBinaryBulkStatePrefix(serialize_settings, serialization_states.back());
}
}
// for (const auto & col : columns_list)
// {
// serialize_settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
// serialization_states.emplace_back(nullptr);
// col.type->serializeBinaryBulkStatePrefix(serialize_settings, serialization_states.back());
// }
// }
WrittenOffsetColumns offset_columns = already_written_offset_columns;
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
fillIndexGranularity(block);
WrittenOffsetColumns offset_columns;
MarkWithOffset result;
auto it = columns_list.begin();
@ -112,27 +116,25 @@ IMergeTreeDataPartWriter::MarkWithOffset MergeTreeDataPartWriterWide::write(cons
if (primary_key_block.has(it->name))
{
const auto & primary_column = *primary_key_block.getByName(it->name).column;
result = writeColumn(column.name, *column.type, primary_column, index_granularity, offset_columns, skip_offsets, serialization_states[i], from_mark, index_offset);
result = writeColumn(column.name, *column.type, primary_column, offset_columns);
}
else if (skip_indexes_block.has(it->name))
{
const auto & index_column = *skip_indexes_block.getByName(it->name).column;
result = writeColumn(column.name, *column.type, index_column, index_granularity, offset_columns, skip_offsets, serialization_states[i], from_mark, index_offset);
result = writeColumn(column.name, *column.type, index_column, offset_columns);
}
else
{
/// We rearrange the columns that are not included in the primary key here; the permuted result is then released to save RAM.
ColumnPtr permuted_column = column.column->permute(*permutation, 0);
result = writeColumn(column.name, *column.type, *permuted_column, index_granularity, offset_columns, skip_offsets, serialization_states[i], from_mark, index_offset);
result = writeColumn(column.name, *column.type, *permuted_column, offset_columns);
}
}
else
{
result = writeColumn(column.name, *column.type, *column.column, index_granularity, offset_columns, skip_offsets, serialization_states[i], from_mark, index_offset);
result = writeColumn(column.name, *column.type, *column.column, offset_columns);
}
}
return result;
}
void MergeTreeDataPartWriterWide::writeSingleMark(
@ -210,16 +212,19 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
const String & name,
const IDataType & type,
const IColumn & column,
MergeTreeIndexGranularity & index_granularity,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark,
size_t index_offset)
bool skip_offsets)
{
auto [it, inserted] = serialization_states.emplace(name, nullptr);
if (inserted)
{
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns, false);
type.serializeBinaryBulkStatePrefix(serialize_settings, it->second);
}
std::cerr << "(writeColumn) table: " << storage.getTableName() << "\n";
std::cerr << "(writeColumn) column: " << name << "\n";
std::cerr << "(writeColumn) from_mark: " << from_mark << "\n";
std::cerr << "(writeColumn) index_offset: " << index_offset << "\n";
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
@ -229,7 +234,7 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
size_t total_rows = column.size();
size_t current_row = 0;
size_t current_column_mark = from_mark;
size_t current_column_mark = current_mark;
while (current_row < total_rows)
{
size_t rows_to_write;
@ -251,13 +256,16 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
rows_to_write = index_granularity.getMarkRows(current_column_mark);
}
if (rows_to_write != 0)
data_written = true;
current_row = writeSingleGranule(
name,
type,
column,
offset_columns,
skip_offsets,
serialization_state,
it->second,
serialize_settings,
current_row,
rows_to_write,
@ -268,7 +276,6 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
current_column_mark++;
}
/// Memoize offsets for Nested types that are already written. They will not be written again for the next columns of the Nested structure.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
@ -300,7 +307,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
}
if (write_final_mark)

View File

@ -13,16 +13,14 @@ public:
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const WriterSettings & settings,
const ColumnToSize & merged_column_to_size = {});
const MergeTreeIndexGranularity & index_granularity);
MarkWithOffset write(const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset,
MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block = {}, const Block & skip_indexes_block = {},
bool skip_offsets = false, const WrittenOffsetColumns & already_written_offset_columns = {}) override;
void write(const Block & block, const IColumn::Permutation * permutation,
const Block & primary_key_block = {}, const Block & skip_indexes_block = {}) override;
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark, bool sync = false) override;
@ -35,12 +33,8 @@ public:
const String & name,
const IDataType & type,
const IColumn & column,
MergeTreeIndexGranularity & index_granularity,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
IDataType::SerializeBinaryBulkStatePtr & serialization_state,
size_t from_mark,
size_t index_offset);
bool skip_offsets = false);
private:
/// Write single granule of one column (rows between 2 marks)

View File

@ -1,8 +1,10 @@
#pragma once
#include <cstddef>
#include <Core/Settings.h>
namespace DB
{
struct ReaderSettings
{
size_t min_bytes_to_use_direct_io = 0;
@ -12,23 +14,13 @@ struct ReaderSettings
struct WriterSettings
{
WriterSettings(const Settings & settings, bool can_use_adaptive_granularity_, bool blocks_are_granules_size_ = false)
: min_compress_block_size(settings.min_compress_block_size)
, max_compress_block_size(settings.max_compress_block_size)
, aio_threshold(settings.min_bytes_to_use_direct_io)
WriterSettings(const Settings & global_settings, bool can_use_adaptive_granularity_, bool blocks_are_granules_size_ = false)
: min_compress_block_size(global_settings.min_compress_block_size)
, max_compress_block_size(global_settings.max_compress_block_size)
, aio_threshold(global_settings.min_bytes_to_use_direct_io)
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, blocks_are_granules_size(blocks_are_granules_size_) {}
WriterSettings & setAdaptive(bool value)
{
can_use_adaptive_granularity = value;
}
WriterSettings & setAioThreshHold(size_t value)
{
}
size_t min_compress_block_size;
size_t max_compress_block_size;
size_t aio_threshold;
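Construction now funnels the global `Settings` plus the two granularity flags through a single place. As used later in this diff, with `aio_threshold` still overridable per call site:

WriterSettings writer_settings(data_part->storage.global_context.getSettings(),
    data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
writer_settings.aio_threshold = aio_threshold;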

View File

@ -20,46 +20,30 @@ namespace ErrorCodes
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part_,
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
data_part_, default_codec_,
{
data_part_->storage.global_context.getSettings().min_compress_block_size,
data_part_->storage.global_context.getSettings().max_compress_block_size,
data_part_->storage.global_context.getSettings().min_bytes_to_use_direct_io
},
blocks_are_granules_size_,
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
CompressionCodecPtr default_codec,
bool blocks_are_granules_size)
: IMergedBlockOutputStream(data_part)
, columns_list(columns_list_)
{
const auto & global_settings = data_part->storage.global_context.getSettings();
writer = data_part_->getWriter(columns_list_, default_codec_, WriterSettings(global_settings));
WriterSettings writer_settings(data_part->storage.global_context.getSettings(),
data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);
}
/// can_use_adaptive_granularity, index_granularity, skip_indices, blocks_are_granule_size
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part_,
const MergeTreeDataPartPtr & data_part,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold,
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
data_part_, default_codec_,
blocks_are_granules_size_,
std::vector<MergeTreeIndexPtr>(std::begin(data_part_->storage.skip_indices), std::end(data_part_->storage.skip_indices)),
data_part_->storage.canUseAdaptiveGranularity())
bool blocks_are_granules_size)
: IMergedBlockOutputStream(data_part)
, columns_list(columns_list_)
{
WriterSettings writer_settings(data_part->storage.global_context.getSettings());
WriterSettings writer_settings(data_part->storage.global_context.getSettings(),
data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
writer_settings.aio_threshold = aio_threshold;
if (aio_threshold > 0 && !merged_column_to_size.empty())
@ -72,7 +56,8 @@ MergedBlockOutputStream::MergedBlockOutputStream(
}
}
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
writer = data_part->getWriter(columns_list,
data_part->storage.getSkipIndices(), default_codec, writer_settings);
}
std::string MergedBlockOutputStream::getPartPath() const
@ -111,8 +96,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
checksums = std::move(*additional_column_checksums);
/// Finish columns serialization.
bool write_final_mark = (with_final_mark && rows_count != 0);
writer->finalize(checksums, write_final_mark);
bool write_final_mark = true; /// FIXME
writer->finishDataSerialization(checksums, write_final_mark);
writer->finishPrimaryIndexSerialization(checksums, write_final_mark);
writer->finishSkipIndicesSerialization(checksums);
@ -161,10 +146,12 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
/// FIXME
auto index_columns = writer->getIndexColumns();
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
new_part->index_granularity = index_granularity;
new_part->index_granularity = writer->getIndexGranularity();
}
void MergedBlockOutputStream::init()
@ -179,12 +166,6 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
if (!rows)
return;
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if (compute_granularity)
fillIndexGranularity(block);
Block primary_key_block;
Block skip_indexes_block;
@ -222,9 +203,9 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block);
writer->write(block, permutation, primary_key_block, skip_indexes_block);
writer->calculateAndSerializeSkipIndices(skip_indexes_block, rows);
writer->calculateAndSerializePrimaryIndex(primary_index_block);
writer->calculateAndSerializePrimaryIndex(primary_key_block, rows);
writer->next();
rows_count += rows;
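The per-block granularity computation removed above now runs inside each writer's `write()` (see the `compute_granularity` branches added in the writer .cpp files), so the stream's job reduces to the four calls just shown: write the block, serialize skip indices, serialize the primary index, and advance the writer with `next()`.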

View File

@ -4,40 +4,36 @@ namespace DB
{
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part_, const Block & header_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
const MergeTreeDataPartPtr & data_part, const Block & header_, bool sync_,
CompressionCodecPtr default_codec, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
WrittenOffsetColumns & already_written_offset_columns_,
const MergeTreeIndexGranularityInfo * index_granularity_info_)
: IMergedBlockOutputStream(
data_part_, default_codec_,
{
data_part_->storage.global_context.getSettings().min_compress_block_size,
data_part_->storage.global_context.getSettings().max_compress_block_size,
data_part_->storage.global_context.getSettings().min_bytes_to_use_direct_io,
},
false,
indices_to_recalc_,
index_granularity_info_ ? index_granularity_info_->is_adaptive : data_part_->storage.canUseAdaptiveGranularity()),
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part),
header(header_), sync(sync_), skip_offsets(skip_offsets_),
already_written_offset_columns(already_written_offset_columns_)
{
std::cerr << "(MergedColumnOnlyOutputStream) storage: " << storage.getTableName() << "\n";
std::cerr << "(MergedColumnOnlyOutputStream) can_use_adaptive_granularity: " << can_use_adaptive_granularity << "\n";
std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info: " << !!index_granularity_info_ << "\n";
if (index_granularity_info_)
std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info->isAdaptive(): " << index_granularity_info_->is_adaptive << "\n";
// std::cerr << "(MergedColumnOnlyOutputStream) storage: " << storage.getTableName() << "\n";
// std::cerr << "(MergedColumnOnlyOutputStream) can_use_adaptive_granularity: " << can_use_adaptive_granularity << "\n";
// std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info: " << !!index_granularity_info_ << "\n";
// if (index_granularity_info_)
// std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info->isAdaptive(): " << index_granularity_info_->is_adaptive << "\n";
writer = data_part_->getWriter(header.getNamesAndTypesList(), default_codec_, writer_settings);
WriterSettings writer_settings(data_part->storage.global_context.getSettings(), false);
if (index_granularity_info && !index_granularity_info->is_adaptive)
writer_settings.can_use_adaptive_granularity = false;
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, default_codec, writer_settings);
writer_wide = typeid_cast<MergeTreeDataPartWriterWide *>(writer.get());
if (!writer_wide)
throw Exception("MergedColumnOnlyOutputStream can be used only for writing Wide parts", ErrorCodes::LOGICAL_ERROR);
std::cerr << "(MergedColumnOnlyOutputStream) writer: " << !!writer << "\n";
initSkipIndices();
/// FIXME unnecessary init of primary idx
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
std::set<String> skip_indexes_column_names_set;
for (const auto & index : skip_indices)
for (const auto & index : storage.skip_indices) /// FIXME save local indices
std::copy(index->columns.cbegin(), index->columns.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
@ -56,18 +52,16 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
return;
/// FIXME skip_offsets
UNUSED(skip_offsets);
UNUSED(already_written_offset_columns);
auto [new_current_mark, new_index_offset] = writer->write(block, nullptr, current_mark, index_offset,
index_granularity, {}, {}, skip_offsets, already_written_offset_columns);
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & column = block.getByName(header.getByPosition(i).name);
writer_wide->writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets);
}
/// Should be written before the index offset update, because we calculate
/// the indices of the currently written granules.
calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
current_mark = new_current_mark;
index_offset = new_index_offset;
writer_wide->calculateAndSerializeSkipIndices(skip_indexes_columns, rows);
writer_wide->next();
}
void MergedColumnOnlyOutputStream::writeSuffix()
@ -79,8 +73,8 @@ MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndG
{
/// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums;
bool write_final_mark = with_final_mark && (index_offset != 0 || current_mark != 0);
writer->finalize(checksums, write_final_mark, sync);
bool write_final_mark = true; /// FIXME
writer->finishDataSerialization(checksums, write_final_mark, sync);
return checksums;
}

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/IMergedBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
namespace DB
{
@ -35,6 +36,8 @@ private:
/// To correctly write Nested elements column-by-column.
WrittenOffsetColumns & already_written_offset_columns;
MergeTreeDataPartWriterWide * writer_wide;
};
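Caching the `typeid_cast` result in `writer_wide` lets the stream call Wide-only entry points such as `writeColumn` and `calculateAndSerializeSkipIndices`, which are not part of the `IMergeTreeDataPartWriter` interface; the constructor check above rejects any non-Wide part up front.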