Merge pull request #25428 from ClickHouse/zookeeper_snapshots

Tool for conversion of ZooKeeper data into clickhouse-keeper snapshot
This commit is contained in:
alesapin 2021-06-29 12:58:45 +03:00 committed by GitHub
commit cdc95fa98a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1383 additions and 18 deletions

View File

@ -1,6 +1,8 @@
# docker build -t yandex/clickhouse-integration-test .
FROM yandex/clickhouse-test-base
SHELL ["/bin/bash", "-c"]
RUN apt-get update \
&& env DEBIAN_FRONTEND=noninteractive apt-get -y install \
tzdata \
@ -20,7 +22,9 @@ RUN apt-get update \
krb5-user \
iproute2 \
lsof \
g++
g++ \
default-jre
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
@ -30,6 +34,19 @@ RUN apt-get clean
# Install MySQL ODBC driver
RUN curl 'https://cdn.mysql.com//Downloads/Connector-ODBC/8.0/mysql-connector-odbc-8.0.21-linux-glibc2.12-x86-64bit.tar.gz' --output 'mysql-connector.tar.gz' && tar -xzf mysql-connector.tar.gz && cd mysql-connector-odbc-8.0.21-linux-glibc2.12-x86-64bit/lib && mv * /usr/local/lib && ln -s /usr/local/lib/libmyodbc8a.so /usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so
# Unfortunately this is required for a single test for conversion data from zookeeper to clickhouse-keeper.
# ZooKeeper is not started by default, but consumes some space in containers.
# 777 perms used to allow anybody to start/stop ZooKeeper
ENV ZOOKEEPER_VERSION='3.6.3'
RUN curl -O "https://mirrors.estointernet.in/apache/zookeeper/zookeeper-${ZOOKEEPER_VERSION}/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz"
RUN tar -zxvf apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz && mv apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/zookeeper && chmod -R 777 /opt/zookeeper && rm apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz
RUN echo $'tickTime=2500 \n\
tickTime=2500 \n\
dataDir=/zookeeper \n\
clientPort=2181 \n\
maxClientCnxns=80' > /opt/zookeeper/conf/zoo.cfg
RUN mkdir /zookeeper && chmod -R 777 /zookeeper
ENV TZ=Europe/Moscow
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

View File

@ -50,11 +50,15 @@ option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories"
option (ENABLE_CLICKHOUSE_KEEPER "ClickHouse alternative to ZooKeeper" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_KEEPER_CONVERTER "Util allows to convert ZooKeeper logs and snapshots into clickhouse-keeper snapshot" ${ENABLE_CLICKHOUSE_ALL})
if (NOT USE_NURAFT)
# RECONFIGURE_MESSAGE_LEVEL should not be used here,
# since USE_NURAFT is set to OFF for FreeBSD and Darwin.
message (STATUS "clickhouse-keeper will not be built (lack of NuRaft)")
message (STATUS "clickhouse-keeper and clickhouse-keeper-converter will not be built (lack of NuRaft)")
set(ENABLE_CLICKHOUSE_KEEPER OFF)
set(ENABLE_CLICKHOUSE_KEEPER_CONVERTER OFF)
endif()
if (CLICKHOUSE_SPLIT_BINARY)
@ -150,6 +154,12 @@ else()
message(STATUS "ClickHouse keeper mode: OFF")
endif()
if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER)
message(STATUS "ClickHouse keeper-converter mode: ON")
else()
message(STATUS "ClickHouse keeper-converter mode: OFF")
endif()
if(NOT (MAKE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES))
set(CLICKHOUSE_ONE_SHARED ON)
endif()
@ -222,6 +232,10 @@ if (ENABLE_CLICKHOUSE_KEEPER)
add_subdirectory (keeper)
endif()
if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER)
add_subdirectory (keeper-converter)
endif()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()
@ -231,9 +245,51 @@ if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
endif ()
if (CLICKHOUSE_ONE_SHARED)
add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_KEEPER_SOURCES})
target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_KEEPER_LINK})
target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_GIT_IMPORT_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_KEEPER_INCLUDE})
add_library(clickhouse-lib SHARED
${CLICKHOUSE_SERVER_SOURCES}
${CLICKHOUSE_CLIENT_SOURCES}
${CLICKHOUSE_LOCAL_SOURCES}
${CLICKHOUSE_BENCHMARK_SOURCES}
${CLICKHOUSE_COPIER_SOURCES}
${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES}
${CLICKHOUSE_COMPRESSOR_SOURCES}
${CLICKHOUSE_FORMAT_SOURCES}
${CLICKHOUSE_OBFUSCATOR_SOURCES}
${CLICKHOUSE_GIT_IMPORT_SOURCES}
${CLICKHOUSE_ODBC_BRIDGE_SOURCES}
${CLICKHOUSE_KEEPER_SOURCES}
${CLICKHOUSE_KEEPER_CONVERTER_SOURCES})
target_link_libraries(clickhouse-lib
${CLICKHOUSE_SERVER_LINK}
${CLICKHOUSE_CLIENT_LINK}
${CLICKHOUSE_LOCAL_LINK}
${CLICKHOUSE_BENCHMARK_LINK}
${CLICKHOUSE_COPIER_LINK}
${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK}
${CLICKHOUSE_COMPRESSOR_LINK}
${CLICKHOUSE_FORMAT_LINK}
${CLICKHOUSE_OBFUSCATOR_LINK}
${CLICKHOUSE_GIT_IMPORT_LINK}
${CLICKHOUSE_ODBC_BRIDGE_LINK}
${CLICKHOUSE_KEEPER_LINK}
${CLICKHOUSE_KEEPER_CONVERTER_LINK})
target_include_directories(clickhouse-lib
${CLICKHOUSE_SERVER_INCLUDE}
${CLICKHOUSE_CLIENT_INCLUDE}
${CLICKHOUSE_LOCAL_INCLUDE}
${CLICKHOUSE_BENCHMARK_INCLUDE}
${CLICKHOUSE_COPIER_INCLUDE}
${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE}
${CLICKHOUSE_COMPRESSOR_INCLUDE}
${CLICKHOUSE_FORMAT_INCLUDE}
${CLICKHOUSE_OBFUSCATOR_INCLUDE}
${CLICKHOUSE_GIT_IMPORT_INCLUDE}
${CLICKHOUSE_ODBC_BRIDGE_INCLUDE}
${CLICKHOUSE_KEEPER_INCLUDE}
${CLICKHOUSE_KEEPER_CONVERTER_INCLUDE})
set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "")
install (TARGETS clickhouse-lib LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT clickhouse)
endif()
@ -264,6 +320,10 @@ if (CLICKHOUSE_SPLIT_BINARY)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper-converter)
endif ()
set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS})
@ -314,6 +374,9 @@ else ()
if (ENABLE_CLICKHOUSE_KEEPER)
clickhouse_target_link_split_lib(clickhouse keeper)
endif()
if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER)
clickhouse_target_link_split_lib(clickhouse keeper-converter)
endif()
if (ENABLE_CLICKHOUSE_INSTALL)
clickhouse_target_link_split_lib(clickhouse install)
endif ()
@ -374,6 +437,11 @@ else ()
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-keeper)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER)
add_custom_target (clickhouse-keeper-converter ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-keeper-converter DEPENDS clickhouse)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper-converter" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-keeper-converter)
endif ()
install (TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

View File

@ -17,3 +17,4 @@
#cmakedefine01 ENABLE_CLICKHOUSE_ODBC_BRIDGE
#cmakedefine01 ENABLE_CLICKHOUSE_LIBRARY_BRIDGE
#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER
#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER

View File

@ -0,0 +1,9 @@
set (CLICKHOUSE_KEEPER_CONVERTER_SOURCES KeeperConverter.cpp)
set (CLICKHOUSE_KEEPER_CONVERTER_LINK
PRIVATE
boost::program_options
dbms
)
clickhouse_program_add(keeper-converter)

View File

@ -0,0 +1,61 @@
#include <iostream>
#include <optional>
#include <boost/program_options.hpp>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ZooKeeperDataReader.h>
#include <Common/TerminalSize.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
{
using namespace DB;
namespace po = boost::program_options;
po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help,h", "produce help message")
("zookeeper-logs-dir", po::value<std::string>(), "Path to directory with ZooKeeper logs")
("zookeeper-snapshots-dir", po::value<std::string>(), "Path to directory with ZooKeeper snapshots")
("output-dir", po::value<std::string>(), "Directory to place output clickhouse-keeper snapshot")
;
po::variables_map options;
po::store(po::command_line_parser(argc, argv).options(desc).run(), options);
Poco::AutoPtr<Poco::ConsoleChannel> console_channel(new Poco::ConsoleChannel);
Poco::Logger * logger = &Poco::Logger::get("KeeperConverter");
logger->setChannel(console_channel);
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " --zookeeper-logs-dir /var/lib/zookeeper/data/version-2 --zookeeper-snapshots-dir /var/lib/zookeeper/data/version-2 --output-dir /var/lib/clickhouse/coordination/snapshots" << std::endl;
std::cout << desc << std::endl;
return 0;
}
try
{
DB::KeeperStorage storage(500, "");
DB::deserializeKeeperStorageFromSnapshotsDir(storage, options["zookeeper-snapshots-dir"].as<std::string>(), logger);
DB::deserializeLogsAndApplyToStorage(storage, options["zookeeper-logs-dir"].as<std::string>(), logger);
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta);
DB::KeeperSnapshotManager manager(options["output-dir"].as<std::string>(), 1);
auto snp = manager.serializeSnapshotToBuffer(snapshot);
auto path = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
std::cout << "Snapshot serialized to path:" << path << std::endl;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(true) << '\n';
return getCurrentExceptionCode();
}
return 0;
}

