mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
Merge pull request #72770 from ClickHouse/more_insistent_compress_in_memory_eng
More insistent compression in `StorageMemory`
This commit is contained in:
commit
b0454acb14
@ -36,6 +36,8 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
|
||||
- Requires `max_rows_to_keep`
|
||||
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e. a circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
||||
- Default value: `0`
|
||||
- `compress` — Whether to compress data in memory.
|
||||
- Default value: `false`
|
||||
|
||||
## Usage {#usage}
|
||||
|
||||
|
@ -1024,10 +1024,10 @@ void ColumnArray::updatePermutationWithCollation(const Collator & collator, Perm
|
||||
DefaultPartialSort());
|
||||
}
|
||||
|
||||
ColumnPtr ColumnArray::compress() const
|
||||
ColumnPtr ColumnArray::compress(bool force_compression) const
|
||||
{
|
||||
ColumnPtr data_compressed = data->compress();
|
||||
ColumnPtr offsets_compressed = offsets->compress();
|
||||
ColumnPtr data_compressed = data->compress(force_compression);
|
||||
ColumnPtr offsets_compressed = offsets->compress(force_compression);
|
||||
|
||||
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();
|
||||
|
||||
|
@ -159,7 +159,7 @@ public:
|
||||
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
|
||||
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||
|
@ -16,7 +16,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool always_compress)
|
||||
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool force_compression)
|
||||
{
|
||||
size_t max_dest_size = LZ4_COMPRESSBOUND(data_size);
|
||||
|
||||
@ -35,7 +35,8 @@ std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, si
|
||||
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
|
||||
|
||||
/// If compression is inefficient.
|
||||
if (!always_compress && static_cast<size_t>(compressed_size) * 2 > data_size)
|
||||
const size_t threshold = force_compression ? 1 : 2;
|
||||
if (static_cast<size_t>(compressed_size) * threshold > data_size)
|
||||
return {};
|
||||
|
||||
/// Shrink to fit.
|
||||
|
@ -72,7 +72,7 @@ public:
|
||||
|
||||
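/// Returns nullptr if the data is not worth compressing: by default the compressed size must be at most half of the original size; with 'force_compression' it only must not exceed the original size.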
|
||||
/// Note: shared_ptr is to allow to be captured by std::function.
|
||||
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool always_compress);
|
||||
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool force_compression);
|
||||
|
||||
static void decompressBuffer(
|
||||
const void * compressed_data, void * decompressed_data, size_t compressed_size, size_t decompressed_size);
|
||||
|
@ -478,7 +478,7 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets & offsets) const
|
||||
}
|
||||
|
||||
template <is_decimal T>
|
||||
ColumnPtr ColumnDecimal<T>::compress() const
|
||||
ColumnPtr ColumnDecimal<T>::compress(bool force_compression) const
|
||||
{
|
||||
const size_t data_size = data.size();
|
||||
const size_t source_size = data_size * sizeof(T);
|
||||
@ -487,7 +487,7 @@ ColumnPtr ColumnDecimal<T>::compress() const
|
||||
if (source_size < 4096) /// A wild guess.
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
|
||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
|
||||
|
||||
if (!compressed)
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
@ -140,7 +140,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
/// Appends `value` to the end of the underlying data container.
void insertValue(const T value)
{
    data.push_back(value);
}
|
||||
/// Gives mutable access to the underlying data container.
Container & getData()
{
    return data;
}
|
||||
|
@ -991,9 +991,9 @@ void ColumnDynamic::updatePermutation(IColumn::PermutationSortDirection directio
|
||||
updatePermutationImpl(limit, res, equal_ranges, ComparatorDescendingStable(*this, nan_direction_hint), comparator_equal, DefaultSort(), DefaultPartialSort());
|
||||
}
|
||||
|
||||
ColumnPtr ColumnDynamic::compress() const
|
||||
ColumnPtr ColumnDynamic::compress(bool force_compression) const
|
||||
{
|
||||
ColumnPtr variant_compressed = variant_column_ptr->compress();
|
||||
ColumnPtr variant_compressed = variant_column_ptr->compress(force_compression);
|
||||
size_t byte_size = variant_compressed->byteSize();
|
||||
return ColumnCompressed::create(size(), byte_size,
|
||||
[my_variant_compressed = std::move(variant_compressed), my_variant_info = variant_info, my_max_dynamic_types = max_dynamic_types, my_global_max_dynamic_types = global_max_dynamic_types, my_statistics = statistics]() mutable
|
||||
|
@ -335,7 +335,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
double getRatioOfDefaultRows(double sample_ratio) const override
|
||||
{
|
||||
|
@ -419,7 +419,7 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
|
||||
get(max_idx, max);
|
||||
}
|
||||
|
||||
ColumnPtr ColumnFixedString::compress() const
|
||||
ColumnPtr ColumnFixedString::compress(bool force_compression) const
|
||||
{
|
||||
size_t source_size = chars.size();
|
||||
|
||||
@ -427,7 +427,7 @@ ColumnPtr ColumnFixedString::compress() const
|
||||
if (source_size < 4096) /// A wild guess.
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
||||
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, false);
|
||||
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, force_compression);
|
||||
|
||||
if (!compressed)
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
@ -175,7 +175,7 @@ public:
|
||||
|
||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
void reserve(size_t size) override
|
||||
{
|
||||
|
@ -352,9 +352,9 @@ bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const
|
||||
return false;
|
||||
}
|
||||
|
||||
ColumnPtr ColumnMap::compress() const
|
||||
ColumnPtr ColumnMap::compress(bool force_compression) const
|
||||
{
|
||||
auto compressed = nested->compress();
|
||||
auto compressed = nested->compress(force_compression);
|
||||
const auto byte_size = compressed->byteSize();
|
||||
/// The order of evaluation of function arguments is unspecified
|
||||
/// and could cause interacting with object in moved-from state
|
||||
|
@ -120,7 +120,7 @@ public:
|
||||
/// Read-only view of the nested column's data, cast to ColumnTuple.
const ColumnTuple & getNestedData() const
{
    return assert_cast<const ColumnTuple &>(
        getNestedColumn().getData());
}
|
||||
/// Mutable view of the nested column's data, cast to ColumnTuple.
ColumnTuple & getNestedData()
{
    return assert_cast<ColumnTuple &>(
        getNestedColumn().getData());
}
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
/// Forwards the query to the nested column.
bool hasDynamicStructure() const override
{
    return nested->hasDynamicStructure();
}
|
||||
bool dynamicStructureEquals(const IColumn & rhs) const override;
|
||||
|
@ -773,10 +773,10 @@ void ColumnNullable::protect()
|
||||
getNullMapColumn().protect();
|
||||
}
|
||||
|
||||
ColumnPtr ColumnNullable::compress() const
|
||||
ColumnPtr ColumnNullable::compress(bool force_compression) const
|
||||
{
|
||||
ColumnPtr nested_compressed = nested_column->compress();
|
||||
ColumnPtr null_map_compressed = null_map->compress();
|
||||
ColumnPtr nested_compressed = nested_column->compress(force_compression);
|
||||
ColumnPtr null_map_compressed = null_map->compress(force_compression);
|
||||
|
||||
size_t byte_size = nested_column->byteSize() + null_map->byteSize();
|
||||
|
||||
|
@ -141,7 +141,7 @@ public:
|
||||
// Special function for nullable minmax index
|
||||
void getExtremesNullLast(Field & min, Field & max) const;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||
|
@ -1224,14 +1224,14 @@ bool ColumnObject::structureEquals(const IColumn & rhs) const
|
||||
return true;
|
||||
}
|
||||
|
||||
ColumnPtr ColumnObject::compress() const
|
||||
ColumnPtr ColumnObject::compress(bool force_compression) const
|
||||
{
|
||||
std::unordered_map<String, ColumnPtr> compressed_typed_paths;
|
||||
compressed_typed_paths.reserve(typed_paths.size());
|
||||
size_t byte_size = 0;
|
||||
for (const auto & [path, column] : typed_paths)
|
||||
{
|
||||
auto compressed_column = column->compress();
|
||||
auto compressed_column = column->compress(force_compression);
|
||||
byte_size += compressed_column->byteSize();
|
||||
compressed_typed_paths[path] = std::move(compressed_column);
|
||||
}
|
||||
@ -1240,12 +1240,12 @@ ColumnPtr ColumnObject::compress() const
|
||||
compressed_dynamic_paths.reserve(dynamic_paths_ptrs.size());
|
||||
for (const auto & [path, column] : dynamic_paths_ptrs)
|
||||
{
|
||||
auto compressed_column = column->compress();
|
||||
auto compressed_column = column->compress(force_compression);
|
||||
byte_size += compressed_column->byteSize();
|
||||
compressed_dynamic_paths[path] = std::move(compressed_column);
|
||||
}
|
||||
|
||||
auto compressed_shared_data = shared_data->compress();
|
||||
auto compressed_shared_data = shared_data->compress(force_compression);
|
||||
byte_size += compressed_shared_data->byteSize();
|
||||
|
||||
auto decompress =
|
||||
|
@ -171,7 +171,7 @@ public:
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
void finalize() override;
|
||||
bool isFinalized() const override;
|
||||
|
@ -774,10 +774,10 @@ UInt64 ColumnSparse::getNumberOfDefaultRows() const
|
||||
return _size - offsets->size();
|
||||
}
|
||||
|
||||
ColumnPtr ColumnSparse::compress() const
|
||||
ColumnPtr ColumnSparse::compress(bool force_compression) const
|
||||
{
|
||||
auto values_compressed = values->compress();
|
||||
auto offsets_compressed = offsets->compress();
|
||||
auto values_compressed = values->compress(force_compression);
|
||||
auto offsets_compressed = offsets->compress(force_compression);
|
||||
|
||||
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
|
||||
|
||||
|
@ -147,7 +147,7 @@ public:
|
||||
double getRatioOfDefaultRows(double sample_ratio) const override;
|
||||
UInt64 getNumberOfDefaultRows() const override;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
ColumnCheckpointPtr getCheckpoint() const override;
|
||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||
|
@ -627,7 +627,7 @@ void ColumnString::getExtremes(Field & min, Field & max) const
|
||||
get(max_idx, max);
|
||||
}
|
||||
|
||||
ColumnPtr ColumnString::compress() const
|
||||
ColumnPtr ColumnString::compress(bool force_compression) const
|
||||
{
|
||||
const size_t source_chars_size = chars.size();
|
||||
const size_t source_offsets_elements = offsets.size();
|
||||
@ -637,13 +637,13 @@ ColumnPtr ColumnString::compress() const
|
||||
if (source_chars_size < 4096) /// A wild guess.
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
||||
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, false);
|
||||
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, force_compression);
|
||||
|
||||
/// Return original column if not compressible.
|
||||
if (!chars_compressed)
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
||||
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, true);
|
||||
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, /*force_compression=*/true);
|
||||
|
||||
const size_t chars_compressed_size = chars_compressed->size();
|
||||
const size_t offsets_compressed_size = offsets_compressed->size();
|
||||
|
@ -284,7 +284,7 @@ public:
|
||||
|
||||
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
void reserve(size_t n) override;
|
||||
size_t capacity() const override;
|
||||
|
@ -796,7 +796,7 @@ void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_c
|
||||
}
|
||||
|
||||
|
||||
ColumnPtr ColumnTuple::compress() const
|
||||
ColumnPtr ColumnTuple::compress(bool force_compression) const
|
||||
{
|
||||
if (columns.empty())
|
||||
{
|
||||
@ -812,7 +812,7 @@ ColumnPtr ColumnTuple::compress() const
|
||||
compressed.reserve(columns.size());
|
||||
for (const auto & column : columns)
|
||||
{
|
||||
auto compressed_column = column->compress();
|
||||
auto compressed_column = column->compress(force_compression);
|
||||
byte_size += compressed_column->byteSize();
|
||||
compressed.emplace_back(std::move(compressed_column));
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ public:
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
bool isCollationSupported() const override;
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
void finalize() override;
|
||||
bool isFinalized() const override;
|
||||
|
||||
|
@ -1426,16 +1426,16 @@ bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const
|
||||
return true;
|
||||
}
|
||||
|
||||
ColumnPtr ColumnVariant::compress() const
|
||||
ColumnPtr ColumnVariant::compress(bool force_compression) const
|
||||
{
|
||||
ColumnPtr local_discriminators_compressed = local_discriminators->compress();
|
||||
ColumnPtr offsets_compressed = offsets->compress();
|
||||
ColumnPtr local_discriminators_compressed = local_discriminators->compress(force_compression);
|
||||
ColumnPtr offsets_compressed = offsets->compress(force_compression);
|
||||
size_t byte_size = local_discriminators_compressed->byteSize() + offsets_compressed->byteSize();
|
||||
Columns compressed;
|
||||
compressed.reserve(variants.size());
|
||||
for (const auto & variant : variants)
|
||||
{
|
||||
auto compressed_variant = variant->compress();
|
||||
auto compressed_variant = variant->compress(force_compression);
|
||||
byte_size += compressed_variant->byteSize();
|
||||
compressed.emplace_back(std::move(compressed_variant));
|
||||
}
|
||||
|
@ -254,7 +254,7 @@ public:
|
||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
double getRatioOfDefaultRows(double sample_ratio) const override;
|
||||
UInt64 getNumberOfDefaultRows() const override;
|
||||
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
|
||||
|
@ -951,7 +951,7 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
ColumnPtr ColumnVector<T>::compress() const
|
||||
ColumnPtr ColumnVector<T>::compress(bool force_compression) const
|
||||
{
|
||||
const size_t data_size = data.size();
|
||||
const size_t source_size = data_size * sizeof(T);
|
||||
@ -960,7 +960,7 @@ ColumnPtr ColumnVector<T>::compress() const
|
||||
if (source_size < 4096) /// A wild guess.
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
|
||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
|
||||
|
||||
if (!compressed)
|
||||
return ColumnCompressed::wrap(this->getPtr());
|
||||
|
@ -286,7 +286,7 @@ public:
|
||||
|
||||
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
|
||||
|
||||
ColumnPtr compress() const override;
|
||||
ColumnPtr compress(bool force_compression) const override;
|
||||
|
||||
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
|
||||
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);
|
||||
|
@ -601,7 +601,8 @@ public:
|
||||
|
||||
/// Compress column in memory to some representation that allows to decompress it back.
|
||||
/// Return itself if compression is not applicable for this column type.
|
||||
[[nodiscard]] virtual Ptr compress() const
|
||||
/// The flag `force_compression` indicates that compression should be performed even if it is not efficient, as long as the compressed data is not larger than the original.
|
||||
[[nodiscard]] virtual Ptr compress([[maybe_unused]] bool force_compression) const
|
||||
{
|
||||
/// No compression by default.
|
||||
return getPtr();
|
||||
|
@ -608,7 +608,7 @@ Block Block::compress() const
|
||||
size_t num_columns = data.size();
|
||||
Columns new_columns(num_columns);
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
new_columns[i] = data[i].column->compress();
|
||||
new_columns[i] = data[i].column->compress(/*force_compression=*/false);
|
||||
return cloneWithColumns(new_columns);
|
||||
}
|
||||
|
||||
|
@ -469,7 +469,7 @@ void QueryCache::Writer::finalizeWrite()
|
||||
Columns compressed_columns;
|
||||
for (const auto & column : columns)
|
||||
{
|
||||
auto compressed_column = column->compress();
|
||||
auto compressed_column = column->compress(/*force_compression=*/false);
|
||||
compressed_columns.push_back(compressed_column);
|
||||
}
|
||||
Chunk compressed_chunk(compressed_columns, chunk.getNumRows());
|
||||
|
@ -91,8 +91,7 @@ public:
|
||||
{
|
||||
Block compressed_block;
|
||||
for (const auto & elem : block)
|
||||
compressed_block.insert({ elem.column->compress(), elem.type, elem.name });
|
||||
|
||||
compressed_block.insert({elem.column->compress(/*force_compression=*/true), elem.type, elem.name});
|
||||
new_blocks.push_back(std::move(compressed_block));
|
||||
}
|
||||
else
|
||||
@ -259,7 +258,7 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
|
||||
{
|
||||
if ((*memory_settings)[MemorySetting::compress])
|
||||
for (auto & elem : block)
|
||||
elem.column = elem.column->compress();
|
||||
elem.column = elem.column->compress(/*force_compression=*/true);
|
||||
|
||||
out.push_back(block);
|
||||
}
|
||||
@ -574,7 +573,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
|
||||
{
|
||||
Block compressed_block;
|
||||
for (const auto & elem : block)
|
||||
compressed_block.insert({ elem.column->compress(), elem.type, elem.name });
|
||||
compressed_block.insert({elem.column->compress(/*force_compression=*/true), elem.type, elem.name});
|
||||
|
||||
new_blocks.push_back(std::move(compressed_block));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user