polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-11-08 17:36:10 +03:00
parent 8cf6236936
commit c070254279
11 changed files with 70 additions and 26 deletions

View File

@@ -364,7 +364,8 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
String IMergeTreeDataPart::getFullPath() const
{
assertOnDisk();
/// FIXME
// assertOnDisk();
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);

View File

@@ -80,10 +80,11 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
{
if (settings.blocks_are_granules_size && !index_granularity.empty())
throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
Poco::File part_dir(part_path);
if (!part_dir.exists())
part_dir.createDirectories();
Poco::File(part_path).createDirectories();
initPrimaryIndex();
initSkipIndices();
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
@@ -149,6 +150,8 @@ void IMergeTreeDataPartWriter::initPrimaryIndex()
part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
}
primary_index_initialized = true;
}
void IMergeTreeDataPartWriter::initSkipIndices()
@@ -166,10 +169,15 @@ void IMergeTreeDataPartWriter::initSkipIndices()
skip_indices_aggregators.push_back(index->createIndexAggregator());
skip_index_filling.push_back(0);
}
skip_indices_initialized = true;
}
void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & primary_keys_block, size_t rows)
{
if (!primary_index_initialized)
throw Exception("Primary index is not initialized", ErrorCodes::LOGICAL_ERROR);
size_t primary_columns_num = primary_keys_block.columns();
if (index_columns.empty())
{
@@ -219,6 +227,9 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
void IMergeTreeDataPartWriter::calculateAndSerializeSkipIndices(
const Block & skip_indexes_block, size_t rows)
{
if (!skip_indices_initialized)
throw Exception("Skip indices are not initialized", ErrorCodes::LOGICAL_ERROR);
size_t skip_index_current_data_mark = 0;
/// Filling and writing skip indices like in IMergeTreeDataPartWriter::writeColumn
@@ -279,6 +290,7 @@ void IMergeTreeDataPartWriter::calculateAndSerializeSkipIndices(
void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool write_final_mark)
{
std::cerr << "finishPrimaryIndexSerialization called...\n";
if (index_stream)
{
if (write_final_mark)
@@ -288,10 +300,13 @@ void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::Da
index_columns[j]->insert(last_index_row[j]);
index_types[j]->serializeBinary(last_index_row[j], *index_stream);
}
last_index_row.clear();
index_granularity.appendMark(0);
}
std::cerr << "(finishPrimaryIndexSerialization) marks_count: " << index_granularity.getMarksCount() << "\n";
std::cerr << "(finishPrimaryIndexSerialization) write_final_mark: " << write_final_mark << "\n";
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();

View File

@@ -140,6 +140,8 @@ protected:
Row last_index_row;
bool data_written = false;
bool primary_index_initialized = false;
bool skip_indices_initialized = false;
};

View File

@@ -1678,6 +1678,7 @@ void MergeTreeData::alterDataPart(
/// Don't recalculate indices because altering indices is restricted
std::vector<MergeTreeIndexPtr>{},
unused_written_offsets,
part->index_granularity,
&part->index_granularity_info);
in.readPrefix();

View File

@@ -831,7 +831,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
new_data_part->index_granularity = to.getIndexGranularity();
MergedColumnOnlyOutputStream column_to(
new_data_part,
column_gathered_stream.getHeader(),
@@ -842,7 +841,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/// because all of them were already recalculated and written
/// as the key part of the vertical merge
std::vector<MergeTreeIndexPtr>{},
written_offset_columns);
written_offset_columns,
to.getIndexGranularity());
size_t column_elems_written = 0;
@@ -1093,7 +1093,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
merge_entry->columns_written = all_columns.size() - updated_header.columns();
new_data_part->index_granularity = source_part->index_granularity;
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
@@ -1104,6 +1103,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
unused_written_offsets,
source_part->index_granularity,
&source_part->index_granularity_info
);
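Both hunks above change how index granularity reaches the column-only stream: instead of being assigned to the new part before the stream is constructed, it is now passed as a constructor argument. A condensed before/after sketch of the merge call site (argument lists abbreviated, taken from the removed and added lines):

/// Before: granularity set on the part, picked up implicitly.
// new_data_part->index_granularity = to.getIndexGranularity();
// MergedColumnOnlyOutputStream column_to(new_data_part, /* ... */, written_offset_columns);

/// After: granularity handed to the stream, which forwards it to the writer.
// MergedColumnOnlyOutputStream column_to(new_data_part, /* ... */, written_offset_columns,
//     to.getIndexGranularity());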

View File

@@ -8,22 +8,22 @@ namespace DB
const MergeTreePartInfo & info, const String & relative_path)
{
// /// FIXME
size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
return std::make_shared<MergeTreeDataPartCompact>(storage, name, info, index_granularity_info, disk, relative_path);
// size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
// return std::make_shared<MergeTreeDataPartCompact>(storage, name, info, index_granularity_info, disk, relative_path);
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
// return std::make_shared<MergeTreeDataPartWide>(storage, name, info, index_granularity_info, disk, relative_path);
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
return std::make_shared<MergeTreeDataPartWide>(storage, name, info, index_granularity_info, disk, relative_path);
}
std::shared_ptr<IMergeTreeDataPart> createPart(MergeTreeData & storage, const DiskSpace::DiskPtr & disk,
const String & name, const String & relative_path)
{
/// FIXME
size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
return std::make_shared<MergeTreeDataPartCompact>(storage, name, index_granularity_info, disk, relative_path);
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
// return std::make_shared<MergeTreeDataPartWide>(storage, name, index_granularity_info, disk, relative_path);
// /// FIXME
// size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
// return std::make_shared<MergeTreeDataPartCompact>(storage, name, index_granularity_info, disk, relative_path);
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
return std::make_shared<MergeTreeDataPartWide>(storage, name, index_granularity_info, disk, relative_path);
}
}
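For reference, the mark sizes used in this file follow from the on-disk layouts implied by the extensions (the field meanings below are my reading, not stated in the diff): an adaptive wide-part mark (.mrk2) stores three numbers, while a compact-part mark (.mrk3) stores one row count plus a pair of offsets per physical column.

#include <cstddef>

/// .mrk2: offset in the compressed file, offset in the decompressed
/// block, and the number of rows in the granule.
constexpr size_t wide_mark_size = sizeof(size_t) * 3;

/// .mrk3 (compact): two offsets per column plus a single row count.
constexpr size_t compact_mark_size(size_t num_columns)
{
    return sizeof(size_t) + sizeof(size_t) * 2 * num_columns;
}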

View File

@@ -55,6 +55,8 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.aio_threshold);
};
std::cerr << "(addStreams) name: " << name << "\n";
IDataType::SubstreamPath stream_path;
type.enumerateStreams(callback, stream_path);
}
@@ -102,6 +104,8 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
/// but not in case of vertical merge)
if (compute_granularity)
fillIndexGranularity(block);
std::cerr << "(MergeTreeDataPartWriterWide::write) marks_count: " << index_granularity.getMarksCount() << "\n";
WrittenOffsetColumns offset_columns;
MarkWithOffset result;
@@ -226,6 +230,7 @@ std::pair<size_t, size_t> MergeTreeDataPartWriterWide::writeColumn(
std::cerr << "(writeColumn) table: " << storage.getTableName() << "\n";
std::cerr << "(writeColumn) column: " << name << "\n";
std::cerr << "(writeColumn) index_offset: " << index_offset << "\n";
auto & settings = storage.global_context.getSettingsRef();
IDataType::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
@@ -311,10 +316,16 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
}
if (write_final_mark)
{
writeFinalMark(it->name, it->type, offset_columns, false, serialize_settings.path);
}
}
}
/// FIXME ??
if (compute_granularity && write_final_mark && data_written)
index_granularity.appendMark(0);
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{
it->second->finalize();
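The block behind the new FIXME appends the final mark exactly once per part: only when this writer computes granularity itself, the caller requested a final mark, and at least one row was written. A standalone sketch of that condition (hypothetical names); the zero means the final granule holds no rows and, as I read it, only records end-of-data offsets:

#include <cstddef>
#include <vector>

struct GranularitySketch
{
    std::vector<size_t> rows_per_mark;
    void appendMark(size_t rows) { rows_per_mark.push_back(rows); }
};

void finishDataSketch(GranularitySketch & granularity, bool compute_granularity,
                      bool write_final_mark, bool data_written)
{
    if (compute_granularity && write_final_mark && data_written)
        granularity.appendMark(0);   /// zero-row end-of-data mark
}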

View File

@@ -156,7 +156,10 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
*/
if (!data_file_exists)
return;
std::cerr << "(addStreams) part: " << path << '\n';
std::cerr << "(addStreams) marks count: " << data_part->getMarksCount() << "\n";
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
all_mark_ranges, settings, mark_cache,

View File

@@ -30,6 +30,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
WriterSettings writer_settings(data_part->storage.global_context.getSettings(),
data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);
init();
}
MergedBlockOutputStream::MergedBlockOutputStream(
@@ -58,6 +59,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
writer = data_part->getWriter(columns_list,
data_part->storage.getSkipIndices(), default_codec, writer_settings);
init();
}
std::string MergedBlockOutputStream::getPartPath() const
@@ -152,10 +154,15 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
new_part->index_granularity = writer->getIndexGranularity();
std::cerr << "(writeSuffixAndFinalizePart) part: " << new_part->getFullPath() << "\n";
std::cerr << "(writeSuffixAndFinalizePart) marks_count: " << new_part->index_granularity.getMarksCount() << "\n";
}
void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
writer->initPrimaryIndex();
writer->initSkipIndices();
}
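Both constructors now funnel through init(), so directory creation and index initialization happen once, after the writer exists, instead of inside the writer's constructor (compare the removed Poco::File block in IMergeTreeDataPartWriter above). A standalone sketch of the new ordering (hypothetical names):

struct WriterSketch
{
    void initPrimaryIndex() { /* open primary.idx */ }
    void initSkipIndices()  { /* open skip index streams */ }
};

struct OutputStreamSketch
{
    WriterSketch writer;

    OutputStreamSketch() { init(); }   /// every constructor ends by calling init()

    void init()
    {
        /// In the real code: Poco::File(part_path).createDirectories();
        writer.initPrimaryIndex();
        writer.initSkipIndices();
    }
};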

View File

@@ -8,6 +8,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
CompressionCodecPtr default_codec, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
WrittenOffsetColumns & already_written_offset_columns_,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part),
header(header_), sync(sync_), skip_offsets(skip_offsets_),
@@ -19,15 +20,17 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
// if (index_granularity_info_)
// std::cerr << "(MergedColumnOnlyOutputStream) index_granularity_info->isAdaptive(): " << index_granularity_info_->is_adaptive << "\n";
WriterSettings writer_settings(data_part->storage.global_context.getSettings(), false);
if (index_granularity_info && !index_granularity_info->is_adaptive)
writer_settings.can_use_adaptive_granularity = false;
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, default_codec, writer_settings);
WriterSettings writer_settings(
data_part->storage.global_context.getSettings(),
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity());
writer = data_part->getWriter(header.getNamesAndTypesList(), indices_to_recalc, default_codec, writer_settings, index_granularity);
writer_wide = typeid_cast<MergeTreeDataPartWriterWide *>(writer.get());
if (!writer_wide)
throw Exception("MergedColumnOnlyOutputStream can be used only for writing Wide parts", ErrorCodes::LOGICAL_ERROR);
/// FIXME unnecessary init of primary idx
writer->initSkipIndices();
}
void MergedColumnOnlyOutputStream::write(const Block & block)
@@ -51,7 +54,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
if (!rows)
return;
/// FIXME skip_offsets
std::cerr << "(MergedColumnOnlyOutputStream::write) writing rows: " << rows << "\n";
WrittenOffsetColumns offset_columns = already_written_offset_columns;
for (size_t i = 0; i < header.columns(); ++i)
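The WriterSettings rewrite above also fixes an oddity visible in the removed lines: the old code passed false for the adaptive-granularity flag unconditionally and then disabled it again for non-adaptive parts, so the storage default was never consulted. A sketch of the new decision (hypothetical helper, assuming the flag is the only relevant difference):

#include <optional>

/// Prefer the part's own granularity info when it is known;
/// otherwise fall back to the storage-wide capability.
bool chooseAdaptiveGranularity(std::optional<bool> part_is_adaptive, bool storage_default)
{
    return part_is_adaptive ? *part_is_adaptive : storage_default;
}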

View File

@@ -21,6 +21,7 @@ public:
CompressionCodecPtr default_codec_, bool skip_offsets_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
WrittenOffsetColumns & already_written_offset_columns_,
const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);
Block getHeader() const override { return header; }
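With both trailing parameters defaulted, existing call sites compile unchanged, while the merge and mutation paths shown earlier can pass an explicit granularity. A sketch of the two call styles (argument lists abbreviated to the relevant tail):

/// Default: empty granularity and no granularity info; the writer
/// computes marks itself.
// MergedColumnOnlyOutputStream out(part, header, sync, codec, skip_offsets,
//     indices, offset_columns);

/// Explicit: reuse an existing part's granularity, e.g. during mutation.
// MergedColumnOnlyOutputStream out(part, header, sync, codec, skip_offsets,
//     indices, offset_columns,
//     source_part->index_granularity, &source_part->index_granularity_info);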