mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 09:52:38 +00:00
Merge pull request #72895 from ClickHouse/more_insistent_compress_in_memory_eng_2
Resubmit More insistent compression in StorageMemory
This commit is contained in:
commit
5ff265eea0
@ -36,6 +36,8 @@ Upper and lower bounds can be specified to limit Memory engine table size, effec
|
|||||||
- Requires `max_rows_to_keep`
|
- Requires `max_rows_to_keep`
|
||||||
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e., a circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
- `max_rows_to_keep` — Maximum rows to keep within memory table where oldest rows are deleted on each insertion (i.e., a circular buffer). Max rows can exceed the stated limit if the oldest batch of rows to remove falls under the `min_rows_to_keep` limit when adding a large block.
|
||||||
- Default value: `0`
|
- Default value: `0`
|
||||||
|
- `compress` — Whether to compress data in memory.
|
||||||
|
- Default value: `false`
|
||||||
|
|
||||||
## Usage {#usage}
|
## Usage {#usage}
|
||||||
|
|
||||||
|
@ -1024,10 +1024,10 @@ void ColumnArray::updatePermutationWithCollation(const Collator & collator, Perm
|
|||||||
DefaultPartialSort());
|
DefaultPartialSort());
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnArray::compress() const
|
ColumnPtr ColumnArray::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
ColumnPtr data_compressed = data->compress();
|
ColumnPtr data_compressed = data->compress(force_compression);
|
||||||
ColumnPtr offsets_compressed = offsets->compress();
|
ColumnPtr offsets_compressed = offsets->compress(force_compression);
|
||||||
|
|
||||||
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();
|
size_t byte_size = data_compressed->byteSize() + offsets_compressed->byteSize();
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ public:
|
|||||||
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
|
/// For example, `getDataInRange(0, size())` is the same as `getDataPtr()->clone()`.
|
||||||
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
|
MutableColumnPtr getDataInRange(size_t start, size_t length) const;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
ColumnCheckpointPtr getCheckpoint() const override;
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
@ -16,7 +16,7 @@ namespace ErrorCodes
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool always_compress)
|
std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, size_t data_size, bool force_compression)
|
||||||
{
|
{
|
||||||
size_t max_dest_size = LZ4_COMPRESSBOUND(data_size);
|
size_t max_dest_size = LZ4_COMPRESSBOUND(data_size);
|
||||||
|
|
||||||
@ -35,7 +35,8 @@ std::shared_ptr<Memory<>> ColumnCompressed::compressBuffer(const void * data, si
|
|||||||
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
|
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
|
||||||
|
|
||||||
/// If compression is inefficient.
|
/// If compression is inefficient.
|
||||||
if (!always_compress && static_cast<size_t>(compressed_size) * 2 > data_size)
|
const size_t threshold = force_compression ? 1 : 2;
|
||||||
|
if (static_cast<size_t>(compressed_size) * threshold > data_size)
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
/// Shrink to fit.
|
/// Shrink to fit.
|
||||||
|
@ -70,9 +70,11 @@ public:
|
|||||||
|
|
||||||
/// Helper methods for compression.
|
/// Helper methods for compression.
|
||||||
|
|
||||||
/// If data is not worth to be compressed and not 'always_compress' - returns nullptr.
|
/// If data is not worth to be compressed - returns nullptr.
|
||||||
|
/// By default it requires that compressed data is at least 50% smaller than original.
|
||||||
|
/// With `force_compression` set to true, it requires compressed data to be not larger than the source data.
|
||||||
/// Note: shared_ptr is to allow to be captured by std::function.
|
/// Note: shared_ptr is to allow to be captured by std::function.
|
||||||
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool always_compress);
|
static std::shared_ptr<Memory<>> compressBuffer(const void * data, size_t data_size, bool force_compression);
|
||||||
|
|
||||||
static void decompressBuffer(
|
static void decompressBuffer(
|
||||||
const void * compressed_data, void * decompressed_data, size_t compressed_size, size_t decompressed_size);
|
const void * compressed_data, void * decompressed_data, size_t compressed_size, size_t decompressed_size);
|
||||||
|
@ -478,7 +478,7 @@ ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets & offsets) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <is_decimal T>
|
template <is_decimal T>
|
||||||
ColumnPtr ColumnDecimal<T>::compress() const
|
ColumnPtr ColumnDecimal<T>::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
const size_t data_size = data.size();
|
const size_t data_size = data.size();
|
||||||
const size_t source_size = data_size * sizeof(T);
|
const size_t source_size = data_size * sizeof(T);
|
||||||
@ -487,7 +487,7 @@ ColumnPtr ColumnDecimal<T>::compress() const
|
|||||||
if (source_size < 4096) /// A wild guess.
|
if (source_size < 4096) /// A wild guess.
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
|
||||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
|
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
|
||||||
|
|
||||||
if (!compressed)
|
if (!compressed)
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
@ -140,7 +140,7 @@ public:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
void insertValue(const T value) { data.push_back(value); }
|
void insertValue(const T value) { data.push_back(value); }
|
||||||
Container & getData() { return data; }
|
Container & getData() { return data; }
|
||||||
|
@ -991,9 +991,9 @@ void ColumnDynamic::updatePermutation(IColumn::PermutationSortDirection directio
|
|||||||
updatePermutationImpl(limit, res, equal_ranges, ComparatorDescendingStable(*this, nan_direction_hint), comparator_equal, DefaultSort(), DefaultPartialSort());
|
updatePermutationImpl(limit, res, equal_ranges, ComparatorDescendingStable(*this, nan_direction_hint), comparator_equal, DefaultSort(), DefaultPartialSort());
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnDynamic::compress() const
|
ColumnPtr ColumnDynamic::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
ColumnPtr variant_compressed = variant_column_ptr->compress();
|
ColumnPtr variant_compressed = variant_column_ptr->compress(force_compression);
|
||||||
size_t byte_size = variant_compressed->byteSize();
|
size_t byte_size = variant_compressed->byteSize();
|
||||||
return ColumnCompressed::create(size(), byte_size,
|
return ColumnCompressed::create(size(), byte_size,
|
||||||
[my_variant_compressed = std::move(variant_compressed), my_variant_info = variant_info, my_max_dynamic_types = max_dynamic_types, my_global_max_dynamic_types = global_max_dynamic_types, my_statistics = statistics]() mutable
|
[my_variant_compressed = std::move(variant_compressed), my_variant_info = variant_info, my_max_dynamic_types = max_dynamic_types, my_global_max_dynamic_types = global_max_dynamic_types, my_statistics = statistics]() mutable
|
||||||
|
@ -335,7 +335,7 @@ public:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
double getRatioOfDefaultRows(double sample_ratio) const override
|
double getRatioOfDefaultRows(double sample_ratio) const override
|
||||||
{
|
{
|
||||||
|
@ -419,7 +419,7 @@ void ColumnFixedString::getExtremes(Field & min, Field & max) const
|
|||||||
get(max_idx, max);
|
get(max_idx, max);
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnFixedString::compress() const
|
ColumnPtr ColumnFixedString::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
size_t source_size = chars.size();
|
size_t source_size = chars.size();
|
||||||
|
|
||||||
@ -427,7 +427,7 @@ ColumnPtr ColumnFixedString::compress() const
|
|||||||
if (source_size < 4096) /// A wild guess.
|
if (source_size < 4096) /// A wild guess.
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
|
||||||
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, false);
|
auto compressed = ColumnCompressed::compressBuffer(chars.data(), source_size, force_compression);
|
||||||
|
|
||||||
if (!compressed)
|
if (!compressed)
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
@ -175,7 +175,7 @@ public:
|
|||||||
|
|
||||||
ColumnPtr replicate(const Offsets & offsets) const override;
|
ColumnPtr replicate(const Offsets & offsets) const override;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
void reserve(size_t size) override
|
void reserve(size_t size) override
|
||||||
{
|
{
|
||||||
|
@ -352,9 +352,9 @@ bool ColumnMap::dynamicStructureEquals(const IColumn & rhs) const
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnMap::compress() const
|
ColumnPtr ColumnMap::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
auto compressed = nested->compress();
|
auto compressed = nested->compress(force_compression);
|
||||||
const auto byte_size = compressed->byteSize();
|
const auto byte_size = compressed->byteSize();
|
||||||
/// The order of evaluation of function arguments is unspecified
|
/// The order of evaluation of function arguments is unspecified
|
||||||
/// and could cause interacting with object in moved-from state
|
/// and could cause interacting with object in moved-from state
|
||||||
|
@ -120,7 +120,7 @@ public:
|
|||||||
const ColumnTuple & getNestedData() const { return assert_cast<const ColumnTuple &>(getNestedColumn().getData()); }
|
const ColumnTuple & getNestedData() const { return assert_cast<const ColumnTuple &>(getNestedColumn().getData()); }
|
||||||
ColumnTuple & getNestedData() { return assert_cast<ColumnTuple &>(getNestedColumn().getData()); }
|
ColumnTuple & getNestedData() { return assert_cast<ColumnTuple &>(getNestedColumn().getData()); }
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); }
|
bool hasDynamicStructure() const override { return nested->hasDynamicStructure(); }
|
||||||
bool dynamicStructureEquals(const IColumn & rhs) const override;
|
bool dynamicStructureEquals(const IColumn & rhs) const override;
|
||||||
|
@ -773,10 +773,10 @@ void ColumnNullable::protect()
|
|||||||
getNullMapColumn().protect();
|
getNullMapColumn().protect();
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnNullable::compress() const
|
ColumnPtr ColumnNullable::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
ColumnPtr nested_compressed = nested_column->compress();
|
ColumnPtr nested_compressed = nested_column->compress(force_compression);
|
||||||
ColumnPtr null_map_compressed = null_map->compress();
|
ColumnPtr null_map_compressed = null_map->compress(force_compression);
|
||||||
|
|
||||||
size_t byte_size = nested_column->byteSize() + null_map->byteSize();
|
size_t byte_size = nested_column->byteSize() + null_map->byteSize();
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ public:
|
|||||||
// Special function for nullable minmax index
|
// Special function for nullable minmax index
|
||||||
void getExtremesNullLast(Field & min, Field & max) const;
|
void getExtremesNullLast(Field & min, Field & max) const;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
ColumnCheckpointPtr getCheckpoint() const override;
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
@ -1225,14 +1225,14 @@ bool ColumnObject::structureEquals(const IColumn & rhs) const
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnObject::compress() const
|
ColumnPtr ColumnObject::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
std::unordered_map<String, ColumnPtr> compressed_typed_paths;
|
std::unordered_map<String, ColumnPtr> compressed_typed_paths;
|
||||||
compressed_typed_paths.reserve(typed_paths.size());
|
compressed_typed_paths.reserve(typed_paths.size());
|
||||||
size_t byte_size = 0;
|
size_t byte_size = 0;
|
||||||
for (const auto & [path, column] : typed_paths)
|
for (const auto & [path, column] : typed_paths)
|
||||||
{
|
{
|
||||||
auto compressed_column = column->compress();
|
auto compressed_column = column->compress(force_compression);
|
||||||
byte_size += compressed_column->byteSize();
|
byte_size += compressed_column->byteSize();
|
||||||
compressed_typed_paths[path] = std::move(compressed_column);
|
compressed_typed_paths[path] = std::move(compressed_column);
|
||||||
}
|
}
|
||||||
@ -1241,12 +1241,12 @@ ColumnPtr ColumnObject::compress() const
|
|||||||
compressed_dynamic_paths.reserve(dynamic_paths_ptrs.size());
|
compressed_dynamic_paths.reserve(dynamic_paths_ptrs.size());
|
||||||
for (const auto & [path, column] : dynamic_paths_ptrs)
|
for (const auto & [path, column] : dynamic_paths_ptrs)
|
||||||
{
|
{
|
||||||
auto compressed_column = column->compress();
|
auto compressed_column = column->compress(force_compression);
|
||||||
byte_size += compressed_column->byteSize();
|
byte_size += compressed_column->byteSize();
|
||||||
compressed_dynamic_paths[path] = std::move(compressed_column);
|
compressed_dynamic_paths[path] = std::move(compressed_column);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto compressed_shared_data = shared_data->compress();
|
auto compressed_shared_data = shared_data->compress(force_compression);
|
||||||
byte_size += compressed_shared_data->byteSize();
|
byte_size += compressed_shared_data->byteSize();
|
||||||
|
|
||||||
auto decompress =
|
auto decompress =
|
||||||
|
@ -171,7 +171,7 @@ public:
|
|||||||
|
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
void finalize() override;
|
void finalize() override;
|
||||||
bool isFinalized() const override;
|
bool isFinalized() const override;
|
||||||
|
@ -774,10 +774,10 @@ UInt64 ColumnSparse::getNumberOfDefaultRows() const
|
|||||||
return _size - offsets->size();
|
return _size - offsets->size();
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnSparse::compress() const
|
ColumnPtr ColumnSparse::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
auto values_compressed = values->compress();
|
auto values_compressed = values->compress(force_compression);
|
||||||
auto offsets_compressed = offsets->compress();
|
auto offsets_compressed = offsets->compress(force_compression);
|
||||||
|
|
||||||
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
|
size_t byte_size = values_compressed->byteSize() + offsets_compressed->byteSize();
|
||||||
|
|
||||||
|
@ -147,7 +147,7 @@ public:
|
|||||||
double getRatioOfDefaultRows(double sample_ratio) const override;
|
double getRatioOfDefaultRows(double sample_ratio) const override;
|
||||||
UInt64 getNumberOfDefaultRows() const override;
|
UInt64 getNumberOfDefaultRows() const override;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
ColumnCheckpointPtr getCheckpoint() const override;
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
@ -628,33 +628,46 @@ void ColumnString::getExtremes(Field & min, Field & max) const
|
|||||||
get(max_idx, max);
|
get(max_idx, max);
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnString::compress() const
|
ColumnPtr ColumnString::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
const size_t source_chars_size = chars.size();
|
const size_t source_chars_size = chars.size();
|
||||||
const size_t source_offsets_elements = offsets.size();
|
const size_t source_offsets_elements = offsets.size();
|
||||||
const size_t source_offsets_size = source_offsets_elements * sizeof(Offset);
|
const size_t source_offsets_size = source_offsets_elements * sizeof(Offset);
|
||||||
|
|
||||||
/// Don't compress small blocks.
|
/// Don't compress small blocks.
|
||||||
if (source_chars_size < 4096) /// A wild guess.
|
if (source_chars_size < min_size_to_compress)
|
||||||
|
{
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
}
|
||||||
|
|
||||||
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, false);
|
auto chars_compressed = ColumnCompressed::compressBuffer(chars.data(), source_chars_size, force_compression);
|
||||||
|
|
||||||
/// Return original column if not compressible.
|
/// Return original column if not compressible.
|
||||||
if (!chars_compressed)
|
if (!chars_compressed)
|
||||||
|
{
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
}
|
||||||
|
|
||||||
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, true);
|
auto offsets_compressed = ColumnCompressed::compressBuffer(offsets.data(), source_offsets_size, force_compression);
|
||||||
|
const bool offsets_were_compressed = !!offsets_compressed;
|
||||||
|
|
||||||
|
/// Offsets are not compressible. Use the source data.
|
||||||
|
if (!offsets_compressed)
|
||||||
|
{
|
||||||
|
offsets_compressed = std::make_shared<Memory<>>(source_offsets_size);
|
||||||
|
memcpy(offsets_compressed->data(), offsets.data(), source_offsets_size);
|
||||||
|
}
|
||||||
|
|
||||||
const size_t chars_compressed_size = chars_compressed->size();
|
const size_t chars_compressed_size = chars_compressed->size();
|
||||||
const size_t offsets_compressed_size = offsets_compressed->size();
|
const size_t offsets_compressed_size = offsets_compressed->size();
|
||||||
return ColumnCompressed::create(source_offsets_elements, chars_compressed_size + offsets_compressed_size,
|
return ColumnCompressed::create(
|
||||||
[
|
source_offsets_elements,
|
||||||
my_chars_compressed = std::move(chars_compressed),
|
chars_compressed_size + offsets_compressed_size,
|
||||||
my_offsets_compressed = std::move(offsets_compressed),
|
[my_chars_compressed = std::move(chars_compressed),
|
||||||
source_chars_size,
|
my_offsets_compressed = std::move(offsets_compressed),
|
||||||
source_offsets_elements
|
source_chars_size,
|
||||||
]
|
source_offsets_elements,
|
||||||
|
offsets_were_compressed]
|
||||||
{
|
{
|
||||||
auto res = ColumnString::create();
|
auto res = ColumnString::create();
|
||||||
|
|
||||||
@ -664,8 +677,18 @@ ColumnPtr ColumnString::compress() const
|
|||||||
ColumnCompressed::decompressBuffer(
|
ColumnCompressed::decompressBuffer(
|
||||||
my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
|
my_chars_compressed->data(), res->getChars().data(), my_chars_compressed->size(), source_chars_size);
|
||||||
|
|
||||||
ColumnCompressed::decompressBuffer(
|
if (offsets_were_compressed)
|
||||||
my_offsets_compressed->data(), res->getOffsets().data(), my_offsets_compressed->size(), source_offsets_elements * sizeof(Offset));
|
{
|
||||||
|
ColumnCompressed::decompressBuffer(
|
||||||
|
my_offsets_compressed->data(),
|
||||||
|
res->getOffsets().data(),
|
||||||
|
my_offsets_compressed->size(),
|
||||||
|
source_offsets_elements * sizeof(Offset));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memcpy(res->getOffsets().data(), my_offsets_compressed->data(), my_offsets_compressed->size());
|
||||||
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
});
|
});
|
||||||
|
@ -29,6 +29,8 @@ public:
|
|||||||
using Char = UInt8;
|
using Char = UInt8;
|
||||||
using Chars = PaddedPODArray<UInt8>;
|
using Chars = PaddedPODArray<UInt8>;
|
||||||
|
|
||||||
|
static constexpr size_t min_size_to_compress = 4096;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class COWHelper<IColumnHelper<ColumnString>, ColumnString>;
|
friend class COWHelper<IColumnHelper<ColumnString>, ColumnString>;
|
||||||
|
|
||||||
@ -272,7 +274,7 @@ public:
|
|||||||
|
|
||||||
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
ColumnPtr replicate(const Offsets & replicate_offsets) const override;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
void reserve(size_t n) override;
|
void reserve(size_t n) override;
|
||||||
size_t capacity() const override;
|
size_t capacity() const override;
|
||||||
|
@ -796,7 +796,7 @@ void ColumnTuple::takeDynamicStructureFromSourceColumns(const Columns & source_c
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ColumnPtr ColumnTuple::compress() const
|
ColumnPtr ColumnTuple::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
if (columns.empty())
|
if (columns.empty())
|
||||||
{
|
{
|
||||||
@ -812,7 +812,7 @@ ColumnPtr ColumnTuple::compress() const
|
|||||||
compressed.reserve(columns.size());
|
compressed.reserve(columns.size());
|
||||||
for (const auto & column : columns)
|
for (const auto & column : columns)
|
||||||
{
|
{
|
||||||
auto compressed_column = column->compress();
|
auto compressed_column = column->compress(force_compression);
|
||||||
byte_size += compressed_column->byteSize();
|
byte_size += compressed_column->byteSize();
|
||||||
compressed.emplace_back(std::move(compressed_column));
|
compressed.emplace_back(std::move(compressed_column));
|
||||||
}
|
}
|
||||||
|
@ -125,7 +125,7 @@ public:
|
|||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
bool isCollationSupported() const override;
|
bool isCollationSupported() const override;
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
void finalize() override;
|
void finalize() override;
|
||||||
bool isFinalized() const override;
|
bool isFinalized() const override;
|
||||||
|
|
||||||
|
@ -1426,16 +1426,16 @@ bool ColumnVariant::dynamicStructureEquals(const IColumn & rhs) const
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnVariant::compress() const
|
ColumnPtr ColumnVariant::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
ColumnPtr local_discriminators_compressed = local_discriminators->compress();
|
ColumnPtr local_discriminators_compressed = local_discriminators->compress(force_compression);
|
||||||
ColumnPtr offsets_compressed = offsets->compress();
|
ColumnPtr offsets_compressed = offsets->compress(force_compression);
|
||||||
size_t byte_size = local_discriminators_compressed->byteSize() + offsets_compressed->byteSize();
|
size_t byte_size = local_discriminators_compressed->byteSize() + offsets_compressed->byteSize();
|
||||||
Columns compressed;
|
Columns compressed;
|
||||||
compressed.reserve(variants.size());
|
compressed.reserve(variants.size());
|
||||||
for (const auto & variant : variants)
|
for (const auto & variant : variants)
|
||||||
{
|
{
|
||||||
auto compressed_variant = variant->compress();
|
auto compressed_variant = variant->compress(force_compression);
|
||||||
byte_size += compressed_variant->byteSize();
|
byte_size += compressed_variant->byteSize();
|
||||||
compressed.emplace_back(std::move(compressed_variant));
|
compressed.emplace_back(std::move(compressed_variant));
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ public:
|
|||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
double getRatioOfDefaultRows(double sample_ratio) const override;
|
double getRatioOfDefaultRows(double sample_ratio) const override;
|
||||||
UInt64 getNumberOfDefaultRows() const override;
|
UInt64 getNumberOfDefaultRows() const override;
|
||||||
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
|
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
|
||||||
|
@ -951,7 +951,7 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
ColumnPtr ColumnVector<T>::compress() const
|
ColumnPtr ColumnVector<T>::compress(bool force_compression) const
|
||||||
{
|
{
|
||||||
const size_t data_size = data.size();
|
const size_t data_size = data.size();
|
||||||
const size_t source_size = data_size * sizeof(T);
|
const size_t source_size = data_size * sizeof(T);
|
||||||
@ -960,7 +960,7 @@ ColumnPtr ColumnVector<T>::compress() const
|
|||||||
if (source_size < 4096) /// A wild guess.
|
if (source_size < 4096) /// A wild guess.
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
|
||||||
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, false);
|
auto compressed = ColumnCompressed::compressBuffer(data.data(), source_size, force_compression);
|
||||||
|
|
||||||
if (!compressed)
|
if (!compressed)
|
||||||
return ColumnCompressed::wrap(this->getPtr());
|
return ColumnCompressed::wrap(this->getPtr());
|
||||||
|
@ -287,7 +287,7 @@ public:
|
|||||||
|
|
||||||
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
|
ColumnPtr createWithOffsets(const IColumn::Offsets & offsets, const ColumnConst & column_with_default_value, size_t total_rows, size_t shift) const override;
|
||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress(bool force_compression) const override;
|
||||||
|
|
||||||
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
|
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
|
||||||
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);
|
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);
|
||||||
|
@ -601,7 +601,8 @@ public:
|
|||||||
|
|
||||||
/// Compress column in memory to some representation that allows to decompress it back.
|
/// Compress column in memory to some representation that allows to decompress it back.
|
||||||
/// Return itself if compression is not applicable for this column type.
|
/// Return itself if compression is not applicable for this column type.
|
||||||
[[nodiscard]] virtual Ptr compress() const
|
/// The flag `force_compression` indicates that compression should be performed even if it's not efficient (as long as the compressed data is not larger than the source data).
|
||||||
|
[[nodiscard]] virtual Ptr compress([[maybe_unused]] bool force_compression) const
|
||||||
{
|
{
|
||||||
/// No compression by default.
|
/// No compression by default.
|
||||||
return getPtr();
|
return getPtr();
|
||||||
|
88
src/Columns/tests/gtest_column_string.cpp
Normal file
88
src/Columns/tests/gtest_column_string.cpp
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <Columns/ColumnString.h>
|
||||||
|
|
||||||
|
#include <Common/randomSeed.h>
|
||||||
|
#include <Common/thread_local_rng.h>
|
||||||
|
|
||||||
|
using namespace DB;
|
||||||
|
|
||||||
|
/// Randomly seeded generator shared by the tests in this file; `Incompressible` uses it to produce
/// string payloads that are expected not to compress.
static pcg64 rng(randomSeed());
|
||||||
|
|
||||||
|
/// Every test string occupies one 64-bit value plus one trailing zero byte.
constexpr size_t bytes_per_string = sizeof(uint64_t) + 1;
|
||||||
|
/// Column should have enough bytes to be compressed
/// (the row count is chosen so that the total size of `chars` exceeds `ColumnString::min_size_to_compress`).
|
||||||
|
constexpr size_t column_size = ColumnString::min_size_to_compress / bytes_per_string + 42;
|
||||||
|
|
||||||
|
/// Fills a column with random 64-bit payloads (expected to be incompressible) and checks that a
/// forced compression hands back the very same column object instead of a compressed copy.
TEST(ColumnString, Incompressible)
{
    auto column = ColumnString::create();
    auto & char_data = column->getChars();
    auto & row_offsets = column->getOffsets();

    char_data.resize(column_size * bytes_per_string);
    for (size_t row = 0; row != column_size; ++row)
    {
        // Each row is laid out as <8 random bytes><zero terminator>.
        const size_t row_begin = row * bytes_per_string;
        const uint64_t payload = rng();
        memcpy(&char_data[row_begin], &payload, sizeof(uint64_t));
        char_data[row_begin + sizeof(uint64_t)] = '\0';
        row_offsets.push_back(row_begin + bytes_per_string);
    }

    auto packed = column->compress(true);
    auto unpacked = packed->decompress();

    // An incompressible column is returned as the original column wrapped in ColumnCompressed,
    // so decompression must yield the identical object with identical sizes.
    ASSERT_EQ(unpacked.get(), column.get());
    ASSERT_EQ(packed->size(), column->size());
    ASSERT_EQ(packed->allocatedBytes(), column->allocatedBytes());
    ASSERT_EQ(unpacked->size(), column->size());
    ASSERT_EQ(unpacked->allocatedBytes(), column->allocatedBytes());
}
|
||||||
|
|
||||||
|
/// Builds one long string out of a repeating pattern, so `chars` compresses well, while the offsets
/// array holds a single entry (per the test name, expected to be too small to compress).
TEST(ColumnString, CompressibleCharsAndIncompressibleOffsets)
{
    auto column = ColumnString::create();
    auto & char_data = column->getChars();
    auto & row_offsets = column->getOffsets();

    const uint64_t pattern = 42;

    char_data.resize(column_size * bytes_per_string);
    for (size_t chunk = 0; chunk != column_size; ++chunk)
    {
        // Same <8 bytes><zero> layout as in the other tests, but with a constant payload.
        const size_t chunk_begin = chunk * bytes_per_string;
        memcpy(&char_data[chunk_begin], &pattern, sizeof(uint64_t));
        char_data[chunk_begin + sizeof(uint64_t)] = '\0';
    }
    // A single offset: the whole buffer is one row.
    row_offsets.push_back(char_data.size());

    auto packed = column->compress(true);
    auto unpacked = packed->decompress();

    // A really compressed column keeps only the compressed `chars` and `offsets` arrays,
    // therefore decompression has to materialize a brand new column.
    ASSERT_NE(unpacked.get(), column.get());
    ASSERT_EQ(packed->size(), column->size());
    ASSERT_LE(packed->allocatedBytes(), column->allocatedBytes());
    ASSERT_EQ(unpacked->size(), column->size());
    ASSERT_LE(unpacked->allocatedBytes(), column->allocatedBytes());
}
|
||||||
|
|
||||||
|
/// Builds a column of identical fixed-width strings (repeated characters, evenly spaced
/// offsets) and checks the forced compress()/decompress() round trip.
TEST(ColumnString, CompressibleCharsAndCompressibleOffsets)
{
    auto source = ColumnString::create();
    auto & payload = source->getChars();
    auto & row_ends = source->getOffsets();

    payload.resize(column_size * bytes_per_string);

    /// Every row holds the same eight-byte pattern plus a terminating zero, and its end
    /// offset advances by a constant stride.
    const uint64_t filler = 42;
    for (size_t row = 0, pos = 0; row < column_size; ++row, pos += bytes_per_string)
    {
        memcpy(&payload[pos], &filler, sizeof(uint64_t));
        payload[pos + sizeof(uint64_t)] = '\0';
        row_ends.push_back(pos + bytes_per_string);
    }

    auto packed = source->compress(true);
    auto unpacked = packed->decompress();

    /// A really compressed column keeps only the compressed `chars` and `offsets` arrays,
    /// therefore decompression has to materialize a fresh column object.
    ASSERT_NE(unpacked.get(), source.get());
    ASSERT_EQ(packed->size(), source->size());
    ASSERT_LE(packed->allocatedBytes(), source->allocatedBytes());
    ASSERT_EQ(unpacked->size(), source->size());
    ASSERT_LE(unpacked->allocatedBytes(), source->allocatedBytes());
}
|
@ -616,7 +616,7 @@ Block Block::compress() const
|
|||||||
size_t num_columns = data.size();
|
size_t num_columns = data.size();
|
||||||
Columns new_columns(num_columns);
|
Columns new_columns(num_columns);
|
||||||
for (size_t i = 0; i < num_columns; ++i)
|
for (size_t i = 0; i < num_columns; ++i)
|
||||||
new_columns[i] = data[i].column->compress();
|
new_columns[i] = data[i].column->compress(/*force_compression=*/false);
|
||||||
return cloneWithColumns(new_columns);
|
return cloneWithColumns(new_columns);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -469,7 +469,7 @@ void QueryCache::Writer::finalizeWrite()
|
|||||||
Columns compressed_columns;
|
Columns compressed_columns;
|
||||||
for (const auto & column : columns)
|
for (const auto & column : columns)
|
||||||
{
|
{
|
||||||
auto compressed_column = column->compress();
|
auto compressed_column = column->compress(/*force_compression=*/false);
|
||||||
compressed_columns.push_back(compressed_column);
|
compressed_columns.push_back(compressed_column);
|
||||||
}
|
}
|
||||||
Chunk compressed_chunk(compressed_columns, chunk.getNumRows());
|
Chunk compressed_chunk(compressed_columns, chunk.getNumRows());
|
||||||
|
@ -91,8 +91,7 @@ public:
|
|||||||
{
|
{
|
||||||
Block compressed_block;
|
Block compressed_block;
|
||||||
for (const auto & elem : block)
|
for (const auto & elem : block)
|
||||||
compressed_block.insert({ elem.column->compress(), elem.type, elem.name });
|
compressed_block.insert({elem.column->compress(/*force_compression=*/true), elem.type, elem.name});
|
||||||
|
|
||||||
new_blocks.push_back(std::move(compressed_block));
|
new_blocks.push_back(std::move(compressed_block));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -259,7 +258,7 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
|
|||||||
{
|
{
|
||||||
if ((*memory_settings)[MemorySetting::compress])
|
if ((*memory_settings)[MemorySetting::compress])
|
||||||
for (auto & elem : block)
|
for (auto & elem : block)
|
||||||
elem.column = elem.column->compress();
|
elem.column = elem.column->compress(/*force_compression=*/true);
|
||||||
|
|
||||||
out.push_back(block);
|
out.push_back(block);
|
||||||
}
|
}
|
||||||
@ -574,7 +573,7 @@ void StorageMemory::restoreDataImpl(const BackupPtr & backup, const String & dat
|
|||||||
{
|
{
|
||||||
Block compressed_block;
|
Block compressed_block;
|
||||||
for (const auto & elem : block)
|
for (const auto & elem : block)
|
||||||
compressed_block.insert({ elem.column->compress(), elem.type, elem.name });
|
compressed_block.insert({elem.column->compress(/*force_compression=*/true), elem.type, elem.name});
|
||||||
|
|
||||||
new_blocks.push_back(std::move(compressed_block));
|
new_blocks.push_back(std::move(compressed_block));
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user