polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-10-21 20:23:06 +03:00
parent 8df8bcea94
commit 1d3f005538
12 changed files with 122 additions and 69 deletions

View File

@ -35,7 +35,7 @@ struct ColumnSize;
class MergeTreeData;
class IMergeTreeReader;
class IMergeTreeWriter;
class IMergeTreeDataPartWriter;
namespace ErrorCodes
{
@ -50,7 +50,7 @@ public:
using ValueSizeMap = std::map<std::string, double>;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeWriter>;
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;
// virtual BlockInputStreamPtr readAll() = 0;
// virtual BlockInputStreamPtr read() = 0;
@ -68,7 +68,6 @@ public:
virtual MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const CompressionCodecPtr & default_codec_,
const WriterSettings & writer_settings) const = 0;

View File

@ -7,14 +7,12 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const IColumn::Permutation * permutation_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_)
: part_path(part_path_)
, storage(storage_)
, columns_list(columns_list_)
, permutation(permutation_)
, marks_file_extension(marks_file_extension_)
, default_codec(default_codec_)
, settings(settings_) {}

View File

@ -61,17 +61,17 @@ public:
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const WriterSettings & settings);
virtual size_t write(
const Block & block, size_t from_mark, size_t offset, const MergeTreeIndexGranularity & index_granularity,
const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t offset, const MergeTreeIndexGranularity & index_granularity,
/* Blocks with already sorted index columns */
const Block & primary_key_block = {}, const Block & skip_indexes_block = {}) = 0;
// virtual void writeFinalMarks() = 0;
virtual void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) = 0;
virtual ~IMergeTreeDataPartWriter();
@ -82,7 +82,6 @@ protected:
String part_path;
const MergeTreeData & storage;
NamesAndTypesList columns_list;
const IColumn::Permutation * permutation;
const String marks_file_extension;
CompressionCodecPtr default_codec;

View File

@ -87,12 +87,10 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const
{
UNUSED(columns_list);
UNUSED(permutation);
UNUSED(default_codec);
UNUSED(writer_settings);
return {};

View File

@ -60,7 +60,6 @@ public:
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const override;

View File

@ -24,6 +24,8 @@
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
namespace DB
@ -87,15 +89,13 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const CompressionCodecPtr & default_codec,
const WriterSettings & writer_settings) const
{
UNUSED(columns_list);
UNUSED(permutation);
UNUSED(default_codec);
UNUSED(writer_settings);
return {};
return std::make_unique<MergeTreeDataPartWriterWide>(
getFullPath(), storage, columns_list,
index_granularity_info.marks_file_extension,
default_codec, writer_settings);
}

View File

@ -60,7 +60,6 @@ public:
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const CompressionCodecPtr & default_codec_,
const WriterSettings & writer_settings) const override;

View File

@ -3,7 +3,9 @@
namespace DB
{
size_t MergeTreeDataPartWriterCompact::write(const Block & block, size_t from_mark, size_t index_offset,
size_t MergeTreeDataPartWriterCompact::write(
const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset,
const MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block)
{
@ -83,4 +85,21 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith
return from_row + number_of_rows;
}
void MergeTreeDataPartWriterCompact::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark)
{
if (write_final_mark)
{
writeIntBinary(0, stream->marks);
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeIntBinary(stream->plain_hashing.count(), stream->marks);
writeIntBinary(stream->compressed.offset(), stream->marks);
}
}
stream->finalize();
stream->addToChecksums(checksums);
stream.reset();
}
}

View File

@ -6,16 +6,19 @@ namespace DB
class MergeTreeDataPartWriterCompact : IMergeTreeDataPartWriter
{
public:
size_t write(const Block & block, size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity,
size_t write(const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset, const MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block) override;
void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) override;
private:
/// Write single granule of one column (rows between 2 marks)
size_t writeColumnSingleGranule(
const ColumnWithTypeAndName & column,
size_t from_row,
size_t number_of_rows);
private:
ColumnStreamPtr stream;
};

View File

@ -12,13 +12,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
const String & part_path_,
const MergeTreeData & storage_,
const NamesAndTypesList & columns_list_,
const IColumn::Permutation * permutation_,
const String & marks_file_extension_,
const CompressionCodecPtr & default_codec_,
const WriterSettings & settings_,
const ColumnToSize & merged_column_to_size)
: IMergeTreeDataPartWriter(part_path_, storage_, columns_list_,
permutation_, marks_file_extension_,
: IMergeTreeDataPartWriter(part_path_,
storage_, columns_list_,
marks_file_extension_,
default_codec_, settings_)
{
size_t total_size = 0;
@ -89,9 +89,10 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
};
}
size_t MergeTreeDataPartWriterWide::write(const Block & block, size_t from_mark, size_t index_offset,
const MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block)
size_t MergeTreeDataPartWriterWide::write(const Block & block,
const IColumn::Permutation * permutation, size_t from_mark, size_t index_offset,
const MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block)
{
if (serialization_states.empty())
{
@ -286,4 +287,56 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
return std::make_pair(current_column_mark, current_row - total_rows);
}
void MergeTreeDataPartWriterWide::finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark)
{
const auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
WrittenOffsetColumns offset_columns;
{
auto it = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
{
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
}
if (write_final_mark)
writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
}
}
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{
it->second->finalize();
it->second->addToChecksums(checksums);
}
column_streams.clear();
}
void MergeTreeDataPartWriterWide::writeFinalMark(
const std::string & column_name,
const DataTypePtr column_type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
DB::IDataType::SubstreamPath & path)
{
writeSingleMark(column_name, *column_type, offset_columns, skip_offsets, 0, path);
/// Memoize information about offsets
column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
if (is_offsets)
{
String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
offset_columns.insert(stream_name);
}
}, path);
}
}

