In memory compression: a prototype

This commit is contained in:
Alexey Milovidov 2021-02-07 04:41:31 +03:00
parent 263d751d64
commit d539948fe7
8 changed files with 207 additions and 42 deletions

View File

@ -16,6 +16,9 @@
#include <common/unaligned.h>
#include <ext/bit_cast.h>
#include <ext/scope_guard.h>
#include <lz4.h>
#include <Compression/LZ4_decompress_faster.h>
#include <IO/BufferWithOwnMemory.h>
#include <cmath>
#include <cstring>
@ -32,6 +35,8 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
}
template <typename T>
@ -520,6 +525,52 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
max = NearestFieldType<T>(cur_max);
}
#pragma GCC diagnostic ignored "-Wold-style-cast"
template <typename T>
LazyColumn ColumnVector<T>::compress() const
{
size_t source_size = data.size() * sizeof(T);
size_t max_dest_size = LZ4_COMPRESSBOUND(source_size);
if (max_dest_size > std::numeric_limits<int>::max())
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column of size {}", formatReadableSizeWithBinarySuffix(source_size));
auto compressed = std::make_shared<Memory<>>(max_dest_size);
auto compressed_size = LZ4_compress_default(
reinterpret_cast<const char *>(data.data()),
compressed->data(),
source_size,
max_dest_size);
if (compressed_size <= 0)
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress column");
/// If compression is inefficient.
if (static_cast<size_t>(compressed_size) * 2 > source_size)
return IColumn::compress();
/// Shrink to fit.
auto shrank = std::make_shared<Memory<>>(compressed_size);
memcpy(shrank->data(), compressed->data(), compressed_size);
return [compressed = std::move(shrank), column_size = data.size()]
{
auto res = ColumnVector<T>::create(column_size);
auto processed_size = LZ4_decompress_fast(
compressed->data(),
reinterpret_cast<char *>(res->getData().data()),
column_size * sizeof(T));
if (processed_size <= 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress column");
return res;
};
}
/// Explicit template instantiations - to avoid code bloat in headers.
template class ColumnVector<UInt8>;
template class ColumnVector<UInt16>;

View File

@ -298,6 +298,8 @@ public:
return typeid(rhs) == typeid(ColumnVector<T>);
}
LazyColumn compress() const override;
/// Replace elements that match the filter with zeroes. If inverted replaces not matched elements.
void applyZeroMap(const IColumn::Filter & filt, bool inverted = false);

View File

