This commit is contained in:
Alexey Arno 2015-02-27 00:38:33 +03:00
commit c5b61652ff
15 changed files with 386 additions and 260 deletions

View File

@ -177,6 +177,17 @@ public:
/// Range constructor: sets use_libc_realloc to false, calls alloc() for (from_end - from_begin) elements, then inserts the range.
PODArray(const_iterator from_begin, const_iterator from_end) : use_libc_realloc(false) { alloc(from_end - from_begin); insert(from_begin, from_end); }
/// Destructor: hands the storage to dealloc().
~PODArray() { dealloc(); }
/// Move constructor: delegates to the move assignment operator.
/// NOTE(review): that operator swaps members with `other`, so this object's members are read before
/// anything in this constructor sets them. This is only safe if c_start / c_end / c_end_of_storage /
/// use_libc_realloc have default member initializers (their declarations are not visible here) - confirm.
PODArray(PODArray && other) { *this = std::move(other); }
/** Move assignment, implemented as a member-wise swap with `other`.
  *
  * After the call this array holds other's c_start / c_end / c_end_of_storage and
  * use_libc_realloc, and `other` holds the values this array had before; whatever
  * this array owned is therefore left for other's destructor to handle.
  * Self-assignment is harmless: swapping a member with itself changes nothing.
  *
  * Declared noexcept so that containers and algorithms which only use a move
  * when it cannot throw are able to pick it.
  * NOTE(review): this relies on the four swapped members being plain
  * pointer/bool-like values whose swap cannot throw; their declarations are
  * outside this hunk - confirm.
  *
  * @param other array to exchange state with
  * @return reference to this array
  */
PODArray & operator=(PODArray && other) noexcept
{
    std::swap(c_start, other.c_start);
    std::swap(c_end, other.c_end);
    std::swap(c_end_of_storage, other.c_end_of_storage);
    std::swap(use_libc_realloc, other.use_libc_realloc);

    return *this;
}
/// Number of elements: distance between t_start() and t_end().
size_t size() const { return t_end() - t_start(); }
/// True when t_end() equals t_start(), i.e. size() would be 0.
bool empty() const { return t_end() == t_start(); }

View File

@ -24,6 +24,8 @@ struct StringRef
StringRef() = default;
/// Builds a std::string from the `size` bytes starting at `data` (the bytes are copied into the new string).
std::string toString() const { return std::string(data, size); }
/// Implicit conversion to std::string; forwards to toString(), so every conversion makes a copy.
operator std::string() const { return toString(); }
};
typedef std::vector<StringRef> StringRefs;

View File

