strings basic

This commit is contained in:
Nikita Vasilev 2020-01-26 20:35:39 +03:00
parent 94b41450e5
commit bbcba746b2
5 changed files with 184 additions and 88 deletions

View File

@ -175,7 +175,7 @@ CachePartition::CachePartition(
, attributes_structure(attributes_structure_)
{
keys_buffer.type = AttributeUnderlyingType::utUInt64;
keys_buffer.values = PaddedPODArray<UInt64>();
keys_buffer.values = CachePartition::Attribute::Container<UInt64>();
Poco::File directory(dir_path);
if (!directory.exists())
@ -278,9 +278,24 @@ size_t CachePartition::appendBlock(
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
case AttributeUnderlyingType::utString:
/*{
LOG_DEBUG(&Poco::Logger::get("kek"), "string write");
const auto & value = std::get<Attribute::Container<String>>(attribute.values)[index];
if (sizeof(UInt64) + value.size() > write_buffer->available())
{
write_buffer.reset();
if (++current_memory_block_id == write_buffer_size)
flush();
flushed = true;
continue;
}
else
{
writeStringBinary(value, *write_buffer);
}
}*/
break;
}
}
@ -386,8 +401,8 @@ void CachePartition::flush()
template <typename Out>
void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const
ResultArrayType<Out> & out, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const
{
std::shared_lock lock(rw_lock);
PaddedPODArray<Index> indices(ids.size());
@ -409,13 +424,17 @@ void CachePartition::getValue(const size_t attribute_index, const PaddedPODArray
}
}
getValueFromMemory<Out>(attribute_index, indices, out);
getValueFromStorage<Out>(attribute_index, indices, out);
auto set_value = [&](const size_t index, ReadBuffer & buf)
{
readValueFromBuffer(attribute_index, out, index, buf);
};
getValueFromMemory(indices, set_value);
getValueFromStorage(indices, set_value);
}
template <typename Out>
void CachePartition::getValueFromMemory(
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
template <typename SetFunc>
void CachePartition::getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc set) const
{
for (size_t i = 0; i < indices.size(); ++i)
{
@ -425,14 +444,13 @@ void CachePartition::getValueFromMemory(
const size_t offset = index.getBlockId() * block_size + index.getAddressInBlock();
ReadBufferFromMemory read_buffer(memory->data() + offset, block_size * write_buffer_size - offset);
readValueFromBuffer(attribute_index, out[i], read_buffer);
set(i, read_buffer);
}
}
}
template <typename Out>
void CachePartition::getValueFromStorage(
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const
template <typename SetFunc>
void CachePartition::getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc set) const
{
std::vector<std::pair<Index, size_t>> index_to_out;
for (size_t i = 0; i < indices.size(); ++i)
@ -520,7 +538,7 @@ void CachePartition::getValueFromStorage(
ReadBufferFromMemory buf(
reinterpret_cast<char *>(request.aio_buf) + file_index.getAddressInBlock(),
block_size - file_index.getAddressInBlock());
readValueFromBuffer(attribute_index, out[out_index], buf);
set(i, buf);
}
processed[request_id] = true;
@ -543,18 +561,18 @@ void CachePartition::getValueFromStorage(
}
template <typename Out>
void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst, ReadBuffer & buf) const
void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst, const size_t index, ReadBuffer & buf) const
{
for (size_t i = 0; i < attribute_index; ++i)
{
switch (attributes_structure[i])
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
buf.ignore(sizeof(TYPE)); \
} \
break;
case AttributeUnderlyingType::ut##TYPE: \
{ \
buf.ignore(sizeof(TYPE)); \
} \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
@ -572,39 +590,26 @@ void CachePartition::readValueFromBuffer(const size_t attribute_index, Out & dst
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
case AttributeUnderlyingType::utString:
/*{
size_t size = 0;
readVarUInt(size, buf);
buf.ignore(size);
}*/
break;
}
}
switch (attributes_structure[attribute_index])
//if constexpr (!std::is_same_v<ColumnString, Out>)
readBinary(dst[index], buf);
/*else
{
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
readBinary(dst, buf); \
break;
DISPATCH(UInt8)
DISPATCH(UInt16)
DISPATCH(UInt32)
DISPATCH(UInt64)
DISPATCH(UInt128)
DISPATCH(Int8)
DISPATCH(Int16)
DISPATCH(Int32)
DISPATCH(Int64)
DISPATCH(Decimal32)
DISPATCH(Decimal64)
DISPATCH(Decimal128)
DISPATCH(Float32)
DISPATCH(Float64)
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
break;
}
LOG_DEBUG(&Poco::Logger::get("kek"), "string READ");
UNUSED(index);
size_t size = 0;
readVarUInt(size, buf);
dst.insertData(buf.position(), size);
}*/
}
void CachePartition::has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out, std::chrono::system_clock::time_point now) const
@ -820,7 +825,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
/// Check which ids have not been found and require setting null_value
CachePartition::Attribute new_keys;
new_keys.type = AttributeUnderlyingType::utUInt64;
new_keys.values = PaddedPODArray<UInt64>();
new_keys.values = CachePartition::Attribute::Container<UInt64>();
CachePartition::Attributes new_attributes;
{
/// TODO: create attributes from structure
@ -832,7 +837,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
case AttributeUnderlyingType::ut##TYPE: \
new_attributes.emplace_back(); \
new_attributes.back().type = attribute_type; \
new_attributes.back().values = PaddedPODArray<TYPE>(); \
new_attributes.back().values = CachePartition::Attribute::Container<TYPE>(); \
break;
DISPATCH(UInt8)
@ -852,7 +857,11 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
/*{
new_attributes.emplace_back();
new_attributes.back().type = attribute_type;
new_attributes.back().values = CachePartition::Attribute::Container<String>();
}*/
break;
}
}
@ -880,7 +889,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
}
// Set key
std::get<PaddedPODArray<UInt64>>(new_keys.values).push_back(id);
std::get<CachePartition::Attribute::Container<UInt64>>(new_keys.values).push_back(id);
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
metadata.emplace_back();
@ -897,7 +906,7 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
auto & to_values = std::get<PaddedPODArray<TYPE>>(new_attributes[i].values); \
auto & to_values = std::get<CachePartition::Attribute::Container<TYPE>>(new_attributes[i].values); \
auto & null_value = std::get<TYPE>(null_values[i]); \
to_values.push_back(null_value); \
} \
@ -920,7 +929,11 @@ void CacheStorage::update(DictionarySourcePtr & source_ptr, const std::vector<Ke
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
/*{
auto & to_values = std::get<CachePartition::Attribute::Container<String>>(new_attributes[i].values);
auto & null_value = std::get<String>(null_values[i]);
to_values.push_back(null_value);
}*/
break;
}
}
@ -1004,7 +1017,7 @@ CachePartition::Attributes CacheStorage::createAttributesFromBlock(
#define DISPATCH(TYPE) \
case AttributeUnderlyingType::ut##TYPE: \
{ \
PaddedPODArray<TYPE> values(column->size()); \
CachePartition::Attribute::Container<TYPE> values(column->size()); \
const auto raw_data = column->getRawData(); \
memcpy(&values[0], raw_data.data, raw_data.size * sizeof(TYPE)); \
attributes.emplace_back(); \
@ -1030,7 +1043,18 @@ CachePartition::Attributes CacheStorage::createAttributesFromBlock(
#undef DISPATCH
case AttributeUnderlyingType::utString:
// TODO: string support
/*{
attributes.emplace_back();
CachePartition::Attribute::Container<String> values(column->size());
for (size_t j = 0; j < column->size(); ++j)
{
const auto ref = column->getDataAt(j);
values[j].resize(ref.size);
memcpy(values[j].data(), ref.data, ref.size);
}
attributes.back().type = structure[i];
attributes.back().values = std::move(values);
}*/
break;
}
}
@ -1180,7 +1204,7 @@ void SSDCacheDictionary::getItemsNumberImpl(
required_ids,
[&](const auto id, const auto row, const auto & new_attributes) {
for (const size_t out_row : not_found_ids[id])
out[out_row] = std::get<PaddedPODArray<OutputType>>(new_attributes[attribute_index].values)[row];
out[out_row] = std::get<CachePartition::Attribute::Container<OutputType>>(new_attributes[attribute_index].values)[row];
},
[&](const size_t id)
{
@ -1198,7 +1222,7 @@ void SSDCacheDictionary::getString(const std::string & attribute_name, const Pad
const auto null_value = StringRef{std::get<String>(null_values[index])};
getItemsString(index, ids, out, [&](const size_t) { return null_value; });
getItemsStringImpl(index, ids, out, [&](const size_t) { return null_value; });
}
void SSDCacheDictionary::getString(
@ -1207,7 +1231,7 @@ void SSDCacheDictionary::getString(
const auto index = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
getItemsString(index, ids, out, [&](const size_t row) { return def->getDataAt(row); });
getItemsStringImpl(index, ids, out, [&](const size_t row) { return def->getDataAt(row); });
}
void SSDCacheDictionary::getString(
@ -1216,17 +1240,78 @@ void SSDCacheDictionary::getString(
const auto index = getAttributeIndex(attribute_name);
checkAttributeType(name, attribute_name, dict_struct.attributes[index].underlying_type, AttributeUnderlyingType::utString);
getItemsString(index, ids, out, [&](const size_t) { return StringRef{def}; });
getItemsStringImpl(index, ids, out, [&](const size_t) { return StringRef{def}; });
}
template <typename DefaultGetter>
void SSDCacheDictionary::getItemsString(const size_t attribute_index, const PaddedPODArray<Key> & ids,
void SSDCacheDictionary::getItemsStringImpl(const size_t attribute_index, const PaddedPODArray<Key> & ids,
ColumnString * out, DefaultGetter && get_default) const
{
UNUSED(attribute_index);
UNUSED(ids);
UNUSED(out);
const auto now = std::chrono::system_clock::now();
UNUSED(now);
UNUSED(get_default);
return;
/*
std::unordered_map<Key, std::vector<size_t>> not_found_ids;
auto from_cache = ColumnString::create();
//storage.template getValue<String>(attribute_index, ids, *from_cache, not_found_ids, now);
if (not_found_ids.empty())
{
out->getChars().resize(from_cache->getChars().size());
memcpy(out->getChars().data(), from_cache->getChars().data(), from_cache->getChars().size() * sizeof(from_cache->getChars()[0]));
out->getOffsets().resize(from_cache->getOffsets().size());
memcpy(out->getOffsets().data(), from_cache->getOffsets().data(), from_cache->getOffsets().size() * sizeof(from_cache->getOffsets()[0]));
return;
}
std::vector<Key> required_ids(not_found_ids.size());
std::transform(std::begin(not_found_ids), std::end(not_found_ids), std::begin(required_ids), [](const auto & pair) { return pair.first; });
std::unordered_map<Key, String> update_result;
storage.update(
source_ptr,
required_ids,
[&](const auto id, const auto row, const auto & new_attributes)
{
update_result[id] = std::get<CachePartition::Attribute::Container<String>>(new_attributes[attribute_index].values)[row];
},
[&](const size_t) {},
getLifetime(),
null_values);
LOG_DEBUG(&Poco::Logger::get("log"), "fill data");
size_t from_cache_counter = 0;
for (size_t row = 0; row < ids.size(); ++row)
{
const auto & id = ids[row];
auto it = not_found_ids.find(id);
if (it == std::end(not_found_ids))
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill found " << row << " " << id);
out->insertFrom(*from_cache, from_cache_counter++);
}
else
{
auto it_update = update_result.find(id);
if (it_update != std::end(update_result))
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill update " << row << " " << id);
out->insertData(it_update->second.data(), it_update->second.size());
}
else
{
LOG_DEBUG(&Poco::Logger::get("log"), "fill default " << row << " " << id);
auto to_insert = get_default(row);
out->insertData(to_insert.data, to_insert.size);
}
}
}*/
}
void SSDCacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
@ -1284,8 +1369,6 @@ AttributeValueVariant SSDCacheDictionary::createAttributeNullValueWithTypeImpl<S
{
AttributeValueVariant var_null_value = null_value.get<String>();
bytes_allocated += sizeof(StringRef);
//if (!string_arena)
// string_arena = std::make_unique<ArenaWithFreeLists>();
return var_null_value;
}

View File

@ -102,14 +102,12 @@ public:
ResultArrayType<Out> & out, std::vector<bool> & found,
std::chrono::system_clock::time_point now) const;
// TODO:: getString
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out, std::chrono::system_clock::time_point now) const;
struct Attribute
{
template <typename T>
using Container = PaddedPODArray<T>;
using Container = std::vector<T>;
AttributeUnderlyingType type;
std::variant<
@ -134,6 +132,8 @@ public:
size_t appendBlock(const Attribute & new_keys, const Attributes & new_attributes,
const PaddedPODArray<Metadata> & metadata, const size_t begin);
//size_t appendDefaults();
void flush();
void remove();
@ -147,16 +147,14 @@ public:
size_t getElementCount() const;
private:
template <typename Out>
void getValueFromMemory(
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const;
template <typename SetFunc>
void getValueFromMemory(const PaddedPODArray<Index> & indices, SetFunc set) const;
template <typename SetFunc>
void getValueFromStorage(const PaddedPODArray<Index> & indices, SetFunc set) const;
template <typename Out>
void getValueFromStorage(
const size_t attribute_index, const PaddedPODArray<Index> & indices, ResultArrayType<Out> & out) const;
template <typename Out>
void readValueFromBuffer(const size_t attribute_index, Out & dst, ReadBuffer & buf) const;
void readValueFromBuffer(const size_t attribute_index, Out & dst, const size_t index, ReadBuffer & buf) const;
const size_t file_id;
const size_t max_size;
@ -211,15 +209,13 @@ public:
~CacheStorage();
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
using ResultArrayType = CachePartition::ResultArrayType<T>;
template <typename Out>
void getValue(const size_t attribute_index, const PaddedPODArray<UInt64> & ids,
ResultArrayType<Out> & out, std::unordered_map<Key, std::vector<size_t>> & not_found,
std::chrono::system_clock::time_point now) const;
// getString();
void has(const PaddedPODArray<UInt64> & ids, ResultArrayType<UInt8> & out,
std::unordered_map<Key, std::vector<size_t>> & not_found, std::chrono::system_clock::time_point now) const;
@ -230,8 +226,6 @@ public:
PaddedPODArray<Key> getCachedIds() const;
//BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const;
std::exception_ptr getLastException() const { return last_update_exception; }
const std::string & getPath() const { return path; }
@ -339,7 +333,7 @@ public:
std::exception_ptr getLastException() const override { return storage.getLastException(); }
template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
using ResultArrayType = CacheStorage::ResultArrayType<T>;
#define DECLARE(TYPE) \
void get##TYPE(const std::string & attribute_name, const PaddedPODArray<Key> & ids, ResultArrayType<TYPE> & out) const;
@ -423,7 +417,7 @@ private:
void getItemsNumberImpl(
const size_t attribute_index, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const;
template <typename DefaultGetter>
void getItemsString(const size_t attribute_index, const PaddedPODArray<Key> & ids,
void getItemsStringImpl(const size_t attribute_index, const PaddedPODArray<Key> & ids,
ColumnString * out, DefaultGetter && get_default) const;
const std::string name;

View File

@ -313,6 +313,7 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
//!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr) &&
@ -499,6 +500,7 @@ private:
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr) &&
//!executeDispatch<SSDCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyHashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<ComplexKeyCacheDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatchComplex<TrieDictionary>(block, arguments, result, dict_ptr))

View File

@ -3,14 +3,19 @@ TEST_SMALL
-1
6
0
5 6 7
1 100 -100
database
none
1 100 -100 clickhouse
2 3 4 database
5 6 7 columns
UPDATE DICTIONARY
118
VALUE FROM DISK
-100
clickhouse
VALUE FROM RAM BUFFER
8
VALUES FROM DISK AND RAM BUFFER
118
HAS

View File

@ -11,10 +11,12 @@ CREATE TABLE database_for_dict.table_for_dict
id UInt64,
a UInt64,
b Int32
--c String
)
ENGINE = MergeTree()
ORDER BY id;
--INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, -100, 'clickhouse'), (2, 3, 4, 'database'), (5, 6, 7, 'columns'), (10, 9, 8, '');
INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, -100), (2, 3, 4), (5, 6, 7), (10, 9, 8);
DROP DICTIONARY IF EXISTS database_for_dict.ssd_dict;
@ -24,19 +26,22 @@ CREATE DICTIONARY database_for_dict.ssd_dict
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1
--c String DEFAULT 'none'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD(PARTITION_SIZE 8192 PATH '/mnt/disk4/clickhouse_dicts/1d'));
LAYOUT(SSD(PARTITION_SIZE 8192 PATH '/mnt/disk4/clickhouse_dicts/0d'));
SELECT 'TEST_SMALL';
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1));
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(4));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(5));
SELECT dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(6));
--SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(2));
--SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(3));
SELECT * FROM database_for_dict.ssd_dict;
SELECT * FROM database_for_dict.ssd_dict ORDER BY id;
DROP DICTIONARY database_for_dict.ssd_dict;
DROP TABLE IF EXISTS database_for_dict.keys_table;
@ -62,6 +67,7 @@ CREATE DICTIONARY database_for_dict.ssd_dict
id UInt64,
a UInt64 DEFAULT 0,
b Int32 DEFAULT -1
--c String DEFAULT 'none'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
@ -76,10 +82,16 @@ SELECT 'VALUE FROM DISK';
-- -100
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(1));
-- 'clickhouse'
--SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(1));
SELECT 'VALUE FROM RAM BUFFER';
-- 8
SELECT dictGetInt32('database_for_dict.ssd_dict', 'b', toUInt64(10));
-- ''
--SELECT dictGetString('database_for_dict.ssd_dict', 'c', toUInt64(10));
SELECT 'VALUES FROM DISK AND RAM BUFFER';
-- 118
SELECT sum(dictGetUInt64('database_for_dict.ssd_dict', 'a', toUInt64(id))) FROM database_for_dict.keys_table;