diff --git a/docker/test/integration/base/Dockerfile b/docker/test/integration/base/Dockerfile index 1c962f1bf8f..e15697da029 100644 --- a/docker/test/integration/base/Dockerfile +++ b/docker/test/integration/base/Dockerfile @@ -1,6 +1,8 @@ # docker build -t yandex/clickhouse-integration-test . FROM yandex/clickhouse-test-base +SHELL ["/bin/bash", "-c"] + RUN apt-get update \ && env DEBIAN_FRONTEND=noninteractive apt-get -y install \ tzdata \ @@ -20,7 +22,9 @@ RUN apt-get update \ krb5-user \ iproute2 \ lsof \ - g++ + g++ \ + default-jre + RUN rm -rf \ /var/lib/apt/lists/* \ /var/cache/debconf \ @@ -30,6 +34,18 @@ RUN apt-get clean # Install MySQL ODBC driver RUN curl 'https://cdn.mysql.com//Downloads/Connector-ODBC/8.0/mysql-connector-odbc-8.0.21-linux-glibc2.12-x86-64bit.tar.gz' --output 'mysql-connector.tar.gz' && tar -xzf mysql-connector.tar.gz && cd mysql-connector-odbc-8.0.21-linux-glibc2.12-x86-64bit/lib && mv * /usr/local/lib && ln -s /usr/local/lib/libmyodbc8a.so /usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so +# Unfortunately this is required for a single test for conversion data from zookeeper to clickhouse-keeper. +# ZooKeeper is not started by default, but consumes some space in containers. 
+# 777 perms used to allow anybody to start/stop ZooKeeper +ENV ZOOKEEPER_VERSION='3.6.3' +RUN curl -O "https://archive.apache.org/dist/zookeeper/zookeeper-${ZOOKEEPER_VERSION}/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz" +RUN tar -zxvf apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz && mv apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/zookeeper && chmod -R 777 /opt/zookeeper && rm apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz +RUN echo $'tickTime=2500 \n\ +dataDir=/zookeeper \n\ +clientPort=2181 \n\ +maxClientCnxns=80' > /opt/zookeeper/conf/zoo.cfg +RUN mkdir /zookeeper && chmod -R 777 /zookeeper + ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone diff --git a/programs/CMakeLists.txt b/programs/CMakeLists.txt index 7f85a3fc3d7..6929bd861ed 100644 --- a/programs/CMakeLists.txt +++ b/programs/CMakeLists.txt @@ -50,11 +50,15 @@ option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories" option (ENABLE_CLICKHOUSE_KEEPER "ClickHouse alternative to ZooKeeper" ${ENABLE_CLICKHOUSE_ALL}) + +option (ENABLE_CLICKHOUSE_KEEPER_CONVERTER "Util allows to convert ZooKeeper logs and snapshots into clickhouse-keeper snapshot" ${ENABLE_CLICKHOUSE_ALL}) + if (NOT USE_NURAFT) # RECONFIGURE_MESSAGE_LEVEL should not be used here, # since USE_NURAFT is set to OFF for FreeBSD and Darwin. 
- message (STATUS "clickhouse-keeper will not be built (lack of NuRaft)") + message (STATUS "clickhouse-keeper and clickhouse-keeper-converter will not be built (lack of NuRaft)") set(ENABLE_CLICKHOUSE_KEEPER OFF) + set(ENABLE_CLICKHOUSE_KEEPER_CONVERTER OFF) endif() if (CLICKHOUSE_SPLIT_BINARY) @@ -150,6 +154,12 @@ else() message(STATUS "ClickHouse keeper mode: OFF") endif() +if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER) + message(STATUS "ClickHouse keeper-converter mode: ON") +else() + message(STATUS "ClickHouse keeper-converter mode: OFF") +endif() + if(NOT (MAKE_STATIC_LIBRARIES OR SPLIT_SHARED_LIBRARIES)) set(CLICKHOUSE_ONE_SHARED ON) endif() @@ -222,6 +232,10 @@ if (ENABLE_CLICKHOUSE_KEEPER) add_subdirectory (keeper) endif() +if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER) + add_subdirectory (keeper-converter) +endif() + if (ENABLE_CLICKHOUSE_ODBC_BRIDGE) add_subdirectory (odbc-bridge) endif () @@ -231,9 +245,51 @@ if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE) endif () if (CLICKHOUSE_ONE_SHARED) - add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} ${CLICKHOUSE_KEEPER_SOURCES}) - target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK} ${CLICKHOUSE_KEEPER_LINK}) - target_include_directories(clickhouse-lib ${CLICKHOUSE_SERVER_INCLUDE} ${CLICKHOUSE_CLIENT_INCLUDE} ${CLICKHOUSE_LOCAL_INCLUDE} ${CLICKHOUSE_BENCHMARK_INCLUDE} ${CLICKHOUSE_COPIER_INCLUDE} 
${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} ${CLICKHOUSE_COMPRESSOR_INCLUDE} ${CLICKHOUSE_FORMAT_INCLUDE} ${CLICKHOUSE_OBFUSCATOR_INCLUDE} ${CLICKHOUSE_GIT_IMPORT_INCLUDE} ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} ${CLICKHOUSE_KEEPER_INCLUDE}) + add_library(clickhouse-lib SHARED + ${CLICKHOUSE_SERVER_SOURCES} + ${CLICKHOUSE_CLIENT_SOURCES} + ${CLICKHOUSE_LOCAL_SOURCES} + ${CLICKHOUSE_BENCHMARK_SOURCES} + ${CLICKHOUSE_COPIER_SOURCES} + ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} + ${CLICKHOUSE_COMPRESSOR_SOURCES} + ${CLICKHOUSE_FORMAT_SOURCES} + ${CLICKHOUSE_OBFUSCATOR_SOURCES} + ${CLICKHOUSE_GIT_IMPORT_SOURCES} + ${CLICKHOUSE_ODBC_BRIDGE_SOURCES} + ${CLICKHOUSE_KEEPER_SOURCES} + ${CLICKHOUSE_KEEPER_CONVERTER_SOURCES}) + + target_link_libraries(clickhouse-lib + ${CLICKHOUSE_SERVER_LINK} + ${CLICKHOUSE_CLIENT_LINK} + ${CLICKHOUSE_LOCAL_LINK} + ${CLICKHOUSE_BENCHMARK_LINK} + ${CLICKHOUSE_COPIER_LINK} + ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} + ${CLICKHOUSE_COMPRESSOR_LINK} + ${CLICKHOUSE_FORMAT_LINK} + ${CLICKHOUSE_OBFUSCATOR_LINK} + ${CLICKHOUSE_GIT_IMPORT_LINK} + ${CLICKHOUSE_ODBC_BRIDGE_LINK} + ${CLICKHOUSE_KEEPER_LINK} + ${CLICKHOUSE_KEEPER_CONVERTER_LINK}) + + target_include_directories(clickhouse-lib + ${CLICKHOUSE_SERVER_INCLUDE} + ${CLICKHOUSE_CLIENT_INCLUDE} + ${CLICKHOUSE_LOCAL_INCLUDE} + ${CLICKHOUSE_BENCHMARK_INCLUDE} + ${CLICKHOUSE_COPIER_INCLUDE} + ${CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE} + ${CLICKHOUSE_COMPRESSOR_INCLUDE} + ${CLICKHOUSE_FORMAT_INCLUDE} + ${CLICKHOUSE_OBFUSCATOR_INCLUDE} + ${CLICKHOUSE_GIT_IMPORT_INCLUDE} + ${CLICKHOUSE_ODBC_BRIDGE_INCLUDE} + ${CLICKHOUSE_KEEPER_INCLUDE} + ${CLICKHOUSE_KEEPER_CONVERTER_INCLUDE}) + set_target_properties(clickhouse-lib PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse DEBUG_POSTFIX "") install (TARGETS clickhouse-lib LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT clickhouse) endif() @@ -264,6 +320,10 @@ if (CLICKHOUSE_SPLIT_BINARY) list (APPEND 
CLICKHOUSE_ALL_TARGETS clickhouse-keeper) endif () + if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER) + list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-keeper-converter) + endif () + set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..) add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS}) @@ -314,6 +374,9 @@ else () if (ENABLE_CLICKHOUSE_KEEPER) clickhouse_target_link_split_lib(clickhouse keeper) endif() + if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER) + clickhouse_target_link_split_lib(clickhouse keeper-converter) + endif() if (ENABLE_CLICKHOUSE_INSTALL) clickhouse_target_link_split_lib(clickhouse install) endif () @@ -374,6 +437,11 @@ else () install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) list(APPEND CLICKHOUSE_BUNDLE clickhouse-keeper) endif () + if (ENABLE_CLICKHOUSE_KEEPER_CONVERTER) + add_custom_target (clickhouse-keeper-converter ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-keeper-converter DEPENDS clickhouse) + install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-keeper-converter" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) + list(APPEND CLICKHOUSE_BUNDLE clickhouse-keeper-converter) + endif () install (TARGETS clickhouse RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) diff --git a/programs/config_tools.h.in b/programs/config_tools.h.in index 50ba0c16a83..62fc076861c 100644 --- a/programs/config_tools.h.in +++ b/programs/config_tools.h.in @@ -17,3 +17,4 @@ #cmakedefine01 ENABLE_CLICKHOUSE_ODBC_BRIDGE #cmakedefine01 ENABLE_CLICKHOUSE_LIBRARY_BRIDGE #cmakedefine01 ENABLE_CLICKHOUSE_KEEPER +#cmakedefine01 ENABLE_CLICKHOUSE_KEEPER_CONVERTER diff --git a/programs/keeper-converter/CMakeLists.txt b/programs/keeper-converter/CMakeLists.txt new file mode 100644 index 00000000000..d529f94d388 --- /dev/null +++ b/programs/keeper-converter/CMakeLists.txt @@ -0,0 +1,9 @@ +set 
(CLICKHOUSE_KEEPER_CONVERTER_SOURCES KeeperConverter.cpp) + +set (CLICKHOUSE_KEEPER_CONVERTER_LINK + PRIVATE + boost::program_options + dbms +) + +clickhouse_program_add(keeper-converter) diff --git a/programs/keeper-converter/KeeperConverter.cpp b/programs/keeper-converter/KeeperConverter.cpp new file mode 100644 index 00000000000..15dbc8bd220 --- /dev/null +++ b/programs/keeper-converter/KeeperConverter.cpp @@ -0,0 +1,64 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + + +int mainEntryClickHouseKeeperConverter(int argc, char ** argv) +{ + using namespace DB; + namespace po = boost::program_options; + + po::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth()); + desc.add_options() + ("help,h", "produce help message") + ("zookeeper-logs-dir", po::value()->required(), "Path to directory with ZooKeeper logs") + ("zookeeper-snapshots-dir", po::value()->required(), "Path to directory with ZooKeeper snapshots") + ("output-dir", po::value()->required(), "Directory to place output clickhouse-keeper snapshot") + ; + po::variables_map options; + po::store(po::command_line_parser(argc, argv).options(desc).run(), options); + Poco::AutoPtr console_channel(new Poco::ConsoleChannel); + + Poco::Logger * logger = &Poco::Logger::get("KeeperConverter"); + logger->setChannel(console_channel); + + if (options.count("help")) + { + std::cout << "Usage: " << argv[0] << " --zookeeper-logs-dir /var/lib/zookeeper/data/version-2 --zookeeper-snapshots-dir /var/lib/zookeeper/data/version-2 --output-dir /var/lib/clickhouse/coordination/snapshots" << std::endl; + std::cout << desc << std::endl; + return 0; + } + + try + { + /// All three directories are read unconditionally below, so check after --help handling that they were passed; a missing one is reported by name. + po::notify(options); + + DB::KeeperStorage storage(500, ""); + + DB::deserializeKeeperStorageFromSnapshotsDir(storage, options["zookeeper-snapshots-dir"].as(), logger); + DB::deserializeLogsAndApplyToStorage(storage, options["zookeeper-logs-dir"].as(), logger); + DB::SnapshotMetadataPtr snapshot_meta = std::make_shared(storage.getZXID(), 1, 
std::make_shared()); + DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta); + + DB::KeeperSnapshotManager manager(options["output-dir"].as(), 1); + auto snp = manager.serializeSnapshotToBuffer(snapshot); + auto path = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID()); + std::cout << "Snapshot serialized to path:" << path << std::endl; + } + catch (...) + { + std::cerr << getCurrentExceptionMessage(true) << '\n'; + return getCurrentExceptionCode(); + } + + return 0; +} diff --git a/programs/keeper-converter/clickhouse-keeper-converter.cpp b/programs/keeper-converter/clickhouse-keeper-converter.cpp new file mode 100644 index 00000000000..3cb6f99f837 --- /dev/null +++ b/programs/keeper-converter/clickhouse-keeper-converter.cpp @@ -0,0 +1,2 @@ +int mainEntryClickHouseKeeperConverter(int argc, char ** argv); +int main(int argc_, char ** argv_) { return mainEntryClickHouseKeeperConverter(argc_, argv_); } diff --git a/programs/main.cpp b/programs/main.cpp index 0e3d71c2c7b..225c1ac84de 100644 --- a/programs/main.cpp +++ b/programs/main.cpp @@ -59,6 +59,9 @@ int mainEntryClickHouseGitImport(int argc, char ** argv); #if ENABLE_CLICKHOUSE_KEEPER int mainEntryClickHouseKeeper(int argc, char ** argv); #endif +#if ENABLE_CLICKHOUSE_KEEPER_CONVERTER +int mainEntryClickHouseKeeperConverter(int argc, char ** argv); +#endif #if ENABLE_CLICKHOUSE_INSTALL int mainEntryClickHouseInstall(int argc, char ** argv); int mainEntryClickHouseStart(int argc, char ** argv); @@ -119,6 +122,9 @@ std::pair clickhouse_applications[] = #if ENABLE_CLICKHOUSE_KEEPER {"keeper", mainEntryClickHouseKeeper}, #endif +#if ENABLE_CLICKHOUSE_KEEPER_CONVERTER + {"keeper-converter", mainEntryClickHouseKeeperConverter}, +#endif #if ENABLE_CLICKHOUSE_INSTALL {"install", mainEntryClickHouseInstall}, {"start", mainEntryClickHouseStart}, diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index 50bdc6c77ba..1560d7a25da 100644 --- 
a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -239,6 +239,53 @@ void ZooKeeperListResponse::writeImpl(WriteBuffer & out) const Coordination::write(stat, out); } + +void ZooKeeperSetACLRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); + Coordination::write(acls, out); + Coordination::write(version, out); +} + +void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); + Coordination::read(acls, in); + Coordination::read(version, in); +} + +void ZooKeeperSetACLResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(stat, out); +} + +void ZooKeeperSetACLResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(stat, in); +} + +void ZooKeeperGetACLRequest::readImpl(ReadBuffer & in) +{ + Coordination::read(path, in); +} + +void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const +{ + Coordination::write(path, out); +} + +void ZooKeeperGetACLResponse::writeImpl(WriteBuffer & out) const +{ + Coordination::write(acl, out); + Coordination::write(stat, out); +} + +void ZooKeeperGetACLResponse::readImpl(ReadBuffer & in) +{ + Coordination::read(acl, in); + Coordination::read(stat, in); +} + void ZooKeeperCheckRequest::writeImpl(WriteBuffer & out) const { Coordination::write(path, out); @@ -454,6 +501,8 @@ ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return std::ma ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared(); } ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared(requests); } ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperSetACLRequest::makeResponse() const { return std::make_shared(); } +ZooKeeperResponsePtr ZooKeeperGetACLRequest::makeResponse() const { return std::make_shared(); } void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const { @@ -545,6 
+594,8 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); + registerZooKeeperRequest(*this); } } diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index ced154133b5..a816c1eb8bb 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -183,6 +183,9 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest bool isReadRequest() const override { return false; } size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); } + + /// During recovery from log we don't rehash ACLs + bool need_to_hash_acls = true; }; struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse @@ -350,6 +353,48 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse size_t bytesSize() const override { return ErrorResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } }; +struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { return OpNum::SetACL; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return false; } + + size_t bytesSize() const override { return SetACLRequest::bytesSize() + sizeof(xid); } + + bool need_to_hash_acls = true; +}; + +struct ZooKeeperSetACLResponse final : SetACLResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return OpNum::SetACL; } + + size_t bytesSize() const override { return SetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } +}; + +struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest +{ + OpNum getOpNum() const override { 
return OpNum::GetACL; } + void writeImpl(WriteBuffer & out) const override; + void readImpl(ReadBuffer & in) override; + ZooKeeperResponsePtr makeResponse() const override; + bool isReadRequest() const override { return true; } + + size_t bytesSize() const override { return GetACLRequest::bytesSize() + sizeof(xid); } +}; + +struct ZooKeeperGetACLResponse final : GetACLResponse, ZooKeeperResponse +{ + void readImpl(ReadBuffer & in) override; + void writeImpl(WriteBuffer & out) const override; + OpNum getOpNum() const override { return OpNum::GetACL; } + + size_t bytesSize() const override { return GetACLResponse::bytesSize() + sizeof(xid) + sizeof(zxid); } +}; + struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest { OpNum getOpNum() const override { return OpNum::Multi; } diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp index d2dde4c4cdd..3f480fb6b2b 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp +++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp @@ -22,6 +22,8 @@ static const std::unordered_set VALID_OPERATIONS = static_cast(OpNum::Multi), static_cast(OpNum::Auth), static_cast(OpNum::SessionID), + static_cast(OpNum::SetACL), + static_cast(OpNum::GetACL), }; std::string toString(OpNum op_num) @@ -58,6 +60,10 @@ std::string toString(OpNum op_num) return "Auth"; case OpNum::SessionID: return "SessionID"; + case OpNum::SetACL: + return "SetACL"; + case OpNum::GetACL: + return "GetACL"; } int32_t raw_op = static_cast(op_num); throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED); diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h index f91204693a0..ed7afd83628 100644 --- a/src/Common/ZooKeeper/ZooKeeperConstants.h +++ b/src/Common/ZooKeeper/ZooKeeperConstants.h @@ -23,6 +23,8 @@ enum class OpNum : int32_t Exists = 3, Get = 4, Set = 5, + GetACL = 6, + SetACL = 7, SimpleList = 8, Sync = 9, Heartbeat = 11, 
diff --git a/src/Common/ZooKeeper/ZooKeeperIO.cpp b/src/Common/ZooKeeper/ZooKeeperIO.cpp index 55448c9a109..0e0a034c633 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.cpp +++ b/src/Common/ZooKeeper/ZooKeeperIO.cpp @@ -9,6 +9,14 @@ void write(size_t x, WriteBuffer & out) writeBinary(x, out); } +#ifdef __APPLE__ +void write(uint64_t x, WriteBuffer & out) +{ + x = __builtin_bswap64(x); + writeBinary(x, out); +} +#endif + void write(int64_t x, WriteBuffer & out) { x = __builtin_bswap64(x); @@ -63,6 +71,14 @@ void write(const Error & x, WriteBuffer & out) write(static_cast(x), out); } +#ifdef __APPLE__ +void read(uint64_t & x, ReadBuffer & in) +{ + readBinary(x, in); + x = __builtin_bswap64(x); +} +#endif + void read(size_t & x, ReadBuffer & in) { readBinary(x, in); diff --git a/src/Common/ZooKeeper/ZooKeeperIO.h b/src/Common/ZooKeeper/ZooKeeperIO.h index fd47e324664..1fcb96315a5 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.h +++ b/src/Common/ZooKeeper/ZooKeeperIO.h @@ -14,6 +14,12 @@ namespace Coordination using namespace DB; void write(size_t x, WriteBuffer & out); + +/// uint64_t != size_t on darwin +#ifdef __APPLE__ +void write(uint64_t x, WriteBuffer & out); +#endif + void write(int64_t x, WriteBuffer & out); void write(int32_t x, WriteBuffer & out); void write(OpNum x, WriteBuffer & out); @@ -39,6 +45,9 @@ void write(const std::vector & arr, WriteBuffer & out) } void read(size_t & x, ReadBuffer & in); +#ifdef __APPLE__ +void read(uint64_t & x, ReadBuffer & in); +#endif void read(int64_t & x, ReadBuffer & in); void read(int32_t & x, ReadBuffer & in); void read(OpNum & x, ReadBuffer & in); diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 3575966410c..40c898efdb5 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -99,6 +99,10 @@ namespace node.acl_id = acl_map.convertACLs(acls); } + /// Some strange ACLID during deserialization from ZooKeeper + if 
(node.acl_id == std::numeric_limits::max()) + node.acl_id = 0; + acl_map.addUsage(node.acl_id); readBinary(node.is_sequental, in); @@ -217,12 +221,14 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, if (current_version >= SnapshotVersion::V1) { size_t acls_map_size; + readBinary(acls_map_size, in); size_t current_map_size = 0; while (current_map_size < acls_map_size) { uint64_t acl_id; readBinary(acl_id, in); + size_t acls_size; readBinary(acls_size, in); Coordination::ACLs acls; @@ -345,11 +351,23 @@ KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_ for (const auto & p : fs::directory_iterator(snapshots_path)) { - if (startsWith(p.path(), "tmp_")) /// Unfinished tmp files + const auto & path = p.path(); + + if (!path.has_filename()) + continue; + + if (startsWith(path.filename(), "tmp_")) /// Unfinished tmp files { std::filesystem::remove(p); continue; } + + /// Not snapshot file + if (!startsWith(path.filename(), "snapshot_")) + { + continue; + } + size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path()); existing_snapshots[snapshot_up_to] = p.path(); } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 3ae29edb77a..97c78e04f05 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -57,7 +57,7 @@ static String generateDigest(const String & userdata) { std::vector user_password; boost::split(user_password, userdata, [](char c) { return c == ':'; }); - return user_password[0] + ":" + base64Encode(getSHA1(user_password[1])); + return user_password[0] + ":" + base64Encode(getSHA1(userdata)); } static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector & session_auths) @@ -77,8 +77,10 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c return true; for (const auto & session_auth : session_auths) + { if (node_acl.scheme == session_auth.scheme && node_acl.id 
== session_auth.id) return true; + } } } @@ -88,7 +90,8 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c static bool fixupACL( const std::vector & request_acls, const std::vector & current_ids, - std::vector & result_acls) + std::vector & result_acls, + bool hash_acls) { if (request_acls.empty()) return true; @@ -121,7 +124,8 @@ static bool fixupACL( return false; valid_found = true; - new_acl.id = generateDigest(new_acl.id); + if (hash_acls) + new_acl.id = generateDigest(new_acl.id); result_acls.push_back(new_acl); } } @@ -263,12 +267,13 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest } else { + auto & session_auth_ids = storage.session_and_auth[session_id]; KeeperStorage::Node created_node; Coordination::ACLs node_acls; - if (!fixupACL(request.acls, session_auth_ids, node_acls)) + if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls)) { response.error = Coordination::Error::ZINVALIDACL; return {response_ptr, {}}; @@ -280,6 +285,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest created_node.acl_id = acl_id; created_node.stat.czxid = zxid; created_node.stat.mzxid = zxid; + created_node.stat.pzxid = zxid; created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); created_node.stat.mtime = created_node.stat.ctime; created_node.stat.numChildren = 0; @@ -302,12 +308,15 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest } auto child_path = getBaseName(path_created); - container.updateValue(parent_path, [child_path] (KeeperStorage::Node & parent) + int64_t prev_parent_zxid; + container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid] (KeeperStorage::Node & parent) { /// Increment sequential number even if node is not sequential ++parent.seq_num; parent.children.insert(child_path); ++parent.stat.cversion; + prev_parent_zxid = parent.stat.pzxid; + parent.stat.pzxid = zxid; 
++parent.stat.numChildren; }); @@ -317,7 +326,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest if (request.is_ephemeral) ephemerals[session_id].emplace(path_created); - undo = [&storage, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id] + undo = [&storage, prev_parent_zxid, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path, acl_id] { storage.container.erase(path_created); storage.acl_map.removeUsage(acl_id); @@ -325,11 +334,12 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest if (is_ephemeral) storage.ephemerals[session_id].erase(path_created); - storage.container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent) + storage.container.updateValue(parent_path, [child_path, prev_parent_zxid] (KeeperStorage::Node & undo_parent) { --undo_parent.stat.cversion; --undo_parent.stat.numChildren; --undo_parent.seq_num; + undo_parent.stat.pzxid = prev_parent_zxid; undo_parent.children.erase(child_path); }); }; @@ -536,6 +546,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest } else if (request.version == -1 || request.version == it->value.stat.version) { + auto prev_node = it->value; auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value) @@ -667,6 +678,111 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest } }; + +struct KeeperStorageSetACLRequest final : public KeeperStorageRequest +{ + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override + { + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + return checkACL(Coordination::ACL::Admin, node_acls, 
session_auths); + } + + using KeeperStorageRequest::KeeperStorageRequest; + + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override + { + auto & container = storage.container; + + Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Coordination::ZooKeeperSetACLResponse & response = dynamic_cast(*response_ptr); + Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); + auto it = container.find(request.path); + if (it == container.end()) + { + response.error = Coordination::Error::ZNONODE; + } + else if (request.version != -1 && request.version != it->value.stat.aversion) + { + response.error = Coordination::Error::ZBADVERSION; + } + else + { + auto & session_auth_ids = storage.session_and_auth[session_id]; + Coordination::ACLs node_acls; + + if (!fixupACL(request.acls, session_auth_ids, node_acls, request.need_to_hash_acls)) + { + response.error = Coordination::Error::ZINVALIDACL; + return {response_ptr, {}}; + } + + uint64_t acl_id = storage.acl_map.convertACLs(node_acls); + storage.acl_map.addUsage(acl_id); + + storage.container.updateValue(request.path, [acl_id] (KeeperStorage::Node & node) + { + node.acl_id = acl_id; + ++node.stat.aversion; + }); + + response.stat = it->value.stat; + response.error = Coordination::Error::ZOK; + } + + /// No undo action is returned here. NOTE(review): can SetACL be used inside a multi-transaction? Also the node's previous acl_id gets no matching removeUsage call above - confirm this is intended. + return { response_ptr, {} }; + } +}; + +struct KeeperStorageGetACLRequest final : public KeeperStorageRequest +{ + bool checkAuth(KeeperStorage & storage, int64_t session_id) const override + { + auto & container = storage.container; + auto it = container.find(zk_request->getPath()); + if (it == container.end()) + return true; + + const auto & node_acls = storage.acl_map.convertNumber(it->value.acl_id); + if (node_acls.empty()) + return true; + + const auto & session_auths = storage.session_and_auth[session_id]; + /// LOL, GetACL requires more permissions than SetACL... 
+ return checkACL(Coordination::ACL::Admin | Coordination::ACL::Read, node_acls, session_auths); + } + using KeeperStorageRequest::KeeperStorageRequest; + + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + { + Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); + Coordination::ZooKeeperGetACLResponse & response = dynamic_cast(*response_ptr); + Coordination::ZooKeeperGetACLRequest & request = dynamic_cast(*zk_request); + auto & container = storage.container; + auto it = container.find(request.path); + if (it == container.end()) + { + response.error = Coordination::Error::ZNONODE; + } + else + { + response.stat = it->value.stat; + response.acl = storage.acl_map.convertNumber(it->value.acl_id); + } + + return {response_ptr, {}}; + } +}; + struct KeeperStorageMultiRequest final : public KeeperStorageRequest { bool checkAuth(KeeperStorage & storage, int64_t session_id) const override @@ -893,10 +1009,12 @@ KeeperWrapperFactory::KeeperWrapperFactory() registerKeeperRequestWrapper(*this); registerKeeperRequestWrapper(*this); registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); } -KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid) +KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid, bool check_acl) { KeeperStorage::ResponsesForSessions results; if (new_last_zxid) @@ -954,7 +1072,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request); Coordination::ZooKeeperResponsePtr response; - if (!storage_request->checkAuth(*this, session_id)) + if (check_acl && !storage_request->checkAuth(*this, 
session_id)) { response = zk_request->makeResponse(); /// Original ZooKeeper always throws no auth, even when user provided some credentials diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 7c90a9bd661..e3cb0f59fdc 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -116,7 +116,7 @@ public: session_expiry_queue.update(session_id, session_timeout_ms); } - ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid); + ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid, bool check_acl = true); void finalize(); diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp new file mode 100644 index 00000000000..8bcce25cfee --- /dev/null +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -0,0 +1,559 @@ +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; + extern const int CORRUPTED_DATA; +} + +static String parentPath(const String & path) +{ + auto rslash_pos = path.rfind('/'); + if (rslash_pos > 0) + return path.substr(0, rslash_pos); + return "/"; +} + +static std::string getBaseName(const String & path) +{ + size_t basename_start = path.rfind('/'); + return std::string{&path[basename_start + 1], path.length() - basename_start - 1}; +} + +int64_t getZxidFromName(const std::string & filename) +{ + std::filesystem::path path(filename); + std::string extension = path.extension(); + /// ZXID is parsed as a hexadecimal number from the text after the leading dot of the extension. + /// Without an extension there is nothing to parse and `extension.data() + 1` would point past the terminating zero. + if (extension.empty()) + throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot parse ZXID from file name {}: it has no extension", filename); + char * end; + int64_t zxid = std::strtoul(extension.data() + 1, &end, 16); + return zxid; +} + +void deserializeSnapshotMagic(ReadBuffer & in) +{ + int32_t magic_header, version; + int64_t dbid; + Coordination::read(magic_header, in); + Coordination::read(version, in); + if (version != 2) + throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize ZooKeeper data other than version 2, got version {}", version); + Coordination::read(dbid, in); + static constexpr int32_t SNP_HEADER = 1514885966; /// "ZKSN" + if (magic_header != SNP_HEADER) + throw Exception(ErrorCodes::CORRUPTED_DATA ,"Incorrect magic header in file, expected {}, got {}", SNP_HEADER, magic_header); +} + +int64_t deserializeSessionAndTimeout(KeeperStorage & storage, ReadBuffer & in) +{ + int32_t count; + Coordination::read(count, in); + int64_t max_session_id = 0; + while (count > 0) + { + int64_t session_id; + int32_t timeout; + + Coordination::read(session_id, in); + Coordination::read(timeout, in); + storage.addSessionID(session_id, timeout); + max_session_id = std::max(session_id, max_session_id); + count--; + } + return max_session_id; +} + +void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in) +{ + int32_t count; + Coordination::read(count, in); + while (count > 0) + { + int64_t map_index; + Coordination::read(map_index, in); + + Coordination::ACLs acls; + int32_t acls_len; + Coordination::read(acls_len, in); + + while (acls_len > 0) + { + Coordination::ACL acl; + Coordination::read(acl.permissions, in); + Coordination::read(acl.scheme, in); + Coordination::read(acl.id, in); + acls.push_back(acl); + acls_len--; + } + storage.acl_map.addMapping(map_index, acls); + + count--; + } +} + +int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * log) +{ + int64_t max_zxid = 0; + std::string path; + Coordination::read(path, in); + size_t count = 0; + while (path != "/") + { + KeeperStorage::Node node{}; + Coordination::read(node.data, in); + Coordination::read(node.acl_id, in); + + /// Deserialize stat + Coordination::read(node.stat.czxid, in); + Coordination::read(node.stat.mzxid, in); + /// For some reason ZXID specified in filename can be smaller + /// then actual zxid from nodes. In this case we will use zxid from nodes. 
+ max_zxid = std::max(max_zxid, node.stat.mzxid); + + Coordination::read(node.stat.ctime, in); + Coordination::read(node.stat.mtime, in); + Coordination::read(node.stat.version, in); + Coordination::read(node.stat.cversion, in); + Coordination::read(node.stat.aversion, in); + Coordination::read(node.stat.ephemeralOwner, in); + Coordination::read(node.stat.pzxid, in); + if (!path.empty()) + { + node.stat.dataLength = node.data.length(); + node.seq_num = node.stat.cversion; + storage.container.insertOrReplace(path, node); + + if (node.stat.ephemeralOwner != 0) + storage.ephemerals[node.stat.ephemeralOwner].insert(path); + + storage.acl_map.addUsage(node.acl_id); + } + Coordination::read(path, in); + count++; + if (count % 1000 == 0) + LOG_INFO(log, "Deserialized nodes from snapshot: {}", count); + } + + for (const auto & itr : storage.container) + { + if (itr.key != "/") + { + auto parent_path = parentPath(itr.key); + storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; }); + } + } + + return max_zxid; +} + +void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log) +{ + LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path); + int64_t zxid = getZxidFromName(snapshot_path); + + ReadBufferFromFile reader(snapshot_path); + + deserializeSnapshotMagic(reader); + + LOG_INFO(log, "Magic deserialized, looks OK"); + auto max_session_id = deserializeSessionAndTimeout(storage, reader); + LOG_INFO(log, "Sessions and timeouts deserialized"); + + storage.session_id_counter = max_session_id; + deserializeACLMap(storage, reader); + LOG_INFO(log, "ACLs deserialized"); + + LOG_INFO(log, "Deserializing data from snapshot"); + int64_t zxid_from_nodes = deserializeStorageData(storage, reader, log); + storage.zxid = std::max(zxid, zxid_from_nodes); + + LOG_INFO(log, "Finished, snapshot ZXID {}", 
storage.zxid); +} + +void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log) +{ + namespace fs = std::filesystem; + std::map existing_snapshots; + for (const auto & p : fs::directory_iterator(path)) + { + const auto & log_path = p.path(); + if (!log_path.has_filename() || !startsWith(log_path.filename(), "snapshot.")) + continue; + int64_t zxid = getZxidFromName(log_path); + existing_snapshots[zxid] = p.path(); + } + + LOG_INFO(log, "Totally have {} snapshots, will use latest", existing_snapshots.size()); + /// deserialize only from latest snapshot + if (!existing_snapshots.empty()) + deserializeKeeperStorageFromSnapshot(storage, existing_snapshots.rbegin()->second, log); + else + throw Exception(ErrorCodes::CORRUPTED_DATA, "No snapshots found on path {}. At least one snapshot must exist.", path); +} + +void deserializeLogMagic(ReadBuffer & in) +{ + int32_t magic_header, version; + int64_t dbid; + Coordination::read(magic_header, in); + Coordination::read(version, in); + Coordination::read(dbid, in); + + static constexpr int32_t LOG_HEADER = 1514884167; /// "ZKLG" + if (magic_header != LOG_HEADER) + throw Exception(ErrorCodes::CORRUPTED_DATA ,"Incorrect magic header in file, expected {}, got {}", LOG_HEADER, magic_header); + + if (version != 2) + throw Exception(ErrorCodes::NOT_IMPLEMENTED,"Cannot deserialize ZooKeeper data other than version 2, got version {}", version); +} + + +/// For some reason zookeeper stores slightly different records in log then +/// requests. For example: +/// class CreateTxn { +/// ustring path; +/// buffer data; +/// vector acl; +/// boolean ephemeral; +/// int parentCVersion; +/// } +/// But Create Request: +/// class CreateRequest { +/// ustring path; +/// buffer data; +/// vector acl; +/// int flags; +/// } +/// +/// However type is the same OpNum... +/// +/// Also there is a comment in ZooKeeper's code base about log structure, but +/// it's almost completely incorrect. 
Actual ZooKeeper log structure starting from version 3.6+: +/// +/// Magic Header: "ZKLG" + 4 byte version + 8 byte dbid. +/// After that come the serialized transactions, in the following format: +/// 8 byte checksum +/// 4 byte transaction length +/// 8 byte session_id (author of the transaction) +/// 4 byte user XID +/// 8 byte ZXID +/// 8 byte transaction time +/// 4 byte transaction type (OpNum) +/// [Transaction body depending on transaction type] +/// 12 bytes tail (starting from 3.6+): 4 byte version + 8 byte checksum of data tree +/// 1 byte -- 0x42 +/// +/// Transaction body is quite simple for all kinds of transactions except +/// Multitransactions. Their structure is as follows: +/// 4 byte sub transactions count +/// 4 byte sub transaction type (OpNum), then 4 byte sub transaction length +/// [Transaction body depending on transaction type] +/// and so on +/// +/// Gotchas: +/// +/// 1) For some reason ZooKeeper stores ErrorTxn's in the log. It's +/// reasonable for Multitransactions, but why they store standalone errors +/// is not clear. +/// +/// 2) For some reason there is no 12 bytes tail (version + checksum of +/// tree) after a standalone ErrorTxn. +/// +/// 3) The strangest thing: in one of our production logs (about 1.2GB +/// size) we found a Multitransaction with two sub transactions: Error1 +/// and Error2, both with OpCode -1. A normal Error transaction is 4 bytes long +/// (for the error code), but Error1 was 550 bytes long. What is even +/// stranger, these 550 bytes were obviously part of a Create transaction, +/// but the operation code was -1. We added debug prints to the original +/// ZooKeeper (3.6.3) and found that it just reads the 550 bytes of this "Error" +/// transaction, takes the first 4 bytes as an error code (it was 79, a +/// non-existent code) and skips the remaining 546 bytes. NOTE: it looks like a bug +/// in ZooKeeper.
+/// +namespace +{ + +Coordination::ZooKeeperRequestPtr deserializeCreateTxn(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + Coordination::read(result->path, in); + Coordination::read(result->data, in); + Coordination::read(result->acls, in); + Coordination::read(result->is_ephemeral, in); + result->need_to_hash_acls = false; + /// How we should use it? It should just increment on request execution + int32_t parent_c_version; + Coordination::read(parent_c_version, in); + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeDeleteTxn(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + Coordination::read(result->path, in); + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeSetTxn(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + Coordination::read(result->path, in); + Coordination::read(result->data, in); + Coordination::read(result->version, in); + /// It stores version + 1 (which should be, not for request) + result->version -= 1; + + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeCheckVersionTxn(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + Coordination::read(result->path, in); + Coordination::read(result->version, in); + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeCreateSession(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + int32_t timeout; + Coordination::read(timeout, in); + result->session_timeout_ms = timeout; + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeCloseSession(ReadBuffer & in) +{ + std::shared_ptr result = std::make_shared(); + std::vector data; + Coordination::read(data, in); + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeErrorTxn(ReadBuffer & in) +{ + int32_t error; + Coordination::read(error, in); + return nullptr; +} + +Coordination::ZooKeeperRequestPtr deserializeSetACLTxn(ReadBuffer & in) +{ + std::shared_ptr result = 
std::make_shared(); + + Coordination::read(result->path, in); + Coordination::read(result->acls, in); + Coordination::read(result->version, in); + /// It stores version + 1 (which should be, not for request) + result->version -= 1; + result->need_to_hash_acls = false; + + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in); + +Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtxn) +{ + int32_t type; + Coordination::read(type, in); + Coordination::ZooKeeperRequestPtr result = nullptr; + int32_t sub_txn_length = 0; + if (subtxn) + Coordination::read(sub_txn_length, in); + + int64_t in_count_before = in.count(); + + switch (type) + { + case 1: + result = deserializeCreateTxn(in); + break; + case 2: + result = deserializeDeleteTxn(in); + break; + case 5: + result = deserializeSetTxn(in); + break; + case 7: + result = deserializeSetACLTxn(in); + break; + case 13: + result = deserializeCheckVersionTxn(in); + break; + case 14: + result = deserializeMultiTxn(in); + break; + case -10: + result = deserializeCreateSession(in); + break; + case -11: + result = deserializeCloseSession(in); + break; + case -1: + result = deserializeErrorTxn(in); + break; + default: + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented operation {}", type); + } + + if (subtxn) + { + int64_t bytes_read = in.count() - in_count_before; + if (bytes_read < sub_txn_length) + in.ignore(sub_txn_length - bytes_read); + } + + return result; +} + +Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in) +{ + int32_t length; + Coordination::read(length, in); + + std::shared_ptr result = std::make_shared(); + while (length > 0) + { + auto subrequest = deserializeTxnImpl(in, true); + result->requests.push_back(subrequest); + length--; + } + return result; +} + +bool isErrorRequest(Coordination::ZooKeeperRequestPtr request) +{ + return request == nullptr; +} + +bool 
hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request) +{ + if (request == nullptr) + return true; + + for (const auto & subrequest : dynamic_cast(request.get())->requests) //-V522 + if (subrequest == nullptr) + return true; + return false; +} + +} + +bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*log*/) +{ + int64_t checksum; + Coordination::read(checksum, in); + /// Zero padding is possible until file end + if (checksum == 0) + return false; + + int32_t txn_len; + Coordination::read(txn_len, in); + int64_t count_before = in.count(); + int64_t session_id; + Coordination::read(session_id, in); + int32_t xid; + Coordination::read(xid, in); + int64_t zxid; + Coordination::read(zxid, in); + int64_t time; + Coordination::read(time, in); + + Coordination::ZooKeeperRequestPtr request = deserializeTxnImpl(in, false); + + /// Skip all other bytes + int64_t bytes_read = in.count() - count_before; + if (bytes_read < txn_len) + in.ignore(txn_len - bytes_read); + + /// We don't need to apply error requests + if (isErrorRequest(request)) + return true; + + request->xid = xid; + + if (zxid > storage.zxid) + { + /// Separate processing of session id requests + if (request->getOpNum() == Coordination::OpNum::SessionID) + { + const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast(*request); + storage.getSessionID(session_id_request.session_timeout_ms); + } + else + { + /// Skip failed multirequests + if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request)) + return true; + + storage.processRequest(request, session_id, zxid, /* check_acl = */ false); + } + } + + return true; +} + +void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log) +{ + ReadBufferFromFile reader(log_path); + + LOG_INFO(log, "Deserializing log {}", log_path); + deserializeLogMagic(reader); + LOG_INFO(log, "Header looks OK"); + + size_t counter = 0; + 
while (!reader.eof() && deserializeTxn(storage, reader, log)) + { + counter++; + if (counter % 1000 == 0) + LOG_INFO(log, "Deserialized txns log: {}", counter); + + int8_t forty_two; + Coordination::read(forty_two, reader); + if (forty_two != 0x42) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Forty two check byte ({}) is not equal 0x42", forty_two); + } + + LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter); +} + +void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log) +{ + namespace fs = std::filesystem; + std::map existing_logs; + for (const auto & p : fs::directory_iterator(path)) + { + const auto & log_path = p.path(); + if (!log_path.has_filename() || !startsWith(log_path.filename(), "log.")) + continue; + int64_t zxid = getZxidFromName(log_path); + existing_logs[zxid] = p.path(); + } + + LOG_INFO(log, "Totally have {} logs", existing_logs.size()); + + for (auto [zxid, log_path] : existing_logs) + { + if (zxid > storage.zxid) + deserializeLogAndApplyToStorage(storage, log_path, log); + else + LOG_INFO(log, "Skipping log {}, it's ZXID {} is smaller than storages ZXID {}", log_path, zxid, storage.zxid); + } +} + +} diff --git a/src/Coordination/ZooKeeperDataReader.h b/src/Coordination/ZooKeeperDataReader.h new file mode 100644 index 00000000000..5f26457c113 --- /dev/null +++ b/src/Coordination/ZooKeeperDataReader.h @@ -0,0 +1,17 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log); + +void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log); + +void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log); + +void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log); + +} diff 
--git a/tests/integration/test_keeper_auth/configs/keeper_config.xml b/tests/integration/test_keeper_auth/configs/keeper_config.xml index bb3c9a5d94a..bee3ccb0aba 100644 --- a/tests/integration/test_keeper_auth/configs/keeper_config.xml +++ b/tests/integration/test_keeper_auth/configs/keeper_config.xml @@ -4,7 +4,7 @@ 1 /var/lib/clickhouse/coordination/log /var/lib/clickhouse/coordination/snapshots - super:0DPiKuNIrrVmD8IUCuw1hQxNqZc= + super:xQJmxLMiHGwaqBvst5y6rkB6HQs= 5000 diff --git a/tests/integration/test_keeper_auth/test.py b/tests/integration/test_keeper_auth/test.py index 5f60d5b8bdb..721ccd6fddb 100644 --- a/tests/integration/test_keeper_auth/test.py +++ b/tests/integration/test_keeper_auth/test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 + import pytest from helpers.cluster import ClickHouseCluster from kazoo.client import KazooClient, KazooState @@ -300,3 +300,47 @@ def test_auth_snapshot(started_cluster): with pytest.raises(NoAuthError): connection2.get("/test_snapshot_acl1") + + +@pytest.mark.parametrize( + ('get_zk'), + [ + get_genuine_zk, + get_fake_zk + ] +) +def test_get_set_acl(started_cluster, get_zk): + auth_connection = get_zk() + auth_connection.add_auth('digest', 'username1:secret1') + auth_connection.add_auth('digest', 'username2:secret2') + + auth_connection.create("/test_set_get_acl", b"data", acl=[make_acl("auth", "", all=True)]) + + acls, stat = auth_connection.get_acls("/test_set_get_acl") + + assert stat.aversion == 0 + assert len(acls) == 2 + for acl in acls: + assert acl.acl_list == ['ALL'] + assert acl.id.scheme == 'digest' + assert acl.perms == 31 + assert acl.id.id in ('username1:eGncMdBgOfGS/TCojt51xWsWv/Y=', 'username2:qgSSumukVlhftkVycylbHNvxhFU=') + + + other_auth_connection = get_zk() + other_auth_connection.add_auth('digest', 'username1:secret1') + other_auth_connection.add_auth('digest', 'username3:secret3') + other_auth_connection.set_acls("/test_set_get_acl", acls=[make_acl("auth", "", read=True, write=False, create=True, 
delete=True, admin=True)]) + + acls, stat = other_auth_connection.get_acls("/test_set_get_acl") + + assert stat.aversion == 1 + assert len(acls) == 2 + for acl in acls: + assert acl.acl_list == ['READ', 'CREATE', 'DELETE', 'ADMIN'] + assert acl.id.scheme == 'digest' + assert acl.perms == 29 + assert acl.id.id in ('username1:eGncMdBgOfGS/TCojt51xWsWv/Y=', 'username3:CvWITOxxTwk+u6S5PoGlQ4hNoWI=') + + with pytest.raises(KazooException): + other_auth_connection.set_acls("/test_set_get_acl", acls=[make_acl("auth", "", all=True)], version=0) diff --git a/tests/integration/test_keeper_zookeeper_converter/__init__.py b/tests/integration/test_keeper_zookeeper_converter/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_zookeeper_converter/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_zookeeper_converter/configs/keeper_config.xml b/tests/integration/test_keeper_zookeeper_converter/configs/keeper_config.xml new file mode 100644 index 00000000000..ceaca04762e --- /dev/null +++ b/tests/integration/test_keeper_zookeeper_converter/configs/keeper_config.xml @@ -0,0 +1,23 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/logs + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + trace + 75 + + + + + 1 + localhost + 44444 + + + + diff --git a/tests/integration/test_keeper_zookeeper_converter/configs/logs_conf.xml b/tests/integration/test_keeper_zookeeper_converter/configs/logs_conf.xml new file mode 100644 index 00000000000..318a6bca95d --- /dev/null +++ b/tests/integration/test_keeper_zookeeper_converter/configs/logs_conf.xml @@ -0,0 +1,12 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_keeper_zookeeper_converter/test.py b/tests/integration/test_keeper_zookeeper_converter/test.py 
new file mode 100644 index 00000000000..eac2b4c45c5 --- /dev/null +++ b/tests/integration/test_keeper_zookeeper_converter/test.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 +import pytest +from helpers.cluster import ClickHouseCluster +from kazoo.client import KazooClient, KazooState +from kazoo.security import ACL, make_digest_acl, make_acl +from kazoo.exceptions import AuthFailedError, InvalidACLError, NoAuthError, KazooException +import os + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance('node', main_configs=['configs/keeper_config.xml', 'configs/logs_conf.xml'], stay_alive=True) + +def start_zookeeper(): + node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh start']) + +def stop_zookeeper(): + node.exec_in_container(['bash', '-c', '/opt/zookeeper/bin/zkServer.sh stop']) + +def clear_zookeeper(): + node.exec_in_container(['bash', '-c', 'rm -fr /zookeeper/*']) + +def restart_and_clear_zookeeper(): + stop_zookeeper() + clear_zookeeper() + start_zookeeper() + +def clear_clickhouse_data(): + node.exec_in_container(['bash', '-c', 'rm -fr /var/lib/clickhouse/coordination/logs/* /var/lib/clickhouse/coordination/snapshots/*']) + +def convert_zookeeper_data(): + cmd = '/usr/bin/clickhouse keeper-converter --zookeeper-logs-dir /zookeeper/version-2/ --zookeeper-snapshots-dir /zookeeper/version-2/ --output-dir /var/lib/clickhouse/coordination/snapshots' + node.exec_in_container(['bash', '-c', cmd]) + +def stop_clickhouse(): + node.stop_clickhouse() + +def start_clickhouse(): + node.start_clickhouse() + +def copy_zookeeper_data(make_zk_snapshots): + stop_zookeeper() + + if make_zk_snapshots: # force zookeeper to create snapshot + start_zookeeper() + stop_zookeeper() + + stop_clickhouse() + clear_clickhouse_data() + convert_zookeeper_data() + start_zookeeper() + start_clickhouse() + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + +def 
get_fake_zk(timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":9181", timeout=timeout) + _fake_zk_instance.start() + return _fake_zk_instance + +def get_genuine_zk(timeout=30.0): + _genuine_zk_instance = KazooClient(hosts=cluster.get_instance_ip('node') + ":2181", timeout=timeout) + _genuine_zk_instance.start() + return _genuine_zk_instance + +def compare_stats(stat1, stat2, path): # Asserts field by field that two znode stats are equal; path only labels the failure message. ctime/mtime are not compared. + assert stat1.czxid == stat2.czxid, "path " + path + " cxzids not equal for stats: " + str(stat1.czxid) + " != " + str(stat2.czxid) # the message must read czxid: a missing attribute here raises AttributeError and hides the real mismatch + assert stat1.mzxid == stat2.mzxid, "path " + path + " mxzids not equal for stats: " + str(stat1.mzxid) + " != " + str(stat2.mzxid) + assert stat1.version == stat2.version, "path " + path + " versions not equal for stats: " + str(stat1.version) + " != " + str(stat2.version) + assert stat1.cversion == stat2.cversion, "path " + path + " cversions not equal for stats: " + str(stat1.cversion) + " != " + str(stat2.cversion) + assert stat1.aversion == stat2.aversion, "path " + path + " aversions not equal for stats: " + str(stat1.aversion) + " != " + str(stat2.aversion) + assert stat1.ephemeralOwner == stat2.ephemeralOwner,"path " + path + " ephemeralOwners not equal for stats: " + str(stat1.ephemeralOwner) + " != " + str(stat2.ephemeralOwner) + assert stat1.dataLength == stat2.dataLength , "path " + path + " dataLengths not equal for stats: " + str(stat1.dataLength) + " != " + str(stat2.dataLength) # label names the field actually compared + assert stat1.numChildren == stat2.numChildren, "path " + path + " numChildren not equal for stats: " + str(stat1.numChildren) + " != " + str(stat2.numChildren) + assert stat1.pzxid == stat2.pzxid, "path " + path + " pzxid not equal for stats: " + str(stat1.pzxid) + " != " + str(stat2.pzxid) + +def compare_states(zk1, zk2, path="/"): + data1, stat1 = zk1.get(path) + data2, stat2 = zk2.get(path) + print("Left Stat", stat1) + print("Right Stat", stat2) + assert data1 == data2, "Data not equal on path " + str(path) + # both
paths have strange stats + if path not in ("/", "/zookeeper"): + compare_stats(stat1, stat2, path) + + first_children = list(sorted(zk1.get_children(path))) + second_children = list(sorted(zk2.get_children(path))) + print("Got children left", first_children) + print("Got children rigth", second_children) + assert first_children == second_children, "Childrens are not equal on path " + path + + for children in first_children: + print("Checking child", os.path.join(path, children)) + compare_states(zk1, zk2, os.path.join(path, children)) + +@pytest.mark.parametrize( + ('create_snapshots'), + [ + True, False + ] +) +def test_smoke(started_cluster, create_snapshots): + restart_and_clear_zookeeper() + + genuine_connection = get_genuine_zk() + genuine_connection.create("/test", b"data") + + assert genuine_connection.get("/test")[0] == b"data" + + copy_zookeeper_data(create_snapshots) + + genuine_connection = get_genuine_zk() + fake_connection = get_fake_zk() + + compare_states(genuine_connection, fake_connection) + +def get_bytes(s): + return s.encode() + +@pytest.mark.parametrize( + ('create_snapshots'), + [ + True, False + ] +) +def test_simple_crud_requests(started_cluster, create_snapshots): + restart_and_clear_zookeeper() + + genuine_connection = get_genuine_zk() + for i in range(100): + genuine_connection.create("/test_create" + str(i), get_bytes("data" + str(i))) + + # some set queries + for i in range(10): + for j in range(i + 1): + genuine_connection.set("/test_create" + str(i), get_bytes("value" + str(j))) + + for i in range(10, 20): + genuine_connection.delete("/test_create" + str(i)) + + path = "/test_create_deep" + for i in range(10): + genuine_connection.create(path, get_bytes("data" + str(i))) + path = os.path.join(path, str(i)) + + + genuine_connection.create("/test_sequential", b"") + for i in range(10): + genuine_connection.create("/test_sequential/" + "a" * i + "-", get_bytes("dataX" + str(i)), sequence=True) + + 
genuine_connection.create("/test_ephemeral", b"") + for i in range(10): + genuine_connection.create("/test_ephemeral/" + str(i), get_bytes("dataX" + str(i)), ephemeral=True) + + copy_zookeeper_data(create_snapshots) + + genuine_connection = get_genuine_zk() + fake_connection = get_fake_zk() + + compare_states(genuine_connection, fake_connection) + + # especially ensure that counters are the same + genuine_connection.create("/test_sequential/" + "a" * 10 + "-", get_bytes("dataX" + str(i)), sequence=True) + fake_connection.create("/test_sequential/" + "a" * 10 + "-", get_bytes("dataX" + str(i)), sequence=True) + + first_children = list(sorted(genuine_connection.get_children("/test_sequential"))) + second_children = list(sorted(fake_connection.get_children("/test_sequential"))) + assert first_children == second_children, "Childrens are not equal on path " + path + +@pytest.mark.parametrize( + ('create_snapshots'), + [ + True, False + ] +) +def test_multi_and_failed_requests(started_cluster, create_snapshots): + restart_and_clear_zookeeper() + + genuine_connection = get_genuine_zk() + genuine_connection.create('/test_multitransactions') + for i in range(10): + t = genuine_connection.transaction() + t.create('/test_multitransactions/freddy' + str(i), get_bytes('data' + str(i))) + t.create('/test_multitransactions/fred' + str(i), get_bytes('value' + str(i)), ephemeral=True) + t.create('/test_multitransactions/smith' + str(i), get_bytes('entity' + str(i)), sequence=True) + t.set_data('/test_multitransactions', get_bytes("somedata" + str(i))) + t.commit() + + with pytest.raises(Exception): + genuine_connection.set('/test_multitransactions/freddy0', get_bytes('mustfail' + str(i)), version=1) + + t = genuine_connection.transaction() + + t.create('/test_bad_transaction', get_bytes('data' + str(1))) + t.check('/test_multitransactions', version=32) + t.create('/test_bad_transaction1', get_bytes('data' + str(2))) + # should fail + t.commit() + + assert 
genuine_connection.exists('/test_bad_transaction') is None + assert genuine_connection.exists('/test_bad_transaction1') is None + + t = genuine_connection.transaction() + t.create('/test_bad_transaction2', get_bytes('data' + str(1))) + t.delete('/test_multitransactions/freddy0', version=5) + + # should fail + t.commit() + assert genuine_connection.exists('/test_bad_transaction2') is None + assert genuine_connection.exists('/test_multitransactions/freddy0') is not None + + copy_zookeeper_data(create_snapshots) + + genuine_connection = get_genuine_zk() + fake_connection = get_fake_zk() + + compare_states(genuine_connection, fake_connection) + +@pytest.mark.parametrize( + ('create_snapshots'), + [ + True, False + ] +) +def test_acls(started_cluster, create_snapshots): + restart_and_clear_zookeeper() + genuine_connection = get_genuine_zk() + genuine_connection.add_auth('digest', 'user1:password1') + genuine_connection.add_auth('digest', 'user2:password2') + genuine_connection.add_auth('digest', 'user3:password3') + + genuine_connection.create("/test_multi_all_acl", b"data", acl=[make_acl("auth", "", all=True)]) + + other_connection = get_genuine_zk() + other_connection.add_auth('digest', 'user1:password1') + other_connection.set("/test_multi_all_acl", b"X") + assert other_connection.get("/test_multi_all_acl")[0] == b"X" + + yet_other_auth_connection = get_genuine_zk() + yet_other_auth_connection.add_auth('digest', 'user2:password2') + + yet_other_auth_connection.set("/test_multi_all_acl", b"Y") + + genuine_connection.add_auth('digest', 'user3:password3') + + # just to check that we are able to deserialize it + genuine_connection.set_acls("/test_multi_all_acl", acls=[make_acl("auth", "", read=True, write=False, create=True, delete=True, admin=True)]) + + no_auth_connection = get_genuine_zk() + + with pytest.raises(Exception): + no_auth_connection.set("/test_multi_all_acl", b"Z") + + copy_zookeeper_data(create_snapshots) + + genuine_connection = get_genuine_zk() + 
genuine_connection.add_auth('digest', 'user1:password1') + genuine_connection.add_auth('digest', 'user2:password2') + genuine_connection.add_auth('digest', 'user3:password3') + + fake_connection = get_fake_zk() + fake_connection.add_auth('digest', 'user1:password1') + fake_connection.add_auth('digest', 'user2:password2') + fake_connection.add_auth('digest', 'user3:password3') + + compare_states(genuine_connection, fake_connection) + + for connection in [genuine_connection, fake_connection]: + acls, stat = connection.get_acls("/test_multi_all_acl") + assert stat.aversion == 1 + assert len(acls) == 3 + for acl in acls: + assert acl.acl_list == ['READ', 'CREATE', 'DELETE', 'ADMIN'] + assert acl.id.scheme == 'digest' + assert acl.perms == 29 + assert acl.id.id in ('user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI=', 'user2:lo/iTtNMP+gEZlpUNaCqLYO3i5U=', 'user3:wr5Y0kEs9nFX3bKrTMKxrlcFeWo=')