ClickHouse/src/Storages/MergeTree/MergedBlockOutputStream.cpp

236 lines
8.2 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <Parsers/queryToString.h>
namespace DB
{
/// Error codes referenced by this translation unit; they are only declared here.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
/// Convenience constructor: forwards to the full constructor with an empty
/// column-to-size map and the `min_bytes_to_use_direct_io` value from the
/// global settings as the AIO threshold.
MergedBlockOutputStream::MergedBlockOutputStream(
    const MergeTreeDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const NamesAndTypesList & columns_list_,
    const MergeTreeIndices & skip_indices,
    CompressionCodecPtr default_codec_,
    bool blocks_are_granules_size)
    : MergedBlockOutputStream(
        data_part,
        metadata_snapshot_,
        columns_list_,
        skip_indices,
        default_codec_,
        /* merged_column_to_size = */ {},
        /* aio_threshold = */ data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
        blocks_are_granules_size)
{
}
/// Full constructor.
/// Builds the writer settings, creates the part directory when a part path is set,
/// obtains the writer from the data part and initialises its primary and skip indices.
MergedBlockOutputStream::MergedBlockOutputStream(
    const MergeTreeDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const NamesAndTypesList & columns_list_,
    const MergeTreeIndices & skip_indices,
    CompressionCodecPtr default_codec_,
    const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
    size_t aio_threshold,
    bool blocks_are_granules_size)
    : IMergedBlockOutputStream(data_part, metadata_snapshot_)
    , columns_list(columns_list_)
    , default_codec(default_codec_)
{
    MergeTreeWriterSettings writer_settings(
        storage.global_context.getSettings(),
        data_part->index_granularity_info.is_adaptive,
        aio_threshold,
        blocks_are_granules_size);

    /// The size estimate is only accumulated when a non-zero threshold is given
    /// and sizes of the merged columns are available.
    const bool need_size_estimate = aio_threshold > 0 && !merged_column_to_size.empty();
    if (need_size_estimate)
    {
        for (const auto & column : columns_list)
        {
            const auto found = merged_column_to_size.find(column.name);
            if (found == merged_column_to_size.end())
                continue;

            writer_settings.estimated_size += found->second;
        }
    }

    if (!part_path.empty())
    {
        auto disk = volume->getDisk();
        disk->createDirectories(part_path);
    }

    writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings);
    writer->initPrimaryIndex();
    writer->initSkipIndices();
}
2018-05-07 02:01:11 +00:00
/// If data is pre-sorted.
void MergedBlockOutputStream::write(const Block & block)
{
writeImpl(block, nullptr);
}
2017-03-12 19:18:07 +00:00
/** Writes a block that is not sorted itself but comes with a pre-calculated
  * permutation that puts its rows in sorted order.
  * Used to save RAM: there is no need to hold both the source block and a sorted copy at once.
  */