View File

@ -0,0 +1,2 @@
int mainEntryClickHouseKeeperConverter(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseKeeperConverter(argc_, argv_); }

View File

@ -59,6 +59,9 @@ int mainEntryClickHouseGitImport(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_KEEPER
int mainEntryClickHouseKeeper(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_KEEPER
int mainEntryClickHouseKeeperConverter(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_INSTALL
int mainEntryClickHouseInstall(int argc, char ** argv);
int mainEntryClickHouseStart(int argc, char ** argv);
@ -119,6 +122,9 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_KEEPER
{"keeper", mainEntryClickHouseKeeper},
#endif
#if ENABLE_CLICKHOUSE_KEEPER_CONVERTER
{"keeper-converter", mainEntryClickHouseKeeperConverter},
#endif
#if ENABLE_CLICKHOUSE_INSTALL
{"install", mainEntryClickHouseInstall},
{"start", mainEntryClickHouseStart},

View File

@ -239,6 +239,53 @@ void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const
Coordination::write(stat, out);
}
void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
Coordination::write(acls, out);
Coordination::write(version, out);
}
void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
Coordination::read(acls, in);
Coordination::read(version, in);
}
void ZooKeeperSetACLResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(stat, out);
}
void ZooKeeperSetACLResponse::readImpl(ReadBuffer & in)
{
Coordination::read(stat, in);
}
void ZooKeeperGetACLRequest::readImpl(ReadBuffer & in)
{
Coordination::read(path, in);
}
void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
}
void ZooKeeperGetACLResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(acl, out);
Coordination::write(stat, out);
}
void ZooKeeperGetACLResponse::readImpl(ReadBuffer & in)
{
Coordination::read(acl, in);
Coordination::read(stat, in);
}
void ZooKeeperCheckRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(path, out);
@ -454,6 +501,8 @@ ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::ma
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared<ZooKeeperCheckResponse>(); }
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperSetACLResponse>(); }
ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return std::make_shared<ZooKeeperGetACLResponse>(); }
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
@ -545,6 +594,8 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
}
}

View File

@ -183,6 +183,9 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
/// During recovery from log we don't rehash ACLs
bool need_to_hash_acls = true;
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
@ -350,6 +353,48 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
size_t bytesSize() const override { return ErrorResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::SetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetACLRequest::bytesSize() + sizeof(xid); }
bool need_to_hash_acls = true;
};
struct ZooKeeperSetACLResponse final : SetACLResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::SetACL; }
size_t bytesSize() const override { return SetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::GetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
size_t bytesSize() const override { return GetACLRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::GetACL; }
size_t bytesSize() const override { return GetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
{
OpNum getOpNum() const override { return OpNum::Multi; }

View File

@ -22,6 +22,8 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
static_cast<int32_t>(OpNum::SetACL),
static_cast<int32_t>(OpNum::GetACL),
};
std::string toString(OpNum op_num)
@ -58,6 +60,10 @@ std::string toString(OpNum op_num)
return "Auth";
case OpNum::SessionID:
return "SessionID";
case OpNum::SetACL:
return "SetACL";
case OpNum::GetACL:
return "GetACL";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -23,6 +23,8 @@ enum class OpNum : int32_t
Exists = 3,
Get = 4,
Set = 5,
GetACL = 6,
SetACL = 7,
SimpleList = 8,
Sync = 9,
Heartbeat = 11,

View File

@ -9,6 +9,14 @@ void write(size_t x, WriteBuffer & out)
writeBinary(x, out);
}
#ifdef __APPLE__
void write(uint64_t x, WriteBuffer & out)
{
x = __builtin_bswap64(x);
writeBinary(x, out);
}
#endif
void write(int64_t x, WriteBuffer & out)
{
x = __builtin_bswap64(x);
@ -63,6 +71,14 @@ void write(const Error & x, WriteBuffer & out)
write(static_cast<int32_t>(x), out);
}
#ifdef __APPLE__
void read(uint64_t & x, ReadBuffer & in)
{
readBinary(x, in);
x = __builtin_bswap64(x);
}
#endif
void read(size_t & x, ReadBuffer & in)
{
readBinary(x, in);

View File

@ -14,6 +14,12 @@ namespace Coordination
using namespace DB;
void write(size_t x, WriteBuffer & out);
/// uint64_t != size_t on darwin
#ifdef __APPLE__
void write(uint64_t x, WriteBuffer & out);
#endif
void write(int64_t x, WriteBuffer & out);
void write(int32_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out);
@ -39,6 +45,9 @@ void write(const std::vector<T> & arr, WriteBuffer & out)
}
void read(size_t & x, ReadBuffer & in);
#ifdef __APPLE__
void read(uint64_t & x, ReadBuffer & in);
#endif
void read(int64_t & x, ReadBuffer & in);
void read(int32_t & x, ReadBuffer & in);
void read(OpNum & x, ReadBuffer & in);

View File

@ -99,6 +99,10 @@ namespace
node.acl_id = acl_map.convertACLs(acls);
}
/// Some strange ACLID during deserialization from ZooKeeper
if (node.acl_id == std::numeric_limits<uint64_t>::max())
node.acl_id = 0;
acl_map.addUsage(node.acl_id);
readBinary(node.is_sequental, in);
@ -217,12 +221,14 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
if (current_version >= SnapshotVersion::V1)
{
size_t acls_map_size;
readBinary(acls_map_size, in);
size_t current_map_size = 0;
while (current_map_size < acls_map_size)
{
uint64_t acl_id;
readBinary(acl_id, in);
size_t acls_size;
readBinary(acls_size, in);
Coordination::ACLs acls;
@ -345,11 +351,23 @@ KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_
for (const auto & p : fs::directory_iterator(snapshots_path))
{
if (startsWith(p.path(), "tmp_")) /// Unfinished tmp files
const auto & path = p.path();
if (!path.has_filename())
continue;
if (startsWith(path.filename(), "tmp_")) /// Unfinished tmp files
{
std::filesystem::remove(p);
continue;
}
/// Not snapshot file
if (!startsWith(path.filename(), "snapshot_"))
{
continue;
}
size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
existing_snapshots[snapshot_up_to] = p.path();
}

View File

@ -57,7 +57,7 @@ static String generateDigest(const String & userdata)
{
std::vector<String> user_password;
boost::split(user_password, userdata, [](char c) { return c == ':'; });
return user_password[0] + ":" + base64Encode(getSHA1(user_password[1]));
return user_password[0] + ":" + base64Encode(getSHA1(userdata));
}
static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths)
@ -77,8 +77,10 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c
return true;
for (const auto & session_auth : session_auths)
{
if (node_acl.scheme == session_auth.scheme && node_acl.id == session_auth.id)
return true;
}
}
}
@ -88,7 +90,8 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c
static bool fixupACL(
const std::vector<Coordination::ACL> & request_acls,
const std::vector<KeeperStorage::AuthID> & current_ids,
std::vector<Coordination::ACL> & result_acls)
std::vector<Coordination::ACL> & result_acls,
bool hash_acls)
{
if (request_acls.empty())
return true;
@ -121,7 +124,8 @@ static bool fixupACL(
return false;
valid_found = true;
new_acl.id = generateDigest(new_acl.id);
if (hash_acls)
new_acl.id = generateDigest(new_acl.id);
result_acls.push_back(new_acl);
}
}
@ -263,12 +267,13 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
}
else
{
auto & session_auth_ids = storage.session_and_auth[session_id];
KeeperStorage::Node created_node;
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls))
if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls))
{
response.error = Coordination::Error::ZINVALIDACL;
return {response_ptr, {}};
@ -280,6 +285,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
created_node.acl_id = acl_id;
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
created_node.stat.pzxid = zxid;
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
created_node.stat.mtime = created_node.stat.ctime;
created_node.stat.numChildren = 0;
@ -302,12 +308,15 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
}
auto child_path = getBaseName(path_created);
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & parent)
int64_t prev_parent_zxid;
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid] (KeeperStorage::Node & parent)
{
/// Increment sequential number even if node is not sequential
++parent.seq_num;
parent.children.insert(child_path);
++parent.stat.cversion;
prev_parent_zxid = parent.stat.pzxid;
parent.stat.pzxid = zxid;
++parent.stat.numChildren;
});
@ -317,7 +326,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (request.is_ephemeral)
ephemerals[session_id].emplace(path_created);
undo = [&storage, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
undo = [&storage, prev_parent_zxid, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id]
{
storage.container.erase(path_created);
storage.acl_map.removeUsage(acl_id);
@ -325,11 +334,12 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
if (is_ephemeral)
storage.ephemerals[session_id].erase(path_created);
storage.container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent)
storage.container.updateValue(parent_path, [child_path, prev_parent_zxid] (KeeperStorage::Node & undo_parent)
{
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
--undo_parent.seq_num;
undo_parent.stat.pzxid = prev_parent_zxid;
undo_parent.children.erase(child_path);
});
};
@ -536,6 +546,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest
}
else if (request.version == -1 || request.version == it->value.stat.version)
{
auto prev_node = it->value;
auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value)
@ -667,6 +678,111 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageSetACLRequest final : public KeeperStorageRequest
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
auto & container = storage.container;
auto it = container.find(zk_request->getPath());
if (it == container.end())
return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
const auto & session_auths = storage.session_and_auth[session_id];
return checkACL(Coordination::ACL::Admin, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
{
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr);
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
auto it = container.find(request.path);
if (it == container.end())
{
response.error = Coordination::Error::ZNONODE;
}
else if (request.version != -1 && request.version != it->value.stat.aversion)
{
response.error = Coordination::Error::ZBADVERSION;
}
else
{
auto & session_auth_ids = storage.session_and_auth[session_id];
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls))
{
response.error = Coordination::Error::ZINVALIDACL;
return {response_ptr, {}};
}
uint64_t acl_id = storage.acl_map.convertACLs(node_acls);
storage.acl_map.addUsage(acl_id);
storage.container.updateValue(request.path, [acl_id] (KeeperStorage::Node & node)
{
node.acl_id = acl_id;
++node.stat.aversion;
});
response.stat = it->value.stat;
response.error = Coordination::Error::ZOK;
}
/// It cannot be used insied multitransaction?
return { response_ptr, {} };
}
};
struct KeeperStorageGetACLRequest final : public KeeperStorageRequest
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
auto & container = storage.container;
auto it = container.find(zk_request->getPath());
if (it == container.end())
return true;
const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id);
if (node_acls.empty())
return true;
const auto & session_auths = storage.session_and_auth[session_id];
/// LOL, GetACL require more permissions, then SetACL...
return checkACL(Coordination::ACL::Admin | Coordination::ACL::Read, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
auto & container = storage.container;
auto it = container.find(request.path);
if (it == container.end())
{
response.error = Coordination::Error::ZNONODE;
}
else
{
response.stat = it->value.stat;
response.acl = storage.acl_map.convertNumber(it->value.acl_id);
}
return {response_ptr, {}};
}
};
struct KeeperStorageMultiRequest final : public KeeperStorageRequest
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
@ -893,10 +1009,12 @@ KeeperWrapperFactory::KeeperWrapperFactory()
registerKeeperRequestWrapper<Coordination::OpNum::SimpleList, KeeperStorageListRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Check, KeeperStorageCheckRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Multi, KeeperStorageMultiRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::SetACL, KeeperStorageSetACLRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::GetACL, KeeperStorageGetACLRequest>(*this);
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid)
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid, bool check_acl)
{
KeeperStorage::ResponsesForSessions results;
if (new_last_zxid)
@ -954,7 +1072,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
Coordination::ZooKeeperResponsePtr response;
if (!storage_request->checkAuth(*this, session_id))
if (check_acl && !storage_request->checkAuth(*this, session_id))
{
response = zk_request->makeResponse();
/// Original ZooKeeper always throws no auth, even when user provided some credentials

View File

@ -116,7 +116,7 @@ public:
session_expiry_queue.update(session_id, session_timeout_ms);
}
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional<int64_t> new_last_zxid);
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional<int64_t> new_last_zxid, bool check_acl = true);
void finalize();

