rocksdb-cmake && fix iteration bug && improve tests

This commit is contained in:
sundy-li 2020-11-11 09:39:09 +08:00
parent 765c9eaeed
commit c56d1212a2
9 changed files with 766 additions and 42 deletions

View File

@ -322,19 +322,5 @@ if (USE_KRB5)
endif() endif()
if (USE_INTERNAL_ROCKSDB_LIBRARY) if (USE_INTERNAL_ROCKSDB_LIBRARY)
set(WITH_TESTS OFF) add_subdirectory(rocksdb-cmake)
set(WITH_BENCHMARK_TOOLS OFF)
set(WITH_TOOLS OFF)
set(WITH_GFLAGS OFF)
set(PORTABLE ON)
if (SANITIZE STREQUAL "undefined")
set(WITH_UBSAN ON)
elseif (SANITIZE STREQUAL "address")
set(WITH_ASAN ON)
elseif (SANITIZE STREQUAL "thread")
set(WITH_TSAN ON)
endif()
add_subdirectory (rocksdb)
endif() endif()

View File

@ -0,0 +1,732 @@
## this file is extracted from `contrib/rocksdb/CMakeLists.txt`
set(ROCKSDB_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb")
list(APPEND CMAKE_MODULE_PATH "${ROCKSDB_SOURCE_DIR}/cmake/modules/")
find_program(CCACHE_FOUND ccache)
if(CCACHE_FOUND)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
endif(CCACHE_FOUND)
if (SANITIZE STREQUAL "undefined")
set(WITH_UBSAN ON)
elseif (SANITIZE STREQUAL "address")
set(WITH_ASAN ON)
elseif (SANITIZE STREQUAL "thread")
set(WITH_TSAN ON)
endif()
set(PORTABLE ON)
option(WITH_JEMALLOC "build with JeMalloc" ${ENABLE_JEMALLOC})
option(WITH_SNAPPY "build with SNAPPY" ON)
option(WITH_LZ4 "build with lz4" ON)
option(WITH_ZLIB "build with zlib" ON)
option(WITH_ZSTD "build with zstd" ON)
# third-party/folly is only validated to work on Linux and Windows for now.
# So only turn it on there by default.
if(CMAKE_SYSTEM_NAME MATCHES "Linux|Windows")
if(MSVC AND MSVC_VERSION LESS 1910)
# Folly does not compile with MSVC older than VS2017
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" OFF)
else()
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" ON)
endif()
else()
option(WITH_FOLLY_DISTRIBUTED_MUTEX "build with folly::DistributedMutex" OFF)
endif()
if( NOT DEFINED CMAKE_CXX_STANDARD )
set(CMAKE_CXX_STANDARD 11)
endif()
if(MSVC)
option(WITH_XPRESS "build with windows built in compression" OFF)
include(${ROCKSDB_SOURCE_DIR}/thirdparty.inc)
else()
if(CMAKE_SYSTEM_NAME MATCHES "FreeBSD" AND NOT CMAKE_SYSTEM_NAME MATCHES "kFreeBSD")
# FreeBSD has jemalloc as default malloc
# but it does not have all the jemalloc files in include/...
set(WITH_JEMALLOC ON)
else()
if(WITH_JEMALLOC)
find_package(JeMalloc REQUIRED)
add_definitions(-DROCKSDB_JEMALLOC -DJEMALLOC_NO_DEMANGLE)
list(APPEND THIRDPARTY_LIBS JeMalloc::JeMalloc)
endif()
endif()
if(WITH_SNAPPY)
find_package(Snappy CONFIG)
if(NOT Snappy_FOUND)
find_package(Snappy REQUIRED)
endif()
add_definitions(-DSNAPPY)
list(APPEND THIRDPARTY_LIBS Snappy::snappy)
endif()
if(WITH_ZLIB)
find_package(ZLIB REQUIRED)
add_definitions(-DZLIB)
list(APPEND THIRDPARTY_LIBS ZLIB::ZLIB)
endif()
option(WITH_BZ2 "build with bzip2" OFF)
if(WITH_BZ2)
find_package(BZip2 REQUIRED)
add_definitions(-DBZIP2)
if(BZIP2_INCLUDE_DIRS)
include_directories(${BZIP2_INCLUDE_DIRS})
else()
include_directories(${BZIP2_INCLUDE_DIR})
endif()
list(APPEND THIRDPARTY_LIBS ${BZIP2_LIBRARIES})
endif()
if(WITH_LZ4)
find_package(lz4 REQUIRED)
add_definitions(-DLZ4)
list(APPEND THIRDPARTY_LIBS lz4::lz4)
endif()
if(WITH_ZSTD)
find_package(zstd REQUIRED)
add_definitions(-DZSTD)
include_directories(${ZSTD_INCLUDE_DIR})
list(APPEND THIRDPARTY_LIBS zstd::zstd)
endif()
endif()
string(TIMESTAMP TS "%Y/%m/%d %H:%M:%S" UTC)
set(GIT_DATE_TIME "${TS}" CACHE STRING "the time we first built rocksdb")
find_package(Git)
if(GIT_FOUND AND EXISTS "${ROCKSDB_SOURCE_DIR}/.git")
if(WIN32)
execute_process(COMMAND $ENV{COMSPEC} /C ${GIT_EXECUTABLE} -C ${ROCKSDB_SOURCE_DIR} rev-parse HEAD OUTPUT_VARIABLE GIT_SHA)
else()
execute_process(COMMAND ${GIT_EXECUTABLE} -C ${ROCKSDB_SOURCE_DIR} rev-parse HEAD OUTPUT_VARIABLE GIT_SHA)
endif()
else()
set(GIT_SHA 0)
endif()
string(REGEX REPLACE "[^0-9a-f]+" "" GIT_SHA "${GIT_SHA}")
set(BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/rocksdb_build_version.cc)
configure_file(${ROCKSDB_SOURCE_DIR}/util/build_version.cc.in ${BUILD_VERSION_CC} @ONLY)
add_library(rocksdb_build_version OBJECT ${BUILD_VERSION_CC})
target_include_directories(rocksdb_build_version PRIVATE
${ROCKSDB_SOURCE_DIR}/util)
if(MSVC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /Zi /nologo /EHsc /GS /Gd /GR /GF /fp:precise /Zc:wchar_t /Zc:forScope /errorReport:queue")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /FC /d2Zi+ /W4 /wd4127 /wd4800 /wd4996 /wd4351 /wd4100 /wd4204 /wd4324")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -W -Wextra -Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wsign-compare -Wshadow -Wno-unused-parameter -Wno-unused-variable -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers -Wno-strict-aliasing")
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format -fno-asynchronous-unwind-tables")
add_definitions(-D_POSIX_C_SOURCE=1)
endif()
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-momit-leaf-frame-pointer" HAVE_OMIT_LEAF_FRAME_POINTER)
if(HAVE_OMIT_LEAF_FRAME_POINTER)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -momit-leaf-frame-pointer")
endif()
endif()
endif()
include(CheckCCompilerFlag)
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
CHECK_C_COMPILER_FLAG("-mcpu=power9" HAS_POWER9)
if(HAS_POWER9)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mcpu=power9 -mtune=power9")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcpu=power9 -mtune=power9")
else()
CHECK_C_COMPILER_FLAG("-mcpu=power8" HAS_POWER8)
if(HAS_POWER8)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mcpu=power8 -mtune=power8")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcpu=power8 -mtune=power8")
endif(HAS_POWER8)
endif(HAS_POWER9)
CHECK_C_COMPILER_FLAG("-maltivec" HAS_ALTIVEC)
if(HAS_ALTIVEC)
message(STATUS " HAS_ALTIVEC yes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -maltivec")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -maltivec")
endif(HAS_ALTIVEC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
CHECK_C_COMPILER_FLAG("-march=armv8-a+crc+crypto" HAS_ARMV8_CRC)
if(HAS_ARMV8_CRC)
message(STATUS " HAS_ARMV8_CRC yes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
endif(HAS_ARMV8_CRC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
include(CheckCXXSourceCompiles)
if(NOT MSVC)
set(CMAKE_REQUIRED_FLAGS "-msse4.2 -mpclmul")
endif()
CHECK_CXX_SOURCE_COMPILES("
#include <cstdint>
#include <nmmintrin.h>
#include <wmmintrin.h>
int main() {
volatile uint32_t x = _mm_crc32_u32(0, 0);
const auto a = _mm_set_epi64x(0, 0);
const auto b = _mm_set_epi64x(0, 0);
const auto c = _mm_clmulepi64_si128(a, b, 0x00);
auto d = _mm_cvtsi128_si64(c);
}
" HAVE_SSE42)
unset(CMAKE_REQUIRED_FLAGS)
if(HAVE_SSE42)
add_definitions(-DHAVE_SSE42)
add_definitions(-DHAVE_PCLMUL)
elseif(FORCE_SSE42)
message(FATAL_ERROR "FORCE_SSE42=ON but unable to compile with SSE4.2 enabled")
endif()
CHECK_CXX_SOURCE_COMPILES("
#if defined(_MSC_VER) && !defined(__thread)
#define __thread __declspec(thread)
#endif
int main() {
static __thread int tls;
}
" HAVE_THREAD_LOCAL)
if(HAVE_THREAD_LOCAL)
add_definitions(-DROCKSDB_SUPPORT_THREAD_LOCAL)
endif()
option(FAIL_ON_WARNINGS "Treat compile warnings as errors" ON)
if(FAIL_ON_WARNINGS)
if(MSVC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /WX")
else() # assume GCC
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
endif()
endif()
option(WITH_ASAN "build with ASAN" OFF)
if(WITH_ASAN)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address")
if(WITH_JEMALLOC)
message(FATAL "ASAN does not work well with JeMalloc")
endif()
endif()
option(WITH_TSAN "build with TSAN" OFF)
if(WITH_TSAN)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -pie")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fPIC")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=thread -fPIC")
if(WITH_JEMALLOC)
message(FATAL "TSAN does not work well with JeMalloc")
endif()
endif()
option(WITH_UBSAN "build with UBSAN" OFF)
if(WITH_UBSAN)
add_definitions(-DROCKSDB_UBSAN_RUN)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=undefined")
if(WITH_JEMALLOC)
message(FATAL "UBSAN does not work well with JeMalloc")
endif()
endif()
option(WITH_NUMA "build with NUMA policy support" OFF)
if(WITH_NUMA)
find_package(NUMA REQUIRED)
add_definitions(-DNUMA)
include_directories(${NUMA_INCLUDE_DIR})
list(APPEND THIRDPARTY_LIBS NUMA::NUMA)
endif()
option(WITH_TBB "build with Threading Building Blocks (TBB)" OFF)
if(WITH_TBB)
find_package(TBB REQUIRED)
add_definitions(-DTBB)
list(APPEND THIRDPARTY_LIBS TBB::TBB)
endif()
# Stall notifications eat some performance from inserts
option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF)
if(DISABLE_STALL_NOTIF)
add_definitions(-DROCKSDB_DISABLE_STALL_NOTIFICATION)
endif()
option(WITH_DYNAMIC_EXTENSION "build with dynamic extension support" OFF)
if(NOT WITH_DYNAMIC_EXTENSION)
add_definitions(-DROCKSDB_NO_DYNAMIC_EXTENSION)
endif()
if(DEFINED USE_RTTI)
if(USE_RTTI)
message(STATUS "Enabling RTTI")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
else()
if(MSVC)
message(STATUS "Disabling RTTI in Release builds. Always on in Debug.")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
message(STATUS "Disabling RTTI in Release builds")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
else()
message(STATUS "Enabling RTTI in Debug builds only (default)")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
if(MSVC)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
if(CMAKE_SYSTEM_NAME MATCHES "Cygwin")
add_definitions(-fno-builtin-memcmp -DCYGWIN)
elseif(CMAKE_SYSTEM_NAME MATCHES "Darwin")
add_definitions(-DOS_MACOSX)
if(CMAKE_SYSTEM_PROCESSOR MATCHES arm)
add_definitions(-DIOS_CROSS_COMPILE -DROCKSDB_LITE)
# no debug info for IOS, that will make our library big
add_definitions(-DNDEBUG)
endif()
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
add_definitions(-DOS_LINUX)
elseif(CMAKE_SYSTEM_NAME MATCHES "SunOS")
add_definitions(-DOS_SOLARIS)
elseif(CMAKE_SYSTEM_NAME MATCHES "kFreeBSD")
add_definitions(-DOS_GNU_KFREEBSD)
elseif(CMAKE_SYSTEM_NAME MATCHES "FreeBSD")
add_definitions(-DOS_FREEBSD)
elseif(CMAKE_SYSTEM_NAME MATCHES "NetBSD")
add_definitions(-DOS_NETBSD)
elseif(CMAKE_SYSTEM_NAME MATCHES "OpenBSD")
add_definitions(-DOS_OPENBSD)
elseif(CMAKE_SYSTEM_NAME MATCHES "DragonFly")
add_definitions(-DOS_DRAGONFLYBSD)
elseif(CMAKE_SYSTEM_NAME MATCHES "Android")
add_definitions(-DOS_ANDROID)
elseif(CMAKE_SYSTEM_NAME MATCHES "Windows")
add_definitions(-DWIN32 -DOS_WIN -D_MBCS -DWIN64 -DNOMINMAX)
if(MINGW)
add_definitions(-D_WIN32_WINNT=_WIN32_WINNT_VISTA)
endif()
endif()
if(NOT WIN32)
add_definitions(-DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX)
endif()
option(WITH_FALLOCATE "build with fallocate" ON)
if(WITH_FALLOCATE)
CHECK_CXX_SOURCE_COMPILES("
#include <fcntl.h>
#include <linux/falloc.h>
int main() {
int fd = open(\"/dev/null\", 0);
fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 1024);
}
" HAVE_FALLOCATE)
if(HAVE_FALLOCATE)
add_definitions(-DROCKSDB_FALLOCATE_PRESENT)
endif()
endif()
CHECK_CXX_SOURCE_COMPILES("
#include <fcntl.h>
int main() {
int fd = open(\"/dev/null\", 0);
sync_file_range(fd, 0, 1024, SYNC_FILE_RANGE_WRITE);
}
" HAVE_SYNC_FILE_RANGE_WRITE)
if(HAVE_SYNC_FILE_RANGE_WRITE)
add_definitions(-DROCKSDB_RANGESYNC_PRESENT)
endif()
CHECK_CXX_SOURCE_COMPILES("
#include <pthread.h>
int main() {
(void) PTHREAD_MUTEX_ADAPTIVE_NP;
}
" HAVE_PTHREAD_MUTEX_ADAPTIVE_NP)
if(HAVE_PTHREAD_MUTEX_ADAPTIVE_NP)
add_definitions(-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX)
endif()
include(CheckCXXSymbolExists)
if(CMAKE_SYSTEM_NAME MATCHES "^FreeBSD")
check_cxx_symbol_exists(malloc_usable_size ${ROCKSDB_SOURCE_DIR}/malloc_np.h HAVE_MALLOC_USABLE_SIZE)
else()
check_cxx_symbol_exists(malloc_usable_size ${ROCKSDB_SOURCE_DIR}/malloc.h HAVE_MALLOC_USABLE_SIZE)
endif()
if(HAVE_MALLOC_USABLE_SIZE)
add_definitions(-DROCKSDB_MALLOC_USABLE_SIZE)
endif()
check_cxx_symbol_exists(sched_getcpu sched.h HAVE_SCHED_GETCPU)
if(HAVE_SCHED_GETCPU)
add_definitions(-DROCKSDB_SCHED_GETCPU_PRESENT)
endif()
check_cxx_symbol_exists(getauxval auvx.h HAVE_AUXV_GETAUXVAL)
if(HAVE_AUXV_GETAUXVAL)
add_definitions(-DROCKSDB_AUXV_GETAUXVAL_PRESENT)
endif()
include_directories(${ROCKSDB_SOURCE_DIR})
include_directories(${ROCKSDB_SOURCE_DIR}/include)
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
include_directories(${ROCKSDB_SOURCE_DIR}/third-party/folly)
endif()
find_package(Threads REQUIRED)
# Main library source code
set(SOURCES
${ROCKSDB_SOURCE_DIR}/cache/cache.cc
${ROCKSDB_SOURCE_DIR}/cache/clock_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/lru_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/sharded_cache.cc
${ROCKSDB_SOURCE_DIR}/db/arena_wrapped_db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_addition.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_builder.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_garbage.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_meta.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_format.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_reader.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_log_writer.cc
${ROCKSDB_SOURCE_DIR}/db/builder.cc
${ROCKSDB_SOURCE_DIR}/db/c.cc
${ROCKSDB_SOURCE_DIR}/db/column_family.cc
${ROCKSDB_SOURCE_DIR}/db/compacted_db_impl.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_iterator.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_job.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_fifo.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_level.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_picker_universal.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/sst_partitioner.cc
${ROCKSDB_SOURCE_DIR}/db/convenience.cc
${ROCKSDB_SOURCE_DIR}/db/db_filesnapshot.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_write.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_compaction_flush.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_files.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_open.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_debug.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_experimental.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_readonly.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_secondary.cc
${ROCKSDB_SOURCE_DIR}/db/db_info_dumper.cc
${ROCKSDB_SOURCE_DIR}/db/db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/dbformat.cc
${ROCKSDB_SOURCE_DIR}/db/error_handler.cc
${ROCKSDB_SOURCE_DIR}/db/event_helpers.cc
${ROCKSDB_SOURCE_DIR}/db/experimental.cc
${ROCKSDB_SOURCE_DIR}/db/external_sst_file_ingestion_job.cc
${ROCKSDB_SOURCE_DIR}/db/file_indexer.cc
${ROCKSDB_SOURCE_DIR}/db/flush_job.cc
${ROCKSDB_SOURCE_DIR}/db/flush_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/forward_iterator.cc
${ROCKSDB_SOURCE_DIR}/db/import_column_family_job.cc
${ROCKSDB_SOURCE_DIR}/db/internal_stats.cc
${ROCKSDB_SOURCE_DIR}/db/logs_with_prep_tracker.cc
${ROCKSDB_SOURCE_DIR}/db/log_reader.cc
${ROCKSDB_SOURCE_DIR}/db/log_writer.cc
${ROCKSDB_SOURCE_DIR}/db/malloc_stats.cc
${ROCKSDB_SOURCE_DIR}/db/memtable.cc
${ROCKSDB_SOURCE_DIR}/db/memtable_list.cc
${ROCKSDB_SOURCE_DIR}/db/merge_helper.cc
${ROCKSDB_SOURCE_DIR}/db/merge_operator.cc
${ROCKSDB_SOURCE_DIR}/db/range_del_aggregator.cc
${ROCKSDB_SOURCE_DIR}/db/range_tombstone_fragmenter.cc
${ROCKSDB_SOURCE_DIR}/db/repair.cc
${ROCKSDB_SOURCE_DIR}/db/snapshot_impl.cc
${ROCKSDB_SOURCE_DIR}/db/table_cache.cc
${ROCKSDB_SOURCE_DIR}/db/table_properties_collector.cc
${ROCKSDB_SOURCE_DIR}/db/transaction_log_impl.cc
${ROCKSDB_SOURCE_DIR}/db/trim_history_scheduler.cc
${ROCKSDB_SOURCE_DIR}/db/version_builder.cc
${ROCKSDB_SOURCE_DIR}/db/version_edit.cc
${ROCKSDB_SOURCE_DIR}/db/version_edit_handler.cc
${ROCKSDB_SOURCE_DIR}/db/version_set.cc
${ROCKSDB_SOURCE_DIR}/db/wal_edit.cc
${ROCKSDB_SOURCE_DIR}/db/wal_manager.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch.cc
${ROCKSDB_SOURCE_DIR}/db/write_batch_base.cc
${ROCKSDB_SOURCE_DIR}/db/write_controller.cc
${ROCKSDB_SOURCE_DIR}/db/write_thread.cc
${ROCKSDB_SOURCE_DIR}/env/env.cc
${ROCKSDB_SOURCE_DIR}/env/env_chroot.cc
${ROCKSDB_SOURCE_DIR}/env/env_encryption.cc
${ROCKSDB_SOURCE_DIR}/env/env_hdfs.cc
${ROCKSDB_SOURCE_DIR}/env/file_system.cc
${ROCKSDB_SOURCE_DIR}/env/file_system_tracer.cc
${ROCKSDB_SOURCE_DIR}/env/mock_env.cc
${ROCKSDB_SOURCE_DIR}/file/delete_scheduler.cc
${ROCKSDB_SOURCE_DIR}/file/file_prefetch_buffer.cc
${ROCKSDB_SOURCE_DIR}/file/file_util.cc
${ROCKSDB_SOURCE_DIR}/file/filename.cc
${ROCKSDB_SOURCE_DIR}/file/random_access_file_reader.cc
${ROCKSDB_SOURCE_DIR}/file/read_write_util.cc
${ROCKSDB_SOURCE_DIR}/file/readahead_raf.cc
${ROCKSDB_SOURCE_DIR}/file/sequence_file_reader.cc
${ROCKSDB_SOURCE_DIR}/file/sst_file_manager_impl.cc
${ROCKSDB_SOURCE_DIR}/file/writable_file_writer.cc
${ROCKSDB_SOURCE_DIR}/logging/auto_roll_logger.cc
${ROCKSDB_SOURCE_DIR}/logging/event_logger.cc
${ROCKSDB_SOURCE_DIR}/logging/log_buffer.cc
${ROCKSDB_SOURCE_DIR}/memory/arena.cc
${ROCKSDB_SOURCE_DIR}/memory/concurrent_arena.cc
${ROCKSDB_SOURCE_DIR}/memory/jemalloc_nodump_allocator.cc
${ROCKSDB_SOURCE_DIR}/memory/memkind_kmem_allocator.cc
${ROCKSDB_SOURCE_DIR}/memtable/alloc_tracker.cc
${ROCKSDB_SOURCE_DIR}/memtable/hash_linklist_rep.cc
${ROCKSDB_SOURCE_DIR}/memtable/hash_skiplist_rep.cc
${ROCKSDB_SOURCE_DIR}/memtable/skiplistrep.cc
${ROCKSDB_SOURCE_DIR}/memtable/vectorrep.cc
${ROCKSDB_SOURCE_DIR}/memtable/write_buffer_manager.cc
${ROCKSDB_SOURCE_DIR}/monitoring/histogram.cc
${ROCKSDB_SOURCE_DIR}/monitoring/histogram_windowing.cc
${ROCKSDB_SOURCE_DIR}/monitoring/in_memory_stats_history.cc
${ROCKSDB_SOURCE_DIR}/monitoring/instrumented_mutex.cc
${ROCKSDB_SOURCE_DIR}/monitoring/iostats_context.cc
${ROCKSDB_SOURCE_DIR}/monitoring/perf_context.cc
${ROCKSDB_SOURCE_DIR}/monitoring/perf_level.cc
${ROCKSDB_SOURCE_DIR}/monitoring/persistent_stats_history.cc
${ROCKSDB_SOURCE_DIR}/monitoring/statistics.cc
${ROCKSDB_SOURCE_DIR}/monitoring/stats_dump_scheduler.cc
${ROCKSDB_SOURCE_DIR}/monitoring/thread_status_impl.cc
${ROCKSDB_SOURCE_DIR}/monitoring/thread_status_updater.cc
${ROCKSDB_SOURCE_DIR}/monitoring/thread_status_util.cc
${ROCKSDB_SOURCE_DIR}/monitoring/thread_status_util_debug.cc
${ROCKSDB_SOURCE_DIR}/options/cf_options.cc
${ROCKSDB_SOURCE_DIR}/options/db_options.cc
${ROCKSDB_SOURCE_DIR}/options/options.cc
${ROCKSDB_SOURCE_DIR}/options/options_helper.cc
${ROCKSDB_SOURCE_DIR}/options/options_parser.cc
${ROCKSDB_SOURCE_DIR}/port/stack_trace.cc
${ROCKSDB_SOURCE_DIR}/table/adaptive/adaptive_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/binary_search_index_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_iterator.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_based_table_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefetcher.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/block_prefix_index.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/data_block_hash_index.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/data_block_footer.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/filter_block_reader_common.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/filter_policy.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/flush_block_policy.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/full_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/hash_index_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/index_builder.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/index_reader_common.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/parsed_full_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/partitioned_filter_block.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/partitioned_index_iterator.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/partitioned_index_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/reader_common.cc
${ROCKSDB_SOURCE_DIR}/table/block_based/uncompression_dict_reader.cc
${ROCKSDB_SOURCE_DIR}/table/block_fetcher.cc
${ROCKSDB_SOURCE_DIR}/table/cuckoo/cuckoo_table_builder.cc
${ROCKSDB_SOURCE_DIR}/table/cuckoo/cuckoo_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/cuckoo/cuckoo_table_reader.cc
${ROCKSDB_SOURCE_DIR}/table/format.cc
${ROCKSDB_SOURCE_DIR}/table/get_context.cc
${ROCKSDB_SOURCE_DIR}/table/iterator.cc
${ROCKSDB_SOURCE_DIR}/table/merging_iterator.cc
${ROCKSDB_SOURCE_DIR}/table/meta_blocks.cc
${ROCKSDB_SOURCE_DIR}/table/persistent_cache_helper.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_bloom.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_builder.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_factory.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_index.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_key_coding.cc
${ROCKSDB_SOURCE_DIR}/table/plain/plain_table_reader.cc
${ROCKSDB_SOURCE_DIR}/table/sst_file_dumper.cc
${ROCKSDB_SOURCE_DIR}/table/sst_file_reader.cc
${ROCKSDB_SOURCE_DIR}/table/sst_file_writer.cc
${ROCKSDB_SOURCE_DIR}/table/table_properties.cc
${ROCKSDB_SOURCE_DIR}/table/two_level_iterator.cc
${ROCKSDB_SOURCE_DIR}/test_util/sync_point.cc
${ROCKSDB_SOURCE_DIR}/test_util/sync_point_impl.cc
${ROCKSDB_SOURCE_DIR}/test_util/testutil.cc
${ROCKSDB_SOURCE_DIR}/test_util/transaction_test_util.cc
${ROCKSDB_SOURCE_DIR}/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
${ROCKSDB_SOURCE_DIR}/tools/dump/db_dump_tool.cc
${ROCKSDB_SOURCE_DIR}/tools/ldb_cmd.cc
${ROCKSDB_SOURCE_DIR}/tools/ldb_tool.cc
${ROCKSDB_SOURCE_DIR}/tools/sst_dump_tool.cc
${ROCKSDB_SOURCE_DIR}/tools/trace_analyzer_tool.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/trace_replay.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/block_cache_tracer.cc
${ROCKSDB_SOURCE_DIR}/trace_replay/io_tracer.cc
${ROCKSDB_SOURCE_DIR}/util/coding.cc
${ROCKSDB_SOURCE_DIR}/util/compaction_job_stats_impl.cc
${ROCKSDB_SOURCE_DIR}/util/comparator.cc
${ROCKSDB_SOURCE_DIR}/util/compression_context_cache.cc
${ROCKSDB_SOURCE_DIR}/util/concurrent_task_limiter_impl.cc
${ROCKSDB_SOURCE_DIR}/util/crc32c.cc
${ROCKSDB_SOURCE_DIR}/util/dynamic_bloom.cc
${ROCKSDB_SOURCE_DIR}/util/hash.cc
${ROCKSDB_SOURCE_DIR}/util/murmurhash.cc
${ROCKSDB_SOURCE_DIR}/util/random.cc
${ROCKSDB_SOURCE_DIR}/util/rate_limiter.cc
${ROCKSDB_SOURCE_DIR}/util/slice.cc
${ROCKSDB_SOURCE_DIR}/util/file_checksum_helper.cc
${ROCKSDB_SOURCE_DIR}/util/status.cc
${ROCKSDB_SOURCE_DIR}/util/string_util.cc
${ROCKSDB_SOURCE_DIR}/util/thread_local.cc
${ROCKSDB_SOURCE_DIR}/util/threadpool_imp.cc
${ROCKSDB_SOURCE_DIR}/util/xxhash.cc
${ROCKSDB_SOURCE_DIR}/utilities/backupable/backupable_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_compaction_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_db_impl_filesnapshot.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_dump_tool.cc
${ROCKSDB_SOURCE_DIR}/utilities/blob_db/blob_file.cc
${ROCKSDB_SOURCE_DIR}/utilities/cassandra/cassandra_compaction_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/cassandra/format.cc
${ROCKSDB_SOURCE_DIR}/utilities/cassandra/merge_operator.cc
${ROCKSDB_SOURCE_DIR}/utilities/checkpoint/checkpoint_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
${ROCKSDB_SOURCE_DIR}/utilities/debug.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_mirror.cc
${ROCKSDB_SOURCE_DIR}/utilities/env_timed.cc
${ROCKSDB_SOURCE_DIR}/utilities/fault_injection_env.cc
${ROCKSDB_SOURCE_DIR}/utilities/fault_injection_fs.cc
${ROCKSDB_SOURCE_DIR}/utilities/leveldb_options/leveldb_options.cc
${ROCKSDB_SOURCE_DIR}/utilities/memory/memory_util.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/bytesxor.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/max.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/put.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/sortlist.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/string_append/stringappend.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/string_append/stringappend2.cc
${ROCKSDB_SOURCE_DIR}/utilities/merge_operators/uint64add.cc
${ROCKSDB_SOURCE_DIR}/utilities/object_registry.cc
${ROCKSDB_SOURCE_DIR}/utilities/option_change_migration/option_change_migration.cc
${ROCKSDB_SOURCE_DIR}/utilities/options/options_util.cc
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/block_cache_tier.cc
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/block_cache_tier_file.cc
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/block_cache_tier_metadata.cc
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/persistent_cache_tier.cc
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/volatile_tier_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/cache_simulator.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/sim_cache.cc
${ROCKSDB_SOURCE_DIR}/utilities/table_properties_collectors/compact_on_deletion_collector.cc
${ROCKSDB_SOURCE_DIR}/utilities/trace/file_trace_reader_writer.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/lock/lock_tracker.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/lock/point_lock_tracker.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/optimistic_transaction_db_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/optimistic_transaction.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/pessimistic_transaction.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/pessimistic_transaction_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/snapshot_checker.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/transaction_base.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/transaction_db_mutex_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/transaction_lock_mgr.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/transaction_util.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_prepared_txn.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_prepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/ttl/db_ttl_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/write_batch_with_index/write_batch_with_index.cc
${ROCKSDB_SOURCE_DIR}/utilities/write_batch_with_index/write_batch_with_index_internal.cc
$<TARGET_OBJECTS:rocksdb_build_version>)
if(HAVE_SSE42 AND NOT MSVC)
set_source_files_properties(
${ROCKSDB_SOURCE_DIR}/util/crc32c.cc
PROPERTIES COMPILE_FLAGS "-msse4.2 -mpclmul")
endif()
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/util/crc32c_ppc.c
${ROCKSDB_SOURCE_DIR}/util/crc32c_ppc_asm.S)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
if(HAS_ARMV8_CRC)
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/util/crc32c_arm64.cc)
endif(HAS_ARMV8_CRC)
if(WIN32)
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/port/win/io_win.cc
${ROCKSDB_SOURCE_DIR}/port/win/env_win.cc
${ROCKSDB_SOURCE_DIR}/port/win/env_default.cc
${ROCKSDB_SOURCE_DIR}/port/win/port_win.cc
${ROCKSDB_SOURCE_DIR}/port/win/win_logger.cc)
if(NOT MINGW)
# Mingw only supports std::thread when using
# posix threads.
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/port/win/win_thread.cc)
endif()
if(WITH_XPRESS)
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/port/win/xpress_win.cc)
endif()
if(WITH_JEMALLOC)
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/port/win/win_jemalloc.cc)
endif()
else()
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/port/port_posix.cc
${ROCKSDB_SOURCE_DIR}/env/env_posix.cc
${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc
${ROCKSDB_SOURCE_DIR}/env/io_posix.cc)
endif()
if(WITH_FOLLY_DISTRIBUTED_MUTEX)
list(APPEND SOURCES
${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/detail/Futex.cpp
${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/AtomicNotification.cpp
${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/DistributedMutex.cpp
${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/ParkingLot.cpp
${ROCKSDB_SOURCE_DIR}/third-party/folly/folly/synchronization/WaitOptions.cpp)
endif()
set(ROCKSDB_STATIC_LIB rocksdb)
if(WIN32)
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
else()
set(SYSTEM_LIBS ${CMAKE_THREAD_LIBS_INIT})
endif()
add_library(${ROCKSDB_STATIC_LIB} STATIC ${SOURCES})
target_link_libraries(${ROCKSDB_STATIC_LIB} PRIVATE
${THIRDPARTY_LIBS} ${SYSTEM_LIBS})

View File

@ -1,6 +1,5 @@
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h> #include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <rocksdb/db.h> #include <rocksdb/db.h>
@ -14,13 +13,11 @@ namespace ErrorCodes
extern const int ROCKSDB_ERROR; extern const int ROCKSDB_ERROR;
} }
class StorageEmbeddedRocksDB;
EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream( EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream(
StorageEmbeddedRocksDB & storage_, StorageEmbeddedRocksDB & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_)
const StorageMetadataPtr & metadata_snapshot_, : storage(storage_), metadata_snapshot(metadata_snapshot_), max_block_size(max_block_size_)
size_t max_block_size_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_block_size(max_block_size_)
{ {
sample_block = metadata_snapshot->getSampleBlock(); sample_block = metadata_snapshot->getSampleBlock();
primary_key_pos = sample_block.getPositionByName(storage.primary_key); primary_key_pos = sample_block.getPositionByName(storage.primary_key);
@ -39,8 +36,7 @@ Block EmbeddedRocksDBBlockInputStream::readImpl()
MutableColumns columns = sample_block.cloneEmptyColumns(); MutableColumns columns = sample_block.cloneEmptyColumns();
size_t rows = 0; for (size_t rows = 0; iterator->Valid() && rows < max_block_size; ++rows, iterator->Next())
for (; iterator->Valid(); iterator->Next())
{ {
ReadBufferFromString key_buffer(iterator->key()); ReadBufferFromString key_buffer(iterator->key());
ReadBufferFromString value_buffer(iterator->value()); ReadBufferFromString value_buffer(iterator->value());
@ -51,9 +47,6 @@ Block EmbeddedRocksDBBlockInputStream::readImpl()
elem.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer); elem.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
++idx; ++idx;
} }
++rows;
if (rows >= max_block_size)
break;
} }
finished = !iterator->Valid(); finished = !iterator->Valid();

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
namespace rocksdb namespace rocksdb
@ -11,8 +12,6 @@ namespace rocksdb
namespace DB namespace DB
{ {
class StorageEmbeddedRocksDB;
class EmbeddedRocksDBBlockInputStream : public IBlockInputStream class EmbeddedRocksDBBlockInputStream : public IBlockInputStream
{ {

View File

@ -1,5 +1,4 @@
#include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h> #include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <rocksdb/db.h> #include <rocksdb/db.h>
@ -7,12 +6,13 @@
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int ROCKSDB_ERROR; extern const int ROCKSDB_ERROR;
} }
class StorageEmbeddedRocksDB;
EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream( EmbeddedRocksDBBlockOutputStream::EmbeddedRocksDBBlockOutputStream(
StorageEmbeddedRocksDB & storage_, StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_) const StorageMetadataPtr & metadata_snapshot_)
@ -42,6 +42,7 @@ void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
WriteBufferFromOwnString wb_value; WriteBufferFromOwnString wb_value;
rocksdb::WriteBatch batch; rocksdb::WriteBatch batch;
rocksdb::Status status;
for (size_t i = 0; i < rows; i++) for (size_t i = 0; i < rows; i++)
{ {
wb_key.restart(); wb_key.restart();
@ -53,10 +54,12 @@ void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
elem.type->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value); elem.type->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value);
++idx; ++idx;
} }
batch.Put(wb_key.str(), wb_value.str()); status = batch.Put(wb_key.str(), wb_value.str());
if (!status.ok())
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR);
} }
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch); status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok()) if (!status.ok())
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR); throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::ROCKSDB_ERROR);
} }

