Reimplement preallocate for hashed/sparse_hashed dictionaries

It was initially implemented in #15454, but was reverted in #21948 (due
to higher memory usage).

This implementation differs from the initial, since now there is
separate attribute to enable preallocation, before it was done
automatically, but this has problems with duplicates in the source.

Plus this implementation does not uses dynamic_cast, instead it extends
IDictionarySource interface.
This commit is contained in:
Azat Khuzhin 2021-05-09 12:12:21 +03:00
parent efcde4c84f
commit 808d1a0215
32 changed files with 240 additions and 114 deletions

View File

@ -95,7 +95,9 @@ LAYOUT(FLAT(INITIAL_ARRAY_SIZE 50000 MAX_ARRAY_SIZE 5000000))
The dictionary is completely stored in memory in the form of a hash table. The dictionary can contain any number of elements with any identifiers In practice, the number of keys can reach tens of millions of items. The dictionary is completely stored in memory in the form of a hash table. The dictionary can contain any number of elements with any identifiers In practice, the number of keys can reach tens of millions of items.
The hash table will be preallocated (this will make dictionary load faster), if the is approx number of total rows is known, this is supported only if the source is `clickhouse` without any `<where>` (since in case of `<where>` you can filter out too much rows and the dictionary will allocate too much memory, that will not be used eventually). If `preallocate` is `true` (default is `false`) the hash table will be preallocated (this will make dictionary load faster). But note that you should use it only if:
- the source support approximate number of elements (for now it is supported only by the `ClickHouse` source)
- there is no duplicates in the data (otherwise it may increase memory usage for the hashtable)
All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety. All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety.
@ -103,21 +105,23 @@ Configuration example:
``` xml ``` xml
<layout> <layout>
<hashed /> <hashed>
<preallocate>0</preallocate>
</hashed>
</layout> </layout>
``` ```
or or
``` sql ``` sql
LAYOUT(HASHED()) LAYOUT(HASHED(PREALLOCATE 0))
``` ```
### sparse_hashed {#dicts-external_dicts_dict_layout-sparse_hashed} ### sparse_hashed {#dicts-external_dicts_dict_layout-sparse_hashed}
Similar to `hashed`, but uses less memory in favor more CPU usage. Similar to `hashed`, but uses less memory in favor more CPU usage.
It will be also preallocated so as `hashed`, note that it is even more significant for `sparse_hashed`. It will be also preallocated so as `hashed` (with `preallocate` set to `true`), and note that it is even more significant for `sparse_hashed`.
Configuration example: Configuration example:
@ -127,8 +131,10 @@ Configuration example:
</layout> </layout>
``` ```
or
``` sql ``` sql
LAYOUT(SPARSE_HASHED()) LAYOUT(SPARSE_HASHED([PREALLOCATE 0]))
``` ```
### complex_key_hashed {#complex-key-hashed} ### complex_key_hashed {#complex-key-hashed}

View File

@ -132,7 +132,7 @@ void CassandraDictionarySource::maybeAllowFiltering(String & query) const
query += " ALLOW FILTERING;"; query += " ALLOW FILTERING;";
} }
BlockInputStreamPtr CassandraDictionarySource::loadAll() BlockInputStreamPtr CassandraDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
String query = query_builder.composeLoadAllQuery(); String query = query_builder.composeLoadAllQuery();
maybeAllowFiltering(query); maybeAllowFiltering(query);
@ -183,7 +183,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu
return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads); return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
} }
BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll() BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource");
} }

View File

@ -49,7 +49,7 @@ public:
const String & config_prefix, const String & config_prefix,
Block & sample_block); Block & sample_block);
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
bool supportsSelectiveLoad() const override { return true; } bool supportsSelectiveLoad() const override { return true; }
@ -66,7 +66,7 @@ public:
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
String toString() const override; String toString() const override;

View File

