kssenii 2024-05-23 13:48:23 +02:00
parent 1e5069b5dc
commit f1c191a3cb
6 changed files with 44 additions and 35 deletions

View File

@@ -100,6 +100,10 @@ AzureObjectStorage::SettingsPtr StorageAzureConfiguration::createSettings(Contex
settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
settings_ptr->strict_upload_part_size = context_settings.azure_strict_upload_part_size;
settings_ptr->max_upload_part_size = context_settings.azure_max_upload_part_size;
settings_ptr->max_blocks_in_multipart_upload = context_settings.azure_max_blocks_in_multipart_upload;
settings_ptr->min_upload_part_size = context_settings.azure_min_upload_part_size;
return settings_ptr;
}

View File

@@ -35,9 +35,10 @@ ReadBufferIterator::ReadBufferIterator(
format = configuration->format;
}
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const String & path, const String & format_name) const
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const
{
auto source = std::filesystem::path(configuration->getDataSourceDescription()) / path;
chassert(!object_info.getPath().starts_with("/"));
auto source = std::filesystem::path(configuration->getDataSourceDescription()) / object_info.getPath();
return DB::getKeyForSchemaCache(source, format_name, format_settings, getContext());
}
@@ -50,6 +51,7 @@ SchemaCache::Keys ReadBufferIterator::getKeysForSchemaCache() const
std::back_inserter(sources),
[&](const auto & elem)
{
chassert(!elem->getPath().starts_with("/"));
return std::filesystem::path(configuration->getDataSourceDescription()) / elem->getPath();
});
return DB::getKeysForSchemaCache(sources, *format, format_settings, getContext());
@@ -78,7 +80,7 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
if (format)
{
auto cache_key = getKeyForSchemaCache(object_info->getPath(), *format);
const auto cache_key = getKeyForSchemaCache(*object_info, *format);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
return columns;
}
@@ -89,7 +91,7 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
/// If we have such entry for some format, we can use this format to read the file.
for (const auto & format_name : FormatFactory::instance().getAllInputFormats())
{
auto cache_key = getKeyForSchemaCache(object_info->getPath(), format_name);
const auto cache_key = getKeyForSchemaCache(*object_info, format_name);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
{
/// Now format is known. It should be the same for all files.
@@ -99,14 +101,13 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
}
}
}
return std::nullopt;
}
void ReadBufferIterator::setNumRowsToLastFile(size_t num_rows)
{
if (query_settings.schema_inference_use_cache)
schema_cache.addNumRows(getKeyForSchemaCache(current_object_info->getPath(), *format), num_rows);
schema_cache.addNumRows(getKeyForSchemaCache(*current_object_info, *format), num_rows);
}
void ReadBufferIterator::setSchemaToLastFile(const ColumnsDescription & columns)
@@ -114,7 +115,7 @@ void ReadBufferIterator::setSchemaToLastFile(const ColumnsDescription & columns)
if (query_settings.schema_inference_use_cache
&& query_settings.schema_inference_mode == SchemaInferenceMode::UNION)
{
schema_cache.addColumns(getKeyForSchemaCache(current_object_info->getPath(), *format), columns);
schema_cache.addColumns(getKeyForSchemaCache(*current_object_info, *format), columns);
}
}
@@ -135,7 +136,7 @@ void ReadBufferIterator::setFormatName(const String & format_name)
String ReadBufferIterator::getLastFileName() const
{
if (current_object_info)
return current_object_info->getFileName();
return current_object_info->getPath();
else
return "";
}
@@ -255,17 +256,21 @@ ReadBufferIterator::Data ReadBufferIterator::next()
}
}
LOG_TEST(getLogger("KSSENII"), "Will read columns from {}", current_object_info->getPath());
std::unique_ptr<ReadBuffer> read_buf;
CompressionMethod compression_method;
using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive;
if (const auto * object_info_in_archive = dynamic_cast<const ObjectInfoInArchive *>(current_object_info.get()))
{
compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method);
LOG_TEST(getLogger("KSSENII"), "Will read columns from {} from archive", current_object_info->getPath());
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
const auto & archive_reader = object_info_in_archive->archive_reader;
read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true);
}
else
{
LOG_TEST(getLogger("KSSENII"), "Will read columns from {} from s3", current_object_info->getPath());
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
read_buf = object_storage->readObject(
StoredObject(current_object_info->getPath()),

View File

@@ -13,6 +13,7 @@ public:
using FileIterator = std::shared_ptr<StorageObjectStorageSource::IIterator>;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ObjectInfoPtr = StorageObjectStorage::ObjectInfoPtr;
using ObjectInfo = StorageObjectStorage::ObjectInfo;
using ObjectInfos = StorageObjectStorage::ObjectInfos;
ReadBufferIterator(
@@ -41,7 +42,7 @@ public:
std::unique_ptr<ReadBuffer> recreateLastReadBuffer() override;
private:
SchemaCache::Key getKeyForSchemaCache(const String & path, const String & format_name) const;
SchemaCache::Key getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const;
SchemaCache::Keys getKeysForSchemaCache() const;
std::optional<ColumnsDescription> tryGetColumnsFromCache(
const ObjectInfos::iterator & begin, const ObjectInfos::iterator & end);

View File

@@ -183,14 +183,14 @@ Chunk StorageObjectStorageSource::generate()
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk,
read_from_format_info.requested_virtual_columns,
fs::path(configuration->getNamespace()) / reader.getRelativePath(),
fs::path(configuration->getNamespace()) / reader.getObjectInfo().getPath(),
object_info.metadata->size_bytes, &filename);
return chunk;
}
if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
addNumRowsToCache(reader.getRelativePath(), total_rows_in_file);
addNumRowsToCache(reader.getObjectInfo(), total_rows_in_file);
total_rows_in_file = 0;
@@ -209,29 +209,28 @@ Chunk StorageObjectStorageSource::generate()
return {};
}
void StorageObjectStorageSource::addNumRowsToCache(const String & path, size_t num_rows)
void StorageObjectStorageSource::addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows)
{
const auto cache_key = getKeyForSchemaCache(
fs::path(configuration->getDataSourceDescription()) / path,
fs::path(configuration->getDataSourceDescription()) / object_info.getPath(),
configuration->format,
format_settings,
getContext());
schema_cache.addNumRows(cache_key, num_rows);
}
std::optional<size_t> StorageObjectStorageSource::tryGetNumRowsFromCache(const ObjectInfoPtr & object_info)
std::optional<size_t> StorageObjectStorageSource::tryGetNumRowsFromCache(const ObjectInfo & object_info)
{
const auto cache_key = getKeyForSchemaCache(
fs::path(configuration->getDataSourceDescription()) / object_info->getPath(),
fs::path(configuration->getDataSourceDescription()) / object_info.getPath(),
configuration->format,
format_settings,
getContext());
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
return object_info->metadata
? std::optional<size_t>(object_info->metadata->last_modified.epochTime())
return object_info.metadata
? std::optional<size_t>(object_info.metadata->last_modified.epochTime())
: std::nullopt;
};
return schema_cache.tryGetNumRows(cache_key, get_last_mod_time);
@@ -263,7 +262,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
std::optional<size_t> num_rows_from_cache = need_only_count
&& getContext()->getSettingsRef().use_cache_for_count_from_files
? tryGetNumRowsFromCache(object_info)
? tryGetNumRowsFromCache(*object_info)
: std::nullopt;
if (num_rows_from_cache)
@@ -505,7 +504,6 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
index = 0;
LOG_TEST(logger, "Filter: {}", filter_dag != nullptr);
if (filter_dag)
{
std::vector<String> paths;

View File

@@ -94,7 +94,6 @@ protected:
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
std::string getRelativePath() const { return object_info->getPath(); }
const ObjectInfo & getObjectInfo() const { return *object_info; }
const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); }
@@ -115,8 +114,8 @@ protected:
std::future<ReaderHolder> createReaderAsync(size_t processor = 0);
std::unique_ptr<ReadBuffer> createReadBuffer(const ObjectInfo & object_info);
void addNumRowsToCache(const String & path, size_t num_rows);
std::optional<size_t> tryGetNumRowsFromCache(const ObjectInfoPtr & object_info);
void addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows);
std::optional<size_t> tryGetNumRowsFromCache(const ObjectInfo & object_info);
void lazyInitialize(size_t processor);
};