void MergedBlockOutputStream::writeWithPermutation(
    const Block & block,
    const IColumn::Permutation * permutation)
{
    writeImpl(block, permutation);
}
/// Not supported by this stream: unconditionally throws NOT_IMPLEMENTED.
void MergedBlockOutputStream::writeSuffix()
{
    throw Exception(
        "Method writeSuffix is not supported by MergedBlockOutputStream",
        ErrorCodes::NOT_IMPLEMENTED);
}
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part,
bool sync,
2020-03-09 02:55:28 +00:00
const NamesAndTypesList * total_columns_list,
MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
2019-10-21 17:23:06 +00:00
/// Finish write and get checksums.
MergeTreeData::DataPart::Checksums checksums;
2019-10-28 11:00:29 +00:00
if (additional_column_checksums)
checksums = std::move(*additional_column_checksums);
/// Finish columns serialization.
writer->finishDataSerialization(checksums, sync);
writer->finishPrimaryIndexSerialization(checksums, sync);
writer->finishSkipIndicesSerialization(checksums, sync);
NamesAndTypesList part_columns;
2020-03-09 02:55:28 +00:00
if (!total_columns_list)
part_columns = columns_list;
else
part_columns = *total_columns_list;
2020-04-14 01:26:34 +00:00
if (new_part->isStoredOnDisk())
2020-09-01 15:26:49 +00:00
finalizePartOnDisk(new_part, part_columns, checksums, sync);
2020-04-14 01:26:34 +00:00
new_part->setColumns(part_columns);
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
new_part->index = writer->releaseIndexColumns();
new_part->checksums = checksums;
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
new_part->index_granularity = writer->getIndexGranularity();
2020-06-03 22:52:21 +00:00
new_part->calculateColumnsSizesOnDisk();
2020-08-28 09:07:20 +00:00
if (default_codec != nullptr)
new_part->default_codec = default_codec;
2020-04-14 01:26:34 +00:00
}
/// Writes the auxiliary (non-column) files of a part that is stored on disk:
/// the UUID file (if the part has a UUID), partition / minmax index / count.txt
/// (for custom-partitioning format versions or compact parts), ttl.txt (if TTL infos
/// are present), columns.txt, the default compression codec file and checksums.txt.
///
/// @param new_part      Part being finalized; its uuid, partition, minmax_idx, ttl_infos and name are read.
/// @param part_columns  Column list written to columns.txt; passed by non-const reference to
///                      removeEmptyColumnsFromPart before being written.
/// @param checksums     Receives entries for count.txt and ttl.txt and is passed to the partition / minmax
///                      store calls; written out last as checksums.txt.
/// @param sync          When true, every file written here is sync()'d after finalize().
///
/// @throws Exception(LOGICAL_ERROR) if the minmax index is uninitialized for a non-empty part,
///         or if no default compression codec is set.
void MergedBlockOutputStream::finalizePartOnDisk(
    const MergeTreeData::MutableDataPartPtr & new_part,
    NamesAndTypesList & part_columns,
    MergeTreeData::DataPart::Checksums & checksums,
    bool sync)
{
    if (new_part->uuid != UUIDHelpers::Nil)
    {
        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
        writeUUIDText(new_part->uuid, *out);
        out->finalize();
        if (sync)
            out->sync();
    }

    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
    {
        new_part->partition.store(storage, volume->getDisk(), part_path, checksums);

        if (new_part->minmax_idx.initialized)
            new_part->minmax_idx.store(storage, volume->getDisk(), part_path, checksums);
        else if (rows_count)
            throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
        HashingWriteBuffer count_out_hashing(*count_out);
        writeIntText(rows_count, count_out_hashing);
        /// Flush the hashing buffer before its size and hash are recorded.
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
        count_out->finalize();
        if (sync)
            count_out->sync();
    }

    if (!new_part->ttl_infos.empty())
    {
        /// Write a file with ttl infos in json format.
        auto out = volume->getDisk()->writeFile(part_path + "ttl.txt", 4096);
        HashingWriteBuffer out_hashing(*out);
        new_part->ttl_infos.write(out_hashing);
        checksums.files["ttl.txt"].file_size = out_hashing.count();
        checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
        out->finalize();
        if (sync)
            out->sync();
    }

    removeEmptyColumnsFromPart(new_part, part_columns, checksums);

    {
        /// Write a file with a description of columns.
        auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
        part_columns.writeText(*out);
        out->finalize();
        if (sync)
            out->sync();
    }

    if (default_codec != nullptr)
    {
        /// Write a file with the description of the default compression codec.
        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
        DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
        out->finalize();
        /// Honour `sync` here as well, consistently with every other file written by this method.
        if (sync)
            out->sync();
    }
    else
    {
        /// Note the trailing space after "for": without it the part name is glued to the message.
        throw Exception("Compression codec have to be specified for part on disk, empty for " + new_part->name
            + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
    }

    {
        /// Write file with checksums. It goes last, after all entries have been added to `checksums`.
        auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
        checksums.write(*out);
        out->finalize();
        if (sync)
            out->sync();
    }
}
/// Common implementation behind write() and writeWithPermutation().
/// Validates the block, extracts the primary-key and skip-index columns (applying
/// `permutation` via getBlockAndPermute), passes everything to the writer and
/// adds the block's row count to `rows_count`. Empty blocks are ignored.
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    block.checkNumberOfRows();

    const size_t rows = block.rows();
    if (rows == 0)
        return;

    /// Gather the distinct column names used by all secondary (skip) indices.
    std::unordered_set<String> unique_index_columns;
    for (const auto & index : metadata_snapshot->getSecondaryIndices())
        for (const auto & column_name : index.column_names)
            unique_index_columns.insert(column_name);

    Names skip_indexes_column_names(unique_index_columns.begin(), unique_index_columns.end());

    Block primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);
    Block skip_indexes_block = getBlockAndPermute(block, skip_indexes_column_names, permutation);

    writer->write(block, permutation, primary_key_block, skip_indexes_block);
    writer->calculateAndSerializeSkipIndices(skip_indexes_block);
    writer->calculateAndSerializePrimaryIndex(primary_key_block);
    writer->next();

    rows_count += rows;
}
}