@ -105,15 +105,15 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
} }
} }
BlockInputStreamPtr ClickHouseDictionarySource::loadAll() BlockInputStreamPtr ClickHouseDictionarySource::loadAll(std::atomic<size_t> * result_size_hint)
{ {
return createStreamForQuery(load_all_query); return createStreamForQuery(load_all_query, result_size_hint);
} }
BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll(std::atomic<size_t> * result_size_hint)
{ {
String load_update_query = getUpdateFieldAndDate(); String load_update_query = getUpdateFieldAndDate();
return createStreamForQuery(load_update_query); return createStreamForQuery(load_update_query, result_size_hint);
} }
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids) BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -152,19 +152,32 @@ std::string ClickHouseDictionarySource::toString() const
return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where); return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? "" : ", where: " + where);
} }
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query) BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint)
{ {
BlockInputStreamPtr stream;
/// Sample block should not contain first row default values /// Sample block should not contain first row default values
auto empty_sample_block = sample_block.cloneEmpty(); auto empty_sample_block = sample_block.cloneEmpty();
if (configuration.is_local) if (configuration.is_local)
{ {
auto stream = executeQuery(query, context, true).getInputStream(); stream = executeQuery(query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position); stream = std::make_shared<ConvertingBlockInputStream>(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream; }
else
{
stream = std::make_shared<RemoteBlockInputStream>(pool, query, empty_sample_block, context);
} }
return std::make_shared<RemoteBlockInputStream>(pool, query, empty_sample_block, context); if (result_size_hint)
{
stream->setProgressCallback([result_size_hint](const Progress & progress)
{
*result_size_hint += progress.total_rows_to_read;
});
}
return stream;
} }
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const

View File

@ -43,9 +43,9 @@ public:
ClickHouseDictionarySource(const ClickHouseDictionarySource & other); ClickHouseDictionarySource(const ClickHouseDictionarySource & other);
ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete; ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * result_size_hint = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * result_size_hint = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
@ -67,7 +67,7 @@ public:
private: private:
std::string getUpdateFieldAndDate(); std::string getUpdateFieldAndDate();
BlockInputStreamPtr createStreamForQuery(const String & query); BlockInputStreamPtr createStreamForQuery(const String & query, std::atomic<size_t> * result_size_hint = nullptr);
std::string doInvalidateQuery(const std::string & request) const; std::string doInvalidateQuery(const std::string & request) const;

View File

@ -101,7 +101,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar
{ {
} }
BlockInputStreamPtr ExecutableDictionarySource::loadAll() BlockInputStreamPtr ExecutableDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
if (implicit_key) if (implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method");
@ -112,7 +112,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process)); return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
} }
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
if (implicit_key) if (implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method");

View File

@ -25,13 +25,13 @@ public:
ExecutableDictionarySource(const ExecutableDictionarySource & other); ExecutableDictionarySource(const ExecutableDictionarySource & other);
ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete; ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */) override;
/** The logic of this method is flawed, absolutely incorrect and ignorant. /** The logic of this method is flawed, absolutely incorrect and ignorant.
* It may lead to skipping some values due to clock sync or timezone changes. * It may lead to skipping some values due to clock sync or timezone changes.
* The intended usage of "update_field" is totally different. * The intended usage of "update_field" is totally different.
*/ */
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

View File

@ -68,12 +68,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP
{ {
} }
BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll() BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
} }
BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll() BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
} }

View File

@ -47,13 +47,13 @@ public:
ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other); ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other);
ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete; ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
/** The logic of this method is flawed, absolutely incorrect and ignorant. /** The logic of this method is flawed, absolutely incorrect and ignorant.
* It may lead to skipping some values due to clock sync or timezone changes. * It may lead to skipping some values due to clock sync or timezone changes.
* The intended usage of "update_field" is totally different. * The intended usage of "update_field" is totally different.
*/ */
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

View File

@ -61,7 +61,7 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other)
} }
BlockInputStreamPtr FileDictionarySource::loadAll() BlockInputStreamPtr FileDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString()); LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString());
auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath); auto in_ptr = std::make_unique<ReadBufferFromFile>(filepath);

View File

@ -21,9 +21,9 @@ public:
FileDictionarySource(const FileDictionarySource & other); FileDictionarySource(const FileDictionarySource & other);
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource");
} }

View File

@ -104,7 +104,7 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri)
} }
} }
BlockInputStreamPtr HTTPDictionarySource::loadAll() BlockInputStreamPtr HTTPDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
LOG_TRACE(log, "loadAll {}", toString()); LOG_TRACE(log, "loadAll {}", toString());
Poco::URI uri(url); Poco::URI uri(url);
@ -115,7 +115,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll()
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr)); return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(in_ptr));
} }
BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
Poco::URI uri(url); Poco::URI uri(url);
getUpdateFieldAndDate(uri); getUpdateFieldAndDate(uri);

View File

@ -32,9 +32,9 @@ public:
HTTPDictionarySource(const HTTPDictionarySource & other); HTTPDictionarySource(const HTTPDictionarySource & other);
HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete; HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

View File

@ -40,14 +40,12 @@ HashedDictionary<dictionary_key_type, sparse>::HashedDictionary(
const StorageID & dict_id_, const StorageID & dict_id_,
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_, DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_, const HashedDictionaryStorageConfiguration & configuration_,
bool require_nonempty_,
BlockPtr update_field_loaded_block_) BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_) : IDictionary(dict_id_)
, dict_struct(dict_struct_) , dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_)) , source_ptr(std::move(source_ptr_))
, dict_lifetime(dict_lifetime_) , configuration(configuration_)
, require_nonempty(require_nonempty_)
, update_field_loaded_block(std::move(update_field_loaded_block_)) , update_field_loaded_block(std::move(update_field_loaded_block_))
{ {
createAttributes(); createAttributes();
@ -359,6 +357,8 @@ void HashedDictionary<dictionary_key_type, sparse>::createAttributes()
template <DictionaryKeyType dictionary_key_type, bool sparse> template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::updateData() void HashedDictionary<dictionary_key_type, sparse>::updateData()
{ {
/// NOTE: updateData() does not preallocation since it may increase memory usage.
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0) if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{ {
auto stream = source_ptr->loadUpdatedAll(); auto stream = source_ptr->loadUpdatedAll();
@ -552,13 +552,30 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
{ {
if (!source_ptr->hasUpdateField()) if (!source_ptr->hasUpdateField())
{ {
auto stream = source_ptr->loadAll(); std::atomic<size_t> new_size = 0;
BlockInputStreamPtr stream;
if (configuration.preallocate)
stream = source_ptr->loadAll(&new_size);
else
stream = source_ptr->loadAll();
stream->readPrefix(); stream->readPrefix();
while (const auto block = stream->read()) while (const auto block = stream->read())
{ {
if (configuration.preallocate && new_size)
{
size_t current_new_size = new_size.exchange(0);
if (current_new_size)
{
LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Preallocated {} elements", current_new_size);
resize(current_new_size);
}
}
else
resize(block.rows()); resize(block.rows());
blockToAttributes(block); blockToAttributes(block);
} }
@ -567,7 +584,7 @@ void HashedDictionary<dictionary_key_type, sparse>::loadData()
else else
updateData(); updateData();
if (require_nonempty && 0 == element_count) if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY,
"{}: dictionary source is empty and 'require_nonempty' property is set.", "{}: dictionary source is empty and 'require_nonempty' property is set.",
full_name); full_name);
@ -710,19 +727,24 @@ void registerDictionaryHashed(DictionaryFactory & factory)
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"}; const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false); const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
const std::string & layout_prefix = sparse ? ".layout.sparse_hashed" : ".layout.hashed";
const bool preallocate = config.getBool(config_prefix + layout_prefix + ".preallocate", false);
HashedDictionaryStorageConfiguration configuration{preallocate, require_nonempty, dict_lifetime};
if (dictionary_key_type == DictionaryKeyType::simple) if (dictionary_key_type == DictionaryKeyType::simple)
{ {
if (sparse) if (sparse)
return std::make_unique<HashedDictionary<DictionaryKeyType::simple, true>>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); return std::make_unique<HashedDictionary<DictionaryKeyType::simple, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else else
return std::make_unique<HashedDictionary<DictionaryKeyType::simple, false>>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); return std::make_unique<HashedDictionary<DictionaryKeyType::simple, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
} }
else else
{ {
if (sparse) if (sparse)
return std::make_unique<HashedDictionary<DictionaryKeyType::complex, true>>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); return std::make_unique<HashedDictionary<DictionaryKeyType::complex, true>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else else
return std::make_unique<HashedDictionary<DictionaryKeyType::complex, false>>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); return std::make_unique<HashedDictionary<DictionaryKeyType::complex, false>>(dict_id, dict_struct, std::move(source_ptr), configuration);
} }
}; };

View File

@ -28,6 +28,13 @@
namespace DB namespace DB
{ {
struct HashedDictionaryStorageConfiguration
{
const bool preallocate;
const bool require_nonempty;
const DictionaryLifetime lifetime;
};
template <DictionaryKeyType dictionary_key_type, bool sparse> template <DictionaryKeyType dictionary_key_type, bool sparse>
class HashedDictionary final : public IDictionary class HashedDictionary final : public IDictionary
{ {
@ -39,8 +46,7 @@ public:
const StorageID & dict_id_, const StorageID & dict_id_,
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_, DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_, const HashedDictionaryStorageConfiguration & configuration_,
bool require_nonempty_,
BlockPtr update_field_loaded_block_ = nullptr); BlockPtr update_field_loaded_block_ = nullptr);
std::string getTypeName() const override std::string getTypeName() const override
@ -75,12 +81,12 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, update_field_loaded_block); return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
} }
const IDictionarySource * getSource() const override { return source_ptr.get(); } const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; } const DictionaryStructure & getStructure() const override { return dict_struct; }
@ -218,8 +224,7 @@ private:
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr; const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime; const HashedDictionaryStorageConfiguration configuration;
const bool require_nonempty;
std::vector<Attribute> attributes; std::vector<Attribute> attributes;