@ -3,18 +3,19 @@
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Columns/ColumnString.h>
#include <statdaemons/ext/scope_guard.hpp>
#include <Poco/RWLock.h>
#include <cmath>
#include <chrono>
#include <vector>
#include <map>
#include <tuple>
namespace DB
{
constexpr std::chrono::milliseconds spinlock_wait_time{10};
class CacheDictionary final : public IDictionary
{
public:
@ -24,7 +25,7 @@ public:
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
size{round_up_to_power_of_two(size)},
cells(this->size, cell{dict_struct.attributes.size()})
cells{this->size}
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{
@ -55,8 +56,8 @@ public:
/// Always returns 0, whatever `id` is passed.
id_t toParent(const id_t id) const override { return 0; }
#define DECLARE_INDIVIDUAL_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
#define DECLARE_INDIVIDUAL_GETTER(TYPE, LC_TYPE) \
TYPE get##TYPE(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
@ -66,20 +67,38 @@ public:
ErrorCodes::TYPE_MISMATCH\
};\
\
return getItem<TYPE>(getAttributeIndex(attribute_name), id);\
PODArray<UInt64> ids{1, id};\
PODArray<TYPE> out{1};\
getItems<TYPE>(idx, ids, out);\
return out.front();\
}
DECLARE_INDIVIDUAL_GETTER(UInt8, UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, Float64, float64)
DECLARE_INDIVIDUAL_GETTER(StringRef, String, string)
DECLARE_INDIVIDUAL_GETTER(UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, float64)
#undef DECLARE_INDIVIDUAL_GETTER
/** Single-id accessor for string attributes.
  *
  * Resolves the attribute by name, verifies that it is declared as a string and
  * then reuses the batch getItems() overload with a one-element id array and a
  * temporary ColumnString; the first (only) row of that column is returned.
  *
  * @param attribute_name name of the attribute to read
  * @param id             key to look up
  * @return the attribute value for `id` as an owned String
  * @throws Exception with ErrorCodes::TYPE_MISMATCH if the attribute is not a string
  */
String getString(const std::string & attribute_name, const id_t id) const override
{
    const auto attribute_idx = getAttributeIndex(attribute_name);
    const auto actual_type = attributes[attribute_idx].type;

    if (actual_type != AttributeType::string)
        throw Exception{
            "Type mismatch: attribute " + attribute_name + " has type " + toString(actual_type),
            ErrorCodes::TYPE_MISMATCH
        };

    /// wrap the single id so the batch code path can be shared
    PODArray<UInt64> single_id{1, id};
    ColumnString column;
    getItems(attribute_idx, single_id, &column);

    return column.getDataAt(0);
}
#define DECLARE_MULTIPLE_GETTER(TYPE, LC_TYPE)\
void get##TYPE(const std::string & attribute_name, const PODArray<id_t> & ids, PODArray<TYPE> & out) const override\
@ -92,8 +111,7 @@ public:
ErrorCodes::TYPE_MISMATCH\
};\
\
for (const auto i : ext::range(0, ids.size()))\
out[i] = getItem<TYPE>(idx, ids[i]);\
getItems<TYPE>(idx, ids, out);\
}
DECLARE_MULTIPLE_GETTER(UInt8, uint8)
DECLARE_MULTIPLE_GETTER(UInt16, uint16)
@ -116,28 +134,34 @@ public:
ErrorCodes::TYPE_MISMATCH
};
for (const auto i : ext::range(0, ids.size()))
{
const auto string_ref = getItem<StringRef>(idx, ids[i]);
out->insertData(string_ref.data, string_ref.size);
}
getItems(idx, ids, out);
}
private:
struct attribute_t
/// Bookkeeping for one cache cell; the attribute values themselves are stored in attribute_t::arrays.
struct cell_metadata_t final
{
/// id currently stored in the cell; lookups compare it with the requested id to detect a miss
std::uint64_t id;
/// point in time checked by hasTimeExpired() to decide whether the cell has to be refreshed
std::chrono::system_clock::time_point expires_at;
};
struct attribute_t final
{
AttributeType type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
UInt64 uint64_null_value;
Int8 int8_null_value;
Int16 int16_null_value;
Int32 int32_null_value;
Int64 int64_null_value;
Float32 float32_null_value;
Float64 float64_null_value;
String string_null_value;
std::tuple<UInt8, UInt16, UInt32, UInt64,
Int8, Int16, Int32, Int64,
Float32, Float64,
String> null_values;
std::tuple<std::unique_ptr<UInt8[]>,
std::unique_ptr<UInt16[]>,
std::unique_ptr<UInt32[]>,
std::unique_ptr<UInt64[]>,
std::unique_ptr<Int8[]>,
std::unique_ptr<Int16[]>,
std::unique_ptr<Int32[]>,
std::unique_ptr<Int64[]>,
std::unique_ptr<Float32[]>,
std::unique_ptr<Float64[]>,
std::unique_ptr<StringRef[]>> arrays;
};
void createAttributes()
@ -148,8 +172,8 @@ private:
for (const auto & attribute : dict_struct.attributes)
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attributes.push_back(std::move(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value)));
attributes.push_back(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value));
if (attribute.hierarchical)
hierarchical_attribute = &attributes.back();
@ -163,166 +187,288 @@ private:
switch (type)
{
case AttributeType::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
std::get<UInt8>(attr.null_values) = DB::parse<UInt8>(null_value);
std::get<std::unique_ptr<UInt8[]>>(attr.arrays) = std::make_unique<UInt8[]>(size);
break;
case AttributeType::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
std::get<UInt16>(attr.null_values) = DB::parse<UInt16>(null_value);
std::get<std::unique_ptr<UInt16[]>>(attr.arrays) = std::make_unique<UInt16[]>(size);
break;
case AttributeType::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
std::get<UInt32>(attr.null_values) = DB::parse<UInt32>(null_value);
std::get<std::unique_ptr<UInt32[]>>(attr.arrays) = std::make_unique<UInt32[]>(size);
break;
case AttributeType::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
std::get<UInt64>(attr.null_values) = DB::parse<UInt64>(null_value);
std::get<std::unique_ptr<UInt64[]>>(attr.arrays) = std::make_unique<UInt64[]>(size);
break;
case AttributeType::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
std::get<Int8>(attr.null_values) = DB::parse<Int8>(null_value);
std::get<std::unique_ptr<Int8[]>>(attr.arrays) = std::make_unique<Int8[]>(size);
break;
case AttributeType::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
std::get<Int16>(attr.null_values) = DB::parse<Int16>(null_value);
std::get<std::unique_ptr<Int16[]>>(attr.arrays) = std::make_unique<Int16[]>(size);
break;
case AttributeType::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
std::get<Int32>(attr.null_values) = DB::parse<Int32>(null_value);
std::get<std::unique_ptr<Int32[]>>(attr.arrays) = std::make_unique<Int32[]>(size);
break;
case AttributeType::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
std::get<Int64>(attr.null_values) = DB::parse<Int64>(null_value);
std::get<std::unique_ptr<Int64[]>>(attr.arrays) = std::make_unique<Int64[]>(size);
break;
case AttributeType::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
std::get<Float32>(attr.null_values) = DB::parse<Float32>(null_value);
std::get<std::unique_ptr<Float32[]>>(attr.arrays) = std::make_unique<Float32[]>(size);
break;
case AttributeType::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
std::get<Float64>(attr.null_values) = DB::parse<Float64>(null_value);
std::get<std::unique_ptr<Float64[]>>(attr.arrays) = std::make_unique<Float64[]>(size);
break;
case AttributeType::string:
attr.string_null_value = null_value;
std::get<String>(attr.null_values) = null_value;
std::get<std::unique_ptr<StringRef[]>>(attr.arrays) = std::make_unique<StringRef[]>(size);
break;
}
return attr;
}
union item
static bool hasTimeExpired(const std::chrono::system_clock::time_point & time_point)
{
UInt8 uint8_value;
UInt16 uint16_value;
UInt32 uint32_value;
UInt64 uint64_value;
Int8 int8_value;
Int16 int16_value;
Int32 int32_value;
Int64 int64_value;
Float32 float32_value;
Float64 float64_value;
StringRef string_value;
item() : string_value{} {}
template <typename T> inline T get() const = delete;
};
struct cell
{
std::atomic_flag lock{false};
id_t id{};
std::vector<item> attrs;
std::chrono::system_clock::time_point expires_at{};
cell() = default;
cell(const std::size_t attribute_count) : attrs(attribute_count) {}
cell(const cell & other) { *this = other; }
cell & operator=(const cell & other)
{
id = other.id;
attrs = other.attrs;
expires_at = other.expires_at;
return *this;
}
bool hasExpired() const { return std::chrono::system_clock::now() >= expires_at; }
};
template <typename T>
T getItem(const std::size_t attribute_idx, const id_t id) const
{
const auto hash = intHash64(id);
const auto idx = hash % size;
auto & cell = cells[idx];
/// spinlock with a bit of throttling
while (cell.lock.test_and_set(std::memory_order_acquire))
std::this_thread::sleep_for(spinlock_wait_time);
SCOPE_EXIT(
cell.lock.clear(std::memory_order_release);
);
if (cell.id != id || cell.hasExpired())
populateCellForId(cell, id);
return cell.attrs[attribute_idx].get<T>();
return std::chrono::system_clock::now() >= time_point;
}
void populateCellForId(cell & cell, const id_t id) const
template <typename T>
void getItems(const std::size_t attribute_idx, const PODArray<id_t> & ids, PODArray<T> & out) const
{
auto stream = source_ptr->loadId(id);
HashMap<id_t, std::vector<std::size_t>> outdated_ids;
auto & attribute = attributes[attribute_idx];
auto & attribute_array = std::get<std::unique_ptr<T[]>>(attribute.arrays);
{
const Poco::ScopedReadRWLock read_lock{rw_lock};
/// fetch up-to-date values, decide which ones require update
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
if (id == 0)
{
out[i] = std::get<T>(attribute.null_values);
continue;
}
const auto cell_idx = getCellIdx(id);
const auto & cell = cells[cell_idx];
if (cell.id != id || hasTimeExpired(cell.expires_at))
{
out[i] = std::get<T>(attribute.null_values);
outdated_ids[id].push_back(i);
}
else
out[i] = attribute_array[cell_idx];
}
}
if (outdated_ids.empty())
return;
/// request new values
std::vector<id_t> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
[] (auto & pair) { return pair.first; });
update(required_ids, [&] (const auto id, const auto cell_idx) {
const auto attribute_value = attribute_array[cell_idx];
/// set missing values to out
for (const auto out_idx : outdated_ids[id])
out[out_idx] = attribute_value;
});
}
/** Appends to `out` one string per element of `ids` (in the same order) for the
  * string attribute with index `attribute_idx`.
  *
  * Works in two stages:
  *  1. Optimistic: under the read lock, cached values are copied straight into
  *     `out`. If every id is either 0 or a non-expired cache hit, that is all.
  *  2. Pessimistic: as soon as one id is missing or expired, the rows written so
  *     far are dropped. The ids are scanned again under the read lock; values
  *     that are cached are copied into a per-id map, the others are collected
  *     and passed to update(), whose callback stores the refreshed values in
  *     the same map. Finally the rows are emitted from the map; ids that are
  *     still absent from it (including id 0) get the attribute's null value.
  *
  * `total_length` is only used to reserve the character buffer up front, so it
  * influences allocations, not the produced rows.
  *
  * NOTE(review): the pessimistic stage truncates `out` to zero rows, which would
  * also discard anything the column held before the call - presumably callers
  * always pass an empty column; confirm.
  *
  * @param attribute_idx index into `attributes`
  * @param ids           keys to look up
  * @param out           column that receives the values
  */