View File

@@ -238,12 +238,14 @@ Chunk StorageS3QueueSource::generate()
key_with_info->relative_path, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getRelativePath(), *file_status, processed_rows_from_file, false);
appendLogElement(reader.getObjectInfo().getPath(), *file_status, processed_rows_from_file, false);
}
break;
}
const auto & path = reader.getObjectInfo().getPath();
if (shutdown_called)
{
if (processed_rows_from_file == 0)
@@ -253,7 +255,7 @@
{
LOG_DEBUG(
log, "Table is being dropped, {} rows are already processed from {}, but file is not fully processed",
processed_rows_from_file, reader.getRelativePath());
processed_rows_from_file, path);
try
{
@@ -265,7 +267,7 @@
key_with_info->relative_path, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getRelativePath(), *file_status, processed_rows_from_file, false);
appendLogElement(path, *file_status, processed_rows_from_file, false);
/// Leave the file half processed. Table is being dropped, so we do not care.
break;
@@ -273,7 +275,7 @@
LOG_DEBUG(log, "Shutdown called, but file {} is partially processed ({} rows). "
"Will process the file fully and then shutdown",
reader.getRelativePath(), processed_rows_from_file);
path, processed_rows_from_file);
}
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
@@ -287,31 +289,31 @@
Chunk chunk;
if (reader->pull(chunk))
{
LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), reader.getRelativePath());
LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), path);
file_status->processed_rows += chunk.getNumRows();
processed_rows_from_file += chunk.getNumRows();
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(
chunk, requested_virtual_columns, reader.getRelativePath(), reader.getObjectInfo().metadata->size_bytes);
chunk, requested_virtual_columns, path, reader.getObjectInfo().metadata->size_bytes);
return chunk;
}
}
catch (...)
{
const auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getRelativePath(), message);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", path, message);
files_metadata->setFileFailed(key_with_info->processing_holder, message);
appendLogElement(reader.getRelativePath(), *file_status, processed_rows_from_file, false);
appendLogElement(path, *file_status, processed_rows_from_file, false);
throw;
}
files_metadata->setFileProcessed(key_with_info->processing_holder);
applyActionAfterProcessing(reader.getRelativePath());
applyActionAfterProcessing(path);
appendLogElement(reader.getRelativePath(), *file_status, processed_rows_from_file, true);
appendLogElement(path, *file_status, processed_rows_from_file, true);
file_status.reset();
processed_rows_from_file = 0;
@@ -327,7 +329,7 @@ Chunk StorageS3QueueSource::generate()
if (!reader)
break;
file_status = files_metadata->getFileStatus(reader.getRelativePath());
file_status = files_metadata->getFileStatus(reader.getObjectInfo().getPath());
/// Even if task is finished the thread may be not freed in pool.
/// So wait until it will be freed before scheduling a new task.