This commit is contained in:
Alexander Tokmakov 2020-06-03 18:07:37 +03:00
parent e3aa20708a
commit e67837bc4a
21 changed files with 381 additions and 682 deletions

6
.gitmodules vendored
View File

@ -159,11 +159,11 @@
url = https://github.com/openldap/openldap.git
[submodule "contrib/cassandra"]
path = contrib/cassandra
url = https://github.com/tavplubix/cpp-driver.git
branch = ch-tmp
url = https://github.com/ClickHouse-Extras/cpp-driver.git
branch = clickhouse
[submodule "contrib/libuv"]
path = contrib/libuv
url = https://github.com/tavplubix/libuv.git
url = https://github.com/ClickHouse-Extras/libuv.git
branch = clickhouse
[submodule "contrib/fmtlib"]
path = contrib/fmtlib

View File

@ -328,6 +328,7 @@ message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE
include (GNUInstallDirs)
include (cmake/contrib_finder.cmake)
include (cmake/lib_name.cmake)
find_contrib_lib(double-conversion) # Must be before parquet
include (cmake/find/ssl.cmake)
include (cmake/find/ldap.cmake) # after ssl

View File

@ -1,8 +1,10 @@
if (NOT DEFINED ENABLE_CASSANDRA OR ENABLE_CASSANDRA)
option(ENABLE_CASSANDRA "Enable Cassandra" ${ENABLE_LIBRARIES})
if (ENABLE_CASSANDRA)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libuv")
message (WARNING "submodule contrib/libuv is missing. to fix try run: \n git submodule update --init --recursive")
message (ERROR "submodule contrib/libuv is missing. to fix try run: \n git submodule update --init --recursive")
elseif (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
message (WARNING "submodule contrib/cassandra is missing. to fix try run: \n git submodule update --init --recursive")
message (ERROR "submodule contrib/cassandra is missing. to fix try run: \n git submodule update --init --recursive")
else()
set (LIBUV_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libuv")
set (CASSANDRA_INCLUDE_DIR
@ -17,6 +19,8 @@ if (NOT DEFINED ENABLE_CASSANDRA OR ENABLE_CASSANDRA)
set (USE_CASSANDRA 1)
set (CASS_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
message(STATUS "Using cassandra: ${CASSANDRA_LIBRARY}")
endif()
endif()
message (STATUS "Using cassandra=${USE_CASSANDRA}: ${CASSANDRA_INCLUDE_DIR} : ${CASSANDRA_LIBRARY}")
message (STATUS "Using libuv: ${LIBUV_ROOT_DIR} : ${LIBUV_LIBRARY}")

View File

@ -291,7 +291,7 @@ if (USE_INTERNAL_AWS_S3_LIBRARY)
endif ()
if (USE_BASE64)
add_subdirectory(base64-cmake)
add_subdirectory (base64-cmake)
endif()
if (USE_INTERNAL_HYPERSCAN_LIBRARY)
@ -315,8 +315,8 @@ if (USE_FASTOPS)
endif()
if (USE_CASSANDRA)
add_subdirectory(libuv-cmake)
add_subdirectory(cassandra)
add_subdirectory (libuv)
add_subdirectory (cassandra)
endif()
add_subdirectory (fmtlib-cmake)

2
contrib/cassandra vendored

@ -1 +1 @@
Subproject commit 58a71947d9dd8412f5aeb38275fa81417ea27ee0
Subproject commit a49b4e0e2696a4b8ef286a5b9538d1cbe8490509

2
contrib/libuv vendored

@ -1 +1 @@
Subproject commit 379988fef9b0c6ac706a624dbac6be8924a3a0da
Subproject commit 84438304f41d8ea6670ee5409f4d6c63ca784f28

View File

@ -1,441 +0,0 @@
cmake_minimum_required(VERSION 3.4)
project(libuv LANGUAGES C)
include(CMakePackageConfigHelpers)
include(CMakeDependentOption)
include(GNUInstallDirs)
include(CTest)
#cmake_dependent_option(LIBUV_BUILD_TESTS
# "Build the unit tests when BUILD_TESTING is enabled and we are the root project" ON
# "BUILD_TESTING;CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR" OFF)
if(MSVC)
list(APPEND uv_cflags /W4)
elseif(CMAKE_C_COMPILER_ID MATCHES "AppleClang|Clang|GNU")
list(APPEND uv_cflags -fvisibility=hidden --std=gnu89)
list(APPEND uv_cflags -Wall -Wextra -Wstrict-prototypes)
list(APPEND uv_cflags -Wno-unused-parameter)
endif()
set(uv_sources
src/fs-poll.c
src/idna.c
src/inet.c
src/random.c
src/strscpy.c
src/threadpool.c
src/timer.c
src/uv-common.c
src/uv-data-getter-setters.c
src/version.c)
set(uv_test_sources
test/blackhole-server.c
test/echo-server.c
test/run-tests.c
test/runner.c
test/test-active.c
test/test-async-null-cb.c
test/test-async.c
test/test-barrier.c
test/test-callback-order.c
test/test-callback-stack.c
test/test-close-fd.c
test/test-close-order.c
test/test-condvar.c
test/test-connect-unspecified.c
test/test-connection-fail.c
test/test-cwd-and-chdir.c
test/test-default-loop-close.c
test/test-delayed-accept.c
test/test-dlerror.c
test/test-eintr-handling.c
test/test-embed.c
test/test-emfile.c
test/test-env-vars.c
test/test-error.c
test/test-fail-always.c
test/test-fork.c
test/test-fs-copyfile.c
test/test-fs-event.c
test/test-fs-poll.c
test/test-fs.c
test/test-fs-readdir.c
test/test-fs-fd-hash.c
test/test-fs-open-flags.c
test/test-get-currentexe.c
test/test-get-loadavg.c
test/test-get-memory.c
test/test-get-passwd.c
test/test-getaddrinfo.c
test/test-gethostname.c
test/test-getnameinfo.c
test/test-getsockname.c
test/test-getters-setters.c
test/test-gettimeofday.c
test/test-handle-fileno.c
test/test-homedir.c
test/test-hrtime.c
test/test-idle.c
test/test-idna.c
test/test-ip4-addr.c
test/test-ip6-addr.c
test/test-ipc-heavy-traffic-deadlock-bug.c
test/test-ipc-send-recv.c
test/test-ipc.c
test/test-loop-alive.c
test/test-loop-close.c
test/test-loop-configure.c
test/test-loop-handles.c
test/test-loop-stop.c
test/test-loop-time.c
test/test-multiple-listen.c
test/test-mutexes.c
test/test-osx-select.c
test/test-pass-always.c
test/test-ping-pong.c
test/test-pipe-bind-error.c
test/test-pipe-close-stdout-read-stdin.c
test/test-pipe-connect-error.c
test/test-pipe-connect-multiple.c
test/test-pipe-connect-prepare.c
test/test-pipe-getsockname.c
test/test-pipe-pending-instances.c
test/test-pipe-sendmsg.c
test/test-pipe-server-close.c
test/test-pipe-set-fchmod.c
test/test-pipe-set-non-blocking.c
test/test-platform-output.c
test/test-poll-close-doesnt-corrupt-stack.c
test/test-poll-close.c
test/test-poll-closesocket.c
test/test-poll-oob.c
test/test-poll.c
test/test-process-priority.c
test/test-process-title-threadsafe.c
test/test-process-title.c
test/test-queue-foreach-delete.c
test/test-random.c
test/test-ref.c
test/test-run-nowait.c
test/test-run-once.c
test/test-semaphore.c
test/test-shutdown-close.c
test/test-shutdown-eof.c
test/test-shutdown-twice.c
test/test-signal-multiple-loops.c
test/test-signal-pending-on-close.c
test/test-signal.c
test/test-socket-buffer-size.c
test/test-spawn.c
test/test-stdio-over-pipes.c
test/test-strscpy.c
test/test-tcp-alloc-cb-fail.c
test/test-tcp-bind-error.c
test/test-tcp-bind6-error.c
test/test-tcp-close-accept.c
test/test-tcp-close-while-connecting.c
test/test-tcp-close.c
test/test-tcp-close-reset.c
test/test-tcp-connect-error-after-write.c
test/test-tcp-connect-error.c
test/test-tcp-connect-timeout.c
test/test-tcp-connect6-error.c
test/test-tcp-create-socket-early.c
test/test-tcp-flags.c
test/test-tcp-oob.c
test/test-tcp-open.c
test/test-tcp-read-stop.c
test/test-tcp-shutdown-after-write.c
test/test-tcp-try-write.c
test/test-tcp-try-write-error.c
test/test-tcp-unexpected-read.c
test/test-tcp-write-after-connect.c
test/test-tcp-write-fail.c
test/test-tcp-write-queue-order.c
test/test-tcp-write-to-half-open-connection.c
test/test-tcp-writealot.c
test/test-thread-equal.c
test/test-thread.c
test/test-threadpool-cancel.c
test/test-threadpool.c
test/test-timer-again.c
test/test-timer-from-check.c
test/test-timer.c
test/test-tmpdir.c
test/test-tty-duplicate-key.c
test/test-tty.c
test/test-udp-alloc-cb-fail.c
test/test-udp-bind.c
test/test-udp-connect.c
test/test-udp-create-socket-early.c
test/test-udp-dgram-too-big.c
test/test-udp-ipv6.c
test/test-udp-multicast-interface.c
test/test-udp-multicast-interface6.c
test/test-udp-multicast-join.c
test/test-udp-multicast-join6.c
test/test-udp-multicast-ttl.c
test/test-udp-open.c
test/test-udp-options.c
test/test-udp-send-and-recv.c
test/test-udp-send-hang-loop.c
test/test-udp-send-immediate.c
test/test-udp-send-unreachable.c
test/test-udp-try-send.c
test/test-uname.c
test/test-walk-handles.c
test/test-watcher-cross-stop.c)
#if(WIN32)
# list(APPEND uv_defines WIN32_LEAN_AND_MEAN _WIN32_WINNT=0x0600)
# list(APPEND uv_libraries
# advapi32
# iphlpapi
# psapi
# shell32
# user32
# userenv
# ws2_32)
# list(APPEND uv_sources
# src/win/async.c
# src/win/core.c
# src/win/detect-wakeup.c
# src/win/dl.c
# src/win/error.c
# src/win/fs.c
# src/win/fs-event.c
# src/win/getaddrinfo.c
# src/win/getnameinfo.c
# src/win/handle.c
# src/win/loop-watcher.c
# src/win/pipe.c
# src/win/thread.c
# src/win/poll.c
# src/win/process.c
# src/win/process-stdio.c
# src/win/signal.c
# src/win/snprintf.c
# src/win/stream.c
# src/win/tcp.c
# src/win/tty.c
# src/win/udp.c
# src/win/util.c
# src/win/winapi.c
# src/win/winsock.c)
# list(APPEND uv_test_libraries ws2_32)
# list(APPEND uv_test_sources src/win/snprintf.c test/runner-win.c)
#else()
if(CMAKE_SIZEOF_VOID_P EQUAL 4)
list(APPEND uv_defines _FILE_OFFSET_BITS=64 _LARGEFILE_SOURCE)
endif()
if(NOT CMAKE_SYSTEM_NAME STREQUAL "Android")
# Android has pthread as part of its c library, not as a separate
# libpthread.so.
list(APPEND uv_libraries pthread)
endif()
list(APPEND uv_sources
src/unix/async.c
src/unix/core.c
src/unix/dl.c
src/unix/fs.c
src/unix/getaddrinfo.c
src/unix/getnameinfo.c
src/unix/loop-watcher.c
src/unix/loop.c
src/unix/pipe.c
src/unix/poll.c
src/unix/process.c
src/unix/random-devurandom.c
src/unix/signal.c
src/unix/stream.c
src/unix/tcp.c
src/unix/thread.c
src/unix/tty.c
src/unix/udp.c)
list(APPEND uv_test_sources test/runner-unix.c)
#endif()
if(CMAKE_SYSTEM_NAME STREQUAL "AIX")
list(APPEND uv_defines
_ALL_SOURCE
_LINUX_SOURCE_COMPAT
_THREAD_SAFE
_XOPEN_SOURCE=500)
list(APPEND uv_libraries perfstat)
list(APPEND uv_sources src/unix/aix.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "Android")
list(APPEND uv_libs dl)
list(APPEND uv_sources
src/unix/android-ifaddrs.c
src/unix/linux-core.c
src/unix/linux-inotify.c
src/unix/linux-syscalls.c
src/unix/procfs-exepath.c
src/unix/pthread-fixes.c
src/unix/random-getrandom.c
src/unix/random-sysctl-linux.c
src/unix/sysinfo-loadavg.c)
endif()
if(APPLE OR CMAKE_SYSTEM_NAME MATCHES "Android|Linux|OS/390")
list(APPEND uv_sources src/unix/proctitle.c)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "DragonFly|FreeBSD")
list(APPEND uv_sources src/unix/freebsd.c)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "DragonFly|FreeBSD|NetBSD|OpenBSD")
list(APPEND uv_sources src/unix/posix-hrtime.c src/unix/bsd-proctitle.c)
endif()
if(APPLE OR CMAKE_SYSTEM_NAME MATCHES "DragonFly|FreeBSD|NetBSD|OpenBSD")
list(APPEND uv_sources src/unix/bsd-ifaddrs.c src/unix/kqueue.c)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "FreeBSD")
list(APPEND uv_sources src/unix/random-getrandom.c)
endif()
if(APPLE OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
list(APPEND uv_sources src/unix/random-getentropy.c)
endif()
if(APPLE)
list(APPEND uv_defines _DARWIN_UNLIMITED_SELECT=1 _DARWIN_USE_64_BIT_INODE=1)
list(APPEND uv_sources
src/unix/darwin-proctitle.c
src/unix/darwin.c
src/unix/fsevents.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
list(APPEND uv_defines _GNU_SOURCE _POSIX_C_SOURCE=200112)
list(APPEND uv_libraries dl rt)
list(APPEND uv_sources
src/unix/linux-core.c
src/unix/linux-inotify.c
src/unix/linux-syscalls.c
src/unix/procfs-exepath.c
src/unix/random-getrandom.c
src/unix/random-sysctl-linux.c
src/unix/sysinfo-loadavg.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "NetBSD")
list(APPEND uv_sources src/unix/netbsd.c)
list(APPEND uv_libraries kvm)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
list(APPEND uv_sources src/unix/openbsd.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "OS/390")
list(APPEND uv_defines PATH_MAX=255)
list(APPEND uv_defines _AE_BIMODAL)
list(APPEND uv_defines _ALL_SOURCE)
list(APPEND uv_defines _LARGE_TIME_API)
list(APPEND uv_defines _OPEN_MSGQ_EXT)
list(APPEND uv_defines _OPEN_SYS_FILE_EXT)
list(APPEND uv_defines _OPEN_SYS_IF_EXT)
list(APPEND uv_defines _OPEN_SYS_SOCK_EXT3)
list(APPEND uv_defines _OPEN_SYS_SOCK_IPV6)
list(APPEND uv_defines _UNIX03_SOURCE)
list(APPEND uv_defines _UNIX03_THREADS)
list(APPEND uv_defines _UNIX03_WITHDRAWN)
list(APPEND uv_defines _XOPEN_SOURCE_EXTENDED)
list(APPEND uv_sources
src/unix/pthread-fixes.c
src/unix/pthread-barrier.c
src/unix/os390.c
src/unix/os390-syscalls.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "SunOS")
list(APPEND uv_defines __EXTENSIONS__ _XOPEN_SOURCE=500)
list(APPEND uv_libraries kstat nsl sendfile socket)
list(APPEND uv_sources src/unix/no-proctitle.c src/unix/sunos.c)
endif()
if(APPLE OR CMAKE_SYSTEM_NAME MATCHES "DragonFly|FreeBSD|Linux|NetBSD|OpenBSD")
list(APPEND uv_test_libraries util)
endif()
set(uv_sources_tmp "")
foreach(file ${uv_sources})
list(APPEND uv_sources_tmp "${LIBUV_ROOT_DIR}/${file}")
endforeach(file)
set(uv_sources "${uv_sources_tmp}")
list(APPEND uv_defines CLICKHOUSE_GLIBC_COMPATIBILITY)
add_library(uv SHARED ${uv_sources})
target_compile_definitions(uv
INTERFACE USING_UV_SHARED=1
PRIVATE ${uv_defines} BUILDING_UV_SHARED=1)
target_compile_options(uv PRIVATE ${uv_cflags})
target_include_directories(uv PUBLIC ${LIBUV_ROOT_DIR}/include PRIVATE ${LIBUV_ROOT_DIR}/src)
target_link_libraries(uv ${uv_libraries})
add_library(uv_a STATIC ${uv_sources})
target_compile_definitions(uv_a PRIVATE ${uv_defines})
target_compile_options(uv_a PRIVATE ${uv_cflags})
target_include_directories(uv_a PUBLIC ${LIBUV_ROOT_DIR}/include PRIVATE ${LIBUV_ROOT_DIR}/src)
target_link_libraries(uv_a ${uv_libraries})
#if(LIBUV_BUILD_TESTS)
# add_executable(uv_run_tests ${uv_test_sources})
# target_compile_definitions(uv_run_tests
# PRIVATE ${uv_defines} USING_UV_SHARED=1)
# target_compile_options(uv_run_tests PRIVATE ${uv_cflags})
# target_link_libraries(uv_run_tests uv ${uv_test_libraries})
# add_test(NAME uv_test
# COMMAND uv_run_tests
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
# add_executable(uv_run_tests_a ${uv_test_sources})
# target_compile_definitions(uv_run_tests_a PRIVATE ${uv_defines})
# target_compile_options(uv_run_tests_a PRIVATE ${uv_cflags})
# target_link_libraries(uv_run_tests_a uv_a ${uv_test_libraries})
# add_test(NAME uv_test_a
# COMMAND uv_run_tests_a
# WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
#endif()
if(UNIX)
# Now for some gibbering horrors from beyond the stars...
foreach(x ${uv_libraries})
set(LIBS "${LIBS} -l${x}")
endforeach(x)
file(STRINGS ${LIBUV_ROOT_DIR}/configure.ac configure_ac REGEX ^AC_INIT)
string(REGEX MATCH [0-9]+[.][0-9]+[.][0-9]+ PACKAGE_VERSION "${configure_ac}")
string(REGEX MATCH ^[0-9]+ UV_VERSION_MAJOR "${PACKAGE_VERSION}")
# The version in the filename is mirroring the behaviour of autotools.
set_target_properties(uv PROPERTIES VERSION ${UV_VERSION_MAJOR}.0.0
SOVERSION ${UV_VERSION_MAJOR})
set(includedir ${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_INCLUDEDIR})
set(libdir ${CMAKE_INSTALL_PREFIX}/${CMAKE_INSTALL_LIBDIR})
set(prefix ${CMAKE_INSTALL_PREFIX})
configure_file(${LIBUV_ROOT_DIR}/libuv.pc.in ${LIBUV_ROOT_DIR}/libuv.pc @ONLY)
install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})
install(FILES LICENSE DESTINATION ${CMAKE_INSTALL_DOCDIR})
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/libuv.pc
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
install(TARGETS uv LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})
install(TARGETS uv_a ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif()
#if(WIN32)
# install(DIRECTORY include/ DESTINATION include)
# install(FILES LICENSE DESTINATION .)
# install(TARGETS uv uv_a
# RUNTIME DESTINATION lib/$<CONFIG>
# ARCHIVE DESTINATION lib/$<CONFIG>)
#endif()

View File

@ -633,9 +633,35 @@ Example of settings:
<source>
<cassandra>
<host>localhost</host>
<port>6349</port>
<port>9042</port>
<user>username</user>
<password>qwerty123</password>
<keyspase>database_name</keyspase>
<column_family>table_name</column_family>
<allow_filering>1</allow_filering>
<partition_key_prefix>1</partition_key_prefix>
<consistency>One</consistency>
<where>"SomeColumn" = 42</where>
<max_threads>8</max_threads>
</cassandra>
</source>
```
Setting fields:
- `host` The Cassandra host or comma-separated list of hosts.
- `port` The port on the Cassandra servers. If not specified, default port is used.
- `user` Name of the Cassandra user.
- `password` Password of the Cassandra user.
- `keyspace` Name of the keyspace (database).
- `column_family` Name of the column family (table).
- `allow_filering` Flag to allow or not potentially expensive conditions on clustering key columns. Default value is 1.
- `partition_key_prefix` Number of partition key columns in primary key of the Cassandra table.
Required for compose key dictionaries. Order of key columns in the dictionary definition must be the same as in Cassandra.
Default value is 1 (the first key column is a partition key and other key columns are clustering key).
- `consistency` Consistency level. Possible values: `One`, `Two`, `Three`,
`All`, `EachQuorum`, `Quorum`, `LocalQuorum`, `LocalOne`, `Serial`, `LocalSerial`. Default is `One`.
- `where` Optional selection criteria.
- `max_threads` The maximum number of threads to use for loading data from multiple partitions in compose key dictionaries.
[Original article](https://clickhouse.tech/docs/en/query_language/dicts/external_dicts_dict_sources/) <!--hide-->

View File

@ -16,189 +16,261 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int TYPE_MISMATCH;
extern const int CASSANDRA_INTERNAL_ERROR;
}
CassandraBlockInputStream::CassandraBlockInputStream(
const CassClusterPtr & cluster,
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
const size_t max_block_size_)
: statement(query_str.c_str(), /*parameters count*/ 0)
size_t max_block_size_)
: session(session_)
, statement(query_str.c_str(), /*parameters count*/ 0)
, max_block_size(max_block_size_)
, has_more_pages(cass_true)
{
description.init(sample_block);
cassandraCheck(cass_statement_set_paging_size(statement, max_block_size));
cassandraWaitAndCheck(cass_session_connect(session, cluster));
}
namespace
void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value) const
{
using ValueType = ExternalResultDescription::ValueType;
void insertValue(IColumn & column, const ValueType type, const CassValue * cass_value)
switch (type)
{
/// Cassandra does not support unsigned integers (cass_uint32_t is for Date)
switch (type)
case ValueType::vtUInt8:
{
case ValueType::vtUInt8:
{
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnUInt8 &>(column).insertValue(value);
break;
}
case ValueType::vtUInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(value);
break;
}
case ValueType::vtUInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(value);
break;
}
case ValueType::vtUInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt64 &>(column).insertValue(value);
break;
}
case ValueType::vtInt8:
{
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
}
case ValueType::vtInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
}
case ValueType::vtInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
}
case ValueType::vtInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat32:
{
cass_float_t value;
cass_value_get_float(cass_value, &value);
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat64:
{
cass_double_t value;
cass_value_get_double(cass_value, &value);
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
}
case ValueType::vtString:
{
const char * value;
size_t value_length;
cass_value_get_string(cass_value, &value, &value_length);
assert_cast<ColumnString &>(column).insertData(value, value_length);
break;
}
case ValueType::vtDate:
{
cass_uint32_t value;
cass_value_get_uint32(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
break;
}
case ValueType::vtDateTime:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value / 1000));
break;
}
case ValueType::vtUUID:
{
CassUuid value;
cass_value_get_uuid(cass_value, &value);
std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
cass_uuid_string(value, uuid_str.data());
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
break;
}
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnUInt8 &>(column).insertValue(static_cast<UInt8>(value));
break;
}
case ValueType::vtUInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
break;
}
case ValueType::vtUInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value));
break;
}
case ValueType::vtUInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt64 &>(column).insertValue(static_cast<UInt64>(value));
break;
}
case ValueType::vtInt8:
{
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
}
case ValueType::vtInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
}
case ValueType::vtInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
}
case ValueType::vtInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat32:
{
cass_float_t value;
cass_value_get_float(cass_value, &value);
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat64:
{
cass_double_t value;
cass_value_get_double(cass_value, &value);
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
}
case ValueType::vtString:
{
const char * value = nullptr;
size_t value_length;
cass_value_get_string(cass_value, &value, &value_length);
assert_cast<ColumnString &>(column).insertData(value, value_length);
break;
}
case ValueType::vtDate:
{
cass_uint32_t value;
cass_value_get_uint32(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
break;
}
case ValueType::vtDateTime:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value / 1000));
break;
}
case ValueType::vtUUID:
{
CassUuid value;
cass_value_get_uuid(cass_value, &value);
std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
cass_uuid_string(value, uuid_str.data());
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
break;
}
}
}
Block CassandraBlockInputStream::readImpl()
void CassandraBlockInputStream::readPrefix()
{
result_future = cass_session_execute(*session, statement);
}
Block CassandraBlockInputStream::readImpl()
{
if (!has_more_pages)
return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns();
cassandraWaitAndCheck(result_future);
CassResultPtr result = cass_future_get_result(result_future);
assert(cass_result_column_count(result) == columns.size());
assertTypes(result);
has_more_pages = cass_result_has_more_pages(result);
if (has_more_pages)
{
if (!has_more_pages)
return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns();
CassFuturePtr query_future = cass_session_execute(session, statement);
CassResultPtr result = cass_future_get_result(query_future);
if (!result) {
const char* error_message;
size_t error_message_length;
cass_future_error_message(query_future, &error_message, &error_message_length);
throw Exception{error_message, ErrorCodes::CASSANDRA_INTERNAL_ERROR};
}
[[maybe_unused]] size_t row_count = 0;
assert(cass_result_column_count(result) == columns.size());
CassIteratorPtr rows_iter = cass_iterator_from_result(result); /// Points to rows[-1]
while (cass_iterator_next(rows_iter))
{
const CassRow * row = cass_iterator_get_row(rows_iter);
for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
{
const CassValue * val = cass_row_get_column(row, col_idx);
if (cass_value_is_null(val))
columns[col_idx]->insertDefault();
else if (description.types[col_idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[col_idx]);
insertValue(column_nullable.getNestedColumn(), description.types[col_idx].first, val);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[col_idx], description.types[col_idx].first, val);
}
++row_count;
}
assert(cass_result_row_count(result) == row_count);
has_more_pages = cass_result_has_more_pages(result);
if (has_more_pages)
cassandraCheck(cass_statement_set_paging_state(statement, result));
return description.sample_block.cloneWithColumns(std::move(columns));
cassandraCheck(cass_statement_set_paging_state(statement, result));
result_future = cass_session_execute(*session, statement);
}
CassIteratorPtr rows_iter = cass_iterator_from_result(result); /// Points to rows[-1]
while (cass_iterator_next(rows_iter))
{
const CassRow * row = cass_iterator_get_row(rows_iter);
for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
{
const CassValue * val = cass_row_get_column(row, col_idx);
if (cass_value_is_null(val))
columns[col_idx]->insertDefault();
else if (description.types[col_idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[col_idx]);
insertValue(column_nullable.getNestedColumn(), description.types[col_idx].first, val);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[col_idx], description.types[col_idx].first, val);
}
}
assert(cass_result_row_count(result) == columns.front()->size());
return description.sample_block.cloneWithColumns(std::move(columns));
}
void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
{
if (!assert_types)
return;
size_t column_count = cass_result_column_count(result);
for (size_t i = 0; i < column_count; ++i)
{
CassValueType expected;
String expected_text;
/// Cassandra does not support unsigned integers (cass_uint32_t is for Date)
switch (description.types[i].first)
{
case ExternalResultDescription::ValueType::vtInt8:
case ExternalResultDescription::ValueType::vtUInt8:
expected = CASS_VALUE_TYPE_TINY_INT;
expected_text = "tinyint";
break;
case ExternalResultDescription::ValueType::vtInt16:
case ExternalResultDescription::ValueType::vtUInt16:
expected = CASS_VALUE_TYPE_SMALL_INT;
expected_text = "smallint";
break;
case ExternalResultDescription::ValueType::vtUInt32:
case ExternalResultDescription::ValueType::vtInt32:
expected = CASS_VALUE_TYPE_INT;
expected_text = "int";
break;
case ExternalResultDescription::ValueType::vtInt64:
case ExternalResultDescription::ValueType::vtUInt64:
expected = CASS_VALUE_TYPE_BIGINT;
expected_text = "bigint";
break;
case ExternalResultDescription::ValueType::vtFloat32:
expected = CASS_VALUE_TYPE_FLOAT;
expected_text = "float";
break;
case ExternalResultDescription::ValueType::vtFloat64:
expected = CASS_VALUE_TYPE_DOUBLE;
expected_text = "double";
break;
case ExternalResultDescription::ValueType::vtString:
expected = CASS_VALUE_TYPE_TEXT;
expected_text = "text, ascii or varchar";
break;
case ExternalResultDescription::ValueType::vtDate:
expected = CASS_VALUE_TYPE_DATE;
expected_text = "date";
break;
case ExternalResultDescription::ValueType::vtDateTime:
expected = CASS_VALUE_TYPE_TIMESTAMP;
expected_text = "timestamp";
break;
case ExternalResultDescription::ValueType::vtUUID:
expected = CASS_VALUE_TYPE_UUID;
expected_text = "uuid";
break;
}
CassValueType got = cass_result_column_type(result, i);
if (got != expected)
{
if (expected == CASS_VALUE_TYPE_TEXT && (got == CASS_VALUE_TYPE_ASCII || got == CASS_VALUE_TYPE_VARCHAR))
continue;
const auto & column_name = description.sample_block.getColumnsWithTypeAndName()[i].name;
throw Exception("Type mismatch for column " + column_name + ": expected Cassandra type " + expected_text,
ErrorCodes::TYPE_MISMATCH);
}
}
assert_types = false;
}
}
#endif

View File

@ -9,30 +9,35 @@
namespace DB
{
class CassandraBlockInputStream final : public IBlockInputStream
{
public:
CassandraBlockInputStream(
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
size_t max_block_size);
String getName() const override { return "Cassandra"; }
/// Allows processing results of a Cassandra query as a sequence of Blocks, simplifies chaining
class CassandraBlockInputStream final : public IBlockInputStream
{
public:
CassandraBlockInputStream(
const CassClusterPtr & cluster,
const String & query_str,
const Block & sample_block,
const size_t max_block_size);
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
String getName() const override { return "Cassandra"; }
void readPrefix() override;
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
using ValueType = ExternalResultDescription::ValueType;
private:
Block readImpl() override;
Block readImpl() override;
void insertValue(IColumn & column, ValueType type, const CassValue * cass_value) const;
void assertTypes(const CassResultPtr & result);
CassSessionPtr session;
CassStatementPtr statement;
const size_t max_block_size;
ExternalResultDescription description;
cass_bool_t has_more_pages;
};
CassSessionShared session;
CassStatementPtr statement;
CassFuturePtr result_future;
const size_t max_block_size;
ExternalResultDescription description;
cass_bool_t has_more_pages;
bool assert_types = true;
};
}

View File

@ -1,37 +1,35 @@
#include "CassandraDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include <common/logger_useful.h>
#include <Common/SipHash.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int NOT_IMPLEMENTED;
}
void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
{
auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
const Context & /* context */,
bool /*check_config*/) -> DictionarySourcePtr
{
auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
const Context & /* context */,
bool /*check_config*/) -> DictionarySourcePtr
{
#if USE_CASSANDRA
setupCassandraDriverLibraryLogging(CASS_LOG_TRACE);
return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
setupCassandraDriverLibraryLogging(CASS_LOG_INFO);
return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
#else
throw Exception{"Dictionary source of type `cassandra` is disabled because library was built without cassandra support.",
ErrorCodes::SUPPORT_IS_DISABLED};
throw Exception{"Dictionary source of type `cassandra` is disabled because ClickHouse was built without cassandra support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("cassandra", create_table_source);
}
};
factory.registerSource("cassandra", create_table_source);
}
}
@ -39,8 +37,9 @@ namespace DB
#include <IO/WriteHelpers.h>
#include <Common/SipHash.h>
#include <ext/range.h>
#include "CassandraBlockInputStream.h"
#include <common/logger_useful.h>
#include <DataStreams/UnionBlockInputStream.h>
namespace DB
{
@ -57,7 +56,7 @@ CassandraSettings::CassandraSettings(
, port(config.getUInt(config_prefix + ".port", 0))
, user(config.getString(config_prefix + ".user", ""))
, password(config.getString(config_prefix + ".password", ""))
, db(config.getString(config_prefix + ".keyspace", ""))
, db(config.getString(config_prefix + ".keyspace"))
, table(config.getString(config_prefix + ".column_family"))
, allow_filtering(config.getBool(config_prefix + ".allow_filtering", false))
, partition_key_prefix(config.getUInt(config_prefix + ".partition_key_prefix", 1))
@ -124,7 +123,7 @@ CassandraDictionarySource::CassandraDictionarySource(
{
}
void CassandraDictionarySource::maybeAllowFiltering(String & query)
void CassandraDictionarySource::maybeAllowFiltering(String & query) const
{
if (!settings.allow_filtering)
return;
@ -137,10 +136,11 @@ BlockInputStreamPtr CassandraDictionarySource::loadAll()
String query = query_builder.composeLoadAllQuery();
maybeAllowFiltering(query);
LOG_INFO(log, "Loading all using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(cluster, query, sample_block, max_block_size);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}
std::string CassandraDictionarySource::toString() const {
std::string CassandraDictionarySource::toString() const
{
return "Cassandra: " + settings.db + '.' + settings.table;
}
@ -149,7 +149,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector<UInt64>
String query = query_builder.composeLoadIdsQuery(ids);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading ids using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(cluster, query, sample_block, max_block_size);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}
BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
@ -162,7 +162,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu
for (const auto & row : requested_rows)
{
SipHash partition_key;
for (const auto i : ext::range(0, settings.partition_key_prefix))
for (size_t i = 0; i < settings.partition_key_prefix; ++i)
key_columns[i]->updateHashWithValue(row, partition_key);
partitions[partition_key.get64()].push_back(row);
}
@ -173,7 +173,7 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu
String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
streams.push_back(std::make_shared<CassandraBlockInputStream>(cluster, query, sample_block, max_block_size));
streams.push_back(std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size));
}
if (streams.size() == 1)
@ -182,6 +182,30 @@ BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_colu
return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
}
BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll()
{
throw Exception("Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED);
}
CassSessionShared CassandraDictionarySource::getSession()
{
/// Reuse connection if exists, create new one if not
auto session = maybe_session.lock();
if (session)
return session;
std::lock_guard lock(connect_mutex);
session = maybe_session.lock();
if (session)
return session;
session = std::make_shared<CassSessionPtr>();
CassFuturePtr future = cass_session_connect(*session, cluster);
cassandraWaitAndCheck(future);
maybe_session = session;
return session;
}
}
#endif

View File

@ -34,7 +34,8 @@ struct CassandraSettings
void setConsistency(const String & config_str);
};
class CassandraDictionarySource final : public IDictionarySource {
class CassandraDictionarySource final : public IDictionarySource
{
public:
CassandraDictionarySource(
const DictionaryStructure & dict_struct,
@ -64,15 +65,13 @@ public:
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadUpdatedAll() override
{
throw Exception{"Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED};
}
BlockInputStreamPtr loadUpdatedAll() override;
String toString() const override;
private:
void maybeAllowFiltering(String & query);
void maybeAllowFiltering(String & query) const;
CassSessionShared getSession();
Poco::Logger * log;
const DictionaryStructure dict_struct;
@ -80,7 +79,9 @@ private:
Block sample_block;
ExternalQueryBuilder query_builder;
std::mutex connect_mutex;
CassClusterPtr cluster;
CassSessionWeak maybe_session;
};
}

View File

@ -21,7 +21,7 @@ void cassandraCheck(CassError code)
}
void cassandraWaitAndCheck(CassFuturePtr && future)
void cassandraWaitAndCheck(CassFuturePtr & future)
{
auto code = cass_future_error_code(future); /// Waits if not ready
if (code == CASS_OK)

View File

@ -7,6 +7,7 @@
#if USE_CASSANDRA
#include <cassandra.h>
#include <utility>
#include <memory>
namespace DB
{
@ -37,6 +38,7 @@ public:
Dtor(ptr);
ptr = rhs.ptr;
rhs.ptr = nullptr;
return *this;
}
~ObjectHolder()
@ -54,8 +56,12 @@ public:
/// These object are created on pointer construction
using CassClusterPtr = Cassandra::ObjectHolder<CassCluster, cass_cluster_free, cass_cluster_new>;
using CassSessionPtr = Cassandra::ObjectHolder<CassSession, cass_session_free, cass_session_new>;
using CassStatementPtr = Cassandra::ObjectHolder<CassStatement, cass_statement_free, cass_statement_new>;
using CassSessionPtr = Cassandra::ObjectHolder<CassSession, cass_session_free, cass_session_new>;
/// Share connections between streams. Executing statements in one session object is thread-safe
using CassSessionShared = std::shared_ptr<CassSessionPtr>;
using CassSessionWeak = std::weak_ptr<CassSessionPtr>;
/// The following objects are created inside Cassandra driver library,
/// but must be freed by user code
@ -65,7 +71,7 @@ using CassIteratorPtr = Cassandra::ObjectHolder<CassIterator, cass_iterator_free
/// Checks return code, throws exception on error
void cassandraCheck(CassError code);
void cassandraWaitAndCheck(CassFuturePtr && future);
void cassandraWaitAndCheck(CassFuturePtr & future);
/// By default driver library prints logs to stderr.
/// It should be redirected (or, at least, disabled) before calling other functions from the library.

View File

@ -293,9 +293,13 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
if (!where.empty())
{
writeString("(", out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString("(", out);
writeString(where, out);
writeString(") AND (", out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString(") AND (", out);
else
writeString(" AND ", out);
}
if (method == AND_OR_CHAIN)
@ -333,7 +337,7 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
composeInWithTuples(key_columns, requested_rows, out, partition_key_prefix, key_columns.size());
}
if (!where.empty())
if (!where.empty() && method != CASSANDRA_SEPARATE_PARTITION_KEY)
{
writeString(")", out);
}

View File

@ -29,6 +29,5 @@ void registerDictionaryCache(DictionaryFactory & factory);
void registerDictionaryPolygon(DictionaryFactory & factory);
void registerDictionaryDirect(DictionaryFactory & factory);
void registerDictionaries();
}

View File

@ -270,9 +270,9 @@ class ClickHouseCluster:
if with_cassandra and not self.with_cassandra:
self.with_cassandra = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_cassandra.yml')])
self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')])
self.base_cassandra_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_cassandra.yml')]
self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')]
return instance

View File

@ -437,6 +437,7 @@ class SourceCassandra(ExternalSource):
<keyspace>test</keyspace>
<column_family>{table}</column_family>
<allow_filtering>1</allow_filtering>
<where>"Int64_" &lt; 1000000000000000000</where>
</cassandra>
'''.format(
host=self.docker_hostname,

View File

@ -3,12 +3,10 @@ import os
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMongo, SourceHTTP, SourceHTTPS, SourceRedis, SourceCassandra
from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis, SourceCassandra
import math
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
dict_configs_path = os.path.join(SCRIPT_DIR, 'configs/dictionaries')
@ -212,7 +210,6 @@ def get_dictionaries(fold, total_folds, all_dicts):
return all_dicts[fold * chunk_len : (fold + 1) * chunk_len]
#@pytest.mark.timeout(3000)
@pytest.mark.parametrize("fold", list(range(10)))
def test_simple_dictionaries(started_cluster, fold):
fields = FIELDS["simple"]