Merge pull request #38286 from Avogar/schema-inference-cache

Add schema inference cache for s3/hdfs/file/url
Nikolai Kochetov 2022-08-18 13:07:50 +02:00 committed by GitHub
commit 5a85531ef7
46 changed files with 1561 additions and 80 deletions


@@ -19,6 +19,7 @@ Additional cache types:
- Compiled expressions cache.
- [Avro format](../interfaces/formats.md#data-format-avro) schemas cache.
- [Dictionaries](../sql-reference/dictionaries/index.md) data cache.
- Schema inference cache.
Indirectly used:


@@ -3300,7 +3300,7 @@ Possible values:
Default value: `0`.
## shutdown_wait_unfinished_queries {#shutdown_wait_unfinished_queries}
Enables or disables waiting unfinished queries when shutdown server.
@@ -3311,13 +3311,13 @@ Possible values:
Default value: 0.
## shutdown_wait_unfinished {#shutdown_wait_unfinished}
The waiting time in seconds for currently handled connections when shutdown server.
Default Value: 5.
## memory_overcommit_ratio_denominator {#memory_overcommit_ratio_denominator}
It represents soft memory limit in case when hard limit is reached on user level.
This value is used to compute overcommit ratio for the query.
@@ -3326,7 +3326,7 @@ Read more about [memory overcommit](memory-overcommit.md).
Default value: `1GiB`.
## memory_usage_overcommit_max_wait_microseconds {#memory_usage_overcommit_max_wait_microseconds}
Maximum time thread will wait for memory to be freed in the case of memory overcommit on a user level.
If the timeout is reached and memory is not freed, an exception is thrown.
@@ -3334,7 +3334,7 @@ Read more about [memory overcommit](memory-overcommit.md).
Default value: `5000000`.
## memory_overcommit_ratio_denominator_for_user {#memory_overcommit_ratio_denominator_for_user}
It represents soft memory limit in case when hard limit is reached on global level.
This value is used to compute overcommit ratio for the query.
@@ -3343,6 +3343,36 @@ Read more about [memory overcommit](memory-overcommit.md).
Default value: `1GiB`.
## schema_inference_use_cache_for_file {#schema_inference_use_cache_for_file}
Enable the schema cache for schema inference in the `file` table function.
Default value: `true`.
## schema_inference_use_cache_for_s3 {#schema_inference_use_cache_for_s3}
Enable the schema cache for schema inference in the `s3` table function.
Default value: `true`.
## schema_inference_use_cache_for_url {#schema_inference_use_cache_for_url}
Enable the schema cache for schema inference in the `url` table function.
Default value: `true`.
## schema_inference_use_cache_for_hdfs {#schema_inference_use_cache_for_hdfs}
Enable the schema cache for schema inference in the `hdfs` table function.
Default value: `true`.
## schema_inference_cache_require_modification_time_for_url {#schema_inference_cache_require_modification_time_for_url}
Use a schema from the cache for URLs only after last modification time validation (for URLs that have a Last-Modified header). If this setting is enabled and the URL has no Last-Modified header, the schema from the cache is not used.
Default value: `true`.
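For illustration, the cache can be bypassed per session or per query by turning the corresponding setting off (a sketch; the bucket and object names are hypothetical):

```sql
-- The first call infers the schema by reading the data and stores it in the cache;
-- repeated calls over the same source, format and format settings reuse the cached schema.
DESCRIBE s3('https://my-bucket.s3.amazonaws.com/data/events.csv', 'CSVWithNames');

-- Force re-inference by disabling the cache for S3 sources.
SET schema_inference_use_cache_for_s3 = 0;
DESCRIBE s3('https://my-bucket.s3.amazonaws.com/data/events.csv', 'CSVWithNames');
```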
## compatibility {#compatibility}
This setting changes other settings according to provided ClickHouse version.


@@ -140,6 +140,7 @@ enum class AccessType
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \


@@ -350,6 +350,12 @@
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
\
M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \
M(SchemaInferenceCacheMisses, "Number of times a schema was not found in the cache during schema inference") \
M(SchemaInferenceCacheEvictions, "Number of times a schema from cache was evicted due to overflow") \
M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data") \
\
M(KeeperPacketsSent, "Packets sent by keeper server") \
M(KeeperPacketsReceived, "Packets received by keeper server") \
M(KeeperRequestTotal, "Total requests number on keeper server") \
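The four SchemaInferenceCache* counters added above are regular profile events, so their server-wide totals can be inspected through the system.events table (a sketch):

```sql
SELECT event, value
FROM system.events
WHERE event LIKE 'SchemaInferenceCache%';
```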


@@ -609,6 +609,12 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \
M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \
\
M(Bool, schema_inference_use_cache_for_file, true, "Use cache in schema inference while using file table function", 0) \
M(Bool, schema_inference_use_cache_for_s3, true, "Use cache in schema inference while using s3 table function", 0) \
M(Bool, schema_inference_use_cache_for_hdfs, true, "Use cache in schema inference while using hdfs table function", 0) \
M(Bool, schema_inference_use_cache_for_url, true, "Use cache in schema inference while using url table function", 0) \
M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URL with last modification time validation (for urls with Last-Modified header)", 0) \
\
M(String, compatibility, "", "Changes other settings according to provided ClickHouse version. If we know that we changed some behaviour in ClickHouse by changing some settings in some version, this compatibility setting will control these settings", 0) \
\
M(Map, additional_table_filters, "", "Additional filter expression which would be applied after reading from specified table. Syntax: {'table1': 'expression', 'database.table2': 'expression'}", 0) \


@@ -573,6 +573,25 @@ bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) co
return target.supports_subset_of_columns;
}
void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(
const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter)
{
auto & target = dict[name].additional_info_for_schema_cache_getter;
if (target)
throw Exception("FormatFactory: additional info for schema cache getter " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(additional_info_for_schema_cache_getter);
}
String FormatFactory::getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
{
const auto & additional_info_getter = getCreators(name).additional_info_for_schema_cache_getter;
if (!additional_info_getter)
return "";
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return additional_info_getter(format_settings);
}
bool FormatFactory::isInputFormat(const String & name) const
{
auto it = dict.find(name);


@@ -100,6 +100,13 @@ private:
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings)>;
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
/// Some formats can extract different schemas from the same source depending on
/// some settings. To process this case in schema cache we should add some additional
/// information to a cache key. This getter should return some string with information
/// about such settings. For example, for Protobuf format it's the path to the schema
/// and the name of the message.
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;
struct Creators
{
InputCreator input_creator;
@@ -111,6 +118,7 @@ private:
bool supports_subset_of_columns{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@@ -202,6 +210,9 @@ public:
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
bool checkIfFormatHasAnySchemaReader(const String & name) const;
void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter);
String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
const FormatsDictionary & getAllFormats() const
{
return dict;


@@ -1,4 +1,3 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@@ -75,12 +74,13 @@ ColumnsDescription readSchemaFromFormat(
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
size_t iterations = 0;
ColumnsDescription cached_columns;
while (true)
{
bool is_eof = false;
try
{
buf = read_buffer_iterator(cached_columns);
if (!buf)
break;
is_eof = buf->eof();
@@ -142,6 +142,9 @@ ColumnsDescription readSchemaFromFormat(
}
}
if (!cached_columns.empty())
return cached_columns;
if (names_and_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}\nYou can specify the structure manually", exception_messages);
@@ -236,4 +239,37 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header)
return result;
}
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
return getKeysForSchemaCache({source}, format, format_settings, context).front();
}
static String makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info)
{
return source + "@@" + format + "@@" + additional_format_info;
}
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info)
{
size_t additional_format_info_pos = key.rfind("@@");
additional_format_info = key.substr(additional_format_info_pos + 2, key.size() - additional_format_info_pos - 2);
size_t format_pos = key.rfind("@@", additional_format_info_pos - 1);
format = key.substr(format_pos + 2, additional_format_info_pos - format_pos - 2);
source = key.substr(0, format_pos);
}
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
/// For some formats data schema depends on some settings, so it's possible that
/// two queries to the same source will get two different schemas. To process this
/// case we add some additional information specific for the format to the cache key.
/// For example, for Protobuf format additional information is the path to the schema
/// and message name.
String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);
Strings cache_keys;
cache_keys.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); });
return cache_keys;
}
}
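A consequence of the key composition above (source, format, and format-specific additional info) is that the same source described with different format settings occupies separate cache entries. A sketch, assuming a hypothetical local file and Protobuf schema files:

```sql
-- Different format_schema values yield different cache keys for the same file,
-- because the Protobuf schema path and message name are part of the additional info.
DESCRIBE file('events.bin', 'Protobuf') SETTINGS format_schema = 'events_v1.proto:Event';
DESCRIBE file('events.bin', 'Protobuf') SETTINGS format_schema = 'events_v2.proto:Event';
```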


@@ -6,7 +6,7 @@
namespace DB
{
using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>(ColumnsDescription &)>;
/// Try to determine the schema of the data in specifying format.
/// For formats that have an external schema reader, it will
@@ -46,4 +46,9 @@ DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type);
/// Call makeNullableRecursivelyAndCheckForNothing for all types
/// in the block and return names and types.
NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header);
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info);
}


@@ -207,21 +207,7 @@ namespace detail
return *file_size;
Poco::Net::HTTPResponse response;
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD);
break;
}
catch (const Poco::Exception & e)
{
if (i == settings.http_max_tries - 1)
throw;
LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
}
}
getHeadResponse(response);
if (response.hasContentLength())
{
@@ -250,6 +236,25 @@
InitializeError initialization_error = InitializeError::NONE;
private:
void getHeadResponse(Poco::Net::HTTPResponse & response)
{
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
callWithRedirects(response, Poco::Net::HTTPRequest::HTTP_HEAD);
break;
}
catch (const Poco::Exception & e)
{
if (i == settings.http_max_tries - 1)
throw;
LOG_ERROR(log, "Failed to make HTTP_HEAD request to {}. Error: {}", uri.toString(), e.displayText());
}
}
}
void setupExternalBuffer()
{
/**
@@ -669,6 +674,22 @@ namespace detail
}
const std::string & getCompressionMethod() const { return content_encoding; }
std::optional<time_t> getLastModificationTime()
{
Poco::Net::HTTPResponse response;
getHeadResponse(response);
if (!response.has("Last-Modified"))
return std::nullopt;
String date_str = response.get("Last-Modified");
struct tm info;
char * res = strptime(date_str.data(), "%a, %d %b %Y %H:%M:%S %Z", &info);
if (!res || res != date_str.data() + date_str.size())
return std::nullopt;
return timegm(&info);
}
};
}


@@ -790,7 +790,8 @@ namespace S3
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
{
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
@@ -804,13 +805,18 @@
if (outcome.IsSuccess())
{
auto read_result = outcome.GetResultWithOwnership();
return {.size = static_cast<size_t>(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000};
}
else if (throw_on_error)
{
throw DB::Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
return {};
}
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
{
return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error).size;
} }
}


@@ -78,6 +78,14 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
};
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true);
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true);
}


@@ -45,6 +45,10 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
@@ -326,6 +330,29 @@ BlockIO InterpreterSystemQuery::execute()
}
break;
}
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
if (query.schema_cache_storage.empty())
caches_to_drop = {"FILE", "S3", "HDFS", "URL"};
else
caches_to_drop = {query.schema_cache_storage};
if (caches_to_drop.contains("FILE"))
StorageFile::getSchemaCache(getContext()).clear();
#if USE_AWS_S3
if (caches_to_drop.contains("S3"))
StorageS3::getSchemaCache(getContext()).clear();
#endif
#if USE_HDFS
if (caches_to_drop.contains("HDFS"))
StorageHDFS::getSchemaCache(getContext()).clear();
#endif
if (caches_to_drop.contains("URL"))
StorageURL::getSchemaCache(getContext()).clear();
break;
}
case Type::RELOAD_DICTIONARY:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
@@ -833,6 +860,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;
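For reference, the DROP_SCHEMA_CACHE type handled above (its FOR clause is parsed in ParserSystemQuery.cpp below) corresponds to these statement forms; without a FOR clause all four caches are cleared:

```sql
-- Clear every schema inference cache.
SYSTEM DROP SCHEMA CACHE;

-- Clear the cache for a single source type only.
SYSTEM DROP SCHEMA CACHE FOR S3;
```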


@@ -29,6 +29,7 @@ public:
DROP_COMPILED_EXPRESSION_CACHE,
#endif
DROP_FILESYSTEM_CACHE,
DROP_SCHEMA_CACHE,
STOP_LISTEN_QUERIES,
START_LISTEN_QUERIES,
RESTART_REPLICAS,
@@ -97,6 +98,8 @@ public:
String backup_name;
String schema_cache_storage;
String getID(char) const override { return "SYSTEM query"; }
ASTPtr clone() const override


@@ -363,6 +363,23 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
parseQueryWithOnCluster(res, pos, expected);
break;
}
case Type::DROP_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"FILE"}.ignore(pos, expected))
res->schema_cache_storage = "FILE";
else if (ParserKeyword{"S3"}.ignore(pos, expected))
res->schema_cache_storage = "S3";
else if (ParserKeyword{"HDFS"}.ignore(pos, expected))
res->schema_cache_storage = "HDFS";
else if (ParserKeyword{"URL"}.ignore(pos, expected))
res->schema_cache_storage = "URL";
else
return false;
}
break;
}
case Type::UNFREEZE:
{


@@ -318,6 +318,8 @@ void registerInputFormatCapnProto(FormatFactory & factory)
});
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");
factory.registerAdditionalInfoForSchemaCacheGetter(
"CapnProto", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
void registerCapnProtoSchemaReader(FormatFactory & factory)


@@ -452,6 +452,9 @@ void registerInputFormatMySQLDump(FormatFactory & factory)
{
return std::make_shared<MySQLDumpRowInputFormat>(buf, header, params, settings);
});
factory.registerAdditionalInfoForSchemaCacheGetter(
"MySQLDump", [](const FormatSettings & settings) { return "Table name: " + settings.mysql_dump.table_name; });
}
void registerMySQLSchemaReader(FormatFactory & factory)


@@ -81,6 +81,8 @@ void registerInputFormatProtobufList(FormatFactory & factory)
FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers);
});
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter(
"ProtobufList", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
void registerProtobufListSchemaReader(FormatFactory & factory)


@@ -101,6 +101,10 @@ void registerProtobufSchemaReader(FormatFactory & factory)
{
return std::make_shared<ProtobufSchemaReader>(settings);
});
for (const auto & name : {"Protobuf", "ProtobufSingle"})
factory.registerAdditionalInfoForSchemaCacheGetter(
name, [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
}


@@ -0,0 +1,114 @@
#include <Storages/Cache/SchemaCache.h>
#include <Common/ProfileEvents.h>
#include <ctime>
namespace ProfileEvents
{
extern const Event SchemaInferenceCacheHits;
extern const Event SchemaInferenceCacheMisses;
extern const Event SchemaInferenceCacheEvictions;
extern const Event SchemaInferenceCacheInvalidations;
}
namespace DB
{
SchemaCache::SchemaCache(size_t max_elements_) : max_elements(max_elements_)
{
}
void SchemaCache::add(const String & key, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
addUnlocked(key, columns);
}
void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
for (const auto & key : keys)
addUnlocked(key, columns);
}
void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns)
{
/// Do nothing if this key is already in the cache.
if (data.contains(key))
return;
time_t now = std::time(nullptr);
auto it = queue.insert(queue.end(), key);
data[key] = {SchemaInfo{columns, now}, it};
checkOverflow();
}
void SchemaCache::checkOverflow()
{
if (queue.size() <= max_elements)
return;
auto key = queue.front();
data.erase(key);
queue.pop_front();
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheEvictions);
}
std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, LastModificationTimeGetter get_last_mod_time)
{
std::lock_guard lock(mutex);
auto it = data.find(key);
if (it == data.end())
{
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheMisses);
return std::nullopt;
}
auto & schema_info = it->second.schema_info;
auto & queue_iterator = it->second.iterator;
if (get_last_mod_time)
{
/// It's important to call get_last_mod_time only if we have key in cache,
/// because this function can do some heavy operations.
auto last_mod_time = get_last_mod_time();
/// If get_last_mod_time function was provided but it returned nullopt, it means that
/// it failed to get last modification time, so we cannot safely use value from cache.
if (!last_mod_time)
return std::nullopt;
if (*last_mod_time >= schema_info.registration_time)
{
/// Object was modified after it was added in cache.
/// So, stored value is no more valid and we should remove it.
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations);
queue.erase(queue_iterator);
data.erase(key);
return std::nullopt;
}
}
/// Move key to the end of queue.
queue.splice(queue.end(), queue, queue_iterator);
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits);
return schema_info.columns;
}
void SchemaCache::clear()
{
std::lock_guard lock(mutex);
data.clear();
queue.clear();
}
std::unordered_map<String, SchemaCache::SchemaInfo> SchemaCache::getAll()
{
std::lock_guard lock(mutex);
std::unordered_map<String, SchemaCache::SchemaInfo> result;
for (const auto & [key, value] : data)
result[key] = value.schema_info;
return result;
}
}


@@ -0,0 +1,66 @@
#pragma once
#include <Storages/ColumnsDescription.h>
#include <unordered_map>
#include <list>
#include <mutex>
#include <optional>
namespace DB
{
const size_t DEFAULT_SCHEMA_CACHE_ELEMENTS = 4096;
/// Cache that stores columns description by some string key. It's used in schema inference.
/// It implements LRU semantic: after each access to a key in cache we move this key to
/// the end of the queue, if we reached the limit of maximum elements in the cache we
/// remove keys from the beginning of the queue.
/// It also supports key invalidation by last modification time: if a last modification time
/// is provided and the last modification happened after a key was added to the cache, the key
/// will be removed from the cache.
class SchemaCache
{
public:
SchemaCache(size_t max_elements_);
struct SchemaInfo
{
ColumnsDescription columns;
time_t registration_time;
};
using LastModificationTimeGetter = std::function<std::optional<time_t>()>;
/// Add new key with a schema
void add(const String & key, const ColumnsDescription & columns);
/// Add many keys with the same schema (usually used for globs)
void addMany(const Strings & keys, const ColumnsDescription & columns);
std::optional<ColumnsDescription> tryGet(const String & key, LastModificationTimeGetter get_last_mod_time = {});
void clear();
std::unordered_map<String, SchemaInfo> getAll();
private:
void addUnlocked(const String & key, const ColumnsDescription & columns);
void checkOverflow();
using Queue = std::list<String>;
using QueueIterator = Queue::iterator;
struct Cell
{
SchemaInfo schema_info;
QueueIterator iterator;
};
Queue queue;
std::unordered_map<String, Cell> data;
size_t max_elements;
std::mutex mutex;
};
}


@@ -65,7 +65,7 @@ namespace
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match, std::unordered_map<String, time_t> * last_mod_times)
{
const size_t first_glob = for_match.find_first_of("*?{");
@@ -100,13 +100,15 @@ namespace
if (re2::RE2::FullMatch(file_name, matcher))
{
result.push_back(String(ls.file_info[i].mName));
if (last_mod_times)
(*last_mod_times)[result.back()] = ls.file_info[i].mLastMod;
}
}
else if (is_directory && looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash), last_mod_times);
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
@@ -122,12 +124,12 @@ namespace
return {uri.substr(begin_of_path), uri.substr(0, begin_of_path)};
}
std::vector<String> getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context, std::unordered_map<String, time_t> * last_mod_times = nullptr)
{
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
return LSWithRegexpMatching("/", fs, path_from_uri, last_mod_times);
}
}
@@ -186,7 +188,8 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
ContextPtr ctx)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
std::unordered_map<String, time_t> last_mod_time;
auto paths = getPathsList(path_from_uri, uri, ctx, &last_mod_time);
if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
@@ -194,7 +197,11 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
"specify table structure manually",
format);
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
columns_from_cache = tryGetColumnsFromCache(paths, path_from_uri, last_mod_time, format, ctx);
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
return nullptr;
@@ -203,7 +210,17 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, zstd_window_log_max);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
if (ctx->getSettingsRef().schema_inference_use_cache_for_hdfs)
addColumnsToCache(paths, path_from_uri, columns, format, ctx);
return columns;
}
class HDFSSource::DisclosedGlobIterator::Impl
@@ -732,6 +749,55 @@ NamesAndTypesList StorageHDFS::getVirtuals() const
return virtual_columns;
}
SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_hdfs", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(
const Strings & paths,
const String & uri_without_path,
std::unordered_map<String, time_t> & last_mod_time,
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
for (const auto & path : paths)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
auto it = last_mod_time.find(path);
if (it == last_mod_time.end())
return std::nullopt;
return it->second;
};
String url = fs::path(uri_without_path) / path;
String cache_key = getKeyForSchemaCache(url, format_name, {}, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageHDFS::addColumnsToCache(
const Strings & paths,
const String & uri_without_path,
const ColumnsDescription & columns,
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
Strings sources;
sources.reserve(paths.size());
std::transform(paths.begin(), paths.end(), std::back_inserter(sources), [&](const String & path){ return fs::path(uri_without_path) / path; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx);
schema_cache.addMany(cache_keys, columns);
}
}
#endif
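To illustrate the flow implemented above: the first schema inference over an HDFS source reads the data, while a repeated one is served from the cache as long as the files' modification times are unchanged (a sketch with a hypothetical cluster path):

```sql
-- Schema is inferred from the data and cached, keyed by URI, format and format settings.
DESCRIBE hdfs('hdfs://namenode:9000/warehouse/events/*.parquet', 'Parquet');

-- Served from the schema cache (SchemaInferenceCacheHits grows) if the files are unchanged.
DESCRIBE hdfs('hdfs://namenode:9000/warehouse/events/*.parquet', 'Parquet');
```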


@@ -6,6 +6,7 @@
#include <Processors/ISource.h>
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Poco/URI.h>
#include <Common/logger_useful.h>
@@ -65,10 +66,26 @@ public:
const String & compression_method,
ContextPtr ctx);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
protected:
friend class HDFSSource;
private:
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & paths,
const String & uri_without_path,
std::unordered_map<String, time_t> & last_mod_time,
const String & format_name,
const ContextPtr & ctx);
static void addColumnsToCache(
const Strings & paths,
const String & uri_without_path,
const ColumnsDescription & columns,
const String & format_name,
const ContextPtr & ctx);
std::vector<const String> uris;
String format_name;
String compression_method;


@@ -264,7 +264,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
/// in case of file descriptor we have a stream of data and we cannot
/// start reading data from the beginning after reading some data for
/// schema inference.
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
@@ -308,7 +308,11 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
"table structure manually",
format);
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
return nullptr;
@@ -316,7 +320,16 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return createReadBuffer(*it++, false, "File", -1, compression_method, context);
};
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(paths, columns, format, format_settings, context);
return columns;
}
bool StorageFile::supportsSubsetOfColumns() const
@@ -1221,4 +1234,48 @@ NamesAndTypesList StorageFile::getVirtuals() const
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
{
/// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache(context);
struct stat file_stat{};
for (const auto & path : paths)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(path.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
String cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageFile::addColumnsToCache(
const Strings & paths,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}
}


@@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Common/logger_useful.h>
@@ -86,6 +87,8 @@ public:
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
static SchemaCache & getSchemaCache(const ContextPtr & context);
protected:
friend class StorageFileSource;
friend class StorageFileSink;
@@ -93,6 +96,16 @@ protected:
private:
void setStorageMetadata(CommonArguments args);
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context);
static void addColumnsToCache(
const Strings & paths,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
std::string format_name;
// We use format settings from global context + CREATE query for File table
// function -- in this case, format_settings is set.


@@ -90,12 +90,16 @@ public:
const S3::URI & globbed_uri_,
ASTPtr & query_,
const Block & virtual_header_,
ContextPtr context_,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
: WithContext(context_)
, client(client_)
, globbed_uri(globbed_uri_)
, query(query_)
, virtual_header(virtual_header_)
, object_infos(object_infos_)
, read_keys(read_keys_)
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@@ -187,6 +191,8 @@ private:
if (re2::RE2::FullMatch(key, *matcher))
{
String path = fs::path(globbed_uri.bucket) / key;
if (object_infos)
(*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000};
String file = path.substr(path.find_last_of('/') + 1);
if (path_column)
path_column->insert(path);
@@ -221,6 +227,9 @@ private:
/// It returns false when all objects were returned
is_finished = !outcome.GetResult().GetIsTruncated();
if (read_keys)
read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
}
std::mutex mutex;
@@ -235,6 +244,8 @@ private:
Aws::S3::Model::ListObjectsV2Outcome outcome;
std::unique_ptr<re2::RE2> matcher;
bool is_finished{false};
std::unordered_map<String, S3::ObjectInfo> * object_infos;
Strings * read_keys;
};
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
@@ -242,8 +253,10 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_))
{
}
@@ -395,7 +408,8 @@ StorageS3Source::StorageS3Source(
const String & bucket_,
const String & version_id_,
std::shared_ptr<IteratorWrapper> file_iterator_,
const size_t download_thread_num_,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
@@ -412,6 +426,7 @@ StorageS3Source::StorageS3Source(
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, object_infos(object_infos_)
{
initialize();
}
@@ -455,7 +470,12 @@ bool StorageS3Source::initialize()
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
{
size_t object_size;
auto it = object_infos.find(fs::path(bucket) / key);
if (it != object_infos.end())
object_size = it->second.size;
else
object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false);
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
@@ -806,27 +826,34 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks,
std::unordered_map<String, S3::ObjectInfo> * object_infos,
Strings * read_keys)
{
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
[read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String
{
auto key = read_tasks_iterator->next();
if (read_keys)
read_keys->push_back(key);
return key;
});
}
else if (is_key_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
{
auto keys_iterator
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
if (read_keys)
*read_keys = keys;
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
}
}
@@ -871,7 +898,8 @@ Pipe StorageS3::read(
local_context,
query_info.query,
virtual_block,
read_tasks_used_in_schema_inference,
&object_infos);
ColumnsDescription columns_description;
Block block_for_format;
@@ -914,7 +942,8 @@ Pipe StorageS3::read(
s3_configuration.uri.bucket,
s3_configuration.uri.version_id,
iterator_wrapper,
max_download_threads,
object_infos));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@@ -1151,11 +1180,12 @@ ColumnsDescription StorageS3::getTableStructureFromData(
const String & compression_method,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@@ -1166,12 +1196,20 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
std::vector<String> read_keys;
auto file_iterator
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
size_t prev_read_keys_size = read_keys.size();
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx);
ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
{
auto key = (*file_iterator)();
@@ -1187,8 +1225,17 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
return nullptr;
}
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx);
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
{
cached_columns = *columns_from_cache;
return nullptr;
}
}
first = false; first = false;
const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max; const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
@ -1199,7 +1246,19 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
zstd_window_log_max); zstd_window_log_max);
}; };
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx); ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
addColumnsToCache(read_keys, s3_configuration, columns, format, format_settings, ctx);
if (distributed_processing && read_keys_in_distributed_processing)
*read_keys_in_distributed_processing = std::move(read_keys);
return columns;
} }
@ -1288,6 +1347,74 @@ bool StorageS3::supportsPartitionBy() const
return true; return true;
} }
SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
const Strings::const_iterator & begin,
const Strings::const_iterator & end,
const S3Configuration & s3_configuration,
std::unordered_map<String, S3::ObjectInfo> * object_infos,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it)
{
String path = fs::path(s3_configuration.uri.bucket) / *it;
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
S3::ObjectInfo info;
/// Check if we already have information about this object.
/// If not, request it and remember it for possible future use.
if (object_infos && object_infos->contains(path))
info = (*object_infos)[path];
else
{
/// Note that in case of an error in getObjectInfo the returned info will be empty,
/// but the schema cache handles this case and won't return columns from the cache,
/// because we can't tell whether they are valid without the last modification time.
info = S3::getObjectInfo(s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false);
if (object_infos)
(*object_infos)[path] = info;
}
if (info.last_modification_time)
return info.last_modification_time;
return std::nullopt;
};
String source = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path;
String cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageS3::addColumnsToCache(
const Strings & keys,
const S3Configuration & s3_configuration,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addMany(cache_keys, columns);
}
} }
#endif #endif
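
For readers unfamiliar with the scheme above: the cached schema is keyed by host/bucket/key plus format name and format settings, and a lookup is only honoured if the object's last modification time shows the data has not changed since the schema was registered. Below is a minimal, self-contained Python sketch of that idea; the class, names and key layout are illustrative and are not the ClickHouse SchemaCache API.

import time

# Minimal sketch (not the ClickHouse implementation) of the schema inference cache:
# entries are keyed by source + format (+ format settings) and are returned only
# when the data has not been modified since the schema was registered.
class SchemaCacheSketch:
    def __init__(self):
        self.entries = {}  # key -> (columns, registration_time)

    def add_many(self, keys, columns):
        now = time.time()
        for key in keys:
            self.entries[key] = (columns, now)

    def try_get(self, key, get_last_mod_time):
        entry = self.entries.get(key)
        if entry is None:
            return None                      # miss: nothing cached for this key
        columns, registration_time = entry
        last_mod_time = get_last_mod_time()
        if last_mod_time is None:
            return None                      # cannot validate without a modification time
        if last_mod_time > registration_time:
            del self.entries[key]            # data changed after caching: invalidate
            return None
        return columns

def make_key(host_and_bucket, object_key, format_name, format_settings):
    # Hypothetical key layout mirroring "source / format / additional format info";
    # the real key format is internal to ClickHouse.
    return f"{host_and_bucket}/{object_key}|{format_name}|{format_settings}"

The real cache additionally bounds its size (schema_inference_cache_max_elements_for_s3) and exposes its entries through the system.schema_inference_cache table added later in this commit.
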

View File

@ -18,6 +18,7 @@
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
namespace Aws::S3 namespace Aws::S3
{ {
@ -40,7 +41,9 @@ public:
const S3::URI & globbed_uri_, const S3::URI & globbed_uri_,
ASTPtr query, ASTPtr query,
const Block & virtual_header, const Block & virtual_header,
ContextPtr context); ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys_ = nullptr);
String next(); String next();
@ -94,7 +97,8 @@ public:
const String & bucket, const String & bucket,
const String & version_id, const String & version_id,
std::shared_ptr<IteratorWrapper> file_iterator_, std::shared_ptr<IteratorWrapper> file_iterator_,
size_t download_thread_num); size_t download_thread_num,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_);
String getName() const override; String getName() const override;
@ -128,6 +132,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("StorageS3Source"); Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
std::unordered_map<String, S3::ObjectInfo> object_infos;
/// Recreate ReadBuffer and Pipeline for each file. /// Recreate ReadBuffer and Pipeline for each file.
bool initialize(); bool initialize();
@ -190,7 +196,8 @@ public:
const String & compression_method, const String & compression_method,
bool distributed_processing, bool distributed_processing,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx); ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args); static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args);
@ -204,6 +211,8 @@ public:
S3Settings::ReadWriteSettings rw_settings; S3Settings::ReadWriteSettings rw_settings;
}; };
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
private: private:
friend class StorageS3Cluster; friend class StorageS3Cluster;
friend class TableFunctionS3Cluster; friend class TableFunctionS3Cluster;
@ -223,6 +232,8 @@ private:
std::vector<String> read_tasks_used_in_schema_inference; std::vector<String> read_tasks_used_in_schema_inference;
std::unordered_map<String, S3::ObjectInfo> object_infos;
static void updateS3Configuration(ContextPtr, S3Configuration &); static void updateS3Configuration(ContextPtr, S3Configuration &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator( static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(
@ -233,7 +244,9 @@ private:
ContextPtr local_context, ContextPtr local_context,
ASTPtr query, ASTPtr query,
const Block & virtual_block, const Block & virtual_block,
const std::vector<String> & read_tasks = {}); const std::vector<String> & read_tasks = {},
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys = nullptr);
static ColumnsDescription getTableStructureFromDataImpl( static ColumnsDescription getTableStructureFromDataImpl(
const String & format, const String & format,
@ -243,9 +256,27 @@ private:
bool is_key_with_globs, bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing = nullptr); std::vector<String> * read_keys_in_distributed_processing = nullptr,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
bool supportsSubsetOfColumns() const override; bool supportsSubsetOfColumns() const override;
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings::const_iterator & begin,
const Strings::const_iterator & end,
const S3Configuration & s3_configuration,
std::unordered_map<String, S3::ObjectInfo> * object_infos,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
static void addColumnsToCache(
const Strings & keys,
const S3Configuration & s3_configuration,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx);
}; };
} }

View File

@ -15,7 +15,6 @@
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <IO/IOThreadPool.h> #include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h> #include <IO/ParallelReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h> #include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -592,8 +591,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
urls_to_check = {uri}; urls_to_check = {uri};
} }
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_url)
columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, credentials, format, format_settings, context);
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr<ReadBuffer> ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{ {
if (it == urls_to_check.cend()) if (it == urls_to_check.cend())
return nullptr; return nullptr;
@ -616,7 +618,16 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return buf; return buf;
}; };
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context); ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
if (context->getSettingsRef().schema_inference_use_cache_for_url)
addColumnsToCache(urls_to_check, columns, format, format_settings, context);
return columns;
} }
bool IStorageURLBase::supportsSubsetOfColumns() const bool IStorageURLBase::supportsSubsetOfColumns() const
@ -797,6 +808,87 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
} }
} }
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const Strings & urls,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
auto last_mod_time = getLastModificationTime(url, headers, credentials, context);
/// Some URLs may not return a Last-Modified header; in that case we cannot be sure that
/// the data wasn't changed after its schema was added to the cache. The cached schema is
/// used then only if schema_inference_cache_require_modification_time_for_url is disabled.
if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
String cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void IStorageURLBase::addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns);
}
std::optional<time_t> IStorageURLBase::getLastModificationTime(
const String & url,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context)
{
try
{
ReadWriteBufferFromHTTP buf(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
&context->getRemoteHostFilter(),
true,
false,
false);
return buf.getLastModificationTime();
}
catch (...)
{
return std::nullopt;
}
}
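
The fallback above amounts to: when a URL does not return a Last-Modified header, the cached schema may still be used unless schema_inference_cache_require_modification_time_for_url is set. A small, self-contained Python sketch of that decision (the function name is illustrative, not ClickHouse API):

from typing import Optional

def effective_mod_time(last_modified: Optional[int],
                       require_modification_time: bool) -> Optional[int]:
    """Return the timestamp used to validate a cached URL schema.

    None -> the cache entry cannot be validated and is skipped.
    0    -> treat the data as unchanged, so the cached schema may be used.
    """
    if last_modified is not None:
        return last_modified
    # No Last-Modified header: usable only if validation is not required.
    return None if require_modification_time else 0

# The three cases:
assert effective_mod_time(1660000000, False) == 1660000000
assert effective_mod_time(None, False) == 0        # cache can still be used
assert effective_mod_time(None, True) is None      # cache entry is skipped
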
StorageURL::StorageURL( StorageURL::StorageURL(
const String & uri_, const String & uri_,
const StorageID & table_id_, const StorageID & table_id_,

View File

@ -8,6 +8,7 @@
#include <IO/ReadWriteBufferFromHTTP.h> #include <IO/ReadWriteBufferFromHTTP.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
namespace DB namespace DB
@ -48,6 +49,8 @@ public:
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr context); ContextPtr context);
static SchemaCache & getSchemaCache(const ContextPtr & context);
protected: protected:
IStorageURLBase( IStorageURLBase(
const String & uri_, const String & uri_,
@ -97,6 +100,27 @@ protected:
private: private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0; virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & urls,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
static void addColumnsToCache(
const Strings & urls,
const ColumnsDescription & columns,
const String & format_name,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
static std::optional<time_t> getLastModificationTime(
const String & url,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
const Poco::Net::HTTPBasicCredentials & credentials,
const ContextPtr & context);
}; };
class StorageURLSink : public SinkToStorage class StorageURLSink : public SinkToStorage

View File

@ -0,0 +1,74 @@
#include <Storages/System/StorageSystemSchemaInferenceCache.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <Formats/ReadSchemaUtils.h>
namespace DB
{
static String getSchemaString(const ColumnsDescription & columns)
{
WriteBufferFromOwnString buf;
const auto & names_and_types = columns.getAll();
for (auto it = names_and_types.begin(); it != names_and_types.end(); ++it)
{
if (it != names_and_types.begin())
writeCString(", ", buf);
writeString(it->name, buf);
writeChar(' ', buf);
writeString(it->type->getName(), buf);
}
return buf.str();
}
NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes()
{
return {
{"storage", std::make_shared<DataTypeString>()},
{"source", std::make_shared<DataTypeString>()},
{"format", std::make_shared<DataTypeString>()},
{"additional_format_info", std::make_shared<DataTypeString>()},
{"registration_time", std::make_shared<DataTypeDateTime>()},
{"schema", std::make_shared<DataTypeString>()}
};
}
static void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name)
{
auto schema_cache_data = schema_cache.getAll();
String source;
String format;
String additional_format_info;
for (const auto & [key, schema_info] : schema_cache_data)
{
splitSchemaCacheKey(key, source, format, additional_format_info);
res_columns[0]->insert(storage_name);
res_columns[1]->insert(source);
res_columns[2]->insert(format);
res_columns[3]->insert(additional_format_info);
res_columns[4]->insert(schema_info.registration_time);
res_columns[5]->insert(getSchemaString(schema_info.columns));
}
}
void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File");
#if USE_AWS_S3
fillDataImpl(res_columns, StorageS3::getSchemaCache(context), "S3");
#endif
#if USE_HDFS
fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS");
#endif
fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL");
}
}
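
Once the table is attached (see attachSystemTablesLocal below), its contents can be inspected the same way the integration tests further down check cached sources. A usage sketch, assuming an integration-test instance such as the `node` created in those tests:

# Usage sketch: requires the ClickHouse integration-test environment used in the tests below.
def print_schema_cache(node):
    rows = node.query(
        "select storage, source, format, schema "
        "from system.schema_inference_cache order by registration_time"
    )
    for line in rows.strip().splitlines():
        print(line)
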

View File

@ -0,0 +1,22 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/Cache/SchemaCache.h>
namespace DB
{
class StorageSystemSchemaInferenceCache final : public IStorageSystemOneBlock<StorageSystemSchemaInferenceCache>
{
public:
std::string getName() const override { return "SystemSchemaInferenceCache"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -74,6 +74,7 @@
#include <Storages/System/StorageSystemFilesystemCache.h> #include <Storages/System/StorageSystemFilesystemCache.h>
#include <Storages/System/StorageSystemRemoteDataPaths.h> #include <Storages/System/StorageSystemRemoteDataPaths.h>
#include <Storages/System/StorageSystemCertificates.h> #include <Storages/System/StorageSystemCertificates.h>
#include <Storages/System/StorageSystemSchemaInferenceCache.h>
#ifdef OS_LINUX #ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h> #include <Storages/System/StorageSystemStackTrace.h>
@ -133,6 +134,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemLicenses>(context, system_database, "licenses"); attach<StorageSystemLicenses>(context, system_database, "licenses");
attach<StorageSystemTimeZones>(context, system_database, "time_zones"); attach<StorageSystemTimeZones>(context, system_database, "time_zones");
attach<StorageSystemBackups>(context, system_database, "backups"); attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
#ifdef OS_LINUX #ifdef OS_LINUX
attach<StorageSystemStackTrace>(context, system_database, "stack_trace"); attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif #endif

View File

@ -50,7 +50,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
{ {
ReadBufferIterator read_buffer_iterator = [&]() ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{ {
return std::make_unique<ReadBufferFromString>(data); return std::make_unique<ReadBufferFromString>(data);
}; };

View File

@ -0,0 +1,9 @@
<?xml version="1.0"?>
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>300</flush_interval_milliseconds>
</query_log>
</clickhouse>

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<schema_inference_cache_max_elements_for_file>2</schema_inference_cache_max_elements_for_file>
</clickhouse>
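
This config caps the file schema cache at two entries, which is what gives the eviction checks in the test below meaning: describing a third file evicts the least recently used source. A minimal, self-contained Python sketch of that bound (illustrative only, not the ClickHouse implementation):

from collections import OrderedDict

# Illustrative only: a two-element LRU, mirroring schema_inference_cache_max_elements_for_file=2.
cache = OrderedDict()
MAX_ELEMENTS = 2

def remember(source, schema):
    if source in cache:
        cache.move_to_end(source)
    cache[source] = schema
    if len(cache) > MAX_ELEMENTS:
        cache.popitem(last=False)  # evict the least recently used source

remember("data.jsonl", "number Nullable(Int64)")   # illustrative schema string
remember("data1.jsonl", "number Nullable(Int64)")
remember("data2.jsonl", "number Nullable(Int64)")  # evicts data.jsonl
assert list(cache) == ["data1.jsonl", "data2.jsonl"]
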

View File

@ -0,0 +1,151 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
stay_alive=True,
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/schema_cache.xml",
],
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_profile_event_for_query(node, query, profile_event):
node.query("system flush logs")
query = query.replace("'", "\\'")
return int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
)
)
def check_cache_misses(node, file, amount=1):
assert (
get_profile_event_for_query(
node, f"desc file('{file}')", "SchemaInferenceCacheMisses"
)
== amount
)
def check_cache_hits(node, file, amount=1):
assert (
get_profile_event_for_query(
node, f"desc file('{file}')", "SchemaInferenceCacheHits"
)
== amount
)
def check_cache_invalidations(node, file, amount=1):
assert (
get_profile_event_for_query(
node, f"desc file('{file}')", "SchemaInferenceCacheInvalidations"
)
== amount
)
def check_cache_evictions(node, file, amount=1):
assert (
get_profile_event_for_query(
node, f"desc file('{file}')", "SchemaInferenceCacheEvictions"
)
== amount
)
def check_cache(node, expected_files):
sources = node.query("select source from system.schema_inference_cache")
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
expected_files
)
def test(start_cluster):
node.query("insert into function file('data.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data.jsonl')")
check_cache(node, ["data.jsonl"])
check_cache_misses(node, "data.jsonl")
node.query("desc file('data.jsonl')")
check_cache_hits(node, "data.jsonl")
node.query("insert into function file('data.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data.jsonl')")
check_cache_invalidations(node, "data.jsonl")
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data1.jsonl')")
check_cache(node, ["data.jsonl", "data1.jsonl"])
check_cache_misses(node, "data1.jsonl")
node.query("desc file('data1.jsonl')")
check_cache_hits(node, "data1.jsonl")
node.query("insert into function file('data2.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data2.jsonl')")
check_cache(node, ["data1.jsonl", "data2.jsonl"])
check_cache_misses(node, "data2.jsonl")
check_cache_evictions(node, "data2.jsonl")
node.query("desc file('data2.jsonl')")
check_cache_hits(node, "data2.jsonl")
node.query("desc file('data1.jsonl')")
check_cache_hits(node, "data1.jsonl")
node.query("desc file('data.jsonl')")
check_cache(node, ["data.jsonl", "data1.jsonl"])
check_cache_misses(node, "data.jsonl")
check_cache_evictions(node, "data.jsonl")
node.query("desc file('data2.jsonl')")
check_cache(node, ["data.jsonl", "data2.jsonl"])
check_cache_misses(node, "data2.jsonl")
check_cache_evictions(node, "data2.jsonl")
node.query("desc file('data2.jsonl')")
check_cache_hits(node, "data2.jsonl")
node.query("desc file('data.jsonl')")
check_cache_hits(node, "data.jsonl")
node.query("insert into function file('data3.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data*.jsonl')")
check_cache_hits(node, "data*.jsonl")
node.query("system drop schema cache for file")
check_cache(node, [])
node.query("desc file('data*.jsonl')")
check_cache_misses(node, "data*.jsonl", 4)
node.query("system drop schema cache")
check_cache(node, [])
node.query("desc file('data*.jsonl')")
check_cache_misses(node, "data*.jsonl", 4)

View File

@ -0,0 +1,3 @@
<clickhouse>
<schema_inference_cache_max_elements_for_hdfs>2</schema_inference_cache_max_elements_for_hdfs>
</clickhouse>

View File

@ -1,13 +1,16 @@
import os import os
import pytest import pytest
import time
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV from helpers.test_tools import TSV
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance( node1 = cluster.add_instance(
"node1", main_configs=["configs/macro.xml"], with_hdfs=True "node1",
main_configs=["configs/macro.xml", "configs/schema_cache.xml"],
with_hdfs=True,
) )
@ -550,7 +553,9 @@ def test_schema_inference_with_globs(started_cluster):
filename = "data{1,3}.jsoncompacteachrow" filename = "data{1,3}.jsoncompacteachrow"
result = node1.query_and_get_error(f"desc hdfs('hdfs://hdfs1:9000/{filename}')") result = node1.query_and_get_error(
f"desc hdfs('hdfs://hdfs1:9000/{filename}') settings schema_inference_use_cache_for_hdfs=0"
)
assert "All attempts to extract table structure from files failed" in result assert "All attempts to extract table structure from files failed" in result
@ -558,10 +563,8 @@ def test_schema_inference_with_globs(started_cluster):
f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'" f"insert into table function hdfs('hdfs://hdfs1:9000/data0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
) )
url_filename = "data{0,1,2,3}.jsoncompacteachrow"
result = node1.query_and_get_error( result = node1.query_and_get_error(
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')" f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings schema_inference_use_cache_for_hdfs=0"
) )
assert ( assert (
@ -628,6 +631,158 @@ def test_virtual_columns_2(started_cluster):
assert result.strip() == "kek" assert result.strip() == "kek"
def get_profile_event_for_query(node, query, profile_event):
node.query("system flush logs")
query = query.replace("'", "\\'")
return int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
)
)
def check_cache_misses(node1, file, amount=1):
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheMisses",
)
== amount
)
def check_cache_hits(node1, file, amount=1):
assert (
get_profile_event_for_query(
node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')", "SchemaInferenceCacheHits"
)
== amount
)
def check_cache_invalidations(node1, file, amount=1):
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheInvalidations",
)
== amount
)
def check_cache_evictions(node1, file, amount=1):
assert (
get_profile_event_for_query(
node1,
f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheEvictions",
)
== amount
)
def check_cache(node1, expected_files):
sources = node1.query("select source from system.schema_inference_cache")
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
expected_files
)
def run_describe_query(node, file):
query = f"desc hdfs('hdfs://hdfs1:9000/{file}')"
node.query(query)
def test_schema_inference_cache(started_cluster):
node1.query("system drop schema cache")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache0.jsonl")
check_cache_invalidations(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache1.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache1.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
check_cache_evictions(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
files = "test_cache{0,1,2,3}.jsonl"
run_describe_query(node1, files)
check_cache_hits(node1, files)
node1.query(f"system drop schema cache for hdfs")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
node1.query("system drop schema cache")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
if __name__ == "__main__": if __name__ == "__main__":
cluster.start() cluster.start()
input("Cluster created, press any key to destroy...") input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,4 @@
<clickhouse>
<schema_inference_cache_max_elements_for_s3>2</schema_inference_cache_max_elements_for_s3>
<schema_inference_cache_max_elements_for_url>2</schema_inference_cache_max_elements_for_url>
</clickhouse>

View File

@ -98,7 +98,11 @@ def started_cluster():
cluster.add_instance( cluster.add_instance(
"dummy", "dummy",
with_minio=True, with_minio=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"], main_configs=[
"configs/defaultS3.xml",
"configs/named_collections.xml",
"configs/schema_cache.xml",
],
) )
cluster.add_instance( cluster.add_instance(
"s3_max_redirects", "s3_max_redirects",
@ -1331,13 +1335,13 @@ def test_schema_inference_from_globs(started_cluster):
url_filename = "test{1,3}.jsoncompacteachrow" url_filename = "test{1,3}.jsoncompacteachrow"
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_s3=0"
) )
assert "All attempts to extract table structure from files failed" in result assert "All attempts to extract table structure from files failed" in result
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0"
) )
assert "All attempts to extract table structure from files failed" in result assert "All attempts to extract table structure from files failed" in result
@ -1347,7 +1351,7 @@ def test_schema_inference_from_globs(started_cluster):
) )
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings schema_inference_use_cache_for_s3=0"
) )
assert ( assert (
@ -1357,7 +1361,7 @@ def test_schema_inference_from_globs(started_cluster):
url_filename = "test{0,1,2,3}.jsoncompacteachrow" url_filename = "test{0,1,2,3}.jsoncompacteachrow"
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')" f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0"
) )
assert ( assert (
@ -1482,3 +1486,213 @@ def test_wrong_format_usage(started_cluster):
) )
assert "Not a Parquet file" in result assert "Not a Parquet file" in result
def get_profile_event_for_query(instance, query, profile_event):
instance.query("system flush logs")
time.sleep(0.5)
query = query.replace("'", "\\'")
return int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
)
)
def check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheMisses")
== amount
)
def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheHits")
== amount
)
def check_cache_invalidations(
instance, file, storage_name, started_cluster, bucket, amount=1
):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(
instance, query, "SchemaInferenceCacheInvalidations"
)
== amount
)
def check_cache_evictions(
instance, file, storage_name, started_cluster, bucket, amount=1
):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheEvictions")
== amount
)
def run_describe_query(instance, file, storage_name, started_cluster, bucket):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
instance.query(query)
def check_cache(instance, expected_files):
sources = instance.query("select source from system.schema_inference_cache")
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
expected_files
)
def test_schema_inference_cache(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
def test(storage_name):
instance.query("system drop schema cache")
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache(instance, ["test_cache0.jsonl"])
check_cache_misses(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache_invalidations(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache(instance, ["test_cache1.jsonl", "test_cache2.jsonl"])
check_cache_misses(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache_evictions(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache_evictions(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache(instance, ["test_cache0.jsonl", "test_cache2.jsonl"])
check_cache_misses(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache_evictions(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
)
run_describe_query(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
check_cache_hits(
instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
files = "test_cache{0,1,2,3}.jsonl"
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache_hits(instance, files, storage_name, started_cluster, bucket)
instance.query(f"system drop schema cache for {storage_name}")
check_cache(instance, [])
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)
instance.query("system drop schema cache")
check_cache(instance, [])
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)
test("s3")
test("url")

View File

@ -94,6 +94,7 @@ SYSTEM DROP UNCOMPRESSED CACHE ['SYSTEM DROP UNCOMPRESSED','DROP UNCOMPRESSED CA
SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP COMPILED EXPRESSION CACHE','DROP COMPILED EXPRESSIONS'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP COMPILED EXPRESSION CACHE','DROP COMPILED EXPRESSIONS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD SYMBOLS ['RELOAD SYMBOLS'] GLOBAL SYSTEM RELOAD SYSTEM RELOAD SYMBOLS ['RELOAD SYMBOLS'] GLOBAL SYSTEM RELOAD

View File

@ -277,7 +277,7 @@ CREATE TABLE system.grants
( (
`user_name` Nullable(String), `user_name` Nullable(String),
`role_name` Nullable(String), `role_name` Nullable(String),
`access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' = 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 
144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147), `access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' = 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 
139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148),
`database` Nullable(String), `database` Nullable(String),
`table` Nullable(String), `table` Nullable(String),
`column` Nullable(String), `column` Nullable(String),
@ -551,10 +551,10 @@ ENGINE = SystemPartsColumns
COMMENT 'SYSTEM TABLE is built on the fly.' COMMENT 'SYSTEM TABLE is built on the fly.'
CREATE TABLE system.privileges CREATE TABLE system.privileges
( (
`privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' = 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 
144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147), `privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' = 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 
139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148),
`aliases` Array(String), `aliases` Array(String),
`level` Nullable(Enum8('GLOBAL' = 0, 'DATABASE' = 1, 'TABLE' = 2, 'DICTIONARY' = 3, 'VIEW' = 4, 'COLUMN' = 5)), `level` Nullable(Enum8('GLOBAL' = 0, 'DATABASE' = 1, 'TABLE' = 2, 'DICTIONARY' = 3, 'VIEW' = 4, 'COLUMN' = 5)),
`parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP CACHE' = 96, 'SYSTEM RELOAD CONFIG' = 97, 'SYSTEM RELOAD SYMBOLS' = 98, 'SYSTEM RELOAD DICTIONARY' = 99, 'SYSTEM RELOAD MODEL' = 100, 'SYSTEM RELOAD FUNCTION' = 101, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 102, 'SYSTEM RELOAD' = 103, 'SYSTEM RESTART DISK' = 104, 'SYSTEM MERGES' = 105, 'SYSTEM TTL MERGES' = 106, 'SYSTEM FETCHES' = 107, 'SYSTEM MOVES' = 108, 'SYSTEM DISTRIBUTED SENDS' = 109, 'SYSTEM REPLICATED SENDS' = 110, 'SYSTEM SENDS' = 111, 'SYSTEM REPLICATION QUEUES' = 112, 'SYSTEM DROP REPLICA' = 113, 'SYSTEM SYNC REPLICA' = 114, 'SYSTEM RESTART REPLICA' = 115, 'SYSTEM RESTORE REPLICA' = 116, 'SYSTEM SYNC DATABASE REPLICA' = 117, 'SYSTEM SYNC TRANSACTION LOG' = 118, 'SYSTEM FLUSH DISTRIBUTED' = 119, 'SYSTEM FLUSH LOGS' = 120, 'SYSTEM FLUSH' = 121, 'SYSTEM THREAD FUZZER' = 122, 'SYSTEM UNFREEZE' = 123, 'SYSTEM' = 124, 'dictGet' = 125, 'addressToLine' = 126, 'addressToLineWithInlines' = 127, 'addressToSymbol' = 128, 'demangle' = 129, 'INTROSPECTION' = 130, 'FILE' = 131, 'URL' = 132, 'REMOTE' = 133, 'MONGO' = 134, 'MEILISEARCH' = 135, 'MYSQL' = 136, 'POSTGRES' = 137, 'SQLITE' = 138, 'ODBC' = 139, 'JDBC' = 140, 'HDFS' = 141, 'S3' = 142, 'HIVE' = 143, 'SOURCES' = 144, 'CLUSTER' = 145, 'ALL' = 146, 'NONE' = 147))
`parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER TABLE' = 41, 'ALTER DATABASE' = 42, 'ALTER VIEW REFRESH' = 43, 'ALTER VIEW MODIFY QUERY' = 44, 'ALTER VIEW' = 45, 'ALTER' = 46, 'CREATE DATABASE' = 47, 'CREATE TABLE' = 48, 'CREATE VIEW' = 49, 'CREATE DICTIONARY' = 50, 'CREATE TEMPORARY TABLE' = 51, 'CREATE FUNCTION' = 52, 'CREATE' = 53, 'DROP DATABASE' = 54, 'DROP TABLE' = 55, 'DROP VIEW' = 56, 'DROP DICTIONARY' = 57, 'DROP FUNCTION' = 58, 'DROP' = 59, 'TRUNCATE' = 60, 'OPTIMIZE' = 61, 'BACKUP' = 62, 'KILL QUERY' = 63, 'KILL TRANSACTION' = 64, 'MOVE PARTITION BETWEEN SHARDS' = 65, 'CREATE USER' = 66, 'ALTER USER' = 67, 'DROP USER' = 68, 'CREATE ROLE' = 69, 'ALTER ROLE' = 70, 'DROP ROLE' = 71, 'ROLE ADMIN' = 72, 'CREATE ROW POLICY' = 73, 'ALTER ROW POLICY' = 74, 'DROP ROW POLICY' = 75, 'CREATE QUOTA' = 76, 'ALTER QUOTA' = 77, 'DROP QUOTA' = 78, 'CREATE SETTINGS PROFILE' = 79, 'ALTER SETTINGS PROFILE' = 80, 'DROP SETTINGS PROFILE' = 81, 'SHOW USERS' = 82, 'SHOW ROLES' = 83, 'SHOW ROW POLICIES' = 84, 'SHOW QUOTAS' = 85, 'SHOW SETTINGS PROFILES' = 86, 'SHOW ACCESS' = 87, 'ACCESS MANAGEMENT' = 88, 'SYSTEM SHUTDOWN' = 89, 'SYSTEM DROP DNS CACHE' = 90, 'SYSTEM DROP MARK CACHE' = 91, 'SYSTEM DROP UNCOMPRESSED CACHE' = 92, 'SYSTEM DROP MMAP CACHE' = 93, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 94, 'SYSTEM DROP FILESYSTEM CACHE' = 95, 'SYSTEM DROP SCHEMA CACHE' = 96, 'SYSTEM DROP CACHE' = 97, 'SYSTEM RELOAD CONFIG' = 98, 'SYSTEM RELOAD SYMBOLS' = 99, 'SYSTEM RELOAD DICTIONARY' = 100, 'SYSTEM RELOAD MODEL' = 101, 'SYSTEM RELOAD FUNCTION' = 102, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 103, 'SYSTEM RELOAD' = 104, 'SYSTEM RESTART DISK' = 105, 'SYSTEM MERGES' = 106, 'SYSTEM TTL MERGES' = 107, 'SYSTEM FETCHES' = 108, 'SYSTEM MOVES' = 109, 'SYSTEM DISTRIBUTED SENDS' = 110, 'SYSTEM REPLICATED SENDS' = 111, 'SYSTEM SENDS' = 112, 'SYSTEM REPLICATION QUEUES' = 113, 'SYSTEM DROP REPLICA' = 114, 'SYSTEM SYNC REPLICA' = 115, 'SYSTEM RESTART REPLICA' = 116, 'SYSTEM RESTORE REPLICA' = 117, 'SYSTEM SYNC DATABASE REPLICA' = 118, 'SYSTEM SYNC TRANSACTION LOG' = 119, 'SYSTEM FLUSH DISTRIBUTED' = 120, 'SYSTEM FLUSH LOGS' = 121, 'SYSTEM FLUSH' = 122, 'SYSTEM THREAD FUZZER' = 123, 'SYSTEM UNFREEZE' = 124, 'SYSTEM' = 125, 'dictGet' = 126, 'addressToLine' = 127, 'addressToLineWithInlines' = 128, 'addressToSymbol' = 129, 'demangle' = 130, 'INTROSPECTION' = 131, 'FILE' = 132, 'URL' = 133, 'REMOTE' = 134, 'MONGO' = 135, 'MEILISEARCH' = 136, 'MYSQL' = 137, 'POSTGRES' = 138, 'SQLITE' = 139, 'ODBC' = 140, 'JDBC' = 141, 'HDFS' = 142, 'S3' = 143, 'HIVE' = 144, 'SOURCES' = 145, 'CLUSTER' = 146, 'ALL' = 147, 'NONE' = 148))
) )
ENGINE = SystemPrivileges ENGINE = SystemPrivileges
COMMENT 'SYSTEM TABLE is built on the fly.' COMMENT 'SYSTEM TABLE is built on the fly.'
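
The privileges diff above adds a dedicated 'SYSTEM DROP SCHEMA CACHE' entry (value 96) and shifts every later enum value up by one. As a minimal sketch of the statement this privilege gates: the FOR FILE form is taken verbatim from the test further below, while the s3 variant and the bare form without FOR are assumptions based on the sources this PR targets, not statements shown in this diff.

-- clear only schemas cached for the file() table function / File storage (used by the test below)
system drop schema cache for file;
-- assumed analogue for the s3 source covered by this PR
system drop schema cache for s3;
-- assumed bare form, clearing the schema inference cache for every source at once
system drop schema cache;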

View File

@ -7,4 +7,4 @@ select * from file('02267_data*.jsonl') order by x;
insert into function file('02267_data1.jsonl', 'TSV') select 1 as x; insert into function file('02267_data1.jsonl', 'TSV') select 1 as x;
insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x; insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x;
select * from file('02267_data*.jsonl'); --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
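
The updated line pins the new cache off for this query, presumably so the schema cached by the earlier successful select is not reused and the CANNOT_EXTRACT_TABLE_STRUCTURE path is still exercised. A small sketch of the two ways the setting can be applied; the per-query form is the one the test uses, the session-level SET is shown only for illustration.

-- per-query, exactly as in the updated test line
select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
-- session-level alternative: disable the cache for all subsequent file() schema inference
set schema_inference_use_cache_for_file = 0;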

View File

@ -0,0 +1,12 @@
storage String
source String
format String
additional_format_info String
registration_time DateTime
schema String
x Nullable(Int64)
s Nullable(String)
x Nullable(Int64)
s Nullable(String)
File 02374_data1.jsonl JSONEachRow x Nullable(Int64), s Nullable(String)
File 02374_data2.jsonl JSONEachRow x Nullable(Int64), s Nullable(String)
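
The reference above fixes the column set of system.schema_inference_cache (storage, source, format, additional_format_info, registration_time, schema) along with the two File entries expected after the inserts. A sketch of the kind of inspection query those columns allow; the column names come straight from the reference, while the ordering is an arbitrary choice.

-- list cached schemas for File-based inference, newest last
select source, format, schema, registration_time
from system.schema_inference_cache
where storage = 'File'
order by registration_time;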

View File

@ -0,0 +1,14 @@
-- Tags: no-fasttest
insert into function file('02374_data1.jsonl') select number as x, 'str' as s from numbers(10);
insert into function file('02374_data2.jsonl') select number as x, 'str' as s from numbers(10);
desc system.schema_inference_cache;
system drop schema cache for file;
desc file('02374_data1.jsonl');
desc file('02374_data2.jsonl');
select storage, splitByChar('/', source)[-1], format, schema from system.schema_inference_cache where storage='File';
system drop schema cache for file;
select storage, source, format, schema from system.schema_inference_cache where storage='File';