View File

@ -4,6 +4,7 @@
#include <DataStreams/IBlockStream_fwd.h> #include <DataStreams/IBlockStream_fwd.h>
#include <vector> #include <vector>
#include <atomic>
namespace DB namespace DB
@ -19,11 +20,42 @@ using SharedDictionarySourcePtr = std::shared_ptr<IDictionarySource>;
class IDictionarySource class IDictionarySource
{ {
public: public:
/// Returns an input stream with all the data available from this source. /**
virtual BlockInputStreamPtr loadAll() = 0; * @param result_size_hint - approx number of rows in the stream.
* @return an input stream with all the data available from this source.
*
* NOTE: result_size_hint may be changed during you are reading (usually it
* will be non zero for the first block and zero for others, since it uses
* Progress::total_rows_approx,) from the input stream, and may be called
* in parallel, so you should use something like this:
*
* ...
* std::atomic<uint64_t> new_size = 0;
*
* auto stream = source->loadAll(&new_size);
* stream->readPrefix();
*
* while (const auto block = stream->read())
* {
* if (new_size)
* {
* size_t current_new_size = new_size.exchange(0);
* if (current_new_size)
* resize(current_new_size);
* }
* else
* {
* resize(block.rows());
* }
* }
*
* stream->readSuffix();
* ...
*/
virtual BlockInputStreamPtr loadAll(std::atomic<size_t> * result_size_hint = nullptr) = 0;
/// Returns an input stream with updated data available from this source. /// Returns an input stream with updated data available from this source.
virtual BlockInputStreamPtr loadUpdatedAll() = 0; virtual BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * result_size_hint = nullptr) = 0;
/** Indicates whether this source supports "random access" loading of data /** Indicates whether this source supports "random access" loading of data
* loadId and loadIds can only be used if this function returns true. * loadId and loadIds can only be used if this function returns true.

View File

@ -94,7 +94,7 @@ bool LibraryDictionarySource::supportsSelectiveLoad() const
} }
BlockInputStreamPtr LibraryDictionarySource::loadAll() BlockInputStreamPtr LibraryDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
LOG_TRACE(log, "loadAll {}", toString()); LOG_TRACE(log, "loadAll {}", toString());
return bridge_helper->loadAll(); return bridge_helper->loadAll();

View File

@ -47,9 +47,9 @@ public:
~LibraryDictionarySource() override; ~LibraryDictionarySource() override;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource");
} }

View File

@ -143,7 +143,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource &
MongoDBDictionarySource::~MongoDBDictionarySource() = default; MongoDBDictionarySource::~MongoDBDictionarySource() = default;
BlockInputStreamPtr MongoDBDictionarySource::loadAll() BlockInputStreamPtr MongoDBDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); return std::make_shared<MongoDBBlockInputStream>(connection, createCursor(db, collection, sample_block), sample_block, max_block_size);
} }

View File

@ -46,9 +46,9 @@ public:
~MongoDBDictionarySource() override; ~MongoDBDictionarySource() override;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource");
} }

View File

@ -126,7 +126,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query)
pool, query, sample_block, settings); pool, query, sample_block, settings);
} }
BlockInputStreamPtr MySQLDictionarySource::loadAll() BlockInputStreamPtr MySQLDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
auto connection = pool->get(); auto connection = pool->get();
last_modification = getLastModification(connection, false); last_modification = getLastModification(connection, false);
@ -135,7 +135,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
return loadFromQuery(load_all_query); return loadFromQuery(load_all_query);
} }
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll() BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
auto connection = pool->get(); auto connection = pool->get();
last_modification = getLastModification(connection, false); last_modification = getLastModification(connection, false);

View File

@ -42,9 +42,9 @@ public:
MySQLDictionarySource(const MySQLDictionarySource & other); MySQLDictionarySource(const MySQLDictionarySource & other);
MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete; MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

View File

@ -65,14 +65,14 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadAll() BlockInputStreamPtr PostgreSQLDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
LOG_TRACE(log, load_all_query); LOG_TRACE(log, load_all_query);
return loadBase(load_all_query); return loadBase(load_all_query);
} }
BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll() BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
auto load_update_query = getUpdateFieldAndDate(); auto load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query); LOG_TRACE(log, load_update_query);

View File

@ -33,8 +33,8 @@ public:
PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other); PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other);
PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete; PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override; BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;

View File

@ -160,7 +160,7 @@ namespace DB
__builtin_unreachable(); __builtin_unreachable();
} }
BlockInputStreamPtr RedisDictionarySource::loadAll() BlockInputStreamPtr RedisDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
if (!client->isConnected()) if (!client->isConnected())
client->connect(host, port); client->connect(host, port);

View File

@ -59,9 +59,9 @@ namespace ErrorCodes
~RedisDictionarySource() override; ~RedisDictionarySource() override;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override
{ {
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource");
} }

View File

@ -166,14 +166,14 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
} }
BlockInputStreamPtr XDBCDictionarySource::loadAll() BlockInputStreamPtr XDBCDictionarySource::loadAll(std::atomic<size_t> * /* result_size_hint */)
{ {
LOG_TRACE(log, load_all_query); LOG_TRACE(log, load_all_query);
return loadFromQuery(bridge_url, sample_block, load_all_query); return loadFromQuery(bridge_url, sample_block, load_all_query);
} }
BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll() BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */)
{ {
std::string load_query_update = getUpdateFieldAndDate(); std::string load_query_update = getUpdateFieldAndDate();

View File

@ -38,9 +38,9 @@ public:
XDBCDictionarySource(const XDBCDictionarySource & other); XDBCDictionarySource(const XDBCDictionarySource & other);
XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete; XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete;
BlockInputStreamPtr loadAll() override; BlockInputStreamPtr loadAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadUpdatedAll() override; BlockInputStreamPtr loadUpdatedAll(std::atomic<size_t> * /* result_size_hint */ = nullptr) override;
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override; BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

View File

@ -1,4 +1,9 @@
CREATE DICTIONARY db_01509.dict\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() USER \'default\' TABLE \'data\' PASSWORD \'\' DB \'db_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED()) CREATE DICTIONARY default.dict_01509\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 0))
CREATE DICTIONARY default.dict_01509_preallocate\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 1))
HashedDictionary: Preallocated 10000 elements
- -
0 0
1000 10000
-
0
10000

