diff --git a/docs/en/operations/caches.md b/docs/en/operations/caches.md index 28ddb14a511..f2eb1e3ce5c 100644 --- a/docs/en/operations/caches.md +++ b/docs/en/operations/caches.md @@ -19,6 +19,7 @@ Additional cache types: - Compiled expressions cache. - [Avro format](../interfaces/formats.md#data-format-avro) schemas cache. - [Dictionaries](../sql-reference/dictionaries/index.md) data cache. +- Schema inference cache. Indirectly used: diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 140e11ed2bc..ca729ff2683 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3351,63 +3351,27 @@ Enable schemas cache for schema inference in `file` table function. Default value: `true`. -## cache_ttl_for_file_schema_inference {#cache_ttl_for_file_schema_inference} - -TTL for schemes in cache in schema inference while using `file` table function. 0 means no ttl. -Schema will be removed from cache if it was not accessed during specified TTL. - -Type: seconds. - -Default value: `3600 * 24`. ## use_cache_for_s3_schema_inference {use_cache_for_s3_schema_inference} Enable schemas cache for schema inference in `s3` table function. Default value: `true`. -## cache_ttl_for_s3_schema_inference {#cache_ttl_for_s3_schema_inference} - -TTL for schemes in cache in schema inference while using `s3` table function. 0 means no ttl. -Schema will be removed from cache if it was not accessed during specified TTL. - -Type: seconds. - -Default value: `3600 * 24`. - ## use_cache_for_url_schema_inference {use_cache_for_url_schema_inference} Enable schemas cache for schema inference in `url` table function. Default value: `true`. -## cache_ttl_for_url_schema_inference {#cache_ttl_for_url_schema_inference} - -TTL for schemes in cache in schema inference while using `url` table function. 0 means no ttl. -Schema will be removed from cache if it was not accessed during specified TTL. - -Type: seconds. - -Default value: `3600 * 24`. - ## use_cache_for_hdfs_schema_inference {use_cache_for_hdfs_schema_inference} Enable schemas cache for schema inference in `hdfs` table function. Default value: `true`. -## cache_ttl_for_hdfs_schema_inference {#cache_ttl_for_hdfs_schema_inference} +## schema_inference_cache_require_modification_time_for_url {#schema_inference_cache_require_modification_time_for_url} -TTL for schemes in cache in schema inference while using `hdfs` table function. 0 means no ttl. -Schema will be removed from cache if it was not accessed during specified TTL. - -Type: seconds. - -Default value: `3600 * 24`. - -## allow_urls_without_last_mod_time_in_schema_inference_cache {#allow_urls_without_last_mod_time_in_schema_inference_cache} - -Use schema from cache without last modification time validation for urls without `Last-Modified` header. +Use a schema from the cache only after validating it against the URL's last modification time (i.e. only for URLs that return a `Last-Modified` header). If this setting is enabled and the URL doesn't have a `Last-Modified` header, the schema from the cache won't be used. Default value: `true`. 
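Note on the removed TTL settings: with this change the schema inference cache no longer expires entries by TTL. Instead, each cache is bounded by a maximum number of entries (the `schema_inference_cache_max_elements_for_*` server configuration keys read in the storage code below, defaulting to `DEFAULT_SCHEMA_CACHE_ELEMENTS = 4096`) and evicts the least recently used entry on overflow. The following is a minimal standalone sketch of that bounded LRU behaviour, assuming the same `std::list` + `std::unordered_map` pairing used by `SchemaCache` further down in this patch; the class and member names are illustrative only, and the mutex locking and last-modification-time invalidation performed by the real class are omitted.

```cpp
#include <cstddef>
#include <list>
#include <optional>
#include <string>
#include <unordered_map>

/// Illustrative bounded LRU map (not the ClickHouse class itself).
template <typename Value>
class BoundedLruMap
{
public:
    explicit BoundedLruMap(size_t max_elements_) : max_elements(max_elements_) {}

    void add(const std::string & key, Value value)
    {
        if (data.contains(key))
            return; /// Keep the existing entry, as SchemaCache::addUnlocked does.

        auto it = queue.insert(queue.end(), key);
        data.emplace(key, Cell{std::move(value), it});

        if (queue.size() > max_elements)
        {
            /// Overflow: drop the least recently used key from the front of the queue.
            data.erase(queue.front());
            queue.pop_front();
        }
    }

    std::optional<Value> tryGet(const std::string & key)
    {
        auto it = data.find(key);
        if (it == data.end())
            return std::nullopt;

        /// A hit refreshes the key: move it to the end of the queue.
        queue.splice(queue.end(), queue, it->second.iterator);
        return it->second.value;
    }

private:
    using Queue = std::list<std::string>;

    struct Cell
    {
        Value value;
        Queue::iterator iterator;
    };

    Queue queue;                                 /// Front = least recently used key.
    std::unordered_map<std::string, Cell> data;  /// Key -> value plus position in the queue.
    size_t max_elements;
};
```

The real `SchemaCache` additionally exposes `clear()` (used by the new `SYSTEM DROP SCHEMA CACHE` statement) and `getAll()` (used by the new `system.schema_inference_cache` table), both introduced later in this patch.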
diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 38e006f18e7..e8d3b453188 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -140,6 +140,7 @@ enum class AccessType M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \ M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \ M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index de2712bf715..1ea5e55f00e 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -348,8 +348,7 @@ \ M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \ M(SchemaInferenceCacheMisses, "Number of times a schema is not in cache while schema inference") \ - M(SchemaInferenceCacheTTLExpirations, "Number of times a schema from cache expires due to TTL") \ - M(SchemaInferenceCacheTTLUpdates, "Number of times TTL for schema in cache was updated") \ + M(SchemaInferenceCacheEvictions, "Number of times a schema from cache was evicted due to overflow") \ M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data") \ \ M(KeeperPacketsSent, "Packets sent by keeper server") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 6e966dface2..53fcc9112eb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -614,7 +614,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Seconds, cache_ttl_for_hdfs_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using hdfs table function. 0 means no ttl", 0) \ M(Bool, use_cache_for_url_schema_inference, true, "Use cache in schema inference while using url table function", 0) \ M(Seconds, cache_ttl_for_url_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using url table function. 0 means no ttl", 0) \ - M(Bool, allow_urls_without_last_mod_time_in_schema_inference_cache, true, "Use schema from cache without last modification time validation for urls without Last-Modified header", 0) \ + M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URL with last modification time validation (for urls with Last-Modified header)", 0) \ \ M(String, compatibility, "", "Changes other settings according to provided ClickHouse version. 
If we know that we changed some behaviour in ClickHouse by changing some settings in some version, this compatibility setting will control these settings", 0) \ \ diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index e4e93f9f370..1aa656cc723 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -239,6 +239,20 @@ String getKeyForSchemaCache(const String & source, const String & format, const return getKeysForSchemaCache({source}, format, format_settings, context).front(); } +static String makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info) +{ + return source + "@@" + format + "@@" + additional_format_info; +} + +void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info) +{ + size_t additional_format_info_pos = key.rfind("@@"); + additional_format_info = key.substr(additional_format_info_pos + 2, key.size() - additional_format_info_pos - 2); + size_t format_pos = key.rfind("@@", additional_format_info_pos - 1); + format = key.substr(format_pos + 2, additional_format_info_pos - format_pos - 2); + source = key.substr(0, format_pos); +} + Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional & format_settings, const ContextPtr & context) { /// For some formats data schema depends on some settings, so it's possible that @@ -249,7 +263,7 @@ Strings getKeysForSchemaCache(const Strings & sources, const String & format, co String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings); Strings cache_keys; cache_keys.reserve(sources.size()); - std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return source + format + additional_format_info; }); + std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); }); return cache_keys; } diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h index 592ec4d8863..56b48823464 100644 --- a/src/Formats/ReadSchemaUtils.h +++ b/src/Formats/ReadSchemaUtils.h @@ -50,4 +50,5 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header); String getKeyForSchemaCache(const String & source, const String & format, const std::optional & format_settings, const ContextPtr & context); Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional & format_settings, const ContextPtr & context); +void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info); } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 695ea53e65e..cba197a25cd 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -45,6 +45,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -326,6 +330,25 @@ BlockIO InterpreterSystemQuery::execute() } break; } + case Type::DROP_SCHEMA_CACHE: + { + getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE); + std::unordered_set caches_to_drop; + if (query.schema_cache_storage.empty()) + caches_to_drop = {"FILE", "S3", "HDFS", "URL"}; + else + caches_to_drop = {query.schema_cache_storage}; + + if (caches_to_drop.contains("FILE")) + 
StorageFile::getSchemaCache(getContext()).clear(); + if (caches_to_drop.contains("S3")) + StorageS3::getSchemaCache(getContext()).clear(); + if (caches_to_drop.contains("HDFS")) + StorageHDFS::getSchemaCache(getContext()).clear(); + if (caches_to_drop.contains("URL")) + StorageURL::getSchemaCache(getContext()).clear(); + break; + } case Type::RELOAD_DICTIONARY: { getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); @@ -833,6 +856,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_INDEX_MARK_CACHE: case Type::DROP_INDEX_UNCOMPRESSED_CACHE: case Type::DROP_FILESYSTEM_CACHE: + case Type::DROP_SCHEMA_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); break; diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index f2ef7a7a47a..a8c1c246836 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -29,6 +29,7 @@ public: DROP_COMPILED_EXPRESSION_CACHE, #endif DROP_FILESYSTEM_CACHE, + DROP_SCHEMA_CACHE, STOP_LISTEN_QUERIES, START_LISTEN_QUERIES, RESTART_REPLICAS, @@ -96,6 +97,8 @@ public: String filesystem_cache_path; String backup_name; + String schema_cache_storage; + String getID(char) const override { return "SYSTEM query"; } ASTPtr clone() const override diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index 7854f7a2400..26ba9290d04 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -363,6 +363,23 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & parseQueryWithOnCluster(res, pos, expected); break; } + case Type::DROP_SCHEMA_CACHE: + { + if (ParserKeyword{"FOR"}.ignore(pos, expected)) + { + if (ParserKeyword{"FILE"}.ignore(pos, expected)) + res->schema_cache_storage = "FILE"; + else if (ParserKeyword{"S3"}.ignore(pos, expected)) + res->schema_cache_storage = "S3"; + else if (ParserKeyword{"HDFS"}.ignore(pos, expected)) + res->schema_cache_storage = "HDFS"; + else if (ParserKeyword{"URL"}.ignore(pos, expected)) + res->schema_cache_storage = "URL"; + else + return false; + } + break; + } case Type::UNFREEZE: { diff --git a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp index 93be771fcaf..1580125f97e 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp @@ -314,7 +314,7 @@ void registerInputFormatCapnProto(FormatFactory & factory) factory.markFormatSupportsSubsetOfColumns("CapnProto"); factory.registerFileExtension("capnp", "CapnProto"); factory.registerAdditionalInfoForSchemaCacheGetter( - "CapnProto", [](const FormatSettings & settings) { return settings.schema.format_schema; }); + "CapnProto", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } void registerCapnProtoSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp b/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp index a662d2a7198..a8d9aab375b 100644 --- a/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp @@ -454,7 +454,7 @@ void registerInputFormatMySQLDump(FormatFactory & factory) }); factory.registerAdditionalInfoForSchemaCacheGetter( - "MySQLDump", [](const FormatSettings & settings) { return settings.mysql_dump.table_name; }); + "MySQLDump", [](const FormatSettings & settings) { return 
"Table name: " + settings.mysql_dump.table_name; }); } void registerMySQLSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp index 8373708ae59..7af0d8a7094 100644 --- a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp @@ -82,7 +82,7 @@ void registerInputFormatProtobufList(FormatFactory & factory) }); factory.markFormatSupportsSubsetOfColumns("ProtobufList"); factory.registerAdditionalInfoForSchemaCacheGetter( - "ProtobufList", [](const FormatSettings & settings) { return settings.schema.format_schema; }); + "ProtobufList", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } void registerProtobufListSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 7971f8944e5..5fd4bb79fa0 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -104,7 +104,7 @@ void registerProtobufSchemaReader(FormatFactory & factory) for (const auto & name : {"Protobuf", "ProtobufSingle"}) factory.registerAdditionalInfoForSchemaCacheGetter( - name, [](const FormatSettings & settings) { return settings.schema.format_schema; }); + name, [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } } diff --git a/src/Storages/Cache/SchemaCache.cpp b/src/Storages/Cache/SchemaCache.cpp index 2fa5f7dfbd8..7bce1965f1a 100644 --- a/src/Storages/Cache/SchemaCache.cpp +++ b/src/Storages/Cache/SchemaCache.cpp @@ -6,46 +6,57 @@ namespace ProfileEvents { extern const Event SchemaInferenceCacheHits; extern const Event SchemaInferenceCacheMisses; - extern const Event SchemaInferenceCacheTTLExpirations; - extern const Event SchemaInferenceCacheTTLUpdates; + extern const Event SchemaInferenceCacheEvictions; extern const Event SchemaInferenceCacheInvalidations; } namespace DB { -void SchemaCache::add(const String & key, const ColumnsDescription & columns, time_t ttl) +SchemaCache::SchemaCache(size_t max_elements_) : max_elements(max_elements_) +{ +} + +void SchemaCache::add(const String & key, const ColumnsDescription & columns) { std::lock_guard lock(mutex); - clean(); - addUnlocked(key, columns, ttl); + addUnlocked(key, columns); } -void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl) +void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns) { std::lock_guard lock(mutex); - clean(); for (const auto & key : keys) - addUnlocked(key, columns, ttl); + addUnlocked(key, columns); } -void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl) +void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns) { /// Do nothing if this key is already in cache; if (data.contains(key)) return; + time_t now = std::time(nullptr); - time_t valid_until = now + ttl; - data[key] = SchemaInfo{columns, now, ttl, valid_until}; - if (ttl) - expiration_queue.insert({valid_until, key}); + auto it = queue.insert(queue.end(), key); + data[key] = {SchemaInfo{columns, now}, it}; + checkOverflow(); } -std::optional SchemaCache::tryGet(const String & key, std::function()> get_last_mod_time) +void SchemaCache::checkOverflow() +{ + if (queue.size() <= max_elements) + return; + + auto 
key = queue.front(); + data.erase(key); + queue.pop_front(); + ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheEvictions); +} + +std::optional SchemaCache::tryGet(const String & key, LastModificationTimeGetter get_last_mod_time) { std::lock_guard lock(mutex); - clean(); auto it = data.find(key); if (it == data.end()) { @@ -53,7 +64,8 @@ std::optional SchemaCache::tryGet(const String & key, std::f return std::nullopt; } - auto & schema_info = it->second; + auto & schema_info = it->second.schema_info; + auto & queue_iterator = it->second.iterator; if (get_last_mod_time) { /// It's important to call get_last_mod_time only if we have key in cache, @@ -69,44 +81,34 @@ std::optional SchemaCache::tryGet(const String & key, std::f /// Object was modified after it was added in cache. /// So, stored value is no more valid and we should remove it. ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations); - /// If this key had TTL, we should remove it from expiration queue. - if (schema_info.ttl) - expiration_queue.erase({schema_info.valid_until, key}); + queue.erase(queue_iterator); data.erase(key); return std::nullopt; } } - if (schema_info.ttl) - { - /// Current value in cache is valid and we can resume it's TTL by updating it's expiration time. - /// We will extract current value from the expiration queue, modify it and insert back to the queue. - time_t now = std::time(nullptr); - auto jt = expiration_queue.find({schema_info.valid_until, key}); - auto node = expiration_queue.extract(jt); - schema_info.valid_until = now + schema_info.ttl; - node.value().first = schema_info.valid_until; - ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLUpdates); - expiration_queue.insert(std::move(node)); - } + /// Move key to the end of queue. + queue.splice(queue.end(), queue, queue_iterator); ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits); return schema_info.columns; } -void SchemaCache::clean() +void SchemaCache::clear() { - time_t now = std::time(nullptr); - auto it = expiration_queue.begin(); - /// Queue is sorted by time, so we need to check only the first - /// values that are less than current time. - while (it != expiration_queue.end() && it->first < now) - { - ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLExpirations); - data.erase(it->second); - ++it; - } - expiration_queue.erase(expiration_queue.begin(), it); + std::lock_guard lock(mutex); + data.clear(); + queue.clear(); +} + +std::unordered_map SchemaCache::getAll() +{ + std::lock_guard lock(mutex); + std::unordered_map result; + for (const auto & [key, value] : data) + result[key] = value.schema_info; + + return result; } } diff --git a/src/Storages/Cache/SchemaCache.h b/src/Storages/Cache/SchemaCache.h index a9d34dc23b6..132fbc0a8cb 100644 --- a/src/Storages/Cache/SchemaCache.h +++ b/src/Storages/Cache/SchemaCache.h @@ -2,49 +2,64 @@ #include #include -#include +#include #include #include namespace DB { +const size_t DEFAULT_SCHEMA_CACHE_ELEMENTS = 4096; + /// Cache that stores columns description by some string key. It's used in schema inference. -/// It supports TTL for keys. Before each action it looks for expired TTls and removes -/// corresponding keys from cache. After each access to a key in cache it's TTL resumes, -/// so a key will be removed by TTL only if it was not accessed during this TTL. 
+/// It implements LRU semantic: after each access to a key in cache we move this key to +/// the end of the queue, if we reached the limit of maximum elements in the cache we +/// remove keys from the beginning of the queue. /// It also supports keys invalidations by last modification time. If last modification time /// is provided and last modification happened after a key was added to the cache, this key /// will be removed from cache. class SchemaCache { public: - /// Add new key with a schema with optional TTL - void add(const String & key, const ColumnsDescription & columns, time_t ttl = 0); - - /// Add many keys with the same schema with optional TTL (usually used for globs) - void addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl = 0); - - std::optional tryGet(const String & key, std::function()> get_last_mod_time = {}); - -private: - void addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl); - - /// Check for expired TTLs. - void clean(); + SchemaCache(size_t max_elements_); struct SchemaInfo { ColumnsDescription columns; time_t registration_time; - time_t ttl; - time_t valid_until; }; - std::unordered_map data; - /// Special queue for checking expired TTLs. It contains pairs - /// (expiration time, key) sorted in ascending order. - std::set> expiration_queue; + using LastModificationTimeGetter = std::function()>; + + /// Add new key with a schema + void add(const String & key, const ColumnsDescription & columns); + + /// Add many keys with the same schema (usually used for globs) + void addMany(const Strings & keys, const ColumnsDescription & columns); + + std::optional tryGet(const String & key, LastModificationTimeGetter get_last_mod_time = {}); + + void clear(); + + std::unordered_map getAll(); + +private: + void addUnlocked(const String & key, const ColumnsDescription & columns); + void checkOverflow(); + + using Queue = std::list; + using QueueIterator = Queue::iterator; + + struct Cell + { + SchemaInfo schema_info; + QueueIterator iterator; + }; + + Queue queue; + std::unordered_map data; + + size_t max_elements; std::mutex mutex; }; diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 5c8c3afb414..b63b1aa67e2 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -729,9 +729,9 @@ NamesAndTypesList StorageHDFS::getVirtuals() const return virtual_columns; } -SchemaCache & StorageHDFS::getSchemaCache() +SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx) { - static SchemaCache schema_cache; + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_hdfs", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -742,7 +742,7 @@ std::optional StorageHDFS::tryGetColumnsFromCache( const String & format_name, const ContextPtr & ctx) { - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(ctx); for (const auto & path : paths) { auto get_last_mod_time = [&]() -> std::optional @@ -770,12 +770,12 @@ void StorageHDFS::addColumnsToCache( const String & format_name, const ContextPtr & ctx) { - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(ctx); Strings sources; sources.reserve(paths.size()); std::transform(paths.begin(), paths.end(), std::back_inserter(sources), [&](const String & path){ return fs::path(uri_without_path) / path; }); Strings cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx); - schema_cache.addMany(cache_keys, 
columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds()); + schema_cache.addMany(cache_keys, columns); } } diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index 55d3d88a05d..a0d61f4bd2a 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -66,12 +66,12 @@ public: const String & compression_method, ContextPtr ctx); + static SchemaCache & getSchemaCache(const ContextPtr & ctx); + protected: friend class HDFSSource; private: - static SchemaCache & getSchemaCache(); - static std::optional tryGetColumnsFromCache( const Strings & paths, const String & uri_without_path, diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 5ff54c5fd1f..a6a1c878074 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1206,9 +1206,9 @@ NamesAndTypesList StorageFile::getVirtuals() const {"_file", std::make_shared(std::make_shared())}}; } -SchemaCache & StorageFile::getSchemaCache() +SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context) { - static SchemaCache schema_cache; + static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -1216,7 +1216,7 @@ std::optional StorageFile::tryGetColumnsFromCache( const Strings & paths, const String & format_name, const std::optional & format_settings, ContextPtr context) { /// Check if the cache contains one of the paths. - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(context); struct stat file_stat{}; for (const auto & path : paths) { @@ -1244,9 +1244,9 @@ void StorageFile::addColumnsToCache( const std::optional & format_settings, const ContextPtr & context) { - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(context); Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context); - schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds()); + schema_cache.addMany(cache_keys, columns); } } diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 9a779e272b5..e60e5f6b371 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -87,6 +87,8 @@ public: const std::optional & format_settings, ContextPtr context); + static SchemaCache & getSchemaCache(const ContextPtr & context); + protected: friend class StorageFileSource; friend class StorageFileSink; @@ -94,8 +96,6 @@ protected: private: void setStorageMetadata(CommonArguments args); - static SchemaCache & getSchemaCache(); - static std::optional tryGetColumnsFromCache( const Strings & paths, const String & format_name, const std::optional & format_settings, ContextPtr context); diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 2175a86f950..7351cbed994 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1327,9 +1327,9 @@ bool StorageS3::supportsPartitionBy() const return true; } -SchemaCache & StorageS3::getSchemaCache() +SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) { - static SchemaCache schema_cache; + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -1342,7 +1342,7 @@ std::optional StorageS3::tryGetColumnsFromCache( const std::optional & format_settings, const ContextPtr & ctx) { - auto & 
schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(ctx); for (auto it = begin; it < end; ++it) { String path = fs::path(s3_configuration.uri.bucket) / *it; @@ -1391,8 +1391,8 @@ void StorageS3::addColumnsToCache( sources.reserve(keys.size()); std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; }); Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); - auto & schema_cache = getSchemaCache(); - schema_cache.addMany(cache_keys, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds()); + auto & schema_cache = getSchemaCache(ctx); + schema_cache.addMany(cache_keys, columns); } } diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index f683d7f5a45..3a02237570d 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -211,6 +211,8 @@ public: S3Settings::ReadWriteSettings rw_settings; }; + static SchemaCache & getSchemaCache(const ContextPtr & ctx); + private: friend class StorageS3Cluster; friend class TableFunctionS3Cluster; @@ -259,8 +261,6 @@ private: bool supportsSubsetOfColumns() const override; - static SchemaCache & getSchemaCache(); - static std::optional tryGetColumnsFromCache( const Strings::const_iterator & begin, const Strings::const_iterator & end, diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 16ee06e3fbb..4f6f1d2a3fb 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -790,9 +790,9 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad } } -SchemaCache & IStorageURLBase::getSchemaCache() +SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context) { - static SchemaCache schema_cache; + static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } @@ -804,7 +804,7 @@ std::optional IStorageURLBase::tryGetColumnsFromCache( const std::optional & format_settings, const ContextPtr & context) { - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(context); for (const auto & url : urls) { auto get_last_mod_time = [&]() -> std::optional @@ -813,7 +813,7 @@ std::optional IStorageURLBase::tryGetColumnsFromCache( /// Some URLs could not have Last-Modified header, in this case we cannot be sure that /// data wasn't changed after adding it's schema to cache. Use schema from cache only if /// special setting for this case is enabled. 
- if (!last_mod_time && context->getSettingsRef().allow_urls_without_last_mod_time_in_schema_inference_cache) + if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url) return 0; return last_mod_time; }; @@ -834,9 +834,9 @@ void IStorageURLBase::addColumnsToCache( const std::optional & format_settings, const ContextPtr & context) { - auto & schema_cache = getSchemaCache(); + auto & schema_cache = getSchemaCache(context); Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context); - schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds()); + schema_cache.addMany(cache_keys, columns); } std::optional IStorageURLBase::getLastModificationTime( diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 28123e94ab4..3fac574cf44 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -49,6 +49,8 @@ public: const std::optional & format_settings, ContextPtr context); + static SchemaCache & getSchemaCache(const ContextPtr & context); + protected: IStorageURLBase( const String & uri_, @@ -99,8 +101,6 @@ protected: private: virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; - static SchemaCache & getSchemaCache(); - static std::optional tryGetColumnsFromCache( const Strings & urls, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, diff --git a/src/Storages/System/StorageSystemSchemaInferenceCache.cpp b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp new file mode 100644 index 00000000000..8bcb460baf0 --- /dev/null +++ b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +static String getSchemaString(const ColumnsDescription & columns) +{ + WriteBufferFromOwnString buf; + const auto & names_and_types = columns.getAll(); + for (auto it = names_and_types.begin(); it != names_and_types.end(); ++it) + { + if (it != names_and_types.begin()) + writeCString(", ", buf); + writeString(it->name, buf); + writeChar(' ', buf); + writeString(it->type->getName(), buf); + } + + return buf.str(); +} + +NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes() +{ + return { + {"storage", std::make_shared()}, + {"source", std::make_shared()}, + {"format", std::make_shared()}, + {"additional_format_info", std::make_shared()}, + {"registration_time", std::make_shared()}, + {"schema", std::make_shared()} + }; +} + +void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +{ + fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File"); + fillDataImpl(res_columns, StorageS3::getSchemaCache(context), "S3"); + fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS"); + fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL"); +} + +void StorageSystemSchemaInferenceCache::fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name) const +{ + auto s3_schema_cache_data = schema_cache.getAll(); + String source; + String format; + String additional_format_info; + for (const auto & [key, schema_info] : s3_schema_cache_data) + { + splitSchemaCacheKey(key, source, format, additional_format_info); + res_columns[0]->insert(storage_name); + res_columns[1]->insert(source); + 
res_columns[2]->insert(format); + res_columns[3]->insert(additional_format_info); + res_columns[4]->insert(schema_info.registration_time); + res_columns[5]->insert(getSchemaString(schema_info.columns)); + } +} + +} diff --git a/src/Storages/System/StorageSystemSchemaInferenceCache.h b/src/Storages/System/StorageSystemSchemaInferenceCache.h new file mode 100644 index 00000000000..56200131bee --- /dev/null +++ b/src/Storages/System/StorageSystemSchemaInferenceCache.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class StorageSystemSchemaInferenceCache final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemSchemaInferenceCache"; } + + static NamesAndTypesList getNamesAndTypes(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override; + + void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name) const; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index dbef2df953b..ab1ffdf209a 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -74,6 +74,7 @@ #include #include #include +#include #ifdef OS_LINUX #include @@ -133,6 +134,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database) attach(context, system_database, "licenses"); attach(context, system_database, "time_zones"); attach(context, system_database, "backups"); + attach(context, system_database, "schema_inference_cache"); #ifdef OS_LINUX attach(context, system_database, "stack_trace"); #endif diff --git a/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml b/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml new file mode 100644 index 00000000000..de3c4cbe57f --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml @@ -0,0 +1,4 @@ + + + 2 + diff --git a/tests/integration/test_file_schema_inference_cache/test.py b/tests/integration/test_file_schema_inference_cache/test.py index ed80ed43952..0f078172963 100755 --- a/tests/integration/test_file_schema_inference_cache/test.py +++ b/tests/integration/test_file_schema_inference_cache/test.py @@ -4,7 +4,8 @@ from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node = cluster.add_instance( - "node", stay_alive=True, main_configs=["configs/config.d/query_log.xml"] + "node", stay_alive=True, + main_configs=["configs/config.d/query_log.xml", "configs/config.d/schema_cache.xml"] ) @@ -27,122 +28,115 @@ def get_profile_event_for_query(node, query, profile_event): ) +def check_cache_misses(node, file, amount=1): + assert get_profile_event_for_query(node, f"desc file('{file}')", + "SchemaInferenceCacheMisses") == amount + + +def check_cache_hits(node, file, amount=1): + assert get_profile_event_for_query(node, f"desc file('{file}')", + "SchemaInferenceCacheHits") == amount + + +def check_cache_invalidations(node, file, amount=1): + assert get_profile_event_for_query(node, f"desc file('{file}')", + "SchemaInferenceCacheInvalidations") == amount + + +def check_cache_evictions(node, file, amount=1): + assert get_profile_event_for_query(node, f"desc file('{file}')", + "SchemaInferenceCacheEvictions") == amount + + +def check_cache(node, expected_files): + sources = 
node.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(expected_files) + + def test(start_cluster): node.query("insert into function file('data.jsonl') select * from numbers(100)") time.sleep(1) - desc_query = "desc file('data.jsonl')" - node.query(desc_query) - cache_misses = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - desc_query = "desc file('data.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + node.query("desc file('data.jsonl')") + check_cache(node, ["data.jsonl"]) + check_cache_misses(node, "data.jsonl") + + node.query("desc file('data.jsonl')") + check_cache_hits(node, "data.jsonl") node.query("insert into function file('data.jsonl') select * from numbers(100)") - time.sleep(1) - desc_query = "desc file('data.jsonl')" - node.query(desc_query) - cache_invalidations = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 + node.query("desc file('data.jsonl')") + check_cache_invalidations(node, "data.jsonl") node.query("insert into function file('data1.jsonl') select * from numbers(100)") - desc_query = ( - "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1" - ) - node.query(desc_query) - cache_misses = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + time.sleep(1) - time.sleep(2) + node.query("desc file('data1.jsonl')") + check_cache(node, ["data.jsonl", "data1.jsonl"]) + check_cache_misses(node, "data1.jsonl") - desc_query = ( - "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000" - ) - node.query(desc_query) - cache_misses = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - cache_ttl_expirations = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = "desc file('data1.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_updates = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheTTLUpdates" - ) - assert cache_ttl_updates == 1 - - node.query("insert into function file('data1.jsonl') select * from numbers(100)") - desc_query = ( - "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1" - ) - node.query(desc_query) - cache_invalidations = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 - - time.sleep(2) - - desc_query = "desc file('data.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_expirations = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = "desc file('data1.jsonl')" - node.query(desc_query) - cache_misses = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + node.query("desc file('data1.jsonl')") + check_cache_hits(node, "data1.jsonl") node.query("insert into function file('data2.jsonl') select * from 
numbers(100)") - node.query("insert into function file('data3.jsonl') select * from numbers(100)") - time.sleep(1) - desc_query = "desc file('data*.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + node.query("desc file('data2.jsonl')") + check_cache(node, ["data1.jsonl", "data2.jsonl"]) + check_cache_misses(node, "data2.jsonl") + check_cache_evictions(node, "data2.jsonl") - desc_query = "desc file('data2.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + node.query("desc file('data2.jsonl')") + check_cache_hits(node, "data2.jsonl") - desc_query = "desc file('data3.jsonl')" - node.query(desc_query) - cache_hits = get_profile_event_for_query( - node, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + node.query("desc file('data1.jsonl')") + check_cache_hits(node, "data1.jsonl") + + node.query("desc file('data.jsonl')") + check_cache(node, ["data.jsonl", "data1.jsonl"]) + check_cache_misses(node, "data.jsonl") + check_cache_evictions(node, "data.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache(node, ["data.jsonl", "data2.jsonl"]) + check_cache_misses(node, "data2.jsonl") + check_cache_evictions(node, "data2.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache_hits(node, "data2.jsonl") + + node.query("desc file('data.jsonl')") + check_cache_hits(node, "data.jsonl") + + node.query("insert into function file('data3.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data*.jsonl')") + check_cache(node, ["data.jsonl", "data1.jsonl"]) + check_cache_hits(node, "data*.jsonl") + + node.query("desc file('data.jsonl')") + check_cache_hits(node, "data.jsonl") + + node.query("desc file('data1.jsonl')") + check_cache_hits(node, "data1.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache_misses(node, "data2.jsonl") + + node.query("desc file('data3.jsonl')") + check_cache_misses(node, "data3.jsonl") + + node.query("system drop schema cache for file") + check_cache(node, []) + + node.query("desc file('data*.jsonl')") + check_cache_misses(node, "data*.jsonl", 4) + + node.query("system drop schema cache") + check_cache(node, []) + + node.query("desc file('data*.jsonl')") + check_cache_misses(node, "data*.jsonl", 4) diff --git a/tests/integration/test_storage_hdfs/configs/schema_cache.xml b/tests/integration/test_storage_hdfs/configs/schema_cache.xml new file mode 100644 index 00000000000..37639649b5f --- /dev/null +++ b/tests/integration/test_storage_hdfs/configs/schema_cache.xml @@ -0,0 +1,3 @@ + + 2 + \ No newline at end of file diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index 5e8cd72889b..77f6c02cf42 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -8,7 +8,7 @@ from pyhdfs import HdfsClient cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( - "node1", main_configs=["configs/macro.xml"], with_hdfs=True + "node1", main_configs=["configs/macro.xml", "configs/schema_cache.xml"], with_hdfs=True ) @@ -21,7 +21,7 @@ def started_cluster(): cluster.shutdown() -def _test_read_write_storage(started_cluster): +def test_read_write_storage(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query("drop table if exists SimpleHDFSStorage SYNC") node1.query( @@ -32,7 +32,7 @@ def 
_test_read_write_storage(started_cluster): assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n" -def _test_read_write_storage_with_globs(started_cluster): +def test_read_write_storage_with_globs(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( @@ -79,7 +79,7 @@ def _test_read_write_storage_with_globs(started_cluster): assert "in readonly mode" in str(ex) -def _test_read_write_table(started_cluster): +def test_read_write_table(started_cluster): hdfs_api = started_cluster.hdfs_api data = "1\tSerialize\t555.222\n2\tData\t777.333\n" @@ -95,7 +95,7 @@ def _test_read_write_table(started_cluster): ) -def _test_write_table(started_cluster): +def test_write_table(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( @@ -110,7 +110,7 @@ def _test_write_table(started_cluster): assert node1.query("select * from OtherHDFSStorage order by id") == result -def _test_bad_hdfs_uri(started_cluster): +def test_bad_hdfs_uri(started_cluster): try: node1.query( "create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')" @@ -136,7 +136,7 @@ def _test_bad_hdfs_uri(started_cluster): @pytest.mark.timeout(800) -def _test_globs_in_read_table(started_cluster): +def test_globs_in_read_table(started_cluster): hdfs_api = started_cluster.hdfs_api some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n" @@ -191,7 +191,7 @@ def _test_globs_in_read_table(started_cluster): ).rstrip() == str(files_amount) -def _test_read_write_gzip_table(started_cluster): +def test_read_write_gzip_table(started_cluster): hdfs_api = started_cluster.hdfs_api data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n" @@ -207,7 +207,7 @@ def _test_read_write_gzip_table(started_cluster): ) -def _test_read_write_gzip_table_with_parameter_gzip(started_cluster): +def test_read_write_gzip_table_with_parameter_gzip(started_cluster): hdfs_api = started_cluster.hdfs_api data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n" @@ -223,7 +223,7 @@ def _test_read_write_gzip_table_with_parameter_gzip(started_cluster): ) -def _test_read_write_table_with_parameter_none(started_cluster): +def test_read_write_table_with_parameter_none(started_cluster): hdfs_api = started_cluster.hdfs_api data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n" @@ -239,7 +239,7 @@ def _test_read_write_table_with_parameter_none(started_cluster): ) -def _test_read_write_gzip_table_with_parameter_auto_gz(started_cluster): +def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster): hdfs_api = started_cluster.hdfs_api data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n" @@ -255,7 +255,7 @@ def _test_read_write_gzip_table_with_parameter_auto_gz(started_cluster): ) -def _test_write_gz_storage(started_cluster): +def test_write_gz_storage(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( @@ -266,7 +266,7 @@ def _test_write_gz_storage(started_cluster): assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n" -def _test_write_gzip_storage(started_cluster): +def test_write_gzip_storage(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( @@ -277,7 +277,7 @@ def _test_write_gzip_storage(started_cluster): assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n" -def _test_virtual_columns(started_cluster): +def test_virtual_columns(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( @@ -295,7 +295,7 @@ def _test_virtual_columns(started_cluster): 
) -def _test_read_files_with_spaces(started_cluster): +def test_read_files_with_spaces(started_cluster): hdfs_api = started_cluster.hdfs_api fs = HdfsClient(hosts=started_cluster.hdfs_ip) @@ -316,7 +316,7 @@ def _test_read_files_with_spaces(started_cluster): fs.delete(dir, recursive=True) -def _test_truncate_table(started_cluster): +def test_truncate_table(started_cluster): hdfs_api = started_cluster.hdfs_api node1.query( "create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/tr', 'TSV')" @@ -329,7 +329,7 @@ def _test_truncate_table(started_cluster): node1.query("drop table test_truncate") -def _test_partition_by(started_cluster): +def test_partition_by(started_cluster): hdfs_api = started_cluster.hdfs_api table_format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -373,7 +373,7 @@ def _test_partition_by(started_cluster): assert result.strip() == "1\t2\t3" -def _test_seekable_formats(started_cluster): +def test_seekable_formats(started_cluster): hdfs_api = started_cluster.hdfs_api table_function = ( @@ -394,7 +394,7 @@ def _test_seekable_formats(started_cluster): assert int(result) == 5000000 -def _test_read_table_with_default(started_cluster): +def test_read_table_with_default(started_cluster): hdfs_api = started_cluster.hdfs_api data = "n\n100\n" @@ -410,7 +410,7 @@ def _test_read_table_with_default(started_cluster): ) -def _test_schema_inference(started_cluster): +def test_schema_inference(started_cluster): node1.query( f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)" ) @@ -433,7 +433,7 @@ def _test_schema_inference(started_cluster): assert int(result) == 5000000 -def _test_hdfsCluster(started_cluster): +def test_hdfsCluster(started_cluster): hdfs_api = started_cluster.hdfs_api fs = HdfsClient(hosts=started_cluster.hdfs_ip) dir = "/test_hdfsCluster" @@ -459,13 +459,13 @@ def _test_hdfsCluster(started_cluster): fs.delete(dir, recursive=True) -def _test_hdfs_directory_not_exist(started_cluster): +def test_hdfs_directory_not_exist(started_cluster): ddl = "create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/data/not_eixst', 'TSV')" node1.query(ddl) assert "" == node1.query("select * from HDFSStorageWithNotExistDir") -def _test_overwrite(started_cluster): +def test_overwrite(started_cluster): hdfs_api = started_cluster.hdfs_api table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')" @@ -484,7 +484,7 @@ def _test_overwrite(started_cluster): assert int(result) == 10 -def _test_multiple_inserts(started_cluster): +def test_multiple_inserts(started_cluster): hdfs_api = started_cluster.hdfs_api table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')" @@ -520,7 +520,7 @@ def _test_multiple_inserts(started_cluster): assert int(result) == 60 -def _test_format_detection(started_cluster): +def test_format_detection(started_cluster): node1.query( f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')" ) @@ -529,7 +529,7 @@ def _test_format_detection(started_cluster): assert int(result) == 1 -def _test_schema_inference_with_globs(started_cluster): +def test_schema_inference_with_globs(started_cluster): node1.query( f"insert into table function hdfs('hdfs://hdfs1:9000/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL" ) @@ -570,7 +570,7 @@ def 
_test_schema_inference_with_globs(started_cluster): ) -def _test_insert_select_schema_inference(started_cluster): +def test_insert_select_schema_inference(started_cluster): node1.query( f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x" ) @@ -582,7 +582,7 @@ def _test_insert_select_schema_inference(started_cluster): assert int(result) == 1 -def _test_cluster_join(started_cluster): +def test_cluster_join(started_cluster): result = node1.query( """ SELECT l.id,r.id FROM hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') as l @@ -593,7 +593,7 @@ def _test_cluster_join(started_cluster): assert "AMBIGUOUS_COLUMN_NAME" not in result -def _test_cluster_macro(started_cluster): +def test_cluster_macro(started_cluster): with_macro = node1.query( """ SELECT id FROM hdfsCluster('{default_cluster_macro}', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') @@ -609,7 +609,7 @@ def _test_cluster_macro(started_cluster): assert TSV(with_macro) == TSV(no_macro) -def _test_virtual_columns_2(started_cluster): +def test_virtual_columns_2(started_cluster): hdfs_api = started_cluster.hdfs_api table_function = ( @@ -639,129 +639,134 @@ def get_profile_event_for_query(node, query, profile_event): ) +def check_cache_misses(node1, file, amount=1): + assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheMisses") == amount + + +def check_cache_hits(node1, file, amount=1): + assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheHits") == amount + + +def check_cache_invalidations(node1, file, amount=1): + assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheInvalidations") == amount + + +def check_cache_evictions(node1, file, amount=1): + assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheEvictions") == amount + + +def check_cache(node1, expected_files): + sources = node1.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(expected_files) + + +def run_describe_query(node, file): + query = f"desc hdfs('hdfs://hdfs1:9000/{file}')" + node.query(query) + + def test_schema_inference_cache(started_cluster): node1.query( - f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" ) time.sleep(1) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" - node1.query(desc_query) - cache_misses = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + run_describe_query(node1, "test_cache0.jsonl") + check_cache(node1, ["test_cache0.jsonl"]) + check_cache_misses(node1, "test_cache0.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_hits(node1, "test_cache0.jsonl") node1.query( - f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + f"insert into function 
hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" ) time.sleep(1) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" - node1.query(desc_query) - cache_invalidations = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_invalidations(node1, "test_cache0.jsonl") node1.query( f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" ) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1" - node1.query(desc_query) - cache_misses = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + time.sleep(1) - time.sleep(2) + run_describe_query(node1, "test_cache1.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(node1, "test_cache1.jsonl") - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1000" - node1.query(desc_query) - cache_misses = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - cache_ttl_expirations = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_updates = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheTTLUpdates" - ) - assert cache_ttl_updates == 1 - - node1.query( - f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" - ) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1" - node1.query(desc_query) - cache_invalidations = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 - - time.sleep(2) - - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_expirations = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')" - node1.query(desc_query) - cache_misses = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + run_describe_query(node1, "test_cache1.jsonl") + check_cache_hits(node1, "test_cache1.jsonl") node1.query( f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" ) + time.sleep(1) + + run_describe_query(node1, "test_cache2.jsonl") + check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"]) + check_cache_misses(node1, "test_cache2.jsonl") + check_cache_evictions(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache_hits(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache1.jsonl") + check_cache_hits(node1, 
"test_cache1.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(node1, "test_cache0.jsonl") + check_cache_evictions(node1, "test_cache0.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"]) + check_cache_misses(node1, "test_cache2.jsonl") + check_cache_evictions(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache_hits(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_hits(node1, "test_cache0.jsonl") + node1.query( f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" ) - time.sleep(1) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache*.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + files = "test_cache{0,1,2,3}.jsonl" + run_describe_query(node1, files) + check_cache(node1, ["test_cache2.jsonl", "test_cache3.jsonl"]) + check_cache_hits(node1, files) - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache2.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + run_describe_query(node1, "test_cache2.jsonl") + check_cache_hits(node1, "test_cache2.jsonl") - desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache3.jsonl')" - node1.query(desc_query) - cache_hits = get_profile_event_for_query( - node1, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + run_describe_query(node1, "test_cache3.jsonl") + check_cache_hits(node1, "test_cache3.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_misses(node1, "test_cache0.jsonl") + + run_describe_query(node1, "test_cache1.jsonl") + check_cache_misses(node1, "test_cache1.jsonl") + + node1.query(f"system drop schema cache for hdfs") + check_cache(node1, []) + + run_describe_query(node1, files) + check_cache_misses(node1, files, 4) + + node1.query("system drop schema cache") + check_cache(node1, []) + + run_describe_query(node1, files) + check_cache_misses(node1, files, 4) if __name__ == "__main__": diff --git a/tests/integration/test_storage_s3/configs/schema_cache.xml b/tests/integration/test_storage_s3/configs/schema_cache.xml new file mode 100644 index 00000000000..78f7e4a3563 --- /dev/null +++ b/tests/integration/test_storage_s3/configs/schema_cache.xml @@ -0,0 +1,4 @@ + + 2 + 2 + \ No newline at end of file diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 4087f4aa0b8..89bb10563ff 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -103,7 +103,7 @@ def started_cluster(): cluster.add_instance( "dummy", with_minio=True, - main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"], + main_configs=["configs/defaultS3.xml", "configs/named_collections.xml", "configs/schema_cache.xml"], ) cluster.add_instance( "s3_max_redirects", @@ -148,7 +148,7 @@ def run_query(instance, query, stdin=None, settings=None): pytest.param("'wrongid','wrongkey',", False, "zstd", id="zstd"), ], ) -def _test_put(started_cluster, maybe_auth, positive, compression): +def test_put(started_cluster, maybe_auth, positive, compression): # type: (ClickHouseCluster) -> None bucket = ( @@ -174,7 
+174,7 @@ def _test_put(started_cluster, maybe_auth, positive, compression): assert values_csv == get_s3_file_content(started_cluster, bucket, filename) -def _test_partition_by(started_cluster): +def test_partition_by(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -201,7 +201,7 @@ def _test_partition_by(started_cluster): assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test2_45.csv") -def _test_partition_by_string_column(started_cluster): +def test_partition_by_string_column(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_format = "col_num UInt32, col_str String" @@ -221,7 +221,7 @@ def _test_partition_by_string_column(started_cluster): assert '78,"你好"\n' == get_s3_file_content(started_cluster, bucket, "test_你好.csv") -def _test_partition_by_const_column(started_cluster): +def test_partition_by_const_column(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -239,7 +239,7 @@ def _test_partition_by_const_column(started_cluster): @pytest.mark.parametrize("special", ["space", "plus"]) -def _test_get_file_with_special(started_cluster, special): +def test_get_file_with_special(started_cluster, special): symbol = {"space": " ", "plus": "+"}[special] urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special] auth = "'minio','minio123'," @@ -281,7 +281,7 @@ def _test_get_file_with_special(started_cluster, special): @pytest.mark.parametrize("special", ["space", "plus", "plus2"]) -def _test_get_path_with_special(started_cluster, special): +def test_get_path_with_special(started_cluster, special): symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special] safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special] auth = "'minio','minio123'," @@ -293,7 +293,7 @@ def _test_get_path_with_special(started_cluster, special): # Test put no data to S3. @pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")]) -def _test_empty_put(started_cluster, auth): +def test_empty_put(started_cluster, auth): # type: (ClickHouseCluster, str) -> None bucket = started_cluster.minio_bucket @@ -349,7 +349,7 @@ def _test_empty_put(started_cluster, auth): pytest.param("'wrongid','wrongkey',", False, id="negative"), ], ) -def _test_put_csv(started_cluster, maybe_auth, positive): +def test_put_csv(started_cluster, maybe_auth, positive): # type: (ClickHouseCluster, bool, str) -> None bucket = ( @@ -381,7 +381,7 @@ def _test_put_csv(started_cluster, maybe_auth, positive): # Test put and get with S3 server redirect. -def _test_put_get_with_redirect(started_cluster): +def test_put_get_with_redirect(started_cluster): # type: (ClickHouseCluster) -> None bucket = started_cluster.minio_bucket @@ -419,7 +419,7 @@ def _test_put_get_with_redirect(started_cluster): # Test put with restricted S3 server redirect. 
-def _test_put_with_zero_redirect(started_cluster): +def test_put_with_zero_redirect(started_cluster): # type: (ClickHouseCluster) -> None bucket = started_cluster.minio_bucket @@ -458,7 +458,7 @@ def _test_put_with_zero_redirect(started_cluster): assert exception_raised -def _test_put_get_with_globs(started_cluster): +def test_put_get_with_globs(started_cluster): # type: (ClickHouseCluster) -> None unique_prefix = random.randint(1, 10000) bucket = started_cluster.minio_bucket @@ -515,7 +515,7 @@ def _test_put_get_with_globs(started_cluster): # ("'minio','minio123',",True), Redirect with credentials not working with nginx. ], ) -def _test_multipart(started_cluster, maybe_auth, positive): +def test_multipart(started_cluster, maybe_auth, positive): # type: (ClickHouseCluster) -> None bucket = ( @@ -604,7 +604,7 @@ def _test_multipart(started_cluster, maybe_auth, positive): ) -def _test_remote_host_filter(started_cluster): +def test_remote_host_filter(started_cluster): instance = started_cluster.instances["restricted_dummy"] format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -624,7 +624,7 @@ def _test_remote_host_filter(started_cluster): assert "not allowed in configuration file" in instance.query_and_get_error(query) -def _test_wrong_s3_syntax(started_cluster): +def test_wrong_s3_syntax(started_cluster): instance = started_cluster.instances["dummy"] # type: ClickHouseInstance expected_err_msg = "Code: 42" # NUMBER_OF_ARGUMENTS_DOESNT_MATCH @@ -638,7 +638,7 @@ def _test_wrong_s3_syntax(started_cluster): # https://en.wikipedia.org/wiki/One_Thousand_and_One_Nights -def _test_s3_glob_scheherazade(started_cluster): +def test_s3_glob_scheherazade(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -734,7 +734,7 @@ def replace_config(old, new): config.close() -def _test_custom_auth_headers(started_cluster): +def test_custom_auth_headers(started_cluster): table_format = "column1 UInt32, column2 UInt32, column3 UInt32" filename = "test.csv" get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format( @@ -773,7 +773,7 @@ def _test_custom_auth_headers(started_cluster): instance.query("DROP TABLE test") -def _test_custom_auth_headers_exclusion(started_cluster): +def test_custom_auth_headers_exclusion(started_cluster): table_format = "column1 UInt32, column2 UInt32, column3 UInt32" filename = "test.csv" get_query = f"SELECT * FROM s3('http://resolver:8080/{started_cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')" @@ -787,7 +787,7 @@ def _test_custom_auth_headers_exclusion(started_cluster): assert "Forbidden Error" in ei.value.stderr -def _test_infinite_redirect(started_cluster): +def test_infinite_redirect(started_cluster): bucket = "redirected" table_format = "column1 UInt32, column2 UInt32, column3 UInt32" filename = "test.csv" @@ -810,7 +810,7 @@ def _test_infinite_redirect(started_cluster): pytest.param("gz", "auto", id="gz"), ], ) -def _test_storage_s3_get_gzip(started_cluster, extension, method): +def test_storage_s3_get_gzip(started_cluster, extension, method): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] filename = f"test_get_gzip.{extension}" @@ -853,7 +853,7 @@ def _test_storage_s3_get_gzip(started_cluster, extension, method): run_query(instance, f"DROP TABLE {name}") -def _test_storage_s3_get_unstable(started_cluster): 
+def test_storage_s3_get_unstable(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64" @@ -862,7 +862,7 @@ def _test_storage_s3_get_unstable(started_cluster): assert result.splitlines() == ["500001,500000,0"] -def _test_storage_s3_put_uncompressed(started_cluster): +def test_storage_s3_put_uncompressed(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] filename = "test_put_uncompressed.bin" @@ -903,7 +903,7 @@ def _test_storage_s3_put_uncompressed(started_cluster): "extension,method", [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")], ) -def _test_storage_s3_put_gzip(started_cluster, extension, method): +def test_storage_s3_put_gzip(started_cluster, extension, method): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] filename = f"test_put_gzip.{extension}" @@ -945,7 +945,7 @@ def _test_storage_s3_put_gzip(started_cluster, extension, method): assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 708 -def _test_truncate_table(started_cluster): +def test_truncate_table(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance name = "truncate" @@ -975,7 +975,7 @@ def _test_truncate_table(started_cluster): assert instance.query("SELECT * FROM {}".format(name)) == "" -def _test_predefined_connection_configuration(started_cluster): +def test_predefined_connection_configuration(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance name = "test_table" @@ -998,7 +998,7 @@ def _test_predefined_connection_configuration(started_cluster): result = "" -def _test_url_reconnect_in_the_middle(started_cluster): +def test_url_reconnect_in_the_middle(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] table_format = "id String, data String" @@ -1047,7 +1047,7 @@ def _test_url_reconnect_in_the_middle(started_cluster): assert int(result) == 3914219105369203805 -def _test_seekable_formats(started_cluster): +def test_seekable_formats(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance @@ -1083,7 +1083,7 @@ def _test_seekable_formats(started_cluster): assert int(result) > 80 -def _test_seekable_formats_url(started_cluster): +def test_seekable_formats_url(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] # type: ClickHouseInstance @@ -1109,7 +1109,7 @@ def _test_seekable_formats_url(started_cluster): assert int(result) == 1000000 -def _test_empty_file(started_cluster): +def test_empty_file(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1124,7 +1124,7 @@ def _test_empty_file(started_cluster): assert int(result) == 0 -def _test_insert_with_path_with_globs(started_cluster): +def test_insert_with_path_with_globs(started_cluster): instance = started_cluster.instances["dummy"] table_function_3 = f"s3('http://minio1:9001/root/test_parquet*', 'minio', 'minio123', 'Parquet', 'a Int32, b String')" @@ -1133,7 +1133,7 @@ def _test_insert_with_path_with_globs(started_cluster): ) -def _test_s3_schema_inference(started_cluster): +def test_s3_schema_inference(started_cluster): bucket = 
started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1179,7 +1179,7 @@ def _test_s3_schema_inference(started_cluster): assert int(result) == 5000000 -def _test_empty_file(started_cluster): +def test_empty_file(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1194,7 +1194,7 @@ def _test_empty_file(started_cluster): assert int(result) == 0 -def _test_overwrite(started_cluster): +def test_overwrite(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1215,7 +1215,7 @@ def _test_overwrite(started_cluster): assert int(result) == 200 -def _test_create_new_files_on_insert(started_cluster): +def test_create_new_files_on_insert(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1256,7 +1256,7 @@ def _test_create_new_files_on_insert(started_cluster): assert int(result) == 60 -def _test_format_detection(started_cluster): +def test_format_detection(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1291,7 +1291,7 @@ def _test_format_detection(started_cluster): assert int(result) == 1 -def _test_schema_inference_from_globs(started_cluster): +def test_schema_inference_from_globs(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1364,7 +1364,7 @@ def _test_schema_inference_from_globs(started_cluster): ) -def _test_signatures(started_cluster): +def test_signatures(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1398,7 +1398,7 @@ def _test_signatures(started_cluster): assert int(result) == 1 -def _test_select_columns(started_cluster): +def test_select_columns(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] name = "test_table2" @@ -1429,7 +1429,7 @@ def _test_select_columns(started_cluster): assert int(result1) * 3 <= int(result2) -def _test_insert_select_schema_inference(started_cluster): +def test_insert_select_schema_inference(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1447,7 +1447,7 @@ def _test_insert_select_schema_inference(started_cluster): assert int(result) == 1 -def _test_parallel_reading_with_memory_limit(started_cluster): +def test_parallel_reading_with_memory_limit(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1468,7 +1468,7 @@ def _test_parallel_reading_with_memory_limit(started_cluster): assert int(result) == 1 -def _test_wrong_format_usage(started_cluster): +def test_wrong_format_usage(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] @@ -1493,259 +1493,178 @@ def get_profile_event_for_query(instance, query, profile_event): ) +def check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert get_profile_event_for_query(instance, query, + "SchemaInferenceCacheMisses") == amount + + +def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert get_profile_event_for_query(instance, query, + "SchemaInferenceCacheHits") == 
amount + + +def check_cache_invalidations(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert get_profile_event_for_query(instance, query, + "SchemaInferenceCacheInvalidations") == amount + + +def check_cache_evictions(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert get_profile_event_for_query(instance, query, + "SchemaInferenceCacheEvictions") == amount + + +def run_describe_query(instance, file, storage_name, started_cluster, bucket): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + instance.query(query) + + +def check_cache(instance, expected_files): + sources = instance.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(expected_files) + + def test_schema_inference_cache(started_cluster): bucket = started_cluster.minio_bucket instance = started_cluster.instances["dummy"] - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) + def test(storage_name): + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) - time.sleep(1) + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache(instance, ["test_cache0.jsonl"]) + check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache_invalidations(instance, "test_cache0.jsonl", storage_name, + started_cluster, bucket) - time.sleep(1) + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) - desc_query = f"desc 
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_invalidations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 + run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) + check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) - time.sleep(2) + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1000" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - cache_ttl_expirations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 + run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache(instance, ["test_cache1.jsonl", "test_cache2.jsonl"]) + check_cache_misses(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache_evictions(instance, "test_cache2.jsonl", storage_name, + started_cluster, bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_updates = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLUpdates" - ) - assert cache_ttl_updates == 1 + run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1" - instance.query(desc_query) - cache_invalidations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 + run_describe_query(instance, "test_cache1.jsonl", storage_name, 
started_cluster, + bucket) + check_cache_hits(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) - time.sleep(2) + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache_evictions(instance, "test_cache0.jsonl", storage_name, + started_cluster, bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_expirations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 + run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache(instance, ["test_cache0.jsonl", "test_cache2.jsonl"]) + check_cache_misses(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache_evictions(instance, "test_cache2.jsonl", storage_name, + started_cluster, bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) - time.sleep(1) + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache*.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + files = "test_cache{0,1,2,3}.jsonl" + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache(instance, ["test_cache2.jsonl", "test_cache3.jsonl"]) + check_cache_hits(instance, files, storage_name, started_cluster, bucket) - desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster, + bucket) - 
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + run_describe_query(instance, "test_cache3.jsonl", storage_name, started_cluster, + bucket) + check_cache_hits(instance, "test_cache3.jsonl", storage_name, started_cluster, + bucket) - # Test the same scenarious but for URL table function + run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) + check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster, + bucket) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - time.sleep(1) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) + check_cache_misses(instance, "test_cache1.jsonl", storage_name, started_cluster, + bucket) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + instance.query(f"system drop schema cache for {storage_name}") + check_cache(instance, []) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - time.sleep(1) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_invalidations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 + instance.query("system drop schema cache") + check_cache(instance, []) - time.sleep(2) + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1000" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - 
cache_ttl_expirations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_updates = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLUpdates" - ) - assert cache_ttl_updates == 1 - - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1" - instance.query(desc_query) - cache_invalidations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheInvalidations" - ) - assert cache_invalidations == 1 - - time.sleep(2) - - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - cache_ttl_expirations = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheTTLExpirations" - ) - assert cache_ttl_expirations == 1 - - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')" - instance.query(desc_query) - cache_misses = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheMisses" - ) - assert cache_misses == 1 - - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - instance.query( - f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" - ) - - time.sleep(1) - - file_name = "test_cache{1,2,3}.jsonl" - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name}')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 - - desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')" - instance.query(desc_query) - cache_hits = get_profile_event_for_query( - instance, desc_query, "SchemaInferenceCacheHits" - ) - assert cache_hits == 1 + test("s3") + instance.query("system drop schema cache") + test("url") diff --git a/tests/queries/0_stateless/02375_system_schema_inference_cache.reference b/tests/queries/0_stateless/02375_system_schema_inference_cache.reference new file mode 100644 index 00000000000..b9b2b472767 --- /dev/null +++ b/tests/queries/0_stateless/02375_system_schema_inference_cache.reference @@ -0,0 +1,12 @@ 
+storage String
+source String
+format String
+additional_format_info String
+registration_time DateTime
+schema String
+x Nullable(String)
+s Nullable(String)
+x Nullable(String)
+s Nullable(String)
+File 02374_data1.jsonl JSONEachRow x Nullable(String), s Nullable(String)
+File 02374_data2.jsonl JSONEachRow x Nullable(String), s Nullable(String)
diff --git a/tests/queries/0_stateless/02375_system_schema_inference_cache.sql b/tests/queries/0_stateless/02375_system_schema_inference_cache.sql
new file mode 100644
index 00000000000..45e85350f66
--- /dev/null
+++ b/tests/queries/0_stateless/02375_system_schema_inference_cache.sql
@@ -0,0 +1,12 @@
+insert into function file('02374_data1.jsonl') select number as x, 'str' as s from numbers(10);
+insert into function file('02374_data2.jsonl') select number as x, 'str' as s from numbers(10);
+
+desc system.schema_inference_cache;
+system drop schema cache for file;
+
+desc file('02374_data1.jsonl');
+desc file('02374_data2.jsonl');
+
+select storage, splitByChar('/', source)[-1], format, schema from system.schema_inference_cache where storage='File';
+system drop schema cache for file;
+select storage, source, format, schema from system.schema_inference_cache where storage='File';
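A minimal interactive sketch of the statements the new stateless test exercises (the file name is reused from the test above purely for illustration):

-- infer and cache a file schema, then inspect the new system table
desc file('02374_data1.jsonl');
select storage, source, format, schema from system.schema_inference_cache where storage = 'File';
-- drop cached schemas for a single storage, or for all storages at once
system drop schema cache for file;
system drop schema cache;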