void getItems(const std::size_t attribute_idx, const PODArray<id_t> & ids, ColumnString * out) const
{
    /// save on some allocations
    out->getOffsets().reserve(ids.size());

    auto & attribute = attributes[attribute_idx];
    auto & attribute_array = std::get<std::unique_ptr<StringRef[]>>(attribute.arrays);
    const auto & null_value = std::get<String>(attribute.null_values);

    auto found_outdated_values = false;

    /// perform optimistic version, fallback to pessimistic if failed
    {
        const Poco::ScopedReadRWLock read_lock{rw_lock};

        /// fetch up-to-date values, discard on fail
        for (const auto i : ext::range(0, ids.size()))
        {
            const auto id = ids[i];
            if (id == 0)
            {
                out->insertData(null_value.data(), null_value.size());
                continue;
            }

            const auto cell_idx = getCellIdx(id);
            const auto & cell = cells[cell_idx];

            if (cell.id != id || hasTimeExpired(cell.expires_at))
            {
                found_outdated_values = true;
                break;
            }

            const auto string_ref = attribute_array[cell_idx];
            out->insertData(string_ref.data, string_ref.size);
        }
    }

    /// optimistic code completed successfully
    if (!found_outdated_values)
        return;

    /// now onto the pessimistic one, discard possibly partial results from the optimistic path
    out->getChars().resize_assume_reserved(0);
    out->getOffsets().resize_assume_reserved(0);

    /// outdated ids joined number of times they've been requested
    HashMap<id_t, std::size_t> outdated_ids;
    /// we are going to store every string separately
    HashMap<id_t, String> map;

    /// bytes the output rows are expected to need; every row is counted as its length + 1,
    /// the same per-row accounting the cached values use below
    std::size_t total_length = 0;
    {
        const Poco::ScopedReadRWLock read_lock{rw_lock};

        for (const auto i : ext::range(0, ids.size()))
        {
            const auto id = ids[i];
            if (id == 0)
            {
                /// id 0 is emitted as the null value, so account for its real length
                total_length += null_value.size() + 1;
                continue;
            }

            const auto cell_idx = getCellIdx(id);
            const auto & cell = cells[cell_idx];

            if (cell.id != id || hasTimeExpired(cell.expires_at))
                outdated_ids[id] += 1;
            else
            {
                const auto string_ref = attribute_array[cell_idx];
                map[id] = string_ref;
                total_length += string_ref.size + 1;
            }
        }
    }

    /// request new values
    if (!outdated_ids.empty())
    {
        std::vector<id_t> required_ids(outdated_ids.size());
        std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids),
            [] (auto & pair) { return pair.first; });

        update(required_ids, [&] (const auto id, const auto cell_idx) {
            const auto attribute_value = attribute_array[cell_idx];

            map[id] = attribute_value;
            /// the id may occur several times in `ids`; each occurrence becomes its own row
            total_length += (attribute_value.size + 1) * outdated_ids[id];
        });
    }

    /// ids the source did not return are not included in total_length; they are emitted
    /// as the null value below and may make the buffer grow past the reservation
    out->getChars().reserve(total_length);

    for (const auto id : ids)
    {
        const auto it = map.find(id);
        /// bind by reference: no per-row copy of the string
        const auto & string = it != map.end() ? it->second : null_value;
        out->insertData(string.data(), string.size());
    }
}
template <typename F>
void update(const std::vector<id_t> ids, F && on_cell_updated) const
{
auto stream = source_ptr->loadIds(ids);
stream->readPrefix();
auto empty_response = true;
const Poco::ScopedWriteRWLock write_lock{rw_lock};
while (const auto block = stream->read())
{
if (!empty_response)
const auto id_column = typeid_cast<const ColumnVector<UInt64> *>(block.getByPosition(0).column.get());
if (!id_column)
throw Exception{
"Stream returned from loadId contains more than one block",
ErrorCodes::LOGICAL_ERROR
"Id column has type different from UInt64.",
ErrorCodes::TYPE_MISMATCH
};
if (block.rowsInFirstColumn() != 1)
throw Exception{
"Block has more than one row",
ErrorCodes::LOGICAL_ERROR
};
const auto & ids = id_column->getData();
for (const auto attribute_idx : ext::range(0, attributes.size()))
for (const auto i : ext::range(0, ids.size()))
{
const auto & attribute_column = *block.getByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
const auto id = ids[i];
const auto & cell_idx = getCellIdx(id);
auto & cell = cells[cell_idx];
setAttributeValue(cell.attrs[attribute_idx], attribute, attribute_column[0]);
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *block.getByPosition(attribute_idx + 1).column;
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
std::uniform_int_distribution<std::uint64_t> distribution{
dict_lifetime.min_sec,
dict_lifetime.max_sec
};
cell.id = id;
cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
on_cell_updated(id, cell_idx);
}
empty_response = false;
}
stream->readSuffix();
if (empty_response)
setCellDefaults(cell);
cell.id = id;
cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{dict_lifetime.min_sec};
}
void setAttributeValue(item & item, const attribute_t & attribute, const Field & value) const
/** Maps an id to the index of its cell.
  * The id is hashed with intHash64 and the result is masked with (size - 1);
  * the constructor initialises `size` from round_up_to_power_of_two(...), which
  * is what lets the mask stand in for a modulo.
  */
std::uint64_t getCellIdx(const id_t id) const
{
    return intHash64(id) & (size - 1);
}
void setAttributeValue(attribute_t & attribute, const id_t idx, const Field & value) const
{
switch (attribute.type)
{
case AttributeType::uint8: item.uint8_value = value.get<UInt64>(); break;
case AttributeType::uint16: item.uint16_value = value.get<UInt64>(); break;
case AttributeType::uint32: item.uint32_value = value.get<UInt64>(); break;
case AttributeType::uint64: item.uint64_value = value.get<UInt64>(); break;
case AttributeType::int8: item.int8_value = value.get<Int64>(); break;
case AttributeType::int16: item.int16_value = value.get<Int64>(); break;
case AttributeType::int32: item.int32_value = value.get<Int64>(); break;
case AttributeType::int64: item.int64_value = value.get<Int64>(); break;
case AttributeType::float32: item.float32_value = value.get<Float64>(); break;
case AttributeType::float64: item.float64_value = value.get<Float64>(); break;
case AttributeType::uint8: std::get<std::unique_ptr<UInt8[]>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeType::uint16: std::get<std::unique_ptr<UInt16[]>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeType::uint32: std::get<std::unique_ptr<UInt32[]>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeType::uint64: std::get<std::unique_ptr<UInt64[]>>(attribute.arrays)[idx] = value.get<UInt64>(); break;
case AttributeType::int8: std::get<std::unique_ptr<Int8[]>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeType::int16: std::get<std::unique_ptr<Int16[]>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeType::int32: std::get<std::unique_ptr<Int32[]>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeType::int64: std::get<std::unique_ptr<Int64[]>>(attribute.arrays)[idx] = value.get<Int64>(); break;
case AttributeType::float32: std::get<std::unique_ptr<Float32[]>>(attribute.arrays)[idx] = value.get<Float64>(); break;
case AttributeType::float64: std::get<std::unique_ptr<Float64[]>>(attribute.arrays)[idx] = value.get<Float64>(); break;
case AttributeType::string:
{
const auto & string = value.get<String>();
auto & string_ref = item.string_value;
if (string_ref.data && string_ref.data != attribute.string_null_value.data())
auto & string_ref = std::get<std::unique_ptr<StringRef[]>>(attribute.arrays)[idx];
if (string_ref.data)
delete[] string_ref.data;
const auto size = string.size();
@ -340,39 +486,6 @@ private:
}
}
void setCellDefaults(cell & cell) const
{
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
auto & attribute = attributes[attribute_idx];
auto & item = cell.attrs[attribute_idx];
switch (attribute.type)
{
case AttributeType::uint8: item.uint8_value = attribute.uint8_null_value; break;
case AttributeType::uint16: item.uint16_value = attribute.uint16_null_value; break;
case AttributeType::uint32: item.uint32_value = attribute.uint32_null_value; break;
case AttributeType::uint64: item.uint64_value = attribute.uint64_null_value; break;
case AttributeType::int8: item.int8_value = attribute.int8_null_value; break;
case AttributeType::int16: item.int16_value = attribute.int16_null_value; break;
case AttributeType::int32: item.int32_value = attribute.int32_null_value; break;
case AttributeType::int64: item.int64_value = attribute.int64_null_value; break;
case AttributeType::float32: item.float32_value = attribute.float32_null_value; break;
case AttributeType::float64: item.float64_value = attribute.float64_null_value; break;
case AttributeType::string:
{
auto & string_ref = item.string_value;
if (string_ref.data && string_ref.data != attribute.string_null_value.data())
delete[] string_ref.data;
string_ref = attribute.string_null_value;
break;
}
}
}
}
std::size_t getAttributeIndex(const std::string & attribute_name) const
{
const auto it = attribute_index_by_name.find(attribute_name);
@ -411,23 +524,14 @@ private:
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
mutable Poco::RWLock rw_lock;
const std::size_t size;
mutable std::vector<cell> cells;
std::map<std::string, std::size_t> attribute_index_by_name;
std::vector<attribute_t> attributes;
mutable std::vector<attribute_t> attributes;
mutable std::vector<cell_metadata_t> cells;
const attribute_t * hierarchical_attribute = nullptr;
mutable std::mt19937_64 rnd_engine{getSeed()};
};
template <> inline UInt8 CacheDictionary::item::get<UInt8>() const { return uint8_value; }
template <> inline UInt16 CacheDictionary::item::get<UInt16>() const { return uint16_value; }
template <> inline UInt32 CacheDictionary::item::get<UInt32>() const { return uint32_value; }
template <> inline UInt64 CacheDictionary::item::get<UInt64>() const { return uint64_value; }
template <> inline Int8 CacheDictionary::item::get<Int8>() const { return int8_value; }
template <> inline Int16 CacheDictionary::item::get<Int16>() const { return int16_value; }
template <> inline Int32 CacheDictionary::item::get<Int32>() const { return int32_value; }
template <> inline Int64 CacheDictionary::item::get<Int64>() const { return int64_value; }
template <> inline Float32 CacheDictionary::item::get<Float32>() const { return float32_value; }
template <> inline Float64 CacheDictionary::item::get<Float64>() const { return float64_value; }
template <> inline StringRef CacheDictionary::item::get<StringRef>() const { return string_value; }
}

