reformat code

Sergey Katkovskiy 2023-05-08 16:31:24 +03:00
parent a8d56b2290
commit 751337fad0
3 changed files with 25 additions and 177 deletions

View File

@@ -71,32 +71,6 @@ extern const Event S3ListObjects;
namespace DB
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
static const std::unordered_set<std::string_view> required_configuration_keys = {
"url",
};
static const std::unordered_set<std::string_view> optional_configuration_keys
= {"format",
"compression",
"compression_method",
"structure",
"access_key_id",
"secret_access_key",
"filename",
"use_environment_credentials",
"max_single_read_retries",
"min_upload_part_size",
"upload_part_size_multiply_factor",
"upload_part_size_multiply_parts_count_threshold",
"max_single_part_upload_size",
"max_connections",
"expiration_window_seconds",
"no_sign_request"};
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
@@ -222,135 +196,37 @@ StorageS3QueueSource::StorageS3QueueSource(
, version_id(version_id_)
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
, request_settings(request_settings_)
, compression_hint(std::move(compression_hint_))
, client(client_)
, sample_block(sample_block_)
, format_settings(format_settings_)
, queue_holder(queue_holder_)
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, action(action_)
, download_thread_num(download_thread_num_)
, create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3QReader"))
{
reader = createReader();
internal_source = std::make_shared<StorageS3Source>(
requested_virtual_columns_,
format_,
name_,
sample_block_,
context_,
format_settings_,
columns_,
max_block_size_,
request_settings_,
compression_hint_,
client_,
bucket_,
version_id_,
file_iterator,
download_thread_num_);
reader = std::move(internal_source->reader);
if (reader)
reader_future = createReaderAsync();
}
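The constructor above is the heart of this change: instead of building its own reader pipeline, StorageS3QueueSource now constructs an inner StorageS3Source and takes over its reader and reader_future. Below is a minimal, self-contained sketch of that delegation pattern; Reader, InnerSource, and QueueSource are simplified stand-ins for illustration, not the real ClickHouse classes.

// Sketch of the delegation pattern: the queue source steals the inner
// source's reader state instead of duplicating the creation logic.
// All types here are simplified stand-ins, not ClickHouse classes.
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Reader
{
    std::string path;
    explicit operator bool() const { return !path.empty(); }
};

struct InnerSource
{
    Reader reader;                      // built eagerly on construction
    std::future<Reader> reader_future;  // the next reader, prepared asynchronously

    explicit InnerSource(std::string first_key)
        : reader{std::move(first_key)}, reader_future(createReaderAsync()) {}

    std::future<Reader> createReaderAsync()
    {
        return std::async(std::launch::async, [] { return Reader{"next-key"}; });
    }
};

struct QueueSource
{
    std::shared_ptr<InnerSource> internal_source;
    Reader reader;
    std::future<Reader> reader_future;

    explicit QueueSource(std::string first_key)
        : internal_source(std::make_shared<InnerSource>(std::move(first_key)))
    {
        // Mirrors the constructor above: take over the inner source's state.
        reader = std::move(internal_source->reader);
        if (reader)
            reader_future = std::move(internal_source->reader_future);
    }
};

int main()
{
    QueueSource source("first-key");
    std::cout << source.reader.path << '\n';              // first-key
    std::cout << source.reader_future.get().path << '\n'; // next-key
}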
StorageS3QueueSource::ReaderHolder StorageS3QueueSource::createReader()
{
auto [current_key, info] = (*file_iterator)();
if (current_key.empty())
return {};
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id, request_settings);
auto compression_method = chooseCompressionMethod(current_key, compression_hint);
InputFormatPtr input_format;
std::unique_ptr<ReadBuffer> owned_read_buf;
auto read_buf_or_factory = createS3ReadBuffer(current_key, object_size);
if (read_buf_or_factory.buf_factory)
{
input_format = FormatFactory::instance().getInputRandomAccess(
format,
std::move(read_buf_or_factory.buf_factory),
sample_block,
getContext(),
max_block_size,
/* is_remote_fs */ true,
compression_method,
format_settings);
}
else
{
owned_read_buf = wrapReadBufferWithCompressionMethod(
std::move(read_buf_or_factory.buf), compression_method, static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
input_format
= FormatFactory::instance().getInput(format, *owned_read_buf, sample_block, getContext(), max_block_size, format_settings);
}
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
if (columns_desc.hasDefaults())
{
builder.addSimpleTransform(
[&](const Block & header)
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
}
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return ReaderHolder{fs::path(bucket) / current_key, std::move(owned_read_buf), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3QueueSource::ReaderHolder> StorageS3QueueSource::createReaderAsync()
{
return create_reader_scheduler([this] { return createReader(); }, 0);
}
StorageS3QueueSource::ReadBufferOrFactory StorageS3QueueSource::createS3ReadBuffer(const String & key, size_t object_size)
{
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool object_too_small = object_size <= 2 * download_buffer_size;
// Create a read buffer that will prefetch the first ~1 MB of the file.
// When reading lots of tiny files, this prefetching almost doubles the throughput.
// For bigger files, parallel reading is more useful.
if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
LOG_TRACE(log, "Downloading object of size {} from S3 with initial prefetch", object_size);
return {.buf = createAsyncS3ReadBuffer(key, read_settings, object_size)};
}
auto factory = std::make_unique<ReadBufferS3Factory>(client, bucket, key, version_id, object_size, request_settings, read_settings);
return {.buf_factory = std::move(factory)};
}
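The deleted createS3ReadBuffer above (logic that now lives only in StorageS3Source) picks a read strategy by size: an object that fits in roughly two download buffers is streamed through one prefetching buffer, anything larger gets a buffer factory for parallel range reads. A sketch of just that heuristic, assuming the ~1 MB default buffer size mentioned in the comment:

// Sketch of the size heuristic from createS3ReadBuffer; the enum and
// the 1 MB default are illustrative assumptions, not ClickHouse API.
#include <cstddef>
#include <iostream>

enum class ReadStrategy { PrefetchWholeObject, ParallelRanges };

ReadStrategy chooseReadStrategy(size_t object_size, size_t download_buffer_size)
{
    // Same test as above: `object_size <= 2 * download_buffer_size`.
    const bool object_too_small = object_size <= 2 * download_buffer_size;
    return object_too_small ? ReadStrategy::PrefetchWholeObject : ReadStrategy::ParallelRanges;
}

int main()
{
    const size_t buffer_size = 1 << 20; // assume 1 MB, per the comment above

    // A 512 KB object: prefetch it whole, one stream.
    std::cout << (chooseReadStrategy(512 * 1024, buffer_size) == ReadStrategy::PrefetchWholeObject) << '\n'; // 1

    // A 100 MB object: hand out a factory for parallel range reads.
    std::cout << (chooseReadStrategy(100 * 1024 * 1024, buffer_size) == ReadStrategy::ParallelRanges) << '\n'; // 1
}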
std::unique_ptr<ReadBuffer>
StorageS3QueueSource::createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto read_buffer_creator =
[this, read_settings, object_size](const std::string & path, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_shared<ReadBufferFromS3>(
client,
bucket,
path,
version_id,
request_settings,
read_settings,
/* use_external_buffer */ true,
/* offset */ 0,
read_until_position,
/* restricted_seek */ true,
object_size);
};
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), StoredObjects{StoredObject{key, object_size}}, read_settings);
auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(pool_reader, read_settings, std::move(s3_impl));
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);
return async_reader;
reader_future = std::move(internal_source->reader_future);
}
StorageS3QueueSource::~StorageS3QueueSource()
{
- create_reader_pool.wait();
+ internal_source->create_reader_pool.wait();
}
String StorageS3QueueSource::getName() const
@@ -370,12 +246,10 @@ Chunk StorageS3QueueSource::generate()
}
Chunk chunk;
LOG_WARNING(log, "Try to pull new chunk");
try
{
if (reader->pull(chunk))
{
LOG_WARNING(log, "Success in pulling!");
UInt64 num_rows = chunk.getNumRows();
const auto & file_path = reader.getPath();
@@ -399,7 +273,6 @@ Chunk StorageS3QueueSource::generate()
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
LOG_WARNING(log, "Set processed: {}", file_path);
queue_holder->setFileProcessed(file_path);
applyActionAfterProcessing(file_path);
return chunk;
@@ -407,9 +280,8 @@ Chunk StorageS3QueueSource::generate()
}
catch (const Exception & e)
{
LOG_ERROR(log, "Exception: {} ", e.displayText());
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
const auto & failed_file_path = reader.getPath();
LOG_WARNING(log, "Set failed: {}", failed_file_path);
queue_holder->setFileFailed(failed_file_path);
}
@@ -419,11 +291,10 @@ Chunk StorageS3QueueSource::generate()
if (!reader)
break;
/// Even if the task is finished, the thread may not be freed in the pool yet.
/// So wait until it is freed before scheduling a new task.
- create_reader_pool.wait();
- reader_future = createReaderAsync();
+ internal_source->create_reader_pool.wait();
+ reader_future = internal_source->createReaderAsync();
}
return {};
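generate() above follows a fixed bookkeeping protocol per file: pull a chunk, mark the file processed on success or failed on exception, then apply the post-processing action and schedule the next reader. A toy sketch of that protocol; QueueHolder here is a simplified stand-in for S3QueueHolder, and pulling is simulated:

// Toy sketch of the processed/failed bookkeeping in generate().
// QueueHolder stands in for S3QueueHolder; pulling is simulated.
#include <iostream>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

struct QueueHolder
{
    std::set<std::string> processed, failed;
    void setFileProcessed(const std::string & path) { processed.insert(path); }
    void setFileFailed(const std::string & path) { failed.insert(path); }
};

void pullAll(QueueHolder & holder, const std::vector<std::string> & files)
{
    for (const auto & file_path : files)
    {
        try
        {
            if (file_path.empty()) // simulate a failed pull
                throw std::runtime_error("cannot pull chunk");
            holder.setFileProcessed(file_path);
            // applyActionAfterProcessing(file_path) would run here
            // (e.g. delete the object, as in the next hunk).
        }
        catch (const std::exception & e)
        {
            std::cerr << "Exception in chunk pulling: " << e.what() << '\n';
            holder.setFileFailed(file_path); // remember, so the file is not retried
        }
    }
}

int main()
{
    QueueHolder holder;
    pullAll(holder, {"bucket/key1", "", "bucket/key2"});
    std::cout << holder.processed.size() << " processed, " << holder.failed.size() << " failed\n"; // 2 processed, 1 failed
}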
@@ -431,7 +302,7 @@ Chunk StorageS3QueueSource::generate()
void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
{
LOG_WARNING(log, "Delete {} Bucke {}", file_path, bucket);
LOG_WARNING(log, "Delete {} Bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(file_path);

View File

@@ -36,7 +36,6 @@ public:
using DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeysWithInfo = StorageS3Source::KeysWithInfo;
using KeyWithInfo = StorageS3Source::KeyWithInfo;
using ReadBufferOrFactory = StorageS3Source::ReadBufferOrFactory;
class QueueGlobIterator : public IIterator
{
public:
@@ -95,8 +94,6 @@ public:
Chunk generate() override;
static std::unordered_set<String> parseCollection(String & files);
private:
String name;
@@ -104,12 +101,8 @@ private:
String version_id;
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
S3Settings::RequestSettings request_settings;
String compression_hint;
std::shared_ptr<const S3::Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
std::shared_ptr<S3QueueHolder> queue_holder;
using ReaderHolder = StorageS3Source::ReaderHolder;
@@ -118,12 +111,9 @@ private:
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator;
const S3QueueAction action;
size_t download_thread_num = 1;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource");
ThreadPool create_reader_pool;
ThreadPoolCallbackRunner<ReaderHolder> create_reader_scheduler;
std::future<ReaderHolder> reader_future;
UInt64 total_rows_approx_max = 0;
@@ -132,15 +122,7 @@ private:
mutable std::mutex mutex;
ReaderHolder createReader();
std::future<ReaderHolder> createReaderAsync();
ReadBufferOrFactory createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
void setFileProcessed(const String & file_path);
void setFileFailed(const String & file_path);
std::shared_ptr<StorageS3Source> internal_source;
void applyActionAfterProcessing(const String & file_path);
};

View File

@@ -539,8 +539,6 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Requests ops;
- auto table_uuid = getStorageID().uuid;
if (zookeeper->exists(zookeeper_path + "/metadata"))
{
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zookeeper_path);
@@ -552,10 +550,7 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processed", "processed_files\n", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/failed", "[]", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(
fs::path(zookeeper_path) / "processing" / toString(table_uuid), "[]", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));