View File

@ -0,0 +1,78 @@
#!/usr/bin/env bash
# shellcheck disable=SC2031
# shellcheck disable=SC2030
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS data_01509;
DROP DICTIONARY IF EXISTS dict_01509;
DROP DICTIONARY IF EXISTS dict_01509_preallocate;
CREATE TABLE data_01509
(
key UInt64,
value String
)
ENGINE = MergeTree()
ORDER BY key;
INSERT INTO data_01509 SELECT number key, toString(number) value FROM numbers(10e3);
"
# regular
$CLICKHOUSE_CLIENT -nm -q "
CREATE DICTIONARY dict_01509
(
key UInt64,
value String DEFAULT '-'
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509'))
LAYOUT(SPARSE_HASHED(PREALLOCATE 0))
LIFETIME(0);
SHOW CREATE DICTIONARY dict_01509;
"
(
# start new shell to avoid overriding variables for other client invocation
CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace}
$CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509" |& grep -o "HashedDictionary.*"
)
# with preallocation
$CLICKHOUSE_CLIENT -nm -q "
CREATE DICTIONARY dict_01509_preallocate
(
key UInt64,
value String DEFAULT '-'
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509'))
LAYOUT(SPARSE_HASHED(PREALLOCATE 1))
LIFETIME(0);
SHOW CREATE DICTIONARY dict_01509_preallocate;
SYSTEM RELOAD DICTIONARY dict_01509_preallocate;
"
(
# start new shell to avoid overriding variables for other client invocation
CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace}
$CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509_preallocate" |& grep -o "HashedDictionary.*"
)
$CLICKHOUSE_CLIENT -nm -q "
SELECT dictGet('dict_01509', 'value', toUInt64(1e12));
SELECT dictGet('dict_01509', 'value', toUInt64(0));
SELECT count() FROM dict_01509;
SELECT dictGet('dict_01509_preallocate', 'value', toUInt64(1e12));
SELECT dictGet('dict_01509_preallocate', 'value', toUInt64(0));
SELECT count() FROM dict_01509_preallocate;
"
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE data_01509;
DROP DICTIONARY dict_01509;
DROP DICTIONARY dict_01509_preallocate;
"