View File

@ -60,23 +60,15 @@ public:
return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
}
BlockInputStreamPtr loadId(const std::uint64_t id) override
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
{
const auto query = composeLoadIdQuery(id);
const auto query = composeLoadIdsQuery(ids);
if (is_local)
return executeQuery(query, context, true).in;
return new RemoteBlockInputStream{pool.get(), query, nullptr};
}
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
{
throw Exception{
"Method unsupported",
ErrorCodes::NOT_IMPLEMENTED
};
}
bool isModified() const override { return true; }
bool supportsSelectiveLoad() const override { return true; }
@ -103,7 +95,7 @@ private:
return query;
}
std::string composeLoadIdQuery(const id_t id)
std::string composeLoadIdsQuery(const std::vector<std::uint64_t> ids)
{
std::string query{"SELECT "};
@ -113,13 +105,25 @@ private:
if (!first)
query += ", ";
query += sample_block.getByPosition(idx).name;
first = false;
query += sample_block.getByPosition(idx).name;
}
const auto & id_column_name = sample_block.getByPosition(0).name;
query += " FROM " + table + " WHERE " + id_column_name + " IN (" + std::to_string(id) + ");";
query += " FROM " + table + " WHERE " + id_column_name + " IN (";
first = true;
for (const auto id : ids)
{
if (!first)
query += ',';
first = false;
query += toString(id);
}
query += ");";
return query;
}

