This commit is contained in:
Nikita Vasilev 2019-10-25 21:06:08 +03:00
parent 22dfc611c9
commit c1af83d239
3 changed files with 490 additions and 0 deletions

View File

@ -0,0 +1,80 @@
#include "SSDCacheDictionary.h"
#include <Columns/ColumnsNumber.h>
namespace DB
{
BlockFile::BlockFile(size_t file_id, const std::string & file_name, const Block & header, size_t buffer_size)
: id(file_id), file_name(file_name), buffer_size(buffer_size), out_file(file_name, buffer_size), in_file(file_name), header(header), buffer(header.cloneEmptyColumns())
{
}
void BlockFile::appendBlock(const Block & block)
{
size_t bytes = 0;
const auto new_columns = block.getColumns();
if (new_columns.size() != buffer.size())
{
throw Exception("Wrong size of block in BlockFile::appendBlock(). It's a bug.", ErrorCodes::TYPE_MISMATCH);
}
const auto id_column = typeid_cast<const ColumnUInt64 *>(new_columns.front().get());
if (!id_column)
throw Exception{"id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
size_t start_size = buffer.front()->size();
for (size_t i = 0; i < header.columns(); ++i)
{
buffer[i]->insertRangeFrom(*new_columns[i], 0, new_columns[i]->size());
bytes += buffer[i]->byteSize();
}
const auto & ids = id_column->getData();
for (size_t i = 0; i < new_columns.size(); ++i)
{
key_to_file_offset[ids[i]] = start_size + i;
}
if (bytes >= buffer_size)
{
flush();
}
}
void BlockFile::flush()
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(buffer.front().get());
if (!id_column)
throw Exception{"id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
key_to_file_offset[ids[0]] = out_file.getPositionInFile() + (1ULL << FILE_OFFSET_SIZE);
size_t prev_size = 0;
for (size_t row = 0; row < buffer.front()->size(); ++row)
{
key_to_file_offset[ids[row]] = key_to_file_offset[ids[row ? row - 1 : 0]] + prev_size;
prev_size = 0;
for (size_t col = 0; col < header.columns(); ++col)
{
const auto & column = buffer[col];
const auto & type = header.getByPosition(col).type;
type->serializeBinary(*column, row, out_file);
if (type->getTypeId() != TypeIndex::String) {
prev_size += column->sizeOfValueIfFixed();
} else {
prev_size += column->getDataAt(row).size + sizeof(UInt64);
}
}
}
if (out_file.hasPendingData()) {
out_file.sync();
}
buffer = header.cloneEmptyColumns();
}
}

View File

@ -0,0 +1,365 @@
#pragma once
#include <atomic>
#include <chrono>
#include <shared_mutex>
#include <variant>
#include <vector>
#include <Core/Block.h>
#include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <pcg_random.hpp>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
#include "DictionaryStructure.h"
#include "IDictionary.h"
#include "IDictionarySource.h"
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
constexpr size_t OFFSET_MASK = ~0xffff000000000000;
constexpr size_t FILE_ID_SIZE = 16;
constexpr size_t FILE_OFFSET_SIZE = sizeof(size_t) * 8 - FILE_ID_SIZE;
class SSDCacheDictionary;
class BlockFile
{
public:
using Offset = size_t;
using Offsets = std::vector<Offset>;
BlockFile(size_t file_id, const std::string & file_name, const Block & header, size_t buffer_size = 4 * 1024 * 1024);
void appendBlock(const Block & block);
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
template <typename Out>
void getValue(size_t column, const PaddedPODArray<UInt64> & ids, ResultArrayType<Out> & out, PaddedPODArray<UInt64> & not_found) const;
// TODO:: getString
private:
void flush();
size_t id;
std::string file_name;
size_t buffer_size;
WriteBufferFromFile out_file; // 4MB
mutable ReadBufferFromFile in_file; // ssd page size TODO:: adaptive buffer (read two if there less than pagesize bytes)
/// Block structure: Key, (Default + TTL), Attr1, Attr2, ...
Block header;
std::unordered_map<UInt64, size_t> key_to_file_offset;
MutableColumns buffer;
};
class BlockFilesController
{
BlockFilesController(const std::string & path) : path(path) {
}
void appendBlock(const Block& block) {
file->appendBlock(block);
}
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
template <typename Out>
void getValue(size_t column, const PaddedPODArray<UInt64> & ids, ResultArrayType<Out> & out, PaddedPODArray<UInt64> & not_found) const {
file->getValue(column, ids, out, not_found);
}
// getString();
private:
const std::string path;
std::unique_ptr<BlockFile> file;
};
class SSDCacheDictionary final : public IDictionary
{
public:
SSDCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const size_t size_);
std::string getName() const override { return name; }
std::string getTypeName() const override { return "SSDCache"; }
size_t getBytesAllocated() const override { return bytes_allocated + (string_arena ? string_arena->size() : 0); } // TODO: ?
size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); }
double getHitRate() const override
{
return static_cast<double>(hit_count.load(std::memory_order_acquire)) / query_count.load(std::memory_order_relaxed);
}
size_t getElementCount() const override { return element_count.load(std::memory_order_relaxed); }
double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; } // TODO: fix
bool isCached() const override { return true; }
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<SSDCacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
}
bool hasHierarchy() const override { return hierarchical_attribute; }
void toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const override;
void isInVectorVector(
const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
std::exception_ptr getLastException() const override;
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ColumnString * out) const;
#define DECLARE(TYPE) \
void get##TYPE( \
const std::string & attribute_name, \
const PaddedPODArray<Key> & ids, \
const PaddedPODArray<TYPE> & def, \
ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void
getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const ColumnString * const def, ColumnString * const out)
const;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const TYPE def, ResultArrayType<TYPE> & out) const;
DECLARE(UInt8)
DECLARE(UInt16)
DECLARE(UInt32)
DECLARE(UInt64)
DECLARE(UInt128)
DECLARE(Int8)
DECLARE(Int16)
DECLARE(Int32)
DECLARE(Int64)
DECLARE(Float32)
DECLARE(Float64)
DECLARE(Decimal32)
DECLARE(Decimal64)
DECLARE(Decimal128)
#undef DECLARE
void getString(const std::string & attribute_name, const PaddedPODArray<Key> & ids, const String & def, ColumnString * const out) const;
void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
template <typename Value>
using ContainerType = Value[];
template <typename Value>
using ContainerPtrType = std::unique_ptr<ContainerType<Value>>;
struct CellMetadata final
{
using time_point_t = std::chrono::system_clock::time_point;
using time_point_rep_t = time_point_t::rep;
using time_point_urep_t = std::make_unsigned_t<time_point_rep_t>;
static constexpr UInt64 EXPIRES_AT_MASK = std::numeric_limits<time_point_rep_t>::max();
static constexpr UInt64 IS_DEFAULT_MASK = ~EXPIRES_AT_MASK;
UInt64 id;
/// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data;
/// Sets expiration time, resets `is_default` flag to false
time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); }
void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); }
bool isDefault() const { return (data & IS_DEFAULT_MASK) == IS_DEFAULT_MASK; }
void setDefault() { data |= IS_DEFAULT_MASK; }
};
struct Attribute final
{
AttributeUnderlyingType type;
std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
String>
null_values;
std::variant<
ContainerPtrType<UInt8>,
ContainerPtrType<UInt16>,
ContainerPtrType<UInt32>,
ContainerPtrType<UInt64>,
ContainerPtrType<UInt128>,
ContainerPtrType<Int8>,
ContainerPtrType<Int16>,
ContainerPtrType<Int32>,
ContainerPtrType<Int64>,
ContainerPtrType<Decimal32>,
ContainerPtrType<Decimal64>,
ContainerPtrType<Decimal128>,
ContainerPtrType<Float32>,
ContainerPtrType<Float64>,
ContainerPtrType<StringRef>>
arrays;
};
void createAttributes();
Attribute createAttributeWithType(const AttributeUnderlyingType type, const Field & null_value);
template <typename AttributeType, typename OutputType, typename DefaultGetter>
void getItemsNumberImpl(
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename DefaultGetter>
void getItemsString(Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const;
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const;
PaddedPODArray<Key> getCachedIds() const;
bool isEmptyCell(const UInt64 idx) const;
size_t getCellIdx(const Key id) const;
void setDefaultAttributeValue(Attribute & attribute, const Key idx) const;
void setAttributeValue(Attribute & attribute, const Key idx, const Field & value) const;
Attribute & getAttribute(const std::string & attribute_name) const;
struct FindResult
{
const size_t cell_idx;
const bool valid;
const bool outdated;
};
FindResult findCellIdx(const Key & id, const CellMetadata::time_point_t now) const;
template <typename AncestorType>
void isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const;
const std::string name;
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
Logger * const log;
mutable std::shared_mutex rw_lock;
/// Actual size will be increased to match power of 2
const size_t size;
/// all bits to 1 mask (size - 1) (0b1000 - 1 = 0b111)
const size_t size_overlap_mask;
/// Max tries to find cell, overlaped with mask: if size = 16 and start_cell=10: will try cells: 10,11,12,13,14,15,0,1,2,3
static constexpr size_t max_collision_length = 10;
const size_t zero_cell_idx{getCellIdx(0)};
std::map<std::string, size_t> attribute_index_by_name;
mutable std::vector<Attribute> attributes;
mutable std::vector<CellMetadata> cells;
Attribute * hierarchical_attribute = nullptr;
std::unique_ptr<ArenaWithFreeLists> string_arena;
mutable std::exception_ptr last_exception;
mutable size_t error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
mutable pcg64 rnd_engine;
mutable size_t bytes_allocated = 0;
mutable std::atomic<size_t> element_count{0};
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
}
#include "SSDCacheDictionary.inc.h"

View File

@ -0,0 +1,45 @@
#pragma once
namespace DB {
template<typename Out>
void BlockFile::getValue(size_t column, const PaddedPODArray<UInt64> & ids, ResultArrayType<Out> & out, PaddedPODArray<UInt64> & not_found) const
{
std::vector<std::pair<size_t, size_t>> offsets;
offsets.reserve(ids.size());
for (size_t i = 0; i < ids.size(); ++i)
{
auto it = key_to_file_offset.find(ids[i]);
if (it != std::end(key_to_file_offset))
{
offsets.emplace_back(it->second, i);
}
else
{
not_found.push_back(i);
}
}
std::sort(std::begin(offsets), std::end(offsets));
Field field;
for (const auto & [offset, index] : offsets)
{
if (offset & OFFSET_MASK)
{
in_file.seek(offset && !OFFSET_MASK);
for (size_t col = 0; col < column; ++col)
{
const auto & type = header.getByPosition(column).type;
type->deserializeBinary(field, in_file);
}
}
else
{
buffer[column]->get(offset, field);
}
out[index] = DB::get<Out>(field);
}
}
}