Merge branch 'master' into improve_replica_recovery

Alexander Tokmakov 2022-10-20 14:25:09 +03:00 committed by GitHub
commit d668a82829
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 1069 additions and 196 deletions

View File

@ -3,15 +3,15 @@
# This is a workaround for bug in llvm/clang,
# that does not produce .debug_aranges with LTO
#
# NOTE: this is a temporary solution that should be removed once [1] is
# resolved.
# NOTE: this is a temporary solution that should be removed after upgrading to
# clang-16/llvm-16.
#
# [1]: https://discourse.llvm.org/t/clang-does-not-produce-full-debug-aranges-section-with-thinlto/64898/8
# Refs: https://reviews.llvm.org/D133092
# NOTE: only -flto=thin is supported.
# NOTE: it is not possible to check whether -gdwarf-aranges was passed initially or not.
if [[ "$*" =~ -plugin-opt=thinlto ]]; then
exec "@LLD_PATH@" -mllvm -generate-arange-section "$@"
exec "@LLD_PATH@" -plugin-opt=-generate-arange-section "$@"
else
exec "@LLD_PATH@" "$@"
fi

View File

@ -45,6 +45,7 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperLogStore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperServer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManager.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperSnapshotManagerS3.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateMachine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStateManager.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperStorage.cpp

View File

@ -46,7 +46,7 @@ AggregateFunctionPtr createAggregateFunctionQuantile(
if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);

View File

@ -40,7 +40,7 @@ AggregateFunctionPtr createAggregateFunctionQuantile(
if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);
if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);

View File

@ -47,7 +47,7 @@ AggregateFunctionPtr createAggregateFunctionQuantile(
if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);

View File

@ -46,7 +46,7 @@ AggregateFunctionPtr createAggregateFunctionQuantile(
if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<Int128, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);

View File

@ -40,7 +40,15 @@ struct WelchTTestData : public TTestMoments<Float64>
Float64 denominator_x = sx2 * sx2 / (nx * nx * (nx - 1));
Float64 denominator_y = sy2 * sy2 / (ny * ny * (ny - 1));
return numerator / (denominator_x + denominator_y);
auto result = numerator / (denominator_x + denominator_y);
if (result <= 0 || std::isinf(result) || isNaN(result))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot calculate p_value, because the t-distribution \
has inappropriate value of degrees of freedom (={}). It should be > 0", result);
return result;
}
std::tuple<Float64, Float64> getResult() const
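For context, the guard added above protects the Welch–Satterthwaite estimate of the degrees of freedom, which degenerates to 0, inf, or NaN for pathological inputs. A minimal standalone sketch of the same computation and check, with illustrative names rather than the ClickHouse ones:

#include <cmath>
#include <stdexcept>

/// nu = (sx2/nx + sy2/ny)^2 / ((sx2/nx)^2/(nx-1) + (sy2/ny)^2/(ny-1))
double welchDegreesOfFreedom(double sx2, double sy2, double nx, double ny)
{
    double vx = sx2 / nx;
    double vy = sy2 / ny;
    double numerator = (vx + vy) * (vx + vy);
    double denominator_x = sx2 * sx2 / (nx * nx * (nx - 1));
    double denominator_y = sy2 * sy2 / (ny * ny * (ny - 1));
    double result = numerator / (denominator_x + denominator_y);
    /// Degenerate inputs (e.g. nx <= 1 or both variances zero) yield 0, inf or NaN here.
    if (result <= 0 || std::isinf(result) || std::isnan(result))
        throw std::invalid_argument("degrees of freedom must be a positive finite number");
    return result;
}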

View File

@ -393,24 +393,38 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (const ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
read_list.push_back(*connection->socket);
}
auto timeout = is_draining ? drain_timeout : receive_timeout;
int n = Poco::Net::Socket::select(
read_list,
write_list,
except_list,
timeout);
int n = 0;
/// EINTR loop
while (true)
{
read_list.clear();
for (const ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
read_list.push_back(*connection->socket);
}
/// Poco returns 0 on EINTR; reset errno to ensure that EINTR came from select().
errno = 0;
n = Poco::Net::Socket::select(
read_list,
write_list,
except_list,
timeout);
if (n <= 0 && errno == EINTR)
continue;
break;
}
/// We treat any error as a timeout for simplicity.
/// And we also check whether read_list is empty, just in case.
if (n <= 0 || read_list.empty())
{
const auto & addresses = dumpAddressesUnlocked();
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
@ -423,7 +437,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Timeout ({} ms) exceeded while reading from {}",
timeout.totalMilliseconds(),
dumpAddressesUnlocked());
addresses);
}
}
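The essence of the fix above is a retry loop around select() that distinguishes signal interruption from a real timeout. A minimal sketch of the same pattern with plain ::select(), where the fd set is rebuilt on each attempt just as the hunk rebuilds read_list (the wrapper name is illustrative, not ClickHouse code):

#include <cerrno>
#include <functional>
#include <sys/select.h>
#include <sys/time.h>

/// fill_fds must rebuild the set from scratch on every attempt, because
/// select() mutates its arguments.
int selectRetryingOnEINTR(int nfds, const std::function<void(fd_set &)> & fill_fds, timeval timeout)
{
    while (true)
    {
        fd_set read_fds;
        FD_ZERO(&read_fds);
        fill_fds(read_fds);
        errno = 0;
        int n = ::select(nfds, &read_fds, nullptr, nullptr, &timeout);
        if (n < 0 && errno == EINTR)
            continue;  /// Interrupted by a signal: rebuild state and retry.
        return n;      /// Number of ready descriptors, 0 on timeout, -1 on a real error.
    }
}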

View File

@ -1,14 +1,21 @@
#include <Coordination/KeeperDispatcher.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/hex.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
#include <chrono>
#include <Poco/Path.h>
#include <Common/hex.h>
#include <filesystem>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>
#include <future>
#include <chrono>
#include <filesystem>
#include <iterator>
#include <limits>
namespace CurrentMetrics
{
extern const Metric KeeperAliveConnections;
@ -32,9 +39,7 @@ KeeperDispatcher::KeeperDispatcher()
: responses_queue(std::numeric_limits<size_t>::max())
, configuration_and_settings(std::make_shared<KeeperConfigurationAndSettings>())
, log(&Poco::Logger::get("KeeperDispatcher"))
{
}
{}
void KeeperDispatcher::requestThread()
{
@ -191,7 +196,13 @@ void KeeperDispatcher::snapshotThread()
try
{
task.create_snapshot(std::move(task.snapshot));
auto snapshot_path = task.create_snapshot(std::move(task.snapshot));
if (snapshot_path.empty())
continue;
if (isLeader())
snapshot_s3.uploadSnapshot(snapshot_path);
}
catch (...)
{
@ -285,7 +296,9 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue);
snapshot_s3.startup(config);
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3);
try
{
@ -312,7 +325,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
/// Start it after keeper server start
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
updateConfiguration(config);
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -415,6 +427,8 @@ void KeeperDispatcher::shutdown()
if (server)
server->shutdown();
snapshot_s3.shutdown();
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
}
@ -678,6 +692,8 @@ void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfigurati
if (!push_result)
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
}
snapshot_s3.updateS3Configuration(config);
}
void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)

View File

@ -14,6 +14,7 @@
#include <Coordination/CoordinationSettings.h>
#include <Coordination/Keeper4LWInfo.h>
#include <Coordination/KeeperConnectionStats.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
namespace DB
{
@ -76,6 +77,8 @@ private:
/// Counter for new session_id requests.
std::atomic<int64_t> internal_session_id_counter{0};
KeeperSnapshotManagerS3 snapshot_s3;
/// Thread that puts requests to Raft
void requestThread();
/// Thread that puts responses for subscribed sessions

View File

@ -8,6 +8,7 @@
#include <string>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
@ -105,7 +106,8 @@ KeeperServer::KeeperServer(
const KeeperConfigurationAndSettingsPtr & configuration_and_settings_,
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_)
SnapshotsQueue & snapshots_queue_,
KeeperSnapshotManagerS3 & snapshot_manager_s3)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
@ -125,6 +127,7 @@ KeeperServer::KeeperServer(
configuration_and_settings_->snapshot_storage_path,
coordination_settings,
keeper_context,
config.getBool("keeper_server.upload_snapshot_on_exit", true) ? &snapshot_manager_s3 : nullptr,
checkAndGetSuperdigest(configuration_and_settings_->super_digest));
state_manager = nuraft::cs_new<KeeperStateManager>(

View File

@ -71,7 +71,8 @@ public:
const KeeperConfigurationAndSettingsPtr & settings_,
const Poco::Util::AbstractConfiguration & config_,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_);
SnapshotsQueue & snapshots_queue_,
KeeperSnapshotManagerS3 & snapshot_manager_s3);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);

View File

@ -87,7 +87,7 @@ public:
};
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<void(KeeperStorageSnapshotPtr &&)>;
using CreateSnapshotCallback = std::function<std::string(KeeperStorageSnapshotPtr &&)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;

View File

@ -0,0 +1,311 @@
#include <Coordination/KeeperSnapshotManagerS3.h>
#if USE_AWS_S3
#include <Core/UUID.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <IO/S3Common.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/S3/PocoHTTPClient.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
struct KeeperSnapshotManagerS3::S3Configuration
{
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const Aws::S3::S3Client> client_)
: uri(std::move(uri_))
, auth_settings(std::move(auth_settings_))
, client(std::move(client_))
{}
S3::URI uri;
S3::AuthSettings auth_settings;
std::shared_ptr<const Aws::S3::S3Client> client;
};
KeeperSnapshotManagerS3::KeeperSnapshotManagerS3()
: snapshots_s3_queue(std::numeric_limits<size_t>::max())
, log(&Poco::Logger::get("KeeperSnapshotManagerS3"))
, uuid(UUIDHelpers::generateV4())
{}
void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractConfiguration & config)
{
try
{
const std::string config_prefix = "keeper_server.s3_snapshot";
if (!config.has(config_prefix))
{
std::lock_guard client_lock{snapshot_s3_client_mutex};
if (snapshot_s3_client)
LOG_INFO(log, "S3 configuration was removed");
snapshot_s3_client = nullptr;
return;
}
auto auth_settings = S3::AuthSettings::loadFromConfig(config_prefix, config);
auto endpoint = config.getString(config_prefix + ".endpoint");
auto new_uri = S3::URI{Poco::URI(endpoint)};
{
std::lock_guard client_lock{snapshot_s3_client_mutex};
// if the client has not changed (same auth settings, same endpoint), there is no need to update it
if (snapshot_s3_client && snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings
&& snapshot_s3_client->uri.uri == new_uri.uri)
return;
}
LOG_INFO(log, "S3 configuration was updated");
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
HeaderCollection headers = auth_settings.headers;
static constexpr size_t s3_max_redirects = 10;
static constexpr bool enable_s3_requests_logging = false;
if (!new_uri.key.empty())
{
LOG_ERROR(log, "Invalid endpoint defined for S3, it shouldn't contain key, endpoint: {}", endpoint);
return;
}
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
RemoteHostFilter(), s3_max_redirects,
enable_s3_requests_logging,
/* for_disk_s3 = */ false);
client_configuration.endpointOverride = new_uri.endpoint;
auto client = S3::ClientFactory::instance().create(
client_configuration,
new_uri.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
auth_settings.use_environment_credentials.value_or(false),
auth_settings.use_insecure_imds_request.value_or(false));
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));
{
std::lock_guard client_lock{snapshot_s3_client_mutex};
snapshot_s3_client = std::move(new_client);
}
LOG_INFO(log, "S3 client was updated");
}
catch (...)
{
LOG_ERROR(log, "Failed to create an S3 client for snapshots");
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
std::shared_ptr<KeeperSnapshotManagerS3::S3Configuration> KeeperSnapshotManagerS3::getSnapshotS3Client() const
{
std::lock_guard lock{snapshot_s3_client_mutex};
return snapshot_s3_client;
}
void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_path)
{
try
{
auto s3_client = getSnapshotS3Client();
if (s3_client == nullptr)
return;
S3Settings::ReadWriteSettings read_write_settings;
read_write_settings.upload_part_size_multiply_parts_count_threshold = 10000;
const auto create_writer = [&](const auto & key)
{
return WriteBufferFromS3
{
s3_client->client,
s3_client->uri.bucket,
key,
read_write_settings
};
};
const auto file_exists = [&](const auto & key)
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(s3_client->uri.bucket);
request.SetKey(key);
auto outcome = s3_client->client->HeadObject(request);
if (outcome.IsSuccess())
return true;
const auto & error = outcome.GetError();
if (error.GetErrorType() != Aws::S3::S3Errors::NO_SUCH_KEY && error.GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
throw S3Exception(error.GetErrorType(), "Failed to verify existence of lock file: {}", error.GetMessage());
return false;
};
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_path);
ReadBufferFromFile snapshot_file(snapshot_path);
auto snapshot_name = fs::path(snapshot_path).filename().string();
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
if (file_exists(snapshot_name))
{
LOG_ERROR(log, "Snapshot {} already exists", snapshot_name);
return;
}
// First we need to verify that there isn't already a lock file for the snapshot we want to upload
// Only leader uploads a snapshot, but there can be a rare case where we have 2 leaders in NuRaft
if (file_exists(lock_file))
{
LOG_ERROR(log, "Lock file for {} already, exists. Probably a different node is already uploading the snapshot", snapshot_name);
return;
}
// We write our UUID to the lock file
LOG_DEBUG(log, "Trying to create a lock file");
WriteBufferFromS3 lock_writer = create_writer(lock_file);
writeUUIDText(uuid, lock_writer);
lock_writer.finalize();
// We read back the written UUID; if it matches, we can upload the file
ReadBufferFromS3 lock_reader
{
s3_client->client,
s3_client->uri.bucket,
lock_file,
"",
1,
{}
};
std::string read_uuid;
readStringUntilEOF(read_uuid, lock_reader);
if (read_uuid != toString(uuid))
{
LOG_ERROR(log, "Failed to create a lock file");
return;
}
SCOPE_EXIT(
{
LOG_INFO(log, "Removing lock file");
try
{
Aws::S3::Model::DeleteObjectRequest delete_request;
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
if (!delete_outcome.IsSuccess())
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
WriteBufferFromS3 snapshot_writer = create_writer(snapshot_name);
copyData(snapshot_file, snapshot_writer);
snapshot_writer.finalize();
LOG_INFO(log, "Successfully uploaded {} to S3", snapshot_path);
}
catch (...)
{
LOG_INFO(log, "Failure during upload of {} to S3", snapshot_path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void KeeperSnapshotManagerS3::snapshotS3Thread()
{
setThreadName("KeeperS3SnpT");
while (!shutdown_called)
{
std::string snapshot_path;
if (!snapshots_s3_queue.pop(snapshot_path))
break;
if (shutdown_called)
break;
uploadSnapshotImpl(snapshot_path);
}
}
void KeeperSnapshotManagerS3::uploadSnapshot(const std::string & path, bool async_upload)
{
if (getSnapshotS3Client() == nullptr)
return;
if (async_upload)
{
if (!snapshots_s3_queue.push(path))
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", path);
return;
}
uploadSnapshotImpl(path);
}
void KeeperSnapshotManagerS3::startup(const Poco::Util::AbstractConfiguration & config)
{
updateS3Configuration(config);
snapshot_s3_thread = ThreadFromGlobalPool([this] { snapshotS3Thread(); });
}
void KeeperSnapshotManagerS3::shutdown()
{
if (shutdown_called)
return;
LOG_DEBUG(log, "Shutting down KeeperSnapshotManagerS3");
shutdown_called = true;
try
{
snapshots_s3_queue.finish();
if (snapshot_s3_thread.joinable())
snapshot_s3_thread.join();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
LOG_INFO(log, "KeeperSnapshotManagerS3 shut down");
}
}
#endif

View File

@ -0,0 +1,68 @@
#pragma once
#include "config.h"
#include <Poco/Util/AbstractConfiguration.h>
#if USE_AWS_S3
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <string>
#endif
namespace DB
{
#if USE_AWS_S3
class KeeperSnapshotManagerS3
{
public:
KeeperSnapshotManagerS3();
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config);
void uploadSnapshot(const std::string & path, bool async_upload = true);
void startup(const Poco::Util::AbstractConfiguration & config);
void shutdown();
private:
using SnapshotS3Queue = ConcurrentBoundedQueue<std::string>;
SnapshotS3Queue snapshots_s3_queue;
/// Upload new snapshots to S3
ThreadFromGlobalPool snapshot_s3_thread;
struct S3Configuration;
mutable std::mutex snapshot_s3_client_mutex;
std::shared_ptr<S3Configuration> snapshot_s3_client;
std::atomic<bool> shutdown_called{false};
Poco::Logger * log;
UUID uuid;
std::shared_ptr<S3Configuration> getSnapshotS3Client() const;
void uploadSnapshotImpl(const std::string & snapshot_path);
/// Thread that uploads snapshots to S3 in the background
void snapshotS3Thread();
};
#else
class KeeperSnapshotManagerS3
{
public:
KeeperSnapshotManagerS3() = default;
void updateS3Configuration(const Poco::Util::AbstractConfiguration &) {}
void uploadSnapshot(const std::string &, [[maybe_unused]] bool async_upload = true) {}
void startup(const Poco::Util::AbstractConfiguration &) {}
void shutdown() {}
};
#endif
}
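Taken together with the KeeperDispatcher changes above, the lifecycle of this class reduces to three calls. A hypothetical usage sketch (the snapshot path is made up; config is assumed to be an already loaded Poco configuration):

#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Poco/Util/AbstractConfiguration.h>

void exampleLifecycle(const Poco::Util::AbstractConfiguration & config)
{
    DB::KeeperSnapshotManagerS3 snapshot_s3;
    snapshot_s3.startup(config);  /// Reads keeper_server.s3_snapshot and starts the upload thread.
    snapshot_s3.uploadSnapshot("/path/to/snapshot.bin.zstd");  /// Queued; uploaded asynchronously by default.
    snapshot_s3.uploadSnapshot("/path/to/snapshot.bin.zstd", /* async_upload = */ false);  /// Blocking upload (used on shutdown).
    snapshot_s3.shutdown();  /// Finishes the queue and joins the thread.
}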

View File

@ -44,6 +44,7 @@ KeeperStateMachine::KeeperStateMachine(
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
const std::string & superdigest_)
: coordination_settings(coordination_settings_)
, snapshot_manager(
@ -59,6 +60,7 @@ KeeperStateMachine::KeeperStateMachine(
, log(&Poco::Logger::get("KeeperStateMachine"))
, superdigest(superdigest_)
, keeper_context(keeper_context_)
, snapshot_manager_s3(snapshot_manager_s3_)
{
}
@ -400,13 +402,22 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
when_done(ret, exception);
return ret ? latest_snapshot_path : "";
};
if (keeper_context->server_state == KeeperContext::Phase::SHUTDOWN)
{
LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled.");
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
auto snapshot_path = snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
if (!snapshot_path.empty() && snapshot_manager_s3)
{
LOG_INFO(log, "Uploading snapshot {} during shutdown because 'upload_snapshot_on_exit' is enabled.", snapshot_path);
snapshot_manager_s3->uploadSnapshot(snapshot_path, /* async_upload */ false);
}
return;
}

View File

@ -2,11 +2,13 @@
#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperStorage.h>
#include <libnuraft/nuraft.hxx>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/logger_useful.h>
#include <Coordination/KeeperContext.h>
namespace DB
@ -26,6 +28,7 @@ public:
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
const std::string & superdigest_ = "");
/// Read state from the latest snapshot
@ -146,6 +149,8 @@ private:
const std::string superdigest;
KeeperContextPtr keeper_context;
KeeperSnapshotManagerS3 * snapshot_manager_s3;
};
}

View File

@ -1318,7 +1318,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings, keeper_context);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings, keeper_context, nullptr);
state_machine->init();
DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true, enable_compression);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
@ -1359,7 +1359,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
}
SnapshotsQueue snapshots_queue1{1};
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, "./snapshots", settings, keeper_context);
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, "./snapshots", settings, keeper_context, nullptr);
restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);
@ -1471,7 +1471,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings, keeper_context);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings, keeper_context, nullptr);
state_machine->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();

View File

@ -2,20 +2,22 @@
#include "config.h"
#include <string>
#include <vector>
#if USE_AWS_S3
#include <Common/RemoteHostFilter.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/HeaderCollection.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
namespace Aws::Http::Standard
{
class StandardHttpResponse;
@ -23,6 +25,7 @@ class StandardHttpResponse;
namespace DB
{
class Context;
}

View File

@ -1,9 +1,11 @@
#include <IO/S3Common.h>
#include <Common/Exception.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "config.h"
#if USE_AWS_S3
# include <IO/S3Common.h>
# include <Common/quoteString.h>
# include <IO/WriteBufferFromString.h>
@ -780,25 +782,16 @@ namespace S3
boost::to_upper(name);
if (name != S3 && name != COS && name != OBS && name != OSS)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Object storage system name is unrecognized in virtual hosted style S3 URI: {}", quoteString(name));
}
if (name == S3)
{
storage_name = name;
}
else if (name == OBS)
{
storage_name = OBS;
}
else if (name == OSS)
{
storage_name = OSS;
}
else
{
storage_name = COSN;
}
}
else if (re2::RE2::PartialMatch(uri.getPath(), path_style_pattern, &bucket, &key))
{
@ -851,8 +844,82 @@ namespace S3
{
return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error, for_disk_s3).size;
}
}
}
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
namespace S3
{
AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
{
auto access_key_id = config.getString(config_elem + ".access_key_id", "");
auto secret_access_key = config.getString(config_elem + ".secret_access_key", "");
auto region = config.getString(config_elem + ".region", "");
auto server_side_encryption_customer_key_base64 = config.getString(config_elem + ".server_side_encryption_customer_key_base64", "");
std::optional<bool> use_environment_credentials;
if (config.has(config_elem + ".use_environment_credentials"))
use_environment_credentials = config.getBool(config_elem + ".use_environment_credentials");
std::optional<bool> use_insecure_imds_request;
if (config.has(config_elem + ".use_insecure_imds_request"))
use_insecure_imds_request = config.getBool(config_elem + ".use_insecure_imds_request");
HeaderCollection headers;
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
config.keys(config_elem, subconfig_keys);
for (const std::string & subkey : subconfig_keys)
{
if (subkey.starts_with("header"))
{
auto header_str = config.getString(config_elem + "." + subkey);
auto delimiter = header_str.find(':');
if (delimiter == std::string::npos)
throw Exception("Malformed s3 header value", ErrorCodes::INVALID_CONFIG_PARAMETER);
headers.emplace_back(HttpHeader{header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos)});
}
}
return AuthSettings
{
std::move(access_key_id), std::move(secret_access_key),
std::move(region),
std::move(server_side_encryption_customer_key_base64),
std::move(headers),
use_environment_credentials,
use_insecure_imds_request
};
}
void AuthSettings::updateFrom(const AuthSettings & from)
{
/// Update (with a check for emptiness) only those parameters that
/// can be passed not only from the config, but also via AST.
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
use_environment_credentials = from.use_environment_credentials;
use_insecure_imds_request = from.use_insecure_imds_request;
}
}
}
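A hypothetical call site for the extracted helper, assuming an already loaded Poco configuration. loadFromConfig reads the credentials under the given element and also collects any subkeys starting with "header" that hold "Name: value" strings:

#include <IO/S3Common.h>
#include <Poco/Util/AbstractConfiguration.h>

/// Load the credentials block used by KeeperSnapshotManagerS3 (the element
/// name matches the one used in updateS3Configuration above).
DB::S3::AuthSettings loadKeeperS3Auth(const Poco::Util::AbstractConfiguration & config)
{
    return DB::S3::AuthSettings::loadFromConfig("keeper_server.s3_snapshot", config);
}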

View File

@ -1,5 +1,11 @@
#pragma once
#include <Storages/HeaderCollection.h>
#include <IO/S3/PocoHTTPClient.h>
#include <string>
#include <optional>
#include "config.h"
#if USE_AWS_S3
@ -8,7 +14,6 @@
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Errors.h>
#include <IO/S3/PocoHTTPClient.h>
#include <Poco/URI.h>
#include <Common/Exception.h>
@ -27,8 +32,6 @@ namespace ErrorCodes
}
class RemoteHostFilter;
struct HttpHeader;
using HeaderCollection = std::vector<HttpHeader>;
class S3Exception : public Exception
{
@ -130,5 +133,33 @@ S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
}
#endif
namespace Poco::Util
{
class AbstractConfiguration;
};
namespace DB::S3
{
struct AuthSettings
{
static AuthSettings loadFromConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
std::string access_key_id;
std::string secret_access_key;
std::string region;
std::string server_side_encryption_customer_key_base64;
HeaderCollection headers;
std::optional<bool> use_environment_credentials;
std::optional<bool> use_insecure_imds_request;
bool operator==(const AuthSettings & other) const = default;
void updateFrom(const AuthSettings & from);
};
}

View File

@ -7,6 +7,7 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
#include <QueryPipeline/Pipe.h>
#include <Parsers/queryToString.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/UnionStep.h>
@ -26,7 +27,7 @@ namespace ErrorCodes
namespace ClusterProxy
{
ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, Poco::Logger * log)
ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info, Poco::Logger * log)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
@ -96,6 +97,20 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
new_settings.limit.changed = false;
}
/// The setting additional_table_filters may be applied to a Distributed table.
/// In case the query is executed up to WithMergeableState on the remote shard, it is impossible to filter on the initiator.
/// We need to propagate the setting, but change the table name from the distributed one to the source one.
///
/// Here we don't try to analyze the setting again. If query_info->additional_filter_ast is not empty, some filter was applied.
/// It's just easier to add this filter for the source table.
if (query_info && query_info->additional_filter_ast)
{
Tuple tuple;
tuple.push_back(main_table.getShortName());
tuple.push_back(queryToString(query_info->additional_filter_ast));
new_settings.additional_table_filters.value.push_back(std::move(tuple));
}
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
@ -121,7 +136,7 @@ void executeQuery(
std::vector<QueryPlanPtr> plans;
SelectStreamFactory::Shards remote_shards;
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log);
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, main_table, &query_info, log);
new_context->getClientInfo().distributed_depth += 1;

View File

@ -35,7 +35,7 @@ class SelectStreamFactory;
///
/// @return new Context with adjusted settings
ContextMutablePtr updateSettingsForCluster(
const Cluster & cluster, ContextPtr context, const Settings & settings, Poco::Logger * log = nullptr);
const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table, const SelectQueryInfo * query_info = nullptr, Poco::Logger * log = nullptr);
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
/// `stream_factory` object encapsulates the logic of creating plans for a different type of query

View File

@ -117,7 +117,7 @@ struct URLBasedDataSourceConfiguration
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
S3Settings::AuthSettings auth_settings;
S3::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
};

View File

@ -419,14 +419,14 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
LOG_TRACE(log, "Checking {} blocks ({} are not cached){}", stat.numChildren, not_cached_blocks, " to clear old ones from ZooKeeper.");
}
zkutil::AsyncResponses<Coordination::ExistsResponse> exists_futures;
std::vector<std::string> exists_paths;
for (const String & block : blocks)
{
auto it = cached_block_stats.find(block);
if (it == cached_block_stats.end())
{
/// New block. Fetch its stat asynchronously.
exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
exists_paths.emplace_back(storage.zookeeper_path + "/blocks/" + block);
}
else
{
@ -436,14 +436,18 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
}
}
auto exists_size = exists_paths.size();
auto exists_results = zookeeper.exists(exists_paths);
/// Put fetched stats into the cache
for (auto & elem : exists_futures)
for (size_t i = 0; i < exists_size; ++i)
{
auto status = elem.second.get();
auto status = exists_results[i];
if (status.error != Coordination::Error::ZNONODE)
{
cached_block_stats.emplace(elem.first, std::make_pair(status.stat.ctime, status.stat.version));
timed_blocks.emplace_back(elem.first, status.stat.ctime, status.stat.version);
auto node_name = fs::path(exists_paths[i]).filename();
cached_block_stats.emplace(node_name, std::make_pair(status.stat.ctime, status.stat.version));
timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.version);
}
}
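This hunk is the first of several in this commit that replace a loop of per-path asynchronous futures with one batched ZooKeeper call whose results align index-for-index with the input paths. A sketch of the pattern (checkBlock is a hypothetical consumer; the batched exists() is the zkutil::ZooKeeper interface used above):

#include <Common/ZooKeeper/ZooKeeper.h>

void checkBlock(const String & path, const Coordination::Stat & stat);  /// Hypothetical consumer.

void checkBlocks(zkutil::ZooKeeper & zookeeper, const Strings & blocks, const String & zookeeper_path)
{
    Strings exists_paths;
    exists_paths.reserve(blocks.size());
    for (const String & block : blocks)
        exists_paths.emplace_back(zookeeper_path + "/blocks/" + block);

    /// One batched round trip instead of blocks.size() futures.
    auto exists_results = zookeeper.exists(exists_paths);
    for (size_t i = 0; i < exists_paths.size(); ++i)
        if (exists_results[i].error != Coordination::Error::ZNONODE)
            checkBlock(exists_paths[i], exists_results[i].stat);
}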

View File

@ -153,17 +153,19 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
::sort(children.begin(), children.end());
zkutil::AsyncResponses<Coordination::GetResponse> futures;
futures.reserve(children.size());
auto children_num = children.size();
std::vector<std::string> paths;
paths.reserve(children_num);
for (const String & child : children)
futures.emplace_back(child, zookeeper->asyncGet(fs::path(queue_path) / child));
paths.emplace_back(fs::path(queue_path) / child);
for (auto & future : futures)
auto results = zookeeper->get(paths);
for (size_t i = 0; i < children_num; ++i)
{
Coordination::GetResponse res = future.second.get();
auto res = results[i];
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
entry->znode_name = future.first;
entry->znode_name = children[i];
std::lock_guard lock(state_mutex);
@ -641,11 +643,11 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
LOG_DEBUG(log, "Pulling {} entries to queue: {} - {}", (end - begin), *begin, *last);
zkutil::AsyncResponses<Coordination::GetResponse> futures;
futures.reserve(end - begin);
Strings get_paths;
get_paths.reserve(end - begin);
for (auto it = begin; it != end; ++it)
futures.emplace_back(*it, zookeeper->asyncGet(fs::path(zookeeper_path) / "log" / *it));
get_paths.emplace_back(fs::path(zookeeper_path) / "log" / *it);
/// Simultaneously add all new entries to the queue and move the pointer to the log.
@ -655,9 +657,11 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
std::optional<time_t> min_unprocessed_insert_time_changed;
for (auto & future : futures)
auto get_results = zookeeper->get(get_paths);
auto get_num = get_results.size();
for (size_t i = 0; i < get_num; ++i)
{
Coordination::GetResponse res = future.second.get();
auto res = get_results[i];
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));

View File

@ -99,19 +99,22 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & z
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
std::vector<std::future<Coordination::ExistsResponse>> replicas_status_futures;
replicas_status_futures.reserve(replicas.size());
Strings exists_paths;
for (const auto & replica : replicas)
if (replica != storage.replica_name)
replicas_status_futures.emplace_back(zookeeper->asyncExists(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active"));
exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");
auto exists_result = zookeeper->exists(exists_paths);
auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
size_t active_replicas = 1; /// Assume current replica is active (will check below)
for (auto & status : replicas_status_futures)
if (status.get().error == Coordination::Error::ZOK)
for (size_t i = 0; i < exists_paths.size(); ++i)
{
auto status = exists_result[i];
if (status.error == Coordination::Error::ZOK)
++active_replicas;
}
size_t replicas_number = replicas.size();
size_t quorum_size = getQuorumSize(replicas_number);
@ -135,8 +138,8 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & z
/// Both checks are implicitly made also later (otherwise there would be a race condition).
auto is_active = is_active_future.get();
auto host = host_future.get();
auto is_active = get_results[0];
auto host = get_results[1];
if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
throw Exception("Replica is not active right now", ErrorCodes::READONLY);

View File

@ -682,24 +682,20 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
auto client = getClient();
std::vector<std::future<Coordination::GetResponse>> values;
values.reserve(keys.size());
Strings full_key_paths;
full_key_paths.reserve(keys.size());
for (const auto & key : keys)
{
const auto full_path = fullPathForKey(key);
values.emplace_back(client->asyncTryGet(full_path));
full_key_paths.emplace_back(fullPathForKey(key));
}
auto wait_until = std::chrono::system_clock::now() + std::chrono::milliseconds(Coordination::DEFAULT_OPERATION_TIMEOUT_MS);
auto values = client->tryGet(full_key_paths);
for (size_t i = 0; i < keys.size(); ++i)
{
auto & value = values[i];
if (value.wait_until(wait_until) != std::future_status::ready)
throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch values: timeout");
auto response = values[i];
auto response = value.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK)

View File

@ -3224,16 +3224,17 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
int32_t log_version,
MergeType merge_type)
{
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
exists_futures.reserve(parts.size());
Strings exists_paths;
exists_paths.reserve(parts.size());
for (const auto & part : parts)
exists_futures.emplace_back(zookeeper->asyncExists(fs::path(replica_path) / "parts" / part->name));
exists_paths.emplace_back(fs::path(replica_path) / "parts" / part->name);
auto exists_results = zookeeper->exists(exists_paths);
bool all_in_zk = true;
for (size_t i = 0; i < parts.size(); ++i)
{
/// If there is no information about part in ZK, we will not merge it.
if (exists_futures[i].get().error == Coordination::Error::ZNONODE)
if (exists_results[i].error == Coordination::Error::ZNONODE)
{
all_in_zk = false;
@ -6246,19 +6247,20 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(const Strin
auto zookeeper = getZooKeeper();
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
exists_futures.reserve(part_names.size());
Strings exists_paths;
exists_paths.reserve(part_names.size());
for (const String & part_name : part_names)
{
String part_path = fs::path(replica_path) / "parts" / part_name;
exists_futures.emplace_back(zookeeper->asyncExists(part_path));
exists_paths.emplace_back(fs::path(replica_path) / "parts" / part_name);
}
auto exists_results = zookeeper->exists(exists_paths);
std::vector<std::future<Coordination::MultiResponse>> remove_futures;
remove_futures.reserve(part_names.size());
for (size_t i = 0; i < part_names.size(); ++i)
{
Coordination::ExistsResponse exists_resp = exists_futures[i].get();
Coordination::ExistsResponse exists_resp = exists_results[i];
if (exists_resp.error == Coordination::Error::ZOK)
{
Coordination::Requests ops;
@ -6304,9 +6306,9 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(const Strin
void StorageReplicatedMergeTree::removePartsFromZooKeeper(
zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, NameSet * parts_should_be_retried)
{
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
Strings exists_paths;
std::vector<std::future<Coordination::MultiResponse>> remove_futures;
exists_futures.reserve(part_names.size());
exists_paths.reserve(part_names.size());
remove_futures.reserve(part_names.size());
try
{
@ -6314,13 +6316,14 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
/// if zk session will be dropped
for (const String & part_name : part_names)
{
String part_path = fs::path(replica_path) / "parts" / part_name;
exists_futures.emplace_back(zookeeper->asyncExists(part_path));
exists_paths.emplace_back(fs::path(replica_path) / "parts" / part_name);
}
auto exists_results = zookeeper->exists(exists_paths);
for (size_t i = 0; i < part_names.size(); ++i)
{
Coordination::ExistsResponse exists_resp = exists_futures[i].get();
auto exists_resp = exists_results[i];
if (exists_resp.error == Coordination::Error::ZOK)
{
Coordination::Requests ops;

View File

@ -197,7 +197,7 @@ public:
const S3::URI uri;
std::shared_ptr<const Aws::S3::S3Client> client;
S3Settings::AuthSettings auth_settings;
S3::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
/// If s3 configuration was passed from ast, then it is static.
@ -209,7 +209,7 @@ public:
S3Configuration(
const String & url_,
const S3Settings::AuthSettings & auth_settings_,
const S3::AuthSettings & auth_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))

View File

@ -1,5 +1,7 @@
#include <Storages/StorageS3Settings.h>
#include <IO/S3Common.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
@ -9,10 +11,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
@ -46,41 +44,8 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
if (config.has(config_elem + "." + key + ".endpoint"))
{
auto endpoint = get_string_for_key(key, "endpoint", false);
auto access_key_id = get_string_for_key(key, "access_key_id");
auto secret_access_key = get_string_for_key(key, "secret_access_key");
auto region = get_string_for_key(key, "region");
auto server_side_encryption_customer_key_base64 = get_string_for_key(key, "server_side_encryption_customer_key_base64");
std::optional<bool> use_environment_credentials;
if (config.has(config_elem + "." + key + ".use_environment_credentials"))
use_environment_credentials = config.getBool(config_elem + "." + key + ".use_environment_credentials");
std::optional<bool> use_insecure_imds_request;
if (config.has(config_elem + "." + key + ".use_insecure_imds_request"))
use_insecure_imds_request = config.getBool(config_elem + "." + key + ".use_insecure_imds_request");
HeaderCollection headers;
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
config.keys(config_elem + "." + key, subconfig_keys);
for (const String & subkey : subconfig_keys)
{
if (subkey.starts_with("header"))
{
auto header_str = config.getString(config_elem + "." + key + "." + subkey);
auto delimiter = header_str.find(':');
if (delimiter == String::npos)
throw Exception("Malformed s3 header value", ErrorCodes::INVALID_CONFIG_PARAMETER);
headers.emplace_back(HttpHeader{header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos)});
}
}
S3Settings::AuthSettings auth_settings{
std::move(access_key_id), std::move(secret_access_key),
std::move(region),
std::move(server_side_encryption_customer_key_base64),
std::move(headers),
use_environment_credentials,
use_insecure_imds_request};
auto auth_settings = S3::AuthSettings::loadFromConfig(config_elem + "." + key, config);
S3Settings::ReadWriteSettings rw_settings;
rw_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);

View File

@ -9,6 +9,8 @@
#include <Interpreters/Context_fwd.h>
#include <Storages/HeaderCollection.h>
#include <IO/S3Common.h>
namespace Poco::Util
{
class AbstractConfiguration;
@ -21,46 +23,6 @@ struct Settings;
struct S3Settings
{
struct AuthSettings
{
String access_key_id;
String secret_access_key;
String region;
String server_side_encryption_customer_key_base64;
HeaderCollection headers;
std::optional<bool> use_environment_credentials;
std::optional<bool> use_insecure_imds_request;
inline bool operator==(const AuthSettings & other) const
{
return access_key_id == other.access_key_id && secret_access_key == other.secret_access_key
&& region == other.region
&& server_side_encryption_customer_key_base64 == other.server_side_encryption_customer_key_base64
&& headers == other.headers
&& use_environment_credentials == other.use_environment_credentials
&& use_insecure_imds_request == other.use_insecure_imds_request;
}
void updateFrom(const AuthSettings & from)
{
/// Update with check for emptyness only parameters which
/// can be passed not only from config, but via ast.
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
use_environment_credentials = from.use_environment_credentials;
use_insecure_imds_request = from.use_insecure_imds_request;
}
};
struct ReadWriteSettings
{
size_t max_single_read_retries = 0;
@ -90,7 +52,7 @@ struct S3Settings
void updateFromSettingsIfEmpty(const Settings & settings);
};
AuthSettings auth_settings;
S3::AuthSettings auth_settings;
ReadWriteSettings rw_settings;
inline bool operator==(const S3Settings & other) const

View File

@ -58,7 +58,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
}
ColumnsDescription res;
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef());
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef(), table_id);
/// Expect only needed columns from the result of DESC TABLE. NOTE 'comment' column is ignored for compatibility reasons.
Block sample_block
@ -169,7 +169,7 @@ ColumnsDescriptionByShardNum getExtendedObjectsOfRemoteTables(
const auto & shards_info = cluster.getShardsInfo();
auto query = "DESC TABLE " + remote_table_id.getFullTableName();
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef());
auto new_context = ClusterProxy::updateSettingsForCluster(cluster, context, context->getSettingsRef(), remote_table_id);
new_context->setSetting("describe_extend_object_types", true);
/// Expect only needed columns from the result of DESC TABLE.

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,42 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>50</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,42 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,42 @@
<clickhouse>
<keeper_server>
<s3_snapshot>
<endpoint>http://minio1:9001/snapshots/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_snapshot>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,120 @@
import pytest
from helpers.cluster import ClickHouseCluster
from time import sleep
from kazoo.client import KazooClient
# from kazoo.protocol.serialization import Connect, read_buffer, write_buffer
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/keeper_config1.xml"],
stay_alive=True,
with_minio=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/keeper_config2.xml"],
stay_alive=True,
with_minio=True,
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/keeper_config3.xml"],
stay_alive=True,
with_minio=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
cluster.minio_client.make_bucket("snapshots")
yield cluster
finally:
cluster.shutdown()
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
)
_fake_zk_instance.start()
return _fake_zk_instance
def destroy_zk_client(zk):
try:
if zk:
zk.stop()
zk.close()
except:
pass
def wait_node(node):
for _ in range(100):
zk = None
try:
zk = get_fake_zk(node.name, timeout=30.0)
zk.sync("/")
print("node", node.name, "ready")
break
except Exception as ex:
sleep(0.2)
print("Waiting until", node.name, "will be ready, exception", ex)
finally:
destroy_zk_client(zk)
else:
raise Exception("Can't wait node", node.name, "to become ready")
def test_s3_upload(started_cluster):
node1_zk = get_fake_zk(node1.name)
# snapshot_distance is set to 50 in the configs,
# so a snapshot should be generated after every 50 requests
for _ in range(210):
node1_zk.create("/test", sequence=True)
def get_saved_snapshots():
return [
obj.object_name
for obj in list(cluster.minio_client.list_objects("snapshots"))
]
saved_snapshots = get_saved_snapshots()
assert set(saved_snapshots) == set(
[
"snapshot_50.bin.zstd",
"snapshot_100.bin.zstd",
"snapshot_150.bin.zstd",
"snapshot_200.bin.zstd",
]
)
destroy_zk_client(node1_zk)
node1.stop_clickhouse(kill=True)
# wait for a new leader to be elected and check that it continues
# uploading snapshots
wait_node(node2)
node2_zk = get_fake_zk(node2.name)
for _ in range(200):
node2_zk.create("/test", sequence=True)
saved_snapshots = get_saved_snapshots()
assert len(saved_snapshots) > 4
success_upload_message = "Successfully uploaded"
assert node2.contains_in_log(success_upload_message) or node3.contains_in_log(
success_upload_message
)
destroy_zk_client(node2_zk)

View File

@ -11,11 +11,13 @@ node1 = cluster.add_instance(
"node1",
main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"],
with_zookeeper=True,
use_keeper=False,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"],
with_zookeeper=True,
use_keeper=False,
)

View File

@ -60,6 +60,14 @@ select * from remote('127.0.0.{1,2}', system.one) settings additional_table_filt
0
0
select * from remote('127.0.0.{1,2}', system.one) settings additional_table_filters={'system.one' : 'dummy != 0'};
select * from distr_table settings additional_table_filters={'distr_table' : 'x = 2'};
2 bb
2 bb
select * from distr_table settings additional_table_filters={'distr_table' : 'x != 2 and x != 3'};
1 a
4 dddd
1 a
4 dddd
select * from system.numbers limit 5;
0
1

View File

@ -1,3 +1,4 @@
-- Tags: distributed
drop table if exists table_1;
drop table if exists table_2;
drop table if exists v_numbers;
@ -6,6 +7,8 @@ drop table if exists mv_table;
create table table_1 (x UInt32, y String) engine = MergeTree order by x;
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
CREATE TABLE distr_table (x UInt32, y String) ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), 'table_1');
-- { echoOn }
select * from table_1;
@ -29,6 +32,9 @@ select x from table_1 prewhere x != 2 where x != 2 settings additional_table_fil
select * from remote('127.0.0.{1,2}', system.one) settings additional_table_filters={'system.one' : 'dummy = 0'};
select * from remote('127.0.0.{1,2}', system.one) settings additional_table_filters={'system.one' : 'dummy != 0'};
select * from distr_table settings additional_table_filters={'distr_table' : 'x = 2'};
select * from distr_table settings additional_table_filters={'distr_table' : 'x != 2 and x != 3'};
select * from system.numbers limit 5;
select * from system.numbers as t limit 5 settings additional_table_filters={'t' : 'number % 2 != 0'};
select * from system.numbers limit 5 settings additional_table_filters={'system.numbers' : 'number != 3'};

View File

@ -0,0 +1,3 @@
4 dddd
5 a
6 bb

View File

@ -0,0 +1,20 @@
-- Tags: no-parallel, distributed
create database if not exists shard_0;
create database if not exists shard_1;
drop table if exists dist_02346;
drop table if exists shard_0.data_02346;
drop table if exists shard_1.data_02346;
create table shard_0.data_02346 (x UInt32, y String) engine = MergeTree order by x settings index_granularity = 2;
insert into shard_0.data_02346 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
create table shard_1.data_02346 (x UInt32, y String) engine = MergeTree order by x settings index_granularity = 2;
insert into shard_1.data_02346 values (5, 'a'), (6, 'bb'), (7, 'ccc'), (8, 'dddd');
create table dist_02346 (x UInt32, y String) engine=Distributed('test_cluster_two_shards_different_databases', /* default_database= */ '', data_02346);
set max_rows_to_read=4;
select * from dist_02346 order by x settings additional_table_filters={'dist_02346' : 'x > 3 and x < 7'};

View File

@ -0,0 +1,30 @@
-- { echoOn }
set max_rows_to_read = 2;
select * from table_1 order by x settings additional_table_filters={'table_1' : 'x > 3'};
4 dddd
select * from table_1 order by x settings additional_table_filters={'table_1' : 'x < 3'};
1 a
2 bb
select * from table_1 order by x settings additional_table_filters={'table_1' : 'length(y) >= 3'};
3 ccc
4 dddd
select * from table_1 order by x settings additional_table_filters={'table_1' : 'length(y) < 3'};
1 a
2 bb
set max_rows_to_read = 4;
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'x > 3'};
4 dddd
4 dddd
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'x < 3'};
1 a
1 a
2 bb
2 bb
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'length(y) > 3'};
4 dddd
4 dddd
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'length(y) < 3'};
1 a
1 a
2 bb
2 bb

View File

@ -0,0 +1,24 @@
-- Tags: distributed
create table table_1 (x UInt32, y String, INDEX a (length(y)) TYPE minmax GRANULARITY 1) engine = MergeTree order by x settings index_granularity = 2;
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
CREATE TABLE distr_table (x UInt32, y String) ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), 'table_1');
-- { echoOn }
set max_rows_to_read = 2;
select * from table_1 order by x settings additional_table_filters={'table_1' : 'x > 3'};
select * from table_1 order by x settings additional_table_filters={'table_1' : 'x < 3'};
select * from table_1 order by x settings additional_table_filters={'table_1' : 'length(y) >= 3'};
select * from table_1 order by x settings additional_table_filters={'table_1' : 'length(y) < 3'};
set max_rows_to_read = 4;
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'x > 3'};
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'x < 3'};
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'length(y) > 3'};
select * from distr_table order by x settings additional_table_filters={'distr_table' : 'length(y) < 3'};

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS welch_ttest__fuzz_7;
CREATE TABLE welch_ttest__fuzz_7 (left UInt128, right UInt128) ENGINE = Memory;
INSERT INTO welch_ttest__fuzz_7 VALUES (0.010268, 0), (0.000167, 0), (0.000167, 0), (0.159258, 1), (0.136278, 1), (0.122389, 1);
SELECT roundBankers(welchTTest(left, right).2, 6) from welch_ttest__fuzz_7; -- { serverError 36 }
SELECT roundBankers(studentTTest(left, right).2, 6) from welch_ttest__fuzz_7; -- { serverError 36 }

View File

@ -0,0 +1,10 @@
0
0
0
0
0
0
0
0
0
0

View File

@ -0,0 +1,21 @@
-- This is a regression test for EINTR handling in MultiplexedConnections::getReplicaForReading()
select * from remote('127.{2,4}', view(
-- This emulates a slow query: the server returns a row every 0.1 seconds
select sleep(0.1) from numbers(20) settings max_block_size=1)
)
-- LIMIT is to activate query cancellation in case of enough rows already read.
limit 10
settings
-- This is to avoid draining in the background and getting the exception during query execution
drain_timeout=-1,
-- This is to activate as many signals as possible to trigger EINTR
query_profiler_real_time_period_ns=1,
-- This is to use MultiplexedConnections
use_hedged_requests=0,
-- This is to make the initiator wait for the cancel packet in MultiplexedConnections::getReplicaForReading()
--
-- NOTE: even a smaller sleep would be enough to trigger this problem
-- with 100% probability; however, just to make it more reliable, increase
-- it to 2 seconds.
sleep_in_receive_cancel_ms=2000;

View File

@ -63,7 +63,7 @@ int main(int argc, char *argv[])
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>();
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, argv[1], settings, keeper_context);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, argv[1], settings, keeper_context, nullptr);
state_machine->init();
size_t last_commited_index = state_machine->last_commit_index();