fix codestyle

This commit is contained in:
Sergey Katkovskiy 2023-05-02 18:10:57 +03:00
parent 841987a0a3
commit 559e8fa6e5
3 changed files with 46 additions and 42 deletions

View File

@ -109,20 +109,6 @@ static const std::unordered_set<std::string_view> optional_configuration_keys
"expiration_window_seconds", "expiration_window_seconds",
"no_sign_request"}; "no_sign_request"};
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_COMPILE_REGEXP;
extern const int FILE_DOESNT_EXIST;
}
class IOutputFormat; class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>; using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -160,7 +146,8 @@ Strings StorageS3QueueSource::QueueGlobIterator::setProcessing(S3QueueMode & eng
LOG_INFO(log, "Found in exclude keys {}", val.key); LOG_INFO(log, "Found in exclude keys {}", val.key);
continue; continue;
} }
if (engine_mode == S3QueueMode::ORDERED && full_path.compare(max_file) <= 0) { if (engine_mode == S3QueueMode::ORDERED && full_path.compare(max_file) <= 0)
{
continue; continue;
} }
if (processing_keys.size() < max_poll_size) if (processing_keys.size() < max_poll_size)
@ -457,7 +444,8 @@ Chunk StorageS3QueueSource::generate()
void StorageS3QueueSource::setFileProcessed(const String & file_path) void StorageS3QueueSource::setFileProcessed(const String & file_path)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (mode == S3QueueMode::UNORDERED) { if (mode == S3QueueMode::UNORDERED)
{
String processed_files = zookeeper->get(zookeeper_path + "/processed"); String processed_files = zookeeper->get(zookeeper_path + "/processed");
std::unordered_set<String> processed = parseCollection(processed_files); std::unordered_set<String> processed = parseCollection(processed_files);
@ -466,7 +454,9 @@ void StorageS3QueueSource::setFileProcessed(const String & file_path)
set_processed.insert(set_processed.end(), processed.begin(), processed.end()); set_processed.insert(set_processed.end(), processed.begin(), processed.end());
zookeeper->set(zookeeper_path + "/processed", toString(set_processed)); zookeeper->set(zookeeper_path + "/processed", toString(set_processed));
} else { }
else
{
zookeeper->set(zookeeper_path + "/processed", file_path); zookeeper->set(zookeeper_path + "/processed", file_path);
} }
} }
@ -497,7 +487,9 @@ void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
{ {
const auto & err = outcome.GetError(); const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType())); LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
} else { }
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", file_path); LOG_TRACE(log, "Object with path {} was removed from S3", file_path);
} }
} }

View File

@ -96,16 +96,10 @@ static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_PARSE_TEXT; extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR; extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int CANNOT_COMPILE_REGEXP;
extern const int FILE_DOESNT_EXIST;
extern const int QUERY_NOT_ALLOWED; extern const int QUERY_NOT_ALLOWED;
extern const int NO_ZOOKEEPER; extern const int NO_ZOOKEEPER;
extern const int REPLICA_ALREADY_EXISTS; extern const int REPLICA_ALREADY_EXISTS;
@ -256,7 +250,8 @@ Pipe StorageS3Queue::read(
const auto & virtuals = getVirtuals(); const auto & virtuals = getVirtuals();
std::erase_if( std::erase_if(
fetch_columns, fetch_columns,
[&](const String & col) { [&](const String & col)
{
return std::any_of( return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; }); virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
}); });
@ -305,6 +300,20 @@ Pipe StorageS3Queue::read(
return pipe; return pipe;
} }
SinkToStoragePtr StorageS3Queue::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
}
void StorageS3Queue::truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList StorageS3Queue::getVirtuals() const NamesAndTypesList StorageS3Queue::getVirtuals() const
{ {
return virtual_columns; return virtual_columns;
@ -462,7 +471,8 @@ void StorageS3Queue::streamToViews()
const auto & virtuals = getVirtuals(); const auto & virtuals = getVirtuals();
std::erase_if( std::erase_if(
fetch_columns, fetch_columns,
[&](const String & col) { [&](const String & col)
{
return std::any_of( return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; }); virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
}); });
@ -609,9 +619,12 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query, KeysW
std::unordered_set<String> exclude = getExcludedFiles(); std::unordered_set<String> exclude = getExcludedFiles();
Strings processing; Strings processing;
if (mode == S3QueueMode::UNORDERED) { if (mode == S3QueueMode::UNORDERED)
{
processing = it->setProcessing(mode, exclude); processing = it->setProcessing(mode, exclude);
} else { }
else
{
String max_processed_file = getMaxProcessedFile(); String max_processed_file = getMaxProcessedFile();
processing = it->setProcessing(mode, exclude, max_processed_file); processing = it->setProcessing(mode, exclude, max_processed_file);
} }
@ -623,7 +636,8 @@ StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query, KeysW
return it; return it;
} }
std::unordered_set<String> StorageS3Queue::getFailedFiles() { std::unordered_set<String> StorageS3Queue::getFailedFiles()
{
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
String failed = zookeeper->get(zookeeper_path + "/failed"); String failed = zookeeper->get(zookeeper_path + "/failed");
@ -632,7 +646,8 @@ std::unordered_set<String> StorageS3Queue::getFailedFiles() {
return failed_files; return failed_files;
} }
std::unordered_set<String> StorageS3Queue::getProcessedFiles() { std::unordered_set<String> StorageS3Queue::getProcessedFiles()
{
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
String processed = zookeeper->get(zookeeper_path + "/processed"); String processed = zookeeper->get(zookeeper_path + "/processed");
@ -641,14 +656,16 @@ std::unordered_set<String> StorageS3Queue::getProcessedFiles() {
return processed_files; return processed_files;
} }
String StorageS3Queue::getMaxProcessedFile() { String StorageS3Queue::getMaxProcessedFile()
{
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
String processed = zookeeper->get(zookeeper_path + "/processed"); String processed = zookeeper->get(zookeeper_path + "/processed");
return processed; return processed;
} }
std::unordered_set<String> StorageS3Queue::getProcessingFiles() { std::unordered_set<String> StorageS3Queue::getProcessingFiles()
{
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
Strings consumer_table_uuids; Strings consumer_table_uuids;
@ -671,7 +688,8 @@ std::unordered_set<String> StorageS3Queue::getExcludedFiles()
LOG_DEBUG(log, "failed_files {}", failed_files.size()); LOG_DEBUG(log, "failed_files {}", failed_files.size());
exclude_files.merge(failed_files); exclude_files.merge(failed_files);
if (mode != S3QueueMode::ORDERED) { if (mode != S3QueueMode::ORDERED)
{
std::unordered_set<String> processed_files = getProcessedFiles(); std::unordered_set<String> processed_files = getProcessedFiles();
LOG_DEBUG(log, "processed_files {}", processed_files.size()); LOG_DEBUG(log, "processed_files {}", processed_files.size());
exclude_files.merge(processed_files); exclude_files.merge(processed_files);

View File

@ -63,19 +63,13 @@ public:
size_t max_block_size, size_t max_block_size,
size_t num_streams) override; size_t num_streams) override;
SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override;
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
}
void truncate( void truncate(
const ASTPtr & /*query*/, const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/, const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/, ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override TableExclusiveLockHolder &) override;
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList getVirtuals() const override; NamesAndTypesList getVirtuals() const override;