Add base functionality

Igor Markelov 2024-05-09 17:36:34 +00:00
parent 68bff50c9d
commit 16c49358a7
8 changed files with 297 additions and 73 deletions

src/Columns/ColumnDecimal.cpp · View File

@ -1,5 +1,6 @@
#include <Common/Arena.h>
#include <Common/Exception.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/Hash.h>
#include <Common/RadixSort.h>
#include <Common/SipHash.h>
@ -264,6 +265,22 @@ void ColumnDecimal<T>::updatePermutation(IColumn::PermutationSortDirection direc
}
}
template <is_decimal T>
size_t ColumnDecimal<T>::estimateNumberOfDifferent(const IColumn::Permutation & perm, const EqualRange & range, size_t /*samples*/) const
{
    // TODO: sample random elements
    size_t range_size = getRangeSize(range);
    if (range_size <= 1)
        return range_size;

    HashSet<T> elements;
    for (size_t i = range.first; i < range.second; ++i)
    {
        elements.insert(data[perm[i]]);
    }
    return elements.size();
}
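
The TODO above is left open: this override always inserts every element of the range into a hash set and ignores the samples argument. For reference, below is a standalone sketch of what a sampling-based estimate might look like. It uses plain STL containers, the name estimateDistinctSampled is made up for the sketch, and the lower-bound heuristic is an assumption rather than anything this commit implements.

#include <cstddef>
#include <random>
#include <unordered_set>
#include <vector>

template <typename T>
size_t estimateDistinctSampled(
    const std::vector<T> & data,
    const std::vector<size_t> & perm,
    size_t first,
    size_t last,
    size_t samples)
{
    const size_t range_size = last - first;
    if (range_size <= 1)
        return range_size;

    std::unordered_set<T> values;
    if (range_size <= samples)
    {
        /// Small range: count exactly.
        for (size_t i = first; i < last; ++i)
            values.insert(data[perm[i]]);
        return values.size();
    }

    /// Large range: inspect only `samples` random positions; the number of
    /// distinct values seen is a cheap lower-bound estimate of the cardinality.
    std::mt19937_64 rng{0};
    std::uniform_int_distribution<size_t> dist(first, last - 1);
    for (size_t i = 0; i < samples; ++i)
        values.insert(data[perm[dist(rng)]]);
    return values.size();
}
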
template <is_decimal T>
ColumnPtr ColumnDecimal<T>::permute(const IColumn::Permutation & perm, size_t limit) const
{

src/Columns/ColumnDecimal.h · View File

@ -97,6 +97,8 @@ public:
size_t limit, int nan_direction_hint, IColumn::Permutation & res) const override;
void updatePermutation(IColumn::PermutationSortDirection direction, IColumn::PermutationSortStability stability,
size_t limit, int, IColumn::Permutation & res, EqualRanges& equal_ranges) const override;
size_t estimateNumberOfDifferent(const IColumn::Permutation & perm, const EqualRange & range, size_t samples) const override;
MutableColumnPtr cloneResized(size_t size) const override;

src/Columns/IColumn.cpp · View File

@ -31,6 +31,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
size_t getRangeSize(const EqualRange & range)
{
    return range.second - range.first;
}
String IColumn::dumpStructure() const
{
WriteBufferFromOwnString res;
@ -50,6 +55,17 @@ void IColumn::insertFrom(const IColumn & src, size_t n)
insert(src[n]);
}
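/// Fallback implementation: assume every row in the range is distinct (an upper bound). Columns can override this with a cheaper or more precise estimate.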
size_t IColumn::estimateNumberOfDifferent(const IColumn::Permutation & /*perm*/, const EqualRange & range, size_t /*samples*/) const
{
    return getRangeSize(range);
}
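/// Default strategy: fully sort the rows inside each equal range (ascending, unstable), grouping equal values together and narrowing the ranges for subsequent columns.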
void IColumn::updatePermutationForCompression(IColumn::Permutation & perm, EqualRanges & ranges) const
{
    updatePermutation(PermutationSortDirection::Ascending, PermutationSortStability::Unstable, 0, 1, perm, ranges);
}
ColumnPtr IColumn::createWithOffsets(const Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const
{
if (offsets.size() + shift != size())

src/Columns/IColumn.h · View File

@ -40,7 +40,10 @@ class ColumnConst;
* Represents a set of equal ranges in previous column to perform sorting in current column.
* Used in sorting by tuples.
* */
using EqualRanges = std::vector<std::pair<size_t, size_t> >;
using EqualRange = std::pair<size_t, size_t>;
using EqualRanges = std::vector<EqualRange>;
size_t getRangeSize(const EqualRange & range);
/// Declares interface to store columns in memory.
class IColumn : public COW<IColumn>
@ -399,6 +402,10 @@ public:
"or for Array or Tuple, containing them.");
}
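/// Estimates the number of distinct values inside the given equal range of the column, as seen through the permutation. The samples argument lets implementations trade accuracy for speed; the default assumes all values are distinct.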
virtual size_t estimateNumberOfDifferent(const Permutation & /*perm*/, const EqualRange & range, size_t /*samples*/) const;
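/// Reorders rows inside the given equal ranges so that equal values are grouped together, which helps the column compress better; the default implementation simply sorts each range in ascending order.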
virtual void updatePermutationForCompression(Permutation & /*perm*/, EqualRanges & /*ranges*/) const;
/** Copies each element according offsets parameter.
* (i-th element should be copied offsets[i] - offsets[i - 1] times.)
* It is necessary in ARRAY JOIN operation.

src/Interpreters/BestCompressionPermutation.cpp · View File

@ -0,0 +1,94 @@
#include <numeric>
#include <Interpreters/BestCompressionPermutation.h>
#include <Interpreters/sortBlock.h>
#include "Columns/IColumn.h"
#include "base/sort.h"
namespace DB
{
namespace
{
void getBestCompressionPermutationImpl(
    const Block & block,
    const std::vector<size_t> & not_already_sorted_columns,
    IColumn::Permutation & permutation,
    const EqualRange & range)
{
    std::vector<size_t> estimate_unique_count(not_already_sorted_columns.size());
    for (size_t i = 0; i < not_already_sorted_columns.size(); ++i)
    {
        const auto column = block.getByPosition(not_already_sorted_columns[i]).column;
        // TODO: improve with sampling
        estimate_unique_count[i] = column->estimateNumberOfDifferent(permutation, range, -1);
    }

    std::vector<size_t> order(not_already_sorted_columns.size());
    std::iota(order.begin(), order.end(), 0);
    auto comparator = [&](size_t lhs, size_t rhs) -> bool { return estimate_unique_count[lhs] < estimate_unique_count[rhs]; };
    ::sort(order.begin(), order.end(), comparator);

    std::vector<EqualRange> equal_ranges{range};
    for (size_t i : order)
    {
        const size_t column_id = not_already_sorted_columns[i];
        const auto column = block.getByPosition(column_id).column;
        column->updatePermutationForCompression(permutation, equal_ranges);
    }
}
}
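
To make the heuristic of getBestCompressionPermutationImpl concrete: the free columns are processed from lowest to highest estimated cardinality, and each column only reorders rows inside ranges where all previously processed columns are already equal, so low-cardinality columns end up as the outermost sort keys and form long runs of repeated values. The self-contained sketch below reproduces the same ordering on plain std::vector columns; the names (Column, countDistinct) and the chained stable_sort are illustrative assumptions for the sketch, not the ClickHouse API.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <string>
#include <unordered_set>
#include <vector>

using Column = std::vector<std::string>;

size_t countDistinct(const Column & column, const std::vector<size_t> & perm)
{
    std::unordered_set<std::string> values;
    for (size_t row : perm)
        values.insert(column[row]);
    return values.size();
}

int main()
{
    /// Two "free" columns inside a single equal range of the sorting key.
    std::vector<Column> columns = {
        {"b2", "a1", "c3", "a1", "b2", "c3"},   /// higher cardinality
        {"ru", "us", "ru", "us", "ru", "ru"},   /// lower cardinality
    };

    std::vector<size_t> perm(columns.front().size());
    std::iota(perm.begin(), perm.end(), 0);

    /// Order the columns by estimated number of distinct values, ascending.
    std::vector<size_t> order(columns.size());
    std::iota(order.begin(), order.end(), 0);
    std::sort(order.begin(), order.end(), [&](size_t lhs, size_t rhs)
        { return countDistinct(columns[lhs], perm) < countDistinct(columns[rhs], perm); });

    /// Chained stable_sort applied in reverse priority gives a lexicographic
    /// order whose outermost key is the lowest-cardinality column, which is
    /// what the range-narrowing in the real code achieves.
    for (auto it = order.rbegin(); it != order.rend(); ++it)
    {
        const Column & column = columns[*it];
        std::stable_sort(perm.begin(), perm.end(), [&](size_t lhs, size_t rhs)
            { return column[lhs] < column[rhs]; });
    }

    for (size_t row : perm)
        std::cout << columns[1][row] << ' ' << columns[0][row] << '\n';
}

For this input the rows come out as ru b2, ru b2, ru c3, ru c3, us a1, us a1, i.e. the low-cardinality column forms long runs that compress well.
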
std::vector<size_t> getAlreadySortedColumnsIndex(const Block & block, const SortDescription & description)
{
    std::vector<size_t> already_sorted_columns;
    already_sorted_columns.reserve(description.size());
    for (const SortColumnDescription & column_description : description)
    {
        size_t id = block.getPositionByName(column_description.column_name);
        already_sorted_columns.emplace_back(id);
    }
    ::sort(already_sorted_columns.begin(), already_sorted_columns.end());
    return already_sorted_columns;
}
std::vector<size_t> getNotAlreadySortedColumnsIndex(const Block & block, const SortDescription & description)
{
    std::vector<size_t> not_already_sorted_columns;
    not_already_sorted_columns.reserve(block.columns() - description.size());
    if (description.empty())
    {
        not_already_sorted_columns.resize(block.columns());
        std::iota(not_already_sorted_columns.begin(), not_already_sorted_columns.end(), 0);
    }
    else
    {
        const auto already_sorted_columns = getAlreadySortedColumnsIndex(block, description);
        for (size_t i = 0; i < already_sorted_columns.front(); ++i)
            not_already_sorted_columns.push_back(i);
        for (size_t i = 0; i + 1 < already_sorted_columns.size(); ++i)
            for (size_t id = already_sorted_columns[i] + 1; id < already_sorted_columns[i + 1]; ++id)
                not_already_sorted_columns.push_back(id);
        for (size_t i = already_sorted_columns.back() + 1; i < block.columns(); ++i)
            not_already_sorted_columns.push_back(i);
    }
    return not_already_sorted_columns;
}
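
As a concrete example of the index arithmetic above: for a block with six columns where the sort description covers columns 1 and 4, getAlreadySortedColumnsIndex returns {1, 4}, and the three loops collect {0}, then {2, 3}, and finally {5}, so getNotAlreadySortedColumnsIndex returns the complement {0, 2, 3, 5}.
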
void getBestCompressionPermutation(const Block & block, const SortDescription & description, IColumn::Permutation & permutation)
{
    const auto equal_ranges = getEqualRanges(block, description, permutation);
    const auto not_already_sorted_columns = getNotAlreadySortedColumnsIndex(block, description);
    for (const auto & range : equal_ranges)
    {
        if (getRangeSize(range) <= 1)
            continue;
        getBestCompressionPermutationImpl(block, not_already_sorted_columns, permutation, range);
    }
}
}

src/Interpreters/BestCompressionPermutation.h · View File

@ -0,0 +1,18 @@
#pragma once
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
namespace DB
{
std::vector<size_t> getAlreadySortedColumnsIndex(const Block & block, const SortDescription & description);
std::vector<size_t> getNotAlreadySortedColumnsIndex(const Block & block, const SortDescription & description);
EqualRanges getEqualRanges(const Block & block, const SortDescription & description, IColumn::Permutation & permutation);
void getBestCompressionPermutation(const Block & block, const SortDescription & description, IColumn::Permutation & permutation);
}

src/Storages/MergeTree/MergeTreeDataWriter.cpp · View File

@ -6,6 +6,7 @@
#include <IO/HashingWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/BestCompressionPermutation.h>
#include <Interpreters/Context.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Processors/TTL/ITTLAlgorithm.h>
@ -20,33 +21,33 @@
#include <Parsers/queryToString.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace ProfileEvents
{
extern const Event MergeTreeDataWriterBlocks;
extern const Event MergeTreeDataWriterBlocksAlreadySorted;
extern const Event MergeTreeDataWriterRows;
extern const Event MergeTreeDataWriterUncompressedBytes;
extern const Event MergeTreeDataWriterCompressedBytes;
extern const Event MergeTreeDataWriterSortingBlocksMicroseconds;
extern const Event MergeTreeDataWriterMergingBlocksMicroseconds;
extern const Event MergeTreeDataWriterProjectionsCalculationMicroseconds;
extern const Event MergeTreeDataProjectionWriterBlocks;
extern const Event MergeTreeDataProjectionWriterBlocksAlreadySorted;
extern const Event MergeTreeDataProjectionWriterRows;
extern const Event MergeTreeDataProjectionWriterUncompressedBytes;
extern const Event MergeTreeDataProjectionWriterCompressedBytes;
extern const Event MergeTreeDataProjectionWriterSortingBlocksMicroseconds;
extern const Event MergeTreeDataProjectionWriterMergingBlocksMicroseconds;
extern const Event RejectedInserts;
extern const Event MergeTreeDataWriterBlocks;
extern const Event MergeTreeDataWriterBlocksAlreadySorted;
extern const Event MergeTreeDataWriterRows;
extern const Event MergeTreeDataWriterUncompressedBytes;
extern const Event MergeTreeDataWriterCompressedBytes;
extern const Event MergeTreeDataWriterSortingBlocksMicroseconds;
extern const Event MergeTreeDataWriterMergingBlocksMicroseconds;
extern const Event MergeTreeDataWriterProjectionsCalculationMicroseconds;
extern const Event MergeTreeDataProjectionWriterBlocks;
extern const Event MergeTreeDataProjectionWriterBlocksAlreadySorted;
extern const Event MergeTreeDataProjectionWriterRows;
extern const Event MergeTreeDataProjectionWriterUncompressedBytes;
extern const Event MergeTreeDataProjectionWriterCompressedBytes;
extern const Event MergeTreeDataProjectionWriterSortingBlocksMicroseconds;
extern const Event MergeTreeDataProjectionWriterMergingBlocksMicroseconds;
extern const Event RejectedInserts;
}
namespace DB
@ -54,20 +55,20 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_PARTS;
extern const int ABORTED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_PARTS;
}
namespace
{
void buildScatterSelector(
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector,
size_t max_parts,
ContextPtr context)
const ColumnRawPtrs & columns,
PODArray<size_t> & partition_num_to_first_row,
IColumn::Selector & selector,
size_t max_parts,
ContextPtr context)
{
/// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
@ -89,15 +90,17 @@ void buildScatterSelector(
if (max_parts && partitions_count >= max_parts && throw_on_limit)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(ErrorCodes::TOO_MANY_PARTS,
"Too many partitions for single INSERT block (more than {}). "
"The limit is controlled by 'max_partitions_per_insert_block' setting. "
"Large number of partitions is a common misconception. "
"It will lead to severe negative performance impact, including slow server startup, "
"slow INSERT queries and slow SELECT queries. Recommended total number of partitions "
"for a table is under 1000..10000. Please note, that partitioning is not intended "
"to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). "
"Partitions are intended for data manipulation (DROP PARTITION, etc).", max_parts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many partitions for single INSERT block (more than {}). "
"The limit is controlled by 'max_partitions_per_insert_block' setting. "
"Large number of partitions is a common misconception. "
"It will lead to severe negative performance impact, including slow server startup, "
"slow INSERT queries and slow SELECT queries. Recommended total number of partitions "
"for a table is under 1000..10000. Please note, that partitioning is not intended "
"to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). "
"Partitions are intended for data manipulation (DROP PARTITION, etc).",
max_parts);
}
partition_num_to_first_row.push_back(i);
@ -123,10 +126,14 @@ void buildScatterSelector(
const auto & client_info = context->getClientInfo();
LoggerPtr log = getLogger("MergeTreeDataWriter");
LOG_WARNING(log, "INSERT query from initial_user {} (query ID: {}) inserted a block "
"that created parts in {} partitions. This is being logged "
"rather than throwing an exception as throw_on_max_partitions_per_insert_block=false.",
client_info.initial_user, client_info.initial_query_id, partitions_count);
LOG_WARNING(
log,
"INSERT query from initial_user {} (query ID: {}) inserted a block "
"that created parts in {} partitions. This is being logged "
"rather than throwing an exception as throw_on_max_partitions_per_insert_block=false.",
client_info.initial_user,
client_info.initial_query_id,
partitions_count);
}
}
@ -202,16 +209,13 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfoPtr async_insert_info, const IColumn::Selector & selector, size_t partition_num)
std::vector<AsyncInsertInfoPtr>
scatterAsyncInsertInfoBySelector(AsyncInsertInfoPtr async_insert_info, const IColumn::Selector & selector, size_t partition_num)
{
if (nullptr == async_insert_info)
{
return {};
}
if (selector.empty())
{
return {async_insert_info};
}
std::vector<AsyncInsertInfoPtr> result(partition_num);
std::vector<Int64> last_row_for_partition(partition_num, -1);
size_t offset_idx = 0;
@ -241,7 +245,11 @@ std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfo
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
Block && block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
Block && block,
size_t max_parts,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
AsyncInsertInfoPtr async_insert_info)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -273,7 +281,8 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts, context);
auto async_insert_info_with_partition = scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
auto async_insert_info_with_partition
= scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
@ -342,15 +351,32 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
block,
1,
sort_description,
merging_params.is_deleted_column,
merging_params.version_column,
block_size + 1,
/*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,
false, block_size + 1, /*block_size_bytes=*/0, getLogger("MergeTreeDataWriter"));
block,
1,
sort_description,
merging_params.sign_column,
false,
block_size + 1,
/*block_size_bytes=*/0,
getLogger("MergeTreeDataWriter"));
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedAlgorithm>(
block, 1, sort_description, merging_params.columns_to_sum,
partition_key_columns, block_size + 1, /*block_size_bytes=*/0);
block,
1,
sort_description,
merging_params.columns_to_sum,
partition_key_columns,
block_size + 1,
/*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::VersionedCollapsing:
@ -384,7 +410,13 @@ Block MergeTreeDataWriter::mergeBlock(
/// Check that after first merge merging_algorithm is waiting for data from input 0.
if (status.required_source != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required source after the first merge is not 0. Chunk rows: {}, is_finished: {}, required_source: {}, algorithm: {}", status.chunk.getNumRows(), status.is_finished, status.required_source, merging_algorithm->getName());
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Required source after the first merge is not 0. Chunk rows: {}, is_finished: {}, required_source: {}, algorithm: {}",
status.chunk.getNumRows(),
status.is_finished,
status.required_source,
merging_algorithm->getName());
status = merging_algorithm->merge();
@ -399,14 +431,16 @@ Block MergeTreeDataWriter::mergeBlock(
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
MergeTreeDataWriter::TemporaryPart
MergeTreeDataWriter::writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
return writeTempPartImpl(block, metadata_snapshot, context, data.insert_increment.get(), /*need_tmp_prefix = */true);
return writeTempPartImpl(block, metadata_snapshot, context, data.insert_increment.get(), /*need_tmp_prefix = */ true);
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartWithoutPrefix(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, int64_t block_number, ContextPtr context)
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartWithoutPrefix(
BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, int64_t block_number, ContextPtr context)
{
return writeTempPartImpl(block, metadata_snapshot, context, block_number, /*need_tmp_prefix = */false);
return writeTempPartImpl(block, metadata_snapshot, context, block_number, /*need_tmp_prefix = */ false);
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
@ -498,6 +532,20 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
}
if (data.getSettings()->allow_experimental_improve_compression_raws_order)
{
    LOG_DEBUG(log, "allow_experimental_improve_compression_raws_order=true");
    getBestCompressionPermutation(block, sort_description, perm);
    perm_ptr = &perm;
}
else
{
    LOG_DEBUG(log, "allow_experimental_improve_compression_raws_order=false");
}
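
When the setting is enabled, the permutation prepared for the sort key is refined in place by getBestCompressionPermutation and perm_ptr is pointed at it, so the writer emits the rows of the part in the compression-friendly order; when it is disabled, only a debug message is logged and behaviour is unchanged. The same pattern is applied below in the projection writer.
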
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
if (context->getSettingsRef().optimize_on_insert)
{
@ -518,14 +566,15 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
for (const auto & ttl_entry : move_ttl_entries)
updateTTL(context, ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
ReservationPtr reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
auto new_data_part = data.getDataPartBuilder(part_name, data_part_volume, part_dir)
.withPartFormat(data.choosePartFormat(expected_size, block.rows()))
.withPartInfo(new_part_info)
.build();
.withPartFormat(data.choosePartFormat(expected_size, block.rows()))
.withPartInfo(new_part_info)
.build();
auto data_part_storage = new_data_part->getDataPartStoragePtr();
data_part_storage->beginTransaction();
@ -575,17 +624,25 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
updateTTL(context, metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
for (const auto & ttl_entry : metadata_snapshot->getGroupByTTLs())
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
updateTTL(
context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
for (const auto & ttl_entry : metadata_snapshot->getRowsWhereTTLs())
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
updateTTL(
context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
const auto & recompression_ttl_entries = metadata_snapshot->getRecompressionTTLs();
for (const auto & ttl_entry : recompression_ttl_entries)
updateTTL(context, ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column], block, false);
updateTTL(
context,
ttl_entry,
new_data_part->ttl_infos,
new_data_part->ttl_infos.recompression_ttl[ttl_entry.result_column],
block,
false);
new_data_part->ttl_infos.update(move_ttl_infos);
@ -613,7 +670,8 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataWriterProjectionsCalculationMicroseconds);
projection_block = projection.calculate(block, context);
LOG_DEBUG(log, "Spent {} ms calculating projection {} for the part {}", watch.elapsed() / 1000, projection.name, new_data_part->name);
LOG_DEBUG(
log, "Spent {} ms calculating projection {} for the part {}", watch.elapsed() / 1000, projection.name, new_data_part->name);
}
if (projection_block.rows())
@ -626,10 +684,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
}
}
auto finalizer = out->finalizePartAsync(
new_data_part,
data_settings->fsync_after_insert,
nullptr, nullptr);
auto finalizer = out->finalizePartAsync(new_data_part, data_settings->fsync_after_insert, nullptr, nullptr);
temp_part.part = new_data_part;
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
@ -718,6 +773,18 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterBlocksAlreadySorted);
}
if (data.getSettings()->allow_experimental_improve_compression_raws_order)
{
    LOG_DEBUG(log, "allow_experimental_improve_compression_raws_order=true");
    getBestCompressionPermutation(block, sort_description, perm);
    perm_ptr = &perm;
}
else
{
    LOG_DEBUG(log, "allow_experimental_improve_compression_raws_order=false");
}
if (projection.type == ProjectionDescription::Type::Aggregate && merge_is_needed)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::MergeTreeDataProjectionWriterMergingBlocksMicroseconds);
@ -739,7 +806,9 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
Statistics{}, /// TODO(hanfei): It should be helpful to write statistics for projection result.
compression_codec,
NO_TRANSACTION_PTR,
false, false, data.getContext()->getWriteSettings());
false,
false,
data.getContext()->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
auto finalizer = out->finalizePartAsync(new_data_part, false);

src/Storages/MergeTree/MergeTreeSettings.h · View File

@ -198,6 +198,7 @@ struct Settings;
M(Bool, cache_populated_by_fetch, false, "Only available in ClickHouse Cloud", 0) \
M(Bool, force_read_through_cache_for_merges, false, "Force read-through filesystem cache for merges", 0) \
M(Bool, allow_experimental_replacing_merge_with_cleanup, false, "Allow experimental CLEANUP merges for ReplacingMergeTree with is_deleted column.", 0) \
M(Bool, allow_experimental_improve_compression_raws_order, false, "Experimental. Allow reordering rows within equal ranges of the sorting key on insert, grouping similar values together to improve compression.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \