Implement cache for schema inference for file/s3/hdfs/url

This commit is contained in:
avogar 2022-06-21 13:02:48 +00:00
parent 948b5b7804
commit d37ad2e6de
21 changed files with 847 additions and 47 deletions

View File

@ -343,7 +343,13 @@
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
\
M(SchemaInferenceCacheHits, "Number of times a schema from cache was used for schema inference") \
M(SchemaInferenceCacheMisses, "Number of times a schema is not in cache while schema inference") \
M(SchemaInferenceCacheTTLExpirations, "Number of times a schema from cache expires due to TTL") \
M(SchemaInferenceCacheTTLUpdates, "Number of times TTL for schema in cache was updated") \
M(SchemaInferenceCacheInvalidations, "Number of times a schema in cache became invalid due to changes in data")
namespace ProfileEvents
{

View File

@ -589,7 +589,15 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \
\
/** Experimental functions */ \
M(Bool, use_cache_for_file_schema_inference, true, "Use cache in schema inference while using file table function", 0) \
M(Seconds, cache_ttl_for_file_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using file table function. 0 means no ttl", 0) \
M(Bool, use_cache_for_s3_schema_inference, true, "Use cache in schema inference while using s3 table function", 0) \
M(Seconds, cache_ttl_for_s3_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using s3 table function. 0 means no ttl", 0) \
M(Bool, use_cache_for_hdfs_schema_inference, true, "Use cache in schema inference while using hdfs table function", 0) \
M(Seconds, cache_ttl_for_hdfs_schema_inference, 3600 * 24, "TTL for schemes in cache in schema inference while using hdfs table function. 0 means no ttl", 0) \
M(Bool, use_cache_for_url_schema_inference, true, "Use cache in schema inference while using url table function", 0) \
M(Seconds, cache_ttl_for_url_schema_inference, 3600, "TTL for schemes in cache in schema inference while using url table function. 0 means no ttl", 0) \
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions (hashid, etc)", 0) \

View File

@ -1,4 +1,3 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@ -75,7 +74,8 @@ ColumnsDescription readSchemaFromFormat(
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
size_t iterations = 0;
while ((buf = read_buffer_iterator()))
ColumnsDescription cached_columns;
while ((buf = read_buffer_iterator(cached_columns)))
{
++iterations;
@ -124,6 +124,9 @@ ColumnsDescription readSchemaFromFormat(
}
}
if (!cached_columns.empty())
return cached_columns;
if (names_and_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}", exception_messages);

View File

@ -6,7 +6,7 @@
namespace DB
{
using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>()>;
using ReadBufferIterator = std::function<std::unique_ptr<ReadBuffer>(ColumnsDescription &)>;
/// Try to determine the schema of the data in specifying format.
/// For formats that have an external schema reader, it will

View File

@ -790,7 +790,8 @@ namespace S3
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
{
Aws::S3::Model::HeadObjectRequest req;
req.SetBucket(bucket);
@ -804,13 +805,18 @@ namespace S3
if (outcome.IsSuccess())
{
auto read_result = outcome.GetResultWithOwnership();
return static_cast<size_t>(read_result.GetContentLength());
return {.size = static_cast<size_t>(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000};
}
else if (throw_on_error)
{
throw DB::Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
return 0;
return {};
}
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error)
{
return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error).size;
}
}

View File

@ -78,6 +78,14 @@ struct URI
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
};
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true);
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id = {}, bool throw_on_error = true);
}

View File

@ -0,0 +1,154 @@
#pragma once
#include <Storages/ColumnsDescription.h>
#include <unordered_map>
#include <mutex>
#include <ctime>
#include <optional>
#include <limits>
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event SchemaInferenceCacheHits;
extern const Event SchemaInferenceCacheMisses;
extern const Event SchemaInferenceCacheTTLExpirations;
extern const Event SchemaInferenceCacheTTLUpdates;
extern const Event SchemaInferenceCacheInvalidations;
}
namespace DB
{
/// Cache that stores columns description by some string key. It's used in schema inference.
/// It supports TTL for keys. Before each action it looks for expired TTls and removes
/// corresponding keys from cache. After each access to a key in cache it's TTL resumes,
/// so a key will be removed by TTL only if it was not accessed during this TTL.
/// It also supports keys invalidations by last modification time. If last modification time
/// is provided and last modification happened after a key was added to the cache, this key
/// will be removed from cache.
class SchemaCache
{
public:
void add(const String & key, const ColumnsDescription & columns, time_t ttl = 0)
{
std::lock_guard lock(mutex);
clean();
addUnlocked(key, columns, ttl);
}
void addMany(const Strings & keys, const ColumnsDescription & columns, time_t ttl = 0)
{
std::lock_guard lock(mutex);
clean();
for (const auto & key : keys)
addUnlocked(key, columns, ttl);
}
std::optional<ColumnsDescription> tryGet(const String & key, std::function<time_t()> get_last_mod_time = {})
{
std::lock_guard lock(mutex);
clean();
auto it = data.find(key);
if (it == data.end())
{
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheMisses);
return std::nullopt;
}
auto & schema_info = it->second;
if (get_last_mod_time)
{
/// It's important to call get_last_mod_time only if we have key in cache,
/// because this function can do some heavy operations.
auto last_mod_time = get_last_mod_time();
/// If get_last_mod_time function was provided but it returned 0, it means that
/// it failed to get last modification time, so we cannot safely use value from cache.
if (!last_mod_time)
return std::nullopt;
if (last_mod_time > schema_info.registration_time)
{
/// Object was modified after it was added in cache.
/// So, stored value is no more valid and we should remove it.
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheInvalidations);
/// If this key had TTL, we should remove it from expiration queue.
if (schema_info.ttl)
expiration_queue.erase({schema_info.valid_until, key});
data.erase(key);
return std::nullopt;
}
}
if (schema_info.ttl)
{
/// Current value in cache is valid and we can resume it's TTL bu updating it's expiration time.
/// We will extract current value from the expiration queue, modify it and insert back to the queue..
time_t now = std::time(nullptr);
auto jt = expiration_queue.find({schema_info.valid_until, key});
auto node = expiration_queue.extract(jt);
schema_info.valid_until = now + schema_info.ttl;
node.value().first = schema_info.valid_until;
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLUpdates);
expiration_queue.insert(std::move(node));
}
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits);
return schema_info.columns;
}
/// Check if this cache contains provided key.
bool has(const String & key)
{
std::lock_guard lock(mutex);
clean();
return data.contains(key);
}
private:
void addUnlocked(const String & key, const ColumnsDescription & columns, time_t ttl)
{
/// Do nothing if this key is already in cache;
if (data.contains(key))
return;
time_t now = std::time(nullptr);
time_t valid_until = now + ttl;
data[key] = SchemaInfo{columns, now, ttl, valid_until};
if (ttl)
expiration_queue.insert({valid_until, key});
}
/// Check for expired TTLs.
void clean()
{
time_t now = std::time(nullptr);
auto it = expiration_queue.begin();
/// Queue is sorted by time, so we need to check only the first
/// values that are less than current time.
while (it != expiration_queue.end() && it->first < now)
{
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheTTLExpirations);
data.erase(it->second);
++it;
}
expiration_queue.erase(expiration_queue.begin(), it);
}
struct SchemaInfo
{
ColumnsDescription columns;
time_t registration_time;
time_t ttl;
time_t valid_until;
};
std::unordered_map<String, SchemaInfo> data;
/// Special queue for checking expired TTLs. It contains pairs
/// (expiration time, key) sorted in ascending order.
std::set<std::pair<time_t, String>> expiration_queue;
std::mutex mutex;
};
}

View File

@ -63,7 +63,7 @@ namespace
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match, std::unordered_map<String, time_t> * last_mod_times)
{
const size_t first_glob = for_match.find_first_of("*?{");
@ -98,13 +98,15 @@ namespace
if (re2::RE2::FullMatch(file_name, matcher))
{
result.push_back(String(ls.file_info[i].mName));
if (last_mod_times)
(*last_mod_times)[result.back()] = ls.file_info[i].mLastMod;
}
}
else if (is_directory && looking_for_directory)
{
if (re2::RE2::FullMatch(file_name, matcher))
{
Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash));
Strings result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash), last_mod_times);
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
@ -120,12 +122,12 @@ namespace
return {uri.substr(begin_of_path), uri.substr(0, begin_of_path)};
}
std::vector<String> getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context)
std::vector<String> getPathsList(const String & path_from_uri, const String & uri_without_path, ContextPtr context, std::unordered_map<String, time_t> * last_mod_times = nullptr)
{
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
return LSWithRegexpMatching("/", fs, path_from_uri);
return LSWithRegexpMatching("/", fs, path_from_uri, last_mod_times);
}
}
@ -184,7 +186,8 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
ContextPtr ctx)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
auto paths = getPathsList(path_from_uri, uri, ctx);
std::unordered_map<String, time_t> last_mod_time;
auto paths = getPathsList(path_from_uri, uri, ctx, &last_mod_time);
if (paths.empty() && !FormatFactory::instance().checkIfFormatHasExternalSchemaReader(format))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
@ -192,7 +195,11 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
"specify table structure manually",
format);
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
std::optional<ColumnsDescription> columns_from_cache;
if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference)
columns_from_cache = tryGetColumnsFromCache(paths, last_mod_time);
ReadBufferIterator read_buffer_iterator = [&, uri_without_path = uri_without_path, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
return nullptr;
@ -200,7 +207,13 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, *it++, ctx->getGlobalContext()->getConfigRef()), compression);
};
return readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
auto columns = readSchemaFromFormat(format, std::nullopt, read_buffer_iterator, paths.size() > 1, ctx);
if (ctx->getSettingsRef().use_cache_for_hdfs_schema_inference)
addColumnsToCache(paths, columns, ctx);
return columns;
}
class HDFSSource::DisclosedGlobIterator::Impl
@ -700,6 +713,31 @@ NamesAndTypesList StorageHDFS::getVirtuals() const
return virtual_columns;
}
SchemaCache & StorageHDFS::getSchemaCache()
{
static SchemaCache schema_cache;
return schema_cache;
}
std::optional<ColumnsDescription> StorageHDFS::tryGetColumnsFromCache(const Strings & paths, std::unordered_map<String, time_t> & last_mod_time)
{
auto & schema_cache = getSchemaCache();
for (const auto & path : paths)
{
auto columns = schema_cache.tryGet(path, [&](){ return last_mod_time[path]; });
if (columns)
return columns;
}
return std::nullopt;
}
void StorageHDFS::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx)
{
auto & schema_cache = getSchemaCache();
schema_cache.addMany(paths, columns, ctx->getSettingsRef().cache_ttl_for_hdfs_schema_inference.totalSeconds());
}
}
#endif

View File

@ -6,6 +6,7 @@
#include <Processors/ISource.h>
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Poco/URI.h>
#include <Common/logger_useful.h>
@ -69,6 +70,10 @@ protected:
friend class HDFSSource;
private:
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & paths, std::unordered_map<String, time_t> & last_mod_time);
static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & ctx);
std::vector<const String> uris;
String format_name;
String compression_method;

View File