@ -357,6 +357,14 @@ public:
throw Exception("Method structureEquals is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Compress column in memory to some representation that allows to decompress it back.
using Lazy = std::function<Ptr()>;
virtual Lazy compress() const
{
/// No compression by default, just wrap the object.
return [column = getPtr()] { return column; };
}
static MutablePtr mutate(Ptr ptr)
{
@ -462,6 +470,9 @@ using MutableColumns = std::vector<MutableColumnPtr>;
using ColumnRawPtrs = std::vector<const IColumn *>;
//using MutableColumnRawPtrs = std::vector<IColumn *>;
using LazyColumn = IColumn::Lazy;
using LazyColumns = std::vector<LazyColumn>;
template <typename ... Args>
struct IsMutableColumns;

View File

@ -0,0 +1,36 @@
#include <Storages/MemorySettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
void MemorySettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for storage " + storage_def.engine->name);
throw;
}
}
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
#define MEMORY_SETTINGS(M) \
M(Bool, compress, true, "Compress data in memory", 0) \
DECLARE_SETTINGS_TRAITS(memorySettingsTraits, MEMORY_SETTINGS)
/** Settings for the Memory engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*/
struct MemorySettings : public BaseSettings<memorySettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -6,6 +6,7 @@
#include <Interpreters/MutationsInterpreter.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMemory.h>
#include <Storages/MemorySettings.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
@ -23,7 +24,7 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(std::shared_ptr<const Blocks> &)>;
using InitializerFunc = std::function<void(std::shared_ptr<const LazyBlocks> &)>;
public:
/// Blocks are stored in std::list which may be appended in another thread.
/// We use pointer to the beginning of the list and its current size.
@ -34,7 +35,7 @@ public:
Names column_names_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot,
std::shared_ptr<const Blocks> data_,
std::shared_ptr<const LazyBlocks> data_,
std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
InitializerFunc initializer_func_ = {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
@ -43,6 +44,8 @@ public:
, parallel_execution_index(parallel_execution_index_)
, initializer_func(std::move(initializer_func_))
{
for (const auto & elem : column_names_and_types)
column_positions.push_back(metadata_snapshot->getSampleBlock().getPositionByName(elem.getNameInStorage()));
}
String getName() const override { return "Memory"; }
@ -63,21 +66,25 @@ protected:
return {};
}
const Block & src = (*data)[current_index];
const LazyBlock & src = (*data)[current_index];
Columns columns;
columns.reserve(columns.size());
/// Add only required columns to `res`.
size_t i = 0;
for (const auto & elem : column_names_and_types)
{
auto current_column = src.getByName(elem.getNameInStorage()).column;
auto current_column = src[column_positions[i]]();
if (elem.isSubcolumn())
columns.emplace_back(elem.getTypeInStorage()->getSubcolumn(elem.getSubcolumnName(), *current_column));
else
columns.emplace_back(std::move(current_column));
++i;
}
return Chunk(std::move(columns), src.rows());
size_t rows = columns.at(0)->size();
return Chunk(std::move(columns), rows);
}
private:
@ -95,9 +102,10 @@ private:
const NamesAndTypesList column_names_and_types;
size_t execution_index = 0;
std::shared_ptr<const Blocks> data;
std::shared_ptr<const LazyBlocks> data;
std::shared_ptr<std::atomic<size_t>> parallel_execution_index;
InitializerFunc initializer_func;
std::vector<size_t> column_positions;
};
@ -149,8 +157,12 @@ private:
};
StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
: IStorage(table_id_), data(std::make_unique<const Blocks>())
StorageMemory::StorageMemory(
const StorageID & table_id_,
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
bool compress_)
: IStorage(table_id_), data(std::make_unique<const LazyBlocks>()), compress(compress_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(std::move(columns_description_));
@ -186,7 +198,7 @@ Pipe StorageMemory::read(
metadata_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[this](std::shared_ptr<const Blocks> & data_to_initialize)
[this](std::shared_ptr<const LazyBlocks> & data_to_initialize)
{
data_to_initialize = data.get();
}));
@ -219,18 +231,18 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag
void StorageMemory::drop()
{
data.set(std::make_unique<Blocks>());
data.set(std::make_unique<LazyBlocks>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
static inline void updateBlockData(Block & old_block, const Block & new_block)
static inline void updateBlockData(LazyBlock & old_block, const LazyBlock & new_block, const Block & old_header, const Block & new_header)
{
for (const auto & it : new_block)
size_t i = 0;
for (const auto & it : new_header)
{
auto col_name = it.name;
auto & col_with_type_name = old_block.getByName(col_name);
col_with_type_name.column = it.column;
old_block[old_header.getPositionByName(it.name)] = new_block[i];
++i;
}
}
@ -242,36 +254,47 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto in = interpreter->execute();
Block old_header = metadata_snapshot->getSampleBlock();
Block mutation_header = in->getHeader();
in->readPrefix();
Blocks out;
Block block;
while ((block = in->read()))
LazyBlocks out;
while (Block block = in->read())
{
out.push_back(block);
LazyColumns lazy_columns;
for (const auto & elem : block)
{
if (compress)
lazy_columns.emplace_back(elem.column->compress());
else
lazy_columns.emplace_back([=]{ return elem.column; });
}
out.emplace_back(std::move(lazy_columns));
}
in->readSuffix();
std::unique_ptr<Blocks> new_data;
std::unique_ptr<LazyBlocks> new_data;
// all column affected
/// All columns affected.
if (interpreter->isAffectingAllColumns())
{
new_data = std::make_unique<Blocks>(out);
new_data = std::make_unique<LazyBlocks>(out);
}
else
{
/// just some of the column affected, we need update it with new column
new_data = std::make_unique<Blocks>(*(data.get()));
/// Just some of the columns affected, we need update it with new column.
new_data = std::make_unique<LazyBlocks>(*(data.get()));
auto data_it = new_data->begin();
auto out_it = out.begin();
while (data_it != new_data->end())
{
/// Mutation does not change the number of blocks
/// Mutation does not change the number of blocks.
assert(out_it != out.end());
updateBlockData(*data_it, *out_it);
updateBlockData(*data_it, *out_it, old_header, mutation_header);
++data_it;
++out_it;
}
@ -279,7 +302,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
assert(out_it == out.end());
}
size_t rows = 0;
/* size_t rows = 0;
size_t bytes = 0;
for (const auto & buffer : *new_data)
{
@ -287,7 +310,8 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
bytes += buffer.bytes();
}
total_size_bytes.store(rows, std::memory_order_relaxed);
total_size_rows.store(bytes, std::memory_order_relaxed);
total_size_rows.store(bytes, std::memory_order_relaxed);*/
data.set(std::move(new_data));
}
@ -295,7 +319,7 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
data.set(std::make_unique<Blocks>());
data.set(std::make_unique<LazyBlocks>());
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
@ -317,13 +341,19 @@ void registerStorageMemory(StorageFactory & factory)
factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
{
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Engine {} doesn't support any arguments ({} given)",
args.engine_name, args.engine_args.size());
return StorageMemory::create(args.table_id, args.columns, args.constraints);
bool has_settings = args.storage_def->settings;
MemorySettings settings;
if (has_settings)
settings.loadFromQuery(*args.storage_def);
return StorageMemory::create(args.table_id, args.columns, args.constraints, settings.compress);
},
{
.supports_settings = true,
.supports_parallel_insert = true,
});
}

