This commit is contained in:
Nikita Vasilev 2020-03-30 10:12:12 +03:00
parent bcea5b26d7
commit 9e61702b95
4 changed files with 163 additions and 26 deletions

View File

@ -49,17 +49,18 @@ namespace DB
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int AIO_READ_ERROR;
extern const int AIO_WRITE_ERROR;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int CANNOT_FSYNC;
extern const int CANNOT_IO_GETEVENTS;
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_DOESNT_EXIST;
extern const int LOGICAL_ERROR;
extern const int TOO_SMALL_BUFFER_SIZE;
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
extern const int AIO_WRITE_ERROR;
extern const int CANNOT_FSYNC;
extern const int TYPE_MISMATCH;
extern const int UNSUPPORTED_METHOD;
}
namespace
@ -71,6 +72,7 @@ namespace
constexpr size_t DEFAULT_WRITE_BUFFER_SIZE = DEFAULT_SSD_BLOCK_SIZE;
constexpr size_t BUFFER_ALIGNMENT = DEFAULT_AIO_FILE_BLOCK_SIZE;
constexpr size_t BLOCK_SPECIAL_FIELDS_SIZE = 4;
static constexpr UInt64 KEY_METADATA_EXPIRES_AT_MASK = std::numeric_limits<std::chrono::system_clock::time_point::rep>::max();
static constexpr UInt64 KEY_METADATA_IS_DEFAULT_MASK = ~KEY_METADATA_EXPIRES_AT_MASK;
@ -225,8 +227,8 @@ size_t CachePartition::appendBlock(
const Attribute & new_keys, const Attributes & new_attributes, const PaddedPODArray<Metadata> & metadata, const size_t begin)
{
std::unique_lock lock(rw_lock);
if (current_file_block_id >= max_size)
return 0;
//if (current_file_block_id >= max_size)
// return 0;
if (new_attributes.size() != attributes_structure.size())
throw Exception{"Wrong columns number in block.", ErrorCodes::BAD_ARGUMENTS};
@ -239,6 +241,9 @@ size_t CachePartition::appendBlock(
if (!write_buffer)
{
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
uint32_t tmp = 0;
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_SPECIAL_FIELDS_SIZE);
keys_in_block = 0;
// codec = CompressionCodecFactory::instance().get("NONE", std::nullopt);
// compressed_buffer.emplace(*write_buffer, codec);
// hashing_buffer.emplace(*compressed_buffer);
@ -279,6 +284,7 @@ size_t CachePartition::appendBlock(
if (sizeof(TYPE) > write_buffer->available()) \
{ \
write_buffer.reset(); \
std::memcpy(memory->data() + block_size * current_memory_block_id, &keys_in_block, sizeof(keys_in_block)); /* set count */ \
if (++current_memory_block_id == write_buffer_size) \
flush(); \
flushed = true; \
@ -310,11 +316,12 @@ size_t CachePartition::appendBlock(
case AttributeUnderlyingType::utString:
{
LOG_DEBUG(&Poco::Logger::get("kek"), "string write");
//LOG_DEBUG(&Poco::Logger::get("kek"), "string write");
const auto & value = std::get<Attribute::Container<String>>(attribute.values)[index];
if (sizeof(UInt64) + value.size() > write_buffer->available())
{
write_buffer.reset();
std::memcpy(memory->data() + block_size * current_memory_block_id, &keys_in_block, sizeof(keys_in_block)); // set count
if (++current_memory_block_id == write_buffer_size)
flush();
flushed = true;
@ -334,22 +341,33 @@ size_t CachePartition::appendBlock(
key_to_index_and_metadata[ids[index]] = index_and_metadata;
ids_buffer.push_back(ids[index]);
++index;
++keys_in_block;
}
else if (current_file_block_id < max_size) // next block in write buffer or flushed to ssd
else //if (current_file_block_id < max_size) // next block in write buffer or flushed to ssd
{
write_buffer.emplace(memory->data() + current_memory_block_id * block_size, block_size);
uint32_t tmp = 0;
write_buffer->write(reinterpret_cast<char*>(&tmp), BLOCK_SPECIAL_FIELDS_SIZE);
keys_in_block = 0;
}
else // flushed to ssd, end of current file
/*else // flushed to ssd, end of current file
{
//write_buffer.emplace(memory->data() + current_memory_block_id * block_size + BLOCK_SPECIAL_FIELDS_SIZE, block_size - BLOCK_SPECIAL_FIELDS_SIZE);
keys_in_block = 0;
//clearOldestBlocks();
memory.reset();
return index - begin;
}
}*/
}
return ids.size() - begin;
}
void CachePartition::flush()
{
if (current_file_block_id >= max_size) {
clearOldestBlocks();
}
const auto & ids = std::get<Attribute::Container<UInt64>>(keys_buffer.values);
if (ids.empty())
return;
@ -539,6 +557,7 @@ void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices,
Memory read_buffer(block_size * read_buffer_size, BUFFER_ALIGNMENT);
// TODO: merge requests
std::vector<iocb> requests;
std::vector<iocb*> pointers;
std::vector<std::vector<size_t>> blocks_to_indices;
@ -603,7 +622,7 @@ void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices,
if (events[i].res != static_cast<ssize_t>(request.aio_nbytes))
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"request_id= " + std::to_string(request.aio_data) + ", aio_nbytes=" + std::to_string(request.aio_nbytes) + ", aio_offset=" + std::to_string(request.aio_offset) +
"returned: " + std::to_string(events[i].res), ErrorCodes::AIO_WRITE_ERROR);
"returned: " + std::to_string(events[i].res), ErrorCodes::AIO_READ_ERROR);
for (const size_t idx : blocks_to_indices[request_id])
{
const auto & [file_index, out_index] = index_to_out[idx];
@ -632,6 +651,124 @@ void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices,
}
}
void CachePartition::clearOldestBlocks()
{
Poco::Logger::get("GC").information("GC clear -----------------");
// write_buffer_size, because we need to erase the whole buffer.
Memory read_buffer_memory(block_size * write_buffer_size, BUFFER_ALIGNMENT);
iocb request{};
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
request.aio.aio_fildes = fd;
request.aio.aio_buf = reinterpret_cast<volatile void *>(reinterpret_cast<UInt64>(read_buffer_memory.data()));
request.aio.aio_nbytes = block_size * write_buffer_size;
request.aio.aio_offset = (current_file_block_id % max_size) * block_size;
request.aio_data = 0;
#else
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(read_buffer_memory.data());
request.aio_nbytes = block_size * write_buffer_size;
request.aio_offset = (current_file_block_id % max_size) * block_size;
request.aio_data = 0;
#endif
{
iocb* request_ptr = &request;
io_event event{};
AIOContext aio_context(1);
if (io_submit(aio_context.ctx, 1, &request_ptr) != 1)
{
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
if (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) != 1)
{
throwFromErrno("io_getevents: Failed to get an event for asynchronous IO", ErrorCodes::CANNOT_IO_GETEVENTS);
}
if (event.res != static_cast<ssize_t>(request.aio_nbytes))
{
throw Exception("GC: AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"aio_nbytes=" + std::to_string(request.aio_nbytes) +
", returned=" + std::to_string(event.res) + ".", ErrorCodes::AIO_READ_ERROR);
}
}
std::vector<UInt64> keys;
keys.reserve(write_buffer_size);
// TODO: писать кол-во значений
for (size_t i = 0; i < write_buffer_size; ++i)
{
ReadBufferFromMemory read_buffer(read_buffer_memory.data() + i * block_size, block_size);
uint32_t keys_in_current_block = 0;
readBinary(keys_in_current_block, read_buffer);
Poco::Logger::get("GC").information("keys in block: " + std::to_string(keys_in_current_block) + " offset=" + std::to_string(read_buffer.offset()));
for (uint32_t j = 0; j < keys_in_current_block; ++j)
{
//Poco::Logger::get("GC").information(std::to_string(j) + " " + std::to_string(read_buffer.offset()));
keys.emplace_back();
readBinary(keys.back(), read_buffer);
for (size_t attr = 0; attr < attributes_structure.size(); ++attr)
{
switch (attributes_structure[attr])
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
read_buffer.ignore(sizeof(TYPE)); \
//Poco::Logger::get("GC").information("read TYPE");\
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
{
//Poco::Logger::get("GC").information("read string");
size_t size = 0;
readVarUInt(size, read_buffer);
//Poco::Logger::get("GC").information("read string " + std::to_string(size));
read_buffer.ignore(size);
}
break;
}
}
}
}
const size_t start_block = current_file_block_id % max_size;
const size_t finish_block = start_block + block_size * write_buffer_size;
for (const auto& key : keys)
{
auto it = key_to_index_and_metadata.find(key);
if (it != std::end(key_to_index_and_metadata)) {
size_t block_id = it->second.index.getBlockId();
if (start_block <= block_id && block_id < finish_block) {
key_to_index_and_metadata.erase(it);
}
}
}
}
void CachePartition::ignoreFromBufferToAttributeIndex(const size_t attribute_index, ReadBuffer & buf) const
{
buf.ignore(sizeof(UInt64));

View File

@ -138,6 +138,8 @@ public:
size_t appendDefaults(const Attribute & new_keys, const PaddedPODArray<Metadata> & metadata, const size_t begin);
void clearOldestBlocks();
void flush();
void remove();
@ -187,6 +189,7 @@ private:
std::optional<Memory<>> memory;
std::optional<WriteBuffer> write_buffer;
uint32_t keys_in_block = 0;
// std::optional<CompressedWriteBuffer> compressed_buffer;
// std::optional<HashingWriteBuffer> hashing_buffer;
// CompressionCodecPtr codec;

View File

@ -22,10 +22,7 @@ VALUE FROM RAM BUFFER
VALUES FROM DISK AND RAM BUFFER
118
HAS
1
2
5
10
1006
VALUES NOT FROM TABLE
0 -1 none
0 -1 none

View File

@ -18,8 +18,8 @@ ORDER BY id;
INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, -100, 'clickhouse'), (2, 3, 4, 'database'), (5, 6, 7, 'columns'), (10, 9, 8, '');
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'a' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'b' FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'c' FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'b' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 370, 370;
INSERT INTO database_for_dict.table_for_dict SELECT number, 0, -1, 'c' FROM system.numbers WHERE number NOT IN (1, 2, 5, 10) LIMIT 700, 370;
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
@ -55,11 +55,11 @@ CREATE TABLE database_for_dict.keys_table
ENGINE = StripeLog();
INSERT INTO database_for_dict.keys_table VALUES (1);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 370;
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370;
INSERT INTO database_for_dict.keys_table VALUES (2);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 370, 370;
INSERT INTO database_for_dict.keys_table VALUES (5);
INSERT INTO database_for_dict.keys_table SELECT intHash64(number) FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.keys_table SELECT 11 + intHash64(number) % 1200 FROM system.numbers LIMIT 700, 370;
INSERT INTO database_for_dict.keys_table VALUES (10);
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
@ -99,8 +99,8 @@ SELECT 'VALUES FROM DISK AND RAM BUFFER';
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;
SELECT 'HAS';
-- 1 2 5 10
SELECT id FROM database_for_dict.keys_table WHERE dictHas('database_for_dict.ssd_dict', toUInt64(id)) ORDER BY id;
-- 1006
SELECT count() FROM database_for_dict.keys_table WHERE dictHas('database_for_dict.ssd_dict', toUInt64(id));
SELECT 'VALUES NOT FROM TABLE';
-- 0 -1 none