@ -262,7 +262,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
/// in case of file descriptor we have a stream of data and we cannot
/// start reading data from the beginning after reading some data for
/// schema inference.
ReadBufferIterator read_buffer_iterator = [&]()
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
@ -306,7 +306,11 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
"table structure manually",
format);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()]() mutable -> std::unique_ptr<ReadBuffer>
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().use_cache_for_file_schema_inference)
columns_from_cache = tryGetColumnsFromCache(paths);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
return nullptr;
@ -314,7 +318,16 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return createReadBuffer(*it++, false, "File", -1, compression_method, context);
};
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
if (context->getSettingsRef().use_cache_for_file_schema_inference)
addColumnsToCache(paths, columns, context);
return columns;
}
bool StorageFile::supportsSubsetOfColumns() const
@ -1190,4 +1203,40 @@ NamesAndTypesList StorageFile::getVirtuals() const
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
SchemaCache & StorageFile::getSchemaCache()
{
static SchemaCache schema_cache;
return schema_cache;
}
std::optional<ColumnsDescription> StorageFile::tryGetColumnsFromCache(const Strings & paths)
{
/// Check if the cache contains one of the paths.
auto & schema_cache = getSchemaCache();
struct stat file_stat{};
for (const auto & path : paths)
{
auto get_last_mod_time = [&]()
{
if (0 != stat(path.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + path, ErrorCodes::CANNOT_STAT);
return file_stat.st_mtim.tv_sec;
};
auto columns = schema_cache.tryGet(path, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageFile::addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
schema_cache.addMany(paths, columns, context->getSettingsRef().cache_ttl_for_file_schema_inference.totalSeconds());
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Common/logger_useful.h>
@ -93,6 +94,10 @@ protected:
private:
void setStorageMetadata(CommonArguments args);
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & paths);
static void addColumnsToCache(const Strings & paths, const ColumnsDescription & columns, const ContextPtr & context);
std::string format_name;
// We use format settings from global context + CREATE query for File table
// function -- in this case, format_settings is set.

View File

@ -11,7 +11,6 @@
#include <IO/S3Common.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -91,12 +90,16 @@ public:
const S3::URI & globbed_uri_,
ASTPtr & query_,
const Block & virtual_header_,
ContextPtr context_)
ContextPtr context_,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
: WithContext(context_)
, client(client_)
, globbed_uri(globbed_uri_)
, query(query_)
, virtual_header(virtual_header_)
, object_infos(object_infos_)
, read_keys(read_keys_)
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -188,6 +191,8 @@ private:
if (re2::RE2::FullMatch(key, *matcher))
{
String path = fs::path(globbed_uri.bucket) / key;
if (object_infos)
(*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000};
String file = path.substr(path.find_last_of('/') + 1);
if (path_column)
path_column->insert(path);
@ -222,6 +227,9 @@ private:
/// It returns false when all objects were returned
is_finished = !outcome.GetResult().GetIsTruncated();
if (read_keys)
read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
}
std::mutex mutex;
@ -236,6 +244,8 @@ private:
Aws::S3::Model::ListObjectsV2Outcome outcome;
std::unique_ptr<re2::RE2> matcher;
bool is_finished{false};
std::unordered_map<String, S3::ObjectInfo> * object_infos;
Strings * read_keys;
};
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
@ -243,8 +253,10 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context))
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_))
{
}
@ -396,7 +408,8 @@ StorageS3Source::StorageS3Source(
const String & bucket_,
const String & version_id_,
std::shared_ptr<IteratorWrapper> file_iterator_,
const size_t download_thread_num_)
const size_t download_thread_num_,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
@ -413,6 +426,7 @@ StorageS3Source::StorageS3Source(
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, object_infos(object_infos_)
{
initialize();
}
@ -455,7 +469,12 @@ bool StorageS3Source::initialize()
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
{
const size_t object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false);
size_t object_size;
auto it = object_infos.find(fs::path(bucket) / key);
if (it != object_infos.end())
object_size = it->second.size;
else
object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false);
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
@ -785,27 +804,34 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks)
const std::vector<String> & read_tasks,
std::unordered_map<String, S3::ObjectInfo> * object_infos,
Strings * read_keys)
{
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
[read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback())]() -> String
[read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback()), read_keys]() -> String
{
return read_tasks_iterator->next();
auto key = read_tasks_iterator->next();
if (read_keys)
read_keys->push_back(key);
return key;
});
}
else if (is_key_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context);
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
{
auto keys_iterator
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
if (read_keys)
*read_keys = keys;
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
}
}
@ -850,7 +876,8 @@ Pipe StorageS3::read(
local_context,
query_info.query,
virtual_block,
read_tasks_used_in_schema_inference);
read_tasks_used_in_schema_inference,
&object_infos);
ColumnsDescription columns_description;
Block block_for_format;
@ -893,7 +920,8 @@ Pipe StorageS3::read(
s3_configuration.uri.bucket,
s3_configuration.uri.version_id,
iterator_wrapper,
max_download_threads));
max_download_threads,
object_infos));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -1130,11 +1158,12 @@ ColumnsDescription StorageS3::getTableStructureFromData(
const String & compression_method,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -1145,12 +1174,20 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing)
std::vector<String> * read_keys_in_distributed_processing,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
auto file_iterator
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {});
std::vector<String> read_keys;
ReadBufferIterator read_buffer_iterator = [&, first = false]() mutable -> std::unique_ptr<ReadBuffer>
auto file_iterator
= createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {}, {}, object_infos, &read_keys);
std::optional<ColumnsDescription> columns_from_cache;
size_t prev_read_keys_size = read_keys.size();
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference)
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos);
ReadBufferIterator read_buffer_iterator = [&, first = false](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
{
auto key = (*file_iterator)();
@ -1166,8 +1203,17 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
return nullptr;
}
if (distributed_processing && read_keys_in_distributed_processing)
read_keys_in_distributed_processing->push_back(key);
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos);
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
{
cached_columns = *columns_from_cache;
return nullptr;
}
}
first = false;
return wrapReadBufferWithCompressionMethod(
@ -1176,7 +1222,19 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
chooseCompressionMethod(key, compression_method));
};
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
if (ctx->getSettingsRef().use_cache_for_s3_schema_inference)
addColumnsToCache(read_keys, s3_configuration, columns, ctx);
if (distributed_processing && read_keys_in_distributed_processing)
*read_keys_in_distributed_processing = std::move(read_keys);
return columns;
}
@ -1265,6 +1323,56 @@ bool StorageS3::supportsPartitionBy() const
return true;
}
SchemaCache & StorageS3::getSchemaCache()
{
static SchemaCache schema_cache;
return schema_cache;
}
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
auto & schema_cache = getSchemaCache();
for (auto it = begin; it < end; ++it)
{
String path = fs::path(s3_configuration.uri.bucket) / *it;
String cache_key = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / path;
auto get_last_mod_time = [&]()
{
S3::ObjectInfo info;
/// Check if we already have information about this object.
/// If no, request it and remember for possible future usage.
if (object_infos && object_infos->contains(path))
info = (*object_infos)[path];
else
{
/// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time.
info = S3::getObjectInfo(s3_configuration.client, s3_configuration.uri.bucket, *it, s3_configuration.uri.version_id, false);
if (object_infos)
(*object_infos)[path] = info;
}
return info.last_modification_time;
};
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
if (columns)
return columns;
}
return std::nullopt;
}
void StorageS3::addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & ctx)
{
auto host_and_bucket = fs::path(s3_configuration.uri.uri.getHost() + std::to_string(s3_configuration.uri.uri.getPort())) / s3_configuration.uri.bucket;
Strings objects;
std::transform(keys.begin(), keys.end(), std::back_inserter(objects), [&](const String & key){ return host_and_bucket / key; });
auto & schema_cache = getSchemaCache();
schema_cache.addMany(objects, columns, ctx->getSettingsRef().cache_ttl_for_s3_schema_inference.totalSeconds());
}
}
#endif