View File

@ -1,36 +0,0 @@
-- The test itself does not test does preallocation works
-- It simply check SPARSE_HASHED dictionary with bunch of dictGet()
-- (since at the moment of writing there were no such test)
DROP DATABASE IF EXISTS db_01509;
CREATE DATABASE db_01509;
CREATE TABLE db_01509.data
(
key UInt64,
value String
)
ENGINE = MergeTree()
ORDER BY key;
INSERT INTO db_01509.data SELECT number key, toString(number) value FROM numbers(1000);
DROP DICTIONARY IF EXISTS db_01509.dict;
CREATE DICTIONARY db_01509.dict
(
key UInt64,
value String DEFAULT '-'
)
PRIMARY KEY key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'data' PASSWORD '' DB 'db_01509'))
LAYOUT(SPARSE_HASHED())
LIFETIME(0);
SHOW CREATE DICTIONARY db_01509.dict;
SYSTEM RELOAD DICTIONARY db_01509.dict;
SELECT dictGet('db_01509.dict', 'value', toUInt64(1e12));
SELECT dictGet('db_01509.dict', 'value', toUInt64(0));
SELECT count() FROM db_01509.dict;
DROP DATABASE IF EXISTS db_01509;

View File

@ -348,6 +348,7 @@
"01508_partition_pruning_long", "01508_partition_pruning_long",
"01509_check_parallel_quorum_inserts_long", "01509_check_parallel_quorum_inserts_long",
"01509_parallel_quorum_and_merge_long", "01509_parallel_quorum_and_merge_long",
"01509_dictionary_preallocate",
"01515_mv_and_array_join_optimisation_bag", "01515_mv_and_array_join_optimisation_bag",
"01516_create_table_primary_key", "01516_create_table_primary_key",
"01517_drop_mv_with_inner_table", "01517_drop_mv_with_inner_table",