View File

@ -38,14 +38,6 @@ public:
return new OwningBufferBlockInputStream{stream, std::move(in_ptr)};
}
BlockInputStreamPtr loadId(const std::uint64_t id) override
{
throw Exception{
"Method unsupported",
ErrorCodes::NOT_IMPLEMENTED
};
}
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
{
throw Exception{

View File

@ -69,8 +69,8 @@ public:
};
}
#define DECLARE_INDIVIDUAL_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
#define DECLARE_INDIVIDUAL_GETTER(TYPE, LC_TYPE) \
TYPE get##TYPE(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
@ -83,17 +83,17 @@ public:
return (*attribute.LC_TYPE##_array)[id];\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_INDIVIDUAL_GETTER(UInt8, UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, Float64, float64)
DECLARE_INDIVIDUAL_GETTER(StringRef, String, string)
DECLARE_INDIVIDUAL_GETTER(UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, float64)
DECLARE_INDIVIDUAL_GETTER(String, string)
#undef DECLARE_INDIVIDUAL_GETTER
#define DECLARE_MULTIPLE_GETTER(TYPE, LC_TYPE)\
@ -143,7 +143,7 @@ public:
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto string_ref = id < attr.size() ? attr[id] : null_value;
const auto string_ref = id < attr.size() ? attr[id] : StringRef{null_value};
out->insertData(string_ref.data, string_ref.size);
}
}
@ -184,8 +184,8 @@ private:
for (const auto & attribute : dict_struct.attributes)
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attributes.push_back(std::move(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value)));
attributes.push_back(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value));
if (attribute.hierarchical)
hierarchical_attribute = &attributes.back();

