Merge pull request #11079 from ClickHouse/merging_external_source_cassandra

Merging #4978
tavplubix 2020-06-10 10:39:18 +03:00 committed by GitHub
commit a93ae46ac1
27 changed files with 1072 additions and 58 deletions

.gitmodules

@@ -157,6 +157,14 @@
[submodule "contrib/openldap"]
    path = contrib/openldap
    url = https://github.com/openldap/openldap.git
[submodule "contrib/cassandra"]
    path = contrib/cassandra
    url = https://github.com/ClickHouse-Extras/cpp-driver.git
    branch = clickhouse
[submodule "contrib/libuv"]
    path = contrib/libuv
    url = https://github.com/ClickHouse-Extras/libuv.git
    branch = clickhouse
[submodule "contrib/fmtlib"]
    path = contrib/fmtlib
    url = https://github.com/fmtlib/fmt.git


@@ -360,6 +360,7 @@ include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
include (cmake/find/msgpack.cmake)
include (cmake/find/cassandra.cmake)

find_contrib_lib(cityhash)
find_contrib_lib(farmhash)


@@ -0,0 +1,26 @@
option(ENABLE_CASSANDRA "Enable Cassandra" ${ENABLE_LIBRARIES})

if (ENABLE_CASSANDRA)
    if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libuv")
        message (ERROR "submodule contrib/libuv is missing. to fix try run: \n git submodule update --init --recursive")
    elseif (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
        message (ERROR "submodule contrib/cassandra is missing. to fix try run: \n git submodule update --init --recursive")
    else()
        set (LIBUV_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libuv")
        set (CASSANDRA_INCLUDE_DIR
            "${ClickHouse_SOURCE_DIR}/contrib/cassandra/include/")
        if (USE_STATIC_LIBRARIES)
            set (LIBUV_LIBRARY uv_a)
            set (CASSANDRA_LIBRARY cassandra_static)
        else()
            set (LIBUV_LIBRARY uv)
            set (CASSANDRA_LIBRARY cassandra)
        endif()
        set (USE_CASSANDRA 1)
        set (CASS_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
    endif()
endif()

message (STATUS "Using cassandra=${USE_CASSANDRA}: ${CASSANDRA_INCLUDE_DIR} : ${CASSANDRA_LIBRARY}")
message (STATUS "Using libuv: ${LIBUV_ROOT_DIR} : ${LIBUV_LIBRARY}")


@@ -295,4 +295,10 @@ if (USE_FASTOPS)
    add_subdirectory (fastops-cmake)
endif()

if (USE_CASSANDRA)
    add_subdirectory (libuv)
    add_subdirectory (cassandra)
endif()

add_subdirectory (fmtlib-cmake)

contrib/cassandra vendored Submodule

@@ -0,0 +1 @@
Subproject commit a49b4e0e2696a4b8ef286a5b9538d1cbe8490509

contrib/libuv vendored Submodule

@@ -0,0 +1 @@
Subproject commit 84438304f41d8ea6670ee5409f4d6c63ca784f28


@@ -0,0 +1,7 @@
version: '2.3'
services:
    cassandra1:
        image: cassandra
        restart: always
        ports:
            - 9043:9042


@@ -625,4 +625,43 @@ Setting fields:
- `storage_type` The structure of internal Redis storage used for work with keys. `simple` is for simple sources and for hashed single key sources, `hash_map` is for hashed sources with two keys. Ranged sources and cache sources with complex key are unsupported. May be omitted, default value is `simple`.
- `db_index` The specific numeric index of the Redis logical database. May be omitted, default value is 0.

### Cassandra {#dicts-external_dicts_dict_sources-cassandra}

Example of settings:

```xml
<source>
    <cassandra>
        <host>localhost</host>
        <port>9042</port>
        <user>username</user>
        <password>qwerty123</password>
        <keyspace>database_name</keyspace>
        <column_family>table_name</column_family>
        <allow_filtering>1</allow_filtering>
        <partition_key_prefix>1</partition_key_prefix>
        <consistency>One</consistency>
        <where>"SomeColumn" = 42</where>
        <max_threads>8</max_threads>
    </cassandra>
</source>
```

Setting fields:

- `host` The Cassandra host or a comma-separated list of hosts.
- `port` The port on the Cassandra servers. If not specified, the default port is used.
- `user` Name of the Cassandra user.
- `password` Password of the Cassandra user.
- `keyspace` Name of the keyspace (database).
- `column_family` Name of the column family (table).
- `allow_filtering` Flag that allows or forbids potentially expensive conditions on clustering key columns. Default value is 0.
- `partition_key_prefix` Number of partition key columns in the primary key of the Cassandra table.
  Required for dictionaries with composite keys. The order of key columns in the dictionary definition must be the same as in Cassandra.
  Default value is 1 (the first key column is the partition key and the other key columns are the clustering key).
- `consistency` Consistency level. Possible values: `One`, `Two`, `Three`, `All`, `EachQuorum`, `Quorum`, `LocalQuorum`, `LocalOne`, `Serial`, `LocalSerial`. Default is `One`.
- `where` Optional selection criteria.
- `max_threads` The maximum number of threads to use for loading data from multiple partitions of a composite key dictionary.

[Original article](https://clickhouse.tech/docs/en/query_language/dicts/external_dicts_dict_sources/) <!--hide-->


@@ -352,6 +352,11 @@ if (USE_OPENCL)
    target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${OpenCL_INCLUDE_DIRS})
endif ()

if (USE_CASSANDRA)
    dbms_target_link_libraries(PUBLIC ${CASSANDRA_LIBRARY})
    dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${CASS_INCLUDE_DIR})
endif()

target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})


@@ -495,6 +495,7 @@ namespace ErrorCodes
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN = 524;
    extern const int INCORRECT_DISK_INDEX = 525;
    extern const int UNKNOWN_VOLUME_TYPE = 526;
    extern const int CASSANDRA_INTERNAL_ERROR = 527;

    extern const int KEEPER_EXCEPTION = 999;
    extern const int POCO_EXCEPTION = 1000;


@@ -9,5 +9,6 @@
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_CASSANDRA
#cmakedefine01 USE_GRPC
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY


@@ -21,6 +21,10 @@ target_link_libraries(clickhouse_dictionaries
    string_utils
)

if(USE_CASSANDRA)
    target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${CASSANDRA_INCLUDE_DIR})
endif()

add_subdirectory(Embedded)

target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${SPARSEHASH_INCLUDE_DIR})


@@ -0,0 +1,274 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif

#if USE_CASSANDRA

#include <utility>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ExternalResultDescription.h>
#include <IO/ReadHelpers.h>
#include "CassandraBlockInputStream.h"

namespace DB
{

namespace ErrorCodes
{
    extern const int TYPE_MISMATCH;
}

CassandraBlockInputStream::CassandraBlockInputStream(
    const CassSessionShared & session_,
    const String & query_str,
    const Block & sample_block,
    size_t max_block_size_)
    : session(session_)
    , statement(query_str.c_str(), /*parameters count*/ 0)
    , max_block_size(max_block_size_)
    , has_more_pages(cass_true)
{
    description.init(sample_block);
    cassandraCheck(cass_statement_set_paging_size(statement, max_block_size));
}

void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value)
{
    switch (type)
    {
        case ValueType::vtUInt8:
        {
            cass_int8_t value;
            cass_value_get_int8(cass_value, &value);
            assert_cast<ColumnUInt8 &>(column).insertValue(static_cast<UInt8>(value));
            break;
        }
        case ValueType::vtUInt16:
        {
            cass_int16_t value;
            cass_value_get_int16(cass_value, &value);
            assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
            break;
        }
        case ValueType::vtUInt32:
        {
            cass_int32_t value;
            cass_value_get_int32(cass_value, &value);
            assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value));
            break;
        }
        case ValueType::vtUInt64:
        {
            cass_int64_t value;
            cass_value_get_int64(cass_value, &value);
            assert_cast<ColumnUInt64 &>(column).insertValue(static_cast<UInt64>(value));
            break;
        }
        case ValueType::vtInt8:
        {
            cass_int8_t value;
            cass_value_get_int8(cass_value, &value);
            assert_cast<ColumnInt8 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtInt16:
        {
            cass_int16_t value;
            cass_value_get_int16(cass_value, &value);
            assert_cast<ColumnInt16 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtInt32:
        {
            cass_int32_t value;
            cass_value_get_int32(cass_value, &value);
            assert_cast<ColumnInt32 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtInt64:
        {
            cass_int64_t value;
            cass_value_get_int64(cass_value, &value);
            assert_cast<ColumnInt64 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtFloat32:
        {
            cass_float_t value;
            cass_value_get_float(cass_value, &value);
            assert_cast<ColumnFloat32 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtFloat64:
        {
            cass_double_t value;
            cass_value_get_double(cass_value, &value);
            assert_cast<ColumnFloat64 &>(column).insertValue(value);
            break;
        }
        case ValueType::vtString:
        {
            const char * value = nullptr;
            size_t value_length;
            cass_value_get_string(cass_value, &value, &value_length);
            assert_cast<ColumnString &>(column).insertData(value, value_length);
            break;
        }
        case ValueType::vtDate:
        {
            cass_uint32_t value;
            cass_value_get_uint32(cass_value, &value);
            assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
            break;
        }
        case ValueType::vtDateTime:
        {
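            /// Cassandra timestamps are milliseconds since the Unix epoch; ClickHouse DateTime stores seconds.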
            cass_int64_t value;
            cass_value_get_int64(cass_value, &value);
            assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value / 1000));
            break;
        }
        case ValueType::vtUUID:
        {
            CassUuid value;
            cass_value_get_uuid(cass_value, &value);
            std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
            cass_uuid_string(value, uuid_str.data());
            assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
            break;
        }
    }
}

void CassandraBlockInputStream::readPrefix()
{
    result_future = cass_session_execute(*session, statement);
}

Block CassandraBlockInputStream::readImpl()
{
    if (!has_more_pages)
        return {};

    MutableColumns columns = description.sample_block.cloneEmptyColumns();

    cassandraWaitAndCheck(result_future);
    CassResultPtr result = cass_future_get_result(result_future);

    assert(cass_result_column_count(result) == columns.size());
    assertTypes(result);

    has_more_pages = cass_result_has_more_pages(result);
    if (has_more_pages)
    {
        cassandraCheck(cass_statement_set_paging_state(statement, result));
        result_future = cass_session_execute(*session, statement);
    }

    CassIteratorPtr rows_iter = cass_iterator_from_result(result);  /// Points to rows[-1]
    while (cass_iterator_next(rows_iter))
    {
        const CassRow * row = cass_iterator_get_row(rows_iter);
        for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
        {
            const CassValue * val = cass_row_get_column(row, col_idx);
            if (cass_value_is_null(val))
                columns[col_idx]->insertDefault();
            else if (description.types[col_idx].second)
            {
                ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[col_idx]);
                insertValue(column_nullable.getNestedColumn(), description.types[col_idx].first, val);
                column_nullable.getNullMapData().emplace_back(0);
            }
            else
                insertValue(*columns[col_idx], description.types[col_idx].first, val);
        }
    }

    assert(cass_result_row_count(result) == columns.front()->size());

    return description.sample_block.cloneWithColumns(std::move(columns));
}

void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
{
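    /// Column types are verified once, on the first page of the result (see the flag reset at the end).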
    if (!assert_types)
        return;

    size_t column_count = cass_result_column_count(result);
    for (size_t i = 0; i < column_count; ++i)
    {
        CassValueType expected = CASS_VALUE_TYPE_UNKNOWN;
        String expected_text;

        /// Cassandra does not support unsigned integers (cass_uint32_t is for Date)
        switch (description.types[i].first)
        {
            case ExternalResultDescription::ValueType::vtInt8:
            case ExternalResultDescription::ValueType::vtUInt8:
                expected = CASS_VALUE_TYPE_TINY_INT;
                expected_text = "tinyint";
                break;
            case ExternalResultDescription::ValueType::vtInt16:
            case ExternalResultDescription::ValueType::vtUInt16:
                expected = CASS_VALUE_TYPE_SMALL_INT;
                expected_text = "smallint";
                break;
            case ExternalResultDescription::ValueType::vtUInt32:
            case ExternalResultDescription::ValueType::vtInt32:
                expected = CASS_VALUE_TYPE_INT;
                expected_text = "int";
                break;
            case ExternalResultDescription::ValueType::vtInt64:
            case ExternalResultDescription::ValueType::vtUInt64:
                expected = CASS_VALUE_TYPE_BIGINT;
                expected_text = "bigint";
                break;
            case ExternalResultDescription::ValueType::vtFloat32:
                expected = CASS_VALUE_TYPE_FLOAT;
                expected_text = "float";
                break;
            case ExternalResultDescription::ValueType::vtFloat64:
                expected = CASS_VALUE_TYPE_DOUBLE;
                expected_text = "double";
                break;
            case ExternalResultDescription::ValueType::vtString:
                expected = CASS_VALUE_TYPE_TEXT;
                expected_text = "text, ascii or varchar";
                break;
            case ExternalResultDescription::ValueType::vtDate:
                expected = CASS_VALUE_TYPE_DATE;
                expected_text = "date";
                break;
            case ExternalResultDescription::ValueType::vtDateTime:
                expected = CASS_VALUE_TYPE_TIMESTAMP;
                expected_text = "timestamp";
                break;
            case ExternalResultDescription::ValueType::vtUUID:
                expected = CASS_VALUE_TYPE_UUID;
                expected_text = "uuid";
                break;
        }

        CassValueType got = cass_result_column_type(result, i);

        if (got != expected)
        {
            if (expected == CASS_VALUE_TYPE_TEXT && (got == CASS_VALUE_TYPE_ASCII || got == CASS_VALUE_TYPE_VARCHAR))
                continue;

            const auto & column_name = description.sample_block.getColumnsWithTypeAndName()[i].name;
            throw Exception("Type mismatch for column " + column_name + ": expected Cassandra type " + expected_text,
                ErrorCodes::TYPE_MISMATCH);
        }
    }

    assert_types = false;
}

}

#endif
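The stream above interleaves value conversion with the driver's paging protocol, which can obscure the protocol itself. Below is a minimal sketch of the same paging pattern against the raw driver API, without the RAII holders from CassandraHelpers.h; `fetch_all_pages` and its error handling are illustrative assumptions, not part of this commit:

```cpp
#include <cassandra.h>

/// Illustrative only: read a query result page by page, `page_size` rows at a time.
/// The cass_* calls are real driver functions; the helper itself is hypothetical.
static CassError fetch_all_pages(CassSession * session, const char * query, int page_size)
{
    CassStatement * statement = cass_statement_new(query, 0);
    cass_statement_set_paging_size(statement, page_size);

    cass_bool_t has_more_pages = cass_true;
    CassError rc = CASS_OK;
    while (has_more_pages && rc == CASS_OK)
    {
        CassFuture * future = cass_session_execute(session, statement);
        rc = cass_future_error_code(future);    /// Waits for the page to arrive
        if (rc == CASS_OK)
        {
            const CassResult * result = cass_future_get_result(future);

            CassIterator * rows = cass_iterator_from_result(result);
            while (cass_iterator_next(rows))
            {
                const CassRow * row = cass_iterator_get_row(rows);
                (void)row;  /// Convert values here, as insertValue() does above
            }
            cass_iterator_free(rows);

            /// The paging state is a server-side cursor carried inside the result;
            /// copying it into the statement makes the next execute return the next page.
            has_more_pages = cass_result_has_more_pages(result);
            if (has_more_pages)
                cass_statement_set_paging_state(statement, result);

            cass_result_free(result);
        }
        cass_future_free(future);
    }
    cass_statement_free(statement);
    return rc;
}
```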


@@ -0,0 +1,47 @@
#pragma once

#include <Dictionaries/CassandraHelpers.h>

#if USE_CASSANDRA
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>

namespace DB
{

class CassandraBlockInputStream final : public IBlockInputStream
{
public:
    CassandraBlockInputStream(
        const CassSessionShared & session_,
        const String & query_str,
        const Block & sample_block,
        size_t max_block_size);

    String getName() const override { return "Cassandra"; }

    Block getHeader() const override { return description.sample_block.cloneEmpty(); }

    void readPrefix() override;

private:
    using ValueType = ExternalResultDescription::ValueType;

    Block readImpl() override;
    static void insertValue(IColumn & column, ValueType type, const CassValue * cass_value);
    void assertTypes(const CassResultPtr & result);

    CassSessionShared session;
    CassStatementPtr statement;
    CassFuturePtr result_future;
    const size_t max_block_size;
    ExternalResultDescription description;
    cass_bool_t has_more_pages;
    bool assert_types = true;
};

}

#endif


@@ -0,0 +1,211 @@
#include "CassandraDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"

namespace DB
{

namespace ErrorCodes
{
    extern const int SUPPORT_IS_DISABLED;
    extern const int NOT_IMPLEMENTED;
}

void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
{
    auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
                                   [[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
                                   [[maybe_unused]] const std::string & config_prefix,
                                   [[maybe_unused]] Block & sample_block,
                                   const Context & /* context */,
                                   bool /*check_config*/) -> DictionarySourcePtr
    {
#if USE_CASSANDRA
        setupCassandraDriverLibraryLogging(CASS_LOG_INFO);
        return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
#else
        throw Exception{"Dictionary source of type `cassandra` is disabled because ClickHouse was built without cassandra support.",
                        ErrorCodes::SUPPORT_IS_DISABLED};
#endif
    };
    factory.registerSource("cassandra", create_table_source);
}

}

#if USE_CASSANDRA

#include <IO/WriteHelpers.h>
#include <Common/SipHash.h>
#include "CassandraBlockInputStream.h"
#include <common/logger_useful.h>
#include <DataStreams/UnionBlockInputStream.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INVALID_CONFIG_PARAMETER;
}

CassandraSettings::CassandraSettings(
    const Poco::Util::AbstractConfiguration & config,
    const String & config_prefix)
    : host(config.getString(config_prefix + ".host"))
    , port(config.getUInt(config_prefix + ".port", 0))
    , user(config.getString(config_prefix + ".user", ""))
    , password(config.getString(config_prefix + ".password", ""))
    , db(config.getString(config_prefix + ".keyspace"))
    , table(config.getString(config_prefix + ".column_family"))
    , allow_filtering(config.getBool(config_prefix + ".allow_filtering", false))
    , partition_key_prefix(config.getUInt(config_prefix + ".partition_key_prefix", 1))
    , max_threads(config.getUInt(config_prefix + ".max_threads", 8))
    , where(config.getString(config_prefix + ".where", ""))
{
    setConsistency(config.getString(config_prefix + ".consistency", "One"));
}

void CassandraSettings::setConsistency(const String & config_str)
{
    if (config_str == "One")
        consistency = CASS_CONSISTENCY_ONE;
    else if (config_str == "Two")
        consistency = CASS_CONSISTENCY_TWO;
    else if (config_str == "Three")
        consistency = CASS_CONSISTENCY_THREE;
    else if (config_str == "All")
        consistency = CASS_CONSISTENCY_ALL;
    else if (config_str == "EachQuorum")
        consistency = CASS_CONSISTENCY_EACH_QUORUM;
    else if (config_str == "Quorum")
        consistency = CASS_CONSISTENCY_QUORUM;
    else if (config_str == "LocalQuorum")
        consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
    else if (config_str == "LocalOne")
        consistency = CASS_CONSISTENCY_LOCAL_ONE;
    else if (config_str == "Serial")
        consistency = CASS_CONSISTENCY_SERIAL;
    else if (config_str == "LocalSerial")
        consistency = CASS_CONSISTENCY_LOCAL_SERIAL;
    else    /// CASS_CONSISTENCY_ANY is only valid for writes
        throw Exception("Unsupported consistency level: " + config_str, ErrorCodes::INVALID_CONFIG_PARAMETER);
}

static const size_t max_block_size = 8192;

CassandraDictionarySource::CassandraDictionarySource(
    const DictionaryStructure & dict_struct_,
    const CassandraSettings & settings_,
    const Block & sample_block_)
    : log(&Poco::Logger::get("CassandraDictionarySource"))
    , dict_struct(dict_struct_)
    , settings(settings_)
    , sample_block(sample_block_)
    , query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
{
    cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
    if (settings.port)
        cassandraCheck(cass_cluster_set_port(cluster, settings.port));
    cass_cluster_set_credentials(cluster, settings.user.c_str(), settings.password.c_str());
    cassandraCheck(cass_cluster_set_consistency(cluster, settings.consistency));
}

CassandraDictionarySource::CassandraDictionarySource(
    const DictionaryStructure & dict_struct_,
    const Poco::Util::AbstractConfiguration & config,
    const String & config_prefix,
    Block & sample_block_)
    : CassandraDictionarySource(
        dict_struct_,
        CassandraSettings(config, config_prefix),
        sample_block_)
{
}

void CassandraDictionarySource::maybeAllowFiltering(String & query) const
{
    if (!settings.allow_filtering)
        return;
    query.pop_back();   /// remove semicolon
    query += " ALLOW FILTERING;";
}

BlockInputStreamPtr CassandraDictionarySource::loadAll()
{
    String query = query_builder.composeLoadAllQuery();
    maybeAllowFiltering(query);
    LOG_INFO(log, "Loading all using query: {}", query);
    return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}

std::string CassandraDictionarySource::toString() const
{
    return "Cassandra: " + settings.db + '.' + settings.table;
}

BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
    String query = query_builder.composeLoadIdsQuery(ids);
    maybeAllowFiltering(query);
    LOG_INFO(log, "Loading ids using query: {}", query);
    return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}

BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
    if (requested_rows.empty())
        throw Exception("No rows requested", ErrorCodes::LOGICAL_ERROR);

    /// TODO is there a better way to load data by complex keys?
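    /// Group the requested rows by the values of their partition key columns;
    /// each group is then loaded with a separate per-partition query (see the comment in ExternalQueryBuilder).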
    std::unordered_map<UInt64, std::vector<size_t>> partitions;
    for (const auto & row : requested_rows)
    {
        SipHash partition_key;
        for (size_t i = 0; i < settings.partition_key_prefix; ++i)
            key_columns[i]->updateHashWithValue(row, partition_key);
        partitions[partition_key.get64()].push_back(row);
    }

    BlockInputStreams streams;
    for (const auto & partition : partitions)
    {
        String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
        maybeAllowFiltering(query);
        LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
        streams.push_back(std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size));
    }

    if (streams.size() == 1)
        return streams.front();

    return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
}

BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll()
{
    throw Exception("Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED);
}

CassSessionShared CassandraDictionarySource::getSession()
{
    /// Reuse connection if exists, create new one if not
    auto session = maybe_session.lock();
    if (session)
        return session;

    std::lock_guard lock(connect_mutex);
    session = maybe_session.lock();
    if (session)
        return session;

    session = std::make_shared<CassSessionPtr>();
    CassFuturePtr future = cass_session_connect(*session, cluster);
    cassandraWaitAndCheck(future);
    maybe_session = session;
    return session;
}

}

#endif
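getSession() above is a double-checked cache built on std::weak_ptr: concurrent streams share one live session, yet the connection is dropped as soon as the last stream releases it. The same pattern in isolation (a minimal sketch; ExpensiveResource and ResourceCache are hypothetical names, not part of this commit):

```cpp
#include <memory>
#include <mutex>

/// Hypothetical stand-in for a costly connection-like object.
struct ExpensiveResource {};

class ResourceCache
{
public:
    std::shared_ptr<ExpensiveResource> get()
    {
        /// Fast path: the resource is still alive because some user holds it.
        if (auto res = weak.lock())
            return res;

        std::lock_guard lock(mutex);
        /// Re-check under the lock: another thread may have created it meanwhile.
        if (auto res = weak.lock())
            return res;

        auto res = std::make_shared<ExpensiveResource>();
        weak = res;     /// Cache a non-owning reference; freed when the last user releases it
        return res;
    }

private:
    std::mutex mutex;
    std::weak_ptr<ExpensiveResource> weak;
};
```

The second lock() under the mutex is what prevents two threads from both constructing the resource after racing past the fast path.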


@@ -0,0 +1,89 @@
#pragma once

#include <Dictionaries/CassandraHelpers.h>

#if USE_CASSANDRA

#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <Poco/Logger.h>
#include <mutex>

namespace DB
{

struct CassandraSettings
{
    String host;
    UInt16 port;
    String user;
    String password;
    String db;
    String table;

    CassConsistency consistency;
    bool allow_filtering;
    /// TODO get information about key from the driver
    size_t partition_key_prefix;
    size_t max_threads;
    String where;

    CassandraSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);

    void setConsistency(const String & config_str);
};

class CassandraDictionarySource final : public IDictionarySource
{
public:
    CassandraDictionarySource(
        const DictionaryStructure & dict_struct,
        const CassandraSettings & settings_,
        const Block & sample_block);

    CassandraDictionarySource(
        const DictionaryStructure & dict_struct,
        const Poco::Util::AbstractConfiguration & config,
        const String & config_prefix,
        Block & sample_block);

    BlockInputStreamPtr loadAll() override;

    bool supportsSelectiveLoad() const override { return true; }

    bool isModified() const override { return true; }

    bool hasUpdateField() const override { return false; }

    DictionarySourcePtr clone() const override
    {
        return std::make_unique<CassandraDictionarySource>(dict_struct, settings, sample_block);
    }

    BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;

    BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;

    BlockInputStreamPtr loadUpdatedAll() override;

    String toString() const override;

private:
    void maybeAllowFiltering(String & query) const;
    CassSessionShared getSession();

    Poco::Logger * log;
    const DictionaryStructure dict_struct;
    const CassandraSettings settings;
    Block sample_block;
    ExternalQueryBuilder query_builder;

    std::mutex connect_mutex;
    CassClusterPtr cluster;
    CassSessionWeak maybe_session;
};

}

#endif


@@ -0,0 +1,68 @@
#include <Dictionaries/CassandraHelpers.h>

#if USE_CASSANDRA
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <mutex>

namespace DB
{

namespace ErrorCodes
{
    extern const int CASSANDRA_INTERNAL_ERROR;
}

void cassandraCheck(CassError code)
{
    if (code != CASS_OK)
        throw Exception("Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code),
                        ErrorCodes::CASSANDRA_INTERNAL_ERROR);
}

void cassandraWaitAndCheck(CassFuturePtr & future)
{
    auto code = cass_future_error_code(future);     /// Waits if not ready
    if (code == CASS_OK)
        return;

    /// `future` owns `message` and will free it on destruction
    const char * message;
    size_t message_len;
    cass_future_error_message(future, &message, &message_len);

    std::string full_message = "Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code) + ": " + message;
    throw Exception(full_message, ErrorCodes::CASSANDRA_INTERNAL_ERROR);
}

static std::once_flag setup_logging_flag;

void setupCassandraDriverLibraryLogging(CassLogLevel level)
{
    std::call_once(setup_logging_flag, [level]()
    {
        Poco::Logger * logger = &Poco::Logger::get("CassandraDriverLibrary");
        cass_log_set_level(level);
        if (level != CASS_LOG_DISABLED)
            cass_log_set_callback(cassandraLogCallback, logger);
    });
}

void cassandraLogCallback(const CassLogMessage * message, void * data)
{
    Poco::Logger * logger = static_cast<Poco::Logger *>(data);
    if (message->severity == CASS_LOG_CRITICAL || message->severity == CASS_LOG_ERROR)
        LOG_ERROR(logger, message->message);
    else if (message->severity == CASS_LOG_WARN)
        LOG_WARNING(logger, message->message);
    else if (message->severity == CASS_LOG_INFO)
        LOG_INFO(logger, message->message);
    else if (message->severity == CASS_LOG_DEBUG)
        LOG_DEBUG(logger, message->message);
    else if (message->severity == CASS_LOG_TRACE)
        LOG_TRACE(logger, message->message);
}

}

#endif


@@ -0,0 +1,84 @@
#pragma once

#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif

#if USE_CASSANDRA
#include <cassandra.h> // Y_IGNORE
#include <utility>
#include <memory>

namespace DB
{

namespace Cassandra
{

template<typename CassT>
CassT * defaultCtor() { return nullptr; }

/// RAII wrapper for raw pointers to objects from cassandra driver library
template<typename CassT, auto Dtor, auto Ctor = defaultCtor<CassT>>
class ObjectHolder
{
    CassT * ptr = nullptr;

public:
    template<typename... Args>
    ObjectHolder(Args &&... args) : ptr(Ctor(std::forward<Args>(args)...)) {}
    ObjectHolder(CassT * ptr_) : ptr(ptr_) {}

    ObjectHolder(const ObjectHolder &) = delete;
    ObjectHolder & operator = (const ObjectHolder &) = delete;

    ObjectHolder(ObjectHolder && rhs) noexcept : ptr(rhs.ptr) { rhs.ptr = nullptr; }
    ObjectHolder & operator = (ObjectHolder && rhs) noexcept
    {
        if (ptr)
            Dtor(ptr);
        ptr = rhs.ptr;
        rhs.ptr = nullptr;
        return *this;
    }

    ~ObjectHolder()
    {
        if (ptr)
            Dtor(ptr);
    }

    /// For implicit conversion when passing object to driver library functions
    operator CassT * () { return ptr; }
    operator const CassT * () const { return ptr; }
};

}

/// These objects are created on pointer construction
using CassClusterPtr = Cassandra::ObjectHolder<CassCluster, cass_cluster_free, cass_cluster_new>;
using CassStatementPtr = Cassandra::ObjectHolder<CassStatement, cass_statement_free, cass_statement_new>;
using CassSessionPtr = Cassandra::ObjectHolder<CassSession, cass_session_free, cass_session_new>;

/// Share connections between streams. Executing statements in one session object is thread-safe
using CassSessionShared = std::shared_ptr<CassSessionPtr>;
using CassSessionWeak = std::weak_ptr<CassSessionPtr>;

/// The following objects are created inside Cassandra driver library,
/// but must be freed by user code
using CassFuturePtr = Cassandra::ObjectHolder<CassFuture, cass_future_free>;
using CassResultPtr = Cassandra::ObjectHolder<const CassResult, cass_result_free>;
using CassIteratorPtr = Cassandra::ObjectHolder<CassIterator, cass_iterator_free>;

/// Checks return code, throws exception on error
void cassandraCheck(CassError code);
void cassandraWaitAndCheck(CassFuturePtr & future);

/// By default driver library prints logs to stderr.
/// It should be redirected (or, at least, disabled) before calling other functions from the library.
void setupCassandraDriverLibraryLogging(CassLogLevel level);

void cassandraLogCallback(const CassLogMessage * message, void * data);

}

#endif
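To see how these pieces are meant to compose, here is a hedged usage sketch (connect_and_free is a hypothetical function, not part of this commit; the cass_* calls are real driver API):

```cpp
#if USE_CASSANDRA
#include <Dictionaries/CassandraHelpers.h>

/// Hypothetical example: every driver object is freed automatically,
/// in reverse order of construction, even if a check throws.
void connect_and_free(const char * hosts)
{
    DB::CassClusterPtr cluster;     /// Default construction runs cass_cluster_new() via the Ctor parameter
    DB::cassandraCheck(cass_cluster_set_contact_points(cluster, hosts));    /// Implicit conversion to CassCluster *

    DB::CassSessionPtr session;     /// Runs cass_session_new()
    DB::CassFuturePtr future = cass_session_connect(session, cluster);     /// Adopts the returned CassFuture *
    DB::cassandraWaitAndCheck(future);
}   /// ~ObjectHolder() runs cass_future_free, cass_session_free, cass_cluster_free
#endif
```

Because Ctor is a template parameter, a default-constructed holder already owns a fresh driver object, while holders without a Ctor (futures, results, iterators) adopt pointers returned by the library.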


@@ -63,6 +63,13 @@ void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out)

std::string ExternalQueryBuilder::composeLoadAllQuery() const
{
    WriteBufferFromOwnString out;
    composeLoadAllQuery(out);
    writeChar(';', out);
    return out.str();
}

void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const
{
    writeString("SELECT ", out);

    if (dict_struct.id)

@@ -149,24 +156,26 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
        writeString(" WHERE ", out);
        writeString(where, out);
    }
}

std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_field, const std::string & time_point) const
{
    WriteBufferFromOwnString out;
    composeLoadAllQuery(out);

    if (!where.empty())
        writeString(" AND ", out);
    else
        writeString(" WHERE ", out);

    writeQuoted(update_field, out);
    writeString(" >= '", out);
    writeString(time_point, out);
    writeChar('\'', out);
    writeChar(';', out);

    return out.str();
}

@@ -241,7 +250,7 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>

std::string
ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix)
{
    if (!dict_struct.key)
        throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};

@@ -284,9 +293,13 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
    if (!where.empty())
    {
        if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
            writeString("(", out);
        writeString(where, out);
        if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
            writeString(") AND (", out);
        else
            writeString(" AND ", out);
    }

    if (method == AND_OR_CHAIN)

@@ -298,28 +311,33 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
                writeString(" OR ", out);

            first = false;

            writeString("(", out);
            composeKeyCondition(key_columns, row, out, 0, key_columns.size());
            writeString(")", out);
        }
    }
    else if (method == IN_WITH_TUPLES)
    {
        composeInWithTuples(key_columns, requested_rows, out, 0, key_columns.size());
    }
    else /* if (method == CASSANDRA_SEPARATE_PARTITION_KEY) */
    {
        /// CQL does not allow using OR conditions
        /// and does not allow using multi-column IN expressions with partition key columns.
        /// So we have to use multiple queries with conditions like
        /// (partition_key_1 = val1 AND partition_key_2 = val2 ...) AND (clustering_key_1, ...) IN ((val3, ...), ...)
        /// for each partition key.

        /// `partition_key_prefix` is a number of columns from partition key.
        /// All `requested_rows` must have the same values of partition key.
        composeKeyCondition(key_columns, requested_rows.at(0), out, 0, partition_key_prefix);
        if (partition_key_prefix && partition_key_prefix < key_columns.size())
            writeString(" AND ", out);
        if (partition_key_prefix < key_columns.size())
            composeInWithTuples(key_columns, requested_rows, out, partition_key_prefix, key_columns.size());
    }

    if (!where.empty() && method != CASSANDRA_SEPARATE_PARTITION_KEY)
    {
        writeString(")", out);
    }

@@ -330,13 +348,11 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
}

void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out,
                                               size_t beg, size_t end) const
{
    auto first = true;
    for (const auto i : ext::range(beg, end))
    {
        if (!first)
            writeString(" AND ", out);

@@ -346,45 +362,60 @@ void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, cons
        const auto & key_description = (*dict_struct.key)[i];

        /// key_i=value_i
        writeQuoted(key_description.name, out);
        writeString("=", out);
        key_description.type->serializeAsTextQuoted(*key_columns[i], row, out, format_settings);
    }
}

void ExternalQueryBuilder::composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows,
                                               WriteBuffer & out, size_t beg, size_t end)
{
    composeKeyTupleDefinition(out, beg, end);
    writeString(" IN (", out);

    bool first = true;
    for (const auto row : requested_rows)
    {
        if (!first)
            writeString(", ", out);

        first = false;
        composeKeyTuple(key_columns, row, out, beg, end);
    }

    writeString(")", out);
}

void ExternalQueryBuilder::composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const
{
    if (!dict_struct.key)
        throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};

    writeChar('(', out);

    auto first = true;
    for (const auto i : ext::range(beg, end))
    {
        if (!first)
            writeString(", ", out);

        first = false;
        writeQuoted((*dict_struct.key)[i].name, out);
    }

    writeChar(')', out);
}

void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const
{
    writeString("(", out);

    auto first = true;
    for (const auto i : ext::range(beg, end))
    {
        if (!first)
            writeString(", ", out);


@@ -42,30 +42,39 @@ struct ExternalQueryBuilder
    std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);

    /** Generate a query to load data by set of composite keys.
      * There are three methods of specification of composite keys in WHERE:
      * 1. (x = c11 AND y = c12) OR (x = c21 AND y = c22) ...
      * 2. (x, y) IN ((c11, c12), (c21, c22), ...)
      * 3. (x = c1 AND (y, z) IN ((c2, c3), ...))
      */
    enum LoadKeysMethod
    {
        AND_OR_CHAIN,
        IN_WITH_TUPLES,
        CASSANDRA_SEPARATE_PARTITION_KEY,
    };

    std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix = 0);

private:
    const FormatSettings format_settings;

    void composeLoadAllQuery(WriteBuffer & out) const;

    /// In the following methods `beg` and `end` specify which columns to write in the expression

    /// Expression in form (x = c1 AND y = c2 ...)
    void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;

    /// Expression in form (x, y, ...) IN ((c1, c2, ...), ...)
    void composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows, WriteBuffer & out, size_t beg, size_t end);

    /// Expression in form (x, y, ...)
    void composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const;

    /// Expression in form (c1, c2, ...)
    void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;

    /// Write string with specified quoting style.
    void writeQuoted(const std::string & s, WriteBuffer & out) const;


@@ -13,6 +13,7 @@ void registerDictionaries()
    registerDictionarySourceClickHouse(source_factory);
    registerDictionarySourceMongoDB(source_factory);
    registerDictionarySourceRedis(source_factory);
    registerDictionarySourceCassandra(source_factory);
    registerDictionarySourceXDBC(source_factory);
    registerDictionarySourceJDBC(source_factory);
    registerDictionarySourceExecutable(source_factory);


@@ -9,6 +9,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);


@@ -17,6 +17,9 @@ SRCS(
    CacheDictionary_generate1.cpp
    CacheDictionary_generate2.cpp
    CacheDictionary_generate3.cpp
    CassandraBlockInputStream.cpp
    CassandraDictionarySource.cpp
    CassandraHelpers.cpp
    ClickHouseDictionarySource.cpp
    ComplexKeyCacheDictionary.cpp
    ComplexKeyCacheDictionary_createAttributeWithType.cpp


@@ -19,6 +19,7 @@ import pprint
import psycopg2
import pymongo
import pymysql
import cassandra.cluster
from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException

@@ -108,6 +109,7 @@ class ClickHouseCluster:
        self.base_zookeeper_cmd = None
        self.base_mysql_cmd = []
        self.base_kafka_cmd = []
        self.base_cassandra_cmd = []
        self.pre_zookeeper_commands = []
        self.instances = {}
        self.with_zookeeper = False

@@ -119,6 +121,7 @@
        self.with_mongo = False
        self.with_net_trics = False
        self.with_redis = False
        self.with_cassandra = False

        self.with_minio = False
        self.minio_host = "minio1"

@@ -147,7 +150,7 @@
    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
                     with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
                     with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                     with_redis=False, with_minio=False, with_cassandra=False,
                     hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
                     stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
                     zookeeper_docker_compose_path=None, zookeeper_use_tmpfs=True):

@@ -169,7 +172,7 @@
        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
            with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
            self.base_configs_dir, self.server_bin_path,
            self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
            env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,

@@ -265,6 +268,12 @@
                                   self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_minio.yml')]
            cmds.append(self.base_minio_cmd)

        if with_cassandra and not self.with_cassandra:
            self.with_cassandra = True
            self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')])
            self.base_cassandra_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')]

        return instance

    def get_instance_docker_id(self, instance_name):

@@ -451,6 +460,18 @@
                logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
                time.sleep(1)

    def wait_cassandra_to_start(self, timeout=30):
        cass_client = cassandra.cluster.Cluster(["localhost"], port="9043")
        start = time.time()
        while time.time() - start < timeout:
            try:
                cass_client.connect()
                logging.info("Connected to Cassandra")
                return
            except Exception as ex:
                logging.warning("Can't connect to Cassandra: %s", str(ex))
                time.sleep(1)

    def start(self, destroy_dirs=True):
        if self.is_up:
            return

@@ -527,6 +548,10 @@
            logging.info("Trying to connect to Minio...")
            self.wait_minio_to_start()

        if self.with_cassandra and self.base_cassandra_cmd:
            subprocess_check_call(self.base_cassandra_cmd + ['up', '-d', '--force-recreate'])
            self.wait_cassandra_to_start()

        clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
        logging.info("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
        subprocess_check_call(clickhouse_start_cmd)

@@ -656,7 +681,7 @@ class ClickHouseInstance:
    def __init__(
            self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
            with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
            base_configs_dir, server_bin_path, odbc_bridge_bin_path,
            clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
            image="yandex/clickhouse-integration-test",

@@ -686,6 +711,7 @@
        self.with_mongo = with_mongo
        self.with_redis = with_redis
        self.with_minio = with_minio
        self.with_cassandra = with_cassandra

        self.path = p.join(self.cluster.instances_dir, name)
        self.docker_compose_path = p.join(self.path, 'docker_compose.yml')


@@ -2,11 +2,13 @@
import warnings
import pymysql.cursors
import pymongo
import cassandra.cluster
import redis
import aerospike
from tzlocal import get_localzone
import datetime
import os
import uuid


class ExternalSource(object):

@@ -405,6 +407,73 @@ class SourceHTTPS(SourceHTTPBase):
    def _get_schema(self):
        return "https"


class SourceCassandra(ExternalSource):
    TYPE_MAPPING = {
        'UInt8': 'tinyint',
        'UInt16': 'smallint',
        'UInt32': 'int',
        'UInt64': 'bigint',
        'Int8': 'tinyint',
        'Int16': 'smallint',
        'Int32': 'int',
        'Int64': 'bigint',
        'UUID': 'uuid',
        'Date': 'date',
        'DateTime': 'timestamp',
        'String': 'text',
        'Float32': 'float',
        'Float64': 'double'
    }

    def __init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password):
        ExternalSource.__init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password)
        self.structure = dict()

    def get_source_str(self, table_name):
        return '''
            <cassandra>
                <host>{host}</host>
                <port>{port}</port>
                <keyspace>test</keyspace>
                <column_family>{table}</column_family>
                <allow_filtering>1</allow_filtering>
                <where>"Int64_" &lt; 1000000000000000000</where>
            </cassandra>
        '''.format(
            host=self.docker_hostname,
            port=self.docker_port,
            table=table_name,
        )

    def prepare(self, structure, table_name, cluster):
        self.client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
        self.session = self.client.connect()
        self.session.execute("create keyspace if not exists test with replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};")
        self.session.execute('drop table if exists test."{}"'.format(table_name))
        self.structure[table_name] = structure
        columns = ['"' + col.name + '" ' + self.TYPE_MAPPING[col.field_type] for col in structure.get_all_fields()]
        keys = ['"' + col.name + '"' for col in structure.keys]
        query = 'create table test."{name}" ({columns}, primary key ({pk}));'.format(
            name=table_name, columns=', '.join(columns), pk=', '.join(keys))
        self.session.execute(query)
        self.prepared = True

    def get_value_to_insert(self, value, type):
        if type == 'UUID':
            return uuid.UUID(value)
        elif type == 'DateTime':
            local_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
            return get_localzone().localize(local_datetime)
        return value

    def load_data(self, data, table_name):
        names_and_types = [(field.name, field.field_type) for field in self.structure[table_name].get_all_fields()]
        columns = ['"' + col[0] + '"' for col in names_and_types]
        insert = 'insert into test."{table}" ({columns}) values ({args})'.format(
            table=table_name, columns=','.join(columns), args=','.join(['%s'] * len(columns)))
        for row in data:
            values = [self.get_value_to_insert(row.get_value_by_name(col[0]), col[1]) for col in names_and_types]
            self.session.execute(insert, values)


class SourceRedis(ExternalSource):
    def __init__(


@@ -4,7 +4,7 @@ import os
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis, SourceCassandra
import math

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

@@ -117,6 +117,7 @@ LAYOUTS = [
]

SOURCES = [
    SourceCassandra("Cassandra", "localhost", "9043", "cassandra1", "9042", "", ""),
    SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
    SourceMongoURI("MongoDB_URI", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
    SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),

@@ -131,7 +132,7 @@ SOURCES = [
DICTIONARIES = []

# Key-value dictionaries with only one possible field for key
SOURCES_KV = [
    SourceRedis("RedisSimple", "localhost", "6380", "redis1", "6379", "", "", storage_type="simple"),
    SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"),

@@ -183,7 +184,7 @@ def setup_module(module):
    for fname in os.listdir(dict_configs_path):
        main_configs.append(os.path.join(dict_configs_path, fname))
    cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
    node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True, with_cassandra=True)
    cluster.add_instance('clickhouse1')


@@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0

CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_CASSANDRA=0"

[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin