ClickHouse/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp

92 lines
3.3 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
2020-04-14 01:26:34 +00:00
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Interpreters/Context.h>
namespace DB
{
2020-02-25 18:10:48 +00:00
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
2020-04-17 11:59:10 +00:00
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
2020-04-17 11:59:10 +00:00
const Block & header_,
CompressionCodecPtr default_codec,
2019-11-07 11:11:38 +00:00
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
2019-12-09 21:21:17 +00:00
WrittenOffsetColumns * offset_columns_,
2019-11-08 14:36:10 +00:00
const MergeTreeIndexGranularity & index_granularity,
2020-04-17 11:59:10 +00:00
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part, metadata_snapshot_)
, header(header_)
{
const auto & global_settings = data_part->storage.global_context.getSettings();
MergeTreeWriterSettings writer_settings(
global_settings,
index_granularity_info ? index_granularity_info->is_adaptive : data_part->storage.canUseAdaptiveGranularity(),
global_settings.min_bytes_to_use_direct_io);
2020-04-17 11:59:10 +00:00
writer = data_part->getWriter(
header.getNamesAndTypesList(),
2020-06-17 12:39:20 +00:00
metadata_snapshot_,
2020-04-17 11:59:10 +00:00
indices_to_recalc,
default_codec,
std::move(writer_settings),
index_granularity);
2019-11-08 14:36:10 +00:00
2020-04-14 01:26:34 +00:00
auto * writer_on_disk = dynamic_cast<MergeTreeDataPartWriterOnDisk *>(writer.get());
if (!writer_on_disk)
throw Exception("MergedColumnOnlyOutputStream supports only parts stored on disk", ErrorCodes::NOT_IMPLEMENTED);
writer_on_disk->setWrittenOffsetColumns(offset_columns_);
writer_on_disk->initSkipIndices();
}
void MergedColumnOnlyOutputStream::write(const Block & block)
{
2020-01-16 16:15:01 +00:00
std::unordered_set<String> skip_indexes_column_names_set;
for (const auto & index : writer->getSkipIndices())
2020-05-28 12:37:05 +00:00
std::copy(index->index.column_names.cbegin(), index->index.column_names.cend(),
std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());
2019-12-09 21:21:17 +00:00
Block skip_indexes_block = getBlockAndPermute(block, skip_indexes_column_names, nullptr);
2020-04-14 01:26:34 +00:00
if (!block.rows())
2019-06-24 15:07:01 +00:00
return;
2019-12-09 21:21:17 +00:00
writer->write(block);
2020-04-14 01:26:34 +00:00
writer->calculateAndSerializeSkipIndices(skip_indexes_block);
2019-12-09 21:21:17 +00:00
writer->next();
}
void MergedColumnOnlyOutputStream::writeSuffix()
{
throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
MergeTreeData::DataPart::Checksums
MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(
MergeTreeData::MutableDataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & all_checksums,
bool sync)
{
/// Finish columns serialization.
MergeTreeData::DataPart::Checksums checksums;
writer->finishDataSerialization(checksums, sync);
writer->finishSkipIndicesSerialization(checksums, sync);
auto columns = new_part->getColumns();
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, checksums);
for (const String & removed_file : removed_files)
if (all_checksums.files.count(removed_file))
all_checksums.files.erase(removed_file);
new_part->setColumns(columns);
return checksums;
}
}