View File

@ -0,0 +1,555 @@
#include <Coordination/ZooKeeperDataReader.h>
#include <filesystem>
#include <cstdlib>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <IO/ReadBufferFromFile.h>
#include <string>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CORRUPTED_DATA;
}
static String parentPath(const String & path)
{
auto rslash_pos = path.rfind('/');
if (rslash_pos > 0)
return path.substr(0, rslash_pos);
return "/";
}
static std::string getBaseName(const String & path)
{
size_t basename_start = path.rfind('/');
return std::string{&path[basename_start + 1], path.length() - basename_start - 1};
}
int64_t getZxidFromName(const std::string & filename)
{
std::filesystem::path path(filename);
std::string extension = path.extension();
char * end;
int64_t zxid = std::strtoul(extension.data() + 1, &end, 16);
return zxid;
}
void deserializeSnapshotMagic(ReadBuffer & in)
{
int32_t magic_header, version;
int64_t dbid;
Coordination::read(magic_header, in);
Coordination::read(version, in);
if (version != 2)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize ZooKeeper data other than version 2, got version {}", version);
Coordination::read(dbid, in);
static constexpr int32_t SNP_HEADER = 1514885966; /// "ZKSN"
if (magic_header != SNP_HEADER)
throw Exception(ErrorCodes::CORRUPTED_DATA ,"Incorrect magic header in file, expected {}, got {}", SNP_HEADER, magic_header);
}
int64_t deserializeSessionAndTimeout(KeeperStorage & storage, ReadBuffer & in)
{
int32_t count;
Coordination::read(count, in);
int64_t max_session_id = 0;
while (count > 0)
{
int64_t session_id;
int32_t timeout;
Coordination::read(session_id, in);
Coordination::read(timeout, in);
storage.addSessionID(session_id, timeout);
max_session_id = std::max(session_id, max_session_id);
count--;
}
return max_session_id;
}
void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in)
{
int32_t count;
Coordination::read(count, in);
while (count > 0)
{
int64_t map_index;
Coordination::read(map_index, in);
Coordination::ACLs acls;
int32_t acls_len;
Coordination::read(acls_len, in);
while (acls_len > 0)
{
Coordination::ACL acl;
Coordination::read(acl.permissions, in);
Coordination::read(acl.scheme, in);
Coordination::read(acl.id, in);
acls.push_back(acl);
acls_len--;
}
storage.acl_map.addMapping(map_index, acls);
count--;
}
}
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * log)
{
int64_t max_zxid = 0;
std::string path;
Coordination::read(path, in);
size_t count = 0;
while (path != "/")
{
KeeperStorage::Node node{};
Coordination::read(node.data, in);
Coordination::read(node.acl_id, in);
/// Deserialize stat
Coordination::read(node.stat.czxid, in);
Coordination::read(node.stat.mzxid, in);
/// For some reason ZXID specified in filename can be smaller
/// then actual zxid from nodes. In this case we will use zxid from nodes.
max_zxid = std::max(max_zxid, node.stat.mzxid);
Coordination::read(node.stat.ctime, in);
Coordination::read(node.stat.mtime, in);
Coordination::read(node.stat.version, in);
Coordination::read(node.stat.cversion, in);
Coordination::read(node.stat.aversion, in);
Coordination::read(node.stat.ephemeralOwner, in);
Coordination::read(node.stat.pzxid, in);
if (!path.empty())
{
node.stat.dataLength = node.data.length();
node.seq_num = node.stat.cversion;
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
storage.acl_map.addUsage(node.acl_id);
}
Coordination::read(path, in);
count++;
if (count % 1000 == 0)
LOG_INFO(log, "Deserialized nodes from snapshot: {}", count);
}
for (const auto & itr : storage.container)
{
if (itr.key != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; });
}
}
return max_zxid;
}
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log)
{
LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path);
int64_t zxid = getZxidFromName(snapshot_path);
ReadBufferFromFile reader(snapshot_path);
deserializeSnapshotMagic(reader);
LOG_INFO(log, "Magic deserialized, looks OK");
auto max_session_id = deserializeSessionAndTimeout(storage, reader);
LOG_INFO(log, "Sessions and timeouts deserialized");
storage.session_id_counter = max_session_id;
deserializeACLMap(storage, reader);
LOG_INFO(log, "ACLs deserialized");
LOG_INFO(log, "Deserializing data from snapshot");
int64_t zxid_from_nodes = deserializeStorageData(storage, reader, log);
storage.zxid = std::max(zxid, zxid_from_nodes);
LOG_INFO(log, "Finished, snapshot ZXID {}", storage.zxid);
}
void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log)
{
namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_snapshots;
for (const auto & p : fs::directory_iterator(path))
{
const auto & log_path = p.path();
if (!log_path.has_filename() || !startsWith(log_path.filename(), "snapshot."))
continue;
int64_t zxid = getZxidFromName(log_path);
existing_snapshots[zxid] = p.path();
}
LOG_INFO(log, "Totally have {} snapshots, will use latest", existing_snapshots.size());
/// deserialize only from latest snapshot
if (!existing_snapshots.empty())
deserializeKeeperStorageFromSnapshot(storage, existing_snapshots.rbegin()->second, log);
else
throw Exception(ErrorCodes::CORRUPTED_DATA, "No snapshots found on path {}. At least one snapshot must exist.", path);
}
void deserializeLogMagic(ReadBuffer & in)
{
int32_t magic_header, version;
int64_t dbid;
Coordination::read(magic_header, in);
Coordination::read(version, in);
Coordination::read(dbid, in);
static constexpr int32_t LOG_HEADER = 1514884167; /// "ZKLG"
if (magic_header != LOG_HEADER)
throw Exception(ErrorCodes::CORRUPTED_DATA ,"Incorrect magic header in file, expected {}, got {}", LOG_HEADER, magic_header);
if (version != 2)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,"Cannot deserialize ZooKeeper data other than version 2, got version {}", version);
}
/// For some reason zookeeper stores slightly different records in log then
/// requests. For example:
/// class CreateTxn {
/// ustring path;
/// buffer data;
/// vector<org.apache.zookeeper.data.ACL> acl;
/// boolean ephemeral;
/// int parentCVersion;
/// }
/// But Create Request:
/// class CreateRequest {
/// ustring path;
/// buffer data;
/// vector<org.apache.zookeeper.data.ACL> acl;
/// int flags;
/// }
///
/// However type is the same OpNum...
///
/// Also there is a comment in ZooKeeper's code base about log structure, but
/// it's almost completely incorrect. Actual ZooKeeper log structure starting from version 3.6+:
///
/// Magic Header: "ZKLG" + 4 byte version + 8 byte dbid.
/// After that goes serialized transactions, in the following format:
/// 8 byte checksum
/// 4 byte transaction length
/// 8 byte session_id (author of the transaction)
/// 4 byte user XID
/// 8 byte ZXID
/// 8 byte transaction time
/// 4 byte transaction type (OpNum)
/// [Transaction body depending on transaction type]
/// 12 bytes tail (starting from 3.6+): 4 byte version + 8 byte checksum of data tree
/// 1 byte -- 0x42
///
/// Transaction body is quite simple for all kinds of transactions except
/// Multitransactions. Their structure is following:
/// 4 byte sub transactions count
/// 4 byte sub transaction length
/// [Transaction body depending on transaction type]
/// and so on
///
/// Gotchas:
///
/// 1) For some reason ZooKeeper store ErrorTxn's in log. It's
/// reasonable for Multitransactions, but why they store standalone errors
/// is not clear.
///
/// 2) For some reason there is no 12 bytes tail (version + checksum of
/// tree) after standalone ErrorTxn.
///
/// 3) The most strange thing: In one of our production logs (about 1.2GB
/// size) we have found Multitransaction with two sub transactions: Error1
/// and Error2, both -1 OpCode. Normal Error transaction has 4 bytes length
/// (for error code), but the Error1 has 550 bytes length. What is more
/// strange, that this 550 bytes obviously was a part of Create transaction,
/// but the operation code was -1. We have added debug prints to original
/// zookeeper (3.6.3) and found that it just reads 550 bytes of this "Error"
/// transaction, tooks the first 4 bytes as an error code (it was 79, non
/// existing code) and skip all remaining 546 bytes. NOTE: it looks like a bug
/// in ZooKeeper.
///
namespace
{
Coordination::ZooKeeperRequestPtr deserializeCreateTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperCreateRequest> result = std::make_shared<Coordination::ZooKeeperCreateRequest>();
Coordination::read(result->path, in);
Coordination::read(result->data, in);
Coordination::read(result->acls, in);
Coordination::read(result->is_ephemeral, in);
result->need_to_hash_acls = false;
/// How we should use it? It should just increment on request execution
int32_t parent_c_version;
Coordination::read(parent_c_version, in);
return result;
}
Coordination::ZooKeeperRequestPtr deserializeDeleteTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperRemoveRequest> result = std::make_shared<Coordination::ZooKeeperRemoveRequest>();
Coordination::read(result->path, in);
return result;
}
Coordination::ZooKeeperRequestPtr deserializeSetTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperSetRequest> result = std::make_shared<Coordination::ZooKeeperSetRequest>();
Coordination::read(result->path, in);
Coordination::read(result->data, in);
Coordination::read(result->version, in);
/// It stores version + 1 (which should be, not for request)
result->version -= 1;
return result;
}
Coordination::ZooKeeperRequestPtr deserializeCheckVersionTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperCheckRequest> result = std::make_shared<Coordination::ZooKeeperCheckRequest>();
Coordination::read(result->path, in);
Coordination::read(result->version, in);
return result;
}
Coordination::ZooKeeperRequestPtr deserializeCreateSession(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> result = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
int32_t timeout;
Coordination::read(timeout, in);
result->session_timeout_ms = timeout;
return result;
}
Coordination::ZooKeeperRequestPtr deserializeCloseSession(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperCloseRequest> result = std::make_shared<Coordination::ZooKeeperCloseRequest>();
std::vector<std::string> data;
Coordination::read(data, in);
return result;
}
Coordination::ZooKeeperRequestPtr deserializeErrorTxn(ReadBuffer & in)
{
int32_t error;
Coordination::read(error, in);
return nullptr;
}
Coordination::ZooKeeperRequestPtr deserializeSetACLTxn(ReadBuffer & in)
{
std::shared_ptr<Coordination::ZooKeeperSetACLRequest> result = std::make_shared<Coordination::ZooKeeperSetACLRequest>();
Coordination::read(result->path, in);
Coordination::read(result->acls, in);
Coordination::read(result->version, in);
/// It stores version + 1 (which should be, not for request)
result->version -= 1;
result->need_to_hash_acls = false;
return result;
}
Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in);
Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtxn)
{
int32_t type;
Coordination::read(type, in);
Coordination::ZooKeeperRequestPtr result = nullptr;
int32_t sub_txn_length = 0;
if (subtxn)
Coordination::read(sub_txn_length, in);
int64_t in_count_before = in.count();
switch (type)
{
case 1:
result = deserializeCreateTxn(in);
break;
case 2:
result = deserializeDeleteTxn(in);
break;
case 5:
result = deserializeSetTxn(in);
break;
case 7:
result = deserializeSetACLTxn(in);
break;
case 13:
result = deserializeCheckVersionTxn(in);
break;
case 14:
result = deserializeMultiTxn(in);
break;
case -10:
result = deserializeCreateSession(in);
break;
case -11:
result = deserializeCloseSession(in);
break;
case -1:
result = deserializeErrorTxn(in);
break;
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented operation {}", type);
}
if (subtxn)
{
int64_t bytes_read = in.count() - in_count_before;
if (bytes_read < sub_txn_length)
in.ignore(sub_txn_length - bytes_read);
}
return result;
}
Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in)
{
int32_t length;
Coordination::read(length, in);
std::shared_ptr<Coordination::ZooKeeperMultiRequest> result = std::make_shared<Coordination::ZooKeeperMultiRequest>();
while (length > 0)
{
auto subrequest = deserializeTxnImpl(in, true);
result->requests.push_back(subrequest);
length--;
}
return result;
}
bool isErrorRequest(Coordination::ZooKeeperRequestPtr request)
{
return request == nullptr;
}
bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
{
if (request == nullptr)
return true;
for (const auto & subrequest : dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get())->requests) //-V522
if (subrequest == nullptr)
return true;
return false;
}
}
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*log*/)
{
int64_t checksum;
Coordination::read(checksum, in);
/// Zero padding is possible until file end
if (checksum == 0)
return false;
int32_t txn_len;
Coordination::read(txn_len, in);
int64_t count_before = in.count();
int64_t session_id;
Coordination::read(session_id, in);
int32_t xid;
Coordination::read(xid, in);
int64_t zxid;
Coordination::read(zxid, in);
int64_t time;
Coordination::read(time, in);
Coordination::ZooKeeperRequestPtr request = deserializeTxnImpl(in, false);
/// Skip all other bytes
int64_t bytes_read = in.count() - count_before;
if (bytes_read < txn_len)
in.ignore(txn_len - bytes_read);
/// We don't need to apply error requests
if (isErrorRequest(request))
return true;
request->xid = xid;
if (zxid > storage.zxid)
{
/// Separate processing of session id requests
if (request->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request);
storage.getSessionID(session_id_request.session_timeout_ms);
}
else
{
/// Skip failed multirequests
if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request))
return true;
storage.processRequest(request, session_id, zxid, /* check_acl = */ false);
}
}
return true;
}
void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log)
{
ReadBufferFromFile reader(log_path);
LOG_INFO(log, "Deserializing log {}", log_path);
deserializeLogMagic(reader);
LOG_INFO(log, "Header looks OK");
size_t counter = 0;
while (!reader.eof() && deserializeTxn(storage, reader, log))
{
counter++;
if (counter % 1000 == 0)
LOG_INFO(log, "Deserialized txns log: {}", counter);
int8_t forty_two;
Coordination::read(forty_two, reader);
if (forty_two != 0x42)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Forty two check byte ({}) is not equal 0x42", forty_two);
}
LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter);
}
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log)
{
namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_logs;
for (const auto & p : fs::directory_iterator(path))
{
const auto & log_path = p.path();
if (!log_path.has_filename() || !startsWith(log_path.filename(), "log."))
continue;
int64_t zxid = getZxidFromName(log_path);
existing_logs[zxid] = p.path();
}
LOG_INFO(log, "Totally have {} logs", existing_logs.size());
for (auto [zxid, log_path] : existing_logs)
{
if (zxid > storage.zxid)
deserializeLogAndApplyToStorage(storage, log_path, log);
else
LOG_INFO(log, "Skipping log {}, it's ZXID {} is smaller than storages ZXID {}", log_path, zxid, storage.zxid);
}
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <string>
#include <Coordination/KeeperStorage.h>
#include <common/logger_useful.h>
namespace DB
{
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log);
void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log);
void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log);
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log);
}

