Fix build

This commit is contained in:
Antonio Andelic 2023-05-24 09:04:12 +00:00
parent 161afea266
commit 5db2160762
8 changed files with 358 additions and 183 deletions

View File

@ -42,6 +42,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
{
auto keeper_context = std::make_shared<KeeperContext>(true);
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0));
DB::KeeperStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);
@ -52,7 +53,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta);
DB::KeeperSnapshotManager manager(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0), 1, keeper_context);
DB::KeeperSnapshotManager manager(1, keeper_context);
auto snp = manager.serializeSnapshotToBuffer(snapshot);
auto path = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
std::cout << "Snapshot serialized to path:" << path << std::endl;

View File

@ -76,16 +76,32 @@ DiskPtr KeeperContext::getCurrentLogDisk() const
return getDisk(current_log_storage);
}
DiskPtr KeeperContext::getSnapshotsDisk() const
void KeeperContext::setLogDisk(DiskPtr disk)
{
log_storage = disk;
current_log_storage = std::move(disk);
}
DiskPtr KeeperContext::getSnapshotDisk() const
{
return getDisk(snapshot_storage);
}
void KeeperContext::setSnapshotDisk(DiskPtr disk)
{
snapshot_storage = std::move(disk);
}
DiskPtr KeeperContext::getStateFileDisk() const
{
return getDisk(state_file_storage);
}
void KeeperContext::setStateFileDisk(DiskPtr disk)
{
state_file_storage = std::move(disk);
}
KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::AbstractConfiguration & config) const
{
const auto create_local_disk = [](const auto & path)

View File

@ -34,8 +34,13 @@ public:
DiskPtr getCurrentLogDisk() const;
DiskPtr getLogDisk() const;
DiskPtr getSnapshotsDisk() const;
void setLogDisk(DiskPtr disk);
DiskPtr getSnapshotDisk() const;
void setSnapshotDisk(DiskPtr disk);
DiskPtr getStateFileDisk() const;
void setStateFileDisk(DiskPtr disk);
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;

View File

@ -336,28 +336,36 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
snapshot_s3.startup(config, macros);
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue, snapshot_s3, [this](const KeeperStorage::RequestForSession & request_for_session)
{
/// check if we have queue of read requests depending on this request to be committed
std::lock_guard lock(read_request_queue_mutex);
if (auto it = read_request_queue.find(request_for_session.session_id); it != read_request_queue.end())
server = std::make_unique<KeeperServer>(
configuration_and_settings,
config,
responses_queue,
snapshots_queue,
standalone_keeper,
snapshot_s3,
[this](const KeeperStorage::RequestForSession & request_for_session)
{
auto & xid_to_request_queue = it->second;
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid); request_queue_it != xid_to_request_queue.end())
/// check if we have queue of read requests depending on this request to be committed
std::lock_guard lock(read_request_queue_mutex);
if (auto it = read_request_queue.find(request_for_session.session_id); it != read_request_queue.end())
{
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
auto & xid_to_request_queue = it->second;
xid_to_request_queue.erase(request_queue_it);
if (auto request_queue_it = xid_to_request_queue.find(request_for_session.request->xid);
request_queue_it != xid_to_request_queue.end())
{
for (const auto & read_request : request_queue_it->second)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(read_request);
else
addErrorResponses({read_request}, Coordination::Error::ZCONNECTIONLOSS);
}
xid_to_request_queue.erase(request_queue_it);
}
}
}
});
});
try
{

View File

@ -108,13 +108,14 @@ KeeperServer::KeeperServer(
const Poco::Util::AbstractConfiguration & config,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
bool standalone_keeper,
KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback)
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
, is_recovering(config.getBool("keeper_server.force_recovery", false))
, keeper_context{std::make_shared<KeeperContext>(true)}
, keeper_context{std::make_shared<KeeperContext>(standalone_keeper)}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
{
if (coordination_settings->quorum_reads)

View File

@ -72,6 +72,7 @@ public:
const Poco::Util::AbstractConfiguration & config_,
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
bool standalone_keeper,
KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback);

View File

@ -676,7 +676,7 @@ SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot()
DiskPtr KeeperSnapshotManager::getDisk() const
{
return keeper_context->getSnapshotsDisk();
return keeper_context->getSnapshotDisk();
}
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()

File diff suppressed because it is too large Load Diff