diff --git a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md index c5889a8c185..3c6eb5a0d62 100644 --- a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md +++ b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md @@ -95,7 +95,9 @@ LAYOUT(FLAT(INITIAL_ARRAY_SIZE 50000 MAX_ARRAY_SIZE 5000000)) The dictionary is completely stored in memory in the form of a hash table. The dictionary can contain any number of elements with any identifiers In practice, the number of keys can reach tens of millions of items. -The hash table will be preallocated (this will make dictionary load faster), if the is approx number of total rows is known, this is supported only if the source is `clickhouse` without any `` (since in case of `` you can filter out too much rows and the dictionary will allocate too much memory, that will not be used eventually). +If `preallocate` is `true` (default is `false`), the hash table will be preallocated (this will make dictionary load faster). But note that you should use it only if: +- the source supports an approximate number of elements (for now it is supported only by the `ClickHouse` source) +- there are no duplicates in the data (otherwise it may increase memory usage for the hashtable) All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety. @@ -103,21 +105,23 @@ Configuration example: ``` xml - + + 0 + ``` or ``` sql -LAYOUT(HASHED()) +LAYOUT(HASHED(PREALLOCATE 0)) ``` ### sparse_hashed {#dicts-external_dicts_dict_layout-sparse_hashed} Similar to `hashed`, but uses less memory in favor more CPU usage. -It will be also preallocated so as `hashed`, note that it is even more significant for `sparse_hashed`. 
+It will also be preallocated in the same way as `hashed` (with `preallocate` set to `true`), and note that this is even more significant for `sparse_hashed`. Configuration example: @@ -127,8 +131,10 @@ Configuration example: ``` +or + ``` sql -LAYOUT(SPARSE_HASHED()) +LAYOUT(SPARSE_HASHED([PREALLOCATE 0])) ``` ### complex_key_hashed {#complex-key-hashed} diff --git a/src/Dictionaries/CassandraDictionarySource.cpp b/src/Dictionaries/CassandraDictionarySource.cpp index b4f7f3221bc..1e53310bb72 100644 --- a/src/Dictionaries/CassandraDictionarySource.cpp +++ b/src/Dictionaries/CassandraDictionarySource.cpp @@ -132,7 +132,7 @@ void CassandraDictionarySource::maybeAllowFiltering(String & query) const query += " ALLOW FILTERING;"; } -BlockInputStreamPtr CassandraDictionarySource::loadAll() +BlockInputStreamPtr CassandraDictionarySource::loadAll(std::atomic * /* result_size_hint */) { String query = query_builder.composeLoadAllQuery(); maybeAllowFiltering(query); @@ -183,7 +183,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu return std::make_shared(streams, nullptr, settings.max_threads); } -BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll() +BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource"); } diff --git a/src/Dictionaries/CassandraDictionarySource.h b/src/Dictionaries/CassandraDictionarySource.h index c0a4e774d23..bf253f8675d 100644 --- a/src/Dictionaries/CassandraDictionarySource.h +++ b/src/Dictionaries/CassandraDictionarySource.h @@ -49,7 +49,7 @@ public: const String & config_prefix, Block & sample_block); - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; bool supportsSelectiveLoad() const override { return true; } @@ -66,7 +66,7 @@ public: BlockInputStreamPtr loadKeys(const Columns & 
key_columns, const std::vector & requested_rows) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; String toString() const override; diff --git a/src/Dictionaries/ClickHouseDictionarySource.cpp b/src/Dictionaries/ClickHouseDictionarySource.cpp index fdb0d76a8d7..1b2cd412db2 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -105,15 +105,15 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate() } } -BlockInputStreamPtr ClickHouseDictionarySource::loadAll() +BlockInputStreamPtr ClickHouseDictionarySource::loadAll(std::atomic * result_size_hint) { - return createStreamForQuery(load_all_query); + return createStreamForQuery(load_all_query, result_size_hint); } -BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll(std::atomic * result_size_hint) { String load_update_query = getUpdateFieldAndDate(); - return createStreamForQuery(load_update_query); + return createStreamForQuery(load_update_query, result_size_hint); } BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector & ids) @@ -152,19 +152,32 @@ std::string ClickHouseDictionarySource::toString() const return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? 
"" : ", where: " + where); } -BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query) +BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic * result_size_hint) { + BlockInputStreamPtr stream; + /// Sample block should not contain first row default values auto empty_sample_block = sample_block.cloneEmpty(); if (configuration.is_local) { - auto stream = executeQuery(query, context, true).getInputStream(); + stream = executeQuery(query, context, true).getInputStream(); stream = std::make_shared(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position); - return stream; + } + else + { + stream = std::make_shared(pool, query, empty_sample_block, context); } - return std::make_shared(pool, query, empty_sample_block, context); + if (result_size_hint) + { + stream->setProgressCallback([result_size_hint](const Progress & progress) + { + *result_size_hint += progress.total_rows_to_read; + }); + } + + return stream; } std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const diff --git a/src/Dictionaries/ClickHouseDictionarySource.h b/src/Dictionaries/ClickHouseDictionarySource.h index 21c290ab23b..ce5de7610c1 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.h +++ b/src/Dictionaries/ClickHouseDictionarySource.h @@ -43,9 +43,9 @@ public: ClickHouseDictionarySource(const ClickHouseDictionarySource & other); ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * result_size_hint = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * result_size_hint = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; @@ -67,7 +67,7 @@ public: private: std::string getUpdateFieldAndDate(); - BlockInputStreamPtr 
createStreamForQuery(const String & query); + BlockInputStreamPtr createStreamForQuery(const String & query, std::atomic * result_size_hint = nullptr); std::string doInvalidateQuery(const std::string & request) const; diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index 13feab2071a..3f2a029212d 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -101,7 +101,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar { } -BlockInputStreamPtr ExecutableDictionarySource::loadAll() +BlockInputStreamPtr ExecutableDictionarySource::loadAll(std::atomic * /* result_size_hint */) { if (implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method"); @@ -112,7 +112,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll() return std::make_shared(log, input_stream, std::move(process)); } -BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { if (implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method"); diff --git a/src/Dictionaries/ExecutableDictionarySource.h b/src/Dictionaries/ExecutableDictionarySource.h index 878cb086873..6103a09882a 100644 --- a/src/Dictionaries/ExecutableDictionarySource.h +++ b/src/Dictionaries/ExecutableDictionarySource.h @@ -25,13 +25,13 @@ public: ExecutableDictionarySource(const ExecutableDictionarySource & other); ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */) override; /** The logic of this method is flawed, absolutely incorrect and ignorant. 
* It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index e920b8392d6..09f0be3c7d9 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -68,12 +68,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP { } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll() +BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); } diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.h b/src/Dictionaries/ExecutablePoolDictionarySource.h index 7a0b8681a21..fb235eabaea 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.h +++ b/src/Dictionaries/ExecutablePoolDictionarySource.h @@ -47,13 +47,13 @@ public: ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other); ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; /** The logic of this method is flawed, absolutely incorrect and ignorant. 
* It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/FileDictionarySource.cpp b/src/Dictionaries/FileDictionarySource.cpp index 378c6f11857..2688cf74375 100644 --- a/src/Dictionaries/FileDictionarySource.cpp +++ b/src/Dictionaries/FileDictionarySource.cpp @@ -61,7 +61,7 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other) } -BlockInputStreamPtr FileDictionarySource::loadAll() +BlockInputStreamPtr FileDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString()); auto in_ptr = std::make_unique(filepath); diff --git a/src/Dictionaries/FileDictionarySource.h b/src/Dictionaries/FileDictionarySource.h index 6559503cccd..f556f3156b0 100644 --- a/src/Dictionaries/FileDictionarySource.h +++ b/src/Dictionaries/FileDictionarySource.h @@ -21,9 +21,9 @@ public: FileDictionarySource(const FileDictionarySource & other); - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource"); } diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index b674d593444..fef907ce0e2 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -104,7 +104,7 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri) } } -BlockInputStreamPtr 
HTTPDictionarySource::loadAll() +BlockInputStreamPtr HTTPDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, "loadAll {}", toString()); Poco::URI uri(url); @@ -115,7 +115,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll() return std::make_shared>(input_stream, std::move(in_ptr)); } -BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() +BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { Poco::URI uri(url); getUpdateFieldAndDate(uri); diff --git a/src/Dictionaries/HTTPDictionarySource.h b/src/Dictionaries/HTTPDictionarySource.h index c42c67ec8c9..9143adaec9f 100644 --- a/src/Dictionaries/HTTPDictionarySource.h +++ b/src/Dictionaries/HTTPDictionarySource.h @@ -32,9 +32,9 @@ public: HTTPDictionarySource(const HTTPDictionarySource & other); HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/HashedDictionary.cpp b/src/Dictionaries/HashedDictionary.cpp index b0b62760fb2..d52906900e6 100644 --- a/src/Dictionaries/HashedDictionary.cpp +++ b/src/Dictionaries/HashedDictionary.cpp @@ -40,14 +40,12 @@ HashedDictionary::HashedDictionary( const StorageID & dict_id_, const DictionaryStructure & dict_struct_, DictionarySourcePtr source_ptr_, - const DictionaryLifetime dict_lifetime_, - bool require_nonempty_, + const HashedDictionaryStorageConfiguration & configuration_, BlockPtr update_field_loaded_block_) : IDictionary(dict_id_) , dict_struct(dict_struct_) , source_ptr(std::move(source_ptr_)) - , dict_lifetime(dict_lifetime_) - , require_nonempty(require_nonempty_) + , configuration(configuration_) , 
update_field_loaded_block(std::move(update_field_loaded_block_)) { createAttributes(); @@ -359,6 +357,8 @@ void HashedDictionary::createAttributes() template void HashedDictionary::updateData() { + /// NOTE: updateData() does not do preallocation since it may increase memory usage. + if (!update_field_loaded_block || update_field_loaded_block->rows() == 0) { auto stream = source_ptr->loadUpdatedAll(); @@ -552,13 +552,30 @@ void HashedDictionary::loadData() { if (!source_ptr->hasUpdateField()) { - auto stream = source_ptr->loadAll(); + std::atomic new_size = 0; + + BlockInputStreamPtr stream; + if (configuration.preallocate) + stream = source_ptr->loadAll(&new_size); + else + stream = source_ptr->loadAll(); stream->readPrefix(); while (const auto block = stream->read()) { - resize(block.rows()); + if (configuration.preallocate && new_size) + { + size_t current_new_size = new_size.exchange(0); + if (current_new_size) + { + LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Preallocated {} elements", current_new_size); + resize(current_new_size); + } + } + else + resize(block.rows()); + blockToAttributes(block); } @@ -567,7 +584,7 @@ void HashedDictionary::loadData() else updateData(); - if (require_nonempty && 0 == element_count) + if (configuration.require_nonempty && 0 == element_count) throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name); @@ -710,19 +727,24 @@ void registerDictionaryHashed(DictionaryFactory & factory) const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"}; const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false); + const std::string & layout_prefix = sparse ? 
".layout.sparse_hashed" : ".layout.hashed"; + const bool preallocate = config.getBool(config_prefix + layout_prefix + ".preallocate", false); + + HashedDictionaryStorageConfiguration configuration{preallocate, require_nonempty, dict_lifetime}; + if (dictionary_key_type == DictionaryKeyType::simple) { if (sparse) - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); else - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); } else { if (sparse) - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); else - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); } }; diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index eeb873fb190..9980e9c701b 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -28,6 +28,13 @@ namespace DB { +struct HashedDictionaryStorageConfiguration +{ + const bool preallocate; + const bool require_nonempty; + const DictionaryLifetime lifetime; +}; + template class HashedDictionary final : public IDictionary { @@ -39,8 +46,7 @@ public: const StorageID & dict_id_, const DictionaryStructure & dict_struct_, DictionarySourcePtr source_ptr_, - const DictionaryLifetime dict_lifetime_, - bool require_nonempty_, + const HashedDictionaryStorageConfiguration & configuration_, BlockPtr update_field_loaded_block_ = nullptr); std::string getTypeName() const override @@ -75,12 +81,12 @@ public: std::shared_ptr clone() const override { - 
return std::make_shared>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, update_field_loaded_block); + return std::make_shared>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block); } const IDictionarySource * getSource() const override { return source_ptr.get(); } - const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } + const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; } const DictionaryStructure & getStructure() const override { return dict_struct; } @@ -218,8 +224,7 @@ private: const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; - const DictionaryLifetime dict_lifetime; - const bool require_nonempty; + const HashedDictionaryStorageConfiguration configuration; std::vector attributes; diff --git a/src/Dictionaries/IDictionarySource.h b/src/Dictionaries/IDictionarySource.h index 90f8b7f3a55..5bee1917324 100644 --- a/src/Dictionaries/IDictionarySource.h +++ b/src/Dictionaries/IDictionarySource.h @@ -4,6 +4,7 @@ #include #include +#include namespace DB @@ -19,11 +20,42 @@ using SharedDictionarySourcePtr = std::shared_ptr; class IDictionarySource { public: - /// Returns an input stream with all the data available from this source. - virtual BlockInputStreamPtr loadAll() = 0; + /** + * @param result_size_hint - approx number of rows in the stream. + * @return an input stream with all the data available from this source. + * + * NOTE: result_size_hint may be changed while you are reading from the + * input stream (usually it will be non-zero for the first block and zero + * for the others, since it is fed from Progress::total_rows_to_read), and + * it may be updated in parallel, so you should use something like this: + * + * ... 
+ * std::atomic new_size = 0; + * + * auto stream = source->loadAll(&new_size); + * stream->readPrefix(); + * + * while (const auto block = stream->read()) + * { + * if (new_size) + * { + * size_t current_new_size = new_size.exchange(0); + * if (current_new_size) + * resize(current_new_size); + * } + * else + * { + * resize(block.rows()); + * } + * } + * + * stream->readSuffix(); + * ... + */ + virtual BlockInputStreamPtr loadAll(std::atomic * result_size_hint = nullptr) = 0; /// Returns an input stream with updated data available from this source. - virtual BlockInputStreamPtr loadUpdatedAll() = 0; + virtual BlockInputStreamPtr loadUpdatedAll(std::atomic * result_size_hint = nullptr) = 0; /** Indicates whether this source supports "random access" loading of data * loadId and loadIds can only be used if this function returns true. diff --git a/src/Dictionaries/LibraryDictionarySource.cpp b/src/Dictionaries/LibraryDictionarySource.cpp index a971ba4b1be..d4b9956cbcc 100644 --- a/src/Dictionaries/LibraryDictionarySource.cpp +++ b/src/Dictionaries/LibraryDictionarySource.cpp @@ -94,7 +94,7 @@ bool LibraryDictionarySource::supportsSelectiveLoad() const } -BlockInputStreamPtr LibraryDictionarySource::loadAll() +BlockInputStreamPtr LibraryDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, "loadAll {}", toString()); return bridge_helper->loadAll(); diff --git a/src/Dictionaries/LibraryDictionarySource.h b/src/Dictionaries/LibraryDictionarySource.h index 1ab47c5a06f..25a0eed0c58 100644 --- a/src/Dictionaries/LibraryDictionarySource.h +++ b/src/Dictionaries/LibraryDictionarySource.h @@ -47,9 +47,9 @@ public: ~LibraryDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource"); } diff --git a/src/Dictionaries/MongoDBDictionarySource.cpp b/src/Dictionaries/MongoDBDictionarySource.cpp index 0ab45dc4593..35962bb463f 100644 --- a/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/src/Dictionaries/MongoDBDictionarySource.cpp @@ -143,7 +143,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & MongoDBDictionarySource::~MongoDBDictionarySource() = default; -BlockInputStreamPtr MongoDBDictionarySource::loadAll() +BlockInputStreamPtr MongoDBDictionarySource::loadAll(std::atomic * /* result_size_hint */) { return std::make_shared(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); } diff --git a/src/Dictionaries/MongoDBDictionarySource.h b/src/Dictionaries/MongoDBDictionarySource.h index fef5749190f..e3c904da5c6 100644 --- a/src/Dictionaries/MongoDBDictionarySource.h +++ b/src/Dictionaries/MongoDBDictionarySource.h @@ -46,9 +46,9 @@ public: ~MongoDBDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource"); } diff --git a/src/Dictionaries/MySQLDictionarySource.cpp b/src/Dictionaries/MySQLDictionarySource.cpp index 676863ae588..c501797e0af 100644 --- a/src/Dictionaries/MySQLDictionarySource.cpp +++ b/src/Dictionaries/MySQLDictionarySource.cpp @@ -126,7 +126,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query) pool, query, sample_block, settings); } -BlockInputStreamPtr MySQLDictionarySource::loadAll() +BlockInputStreamPtr MySQLDictionarySource::loadAll(std::atomic * /* result_size_hint */) { auto 
connection = pool->get(); last_modification = getLastModification(connection, false); @@ -135,7 +135,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll() return loadFromQuery(load_all_query); } -BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll() +BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { auto connection = pool->get(); last_modification = getLastModification(connection, false); diff --git a/src/Dictionaries/MySQLDictionarySource.h b/src/Dictionaries/MySQLDictionarySource.h index ef1d81b862f..4c58a9d82bd 100644 --- a/src/Dictionaries/MySQLDictionarySource.h +++ b/src/Dictionaries/MySQLDictionarySource.h @@ -42,9 +42,9 @@ public: MySQLDictionarySource(const MySQLDictionarySource & other); MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index 051ee94ef3e..61674810e75 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -65,14 +65,14 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar } -BlockInputStreamPtr PostgreSQLDictionarySource::loadAll() +BlockInputStreamPtr PostgreSQLDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, load_all_query); return loadBase(load_all_query); } -BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll() +BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { auto load_update_query = getUpdateFieldAndDate(); LOG_TRACE(log, 
load_update_query); diff --git a/src/Dictionaries/PostgreSQLDictionarySource.h b/src/Dictionaries/PostgreSQLDictionarySource.h index aff340aa796..0a03c77aa97 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.h +++ b/src/Dictionaries/PostgreSQLDictionarySource.h @@ -33,8 +33,8 @@ public: PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other); PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; diff --git a/src/Dictionaries/RedisDictionarySource.cpp b/src/Dictionaries/RedisDictionarySource.cpp index 8144b37e63d..46c0926ce60 100644 --- a/src/Dictionaries/RedisDictionarySource.cpp +++ b/src/Dictionaries/RedisDictionarySource.cpp @@ -160,7 +160,7 @@ namespace DB __builtin_unreachable(); } - BlockInputStreamPtr RedisDictionarySource::loadAll() + BlockInputStreamPtr RedisDictionarySource::loadAll(std::atomic * /* result_size_hint */) { if (!client->isConnected()) client->connect(host, port); diff --git a/src/Dictionaries/RedisDictionarySource.h b/src/Dictionaries/RedisDictionarySource.h index b2c5859decd..b4c43fa897d 100644 --- a/src/Dictionaries/RedisDictionarySource.h +++ b/src/Dictionaries/RedisDictionarySource.h @@ -59,9 +59,9 @@ namespace ErrorCodes ~RedisDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource"); } diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index 5774641a90f..db9f2e7d6aa 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -166,14 +166,14 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate() } -BlockInputStreamPtr XDBCDictionarySource::loadAll() +BlockInputStreamPtr XDBCDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, load_all_query); return loadFromQuery(bridge_url, sample_block, load_all_query); } -BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll() +BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { std::string load_query_update = getUpdateFieldAndDate(); diff --git a/src/Dictionaries/XDBCDictionarySource.h b/src/Dictionaries/XDBCDictionarySource.h index bd473e0db8a..6cdd8a30a61 100644 --- a/src/Dictionaries/XDBCDictionarySource.h +++ b/src/Dictionaries/XDBCDictionarySource.h @@ -38,9 +38,9 @@ public: XDBCDictionarySource(const XDBCDictionarySource & other); XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.reference b/tests/queries/0_stateless/01509_dictionary_preallocate.reference index b072d6673db..2f1e1d2c386 100644 --- a/tests/queries/0_stateless/01509_dictionary_preallocate.reference +++ b/tests/queries/0_stateless/01509_dictionary_preallocate.reference @@ -1,4 +1,9 @@ -CREATE DICTIONARY db_01509.dict\n(\n `key` UInt64,\n 
`value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() USER \'default\' TABLE \'data\' PASSWORD \'\' DB \'db_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED()) +CREATE DICTIONARY default.dict_01509\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 0)) +CREATE DICTIONARY default.dict_01509_preallocate\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 1)) +HashedDictionary: Preallocated 10000 elements - 0 -1000 +10000 +- +0 +10000 diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.sh b/tests/queries/0_stateless/01509_dictionary_preallocate.sh new file mode 100755 index 00000000000..f2bc17d2e90 --- /dev/null +++ b/tests/queries/0_stateless/01509_dictionary_preallocate.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash + +# shellcheck disable=SC2031 +# shellcheck disable=SC2030 + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + DROP TABLE IF EXISTS data_01509; + DROP DICTIONARY IF EXISTS dict_01509; + DROP DICTIONARY IF EXISTS dict_01509_preallocate; + + CREATE TABLE data_01509 + ( + key UInt64, + value String + ) + ENGINE = MergeTree() + ORDER BY key; + INSERT INTO data_01509 SELECT number key, toString(number) value FROM numbers(10e3); +" + +# regular +$CLICKHOUSE_CLIENT -nm -q " + CREATE DICTIONARY dict_01509 + ( + key UInt64, + value String DEFAULT '-' + ) + PRIMARY KEY key + SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509')) + LAYOUT(SPARSE_HASHED(PREALLOCATE 0)) + LIFETIME(0); + SHOW CREATE DICTIONARY dict_01509; +" +( + # start new shell to avoid overriding variables for other client invocation + CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace} + $CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509" |& grep -o "HashedDictionary.*" +) + +# with preallocation +$CLICKHOUSE_CLIENT -nm -q " + CREATE DICTIONARY dict_01509_preallocate + ( + key UInt64, + value String DEFAULT '-' + ) + PRIMARY KEY key + SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509')) + LAYOUT(SPARSE_HASHED(PREALLOCATE 1)) + LIFETIME(0); + SHOW CREATE DICTIONARY dict_01509_preallocate; + SYSTEM RELOAD DICTIONARY dict_01509_preallocate; +" +( + # start new shell to avoid overriding variables for other client invocation + CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace} + $CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509_preallocate" |& grep -o "HashedDictionary.*" +) + +$CLICKHOUSE_CLIENT -nm -q " + SELECT dictGet('dict_01509', 'value', toUInt64(1e12)); + SELECT dictGet('dict_01509', 'value', toUInt64(0)); + SELECT count() FROM dict_01509; + + SELECT dictGet('dict_01509_preallocate', 'value', toUInt64(1e12)); + SELECT dictGet('dict_01509_preallocate', 
'value', toUInt64(0)); + SELECT count() FROM dict_01509_preallocate; +" + +$CLICKHOUSE_CLIENT -nm -q " + DROP TABLE data_01509; + DROP DICTIONARY dict_01509; + DROP DICTIONARY dict_01509_preallocate; +" diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.sql b/tests/queries/0_stateless/01509_dictionary_preallocate.sql deleted file mode 100644 index c5ed0e1fbde..00000000000 --- a/tests/queries/0_stateless/01509_dictionary_preallocate.sql +++ /dev/null @@ -1,36 +0,0 @@ --- The test itself does not test does preallocation works --- It simply check SPARSE_HASHED dictionary with bunch of dictGet() --- (since at the moment of writing there were no such test) - -DROP DATABASE IF EXISTS db_01509; -CREATE DATABASE db_01509; - -CREATE TABLE db_01509.data -( - key UInt64, - value String -) -ENGINE = MergeTree() -ORDER BY key; -INSERT INTO db_01509.data SELECT number key, toString(number) value FROM numbers(1000); - -DROP DICTIONARY IF EXISTS db_01509.dict; -CREATE DICTIONARY db_01509.dict -( - key UInt64, - value String DEFAULT '-' -) -PRIMARY KEY key -SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'data' PASSWORD '' DB 'db_01509')) -LAYOUT(SPARSE_HASHED()) -LIFETIME(0); - -SHOW CREATE DICTIONARY db_01509.dict; - -SYSTEM RELOAD DICTIONARY db_01509.dict; - -SELECT dictGet('db_01509.dict', 'value', toUInt64(1e12)); -SELECT dictGet('db_01509.dict', 'value', toUInt64(0)); -SELECT count() FROM db_01509.dict; - -DROP DATABASE IF EXISTS db_01509; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index e8edd142835..ffa5e21be80 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -348,6 +348,7 @@ "01508_partition_pruning_long", "01509_check_parallel_quorum_inserts_long", "01509_parallel_quorum_and_merge_long", + "01509_dictionary_preallocate", "01515_mv_and_array_join_optimisation_bag", "01516_create_table_primary_key", "01517_drop_mv_with_inner_table",