From d37ad2e6dec756362a1416ebba6d42ff5813d020 Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 21 Jun 2022 13:02:48 +0000 Subject: [PATCH] Implement cache for schema inference for file/s3/hdfs/url --- src/Common/ProfileEvents.cpp | 8 +- src/Core/Settings.h | 10 +- src/Formats/ReadSchemaUtils.cpp | 7 +- src/Formats/ReadSchemaUtils.h | 2 +- src/IO/S3Common.cpp | 12 +- src/IO/S3Common.h | 8 + src/Storages/Cache/SchemaCache.h | 154 ++++++++++++++++++ src/Storages/HDFS/StorageHDFS.cpp | 52 +++++- src/Storages/HDFS/StorageHDFS.h | 5 + src/Storages/StorageFile.cpp | 55 ++++++- src/Storages/StorageFile.h | 5 + src/Storages/StorageS3.cpp | 150 ++++++++++++++--- src/Storages/StorageS3.h | 26 ++- src/Storages/StorageURL.cpp | 41 ++++- src/Storages/StorageURL.h | 5 + src/TableFunctions/TableFunctionFormat.cpp | 2 +- .../__init__.py | 0 .../configs/config.d/query_log.xml | 9 + .../test_file_schema_inference_cache/test.py | 100 ++++++++++++ tests/integration/test_storage_hdfs/test.py | 94 +++++++++++ tests/integration/test_storage_s3/test.py | 149 +++++++++++++++++ 21 files changed, 847 insertions(+), 47 deletions(-) create mode 100644 src/Storages/Cache/SchemaCache.h create mode 100755 tests/integration/test_file_schema_inference_cache/__init__.py create mode 100644 tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml create mode 100755 tests/integration/test_file_schema_inference_cache/test.py diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index b8e552f6023..bdc331c766f 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -343,7 +343,13 @@ \ M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \ M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \ - M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") + M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \ + \ + M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \ + M(SchemaInferenceCacheMisses, "Number of times a schema is not in cache while schema inference") \ + M(SchemaInferenceCacheTTLExpirations, "Number of times a schema from cache expires due to TTL") \ + M(SchemaInferenceCacheTTLUpdates, "Number of times TTL for schema in cache was updated") \ + M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data") namespace ProfileEvents { diff --git a/src/Core/Settings.h b/src/Core/Settings.h index e3f756c85f5..b1b27479341 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -589,7 +589,15 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) \ M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \ \ - /** Experimental functions */ \ + M(Bool, use_cache_for_file_schema_inference, true, "Use cache in schema inference while using file table function", 0) \ + M(Seconds, cache_ttl_for_file_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using file table function. 
0 means no ttl", 0) \ + M(Bool, use_cache_for_s3_schema_inference, true, "Use cache in schema inference while using s3 table function", 0) \ + M(Seconds, cache_ttl_for_s3_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using s3 table function. 0 means no ttl", 0) \ + M(Bool, use_cache_for_hdfs_schema_inference, true, "Use cache in schema inference while using hdfs table function", 0) \ + M(Seconds, cache_ttl_for_hdfs_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using hdfs table function. 0 means no ttl", 0) \ + M(Bool, use_cache_for_url_schema_inference, true, "Use cache in schema inference while using url table function", 0) \ + M(Seconds, cache_ttl_for_url_schema_inference, 3600, "TTL for schemes in cache in schema inference while using url table function. 0 means no ttl", 0) \ + /** Experimental functions */ \ M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \ M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \ M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions (hashid, etc)", 0) \ diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 11a91bd50dc..c3378ba871e 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -75,7 +74,8 @@ ColumnsDescription readSchemaFromFormat( SchemaReaderPtr schema_reader; size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference; size_t iterations = 0; - while ((buf = read_buffer_iterator())) + ColumnsDescription cached_columns; + while ((buf = read_buffer_iterator(cached_columns))) { ++iterations; @@ -124,6 +124,9 @@ ColumnsDescription readSchemaFromFormat( } } + if (!cached_columns.empty()) + return cached_columns; + if (names_and_types.empty()) throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}", exception_messages); diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h index b7558d5c22b..316058c9e3f 100644 --- a/src/Formats/ReadSchemaUtils.h +++ b/src/Formats/ReadSchemaUtils.h @@ -6,7 +6,7 @@ namespace DB { -using ReadBufferIterator = std::function()>; +using ReadBufferIterator = std::function(ColumnsDescription &)>; /// Try to determine the schema of the data in specifying format. /// For formats that have an external schema reader, it will diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index a58732287e9..92fc23106a6 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -790,7 +790,8 @@ namespace S3 quoteString(bucket), !uri.empty() ? 
" (" + uri.toString() + ")" : ""); } - size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) + + S3::ObjectInfo getObjectInfo(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) { Aws::S3::Model::HeadObjectRequest req; req.SetBucket(bucket); @@ -804,13 +805,18 @@ namespace S3 if (outcome.IsSuccess()) { auto read_result = outcome.GetResultWithOwnership(); - return static_cast(read_result.GetContentLength()); + return {.size = static_cast(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000}; } else if (throw_on_error) { throw DB::Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); } - return 0; + return {}; + } + + size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) + { + return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error).size; } } diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index 327730d9740..46a09ee8901 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -78,6 +78,14 @@ struct URI static void validateBucket(const String & bucket, const Poco::URI & uri); }; +struct ObjectInfo +{ + size_t size = 0; + time_t last_modification_time = 0; +}; + +S3::ObjectInfo getObjectInfo(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true); + size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true); } diff --git a/src/Storages/Cache/SchemaCache.h b/src/Storages/Cache/SchemaCache.h new file mode 100644 index 00000000000..6226a158b5f --- /dev/null +++ b/src/Storages/Cache/SchemaCache.h @@ -0,0 +1,154 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + + +namespace ProfileEvents +{ + extern const Event SchemaInferenceCacheHits; + extern const Event SchemaInferenceCacheMisses; + extern const Event SchemaInferenceCacheTTLExpirations; + extern const Event SchemaInferenceCacheTTLUpdates; + extern const Event SchemaInferenceCacheInvalidations; +} + +namespace DB +{ + +/// Cache that stores columns description by some string key. It's used in schema inference. +/// It supports TTL for keys. Before each action it looks for expired TTls and removes +/// corresponding keys from cache. After each access to a key in cache it's TTL resumes, +/// so a key will be removed by TTL only if it was not accessed during this TTL. +/// It also supports keys invalidations by last modification time. If last modification time +/// is provided and last modification happened after a key was added to the cache, this key +/// will be removed from cache. 
+class SchemaCache
+{
+public:
+    void add(const String & key, const ColumnsDescription & columns, time_t ttl = 0)
+    {
+        std::lock_guard lock(mutex);
+        clean();
+        addUnlocked(key, columns, ttl);
+    }
+
+    void addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl = 0)
+    {
+        std::lock_guard lock(mutex);
+        clean();
+        for (const auto & key : keys)
+            addUnlocked(key, columns, ttl);
+    }
+
+    std::optional<ColumnsDescription> tryGet(const String & key, std::function<time_t()> get_last_mod_time = {})
+    {
+        std::lock_guard lock(mutex);
+        clean();
+        auto it = data.find(key);
+        if (it == data.end())
+        {
+            ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheMisses);
+            return std::nullopt;
+        }
+
+        auto & schema_info = it->second;
+        if (get_last_mod_time)
+        {
+            /// It's important to call get_last_mod_time only if the key is in the cache,
+            /// because this function can do some heavy operations.
+            auto last_mod_time = get_last_mod_time();
+            /// If get_last_mod_time was provided but returned 0, it failed to get the last
+            /// modification time, so we cannot safely use the value from the cache.
+            if (!last_mod_time)
+                return std::nullopt;
+
+            if (last_mod_time > schema_info.registration_time)
+            {
+                /// The object was modified after it was added to the cache,
+                /// so the stored value is no longer valid and we should remove it.
+                ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations);
+                /// If this key had a TTL, we should remove it from the expiration queue.
+                if (schema_info.ttl)
+                    expiration_queue.erase({schema_info.valid_until, key});
+                data.erase(key);
+                return std::nullopt;
+            }
+        }
+
+        if (schema_info.ttl)
+        {
+            /// The current value in the cache is valid, so we refresh its TTL by updating its expiration time.
+            /// We extract the current entry from the expiration queue, modify it and insert it back into the queue.
+            time_t now = std::time(nullptr);
+            auto jt = expiration_queue.find({schema_info.valid_until, key});
+            auto node = expiration_queue.extract(jt);
+            schema_info.valid_until = now + schema_info.ttl;
+            node.value().first = schema_info.valid_until;
+            ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLUpdates);
+            expiration_queue.insert(std::move(node));
+        }
+
+        ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits);
+        return schema_info.columns;
+    }
+
+    /// Check if the cache contains the provided key.
+    bool has(const String & key)
+    {
+        std::lock_guard lock(mutex);
+        clean();
+        return data.contains(key);
+    }
+
+private:
+    void addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl)
+    {
+        /// Do nothing if this key is already in the cache.
+        if (data.contains(key))
+            return;
+        time_t now = std::time(nullptr);
+        time_t valid_until = now + ttl;
+        data[key] = SchemaInfo{columns, now, ttl, valid_until};
+        if (ttl)
+            expiration_queue.insert({valid_until, key});
+    }
+
+    /// Check for expired TTLs.
+    void clean()
+    {
+        time_t now = std::time(nullptr);
+        auto it = expiration_queue.begin();
+        /// The queue is sorted by time, so we only need to check the leading
+        /// entries whose expiration time is less than the current time.
+        while (it != expiration_queue.end() && it->first < now)
+        {
+            ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLExpirations);
+            data.erase(it->second);
+            ++it;
+        }
+        expiration_queue.erase(expiration_queue.begin(), it);
+    }
+
+    struct SchemaInfo
+    {
+        ColumnsDescription columns;
+        time_t registration_time;
+        time_t ttl;
+        time_t valid_until;
+    };
+
+    std::unordered_map<String, SchemaInfo> data;
+    /// Special queue for checking expired TTLs.
It contains pairs + /// (expiration time, key) sorted in ascending order. + std::set> expiration_queue; + std::mutex mutex; +}; + +} diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 2edcbeb9a7e..6e76da3911d 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -63,7 +63,7 @@ namespace /* Recursive directory listing with matched paths as a result. * Have the same method in StorageFile. */ - Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match) + Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match, std::unordered_map * last_mod_times) { const size_t first_glob = for_match.find_first_of("*?{"); @@ -98,13 +98,15 @@ namespace if (re2::RE2::FullMatch(file_name, matcher)) { result.push_back(String(ls.file_info[i].mName)); + if (last_mod_times) + (*last_mod_times)[result.back()] = ls.file_info[i].mLastMod; } } else if (is_directory && looking_for_directory) { if (re2::RE2::FullMatch(file_name, matcher)) { - Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash)); + Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash), last_mod_times); /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check. std::move(result_part.begin(), result_part.end(), std::back_inserter(result)); } @@ -120,12 +122,12 @@ namespace return {uri.substr(begin_of_path), uri.substr(0, begin_of_path)}; } - std::vector getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context) + std::vector getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context, std::unordered_map * last_mod_times = nullptr) { HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef()); HDFSFSPtr fs = createHDFSFS(builder.get()); - return LSWithRegexpMatching("/", fs, path_from_uri); + return LSWithRegexpMatching("/", fs, path_from_uri, last_mod_times); } } @@ -184,7 +186,8 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( ContextPtr ctx) { const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri); - auto paths = getPathsList(path_from_uri, uri, ctx); + std::unordered_map last_mod_time; + auto paths = getPathsList(path_from_uri, uri, ctx, &last_mod_time); if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format)) throw Exception( ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, @@ -192,7 +195,11 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( "specify table structure manually", format); - ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr + std::optional columns_from_cache; + if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference) + columns_from_cache = tryGetColumnsFromCache(paths, last_mod_time); + + ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == paths.end()) return nullptr; @@ -200,7 +207,13 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( return wrapReadBufferWithCompressionMethod( std::make_unique(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), 
compression); }; - return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx); + + auto columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx); + + if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference) + addColumnsToCache(paths, columns, ctx); + + return columns; } class HDFSSource::DisclosedGlobIterator::Impl @@ -700,6 +713,31 @@ NamesAndTypesList StorageHDFS::getVirtuals() const return virtual_columns; } +SchemaCache & StorageHDFS::getSchemaCache() +{ + static SchemaCache schema_cache; + return schema_cache; +} + +std::optional StorageHDFS::tryGetColumnsFromCache(const Strings & paths, std::unordered_map & last_mod_time) +{ + auto & schema_cache = getSchemaCache(); + for (const auto & path : paths) + { + auto columns = schema_cache.tryGet(path, [&](){ return last_mod_time[path]; }); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageHDFS::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx) +{ + auto & schema_cache = getSchemaCache(); + schema_cache.addMany(paths, columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds()); +} + } #endif diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index d987820b844..c6aa49a75e8 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -69,6 +70,10 @@ protected: friend class HDFSSource; private: + static SchemaCache & getSchemaCache(); + static std::optional tryGetColumnsFromCache(const Strings & paths, std::unordered_map & last_mod_time); + static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx); + std::vector uris; String format_name; String compression_method; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index d466096c8ba..17a6e93a05e 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -262,7 +262,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c /// in case of file descriptor we have a stream of data and we cannot /// start reading data from the beginning after reading some data for /// schema inference. - ReadBufferIterator read_buffer_iterator = [&]() + ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &) { /// We will use PeekableReadBuffer to create a checkpoint, so we need a place /// where we can store the original read buffer. 
@@ -306,7 +306,11 @@ ColumnsDescription StorageFile::getTableStructureFromFile( "table structure manually", format); - ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr + std::optional columns_from_cache; + if (context->getSettingsRef().use_cache_for_file_schema_inference) + columns_from_cache = tryGetColumnsFromCache(paths); + + ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == paths.end()) return nullptr; @@ -314,7 +318,16 @@ ColumnsDescription StorageFile::getTableStructureFromFile( return createReadBuffer(*it++, false, "File", -1, compression_method, context); }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context); + + if (context->getSettingsRef().use_cache_for_file_schema_inference) + addColumnsToCache(paths, columns, context); + + return columns; } bool StorageFile::supportsSubsetOfColumns() const @@ -1190,4 +1203,40 @@ NamesAndTypesList StorageFile::getVirtuals() const {"_path", std::make_shared(std::make_shared())}, {"_file", std::make_shared(std::make_shared())}}; } + +SchemaCache & StorageFile::getSchemaCache() +{ + static SchemaCache schema_cache; + return schema_cache; +} + +std::optional StorageFile::tryGetColumnsFromCache(const Strings & paths) +{ + /// Check if the cache contains one of the paths. + auto & schema_cache = getSchemaCache(); + struct stat file_stat{}; + for (const auto & path : paths) + { + auto get_last_mod_time = [&]() + { + if (0 != stat(path.c_str(), &file_stat)) + throwFromErrno("Cannot stat file " + path, ErrorCodes::CANNOT_STAT); + + return file_stat.st_mtim.tv_sec; + }; + + auto columns = schema_cache.tryGet(path, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageFile::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context) +{ + auto & schema_cache = getSchemaCache(); + schema_cache.addMany(paths, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds()); +} + } diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index f47f6172c1c..2671760bf40 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -93,6 +94,10 @@ protected: private: void setStorageMetadata(CommonArguments args); + static SchemaCache & getSchemaCache(); + static std::optional tryGetColumnsFromCache(const Strings & paths); + static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context); + std::string format_name; // We use format settings from global context + CREATE query for File table // function -- in this case, format_settings is set. 
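Each storage touched by this patch (File, HDFS, S3, URL) follows the same caching pattern around readSchemaFromFormat: look the paths up in a per-storage SchemaCache, passing a last-modification-time callback so stale entries are invalidated, fall back to reading the data on a miss, and then register the inferred columns with the TTL from the corresponding setting. A condensed, hedged sketch of that flow is below; inferSchemaWithCache and the stat-based callback are illustrative stand-ins, not functions added by this patch.

#include <sys/stat.h>
#include <Formats/ReadSchemaUtils.h>
#include <Interpreters/Context.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ColumnsDescription.h>

namespace DB
{

/// Illustrative only: the real code lives in StorageFile/StorageHDFS/StorageS3/IStorageURLBase.
ColumnsDescription inferSchemaWithCache(
    const Strings & paths,
    const String & format,
    ReadBufferIterator & read_buffer_iterator,
    ContextPtr context,
    time_t ttl)
{
    static SchemaCache cache; /// One process-wide cache per storage type, as in getSchemaCache().

    /// 1. Try every candidate path. The callback is only invoked on a potential hit,
    ///    and returning 0 means "modification time unknown", so the cached entry is not used.
    for (const auto & path : paths)
    {
        auto get_last_mod_time = [&]() -> time_t
        {
            struct stat file_stat{};
            if (0 != stat(path.c_str(), &file_stat))
                return 0;
            return file_stat.st_mtim.tv_sec;
        };

        if (auto columns = cache.tryGet(path, get_last_mod_time))
            return *columns;
    }

    /// 2. Miss: infer the schema by reading the data, exactly as before this patch.
    auto columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, context);

    /// 3. Register the result for all paths with the TTL taken from the storage-specific
    ///    setting, e.g. cache_ttl_for_file_schema_inference.totalSeconds().
    cache.addMany(paths, columns, ttl);
    return columns;
}

}

The only real difference between the storages is how the cache key and the last-modification-time callback are built: stat() for local files, the mLastMod field from the HDFS listing, a HEAD request (getObjectInfo) for S3, and no callback at all for URL.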
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index f524a405c9b..96b72979408 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -11,7 +11,6 @@ #include -#include #include #include #include @@ -91,12 +90,16 @@ public: const S3::URI & globbed_uri_, ASTPtr & query_, const Block & virtual_header_, - ContextPtr context_) + ContextPtr context_, + std::unordered_map * object_infos_, + Strings * read_keys_) : WithContext(context_) , client(client_) , globbed_uri(globbed_uri_) , query(query_) , virtual_header(virtual_header_) + , object_infos(object_infos_) + , read_keys(read_keys_) { if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos) throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION); @@ -188,6 +191,8 @@ private: if (re2::RE2::FullMatch(key, *matcher)) { String path = fs::path(globbed_uri.bucket) / key; + if (object_infos) + (*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000}; String file = path.substr(path.find_last_of('/') + 1); if (path_column) path_column->insert(path); @@ -222,6 +227,9 @@ private: /// It returns false when all objects were returned is_finished = !outcome.GetResult().GetIsTruncated(); + + if (read_keys) + read_keys->insert(read_keys->end(), buffer.begin(), buffer.end()); } std::mutex mutex; @@ -236,6 +244,8 @@ private: Aws::S3::Model::ListObjectsV2Outcome outcome; std::unique_ptr matcher; bool is_finished{false}; + std::unordered_map * object_infos; + Strings * read_keys; }; StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( @@ -243,8 +253,10 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, - ContextPtr context) - : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_header, context)) + ContextPtr context, + std::unordered_map * object_infos_, + Strings * read_keys_) + : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_)) { } @@ -396,7 +408,8 @@ StorageS3Source::StorageS3Source( const String & bucket_, const String & version_id_, std::shared_ptr file_iterator_, - const size_t download_thread_num_) + const size_t download_thread_num_, + const std::unordered_map & object_infos_) : ISource(getHeader(sample_block_, requested_virtual_columns_)) , WithContext(context_) , name(std::move(name_)) @@ -413,6 +426,7 @@ StorageS3Source::StorageS3Source( , requested_virtual_columns(requested_virtual_columns_) , file_iterator(file_iterator_) , download_thread_num(download_thread_num_) + , object_infos(object_infos_) { initialize(); } @@ -455,7 +469,12 @@ bool StorageS3Source::initialize() std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & key) { - const size_t object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false); + size_t object_size; + auto it = object_infos.find(fs::path(bucket) / key); + if (it != object_infos.end()) + object_size = it->second.size; + else + object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false); auto download_buffer_size = getContext()->getSettings().max_download_buffer_size; const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1; @@ -785,27 +804,34 @@ std::shared_ptr StorageS3::createFileIterator( ContextPtr local_context, ASTPtr query, const Block & virtual_block, - const std::vector & read_tasks) + const std::vector & 
read_tasks, + std::unordered_map * object_infos, + Strings * read_keys) { if (distributed_processing) { return std::make_shared( - [read_tasks_iterator = std::make_shared(read_tasks, local_context->getReadTaskCallback())]() -> String + [read_tasks_iterator = std::make_shared(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String { - return read_tasks_iterator->next(); + auto key = read_tasks_iterator->next(); + if (read_keys) + read_keys->push_back(key); + return key; }); } else if (is_key_with_globs) { /// Iterate through disclosed globs and make a source for each file auto glob_iterator = std::make_shared( - *s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context); + *s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys); return std::make_shared([glob_iterator]() { return glob_iterator->next(); }); } else { auto keys_iterator = std::make_shared(keys, s3_configuration.uri.bucket, query, virtual_block, local_context); + if (read_keys) + *read_keys = keys; return std::make_shared([keys_iterator]() { return keys_iterator->next(); }); } } @@ -850,7 +876,8 @@ Pipe StorageS3::read( local_context, query_info.query, virtual_block, - read_tasks_used_in_schema_inference); + read_tasks_used_in_schema_inference, + &object_infos); ColumnsDescription columns_description; Block block_for_format; @@ -893,7 +920,8 @@ Pipe StorageS3::read( s3_configuration.uri.bucket, s3_configuration.uri.version_id, iterator_wrapper, - max_download_threads)); + max_download_threads, + object_infos)); } auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -1130,11 +1158,12 @@ ColumnsDescription StorageS3::getTableStructureFromData( const String & compression_method, bool distributed_processing, const std::optional & format_settings, - ContextPtr ctx) + ContextPtr ctx, + std::unordered_map * object_infos) { S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) }; updateS3Configuration(ctx, s3_configuration); - return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx); + return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos); } ColumnsDescription StorageS3::getTableStructureFromDataImpl( @@ -1145,12 +1174,20 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx, - std::vector * read_keys_in_distributed_processing) + std::vector * read_keys_in_distributed_processing, + std::unordered_map * object_infos) { - auto file_iterator - = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}); + std::vector read_keys; - ReadBufferIterator read_buffer_iterator = [&, first = false]() mutable -> std::unique_ptr + auto file_iterator + = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys); + + std::optional columns_from_cache; + size_t prev_read_keys_size = read_keys.size(); + if (ctx->getSettingsRef().use_cache_for_s3_schema_inference) + columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, 
object_infos); + + ReadBufferIterator read_buffer_iterator = [&, first = false](ColumnsDescription & cached_columns) mutable -> std::unique_ptr { auto key = (*file_iterator)(); @@ -1166,8 +1203,17 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( return nullptr; } - if (distributed_processing && read_keys_in_distributed_processing) - read_keys_in_distributed_processing->push_back(key); + /// S3 file iterator could get new keys after new iteration, check them in schema cache. + if (ctx->getSettingsRef().use_cache_for_s3_schema_inference && read_keys.size() > prev_read_keys_size) + { + columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos); + prev_read_keys_size = read_keys.size(); + if (columns_from_cache) + { + cached_columns = *columns_from_cache; + return nullptr; + } + } first = false; return wrapReadBufferWithCompressionMethod( @@ -1176,7 +1222,19 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( chooseCompressionMethod(key, compression_method)); }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); + + if (ctx->getSettingsRef().use_cache_for_s3_schema_inference) + addColumnsToCache(read_keys, s3_configuration, columns, ctx); + + if (distributed_processing && read_keys_in_distributed_processing) + *read_keys_in_distributed_processing = std::move(read_keys); + + return columns; } @@ -1265,6 +1323,56 @@ bool StorageS3::supportsPartitionBy() const return true; } +SchemaCache & StorageS3::getSchemaCache() +{ + static SchemaCache schema_cache; + return schema_cache; +} + +std::optional StorageS3::tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map * object_infos) +{ + auto & schema_cache = getSchemaCache(); + for (auto it = begin; it < end; ++it) + { + String path = fs::path(s3_configuration.uri.bucket) / *it; + String cache_key = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path; + auto get_last_mod_time = [&]() + { + S3::ObjectInfo info; + /// Check if we already have information about this object. + /// If no, request it and remember for possible future usage. + if (object_infos && object_infos->contains(path)) + info = (*object_infos)[path]; + else + { + /// Note that in case of exception in getObjectInfo returned info will be empty, + /// but schema cache will handle this case and won't return columns from cache + /// because we can't say that it's valid without last modification time. 
+ info = S3::getObjectInfo(s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false); + if (object_infos) + (*object_infos)[path] = info; + } + + return info.last_modification_time; + }; + + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageS3::addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & ctx) +{ + auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket; + Strings objects; + std::transform(keys.begin(), keys.end(), std::back_inserter(objects), [&](const String & key){ return host_and_bucket / key; }); + auto & schema_cache = getSchemaCache(); + schema_cache.addMany(objects, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds()); +} + } #endif diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index c0e5b80c709..83f95d3ded1 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace Aws::S3 { @@ -40,7 +41,9 @@ public: const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, - ContextPtr context); + ContextPtr context, + std::unordered_map * object_infos = nullptr, + Strings * read_keys_ = nullptr); String next(); @@ -94,7 +97,8 @@ public: const String & bucket, const String & version_id, std::shared_ptr file_iterator_, - size_t download_thread_num); + size_t download_thread_num, + const std::unordered_map & object_infos_); String getName() const override; @@ -128,6 +132,8 @@ private: Poco::Logger * log = &Poco::Logger::get("StorageS3Source"); + std::unordered_map object_infos; + /// Recreate ReadBuffer and Pipeline for each file. 
bool initialize(); @@ -190,7 +196,8 @@ public: const String & compression_method, bool distributed_processing, const std::optional & format_settings, - ContextPtr ctx); + ContextPtr ctx, + std::unordered_map * object_infos = nullptr); static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector> & key_value_args); @@ -223,6 +230,8 @@ private: std::vector read_tasks_used_in_schema_inference; + std::unordered_map object_infos; + static void updateS3Configuration(ContextPtr, S3Configuration &); static std::shared_ptr createFileIterator( @@ -233,7 +242,9 @@ private: ContextPtr local_context, ASTPtr query, const Block & virtual_block, - const std::vector & read_tasks = {}); + const std::vector & read_tasks = {}, + std::unordered_map * object_infos = nullptr, + Strings * read_keys = nullptr); static ColumnsDescription getTableStructureFromDataImpl( const String & format, @@ -243,9 +254,14 @@ private: bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx, - std::vector * read_keys_in_distributed_processing = nullptr); + std::vector * read_keys_in_distributed_processing = nullptr, + std::unordered_map * object_infos = nullptr); bool supportsSubsetOfColumns() const override; + + static SchemaCache & getSchemaCache(); + static std::optional tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map * object_infos); + static void addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & context); }; } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index cd55c32fb9c..6280aec61c8 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -567,8 +567,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( urls_to_check = {uri}; } + std::optional columns_from_cache; + if (context->getSettingsRef().use_cache_for_url_schema_inference) + columns_from_cache = tryGetColumnsFromCache(urls_to_check); - ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr + ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == urls_to_check.cend()) return nullptr; @@ -591,7 +594,16 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( return buf; }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); + + if (context->getSettingsRef().use_cache_for_url_schema_inference) + addColumnsToCache(urls_to_check, columns, context); + + return columns; } bool IStorageURLBase::supportsSubsetOfColumns() const @@ -772,6 +784,31 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad } } +SchemaCache & IStorageURLBase::getSchemaCache() +{ + static SchemaCache schema_cache; + return schema_cache; +} + +std::optional IStorageURLBase::tryGetColumnsFromCache(const Strings & urls) +{ + auto & schema_cache = getSchemaCache(); + for (const auto & url : urls) + { + auto columns = schema_cache.tryGet(url); + if (columns) + return columns; + } + + return std::nullopt; +} + +void IStorageURLBase::addColumnsToCache(const 
Strings & urls, const ColumnsDescription & columns, const ContextPtr & context) +{ + auto & schema_cache = getSchemaCache(); + schema_cache.addMany(urls, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds()); +} + StorageURL::StorageURL( const String & uri_, const StorageID & table_id_, diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 85c77b00550..cc244ec4982 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB @@ -97,6 +98,10 @@ protected: private: virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; + + static SchemaCache & getSchemaCache(); + static std::optional tryGetColumnsFromCache(const Strings & urls); + static void addColumnsToCache(const Strings & urls, const ColumnsDescription & columns, const ContextPtr & context); }; class StorageURLSink : public SinkToStorage diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index d3ce9627598..67685cfc515 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -49,7 +49,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const { - ReadBufferIterator read_buffer_iterator = [&]() + ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &) { return std::make_unique(data); }; diff --git a/tests/integration/test_file_schema_inference_cache/__init__.py b/tests/integration/test_file_schema_inference_cache/__init__.py new file mode 100755 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml b/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml new file mode 100644 index 00000000000..de7e27921f0 --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml @@ -0,0 +1,9 @@ + + + + system + query_log
+ toYYYYMM(event_date) + 300 +
+
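The config above only registers the system.query_log table that the integration tests read ProfileEvents from; the counters themselves are the five events added in ProfileEvents.cpp at the top of this patch and are incremented inside SchemaCache with the usual ClickHouse pattern. A minimal sketch follows; onCacheLookup is an illustrative name, not a function from this patch.

#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
    /// Declared in ProfileEvents.cpp by this patch.
    extern const Event SchemaInferenceCacheHits;
    extern const Event SchemaInferenceCacheMisses;
}

namespace DB
{

/// Inside SchemaCache::tryGet() every lookup increments exactly one of the counters,
/// so a single "desc file(...)" query exposes the outcome in system.query_log.
void onCacheLookup(bool found)
{
    ProfileEvents::increment(found ? ProfileEvents::SchemaInferenceCacheHits
                                   : ProfileEvents::SchemaInferenceCacheMisses);
}

}

The tests then run SYSTEM FLUSH LOGS and read ProfileEvents['SchemaInferenceCacheHits'] (and the other counters) from system.query_log for the DESCRIBE query they just issued.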
diff --git a/tests/integration/test_file_schema_inference_cache/test.py b/tests/integration/test_file_schema_inference_cache/test.py new file mode 100755 index 00000000000..0d4b59016e3 --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/test.py @@ -0,0 +1,100 @@ +import pytest +import time +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance("node", stay_alive=True, main_configs=["configs/config.d/query_log.xml"]) + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def get_profile_event_for_query(node, query, profile_event): + node.query('system flush logs') + query = query.replace("'", "\\'") + return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1")) + + +def test(start_cluster): + node.query("insert into function file('data.jsonl') select * from numbers(100)") + desc_query = "desc file('data.jsonl')" + node.query(desc_query) + cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + desc_query = "desc file('data.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + node.query("insert into function file('data.jsonl') select * from numbers(100)") + desc_query = "desc file('data.jsonl')" + node.query(desc_query) + cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + node.query("insert into function file('data1.jsonl') select * from numbers(100)") + desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1" + node.query(desc_query) + cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + time.sleep(2) + + desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000" + node.query(desc_query) + cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = "desc file('data1.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_updates = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLUpdates') + assert cache_ttl_updates == 1 + + node.query("insert into function file('data1.jsonl') select * from numbers(100)") + desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1" + node.query(desc_query) + cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + time.sleep(2) + + desc_query = "desc file('data.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = "desc file('data1.jsonl')" + node.query(desc_query) + cache_misses = 
get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + node.query("insert into function file('data2.jsonl') select * from numbers(100)") + node.query("insert into function file('data3.jsonl') select * from numbers(100)") + + desc_query = "desc file('data*.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = "desc file('data2.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = "desc file('data3.jsonl')" + node.query(desc_query) + cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 0490c0c1f0d..8ea978747a9 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -1,6 +1,7 @@ import os import pytest +import time from helpers.cluster import ClickHouseCluster from helpers.test_tools import TSV from pyhdfs import HdfsClient @@ -628,6 +629,99 @@ def test_virtual_columns_2(started_cluster): assert result.strip() == "kek" +def get_profile_event_for_query(node, query, profile_event): + node.query('system flush logs') + query = query.replace("'", "\\'") + return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1")) + + +def test_schema_inference_cache(started_cluster): + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" + node1.query(desc_query) + cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" + node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" + node1.query(desc_query) + cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1" + node1.query(desc_query) + cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + time.sleep(2) + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1000" + node1.query(desc_query) + cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')" 
+ node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_updates = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLUpdates') + assert cache_ttl_updates == 1 + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1" + node1.query(desc_query) + cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + time.sleep(2) + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" + node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')" + node1.query(desc_query) + cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1") + node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1") + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache*.jsonl')" + node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache2.jsonl')" + node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache3.jsonl')" + node1.query(desc_query) + cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 58b9dac8cb0..0bd73678e98 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -1483,3 +1483,152 @@ def test_wrong_format_usage(started_cluster): ) assert "Not a Parquet file" in result + + +def get_profile_event_for_query(instance, query, profile_event): + instance.query('system flush logs') + query = query.replace("'", "\\'") + return int(instance.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1")) + + +def test_schema_inference_cache(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 
'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + time.sleep(2) + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1000" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates') + assert cache_ttl_updates == 1 + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1" + instance.query(desc_query) + cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations') + assert cache_invalidations == 1 + + time.sleep(2) + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + instance.query(f"insert into 
function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1") + instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1") + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache*.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + time.sleep(2) + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations') + assert cache_ttl_expirations == 1 + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=10000" + instance.query(desc_query) + cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses') + assert cache_misses == 1 + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates') + assert cache_ttl_updates == 1 + + file_name = 'test_cache{1,2,3}.jsonl' + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name}')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc 
url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1 + + desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')" + instance.query(desc_query) + cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits') + assert cache_hits == 1
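
One design detail worth calling out from the storage changes above: the cache key is the plain path or URL for File, HDFS and URL, while StorageS3::addColumnsToCache prefixes it with the endpoint host, port and bucket, so identical object keys under different endpoints or buckets cannot collide. A hedged sketch of that key shape; makeS3SchemaCacheKey is an illustrative helper, not part of the patch.

#include <filesystem>
#include <string>

namespace fs = std::filesystem;

/// Mirrors the key built in StorageS3::addColumnsToCache: <host><port>/<bucket>/<key>
/// (host and port are concatenated directly, matching the code above).
std::string makeS3SchemaCacheKey(const std::string & host, int port, const std::string & bucket, const std::string & key)
{
    return (fs::path(host + std::to_string(port)) / bucket / key).string();
}

Because URL keys are the URLs themselves and each storage class keeps its own static SchemaCache, the url() queries at the end of the S3 test populate and hit a separate cache even though they read the same objects, which is why they start with a fresh cache miss.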