mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 10:02:01 +00:00
Remove logs
This commit is contained in:
parent
436f6463c0
commit
27ab6aa8b5
@ -1,18 +1,15 @@
|
|||||||
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
|
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
|
||||||
|
|
||||||
#include <exception>
|
#include <Interpreters/Context.h>
|
||||||
#include <filesystem>
|
#include <Common/filesystemHelpers.h>
|
||||||
#include <stdexcept>
|
#include <Common/logger_useful.h>
|
||||||
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
|
|
||||||
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
|
||||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||||
|
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
|
||||||
#include <IO/WriteBufferFromFile.h>
|
#include <IO/WriteBufferFromFile.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include <Poco/Logger.h>
|
|
||||||
#include <Common/filesystemHelpers.h>
|
|
||||||
#include <Common/getRandomASCIIString.h>
|
#include <Common/getRandomASCIIString.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <filesystem>
|
||||||
|
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
@ -58,7 +55,6 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
|
|||||||
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
|
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
||||||
std::move(read_buffer_creator),
|
std::move(read_buffer_creator),
|
||||||
objects,
|
objects,
|
||||||
|
@ -200,42 +200,10 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
|||||||
restricted_seek);
|
restricted_seek);
|
||||||
};
|
};
|
||||||
|
|
||||||
// auto read_buffer_creator2 = [this, settings_ptr, disk_read_settings](
|
|
||||||
// bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
|
|
||||||
// {
|
|
||||||
// return std::make_unique<ReadBufferFromS3>(
|
|
||||||
// client.get(),
|
|
||||||
// uri.bucket,
|
|
||||||
// object_.remote_path,
|
|
||||||
// uri.version_id,
|
|
||||||
// settings_ptr->request_settings,
|
|
||||||
// disk_read_settings,
|
|
||||||
// /* use_external_buffer */ true,
|
|
||||||
// /* offset */ 0,
|
|
||||||
// /* read_until_position */ 0,
|
|
||||||
// restricted_seek);
|
|
||||||
// };
|
|
||||||
|
|
||||||
|
|
||||||
switch (read_settings.remote_fs_method)
|
switch (read_settings.remote_fs_method)
|
||||||
{
|
{
|
||||||
case RemoteFSReadMethod::read:
|
case RemoteFSReadMethod::read:
|
||||||
{
|
{
|
||||||
// auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
|
|
||||||
// std::move(read_buffer_creator2),
|
|
||||||
// objects,
|
|
||||||
// "s3:" + uri.bucket + "/",
|
|
||||||
// disk_read_settings,
|
|
||||||
// global_context->getFilesystemCacheLog(),
|
|
||||||
// /* use_external_buffer */ false);
|
|
||||||
// std::string answer(1000, ' ');
|
|
||||||
// size_t read_bytes = impl2->read(answer.data(), 1000);
|
|
||||||
// LOG_DEBUG(
|
|
||||||
// &Poco::Logger::get("ReadBufferFromRemoteFSGather 000"),
|
|
||||||
// "Read bytes: {}, string: {}",
|
|
||||||
// read_bytes,
|
|
||||||
// answer.substr(0, read_bytes));
|
|
||||||
|
|
||||||
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
return std::make_unique<ReadBufferFromRemoteFSGather>(
|
||||||
std::move(read_buffer_creator),
|
std::move(read_buffer_creator),
|
||||||
objects,
|
objects,
|
||||||
@ -244,30 +212,15 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
|||||||
global_context->getFilesystemCacheLog(),
|
global_context->getFilesystemCacheLog(),
|
||||||
/* use_external_buffer */false);
|
/* use_external_buffer */false);
|
||||||
}
|
}
|
||||||
case RemoteFSReadMethod::threadpool: {
|
case RemoteFSReadMethod::threadpool:
|
||||||
// auto impl2 = std::make_unique<ReadBufferFromRemoteFSGather>(
|
{
|
||||||
// std::move(read_buffer_creator2),
|
|
||||||
// objects,
|
|
||||||
// "s3:" + uri.bucket + "/",
|
|
||||||
// disk_read_settings,
|
|
||||||
// global_context->getFilesystemCacheLog(),
|
|
||||||
// /* use_external_buffer */ true);
|
|
||||||
|
|
||||||
// std::string answer(1000, ' ');
|
|
||||||
// size_t read_bytes = impl2->read(answer.data(), 1000);
|
|
||||||
// LOG_DEBUG(
|
|
||||||
// &Poco::Logger::get("ReadBufferFromRemoteFSGather 001"),
|
|
||||||
// "Read bytes: {}, string: {}",
|
|
||||||
// read_bytes,
|
|
||||||
// answer.substr(0, read_bytes));
|
|
||||||
|
|
||||||
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
|
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
|
||||||
std::move(read_buffer_creator),
|
std::move(read_buffer_creator),
|
||||||
objects,
|
objects,
|
||||||
"s3:" + uri.bucket + "/",
|
"s3:" + uri.bucket + "/",
|
||||||
disk_read_settings,
|
disk_read_settings,
|
||||||
global_context->getFilesystemCacheLog(),
|
global_context->getFilesystemCacheLog(),
|
||||||
/* use_external_buffer */ true);
|
/* use_external_buffer */true);
|
||||||
|
|
||||||
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
|
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
|
||||||
return std::make_unique<AsynchronousBoundedReadBuffer>(
|
return std::make_unique<AsynchronousBoundedReadBuffer>(
|
||||||
|
@ -42,32 +42,19 @@ const Block & PullingPipelineExecutor::getHeader() const
|
|||||||
|
|
||||||
bool PullingPipelineExecutor::pull(Chunk & chunk)
|
bool PullingPipelineExecutor::pull(Chunk & chunk)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Pulling 1"), "Pulling 1");
|
|
||||||
|
|
||||||
if (!executor)
|
if (!executor)
|
||||||
{
|
{
|
||||||
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
|
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
|
||||||
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
|
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Pulling 2"), "Pulling 2");
|
|
||||||
|
|
||||||
if (!executor->checkTimeLimitSoft())
|
if (!executor->checkTimeLimitSoft())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Pulling 3"), "Pulling 3");
|
|
||||||
|
|
||||||
|
|
||||||
if (!executor->executeStep(&has_data_flag))
|
if (!executor->executeStep(&has_data_flag))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Pulling 4"), "Pulling 4");
|
|
||||||
|
|
||||||
|
|
||||||
chunk = pulling_format->getChunk();
|
chunk = pulling_format->getChunk();
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Pulling 5"), "Pulling 5");
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,13 +73,10 @@ bool PullingPipelineExecutor::pull(Block & block)
|
|||||||
}
|
}
|
||||||
|
|
||||||
block = pulling_format->getPort(IOutputFormat::PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
|
block = pulling_format->getPort(IOutputFormat::PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
|
||||||
if (auto chunk_info = chunk.getChunkInfo())
|
if (auto agg_info = chunk.getChunkInfos().get<AggregatedChunkInfo>())
|
||||||
{
|
{
|
||||||
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
|
block.info.bucket_num = agg_info->bucket_num;
|
||||||
{
|
block.info.is_overflows = agg_info->is_overflows;
|
||||||
block.info.bucket_num = agg_info->bucket_num;
|
|
||||||
block.info.is_overflows = agg_info->is_overflows;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -1,18 +1,17 @@
|
|||||||
#include "StorageObjectStorageSource.h"
|
#include "StorageObjectStorageSource.h"
|
||||||
|
#include <Storages/VirtualColumnUtils.h>
|
||||||
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
|
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
|
||||||
|
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||||
|
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
||||||
|
#include <Processors/Sources/ConstChunkGenerator.h>
|
||||||
|
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||||
|
#include <Processors/Transforms/ExtractColumnsTransform.h>
|
||||||
|
#include <IO/ReadBufferFromFileBase.h>
|
||||||
|
#include <IO/Archives/createArchiveReader.h>
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <Formats/ReadSchemaUtils.h>
|
#include <Formats/ReadSchemaUtils.h>
|
||||||
#include <IO/Archives/createArchiveReader.h>
|
|
||||||
#include <IO/ReadBufferFromFileBase.h>
|
|
||||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
|
||||||
#include <Processors/Sources/ConstChunkGenerator.h>
|
|
||||||
#include <Processors/Transforms/AddingDefaultsTransform.h>
|
|
||||||
#include <Processors/Transforms/ExtractColumnsTransform.h>
|
|
||||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
|
||||||
#include <Storages/Cache/SchemaCache.h>
|
|
||||||
#include <Storages/ObjectStorage/StorageObjectStorage.h>
|
#include <Storages/ObjectStorage/StorageObjectStorage.h>
|
||||||
#include <Storages/VirtualColumnUtils.h>
|
#include <Storages/Cache/SchemaCache.h>
|
||||||
#include "Common/logger_useful.h"
|
|
||||||
#include <Common/parseGlobs.h>
|
#include <Common/parseGlobs.h>
|
||||||
#include <Core/Settings.h>
|
#include <Core/Settings.h>
|
||||||
|
|
||||||
@ -71,7 +70,6 @@ StorageObjectStorageSource::StorageObjectStorageSource(
|
|||||||
, schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName()))
|
, schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName()))
|
||||||
, create_reader_scheduler(threadPoolCallbackRunnerUnsafe<ReaderHolder>(*create_reader_pool, "Reader"))
|
, create_reader_scheduler(threadPoolCallbackRunnerUnsafe<ReaderHolder>(*create_reader_pool, "Reader"))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Source created"), "Source created");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageObjectStorageSource::~StorageObjectStorageSource()
|
StorageObjectStorageSource::~StorageObjectStorageSource()
|
||||||
@ -134,7 +132,6 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
|||||||
{
|
{
|
||||||
ConfigurationPtr copy_configuration = configuration->clone();
|
ConfigurationPtr copy_configuration = configuration->clone();
|
||||||
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
||||||
|
|
||||||
if (filter_dag)
|
if (filter_dag)
|
||||||
{
|
{
|
||||||
auto keys = configuration->getPaths();
|
auto keys = configuration->getPaths();
|
||||||
@ -146,19 +143,6 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
|||||||
copy_configuration->setPaths(keys);
|
copy_configuration->setPaths(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Conf"), "Keys size: {}", configuration->getPaths().size());
|
|
||||||
for (auto && key : configuration->getPaths())
|
|
||||||
{
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Conf"), "Current key: {}", key);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Copy Conf"), "Keys size: {}", copy_configuration->getPaths().size());
|
|
||||||
for (auto && key : copy_configuration->getPaths())
|
|
||||||
{
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Copy Conf"), "Current key: {}", key);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
iterator = std::make_unique<KeysIterator>(
|
iterator = std::make_unique<KeysIterator>(
|
||||||
object_storage, copy_configuration, virtual_columns, is_archive ? nullptr : read_keys,
|
object_storage, copy_configuration, virtual_columns, is_archive ? nullptr : read_keys,
|
||||||
settings.ignore_non_existent_file, file_progress_callback);
|
settings.ignore_non_existent_file, file_progress_callback);
|
||||||
@ -187,11 +171,8 @@ Chunk StorageObjectStorageSource::generate()
|
|||||||
{
|
{
|
||||||
lazyInitialize();
|
lazyInitialize();
|
||||||
|
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Generating"), "Generating reader: {}", !(!reader));
|
|
||||||
|
|
||||||
if (isCancelled() || !reader)
|
if (isCancelled() || !reader)
|
||||||
{
|
{
|
||||||
if (reader)
|
if (reader)
|
||||||
@ -199,15 +180,10 @@ Chunk StorageObjectStorageSource::generate()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Generating 2"), "Generating 2");
|
|
||||||
|
|
||||||
Chunk chunk;
|
Chunk chunk;
|
||||||
if (reader->pull(chunk))
|
if (reader->pull(chunk))
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Generating 3"), "Generating 3");
|
|
||||||
|
|
||||||
UInt64 num_rows = chunk.getNumRows();
|
UInt64 num_rows = chunk.getNumRows();
|
||||||
LOG_DEBUG(&Poco::Logger::get("Creating_chunk"), "Chunk size: {}", num_rows);
|
|
||||||
total_rows_in_file += num_rows;
|
total_rows_in_file += num_rows;
|
||||||
|
|
||||||
size_t chunk_size = 0;
|
size_t chunk_size = 0;
|
||||||
@ -255,9 +231,6 @@ Chunk StorageObjectStorageSource::generate()
|
|||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Generating 4"), "Generating 4");
|
|
||||||
|
|
||||||
|
|
||||||
if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
|
if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
|
||||||
addNumRowsToCache(*reader.getObjectInfo(), total_rows_in_file);
|
addNumRowsToCache(*reader.getObjectInfo(), total_rows_in_file);
|
||||||
|
|
||||||
@ -328,8 +301,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
|
|||||||
}
|
}
|
||||||
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
|
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Unreached point 1"), "");
|
|
||||||
|
|
||||||
QueryPipelineBuilder builder;
|
QueryPipelineBuilder builder;
|
||||||
std::shared_ptr<ISource> source;
|
std::shared_ptr<ISource> source;
|
||||||
std::unique_ptr<ReadBuffer> read_buf;
|
std::unique_ptr<ReadBuffer> read_buf;
|
||||||
@ -354,17 +325,11 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
|
|||||||
return schema_cache->tryGetNumRows(cache_key, get_last_mod_time);
|
return schema_cache->tryGetNumRows(cache_key, get_last_mod_time);
|
||||||
};
|
};
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Unreached point 2"), "");
|
|
||||||
|
|
||||||
|
|
||||||
std::optional<size_t> num_rows_from_cache = need_only_count
|
std::optional<size_t> num_rows_from_cache = need_only_count
|
||||||
&& context_->getSettingsRef().use_cache_for_count_from_files
|
&& context_->getSettingsRef().use_cache_for_count_from_files
|
||||||
? try_get_num_rows_from_cache()
|
? try_get_num_rows_from_cache()
|
||||||
: std::nullopt;
|
: std::nullopt;
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Unreached point 3"), "");
|
|
||||||
|
|
||||||
|
|
||||||
if (num_rows_from_cache)
|
if (num_rows_from_cache)
|
||||||
{
|
{
|
||||||
/// We should not return single chunk with all number of rows,
|
/// We should not return single chunk with all number of rows,
|
||||||
@ -377,8 +342,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Unreached point 4"), "");
|
|
||||||
|
|
||||||
CompressionMethod compression_method;
|
CompressionMethod compression_method;
|
||||||
if (const auto * object_info_in_archive = dynamic_cast<const ArchiveIterator::ObjectInfoInArchive *>(object_info.get()))
|
if (const auto * object_info_in_archive = dynamic_cast<const ArchiveIterator::ObjectInfoInArchive *>(object_info.get()))
|
||||||
{
|
{
|
||||||
@ -389,12 +352,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->compression_method);
|
compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->compression_method);
|
||||||
LOG_DEBUG(&Poco::Logger::get("Info relative path"), "Info: {}", object_info->relative_path);
|
|
||||||
read_buf = createReadBuffer(*object_info, object_storage, context_, log);
|
read_buf = createReadBuffer(*object_info, object_storage, context_, log);
|
||||||
auto new_read_buf = createReadBuffer(*object_info, object_storage, context_, log);
|
|
||||||
std::string answer(1000, ' ');
|
|
||||||
size_t read_bytes = new_read_buf->read(answer.data(), 1000);
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Read buffer"), "Read bytes: {}, string: {}", read_bytes, answer.substr(0, read_bytes));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto input_format = FormatFactory::instance().getInput(
|
auto input_format = FormatFactory::instance().getInput(
|
||||||
@ -460,17 +418,11 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
|||||||
const auto & object_size = object_info.metadata->size_bytes;
|
const auto & object_size = object_info.metadata->size_bytes;
|
||||||
|
|
||||||
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
|
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
|
||||||
|
|
||||||
// read_settings.remote_fs_method = RemoteFSReadMethod::read;
|
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Threadpool"), "Method threadpool: {}", read_settings.remote_fs_method == RemoteFSReadMethod::threadpool);
|
|
||||||
|
|
||||||
read_settings.enable_filesystem_cache = false;
|
read_settings.enable_filesystem_cache = false;
|
||||||
/// FIXME: Changing this setting to default value breaks something around parquet reading
|
/// FIXME: Changing this setting to default value breaks something around parquet reading
|
||||||
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
|
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
|
||||||
|
|
||||||
const bool object_too_small = object_size <= 2 * context_->getSettingsRef().max_download_buffer_size;
|
const bool object_too_small = object_size <= 2 * context_->getSettingsRef().max_download_buffer_size;
|
||||||
|
|
||||||
const bool use_prefetch = object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
const bool use_prefetch = object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||||
read_settings.remote_fs_method = use_prefetch ? RemoteFSReadMethod::threadpool : RemoteFSReadMethod::read;
|
read_settings.remote_fs_method = use_prefetch ? RemoteFSReadMethod::threadpool : RemoteFSReadMethod::read;
|
||||||
/// User's object may change, don't cache it.
|
/// User's object may change, don't cache it.
|
||||||
@ -479,28 +431,24 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
|||||||
// Create a read buffer that will prefetch the first ~1 MB of the file.
|
// Create a read buffer that will prefetch the first ~1 MB of the file.
|
||||||
// When reading lots of tiny files, this prefetching almost doubles the throughput.
|
// When reading lots of tiny files, this prefetching almost doubles the throughput.
|
||||||
// For bigger files, parallel reading is more useful.
|
// For bigger files, parallel reading is more useful.
|
||||||
// if (use_prefetch)
|
if (use_prefetch)
|
||||||
// {
|
{
|
||||||
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
|
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
|
||||||
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Read objects"), "Path: {}, object size: {}", object_info.getPath(), object_size);
|
auto async_reader = object_storage->readObjects(
|
||||||
|
StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings);
|
||||||
|
|
||||||
auto async_reader
|
async_reader->setReadUntilEnd();
|
||||||
= object_storage->readObjects(StoredObjects{StoredObject{object_info.getPath(), /* local_path */ "", object_size}}, read_settings);
|
if (read_settings.remote_fs_prefetch)
|
||||||
|
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
|
||||||
|
|
||||||
async_reader->setReadUntilEnd();
|
return async_reader;
|
||||||
if (read_settings.remote_fs_prefetch)
|
}
|
||||||
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
|
else
|
||||||
|
{
|
||||||
return async_reader;
|
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
|
||||||
// }
|
return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
|
||||||
// else
|
}
|
||||||
// {
|
|
||||||
// /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
|
|
||||||
// LOG_DEBUG(&Poco::Logger::get("Read object"), "Path: {}, object size: {}", object_info.getPath(), object_size);
|
|
||||||
|
|
||||||
// return object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)
|
StorageObjectStorageSource::IIterator::IIterator(const std::string & logger_name_)
|
||||||
@ -678,18 +626,11 @@ StorageObjectStorageSource::KeysIterator::KeysIterator(
|
|||||||
, keys(configuration->getPaths())
|
, keys(configuration->getPaths())
|
||||||
, ignore_non_existent_files(ignore_non_existent_files_)
|
, ignore_non_existent_files(ignore_non_existent_files_)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Keys size"), "Keys size: {}", keys.size());
|
|
||||||
for (auto && key : keys)
|
|
||||||
{
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("Current Key"), "Current key: {}", key);
|
|
||||||
}
|
|
||||||
if (read_keys_)
|
if (read_keys_)
|
||||||
{
|
{
|
||||||
/// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed?
|
/// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed?
|
||||||
|
|
||||||
for (auto && key : keys)
|
for (auto && key : keys)
|
||||||
{
|
{
|
||||||
LOG_DEBUG(&Poco::Logger::get("Current Key"), "Current key: {}", key);
|
|
||||||
auto object_info = std::make_shared<ObjectInfo>(key);
|
auto object_info = std::make_shared<ObjectInfo>(key);
|
||||||
read_keys_->emplace_back(object_info);
|
read_keys_->emplace_back(object_info);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user