// ClickHouse/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp

#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
2020-04-14 01:26:34 +00:00
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Interpreters/Context.h>
#include <IO/WriteSettings.h>
namespace DB
{
2020-02-25 18:10:48 +00:00
/// Error codes referenced by this translation unit; only declared here.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}
/// Creates a stream that writes the columns described by `header_` into `data_part`.
///
/// The constructor:
///  - builds MergeTreeWriterSettings from the storage's context settings, the context's
///    write settings and the storage settings (with rewrite_primary_key = false);
///  - obtains a writer from `data_part` for the header's columns, passing through
///    `metadata_snapshot_`, `indices_to_recalc`, `default_codec` and `index_granularity`;
///  - hands `offset_columns_` to that writer via setWrittenOffsetColumns().
///
/// Adaptive granularity is taken from `index_granularity_info->mark_type.adaptive` when
/// `index_granularity_info` is non-null; otherwise from `storage.canUseAdaptiveGranularity()`.
///
/// Throws Exception with ErrorCodes::NOT_IMPLEMENTED when the writer returned by the part
/// is not a MergeTreeDataPartWriterOnDisk.
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
    const MergeTreeMutableDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const Block & header_,
    CompressionCodecPtr default_codec,
    const MergeTreeIndices & indices_to_recalc,
    WrittenOffsetColumns * offset_columns_,
    const MergeTreeIndexGranularity & index_granularity,
    const MergeTreeIndexGranularityInfo * index_granularity_info)
    : IMergedBlockOutputStream(data_part, metadata_snapshot_, header_.getNamesAndTypesList(), /*reset_columns=*/ true)
    , header(header_)
{
    /// Fetch the context once; both the query settings and the write settings come from it.
    const auto context = data_part->storage.getContext();
    const auto & global_settings = context->getSettings();
    const auto & storage_settings = data_part->storage.getSettings();

    MergeTreeWriterSettings writer_settings(
        global_settings,
        context->getWriteSettings(),
        storage_settings,
        index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
        /* rewrite_primary_key = */ false);

    writer = data_part->getWriter(
        header.getNamesAndTypesList(),
        metadata_snapshot_,
        indices_to_recalc,
        default_codec,
        writer_settings,
        index_granularity);

    /// setWrittenOffsetColumns() is called on the on-disk writer type, so any other writer is rejected.
    auto * writer_on_disk = dynamic_cast<MergeTreeDataPartWriterOnDisk *>(writer.get());
    if (!writer_on_disk)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MergedColumnOnlyOutputStream supports only parts stored on disk");

    writer_on_disk->setWrittenOffsetColumns(offset_columns_);
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
2020-04-14 01:26:34 +00:00
if (!block.rows())
2019-06-24 15:07:01 +00:00
return;
2020-12-10 08:57:52 +00:00
writer->write(block, nullptr);
2021-10-29 17:21:02 +00:00
new_serialization_infos.add(block);
}
/// Computes checksums for what this stream has written and updates `new_part` to match.
///
/// Steps, in order:
///  - the writer fills a fresh Checksums object;
///  - one "<projection_name>.proj" entry is added per projection part of `new_part`;
///  - the part's serialization infos get their data replaced by `new_serialization_infos`;
///  - removeEmptyColumnsFromPart() is called; every file it reports is removed from the
///    part's storage (if it exists) and dropped from `all_checksums`;
///  - the resulting columns and serialization infos are stored on `new_part`.
///
/// @param new_part       Part whose columns, serialization infos and files are updated.
/// @param all_checksums  Modified in place: entries for removed files are erased.
/// @return The checksums computed by this call (a separate object from `all_checksums`).
MergeTreeData::DataPart::Checksums
MergedColumnOnlyOutputStream::fillChecksums(
    MergeTreeData::MutableDataPartPtr & new_part,
    MergeTreeData::DataPart::Checksums & all_checksums)
{
    /// Finish columns serialization.
    MergeTreeData::DataPart::Checksums checksums;
    writer->fillChecksums(checksums);

    for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
        checksums.addFile(
            projection_name + ".proj",
            projection_part->checksums.getTotalSizeOnDisk(),
            projection_part->checksums.getTotalChecksumUInt128());

    /// Local copies: both are passed to removeEmptyColumnsFromPart() and then written back with setColumns().
    auto columns = new_part->getColumns();
    auto serialization_infos = new_part->getSerializationInfos();
    serialization_infos.replaceData(new_serialization_infos);

    auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);

    for (const String & removed_file : removed_files)
    {
        new_part->getDataPartStorage().removeFileIfExists(removed_file);

        /// Erasing by key does nothing when the key is absent, so no prior contains() lookup is needed.
        all_checksums.files.erase(removed_file);
    }

    new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());

    return checksums;
}
/// Finishes the underlying part writer, forwarding `sync` to it unchanged.
void MergedColumnOnlyOutputStream::finish(bool sync)
{
    auto & part_writer = *writer;
    part_writer.finish(sync);
}
}