This commit is contained in:
Sergey Katkovskiy 2023-05-09 06:58:29 +03:00
parent 751337fad0
commit a443c7f289
4 changed files with 7 additions and 7 deletions

View File

@ -159,7 +159,7 @@ void S3QueueHolder::setFileProcessed(const String & file_path)
if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data)) if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data))
{ {
S3FilesCollection processing_files = parseCollection(node_data); S3FilesCollection processing_files = parseCollection(node_data);
for (auto x : processing_files) for (const auto & x : processing_files)
{ {
if (x != file_path) if (x != file_path)
{ {
@ -251,7 +251,7 @@ void S3QueueHolder::setFilesProcessing(Strings & file_paths)
if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data)) if (zookeeper->tryGet(fs::path(zookeeper_processing_path), node_data))
{ {
S3FilesCollection processing_files = parseCollection(node_data); S3FilesCollection processing_files = parseCollection(node_data);
for (auto x : processing_files) for (const auto & x : processing_files)
{ {
if (!std::count(file_paths.begin(), file_paths.end(), x)) if (!std::count(file_paths.begin(), file_paths.end(), x))
{ {

View File

@ -48,10 +48,8 @@
# include <Common/NamedCollections/NamedCollections.h> # include <Common/NamedCollections/NamedCollections.h>
# include <Common/parseGlobs.h> # include <Common/parseGlobs.h>
# include <filesystem>
# include <Processors/ISource.h> # include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h> # include <Processors/Sinks/SinkToStorage.h>
# include <QueryPipeline/Pipe.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;

View File

@ -539,10 +539,13 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
for (size_t i = 0; i < 1000; ++i) for (size_t i = 0; i < 1000; ++i)
{ {
Coordination::Requests ops; Coordination::Requests ops;
bool is_first_replica = true;
if (zookeeper->exists(zookeeper_path + "/metadata")) if (zookeeper->exists(zookeeper_path + "/metadata"))
{ {
if (!zookeeper->exists(zookeeper_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zookeeper_path); LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zookeeper_path);
return false; is_first_replica = false;
} }
else else
{ {
@ -569,7 +572,7 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
} }
return true; return is_first_replica;
} }
throw Exception( throw Exception(

View File

@ -130,7 +130,6 @@ private:
zkutil::ZooKeeperPtr getZooKeeper() const; zkutil::ZooKeeperPtr getZooKeeper() const;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot); bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot); void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
// Return default or custom zookeeper name for table
const String & getZooKeeperPath() const { return zookeeper_path; } const String & getZooKeeperPath() const { return zookeeper_path; }
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo; using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;