From 05938e562cb9215c8e33b8a6687183449683808d Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 23 May 2020 11:07:24 +0300 Subject: [PATCH] fix --- src/Dictionaries/SSDCacheDictionary.cpp | 35 +--- .../SSDComplexKeyCacheDictionary.cpp | 182 +++++++----------- .../SSDComplexKeyCacheDictionary.h | 12 -- 3 files changed, 79 insertions(+), 150 deletions(-) diff --git a/src/Dictionaries/SSDCacheDictionary.cpp b/src/Dictionaries/SSDCacheDictionary.cpp index a065b367101..ad6b5cb4ea8 100644 --- a/src/Dictionaries/SSDCacheDictionary.cpp +++ b/src/Dictionaries/SSDCacheDictionary.cpp @@ -263,7 +263,6 @@ size_t SSDCachePartition::appendBlock( Index cache_index; cache_index.setInMemory(true); cache_index.setBlockId(current_memory_block_id); - // Poco::Logger::get("wr").information(" block mem: " + std::to_string(current_memory_block_id) + " wb: " + std::to_string(write_buffer_size)); if (current_memory_block_id >= write_buffer_size) throw DB::Exception("lel " + std::to_string(current_memory_block_id) + " " + std::to_string(write_buffer_size) + " " + std::to_string(index), ErrorCodes::LOGICAL_ERROR); @@ -338,7 +337,6 @@ size_t SSDCachePartition::appendBlock( if (!flushed) { - // Poco::Logger::get("wr").information(" set: " + std::to_string(cache_index.getBlockId()) + " " + std::to_string(cache_index.getAddressInBlock())); key_to_index.set(ids[index], cache_index); ids_buffer.push_back(ids[index]); ++index; @@ -349,7 +347,6 @@ size_t SSDCachePartition::appendBlock( init_write_buffer(); } } - // Poco::Logger::get("wr").information("exit"); return ids.size() - begin; } @@ -362,7 +359,6 @@ void SSDCachePartition::flush() if (ids.empty()) return; Poco::Logger::get("paritiiton").information("flushing to SSD."); - // Poco::Logger::get("paritiiton").information("@@@@@@@@@@@@@@@@@@@@ FLUSH!!! " + std::to_string(file_id) + " block: " + std::to_string(current_file_block_id)); AIOContext aio_context{1}; @@ -426,7 +422,6 @@ void SSDCachePartition::flush() if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index. { index.setInMemory(false); - // Poco::Logger::get("pt").information("block: " + std::to_string(index.getBlockId()) + " " + std::to_string(current_file_block_id) + " "); index.setBlockId((current_file_block_id % max_size) + index.getBlockId()); } key_to_index.set(id, index); @@ -571,7 +566,6 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray & indice Memory read_buffer(block_size * read_buffer_size, BUFFER_ALIGNMENT); - // TODO: merge requests std::vector requests; std::vector pointers; std::vector> blocks_to_indices; @@ -601,10 +595,6 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray & indice request.aio_fildes = fd; request.aio_buf = reinterpret_cast(read_buffer.data()) + block_size * (requests.size() % read_buffer_size); request.aio_nbytes = block_size; - // Poco::Logger::get("RR").information("block found" + std::to_string(index_to_out[i].first.getBlockId()) + " max_size" + std::to_string(max_size)); - // if (index_to_out[i].first.getBlockId() > max_size) { - // throw DB::Exception("kek", ErrorCodes::LOGICAL_ERROR); - // } request.aio_offset = index_to_out[i].first.getBlockId() * block_size; request.aio_data = requests.size(); #endif @@ -619,16 +609,12 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray & indice std::vector processed(requests.size(), false); std::vector events(requests.size()); for (auto & event : events) - event.res = -1; // TODO: remove + event.res = -1; size_t to_push = 0; size_t to_pop = 0; while (to_pop < requests.size()) { - // Poco::Logger::get("RR").information( - // "push = " + std::to_string(to_push) + " pop=" + std::to_string(to_pop) + - // "bi = " + std::to_string(blocks_to_indices.size()) + " req = " + std::to_string(requests.size())); - /// get io tasks from previous iteration int popped = 0; while (to_pop < to_push && (popped = io_getevents(aio_context.ctx, to_push - to_pop, to_push - to_pop, &events[to_pop], nullptr)) <= 0) { @@ -681,13 +667,11 @@ void SSDCachePartition::getValueFromStorage(const PaddedPODArray & indice throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT); } to_push += pushed; - // Poco::Logger::get("RR").information("fin iter"); } } void SSDCachePartition::clearOldestBlocks() { - // Poco::Logger::get("GC").information("GC clear -----------------"); // write_buffer_size, because we need to erase the whole buffer. Memory read_buffer_memory(block_size * write_buffer_size, BUFFER_ALIGNMENT); @@ -708,8 +692,6 @@ void SSDCachePartition::clearOldestBlocks() request.aio_data = 0; #endif - // Poco::Logger::get("GC").information("GC offset=" + std::to_string(request.aio_offset)); - { iocb* request_ptr = &request; io_event event{}; @@ -738,7 +720,6 @@ void SSDCachePartition::clearOldestBlocks() std::vector keys; keys.reserve(write_buffer_size); - // TODO: писать кол-во значений for (size_t i = 0; i < write_buffer_size; ++i) { ReadBufferFromMemory read_buffer(read_buffer_memory.data() + i * block_size, block_size); @@ -753,7 +734,6 @@ void SSDCachePartition::clearOldestBlocks() uint32_t keys_in_current_block = 0; readBinary(keys_in_current_block, read_buffer); - // Poco::Logger::get("GC").information("keys in block: " + std::to_string(keys_in_current_block) + " offset=" + std::to_string(read_buffer.offset())); for (uint32_t j = 0; j < keys_in_current_block; ++j) { @@ -804,7 +784,6 @@ void SSDCachePartition::clearOldestBlocks() const size_t start_block = current_file_block_id % max_size; const size_t finish_block = start_block + write_buffer_size; - Poco::Logger::get("partition gc").information("erasing keys start = " + std::to_string(start_block) + " end = " + std::to_string(finish_block)); for (const auto& key : keys) { Index index; @@ -883,7 +862,7 @@ PaddedPODArray SSDCachePartition::getCachedIds(const std std::unique_lock lock(rw_lock); // Begin and end iterators can be changed. PaddedPODArray array; for (const auto & key : key_to_index.keys()) - array.push_back(key); // TODO: exclude default + array.push_back(key); return array; } @@ -1185,7 +1164,7 @@ void SSDCacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector std::rethrow_exception(last_update_exception); } - // Set key + /// Set key std::get>(new_keys.values).push_back(id); std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec}; @@ -1193,7 +1172,7 @@ void SSDCacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector metadata.back().setExpiresAt(now + std::chrono::seconds(distribution(rnd_engine))); metadata.back().setDefault(); - /// inform caller that the cell has not been found + /// Inform caller that the cell has not been found on_id_not_found(id); } @@ -1306,11 +1285,7 @@ SSDCacheDictionary::SSDCacheDictionary( const auto index = getAttributeIndex(attribute_name); \ checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \ const auto null_value = std::get(null_values[index]); /* NOLINT */ \ - getItemsNumberImpl( /* NOLINT */ \ - index, /* NOLINT */ \ - ids, /* NOLINT */ \ - out, /* NOLINT */ \ - [&](const size_t) { return null_value; }); /* NOLINT */ \ + getItemsNumberImpl(index, ids, out, [&](const size_t) { return null_value; }); /* NOLINT */ \ } DECLARE(UInt8) diff --git a/src/Dictionaries/SSDComplexKeyCacheDictionary.cpp b/src/Dictionaries/SSDComplexKeyCacheDictionary.cpp index 0a97c59f524..df636baa19e 100644 --- a/src/Dictionaries/SSDComplexKeyCacheDictionary.cpp +++ b/src/Dictionaries/SSDComplexKeyCacheDictionary.cpp @@ -92,8 +92,6 @@ namespace constexpr UInt8 HAS_NOT_FOUND = 2; - //constexpr UInt16 MAX_KEY_SIZE = std::numeric_limits::max(); - const std::string BIN_FILE_EXT = ".bin"; const std::string IND_FILE_EXT = ".idx"; @@ -287,7 +285,6 @@ size_t SSDComplexKeyCachePartition::append( for (size_t index = begin; index < keys.size();) { - //Poco::Logger::get("test").information("wb off: " + std::to_string(write_buffer->offset())); Index cache_index; cache_index.setInMemory(true); cache_index.setBlockId(current_memory_block_id); @@ -304,8 +301,6 @@ size_t SSDComplexKeyCachePartition::append( writeBinary(metadata[index].data, *write_buffer); } - //Poco::Logger::get("test key").information("wb off: " + std::to_string(write_buffer->offset())); - for (const auto & attribute : new_attributes) { if (flushed) @@ -322,7 +317,7 @@ size_t SSDComplexKeyCachePartition::append( } \ else \ { \ - const auto & values = std::get>(attribute.values); \ + const auto & values = std::get>(attribute.values); /* NOLINT */ \ writeBinary(values[index], *write_buffer); \ } \ } \ @@ -372,7 +367,6 @@ size_t SSDComplexKeyCachePartition::append( { init_write_buffer(); } - //Poco::Logger::get("test final").information("wb off: " + std::to_string(write_buffer->offset())); } return keys.size() - begin; } @@ -406,8 +400,6 @@ void SSDComplexKeyCachePartition::flush() write_request.aio_offset = (current_file_block_id % max_size) * block_size; #endif - //Poco::Logger::get("try:").information("offset: " + std::to_string(write_request.aio_offset) + " nbytes: " + std::to_string(write_request.aio_nbytes)); - while (io_submit(aio_context.ctx, 1, &write_request_ptr) < 0) { if (errno != EINTR) @@ -443,20 +435,18 @@ void SSDComplexKeyCachePartition::flush() throwFromErrnoWithPath("Cannot fsync " + path + BIN_FILE_EXT, path + BIN_FILE_EXT, ErrorCodes::CANNOT_FSYNC); /// commit changes in index - for (size_t row = 0; row < keys_buffer.size(); ++row) + for (auto & key : keys_buffer) { Index index; - //Poco::Logger::get("get:").information("sz = " + std::to_string(keys_buffer[row].size())); - if (key_to_index.getKeyAndValue(keys_buffer[row], index)) + if (key_to_index.getKeyAndValue(key, index)) { if (index.inMemory()) // Row can be inserted in the buffer twice, so we need to move to ssd only the last index. { index.setInMemory(false); index.setBlockId((current_file_block_id % max_size) + index.getBlockId()); } - key_to_index.set(keys_buffer[row], index); + key_to_index.set(key, index); } - //Poco::Logger::get("get:").information("finish"); } current_file_block_id += write_buffer_size; @@ -652,7 +642,7 @@ void SSDComplexKeyCachePartition::getValueFromStorage(const PaddedPODArray processed(requests.size(), false); std::vector events(requests.size()); for (auto & event : events) - event.res = -1; // TODO: remove + event.res = -1; size_t to_push = 0; size_t to_pop = 0; @@ -714,7 +704,6 @@ void SSDComplexKeyCachePartition::getValueFromStorage(const PaddedPODArray erasing keys <"); for (const auto& key : keys) { - //Poco::Logger::get("ClearOldestBlocks").information("ktest: null=" + std::to_string(key.isNull())); - //Poco::Logger::get("ClearOldestBlocks").information("ktest: data=" + std::to_string(reinterpret_cast(key.fullData()))); - //Poco::Logger::get("ClearOldestBlocks").information("ktest: sz=" + std::to_string(key.size()) + " fz=" + std::to_string(key.fullSize())); Index index; if (key_to_index.get(key, index)) { - //Poco::Logger::get("ClearOldestBlocks").information("erase"); size_t block_id = index.getBlockId(); if (start_block <= block_id && block_id < finish_block) key_to_index.erase(key); } - //Poco::Logger::get("ClearOldestBlocks").information("finish"); } } @@ -1048,6 +1026,67 @@ void SSDComplexKeyCacheStorage::has( hit_count.fetch_add(n - count_not_found, std::memory_order_release); } +namespace +{ +SSDComplexKeyCachePartition::Attributes createAttributesFromBlock( + const Block & block, const size_t begin_column, const std::vector & structure) +{ + SSDComplexKeyCachePartition::Attributes attributes; + + const auto columns = block.getColumns(); + for (size_t i = 0; i < structure.size(); ++i) + { + const auto & column = columns[i + begin_column]; + switch (structure[i]) + { +#define DISPATCH(TYPE) \ + case AttributeUnderlyingType::ut##TYPE: \ + { \ + SSDComplexKeyCachePartition::Attribute::Container values(column->size()); \ + memcpy(&values[0], column->getRawData().data, sizeof(TYPE) * values.size()); \ + attributes.emplace_back(); \ + attributes.back().type = structure[i]; \ + attributes.back().values = std::move(values); \ + } \ + break; + + DISPATCH(UInt8) + DISPATCH(UInt16) + DISPATCH(UInt32) + DISPATCH(UInt64) + DISPATCH(UInt128) + DISPATCH(Int8) + DISPATCH(Int16) + DISPATCH(Int32) + DISPATCH(Int64) + DISPATCH(Decimal32) + DISPATCH(Decimal64) + DISPATCH(Decimal128) + DISPATCH(Float32) + DISPATCH(Float64) +#undef DISPATCH + + case AttributeUnderlyingType::utString: + { + attributes.emplace_back(); + SSDComplexKeyCachePartition::Attribute::Container values(column->size()); + for (size_t j = 0; j < column->size(); ++j) + { + const auto ref = column->getDataAt(j); + values[j].resize(ref.size); + memcpy(values[j].data(), ref.data, ref.size); + } + attributes.back().type = structure[i]; + attributes.back().values = std::move(values); + } + break; + } + } + + return attributes; +} +} // namespace + template void SSDComplexKeyCacheStorage::update( DictionarySourcePtr & source_ptr, @@ -1202,7 +1241,7 @@ void SSDComplexKeyCacheStorage::update( if (update_error_count) { - /// TODO: юзать старые значения. + /// TODO: use old values. /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`. std::rethrow_exception(last_update_exception); @@ -1267,64 +1306,6 @@ void SSDComplexKeyCacheStorage::collectGarbage() } } -SSDComplexKeyCachePartition::Attributes SSDComplexKeyCacheStorage::createAttributesFromBlock( - const Block & block, const size_t begin_column, const std::vector & structure) -{ - SSDComplexKeyCachePartition::Attributes attributes; - - const auto columns = block.getColumns(); - for (size_t i = 0; i < structure.size(); ++i) - { - const auto & column = columns[i + begin_column]; - switch (structure[i]) - { -#define DISPATCH(TYPE) \ - case AttributeUnderlyingType::ut##TYPE: \ - { \ - SSDComplexKeyCachePartition::Attribute::Container values(column->size()); \ - memcpy(&values[0], column->getRawData().data, sizeof(TYPE) * values.size()); \ - attributes.emplace_back(); \ - attributes.back().type = structure[i]; \ - attributes.back().values = std::move(values); \ - } \ - break; - - DISPATCH(UInt8) - DISPATCH(UInt16) - DISPATCH(UInt32) - DISPATCH(UInt64) - DISPATCH(UInt128) - DISPATCH(Int8) - DISPATCH(Int16) - DISPATCH(Int32) - DISPATCH(Int64) - DISPATCH(Decimal32) - DISPATCH(Decimal64) - DISPATCH(Decimal128) - DISPATCH(Float32) - DISPATCH(Float64) -#undef DISPATCH - - case AttributeUnderlyingType::utString: - { - attributes.emplace_back(); - SSDComplexKeyCachePartition::Attribute::Container values(column->size()); - for (size_t j = 0; j < column->size(); ++j) - { - const auto ref = column->getDataAt(j); - values[j].resize(ref.size); - memcpy(values[j].data(), ref.data, ref.size); - } - attributes.back().type = structure[i]; - attributes.back().values = std::move(values); - } - break; - } - } - - return attributes; -} - SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary( const std::string & name_, const DictionaryStructure & dict_struct_, @@ -1368,13 +1349,8 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary( { \ const auto index = getAttributeIndex(attribute_name); \ checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \ - const auto null_value = std::get(null_values[index]); \ - getItemsNumberImpl( \ - index, \ - key_columns, \ - key_types, \ - out, \ - [&](const size_t) { return null_value; }); \ + const auto null_value = std::get(null_values[index]); /* NOLINT */ \ + getItemsNumberImpl( index, key_columns, key_types, out, [&](const size_t) { return null_value; }); /* NOLINT */ \ } DECLARE(UInt8) @@ -1403,12 +1379,7 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary( { \ const auto index = getAttributeIndex(attribute_name); \ checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \ - getItemsNumberImpl( \ - index, \ - key_columns, \ - key_types, \ - out, \ - [&](const size_t row) { return def[row]; }); \ + getItemsNumberImpl(index, key_columns, key_types, out, [&](const size_t row) { return def[row]; }); /* NOLINT */ \ } DECLARE(UInt8) DECLARE(UInt16) @@ -1436,12 +1407,7 @@ SSDComplexKeyCacheDictionary::SSDComplexKeyCacheDictionary( { \ const auto index = getAttributeIndex(attribute_name); \ checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::ut##TYPE); \ - getItemsNumberImpl( \ - index, \ - key_columns, \ - key_types, \ - out, \ - [&](const size_t) { return def; }); \ + getItemsNumberImpl(index, key_columns, key_types, out, [&](const size_t) { return def; }); /* NOLINT */ \ } DECLARE(UInt8) DECLARE(UInt16) @@ -1708,7 +1674,7 @@ AttributeValueVariant SSDComplexKeyCacheDictionary::createAttributeNullValueWith { #define DISPATCH(TYPE) \ case AttributeUnderlyingType::ut##TYPE: \ - return createAttributeNullValueWithTypeImpl(null_value); + return createAttributeNullValueWithTypeImpl(null_value); /* NOLINT */ DISPATCH(UInt8) DISPATCH(UInt16) diff --git a/src/Dictionaries/SSDComplexKeyCacheDictionary.h b/src/Dictionaries/SSDComplexKeyCacheDictionary.h index b6717d16f65..7809bd1909d 100644 --- a/src/Dictionaries/SSDComplexKeyCacheDictionary.h +++ b/src/Dictionaries/SSDComplexKeyCacheDictionary.h @@ -200,7 +200,6 @@ public: { UInt16 sz; readBinary(sz, buf); - //Poco::Logger::get("test read key").information("sz " + std::to_string(sz)); char * data = nullptr; if constexpr (std::is_same_v) data = arena.alloc(); @@ -209,7 +208,6 @@ public: memcpy(data, &sz, sizeof(sz)); buf.read(data + sizeof(sz), sz); key = KeyRef(data); - //Poco::Logger::get("test read key").information("ksz = " + std::to_string(key.size())); } void ignoreKey(ReadBuffer & buf) const @@ -478,9 +476,6 @@ public: double getLoadFactor() const; private: - SSDComplexKeyCachePartition::Attributes createAttributesFromBlock( - const Block & block, const size_t begin_column, const std::vector & structure); - void collectGarbage(); const AttributeTypes attributes_structure; @@ -505,9 +500,6 @@ private: mutable size_t update_error_count = 0; mutable std::chrono::system_clock::time_point backoff_end_time; - // stats - //mutable size_t bytes_allocated = 0; - mutable std::atomic hit_count{0}; mutable std::atomic query_count{0}; }; @@ -569,10 +561,6 @@ public: return dict_struct.attributes[getAttributeIndex(attribute_name)].injective; } - /*bool hasHierarchy() const { return false; } - - void toParent(const PaddedPODArray &, PaddedPODArray &) const { }*/ - std::exception_ptr getLastException() const override { return storage.getLastException(); } template