View File

@ -3,7 +3,7 @@
namespace DB
{
class MergeTreeDataPartWriterWide : IMergeTreeDataPartWriter
class MergeTreeDataPartWriterWide : public IMergeTreeDataPartWriter
{
public:
@ -13,16 +13,18 @@ public:
const String & part_path,
const MergeTreeData & storage,
const NamesAndTypesList & columns_list,
const IColumn::Permutation * permutation,
const String & marks_file_extension,
const CompressionCodecPtr & default_codec,
const WriterSettings & settings,
const ColumnToSize & merged_column_to_size = {});
size_t write(const Block & block, size_t from_mark, size_t index_offset,
size_t write(const Block & block, const IColumn::Permutation * permutation,
size_t from_mark, size_t index_offset,
const MergeTreeIndexGranularity & index_granularity,
const Block & primary_key_block, const Block & skip_indexes_block) override;
void finalize(IMergeTreeDataPart::Checksums & checksums, bool write_final_mark) override;
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
/// Write data of one column.
@ -61,6 +63,13 @@ private:
bool skip_offsets,
size_t number_of_rows,
DB::IDataType::SubstreamPath & path);
void writeFinalMark(
const std::string & column_name,
const DataTypePtr column_type,
WrittenOffsetColumns & offset_columns,
bool skip_offsets,
DB::IDataType::SubstreamPath & path);
void addStreams(
const String & name,

View File

@ -37,6 +37,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, columns_list(columns_list_)
{
init();
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
}
MergedBlockOutputStream::MergedBlockOutputStream(
@ -58,6 +59,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, columns_list(columns_list_)
{
init();
writer = data_part_->getWriter(columns_list_, default_codec_, writer_settings);
}
std::string MergedBlockOutputStream::getPartPath() const
@ -89,37 +91,19 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
const NamesAndTypesList * total_column_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
/// Finish columns serialization.
{
/// FIXME
// const auto & settings = storage.global_context.getSettingsRef();
// IDataType::SerializeBinaryBulkSettings serialize_settings;
// serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
// serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
// WrittenOffsetColumns offset_columns;
// auto it = columns_list.begin();
// for (size_t i = 0; i < columns_list.size(); ++i, ++it)
// {
// if (!serialization_states.empty())
// {
// serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
// it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
// }
bool write_final_mark = (with_final_mark && rows_count != 0);
writer->finalize(checksums, write_final_mark);
// if (with_final_mark && rows_count != 0)
// writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
// }
}
if (with_final_mark && rows_count != 0)
if (write_final_mark)
index_granularity.appendMark(0); /// last mark
if (!total_column_list)
total_column_list = &columns_list;
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
@ -142,17 +126,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
index_stream = nullptr;
}
/// FIXME
// for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
// {
// it->second->finalize();
// it->second->addToChecksums(checksums);
// }
finishSkipIndicesSerialization(checksums);
// column_streams.clear();
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
new_part->partition.store(storage, part_path, checksums);
@ -278,7 +253,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
size_t new_index_offset = writer->write(block, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block);
size_t new_index_offset = writer->write(block, permutation, current_mark, index_offset, index_granularity, primary_key_block, skip_indexes_block);
rows_count += rows;
/// Should be written before index offset update, because we calculate,
@ -299,9 +274,11 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
{
if (storage.hasPrimaryKey())
{
for (size_t j = 0, size = primary_key_block.rows(); j < size; ++j)
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
const auto & primary_column = primary_key_block.getByPosition(j);
std::cerr << "(writeImpl) primary_column: " << !!primary_column.column << "\n";
std::cerr << "(writeImpl) index_column: " << !!index_columns[j] << "\n";
index_columns[j]->insertFrom(*primary_column.column, i);
primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
}
@ -316,7 +293,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
/// store last index row to write final mark at the end of column
for (size_t j = 0, size = primary_key_block.rows(); j < size; ++j)
for (size_t j = 0, size = primary_key_block.columns(); j < size; ++j)
{
const IColumn & primary_column = *primary_key_block.getByPosition(j).column.get();
auto mutable_column = std::move(*last_index_row[j].column).mutate();