View File

@ -99,8 +99,8 @@ public:
};
}
#define DECLARE_INDIVIDUAL_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
#define DECLARE_INDIVIDUAL_GETTER(TYPE, LC_TYPE) \
TYPE get##TYPE(const std::string & attribute_name, const id_t id) const override\
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
@ -116,17 +116,17 @@ public:
\
return attribute.LC_TYPE##_null_value;\
}
DECLARE_INDIVIDUAL_GETTER(UInt8, UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, Float64, float64)
DECLARE_INDIVIDUAL_GETTER(StringRef, String, string)
DECLARE_INDIVIDUAL_GETTER(UInt8, uint8)
DECLARE_INDIVIDUAL_GETTER(UInt16, uint16)
DECLARE_INDIVIDUAL_GETTER(UInt32, uint32)
DECLARE_INDIVIDUAL_GETTER(UInt64, uint64)
DECLARE_INDIVIDUAL_GETTER(Int8, int8)
DECLARE_INDIVIDUAL_GETTER(Int16, int16)
DECLARE_INDIVIDUAL_GETTER(Int32, int32)
DECLARE_INDIVIDUAL_GETTER(Int64, int64)
DECLARE_INDIVIDUAL_GETTER(Float32, float32)
DECLARE_INDIVIDUAL_GETTER(Float64, float64)
DECLARE_INDIVIDUAL_GETTER(String, string)
#undef DECLARE_INDIVIDUAL_GETTER
#define DECLARE_MULTIPLE_GETTER(TYPE, LC_TYPE)\
@ -176,7 +176,7 @@ public:
for (const auto i : ext::range(0, ids.size()))
{
const auto it = attr.find(ids[i]);
const auto string_ref = it != attr.end() ? it->second : null_value;
const auto string_ref = it != attr.end() ? it->second : StringRef{null_value};
out->insertData(string_ref.data, string_ref.size);
}
}
@ -217,8 +217,8 @@ private:
for (const auto & attribute : dict_struct.attributes)
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attributes.push_back(std::move(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value)));
attributes.push_back(createAttributeWithType(getAttributeTypeByName(attribute.type),
attribute.null_value));
if (attribute.hierarchical)
hierarchical_attribute = &attributes.back();

