2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
2020-05-20 20:16:32 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-08-26 15:29:46 +00:00
|
|
|
#include <Parsers/queryToString.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-03-14 23:10:51 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2020-02-25 18:20:08 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2019-03-14 23:10:51 +00:00
|
|
|
}
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
MergedBlockOutputStream::MergedBlockOutputStream(
|
2019-11-07 11:11:38 +00:00
|
|
|
const MergeTreeDataPartPtr & data_part,
|
2020-06-16 15:51:29 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot_,
|
2017-12-25 21:57:29 +00:00
|
|
|
const NamesAndTypesList & columns_list_,
|
2020-04-10 13:36:51 +00:00
|
|
|
const MergeTreeIndices & skip_indices,
|
2020-08-26 15:29:46 +00:00
|
|
|
CompressionCodecPtr default_codec_,
|
2021-10-29 17:21:02 +00:00
|
|
|
bool reset_columns_,
|
2019-11-07 11:11:38 +00:00
|
|
|
bool blocks_are_granules_size)
|
2021-10-29 17:21:02 +00:00
|
|
|
: IMergedBlockOutputStream(data_part, metadata_snapshot_, columns_list_, reset_columns_)
|
2019-07-28 11:10:35 +00:00
|
|
|
, columns_list(columns_list_)
|
2020-08-26 15:29:46 +00:00
|
|
|
, default_codec(default_codec_)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2020-04-14 19:47:19 +00:00
|
|
|
MergeTreeWriterSettings writer_settings(
|
2021-04-10 23:33:54 +00:00
|
|
|
storage.getContext()->getSettings(),
|
2020-12-07 12:47:51 +00:00
|
|
|
storage.getSettings(),
|
2020-07-16 09:10:15 +00:00
|
|
|
data_part->index_granularity_info.is_adaptive,
|
2020-12-09 18:19:49 +00:00
|
|
|
/* rewrite_primary_key = */ true,
|
2020-04-14 19:47:19 +00:00
|
|
|
blocks_are_granules_size);
|
2019-11-05 11:53:22 +00:00
|
|
|
|
2020-04-14 19:47:19 +00:00
|
|
|
if (!part_path.empty())
|
2020-06-03 13:27:54 +00:00
|
|
|
volume->getDisk()->createDirectories(part_path);
|
2020-02-04 13:34:57 +00:00
|
|
|
|
2021-10-29 17:21:02 +00:00
|
|
|
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2018-05-07 02:01:11 +00:00
|
|
|
/// If data is pre-sorted.
|
2016-07-21 16:22:24 +00:00
|
|
|
void MergedBlockOutputStream::write(const Block & block)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeImpl(block, nullptr);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2017-03-12 19:18:07 +00:00
|
|
|
/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
|
2017-04-01 07:20:54 +00:00
|
|
|
* This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
|
|
|
|
*/
|
2016-07-21 16:22:24 +00:00
|
|
|
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
writeImpl(block, permutation);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
2022-02-08 08:01:26 +00:00
|
|
|
struct MergedBlockOutputStream::Finalizer::Impl
|
|
|
|
{
|
|
|
|
IMergeTreeDataPartWriter & writer;
|
|
|
|
MergeTreeData::MutableDataPartPtr part;
|
2022-02-08 19:21:16 +00:00
|
|
|
NameSet files_to_remove_after_finish;
|
2022-02-08 08:01:26 +00:00
|
|
|
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
|
|
|
|
bool sync;
|
|
|
|
|
2022-02-08 19:21:16 +00:00
|
|
|
Impl(IMergeTreeDataPartWriter & writer_, MergeTreeData::MutableDataPartPtr part_, const NameSet & files_to_remove_after_finish_, bool sync_)
|
|
|
|
: writer(writer_)
|
|
|
|
, part(std::move(part_))
|
|
|
|
, files_to_remove_after_finish(files_to_remove_after_finish_)
|
|
|
|
, sync(sync_) {}
|
2022-02-08 08:01:26 +00:00
|
|
|
|
|
|
|
void finish();
|
|
|
|
};
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::Finalizer::finish()
|
|
|
|
{
|
|
|
|
std::unique_ptr<Impl> to_finish = std::move(impl);
|
|
|
|
if (to_finish)
|
|
|
|
to_finish->finish();
|
|
|
|
}
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::Finalizer::Impl::finish()
|
|
|
|
{
|
|
|
|
writer.finish(sync);
|
|
|
|
|
2022-02-08 19:21:16 +00:00
|
|
|
auto disk = part->volume->getDisk();
|
|
|
|
for (const auto & file_name: files_to_remove_after_finish)
|
|
|
|
disk->removeFile(part->getFullRelativePath() + file_name);
|
|
|
|
|
2022-02-08 08:01:26 +00:00
|
|
|
for (auto & file : written_files)
|
|
|
|
{
|
|
|
|
file->finalize();
|
|
|
|
if (sync)
|
|
|
|
file->sync();
|
|
|
|
}
|
|
|
|
|
|
|
|
part->storage.lockSharedData(*part);
|
|
|
|
}
|
|
|
|
|
|
|
|
MergedBlockOutputStream::Finalizer::~Finalizer()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
finish();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException("MergedBlockOutputStream");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-17 15:56:42 +00:00
|
|
|
MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) noexcept = default;
|
|
|
|
MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) noexcept = default;
|
2022-02-08 08:01:26 +00:00
|
|
|
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::finalizePart(
|
|
|
|
MergeTreeData::MutableDataPartPtr & new_part,
|
|
|
|
bool sync,
|
|
|
|
const NamesAndTypesList * total_columns_list,
|
|
|
|
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
|
|
|
{
|
|
|
|
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish();
|
|
|
|
}
|
|
|
|
|
|
|
|
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
2017-08-30 19:03:19 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr & new_part,
|
2022-01-14 19:53:55 +00:00
|
|
|
bool sync,
|
2020-03-09 02:55:28 +00:00
|
|
|
const NamesAndTypesList * total_columns_list,
|
2017-08-30 19:03:19 +00:00
|
|
|
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2019-10-21 17:23:06 +00:00
|
|
|
/// Finish write and get checksums.
|
|
|
|
MergeTreeData::DataPart::Checksums checksums;
|
|
|
|
|
2019-10-28 11:00:29 +00:00
|
|
|
if (additional_column_checksums)
|
|
|
|
checksums = std::move(*additional_column_checksums);
|
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
/// Finish columns serialization.
|
2022-02-08 08:01:26 +00:00
|
|
|
writer->fillChecksums(checksums);
|
|
|
|
|
|
|
|
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState());
|
2022-01-14 19:53:55 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
|
|
|
|
checksums.addFile(
|
|
|
|
projection_name + ".proj",
|
|
|
|
projection_part->checksums.getTotalSizeOnDisk(),
|
|
|
|
projection_part->checksums.getTotalChecksumUInt128());
|
|
|
|
|
2022-02-08 19:21:16 +00:00
|
|
|
NameSet files_to_remove_after_sync;
|
2022-01-20 21:06:32 +00:00
|
|
|
if (reset_columns)
|
2022-01-21 00:20:41 +00:00
|
|
|
{
|
|
|
|
auto part_columns = total_columns_list ? *total_columns_list : columns_list;
|
|
|
|
auto serialization_infos = new_part->getSerializationInfos();
|
|
|
|
|
|
|
|
serialization_infos.replaceData(new_serialization_infos);
|
2022-02-08 19:21:16 +00:00
|
|
|
files_to_remove_after_sync = removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
|
2017-08-30 19:03:19 +00:00
|
|
|
|
2022-01-21 00:20:41 +00:00
|
|
|
new_part->setColumns(part_columns);
|
|
|
|
new_part->setSerializationInfos(serialization_infos);
|
|
|
|
}
|
2021-04-15 21:47:11 +00:00
|
|
|
|
2022-02-08 19:21:16 +00:00
|
|
|
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
|
2020-04-14 01:26:34 +00:00
|
|
|
if (new_part->isStoredOnDisk())
|
2022-02-08 08:01:26 +00:00
|
|
|
finalizer->written_files = finalizePartOnDisk(new_part, checksums);
|
2020-04-14 01:26:34 +00:00
|
|
|
|
|
|
|
new_part->rows_count = rows_count;
|
|
|
|
new_part->modification_time = time(nullptr);
|
|
|
|
new_part->index = writer->releaseIndexColumns();
|
|
|
|
new_part->checksums = checksums;
|
|
|
|
new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
|
|
|
|
new_part->index_granularity = writer->getIndexGranularity();
|
2021-10-08 13:13:56 +00:00
|
|
|
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
|
|
|
|
|
2022-01-14 19:53:55 +00:00
|
|
|
if (default_codec != nullptr)
|
|
|
|
new_part->default_codec = default_codec;
|
2022-02-08 08:01:26 +00:00
|
|
|
|
|
|
|
return Finalizer(std::move(finalizer));
|
2022-01-11 19:02:48 +00:00
|
|
|
}
|
|
|
|
|
2022-02-08 08:01:26 +00:00
|
|
|
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
|
2022-01-20 23:22:16 +00:00
|
|
|
const MergeTreeData::DataPartPtr & new_part,
|
2022-02-08 08:01:26 +00:00
|
|
|
MergeTreeData::DataPart::Checksums & checksums)
|
2020-04-14 01:26:34 +00:00
|
|
|
{
|
2022-02-08 08:01:26 +00:00
|
|
|
WrittenFiles written_files;
|
2021-02-10 14:12:49 +00:00
|
|
|
if (new_part->isProjectionPart())
|
2017-10-19 18:20:41 +00:00
|
|
|
{
|
2021-02-10 14:12:49 +00:00
|
|
|
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
|
|
|
|
{
|
|
|
|
auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
|
|
|
|
HashingWriteBuffer count_out_hashing(*count_out);
|
|
|
|
writeIntText(rows_count, count_out_hashing);
|
|
|
|
count_out_hashing.next();
|
|
|
|
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
|
|
|
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
2022-02-08 08:01:26 +00:00
|
|
|
count_out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(count_out));
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
2021-04-17 01:06:59 +00:00
|
|
|
}
|
2021-02-10 14:12:49 +00:00
|
|
|
else
|
2021-04-17 01:06:59 +00:00
|
|
|
{
|
2021-02-10 14:12:49 +00:00
|
|
|
if (new_part->uuid != UUIDHelpers::Nil)
|
|
|
|
{
|
2021-05-12 06:53:04 +00:00
|
|
|
auto out = volume->getDisk()->writeFile(fs::path(part_path) / IMergeTreeDataPart::UUID_FILE_NAME, 4096);
|
2021-02-10 14:12:49 +00:00
|
|
|
HashingWriteBuffer out_hashing(*out);
|
|
|
|
writeUUIDText(new_part->uuid, out_hashing);
|
|
|
|
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
|
|
|
|
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
|
|
|
|
2021-05-14 20:38:16 +00:00
|
|
|
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
2021-02-10 14:12:49 +00:00
|
|
|
{
|
2022-02-08 08:01:26 +00:00
|
|
|
if (auto file = new_part->partition.store(storage, volume->getDisk(), part_path, checksums))
|
|
|
|
written_files.emplace_back(std::move(file));
|
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
if (new_part->minmax_idx->initialized)
|
2022-02-08 08:01:26 +00:00
|
|
|
{
|
|
|
|
auto files = new_part->minmax_idx->store(storage, volume->getDisk(), part_path, checksums);
|
|
|
|
for (auto & file : files)
|
|
|
|
written_files.emplace_back(std::move(file));
|
|
|
|
}
|
2021-02-10 14:12:49 +00:00
|
|
|
else if (rows_count)
|
|
|
|
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
|
2021-05-14 20:38:16 +00:00
|
|
|
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
2021-02-10 14:12:49 +00:00
|
|
|
|
2021-05-14 20:38:16 +00:00
|
|
|
{
|
2021-05-12 06:53:04 +00:00
|
|
|
auto count_out = volume->getDisk()->writeFile(fs::path(part_path) / "count.txt", 4096);
|
2021-02-10 14:12:49 +00:00
|
|
|
HashingWriteBuffer count_out_hashing(*count_out);
|
|
|
|
writeIntText(rows_count, count_out_hashing);
|
|
|
|
count_out_hashing.next();
|
|
|
|
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
|
|
|
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
2022-02-08 08:01:26 +00:00
|
|
|
count_out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(count_out));
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2019-10-17 18:55:07 +00:00
|
|
|
if (!new_part->ttl_infos.empty())
|
2019-04-15 09:30:45 +00:00
|
|
|
{
|
|
|
|
/// Write a file with ttl infos in json format.
|
2021-05-05 15:10:14 +00:00
|
|
|
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "ttl.txt", 4096);
|
2020-02-27 16:47:40 +00:00
|
|
|
HashingWriteBuffer out_hashing(*out);
|
2019-04-15 09:30:45 +00:00
|
|
|
new_part->ttl_infos.write(out_hashing);
|
|
|
|
checksums.files["ttl.txt"].file_size = out_hashing.count();
|
|
|
|
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2019-04-15 09:30:45 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2022-01-20 23:22:16 +00:00
|
|
|
if (!new_part->getSerializationInfos().empty())
|
2021-03-12 16:33:41 +00:00
|
|
|
{
|
|
|
|
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
|
|
|
|
HashingWriteBuffer out_hashing(*out);
|
2022-01-20 23:22:16 +00:00
|
|
|
new_part->getSerializationInfos().writeJSON(out_hashing);
|
2021-03-12 16:33:41 +00:00
|
|
|
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
|
|
|
|
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2021-03-12 16:33:41 +00:00
|
|
|
}
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
/// Write a file with a description of columns.
|
2021-05-05 15:10:14 +00:00
|
|
|
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "columns.txt", 4096);
|
2022-01-20 23:22:16 +00:00
|
|
|
new_part->getColumns().writeText(*out);
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
2020-08-28 09:07:20 +00:00
|
|
|
if (default_codec != nullptr)
|
2020-08-26 15:29:46 +00:00
|
|
|
{
|
|
|
|
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
|
|
|
|
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2020-08-26 15:29:46 +00:00
|
|
|
}
|
2020-08-28 09:07:20 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
throw Exception("Compression codec have to be specified for part on disk, empty for" + new_part->name
|
|
|
|
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
2020-08-26 15:29:46 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
/// Write file with checksums.
|
2021-05-05 15:10:14 +00:00
|
|
|
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "checksums.txt", 4096);
|
2020-02-27 16:47:40 +00:00
|
|
|
checksums.write(*out);
|
2022-02-08 08:01:26 +00:00
|
|
|
out->preFinalize();
|
|
|
|
written_files.emplace_back(std::move(out));
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2022-02-08 08:01:26 +00:00
|
|
|
|
|
|
|
return written_files;
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
block.checkNumberOfRows();
|
|
|
|
size_t rows = block.rows();
|
2018-12-04 14:44:42 +00:00
|
|
|
if (!rows)
|
|
|
|
return;
|
2018-11-30 15:36:10 +00:00
|
|
|
|
2020-12-10 08:57:52 +00:00
|
|
|
writer->write(block, permutation);
|
2021-10-29 17:21:02 +00:00
|
|
|
if (reset_columns)
|
|
|
|
new_serialization_infos.add(block);
|
2021-03-09 17:25:23 +00:00
|
|
|
|
2017-10-24 14:11:53 +00:00
|
|
|
rows_count += rows;
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|