View File

@ -18,6 +18,7 @@
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
namespace Aws::S3
{
@ -40,7 +41,9 @@ public:
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context);
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys_ = nullptr);
String next();
@ -94,7 +97,8 @@ public:
const String & bucket,
const String & version_id,
std::shared_ptr<IteratorWrapper> file_iterator_,
size_t download_thread_num);
size_t download_thread_num,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_);
String getName() const override;
@ -128,6 +132,8 @@ private:
Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
std::unordered_map<String, S3::ObjectInfo> object_infos;
/// Recreate ReadBuffer and Pipeline for each file.
bool initialize();
@ -190,7 +196,8 @@ public:
const String & compression_method,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args);
@ -223,6 +230,8 @@ private:
std::vector<String> read_tasks_used_in_schema_inference;
std::unordered_map<String, S3::ObjectInfo> object_infos;
static void updateS3Configuration(ContextPtr, S3Configuration &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(
@ -233,7 +242,9 @@ private:
ContextPtr local_context,
ASTPtr query,
const Block & virtual_block,
const std::vector<String> & read_tasks = {});
const std::vector<String> & read_tasks = {},
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys = nullptr);
static ColumnsDescription getTableStructureFromDataImpl(
const String & format,
@ -243,9 +254,14 @@ private:
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::vector<String> * read_keys_in_distributed_processing = nullptr);
std::vector<String> * read_keys_in_distributed_processing = nullptr,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
bool supportsSubsetOfColumns() const override;
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings::const_iterator & begin, const Strings::const_iterator & end, const S3Configuration & s3_configuration, std::unordered_map<String, S3::ObjectInfo> * object_infos);
static void addColumnsToCache(const Strings & keys, const S3Configuration & s3_configuration, const ColumnsDescription & columns, const ContextPtr & context);
};
}

