mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
dbms: use PODArray with FlatDictionary [#METR-13298]
remove IDictionarySource::reset
This commit is contained in:
parent
070c6be60e
commit
95bb52b8e5
@ -3,6 +3,7 @@
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Dictionaries/DictionaryStructure.h>
|
||||
#include <DB/Dictionaries/IDictionarySource.h>
|
||||
#include <DB/Dictionaries/OwningBufferBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -19,9 +20,11 @@ public:
|
||||
private:
|
||||
BlockInputStreamPtr loadAll() override
|
||||
{
|
||||
in_ptr = ext::make_unique<ReadBufferFromFile>(filename);
|
||||
return context.getFormatFactory().getInput(
|
||||
auto in_ptr = ext::make_unique<ReadBufferFromFile>(filename);
|
||||
auto stream = context.getFormatFactory().getInput(
|
||||
format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory());
|
||||
|
||||
return new OwningBufferBlockInputStream{stream, std::move(in_ptr)};
|
||||
}
|
||||
|
||||
BlockInputStreamPtr loadId(const std::uint64_t id) override
|
||||
@ -40,17 +43,10 @@ private:
|
||||
};
|
||||
}
|
||||
|
||||
void reset() override
|
||||
{
|
||||
in_ptr.reset(nullptr);
|
||||
}
|
||||
|
||||
const std::string filename;
|
||||
const std::string format;
|
||||
Block sample_block;
|
||||
const Context & context;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFile> in_ptr;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -9,10 +9,9 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
const auto initial_array_size = 128;
|
||||
const auto initial_array_size = 1024;
|
||||
const auto max_array_size = 500000;
|
||||
|
||||
/// @todo manage arrays using std::vector or PODArray, start with an initial size, expand up to max_array_size
|
||||
class FlatDictionary final : public IDictionary
|
||||
{
|
||||
public:
|
||||
@ -20,16 +19,17 @@ public:
|
||||
const std::string & config_prefix, DictionarySourcePtr source_ptr)
|
||||
: source_ptr{std::move(source_ptr)}
|
||||
{
|
||||
attributes.reserve(dict_struct.attributes.size());
|
||||
for (const auto & attribute : dict_struct.attributes)
|
||||
const auto size = dict_struct.attributes.size();
|
||||
attributes.resize(size);
|
||||
for (const auto idx : ext::range(0, size))
|
||||
{
|
||||
attribute_index_by_name.emplace(attribute.name, attributes.size());
|
||||
attributes.emplace_back(
|
||||
createAttributeWithType(getAttributeTypeByName(attribute.type),
|
||||
const auto & attribute = dict_struct.attributes[idx];
|
||||
attribute_index_by_name.emplace(attribute.name, idx);
|
||||
attributes[idx] = std::move(createAttributeWithType(getAttributeTypeByName(attribute.type),
|
||||
attribute.null_value));
|
||||
|
||||
if (attribute.hierarchical)
|
||||
hierarchical_attribute = &attributes.back();
|
||||
hierarchical_attribute = &attributes[idx];
|
||||
}
|
||||
|
||||
auto stream = this->source_ptr->loadAll();
|
||||
@ -47,26 +47,22 @@ public:
|
||||
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
|
||||
}
|
||||
}
|
||||
|
||||
/// @todo wrap source_ptr so that it reset buffer automatically
|
||||
this->source_ptr->reset();
|
||||
}
|
||||
|
||||
id_t toParent(const id_t id) const override
|
||||
{
|
||||
const auto exists = id < max_array_size;
|
||||
const auto attr = hierarchical_attribute;
|
||||
|
||||
switch (hierarchical_attribute->type)
|
||||
{
|
||||
case attribute_type::uint8: return exists ? attr->uint8_array[id] : attr->uint8_null_value;
|
||||
case attribute_type::uint16: return exists ? attr->uint16_array[id] : attr->uint16_null_value;
|
||||
case attribute_type::uint32: return exists ? attr->uint32_array[id] : attr->uint32_null_value;
|
||||
case attribute_type::uint64: return exists ? attr->uint64_array[id] : attr->uint64_null_value;
|
||||
case attribute_type::int8: return exists ? attr->int8_array[id] : attr->int8_null_value;
|
||||
case attribute_type::int16: return exists ? attr->int16_array[id] : attr->int16_null_value;
|
||||
case attribute_type::int32: return exists ? attr->int32_array[id] : attr->int32_null_value;
|
||||
case attribute_type::int64: return exists ? attr->int64_array[id] : attr->int64_null_value;
|
||||
case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
|
||||
case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
|
||||
case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
|
||||
case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
|
||||
case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
|
||||
case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
|
||||
case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
|
||||
case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
|
||||
case attribute_type::float32:
|
||||
case attribute_type::float64:
|
||||
case attribute_type::string:
|
||||
@ -89,8 +85,8 @@ public:
|
||||
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
|
||||
ErrorCodes::TYPE_MISMATCH\
|
||||
};\
|
||||
if (id < max_array_size)\
|
||||
return attribute.LC_TYPE##_array[id];\
|
||||
if (id < attribute.LC_TYPE##_array->size())\
|
||||
return (*attribute.LC_TYPE##_array)[id];\
|
||||
return attribute.LC_TYPE##_null_value;\
|
||||
}
|
||||
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
|
||||
@ -140,7 +136,7 @@ public:
|
||||
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
|
||||
{\
|
||||
const auto & attribute = attributes[attribute_idx];\
|
||||
return id < max_array_size ? attribute.LC_NAME##_array[id] : attribute.LC_NAME##_null_value;\
|
||||
return id < attribute.LC_NAME##_array->size() ? (*attribute.LC_NAME##_array)[id] : attribute.LC_NAME##_null_value;\
|
||||
}
|
||||
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
|
||||
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
|
||||
@ -173,18 +169,18 @@ public:
|
||||
Float32 float32_null_value;
|
||||
Float64 float64_null_value;
|
||||
String string_null_value;
|
||||
std::unique_ptr<UInt8[]> uint8_array;
|
||||
std::unique_ptr<UInt16[]> uint16_array;
|
||||
std::unique_ptr<UInt32[]> uint32_array;
|
||||
std::unique_ptr<UInt64[]> uint64_array;
|
||||
std::unique_ptr<Int8[]> int8_array;
|
||||
std::unique_ptr<Int16[]> int16_array;
|
||||
std::unique_ptr<Int32[]> int32_array;
|
||||
std::unique_ptr<Int64[]> int64_array;
|
||||
std::unique_ptr<Float32[]> float32_array;
|
||||
std::unique_ptr<Float64[]> float64_array;
|
||||
std::unique_ptr<PODArray<UInt8>> uint8_array;
|
||||
std::unique_ptr<PODArray<UInt16>> uint16_array;
|
||||
std::unique_ptr<PODArray<UInt32>> uint32_array;
|
||||
std::unique_ptr<PODArray<UInt64>> uint64_array;
|
||||
std::unique_ptr<PODArray<Int8>> int8_array;
|
||||
std::unique_ptr<PODArray<Int16>> int16_array;
|
||||
std::unique_ptr<PODArray<Int32>> int32_array;
|
||||
std::unique_ptr<PODArray<Int64>> int64_array;
|
||||
std::unique_ptr<PODArray<Float32>> float32_array;
|
||||
std::unique_ptr<PODArray<Float64>> float64_array;
|
||||
std::unique_ptr<Arena> string_arena;
|
||||
std::vector<StringRef> string_array;
|
||||
std::unique_ptr<PODArray<StringRef>> string_array;
|
||||
};
|
||||
|
||||
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
|
||||
@ -195,60 +191,59 @@ public:
|
||||
{
|
||||
case attribute_type::uint8:
|
||||
attr.uint8_null_value = DB::parse<UInt8>(null_value);
|
||||
attr.uint8_array.reset(new UInt8[max_array_size]);
|
||||
std::fill(attr.uint8_array.get(), attr.uint8_array.get() + max_array_size, attr.uint8_null_value);
|
||||
attr.uint8_array.reset(new PODArray<UInt8>);
|
||||
attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value);
|
||||
break;
|
||||
case attribute_type::uint16:
|
||||
attr.uint16_null_value = DB::parse<UInt16>(null_value);
|
||||
attr.uint16_array.reset(new UInt16[max_array_size]);
|
||||
std::fill(attr.uint16_array.get(), attr.uint16_array.get() + max_array_size, attr.uint16_null_value);
|
||||
attr.uint16_array.reset(new PODArray<UInt16>);
|
||||
attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value);
|
||||
break;
|
||||
case attribute_type::uint32:
|
||||
attr.uint32_null_value = DB::parse<UInt32>(null_value);
|
||||
attr.uint32_array.reset(new UInt32[max_array_size]);
|
||||
std::fill(attr.uint32_array.get(), attr.uint32_array.get() + max_array_size, attr.uint32_null_value);
|
||||
attr.uint32_array.reset(new PODArray<UInt32>);
|
||||
attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value);
|
||||
break;
|
||||
case attribute_type::uint64:
|
||||
attr.uint64_null_value = DB::parse<UInt64>(null_value);
|
||||
attr.uint64_array.reset(new UInt64[max_array_size]);
|
||||
std::fill(attr.uint64_array.get(), attr.uint64_array.get() + max_array_size, attr.uint64_null_value);
|
||||
attr.uint64_array.reset(new PODArray<UInt64>);
|
||||
attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value);
|
||||
break;
|
||||
case attribute_type::int8:
|
||||
attr.int8_null_value = DB::parse<Int8>(null_value);
|
||||
attr.int8_array.reset(new Int8[max_array_size]);
|
||||
std::fill(attr.int8_array.get(), attr.int8_array.get() + max_array_size, attr.int8_null_value);
|
||||
attr.int8_array.reset(new PODArray<Int8>);
|
||||
attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value);
|
||||
break;
|
||||
case attribute_type::int16:
|
||||
attr.int16_null_value = DB::parse<Int16>(null_value);
|
||||
attr.int16_array.reset(new Int16[max_array_size]);
|
||||
std::fill(attr.int16_array.get(), attr.int16_array.get() + max_array_size, attr.int16_null_value);
|
||||
attr.int16_array.reset(new PODArray<Int16>);
|
||||
attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value);
|
||||
break;
|
||||
case attribute_type::int32:
|
||||
attr.int32_null_value = DB::parse<Int32>(null_value);
|
||||
attr.int32_array.reset(new Int32[max_array_size]);
|
||||
std::fill(attr.int32_array.get(), attr.int32_array.get() + max_array_size, attr.int32_null_value);
|
||||
attr.int32_array.reset(new PODArray<Int32>);
|
||||
attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value);
|
||||
break;
|
||||
case attribute_type::int64:
|
||||
attr.int64_null_value = DB::parse<Int64>(null_value);
|
||||
attr.int64_array.reset(new Int64[max_array_size]);
|
||||
std::fill(attr.int64_array.get(), attr.int64_array.get() + max_array_size, attr.int64_null_value);
|
||||
attr.int64_array.reset(new PODArray<Int64>);
|
||||
attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value);
|
||||
break;
|
||||
case attribute_type::float32:
|
||||
attr.float32_null_value = DB::parse<Float32>(null_value);
|
||||
attr.float32_array.reset(new Float32[max_array_size]);
|
||||
std::fill(attr.float32_array.get(), attr.float32_array.get() + max_array_size, attr.float32_null_value);
|
||||
attr.float32_array.reset(new PODArray<Float32>);
|
||||
attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value);
|
||||
break;
|
||||
case attribute_type::float64:
|
||||
attr.float64_null_value = DB::parse<Float64>(null_value);
|
||||
attr.float64_array.reset(new Float64[max_array_size]);
|
||||
std::fill(attr.float64_array.get(), attr.float64_array.get() + max_array_size, attr.float64_null_value);
|
||||
attr.float64_array.reset(new PODArray<Float64>);
|
||||
attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value);
|
||||
break;
|
||||
case attribute_type::string:
|
||||
attr.string_null_value = null_value;
|
||||
attr.string_arena.reset(new Arena);
|
||||
attr.string_array.resize(initial_array_size, StringRef{
|
||||
attr.string_null_value.data(), attr.string_null_value.size()
|
||||
});
|
||||
attr.string_array.reset(new PODArray<StringRef>);
|
||||
attr.string_array->resize_fill(initial_array_size, attr.string_null_value);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -265,33 +260,86 @@ public:
|
||||
|
||||
switch (attribute.type)
|
||||
{
|
||||
case attribute_type::uint8: attribute.uint8_array[id] = value.get<UInt64>(); break;
|
||||
case attribute_type::uint16: attribute.uint16_array[id] = value.get<UInt64>(); break;
|
||||
case attribute_type::uint32: attribute.uint32_array[id] = value.get<UInt64>(); break;
|
||||
case attribute_type::uint64: attribute.uint64_array[id] = value.get<UInt64>(); break;
|
||||
case attribute_type::int8: attribute.int8_array[id] = value.get<Int64>(); break;
|
||||
case attribute_type::int16: attribute.int16_array[id] = value.get<Int64>(); break;
|
||||
case attribute_type::int32: attribute.int32_array[id] = value.get<Int64>(); break;
|
||||
case attribute_type::int64: attribute.int64_array[id] = value.get<Int64>(); break;
|
||||
case attribute_type::float32: attribute.float32_array[id] = value.get<Float64>(); break;
|
||||
case attribute_type::float64: attribute.float64_array[id] = value.get<Float64>(); break;
|
||||
case attribute_type::string:
|
||||
case attribute_type::uint8:
|
||||
{
|
||||
const auto & string = value.get<String>();
|
||||
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
|
||||
|
||||
const auto current_size = attribute.string_array.size();
|
||||
if (id >= current_size)
|
||||
attribute.string_array.resize(
|
||||
std::min<std::size_t>(max_array_size, 2 * current_size > id ? 2 * current_size : 2 * id),
|
||||
StringRef{
|
||||
attribute.string_null_value.data(), attribute.string_null_value.size()
|
||||
});
|
||||
|
||||
attribute.string_array[id] = StringRef{string_in_arena, string.size()};
|
||||
if (id >= attribute.uint8_array->size())
|
||||
attribute.uint8_array->resize_fill(id, attribute.uint8_null_value);
|
||||
(*attribute.uint8_array)[id] = value.get<UInt64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::uint16:
|
||||
{
|
||||
if (id >= attribute.uint16_array->size())
|
||||
attribute.uint16_array->resize_fill(id, attribute.uint16_null_value);
|
||||
(*attribute.uint16_array)[id] = value.get<UInt64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::uint32:
|
||||
{
|
||||
if (id >= attribute.uint32_array->size())
|
||||
attribute.uint32_array->resize_fill(id, attribute.uint32_null_value);
|
||||
(*attribute.uint32_array)[id] = value.get<UInt64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::uint64:
|
||||
{
|
||||
if (id >= attribute.uint64_array->size())
|
||||
attribute.uint64_array->resize_fill(id, attribute.uint64_null_value);
|
||||
(*attribute.uint64_array)[id] = value.get<UInt64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::int8:
|
||||
{
|
||||
if (id >= attribute.int8_array->size())
|
||||
attribute.int8_array->resize_fill(id, attribute.int8_null_value);
|
||||
(*attribute.int8_array)[id] = value.get<Int64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::int16:
|
||||
{
|
||||
if (id >= attribute.int16_array->size())
|
||||
attribute.int16_array->resize_fill(id, attribute.int16_null_value);
|
||||
(*attribute.int16_array)[id] = value.get<Int64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::int32:
|
||||
{
|
||||
if (id >= attribute.int32_array->size())
|
||||
attribute.int32_array->resize_fill(id, attribute.int32_null_value);
|
||||
(*attribute.int32_array)[id] = value.get<Int64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::int64:
|
||||
{
|
||||
if (id >= attribute.int64_array->size())
|
||||
attribute.int64_array->resize_fill(id, attribute.int64_null_value);
|
||||
(*attribute.int64_array)[id] = value.get<Int64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::float32:
|
||||
{
|
||||
if (id >= attribute.float32_array->size())
|
||||
attribute.float32_array->resize_fill(id, attribute.float32_null_value);
|
||||
(*attribute.float32_array)[id] = value.get<Float64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::float64:
|
||||
{
|
||||
if (id >= attribute.float64_array->size())
|
||||
attribute.float64_array->resize_fill(id, attribute.float64_null_value);
|
||||
(*attribute.float64_array)[id] = value.get<Float64>();
|
||||
break;
|
||||
}
|
||||
case attribute_type::string:
|
||||
{
|
||||
if (id >= attribute.string_array->size())
|
||||
attribute.string_array->resize_fill(id, attribute.string_null_value);
|
||||
const auto & string = value.get<String>();
|
||||
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());
|
||||
(*attribute.string_array)[id] = StringRef{string_in_arena, string.size()};
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
std::map<std::string, std::size_t> attribute_index_by_name;
|
||||
|
@ -13,8 +13,6 @@ public:
|
||||
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
|
||||
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;
|
||||
|
||||
virtual void reset() {}
|
||||
|
||||
virtual ~IDictionarySource() = default;
|
||||
};
|
||||
|
||||
|
32
dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h
Normal file
32
dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h
Normal file
@ -0,0 +1,32 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/IO/ReadBuffer.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class OwningBufferBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
OwningBufferBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<ReadBuffer> buffer)
|
||||
: stream{stream}, buffer{std::move(buffer)}
|
||||
{
|
||||
children.push_back(stream);
|
||||
}
|
||||
|
||||
private:
|
||||
Block readImpl() override { return stream->read(); }
|
||||
|
||||
String getName() const override { return "OwningBufferBlockInputStream"; }
|
||||
|
||||
String getID() const override {
|
||||
return "OwningBuffer(" + stream->getID() + ")";
|
||||
}
|
||||
|
||||
BlockInputStreamPtr stream;
|
||||
std::unique_ptr<ReadBuffer> buffer;
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user