Refactor, remove TTL, add size limit, add system table and system query

avogar 2022-08-05 16:20:15 +00:00
parent 381ea139c2
commit 9b1a267203
35 changed files with 726 additions and 637 deletions


@ -19,6 +19,7 @@ Additional cache types:
- Compiled expressions cache.
- [Avro format](../interfaces/formats.md#data-format-avro) schemas cache.
- [Dictionaries](../sql-reference/dictionaries/index.md) data cache.
- Schema inference cache.
Indirectly used:


@ -3351,63 +3351,27 @@ Enable schemas cache for schema inference in `file` table function.
Default value: `true`.
## cache_ttl_for_file_schema_inference {#cache_ttl_for_file_schema_inference}
TTL for schemas in the cache used for schema inference with the `file` table function. 0 means no TTL.
A schema is removed from the cache if it was not accessed during the specified TTL.
Type: seconds.
Default value: `3600 * 24`.
## use_cache_for_s3_schema_inference {#use_cache_for_s3_schema_inference}
Enable the schemas cache for schema inference in the `s3` table function.
Default value: `true`.
## cache_ttl_for_s3_schema_inference {#cache_ttl_for_s3_schema_inference}
TTL for schemas in the cache used for schema inference with the `s3` table function. 0 means no TTL.
A schema is removed from the cache if it was not accessed during the specified TTL.
Type: seconds.
Default value: `3600 * 24`.
## use_cache_for_url_schema_inference {#use_cache_for_url_schema_inference}
Enable the schemas cache for schema inference in the `url` table function.
Default value: `true`.
## cache_ttl_for_url_schema_inference {#cache_ttl_for_url_schema_inference}
TTL for schemas in the cache used for schema inference with the `url` table function. 0 means no TTL.
A schema is removed from the cache if it was not accessed during the specified TTL.
Type: seconds.
Default value: `3600 * 24`.
## use_cache_for_hdfs_schema_inference {#use_cache_for_hdfs_schema_inference}
Enable the schemas cache for schema inference in the `hdfs` table function.
Default value: `true`.
## cache_ttl_for_hdfs_schema_inference {#cache_ttl_for_hdfs_schema_inference}
## schema_inference_cache_require_modification_time_for_url {#schema_inference_cache_require_modification_time_for_url}
TTL for schemas in the cache used for schema inference with the `hdfs` table function. 0 means no TTL.
A schema is removed from the cache if it was not accessed during the specified TTL.
Type: seconds.
Default value: `3600 * 24`.
## allow_urls_without_last_mod_time_in_schema_inference_cache {#allow_urls_without_last_mod_time_in_schema_inference_cache}
Use a schema from the cache without last modification time validation for URLs without a `Last-Modified` header.
Use a schema from the cache for a URL with last modification time validation (for URLs with a `Last-Modified` header). If this setting is enabled and the URL doesn't have a `Last-Modified` header, the schema from the cache won't be used.
Default value: `true`.
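
To make the URL rule above concrete: the cached schema for a URL is validated against the resource's `Last-Modified` time, and when that header is absent this setting decides whether the cache entry may still be used. Below is a minimal standalone sketch of that decision; the function name and example values are illustrative assumptions, only the setting name comes from this commit.

#include <ctime>
#include <iostream>
#include <optional>

/// Decide whether a cached schema may be reused for a URL.
/// `last_mod_time` is empty when the server sent no Last-Modified header.
/// Returns the modification time to compare against the cache entry,
/// or std::nullopt when the cached schema must not be used.
static std::optional<time_t> usableModificationTime(
    std::optional<time_t> last_mod_time,
    bool schema_inference_cache_require_modification_time_for_url)
{
    if (!last_mod_time && !schema_inference_cache_require_modification_time_for_url)
        return 0; /// No header, but validation is not required: treat the object as never modified.
    return last_mod_time; /// Either a real time, or nullopt -> the cache entry is skipped.
}

int main()
{
    /// URL without Last-Modified header, validation required: cache is skipped.
    std::cout << usableModificationTime(std::nullopt, true).has_value() << '\n';  // prints 0
    /// URL without Last-Modified header, validation not required: cache may be used.
    std::cout << usableModificationTime(std::nullopt, false).has_value() << '\n'; // prints 1
}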


@ -140,6 +140,7 @@ enum class AccessType
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \


@ -348,8 +348,7 @@
\
M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \
M(SchemaInferenceCacheMisses, "Number of times a schema is not in cache while schema inference") \
M(SchemaInferenceCacheTTLExpirations, "Number of times a schema from cache expires due to TTL") \
M(SchemaInferenceCacheTTLUpdates, "Number of times TTL for schema in cache was updated") \
M(SchemaInferenceCacheEvictions, "Number of times a schema from cache was evicted due to overflow") \
M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data") \
\
M(KeeperPacketsSent, "Packets sent by keeper server") \


@ -614,7 +614,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, cache_ttl_for_hdfs_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using hdfs table function. 0 means no ttl", 0) \
M(Bool, use_cache_for_url_schema_inference, true, "Use cache in schema inference while using url table function", 0) \
M(Seconds, cache_ttl_for_url_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using url table function. 0 means no ttl", 0) \
M(Bool, allow_urls_without_last_mod_time_in_schema_inference_cache, true, "Use schema from cache without last modification time validation for urls without Last-Modified header", 0) \
M(Bool, schema_inference_cache_require_modification_time_for_url, true, "Use schema from cache for URL with last modification time validation (for urls with Last-Modified header)", 0) \
\
M(String, compatibility, "", "Changes other settings according to provided ClickHouse version. If we know that we changed some behaviour in ClickHouse by changing some settings in some version, this compatibility setting will control these settings", 0) \
\


@ -239,6 +239,20 @@ String getKeyForSchemaCache(const String & source, const String & format, const
return getKeysForSchemaCache({source}, format, format_settings, context).front();
}
static String makeSchemaCacheKey(const String & source, const String & format, const String & additional_format_info)
{
return source + "@@" + format + "@@" + additional_format_info;
}
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info)
{
size_t additional_format_info_pos = key.rfind("@@");
additional_format_info = key.substr(additional_format_info_pos + 2, key.size() - additional_format_info_pos - 2);
size_t format_pos = key.rfind("@@", additional_format_info_pos - 1);
format = key.substr(format_pos + 2, additional_format_info_pos - format_pos - 2);
source = key.substr(0, format_pos);
}
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
{
/// For some formats data schema depends on some settings, so it's possible that
@ -249,7 +263,7 @@ Strings getKeysForSchemaCache(const Strings & sources, const String & format, co
String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);
Strings cache_keys;
cache_keys.reserve(sources.size());
std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return source + format + additional_format_info; });
std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); });
return cache_keys;
}
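
The keys produced by makeSchemaCacheKey concatenate source, format and additional format info with a "@@" separator, and splitSchemaCacheKey recovers the parts by searching the separators from the right (format names and additional info are not expected to contain "@@"). A self-contained sketch of the same round trip; the example source, format and info values are made up:

#include <cassert>
#include <iostream>
#include <string>

/// Standalone mirror of the "source@@format@@additional_format_info" key layout.
static std::string makeKey(const std::string & source, const std::string & format, const std::string & info)
{
    return source + "@@" + format + "@@" + info;
}

static void splitKey(const std::string & key, std::string & source, std::string & format, std::string & info)
{
    size_t info_pos = key.rfind("@@");
    info = key.substr(info_pos + 2);
    size_t format_pos = key.rfind("@@", info_pos - 1);
    format = key.substr(format_pos + 2, info_pos - format_pos - 2);
    source = key.substr(0, format_pos);
}

int main()
{
    std::string key = makeKey("hdfs://hdfs1:9000/data.jsonl", "JSONEachRow", "example_info");
    std::string source, format, info;
    splitKey(key, source, format, info);
    assert(source == "hdfs://hdfs1:9000/data.jsonl");
    assert(format == "JSONEachRow");
    std::cout << source << " | " << format << " | " << info << '\n';
}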


@ -50,4 +50,5 @@ NamesAndTypesList getNamesAndRecursivelyNullableTypes(const Block & header);
String getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
Strings getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context);
void splitSchemaCacheKey(const String & key, String & source, String & format, String & additional_format_info);
}


@ -45,6 +45,10 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -326,6 +330,25 @@ BlockIO InterpreterSystemQuery::execute()
}
break;
}
case Type::DROP_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
if (query.schema_cache_storage.empty())
caches_to_drop = {"FILE", "S3", "HDFS", "URL"};
else
caches_to_drop = {query.schema_cache_storage};
if (caches_to_drop.contains("FILE"))
StorageFile::getSchemaCache(getContext()).clear();
if (caches_to_drop.contains("S3"))
StorageS3::getSchemaCache(getContext()).clear();
if (caches_to_drop.contains("HDFS"))
StorageHDFS::getSchemaCache(getContext()).clear();
if (caches_to_drop.contains("URL"))
StorageURL::getSchemaCache(getContext()).clear();
break;
}
case Type::RELOAD_DICTIONARY:
{
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_DICTIONARY);
@ -833,6 +856,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;


@ -29,6 +29,7 @@ public:
DROP_COMPILED_EXPRESSION_CACHE,
#endif
DROP_FILESYSTEM_CACHE,
DROP_SCHEMA_CACHE,
STOP_LISTEN_QUERIES,
START_LISTEN_QUERIES,
RESTART_REPLICAS,
@ -96,6 +97,8 @@ public:
String filesystem_cache_path;
String backup_name;
String schema_cache_storage;
String getID(char) const override { return "SYSTEM query"; }
ASTPtr clone() const override


@ -363,6 +363,23 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
parseQueryWithOnCluster(res, pos, expected);
break;
}
case Type::DROP_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"FILE"}.ignore(pos, expected))
res->schema_cache_storage = "FILE";
else if (ParserKeyword{"S3"}.ignore(pos, expected))
res->schema_cache_storage = "S3";
else if (ParserKeyword{"HDFS"}.ignore(pos, expected))
res->schema_cache_storage = "HDFS";
else if (ParserKeyword{"URL"}.ignore(pos, expected))
res->schema_cache_storage = "URL";
else
return false;
}
break;
}
case Type::UNFREEZE:
{


@ -314,7 +314,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
factory.markFormatSupportsSubsetOfColumns("CapnProto");
factory.registerFileExtension("capnp", "CapnProto");
factory.registerAdditionalInfoForSchemaCacheGetter(
"CapnProto", [](const FormatSettings & settings) { return settings.schema.format_schema; });
"CapnProto", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
void registerCapnProtoSchemaReader(FormatFactory & factory)


@ -454,7 +454,7 @@ void registerInputFormatMySQLDump(FormatFactory & factory)
});
factory.registerAdditionalInfoForSchemaCacheGetter(
"MySQLDump", [](const FormatSettings & settings) { return settings.mysql_dump.table_name; });
"MySQLDump", [](const FormatSettings & settings) { return "Table name: " + settings.mysql_dump.table_name; });
}
void registerMySQLSchemaReader(FormatFactory & factory)


@ -82,7 +82,7 @@ void registerInputFormatProtobufList(FormatFactory & factory)
});
factory.markFormatSupportsSubsetOfColumns("ProtobufList");
factory.registerAdditionalInfoForSchemaCacheGetter(
"ProtobufList", [](const FormatSettings & settings) { return settings.schema.format_schema; });
"ProtobufList", [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
void registerProtobufListSchemaReader(FormatFactory & factory)


@ -104,7 +104,7 @@ void registerProtobufSchemaReader(FormatFactory & factory)
for (const auto & name : {"Protobuf", "ProtobufSingle"})
factory.registerAdditionalInfoForSchemaCacheGetter(
name, [](const FormatSettings & settings) { return settings.schema.format_schema; });
name, [](const FormatSettings & settings) { return "Format schema: " + settings.schema.format_schema; });
}
}


@ -6,46 +6,57 @@ namespace ProfileEvents
{
extern const Event SchemaInferenceCacheHits;
extern const Event SchemaInferenceCacheMisses;
extern const Event SchemaInferenceCacheTTLExpirations;
extern const Event SchemaInferenceCacheTTLUpdates;
extern const Event SchemaInferenceCacheEvictions;
extern const Event SchemaInferenceCacheInvalidations;
}
namespace DB
{
void SchemaCache::add(const String & key, const ColumnsDescription & columns, time_t ttl)
SchemaCache::SchemaCache(size_t max_elements_) : max_elements(max_elements_)
{
}
void SchemaCache::add(const String & key, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
clean();
addUnlocked(key, columns, ttl);
addUnlocked(key, columns);
}
void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl)
void SchemaCache::addMany(const Strings & keys, const ColumnsDescription & columns)
{
std::lock_guard lock(mutex);
clean();
for (const auto & key : keys)
addUnlocked(key, columns, ttl);
addUnlocked(key, columns);
}
void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl)
void SchemaCache::addUnlocked(const String & key, const ColumnsDescription & columns)
{
/// Do nothing if this key is already in cache;
if (data.contains(key))
return;
time_t now = std::time(nullptr);
time_t valid_until = now + ttl;
data[key] = SchemaInfo{columns, now, ttl, valid_until};
if (ttl)
expiration_queue.insert({valid_until, key});
auto it = queue.insert(queue.end(), key);
data[key] = {SchemaInfo{columns, now}, it};
checkOverflow();
}
std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, std::function<std::optional<time_t>()> get_last_mod_time)
void SchemaCache::checkOverflow()
{
if (queue.size() <= max_elements)
return;
auto key = queue.front();
data.erase(key);
queue.pop_front();
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheEvictions);
}
std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, LastModificationTimeGetter get_last_mod_time)
{
std::lock_guard lock(mutex);
clean();
auto it = data.find(key);
if (it == data.end())
{
@ -53,7 +64,8 @@ std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, std::f
return std::nullopt;
}
auto & schema_info = it->second;
auto & schema_info = it->second.schema_info;
auto & queue_iterator = it->second.iterator;
if (get_last_mod_time)
{
/// It's important to call get_last_mod_time only if we have key in cache,
@ -69,44 +81,34 @@ std::optional<ColumnsDescription> SchemaCache::tryGet(const String & key, std::f
/// Object was modified after it was added in cache.
/// So, stored value is no more valid and we should remove it.
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations);
/// If this key had TTL, we should remove it from expiration queue.
if (schema_info.ttl)
expiration_queue.erase({schema_info.valid_until, key});
queue.erase(queue_iterator);
data.erase(key);
return std::nullopt;
}
}
if (schema_info.ttl)
{
/// Current value in cache is valid and we can resume it's TTL by updating it's expiration time.
/// We will extract current value from the expiration queue, modify it and insert back to the queue.
time_t now = std::time(nullptr);
auto jt = expiration_queue.find({schema_info.valid_until, key});
auto node = expiration_queue.extract(jt);
schema_info.valid_until = now + schema_info.ttl;
node.value().first = schema_info.valid_until;
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLUpdates);
expiration_queue.insert(std::move(node));
}
/// Move key to the end of queue.
queue.splice(queue.end(), queue, queue_iterator);
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits);
return schema_info.columns;
}
void SchemaCache::clean()
void SchemaCache::clear()
{
time_t now = std::time(nullptr);
auto it = expiration_queue.begin();
/// Queue is sorted by time, so we need to check only the first
/// values that are less than current time.
while (it != expiration_queue.end() && it->first < now)
{
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLExpirations);
data.erase(it->second);
++it;
std::lock_guard lock(mutex);
data.clear();
queue.clear();
}
expiration_queue.erase(expiration_queue.begin(), it);
std::unordered_map<String, SchemaCache::SchemaInfo> SchemaCache::getAll()
{
std::lock_guard lock(mutex);
std::unordered_map<String, SchemaCache::SchemaInfo> result;
for (const auto & [key, value] : data)
result[key] = value.schema_info;
return result;
}
}


@ -2,49 +2,64 @@
#include <Storages/ColumnsDescription.h>
#include <unordered_map>
#include <set>
#include <list>
#include <mutex>
#include <optional>
namespace DB
{
const size_t DEFAULT_SCHEMA_CACHE_ELEMENTS = 4096;
/// Cache that stores columns description by some string key. It's used in schema inference.
/// It supports TTL for keys. Before each action it looks for expired TTls and removes
/// corresponding keys from cache. After each access to a key in cache it's TTL resumes,
/// so a key will be removed by TTL only if it was not accessed during this TTL.
/// It implements LRU semantics: after each access to a key in the cache we move this key to
/// the end of the queue; if we reach the limit of maximum elements in the cache, we
/// remove keys from the beginning of the queue.
/// It also supports key invalidation by last modification time. If a last modification time
/// is provided and the last modification happened after a key was added to the cache, this key
/// will be removed from the cache.
class SchemaCache
{
public:
/// Add new key with a schema with optional TTL
void add(const String & key, const ColumnsDescription & columns, time_t ttl = 0);
/// Add many keys with the same schema with optional TTL (usually used for globs)
void addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl = 0);
std::optional<ColumnsDescription> tryGet(const String & key, std::function<std::optional<time_t>()> get_last_mod_time = {});
private:
void addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl);
/// Check for expired TTLs.
void clean();
SchemaCache(size_t max_elements_);
struct SchemaInfo
{
ColumnsDescription columns;
time_t registration_time;
time_t ttl;
time_t valid_until;
};
std::unordered_map<String, SchemaInfo> data;
/// Special queue for checking expired TTLs. It contains pairs
/// (expiration time, key) sorted in ascending order.
std::set<std::pair<time_t, String>> expiration_queue;
using LastModificationTimeGetter = std::function<std::optional<time_t>()>;
/// Add new key with a schema
void add(const String & key, const ColumnsDescription & columns);
/// Add many keys with the same schema (usually used for globs)
void addMany(const Strings & keys, const ColumnsDescription & columns);
std::optional<ColumnsDescription> tryGet(const String & key, LastModificationTimeGetter get_last_mod_time = {});
void clear();
std::unordered_map<String, SchemaInfo> getAll();
private:
void addUnlocked(const String & key, const ColumnsDescription & columns);
void checkOverflow();
using Queue = std::list<String>;
using QueueIterator = Queue::iterator;
struct Cell
{
SchemaInfo schema_info;
QueueIterator iterator;
};
Queue queue;
std::unordered_map<String, Cell> data;
size_t max_elements;
std::mutex mutex;
};
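
The Queue plus unordered_map layout above is a standard list-based LRU: an access splices the key to the back of the queue, and an overflow pops the front. A minimal self-contained sketch of the same mechanics, using string values purely as an illustration (this is not the SchemaCache class itself):

#include <cassert>
#include <list>
#include <string>
#include <unordered_map>

/// Tiny LRU keyed by string: the least recently used key sits at the
/// front of the list and is the first to be evicted on overflow.
class TinyLRU
{
public:
    explicit TinyLRU(size_t max_elements_) : max_elements(max_elements_) {}

    void add(const std::string & key, const std::string & value)
    {
        if (data.contains(key))
            return; /// Like SchemaCache::addUnlocked, existing keys are left untouched.
        auto it = queue.insert(queue.end(), key);
        data[key] = {value, it};
        if (queue.size() > max_elements)
        {
            data.erase(queue.front());
            queue.pop_front(); /// Eviction (counted by SchemaInferenceCacheEvictions in the real cache).
        }
    }

    const std::string * tryGet(const std::string & key)
    {
        auto it = data.find(key);
        if (it == data.end())
            return nullptr;
        /// Move the accessed key to the end of the queue (most recently used).
        queue.splice(queue.end(), queue, it->second.iterator);
        return &it->second.value;
    }

private:
    struct Cell { std::string value; std::list<std::string>::iterator iterator; };
    std::list<std::string> queue;
    std::unordered_map<std::string, Cell> data;
    size_t max_elements;
};

int main()
{
    TinyLRU cache(2);
    cache.add("a", "1");
    cache.add("b", "2");
    cache.tryGet("a");   /// "a" becomes the most recently used key.
    cache.add("c", "3"); /// Overflow: "b" is evicted, not "a".
    assert(cache.tryGet("b") == nullptr);
    assert(cache.tryGet("a") != nullptr);
}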


@ -729,9 +729,9 @@ NamesAndTypesList StorageHDFS::getVirtuals() const
return virtual_columns;
}
SchemaCache & StorageHDFS::getSchemaCache()
SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache;
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_hdfs", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
@ -742,7 +742,7 @@ std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(ctx);
for (const auto & path : paths)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
@ -770,12 +770,12 @@ void StorageHDFS::addColumnsToCache(
const String & format_name,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(ctx);
Strings sources;
sources.reserve(paths.size());
std::transform(paths.begin(), paths.end(), std::back_inserter(sources), [&](const String & path){ return fs::path(uri_without_path) / path; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, {}, ctx);
schema_cache.addMany(cache_keys, columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds());
schema_cache.addMany(cache_keys, columns);
}
}


@ -66,12 +66,12 @@ public:
const String & compression_method,
ContextPtr ctx);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
protected:
friend class HDFSSource;
private:
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & paths,
const String & uri_without_path,


@ -1206,9 +1206,9 @@ NamesAndTypesList StorageFile::getVirtuals() const
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & StorageFile::getSchemaCache()
SchemaCache & StorageFile::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache;
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_file", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
@ -1216,7 +1216,7 @@ std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context)
{
/// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(context);
struct stat file_stat{};
for (const auto & path : paths)
{
@ -1244,9 +1244,9 @@ void StorageFile::addColumnsToCache(
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(paths, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds());
schema_cache.addMany(cache_keys, columns);
}
}


@ -87,6 +87,8 @@ public:
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
static SchemaCache & getSchemaCache(const ContextPtr & context);
protected:
friend class StorageFileSource;
friend class StorageFileSink;
@ -94,8 +96,6 @@ protected:
private:
void setStorageMetadata(CommonArguments args);
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & paths, const String & format_name, const std::optional<FormatSettings> & format_settings, ContextPtr context);


@ -1327,9 +1327,9 @@ bool StorageS3::supportsPartitionBy() const
return true;
}
SchemaCache & StorageS3::getSchemaCache()
SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache;
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
@ -1342,7 +1342,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it)
{
String path = fs::path(s3_configuration.uri.bucket) / *it;
@ -1391,8 +1391,8 @@ void StorageS3::addColumnsToCache(
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; });
Strings cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache();
schema_cache.addMany(cache_keys, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds());
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addMany(cache_keys, columns);
}
}


@ -211,6 +211,8 @@ public:
S3Settings::ReadWriteSettings rw_settings;
};
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
@ -259,8 +261,6 @@ private:
bool supportsSubsetOfColumns() const override;
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings::const_iterator & begin,
const Strings::const_iterator & end,


@ -790,9 +790,9 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
}
}
SchemaCache & IStorageURLBase::getSchemaCache()
SchemaCache & IStorageURLBase::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache;
static SchemaCache schema_cache(context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_url", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
@ -804,7 +804,7 @@ std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(context);
for (const auto & url : urls)
{
auto get_last_mod_time = [&]() -> std::optional<time_t>
@ -813,7 +813,7 @@ std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(
/// Some URLs could not have Last-Modified header, in this case we cannot be sure that
/// data wasn't changed after adding it's schema to cache. Use schema from cache only if
/// special setting for this case is enabled.
if (!last_mod_time && context->getSettingsRef().allow_urls_without_last_mod_time_in_schema_inference_cache)
if (!last_mod_time && !context->getSettingsRef().schema_inference_cache_require_modification_time_for_url)
return 0;
return last_mod_time;
};
@ -834,9 +834,9 @@ void IStorageURLBase::addColumnsToCache(
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
auto & schema_cache = getSchemaCache(context);
Strings cache_keys = getKeysForSchemaCache(urls, format_name, format_settings, context);
schema_cache.addMany(cache_keys, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds());
schema_cache.addMany(cache_keys, columns);
}
std::optional<time_t> IStorageURLBase::getLastModificationTime(


@ -49,6 +49,8 @@ public:
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
static SchemaCache & getSchemaCache(const ContextPtr & context);
protected:
IStorageURLBase(
const String & uri_,
@ -99,8 +101,6 @@ protected:
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings & urls,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,


@ -0,0 +1,69 @@
#include <Storages/System/StorageSystemSchemaInferenceCache.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <Formats/ReadSchemaUtils.h>
namespace DB
{
static String getSchemaString(const ColumnsDescription & columns)
{
WriteBufferFromOwnString buf;
const auto & names_and_types = columns.getAll();
for (auto it = names_and_types.begin(); it != names_and_types.end(); ++it)
{
if (it != names_and_types.begin())
writeCString(", ", buf);
writeString(it->name, buf);
writeChar(' ', buf);
writeString(it->type->getName(), buf);
}
return buf.str();
}
NamesAndTypesList StorageSystemSchemaInferenceCache::getNamesAndTypes()
{
return {
{"storage", std::make_shared<DataTypeString>()},
{"source", std::make_shared<DataTypeString>()},
{"format", std::make_shared<DataTypeString>()},
{"additional_format_info", std::make_shared<DataTypeString>()},
{"registration_time", std::make_shared<DataTypeDateTime>()},
{"schema", std::make_shared<DataTypeString>()}
};
}
void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File");
fillDataImpl(res_columns, StorageS3::getSchemaCache(context), "S3");
fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS");
fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL");
}
void StorageSystemSchemaInferenceCache::fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name) const
{
auto schema_cache_data = schema_cache.getAll();
String source;
String format;
String additional_format_info;
for (const auto & [key, schema_info] : schema_cache_data)
{
splitSchemaCacheKey(key, source, format, additional_format_info);
res_columns[0]->insert(storage_name);
res_columns[1]->insert(source);
res_columns[2]->insert(format);
res_columns[3]->insert(additional_format_info);
res_columns[4]->insert(schema_info.registration_time);
res_columns[5]->insert(getSchemaString(schema_info.columns));
}
}
}


@ -0,0 +1,24 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Storages/Cache/SchemaCache.h>
namespace DB
{
class StorageSystemSchemaInferenceCache final : public IStorageSystemOneBlock<StorageSystemSchemaInferenceCache>
{
public:
std::string getName() const override { return "SystemSchemaInferenceCache"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
void fillDataImpl(MutableColumns & res_columns, SchemaCache & schema_cache, const String & storage_name) const;
};
}


@ -74,6 +74,7 @@
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Storages/System/StorageSystemRemoteDataPaths.h>
#include <Storages/System/StorageSystemCertificates.h>
#include <Storages/System/StorageSystemSchemaInferenceCache.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
@ -133,6 +134,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemLicenses>(context, system_database, "licenses");
attach<StorageSystemTimeZones>(context, system_database, "time_zones");
attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
#ifdef OS_LINUX
attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif


@ -0,0 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<schema_inference_cache_max_elements_for_file>2</schema_inference_cache_max_elements_for_file>
</clickhouse>


@ -4,7 +4,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node", stay_alive=True, main_configs=["configs/config.d/query_log.xml"]
"node", stay_alive=True,
main_configs=["configs/config.d/query_log.xml", "configs/config.d/schema_cache.xml"]
)
@ -27,122 +28,115 @@ def get_profile_event_for_query(node, query, profile_event):
)
def check_cache_misses(node, file, amount=1):
assert get_profile_event_for_query(node, f"desc file('{file}')",
"SchemaInferenceCacheMisses") == amount
def check_cache_hits(node, file, amount=1):
assert get_profile_event_for_query(node, f"desc file('{file}')",
"SchemaInferenceCacheHits") == amount
def check_cache_invalidations(node, file, amount=1):
assert get_profile_event_for_query(node, f"desc file('{file}')",
"SchemaInferenceCacheInvalidations") == amount
def check_cache_evictions(node, file, amount=1):
assert get_profile_event_for_query(node, f"desc file('{file}')",
"SchemaInferenceCacheEvictions") == amount
def check_cache(node, expected_files):
sources = node.query("select source from system.schema_inference_cache")
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(expected_files)
def test(start_cluster):
node.query("insert into function file('data.jsonl') select * from numbers(100)")
time.sleep(1)
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_misses = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
node.query("desc file('data.jsonl')")
check_cache(node, ["data.jsonl"])
check_cache_misses(node, "data.jsonl")
node.query("desc file('data.jsonl')")
check_cache_hits(node, "data.jsonl")
node.query("insert into function file('data.jsonl') select * from numbers(100)")
time.sleep(1)
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_invalidations = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
node.query("desc file('data.jsonl')")
check_cache_invalidations(node, "data.jsonl")
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
desc_query = (
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
)
node.query(desc_query)
cache_misses = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
time.sleep(1)
time.sleep(2)
node.query("desc file('data1.jsonl')")
check_cache(node, ["data.jsonl", "data1.jsonl"])
check_cache_misses(node, "data1.jsonl")
desc_query = (
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000"
)
node.query(desc_query)
cache_misses = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = "desc file('data1.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheTTLUpdates"
)
assert cache_ttl_updates == 1
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
desc_query = (
"desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
)
node.query(desc_query)
cache_invalidations = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
time.sleep(2)
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = "desc file('data1.jsonl')"
node.query(desc_query)
cache_misses = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
node.query("desc file('data1.jsonl')")
check_cache_hits(node, "data1.jsonl")
node.query("insert into function file('data2.jsonl') select * from numbers(100)")
node.query("insert into function file('data3.jsonl') select * from numbers(100)")
time.sleep(1)
desc_query = "desc file('data*.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
node.query("desc file('data2.jsonl')")
check_cache(node, ["data1.jsonl", "data2.jsonl"])
check_cache_misses(node, "data2.jsonl")
check_cache_evictions(node, "data2.jsonl")
desc_query = "desc file('data2.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
node.query("desc file('data2.jsonl')")
check_cache_hits(node, "data2.jsonl")
desc_query = "desc file('data3.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(
node, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
node.query("desc file('data1.jsonl')")
check_cache_hits(node, "data1.jsonl")
node.query("desc file('data.jsonl')")
check_cache(node, ["data.jsonl", "data1.jsonl"])
check_cache_misses(node, "data.jsonl")
check_cache_evictions(node, "data.jsonl")
node.query("desc file('data2.jsonl')")
check_cache(node, ["data.jsonl", "data2.jsonl"])
check_cache_misses(node, "data2.jsonl")
check_cache_evictions(node, "data2.jsonl")
node.query("desc file('data2.jsonl')")
check_cache_hits(node, "data2.jsonl")
node.query("desc file('data.jsonl')")
check_cache_hits(node, "data.jsonl")
node.query("insert into function file('data3.jsonl') select * from numbers(100)")
time.sleep(1)
node.query("desc file('data*.jsonl')")
check_cache(node, ["data.jsonl", "data1.jsonl"])
check_cache_hits(node, "data*.jsonl")
node.query("desc file('data.jsonl')")
check_cache_hits(node, "data.jsonl")
node.query("desc file('data1.jsonl')")
check_cache_hits(node, "data1.jsonl")
node.query("desc file('data2.jsonl')")
check_cache_misses(node, "data2.jsonl")
node.query("desc file('data3.jsonl')")
check_cache_misses(node, "data3.jsonl")
node.query("system drop schema cache for file")
check_cache(node, [])
node.query("desc file('data*.jsonl')")
check_cache_misses(node, "data*.jsonl", 4)
node.query("system drop schema cache")
check_cache(node, [])
node.query("desc file('data*.jsonl')")
check_cache_misses(node, "data*.jsonl", 4)


@ -0,0 +1,3 @@
<clickhouse>
<schema_inference_cache_max_elements_for_hdfs>2</schema_inference_cache_max_elements_for_hdfs>
</clickhouse>


@ -8,7 +8,7 @@ from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/macro.xml"], with_hdfs=True
"node1", main_configs=["configs/macro.xml", "configs/schema_cache.xml"], with_hdfs=True
)
@ -21,7 +21,7 @@ def started_cluster():
cluster.shutdown()
def _test_read_write_storage(started_cluster):
def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query("drop table if exists SimpleHDFSStorage SYNC")
node1.query(
@ -32,7 +32,7 @@ def _test_read_write_storage(started_cluster):
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def _test_read_write_storage_with_globs(started_cluster):
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
@ -79,7 +79,7 @@ def _test_read_write_storage_with_globs(started_cluster):
assert "in readonly mode" in str(ex)
def _test_read_write_table(started_cluster):
def test_read_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
@ -95,7 +95,7 @@ def _test_read_write_table(started_cluster):
)
def _test_write_table(started_cluster):
def test_write_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
@ -110,7 +110,7 @@ def _test_write_table(started_cluster):
assert node1.query("select * from OtherHDFSStorage order by id") == result
def _test_bad_hdfs_uri(started_cluster):
def test_bad_hdfs_uri(started_cluster):
try:
node1.query(
"create table BadStorage1 (id UInt32, name String, weight Float64) ENGINE = HDFS('hads:hgsdfs100500:9000/other_storage', 'TSV')"
@ -136,7 +136,7 @@ def _test_bad_hdfs_uri(started_cluster):
@pytest.mark.timeout(800)
def _test_globs_in_read_table(started_cluster):
def test_globs_in_read_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
@ -191,7 +191,7 @@ def _test_globs_in_read_table(started_cluster):
).rstrip() == str(files_amount)
def _test_read_write_gzip_table(started_cluster):
def test_read_write_gzip_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
@ -207,7 +207,7 @@ def _test_read_write_gzip_table(started_cluster):
)
def _test_read_write_gzip_table_with_parameter_gzip(started_cluster):
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
@ -223,7 +223,7 @@ def _test_read_write_gzip_table_with_parameter_gzip(started_cluster):
)
def _test_read_write_table_with_parameter_none(started_cluster):
def test_read_write_table_with_parameter_none(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
@ -239,7 +239,7 @@ def _test_read_write_table_with_parameter_none(started_cluster):
)
def _test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
@ -255,7 +255,7 @@ def _test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
)
def _test_write_gz_storage(started_cluster):
def test_write_gz_storage(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
@ -266,7 +266,7 @@ def _test_write_gz_storage(started_cluster):
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def _test_write_gzip_storage(started_cluster):
def test_write_gzip_storage(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
@ -277,7 +277,7 @@ def _test_write_gzip_storage(started_cluster):
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
def _test_virtual_columns(started_cluster):
def test_virtual_columns(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
@ -295,7 +295,7 @@ def _test_virtual_columns(started_cluster):
)
def _test_read_files_with_spaces(started_cluster):
def test_read_files_with_spaces(started_cluster):
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
@ -316,7 +316,7 @@ def _test_read_files_with_spaces(started_cluster):
fs.delete(dir, recursive=True)
def _test_truncate_table(started_cluster):
def test_truncate_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table test_truncate (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/tr', 'TSV')"
@ -329,7 +329,7 @@ def _test_truncate_table(started_cluster):
node1.query("drop table test_truncate")
def _test_partition_by(started_cluster):
def test_partition_by(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -373,7 +373,7 @@ def _test_partition_by(started_cluster):
assert result.strip() == "1\t2\t3"
def _test_seekable_formats(started_cluster):
def test_seekable_formats(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = (
@ -394,7 +394,7 @@ def _test_seekable_formats(started_cluster):
assert int(result) == 5000000
def _test_read_table_with_default(started_cluster):
def test_read_table_with_default(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "n\n100\n"
@ -410,7 +410,7 @@ def _test_read_table_with_default(started_cluster):
)
def _test_schema_inference(started_cluster):
def test_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/native', 'Native', 'a Int32, b String') SELECT number, randomString(100) FROM numbers(5000000)"
)
@ -433,7 +433,7 @@ def _test_schema_inference(started_cluster):
assert int(result) == 5000000
def _test_hdfsCluster(started_cluster):
def test_hdfsCluster(started_cluster):
hdfs_api = started_cluster.hdfs_api
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = "/test_hdfsCluster"
@ -459,13 +459,13 @@ def _test_hdfsCluster(started_cluster):
fs.delete(dir, recursive=True)
def _test_hdfs_directory_not_exist(started_cluster):
def test_hdfs_directory_not_exist(started_cluster):
ddl = "create table HDFSStorageWithNotExistDir (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/data/not_eixst', 'TSV')"
node1.query(ddl)
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
def _test_overwrite(started_cluster):
def test_overwrite(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
@ -484,7 +484,7 @@ def _test_overwrite(started_cluster):
assert int(result) == 10
def _test_multiple_inserts(started_cluster):
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
@ -520,7 +520,7 @@ def _test_multiple_inserts(started_cluster):
assert int(result) == 60
def _test_format_detection(started_cluster):
def test_format_detection(started_cluster):
node1.query(
f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')"
)
@ -529,7 +529,7 @@ def _test_format_detection(started_cluster):
assert int(result) == 1
def _test_schema_inference_with_globs(started_cluster):
def test_schema_inference_with_globs(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
)
@ -570,7 +570,7 @@ def _test_schema_inference_with_globs(started_cluster):
)
def _test_insert_select_schema_inference(started_cluster):
def test_insert_select_schema_inference(started_cluster):
node1.query(
f"insert into table function hdfs('hdfs://hdfs1:9000/test.native.zst') select toUInt64(1) as x"
)
@ -582,7 +582,7 @@ def _test_insert_select_schema_inference(started_cluster):
assert int(result) == 1
def _test_cluster_join(started_cluster):
def test_cluster_join(started_cluster):
result = node1.query(
"""
SELECT l.id,r.id FROM hdfsCluster('test_cluster_two_shards', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32') as l
@ -593,7 +593,7 @@ def _test_cluster_join(started_cluster):
assert "AMBIGUOUS_COLUMN_NAME" not in result
def _test_cluster_macro(started_cluster):
def test_cluster_macro(started_cluster):
with_macro = node1.query(
"""
SELECT id FROM hdfsCluster('{default_cluster_macro}', 'hdfs://hdfs1:9000/test_hdfsCluster/file*', 'TSV', 'id UInt32')
@ -609,7 +609,7 @@ def _test_cluster_macro(started_cluster):
assert TSV(with_macro) == TSV(no_macro)
def _test_virtual_columns_2(started_cluster):
def test_virtual_columns_2(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = (
@ -639,129 +639,134 @@ def get_profile_event_for_query(node, query, profile_event):
)
def check_cache_misses(node1, file, amount=1):
assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheMisses") == amount
def check_cache_hits(node1, file, amount=1):
assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheHits") == amount
def check_cache_invalidations(node1, file, amount=1):
assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheInvalidations") == amount
def check_cache_evictions(node1, file, amount=1):
assert get_profile_event_for_query(node1, f"desc hdfs('hdfs://hdfs1:9000/{file}')",
"SchemaInferenceCacheEvictions") == amount
def check_cache(node1, expected_files):
sources = node1.query("select source from system.schema_inference_cache")
assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(expected_files)
def run_describe_query(node, file):
query = f"desc hdfs('hdfs://hdfs1:9000/{file}')"
node.query(query)
def test_schema_inference_cache(started_cluster):
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache0.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_invalidations = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
run_describe_query(node1, "test_cache0.jsonl")
check_cache_invalidations(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
time.sleep(1)
time.sleep(2)
run_describe_query(node1, "test_cache1.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache1.jsonl")
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1000"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheTTLUpdates"
)
assert cache_ttl_updates == 1
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
node1.query(desc_query)
cache_invalidations = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
time.sleep(2)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache1.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_hits(node1, "test_cache1.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(node1, "test_cache0.jsonl")
check_cache_evictions(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache(node1, ["test_cache0.jsonl", "test_cache2.jsonl"])
check_cache_misses(node1, "test_cache2.jsonl")
check_cache_evictions(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_hits(node1, "test_cache0.jsonl")
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache*.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
files = "test_cache{0,1,2,3}.jsonl"
run_describe_query(node1, files)
check_cache(node1, ["test_cache2.jsonl", "test_cache3.jsonl"])
check_cache_hits(node1, files)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache2.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(node1, "test_cache2.jsonl")
check_cache_hits(node1, "test_cache2.jsonl")
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache3.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(
node1, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(node1, "test_cache3.jsonl")
check_cache_hits(node1, "test_cache3.jsonl")
run_describe_query(node1, "test_cache0.jsonl")
check_cache_misses(node1, "test_cache0.jsonl")
run_describe_query(node1, "test_cache1.jsonl")
check_cache_misses(node1, "test_cache1.jsonl")
node1.query(f"system drop schema cache for hdfs")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
node1.query("system drop schema cache")
check_cache(node1, [])
run_describe_query(node1, files)
check_cache_misses(node1, files, 4)
if __name__ == "__main__":


@ -0,0 +1,4 @@
<clickhouse>
<schema_inference_cache_max_elements_for_s3>2</schema_inference_cache_max_elements_for_s3>
<schema_inference_cache_max_elements_for_url>2</schema_inference_cache_max_elements_for_url>
</clickhouse>


@ -103,7 +103,7 @@ def started_cluster():
cluster.add_instance(
"dummy",
with_minio=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml", "configs/schema_cache.xml"],
)
cluster.add_instance(
"s3_max_redirects",
@ -148,7 +148,7 @@ def run_query(instance, query, stdin=None, settings=None):
pytest.param("'wrongid','wrongkey',", False, "zstd", id="zstd"),
],
)
def _test_put(started_cluster, maybe_auth, positive, compression):
def test_put(started_cluster, maybe_auth, positive, compression):
# type: (ClickHouseCluster) -> None
bucket = (
@ -174,7 +174,7 @@ def _test_put(started_cluster, maybe_auth, positive, compression):
assert values_csv == get_s3_file_content(started_cluster, bucket, filename)
def _test_partition_by(started_cluster):
def test_partition_by(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -201,7 +201,7 @@ def _test_partition_by(started_cluster):
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test2_45.csv")
def _test_partition_by_string_column(started_cluster):
def test_partition_by_string_column(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "col_num UInt32, col_str String"
@ -221,7 +221,7 @@ def _test_partition_by_string_column(started_cluster):
assert '78,"你好"\n' == get_s3_file_content(started_cluster, bucket, "test_你好.csv")
def _test_partition_by_const_column(started_cluster):
def test_partition_by_const_column(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -239,7 +239,7 @@ def _test_partition_by_const_column(started_cluster):
@pytest.mark.parametrize("special", ["space", "plus"])
def _test_get_file_with_special(started_cluster, special):
def test_get_file_with_special(started_cluster, special):
symbol = {"space": " ", "plus": "+"}[special]
urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special]
auth = "'minio','minio123',"
@ -281,7 +281,7 @@ def _test_get_file_with_special(started_cluster, special):
@pytest.mark.parametrize("special", ["space", "plus", "plus2"])
def _test_get_path_with_special(started_cluster, special):
def test_get_path_with_special(started_cluster, special):
symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special]
safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special]
auth = "'minio','minio123',"
@ -293,7 +293,7 @@ def _test_get_path_with_special(started_cluster, special):
# Test putting no data to S3.
@pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")])
def _test_empty_put(started_cluster, auth):
def test_empty_put(started_cluster, auth):
# type: (ClickHouseCluster, str) -> None
bucket = started_cluster.minio_bucket
@ -349,7 +349,7 @@ def _test_empty_put(started_cluster, auth):
pytest.param("'wrongid','wrongkey',", False, id="negative"),
],
)
def _test_put_csv(started_cluster, maybe_auth, positive):
def test_put_csv(started_cluster, maybe_auth, positive):
# type: (ClickHouseCluster, bool, str) -> None
bucket = (
@ -381,7 +381,7 @@ def _test_put_csv(started_cluster, maybe_auth, positive):
# Test put and get with S3 server redirect.
def _test_put_get_with_redirect(started_cluster):
def test_put_get_with_redirect(started_cluster):
# type: (ClickHouseCluster) -> None
bucket = started_cluster.minio_bucket
@ -419,7 +419,7 @@ def _test_put_get_with_redirect(started_cluster):
# Test put with restricted S3 server redirect.
def _test_put_with_zero_redirect(started_cluster):
def test_put_with_zero_redirect(started_cluster):
# type: (ClickHouseCluster) -> None
bucket = started_cluster.minio_bucket
@ -458,7 +458,7 @@ def _test_put_with_zero_redirect(started_cluster):
assert exception_raised
def _test_put_get_with_globs(started_cluster):
def test_put_get_with_globs(started_cluster):
# type: (ClickHouseCluster) -> None
unique_prefix = random.randint(1, 10000)
bucket = started_cluster.minio_bucket
@ -515,7 +515,7 @@ def _test_put_get_with_globs(started_cluster):
# ("'minio','minio123',",True), Redirect with credentials not working with nginx.
],
)
def _test_multipart(started_cluster, maybe_auth, positive):
def test_multipart(started_cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
bucket = (
@ -604,7 +604,7 @@ def _test_multipart(started_cluster, maybe_auth, positive):
)
def _test_remote_host_filter(started_cluster):
def test_remote_host_filter(started_cluster):
instance = started_cluster.instances["restricted_dummy"]
format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -624,7 +624,7 @@ def _test_remote_host_filter(started_cluster):
assert "not allowed in configuration file" in instance.query_and_get_error(query)
def _test_wrong_s3_syntax(started_cluster):
def test_wrong_s3_syntax(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
expected_err_msg = "Code: 42" # NUMBER_OF_ARGUMENTS_DOESNT_MATCH
@ -638,7 +638,7 @@ def _test_wrong_s3_syntax(started_cluster):
# https://en.wikipedia.org/wiki/One_Thousand_and_One_Nights
def _test_s3_glob_scheherazade(started_cluster):
def test_s3_glob_scheherazade(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -734,7 +734,7 @@ def replace_config(old, new):
config.close()
def _test_custom_auth_headers(started_cluster):
def test_custom_auth_headers(started_cluster):
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
@ -773,7 +773,7 @@ def _test_custom_auth_headers(started_cluster):
instance.query("DROP TABLE test")
def _test_custom_auth_headers_exclusion(started_cluster):
def test_custom_auth_headers_exclusion(started_cluster):
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
get_query = f"SELECT * FROM s3('http://resolver:8080/{started_cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')"
@ -787,7 +787,7 @@ def _test_custom_auth_headers_exclusion(started_cluster):
assert "Forbidden Error" in ei.value.stderr
def _test_infinite_redirect(started_cluster):
def test_infinite_redirect(started_cluster):
bucket = "redirected"
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
@ -810,7 +810,7 @@ def _test_infinite_redirect(started_cluster):
pytest.param("gz", "auto", id="gz"),
],
)
def _test_storage_s3_get_gzip(started_cluster, extension, method):
def test_storage_s3_get_gzip(started_cluster, extension, method):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
filename = f"test_get_gzip.{extension}"
@ -853,7 +853,7 @@ def _test_storage_s3_get_gzip(started_cluster, extension, method):
run_query(instance, f"DROP TABLE {name}")
def _test_storage_s3_get_unstable(started_cluster):
def test_storage_s3_get_unstable(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
@ -862,7 +862,7 @@ def _test_storage_s3_get_unstable(started_cluster):
assert result.splitlines() == ["500001,500000,0"]
def _test_storage_s3_put_uncompressed(started_cluster):
def test_storage_s3_put_uncompressed(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
filename = "test_put_uncompressed.bin"
@ -903,7 +903,7 @@ def _test_storage_s3_put_uncompressed(started_cluster):
"extension,method",
[pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
)
def _test_storage_s3_put_gzip(started_cluster, extension, method):
def test_storage_s3_put_gzip(started_cluster, extension, method):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
filename = f"test_put_gzip.{extension}"
@ -945,7 +945,7 @@ def _test_storage_s3_put_gzip(started_cluster, extension, method):
assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 708
def _test_truncate_table(started_cluster):
def test_truncate_table(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
name = "truncate"
@ -975,7 +975,7 @@ def _test_truncate_table(started_cluster):
assert instance.query("SELECT * FROM {}".format(name)) == ""
def _test_predefined_connection_configuration(started_cluster):
def test_predefined_connection_configuration(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
name = "test_table"
@ -998,7 +998,7 @@ def _test_predefined_connection_configuration(started_cluster):
result = ""
def _test_url_reconnect_in_the_middle(started_cluster):
def test_url_reconnect_in_the_middle(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_format = "id String, data String"
@ -1047,7 +1047,7 @@ def _test_url_reconnect_in_the_middle(started_cluster):
assert int(result) == 3914219105369203805
def _test_seekable_formats(started_cluster):
def test_seekable_formats(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
@ -1083,7 +1083,7 @@ def _test_seekable_formats(started_cluster):
assert int(result) > 80
def _test_seekable_formats_url(started_cluster):
def test_seekable_formats_url(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
@ -1109,7 +1109,7 @@ def _test_seekable_formats_url(started_cluster):
assert int(result) == 1000000
def _test_empty_file(started_cluster):
def test_empty_file(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1124,7 +1124,7 @@ def _test_empty_file(started_cluster):
assert int(result) == 0
def _test_insert_with_path_with_globs(started_cluster):
def test_insert_with_path_with_globs(started_cluster):
instance = started_cluster.instances["dummy"]
table_function_3 = f"s3('http://minio1:9001/root/test_parquet*', 'minio', 'minio123', 'Parquet', 'a Int32, b String')"
@ -1133,7 +1133,7 @@ def _test_insert_with_path_with_globs(started_cluster):
)
def _test_s3_schema_inference(started_cluster):
def test_s3_schema_inference(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1179,7 +1179,7 @@ def _test_s3_schema_inference(started_cluster):
assert int(result) == 5000000
def _test_empty_file(started_cluster):
def test_empty_file(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1194,7 +1194,7 @@ def _test_empty_file(started_cluster):
assert int(result) == 0
def _test_overwrite(started_cluster):
def test_overwrite(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1215,7 +1215,7 @@ def _test_overwrite(started_cluster):
assert int(result) == 200
def _test_create_new_files_on_insert(started_cluster):
def test_create_new_files_on_insert(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1256,7 +1256,7 @@ def _test_create_new_files_on_insert(started_cluster):
assert int(result) == 60
def _test_format_detection(started_cluster):
def test_format_detection(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1291,7 +1291,7 @@ def _test_format_detection(started_cluster):
assert int(result) == 1
def _test_schema_inference_from_globs(started_cluster):
def test_schema_inference_from_globs(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1364,7 +1364,7 @@ def _test_schema_inference_from_globs(started_cluster):
)
def _test_signatures(started_cluster):
def test_signatures(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1398,7 +1398,7 @@ def _test_signatures(started_cluster):
assert int(result) == 1
def _test_select_columns(started_cluster):
def test_select_columns(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
name = "test_table2"
@ -1429,7 +1429,7 @@ def _test_select_columns(started_cluster):
assert int(result1) * 3 <= int(result2)
def _test_insert_select_schema_inference(started_cluster):
def test_insert_select_schema_inference(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1447,7 +1447,7 @@ def _test_insert_select_schema_inference(started_cluster):
assert int(result) == 1
def _test_parallel_reading_with_memory_limit(started_cluster):
def test_parallel_reading_with_memory_limit(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1468,7 +1468,7 @@ def _test_parallel_reading_with_memory_limit(started_cluster):
assert int(result) == 1
def _test_wrong_format_usage(started_cluster):
def test_wrong_format_usage(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
@ -1493,259 +1493,178 @@ def get_profile_event_for_query(instance, query, profile_event):
)
def check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    assert (
        get_profile_event_for_query(instance, query, "SchemaInferenceCacheMisses")
        == amount
    )


def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    assert (
        get_profile_event_for_query(instance, query, "SchemaInferenceCacheHits")
        == amount
    )


def check_cache_invalidations(instance, file, storage_name, started_cluster, bucket, amount=1):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    assert (
        get_profile_event_for_query(instance, query, "SchemaInferenceCacheInvalidations")
        == amount
    )


def check_cache_evictions(instance, file, storage_name, started_cluster, bucket, amount=1):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    assert (
        get_profile_event_for_query(instance, query, "SchemaInferenceCacheEvictions")
        == amount
    )


def run_describe_query(instance, file, storage_name, started_cluster, bucket):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    instance.query(query)


def check_cache(instance, expected_files):
    sources = instance.query("select source from system.schema_inference_cache")
    assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
        expected_files
    )
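
These helpers wrap a DESCRIBE over the given table function and assert on the ProfileEvents counters it produces (SchemaInferenceCacheMisses, SchemaInferenceCacheHits, SchemaInferenceCacheInvalidations, SchemaInferenceCacheEvictions). As a rough sketch of what such a check amounts to — not necessarily how get_profile_event_for_query is implemented, since its body is outside this hunk — the counter for the last finished run of a query can be read from system.query_log; the file name test_cache0.jsonl is just the one used by the tests:

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['SchemaInferenceCacheMisses'] AS misses
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query LIKE 'desc s3(%test_cache0.jsonl%'
ORDER BY event_time_microseconds DESC
LIMIT 1;
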
def test_schema_inference_cache(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
def test(storage_name):
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache(instance, ["test_cache0.jsonl"])
check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache_invalidations(instance, "test_cache0.jsonl", storage_name,
started_cluster, bucket)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
time.sleep(1)
time.sleep(2)
run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1000"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLUpdates"
)
assert cache_ttl_updates == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
time.sleep(2)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache(instance, ["test_cache1.jsonl", "test_cache2.jsonl"])
check_cache_misses(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache_evictions(instance, "test_cache2.jsonl", storage_name,
started_cluster, bucket)
run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache_evictions(instance, "test_cache0.jsonl", storage_name,
started_cluster, bucket)
run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache(instance, ["test_cache0.jsonl", "test_cache2.jsonl"])
check_cache_misses(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache_evictions(instance, "test_cache2.jsonl", storage_name,
started_cluster, bucket)
run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache*.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
files = "test_cache{0,1,2,3}.jsonl"
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache(instance, ["test_cache2.jsonl", "test_cache3.jsonl"])
check_cache_hits(instance, files, storage_name, started_cluster, bucket)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache2.jsonl", storage_name, started_cluster,
bucket)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
run_describe_query(instance, "test_cache3.jsonl", storage_name, started_cluster,
bucket)
check_cache_hits(instance, "test_cache3.jsonl", storage_name, started_cluster,
bucket)
# Test the same scenarios but for the URL table function
run_describe_query(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
check_cache_misses(instance, "test_cache0.jsonl", storage_name, started_cluster,
bucket)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
run_describe_query(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
check_cache_misses(instance, "test_cache1.jsonl", storage_name, started_cluster,
bucket)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
instance.query(f"system drop schema cache for {storage_name}")
check_cache(instance, [])
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
instance.query("system drop schema cache")
check_cache(instance, [])
time.sleep(2)
run_describe_query(instance, files, storage_name, started_cluster, bucket)
check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1000"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLUpdates"
)
assert cache_ttl_updates == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheInvalidations"
)
assert cache_invalidations == 1
time.sleep(2)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheTTLExpirations"
)
assert cache_ttl_expirations == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheMisses"
)
assert cache_misses == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
time.sleep(1)
file_name = "test_cache{1,2,3}.jsonl"
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name}')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(
instance, desc_query, "SchemaInferenceCacheHits"
)
assert cache_hits == 1
test("s3")
instance.query("system drop schema cache")
test("url")

View File

@ -0,0 +1,12 @@
storage String
source String
format String
additional_format_info String
registration_time DateTime
schema String
x Nullable(String)
s Nullable(String)
x Nullable(String)
s Nullable(String)
File 02374_data1.jsonl JSONEachRow x Nullable(String), s Nullable(String)
File 02374_data2.jsonl JSONEachRow x Nullable(String), s Nullable(String)

View File

@ -0,0 +1,12 @@
insert into function file('02374_data1.jsonl') select number as x, 'str' as s from numbers(10);
insert into function file('02374_data2.jsonl') select number as x, 'str' as s from numbers(10);
desc system.schema_inference_cache;
system drop schema cache for file;
desc file('02374_data1.jsonl');
desc file('02374_data2.jsonl');
select storage, splitByChar('/', source)[-1], format, schema from system.schema_inference_cache where storage='File';
system drop schema cache for file;
select storage, source, format, schema from system.schema_inference_cache where storage='File';