select * from ssd_dictionary

This commit is contained in:
Nikita Vasilev 2020-01-18 14:47:58 +03:00
parent 3ccf10f7b2
commit 1ee46c0690
4 changed files with 73 additions and 13 deletions

View File

@ -8,6 +8,7 @@
#include <Common/MemorySanitizer.h>
#include <DataStreams/IBlockInputStream.h>
#include <Poco/File.h>
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
#include <IO/AIO.h>
#include <IO/ReadHelpers.h>
@ -382,19 +383,16 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
{
if (found[i])
{
Poco::Logger::get("kek").information("FOUND BEFORE:: Key :" + std::to_string(ids[i]) + " i: " + std::to_string(i));
indices[i].setNotExists();
}
else if (auto it = key_to_index_and_metadata.find(ids[i]);
it != std::end(key_to_index_and_metadata) && it->second.metadata.expiresAt() > now)
{
Poco::Logger::get("kek").information(std::to_string(file_id) + " FOUND BEFORE: inmemory: " + std::to_string(it->second.index.inMemory()) + " " + std::to_string(it->second.index.getBlockId()) + " " + std::to_string(it->second.index.getAddressInBlock()));
indices[i] = it->second.index;
found[i] = true;
}
else
{
Poco::Logger::get("kek").information("NF:: Key :" + std::to_string(ids[i]) + " i: " + std::to_string(i));
indices[i].setNotExists();
}
}
@ -499,7 +497,9 @@ void CachePartition::getValueFromStorage(
const auto request_id = events[i].data;
const auto & request = requests[request_id];
if (events[i].res != static_cast<ssize_t>(request.aio_nbytes))
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". returned: " + std::to_string(events[i].res), ErrorCodes::AIO_WRITE_ERROR);
throw Exception("AIO failed to read file " + path + BIN_FILE_EXT + ". " +
"request_id= " + std::to_string(request.aio_data) + ", aio_nbytes=" + std::to_string(request.aio_nbytes) + ", aio_offset=" + std::to_string(request.aio_offset) +
"returned: " + std::to_string(events[i].res), ErrorCodes::AIO_WRITE_ERROR);
for (const size_t idx : blocks_to_indices[request_id])
{
const auto & [file_index, out_index] = index_to_out[idx];
@ -602,7 +602,6 @@ void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UIn
if (it == std::end(key_to_index_and_metadata) || it->second.metadata.expiresAt() <= now)
{
Poco::Logger::get("kek").information("NF:: Key :" + std::to_string(ids[i]) + " i: " + std::to_string(i));
out[i] = HAS_NOT_FOUND;
}
else
@ -617,6 +616,17 @@ size_t CachePartition::getId() const
return file_id;
}
PaddedPODArray<CachePartition::Key> CachePartition::getCachedIds(const std::chrono::system_clock::time_point now) const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
PaddedPODArray<Key> array;
for (const auto & [key, index_and_metadata] : key_to_index_and_metadata)
if (!index_and_metadata.metadata.isDefault() && index_and_metadata.metadata.expiresAt() > now)
array.push_back(key);
return array;
}
void CachePartition::remove()
{
std::unique_lock lock(rw_lock);
@ -894,6 +904,22 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
PaddedPODArray<CachePartition::Key> CacheStorage::getCachedIds() const
{
PaddedPODArray<Key> array;
const auto now = std::chrono::system_clock::now();
std::shared_lock lock(rw_lock);
for (auto & partition : partitions)
{
const auto cached_in_partition = partition->getCachedIds(now);
array.insert(std::begin(cached_in_partition), std::end(cached_in_partition));
}
return array;
}
void CacheStorage::collectGarbage()
{
// add partitions to queue
@ -1175,6 +1201,12 @@ void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UIn
null_values);
}
BlockInputStreamPtr SSDCacheDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
using BlockInputStreamType = DictionaryBlockInputStream<SSDCacheDictionary, Key>;
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, storage.getCachedIds(), column_names);
}
size_t SSDCacheDictionary::getAttributeIndex(const std::string & attr_name) const
{
auto it = attribute_index_by_name.find(attr_name);

View File

@ -140,6 +140,8 @@ public:
size_t getId() const;
PaddedPODArray<Key> getCachedIds(const std::chrono::system_clock::time_point now) const;
private:
template <typename Out>
void getValueFromMemory(
@ -220,6 +222,8 @@ public:
PresentIdHandler && on_updated, AbsentIdHandler && on_id_not_found,
const DictionaryLifetime lifetime, const std::vector<AttributeValueVariant> & null_values);
PaddedPODArray<Key> getCachedIds() const;
//BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const;
std::exception_ptr getLastException() const { return last_update_exception; }
@ -388,12 +392,7 @@ public:
void has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const override;
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override // TODO
{
UNUSED(column_names);
UNUSED(max_block_size);
return nullptr;
}
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
size_t getAttributeIndex(const std::string & attr_name) const;

View File

@ -1,3 +1,10 @@
TEST_SMALL
-100
-1
6
0
5 6 7
1 100 -100
UPDATE DICTIONARY
118
VALUE FROM DISK

View File

@ -17,6 +17,28 @@ ORDER BY id;
INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, -100), (2, 3, 4), (5, 6, 7), (10, 9, 8);
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
CREATE DICTIONARY database_for_dict.ssd_dict
(
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD(PARTITION_SIZE 8192 PATH '/mnt/disk4/clickhouse_dicts/1d'));
SELECT 'TEST_SMALL';
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(4));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(5));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(6));
SELECT * FROM database_for_dict.ssd_dict;
DROP DICTIONARY database_for_dict.ssd_dict;
DROP TABLE IF EXISTS database_for_dict.keys_table;
CREATE TABLE database_for_dict.keys_table
@ -44,7 +66,7 @@ CREATE DICTIONARY database_for_dict.ssd_dict
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD(MAX_PARTITION_SIZE 1000 PATH '/mnt/disk4/clickhouse_dicts/1'));
LAYOUT(SSD(PARTITION_SIZE 8192 PATH '/mnt/disk4/clickhouse_dicts/1d'));
SELECT 'UPDATE DICTIONARY';
-- 118
@ -103,7 +125,7 @@ CREATE DICTIONARY database_for_dict.ssd_dict
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD(MAX_PARTITION_SIZE 1000 PATH '/mnt/disk4/clickhouse_dicts/2'));
LAYOUT(SSD(PARTITION_SIZE 8192 PATH '/mnt/disk4/clickhouse_dicts/2d'));
SELECT 'UPDATE DICTIONARY (MT)';
-- 118