mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Add some additional information to cache keys
This commit is contained in:
parent
86e8f31ad4
commit
5155262a16
@ -558,6 +558,25 @@ bool FormatFactory::checkIfFormatSupportsSubsetOfColumns(const String & name) co
|
||||
return target.supports_subset_of_columns;
|
||||
}
|
||||
|
||||
void FormatFactory::registerAdditionalInfoForSchemaCacheGetter(
|
||||
const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter)
|
||||
{
|
||||
auto & target = dict[name].additional_info_for_schema_cache_getter;
|
||||
if (target)
|
||||
throw Exception("FormatFactory: additional info for schema cache getter " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
|
||||
target = std::move(additional_info_for_schema_cache_getter);
|
||||
}
|
||||
|
||||
/// Returns the format-specific string that must be mixed into a schema cache key for format `name`.
/// If no getter is registered for the format, the result is an empty string.
/// When `format_settings_` is not set, the settings are taken from `context`.
String FormatFactory::getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
{
    const auto & info_getter = getCreators(name).additional_info_for_schema_cache_getter;

    if (info_getter)
    {
        /// Explicitly passed settings take precedence over the ones derived from the context.
        const auto effective_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
        return info_getter(effective_settings);
    }

    return String{};
}
|
||||
|
||||
bool FormatFactory::isInputFormat(const String & name) const
|
||||
{
|
||||
auto it = dict.find(name);
|
||||
|
@ -100,6 +100,13 @@ private:
|
||||
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings)>;
|
||||
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
|
||||
|
||||
/// Some formats can extract different schemas from the same source depending on
|
||||
/// some settings. To process this case in schema cache we should add some additional
|
||||
/// information to a cache key. This getter should return some string with information
|
||||
/// about such settings. For example, for Protobuf format it's the path to the schema
|
||||
/// and the name of the message.
|
||||
using AdditionalInfoForSchemaCacheGetter = std::function<String(const FormatSettings & settings)>;
|
||||
|
||||
struct Creators
|
||||
{
|
||||
InputCreator input_creator;
|
||||
@ -111,6 +118,7 @@ private:
|
||||
bool supports_subset_of_columns{false};
|
||||
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
|
||||
AppendSupportChecker append_support_checker;
|
||||
AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter;
|
||||
};
|
||||
|
||||
using FormatsDictionary = std::unordered_map<String, Creators>;
|
||||
@ -202,6 +210,9 @@ public:
|
||||
bool checkIfFormatHasExternalSchemaReader(const String & name) const;
|
||||
bool checkIfFormatHasAnySchemaReader(const String & name) const;
|
||||
|
||||
void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter);
|
||||
String getAdditionalInfoForSchemaCache(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
|
||||
|
||||
const FormatsDictionary & getAllFormats() const
|
||||
{
|
||||
return dict;
|
||||
|
@ -221,4 +221,22 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header)
|
||||
return result;
|
||||
}
|
||||
|
||||
/// Builds the schema cache key for a single source.
/// Delegates to getKeysForSchemaCache with a one-element list so that both
/// functions always produce keys in the same form.
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
    const Strings single_source{source};
    const Strings keys = getKeysForSchemaCache(single_source, format, format_settings, context);
    return keys.front();
}
|
||||
|
||||
/// Builds one schema cache key per source, in the same order as `sources`.
/// Each key is made of the source, the format name and the format-specific
/// additional information returned by FormatFactory::getAdditionalInfoForSchemaCache.
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
    /// For some formats data schema depends on some settings, so it's possible that
    /// two queries to the same source will get two different schemas. To handle this
    /// case we add some additional information specific for the format to the cache key.
    /// For example, for Protobuf format additional information is the path to the schema
    /// and message name.
    const String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);

    /// The parts of the key are joined with an explicit separator. Plain concatenation
    /// is ambiguous: e.g. ("x.CSV", "WithNames") and ("x.", "CSVWithNames") would map
    /// to the same key and could return a schema cached for a different format.
    constexpr char key_part_separator = '\n';

    /// The part after the source is identical for every key, so it is built only once.
    String key_suffix;
    key_suffix.reserve(format.size() + additional_format_info.size() + 2);
    key_suffix += key_part_separator;
    key_suffix += format;
    key_suffix += key_part_separator;
    key_suffix += additional_format_info;

    Strings cache_keys;
    cache_keys.reserve(sources.size());
    for (const auto & source : sources)
        cache_keys.push_back(source + key_suffix);

    return cache_keys;
}
|
||||
|
||||
}
|
||||
|
@ -46,4 +46,8 @@ DataTypePtr makeNullableRecursivelyAndCheckForNothing(DataTypePtr type);
|
||||
/// Call makeNullableRecursivelyAndCheckForNothing for all types
|
||||
/// in the block and return names and types.
|
||||
NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header);
|
||||
|
||||
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
|
||||
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
|
||||
|
||||
}
|
||||
|
@ -312,6 +312,8 @@ void registerInputFormatCapnProto(FormatFactory & factory)
|
||||
});
|
||||
factory.markFormatSupportsSubsetOfColumns("CapnProto");
|
||||
factory.registerFileExtension("capnp", "CapnProto");
|
||||
factory.registerAdditionalInfoForSchemaCacheGetter(
|
||||
"CapnProto", [](const FormatSettings & settings) { return settings.schema.format_schema; });
|
||||
}
|
||||
|
||||
void registerCapnProtoSchemaReader(FormatFactory & factory)
|
||||
|
@ -452,6 +452,9 @@ void registerInputFormatMySQLDump(FormatFactory & factory)
|
||||
{
|
||||
return std::make_shared<MySQLDumpRowInputFormat>(buf, header, params, settings);
|
||||
});
|
||||
|
||||
factory.registerAdditionalInfoForSchemaCacheGetter(
|
||||
"MySQLDump", [](const FormatSettings & settings) { return settings.mysql_dump.table_name; });
|
||||
}
|
||||
|
||||
void registerMySQLSchemaReader(FormatFactory & factory)
|
||||
|
@ -80,6 +80,8 @@ void registerInputFormatProtobufList(FormatFactory & factory)
|
||||
FormatSchemaInfo(settings, "Protobuf", true), settings.protobuf.input_flatten_google_wrappers);
|
||||
});
|
||||
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
|
||||
factory.registerAdditionalInfoForSchemaCacheGetter(
|
||||
"ProtobufList", [](const FormatSettings & settings) { return settings.schema.format_schema; });
|
||||
}
|
||||
|
||||
void registerProtobufListSchemaReader(FormatFactory & factory)
|
||||
|
@ -101,6 +101,10 @@ void registerProtobufSchemaReader(FormatFactory & factory)
|
||||
{
|
||||
return std::make_shared<ProtobufSchemaReader>(settings);
|
||||
});
|
||||
|
||||
for (const auto & name : {"Protobuf", "ProtobufSingle"})
|
||||
factory.registerAdditionalInfoForSchemaCacheGetter(
|
||||
name, [](const FormatSettings & settings) { return settings.schema.format_schema; });
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ public:
|
||||
if (!last_mod_time)
|
||||
return std::nullopt;
|
||||
|
||||
if (*last_mod_time > schema_info.registration_time)
|
||||
if (*last_mod_time >= schema_info.registration_time)
|
||||
{
|
||||
/// Object was modified after it was added in cache.
|
||||
/// So the stored value is no longer valid and we should remove it.
|
||||
|
@ -197,7 +197,7 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
|
||||
std::optional<ColumnsDescription> columns_from_cache;
|
||||
if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference)
|
||||
columns_from_cache = tryGetColumnsFromCache(paths, last_mod_time);
|
||||
columns_from_cache = tryGetColumnsFromCache(paths, last_mod_time, format, ctx);
|
||||
|
||||
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
|
||||
{
|
||||
@ -216,7 +216,7 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
|
||||
columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
|
||||
|
||||
if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference)
|
||||
addColumnsToCache(paths, columns, ctx);
|
||||
addColumnsToCache(paths, columns, format, ctx);
|
||||
|
||||
return columns;
|
||||
}
|
||||
@ -725,7 +725,11 @@ SchemaCache & StorageHDFS::getSchemaCache()
|
||||
return schema_cache;
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(const Strings & paths, std::unordered_map<String, time_t> & last_mod_time)
|
||||
std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(
|
||||
const Strings & paths,
|
||||
std::unordered_map<String, time_t> & last_mod_time,
|
||||
const String & format_name,
|
||||
const ContextPtr & ctx)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
for (const auto & path : paths)
|
||||
@ -738,7 +742,8 @@ std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(const Stri
|
||||
return it->second;
|
||||
};
|
||||
|
||||
auto columns = schema_cache.tryGet(path, get_last_mod_time);
|
||||
String cache_key = getKeyForSchemaCache(path, format_name, {}, ctx);
|
||||
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
|
||||
if (columns)
|
||||
return columns;
|
||||
}
|
||||
@ -746,10 +751,15 @@ std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(const Stri
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void StorageHDFS::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx)
|
||||
void StorageHDFS::addColumnsToCache(
|
||||
const Strings & paths,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const ContextPtr & ctx)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
schema_cache.addMany(paths, columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds());
|
||||
Strings cache_keys = getKeysForSchemaCache(paths, format_name, {}, ctx);
|
||||
schema_cache.addMany(cache_keys, columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -71,8 +71,18 @@ protected:
|
||||
|
||||
private:
|
||||
static SchemaCache & getSchemaCache();
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & paths, std::unordered_map<String, time_t> & last_mod_time);
|
||||
static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx);
|
||||
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
|
||||
const Strings & paths,
|
||||
std::unordered_map<String, time_t> & last_mod_time,
|
||||
const String & format_name,
|
||||
const ContextPtr & ctx);
|
||||
|
||||
static void addColumnsToCache(
|
||||
const Strings & paths,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const ContextPtr & ctx);
|
||||
|
||||
std::vector<const String> uris;
|
||||
String format_name;
|
||||
|
@ -309,7 +309,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
|
||||
|
||||
std::optional<ColumnsDescription> columns_from_cache;
|
||||
if (context->getSettingsRef().use_cache_for_file_schema_inference)
|
||||
columns_from_cache = tryGetColumnsFromCache(paths);
|
||||
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
|
||||
|
||||
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
|
||||
{
|
||||
@ -326,7 +326,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
|
||||
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
|
||||
|
||||
if (context->getSettingsRef().use_cache_for_file_schema_inference)
|
||||
addColumnsToCache(paths, columns, context);
|
||||
addColumnsToCache(paths, columns, format, format_settings, context);
|
||||
|
||||
return columns;
|
||||
}
|
||||
@ -1211,7 +1211,8 @@ SchemaCache & StorageFile::getSchemaCache()
|
||||
return schema_cache;
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(const Strings & paths)
|
||||
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
|
||||
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
|
||||
{
|
||||
/// Check if the cache contains one of the paths.
|
||||
auto & schema_cache = getSchemaCache();
|
||||
@ -1226,7 +1227,8 @@ std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(const Stri
|
||||
return file_stat.st_mtim.tv_sec;
|
||||
};
|
||||
|
||||
auto columns = schema_cache.tryGet(path, get_last_mod_time);
|
||||
String cache_key = getKeyForSchemaCache(path, format_name, format_settings, context);
|
||||
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
|
||||
if (columns)
|
||||
return columns;
|
||||
}
|
||||
@ -1234,10 +1236,16 @@ std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(const Stri
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void StorageFile::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context)
|
||||
void StorageFile::addColumnsToCache(
|
||||
const Strings & paths,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
schema_cache.addMany(paths, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds());
|
||||
Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
|
||||
schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -95,8 +95,16 @@ private:
|
||||
void setStorageMetadata(CommonArguments args);
|
||||
|
||||
static SchemaCache & getSchemaCache();
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & paths);
|
||||
static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context);
|
||||
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
|
||||
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context);
|
||||
|
||||
static void addColumnsToCache(
|
||||
const Strings & paths,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context);
|
||||
|
||||
std::string format_name;
|
||||
// We use format settings from global context + CREATE query for File table
|
||||
|
@ -227,7 +227,7 @@ private:
|
||||
|
||||
/// It returns false when all objects were returned
|
||||
is_finished = !outcome.GetResult().GetIsTruncated();
|
||||
|
||||
|
||||
if (read_keys)
|
||||
read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
|
||||
}
|
||||
@ -1186,7 +1186,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
std::optional<ColumnsDescription> columns_from_cache;
|
||||
size_t prev_read_keys_size = read_keys.size();
|
||||
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference)
|
||||
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos);
|
||||
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx);
|
||||
|
||||
ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
|
||||
{
|
||||
@ -1207,7 +1207,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
|
||||
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference && read_keys.size() > prev_read_keys_size)
|
||||
{
|
||||
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos);
|
||||
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx);
|
||||
prev_read_keys_size = read_keys.size();
|
||||
if (columns_from_cache)
|
||||
{
|
||||
@ -1232,7 +1232,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
|
||||
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
|
||||
|
||||
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference)
|
||||
addColumnsToCache(read_keys, s3_configuration, columns, ctx);
|
||||
addColumnsToCache(read_keys, s3_configuration, columns, format, format_settings, ctx);
|
||||
|
||||
if (distributed_processing && read_keys_in_distributed_processing)
|
||||
*read_keys_in_distributed_processing = std::move(read_keys);
|
||||
@ -1332,13 +1332,19 @@ SchemaCache & StorageS3::getSchemaCache()
|
||||
return schema_cache;
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map<String, S3::ObjectInfo> * object_infos)
|
||||
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
|
||||
const Strings::const_iterator & begin,
|
||||
const Strings::const_iterator & end,
|
||||
const S3Configuration & s3_configuration,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
for (auto it = begin; it < end; ++it)
|
||||
{
|
||||
String path = fs::path(s3_configuration.uri.bucket) / *it;
|
||||
String cache_key = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path;
|
||||
auto get_last_mod_time = [&]() -> std::optional<time_t>
|
||||
{
|
||||
S3::ObjectInfo info;
|
||||
@ -1361,6 +1367,8 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(const String
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
String source = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path;
|
||||
String cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
|
||||
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
|
||||
if (columns)
|
||||
return columns;
|
||||
@ -1369,13 +1377,20 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(const String
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void StorageS3::addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & ctx)
|
||||
void StorageS3::addColumnsToCache(
|
||||
const Strings & keys,
|
||||
const S3Configuration & s3_configuration,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx)
|
||||
{
|
||||
auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket;
|
||||
Strings objects;
|
||||
std::transform(keys.begin(), keys.end(), std::back_inserter(objects), [&](const String & key){ return host_and_bucket / key; });
|
||||
Strings sources;
|
||||
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; });
|
||||
Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
|
||||
auto & schema_cache = getSchemaCache();
|
||||
schema_cache.addMany(objects, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds());
|
||||
schema_cache.addMany(cache_keys, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -260,8 +260,23 @@ private:
|
||||
bool supportsSubsetOfColumns() const override;
|
||||
|
||||
static SchemaCache & getSchemaCache();
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map<String, S3::ObjectInfo> * object_infos);
|
||||
static void addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & context);
|
||||
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
|
||||
const Strings::const_iterator & begin,
|
||||
const Strings::const_iterator & end,
|
||||
const S3Configuration & s3_configuration,
|
||||
std::unordered_map<String, S3::ObjectInfo> * object_infos,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx);
|
||||
|
||||
static void addColumnsToCache(
|
||||
const Strings & keys,
|
||||
const S3Configuration & s3_configuration,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -574,7 +574,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
|
||||
|
||||
std::optional<ColumnsDescription> columns_from_cache;
|
||||
if (context->getSettingsRef().use_cache_for_url_schema_inference)
|
||||
columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, context);
|
||||
columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, format, format_settings, context);
|
||||
|
||||
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
|
||||
{
|
||||
@ -606,7 +606,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
|
||||
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
|
||||
|
||||
if (context->getSettingsRef().use_cache_for_url_schema_inference)
|
||||
addColumnsToCache(urls_to_check, columns, context);
|
||||
addColumnsToCache(urls_to_check, columns, format, format_settings, context);
|
||||
|
||||
return columns;
|
||||
}
|
||||
@ -795,7 +795,12 @@ SchemaCache & IStorageURLBase::getSchemaCache()
|
||||
return schema_cache;
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(const Strings & urls, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, const ContextPtr & context)
|
||||
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
|
||||
const Strings & urls,
|
||||
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
for (const auto & url : urls)
|
||||
@ -811,7 +816,8 @@ std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(const
|
||||
return last_mod_time;
|
||||
};
|
||||
|
||||
auto columns = schema_cache.tryGet(url, get_last_mod_time);
|
||||
String cache_key = getKeyForSchemaCache(url, format_name, format_settings, context);
|
||||
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
|
||||
if (columns)
|
||||
return columns;
|
||||
}
|
||||
@ -819,10 +825,16 @@ std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(const
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void IStorageURLBase::addColumnsToCache(const Strings & urls, const ColumnsDescription & columns, const ContextPtr & context)
|
||||
void IStorageURLBase::addColumnsToCache(
|
||||
const Strings & urls,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context)
|
||||
{
|
||||
auto & schema_cache = getSchemaCache();
|
||||
schema_cache.addMany(urls, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds());
|
||||
Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
|
||||
schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds());
|
||||
}
|
||||
|
||||
std::optional<time_t> IStorageURLBase::getLastModificationTime(const String & url, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, const ContextPtr & context)
|
||||
|
@ -100,8 +100,21 @@ private:
|
||||
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
|
||||
|
||||
static SchemaCache & getSchemaCache();
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & urls, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, const ContextPtr & context);
|
||||
static void addColumnsToCache(const Strings & urls, const ColumnsDescription & columns, const ContextPtr & context);
|
||||
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
|
||||
const Strings & urls,
|
||||
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context);
|
||||
|
||||
static void addColumnsToCache(
|
||||
const Strings & urls,
|
||||
const ColumnsDescription & columns,
|
||||
const String & format_name,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & context);
|
||||
|
||||
static std::optional<time_t> getLastModificationTime(const String & url, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, const ContextPtr & context);
|
||||
};
|
||||
|
||||
|
@ -18,67 +18,101 @@ def start_cluster():
|
||||
def get_profile_event_for_query(node, query, profile_event):
|
||||
node.query('system flush logs')
|
||||
query = query.replace("'", "\\'")
|
||||
return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
|
||||
return int(
|
||||
node.query(
|
||||
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test(start_cluster):
|
||||
node.query("insert into function file('data.jsonl') select * from numbers(100)")
|
||||
desc_query = "desc file('data.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
desc_query = "desc file('data.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
node.query("insert into function file('data.jsonl') select * from numbers(100)")
|
||||
desc_query = "desc file('data.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
|
||||
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
|
||||
desc_query = (
|
||||
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
|
||||
)
|
||||
node.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000"
|
||||
desc_query = (
|
||||
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000"
|
||||
)
|
||||
node.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = "desc file('data1.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_updates = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLUpdates')
|
||||
cache_ttl_updates = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheTTLUpdates'
|
||||
)
|
||||
assert cache_ttl_updates == 1
|
||||
|
||||
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
|
||||
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
|
||||
desc_query = (
|
||||
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
|
||||
)
|
||||
node.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = "desc file('data.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = "desc file('data1.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
node.query("insert into function file('data2.jsonl') select * from numbers(100)")
|
||||
@ -86,15 +120,21 @@ def test(start_cluster):
|
||||
|
||||
desc_query = "desc file('data*.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = "desc file('data2.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = "desc file('data3.jsonl')"
|
||||
node.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
@ -562,7 +562,7 @@ def test_schema_inference_with_globs(started_cluster):
|
||||
url_filename = "data{0,1,2,3}.jsoncompacteachrow"
|
||||
|
||||
result = node1.query_and_get_error(
|
||||
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')"
|
||||
f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow') settings use_cache_for_hdfs_schema_inference=0"
|
||||
)
|
||||
|
||||
assert (
|
||||
@ -630,9 +630,13 @@ def test_virtual_columns_2(started_cluster):
|
||||
|
||||
|
||||
def get_profile_event_for_query(node, query, profile_event):
|
||||
node.query('system flush logs')
|
||||
node.query("system flush logs")
|
||||
query = query.replace("'", "\\'")
|
||||
return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
|
||||
return int(
|
||||
node.query(
|
||||
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_schema_inference_cache(started_cluster):
|
||||
@ -641,12 +645,16 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
node1.query(
|
||||
@ -654,7 +662,9 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
node1.query(
|
||||
@ -662,23 +672,33 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
|
||||
node1.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1000"
|
||||
node1.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_updates = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLUpdates')
|
||||
cache_ttl_updates = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheTTLUpdates'
|
||||
)
|
||||
assert cache_ttl_updates == 1
|
||||
|
||||
node1.query(
|
||||
@ -686,7 +706,9 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
|
||||
node1.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
time.sleep(2)
|
||||
@ -695,7 +717,9 @@ def test_schema_inference_cache(started_cluster):
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
assert cache_hits == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
|
||||
@ -703,22 +727,32 @@ def test_schema_inference_cache(started_cluster):
|
||||
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
|
||||
assert cache_misses == 1
|
||||
|
||||
node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1")
|
||||
node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1")
|
||||
node1.query(
|
||||
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
|
||||
)
|
||||
node1.query(
|
||||
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
|
||||
)
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache*.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache2.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache3.jsonl')"
|
||||
node1.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
node1, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
|
||||
|
@ -1330,13 +1330,13 @@ def test_schema_inference_from_globs(started_cluster):
|
||||
url_filename = "test{1,3}.jsoncompacteachrow"
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
|
||||
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings use_cache_for_s3_schema_inference=0"
|
||||
)
|
||||
|
||||
assert "All attempts to extract table structure from files failed" in result
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
|
||||
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings use_cache_for_url_schema_inference=0"
|
||||
)
|
||||
|
||||
assert "All attempts to extract table structure from files failed" in result
|
||||
@ -1346,7 +1346,7 @@ def test_schema_inference_from_globs(started_cluster):
|
||||
)
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')"
|
||||
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings use_cache_for_s3_schema_inference=0"
|
||||
)
|
||||
|
||||
assert (
|
||||
@ -1356,7 +1356,7 @@ def test_schema_inference_from_globs(started_cluster):
|
||||
url_filename = "test{0,1,2,3}.jsoncompacteachrow"
|
||||
|
||||
result = instance.query_and_get_error(
|
||||
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')"
|
||||
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings use_cache_for_url_schema_inference=0"
|
||||
)
|
||||
|
||||
assert (
|
||||
@ -1486,7 +1486,11 @@ def test_wrong_format_usage(started_cluster):
|
||||
def get_profile_event_for_query(instance, query, profile_event):
|
||||
instance.query('system flush logs')
|
||||
query = query.replace("'", "\\'")
|
||||
return int(instance.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
|
||||
return int(
|
||||
instance.query(
|
||||
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_schema_inference_cache(started_cluster):
|
||||
@ -1498,12 +1502,16 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
instance.query(
|
||||
@ -1511,7 +1519,9 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
instance.query(
|
||||
@ -1519,23 +1529,33 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1000"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates')
|
||||
cache_ttl_updates = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLUpdates'
|
||||
)
|
||||
assert cache_ttl_updates == 1
|
||||
|
||||
instance.query(
|
||||
@ -1543,39 +1563,57 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
|
||||
instance.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
|
||||
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
|
||||
)
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
|
||||
)
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache*.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
# Test the same scenarious but for URL table function
|
||||
@ -1585,12 +1623,16 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
instance.query(
|
||||
@ -1598,7 +1640,9 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
instance.query(
|
||||
@ -1606,23 +1650,33 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1000"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates')
|
||||
cache_ttl_updates = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLUpdates'
|
||||
)
|
||||
assert cache_ttl_updates == 1
|
||||
|
||||
instance.query(
|
||||
@ -1630,38 +1684,56 @@ def test_schema_inference_cache(started_cluster):
|
||||
)
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1"
|
||||
instance.query(desc_query)
|
||||
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
|
||||
cache_invalidations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheInvalidations'
|
||||
)
|
||||
assert cache_invalidations == 1
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
|
||||
cache_ttl_expirations = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheTTLExpirations'
|
||||
)
|
||||
assert cache_ttl_expirations == 1
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
|
||||
cache_misses = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheMisses'
|
||||
)
|
||||
assert cache_misses == 1
|
||||
|
||||
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
|
||||
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
|
||||
)
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
|
||||
)
|
||||
|
||||
file_name = 'test_cache{1,2,3}.jsonl'
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name}')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
||||
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
|
||||
instance.query(desc_query)
|
||||
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
|
||||
cache_hits = get_profile_event_for_query(
|
||||
instance, desc_query, 'SchemaInferenceCacheHits'
|
||||
)
|
||||
assert cache_hits == 1
|
||||
|
Loading…
Reference in New Issue
Block a user