This commit is contained in:
Nikita Vasilev 2020-05-23 11:07:24 +03:00
parent c26144968a
commit 05938e562c
3 changed files with 79 additions and 150 deletions

View File

@ -263,7 +263,6 @@ size_t SSDCachePartition::appendBlock(
Index cache_index;
cache_index.setInMemory(true);
cache_index.setBlockId(current_memory_block_id);
// Poco::Logger::get("wr").information(" block mem: " + std::to_string(current_memory_block_id) + " wb: " + std::to_string(write_buffer_size));
if (current_memory_block_id >= write_buffer_size)
throw DB::Exception("lel " + std::to_string(current_memory_block_id) + " " +
std::to_string(write_buffer_size) + " " + std::to_string(index), ErrorCodes::LOGICAL_ERROR);
@ -338,7 +337,6 @@ size_t SSDCachePartition::appendBlock(
if (!flushed)
{
// Poco::Logger::get("wr").information(" set: " + std::to_string(cache_index.getBlockId()) + " " + std::to_string(cache_index.getAddressInBlock()));
key_to_index.set(ids[index], cache_index);
ids_buffer.push_back(ids[index]);
++index;
@ -349,7 +347,6 @@ size_t SSDCachePartition::appendBlock(
init_write_buffer();
}
}
// Poco::Logger::get("wr").information("exit");
return ids.size() - begin;
}
@ -362,7 +359,6 @@ void SSDCachePartition::flush()
if (ids.empty())
return;
Poco::Logger::get("paritiiton").information("flushing to SSD.");
// Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!! " + std::to_string(file_id) + " block: " + std::to_string(current_file_block_id));
AIOContext aio_context{1};
@ -426,7 +422,6 @@ void SSDCachePartition::flush()
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
{
index.setInMemory(false);
// Poco::Logger::get("pt").information("block: " + std::to_string(index.getBlockId()) + " " + std::to_string(current_file_block_id) + " ");
index.setBlockId((current_file_block_id % max_size) + index.getBlockId());
}
key_to_index.set(id, index);
@ -571,7 +566,6 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
Memory read_buffer(block_size * read_buffer_size, BUFFER_ALIGNMENT);
// TODO: merge requests
std::vector<iocb> requests;
std::vector<iocb*> pointers;
std::vector<std::vector<size_t>> blocks_to_indices;
@ -601,10 +595,6 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(read_buffer.data()) + block_size * (requests.size() % read_buffer_size);
request.aio_nbytes = block_size;
// Poco::Logger::get("RR").information("block found" + std::to_string(index_to_out[i].first.getBlockId()) + " max_size" + std::to_string(max_size));
// if (index_to_out[i].first.getBlockId() > max_size) {
// throw DB::Exception("kek", ErrorCodes::LOGICAL_ERROR);
// }
request.aio_offset = index_to_out[i].first.getBlockId() * block_size;
request.aio_data = requests.size();
#endif
@ -619,16 +609,12 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
std::vector<bool> processed(requests.size(), false);
std::vector<io_event> events(requests.size());
for (auto & event : events)
event.res = -1; // TODO: remove
event.res = -1;
size_t to_push = 0;
size_t to_pop = 0;
while (to_pop < requests.size())
{
// Poco::Logger::get("RR").information(
// "push = " + std::to_string(to_push) + " pop=" + std::to_string(to_pop) +
// "bi = " + std::to_string(blocks_to_indices.size()) + " req = " + std::to_string(requests.size()));
/// get io tasks from previous iteration
int popped = 0;
while (to_pop < to_push && (popped = io_getevents(aio_context.ctx, to_push - to_pop, to_push - to_pop, &events[to_pop], nullptr)) <= 0)
{
@ -681,13 +667,11 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray<Index> & indice
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
to_push += pushed;
// Poco::Logger::get("RR").information("fin iter");
}
}
void SSDCachePartition::clearOldestBlocks()
{
// Poco::Logger::get("GC").information("GC clear -----------------");
// write_buffer_size, because we need to erase the whole buffer.
Memory read_buffer_memory(block_size * write_buffer_size, BUFFER_ALIGNMENT);
@ -708,8 +692,6 @@ void SSDCachePartition::clearOldestBlocks()
request.aio_data = 0;
#endif
// Poco::Logger::get("GC").information("GC offset=" + std::to_string(request.aio_offset));
{
iocb* request_ptr = &request;
io_event event{};
@ -738,7 +720,6 @@ void SSDCachePartition::clearOldestBlocks()
std::vector<UInt64> keys;
keys.reserve(write_buffer_size);
// TODO: писать кол-во значений
for (size_t i = 0; i < write_buffer_size; ++i)
{
ReadBufferFromMemory read_buffer(read_buffer_memory.data() + i * block_size, block_size);
@ -753,7 +734,6 @@ void SSDCachePartition::clearOldestBlocks()
uint32_t keys_in_current_block = 0;
readBinary(keys_in_current_block, read_buffer);
// Poco::Logger::get("GC").information("keys in block: " + std::to_string(keys_in_current_block) + " offset=" + std::to_string(read_buffer.offset()));
for (uint32_t j = 0; j < keys_in_current_block; ++j)
{
@ -804,7 +784,6 @@ void SSDCachePartition::clearOldestBlocks()
const size_t start_block = current_file_block_id % max_size;
const size_t finish_block = start_block + write_buffer_size;
Poco::Logger::get("partition gc").information("erasing keys start = " + std::to_string(start_block) + " end = " + std::to_string(finish_block));
for (const auto& key : keys)
{
Index index;
@ -883,7 +862,7 @@ PaddedPODArray<SSDCachePartition::Key> SSDCachePartition::getCachedIds(const std
std::unique_lock lock(rw_lock); // Begin and end iterators can be changed.
PaddedPODArray<Key> array;
for (const auto & key : key_to_index.keys())
array.push_back(key); // TODO: exclude default
array.push_back(key);
return array;
}
@ -1185,7 +1164,7 @@ void SSDCacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector
std::rethrow_exception(last_update_exception);
}
// Set key
/// Set key
std::get<SSDCachePartition::Attribute::Container<UInt64>>(new_keys.values).push_back(id);
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
@ -1193,7 +1172,7 @@ void SSDCacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector
metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine)));
metadata.back().setDefault();
/// inform caller that the cell has not been found
/// Inform caller that the cell has not been found
on_id_not_found(id);
}
@ -1306,11 +1285,7 @@ SSDCacheDictionary::SSDCacheDictionary(
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
const auto null_value = std::get<TYPE>(null_values[index]); /* NOLINT */ \
getItemsNumberImpl<TYPE, TYPE>( /* NOLINT */ \
index, /* NOLINT */ \
ids, /* NOLINT */ \
out, /* NOLINT */ \
[&](const size_t) { return null_value; }); /* NOLINT */ \
getItemsNumberImpl<TYPE, TYPE>(index, ids, out, [&](const size_t) { return null_value; }); /* NOLINT */ \
}
DECLARE(UInt8)

View File

@ -92,8 +92,6 @@ namespace
constexpr UInt8 HAS_NOT_FOUND = 2;
//constexpr UInt16 MAX_KEY_SIZE = std::numeric_limits<UInt16>::max();
const std::string BIN_FILE_EXT = ".bin";
const std::string IND_FILE_EXT = ".idx";
@ -287,7 +285,6 @@ size_t SSDComplexKeyCachePartition::append(
for (size_t index = begin; index < keys.size();)
{
//Poco::Logger::get("test").information("wb off: " + std::to_string(write_buffer->offset()));
Index cache_index;
cache_index.setInMemory(true);
cache_index.setBlockId(current_memory_block_id);
@ -304,8 +301,6 @@ size_t SSDComplexKeyCachePartition::append(
writeBinary(metadata[index].data, *write_buffer);
}
//Poco::Logger::get("test key").information("wb off: " + std::to_string(write_buffer->offset()));
for (const auto & attribute : new_attributes)
{
if (flushed)
@ -322,7 +317,7 @@ size_t SSDComplexKeyCachePartition::append(
} \
else \
{ \
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); \
const auto & values = std::get<Attribute::Container<TYPE>>(attribute.values); /* NOLINT */ \
writeBinary(values[index], *write_buffer); \
} \
} \
@ -372,7 +367,6 @@ size_t SSDComplexKeyCachePartition::append(
{
init_write_buffer();
}
//Poco::Logger::get("test final").information("wb off: " + std::to_string(write_buffer->offset()));
}
return keys.size() - begin;
}
@ -406,8 +400,6 @@ void SSDComplexKeyCachePartition::flush()
write_request.aio_offset = (current_file_block_id % max_size) * block_size;
#endif
//Poco::Logger::get("try:").information("offset: " + std::to_string(write_request.aio_offset) + " nbytes: " + std::to_string(write_request.aio_nbytes));
while (io_submit(aio_context.ctx, 1, &write_request_ptr) < 0)
{
if (errno != EINTR)
@ -443,20 +435,18 @@ void SSDComplexKeyCachePartition::flush()
throwFromErrnoWithPath("Cannot fsync " + path + BIN_FILE_EXT, path + BIN_FILE_EXT, ErrorCodes::CANNOT_FSYNC);
/// commit changes in index
for (size_t row = 0; row < keys_buffer.size(); ++row)
for (auto & key : keys_buffer)
{
Index index;
//Poco::Logger::get("get:").information("sz = " + std::to_string(keys_buffer[row].size()));
if (key_to_index.getKeyAndValue(keys_buffer[row], index))
if (key_to_index.getKeyAndValue(key, index))
{
if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index.
{
index.setInMemory(false);
index.setBlockId((current_file_block_id % max_size) + index.getBlockId());
}
key_to_index.set(keys_buffer[row], index);
key_to_index.set(key, index);
}
//Poco::Logger::get("get:").information("finish");
}
current_file_block_id += write_buffer_size;
@ -652,7 +642,7 @@ void SSDComplexKeyCachePartition::getValueFromStorage(const PaddedPODArray<Index
std::vector<bool> processed(requests.size(), false);
std::vector<io_event> events(requests.size());
for (auto & event : events)
event.res = -1; // TODO: remove
event.res = -1;
size_t to_push = 0;
size_t to_pop = 0;
@ -714,7 +704,6 @@ void SSDComplexKeyCachePartition::getValueFromStorage(const PaddedPODArray<Index
void SSDComplexKeyCachePartition::clearOldestBlocks()
{
//Poco::Logger::get("GC").information("GC clear -----------------");
// write_buffer_size, because we need to erase the whole buffer.
Memory read_buffer_memory(block_size * write_buffer_size, BUFFER_ALIGNMENT);
@ -763,7 +752,6 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
TemporalComplexKeysPool tmp_keys_pool;
KeyRefs keys;
// TODO: писать кол-во значений
for (size_t i = 0; i < write_buffer_size; ++i)
{
ReadBufferFromMemory read_buffer(read_buffer_memory.data() + i * block_size, block_size);
@ -778,24 +766,20 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
uint32_t keys_in_current_block = 0;
readBinary(keys_in_current_block, read_buffer);
//Poco::Logger::get("GC").information("keys in block: " + std::to_string(keys_in_current_block) + " offset=" + std::to_string(read_buffer.offset()));
for (uint32_t j = 0; j < keys_in_current_block; ++j)
{
keys.emplace_back();
tmp_keys_pool.readKey(keys.back(), read_buffer);
//Poco::Logger::get("ClearOldestBlocks").information("ktest: sz=" + std::to_string(keys.back().size())
// + " data=" + std::to_string(reinterpret_cast<size_t>(keys.back().fullData())));
Metadata metadata;
readBinary(metadata.data, read_buffer);
if (!metadata.isDefault())
{
for (size_t attr = 0; attr < attributes_structure.size(); ++attr)
for (const auto & attr : attributes_structure)
{
switch (attributes_structure[attr])
switch (attr)
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
@ -833,21 +817,15 @@ void SSDComplexKeyCachePartition::clearOldestBlocks()
const size_t start_block = current_file_block_id % max_size;
const size_t finish_block = start_block + write_buffer_size;
//Poco::Logger::get("ClearOldestBlocks").information("> erasing keys <");
for (const auto& key : keys)
{
//Poco::Logger::get("ClearOldestBlocks").information("ktest: null=" + std::to_string(key.isNull()));
//Poco::Logger::get("ClearOldestBlocks").information("ktest: data=" + std::to_string(reinterpret_cast<size_t>(key.fullData())));
//Poco::Logger::get("ClearOldestBlocks").information("ktest: sz=" + std::to_string(key.size()) + " fz=" + std::to_string(key.fullSize()));
Index index;
if (key_to_index.get(key, index))
{
//Poco::Logger::get("ClearOldestBlocks").information("erase");
size_t block_id = index.getBlockId();
if (start_block <= block_id && block_id < finish_block)
key_to_index.erase(key);
}
//Poco::Logger::get("ClearOldestBlocks").information("finish");
}
}
@ -1048,6 +1026,67 @@ void SSDComplexKeyCacheStorage::has(
hit_count.fetch_add(n - count_not_found, std::memory_order_release);
}
namespace
{

/// Converts the attribute columns of `block` into partition attributes.
///
/// Entry `i` of `structure` is read from column `i + begin_column` of `block`.
/// Fixed-width types are copied in one memcpy from the column's raw data;
/// strings are copied row by row through getDataAt().
/// Every type handled by the switch appends exactly one attribute, in `structure` order.
///
/// NOTE(review): `i + begin_column` is not range-checked here; callers are presumably
/// expected to pass a block with at least `begin_column + structure.size()` columns.
SSDComplexKeyCachePartition::Attributes createAttributesFromBlock(
    const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
{
    SSDComplexKeyCachePartition::Attributes attributes;

    const auto columns = block.getColumns();
    for (size_t i = 0; i < structure.size(); ++i)
    {
        const auto & column = columns[i + begin_column];
        switch (structure[i])
        {
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            { \
                SSDComplexKeyCachePartition::Attribute::Container<TYPE> values(column->size()); \
                /* An empty column has no element 0 to take the address of and nothing to copy. */ \
                if (!values.empty()) \
                    memcpy(values.data(), column->getRawData().data, sizeof(TYPE) * values.size()); \
                attributes.emplace_back(); \
                attributes.back().type = structure[i]; \
                attributes.back().values = std::move(values); \
            } \
            break;

            DISPATCH(UInt8)
            DISPATCH(UInt16)
            DISPATCH(UInt32)
            DISPATCH(UInt64)
            DISPATCH(UInt128)
            DISPATCH(Int8)
            DISPATCH(Int16)
            DISPATCH(Int32)
            DISPATCH(Int64)
            DISPATCH(Decimal32)
            DISPATCH(Decimal64)
            DISPATCH(Decimal128)
            DISPATCH(Float32)
            DISPATCH(Float64)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
            {
                attributes.emplace_back();
                SSDComplexKeyCachePartition::Attribute::Container<String> values(column->size());
                for (size_t j = 0; j < column->size(); ++j)
                {
                    /// Strings own their bytes, so each row is copied out of the column.
                    const auto ref = column->getDataAt(j);
                    values[j].assign(ref.data, ref.size);
                }
                attributes.back().type = structure[i];
                attributes.back().values = std::move(values);
            }
            break;
        }
    }

    return attributes;
}

} // namespace
template <typename PresentIdHandler, typename AbsentIdHandler>
void SSDComplexKeyCacheStorage::update(
DictionarySourcePtr & source_ptr,
@ -1202,7 +1241,7 @@ void SSDComplexKeyCacheStorage::update(
if (update_error_count)
{
/// TODO: юзать старые значения.
/// TODO: use old values.
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_update_exception);
@ -1267,64 +1306,6 @@ void SSDComplexKeyCacheStorage::collectGarbage()
}
}
/// Converts the attribute columns of `block` into partition attributes.
///
/// Entry `i` of `structure` is read from column `i + begin_column` of `block`.
/// Fixed-width types are copied in one memcpy from the column's raw data;
/// strings are copied row by row through getDataAt().
/// Every type handled by the switch appends exactly one attribute, in `structure` order.
///
/// NOTE(review): `i + begin_column` is not range-checked here; callers are presumably
/// expected to pass a block with at least `begin_column + structure.size()` columns.
SSDComplexKeyCachePartition::Attributes SSDComplexKeyCacheStorage::createAttributesFromBlock(
    const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure)
{
    SSDComplexKeyCachePartition::Attributes attributes;

    const auto columns = block.getColumns();
    for (size_t i = 0; i < structure.size(); ++i)
    {
        const auto & column = columns[i + begin_column];
        switch (structure[i])
        {
#define DISPATCH(TYPE) \
        case AttributeUnderlyingType::ut##TYPE: \
            { \
                SSDComplexKeyCachePartition::Attribute::Container<TYPE> values(column->size()); \
                /* An empty column has no element 0 to take the address of and nothing to copy. */ \
                if (!values.empty()) \
                    memcpy(values.data(), column->getRawData().data, sizeof(TYPE) * values.size()); \
                attributes.emplace_back(); \
                attributes.back().type = structure[i]; \
                attributes.back().values = std::move(values); \
            } \
            break;

            DISPATCH(UInt8)
            DISPATCH(UInt16)
            DISPATCH(UInt32)
            DISPATCH(UInt64)
            DISPATCH(UInt128)
            DISPATCH(Int8)
            DISPATCH(Int16)
            DISPATCH(Int32)
            DISPATCH(Int64)
            DISPATCH(Decimal32)
            DISPATCH(Decimal64)
            DISPATCH(Decimal128)
            DISPATCH(Float32)
            DISPATCH(Float64)
#undef DISPATCH

        case AttributeUnderlyingType::utString:
            {
                attributes.emplace_back();
                SSDComplexKeyCachePartition::Attribute::Container<String> values(column->size());
                for (size_t j = 0; j < column->size(); ++j)
                {
                    /// Strings own their bytes, so each row is copied out of the column.
                    const auto ref = column->getDataAt(j);
                    values[j].assign(ref.data, ref.size);
                }
                attributes.back().type = structure[i];
                attributes.back().values = std::move(values);
            }
            break;
        }
    }

    return attributes;
}
SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
@ -1368,13 +1349,8 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary(
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
const auto null_value = std::get<TYPE>(null_values[index]); \
getItemsNumberImpl<TYPE, TYPE>( \
index, \
key_columns, \
key_types, \
out, \
[&](const size_t) { return null_value; }); \
const auto null_value = std::get<TYPE>(null_values[index]); /* NOLINT */ \
getItemsNumberImpl<TYPE, TYPE>( index, key_columns, key_types, out, [&](const size_t) { return null_value; }); /* NOLINT */ \
}
DECLARE(UInt8)
@ -1403,12 +1379,7 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary(
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
getItemsNumberImpl<TYPE, TYPE>( \
index, \
key_columns, \
key_types, \
out, \
[&](const size_t row) { return def[row]; }); \
getItemsNumberImpl<TYPE, TYPE>(index, key_columns, key_types, out, [&](const size_t row) { return def[row]; }); /* NOLINT */ \
}
DECLARE(UInt8)
DECLARE(UInt16)
@ -1436,12 +1407,7 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary(
{ \
const auto index = getAttributeIndex(attribute_name); \
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \
getItemsNumberImpl<TYPE, TYPE>( \
index, \
key_columns, \
key_types, \
out, \
[&](const size_t) { return def; }); \
getItemsNumberImpl<TYPE, TYPE>(index, key_columns, key_types, out, [&](const size_t) { return def; }); /* NOLINT */ \
}
DECLARE(UInt8)
DECLARE(UInt16)
@ -1708,7 +1674,7 @@ AttributeValueVariant SSDComplexKeyCacheDictionary::createAttributeNullValueWith
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
return createAttributeNullValueWithTypeImpl<TYPE>(null_value);
return createAttributeNullValueWithTypeImpl<TYPE>(null_value); /* NOLINT */
DISPATCH(UInt8)
DISPATCH(UInt16)

View File

@ -200,7 +200,6 @@ public:
{
UInt16 sz;
readBinary(sz, buf);
//Poco::Logger::get("test read key").information("sz " + std::to_string(sz));
char * data = nullptr;
if constexpr (std::is_same_v<A, SmallObjectPool>)
data = arena.alloc();
@ -209,7 +208,6 @@ public:
memcpy(data, &sz, sizeof(sz));
buf.read(data + sizeof(sz), sz);
key = KeyRef(data);
//Poco::Logger::get("test read key").information("ksz = " + std::to_string(key.size()));
}
void ignoreKey(ReadBuffer & buf) const
@ -478,9 +476,6 @@ public:
double getLoadFactor() const;
private:
SSDComplexKeyCachePartition::Attributes createAttributesFromBlock(
const Block & block, const size_t begin_column, const std::vector<AttributeUnderlyingType> & structure);
void collectGarbage();
const AttributeTypes attributes_structure;
@ -505,9 +500,6 @@ private:
mutable size_t update_error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
// stats
//mutable size_t bytes_allocated = 0;
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
@ -569,10 +561,6 @@ public:
return dict_struct.attributes[getAttributeIndex(attribute_name)].injective;
}
/*bool hasHierarchy() const { return false; }
void toParent(const PaddedPODArray<Key> &, PaddedPODArray<Key> &) const { }*/
std::exception_ptr getLastException() const override { return storage.getLastException(); }
template <typename T>