View File

@ -2,13 +2,12 @@
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Storages/StorageInMemoryMetadata.h> #include <Storages/StorageInMemoryMetadata.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
namespace DB namespace DB
{ {
class StorageEmbeddedRocksDB;
class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream
{ {
public: public:

View File

@ -268,6 +268,7 @@ void StorageEmbeddedRocksDB::initDb()
rocksdb::Options options; rocksdb::Options options;
rocksdb::DB * db; rocksdb::DB * db;
options.create_if_missing = true; options.create_if_missing = true;
options.compression = rocksdb::CompressionType::kZSTD;
rocksdb::Status status = rocksdb::DB::Open(options, rocksdb_dir, &db); rocksdb::Status status = rocksdb::DB::Open(options, rocksdb_dir, &db);
if (status != rocksdb::Status::OK()) if (status != rocksdb::Status::OK())

View File

@ -1,6 +1,8 @@
1 1
1 1
0 0 0 0 0 1
1
1 1 1 1 1
1 1
1 1
1 1

View File

@ -1,17 +1,26 @@
DROP TABLE IF EXISTS test; DROP TABLE IF EXISTS test;
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB primary key(key);
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB; -- { serverError 36 }
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB PRIMARY KEY(key2); -- { serverError 47 }
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB PRIMARY KEY(key, value); -- { serverError 36 }
CREATE TABLE test (key Tuple(String, UInt32), value UInt64) Engine=EmbeddedRocksDB PRIMARY KEY(key);
DROP TABLE IF EXISTS test;
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB PRIMARY KEY(key);
INSERT INTO test SELECT '1_1', number FROM numbers(10000); INSERT INTO test SELECT '1_1', number FROM numbers(10000);
SELECT count(1) == 1 FROM test; SELECT COUNT(1) == 1 FROM test;
INSERT INTO test SELECT concat(toString(number), '_1'), number FROM numbers(10000); INSERT INTO test SELECT concat(toString(number), '_1'), number FROM numbers(10000);
SELECT SUM(value) == 1 + 99 + 900 FROM test WHERE key in ('1_1', '99_1', '900_1'); SELECT COUNT(1) == 10000 FROM test;
SELECT uniqExact(key) == 32 FROM (SELECT * FROM test LIMIT 32 SETTINGS max_block_size = 1);
SELECT SUM(value) == 1 + 99 + 900 FROM test WHERE key IN ('1_1', '99_1', '900_1');
DROP TABLE IF EXISTS test; DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_memory; DROP TABLE IF EXISTS test_memory;
CREATE TABLE test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=EmbeddedRocksDB primary key(k); CREATE TABLE test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=EmbeddedRocksDB PRIMARY KEY(k);
CREATE TABLE test_memory AS test Engine = Memory; CREATE TABLE test_memory AS test Engine = Memory;
INSERT INTO test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k; INSERT INTO test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
@ -19,7 +28,7 @@ INSERT INTO test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmap
INSERT INTO test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k; INSERT INTO test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
SELECT A.a - B.a, A.b - B.b, A.c - B.c, A.d - B.d, A.e - B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test_memory) B USING a ORDER BY a; SELECT A.a = B.a, A.b = B.b, A.c = B.c, A.d = B.d, A.e = B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test_memory) B USING a ORDER BY a;
CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000); CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000);
@ -31,7 +40,7 @@ SELECT k, value FROM test WHERE k = 0 OR value > 0; -- { serverError 158 }
SELECT k, value FROM test WHERE k = 0 AND k IN (1, 3) OR k > 8; -- { serverError 158 } SELECT k, value FROM test WHERE k = 0 AND k IN (1, 3) OR k > 8; -- { serverError 158 }
TRUNCATE TABLE test; TRUNCATE TABLE test;
SELECT 0 == count(1) FROM test; SELECT 0 == COUNT(1) FROM test;
DROP TABLE IF EXISTS test; DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_memory; DROP TABLE IF EXISTS test_memory;