Merge pull request #48190 from CurtizJ/fix-race-storage-s3

Fix race in StorageS3
This commit is contained in:
robot-ch-test-poll1 2023-04-03 10:23:27 +02:00 committed by GitHub
commit 3cb7d48b75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 127 deletions

View File

@ -39,7 +39,7 @@ public:
std::optional<FormatSettings> format_settings_) std::optional<FormatSettings> format_settings_)
: Storage( : Storage(
getAdjustedConfiguration( getAdjustedConfiguration(
context_, Storage::updateConfiguration(context_, configuration_), &Poco::Logger::get("Storage" + String(name))), context_, Storage::copyAndUpdateConfiguration(context_, configuration_), &Poco::Logger::get("Storage" + String(name))),
table_id_, table_id_,
columns_, columns_,
constraints_, constraints_,
@ -59,7 +59,7 @@ public:
auto new_configuration = getAdjustedConfiguration(ctx, configuration, &Poco::Logger::get("Storage" + String(name))); auto new_configuration = getAdjustedConfiguration(ctx, configuration, &Poco::Logger::get("Storage" + String(name)));
return Storage::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr); return Storage::getTableStructureFromData(new_configuration, format_settings, ctx);
} }
static Configuration static Configuration

View File

@ -150,15 +150,13 @@ public:
ASTPtr & query_, ASTPtr & query_,
const Block & virtual_header_, const Block & virtual_header_,
ContextPtr context_, ContextPtr context_,
ObjectInfos * object_infos_, KeysWithInfo * read_keys_,
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_) const S3Settings::RequestSettings & request_settings_)
: WithContext(context_) : WithContext(context_)
, client(S3::Client::create(client_)) , client(S3::Client::create(client_))
, globbed_uri(globbed_uri_) , globbed_uri(globbed_uri_)
, query(query_) , query(query_)
, virtual_header(virtual_header_) , virtual_header(virtual_header_)
, object_infos(object_infos_)
, read_keys(read_keys_) , read_keys(read_keys_)
, request_settings(request_settings_) , request_settings(request_settings_)
, list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1) , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
@ -289,9 +287,6 @@ private:
.last_modification_time = row.GetLastModified().Millis() / 1000, .last_modification_time = row.GetLastModified().Millis() / 1000,
}; };
if (object_infos)
(*object_infos)[fs::path(globbed_uri.bucket) / key] = info;
temp_buffer.emplace_back(std::move(key), std::move(info)); temp_buffer.emplace_back(std::move(key), std::move(info));
} }
} }
@ -335,11 +330,7 @@ private:
buffer_iter = buffer.begin(); buffer_iter = buffer.begin();
if (read_keys) if (read_keys)
{ read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
read_keys->reserve(read_keys->size() + buffer.size());
for (const auto & [key, _] : buffer)
read_keys->push_back(key);
}
} }
void createFilterAST(const String & any_key) void createFilterAST(const String & any_key)
@ -385,8 +376,7 @@ private:
std::unique_ptr<re2::RE2> matcher; std::unique_ptr<re2::RE2> matcher;
bool recursive{false}; bool recursive{false};
bool is_finished{false}; bool is_finished{false};
ObjectInfos * object_infos; KeysWithInfo * read_keys;
Strings * read_keys;
S3::ListObjectsV2Request request; S3::ListObjectsV2Request request;
S3Settings::RequestSettings request_settings; S3Settings::RequestSettings request_settings;
@ -403,10 +393,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
ASTPtr query, ASTPtr query,
const Block & virtual_header, const Block & virtual_header,
ContextPtr context, ContextPtr context,
ObjectInfos * object_infos_, KeysWithInfo * read_keys_,
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_) const S3Settings::RequestSettings & request_settings_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_, request_settings_)) : pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, read_keys_, request_settings_))
{ {
} }
@ -432,8 +421,7 @@ public:
ASTPtr query_, ASTPtr query_,
const Block & virtual_header_, const Block & virtual_header_,
ContextPtr context_, ContextPtr context_,
ObjectInfos * object_infos_, KeysWithInfo * read_keys_)
Strings * read_keys_)
: WithContext(context_) : WithContext(context_)
, bucket(bucket_) , bucket(bucket_)
, query(query_) , query(query_)
@ -471,26 +459,15 @@ public:
} }
} }
if (read_keys_)
*read_keys_ = all_keys;
for (auto && key : all_keys) for (auto && key : all_keys)
{ {
std::optional<S3::ObjectInfo> info; auto info = S3::getObjectInfo(client_, bucket, key, version_id_, request_settings_);
total_size += info.size;
/// To avoid extra requests update total_size only if object_infos != nullptr
/// (which means we eventually need this info anyway, so it should be ok to do it now)
if (object_infos_)
{
info = S3::getObjectInfo(client_, bucket, key, version_id_, request_settings_);
total_size += info->size;
String path = fs::path(bucket) / key;
(*object_infos_)[std::move(path)] = *info;
}
keys.emplace_back(std::move(key), std::move(info)); keys.emplace_back(std::move(key), std::move(info));
} }
if (read_keys_)
*read_keys_ = keys;
} }
KeyWithInfo next() KeyWithInfo next()
@ -527,11 +504,10 @@ StorageS3Source::KeysIterator::KeysIterator(
ASTPtr query, ASTPtr query,
const Block & virtual_header, const Block & virtual_header,
ContextPtr context, ContextPtr context,
ObjectInfos * object_infos, KeysWithInfo * read_keys)
Strings * read_keys)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>( : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
client_, version_id_, keys_, bucket_, request_settings_, client_, version_id_, keys_, bucket_, request_settings_,
query, virtual_header, context, object_infos, read_keys)) query, virtual_header, context, read_keys))
{ {
} }
@ -904,12 +880,12 @@ public:
private: private:
const String format; const String format;
const Block sample_block; const Block sample_block;
ContextPtr context; const ContextPtr context;
const CompressionMethod compression_method; const CompressionMethod compression_method;
const StorageS3::Configuration & s3_configuration; const StorageS3::Configuration s3_configuration;
const String bucket; const String bucket;
const String key; const String key;
std::optional<FormatSettings> format_settings; const std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr; ExpressionActionsPtr partition_by_expr;
@ -974,8 +950,7 @@ StorageS3::StorageS3(
compression_method, compression_method,
is_key_with_globs, is_key_with_globs,
format_settings, format_settings,
context_, context_);
&object_infos);
storage_metadata.setColumns(columns); storage_metadata.setColumns(columns);
} }
@ -1004,8 +979,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
ContextPtr local_context, ContextPtr local_context,
ASTPtr query, ASTPtr query,
const Block & virtual_block, const Block & virtual_block,
ObjectInfos * object_infos, KeysWithInfo * read_keys)
Strings * read_keys)
{ {
if (distributed_processing) if (distributed_processing)
{ {
@ -1016,14 +990,14 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
/// Iterate through disclosed globs and make a source for each file /// Iterate through disclosed globs and make a source for each file
return std::make_shared<StorageS3Source::DisclosedGlobIterator>( return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_block, *s3_configuration.client, s3_configuration.url, query, virtual_block,
local_context, object_infos, read_keys, s3_configuration.request_settings); local_context, read_keys, s3_configuration.request_settings);
} }
else else
{ {
return std::make_shared<StorageS3Source::KeysIterator>( return std::make_shared<StorageS3Source::KeysIterator>(
*s3_configuration.client, s3_configuration.url.version_id, keys, *s3_configuration.client, s3_configuration.url.version_id, keys,
s3_configuration.url.bucket, s3_configuration.request_settings, query, virtual_block, local_context, s3_configuration.url.bucket, s3_configuration.request_settings, query,
object_infos, read_keys); virtual_block, local_context, read_keys);
} }
} }
@ -1046,13 +1020,15 @@ Pipe StorageS3::read(
size_t max_block_size, size_t max_block_size,
size_t num_streams) size_t num_streams)
{ {
bool has_wildcards = s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos auto query_s3_configuration = copyAndUpdateConfiguration(local_context, s3_configuration);
bool has_wildcards =
query_s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos
|| keys.back().find(PARTITION_ID_WILDCARD) != String::npos; || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
if (partition_by && has_wildcards) if (partition_by && has_wildcards)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
updateConfiguration(local_context, s3_configuration);
Pipes pipes; Pipes pipes;
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end()); std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
@ -1065,14 +1041,13 @@ Pipe StorageS3::read(
} }
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator( std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
s3_configuration, query_s3_configuration,
keys, keys,
is_key_with_globs, is_key_with_globs,
distributed_processing, distributed_processing,
local_context, local_context,
query_info.query, query_info.query,
virtual_block, virtual_block);
&object_infos);
ColumnsDescription columns_description; ColumnsDescription columns_description;
Block block_for_format; Block block_for_format;
@ -1109,11 +1084,11 @@ Pipe StorageS3::read(
format_settings, format_settings,
columns_description, columns_description,
max_block_size, max_block_size,
s3_configuration.request_settings, query_s3_configuration.request_settings,
compression_method, compression_method,
s3_configuration.client, query_s3_configuration.client,
s3_configuration.url.bucket, query_s3_configuration.url.bucket,
s3_configuration.url.version_id, query_s3_configuration.url.version_id,
iterator_wrapper, iterator_wrapper,
max_download_threads)); max_download_threads));
} }
@ -1126,11 +1101,11 @@ Pipe StorageS3::read(
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{ {
updateConfiguration(local_context, s3_configuration); auto query_s3_configuration = copyAndUpdateConfiguration(local_context, s3_configuration);
auto sample_block = metadata_snapshot->getSampleBlock(); auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method); auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
bool has_wildcards = s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; bool has_wildcards = query_s3_configuration.url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query); auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@ -1145,19 +1120,19 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
local_context, local_context,
format_settings, format_settings,
chosen_compression_method, chosen_compression_method,
s3_configuration, query_s3_configuration,
s3_configuration.url.bucket, query_s3_configuration.url.bucket,
keys.back()); keys.back());
} }
else else
{ {
if (is_key_with_globs) if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.url.key); "S3 key '{}' contains globs, so the table is in readonly mode", query_s3_configuration.url.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert; bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && S3::objectExists(*s3_configuration.client, s3_configuration.url.bucket, keys.back(), s3_configuration.url.version_id, s3_configuration.request_settings)) if (!truncate_in_insert && S3::objectExists(*query_s3_configuration.client, query_s3_configuration.url.bucket, keys.back(), query_s3_configuration.url.version_id, query_s3_configuration.request_settings))
{ {
if (local_context->getSettingsRef().s3_create_new_file_on_insert) if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{ {
@ -1169,7 +1144,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos)); new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index; ++index;
} }
while (S3::objectExists(*s3_configuration.client, s3_configuration.url.bucket, new_key, s3_configuration.url.version_id, s3_configuration.request_settings)); while (S3::objectExists(*query_s3_configuration.client, query_s3_configuration.url.bucket, new_key, query_s3_configuration.url.version_id, query_s3_configuration.request_settings));
keys.push_back(new_key); keys.push_back(new_key);
} }
else else
@ -1178,7 +1153,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
"Object in bucket {} with key {} already exists. " "Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting s3_truncate_on_insert, if you " "If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert", "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
s3_configuration.url.bucket, query_s3_configuration.url.bucket,
keys.back()); keys.back());
} }
@ -1188,19 +1163,19 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
local_context, local_context,
format_settings, format_settings,
chosen_compression_method, chosen_compression_method,
s3_configuration, query_s3_configuration,
s3_configuration.url.bucket, query_s3_configuration.url.bucket,
keys.back()); keys.back());
} }
} }
void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{ {
updateConfiguration(local_context, s3_configuration); auto query_s3_configuration = copyAndUpdateConfiguration(local_context, s3_configuration);
if (is_key_with_globs) if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.url.key); "S3 key '{}' contains globs, so the table is in readonly mode", query_s3_configuration.url.key);
Aws::S3::Model::Delete delkeys; Aws::S3::Model::Delete delkeys;
@ -1213,10 +1188,10 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
ProfileEvents::increment(ProfileEvents::S3DeleteObjects); ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
S3::DeleteObjectsRequest request; S3::DeleteObjectsRequest request;
request.SetBucket(s3_configuration.url.bucket); request.SetBucket(query_s3_configuration.url.bucket);
request.SetDelete(delkeys); request.SetDelete(delkeys);
auto response = s3_configuration.client->DeleteObjects(request); auto response = query_s3_configuration.client->DeleteObjects(request);
if (!response.IsSuccess()) if (!response.IsSuccess())
{ {
const auto & err = response.GetError(); const auto & err = response.GetError();
@ -1228,7 +1203,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
} }
StorageS3::Configuration StorageS3::updateConfiguration(ContextPtr local_context, const StorageS3::Configuration & configuration) StorageS3::Configuration StorageS3::copyAndUpdateConfiguration(ContextPtr local_context, const StorageS3::Configuration & configuration)
{ {
StorageS3::Configuration new_configuration(configuration); StorageS3::Configuration new_configuration(configuration);
updateConfiguration(local_context, new_configuration); updateConfiguration(local_context, new_configuration);
@ -1430,15 +1405,14 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
} }
ColumnsDescription StorageS3::getTableStructureFromData( ColumnsDescription StorageS3::getTableStructureFromData(
StorageS3::Configuration & configuration, const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx)
ObjectInfos * object_infos)
{ {
updateConfiguration(ctx, configuration); auto query_s3_configuration = copyAndUpdateConfiguration(ctx, configuration);
return getTableStructureFromDataImpl( return getTableStructureFromDataImpl(
configuration.format, configuration, configuration.compression_method, query_s3_configuration.format, query_s3_configuration, query_s3_configuration.compression_method,
configuration.url.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos); query_s3_configuration.url.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
} }
ColumnsDescription StorageS3::getTableStructureFromDataImpl( ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -1447,10 +1421,9 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
const String & compression_method, const String & compression_method,
bool is_key_with_globs, bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx)
ObjectInfos * object_infos)
{ {
std::vector<String> read_keys; KeysWithInfo read_keys;
auto file_iterator = createFileIterator( auto file_iterator = createFileIterator(
s3_configuration, s3_configuration,
@ -1458,12 +1431,12 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
is_key_with_globs, is_key_with_globs,
false, false,
ctx, nullptr, ctx, nullptr,
{}, object_infos, &read_keys); {}, &read_keys);
std::optional<ColumnsDescription> columns_from_cache; std::optional<ColumnsDescription> columns_from_cache;
size_t prev_read_keys_size = read_keys.size(); size_t prev_read_keys_size = read_keys.size();
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3) if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx); columns_from_cache = tryGetColumnsFromCache(read_keys.begin(), read_keys.end(), s3_configuration, format, format_settings, ctx);
ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer> ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
{ {
@ -1483,7 +1456,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
/// S3 file iterator could get new keys after new iteration, check them in schema cache. /// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size) if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
{ {
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, object_infos, format, format_settings, ctx); columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), s3_configuration, format, format_settings, ctx);
prev_read_keys_size = read_keys.size(); prev_read_keys_size = read_keys.size();
if (columns_from_cache) if (columns_from_cache)
{ {
@ -1604,10 +1577,9 @@ SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
} }
std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache( std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
const Strings::const_iterator & begin, const KeysWithInfo::const_iterator & begin,
const Strings::const_iterator & end, const KeysWithInfo::const_iterator & end,
const Configuration & s3_configuration, const Configuration & s3_configuration,
ObjectInfos * object_infos,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx) const ContextPtr & ctx)
@ -1615,31 +1587,33 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
auto & schema_cache = getSchemaCache(ctx); auto & schema_cache = getSchemaCache(ctx);
for (auto it = begin; it < end; ++it) for (auto it = begin; it < end; ++it)
{ {
String path = fs::path(s3_configuration.url.bucket) / *it; auto get_last_mod_time = [&]
auto get_last_mod_time = [&]() -> std::optional<time_t>
{ {
S3::ObjectInfo info; time_t last_modification_time = 0;
/// Check if we already have information about this object. if (it->info)
/// If no, request it and remember for possible future usage. {
if (object_infos && object_infos->contains(path)) last_modification_time = it->info->last_modification_time;
info = (*object_infos)[path]; }
else else
{ {
/// Note that in case of exception in getObjectInfo returned info will be empty, /// Note that in case of exception in getObjectInfo returned info will be empty,
/// but schema cache will handle this case and won't return columns from cache /// but schema cache will handle this case and won't return columns from cache
/// because we can't say that it's valid without last modification time. /// because we can't say that it's valid without last modification time.
info = S3::getObjectInfo(*s3_configuration.client, s3_configuration.url.bucket, *it, s3_configuration.url.version_id, s3_configuration.request_settings, last_modification_time = S3::getObjectInfo(
{}, {}, /* throw_on_error= */ false); *s3_configuration.client,
if (object_infos) s3_configuration.url.bucket,
(*object_infos)[path] = info; it->key,
s3_configuration.url.version_id,
s3_configuration.request_settings,
/*with_metadata=*/ false,
/*for_disk_s3=*/ false,
/*throw_on_error= */ false).last_modification_time;
} }
if (info.last_modification_time) return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
return info.last_modification_time;
return std::nullopt;
}; };
String path = fs::path(s3_configuration.url.bucket) / it->key;
String source = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / path; String source = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx); auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, ctx);
auto columns = schema_cache.tryGet(cache_key, get_last_mod_time); auto columns = schema_cache.tryGet(cache_key, get_last_mod_time);
@ -1651,7 +1625,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
} }
void StorageS3::addColumnsToCache( void StorageS3::addColumnsToCache(
const Strings & keys, const KeysWithInfo & keys,
const Configuration & s3_configuration, const Configuration & s3_configuration,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const String & format_name, const String & format_name,
@ -1661,7 +1635,7 @@ void StorageS3::addColumnsToCache(
auto host_and_bucket = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / s3_configuration.url.bucket; auto host_and_bucket = fs::path(s3_configuration.url.uri.getHost() + std::to_string(s3_configuration.url.uri.getPort())) / s3_configuration.url.bucket;
Strings sources; Strings sources;
sources.reserve(keys.size()); sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const String & key){ return host_and_bucket / key; }); std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx); auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx); auto & schema_cache = getSchemaCache(ctx);
schema_cache.addMany(cache_keys, columns); schema_cache.addMany(cache_keys, columns);

View File

@ -51,7 +51,7 @@ public:
}; };
using KeysWithInfo = std::vector<KeyWithInfo>; using KeysWithInfo = std::vector<KeyWithInfo>;
using ObjectInfos = std::unordered_map<String, S3::ObjectInfo>;
class IIterator class IIterator
{ {
public: public:
@ -71,8 +71,7 @@ public:
ASTPtr query, ASTPtr query,
const Block & virtual_header, const Block & virtual_header,
ContextPtr context, ContextPtr context,
ObjectInfos * object_infos = nullptr, KeysWithInfo * read_keys_ = nullptr,
Strings * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {}); const S3Settings::RequestSettings & request_settings_ = {});
KeyWithInfo next() override; KeyWithInfo next() override;
@ -96,8 +95,7 @@ public:
ASTPtr query, ASTPtr query,
const Block & virtual_header, const Block & virtual_header,
ContextPtr context, ContextPtr context,
ObjectInfos * object_infos = nullptr, KeysWithInfo * read_keys = nullptr);
Strings * read_keys = nullptr);
KeyWithInfo next() override; KeyWithInfo next() override;
size_t getTotalSize() const override; size_t getTotalSize() const override;
@ -288,8 +286,6 @@ public:
bool supportsPartitionBy() const override; bool supportsPartitionBy() const override;
using ObjectInfos = StorageS3Source::ObjectInfos;
static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection); static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
static SchemaCache & getSchemaCache(const ContextPtr & ctx); static SchemaCache & getSchemaCache(const ContextPtr & ctx);
@ -297,14 +293,13 @@ public:
static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true); static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
static ColumnsDescription getTableStructureFromData( static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration, const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx);
ObjectInfos * object_infos = nullptr);
protected: protected:
static StorageS3::Configuration updateConfiguration(ContextPtr local_context, const Configuration & configuration); static StorageS3::Configuration copyAndUpdateConfiguration(ContextPtr local_context, const Configuration & configuration);
static void updateConfiguration(ContextPtr, Configuration &); static void updateConfiguration(ContextPtr ctx, StorageS3::Configuration & upd);
private: private:
friend class StorageS3Cluster; friend class StorageS3Cluster;
@ -323,7 +318,7 @@ private:
ASTPtr partition_by; ASTPtr partition_by;
bool is_key_with_globs = false; bool is_key_with_globs = false;
ObjectInfos object_infos; using KeysWithInfo = StorageS3Source::KeysWithInfo;
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator( static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & s3_configuration, const Configuration & s3_configuration,
@ -333,8 +328,7 @@ private:
ContextPtr local_context, ContextPtr local_context,
ASTPtr query, ASTPtr query,
const Block & virtual_block, const Block & virtual_block,
ObjectInfos * object_infos = nullptr, KeysWithInfo * read_keys = nullptr);
Strings * read_keys = nullptr);
static ColumnsDescription getTableStructureFromDataImpl( static ColumnsDescription getTableStructureFromDataImpl(
const String & format, const String & format,
@ -342,24 +336,22 @@ private:
const String & compression_method, const String & compression_method,
bool is_key_with_globs, bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
ContextPtr ctx, ContextPtr ctx);
ObjectInfos * object_infos = nullptr);
bool supportsSubcolumns() const override; bool supportsSubcolumns() const override;
bool supportsSubsetOfColumns() const override; bool supportsSubsetOfColumns() const override;
static std::optional<ColumnsDescription> tryGetColumnsFromCache( static std::optional<ColumnsDescription> tryGetColumnsFromCache(
const Strings::const_iterator & begin, const KeysWithInfo::const_iterator & begin,
const Strings::const_iterator & end, const KeysWithInfo::const_iterator & end,
const Configuration & s3_configuration, const Configuration & s3_configuration,
ObjectInfos * object_infos,
const String & format_name, const String & format_name,
const std::optional<FormatSettings> & format_settings, const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx); const ContextPtr & ctx);
static void addColumnsToCache( static void addColumnsToCache(
const Strings & keys, const KeysWithInfo & keys,
const Configuration & s3_configuration, const Configuration & s3_configuration,
const ColumnsDescription & columns, const ColumnsDescription & columns,
const String & format_name, const String & format_name,

View File

@ -0,0 +1 @@
OK

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
filename="test_${CLICKHOUSE_DATABASE}_${RANDOM}"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_s3_race"
$CLICKHOUSE_CLIENT --query "CREATE TABLE test_s3_race (u UInt64) ENGINE = S3(s3_conn, filename='$filename', format='CSV')"
$CLICKHOUSE_CLIENT --s3_truncate_on_insert 1 --query "INSERT INTO test_s3_race VALUES (1)"
$CLICKHOUSE_BENCHMARK -i 100 -c 4 <<< "SELECT * FROM test_s3_race" >/dev/null 2>&1
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_s3_race"
echo "OK"