View File

@ -15,6 +15,11 @@
namespace DB
{
/// Lazy block contains possibly compressed columns. LazyColumn is std::function that reconstructs Column on call.
using LazyBlock = LazyColumns;
using LazyBlocks = std::vector<LazyBlock>;
/** Implements storage in the RAM.
* Suitable for temporary data.
* It does not support keys.
@ -95,7 +100,8 @@ public:
private:
/// MultiVersion data storage, so that we can copy the list of blocks to readers.
MultiVersion<Blocks> data;
MultiVersion<LazyBlocks> data;
mutable std::mutex mutex;
@ -104,8 +110,14 @@ private:
std::atomic<size_t> total_size_bytes = 0;
std::atomic<size_t> total_size_rows = 0;
bool compress;
protected:
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
StorageMemory(
const StorageID & table_id_,
ColumnsDescription columns_description_,
ConstraintsDescription constraints_,
bool compress_ = false);
};
}

View File

@ -242,15 +242,12 @@ void registerStorageSet(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
bool has_settings = args.storage_def->settings;
auto set_settings = std::make_unique<SetSettings>();
SetSettings set_settings;
if (has_settings)
{
set_settings->loadFromQuery(*args.storage_def);
}
set_settings.loadFromQuery(*args.storage_def);
DiskPtr disk = args.context.getDisk(set_settings->disk);
return StorageSet::create(disk, args.relative_data_path, args.table_id, args.columns, args.constraints, set_settings->persistent);
DiskPtr disk = args.context.getDisk(set_settings.disk);
return StorageSet::create(disk, args.relative_data_path, args.table_id, args.columns, args.constraints, set_settings.persistent);
}, StorageFactory::StorageFeatures{ .supports_settings = true, });
}