View File

@ -58,7 +58,7 @@ public:
virtual Int64 getInt64(const std::string & attribute_name, id_t id) const = 0;
virtual Float32 getFloat32(const std::string & attribute_name, id_t id) const = 0;
virtual Float64 getFloat64(const std::string & attribute_name, id_t id) const = 0;
virtual StringRef getString(const std::string & attribute_name, id_t id) const = 0;
virtual String getString(const std::string & attribute_name, id_t id) const = 0;
/// functions for multiple access
virtual void getUInt8(const std::string & attr_name, const PODArray<id_t> & ids, PODArray<UInt8> & out) const = 0;

View File

@ -24,9 +24,6 @@ public:
*/
virtual bool supportsSelectiveLoad() const = 0;
/// returns an input stream with the data for the requested identifier
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
/// returns an input stream with the data for a collection of identifiers
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;

View File

@ -39,14 +39,6 @@ public:
return new MySQLBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
}
BlockInputStreamPtr loadId(const std::uint64_t id) override
{
throw Exception{
"Method unsupported",
ErrorCodes::NOT_IMPLEMENTED
};
}
BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) override
{
throw Exception{

View File

@ -779,7 +779,7 @@ private:
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", muste be UInt64.",
+ ", must be UInt64.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -836,7 +836,7 @@ private:
{
block.getByPosition(result).column = new ColumnConst<String>{
id_col->size(),
dictionary->getString(attr_name, id_col->getData()).toString()
dictionary->getString(attr_name, id_col->getData())
};
}
else

View File

@ -10,6 +10,7 @@
#include <DB/Core/StringRef.h>
#include <DB/Columns/IColumn.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnFixedString.h>
template <>
@ -56,7 +57,8 @@ static inline T ALWAYS_INLINE packFixed(
offset += 8;
break;
default:
__builtin_unreachable();
memcpy(bytes + offset, &static_cast<const ColumnFixedString *>(key_columns[j])->getChars()[i * key_sizes[j]], key_sizes[j]);
offset += key_sizes[j];
}
}

View File

@ -39,6 +39,17 @@ DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::Ab
{
return std::make_unique<HashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
}
else if ("cache" == layout_type)
{
const auto size = config.getInt(layout_prefix + ".cache.size");
if (size == 0)
throw Exception{
"Dictionary of type 'cache' cannot have size of 0 bytes",
ErrorCodes::TOO_SMALL_BUFFER_SIZE
};
return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
}
throw Exception{
"Unknown dictionary layout type: " + layout_type,

View File

@ -0,0 +1,10 @@
99999
99998
99997
99996
99995
99994
99993
99992
99991
99990

View File

@ -0,0 +1 @@
SELECT n, k FROM (SELECT number AS n, toFixedString(materialize(' '), 3) AS k FROM system.numbers LIMIT 100000) GROUP BY n, k ORDER BY n DESC, k LIMIT 10;