diff --git a/docs/en/operations/caches.md b/docs/en/operations/caches.md index 28ddb14a511..f2eb1e3ce5c 100644 --- a/docs/en/operations/caches.md +++ b/docs/en/operations/caches.md @@ -19,6 +19,7 @@ Additional cache types: - Compiled expressions cache. - [Avro format](../interfaces/formats.md#data-format-avro) schemas cache. - [Dictionaries](../sql-reference/dictionaries/index.md) data cache. +- Schema inference cache. Indirectly used: diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index af3e8847f03..7fa2d4d3b9c 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3300,7 +3300,7 @@ Possible values: Default value: `0`. -## shutdown_wait_unfinished_queries +## shutdown_wait_unfinished_queries {#shutdown_wait_unfinished_queries} Enables or disables waiting unfinished queries when shutdown server. @@ -3311,13 +3311,13 @@ Possible values: Default value: 0. -## shutdown_wait_unfinished +## shutdown_wait_unfinished {#shutdown_wait_unfinished} The waiting time in seconds for currently handled connections when shutdown server. Default Value: 5. -## memory_overcommit_ratio_denominator +## memory_overcommit_ratio_denominator {#memory_overcommit_ratio_denominator} It represents soft memory limit in case when hard limit is reached on user level. This value is used to compute overcommit ratio for the query. @@ -3326,7 +3326,7 @@ Read more about [memory overcommit](memory-overcommit.md). Default value: `1GiB`. -## memory_usage_overcommit_max_wait_microseconds +## memory_usage_overcommit_max_wait_microseconds {#memory_usage_overcommit_max_wait_microseconds} Maximum time thread will wait for memory to be freed in the case of memory overcommit on a user level. If the timeout is reached and memory is not freed, an exception is thrown. @@ -3334,7 +3334,7 @@ Read more about [memory overcommit](memory-overcommit.md). Default value: `5000000`. -## memory_overcommit_ratio_denominator_for_user +## memory_overcommit_ratio_denominator_for_user {#memory_overcommit_ratio_denominator_for_user} It represents soft memory limit in case when hard limit is reached on global level. This value is used to compute overcommit ratio for the query. @@ -3343,6 +3343,36 @@ Read more about [memory overcommit](memory-overcommit.md). Default value: `1GiB`. +## schema_inference_use_cache_for_file {#schema_inference_use_cache_for_file} + +Enable the schema cache for schema inference when using the `file` table function. + +Default value: `true`. + +## schema_inference_use_cache_for_s3 {#schema_inference_use_cache_for_s3} + +Enable the schema cache for schema inference when using the `s3` table function. + +Default value: `true`. + +## schema_inference_use_cache_for_url {#schema_inference_use_cache_for_url} + +Enable the schema cache for schema inference when using the `url` table function. + +Default value: `true`. + +## schema_inference_use_cache_for_hdfs {#schema_inference_use_cache_for_hdfs} + +Enable the schema cache for schema inference when using the `hdfs` table function. + +Default value: `true`. + +## schema_inference_cache_require_modification_time_for_url {#schema_inference_cache_require_modification_time_for_url} + +Use a schema from the cache for URLs only after validating the last modification time (for URLs with a `Last-Modified` header). If this setting is enabled and a URL doesn't have a `Last-Modified` header, the schema from the cache won't be used. + +Default value: `true`. + ## compatibility {#compatibility} This setting changes other settings according to provided ClickHouse version. 
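The documentation hunk above lists the new settings without a usage example. As a rough, illustrative sketch (the file path is hypothetical), the cache introduced by this patch could be exercised and inspected with the `system.schema_inference_cache` table and the `SYSTEM DROP SCHEMA CACHE` statement added later in this diff:

```sql
-- Schema inference for the same source is computed once and then served from the cache.
DESCRIBE file('data/hits.parquet', 'Parquet');  -- first call: schema is inferred and cached
DESCRIBE file('data/hits.parquet', 'Parquet');  -- second call: schema is taken from the cache

-- Inspect cached entries (system table added by this patch).
SELECT storage, source, format, registration_time
FROM system.schema_inference_cache;

-- Disable the cache per source type, or drop it explicitly.
SET schema_inference_use_cache_for_file = 0;
SYSTEM DROP SCHEMA CACHE FOR FILE;  -- FOR S3 / HDFS / URL, or omit FOR to drop all schema caches
```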
diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 38e006f18e7..e8d3b453188 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -140,6 +140,7 @@ enum class AccessType M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ + M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \ M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \ M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 442f59bcbe8..e5275be43c1 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -349,7 +349,13 @@ \ M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \ M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \ - M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \ + M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \ + \ + M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \ + M(SchemaInferenceCacheMisses, "Number of times a schema was not found in the cache during schema inference") \ + M(SchemaInferenceCacheEvictions, "Number of times a schema was evicted from the cache due to overflow") \ + M(SchemaInferenceCacheInvalidations, "Number of times a cached schema became invalid because the source data changed") \ + \ M(KeeperPacketsSent, "Packets sent by keeper server") \ M(KeeperPacketsReceived, "Packets received by keeper server") \ M(KeeperRequestTotal, "Total requests number on keeper server") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 5c80e1241bf..a905595b77c 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -609,6 +609,12 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \ M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \ \ + M(Bool, schema_inference_use_cache_for_file, true, "Use the schema cache for schema inference when using the file table function", 0) \ + M(Bool, schema_inference_use_cache_for_s3, true, "Use the schema cache for schema inference when using the s3 table function", 0) \ + M(Bool, schema_inference_use_cache_for_hdfs, true, "Use the schema cache for schema inference when using the hdfs table function", 0) \ + M(Bool, schema_inference_use_cache_for_url, true, "Use the schema cache for schema inference when using the url table function", 0) \ + M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URLs with last modification time validation (for URLs with a Last-Modified header)", 0) \ + \ M(String, compatibility, "", "Changes other settings according to provided ClickHouse version. 
If we know that we changed some behaviour in ClickHouse by changing some settings in some version, this compatibility setting will control these settings", 0) \ \ M(Map, additional_table_filters, "", "Additional filter expression which would be applied after reading from specified table. Syntax: {'table1': 'expression', 'database.table2': 'expression'}", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index a8ad5ff4b06..149ce1711ca 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -573,6 +573,25 @@ bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) co return target.supports_subset_of_columns; } +void FormatFactory::registerAdditionalInfoForSchemaCacheGetter( + const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter) +{ + auto & target = dict[name].additional_info_for_schema_cache_getter; + if (target) + throw Exception("FormatFactory: additional info for schema cache getter " + name + " is already registered", ErrorCodes::LOGICAL_ERROR); + target = std::move(additional_info_for_schema_cache_getter); +} + +String FormatFactory::getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional & format_settings_) +{ + const auto & additional_info_getter = getCreators(name).additional_info_for_schema_cache_getter; + if (!additional_info_getter) + return ""; + + auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context); + return additional_info_getter(format_settings); +} + bool FormatFactory::isInputFormat(const String & name) const { auto it = dict.find(name); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 260f8197d5e..44b49fd6dc5 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -100,6 +100,13 @@ private: using SchemaReaderCreator = std::function; using ExternalSchemaReaderCreator = std::function; + /// Some formats can extract different schemas from the same source depending on + /// some settings. To process this case in schema cache we should add some additional + /// information to a cache key. This getter should return some string with information + /// about such settings. For example, for Protobuf format it's the path to the schema + /// and the name of the message. 
+ using AdditionalInfoForSchemaCacheGetter = std::function; + struct Creators { InputCreator input_creator; @@ -111,6 +118,7 @@ private: bool supports_subset_of_columns{false}; NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker; AppendSupportChecker append_support_checker; + AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter; }; using FormatsDictionary = std::unordered_map; @@ -202,6 +210,9 @@ public: bool checkIfFormatHasExternalSchemaReader(const String & name) const; bool checkIfFormatHasAnySchemaReader(const String & name) const; + void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter); + String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional & format_settings_ = std::nullopt); + const FormatsDictionary & getAllFormats() const { return dict; diff --git a/src/Formats/ReadSchemaUtils.cpp b/src/Formats/ReadSchemaUtils.cpp index 5ae21340864..22eef5bd75b 100644 --- a/src/Formats/ReadSchemaUtils.cpp +++ b/src/Formats/ReadSchemaUtils.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -75,12 +74,13 @@ ColumnsDescription readSchemaFromFormat( SchemaReaderPtr schema_reader; size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference; size_t iterations = 0; + ColumnsDescription cached_columns; while (true) { bool is_eof = false; try { - buf = read_buffer_iterator(); + buf = read_buffer_iterator(cached_columns); if (!buf) break; is_eof = buf->eof(); @@ -142,6 +142,9 @@ ColumnsDescription readSchemaFromFormat( } } + if (!cached_columns.empty()) + return cached_columns; + if (names_and_types.empty()) throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}\nYou can specify the structure manually", exception_messages); @@ -236,4 +239,37 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header) return result; } +String getKeyForSchemaCache(const String & source, const String & format, const std::optional & format_settings, const ContextPtr & context) +{ + return getKeysForSchemaCache({source}, format, format_settings, context).front(); +} + +static String makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info) +{ + return source + "@@" + format + "@@" + additional_format_info; +} + +void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info) +{ + size_t additional_format_info_pos = key.rfind("@@"); + additional_format_info = key.substr(additional_format_info_pos + 2, key.size() - additional_format_info_pos - 2); + size_t format_pos = key.rfind("@@", additional_format_info_pos - 1); + format = key.substr(format_pos + 2, additional_format_info_pos - format_pos - 2); + source = key.substr(0, format_pos); +} + +Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional & format_settings, const ContextPtr & context) +{ + /// For some formats data schema depends on some settings, so it's possible that + /// two queries to the same source will get two different schemas. To process this + /// case we add some additional information specific for the format to the cache key. 
+ /// For example, for Protobuf format additional information is the path to the schema + /// and message name. + String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings); + Strings cache_keys; + cache_keys.reserve(sources.size()); + std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); }); + return cache_keys; +} + } diff --git a/src/Formats/ReadSchemaUtils.h b/src/Formats/ReadSchemaUtils.h index b7558d5c22b..56b48823464 100644 --- a/src/Formats/ReadSchemaUtils.h +++ b/src/Formats/ReadSchemaUtils.h @@ -6,7 +6,7 @@ namespace DB { -using ReadBufferIterator = std::function()>; +using ReadBufferIterator = std::function(ColumnsDescription &)>; /// Try to determine the schema of the data in specifying format. /// For formats that have an external schema reader, it will @@ -46,4 +46,9 @@ DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type); /// Call makeNullableRecursivelyAndCheckForNothing for all types /// in the block and return names and types. NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header); + +String getKeyForSchemaCache(const String & source, const String & format, const std::optional & format_settings, const ContextPtr & context); +Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional & format_settings, const ContextPtr & context); + +void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info); } diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index d5abc4609ed..63496054e85 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -207,21 +207,7 @@ namespace detail return *file_size; Poco::Net::HTTPResponse response; - for (size_t i = 0; i < settings.http_max_tries; ++i) - { - try - { - callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD); - break; - } - catch (const Poco::Exception & e) - { - if (i == settings.http_max_tries - 1) - throw; - - LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText()); - } - } + getHeadResponse(response); if (response.hasContentLength()) { @@ -250,6 +236,25 @@ namespace detail InitializeError initialization_error = InitializeError::NONE; private: + void getHeadResponse(Poco::Net::HTTPResponse & response) + { + for (size_t i = 0; i < settings.http_max_tries; ++i) + { + try + { + callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD); + break; + } + catch (const Poco::Exception & e) + { + if (i == settings.http_max_tries - 1) + throw; + + LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. 
Error: {}", uri.toString(), e.displayText()); + } + } + } + void setupExternalBuffer() { /** @@ -669,6 +674,22 @@ namespace detail } const std::string & getCompressionMethod() const { return content_encoding; } + + std::optional getLastModificationTime() + { + Poco::Net::HTTPResponse response; + getHeadResponse(response); + if (!response.has("Last-Modified")) + return std::nullopt; + + String date_str = response.get("Last-Modified"); + struct tm info; + char * res = strptime(date_str.data(), "%a, %d %b %Y %H:%M:%S %Z", &info); + if (!res || res != date_str.data() + date_str.size()) + return std::nullopt; + + return timegm(&info); + } }; } diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index cea03277c91..fb9cff5d109 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -790,7 +790,8 @@ namespace S3 quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : ""); } - size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) + + S3::ObjectInfo getObjectInfo(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) { Aws::S3::Model::HeadObjectRequest req; req.SetBucket(bucket); @@ -804,13 +805,18 @@ namespace S3 if (outcome.IsSuccess()) { auto read_result = outcome.GetResultWithOwnership(); - return static_cast(read_result.GetContentLength()); + return {.size = static_cast(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000}; } else if (throw_on_error) { throw DB::Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); } - return 0; + return {}; + } + + size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error) + { + return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error).size; } } diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index 327730d9740..46a09ee8901 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -78,6 +78,14 @@ struct URI static void validateBucket(const String & bucket, const Poco::URI & uri); }; +struct ObjectInfo +{ + size_t size = 0; + time_t last_modification_time = 0; +}; + +S3::ObjectInfo getObjectInfo(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true); + size_t getObjectSize(std::shared_ptr client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true); } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index f94dd998c91..d9310169ac7 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -45,6 +45,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -326,6 +330,29 @@ BlockIO InterpreterSystemQuery::execute() } break; } + case Type::DROP_SCHEMA_CACHE: + { + getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE); + std::unordered_set caches_to_drop; + if (query.schema_cache_storage.empty()) + caches_to_drop = {"FILE", "S3", "HDFS", "URL"}; + else + caches_to_drop = {query.schema_cache_storage}; + + if (caches_to_drop.contains("FILE")) + StorageFile::getSchemaCache(getContext()).clear(); +#if USE_AWS_S3 + if (caches_to_drop.contains("S3")) + StorageS3::getSchemaCache(getContext()).clear(); +#endif +#if USE_HDFS + if (caches_to_drop.contains("HDFS")) 
+ StorageHDFS::getSchemaCache(getContext()).clear(); +#endif + if (caches_to_drop.contains("URL")) + StorageURL::getSchemaCache(getContext()).clear(); + break; + } case Type::RELOAD_DICTIONARY: { getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY); @@ -833,6 +860,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_INDEX_MARK_CACHE: case Type::DROP_INDEX_UNCOMPRESSED_CACHE: case Type::DROP_FILESYSTEM_CACHE: + case Type::DROP_SCHEMA_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); break; diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 6ea2204a452..f2c3fa8ece5 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -29,6 +29,7 @@ public: DROP_COMPILED_EXPRESSION_CACHE, #endif DROP_FILESYSTEM_CACHE, + DROP_SCHEMA_CACHE, STOP_LISTEN_QUERIES, START_LISTEN_QUERIES, RESTART_REPLICAS, @@ -97,6 +98,8 @@ public: String backup_name; + String schema_cache_storage; + String getID(char) const override { return "SYSTEM query"; } ASTPtr clone() const override diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp index 7854f7a2400..26ba9290d04 100644 --- a/src/Parsers/ParserSystemQuery.cpp +++ b/src/Parsers/ParserSystemQuery.cpp @@ -363,6 +363,23 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & parseQueryWithOnCluster(res, pos, expected); break; } + case Type::DROP_SCHEMA_CACHE: + { + if (ParserKeyword{"FOR"}.ignore(pos, expected)) + { + if (ParserKeyword{"FILE"}.ignore(pos, expected)) + res->schema_cache_storage = "FILE"; + else if (ParserKeyword{"S3"}.ignore(pos, expected)) + res->schema_cache_storage = "S3"; + else if (ParserKeyword{"HDFS"}.ignore(pos, expected)) + res->schema_cache_storage = "HDFS"; + else if (ParserKeyword{"URL"}.ignore(pos, expected)) + res->schema_cache_storage = "URL"; + else + return false; + } + break; + } case Type::UNFREEZE: { diff --git a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp index 3916a8e4ba6..e94ac27870f 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.cpp @@ -318,6 +318,8 @@ void registerInputFormatCapnProto(FormatFactory & factory) }); factory.markFormatSupportsSubsetOfColumns("CapnProto"); factory.registerFileExtension("capnp", "CapnProto"); + factory.registerAdditionalInfoForSchemaCacheGetter( + "CapnProto", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } void registerCapnProtoSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp b/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp index 8e1beb8ec89..8c24f22ce13 100644 --- a/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/MySQLDumpRowInputFormat.cpp @@ -452,6 +452,9 @@ void registerInputFormatMySQLDump(FormatFactory & factory) { return std::make_shared(buf, header, params, settings); }); + + factory.registerAdditionalInfoForSchemaCacheGetter( + "MySQLDump", [](const FormatSettings & settings) { return "Table name: " + settings.mysql_dump.table_name; }); } void registerMySQLSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp index d4d80fe3a23..7af0d8a7094 100644 --- 
a/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufListInputFormat.cpp @@ -81,6 +81,8 @@ void registerInputFormatProtobufList(FormatFactory & factory) FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers); }); factory.markFormatSupportsSubsetOfColumns("ProtobufList"); + factory.registerAdditionalInfoForSchemaCacheGetter( + "ProtobufList", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } void registerProtobufListSchemaReader(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index f4329b7ecfe..5fd4bb79fa0 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -101,6 +101,10 @@ void registerProtobufSchemaReader(FormatFactory & factory) { return std::make_shared(settings); }); + + for (const auto & name : {"Protobuf", "ProtobufSingle"}) + factory.registerAdditionalInfoForSchemaCacheGetter( + name, [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; }); } } diff --git a/src/Storages/Cache/SchemaCache.cpp b/src/Storages/Cache/SchemaCache.cpp new file mode 100644 index 00000000000..7bce1965f1a --- /dev/null +++ b/src/Storages/Cache/SchemaCache.cpp @@ -0,0 +1,114 @@ +#include +#include +#include + +namespace ProfileEvents +{ + extern const Event SchemaInferenceCacheHits; + extern const Event SchemaInferenceCacheMisses; + extern const Event SchemaInferenceCacheEvictions; + extern const Event SchemaInferenceCacheInvalidations; +} + +namespace DB +{ + +SchemaCache::SchemaCache(size_t max_elements_) : max_elements(max_elements_) +{ +} + +void SchemaCache::add(const String & key, const ColumnsDescription & columns) +{ + std::lock_guard lock(mutex); + addUnlocked(key, columns); +} + + +void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns) +{ + std::lock_guard lock(mutex); + for (const auto & key : keys) + addUnlocked(key, columns); +} + +void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns) +{ + /// Do nothing if this key is already in cache; + if (data.contains(key)) + return; + + time_t now = std::time(nullptr); + auto it = queue.insert(queue.end(), key); + data[key] = {SchemaInfo{columns, now}, it}; + checkOverflow(); +} + +void SchemaCache::checkOverflow() +{ + if (queue.size() <= max_elements) + return; + + auto key = queue.front(); + data.erase(key); + queue.pop_front(); + ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheEvictions); +} + +std::optional SchemaCache::tryGet(const String & key, LastModificationTimeGetter get_last_mod_time) +{ + std::lock_guard lock(mutex); + auto it = data.find(key); + if (it == data.end()) + { + ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheMisses); + return std::nullopt; + } + + auto & schema_info = it->second.schema_info; + auto & queue_iterator = it->second.iterator; + if (get_last_mod_time) + { + /// It's important to call get_last_mod_time only if we have key in cache, + /// because this function can do some heavy operations. + auto last_mod_time = get_last_mod_time(); + /// If get_last_mod_time function was provided but it returned nullopt, it means that + /// it failed to get last modification time, so we cannot safely use value from cache. 
+ if (!last_mod_time) + return std::nullopt; + + if (*last_mod_time >= schema_info.registration_time) + { + /// Object was modified after it was added in cache. + /// So, stored value is no more valid and we should remove it. + ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations); + queue.erase(queue_iterator); + data.erase(key); + return std::nullopt; + } + } + + /// Move key to the end of queue. + queue.splice(queue.end(), queue, queue_iterator); + + ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits); + return schema_info.columns; +} + +void SchemaCache::clear() +{ + std::lock_guard lock(mutex); + data.clear(); + queue.clear(); +} + +std::unordered_map SchemaCache::getAll() +{ + std::lock_guard lock(mutex); + std::unordered_map result; + for (const auto & [key, value] : data) + result[key] = value.schema_info; + + return result; +} + +} diff --git a/src/Storages/Cache/SchemaCache.h b/src/Storages/Cache/SchemaCache.h new file mode 100644 index 00000000000..132fbc0a8cb --- /dev/null +++ b/src/Storages/Cache/SchemaCache.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +const size_t DEFAULT_SCHEMA_CACHE_ELEMENTS = 4096; + +/// Cache that stores columns description by some string key. It's used in schema inference. +/// It implements LRU semantic: after each access to a key in cache we move this key to +/// the end of the queue, if we reached the limit of maximum elements in the cache we +/// remove keys from the beginning of the queue. +/// It also supports keys invalidations by last modification time. If last modification time +/// is provided and last modification happened after a key was added to the cache, this key +/// will be removed from cache. +class SchemaCache +{ +public: + SchemaCache(size_t max_elements_); + + struct SchemaInfo + { + ColumnsDescription columns; + time_t registration_time; + }; + + using LastModificationTimeGetter = std::function()>; + + /// Add new key with a schema + void add(const String & key, const ColumnsDescription & columns); + + /// Add many keys with the same schema (usually used for globs) + void addMany(const Strings & keys, const ColumnsDescription & columns); + + std::optional tryGet(const String & key, LastModificationTimeGetter get_last_mod_time = {}); + + void clear(); + + std::unordered_map getAll(); + +private: + void addUnlocked(const String & key, const ColumnsDescription & columns); + void checkOverflow(); + + using Queue = std::list; + using QueueIterator = Queue::iterator; + + struct Cell + { + SchemaInfo schema_info; + QueueIterator iterator; + }; + + Queue queue; + std::unordered_map data; + + size_t max_elements; + std::mutex mutex; +}; + +} diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 5098dfd3ef1..c18726aa7c4 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -65,7 +65,7 @@ namespace /* Recursive directory listing with matched paths as a result. * Have the same method in StorageFile. 
*/ - Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match) + Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match, std::unordered_map * last_mod_times) { const size_t first_glob = for_match.find_first_of("*?{"); @@ -100,13 +100,15 @@ namespace if (re2::RE2::FullMatch(file_name, matcher)) { result.push_back(String(ls.file_info[i].mName)); + if (last_mod_times) + (*last_mod_times)[result.back()] = ls.file_info[i].mLastMod; } } else if (is_directory && looking_for_directory) { if (re2::RE2::FullMatch(file_name, matcher)) { - Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash)); + Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash), last_mod_times); /// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check. std::move(result_part.begin(), result_part.end(), std::back_inserter(result)); } @@ -122,12 +124,12 @@ namespace return {uri.substr(begin_of_path), uri.substr(0, begin_of_path)}; } - std::vector getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context) + std::vector getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context, std::unordered_map * last_mod_times = nullptr) { HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef()); HDFSFSPtr fs = createHDFSFS(builder.get()); - return LSWithRegexpMatching("/", fs, path_from_uri); + return LSWithRegexpMatching("/", fs, path_from_uri, last_mod_times); } } @@ -186,7 +188,8 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( ContextPtr ctx) { const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri); - auto paths = getPathsList(path_from_uri, uri, ctx); + std::unordered_map last_mod_time; + auto paths = getPathsList(path_from_uri, uri, ctx, &last_mod_time); if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format)) throw Exception( ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, @@ -194,7 +197,11 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( "specify table structure manually", format); - ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr + std::optional columns_from_cache; + if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs) + columns_from_cache = tryGetColumnsFromCache(paths, path_from_uri, last_mod_time, format, ctx); + + ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == paths.end()) return nullptr; @@ -203,7 +210,17 @@ ColumnsDescription StorageHDFS::getTableStructureFromData( const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max; return wrapReadBufferWithCompressionMethod(std::move(impl), compression, zstd_window_log_max); }; - return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx); + + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx); + + if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs) + addColumnsToCache(paths, 
path_from_uri, columns, format, ctx); + + return columns; } class HDFSSource::DisclosedGlobIterator::Impl @@ -732,6 +749,55 @@ NamesAndTypesList StorageHDFS::getVirtuals() const return virtual_columns; } +SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx) +{ + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_hdfs", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + +std::optional StorageHDFS::tryGetColumnsFromCache( + const Strings & paths, + const String & uri_without_path, + std::unordered_map & last_mod_time, + const String & format_name, + const ContextPtr & ctx) +{ + auto & schema_cache = getSchemaCache(ctx); + for (const auto & path : paths) + { + auto get_last_mod_time = [&]() -> std::optional + { + auto it = last_mod_time.find(path); + if (it == last_mod_time.end()) + return std::nullopt; + return it->second; + }; + + String url = fs::path(uri_without_path) / path; + String cache_key = getKeyForSchemaCache(url, format_name, {}, ctx); + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageHDFS::addColumnsToCache( + const Strings & paths, + const String & uri_without_path, + const ColumnsDescription & columns, + const String & format_name, + const ContextPtr & ctx) +{ + auto & schema_cache = getSchemaCache(ctx); + Strings sources; + sources.reserve(paths.size()); + std::transform(paths.begin(), paths.end(), std::back_inserter(sources), [&](const String & path){ return fs::path(uri_without_path) / path; }); + Strings cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx); + schema_cache.addMany(cache_keys, columns); +} + } #endif diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index d987820b844..a0d61f4bd2a 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -65,10 +66,26 @@ public: const String & compression_method, ContextPtr ctx); + static SchemaCache & getSchemaCache(const ContextPtr & ctx); + protected: friend class HDFSSource; private: + static std::optional tryGetColumnsFromCache( + const Strings & paths, + const String & uri_without_path, + std::unordered_map & last_mod_time, + const String & format_name, + const ContextPtr & ctx); + + static void addColumnsToCache( + const Strings & paths, + const String & uri_without_path, + const ColumnsDescription & columns, + const String & format_name, + const ContextPtr & ctx); + std::vector uris; String format_name; String compression_method; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 4f66c76f1eb..f4cf1fe5c58 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -264,7 +264,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c /// in case of file descriptor we have a stream of data and we cannot /// start reading data from the beginning after reading some data for /// schema inference. - ReadBufferIterator read_buffer_iterator = [&]() + ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &) { /// We will use PeekableReadBuffer to create a checkpoint, so we need a place /// where we can store the original read buffer. 
@@ -308,7 +308,11 @@ ColumnsDescription StorageFile::getTableStructureFromFile( "table structure manually", format); - ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr + std::optional columns_from_cache; + if (context->getSettingsRef().schema_inference_use_cache_for_file) + columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context); + + ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == paths.end()) return nullptr; @@ -316,7 +320,16 @@ ColumnsDescription StorageFile::getTableStructureFromFile( return createReadBuffer(*it++, false, "File", -1, compression_method, context); }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context); + + if (context->getSettingsRef().schema_inference_use_cache_for_file) + addColumnsToCache(paths, columns, format, format_settings, context); + + return columns; } bool StorageFile::supportsSubsetOfColumns() const @@ -1221,4 +1234,48 @@ NamesAndTypesList StorageFile::getVirtuals() const {"_path", std::make_shared(std::make_shared())}, {"_file", std::make_shared(std::make_shared())}}; } + +SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context) +{ + static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + +std::optional StorageFile::tryGetColumnsFromCache( + const Strings & paths, const String & format_name, const std::optional & format_settings, ContextPtr context) +{ + /// Check if the cache contains one of the paths. 
+ auto & schema_cache = getSchemaCache(context); + struct stat file_stat{}; + for (const auto & path : paths) + { + auto get_last_mod_time = [&]() -> std::optional + { + if (0 != stat(path.c_str(), &file_stat)) + return std::nullopt; + + return file_stat.st_mtime; + }; + + String cache_key = getKeyForSchemaCache(path, format_name, format_settings, context); + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageFile::addColumnsToCache( + const Strings & paths, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context) +{ + auto & schema_cache = getSchemaCache(context); + Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context); + schema_cache.addMany(cache_keys, columns); +} + } diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index f47f6172c1c..e60e5f6b371 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -86,6 +87,8 @@ public: const std::optional & format_settings, ContextPtr context); + static SchemaCache & getSchemaCache(const ContextPtr & context); + protected: friend class StorageFileSource; friend class StorageFileSink; @@ -93,6 +96,16 @@ protected: private: void setStorageMetadata(CommonArguments args); + static std::optional tryGetColumnsFromCache( + const Strings & paths, const String & format_name, const std::optional & format_settings, ContextPtr context); + + static void addColumnsToCache( + const Strings & paths, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context); + std::string format_name; // We use format settings from global context + CREATE query for File table // function -- in this case, format_settings is set. 
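The `StorageFile` changes above follow the same tryGetColumnsFromCache / addColumnsToCache pattern that the S3, HDFS and URL storages use below; only the validation source differs (file mtime via `stat`, the object's last modification time, or the `Last-Modified` header). As a small illustrative session (the file name is hypothetical), the invalidation behaviour and the counters registered in ProfileEvents.cpp could be observed like this:

```sql
DESCRIBE file('events.csv', 'CSVWithNames');  -- inferred and cached, keyed by path, format and extra format info
DESCRIBE file('events.csv', 'CSVWithNames');  -- served from the cache

-- If events.csv is rewritten, its modification time becomes newer than registration_time,
-- so the next DESCRIBE invalidates the cached entry and infers the schema again.

-- The cache counters can be monitored through system.events:
SELECT event, value
FROM system.events
WHERE event LIKE 'SchemaInferenceCache%';
```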
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index aed3f541d47..fea45dfc8fd 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -90,12 +90,16 @@ public: const S3::URI & globbed_uri_, ASTPtr & query_, const Block & virtual_header_, - ContextPtr context_) + ContextPtr context_, + std::unordered_map * object_infos_, + Strings * read_keys_) : WithContext(context_) , client(client_) , globbed_uri(globbed_uri_) , query(query_) , virtual_header(virtual_header_) + , object_infos(object_infos_) + , read_keys(read_keys_) { if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos) throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION); @@ -187,6 +191,8 @@ private: if (re2::RE2::FullMatch(key, *matcher)) { String path = fs::path(globbed_uri.bucket) / key; + if (object_infos) + (*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000}; String file = path.substr(path.find_last_of('/') + 1); if (path_column) path_column->insert(path); @@ -221,6 +227,9 @@ private: /// It returns false when all objects were returned is_finished = !outcome.GetResult().GetIsTruncated(); + + if (read_keys) + read_keys->insert(read_keys->end(), buffer.begin(), buffer.end()); } std::mutex mutex; @@ -235,6 +244,8 @@ private: Aws::S3::Model::ListObjectsV2Outcome outcome; std::unique_ptr matcher; bool is_finished{false}; + std::unordered_map * object_infos; + Strings * read_keys; }; StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( @@ -242,8 +253,10 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, - ContextPtr context) - : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_header, context)) + ContextPtr context, + std::unordered_map * object_infos_, + Strings * read_keys_) + : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_)) { } @@ -395,7 +408,8 @@ StorageS3Source::StorageS3Source( const String & bucket_, const String & version_id_, std::shared_ptr file_iterator_, - const size_t download_thread_num_) + const size_t download_thread_num_, + const std::unordered_map & object_infos_) : ISource(getHeader(sample_block_, requested_virtual_columns_)) , WithContext(context_) , name(std::move(name_)) @@ -412,6 +426,7 @@ StorageS3Source::StorageS3Source( , requested_virtual_columns(requested_virtual_columns_) , file_iterator(file_iterator_) , download_thread_num(download_thread_num_) + , object_infos(object_infos_) { initialize(); } @@ -455,7 +470,12 @@ bool StorageS3Source::initialize() std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & key) { - const size_t object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false); + size_t object_size; + auto it = object_infos.find(fs::path(bucket) / key); + if (it != object_infos.end()) + object_size = it->second.size; + else + object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false); auto download_buffer_size = getContext()->getSettings().max_download_buffer_size; const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1; @@ -806,27 +826,34 @@ std::shared_ptr StorageS3::createFileIterator( ContextPtr local_context, ASTPtr query, const Block & virtual_block, - const std::vector & read_tasks) + const std::vector & read_tasks, + std::unordered_map * object_infos, + Strings * 
read_keys) { if (distributed_processing) { return std::make_shared( - [read_tasks_iterator = std::make_shared(read_tasks, local_context->getReadTaskCallback())]() -> String + [read_tasks_iterator = std::make_shared(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String { - return read_tasks_iterator->next(); + auto key = read_tasks_iterator->next(); + if (read_keys) + read_keys->push_back(key); + return key; }); } else if (is_key_with_globs) { /// Iterate through disclosed globs and make a source for each file auto glob_iterator = std::make_shared( - *s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context); + *s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys); return std::make_shared([glob_iterator]() { return glob_iterator->next(); }); } else { auto keys_iterator = std::make_shared(keys, s3_configuration.uri.bucket, query, virtual_block, local_context); + if (read_keys) + *read_keys = keys; return std::make_shared([keys_iterator]() { return keys_iterator->next(); }); } } @@ -871,7 +898,8 @@ Pipe StorageS3::read( local_context, query_info.query, virtual_block, - read_tasks_used_in_schema_inference); + read_tasks_used_in_schema_inference, + &object_infos); ColumnsDescription columns_description; Block block_for_format; @@ -914,7 +942,8 @@ Pipe StorageS3::read( s3_configuration.uri.bucket, s3_configuration.uri.version_id, iterator_wrapper, - max_download_threads)); + max_download_threads, + object_infos)); } auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -1151,11 +1180,12 @@ ColumnsDescription StorageS3::getTableStructureFromData( const String & compression_method, bool distributed_processing, const std::optional & format_settings, - ContextPtr ctx) + ContextPtr ctx, + std::unordered_map * object_infos) { S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) }; updateS3Configuration(ctx, s3_configuration); - return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx); + return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos); } ColumnsDescription StorageS3::getTableStructureFromDataImpl( @@ -1166,12 +1196,20 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx, - std::vector * read_keys_in_distributed_processing) + std::vector * read_keys_in_distributed_processing, + std::unordered_map * object_infos) { - auto file_iterator - = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}); + std::vector read_keys; - ReadBufferIterator read_buffer_iterator = [&, first = true]() mutable -> std::unique_ptr + auto file_iterator + = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys); + + std::optional columns_from_cache; + size_t prev_read_keys_size = read_keys.size(); + if (ctx->getSettingsRef().schema_inference_use_cache_for_s3) + columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx); + + ReadBufferIterator 
read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr { auto key = (*file_iterator)(); @@ -1187,8 +1225,17 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( return nullptr; } - if (distributed_processing && read_keys_in_distributed_processing) - read_keys_in_distributed_processing->push_back(key); + /// S3 file iterator could get new keys after new iteration, check them in schema cache. + if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) + { + columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx); + prev_read_keys_size = read_keys.size(); + if (columns_from_cache) + { + cached_columns = *columns_from_cache; + return nullptr; + } + } first = false; const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max; @@ -1199,7 +1246,19 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl( zstd_window_log_max); }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); + + if (ctx->getSettingsRef().schema_inference_use_cache_for_s3) + addColumnsToCache(read_keys, s3_configuration, columns, format, format_settings, ctx); + + if (distributed_processing && read_keys_in_distributed_processing) + *read_keys_in_distributed_processing = std::move(read_keys); + + return columns; } @@ -1288,6 +1347,74 @@ bool StorageS3::supportsPartitionBy() const return true; } +SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) +{ + static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + +std::optional StorageS3::tryGetColumnsFromCache( + const Strings::const_iterator & begin, + const Strings::const_iterator & end, + const S3Configuration & s3_configuration, + std::unordered_map * object_infos, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & ctx) +{ + auto & schema_cache = getSchemaCache(ctx); + for (auto it = begin; it < end; ++it) + { + String path = fs::path(s3_configuration.uri.bucket) / *it; + auto get_last_mod_time = [&]() -> std::optional + { + S3::ObjectInfo info; + /// Check if we already have information about this object. + /// If no, request it and remember for possible future usage. + if (object_infos && object_infos->contains(path)) + info = (*object_infos)[path]; + else + { + /// Note that in case of exception in getObjectInfo returned info will be empty, + /// but schema cache will handle this case and won't return columns from cache + /// because we can't say that it's valid without last modification time. 
+ info = S3::getObjectInfo(s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false); + if (object_infos) + (*object_infos)[path] = info; + } + + if (info.last_modification_time) + return info.last_modification_time; + return std::nullopt; + }; + + String source = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path; + String cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx); + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void StorageS3::addColumnsToCache( + const Strings & keys, + const S3Configuration & s3_configuration, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & ctx) +{ + auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket; + Strings sources; + sources.reserve(keys.size()); + std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; }); + Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); + auto & schema_cache = getSchemaCache(ctx); + schema_cache.addMany(cache_keys, columns); +} + } #endif diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index c0e5b80c709..3a02237570d 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -18,6 +18,7 @@ #include #include #include +#include namespace Aws::S3 { @@ -40,7 +41,9 @@ public: const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, - ContextPtr context); + ContextPtr context, + std::unordered_map * object_infos = nullptr, + Strings * read_keys_ = nullptr); String next(); @@ -94,7 +97,8 @@ public: const String & bucket, const String & version_id, std::shared_ptr file_iterator_, - size_t download_thread_num); + size_t download_thread_num, + const std::unordered_map & object_infos_); String getName() const override; @@ -128,6 +132,8 @@ private: Poco::Logger * log = &Poco::Logger::get("StorageS3Source"); + std::unordered_map object_infos; + /// Recreate ReadBuffer and Pipeline for each file. 
bool initialize(); @@ -190,7 +196,8 @@ public: const String & compression_method, bool distributed_processing, const std::optional & format_settings, - ContextPtr ctx); + ContextPtr ctx, + std::unordered_map * object_infos = nullptr); static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector> & key_value_args); @@ -204,6 +211,8 @@ public: S3Settings::ReadWriteSettings rw_settings; }; + static SchemaCache & getSchemaCache(const ContextPtr & ctx); + private: friend class StorageS3Cluster; friend class TableFunctionS3Cluster; @@ -223,6 +232,8 @@ private: std::vector read_tasks_used_in_schema_inference; + std::unordered_map object_infos; + static void updateS3Configuration(ContextPtr, S3Configuration &); static std::shared_ptr createFileIterator( @@ -233,7 +244,9 @@ private: ContextPtr local_context, ASTPtr query, const Block & virtual_block, - const std::vector & read_tasks = {}); + const std::vector & read_tasks = {}, + std::unordered_map * object_infos = nullptr, + Strings * read_keys = nullptr); static ColumnsDescription getTableStructureFromDataImpl( const String & format, @@ -243,9 +256,27 @@ private: bool is_key_with_globs, const std::optional & format_settings, ContextPtr ctx, - std::vector * read_keys_in_distributed_processing = nullptr); + std::vector * read_keys_in_distributed_processing = nullptr, + std::unordered_map * object_infos = nullptr); bool supportsSubsetOfColumns() const override; + + static std::optional tryGetColumnsFromCache( + const Strings::const_iterator & begin, + const Strings::const_iterator & end, + const S3Configuration & s3_configuration, + std::unordered_map * object_infos, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & ctx); + + static void addColumnsToCache( + const Strings & keys, + const S3Configuration & s3_configuration, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & ctx); }; } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index acf7444dca4..8b115f5824e 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include @@ -592,8 +591,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( urls_to_check = {uri}; } + std::optional columns_from_cache; + if (context->getSettingsRef().schema_inference_use_cache_for_url) + columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, credentials, format, format_settings, context); - ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr + ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr { if (it == urls_to_check.cend()) return nullptr; @@ -616,7 +618,16 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( return buf; }; - return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); + ColumnsDescription columns; + if (columns_from_cache) + columns = *columns_from_cache; + else + columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); + + if (context->getSettingsRef().schema_inference_use_cache_for_url) + addColumnsToCache(urls_to_check, columns, format, format_settings, context); + + return columns; } bool IStorageURLBase::supportsSubsetOfColumns() const @@ -797,6 +808,87 @@ 
SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad } } +SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context) +{ + static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + +std::optional IStorageURLBase::tryGetColumnsFromCache( + const Strings & urls, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + const Poco::Net::HTTPBasicCredentials & credentials, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context) +{ + auto & schema_cache = getSchemaCache(context); + for (const auto & url : urls) + { + auto get_last_mod_time = [&]() -> std::optional + { + auto last_mod_time = getLastModificationTime(url, headers, credentials, context); + /// Some URLs could not have Last-Modified header, in this case we cannot be sure that + /// data wasn't changed after adding it's schema to cache. Use schema from cache only if + /// special setting for this case is enabled. + if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url) + return 0; + return last_mod_time; + }; + + String cache_key = getKeyForSchemaCache(url, format_name, format_settings, context); + auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); + if (columns) + return columns; + } + + return std::nullopt; +} + +void IStorageURLBase::addColumnsToCache( + const Strings & urls, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context) +{ + auto & schema_cache = getSchemaCache(context); + Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context); + schema_cache.addMany(cache_keys, columns); +} + +std::optional IStorageURLBase::getLastModificationTime( + const String & url, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + const Poco::Net::HTTPBasicCredentials & credentials, + const ContextPtr & context) +{ + try + { + ReadWriteBufferFromHTTP buf( + Poco::URI(url), + Poco::Net::HTTPRequest::HTTP_GET, + {}, + ConnectionTimeouts::getHTTPTimeouts(context), + credentials, + context->getSettingsRef().max_http_get_redirects, + DBMS_DEFAULT_BUFFER_SIZE, + context->getReadSettings(), + headers, + ReadWriteBufferFromHTTP::Range{}, + &context->getRemoteHostFilter(), + true, + false, + false); + + return buf.getLastModificationTime(); + } + catch (...) 
+ { + return std::nullopt; + } +} + StorageURL::StorageURL( const String & uri_, const StorageID & table_id_, diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 0198eda9e67..fb0d8e8fa43 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB @@ -48,6 +49,8 @@ public: const std::optional & format_settings, ContextPtr context); + static SchemaCache & getSchemaCache(const ContextPtr & context); + protected: IStorageURLBase( const String & uri_, @@ -97,6 +100,27 @@ protected: private: virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; + + static std::optional tryGetColumnsFromCache( + const Strings & urls, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + const Poco::Net::HTTPBasicCredentials & credentials, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context); + + static void addColumnsToCache( + const Strings & urls, + const ColumnsDescription & columns, + const String & format_name, + const std::optional & format_settings, + const ContextPtr & context); + + static std::optional getLastModificationTime( + const String & url, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + const Poco::Net::HTTPBasicCredentials & credentials, + const ContextPtr & context); }; class StorageURLSink : public SinkToStorage diff --git a/src/Storages/System/StorageSystemSchemaInferenceCache.cpp b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp new file mode 100644 index 00000000000..1a32128c7e0 --- /dev/null +++ b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +static String getSchemaString(const ColumnsDescription & columns) +{ + WriteBufferFromOwnString buf; + const auto & names_and_types = columns.getAll(); + for (auto it = names_and_types.begin(); it != names_and_types.end(); ++it) + { + if (it != names_and_types.begin()) + writeCString(", ", buf); + writeString(it->name, buf); + writeChar(' ', buf); + writeString(it->type->getName(), buf); + } + + return buf.str(); +} + +NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes() +{ + return { + {"storage", std::make_shared()}, + {"source", std::make_shared()}, + {"format", std::make_shared()}, + {"additional_format_info", std::make_shared()}, + {"registration_time", std::make_shared()}, + {"schema", std::make_shared()} + }; +} + + +static void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name) +{ + auto s3_schema_cache_data = schema_cache.getAll(); + String source; + String format; + String additional_format_info; + for (const auto & [key, schema_info] : s3_schema_cache_data) + { + splitSchemaCacheKey(key, source, format, additional_format_info); + res_columns[0]->insert(storage_name); + res_columns[1]->insert(source); + res_columns[2]->insert(format); + res_columns[3]->insert(additional_format_info); + res_columns[4]->insert(schema_info.registration_time); + res_columns[5]->insert(getSchemaString(schema_info.columns)); + } +} + +void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +{ + fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File"); +#if USE_AWS_S3 + fillDataImpl(res_columns, 
StorageS3::getSchemaCache(context), "S3"); +#endif +#if USE_HDFS + fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS"); +#endif + fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL"); +} + +} diff --git a/src/Storages/System/StorageSystemSchemaInferenceCache.h b/src/Storages/System/StorageSystemSchemaInferenceCache.h new file mode 100644 index 00000000000..357bd687da6 --- /dev/null +++ b/src/Storages/System/StorageSystemSchemaInferenceCache.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class StorageSystemSchemaInferenceCache final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemSchemaInferenceCache"; } + + static NamesAndTypesList getNamesAndTypes(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index dbef2df953b..ab1ffdf209a 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -74,6 +74,7 @@ #include #include #include +#include #ifdef OS_LINUX #include @@ -133,6 +134,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database) attach(context, system_database, "licenses"); attach(context, system_database, "time_zones"); attach(context, system_database, "backups"); + attach(context, system_database, "schema_inference_cache"); #ifdef OS_LINUX attach(context, system_database, "stack_trace"); #endif diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index d47f8353e18..0aac91d6095 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -50,7 +50,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const { - ReadBufferIterator read_buffer_iterator = [&]() + ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &) { return std::make_unique(data); }; diff --git a/tests/integration/test_file_schema_inference_cache/__init__.py b/tests/integration/test_file_schema_inference_cache/__init__.py new file mode 100755 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml b/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml new file mode 100644 index 00000000000..de7e27921f0 --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/configs/config.d/query_log.xml @@ -0,0 +1,9 @@ + + + + system + query_log
+ toYYYYMM(event_date) + 300 +
+
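The query_log config above exists so that the integration tests below can read the new SchemaInferenceCache* profile events back for each DESCRIBE query. A minimal sketch of that check, assuming `desc file('data.jsonl')` has already been run on the node and that the file name is only a placeholder:

system flush logs;
select ProfileEvents['SchemaInferenceCacheHits']
from system.query_log
where query = 'desc file(\'data.jsonl\')' and type = 'QueryFinish'
order by event_time desc
limit 1;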
diff --git a/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml b/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml new file mode 100644 index 00000000000..de3c4cbe57f --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/configs/config.d/schema_cache.xml @@ -0,0 +1,4 @@ + + + 2 + diff --git a/tests/integration/test_file_schema_inference_cache/test.py b/tests/integration/test_file_schema_inference_cache/test.py new file mode 100755 index 00000000000..30c9a788d6f --- /dev/null +++ b/tests/integration/test_file_schema_inference_cache/test.py @@ -0,0 +1,151 @@ +import pytest +import time +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance( + "node", + stay_alive=True, + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/schema_cache.xml", + ], +) + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def get_profile_event_for_query(node, query, profile_event): + node.query("system flush logs") + query = query.replace("'", "\\'") + return int( + node.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1" + ) + ) + + +def check_cache_misses(node, file, amount=1): + assert ( + get_profile_event_for_query( + node, f"desc file('{file}')", "SchemaInferenceCacheMisses" + ) + == amount + ) + + +def check_cache_hits(node, file, amount=1): + assert ( + get_profile_event_for_query( + node, f"desc file('{file}')", "SchemaInferenceCacheHits" + ) + == amount + ) + + +def check_cache_invalidations(node, file, amount=1): + assert ( + get_profile_event_for_query( + node, f"desc file('{file}')", "SchemaInferenceCacheInvalidations" + ) + == amount + ) + + +def check_cache_evictions(node, file, amount=1): + assert ( + get_profile_event_for_query( + node, f"desc file('{file}')", "SchemaInferenceCacheEvictions" + ) + == amount + ) + + +def check_cache(node, expected_files): + sources = node.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted( + expected_files + ) + + +def test(start_cluster): + node.query("insert into function file('data.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data.jsonl')") + check_cache(node, ["data.jsonl"]) + check_cache_misses(node, "data.jsonl") + + node.query("desc file('data.jsonl')") + check_cache_hits(node, "data.jsonl") + + node.query("insert into function file('data.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data.jsonl')") + check_cache_invalidations(node, "data.jsonl") + + node.query("insert into function file('data1.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data1.jsonl')") + check_cache(node, ["data.jsonl", "data1.jsonl"]) + check_cache_misses(node, "data1.jsonl") + + node.query("desc file('data1.jsonl')") + check_cache_hits(node, "data1.jsonl") + + node.query("insert into function file('data2.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data2.jsonl')") + check_cache(node, ["data1.jsonl", "data2.jsonl"]) + check_cache_misses(node, "data2.jsonl") + check_cache_evictions(node, "data2.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache_hits(node, "data2.jsonl") + + node.query("desc 
file('data1.jsonl')") + check_cache_hits(node, "data1.jsonl") + + node.query("desc file('data.jsonl')") + check_cache(node, ["data.jsonl", "data1.jsonl"]) + check_cache_misses(node, "data.jsonl") + check_cache_evictions(node, "data.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache(node, ["data.jsonl", "data2.jsonl"]) + check_cache_misses(node, "data2.jsonl") + check_cache_evictions(node, "data2.jsonl") + + node.query("desc file('data2.jsonl')") + check_cache_hits(node, "data2.jsonl") + + node.query("desc file('data.jsonl')") + check_cache_hits(node, "data.jsonl") + + node.query("insert into function file('data3.jsonl') select * from numbers(100)") + time.sleep(1) + + node.query("desc file('data*.jsonl')") + check_cache_hits(node, "data*.jsonl") + + node.query("system drop schema cache for file") + check_cache(node, []) + + node.query("desc file('data*.jsonl')") + check_cache_misses(node, "data*.jsonl", 4) + + node.query("system drop schema cache") + check_cache(node, []) + + node.query("desc file('data*.jsonl')") + check_cache_misses(node, "data*.jsonl", 4) diff --git a/tests/integration/test_storage_hdfs/configs/schema_cache.xml b/tests/integration/test_storage_hdfs/configs/schema_cache.xml new file mode 100644 index 00000000000..37639649b5f --- /dev/null +++ b/tests/integration/test_storage_hdfs/configs/schema_cache.xml @@ -0,0 +1,3 @@ + + 2 + \ No newline at end of file diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index a9e6cbda67c..86fb5ab578c 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -1,13 +1,16 @@ import os import pytest +import time from helpers.cluster import ClickHouseCluster from helpers.test_tools import TSV from pyhdfs import HdfsClient cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( - "node1", main_configs=["configs/macro.xml"], with_hdfs=True + "node1", + main_configs=["configs/macro.xml", "configs/schema_cache.xml"], + with_hdfs=True, ) @@ -550,7 +553,9 @@ def test_schema_inference_with_globs(started_cluster): filename = "data{1,3}.jsoncompacteachrow" - result = node1.query_and_get_error(f"desc hdfs('hdfs://hdfs1:9000/{filename}')") + result = node1.query_and_get_error( + f"desc hdfs('hdfs://hdfs1:9000/{filename}') settings schema_inference_use_cache_for_hdfs=0" + ) assert "All attempts to extract table structure from files failed" in result @@ -558,10 +563,8 @@ def test_schema_inference_with_globs(started_cluster): f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'" ) - url_filename = "data{0,1,2,3}.jsoncompacteachrow" - result = node1.query_and_get_error( - f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')" + f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings schema_inference_use_cache_for_hdfs=0" ) assert ( @@ -628,6 +631,158 @@ def test_virtual_columns_2(started_cluster): assert result.strip() == "kek" +def get_profile_event_for_query(node, query, profile_event): + node.query("system flush logs") + query = query.replace("'", "\\'") + return int( + node.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1" + ) + ) + + +def check_cache_misses(node1, file, amount=1): + assert ( + get_profile_event_for_query( + node1, + f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheMisses", + ) + == amount + ) + + +def 
check_cache_hits(node1, file, amount=1): + assert ( + get_profile_event_for_query( + node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", "SchemaInferenceCacheHits" + ) + == amount + ) + + +def check_cache_invalidations(node1, file, amount=1): + assert ( + get_profile_event_for_query( + node1, + f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheInvalidations", + ) + == amount + ) + + +def check_cache_evictions(node1, file, amount=1): + assert ( + get_profile_event_for_query( + node1, + f"desc hdfs('hdfs://hdfs1:9000/{file}')", + "SchemaInferenceCacheEvictions", + ) + == amount + ) + + +def check_cache(node1, expected_files): + sources = node1.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted( + expected_files + ) + + +def run_describe_query(node, file): + query = f"desc hdfs('hdfs://hdfs1:9000/{file}')" + node.query(query) + + +def test_schema_inference_cache(started_cluster): + node1.query("system drop schema cache") + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query(node1, "test_cache0.jsonl") + check_cache(node1, ["test_cache0.jsonl"]) + check_cache_misses(node1, "test_cache0.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_hits(node1, "test_cache0.jsonl") + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_invalidations(node1, "test_cache0.jsonl") + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query(node1, "test_cache1.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(node1, "test_cache1.jsonl") + + run_describe_query(node1, "test_cache1.jsonl") + check_cache_hits(node1, "test_cache1.jsonl") + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query(node1, "test_cache2.jsonl") + check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"]) + check_cache_misses(node1, "test_cache2.jsonl") + check_cache_evictions(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache_hits(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache1.jsonl") + check_cache_hits(node1, "test_cache1.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses(node1, "test_cache0.jsonl") + check_cache_evictions(node1, "test_cache0.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"]) + check_cache_misses(node1, "test_cache2.jsonl") + check_cache_evictions(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache2.jsonl") + check_cache_hits(node1, "test_cache2.jsonl") + + run_describe_query(node1, "test_cache0.jsonl") + check_cache_hits(node1, "test_cache0.jsonl") + + node1.query( + f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1" + ) + time.sleep(1) + + files = 
"test_cache{0,1,2,3}.jsonl" + run_describe_query(node1, files) + check_cache_hits(node1, files) + + node1.query(f"system drop schema cache for hdfs") + check_cache(node1, []) + + run_describe_query(node1, files) + check_cache_misses(node1, files, 4) + + node1.query("system drop schema cache") + check_cache(node1, []) + + run_describe_query(node1, files) + check_cache_misses(node1, files, 4) + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/integration/test_storage_s3/configs/schema_cache.xml b/tests/integration/test_storage_s3/configs/schema_cache.xml new file mode 100644 index 00000000000..78f7e4a3563 --- /dev/null +++ b/tests/integration/test_storage_s3/configs/schema_cache.xml @@ -0,0 +1,4 @@ + + 2 + 2 + \ No newline at end of file diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index a09821137c4..0eb3fbf4ca7 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -98,7 +98,11 @@ def started_cluster(): cluster.add_instance( "dummy", with_minio=True, - main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"], + main_configs=[ + "configs/defaultS3.xml", + "configs/named_collections.xml", + "configs/schema_cache.xml", + ], ) cluster.add_instance( "s3_max_redirects", @@ -1331,13 +1335,13 @@ def test_schema_inference_from_globs(started_cluster): url_filename = "test{1,3}.jsoncompacteachrow" result = instance.query_and_get_error( - f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" + f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_s3=0" ) assert "All attempts to extract table structure from files failed" in result result = instance.query_and_get_error( - f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" + f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0" ) assert "All attempts to extract table structure from files failed" in result @@ -1347,7 +1351,7 @@ def test_schema_inference_from_globs(started_cluster): ) result = instance.query_and_get_error( - f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')" + f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings schema_inference_use_cache_for_s3=0" ) assert ( @@ -1357,7 +1361,7 @@ def test_schema_inference_from_globs(started_cluster): url_filename = "test{0,1,2,3}.jsoncompacteachrow" result = instance.query_and_get_error( - f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" + f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0" ) assert ( @@ -1482,3 +1486,213 @@ def test_wrong_format_usage(started_cluster): ) assert "Not a Parquet file" in result + + +def get_profile_event_for_query(instance, query, profile_event): + instance.query("system flush logs") + time.sleep(0.5) + query = query.replace("'", "\\'") + return int( + instance.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1" + ) + ) + + +def 
check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert ( + get_profile_event_for_query(instance, query, "SchemaInferenceCacheMisses") + == amount + ) + + +def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert ( + get_profile_event_for_query(instance, query, "SchemaInferenceCacheHits") + == amount + ) + + +def check_cache_invalidations( + instance, file, storage_name, started_cluster, bucket, amount=1 +): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert ( + get_profile_event_for_query( + instance, query, "SchemaInferenceCacheInvalidations" + ) + == amount + ) + + +def check_cache_evictions( + instance, file, storage_name, started_cluster, bucket, amount=1 +): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + assert ( + get_profile_event_for_query(instance, query, "SchemaInferenceCacheEvictions") + == amount + ) + + +def run_describe_query(instance, file, storage_name, started_cluster, bucket): + query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')" + instance.query(query) + + +def check_cache(instance, expected_files): + sources = instance.query("select source from system.schema_inference_cache") + assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted( + expected_files + ) + + +def test_schema_inference_cache(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + def test(storage_name): + instance.query("system drop schema cache") + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache(instance, ["test_cache0.jsonl"]) + check_cache_misses( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache_invalidations( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query( + instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses( + instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( 
+ instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) + + run_describe_query( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache(instance, ["test_cache1.jsonl", "test_cache2.jsonl"]) + check_cache_misses( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache_evictions( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache1.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"]) + check_cache_misses( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache_evictions( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache(instance, ["test_cache0.jsonl", "test_cache2.jsonl"]) + check_cache_misses( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache_evictions( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache2.jsonl", storage_name, started_cluster, bucket + ) + + run_describe_query( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + check_cache_hits( + instance, "test_cache0.jsonl", storage_name, started_cluster, bucket + ) + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1" + ) + time.sleep(1) + + files = "test_cache{0,1,2,3}.jsonl" + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache_hits(instance, files, storage_name, started_cluster, bucket) + + instance.query(f"system drop schema cache for {storage_name}") + check_cache(instance, []) + + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4) + + instance.query("system drop schema cache") + check_cache(instance, []) + + run_describe_query(instance, files, storage_name, started_cluster, bucket) + check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4) + + test("s3") + test("url") diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index a41398a7514..49f4d8245a2 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -94,6 +94,7 @@ SYSTEM DROP UNCOMPRESSED CACHE ['SYSTEM DROP 
UNCOMPRESSED','DROP UNCOMPRESSED CA SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP COMPILED EXPRESSION CACHE','DROP COMPILED EXPRESSIONS'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE +SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD SYSTEM RELOAD SYMBOLS ['RELOAD SYMBOLS'] GLOBAL SYSTEM RELOAD diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index 6e9d9188962..9edc2aa0cb4 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -277,7 +277,7 @@ CREATE TABLE system.grants ( `user_name` Nullable(String), `role_name` Nullable(String), - `access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' 
= 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147), + `access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' 
= 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148), `database` Nullable(String), `table` Nullable(String), `column` Nullable(String), @@ -551,10 +551,10 @@ ENGINE = SystemPartsColumns COMMENT 'SYSTEM TABLE is built on the fly.' CREATE TABLE system.privileges ( - `privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 
87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' = 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147), + `privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 
85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' = 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148), `aliases` Array(String), `level` Nullable(Enum8('GLOBAL' = 0, 'DATABASE' = 1, 'TABLE' = 2, 'DICTIONARY' = 3, 'VIEW' = 4, 'COLUMN' = 5)), - `parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 
'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' = 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147)) + `parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW 
POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' = 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148)) ) ENGINE = SystemPrivileges COMMENT 'SYSTEM TABLE is built on the fly.' 
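The renumbered enum values in the reference above come from the new SYSTEM DROP SCHEMA CACHE access type. A hedged sketch of how that privilege could be granted and used (the user name is a placeholder, not part of the patch):

grant system drop schema cache on *.* to test_user;
system drop schema cache for s3;   -- clears only the s3 schema cache
system drop schema cache;          -- clears the schema caches of all storages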
diff --git a/tests/queries/0_stateless/02267_file_globs_schema_inference.sql b/tests/queries/0_stateless/02267_file_globs_schema_inference.sql index 0bdb0d6a8ca..56b99d80286 100644 --- a/tests/queries/0_stateless/02267_file_globs_schema_inference.sql +++ b/tests/queries/0_stateless/02267_file_globs_schema_inference.sql @@ -7,4 +7,4 @@ select * from file('02267_data*.jsonl') order by x; insert into function file('02267_data1.jsonl', 'TSV') select 1 as x; insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x; -select * from file('02267_data*.jsonl'); --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE} +select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE} diff --git a/tests/queries/0_stateless/02375_system_schema_inference_cache.reference b/tests/queries/0_stateless/02375_system_schema_inference_cache.reference new file mode 100644 index 00000000000..8653863981c --- /dev/null +++ b/tests/queries/0_stateless/02375_system_schema_inference_cache.reference @@ -0,0 +1,12 @@ +storage String +source String +format String +additional_format_info String +registration_time DateTime +schema String +x Nullable(Int64) +s Nullable(String) +x Nullable(Int64) +s Nullable(String) +File 02374_data1.jsonl JSONEachRow x Nullable(Int64), s Nullable(String) +File 02374_data2.jsonl JSONEachRow x Nullable(Int64), s Nullable(String) diff --git a/tests/queries/0_stateless/02375_system_schema_inference_cache.sql b/tests/queries/0_stateless/02375_system_schema_inference_cache.sql new file mode 100644 index 00000000000..a6df0606a89 --- /dev/null +++ b/tests/queries/0_stateless/02375_system_schema_inference_cache.sql @@ -0,0 +1,14 @@ +-- Tags: no-fasttest + +insert into function file('02374_data1.jsonl') select number as x, 'str' as s from numbers(10); +insert into function file('02374_data2.jsonl') select number as x, 'str' as s from numbers(10); + +desc system.schema_inference_cache; +system drop schema cache for file; + +desc file('02374_data1.jsonl'); +desc file('02374_data2.jsonl'); + +select storage, splitByChar('/', source)[-1], format, schema from system.schema_inference_cache where storage='File'; +system drop schema cache for file; +select storage, source, format, schema from system.schema_inference_cache where storage='File';
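Taken together, the user-facing surface added by this patch can be exercised as in the following sketch, which assumes a `data.jsonl` file already exists in the server's user_files directory:

desc file('data.jsonl');   -- first call infers the schema and stores it in the cache
desc file('data.jsonl');   -- second call is served from the cache (SchemaInferenceCacheHits)
select storage, source, format, schema from system.schema_inference_cache where storage = 'File';
system drop schema cache for file;            -- drop only the file schema cache
set schema_inference_use_cache_for_file = 0;  -- or disable cache usage for subsequent queries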