View File

@ -4,7 +4,7 @@
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<superdigest>super:0DPiKuNIrrVmD8IUCuw1hQxNqZc=</superdigest>
<superdigest>super:xQJmxLMiHGwaqBvst5y6rkB6HQs=</superdigest>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from kazoo.client import KazooClient, KazooState
@ -300,3 +300,47 @@ def test_auth_snapshot(started_cluster):
with pytest.raises(NoAuthError):
connection2.get("/test_snapshot_acl1")
@pytest.mark.parametrize(
('get_zk'),
[
get_genuine_zk,
get_fake_zk
]
)
def test_get_set_acl(started_cluster, get_zk):
auth_connection = get_zk()
auth_connection.add_auth('digest', 'username1:secret1')
auth_connection.add_auth('digest', 'username2:secret2')
auth_connection.create("/test_set_get_acl", b"data", acl=[make_acl("auth", "", all=True)])
acls, stat = auth_connection.get_acls("/test_set_get_acl")
assert stat.aversion == 0
assert len(acls) == 2
for acl in acls:
assert acl.acl_list == ['ALL']
assert acl.id.scheme == 'digest'
assert acl.perms == 31
assert acl.id.id in ('username1:eGncMdBgOfGS/TCojt51xWsWv/Y=', 'username2:qgSSumukVlhftkVycylbHNvxhFU=')
other_auth_connection = get_zk()
other_auth_connection.add_auth('digest', 'username1:secret1')
other_auth_connection.add_auth('digest', 'username3:secret3')
other_auth_connection.set_acls("/test_set_get_acl", acls=[make_acl("auth", "", read=True, write=False, create=True, delete=True, admin=True)])
acls, stat = other_auth_connection.get_acls("/test_set_get_acl")
assert stat.aversion == 1
assert len(acls) == 2
for acl in acls:
assert acl.acl_list == ['READ', 'CREATE', 'DELETE', 'ADMIN']
assert acl.id.scheme == 'digest'
assert acl.perms == 29
assert acl.id.id in ('username1:eGncMdBgOfGS/TCojt51xWsWv/Y=', 'username3:CvWITOxxTwk+u6S5PoGlQ4hNoWI=')
with pytest.raises(KazooException):
other_auth_connection.set_acls("/test_set_get_acl", acls=[make_acl("auth", "", all=True)], version=0)

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,23 @@
<yandex>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/logs</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<snapshot_distance>75</snapshot_distance>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</yandex>

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,284 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from kazoo.client import KazooClient, KazooState
from kazoo.security import ACL, make_digest_acl, make_acl
from kazoo.exceptions import AuthFailedError, InvalidACLError, NoAuthError, KazooException
import os
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], stay_alive=True)
def start_zookeeper():
node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh start'])
def stop_zookeeper():
node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh stop'])
def clear_zookeeper():
node.exec_in_container(['bash', '-c', 'rm -fr /zookeeper/*'])
def restart_and_clear_zookeeper():
stop_zookeeper()
clear_zookeeper()
start_zookeeper()
def clear_clickhouse_data():
node.exec_in_container(['bash', '-c', 'rm -fr /var/lib/clickhouse/coordination/logs/* /var/lib/clickhouse/coordination/snapshots/*'])
def convert_zookeeper_data():
cmd = '/usr/bin/clickhouse keeper-converter --zookeeper-logs-dir /zookeeper/version-2/ --zookeeper-snapshots-dir /zookeeper/version-2/ --output-dir /var/lib/clickhouse/coordination/snapshots'
node.exec_in_container(['bash', '-c', cmd])
def stop_clickhouse():
node.stop_clickhouse()
def start_clickhouse():
node.start_clickhouse()
def copy_zookeeper_data(make_zk_snapshots):
stop_zookeeper()
if make_zk_snapshots: # force zookeeper to create snapshot
start_zookeeper()
stop_zookeeper()
stop_clickhouse()
clear_clickhouse_data()
convert_zookeeper_data()
start_zookeeper()
start_clickhouse()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_fake_zk(timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def get_genuine_zk(timeout=30.0):
_genuine_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":2181", timeout=timeout)
_genuine_zk_instance.start()
return _genuine_zk_instance
def compare_stats(stat1, stat2, path):
assert stat1.czxid == stat2.czxid, "path " + path + " cxzids not equal for stats: " + str(stat1.czxid) + " != " + str(stat2.zxid)
assert stat1.mzxid == stat2.mzxid, "path " + path + " mxzids not equal for stats: " + str(stat1.mzxid) + " != " + str(stat2.mzxid)
assert stat1.version == stat2.version, "path " + path + " versions not equal for stats: " + str(stat1.version) + " != " + str(stat2.version)
assert stat1.cversion == stat2.cversion, "path " + path + " cversions not equal for stats: " + str(stat1.cversion) + " != " + str(stat2.cversion)
assert stat1.aversion == stat2.aversion, "path " + path + " aversions not equal for stats: " + str(stat1.aversion) + " != " + str(stat2.aversion)
assert stat1.ephemeralOwner == stat2.ephemeralOwner,"path " + path + " ephemeralOwners not equal for stats: " + str(stat1.ephemeralOwner) + " != " + str(stat2.ephemeralOwner)
assert stat1.dataLength == stat2.dataLength , "path " + path + " ephemeralOwners not equal for stats: " + str(stat1.dataLength) + " != " + str(stat2.dataLength)
assert stat1.numChildren == stat2.numChildren, "path " + path + " numChildren not equal for stats: " + str(stat1.numChildren) + " != " + str(stat2.numChildren)
assert stat1.pzxid == stat2.pzxid, "path " + path + " pzxid not equal for stats: " + str(stat1.pzxid) + " != " + str(stat2.pzxid)
def compare_states(zk1, zk2, path="/"):
data1, stat1 = zk1.get(path)
data2, stat2 = zk2.get(path)
print("Left Stat", stat1)
print("Right Stat", stat2)
assert data1 == data2, "Data not equal on path " + str(path)
# both paths have strange stats
if path not in ("/", "/zookeeper"):
compare_stats(stat1, stat2, path)
first_children = list(sorted(zk1.get_children(path)))
second_children = list(sorted(zk2.get_children(path)))
print("Got children left", first_children)
print("Got children rigth", second_children)
assert first_children == second_children, "Childrens are not equal on path " + path
for children in first_children:
print("Checking child", os.path.join(path, children))
compare_states(zk1, zk2, os.path.join(path, children))
@pytest.mark.parametrize(
('create_snapshots'),
[
True, False
]
)
def test_smoke(started_cluster, create_snapshots):
restart_and_clear_zookeeper()
genuine_connection = get_genuine_zk()
genuine_connection.create("/test", b"data")
assert genuine_connection.get("/test")[0] == b"data"
copy_zookeeper_data(create_snapshots)
genuine_connection = get_genuine_zk()
fake_connection = get_fake_zk()
compare_states(genuine_connection, fake_connection)
def get_bytes(s):
return s.encode()
@pytest.mark.parametrize(
('create_snapshots'),
[
True, False
]
)
def test_simple_crud_requests(started_cluster, create_snapshots):
restart_and_clear_zookeeper()
genuine_connection = get_genuine_zk()
for i in range(100):
genuine_connection.create("/test_create" + str(i), get_bytes("data" + str(i)))
# some set queries
for i in range(10):
for j in range(i + 1):
genuine_connection.set("/test_create" + str(i), get_bytes("value" + str(j)))
for i in range(10, 20):
genuine_connection.delete("/test_create" + str(i))
path = "/test_create_deep"
for i in range(10):
genuine_connection.create(path, get_bytes("data" + str(i)))
path = os.path.join(path, str(i))
genuine_connection.create("/test_sequential", b"")
for i in range(10):
genuine_connection.create("/test_sequential/" + "a" * i + "-", get_bytes("dataX" + str(i)), sequence=True)
genuine_connection.create("/test_ephemeral", b"")
for i in range(10):
genuine_connection.create("/test_ephemeral/" + str(i), get_bytes("dataX" + str(i)), ephemeral=True)
copy_zookeeper_data(create_snapshots)
genuine_connection = get_genuine_zk()
fake_connection = get_fake_zk()
compare_states(genuine_connection, fake_connection)
# especially ensure that counters are the same
genuine_connection.create("/test_sequential/" + "a" * 10 + "-", get_bytes("dataX" + str(i)), sequence=True)
fake_connection.create("/test_sequential/" + "a" * 10 + "-", get_bytes("dataX" + str(i)), sequence=True)
first_children = list(sorted(genuine_connection.get_children("/test_sequential")))
second_children = list(sorted(fake_connection.get_children("/test_sequential")))
assert first_children == second_children, "Childrens are not equal on path " + path
@pytest.mark.parametrize(
('create_snapshots'),
[
True, False
]
)
def test_multi_and_failed_requests(started_cluster, create_snapshots):
restart_and_clear_zookeeper()
genuine_connection = get_genuine_zk()
genuine_connection.create('/test_multitransactions')
for i in range(10):
t = genuine_connection.transaction()
t.create('/test_multitransactions/freddy' + str(i), get_bytes('data' + str(i)))
t.create('/test_multitransactions/fred' + str(i), get_bytes('value' + str(i)), ephemeral=True)
t.create('/test_multitransactions/smith' + str(i), get_bytes('entity' + str(i)), sequence=True)
t.set_data('/test_multitransactions', get_bytes("somedata" + str(i)))
t.commit()
with pytest.raises(Exception):
genuine_connection.set('/test_multitransactions/freddy0', get_bytes('mustfail' + str(i)), version=1)
t = genuine_connection.transaction()
t.create('/test_bad_transaction', get_bytes('data' + str(1)))
t.check('/test_multitransactions', version=32)
t.create('/test_bad_transaction1', get_bytes('data' + str(2)))
# should fail
t.commit()
assert genuine_connection.exists('/test_bad_transaction') is None
assert genuine_connection.exists('/test_bad_transaction1') is None
t = genuine_connection.transaction()
t.create('/test_bad_transaction2', get_bytes('data' + str(1)))
t.delete('/test_multitransactions/freddy0', version=5)
# should fail
t.commit()
assert genuine_connection.exists('/test_bad_transaction2') is None
assert genuine_connection.exists('/test_multitransactions/freddy0') is not None
copy_zookeeper_data(create_snapshots)
genuine_connection = get_genuine_zk()
fake_connection = get_fake_zk()
compare_states(genuine_connection, fake_connection)
@pytest.mark.parametrize(
('create_snapshots'),
[
True, False
]
)
def test_acls(started_cluster, create_snapshots):
restart_and_clear_zookeeper()
genuine_connection = get_genuine_zk()
genuine_connection.add_auth('digest', 'user1:password1')
genuine_connection.add_auth('digest', 'user2:password2')
genuine_connection.add_auth('digest', 'user3:password3')
genuine_connection.create("/test_multi_all_acl", b"data", acl=[make_acl("auth", "", all=True)])
other_connection = get_genuine_zk()
other_connection.add_auth('digest', 'user1:password1')
other_connection.set("/test_multi_all_acl", b"X")
assert other_connection.get("/test_multi_all_acl")[0] == b"X"
yet_other_auth_connection = get_genuine_zk()
yet_other_auth_connection.add_auth('digest', 'user2:password2')
yet_other_auth_connection.set("/test_multi_all_acl", b"Y")
genuine_connection.add_auth('digest', 'user3:password3')
# just to check that we are able to deserialize it
genuine_connection.set_acls("/test_multi_all_acl", acls=[make_acl("auth", "", read=True, write=False, create=True, delete=True, admin=True)])
no_auth_connection = get_genuine_zk()
with pytest.raises(Exception):
no_auth_connection.set("/test_multi_all_acl", b"Z")
copy_zookeeper_data(create_snapshots)
genuine_connection = get_genuine_zk()
genuine_connection.add_auth('digest', 'user1:password1')
genuine_connection.add_auth('digest', 'user2:password2')
genuine_connection.add_auth('digest', 'user3:password3')
fake_connection = get_fake_zk()
fake_connection.add_auth('digest', 'user1:password1')
fake_connection.add_auth('digest', 'user2:password2')
fake_connection.add_auth('digest', 'user3:password3')
compare_states(genuine_connection, fake_connection)
for connection in [genuine_connection, fake_connection]:
acls, stat = connection.get_acls("/test_multi_all_acl")
assert stat.aversion == 1
assert len(acls) == 3
for acl in acls:
assert acl.acl_list == ['READ', 'CREATE', 'DELETE', 'ADMIN']
assert acl.id.scheme == 'digest'
assert acl.perms == 29
assert acl.id.id in ('user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI=', 'user2:lo/iTtNMP+gEZlpUNaCqLYO3i5U=', 'user3:wr5Y0kEs9nFX3bKrTMKxrlcFeWo=')