ClickHouse/src/Storages/MergeTree/MergeTreeDataWriter.cpp

697 lines
28 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
2022-04-08 18:56:08 +00:00
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
2020-04-10 09:25:52 +00:00
#include <Columns/ColumnConst.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Disks/createVolume.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
2022-02-14 19:50:08 +00:00
#include <Interpreters/MergeTreeTransaction.h>
#include <IO/HashingWriteBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
2021-04-23 23:56:26 +00:00
#include <DataTypes/ObjectUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
2021-10-15 10:11:57 +00:00
#include <Processors/TTL/ITTLAlgorithm.h>
2014-03-13 17:44:00 +00:00
2020-09-01 10:49:53 +00:00
#include <Parsers/queryToString.h>
2020-11-13 07:54:05 +00:00
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
2020-11-13 07:54:05 +00:00
/// Profile event counters incremented by the writers in this file.
/// They are only declared here (extern); the definitions live elsewhere.
/// The first five are incremented in writeTempPart, the "Projection" ones in writeProjectionPartImpl.
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
extern const Event MergeTreeDataWriterBlocksAlreadySorted;
extern const Event MergeTreeDataWriterRows;
extern const Event MergeTreeDataWriterUncompressedBytes;
extern const Event MergeTreeDataWriterCompressedBytes;
extern const Event MergeTreeDataProjectionWriterBlocks;
extern const Event MergeTreeDataProjectionWriterBlocksAlreadySorted;
extern const Event MergeTreeDataProjectionWriterRows;
extern const Event MergeTreeDataProjectionWriterUncompressedBytes;
extern const Event MergeTreeDataProjectionWriterCompressedBytes;
}
2014-03-13 17:44:00 +00:00
namespace DB
{
/// Error codes attached to the exceptions thrown in this file.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
/// Used for invariant violations (unexpected TTL column type, unfinished merge, part spanning several months).
extern const int LOGICAL_ERROR;
/// Used when a single INSERT block contains more distinct partitions than allowed.
extern const int TOO_MANY_PARTS;
}
namespace
2014-03-13 17:44:00 +00:00
{
void buildScatterSelector(
const ColumnRawPtrs & columns,
2017-08-31 19:56:43 +00:00
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector,
size_t max_parts)
{
2017-08-31 19:56:43 +00:00
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
Data partitions_map;
size_t num_rows = columns[0]->size();
size_t partitions_count = 0;
for (size_t i = 0; i < num_rows; ++i)
{
Data::key_type key = hash128(i, columns.size(), columns);
typename Data::LookupResult it;
bool inserted;
partitions_map.emplace(key, it, inserted);
if (inserted)
{
if (max_parts && partitions_count >= max_parts)
throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by 'max_partitions_per_insert_block' setting. Large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).", ErrorCodes::TOO_MANY_PARTS);
2017-08-31 19:56:43 +00:00
partition_num_to_first_row.push_back(i);
2019-10-29 15:16:51 +00:00
it->getMapped() = partitions_count;
++partitions_count;
/// Optimization for common case when there is only one partition - defer selector initialization.
if (partitions_count == 2)
{
selector = IColumn::Selector(num_rows);
std::fill(selector.begin(), selector.begin() + i, 0);
}
}
if (partitions_count > 1)
2019-10-29 15:16:51 +00:00
selector[i] = it->getMapped();
}
}
/// Computes ttls and updates ttl infos
2020-05-25 17:07:14 +00:00
void updateTTL(
2020-05-28 15:33:44 +00:00
const TTLDescription & ttl_entry,
IMergeTreeDataPart::TTLInfos & ttl_infos,
DB::MergeTreeDataPartTTLInfo & ttl_info,
const Block & block,
2020-05-25 17:07:14 +00:00
bool update_part_min_max_ttls)
{
2021-01-12 16:42:49 +00:00
auto ttl_column = ITTLAlgorithm::executeExpressionAndGetColumn(ttl_entry.expression, block, ttl_entry.result_column);
2021-01-12 16:42:49 +00:00
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(ttl_column.get()))
{
const auto & date_lut = DateLUT::instance();
for (const auto & val : column_date->getData())
ttl_info.update(date_lut.fromDayNum(DayNum(val)));
}
2021-01-12 16:42:49 +00:00
else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(ttl_column.get()))
{
for (const auto & val : column_date_time->getData())
ttl_info.update(val);
}
2021-01-12 16:42:49 +00:00
else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(ttl_column.get()))
{
if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
{
const auto & date_lut = DateLUT::instance();
ttl_info.update(date_lut.fromDayNum(DayNum(column_const->getValue<UInt16>())));
}
else if (typeid_cast<const ColumnUInt32 *>(&column_const->getDataColumn()))
{
ttl_info.update(column_const->getValue<UInt32>());
}
else
throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
}
else
throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
if (update_part_min_max_ttls)
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
}
void MergeTreeDataWriter::TemporaryPart::finalize()
{
for (auto & stream : streams)
stream.finalizer.finish();
}
/// Splits `block` into one block per distinct partition key value.
///
/// Returns an empty result for an empty block. For a table without a partition key, or when all rows
/// fall into one partition, the result holds a single copy of the original block. Otherwise every
/// column is scattered across the partitions. Each resulting block is paired with its partition value,
/// taken from the first row of that partition.
/// `max_parts` is forwarded to buildScatterSelector, which throws when the limit is exceeded.
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
    const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
    BlocksWithPartition parts;

    const bool nothing_to_split = !block || block.rows() == 0;
    if (nothing_to_split)
        return parts;

    metadata_snapshot->check(block, true);

    /// Table is not partitioned.
    if (!metadata_snapshot->hasPartitionKey())
    {
        parts.emplace_back(Block(block), Row{});
        return parts;
    }

    /// The partition expression is executed on a copy: it adds the partition key columns
    /// (named after the partition function) and the original block must stay untouched.
    Block block_with_key = block;
    auto key_names_and_types = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_with_key, context);

    ColumnRawPtrs key_columns;
    key_columns.reserve(key_names_and_types.size());
    for (const auto & name_and_type : key_names_and_types)
        key_columns.emplace_back(block_with_key.getByName(name_and_type.name).column.get());

    PODArray<size_t> first_row_of_partition;
    IColumn::Selector row_to_partition;
    buildScatterSelector(key_columns, first_row_of_partition, row_to_partition, max_parts);

    const size_t partitions_total = first_row_of_partition.size();
    parts.reserve(partitions_total);

    /// Reads the partition key value from the first row that belongs to the given partition.
    auto partition_value_of = [&](size_t partition_num)
    {
        Row value(key_columns.size());
        for (size_t pos = 0; pos < key_columns.size(); ++pos)
            value[pos] = (*key_columns[pos])[first_row_of_partition[partition_num]];
        return value;
    };

    if (partitions_total == 1)
    {
        /// A typical case is when there is one partition (you do not need to split anything).
        /// NOTE: returning a copy of the original block so that calculated partition key columns
        /// do not interfere with possible calculated primary key columns of the same name.
        parts.emplace_back(Block(block), partition_value_of(0));
        return parts;
    }

    for (size_t partition_num = 0; partition_num < partitions_total; ++partition_num)
        parts.emplace_back(block.cloneEmpty(), partition_value_of(partition_num));

    const size_t columns_total = block.columns();
    for (size_t column_pos = 0; column_pos < columns_total; ++column_pos)
    {
        MutableColumns pieces = block.getByPosition(column_pos).column->scatter(partitions_total, row_to_partition);
        for (size_t partition_num = 0; partition_num < partitions_total; ++partition_num)
            parts[partition_num].block.getByPosition(column_pos).column = std::move(pieces[partition_num]);
    }

    return parts;
}
/// Applies the table's merging algorithm to a single block, as if it were the only input of a merge.
///
/// For Ordinary mode the block is returned unchanged and `permutation` is left as is.
/// For every other mode the block is fed to the matching algorithm together with `permutation`.
/// The merge must finish in two steps, otherwise a LOGICAL_ERROR is thrown. On success `permutation`
/// is reset to nullptr because the returned block is already sorted.
Block MergeTreeDataWriter::mergeBlock(
    const Block & block,
    SortDescription sort_description,
    const Names & partition_key_columns,
    IColumn::Permutation *& permutation,
    const MergeTreeData::MergingParams & merging_params)
{
    using Params = MergeTreeData::MergingParams;

    const size_t rows_in_block = block.rows();
    /// Size argument handed to every algorithm: one more than the number of input rows.
    /// NOTE(review): presumably a block size limit, so that the result fits one chunk - confirm against the algorithms.
    const size_t size_limit = rows_in_block + 1;

    auto make_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
    {
        switch (merging_params.mode)
        {
            case Params::Ordinary:
                /// There is nothing to merge in single block in ordinary MergeTree
                return nullptr;

            case Params::Replacing:
                return std::make_shared<ReplacingSortedAlgorithm>(
                    block, 1, sort_description, merging_params.version_column, size_limit);

            case Params::Collapsing:
                return std::make_shared<CollapsingSortedAlgorithm>(
                    block, 1, sort_description, merging_params.sign_column,
                    false, size_limit, &Poco::Logger::get("MergeTreeDataWriter"));

            case Params::Summing:
                return std::make_shared<SummingSortedAlgorithm>(
                    block, 1, sort_description, merging_params.columns_to_sum,
                    partition_key_columns, size_limit);

            case Params::Aggregating:
                return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, size_limit);

            case Params::VersionedCollapsing:
                return std::make_shared<VersionedCollapsingAlgorithm>(
                    block, 1, sort_description, merging_params.sign_column, size_limit);

            case Params::Graphite:
                return std::make_shared<GraphiteRollupSortedAlgorithm>(
                    block, 1, sort_description, size_limit, merging_params.graphite_params, time(nullptr));
        }

        __builtin_unreachable();
    };

    auto algorithm = make_algorithm();
    if (algorithm == nullptr)
        return block;

    /// The whole block is the one and only input of the algorithm.
    IMergingAlgorithm::Input single_input;
    single_input.set(Chunk(block.getColumns(), rows_in_block));
    single_input.permutation = permutation;

    IMergingAlgorithm::Inputs all_inputs;
    all_inputs.push_back(std::move(single_input));
    algorithm->initialize(std::move(all_inputs));

    IMergingAlgorithm::Status status = algorithm->merge();

    /// Check that after first merge merging_algorithm is waiting for data from input 0.
    if (status.required_source != 0)
        throw Exception("Logical error: required source after the first merge is not 0.", ErrorCodes::LOGICAL_ERROR);

    status = algorithm->merge();

    /// Check that merge is finished.
    if (!status.is_finished)
        throw Exception("Logical error: merge is not finished after the second merge.", ErrorCodes::LOGICAL_ERROR);

    /// Merged Block is sorted and we don't need to use permutation anymore
    permutation = nullptr;

    return block.cloneWithColumns(status.chunk.getColumns());
}
/// Writes the block of a single partition as a new temporary data part and returns it together with
/// the output streams that still have to be finalized (see TemporaryPart::finalize).
///
/// Steps, in order: compute the min/max index and the part name, calculate sorting key / skip index
/// columns, build the sort permutation, optionally merge the block (optimize_on_insert), reserve space
/// taking move TTLs into account, create the part in a directory prefixed with "tmp_insert_",
/// collect TTL infos, write the data and all non-empty projections, and start asynchronous finalization.
///
/// Returns a default-constructed TemporaryPart (no part, no streams) when the block has zero bytes
/// after the optional merge.
///
/// NOTE(review): the bare timestamp lines interleaved with the code below are not C++; they look like
/// residue of the blame view this text was copied from.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
2014-03-13 17:44:00 +00:00
{
TemporaryPart temp_part;
/// The block is modified in place below (calculated columns are added, it may be replaced by the merged block).
Block & block = block_with_partition.block;
2021-04-23 23:56:26 +00:00
/// Physical columns of the table restricted to the ones present in the block.
/// For Object columns the type is taken from the block instead of the table metadata.
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
for (auto & column : columns)
if (isObject(column.type))
column.type = block.getByName(column.name).type;
static const String TMP_PREFIX = "tmp_insert_";
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
/// The min/max index is computed from the block as it was passed in, before sorting and merging.
auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
MergeTreePartition partition(std::move(block_with_partition.partition));
2014-03-13 17:44:00 +00:00
/// temp_index is used both as the min and the max block number of the new part.
MergeTreePartInfo new_part_info(partition.getID(metadata_snapshot->getPartitionKey().sample_block), temp_index, temp_index, 0);
String part_name;
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
/// Old format: the part name is built from the min and max dates, which must belong to the same month.
DayNum min_date(minmax_idx->hyperrectangle[data.minmax_idx_date_column_pos].left.get<UInt64>());
DayNum max_date(minmax_idx->hyperrectangle[data.minmax_idx_date_column_pos].right.get<UInt64>());
2014-03-13 17:44:00 +00:00
const auto & date_lut = DateLUT::instance();
2015-08-17 21:09:36 +00:00
auto min_month = date_lut.toNumYYYYMM(min_date);
auto max_month = date_lut.toNumYYYYMM(max_date);
2015-08-17 21:09:36 +00:00
if (min_month != max_month)
throw Exception("Logical error: part spans more than one month.", ErrorCodes::LOGICAL_ERROR);
2014-03-13 17:44:00 +00:00
part_name = new_part_info.getPartNameV0(min_date, max_date);
}
else
part_name = new_part_info.getPartName();
2020-11-13 07:54:05 +00:00
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
/// One sort description entry per sorting key column, all constructed with the same (1, 1) arguments.
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
2020-11-13 07:54:05 +00:00
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort
/// perm_ptr stays nullptr when there is no sorting key or the block is already sorted.
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
/// mergeBlock takes perm_ptr by reference and resets it to nullptr when it actually merges the block.
if (context->getSettingsRef().optimize_on_insert)
block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr, data.merging_params);
2020-11-13 07:54:05 +00:00
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
2021-02-12 14:02:04 +00:00
/// If optimize_on_insert is true, block may become empty after merge.
/// There is no need to create empty part.
if (expected_size == 0)
return temp_part;
2021-02-12 14:02:04 +00:00
2019-12-19 13:10:57 +00:00
/// Move TTLs are calculated first because the space reservation below takes them as input.
/// The part-level min/max TTL is not updated by them (last argument is false).
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
2020-06-17 13:39:26 +00:00
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
2020-05-25 17:07:14 +00:00
for (const auto & ttl_entry : move_ttl_entries)
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
2020-10-05 16:41:46 +00:00
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
/// The part's volume is created from the reservation and volume 0 of the storage policy.
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
2022-04-12 18:59:49 +00:00
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
2019-11-25 20:19:43 +00:00
2022-04-08 18:56:08 +00:00
/// The storage and its builder are constructed from the same volume, relative path and temporary directory name.
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
2022-04-12 18:59:49 +00:00
data_part_volume,
data.relative_data_path,
TMP_PREFIX + part_name);
auto data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
data_part_volume,
2022-04-08 18:56:08 +00:00
data.relative_data_path,
TMP_PREFIX + part_name);
2019-11-25 11:06:59 +00:00
auto new_data_part = data.createPart(
2020-02-13 14:42:48 +00:00
part_name,
data.choosePartType(expected_size, block.rows()),
new_part_info,
2022-04-08 18:56:08 +00:00
data_part_storage);
if (data.storage_settings.get()->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();
2021-10-29 17:21:02 +00:00
/// Serialization infos are built from the table setting ratio_of_defaults_for_sparse_serialization
/// and then fed with the block being written.
const auto & data_settings = data.getSettings();
SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true};
SerializationInfoByName infos(columns, settings);
infos.add(block);
2022-07-27 14:05:16 +00:00
new_data_part->setColumns(columns, infos);
2021-03-09 14:46:52 +00:00
new_data_part->rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
2014-07-17 10:44:17 +00:00
2021-01-26 13:29:45 +00:00
/// Stays empty unless the fsync_part_directory setting is enabled; held until the end of this function.
SyncGuardPtr sync_guard;
2020-04-14 19:47:19 +00:00
if (new_data_part->isStoredOnDisk())
{
2020-04-14 19:47:19 +00:00
/// The name could be non-unique in case of stale files from previous runs.
2022-04-12 18:59:49 +00:00
String full_path = new_data_part->data_part_storage->getFullPath();
2020-04-14 19:47:19 +00:00
2022-04-12 18:59:49 +00:00
if (new_data_part->data_part_storage->exists())
2020-04-14 19:47:19 +00:00
{
2022-06-30 12:12:45 +00:00
LOG_WARNING(log, "Removing old temporary directory {}", full_path);
2022-06-28 11:19:30 +00:00
data_part_storage_builder->removeRecursive();
2020-04-14 19:47:19 +00:00
}
2022-06-28 11:19:30 +00:00
data_part_storage_builder->createDirectories();
2020-08-24 13:09:23 +00:00
if (data.getSettings()->fsync_part_directory)
2022-06-28 11:19:30 +00:00
{
const auto disk = data_part_volume->getDisk();
2021-01-26 13:29:45 +00:00
sync_guard = disk->getDirectorySyncGuard(full_path);
2022-06-28 11:19:30 +00:00
}
2020-08-24 13:09:23 +00:00
}
/// Rows, GROUP BY, rows WHERE and column TTLs also update the part-level min/max TTL (last argument is true);
/// recompression TTLs do not (last argument is false).
if (metadata_snapshot->hasRowsTTL())
updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
2020-12-25 14:52:46 +00:00
for (const auto & ttl_entry : metadata_snapshot->getGroupByTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
2021-01-13 14:04:27 +00:00
for (const auto & ttl_entry : metadata_snapshot->getRowsWhereTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
for (const auto & ttl_entry : recompression_ttl_entries)
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false);
/// Add the move TTL infos that were calculated before the reservation.
new_data_part->ttl_infos.update(move_ttl_infos);
2017-08-01 20:07:16 +00:00
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
2017-07-31 11:05:49 +00:00
2020-05-28 12:37:05 +00:00
const auto & index_factory = MergeTreeIndexFactory::instance();
2022-04-12 18:59:49 +00:00
auto out = std::make_unique<MergedBlockOutputStream>(new_data_part, data_part_storage_builder, metadata_snapshot, columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec,
context->getCurrentTransaction(), false, false, context->getWriteSettings());
2021-04-15 21:47:11 +00:00
/// perm_ptr is nullptr here if the block needs no reordering (already sorted, no sorting key, or merged above).
out->writeWithPermutation(block, perm_ptr);
2021-08-26 11:01:15 +00:00
/// Every projection that yields rows for this block is written as a projection part of the new part.
/// Its builder is committed right away and its streams are collected to be finalized with the parent's.
for (const auto & projection : metadata_snapshot->getProjections())
{
auto projection_block = projection.calculate(block, context);
if (projection_block.rows())
{
2022-04-12 18:59:49 +00:00
auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, data_part_storage_builder, new_data_part.get());
new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part));
2022-06-28 12:41:22 +00:00
proj_temp_part.builder->commit();
for (auto & stream : proj_temp_part.streams)
temp_part.streams.emplace_back(std::move(stream));
}
2021-08-26 11:01:15 +00:00
}
2022-03-21 08:52:48 +00:00
/// Finalization is only started here; the returned finalizer is stored in temp_part and
/// finished later via TemporaryPart::finalize.
auto finalizer = out->finalizePartAsync(
new_data_part,
data_settings->fsync_after_insert,
nullptr, nullptr,
context->getWriteSettings());
temp_part.part = new_data_part;
2022-06-28 12:41:22 +00:00
temp_part.builder = data_part_storage_builder;
/// The stream of the part itself is appended after the projection streams.
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
2020-03-23 13:32:02 +00:00
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());
2016-04-23 02:39:40 +00:00
return temp_part;
2014-03-13 17:44:00 +00:00
}
void MergeTreeDataWriter::deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
}
}
/// Common implementation for writing a projection block as a projection part of `parent_part`.
///
/// The part is created in the projection storage of the parent at `relative_path`, the block is sorted
/// by the projection's sorting key (and merged in Aggregating mode for aggregate projections),
/// written without secondary indices and without a transaction, and its finalization is started.
///
/// Returns a TemporaryPart holding the new part, its storage builder and the single output stream
/// with its finalizer; the caller is responsible for finishing the finalizer.
/// `is_temp` is stored into the part's is_temp flag; `block` is taken by value and modified locally.
///
/// NOTE(review): the bare timestamp lines interleaved with the code below are not C++; they look like
/// residue of the blame view this text was copied from.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name,
2021-08-26 11:01:15 +00:00
MergeTreeDataPartType part_type,
const String & relative_path,
2022-04-12 18:59:49 +00:00
const DataPartStorageBuilderPtr & data_part_storage_builder,
2021-08-26 11:01:15 +00:00
bool is_temp,
const IMergeTreeDataPart * parent_part,
const MergeTreeData & data,
Poco::Logger * log,
Block block,
const ProjectionDescription & projection)
{
TemporaryPart temp_part;
/// All metadata used below (columns, sorting key, indices) is the projection's own, not the parent table's.
const StorageMetadataPtr & metadata_snapshot = projection.metadata;
2021-08-26 11:01:15 +00:00
/// Projection parts always get the same part info: partition id "all" and zero block numbers / level.
MergeTreePartInfo new_part_info("all", 0, 0, 0);
2022-04-12 18:59:49 +00:00
auto projection_part_storage = parent_part->data_part_storage->getProjection(relative_path);
2021-08-26 11:01:15 +00:00
auto new_data_part = data.createPart(
part_name,
part_type,
new_part_info,
2022-04-12 18:59:49 +00:00
projection_part_storage,
2021-08-26 11:01:15 +00:00
parent_part);
2022-04-12 18:59:49 +00:00
/// The builder for the projection is derived from the parent's builder with the same relative path.
auto projection_part_storage_builder = data_part_storage_builder->getProjection(relative_path);
2021-08-26 11:01:15 +00:00
new_data_part->is_temp = is_temp;
/// Physical columns of the projection restricted to the ones present in the block.
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
2021-10-29 17:21:02 +00:00
SerializationInfo::Settings settings{data.getSettings()->ratio_of_defaults_for_sparse_serialization, true};
SerializationInfoByName infos(columns, settings);
infos.add(block);
2022-07-27 14:05:16 +00:00
new_data_part->setColumns(columns, infos);
if (new_data_part->isStoredOnDisk())
{
/// The name could be non-unique in case of stale files from previous runs.
2022-04-12 18:59:49 +00:00
if (projection_part_storage->exists())
{
2022-04-12 18:59:49 +00:00
LOG_WARNING(log, "Removing old temporary directory {}", projection_part_storage->getFullPath());
projection_part_storage_builder->removeRecursive();
}
2022-04-12 18:59:49 +00:00
projection_part_storage_builder->createDirectories();
}
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
/// One sort description entry per sorting key column, all constructed with the same (1, 1) arguments.
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterBlocks);
/// Sort
/// perm_ptr stays nullptr when there is no sorting key or the block is already sorted.
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
else
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterBlocksAlreadySorted);
}
/// Aggregate projections are always merged with the Aggregating mode, with no partition key columns.
/// mergeBlock takes perm_ptr by reference and resets it to nullptr when it actually merges the block.
if (projection.type == ProjectionDescription::Type::Aggregate)
{
MergeTreeData::MergingParams projection_merging_params;
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
block = mergeBlock(block, sort_description, {}, perm_ptr, projection_merging_params);
}
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
/// The projection is written with an empty list of indices and with NO_TRANSACTION_PTR.
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
2022-04-12 18:59:49 +00:00
projection_part_storage_builder,
metadata_snapshot,
columns,
MergeTreeIndices{},
2022-02-14 19:50:08 +00:00
compression_codec,
2022-03-16 19:16:26 +00:00
NO_TRANSACTION_PTR);
out->writeWithPermutation(block, perm_ptr);
/// Finalization is only started here; the finalizer is returned inside temp_part.
auto finalizer = out->finalizePartAsync(new_data_part, false);
temp_part.part = new_data_part;
2022-06-28 12:41:22 +00:00
temp_part.builder = projection_part_storage_builder;
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk());
return temp_part;
}
/// Writes `block` as the projection part named after the projection, stored under "<name>.proj"
/// inside `parent_part`.
///
/// The part type is InMemory when the parent is in memory. Otherwise space for block.bytes() is
/// reserved via `data_part_storage_builder` (only to check that there is enough space; the reservation
/// is not kept) and the on-disk part type is chosen from the block's size and row count.
///
/// `block` is taken by value; it is moved into the implementation on its last use to avoid a second copy.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
    MergeTreeData & data,
    Poco::Logger * log,
    Block block,
    const ProjectionDescription & projection,
    const DataPartStorageBuilderPtr & data_part_storage_builder,
    const IMergeTreeDataPart * parent_part)
{
    const String & part_name = projection.name;

    MergeTreeDataPartType part_type;
    if (parent_part->getType() == MergeTreeDataPartType::InMemory)
    {
        part_type = MergeTreeDataPartType::InMemory;
    }
    else
    {
        /// Size of part would not be greater than block.bytes() + epsilon
        size_t expected_size = block.bytes();
        // just check if there is enough space on parent volume
        data.reserveSpace(expected_size, data_part_storage_builder);
        part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
    }

    /// `block` is not used after this call, so it can be handed over without copying.
    return writeProjectionPartImpl(
        part_name,
        part_type,
        part_name + ".proj" /* relative_path */,
        data_part_storage_builder,
        false /* is_temp */,
        parent_part,
        data,
        log,
        std::move(block),
        projection);
}
2021-08-26 11:01:15 +00:00
/// This is used for projection materialization process which may contain multiple stages of
/// projection part merges.
///
/// Writes `block` as a temporary projection part named "<projection name>_<block_num>", stored under
/// "<part name>.tmp_proj" inside `parent_part`, with the is_temp flag set.
///
/// The part type is InMemory when the parent is in memory. Otherwise space for block.bytes() is
/// reserved via `data_part_storage_builder` (only to check that there is enough space; the reservation
/// is not kept) and the on-disk part type is chosen from the block's size and row count.
///
/// `block` is taken by value; it is moved into the implementation on its last use to avoid a second copy.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
    MergeTreeData & data,
    Poco::Logger * log,
    Block block,
    const ProjectionDescription & projection,
    const DataPartStorageBuilderPtr & data_part_storage_builder,
    const IMergeTreeDataPart * parent_part,
    size_t block_num)
{
    const String part_name = fmt::format("{}_{}", projection.name, block_num);

    MergeTreeDataPartType part_type;
    if (parent_part->getType() == MergeTreeDataPartType::InMemory)
    {
        part_type = MergeTreeDataPartType::InMemory;
    }
    else
    {
        /// Size of part would not be greater than block.bytes() + epsilon
        size_t expected_size = block.bytes();
        // just check if there is enough space on parent volume
        data.reserveSpace(expected_size, data_part_storage_builder);
        part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
    }

    /// `block` is not used after this call, so it can be handed over without copying.
    return writeProjectionPartImpl(
        part_name,
        part_type,
        part_name + ".tmp_proj" /* relative_path */,
        data_part_storage_builder,
        true /* is_temp */,
        parent_part,
        data,
        log,
        std::move(block),
        projection);
}
/// Writes `block` as an InMemory projection part named after the projection, stored under
/// "<name>.proj" inside `parent_part`. No space is reserved and the part is not marked as temporary.
///
/// `block` is taken by value; it is moved into the implementation to avoid a second copy.
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionPart(
    const MergeTreeData & data,
    Poco::Logger * log,
    Block block,
    const ProjectionDescription & projection,
    const DataPartStorageBuilderPtr & data_part_storage_builder,
    const IMergeTreeDataPart * parent_part)
{
    return writeProjectionPartImpl(
        projection.name,
        MergeTreeDataPartType::InMemory,
        projection.name + ".proj" /* relative_path */,
        data_part_storage_builder,
        false /* is_temp */,
        parent_part,
        data,
        log,
        std::move(block),
        projection);
}
2014-03-13 17:44:00 +00:00
}