diff --git a/.gitmodules b/.gitmodules
index 7f5d1307a6e..c05da0c9ff9 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -157,6 +157,14 @@
[submodule "contrib/openldap"]
path = contrib/openldap
url = https://github.com/openldap/openldap.git
+[submodule "contrib/cassandra"]
+ path = contrib/cassandra
+ url = https://github.com/ClickHouse-Extras/cpp-driver.git
+ branch = clickhouse
+[submodule "contrib/libuv"]
+ path = contrib/libuv
+ url = https://github.com/ClickHouse-Extras/libuv.git
+ branch = clickhouse
[submodule "contrib/fmtlib"]
path = contrib/fmtlib
url = https://github.com/fmtlib/fmt.git
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7b3d7676d0e..4683bf8dec1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -360,6 +360,7 @@ include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
include (cmake/find/msgpack.cmake)
+include (cmake/find/cassandra.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
diff --git a/cmake/find/cassandra.cmake b/cmake/find/cassandra.cmake
new file mode 100644
index 00000000000..f41e0f645f4
--- /dev/null
+++ b/cmake/find/cassandra.cmake
@@ -0,0 +1,26 @@
+option(ENABLE_CASSANDRA "Enable Cassandra" ${ENABLE_LIBRARIES})
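+# Can be disabled explicitly with: cmake -D ENABLE_CASSANDRA=0 (as the quick-build CI job does, see utils/ci/jobs/quick-build/run.sh)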
+
+if (ENABLE_CASSANDRA)
+ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libuv")
+ message (WARNING "Submodule contrib/libuv is missing. To fix, run:\n git submodule update --init --recursive")
+ elseif (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
+ message (WARNING "Submodule contrib/cassandra is missing. To fix, run:\n git submodule update --init --recursive")
+ else()
+ set (LIBUV_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libuv")
+ set (CASSANDRA_INCLUDE_DIR
+ "${ClickHouse_SOURCE_DIR}/contrib/cassandra/include/")
+ if (USE_STATIC_LIBRARIES)
+ set (LIBUV_LIBRARY uv_a)
+ set (CASSANDRA_LIBRARY cassandra_static)
+ else()
+ set (LIBUV_LIBRARY uv)
+ set (CASSANDRA_LIBRARY cassandra)
+ endif()
+ set (USE_CASSANDRA 1)
+ set (CASS_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
+
+ endif()
+endif()
+
+message (STATUS "Using cassandra=${USE_CASSANDRA}: ${CASSANDRA_INCLUDE_DIR} : ${CASSANDRA_LIBRARY}")
+message (STATUS "Using libuv: ${LIBUV_ROOT_DIR} : ${LIBUV_LIBRARY}")
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 2b0fba43348..b8029124712 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -295,4 +295,10 @@ if (USE_FASTOPS)
add_subdirectory (fastops-cmake)
endif()
+if (USE_CASSANDRA)
+ add_subdirectory (libuv)
+ add_subdirectory (cassandra)
+endif()
+
add_subdirectory (fmtlib-cmake)
+
diff --git a/contrib/cassandra b/contrib/cassandra
new file mode 160000
index 00000000000..a49b4e0e269
--- /dev/null
+++ b/contrib/cassandra
@@ -0,0 +1 @@
+Subproject commit a49b4e0e2696a4b8ef286a5b9538d1cbe8490509
diff --git a/contrib/libuv b/contrib/libuv
new file mode 160000
index 00000000000..84438304f41
--- /dev/null
+++ b/contrib/libuv
@@ -0,0 +1 @@
+Subproject commit 84438304f41d8ea6670ee5409f4d6c63ca784f28
diff --git a/docker/test/integration/compose/docker_compose_cassandra.yml b/docker/test/integration/compose/docker_compose_cassandra.yml
new file mode 100644
index 00000000000..6567a352027
--- /dev/null
+++ b/docker/test/integration/compose/docker_compose_cassandra.yml
@@ -0,0 +1,7 @@
+version: '2.3'
+services:
+ cassandra1:
+ image: cassandra
+ restart: always
+ ports:
+ - 9043:9042
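+ # Container port 9042 (Cassandra native protocol) is published on host port 9043; integration tests connect to localhost:9043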
diff --git a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md
index b79a34f401a..71b719ce996 100644
--- a/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md
+++ b/docs/en/sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md
@@ -625,4 +625,43 @@ Setting fields:
- `storage_type` – The structure of internal Redis storage using for work with keys. `simple` is for simple sources and for hashed single key sources, `hash_map` is for hashed sources with two keys. Ranged sources and cache sources with complex key are unsupported. May be omitted, default value is `simple`.
- `db_index` – The specific numeric index of Redis logical database. May be omitted, default value is 0.
+### Cassandra {#dicts-external_dicts_dict_sources-cassandra}
+
+Example of settings:
+
+```xml
+<source>
+    <cassandra>
+        <host>localhost</host>
+        <port>9042</port>
+        <user>username</user>
+        <password>qwerty123</password>
+        <keyspace>database_name</keyspace>
+        <column_family>table_name</column_family>
+        <allow_filtering>1</allow_filtering>
+        <partition_key_prefix>1</partition_key_prefix>
+        <consistency>One</consistency>
+        <where>"SomeColumn" = 42</where>
+        <max_threads>8</max_threads>
+    </cassandra>
+</source>
+```
+
+Setting fields:
+- `host` – The Cassandra host or comma-separated list of hosts.
+- `port` – The port on the Cassandra servers. If not specified, the driver's default port 9042 is used.
+- `user` – Name of the Cassandra user.
+- `password` – Password of the Cassandra user.
+- `keyspace` – Name of the keyspace (database).
+- `column_family` – Name of the column family (table).
+- `allow_filtering` – Flag that enables potentially expensive conditions on clustering key columns (appends `ALLOW FILTERING` to the generated queries). Default value is 0.
+- `partition_key_prefix` – Number of partition key columns in the primary key of the Cassandra table.
+ Required for composite key dictionaries. The order of key columns in the dictionary definition must be the same as in Cassandra.
+ Default value is 1 (the first key column is the partition key and the other key columns are the clustering key).
+- `consistency` – Consistency level. Possible values: `One`, `Two`, `Three`,
+ `All`, `EachQuorum`, `Quorum`, `LocalQuorum`, `LocalOne`, `Serial`, `LocalSerial`. Default is `One`.
+- `where` – Optional selection criteria.
+- `max_threads` – The maximum number of threads to use for loading data from multiple partitions in composite key dictionaries. Default value is 8.
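+
+An equivalent DDL form (a sketch; the `SOURCE(CASSANDRA(...))` field names mirror the XML settings above):
+
+```sql
+SOURCE(CASSANDRA(
+    HOST 'localhost'
+    PORT 9042
+    USER 'username'
+    PASSWORD 'qwerty123'
+    KEYSPACE 'database_name'
+    COLUMN_FAMILY 'table_name'
+    ALLOW_FILTERING 1
+    PARTITION_KEY_PREFIX 1
+    CONSISTENCY 'One'
+    WHERE '"SomeColumn" = 42'
+    MAX_THREADS 8
+))
+```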
+
+
[Original article](https://clickhouse.tech/docs/en/query_language/dicts/external_dicts_dict_sources/)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4d947ecdae5..fe223373cf3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -352,6 +352,11 @@ if (USE_OPENCL)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${OpenCL_INCLUDE_DIRS})
endif ()
+if (USE_CASSANDRA)
+ dbms_target_link_libraries(PUBLIC ${CASSANDRA_LIBRARY})
+ dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${CASSANDRA_INCLUDE_DIR})
+endif()
+
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index f29140a64ec..694f0979f63 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -495,6 +495,7 @@ namespace ErrorCodes
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN = 524;
extern const int INCORRECT_DISK_INDEX = 525;
extern const int UNKNOWN_VOLUME_TYPE = 526;
+ extern const int CASSANDRA_INTERNAL_ERROR = 527;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
diff --git a/src/Common/config.h.in b/src/Common/config.h.in
index 237ca81ff9e..e6bc46256e0 100644
--- a/src/Common/config.h.in
+++ b/src/Common/config.h.in
@@ -9,5 +9,6 @@
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_OPENCL
+#cmakedefine01 USE_CASSANDRA
#cmakedefine01 USE_GRPC
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
diff --git a/src/Dictionaries/CMakeLists.txt b/src/Dictionaries/CMakeLists.txt
index 4471b093add..0eb3c5f44d6 100644
--- a/src/Dictionaries/CMakeLists.txt
+++ b/src/Dictionaries/CMakeLists.txt
@@ -21,6 +21,10 @@ target_link_libraries(clickhouse_dictionaries
string_utils
)
+if(USE_CASSANDRA)
+ target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${CASSANDRA_INCLUDE_DIR})
+endif()
+
add_subdirectory(Embedded)
target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${SPARSEHASH_INCLUDE_DIR})
diff --git a/src/Dictionaries/CassandraBlockInputStream.cpp b/src/Dictionaries/CassandraBlockInputStream.cpp
new file mode 100644
index 00000000000..4f6a62a0eea
--- /dev/null
+++ b/src/Dictionaries/CassandraBlockInputStream.cpp
@@ -0,0 +1,274 @@
+#if !defined(ARCADIA_BUILD)
+#include <Common/config.h>
+#endif
+
+#if USE_CASSANDRA
+
+#include <utility>
+#include <Columns/ColumnNullable.h>
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnsNumber.h>
+#include <Core/ExternalResultDescription.h>
+#include <IO/ReadHelpers.h>
+#include "CassandraBlockInputStream.h"
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int TYPE_MISMATCH;
+}
+
+CassandraBlockInputStream::CassandraBlockInputStream(
+ const CassSessionShared & session_,
+ const String & query_str,
+ const Block & sample_block,
+ size_t max_block_size_)
+ : session(session_)
+ , statement(query_str.c_str(), /*parameters count*/ 0)
+ , max_block_size(max_block_size_)
+ , has_more_pages(cass_true)
+{
+ description.init(sample_block);
+ cassandraCheck(cass_statement_set_paging_size(statement, max_block_size));
+}
+
+void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value)
+{
+ switch (type)
+ {
+ case ValueType::vtUInt8:
+ {
+ cass_int8_t value;
+ cass_value_get_int8(cass_value, &value);
+ assert_cast<ColumnUInt8 &>(column).insertValue(static_cast<UInt8>(value));
+ break;
+ }
+ case ValueType::vtUInt16:
+ {
+ cass_int16_t value;
+ cass_value_get_int16(cass_value, &value);
+ assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
+ break;
+ }
+ case ValueType::vtUInt32:
+ {
+ cass_int32_t value;
+ cass_value_get_int32(cass_value, &value);
+ assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value));
+ break;
+ }
+ case ValueType::vtUInt64:
+ {
+ cass_int64_t value;
+ cass_value_get_int64(cass_value, &value);
+ assert_cast<ColumnUInt64 &>(column).insertValue(static_cast<UInt64>(value));
+ break;
+ }
+ case ValueType::vtInt8:
+ {
+ cass_int8_t value;
+ cass_value_get_int8(cass_value, &value);
+ assert_cast<ColumnInt8 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtInt16:
+ {
+ cass_int16_t value;
+ cass_value_get_int16(cass_value, &value);
+ assert_cast<ColumnInt16 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtInt32:
+ {
+ cass_int32_t value;
+ cass_value_get_int32(cass_value, &value);
+ assert_cast<ColumnInt32 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtInt64:
+ {
+ cass_int64_t value;
+ cass_value_get_int64(cass_value, &value);
+ assert_cast<ColumnInt64 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtFloat32:
+ {
+ cass_float_t value;
+ cass_value_get_float(cass_value, &value);
+ assert_cast<ColumnFloat32 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtFloat64:
+ {
+ cass_double_t value;
+ cass_value_get_double(cass_value, &value);
+ assert_cast<ColumnFloat64 &>(column).insertValue(value);
+ break;
+ }
+ case ValueType::vtString:
+ {
+ const char * value = nullptr;
+ size_t value_length;
+ cass_value_get_string(cass_value, &value, &value_length);
+ assert_cast<ColumnString &>(column).insertData(value, value_length);
+ break;
+ }
+ case ValueType::vtDate:
+ {
+ cass_uint32_t value;
+ cass_value_get_uint32(cass_value, &value);
+ assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
+ break;
+ }
+ case ValueType::vtDateTime:
+ {
+ cass_int64_t value;
+ cass_value_get_int64(cass_value, &value);
+ assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value / 1000));
+ break;
+ }
+ case ValueType::vtUUID:
+ {
+ CassUuid value;
+ cass_value_get_uuid(cass_value, &value);
+ std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
+ cass_uuid_string(value, uuid_str.data());
+ assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
+ break;
+ }
+ }
+}
+
+void CassandraBlockInputStream::readPrefix()
+{
+ result_future = cass_session_execute(*session, statement);
+}
+
+Block CassandraBlockInputStream::readImpl()
+{
+ if (!has_more_pages)
+ return {};
+
+ MutableColumns columns = description.sample_block.cloneEmptyColumns();
+
+ cassandraWaitAndCheck(result_future);
+ CassResultPtr result = cass_future_get_result(result_future);
+
+ assert(cass_result_column_count(result) == columns.size());
+
+ assertTypes(result);
+
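+ /// Fetch the next page asynchronously while the current one is converted to a Block:
+ /// cass_statement_set_paging_state() copies the paging token from this result,
+ /// and cass_session_execute() only starts the request and returns a future.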
+ has_more_pages = cass_result_has_more_pages(result);
+ if (has_more_pages)
+ {
+ cassandraCheck(cass_statement_set_paging_state(statement, result));
+ result_future = cass_session_execute(*session, statement);
+ }
+
+ CassIteratorPtr rows_iter = cass_iterator_from_result(result); /// Points to rows[-1]
+ while (cass_iterator_next(rows_iter))
+ {
+ const CassRow * row = cass_iterator_get_row(rows_iter);
+ for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
+ {
+ const CassValue * val = cass_row_get_column(row, col_idx);
+ if (cass_value_is_null(val))
+ columns[col_idx]->insertDefault();
+ else if (description.types[col_idx].second)
+ {
+ ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[col_idx]);
+ insertValue(column_nullable.getNestedColumn(), description.types[col_idx].first, val);
+ column_nullable.getNullMapData().emplace_back(0);
+ }
+ else
+ insertValue(*columns[col_idx], description.types[col_idx].first, val);
+ }
+ }
+
+ assert(cass_result_row_count(result) == columns.front()->size());
+
+ return description.sample_block.cloneWithColumns(std::move(columns));
+}
+
+void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
+{
+ if (!assert_types)
+ return;
+
+ size_t column_count = cass_result_column_count(result);
+ for (size_t i = 0; i < column_count; ++i)
+ {
+ CassValueType expected = CASS_VALUE_TYPE_UNKNOWN;
+ String expected_text;
+
+ /// Cassandra does not support unsigned integers (cass_uint32_t is for Date)
+ switch (description.types[i].first)
+ {
+ case ExternalResultDescription::ValueType::vtInt8:
+ case ExternalResultDescription::ValueType::vtUInt8:
+ expected = CASS_VALUE_TYPE_TINY_INT;
+ expected_text = "tinyint";
+ break;
+ case ExternalResultDescription::ValueType::vtInt16:
+ case ExternalResultDescription::ValueType::vtUInt16:
+ expected = CASS_VALUE_TYPE_SMALL_INT;
+ expected_text = "smallint";
+ break;
+ case ExternalResultDescription::ValueType::vtUInt32:
+ case ExternalResultDescription::ValueType::vtInt32:
+ expected = CASS_VALUE_TYPE_INT;
+ expected_text = "int";
+ break;
+ case ExternalResultDescription::ValueType::vtInt64:
+ case ExternalResultDescription::ValueType::vtUInt64:
+ expected = CASS_VALUE_TYPE_BIGINT;
+ expected_text = "bigint";
+ break;
+ case ExternalResultDescription::ValueType::vtFloat32:
+ expected = CASS_VALUE_TYPE_FLOAT;
+ expected_text = "float";
+ break;
+ case ExternalResultDescription::ValueType::vtFloat64:
+ expected = CASS_VALUE_TYPE_DOUBLE;
+ expected_text = "double";
+ break;
+ case ExternalResultDescription::ValueType::vtString:
+ expected = CASS_VALUE_TYPE_TEXT;
+ expected_text = "text, ascii or varchar";
+ break;
+ case ExternalResultDescription::ValueType::vtDate:
+ expected = CASS_VALUE_TYPE_DATE;
+ expected_text = "date";
+ break;
+ case ExternalResultDescription::ValueType::vtDateTime:
+ expected = CASS_VALUE_TYPE_TIMESTAMP;
+ expected_text = "timestamp";
+ break;
+ case ExternalResultDescription::ValueType::vtUUID:
+ expected = CASS_VALUE_TYPE_UUID;
+ expected_text = "uuid";
+ break;
+ }
+
+ CassValueType got = cass_result_column_type(result, i);
+
+ if (got != expected)
+ {
+ if (expected == CASS_VALUE_TYPE_TEXT && (got == CASS_VALUE_TYPE_ASCII || got == CASS_VALUE_TYPE_VARCHAR))
+ continue;
+
+ const auto & column_name = description.sample_block.getColumnsWithTypeAndName()[i].name;
+ throw Exception("Type mismatch for column " + column_name + ": expected Cassandra type " + expected_text,
+ ErrorCodes::TYPE_MISMATCH);
+ }
+ }
+
+ assert_types = false;
+}
+
+}
+#endif
diff --git a/src/Dictionaries/CassandraBlockInputStream.h b/src/Dictionaries/CassandraBlockInputStream.h
new file mode 100644
index 00000000000..3b0e583e3ad
--- /dev/null
+++ b/src/Dictionaries/CassandraBlockInputStream.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <Dictionaries/CassandraHelpers.h>
+
+#if USE_CASSANDRA
+#include <Core/Block.h>
+#include <Core/ExternalResultDescription.h>
+#include <DataStreams/IBlockInputStream.h>
+
+
+namespace DB
+{
+
+class CassandraBlockInputStream final : public IBlockInputStream
+{
+public:
+ CassandraBlockInputStream(
+ const CassSessionShared & session_,
+ const String & query_str,
+ const Block & sample_block,
+ size_t max_block_size);
+
+ String getName() const override { return "Cassandra"; }
+
+ Block getHeader() const override { return description.sample_block.cloneEmpty(); }
+
+ void readPrefix() override;
+
+private:
+ using ValueType = ExternalResultDescription::ValueType;
+
+ Block readImpl() override;
+ static void insertValue(IColumn & column, ValueType type, const CassValue * cass_value);
+ void assertTypes(const CassResultPtr & result);
+
+ CassSessionShared session;
+ CassStatementPtr statement;
+ CassFuturePtr result_future;
+ const size_t max_block_size;
+ ExternalResultDescription description;
+ cass_bool_t has_more_pages;
+ bool assert_types = true;
+};
+
+}
+
+#endif
diff --git a/src/Dictionaries/CassandraDictionarySource.cpp b/src/Dictionaries/CassandraDictionarySource.cpp
new file mode 100644
index 00000000000..c41f528db91
--- /dev/null
+++ b/src/Dictionaries/CassandraDictionarySource.cpp
@@ -0,0 +1,211 @@
+#include "CassandraDictionarySource.h"
+#include "DictionarySourceFactory.h"
+#include "DictionaryStructure.h"
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int SUPPORT_IS_DISABLED;
+ extern const int NOT_IMPLEMENTED;
+}
+
+void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
+{
+ auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
+ [[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
+ [[maybe_unused]] const std::string & config_prefix,
+ [[maybe_unused]] Block & sample_block,
+ const Context & /* context */,
+ bool /*check_config*/) -> DictionarySourcePtr
+ {
+#if USE_CASSANDRA
+ setupCassandraDriverLibraryLogging(CASS_LOG_INFO);
+ return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
+#else
+ throw Exception{"Dictionary source of type `cassandra` is disabled because ClickHouse was built without cassandra support.",
+ ErrorCodes::SUPPORT_IS_DISABLED};
+#endif
+ };
+ factory.registerSource("cassandra", create_table_source);
+}
+
+}
+
+#if USE_CASSANDRA
+
+#include <utility>
+#include <Common/SipHash.h>
+#include "CassandraBlockInputStream.h"
+#include <common/logger_useful.h>
+#include <DataStreams/UnionBlockInputStream.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+ extern const int INVALID_CONFIG_PARAMETER;
+}
+
+CassandraSettings::CassandraSettings(
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix)
+ : host(config.getString(config_prefix + ".host"))
+ , port(config.getUInt(config_prefix + ".port", 0))
+ , user(config.getString(config_prefix + ".user", ""))
+ , password(config.getString(config_prefix + ".password", ""))
+ , db(config.getString(config_prefix + ".keyspace"))
+ , table(config.getString(config_prefix + ".column_family"))
+ , allow_filtering(config.getBool(config_prefix + ".allow_filtering", false))
+ , partition_key_prefix(config.getUInt(config_prefix + ".partition_key_prefix", 1))
+ , max_threads(config.getUInt(config_prefix + ".max_threads", 8))
+ , where(config.getString(config_prefix + ".where", ""))
+{
+ setConsistency(config.getString(config_prefix + ".consistency", "One"));
+}
+
+void CassandraSettings::setConsistency(const String & config_str)
+{
+ if (config_str == "One")
+ consistency = CASS_CONSISTENCY_ONE;
+ else if (config_str == "Two")
+ consistency = CASS_CONSISTENCY_TWO;
+ else if (config_str == "Three")
+ consistency = CASS_CONSISTENCY_THREE;
+ else if (config_str == "All")
+ consistency = CASS_CONSISTENCY_ALL;
+ else if (config_str == "EachQuorum")
+ consistency = CASS_CONSISTENCY_EACH_QUORUM;
+ else if (config_str == "Quorum")
+ consistency = CASS_CONSISTENCY_QUORUM;
+ else if (config_str == "LocalQuorum")
+ consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
+ else if (config_str == "LocalOne")
+ consistency = CASS_CONSISTENCY_LOCAL_ONE;
+ else if (config_str == "Serial")
+ consistency = CASS_CONSISTENCY_SERIAL;
+ else if (config_str == "LocalSerial")
+ consistency = CASS_CONSISTENCY_LOCAL_SERIAL;
+ else /// CASS_CONSISTENCY_ANY is only valid for writes
+ throw Exception("Unsupported consistency level: " + config_str, ErrorCodes::INVALID_CONFIG_PARAMETER);
+}
+
+static const size_t max_block_size = 8192;
+
+CassandraDictionarySource::CassandraDictionarySource(
+ const DictionaryStructure & dict_struct_,
+ const CassandraSettings & settings_,
+ const Block & sample_block_)
+ : log(&Poco::Logger::get("CassandraDictionarySource"))
+ , dict_struct(dict_struct_)
+ , settings(settings_)
+ , sample_block(sample_block_)
+ , query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
+{
+ cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
+ if (settings.port)
+ cassandraCheck(cass_cluster_set_port(cluster, settings.port));
+ cass_cluster_set_credentials(cluster, settings.user.c_str(), settings.password.c_str());
+ cassandraCheck(cass_cluster_set_consistency(cluster, settings.consistency));
+}
+
+CassandraDictionarySource::CassandraDictionarySource(
+ const DictionaryStructure & dict_struct_,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix,
+ Block & sample_block_)
+ : CassandraDictionarySource(
+ dict_struct_,
+ CassandraSettings(config, config_prefix),
+ sample_block_)
+{
+}
+
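+/// Cassandra refuses to execute queries that filter on columns it cannot serve efficiently
+/// (e.g. clustering key columns without the full partition key) unless the query ends with
+/// ALLOW FILTERING, which permits a potentially expensive server-side scan.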
+void CassandraDictionarySource::maybeAllowFiltering(String & query) const
+{
+ if (!settings.allow_filtering)
+ return;
+ query.pop_back(); /// remove semicolon
+ query += " ALLOW FILTERING;";
+}
+
+BlockInputStreamPtr CassandraDictionarySource::loadAll()
+{
+ String query = query_builder.composeLoadAllQuery();
+ maybeAllowFiltering(query);
+ LOG_INFO(log, "Loading all using query: {}", query);
+ return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
+}
+
+std::string CassandraDictionarySource::toString() const
+{
+ return "Cassandra: " + settings.db + '.' + settings.table;
+}
+
+BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
+{
+ String query = query_builder.composeLoadIdsQuery(ids);
+ maybeAllowFiltering(query);
+ LOG_INFO(log, "Loading ids using query: {}", query);
+ return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
+}
+
+BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
+{
+ if (requested_rows.empty())
+ throw Exception("No rows requested", ErrorCodes::LOGICAL_ERROR);
+
+ /// TODO is there a better way to load data by complex keys?
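+ /// Group the requested rows by partition key hash: rows in one group share the same partition
+ /// key values, so each group can be fetched with a single query (CQL allows neither OR
+ /// conditions nor multi-column IN over partition key columns; see ExternalQueryBuilder).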
+ std::unordered_map<UInt64, std::vector<size_t>> partitions;
+ for (const auto & row : requested_rows)
+ {
+ SipHash partition_key;
+ for (size_t i = 0; i < settings.partition_key_prefix; ++i)
+ key_columns[i]->updateHashWithValue(row, partition_key);
+ partitions[partition_key.get64()].push_back(row);
+ }
+
+ BlockInputStreams streams;
+ for (const auto & partition : partitions)
+ {
+ String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
+ maybeAllowFiltering(query);
+ LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
+ streams.push_back(std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size));
+ }
+
+ if (streams.size() == 1)
+ return streams.front();
+
+ return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
+}
+
+BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll()
+{
+ throw Exception("Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED);
+}
+
+CassSessionShared CassandraDictionarySource::getSession()
+{
+ /// Reuse connection if exists, create new one if not
+ auto session = maybe_session.lock();
+ if (session)
+ return session;
+
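+ /// Double-checked locking: re-check under the mutex, because another thread may have
+ /// created the session between the unlocked check above and acquiring the lock.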
+ std::lock_guard lock(connect_mutex);
+ session = maybe_session.lock();
+ if (session)
+ return session;
+
+ session = std::make_shared<CassSessionPtr>();
+ CassFuturePtr future = cass_session_connect(*session, cluster);
+ cassandraWaitAndCheck(future);
+ maybe_session = session;
+ return session;
+}
+
+}
+
+#endif
diff --git a/src/Dictionaries/CassandraDictionarySource.h b/src/Dictionaries/CassandraDictionarySource.h
new file mode 100644
index 00000000000..c0a4e774d23
--- /dev/null
+++ b/src/Dictionaries/CassandraDictionarySource.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <Common/config.h>
+
+#if USE_CASSANDRA
+
+#include "DictionaryStructure.h"
+#include "IDictionarySource.h"
+#include "ExternalQueryBuilder.h"
+#include "CassandraHelpers.h"
+#include <Core/Block.h>
+#include <Poco/Logger.h>
+#include <mutex>
+
+namespace DB
+{
+
+struct CassandraSettings
+{
+ String host;
+ UInt16 port;
+ String user;
+ String password;
+ String db;
+ String table;
+
+ CassConsistency consistency;
+ bool allow_filtering;
+ /// TODO get information about key from the driver
+ size_t partition_key_prefix;
+ size_t max_threads;
+ String where;
+
+ CassandraSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
+
+ void setConsistency(const String & config_str);
+};
+
+class CassandraDictionarySource final : public IDictionarySource
+{
+public:
+ CassandraDictionarySource(
+ const DictionaryStructure & dict_struct,
+ const CassandraSettings & settings_,
+ const Block & sample_block);
+
+ CassandraDictionarySource(
+ const DictionaryStructure & dict_struct,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix,
+ Block & sample_block);
+
+ BlockInputStreamPtr loadAll() override;
+
+ bool supportsSelectiveLoad() const override { return true; }
+
+ bool isModified() const override { return true; }
+
+ bool hasUpdateField() const override { return false; }
+
+ DictionarySourcePtr clone() const override
+ {
+ return std::make_unique<CassandraDictionarySource>(dict_struct, settings, sample_block);
+ }
+
+ BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
+
+ BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
+
+ BlockInputStreamPtr loadUpdatedAll() override;
+
+ String toString() const override;
+
+private:
+ void maybeAllowFiltering(String & query) const;
+ CassSessionShared getSession();
+
+ Poco::Logger * log;
+ const DictionaryStructure dict_struct;
+ const CassandraSettings settings;
+ Block sample_block;
+ ExternalQueryBuilder query_builder;
+
+ std::mutex connect_mutex;
+ CassClusterPtr cluster;
+ CassSessionWeak maybe_session;
+};
+}
+
+#endif
diff --git a/src/Dictionaries/CassandraHelpers.cpp b/src/Dictionaries/CassandraHelpers.cpp
new file mode 100644
index 00000000000..6de80a455c7
--- /dev/null
+++ b/src/Dictionaries/CassandraHelpers.cpp
@@ -0,0 +1,68 @@
+#include <Dictionaries/CassandraHelpers.h>
+
+#if USE_CASSANDRA
+#include <Common/Exception.h>
+#include <common/logger_useful.h>
+#include <mutex>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int CASSANDRA_INTERNAL_ERROR;
+}
+
+void cassandraCheck(CassError code)
+{
+ if (code != CASS_OK)
+ throw Exception("Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code),
+ ErrorCodes::CASSANDRA_INTERNAL_ERROR);
+}
+
+
+void cassandraWaitAndCheck(CassFuturePtr & future)
+{
+ auto code = cass_future_error_code(future); /// Waits if not ready
+ if (code == CASS_OK)
+ return;
+
+ /// `future` owns `message` and will free it on destruction
+ const char * message;
+ size_t message_len;
+ cass_future_error_message(future, &message, &message_len);
+ std::string full_message = "Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code) + ": " + message;
+ throw Exception(full_message, ErrorCodes::CASSANDRA_INTERNAL_ERROR);
+}
+
+static std::once_flag setup_logging_flag;
+
+void setupCassandraDriverLibraryLogging(CassLogLevel level)
+{
+ std::call_once(setup_logging_flag, [level]()
+ {
+ Poco::Logger * logger = &Poco::Logger::get("CassandraDriverLibrary");
+ cass_log_set_level(level);
+ if (level != CASS_LOG_DISABLED)
+ cass_log_set_callback(cassandraLogCallback, logger);
+ });
+}
+
+void cassandraLogCallback(const CassLogMessage * message, void * data)
+{
+ Poco::Logger * logger = static_cast<Poco::Logger *>(data);
+ if (message->severity == CASS_LOG_CRITICAL || message->severity == CASS_LOG_ERROR)
+ LOG_ERROR(logger, message->message);
+ else if (message->severity == CASS_LOG_WARN)
+ LOG_WARNING(logger, message->message);
+ else if (message->severity == CASS_LOG_INFO)
+ LOG_INFO(logger, message->message);
+ else if (message->severity == CASS_LOG_DEBUG)
+ LOG_DEBUG(logger, message->message);
+ else if (message->severity == CASS_LOG_TRACE)
+ LOG_TRACE(logger, message->message);
+}
+
+}
+
+#endif
diff --git a/src/Dictionaries/CassandraHelpers.h b/src/Dictionaries/CassandraHelpers.h
new file mode 100644
index 00000000000..8a00e372c96
--- /dev/null
+++ b/src/Dictionaries/CassandraHelpers.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#if !defined(ARCADIA_BUILD)
+#include
+#endif
+
+#if USE_CASSANDRA
+#include <cassandra.h> // Y_IGNORE
+#include <utility>
+#include <memory>
+
+namespace DB
+{
+
+namespace Cassandra
+{
+
+template<typename CassT>
+CassT * defaultCtor() { return nullptr; }
+
+/// RAII wrapper for raw pointers to objects from cassandra driver library
+template<typename CassT, auto Dtor, auto Ctor = defaultCtor<CassT>>
+class ObjectHolder
+{
+ CassT * ptr = nullptr;
+public:
+ template<typename... Args>
+ ObjectHolder(Args &&... args) : ptr(Ctor(std::forward<Args>(args)...)) {}
+ ObjectHolder(CassT * ptr_) : ptr(ptr_) {}
+
+ ObjectHolder(const ObjectHolder &) = delete;
+ ObjectHolder & operator = (const ObjectHolder &) = delete;
+
+ ObjectHolder(ObjectHolder && rhs) noexcept : ptr(rhs.ptr) { rhs.ptr = nullptr; }
+ ObjectHolder & operator = (ObjectHolder && rhs) noexcept
+ {
+ if (ptr)
+ Dtor(ptr);
+ ptr = rhs.ptr;
+ rhs.ptr = nullptr;
+ return *this;
+ }
+
+ ~ObjectHolder()
+ {
+ if (ptr)
+ Dtor(ptr);
+ }
+
+ /// For implicit conversion when passing object to driver library functions
+ operator CassT * () { return ptr; }
+ operator const CassT * () const { return ptr; }
+};
+
+}
+
+/// These objects are created on pointer construction
+using CassClusterPtr = Cassandra::ObjectHolder<CassCluster, cass_cluster_free, cass_cluster_new>;
+using CassStatementPtr = Cassandra::ObjectHolder<CassStatement, cass_statement_free, cass_statement_new>;
+using CassSessionPtr = Cassandra::ObjectHolder<CassSession, cass_session_free, cass_session_new>;
+
+/// Share connections between streams. Executing statements in one session object is thread-safe
+using CassSessionShared = std::shared_ptr<CassSessionPtr>;
+using CassSessionWeak = std::weak_ptr<CassSessionPtr>;
+
+/// The following objects are created inside Cassandra driver library,
+/// but must be freed by user code
+using CassFuturePtr = Cassandra::ObjectHolder<CassFuture, cass_future_free>;
+using CassResultPtr = Cassandra::ObjectHolder<const CassResult, cass_result_free>;
+using CassIteratorPtr = Cassandra::ObjectHolder<CassIterator, cass_iterator_free>;
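+
+/// Usage sketch: constructor arguments are forwarded to Ctor, so e.g.
+///     CassStatementPtr statement("SELECT ...", /*parameters count*/ 0);
+/// wraps cass_statement_new() and calls cass_statement_free() automatically on scope exit.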
+
+/// Checks return code, throws exception on error
+void cassandraCheck(CassError code);
+void cassandraWaitAndCheck(CassFuturePtr & future);
+
+/// By default driver library prints logs to stderr.
+/// It should be redirected (or, at least, disabled) before calling other functions from the library.
+void setupCassandraDriverLibraryLogging(CassLogLevel level);
+
+void cassandraLogCallback(const CassLogMessage * message, void * data);
+
+}
+
+#endif
diff --git a/src/Dictionaries/ExternalQueryBuilder.cpp b/src/Dictionaries/ExternalQueryBuilder.cpp
index 529fb3d60fa..e64f04d28f2 100644
--- a/src/Dictionaries/ExternalQueryBuilder.cpp
+++ b/src/Dictionaries/ExternalQueryBuilder.cpp
@@ -63,6 +63,13 @@ void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out)
std::string ExternalQueryBuilder::composeLoadAllQuery() const
{
WriteBufferFromOwnString out;
+ composeLoadAllQuery(out);
+ writeChar(';', out);
+ return out.str();
+}
+
+void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const
+{
writeString("SELECT ", out);
if (dict_struct.id)
@@ -149,24 +156,26 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeString(" WHERE ", out);
writeString(where, out);
}
-
- writeChar(';', out);
-
- return out.str();
}
std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_field, const std::string & time_point) const
{
- std::string out = composeLoadAllQuery();
- std::string update_query;
+ WriteBufferFromOwnString out;
+ composeLoadAllQuery(out);
if (!where.empty())
- update_query = " AND " + update_field + " >= '" + time_point + "'";
+ writeString(" AND ", out);
else
- update_query = " WHERE " + update_field + " >= '" + time_point + "'";
+ writeString(" WHERE ", out);
- return out.insert(out.size() - 1, update_query); /// This is done to insert "update_query" before "out"'s semicolon
+ writeQuoted(update_field, out);
+ writeString(" >= '", out);
+ writeString(time_point, out);
+ writeChar('\'', out);
+
+ writeChar(';', out);
+ return out.str();
}
@@ -241,7 +250,7 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64> & ids)
std::string
-ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method)
+ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix)
{
if (!dict_struct.key)
throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};
@@ -284,9 +293,13 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
if (!where.empty())
{
- writeString("(", out);
+ if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
+ writeString("(", out);
writeString(where, out);
- writeString(") AND (", out);
+ if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
+ writeString(") AND (", out);
+ else
+ writeString(" AND ", out);
}
if (method == AND_OR_CHAIN)
@@ -298,28 +311,33 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
writeString(" OR ", out);
first = false;
- composeKeyCondition(key_columns, row, out);
+
+ writeString("(", out);
+ composeKeyCondition(key_columns, row, out, 0, key_columns.size());
+ writeString(")", out);
}
}
- else /* if (method == IN_WITH_TUPLES) */
+ else if (method == IN_WITH_TUPLES)
{
- writeString(composeKeyTupleDefinition(), out);
- writeString(" IN (", out);
-
- first = true;
- for (const auto row : requested_rows)
- {
- if (!first)
- writeString(", ", out);
-
- first = false;
- composeKeyTuple(key_columns, row, out);
- }
-
- writeString(")", out);
+ composeInWithTuples(key_columns, requested_rows, out, 0, key_columns.size());
+ }
+ else /* if (method == CASSANDRA_SEPARATE_PARTITION_KEY) */
+ {
+ /// CQL does not allow using OR conditions
+ /// and does not allow using multi-column IN expressions with partition key columns.
+ /// So we have to use multiple queries with conditions like
+ /// (partition_key_1 = val1 AND partition_key_2 = val2 ...) AND (clustering_key_1, ...) IN ((val3, ...), ...)
+ /// for each partition key.
+ /// `partition_key_prefix` is a number of columns from partition key.
+ /// All `requested_rows` must have the same values of partition key.
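+ /// E.g. for partition_key_prefix = 1 and key columns (x, y, z) this produces a condition
+ /// of the form: "x" = c1 AND ("y", "z") IN ((c21, c31), (c22, c32), ...)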
+ composeKeyCondition(key_columns, requested_rows.at(0), out, 0, partition_key_prefix);
+ if (partition_key_prefix && partition_key_prefix < key_columns.size())
+ writeString(" AND ", out);
+ if (partition_key_prefix < key_columns.size())
+ composeInWithTuples(key_columns, requested_rows, out, partition_key_prefix, key_columns.size());
}
- if (!where.empty())
+ if (!where.empty() && method != CASSANDRA_SEPARATE_PARTITION_KEY)
{
writeString(")", out);
}
@@ -330,13 +348,11 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
}
-void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out) const
+void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out,
+ size_t beg, size_t end) const
{
- writeString("(", out);
-
- const auto keys_size = key_columns.size();
auto first = true;
- for (const auto i : ext::range(0, keys_size))
+ for (const auto i : ext::range(beg, end))
{
if (!first)
writeString(" AND ", out);
@@ -346,45 +362,60 @@ void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, cons
const auto & key_description = (*dict_struct.key)[i];
/// key_i=value_i
- writeString(key_description.name, out);
+ writeQuoted(key_description.name, out);
writeString("=", out);
key_description.type->serializeAsTextQuoted(*key_columns[i], row, out, format_settings);
}
+}
+
+
+void ExternalQueryBuilder::composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows,
+ WriteBuffer & out, size_t beg, size_t end)
+{
+ composeKeyTupleDefinition(out, beg, end);
+ writeString(" IN (", out);
+
+ bool first = true;
+ for (const auto row : requested_rows)
+ {
+ if (!first)
+ writeString(", ", out);
+
+ first = false;
+ composeKeyTuple(key_columns, row, out, beg, end);
+ }
writeString(")", out);
}
-std::string ExternalQueryBuilder::composeKeyTupleDefinition() const
+void ExternalQueryBuilder::composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const
{
if (!dict_struct.key)
throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};
- std::string result{"("};
+ writeChar('(', out);
auto first = true;
- for (const auto & key : *dict_struct.key)
+ for (const auto i : ext::range(beg, end))
{
if (!first)
- result += ", ";
+ writeString(", ", out);
first = false;
- result += key.name;
+ writeQuoted((*dict_struct.key)[i].name, out);
}
- result += ")";
-
- return result;
+ writeChar(')', out);
}
-void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out) const
+void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const
{
writeString("(", out);
- const auto keys_size = key_columns.size();
auto first = true;
- for (const auto i : ext::range(0, keys_size))
+ for (const auto i : ext::range(beg, end))
{
if (!first)
writeString(", ", out);
diff --git a/src/Dictionaries/ExternalQueryBuilder.h b/src/Dictionaries/ExternalQueryBuilder.h
index 93e10f2d6b0..3011efbc895 100644
--- a/src/Dictionaries/ExternalQueryBuilder.h
+++ b/src/Dictionaries/ExternalQueryBuilder.h
@@ -42,30 +42,39 @@ struct ExternalQueryBuilder
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);
/** Generate a query to load data by set of composite keys.
- * There are two methods of specification of composite keys in WHERE:
+ * There are three methods of specification of composite keys in WHERE:
* 1. (x = c11 AND y = c12) OR (x = c21 AND y = c22) ...
* 2. (x, y) IN ((c11, c12), (c21, c22), ...)
+ * 3. (x = c1 AND (y, z) IN ((c2, c3), ...))
*/
enum LoadKeysMethod
{
AND_OR_CHAIN,
IN_WITH_TUPLES,
+ CASSANDRA_SEPARATE_PARTITION_KEY,
};
- std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method);
+ std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix = 0);
private:
const FormatSettings format_settings;
+ void composeLoadAllQuery(WriteBuffer & out) const;
+
+ /// In the following methods `beg` and `end` specifies which columns to write in expression
+
/// Expression in form (x = c1 AND y = c2 ...)
- void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out) const;
+ void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
+
+ /// Expression in form (x, y, ...) IN ((c1, c2, ...), ...)
+ void composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows, WriteBuffer & out, size_t beg, size_t end);
/// Expression in form (x, y, ...)
- std::string composeKeyTupleDefinition() const;
+ void composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (c1, c2, ...)
- void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out) const;
+ void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
/// Write string with specified quoting style.
void writeQuoted(const std::string & s, WriteBuffer & out) const;
diff --git a/src/Dictionaries/registerDictionaries.cpp b/src/Dictionaries/registerDictionaries.cpp
index ad6adbc86fb..8b2c984df6a 100644
--- a/src/Dictionaries/registerDictionaries.cpp
+++ b/src/Dictionaries/registerDictionaries.cpp
@@ -13,6 +13,7 @@ void registerDictionaries()
registerDictionarySourceClickHouse(source_factory);
registerDictionarySourceMongoDB(source_factory);
registerDictionarySourceRedis(source_factory);
+ registerDictionarySourceCassandra(source_factory);
registerDictionarySourceXDBC(source_factory);
registerDictionarySourceJDBC(source_factory);
registerDictionarySourceExecutable(source_factory);
diff --git a/src/Dictionaries/registerDictionaries.h b/src/Dictionaries/registerDictionaries.h
index a3a4a175d41..37ba51a9ae3 100644
--- a/src/Dictionaries/registerDictionaries.h
+++ b/src/Dictionaries/registerDictionaries.h
@@ -9,6 +9,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
+void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);
diff --git a/src/Dictionaries/ya.make b/src/Dictionaries/ya.make
index 12983b9527a..3de623a9a8b 100644
--- a/src/Dictionaries/ya.make
+++ b/src/Dictionaries/ya.make
@@ -17,6 +17,9 @@ SRCS(
CacheDictionary_generate1.cpp
CacheDictionary_generate2.cpp
CacheDictionary_generate3.cpp
+ CassandraBlockInputStream.cpp
+ CassandraDictionarySource.cpp
+ CassandraHelpers.cpp
ClickHouseDictionarySource.cpp
ComplexKeyCacheDictionary.cpp
ComplexKeyCacheDictionary_createAttributeWithType.cpp
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 90337ed53c5..1c1f758b291 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -19,6 +19,7 @@ import pprint
import psycopg2
import pymongo
import pymysql
+import cassandra.cluster
from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
@@ -108,6 +109,7 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
+ self.base_cassandra_cmd = []
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
@@ -119,6 +121,7 @@ class ClickHouseCluster:
self.with_mongo = False
self.with_net_trics = False
self.with_redis = False
+ self.with_cassandra = False
self.with_minio = False
self.minio_host = "minio1"
@@ -147,7 +150,7 @@ class ClickHouseCluster:
def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
- with_redis=False, with_minio=False,
+ with_redis=False, with_minio=False, with_cassandra=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
zookeeper_docker_compose_path=None, zookeeper_use_tmpfs=True):
@@ -169,7 +172,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
with_zookeeper,
- self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
+ self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
@@ -265,6 +268,12 @@ class ClickHouseCluster:
self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_minio.yml')]
cmds.append(self.base_minio_cmd)
+ if with_cassandra and not self.with_cassandra:
+ self.with_cassandra = True
+ self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')])
+ self.base_cassandra_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+ self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')]
+
return instance
def get_instance_docker_id(self, instance_name):
@@ -451,6 +460,18 @@ class ClickHouseCluster:
logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
time.sleep(1)
+ def wait_cassandra_to_start(self, timeout=30):
+ cass_client = cassandra.cluster.Cluster(["localhost"], port="9043")
+ start = time.time()
+ while time.time() - start < timeout:
+ try:
+ cass_client.connect()
+ logging.info("Connected to Cassandra")
+ return
+ except Exception as ex:
+ logging.warning("Can't connect to Cassandra: %s", str(ex))
+ time.sleep(1)
+
+ raise Exception("Can't connect to Cassandra within {} seconds".format(timeout))
+
def start(self, destroy_dirs=True):
if self.is_up:
return
@@ -527,6 +548,10 @@ class ClickHouseCluster:
logging.info("Trying to connect to Minio...")
self.wait_minio_to_start()
+ if self.with_cassandra and self.base_cassandra_cmd:
+ subprocess_check_call(self.base_cassandra_cmd + ['up', '-d', '--force-recreate'])
+ self.wait_cassandra_to_start()
+
clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
logging.info("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
subprocess_check_call(clickhouse_start_cmd)
@@ -656,7 +681,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
- with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
+ with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test",
@@ -686,6 +711,7 @@ class ClickHouseInstance:
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
+ self.with_cassandra = with_cassandra
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
diff --git a/tests/integration/test_dictionaries_all_layouts_and_sources/external_sources.py b/tests/integration/test_dictionaries_all_layouts_and_sources/external_sources.py
index 7f8a480704c..f6985e7de54 100644
--- a/tests/integration/test_dictionaries_all_layouts_and_sources/external_sources.py
+++ b/tests/integration/test_dictionaries_all_layouts_and_sources/external_sources.py
@@ -2,11 +2,13 @@
import warnings
import pymysql.cursors
import pymongo
+import cassandra.cluster
import redis
import aerospike
from tzlocal import get_localzone
import datetime
import os
+import uuid
class ExternalSource(object):
@@ -405,6 +407,73 @@ class SourceHTTPS(SourceHTTPBase):
def _get_schema(self):
return "https"
+class SourceCassandra(ExternalSource):
+ TYPE_MAPPING = {
+ 'UInt8': 'tinyint',
+ 'UInt16': 'smallint',
+ 'UInt32': 'int',
+ 'UInt64': 'bigint',
+ 'Int8': 'tinyint',
+ 'Int16': 'smallint',
+ 'Int32': 'int',
+ 'Int64': 'bigint',
+ 'UUID': 'uuid',
+ 'Date': 'date',
+ 'DateTime': 'timestamp',
+ 'String': 'text',
+ 'Float32': 'float',
+ 'Float64': 'double'
+ }
+
+ def __init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password):
+ ExternalSource.__init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password)
+ self.structure = dict()
+
+ def get_source_str(self, table_name):
+ return '''
+ <cassandra>
+ <host>{host}</host>
+ <port>{port}</port>
+ <keyspace>test</keyspace>
+ <column_family>{table}</column_family>
+ <allow_filtering>1</allow_filtering>
+ <where>"Int64_" &lt; 1000000000000000000</where>
+ </cassandra>
+ '''.format(
+ host=self.docker_hostname,
+ port=self.docker_port,
+ table=table_name,
+ )
+
+ def prepare(self, structure, table_name, cluster):
+ self.client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
+ self.session = self.client.connect()
+ self.session.execute("create keyspace if not exists test with replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};")
+ self.session.execute('drop table if exists test."{}"'.format(table_name))
+ self.structure[table_name] = structure
+ columns = ['"' + col.name + '" ' + self.TYPE_MAPPING[col.field_type] for col in structure.get_all_fields()]
+ keys = ['"' + col.name + '"' for col in structure.keys]
+ query = 'create table test."{name}" ({columns}, primary key ({pk}));'.format(
+ name=table_name, columns=', '.join(columns), pk=', '.join(keys))
+ self.session.execute(query)
+ self.prepared = True
+
+ def get_value_to_insert(self, value, type):
+ if type == 'UUID':
+ return uuid.UUID(value)
+ elif type == 'DateTime':
+ local_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
+ return get_localzone().localize(local_datetime)
+ return value
+
+ def load_data(self, data, table_name):
+ names_and_types = [(field.name, field.field_type) for field in self.structure[table_name].get_all_fields()]
+ columns = ['"' + col[0] + '"' for col in names_and_types]
+ insert = 'insert into test."{table}" ({columns}) values ({args})'.format(
+ table=table_name, columns=','.join(columns), args=','.join(['%s']*len(columns)))
+ for row in data:
+ values = [self.get_value_to_insert(row.get_value_by_name(col[0]), col[1]) for col in names_and_types]
+ self.session.execute(insert, values)
class SourceRedis(ExternalSource):
def __init__(
diff --git a/tests/integration/test_dictionaries_all_layouts_and_sources/test.py b/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
index 0aac0d27ff9..0a812ea2a8b 100644
--- a/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
+++ b/tests/integration/test_dictionaries_all_layouts_and_sources/test.py
@@ -4,7 +4,7 @@ import os
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
-from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis
+from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis, SourceCassandra
import math
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -117,6 +117,7 @@ LAYOUTS = [
]
SOURCES = [
+ SourceCassandra("Cassandra", "localhost", "9043", "cassandra1", "9042", "", ""),
SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMongoURI("MongoDB_URI", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
@@ -131,7 +132,7 @@ SOURCES = [
DICTIONARIES = []
-# Key-value dictionaries with onle one possible field for key
+# Key-value dictionaries with only one possible field for key
SOURCES_KV = [
SourceRedis("RedisSimple", "localhost", "6380", "redis1", "6379", "", "", storage_type="simple"),
SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"),
@@ -183,7 +184,7 @@ def setup_module(module):
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
- node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True)
+ node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True, with_cassandra=True)
cluster.add_instance('clickhouse1')
diff --git a/utils/ci/jobs/quick-build/run.sh b/utils/ci/jobs/quick-build/run.sh
index 56f0950c717..10da06f7414 100755
--- a/utils/ci/jobs/quick-build/run.sh
+++ b/utils/ci/jobs/quick-build/run.sh
@@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0
-CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0"
+CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_CASSANDRA=0"
[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin