From 808d1a02158201ea0d8bc5f34b02e718f9fa4a22 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 9 May 2021 12:12:21 +0300 Subject: [PATCH] Reimplement preallocate for hashed/sparse_hashed dictionaries It was initially implemented in #15454, but was reverted in #21948 (due to higher memory usage). This implementation differs from the initial, since now there is separate attribute to enable preallocation, before it was done automatically, but this has problems with duplicates in the source. Plus this implementation does not uses dynamic_cast, instead it extends IDictionarySource interface. --- .../external-dicts-dict-layout.md | 16 ++-- .../CassandraDictionarySource.cpp | 4 +- src/Dictionaries/CassandraDictionarySource.h | 4 +- .../ClickHouseDictionarySource.cpp | 29 +++++-- src/Dictionaries/ClickHouseDictionarySource.h | 6 +- .../ExecutableDictionarySource.cpp | 4 +- src/Dictionaries/ExecutableDictionarySource.h | 4 +- .../ExecutablePoolDictionarySource.cpp | 4 +- .../ExecutablePoolDictionarySource.h | 4 +- src/Dictionaries/FileDictionarySource.cpp | 2 +- src/Dictionaries/FileDictionarySource.h | 4 +- src/Dictionaries/HTTPDictionarySource.cpp | 4 +- src/Dictionaries/HTTPDictionarySource.h | 4 +- src/Dictionaries/HashedDictionary.cpp | 44 ++++++++--- src/Dictionaries/HashedDictionary.h | 17 ++-- src/Dictionaries/IDictionarySource.h | 38 ++++++++- src/Dictionaries/LibraryDictionarySource.cpp | 2 +- src/Dictionaries/LibraryDictionarySource.h | 4 +- src/Dictionaries/MongoDBDictionarySource.cpp | 2 +- src/Dictionaries/MongoDBDictionarySource.h | 4 +- src/Dictionaries/MySQLDictionarySource.cpp | 4 +- src/Dictionaries/MySQLDictionarySource.h | 4 +- .../PostgreSQLDictionarySource.cpp | 4 +- src/Dictionaries/PostgreSQLDictionarySource.h | 4 +- src/Dictionaries/RedisDictionarySource.cpp | 2 +- src/Dictionaries/RedisDictionarySource.h | 4 +- src/Dictionaries/XDBCDictionarySource.cpp | 4 +- src/Dictionaries/XDBCDictionarySource.h | 4 +- 
.../01509_dictionary_preallocate.reference | 9 ++- .../01509_dictionary_preallocate.sh | 78 +++++++++++++++++++ .../01509_dictionary_preallocate.sql | 36 --------- tests/queries/skip_list.json | 1 + 32 files changed, 240 insertions(+), 114 deletions(-) create mode 100755 tests/queries/0_stateless/01509_dictionary_preallocate.sh delete mode 100644 tests/queries/0_stateless/01509_dictionary_preallocate.sql diff --git a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md index c5889a8c185..3c6eb5a0d62 100644 --- a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md +++ b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-layout.md @@ -95,7 +95,9 @@ LAYOUT(FLAT(INITIAL_ARRAY_SIZE 50000 MAX_ARRAY_SIZE 5000000)) The dictionary is completely stored in memory in the form of a hash table. The dictionary can contain any number of elements with any identifiers In practice, the number of keys can reach tens of millions of items. -The hash table will be preallocated (this will make dictionary load faster), if the is approx number of total rows is known, this is supported only if the source is `clickhouse` without any `` (since in case of `` you can filter out too much rows and the dictionary will allocate too much memory, that will not be used eventually). +If `preallocate` is `true` (default is `false`), the hash table will be preallocated (this will make dictionary load faster). But note that you should use it only if: +- the source supports an approximate number of elements (for now it is supported only by the `ClickHouse` source) +- there are no duplicates in the data (otherwise it may increase memory usage for the hashtable) All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety.
@@ -103,21 +105,23 @@ Configuration example: ``` xml - + + 0 + ``` or ``` sql -LAYOUT(HASHED()) +LAYOUT(HASHED(PREALLOCATE 0)) ``` ### sparse_hashed {#dicts-external_dicts_dict_layout-sparse_hashed} Similar to `hashed`, but uses less memory in favor more CPU usage. -It will be also preallocated so as `hashed`, note that it is even more significant for `sparse_hashed`. +It will be also preallocated so as `hashed` (with `preallocate` set to `true`), and note that it is even more significant for `sparse_hashed`. Configuration example: @@ -127,8 +131,10 @@ Configuration example: ``` +or + ``` sql -LAYOUT(SPARSE_HASHED()) +LAYOUT(SPARSE_HASHED([PREALLOCATE 0])) ``` ### complex_key_hashed {#complex-key-hashed} diff --git a/src/Dictionaries/CassandraDictionarySource.cpp b/src/Dictionaries/CassandraDictionarySource.cpp index b4f7f3221bc..1e53310bb72 100644 --- a/src/Dictionaries/CassandraDictionarySource.cpp +++ b/src/Dictionaries/CassandraDictionarySource.cpp @@ -132,7 +132,7 @@ void CassandraDictionarySource::maybeAllowFiltering(String & query) const query += " ALLOW FILTERING;"; } -BlockInputStreamPtr CassandraDictionarySource::loadAll() +BlockInputStreamPtr CassandraDictionarySource::loadAll(std::atomic * /* result_size_hint */) { String query = query_builder.composeLoadAllQuery(); maybeAllowFiltering(query); @@ -183,7 +183,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu return std::make_shared(streams, nullptr, settings.max_threads); } -BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll() +BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for CassandraDictionarySource"); } diff --git a/src/Dictionaries/CassandraDictionarySource.h b/src/Dictionaries/CassandraDictionarySource.h index c0a4e774d23..bf253f8675d 100644 --- a/src/Dictionaries/CassandraDictionarySource.h +++ 
b/src/Dictionaries/CassandraDictionarySource.h @@ -49,7 +49,7 @@ public: const String & config_prefix, Block & sample_block); - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; bool supportsSelectiveLoad() const override { return true; } @@ -66,7 +66,7 @@ public: BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; String toString() const override; diff --git a/src/Dictionaries/ClickHouseDictionarySource.cpp b/src/Dictionaries/ClickHouseDictionarySource.cpp index fdb0d76a8d7..1b2cd412db2 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -105,15 +105,15 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate() } } -BlockInputStreamPtr ClickHouseDictionarySource::loadAll() +BlockInputStreamPtr ClickHouseDictionarySource::loadAll(std::atomic * result_size_hint) { - return createStreamForQuery(load_all_query); + return createStreamForQuery(load_all_query, result_size_hint); } -BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll(std::atomic * result_size_hint) { String load_update_query = getUpdateFieldAndDate(); - return createStreamForQuery(load_update_query); + return createStreamForQuery(load_update_query, result_size_hint); } BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector & ids) @@ -152,19 +152,32 @@ std::string ClickHouseDictionarySource::toString() const return "ClickHouse: " + configuration.db + '.' + configuration.table + (where.empty() ? 
"" : ", where: " + where); } -BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query) +BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query, std::atomic * result_size_hint) { + BlockInputStreamPtr stream; + /// Sample block should not contain first row default values auto empty_sample_block = sample_block.cloneEmpty(); if (configuration.is_local) { - auto stream = executeQuery(query, context, true).getInputStream(); + stream = executeQuery(query, context, true).getInputStream(); stream = std::make_shared(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position); - return stream; + } + else + { + stream = std::make_shared(pool, query, empty_sample_block, context); } - return std::make_shared(pool, query, empty_sample_block, context); + if (result_size_hint) + { + stream->setProgressCallback([result_size_hint](const Progress & progress) + { + *result_size_hint += progress.total_rows_to_read; + }); + } + + return stream; } std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const diff --git a/src/Dictionaries/ClickHouseDictionarySource.h b/src/Dictionaries/ClickHouseDictionarySource.h index 21c290ab23b..ce5de7610c1 100644 --- a/src/Dictionaries/ClickHouseDictionarySource.h +++ b/src/Dictionaries/ClickHouseDictionarySource.h @@ -43,9 +43,9 @@ public: ClickHouseDictionarySource(const ClickHouseDictionarySource & other); ClickHouseDictionarySource & operator=(const ClickHouseDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * result_size_hint = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * result_size_hint = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; @@ -67,7 +67,7 @@ public: private: std::string getUpdateFieldAndDate(); - BlockInputStreamPtr 
createStreamForQuery(const String & query); + BlockInputStreamPtr createStreamForQuery(const String & query, std::atomic * result_size_hint = nullptr); std::string doInvalidateQuery(const std::string & request) const; diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index 13feab2071a..3f2a029212d 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -101,7 +101,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar { } -BlockInputStreamPtr ExecutableDictionarySource::loadAll() +BlockInputStreamPtr ExecutableDictionarySource::loadAll(std::atomic * /* result_size_hint */) { if (implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method"); @@ -112,7 +112,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll() return std::make_shared(log, input_stream, std::move(process)); } -BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { if (implicit_key) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method"); diff --git a/src/Dictionaries/ExecutableDictionarySource.h b/src/Dictionaries/ExecutableDictionarySource.h index 878cb086873..6103a09882a 100644 --- a/src/Dictionaries/ExecutableDictionarySource.h +++ b/src/Dictionaries/ExecutableDictionarySource.h @@ -25,13 +25,13 @@ public: ExecutableDictionarySource(const ExecutableDictionarySource & other); ExecutableDictionarySource & operator=(const ExecutableDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */) override; /** The logic of this method is flawed, absolutely incorrect and ignorant. 
* It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index e920b8392d6..09f0be3c7d9 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -68,12 +68,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP { } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll() +BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); } -BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll() +BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method"); } diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.h b/src/Dictionaries/ExecutablePoolDictionarySource.h index 7a0b8681a21..fb235eabaea 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.h +++ b/src/Dictionaries/ExecutablePoolDictionarySource.h @@ -47,13 +47,13 @@ public: ExecutablePoolDictionarySource(const ExecutablePoolDictionarySource & other); ExecutablePoolDictionarySource & operator=(const ExecutablePoolDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; /** The logic of this method is flawed, absolutely incorrect and ignorant. 
* It may lead to skipping some values due to clock sync or timezone changes. * The intended usage of "update_field" is totally different. */ - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/FileDictionarySource.cpp b/src/Dictionaries/FileDictionarySource.cpp index 378c6f11857..2688cf74375 100644 --- a/src/Dictionaries/FileDictionarySource.cpp +++ b/src/Dictionaries/FileDictionarySource.cpp @@ -61,7 +61,7 @@ FileDictionarySource::FileDictionarySource(const FileDictionarySource & other) } -BlockInputStreamPtr FileDictionarySource::loadAll() +BlockInputStreamPtr FileDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(&Poco::Logger::get("FileDictionary"), "loadAll {}", toString()); auto in_ptr = std::make_unique(filepath); diff --git a/src/Dictionaries/FileDictionarySource.h b/src/Dictionaries/FileDictionarySource.h index 6559503cccd..f556f3156b0 100644 --- a/src/Dictionaries/FileDictionarySource.h +++ b/src/Dictionaries/FileDictionarySource.h @@ -21,9 +21,9 @@ public: FileDictionarySource(const FileDictionarySource & other); - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for FileDictionarySource"); } diff --git a/src/Dictionaries/HTTPDictionarySource.cpp b/src/Dictionaries/HTTPDictionarySource.cpp index b674d593444..fef907ce0e2 100644 --- a/src/Dictionaries/HTTPDictionarySource.cpp +++ b/src/Dictionaries/HTTPDictionarySource.cpp @@ -104,7 +104,7 @@ void HTTPDictionarySource::getUpdateFieldAndDate(Poco::URI & uri) } } -BlockInputStreamPtr 
HTTPDictionarySource::loadAll() +BlockInputStreamPtr HTTPDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, "loadAll {}", toString()); Poco::URI uri(url); @@ -115,7 +115,7 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll() return std::make_shared>(input_stream, std::move(in_ptr)); } -BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll() +BlockInputStreamPtr HTTPDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { Poco::URI uri(url); getUpdateFieldAndDate(uri); diff --git a/src/Dictionaries/HTTPDictionarySource.h b/src/Dictionaries/HTTPDictionarySource.h index c42c67ec8c9..9143adaec9f 100644 --- a/src/Dictionaries/HTTPDictionarySource.h +++ b/src/Dictionaries/HTTPDictionarySource.h @@ -32,9 +32,9 @@ public: HTTPDictionarySource(const HTTPDictionarySource & other); HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/HashedDictionary.cpp b/src/Dictionaries/HashedDictionary.cpp index b0b62760fb2..d52906900e6 100644 --- a/src/Dictionaries/HashedDictionary.cpp +++ b/src/Dictionaries/HashedDictionary.cpp @@ -40,14 +40,12 @@ HashedDictionary::HashedDictionary( const StorageID & dict_id_, const DictionaryStructure & dict_struct_, DictionarySourcePtr source_ptr_, - const DictionaryLifetime dict_lifetime_, - bool require_nonempty_, + const HashedDictionaryStorageConfiguration & configuration_, BlockPtr update_field_loaded_block_) : IDictionary(dict_id_) , dict_struct(dict_struct_) , source_ptr(std::move(source_ptr_)) - , dict_lifetime(dict_lifetime_) - , require_nonempty(require_nonempty_) + , configuration(configuration_) , 
update_field_loaded_block(std::move(update_field_loaded_block_)) { createAttributes(); @@ -359,6 +357,8 @@ void HashedDictionary::createAttributes() template void HashedDictionary::updateData() { + /// NOTE: updateData() does not do preallocation since it may increase memory usage. + if (!update_field_loaded_block || update_field_loaded_block->rows() == 0) { auto stream = source_ptr->loadUpdatedAll(); @@ -552,13 +552,30 @@ void HashedDictionary::loadData() { if (!source_ptr->hasUpdateField()) { - auto stream = source_ptr->loadAll(); + std::atomic new_size = 0; + + BlockInputStreamPtr stream; + if (configuration.preallocate) + stream = source_ptr->loadAll(&new_size); + else + stream = source_ptr->loadAll(); stream->readPrefix(); while (const auto block = stream->read()) { - resize(block.rows()); + if (configuration.preallocate && new_size) + { + size_t current_new_size = new_size.exchange(0); + if (current_new_size) + { + LOG_TRACE(&Poco::Logger::get("HashedDictionary"), "Preallocated {} elements", current_new_size); + resize(current_new_size); + } + } + else + resize(block.rows()); + blockToAttributes(block); } @@ -567,7 +584,7 @@ void HashedDictionary::loadData() else updateData(); - if (require_nonempty && 0 == element_count) + if (configuration.require_nonempty && 0 == element_count) throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY, "{}: dictionary source is empty and 'require_nonempty' property is set.", full_name); @@ -710,19 +727,24 @@ void registerDictionaryHashed(DictionaryFactory & factory) const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"}; const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false); + const std::string & layout_prefix = sparse ?
".layout.sparse_hashed" : ".layout.hashed"; + const bool preallocate = config.getBool(config_prefix + layout_prefix + ".preallocate", false); + + HashedDictionaryStorageConfiguration configuration{preallocate, require_nonempty, dict_lifetime}; + if (dictionary_key_type == DictionaryKeyType::simple) { if (sparse) - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); else - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); } else { if (sparse) - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); else - return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty); + return std::make_unique>(dict_id, dict_struct, std::move(source_ptr), configuration); } }; diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index eeb873fb190..9980e9c701b 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -28,6 +28,13 @@ namespace DB { +struct HashedDictionaryStorageConfiguration +{ + const bool preallocate; + const bool require_nonempty; + const DictionaryLifetime lifetime; +}; + template class HashedDictionary final : public IDictionary { @@ -39,8 +46,7 @@ public: const StorageID & dict_id_, const DictionaryStructure & dict_struct_, DictionarySourcePtr source_ptr_, - const DictionaryLifetime dict_lifetime_, - bool require_nonempty_, + const HashedDictionaryStorageConfiguration & configuration_, BlockPtr update_field_loaded_block_ = nullptr); std::string getTypeName() const override @@ -75,12 +81,12 @@ public: std::shared_ptr clone() const override { - 
return std::make_shared>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block); } const IDictionarySource * getSource() const override { return source_ptr.get(); } - const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } + const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; } const DictionaryStructure & getStructure() const override { return dict_struct; } @@ -218,8 +224,7 @@ private: const DictionaryStructure dict_struct; const DictionarySourcePtr source_ptr; - const DictionaryLifetime dict_lifetime; - const bool require_nonempty; + const HashedDictionaryStorageConfiguration configuration; std::vector attributes; diff --git a/src/Dictionaries/IDictionarySource.h b/src/Dictionaries/IDictionarySource.h index 90f8b7f3a55..5bee1917324 100644 --- a/src/Dictionaries/IDictionarySource.h +++ b/src/Dictionaries/IDictionarySource.h @@ -4,6 +4,7 @@ #include #include +#include namespace DB @@ -19,11 +20,42 @@ using SharedDictionarySourcePtr = std::shared_ptr; class IDictionarySource { public: - /// Returns an input stream with all the data available from this source. - virtual BlockInputStreamPtr loadAll() = 0; + /** + * @param result_size_hint - approx number of rows in the stream. + * @return an input stream with all the data available from this source. + * + * NOTE: result_size_hint may be changed while you are reading from the + * input stream (usually it will be non zero for the first block and zero + * for others, since it uses Progress::total_rows_to_read), and may be + * updated in parallel, so you should use something like this: + * + * ...
+ * std::atomic new_size = 0; + * + * auto stream = source->loadAll(&new_size); + * stream->readPrefix(); + * + * while (const auto block = stream->read()) + * { + * if (new_size) + * { + * size_t current_new_size = new_size.exchange(0); + * if (current_new_size) + * resize(current_new_size); + * } + * else + * { + * resize(block.rows()); + * } + * } + * + * stream->readSuffix(); + * ... + */ + virtual BlockInputStreamPtr loadAll(std::atomic * result_size_hint = nullptr) = 0; /// Returns an input stream with updated data available from this source. - virtual BlockInputStreamPtr loadUpdatedAll() = 0; + virtual BlockInputStreamPtr loadUpdatedAll(std::atomic * result_size_hint = nullptr) = 0; /** Indicates whether this source supports "random access" loading of data * loadId and loadIds can only be used if this function returns true. diff --git a/src/Dictionaries/LibraryDictionarySource.cpp b/src/Dictionaries/LibraryDictionarySource.cpp index a971ba4b1be..d4b9956cbcc 100644 --- a/src/Dictionaries/LibraryDictionarySource.cpp +++ b/src/Dictionaries/LibraryDictionarySource.cpp @@ -94,7 +94,7 @@ bool LibraryDictionarySource::supportsSelectiveLoad() const } -BlockInputStreamPtr LibraryDictionarySource::loadAll() +BlockInputStreamPtr LibraryDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, "loadAll {}", toString()); return bridge_helper->loadAll(); diff --git a/src/Dictionaries/LibraryDictionarySource.h b/src/Dictionaries/LibraryDictionarySource.h index 1ab47c5a06f..25a0eed0c58 100644 --- a/src/Dictionaries/LibraryDictionarySource.h +++ b/src/Dictionaries/LibraryDictionarySource.h @@ -47,9 +47,9 @@ public: ~LibraryDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for LibraryDictionarySource"); } diff --git a/src/Dictionaries/MongoDBDictionarySource.cpp b/src/Dictionaries/MongoDBDictionarySource.cpp index 0ab45dc4593..35962bb463f 100644 --- a/src/Dictionaries/MongoDBDictionarySource.cpp +++ b/src/Dictionaries/MongoDBDictionarySource.cpp @@ -143,7 +143,7 @@ MongoDBDictionarySource::MongoDBDictionarySource(const MongoDBDictionarySource & MongoDBDictionarySource::~MongoDBDictionarySource() = default; -BlockInputStreamPtr MongoDBDictionarySource::loadAll() +BlockInputStreamPtr MongoDBDictionarySource::loadAll(std::atomic * /* result_size_hint */) { return std::make_shared(connection, createCursor(db, collection, sample_block), sample_block, max_block_size); } diff --git a/src/Dictionaries/MongoDBDictionarySource.h b/src/Dictionaries/MongoDBDictionarySource.h index fef5749190f..e3c904da5c6 100644 --- a/src/Dictionaries/MongoDBDictionarySource.h +++ b/src/Dictionaries/MongoDBDictionarySource.h @@ -46,9 +46,9 @@ public: ~MongoDBDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for MongoDBDictionarySource"); } diff --git a/src/Dictionaries/MySQLDictionarySource.cpp b/src/Dictionaries/MySQLDictionarySource.cpp index 676863ae588..c501797e0af 100644 --- a/src/Dictionaries/MySQLDictionarySource.cpp +++ b/src/Dictionaries/MySQLDictionarySource.cpp @@ -126,7 +126,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query) pool, query, sample_block, settings); } -BlockInputStreamPtr MySQLDictionarySource::loadAll() +BlockInputStreamPtr MySQLDictionarySource::loadAll(std::atomic * /* result_size_hint */) { auto 
connection = pool->get(); last_modification = getLastModification(connection, false); @@ -135,7 +135,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll() return loadFromQuery(load_all_query); } -BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll() +BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { auto connection = pool->get(); last_modification = getLastModification(connection, false); diff --git a/src/Dictionaries/MySQLDictionarySource.h b/src/Dictionaries/MySQLDictionarySource.h index ef1d81b862f..4c58a9d82bd 100644 --- a/src/Dictionaries/MySQLDictionarySource.h +++ b/src/Dictionaries/MySQLDictionarySource.h @@ -42,9 +42,9 @@ public: MySQLDictionarySource(const MySQLDictionarySource & other); MySQLDictionarySource & operator=(const MySQLDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/src/Dictionaries/PostgreSQLDictionarySource.cpp b/src/Dictionaries/PostgreSQLDictionarySource.cpp index 051ee94ef3e..61674810e75 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.cpp +++ b/src/Dictionaries/PostgreSQLDictionarySource.cpp @@ -65,14 +65,14 @@ PostgreSQLDictionarySource::PostgreSQLDictionarySource(const PostgreSQLDictionar } -BlockInputStreamPtr PostgreSQLDictionarySource::loadAll() +BlockInputStreamPtr PostgreSQLDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, load_all_query); return loadBase(load_all_query); } -BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll() +BlockInputStreamPtr PostgreSQLDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { auto load_update_query = getUpdateFieldAndDate(); LOG_TRACE(log, 
load_update_query); diff --git a/src/Dictionaries/PostgreSQLDictionarySource.h b/src/Dictionaries/PostgreSQLDictionarySource.h index aff340aa796..0a03c77aa97 100644 --- a/src/Dictionaries/PostgreSQLDictionarySource.h +++ b/src/Dictionaries/PostgreSQLDictionarySource.h @@ -33,8 +33,8 @@ public: PostgreSQLDictionarySource(const PostgreSQLDictionarySource & other); PostgreSQLDictionarySource & operator=(const PostgreSQLDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector & requested_rows) override; diff --git a/src/Dictionaries/RedisDictionarySource.cpp b/src/Dictionaries/RedisDictionarySource.cpp index 8144b37e63d..46c0926ce60 100644 --- a/src/Dictionaries/RedisDictionarySource.cpp +++ b/src/Dictionaries/RedisDictionarySource.cpp @@ -160,7 +160,7 @@ namespace DB __builtin_unreachable(); } - BlockInputStreamPtr RedisDictionarySource::loadAll() + BlockInputStreamPtr RedisDictionarySource::loadAll(std::atomic * /* result_size_hint */) { if (!client->isConnected()) client->connect(host, port); diff --git a/src/Dictionaries/RedisDictionarySource.h b/src/Dictionaries/RedisDictionarySource.h index b2c5859decd..b4c43fa897d 100644 --- a/src/Dictionaries/RedisDictionarySource.h +++ b/src/Dictionaries/RedisDictionarySource.h @@ -59,9 +59,9 @@ namespace ErrorCodes ~RedisDictionarySource() override; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override { throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadUpdatedAll is unsupported for RedisDictionarySource"); } diff --git a/src/Dictionaries/XDBCDictionarySource.cpp b/src/Dictionaries/XDBCDictionarySource.cpp index 5774641a90f..db9f2e7d6aa 100644 --- a/src/Dictionaries/XDBCDictionarySource.cpp +++ b/src/Dictionaries/XDBCDictionarySource.cpp @@ -166,14 +166,14 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate() } -BlockInputStreamPtr XDBCDictionarySource::loadAll() +BlockInputStreamPtr XDBCDictionarySource::loadAll(std::atomic * /* result_size_hint */) { LOG_TRACE(log, load_all_query); return loadFromQuery(bridge_url, sample_block, load_all_query); } -BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll() +BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll(std::atomic * /* result_size_hint */) { std::string load_query_update = getUpdateFieldAndDate(); diff --git a/src/Dictionaries/XDBCDictionarySource.h b/src/Dictionaries/XDBCDictionarySource.h index bd473e0db8a..6cdd8a30a61 100644 --- a/src/Dictionaries/XDBCDictionarySource.h +++ b/src/Dictionaries/XDBCDictionarySource.h @@ -38,9 +38,9 @@ public: XDBCDictionarySource(const XDBCDictionarySource & other); XDBCDictionarySource & operator=(const XDBCDictionarySource &) = delete; - BlockInputStreamPtr loadAll() override; + BlockInputStreamPtr loadAll(std::atomic * /* result_size_hint */ = nullptr) override; - BlockInputStreamPtr loadUpdatedAll() override; + BlockInputStreamPtr loadUpdatedAll(std::atomic * /* result_size_hint */ = nullptr) override; BlockInputStreamPtr loadIds(const std::vector & ids) override; diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.reference b/tests/queries/0_stateless/01509_dictionary_preallocate.reference index b072d6673db..2f1e1d2c386 100644 --- a/tests/queries/0_stateless/01509_dictionary_preallocate.reference +++ b/tests/queries/0_stateless/01509_dictionary_preallocate.reference @@ -1,4 +1,9 @@ -CREATE DICTIONARY db_01509.dict\n(\n `key` UInt64,\n 
`value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() USER \'default\' TABLE \'data\' PASSWORD \'\' DB \'db_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED()) +CREATE DICTIONARY default.dict_01509\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 0)) +CREATE DICTIONARY default.dict_01509_preallocate\n(\n `key` UInt64,\n `value` String DEFAULT \'-\'\n)\nPRIMARY KEY key\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() TABLE \'data_01509\'))\nLIFETIME(MIN 0 MAX 0)\nLAYOUT(SPARSE_HASHED(PREALLOCATE 1)) +HashedDictionary: Preallocated 10000 elements - 0 -1000 +10000 +- +0 +10000 diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.sh b/tests/queries/0_stateless/01509_dictionary_preallocate.sh new file mode 100755 index 00000000000..f2bc17d2e90 --- /dev/null +++ b/tests/queries/0_stateless/01509_dictionary_preallocate.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash + +# shellcheck disable=SC2031 +# shellcheck disable=SC2030 + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " + DROP TABLE IF EXISTS data_01509; + DROP DICTIONARY IF EXISTS dict_01509; + DROP DICTIONARY IF EXISTS dict_01509_preallocate; + + CREATE TABLE data_01509 + ( + key UInt64, + value String + ) + ENGINE = MergeTree() + ORDER BY key; + INSERT INTO data_01509 SELECT number key, toString(number) value FROM numbers(10e3); +" + +# regular +$CLICKHOUSE_CLIENT -nm -q " + CREATE DICTIONARY dict_01509 + ( + key UInt64, + value String DEFAULT '-' + ) + PRIMARY KEY key + SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509')) + LAYOUT(SPARSE_HASHED(PREALLOCATE 0)) + LIFETIME(0); + SHOW CREATE DICTIONARY dict_01509; +" +( + # start new shell to avoid overriding variables for other client invocation + CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace} + $CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509" |& grep -o "HashedDictionary.*" +) + +# with preallocation +$CLICKHOUSE_CLIENT -nm -q " + CREATE DICTIONARY dict_01509_preallocate + ( + key UInt64, + value String DEFAULT '-' + ) + PRIMARY KEY key + SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() TABLE 'data_01509')) + LAYOUT(SPARSE_HASHED(PREALLOCATE 1)) + LIFETIME(0); + SHOW CREATE DICTIONARY dict_01509_preallocate; + SYSTEM RELOAD DICTIONARY dict_01509_preallocate; +" +( + # start new shell to avoid overriding variables for other client invocation + CLICKHOUSE_CLIENT=${CLICKHOUSE_CLIENT/--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}/--send_logs_level=trace} + $CLICKHOUSE_CLIENT -nm -q "SYSTEM RELOAD DICTIONARY dict_01509_preallocate" |& grep -o "HashedDictionary.*" +) + +$CLICKHOUSE_CLIENT -nm -q " + SELECT dictGet('dict_01509', 'value', toUInt64(1e12)); + SELECT dictGet('dict_01509', 'value', toUInt64(0)); + SELECT count() FROM dict_01509; + + SELECT dictGet('dict_01509_preallocate', 'value', toUInt64(1e12)); + SELECT dictGet('dict_01509_preallocate', 
'value', toUInt64(0)); + SELECT count() FROM dict_01509_preallocate; +" + +$CLICKHOUSE_CLIENT -nm -q " + DROP TABLE data_01509; + DROP DICTIONARY dict_01509; + DROP DICTIONARY dict_01509_preallocate; +" diff --git a/tests/queries/0_stateless/01509_dictionary_preallocate.sql b/tests/queries/0_stateless/01509_dictionary_preallocate.sql deleted file mode 100644 index c5ed0e1fbde..00000000000 --- a/tests/queries/0_stateless/01509_dictionary_preallocate.sql +++ /dev/null @@ -1,36 +0,0 @@ --- The test itself does not test does preallocation works --- It simply check SPARSE_HASHED dictionary with bunch of dictGet() --- (since at the moment of writing there were no such test) - -DROP DATABASE IF EXISTS db_01509; -CREATE DATABASE db_01509; - -CREATE TABLE db_01509.data -( - key UInt64, - value String -) -ENGINE = MergeTree() -ORDER BY key; -INSERT INTO db_01509.data SELECT number key, toString(number) value FROM numbers(1000); - -DROP DICTIONARY IF EXISTS db_01509.dict; -CREATE DICTIONARY db_01509.dict -( - key UInt64, - value String DEFAULT '-' -) -PRIMARY KEY key -SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'data' PASSWORD '' DB 'db_01509')) -LAYOUT(SPARSE_HASHED()) -LIFETIME(0); - -SHOW CREATE DICTIONARY db_01509.dict; - -SYSTEM RELOAD DICTIONARY db_01509.dict; - -SELECT dictGet('db_01509.dict', 'value', toUInt64(1e12)); -SELECT dictGet('db_01509.dict', 'value', toUInt64(0)); -SELECT count() FROM db_01509.dict; - -DROP DATABASE IF EXISTS db_01509; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index e8edd142835..ffa5e21be80 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -348,6 +348,7 @@ "01508_partition_pruning_long", "01509_check_parallel_quorum_inserts_long", "01509_parallel_quorum_and_merge_long", + "01509_dictionary_preallocate", "01515_mv_and_array_join_optimisation_bag", "01516_create_table_primary_key", "01517_drop_mv_with_inner_table",