View File

@ -567,8 +567,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
urls_to_check = {uri};
}
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().use_cache_for_url_schema_inference)
columns_from_cache = tryGetColumnsFromCache(urls_to_check);
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()]() mutable -> std::unique_ptr<ReadBuffer>
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == urls_to_check.cend())
return nullptr;
@ -591,7 +594,16 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
return buf;
};
return readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
ColumnsDescription columns;
if (columns_from_cache)
columns = *columns_from_cache;
else
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, urls_to_check.size() > 1, context);
if (context->getSettingsRef().use_cache_for_url_schema_inference)
addColumnsToCache(urls_to_check, columns, context);
return columns;
}
bool IStorageURLBase::supportsSubsetOfColumns() const
@ -772,6 +784,31 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
}
}
SchemaCache & IStorageURLBase::getSchemaCache()
{
static SchemaCache schema_cache;
return schema_cache;
}
std::optional<ColumnsDescription> IStorageURLBase::tryGetColumnsFromCache(const Strings & urls)
{
auto & schema_cache = getSchemaCache();
for (const auto & url : urls)
{
auto columns = schema_cache.tryGet(url);
if (columns)
return columns;
}
return std::nullopt;
}
void IStorageURLBase::addColumnsToCache(const Strings & urls, const ColumnsDescription & columns, const ContextPtr & context)
{
auto & schema_cache = getSchemaCache();
schema_cache.addMany(urls, columns, context->getSettingsRef().cache_ttl_for_url_schema_inference.totalSeconds());
}
StorageURL::StorageURL(
const String & uri_,
const StorageID & table_id_,

View File

@ -8,6 +8,7 @@
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Storages/StorageFactory.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
namespace DB
@ -97,6 +98,10 @@ protected:
private:
virtual Block getHeaderBlock(const Names & column_names, const StorageSnapshotPtr & storage_snapshot) const = 0;
static SchemaCache & getSchemaCache();
static std::optional<ColumnsDescription> tryGetColumnsFromCache(const Strings & urls);
static void addColumnsToCache(const Strings & urls, const ColumnsDescription & columns, const ContextPtr & context);
};
class StorageURLSink : public SinkToStorage

View File

@ -49,7 +49,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
{
ReadBufferIterator read_buffer_iterator = [&]()
ReadBufferIterator read_buffer_iterator = [&](ColumnsDescription &)
{
return std::make_unique<ReadBufferFromString>(data);
};

View File

@ -0,0 +1,9 @@
<?xml version="1.0"?>
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>300</flush_interval_milliseconds>
</query_log>
</clickhouse>

View File

@ -0,0 +1,100 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", stay_alive=True, main_configs=["configs/config.d/query_log.xml"])
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_profile_event_for_query(node, query, profile_event):
node.query('system flush logs')
query = query.replace("'", "\\'")
return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
def test(start_cluster):
node.query("insert into function file('data.jsonl') select * from numbers(100)")
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
node.query("insert into function file('data.jsonl') select * from numbers(100)")
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
node.query(desc_query)
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
time.sleep(2)
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1000"
node.query(desc_query)
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = "desc file('data1.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLUpdates')
assert cache_ttl_updates == 1
node.query("insert into function file('data1.jsonl') select * from numbers(100)")
desc_query = "desc file('data1.jsonl') settings cache_ttl_for_file_schema_inference=1"
node.query(desc_query)
cache_invalidations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
time.sleep(2)
desc_query = "desc file('data.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = "desc file('data1.jsonl')"
node.query(desc_query)
cache_misses = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
node.query("insert into function file('data2.jsonl') select * from numbers(100)")
node.query("insert into function file('data3.jsonl') select * from numbers(100)")
desc_query = "desc file('data*.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = "desc file('data2.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = "desc file('data3.jsonl')"
node.query(desc_query)
cache_hits = get_profile_event_for_query(node, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1

View File

@ -1,6 +1,7 @@
import os
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from pyhdfs import HdfsClient
@ -628,6 +629,99 @@ def test_virtual_columns_2(started_cluster):
assert result.strip() == "kek"
def get_profile_event_for_query(node, query, profile_event):
node.query('system flush logs')
query = query.replace("'", "\\'")
return int(node.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
def test_schema_inference_cache(started_cluster):
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
time.sleep(2)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1000"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLUpdates')
assert cache_ttl_updates == 1
node1.query(
f"insert into function hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1"
)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl') settings cache_ttl_for_hdfs_schema_inference=1"
node1.query(desc_query)
cache_invalidations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
time.sleep(2)
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache1.jsonl')"
node1.query(desc_query)
cache_misses = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache2.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1")
node1.query(f"insert into function hdfs('hdfs://hdfs1:9000/test_cache3.jsonl') select * from numbers(100) settings hdfs_truncate_on_insert=1")
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache*.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache2.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc hdfs('hdfs://hdfs1:9000/test_cache3.jsonl')"
node1.query(desc_query)
cache_hits = get_profile_event_for_query(node1, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -1483,3 +1483,152 @@ def test_wrong_format_usage(started_cluster):
)
assert "Not a Parquet file" in result
def get_profile_event_for_query(instance, query, profile_event):
instance.query('system flush logs')
query = query.replace("'", "\\'")
return int(instance.query(f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"))
def test_schema_inference_cache(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
time.sleep(2)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1000"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates')
assert cache_ttl_updates == 1
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_s3_schema_inference=1"
instance.query(desc_query)
cache_invalidations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheInvalidations')
assert cache_invalidations == 1
time.sleep(2)
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
instance.query(f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1")
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache*.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=1"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
time.sleep(2)
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_expirations = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLExpirations')
assert cache_ttl_expirations == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') settings cache_ttl_for_url_schema_inference=10000"
instance.query(desc_query)
cache_misses = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheMisses')
assert cache_misses == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
cache_ttl_updates = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheTTLUpdates')
assert cache_ttl_updates == 1
file_name = 'test_cache{1,2,3}.jsonl'
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name}')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1
desc_query = f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
instance.query(desc_query)
cache_hits = get_profile_event_for_query(instance, desc_query, 'SchemaInferenceCacheHits')
assert cache_hits == 1