merge master

Ivan Blinkov 2020-06-10 13:18:41 +03:00
commit 90ee4d52f4
56 changed files with 1612 additions and 119 deletions

.gitmodules vendored

@ -157,6 +157,14 @@
[submodule "contrib/openldap"]
path = contrib/openldap
url = https://github.com/openldap/openldap.git
[submodule "contrib/cassandra"]
path = contrib/cassandra
url = https://github.com/ClickHouse-Extras/cpp-driver.git
branch = clickhouse
[submodule "contrib/libuv"]
path = contrib/libuv
url = https://github.com/ClickHouse-Extras/libuv.git
branch = clickhouse
[submodule "contrib/fmtlib"]
path = contrib/fmtlib
url = https://github.com/fmtlib/fmt.git


@ -360,6 +360,7 @@ include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
include (cmake/find/msgpack.cmake)
include (cmake/find/cassandra.cmake)
include (cmake/find/sentry.cmake)
find_contrib_lib(cityhash)


@ -0,0 +1,26 @@
option(ENABLE_CASSANDRA "Enable Cassandra" ${ENABLE_LIBRARIES})
if (ENABLE_CASSANDRA)
    if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libuv")
        message (WARNING "submodule contrib/libuv is missing. To fix, try running: \n git submodule update --init --recursive")
    elseif (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
        message (WARNING "submodule contrib/cassandra is missing. To fix, try running: \n git submodule update --init --recursive")
    else()
        set (LIBUV_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/libuv")
        set (CASSANDRA_INCLUDE_DIR
            "${ClickHouse_SOURCE_DIR}/contrib/cassandra/include/")
        if (USE_STATIC_LIBRARIES)
            set (LIBUV_LIBRARY uv_a)
            set (CASSANDRA_LIBRARY cassandra_static)
        else()
            set (LIBUV_LIBRARY uv)
            set (CASSANDRA_LIBRARY cassandra)
        endif()
        set (USE_CASSANDRA 1)
        set (CASS_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/cassandra")
    endif()
endif()
message (STATUS "Using cassandra=${USE_CASSANDRA}: ${CASSANDRA_INCLUDE_DIR} : ${CASSANDRA_LIBRARY}")
message (STATUS "Using libuv: ${LIBUV_ROOT_DIR} : ${LIBUV_LIBRARY}")


@ -301,8 +301,14 @@ if (USE_FASTOPS)
add_subdirectory (fastops-cmake)
endif()
if (USE_CASSANDRA)
add_subdirectory (libuv)
add_subdirectory (cassandra)
endif()
if (USE_SENTRY)
add_subdirectory (sentry-native)
endif()
add_subdirectory (fmtlib-cmake)

contrib/cassandra vendored Submodule

@ -0,0 +1 @@
Subproject commit a49b4e0e2696a4b8ef286a5b9538d1cbe8490509

contrib/libuv vendored Submodule

@ -0,0 +1 @@
Subproject commit 84438304f41d8ea6670ee5409f4d6c63ca784f28


@ -94,7 +94,7 @@ if [ -n "$(ls /docker-entrypoint-initdb.d/)" ] || [ -n "$CLICKHOUSE_DB" ]; then
# check if clickhouse is ready to accept connections
# will try to ping clickhouse via http_port (max 12 retries, with 1 sec delay)
if ! wget --spider --quiet --tries=12 --waitretry=1 --retry-connrefused "http://localhost:$HTTP_PORT/ping" ; then
if ! wget --spider --quiet --prefer-family=IPv6 --tries=12 --waitretry=1 --retry-connrefused "http://localhost:$HTTP_PORT/ping" ; then
echo >&2 'ClickHouse init process failed.'
exit 1
fi


@ -0,0 +1,7 @@
version: '2.3'
services:
cassandra1:
image: cassandra
restart: always
ports:
- 9043:9042


@ -24,6 +24,8 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \


@ -59,7 +59,9 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/; \


@ -62,7 +62,9 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \


@ -50,7 +50,9 @@ ln -s /usr/share/clickhouse-test/config/zookeeper.xml /etc/clickhouse-server/con
ln -s /usr/share/clickhouse-test/config/listen.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/text_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/metric_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/query_masking_rules.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/readonly.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/; \


@ -31,6 +31,7 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-server_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
ln -s /usr/share/clickhouse-test/config/log_queries.xml /etc/clickhouse-server/users.d/; \
ln -s /usr/share/clickhouse-test/config/part_log.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/lib/llvm-9/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
echo "TSAN_OPTIONS='halt_on_error=1 history_size=7 ignore_noninstrumented_modules=1 verbosity=1'" >> /etc/environment; \


@ -625,4 +625,43 @@ Setting fields:
- `storage_type` The structure of internal Redis storage used for working with keys. `simple` is for simple sources and for hashed single-key sources, `hash_map` is for hashed sources with two keys. Ranged sources and cache sources with complex keys are unsupported. May be omitted, default value is `simple`.
- `db_index` The specific numeric index of the Redis logical database. May be omitted, default value is 0.
### Cassandra {#dicts-external_dicts_dict_sources-cassandra}
Example of settings:
```xml
<source>
    <cassandra>
        <host>localhost</host>
        <port>9042</port>
        <user>username</user>
        <password>qwerty123</password>
        <keyspace>database_name</keyspace>
        <column_family>table_name</column_family>
        <allow_filtering>1</allow_filtering>
        <partition_key_prefix>1</partition_key_prefix>
        <consistency>One</consistency>
        <where>"SomeColumn" = 42</where>
        <max_threads>8</max_threads>
    </cassandra>
</source>
```
Setting fields:
- `host` The Cassandra host or a comma-separated list of hosts.
- `port` The port on the Cassandra servers. If not specified, the default port is used.
- `user` Name of the Cassandra user.
- `password` Password of the Cassandra user.
- `keyspace` Name of the keyspace (database).
- `column_family` Name of the column family (table).
- `allow_filtering` Flag that allows or disallows potentially expensive conditions on clustering key columns. Default value is 0.
- `partition_key_prefix` Number of partition key columns in the primary key of the Cassandra table.
Required for composite key dictionaries. The order of key columns in the dictionary definition must be the same as in Cassandra.
Default value is 1 (the first key column is the partition key and the other key columns are the clustering key). For example, with key columns `(p1, p2, c1)` and `partition_key_prefix` set to 2, `p1` and `p2` are treated as the partition key and `c1` as a clustering key.
- `consistency` Consistency level. Possible values: `One`, `Two`, `Three`,
`All`, `EachQuorum`, `Quorum`, `LocalQuorum`, `LocalOne`, `Serial`, `LocalSerial`. Default is `One`.
- `where` Optional selection criteria.
- `max_threads` The maximum number of threads to use for loading data from multiple partitions in composite key dictionaries.
[Original article](https://clickhouse.tech/docs/en/query_language/dicts/external_dicts_dict_sources/) <!--hide-->


@ -1,27 +1,31 @@
# Configuration Files {#configuration_files}
ClickHouse supports multi-file configuration management. The main configuration file is `/etc/clickhouse-server/config.xml`. Other files must be placed in the `/etc/clickhouse-server/config.d` directory.
!!! note "Note"
    All configuration files must be in XML format. Also, they must have the same root element, usually `<yandex>`.
Some settings from the main configuration file can be overridden in other configuration files using the `replace` or `remove` attributes.
If neither is specified, the contents of the elements are combined recursively, replacing values of duplicate children.
If `replace` is specified, the whole element is replaced by the specified one.
If `remove` is specified, the element is deleted.
The config can also define "substitutions". If an element has the `incl` attribute, the corresponding substitution from a file will be used as its value. By default, the path to the file with substitutions is `/etc/metrika.xml`. This can be changed in the [include\_from](server-configuration-parameters/settings.md#server_configuration_parameters-include_from) element of the server config. The substitution values are specified in `/yandex/substitution_name` elements of that file. If a substitution specified in `incl` does not exist, it is recorded in the log. To prevent ClickHouse from logging missing substitutions, specify the `optional="true"` attribute (for example, for the [macros](server-configuration-parameters/settings.md) setting).
Substitutions can also be performed from ZooKeeper. To do this, specify the attribute `from_zk = "/path/to/node"`. The element value is replaced with the contents of the ZooKeeper node at `/path/to/node`. You can also put an entire XML subtree on a ZooKeeper node; it will be fully inserted into the source element.
The `config.xml` file can specify a separate config with user settings, profiles, and quotas. The relative path to this config is set in the `users_config` element. Its default value is `users.xml`. If `users_config` is omitted, user settings, profiles, and quotas are specified directly in `config.xml`.
The users configuration can be split into separate files similar to `config.xml` and `config.d/`. The directory name is the `users_config` value without the `.xml` suffix, with `.d` appended. Since `users_config` defaults to `users.xml`, the directory name defaults to `users.d`. For example, you can have a separate config file for each user like this:
``` bash
$ cat /etc/clickhouse-server/users.d/alice.xml
```
``` xml
<yandex>
<users>
<alice>
@ -36,7 +40,7 @@ $ cat /etc/clickhouse-server/users.d/alice.xml
</yandex>
```
For each config file, the server also generates `file-preprocessed.xml` files when starting. These files contain all the completed substitutions and overrides, and they are intended for informational use. If ZooKeeper substitutions were used in a config file but ZooKeeper is unavailable at server startup, the server loads the configuration from the preprocessed file.
The server tracks changes to config files, as well as to files and ZooKeeper nodes that were used in substitutions and overrides, and reloads the settings for users and clusters on the fly. This means you can modify clusters, users, and their settings without restarting the server.


@ -352,6 +352,11 @@ if (USE_OPENCL)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${OpenCL_INCLUDE_DIRS})
endif ()
if (USE_CASSANDRA)
dbms_target_link_libraries(PUBLIC ${CASSANDRA_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${CASS_INCLUDE_DIR})
endif()
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${DOUBLE_CONVERSION_INCLUDE_DIR})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${MSGPACK_INCLUDE_DIR})


@ -495,6 +495,7 @@ namespace ErrorCodes
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN = 524;
extern const int INCORRECT_DISK_INDEX = 525;
extern const int UNKNOWN_VOLUME_TYPE = 526;
extern const int CASSANDRA_INTERNAL_ERROR = 527;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;


@ -9,6 +9,7 @@
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_CASSANDRA
#cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY


@ -61,8 +61,8 @@ public:
void cancel();
/// Get totals and extremes if any.
Block getTotals() const { return totals; }
Block getExtremes() const { return extremes; }
Block getTotals() { return std::move(totals); }
Block getExtremes() { return std::move(extremes); }
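/// Note: totals and extremes are moved out, so each of them can be retrieved only once.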
/// Set callback for progress. It will be called on Progress packet.
void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }


@ -21,6 +21,10 @@ target_link_libraries(clickhouse_dictionaries
string_utils
)
if(USE_CASSANDRA)
target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${CASSANDRA_INCLUDE_DIR})
endif()
add_subdirectory(Embedded)
target_include_directories(clickhouse_dictionaries SYSTEM PRIVATE ${SPARSEHASH_INCLUDE_DIR})


@ -0,0 +1,274 @@
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_CASSANDRA
#include <utility>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ExternalResultDescription.h>
#include <IO/ReadHelpers.h>
#include "CassandraBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
}
CassandraBlockInputStream::CassandraBlockInputStream(
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
size_t max_block_size_)
: session(session_)
, statement(query_str.c_str(), /*parameters count*/ 0)
, max_block_size(max_block_size_)
, has_more_pages(cass_true)
{
description.init(sample_block);
cassandraCheck(cass_statement_set_paging_size(statement, max_block_size));
}
void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, const CassValue * cass_value)
{
switch (type)
{
case ValueType::vtUInt8:
{
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnUInt8 &>(column).insertValue(static_cast<UInt8>(value));
break;
}
case ValueType::vtUInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
break;
}
case ValueType::vtUInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value));
break;
}
case ValueType::vtUInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt64 &>(column).insertValue(static_cast<UInt64>(value));
break;
}
case ValueType::vtInt8:
{
cass_int8_t value;
cass_value_get_int8(cass_value, &value);
assert_cast<ColumnInt8 &>(column).insertValue(value);
break;
}
case ValueType::vtInt16:
{
cass_int16_t value;
cass_value_get_int16(cass_value, &value);
assert_cast<ColumnInt16 &>(column).insertValue(value);
break;
}
case ValueType::vtInt32:
{
cass_int32_t value;
cass_value_get_int32(cass_value, &value);
assert_cast<ColumnInt32 &>(column).insertValue(value);
break;
}
case ValueType::vtInt64:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnInt64 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat32:
{
cass_float_t value;
cass_value_get_float(cass_value, &value);
assert_cast<ColumnFloat32 &>(column).insertValue(value);
break;
}
case ValueType::vtFloat64:
{
cass_double_t value;
cass_value_get_double(cass_value, &value);
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
}
case ValueType::vtString:
{
const char * value = nullptr;
size_t value_length;
cass_value_get_string(cass_value, &value, &value_length);
assert_cast<ColumnString &>(column).insertData(value, value_length);
break;
}
case ValueType::vtDate:
{
cass_uint32_t value;
cass_value_get_uint32(cass_value, &value);
assert_cast<ColumnUInt16 &>(column).insertValue(static_cast<UInt16>(value));
break;
}
case ValueType::vtDateTime:
{
cass_int64_t value;
cass_value_get_int64(cass_value, &value);
assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(value / 1000));
break;
}
case ValueType::vtUUID:
{
CassUuid value;
cass_value_get_uuid(cass_value, &value);
std::array<char, CASS_UUID_STRING_LENGTH> uuid_str;
cass_uuid_string(value, uuid_str.data());
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
break;
}
}
}
void CassandraBlockInputStream::readPrefix()
{
result_future = cass_session_execute(*session, statement);
}
Block CassandraBlockInputStream::readImpl()
{
if (!has_more_pages)
return {};
MutableColumns columns = description.sample_block.cloneEmptyColumns();
cassandraWaitAndCheck(result_future);
CassResultPtr result = cass_future_get_result(result_future);
assert(cass_result_column_count(result) == columns.size());
assertTypes(result);
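/// Prefetch the next page asynchronously while the rows of the current page are converted to columns.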
has_more_pages = cass_result_has_more_pages(result);
if (has_more_pages)
{
cassandraCheck(cass_statement_set_paging_state(statement, result));
result_future = cass_session_execute(*session, statement);
}
CassIteratorPtr rows_iter = cass_iterator_from_result(result); /// Points to rows[-1]
while (cass_iterator_next(rows_iter))
{
const CassRow * row = cass_iterator_get_row(rows_iter);
for (size_t col_idx = 0; col_idx < columns.size(); ++col_idx)
{
const CassValue * val = cass_row_get_column(row, col_idx);
if (cass_value_is_null(val))
columns[col_idx]->insertDefault();
else if (description.types[col_idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[col_idx]);
insertValue(column_nullable.getNestedColumn(), description.types[col_idx].first, val);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[col_idx], description.types[col_idx].first, val);
}
}
assert(cass_result_row_count(result) == columns.front()->size());
return description.sample_block.cloneWithColumns(std::move(columns));
}
void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
{
if (!assert_types)
return;
size_t column_count = cass_result_column_count(result);
for (size_t i = 0; i < column_count; ++i)
{
CassValueType expected = CASS_VALUE_TYPE_UNKNOWN;
String expected_text;
/// Cassandra does not support unsigned integers (cass_uint32_t is for Date)
switch (description.types[i].first)
{
case ExternalResultDescription::ValueType::vtInt8:
case ExternalResultDescription::ValueType::vtUInt8:
expected = CASS_VALUE_TYPE_TINY_INT;
expected_text = "tinyint";
break;
case ExternalResultDescription::ValueType::vtInt16:
case ExternalResultDescription::ValueType::vtUInt16:
expected = CASS_VALUE_TYPE_SMALL_INT;
expected_text = "smallint";
break;
case ExternalResultDescription::ValueType::vtUInt32:
case ExternalResultDescription::ValueType::vtInt32:
expected = CASS_VALUE_TYPE_INT;
expected_text = "int";
break;
case ExternalResultDescription::ValueType::vtInt64:
case ExternalResultDescription::ValueType::vtUInt64:
expected = CASS_VALUE_TYPE_BIGINT;
expected_text = "bigint";
break;
case ExternalResultDescription::ValueType::vtFloat32:
expected = CASS_VALUE_TYPE_FLOAT;
expected_text = "float";
break;
case ExternalResultDescription::ValueType::vtFloat64:
expected = CASS_VALUE_TYPE_DOUBLE;
expected_text = "double";
break;
case ExternalResultDescription::ValueType::vtString:
expected = CASS_VALUE_TYPE_TEXT;
expected_text = "text, ascii or varchar";
break;
case ExternalResultDescription::ValueType::vtDate:
expected = CASS_VALUE_TYPE_DATE;
expected_text = "date";
break;
case ExternalResultDescription::ValueType::vtDateTime:
expected = CASS_VALUE_TYPE_TIMESTAMP;
expected_text = "timestamp";
break;
case ExternalResultDescription::ValueType::vtUUID:
expected = CASS_VALUE_TYPE_UUID;
expected_text = "uuid";
break;
}
CassValueType got = cass_result_column_type(result, i);
if (got != expected)
{
if (expected == CASS_VALUE_TYPE_TEXT && (got == CASS_VALUE_TYPE_ASCII || got == CASS_VALUE_TYPE_VARCHAR))
continue;
const auto & column_name = description.sample_block.getColumnsWithTypeAndName()[i].name;
throw Exception("Type mismatch for column " + column_name + ": expected Cassandra type " + expected_text,
ErrorCodes::TYPE_MISMATCH);
}
}
assert_types = false;
}
}
#endif


@ -0,0 +1,47 @@
#pragma once
#include <Dictionaries/CassandraHelpers.h>
#if USE_CASSANDRA
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ExternalResultDescription.h>
namespace DB
{
class CassandraBlockInputStream final : public IBlockInputStream
{
public:
CassandraBlockInputStream(
const CassSessionShared & session_,
const String & query_str,
const Block & sample_block,
size_t max_block_size);
String getName() const override { return "Cassandra"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
void readPrefix() override;
private:
using ValueType = ExternalResultDescription::ValueType;
Block readImpl() override;
static void insertValue(IColumn & column, ValueType type, const CassValue * cass_value);
void assertTypes(const CassResultPtr & result);
CassSessionShared session;
CassStatementPtr statement;
CassFuturePtr result_future;
const size_t max_block_size;
ExternalResultDescription description;
cass_bool_t has_more_pages;
bool assert_types = true;
};
}
#endif


@ -0,0 +1,211 @@
#include "CassandraDictionarySource.h"
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int NOT_IMPLEMENTED;
}
void registerDictionarySourceCassandra(DictionarySourceFactory & factory)
{
auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
const Context & /* context */,
bool /*check_config*/) -> DictionarySourcePtr
{
#if USE_CASSANDRA
setupCassandraDriverLibraryLogging(CASS_LOG_INFO);
return std::make_unique<CassandraDictionarySource>(dict_struct, config, config_prefix + ".cassandra", sample_block);
#else
throw Exception{"Dictionary source of type `cassandra` is disabled because ClickHouse was built without cassandra support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
};
factory.registerSource("cassandra", create_table_source);
}
}
#if USE_CASSANDRA
#include <IO/WriteHelpers.h>
#include <Common/SipHash.h>
#include "CassandraBlockInputStream.h"
#include <common/logger_useful.h>
#include <DataStreams/UnionBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
}
CassandraSettings::CassandraSettings(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
: host(config.getString(config_prefix + ".host"))
, port(config.getUInt(config_prefix + ".port", 0))
, user(config.getString(config_prefix + ".user", ""))
, password(config.getString(config_prefix + ".password", ""))
, db(config.getString(config_prefix + ".keyspace"))
, table(config.getString(config_prefix + ".column_family"))
, allow_filtering(config.getBool(config_prefix + ".allow_filtering", false))
, partition_key_prefix(config.getUInt(config_prefix + ".partition_key_prefix", 1))
, max_threads(config.getUInt(config_prefix + ".max_threads", 8))
, where(config.getString(config_prefix + ".where", ""))
{
setConsistency(config.getString(config_prefix + ".consistency", "One"));
}
void CassandraSettings::setConsistency(const String & config_str)
{
if (config_str == "One")
consistency = CASS_CONSISTENCY_ONE;
else if (config_str == "Two")
consistency = CASS_CONSISTENCY_TWO;
else if (config_str == "Three")
consistency = CASS_CONSISTENCY_THREE;
else if (config_str == "All")
consistency = CASS_CONSISTENCY_ALL;
else if (config_str == "EachQuorum")
consistency = CASS_CONSISTENCY_EACH_QUORUM;
else if (config_str == "Quorum")
consistency = CASS_CONSISTENCY_QUORUM;
else if (config_str == "LocalQuorum")
consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
else if (config_str == "LocalOne")
consistency = CASS_CONSISTENCY_LOCAL_ONE;
else if (config_str == "Serial")
consistency = CASS_CONSISTENCY_SERIAL;
else if (config_str == "LocalSerial")
consistency = CASS_CONSISTENCY_LOCAL_SERIAL;
else /// CASS_CONSISTENCY_ANY is only valid for writes
throw Exception("Unsupported consistency level: " + config_str, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
static const size_t max_block_size = 8192;
CassandraDictionarySource::CassandraDictionarySource(
const DictionaryStructure & dict_struct_,
const CassandraSettings & settings_,
const Block & sample_block_)
: log(&Poco::Logger::get("CassandraDictionarySource"))
, dict_struct(dict_struct_)
, settings(settings_)
, sample_block(sample_block_)
, query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
{
cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
if (settings.port)
cassandraCheck(cass_cluster_set_port(cluster, settings.port));
cass_cluster_set_credentials(cluster, settings.user.c_str(), settings.password.c_str());
cassandraCheck(cass_cluster_set_consistency(cluster, settings.consistency));
}
CassandraDictionarySource::CassandraDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
Block & sample_block_)
: CassandraDictionarySource(
dict_struct_,
CassandraSettings(config, config_prefix),
sample_block_)
{
}
void CassandraDictionarySource::maybeAllowFiltering(String & query) const
{
if (!settings.allow_filtering)
return;
query.pop_back(); /// remove semicolon
query += " ALLOW FILTERING;";
}
BlockInputStreamPtr CassandraDictionarySource::loadAll()
{
String query = query_builder.composeLoadAllQuery();
maybeAllowFiltering(query);
LOG_INFO(log, "Loading all using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}
std::string CassandraDictionarySource::toString() const
{
return "Cassandra: " + settings.db + '.' + settings.table;
}
BlockInputStreamPtr CassandraDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
String query = query_builder.composeLoadIdsQuery(ids);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading ids using query: {}", query);
return std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size);
}
BlockInputStreamPtr CassandraDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
if (requested_rows.empty())
throw Exception("No rows requested", ErrorCodes::LOGICAL_ERROR);
/// TODO is there a better way to load data by complex keys?
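/// Group the requested rows by a hash of their partition key columns: one query will be sent per partition.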
std::unordered_map<UInt64, std::vector<size_t>> partitions;
for (const auto & row : requested_rows)
{
SipHash partition_key;
for (size_t i = 0; i < settings.partition_key_prefix; ++i)
key_columns[i]->updateHashWithValue(row, partition_key);
partitions[partition_key.get64()].push_back(row);
}
BlockInputStreams streams;
for (const auto & partition : partitions)
{
String query = query_builder.composeLoadKeysQuery(key_columns, partition.second, ExternalQueryBuilder::CASSANDRA_SEPARATE_PARTITION_KEY, settings.partition_key_prefix);
maybeAllowFiltering(query);
LOG_INFO(log, "Loading keys for partition hash {} using query: {}", partition.first, query);
streams.push_back(std::make_shared<CassandraBlockInputStream>(getSession(), query, sample_block, max_block_size));
}
if (streams.size() == 1)
return streams.front();
return std::make_shared<UnionBlockInputStream>(streams, nullptr, settings.max_threads);
}
BlockInputStreamPtr CassandraDictionarySource::loadUpdatedAll()
{
throw Exception("Method loadUpdatedAll is unsupported for CassandraDictionarySource", ErrorCodes::NOT_IMPLEMENTED);
}
CassSessionShared CassandraDictionarySource::getSession()
{
/// Reuse the connection if one exists, create a new one if not
auto session = maybe_session.lock();
if (session)
return session;
std::lock_guard lock(connect_mutex);
session = maybe_session.lock();
if (session)
return session;
session = std::make_shared<CassSessionPtr>();
CassFuturePtr future = cass_session_connect(*session, cluster);
cassandraWaitAndCheck(future);
maybe_session = session;
return session;
}
}
#endif
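
A minimal sketch of the session-caching pattern used by `getSession()` above, with hypothetical names (`Connection`, `ConnectionCache` are illustrative, not part of the commit): a `std::weak_ptr` lets concurrent streams share one live connection, and the second `lock()` under the mutex avoids connecting twice.

```cpp
#include <memory>
#include <mutex>

struct Connection {};  /// hypothetical stand-in for CassSessionPtr

class ConnectionCache
{
    std::mutex connect_mutex;
    std::weak_ptr<Connection> maybe_cached;

public:
    std::shared_ptr<Connection> get()
    {
        if (auto conn = maybe_cached.lock())  /// Fast path: a live connection is already shared.
            return conn;

        std::lock_guard lock(connect_mutex);
        if (auto conn = maybe_cached.lock())  /// Re-check: another thread may have connected meanwhile.
            return conn;

        auto conn = std::make_shared<Connection>();  /// The real code connects and checks the future here.
        maybe_cached = conn;  /// Cache weakly: the connection is freed once all streams release it.
        return conn;
    }
};
```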


@ -0,0 +1,89 @@
#pragma once
#include <Dictionaries/CassandraHelpers.h>
#if USE_CASSANDRA
#include "DictionaryStructure.h"
#include "IDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <Poco/Logger.h>
#include <mutex>
namespace DB
{
struct CassandraSettings
{
String host;
UInt16 port;
String user;
String password;
String db;
String table;
CassConsistency consistency;
bool allow_filtering;
/// TODO get information about key from the driver
size_t partition_key_prefix;
size_t max_threads;
String where;
CassandraSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
void setConsistency(const String & config_str);
};
class CassandraDictionarySource final : public IDictionarySource
{
public:
CassandraDictionarySource(
const DictionaryStructure & dict_struct,
const CassandraSettings & settings_,
const Block & sample_block);
CassandraDictionarySource(
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
Block & sample_block);
BlockInputStreamPtr loadAll() override;
bool supportsSelectiveLoad() const override { return true; }
bool isModified() const override { return true; }
bool hasUpdateField() const override { return false; }
DictionarySourcePtr clone() const override
{
return std::make_unique<CassandraDictionarySource>(dict_struct, settings, sample_block);
}
BlockInputStreamPtr loadIds(const std::vector<UInt64> & ids) override;
BlockInputStreamPtr loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) override;
BlockInputStreamPtr loadUpdatedAll() override;
String toString() const override;
private:
void maybeAllowFiltering(String & query) const;
CassSessionShared getSession();
Poco::Logger * log;
const DictionaryStructure dict_struct;
const CassandraSettings settings;
Block sample_block;
ExternalQueryBuilder query_builder;
std::mutex connect_mutex;
CassClusterPtr cluster;
CassSessionWeak maybe_session;
};
}
#endif


@ -0,0 +1,68 @@
#include <Dictionaries/CassandraHelpers.h>
#if USE_CASSANDRA
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int CASSANDRA_INTERNAL_ERROR;
}
void cassandraCheck(CassError code)
{
if (code != CASS_OK)
throw Exception("Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code),
ErrorCodes::CASSANDRA_INTERNAL_ERROR);
}
void cassandraWaitAndCheck(CassFuturePtr & future)
{
auto code = cass_future_error_code(future); /// Waits if not ready
if (code == CASS_OK)
return;
/// `future` owns `message` and will free it on destruction
const char * message;
size_t message_len;
cass_future_error_message(future, &message, &message_len);
std::string full_message = "Cassandra driver error " + std::to_string(code) + ": " + cass_error_desc(code) + ": " + message;
throw Exception(full_message, ErrorCodes::CASSANDRA_INTERNAL_ERROR);
}
static std::once_flag setup_logging_flag;
void setupCassandraDriverLibraryLogging(CassLogLevel level)
{
std::call_once(setup_logging_flag, [level]()
{
Poco::Logger * logger = &Poco::Logger::get("CassandraDriverLibrary");
cass_log_set_level(level);
if (level != CASS_LOG_DISABLED)
cass_log_set_callback(cassandraLogCallback, logger);
});
}
void cassandraLogCallback(const CassLogMessage * message, void * data)
{
Poco::Logger * logger = static_cast<Poco::Logger *>(data);
if (message->severity == CASS_LOG_CRITICAL || message->severity == CASS_LOG_ERROR)
LOG_ERROR(logger, message->message);
else if (message->severity == CASS_LOG_WARN)
LOG_WARNING(logger, message->message);
else if (message->severity == CASS_LOG_INFO)
LOG_INFO(logger, message->message);
else if (message->severity == CASS_LOG_DEBUG)
LOG_DEBUG(logger, message->message);
else if (message->severity == CASS_LOG_TRACE)
LOG_TRACE(logger, message->message);
}
}
#endif


@ -0,0 +1,84 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_CASSANDRA
#include <cassandra.h> // Y_IGNORE
#include <utility>
#include <memory>
namespace DB
{
namespace Cassandra
{
template<typename CassT>
CassT * defaultCtor() { return nullptr; }
/// RAII wrapper for raw pointers to objects from cassandra driver library
template<typename CassT, auto Dtor, auto Ctor = defaultCtor<CassT>>
class ObjectHolder
{
CassT * ptr = nullptr;
public:
template<typename... Args>
ObjectHolder(Args &&... args) : ptr(Ctor(std::forward<Args>(args)...)) {}
ObjectHolder(CassT * ptr_) : ptr(ptr_) {}
ObjectHolder(const ObjectHolder &) = delete;
ObjectHolder & operator = (const ObjectHolder &) = delete;
ObjectHolder(ObjectHolder && rhs) noexcept : ptr(rhs.ptr) { rhs.ptr = nullptr; }
ObjectHolder & operator = (ObjectHolder && rhs) noexcept
{
if (ptr)
Dtor(ptr);
ptr = rhs.ptr;
rhs.ptr = nullptr;
return *this;
}
~ObjectHolder()
{
if (ptr)
Dtor(ptr);
}
/// For implicit conversion when passing object to driver library functions
operator CassT * () { return ptr; }
operator const CassT * () const { return ptr; }
};
}
/// These objects are created when the holder is constructed
using CassClusterPtr = Cassandra::ObjectHolder<CassCluster, cass_cluster_free, cass_cluster_new>;
using CassStatementPtr = Cassandra::ObjectHolder<CassStatement, cass_statement_free, cass_statement_new>;
using CassSessionPtr = Cassandra::ObjectHolder<CassSession, cass_session_free, cass_session_new>;
/// Share connections between streams. Executing statements in one session object is thread-safe
using CassSessionShared = std::shared_ptr<CassSessionPtr>;
using CassSessionWeak = std::weak_ptr<CassSessionPtr>;
/// The following objects are created inside Cassandra driver library,
/// but must be freed by user code
using CassFuturePtr = Cassandra::ObjectHolder<CassFuture, cass_future_free>;
using CassResultPtr = Cassandra::ObjectHolder<const CassResult, cass_result_free>;
using CassIteratorPtr = Cassandra::ObjectHolder<CassIterator, cass_iterator_free>;
/// Checks return code, throws exception on error
void cassandraCheck(CassError code);
void cassandraWaitAndCheck(CassFuturePtr & future);
/// By default driver library prints logs to stderr.
/// It should be redirected (or, at least, disabled) before calling other functions from the library.
void setupCassandraDriverLibraryLogging(CassLogLevel level);
void cassandraLogCallback(const CassLogMessage * message, void * data);
}
#endif
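
A short usage sketch for the holders above (an editorial illustration, not part of the commit; `connectSketch` is hypothetical, and it assumes the DataStax driver's `cass_session_connect` and `cass_future_wait`):

```cpp
#include <cassandra.h>
#include <Dictionaries/CassandraHelpers.h>

void connectSketch()
{
    DB::CassClusterPtr cluster;  /// default ctor calls cass_cluster_new()
    DB::CassSessionPtr session;  /// default ctor calls cass_session_new()

    /// cass_session_connect() returns a raw CassFuture * which the holder takes ownership of;
    /// the implicit conversion operators pass the wrapped pointers to the driver.
    DB::CassFuturePtr connect_future = cass_session_connect(session, cluster);
    cass_future_wait(connect_future);

    /// All three holders call their Dtor (cass_*_free) when they go out of scope.
}
```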


@ -63,6 +63,13 @@ void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out)
std::string ExternalQueryBuilder::composeLoadAllQuery() const
{
WriteBufferFromOwnString out;
composeLoadAllQuery(out);
writeChar(';', out);
return out.str();
}
void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const
{
writeString("SELECT ", out);
if (dict_struct.id)
@ -149,24 +156,26 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeString(" WHERE ", out);
writeString(where, out);
}
writeChar(';', out);
return out.str();
}
std::string ExternalQueryBuilder::composeUpdateQuery(const std::string & update_field, const std::string & time_point) const
{
std::string out = composeLoadAllQuery();
std::string update_query;
WriteBufferFromOwnString out;
composeLoadAllQuery(out);
if (!where.empty())
update_query = " AND " + update_field + " >= '" + time_point + "'";
writeString(" AND ", out);
else
update_query = " WHERE " + update_field + " >= '" + time_point + "'";
writeString(" WHERE ", out);
return out.insert(out.size() - 1, update_query); /// This is done to insert "update_query" before "out"'s semicolon
writeQuoted(update_field, out);
writeString(" >= '", out);
writeString(time_point, out);
writeChar('\'', out);
writeChar(';', out);
return out.str();
}
@ -241,7 +250,7 @@ std::string ExternalQueryBuilder::composeLoadIdsQuery(const std::vector<UInt64>
std::string
ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method)
ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix)
{
if (!dict_struct.key)
throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};
@ -284,9 +293,13 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
if (!where.empty())
{
writeString("(", out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString("(", out);
writeString(where, out);
writeString(") AND (", out);
if (method != CASSANDRA_SEPARATE_PARTITION_KEY)
writeString(") AND (", out);
else
writeString(" AND ", out);
}
if (method == AND_OR_CHAIN)
@ -298,28 +311,33 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
writeString(" OR ", out);
first = false;
composeKeyCondition(key_columns, row, out);
writeString("(", out);
composeKeyCondition(key_columns, row, out, 0, key_columns.size());
writeString(")", out);
}
}
else /* if (method == IN_WITH_TUPLES) */
else if (method == IN_WITH_TUPLES)
{
writeString(composeKeyTupleDefinition(), out);
writeString(" IN (", out);
first = true;
for (const auto row : requested_rows)
{
if (!first)
writeString(", ", out);
first = false;
composeKeyTuple(key_columns, row, out);
}
writeString(")", out);
composeInWithTuples(key_columns, requested_rows, out, 0, key_columns.size());
}
else /* if (method == CASSANDRA_SEPARATE_PARTITION_KEY) */
{
/// CQL does not allow using OR conditions
/// and does not allow using multi-column IN expressions with partition key columns.
/// So we have to use multiple queries with conditions like
/// (partition_key_1 = val1 AND partition_key_2 = val2 ...) AND (clustering_key_1, ...) IN ((val3, ...), ...)
/// for each partition key.
/// `partition_key_prefix` is a number of columns from partition key.
/// All `requested_rows` must have the same values of partition key.
composeKeyCondition(key_columns, requested_rows.at(0), out, 0, partition_key_prefix);
if (partition_key_prefix && partition_key_prefix < key_columns.size())
writeString(" AND ", out);
if (partition_key_prefix < key_columns.size())
composeInWithTuples(key_columns, requested_rows, out, partition_key_prefix, key_columns.size());
}
if (!where.empty())
if (!where.empty() && method != CASSANDRA_SEPARATE_PARTITION_KEY)
{
writeString(")", out);
}
@ -330,13 +348,11 @@ ExternalQueryBuilder::composeLoadKeysQuery(const Columns & key_columns, const st
}
void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out) const
void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out,
size_t beg, size_t end) const
{
writeString("(", out);
const auto keys_size = key_columns.size();
auto first = true;
for (const auto i : ext::range(0, keys_size))
for (const auto i : ext::range(beg, end))
{
if (!first)
writeString(" AND ", out);
@ -346,45 +362,60 @@ void ExternalQueryBuilder::composeKeyCondition(const Columns & key_columns, cons
const auto & key_description = (*dict_struct.key)[i];
/// key_i=value_i
writeString(key_description.name, out);
writeQuoted(key_description.name, out);
writeString("=", out);
key_description.type->serializeAsTextQuoted(*key_columns[i], row, out, format_settings);
}
}
void ExternalQueryBuilder::composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows,
WriteBuffer & out, size_t beg, size_t end)
{
composeKeyTupleDefinition(out, beg, end);
writeString(" IN (", out);
bool first = true;
for (const auto row : requested_rows)
{
if (!first)
writeString(", ", out);
first = false;
composeKeyTuple(key_columns, row, out, beg, end);
}
writeString(")", out);
}
std::string ExternalQueryBuilder::composeKeyTupleDefinition() const
void ExternalQueryBuilder::composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const
{
if (!dict_struct.key)
throw Exception{"Composite key required for method", ErrorCodes::UNSUPPORTED_METHOD};
std::string result{"("};
writeChar('(', out);
auto first = true;
for (const auto & key : *dict_struct.key)
for (const auto i : ext::range(beg, end))
{
if (!first)
result += ", ";
writeString(", ", out);
first = false;
result += key.name;
writeQuoted((*dict_struct.key)[i].name, out);
}
result += ")";
return result;
writeChar(')', out);
}
void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out) const
void ExternalQueryBuilder::composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const
{
writeString("(", out);
const auto keys_size = key_columns.size();
auto first = true;
for (const auto i : ext::range(0, keys_size))
for (const auto i : ext::range(beg, end))
{
if (!first)
writeString(", ", out);


@ -42,30 +42,39 @@ struct ExternalQueryBuilder
std::string composeLoadIdsQuery(const std::vector<UInt64> & ids);
/** Generate a query to load data by set of composite keys.
* There are two methods of specification of composite keys in WHERE:
* There are three methods of specification of composite keys in WHERE:
* 1. (x = c11 AND y = c12) OR (x = c21 AND y = c22) ...
* 2. (x, y) IN ((c11, c12), (c21, c22), ...)
* 3. (x = c1 AND (y, z) IN ((c2, c3), ...))
*/
enum LoadKeysMethod
{
AND_OR_CHAIN,
IN_WITH_TUPLES,
CASSANDRA_SEPARATE_PARTITION_KEY,
};
std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method);
std::string composeLoadKeysQuery(const Columns & key_columns, const std::vector<size_t> & requested_rows, LoadKeysMethod method, size_t partition_key_prefix = 0);
private:
const FormatSettings format_settings;
void composeLoadAllQuery(WriteBuffer & out) const;
/// In the following methods, `beg` and `end` specify which columns to write into the expression
/// Expression in form (x = c1 AND y = c2 ...)
void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out) const;
void composeKeyCondition(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (x, y, ...) IN ((c1, c2, ...), ...)
void composeInWithTuples(const Columns & key_columns, const std::vector<size_t> & requested_rows, WriteBuffer & out, size_t beg, size_t end);
/// Expression in form (x, y, ...)
std::string composeKeyTupleDefinition() const;
void composeKeyTupleDefinition(WriteBuffer & out, size_t beg, size_t end) const;
/// Expression in form (c1, c2, ...)
void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out) const;
void composeKeyTuple(const Columns & key_columns, const size_t row, WriteBuffer & out, size_t beg, size_t end) const;
/// Write string with specified quoting style.
void writeQuoted(const std::string & s, WriteBuffer & out) const;


@ -13,6 +13,7 @@ void registerDictionaries()
registerDictionarySourceClickHouse(source_factory);
registerDictionarySourceMongoDB(source_factory);
registerDictionarySourceRedis(source_factory);
registerDictionarySourceCassandra(source_factory);
registerDictionarySourceXDBC(source_factory);
registerDictionarySourceJDBC(source_factory);
registerDictionarySourceExecutable(source_factory);


@ -9,6 +9,7 @@ void registerDictionarySourceFile(DictionarySourceFactory & source_factory);
void registerDictionarySourceMysql(DictionarySourceFactory & source_factory);
void registerDictionarySourceClickHouse(DictionarySourceFactory & source_factory);
void registerDictionarySourceMongoDB(DictionarySourceFactory & source_factory);
void registerDictionarySourceCassandra(DictionarySourceFactory & source_factory);
void registerDictionarySourceRedis(DictionarySourceFactory & source_factory);
void registerDictionarySourceXDBC(DictionarySourceFactory & source_factory);
void registerDictionarySourceJDBC(DictionarySourceFactory & source_factory);


@ -17,6 +17,9 @@ SRCS(
CacheDictionary_generate1.cpp
CacheDictionary_generate2.cpp
CacheDictionary_generate3.cpp
CassandraBlockInputStream.cpp
CassandraDictionarySource.cpp
CassandraHelpers.cpp
ClickHouseDictionarySource.cpp
ComplexKeyCacheDictionary.cpp
ComplexKeyCacheDictionary_createAttributeWithType.cpp


@ -4,6 +4,7 @@
#include <IO/AIOContextPool.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/MemorySanitizer.h>
#include <Core/Defines.h>
#include <sys/types.h>
@ -95,11 +96,8 @@ bool ReadBufferAIO::nextImpl()
if (profile_callback)
watch.emplace(clock_type);
if (!is_aio)
{
if (!is_pending_read)
synchronousRead();
is_aio = true;
}
else
receive();
@ -215,7 +213,9 @@ void ReadBufferAIO::synchronousRead()
void ReadBufferAIO::receive()
{
if (!waitForAIOCompletion())
return;
{
throw Exception("Trying to receive data from AIO, but nothing was queued. It's a bug", ErrorCodes::LOGICAL_ERROR);
}
finalize();
}
@ -224,8 +224,6 @@ void ReadBufferAIO::skip()
if (!waitForAIOCompletion())
return;
is_aio = false;
/// @todo I presume this assignment is redundant since waitForAIOCompletion() performs a similar one
// bytes_read = future_bytes_read.get();
if ((bytes_read < 0) || (static_cast<size_t>(bytes_read) < region_left_padding))
@ -274,6 +272,9 @@ void ReadBufferAIO::prepare()
region_aligned_size = region_aligned_end - region_aligned_begin;
buffer_begin = fill_buffer.internalBuffer().begin();
/// Unpoison because msan doesn't instrument linux AIO
__msan_unpoison(buffer_begin, fill_buffer.internalBuffer().size());
}
void ReadBufferAIO::finalize()


@ -100,8 +100,6 @@ private:
bool is_eof = false;
/// At least one read request was sent.
bool is_started = false;
/// Is the operation asynchronous?
bool is_aio = false;
/// Did the asynchronous operation fail?
bool aio_failed = false;


@ -1,8 +1,6 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
@ -13,9 +11,8 @@
#include <common/logger_useful.h>
#include <Processors/Pipe.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/DelayedSource.h>
namespace ProfileEvents
{
@ -118,13 +115,13 @@ void SelectStreamFactory::createForShard(
const SelectQueryInfo &,
Pipes & res)
{
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals_port = false;
bool add_extremes_port = false;
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
bool add_extremes = false;
if (processed_stage == QueryProcessingStage::Complete)
{
add_totals_port = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
add_extremes_port = context.getSettingsRef().extremes;
add_totals = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
add_extremes = context.getSettingsRef().extremes;
}
auto modified_query_ast = query_ast->clone();
@ -140,20 +137,13 @@ void SelectStreamFactory::createForShard(
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard_info.pool, modified_query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
remote_query_executor->setMainTable(main_table);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream), force_add_agg_info);
if (add_totals_port)
source->addTotalsPort();
if (add_extremes_port)
source->addExtremesPort();
res.emplace_back(std::move(source));
res.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
};
const auto & settings = context.getSettingsRef();
@ -246,8 +236,8 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay]()
-> BlockInputStreamPtr
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]()
-> Pipe
{
auto current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
@ -277,8 +267,7 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return std::make_shared<PipelineExecutingBlockInputStream>(
createLocalStream(modified_query_ast, header, context, stage));
return createLocalStream(modified_query_ast, header, context, stage).getPipe();
else
{
std::vector<IConnectionPool::Entry> connections;
@ -286,20 +275,14 @@ void SelectStreamFactory::createForShard(
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), modified_query, header, context, nullptr, throttler, scalars, external_tables, stage);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes);
}
};
auto lazy_stream = std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream);
auto source = std::make_shared<SourceFromInputStream>(std::move(lazy_stream), force_add_agg_info);
if (add_totals_port)
source->addTotalsPort();
if (add_extremes_port)
source->addExtremesPort();
res.emplace_back(std::move(source));
res.emplace_back(createDelayedPipe(header, lazily_create_stream));
}
else
emplace_remote_stream();


@ -20,6 +20,7 @@
#include <Common/CurrentThread.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h>
namespace DB
{
@ -673,8 +674,10 @@ void QueryPipeline::initRowsBeforeLimit()
{
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<SourceFromInputStream *> sources;
std::vector<RemoteSource *> remote_sources;
std::unordered_set<IProcessor *> visited;
@ -705,6 +708,9 @@ void QueryPipeline::initRowsBeforeLimit()
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
sources.emplace_back(source);
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
}
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
{
@ -735,7 +741,7 @@ void QueryPipeline::initRowsBeforeLimit()
}
}
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty()))
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
@ -744,6 +750,9 @@ void QueryPipeline::initRowsBeforeLimit()
for (auto & source : sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
/// If there is a limit, then enable rows_before_limit_at_least


@ -15,6 +15,12 @@ public:
rows_before_limit.fetch_add(rows, std::memory_order_release);
}
void set(uint64_t rows)
{
setAppliedLimit();
rows_before_limit.store(rows, std::memory_order_release);
}
uint64_t get() const { return rows_before_limit.load(std::memory_order_acquire); }
void setAppliedLimit() { has_applied_limit.store(true, std::memory_order_release); }


@ -0,0 +1,119 @@
#include <Processors/Sources/DelayedSource.h>
#include "NullSource.h"
namespace DB
{
DelayedSource::DelayedSource(const Block & header, Creator processors_creator)
: IProcessor({}, OutputPorts(3, header))
, creator(std::move(processors_creator))
{
}
IProcessor::Status DelayedSource::prepare()
{
/// First, wait until the main output is needed, then expand the pipeline.
if (inputs.empty())
{
auto & first_output = outputs.front();
/// If main port was finished before callback was called, stop execution.
if (first_output.isFinished())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
if (!first_output.isNeeded())
return Status::PortFull;
/// Call creator callback to get processors.
if (processors.empty())
return Status::Ready;
return Status::ExpandPipeline;
}
/// Process ports in order: main, totals, extremes
auto output = outputs.begin();
for (auto input = inputs.begin(); input != inputs.end(); ++input, ++output)
{
if (output->isFinished())
{
input->close();
continue;
}
if (!output->isNeeded())
return Status::PortFull;
if (input->isFinished())
{
output->finish();
continue;
}
input->setNeeded();
if (!input->hasData())
return Status::PortFull;
output->pushData(input->pullData(true));
return Status::PortFull;
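/// A chunk was pushed to the output; report PortFull so prepare() is called again for the next one.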
}
return Status::Finished;
}
void DelayedSource::work()
{
auto pipe = creator();
main_output = &pipe.getPort();
totals_output = pipe.getTotalsPort();
extremes_output = pipe.getExtremesPort();
processors = std::move(pipe).detachProcessors();
if (!totals_output)
{
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
totals_output = &processors.back()->getOutputs().back();
}
if (!extremes_output)
{
processors.emplace_back(std::make_shared<NullSource>(main_output->getHeader()));
extremes_output = &processors.back()->getOutputs().back();
}
}
Processors DelayedSource::expandPipeline()
{
/// Add new inputs. They must have the same header as output.
for (const auto & output : {main_output, totals_output, extremes_output})
{
inputs.emplace_back(outputs.front().getHeader(), this);
/// connect() checks that the ports have the same header.
connect(*output, inputs.back());
inputs.back().setNeeded();
}
/// Executor will check that all processors are connected.
return std::move(processors);
}
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator)
{
auto source = std::make_shared<DelayedSource>(header, std::move(processors_creator));
Pipe pipe(&source->getPort(DelayedSource::Main));
pipe.setTotalsPort(&source->getPort(DelayedSource::Totals));
pipe.setExtremesPort(&source->getPort(DelayedSource::Extremes));
pipe.addProcessors({std::move(source)});
return pipe;
}
}


@ -0,0 +1,45 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Pipe.h>
namespace DB
{
/// DelayedSource delays pipeline creation until execution starts.
/// It accepts a callback which creates a new pipe.
///
/// The first time the main output port of DelayedSource needs data, the callback is called.
/// Then DelayedSource expands the pipeline: it adds new inputs and connects the pipe to them.
/// After that, DelayedSource simply moves data from its inputs to its outputs until finished.
///
/// If the main output port of DelayedSource is never needed, the callback won't be called.
class DelayedSource : public IProcessor
{
public:
using Creator = std::function<Pipe()>;
DelayedSource(const Block & header, Creator processors_creator);
String getName() const override { return "Delayed"; }
Status prepare() override;
void work() override;
Processors expandPipeline() override;
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
OutputPort & getPort(PortKind kind) { return *std::next(outputs.begin(), kind); }
private:
Creator creator;
Processors processors;
/// Outputs from returned pipe.
OutputPort * main_output = nullptr;
OutputPort * totals_output = nullptr;
OutputPort * extremes_output = nullptr;
};
/// Creates pipe from DelayedSource.
Pipe createDelayedPipe(const Block & header, DelayedSource::Creator processors_creator);
}
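
The header above implies a simple usage pattern: pass createDelayedPipe the expected header plus a callback, and the callback runs only on first demand. A minimal sketch, assuming a hypothetical buildExpensivePipe() helper that constructs the real pipe (everything else comes from the declarations above):

#include <Processors/Sources/DelayedSource.h>

using namespace DB;

/// A sketch, not part of this commit: defer building an expensive pipe
/// until the first pull on the main output port.
Pipe makeLazyPipe(const Block & header)
{
    return createDelayedPipe(header, [header]
    {
        /// Runs at most once, when data is first requested; the returned
        /// pipe's header must match `header` (connect() checks this).
        return buildExpensivePipe(header);   /// hypothetical helper
    });
}

If the main port is never pulled (for example, the query is cancelled first), buildExpensivePipe() is never invoked.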

View File

@ -0,0 +1,132 @@
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
namespace DB
{
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_)
    : SourceWithProgress(executor->getHeader(), /* enable_auto_progress = */ false)
, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
{
/// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result.
const auto & sample = getPort().getHeader();
for (auto & type : sample.getDataTypes())
if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
add_aggregation_info = true;
}
RemoteSource::~RemoteSource() = default;
Chunk RemoteSource::generate()
{
if (!was_query_sent)
{
        /// The progress method will be called on each Progress packet.
query_executor->setProgressCallback([this](const Progress & value) { progress(value); });
/// Get rows_before_limit result for remote query from ProfileInfo packet.
query_executor->setProfileInfoCallback([this](const BlockStreamProfileInfo & info)
{
if (rows_before_limit && info.hasAppliedLimit())
rows_before_limit->set(info.getRowsBeforeLimit());
});
query_executor->sendQuery();
was_query_sent = true;
}
auto block = query_executor->read();
if (!block)
{
query_executor->finish();
return {};
}
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
if (add_aggregation_info)
{
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
chunk.setChunkInfo(std::move(info));
}
return chunk;
}
void RemoteSource::onCancel()
{
query_executor->cancel();
}
RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)
: ISource(executor->getHeader())
, query_executor(std::move(executor))
{
}
RemoteTotalsSource::~RemoteTotalsSource() = default;
Chunk RemoteTotalsSource::generate()
{
if (auto block = query_executor->getTotals())
{
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
return {};
}
RemoteExtremesSource::RemoteExtremesSource(RemoteQueryExecutorPtr executor)
: ISource(executor->getHeader())
, query_executor(std::move(executor))
{
}
RemoteExtremesSource::~RemoteExtremesSource() = default;
Chunk RemoteExtremesSource::generate()
{
if (auto block = query_executor->getExtremes())
{
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
return {};
}
Pipe createRemoteSourcePipe(
RemoteQueryExecutorPtr query_executor,
bool add_aggregation_info, bool add_totals, bool add_extremes)
{
Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info));
if (add_totals)
{
auto totals_source = std::make_shared<RemoteTotalsSource>(query_executor);
pipe.setTotalsPort(&totals_source->getPort());
pipe.addProcessors({std::move(totals_source)});
}
if (add_extremes)
{
auto extremes_source = std::make_shared<RemoteExtremesSource>(query_executor);
pipe.setExtremesPort(&extremes_source->getPort());
pipe.addProcessors({std::move(extremes_source)});
}
return pipe;
}
}

View File

@ -0,0 +1,82 @@
#pragma once
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Pipe.h>
namespace DB
{
class RemoteQueryExecutor;
using RemoteQueryExecutorPtr = std::shared_ptr<RemoteQueryExecutor>;
/// Source from RemoteQueryExecutor. Executes remote query and returns query result chunks.
class RemoteSource : public SourceWithProgress
{
public:
/// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk.
/// AggregatedChunkInfo stores the bucket number used for two-level aggregation.
    /// This flag should typically be enabled for queries with GROUP BY that are executed up to WithMergeableState.
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_);
~RemoteSource() override;
String getName() const override { return "Remote"; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
/// Stop reading from stream if output port is finished.
void onUpdatePorts() override
{
if (getPort().isFinished())
cancel();
}
protected:
Chunk generate() override;
void onCancel() override;
private:
bool was_query_sent = false;
bool add_aggregation_info = false;
RemoteQueryExecutorPtr query_executor;
RowsBeforeLimitCounterPtr rows_before_limit;
};
/// Totals source from RemoteQueryExecutor.
class RemoteTotalsSource : public ISource
{
public:
explicit RemoteTotalsSource(RemoteQueryExecutorPtr executor);
~RemoteTotalsSource() override;
String getName() const override { return "RemoteTotals"; }
protected:
Chunk generate() override;
private:
RemoteQueryExecutorPtr query_executor;
};
/// Extremes source from RemoteQueryExecutor.
class RemoteExtremesSource : public ISource
{
public:
explicit RemoteExtremesSource(RemoteQueryExecutorPtr executor);
~RemoteExtremesSource() override;
String getName() const override { return "RemoteExtremes"; }
protected:
Chunk generate() override;
private:
RemoteQueryExecutorPtr query_executor;
};
/// Create pipe with remote sources.
Pipe createRemoteSourcePipe(
RemoteQueryExecutorPtr query_executor,
bool add_aggregation_info, bool add_totals, bool add_extremes);
}
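
As a usage note for the factory declared above, here is a minimal sketch. It assumes an already-constructed RemoteQueryExecutorPtr (obtaining one is outside this header); the flag values are illustrative and follow the comment about GROUP BY queries executed up to WithMergeableState.

#include <Processors/Sources/RemoteSource.h>

using namespace DB;

/// A sketch, not part of this commit: wrap a remote query executor into a
/// pipe that also exposes totals and extremes ports.
Pipe makeRemotePipe(RemoteQueryExecutorPtr executor)
{
    return createRemoteSourcePipe(
        std::move(executor),
        /* add_aggregation_info = */ true,   /// query runs up to WithMergeableState
        /* add_totals = */ true,
        /* add_extremes = */ true);
}

Note that all three sources share the same executor, so totals and extremes are read from it after the main stream finishes.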

View File

@ -12,6 +12,11 @@ namespace ErrorCodes
extern const int TOO_MANY_BYTES;
}
SourceWithProgress::SourceWithProgress(Block header, bool enable_auto_progress)
: ISourceWithProgress(header), auto_progress(enable_auto_progress)
{
}
void SourceWithProgress::work()
{
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))
@ -24,7 +29,7 @@ void SourceWithProgress::work()
ISourceWithProgress::work();
if (!was_progress_called && has_input)
if (auto_progress && !was_progress_called && has_input)
progress({ current_chunk.chunk.getNumRows(), current_chunk.chunk.bytes() });
}
}

View File

@ -44,6 +44,8 @@ class SourceWithProgress : public ISourceWithProgress
{
public:
using ISourceWithProgress::ISourceWithProgress;
/// If enable_auto_progress flag is set, progress() will be automatically called on each generated chunk.
SourceWithProgress(Block header, bool enable_auto_progress);
using LocalLimits = IBlockInputStream::LocalLimits;
using LimitsMode = IBlockInputStream::LimitsMode;
@ -76,6 +78,9 @@ private:
/// This flag checks if progress() was manually called at generate() call.
/// If not, it will be called for chunk after generate() was finished.
bool was_progress_called = false;
/// If enabled, progress() will be automatically called on each generated chunk.
bool auto_progress = true;
};
}
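
For a source that reports progress itself, the new enable_auto_progress flag can be set to false. A minimal sketch, with fetchNextChunk() as a hypothetical stand-in for the real data source:

#include <Processors/Sources/SourceWithProgress.h>

using namespace DB;

/// A sketch, not part of this commit: disable the automatic per-chunk
/// progress() call and report progress manually inside generate(),
/// similar to how RemoteSource relies on the executor's progress callback.
class ManualProgressSource : public SourceWithProgress
{
public:
    explicit ManualProgressSource(Block header)
        : SourceWithProgress(std::move(header), /* enable_auto_progress = */ false) {}

    String getName() const override { return "ManualProgress"; }

protected:
    Chunk generate() override
    {
        Chunk chunk = fetchNextChunk();                  /// hypothetical
        progress({chunk.getNumRows(), chunk.bytes()});   /// manual progress call
        return chunk;
    }
};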

View File

@ -106,9 +106,11 @@ SRCS(
Port.cpp
QueryPipeline.cpp
ResizeProcessor.cpp
Sources/DelayedSource.cpp
Sources/SinkToOutputStream.cpp
Sources/SourceFromInputStream.cpp
Sources/SourceWithProgress.cpp
Sources/RemoteSource.cpp
Transforms/AddingMissedTransform.cpp
Transforms/AddingSelectorTransform.cpp
Transforms/AggregatingTransform.cpp

View File

@ -26,6 +26,11 @@ public:
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
    /// PREWHERE is passed inside the query and resolved at its level.
bool supportsPrewhere() const override { return true; }
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override { return to_stage; }
Pipes read(

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
</yandex>

View File

@ -0,0 +1,8 @@
<yandex>
<metric_log>
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
</metric_log>
</yandex>

View File

@ -19,6 +19,7 @@ import pprint
import psycopg2
import pymongo
import pymysql
import cassandra.cluster
from dicttoxml import dicttoxml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
@ -108,6 +109,7 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
self.base_cassandra_cmd = []
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
@ -119,6 +121,7 @@ class ClickHouseCluster:
self.with_mongo = False
self.with_net_trics = False
self.with_redis = False
self.with_cassandra = False
self.with_minio = False
self.minio_host = "minio1"
@ -147,7 +150,7 @@ class ClickHouseCluster:
def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False,
with_redis=False, with_minio=False, with_cassandra=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
zookeeper_docker_compose_path=None, zookeeper_use_tmpfs=True):
@ -169,7 +172,7 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs or [], user_configs or [], macros or {},
with_zookeeper,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
self.base_configs_dir, self.server_bin_path,
self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
env_variables=env_variables or {}, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address,
@ -265,6 +268,12 @@ class ClickHouseCluster:
self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_minio.yml')]
cmds.append(self.base_minio_cmd)
if with_cassandra and not self.with_cassandra:
self.with_cassandra = True
self.base_cmd.extend(['--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')])
self.base_cassandra_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(DOCKER_COMPOSE_DIR, 'docker_compose_cassandra.yml')]
return instance
def get_instance_docker_id(self, instance_name):
@ -451,6 +460,18 @@ class ClickHouseCluster:
logging.warning("Can't connect to SchemaRegistry: %s", str(ex))
time.sleep(1)
def wait_cassandra_to_start(self, timeout=30):
cass_client = cassandra.cluster.Cluster(["localhost"], port="9043")
start = time.time()
while time.time() - start < timeout:
try:
cass_client.connect()
logging.info("Connected to Cassandra")
return
except Exception as ex:
logging.warning("Can't connect to Cassandra: %s", str(ex))
time.sleep(1)
def start(self, destroy_dirs=True):
if self.is_up:
return
@ -527,6 +548,10 @@ class ClickHouseCluster:
logging.info("Trying to connect to Minio...")
self.wait_minio_to_start()
if self.with_cassandra and self.base_cassandra_cmd:
subprocess_check_call(self.base_cassandra_cmd + ['up', '-d', '--force-recreate'])
self.wait_cassandra_to_start()
clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
logging.info("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
subprocess_check_call(clickhouse_start_cmd)
@ -656,7 +681,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio,
with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, with_redis, with_minio, with_cassandra,
base_configs_dir, server_bin_path, odbc_bridge_bin_path,
clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test",
@ -686,6 +711,7 @@ class ClickHouseInstance:
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
self.with_cassandra = with_cassandra
self.path = p.join(self.cluster.instances_dir, name)
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')

View File

@ -2,11 +2,13 @@
import warnings
import pymysql.cursors
import pymongo
import cassandra.cluster
import redis
import aerospike
from tzlocal import get_localzone
import datetime
import os
import uuid
class ExternalSource(object):
@ -405,6 +407,73 @@ class SourceHTTPS(SourceHTTPBase):
def _get_schema(self):
return "https"
class SourceCassandra(ExternalSource):
TYPE_MAPPING = {
'UInt8': 'tinyint',
'UInt16': 'smallint',
'UInt32': 'int',
'UInt64': 'bigint',
'Int8': 'tinyint',
'Int16': 'smallint',
'Int32': 'int',
'Int64': 'bigint',
'UUID': 'uuid',
'Date': 'date',
'DateTime': 'timestamp',
'String': 'text',
'Float32': 'float',
'Float64': 'double'
}
def __init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password):
ExternalSource.__init__(self, name, internal_hostname, internal_port, docker_hostname, docker_port, user, password)
self.structure = dict()
def get_source_str(self, table_name):
return '''
<cassandra>
<host>{host}</host>
<port>{port}</port>
<keyspace>test</keyspace>
<column_family>{table}</column_family>
<allow_filtering>1</allow_filtering>
<where>"Int64_" &lt; 1000000000000000000</where>
</cassandra>
'''.format(
host=self.docker_hostname,
port=self.docker_port,
table=table_name,
)
def prepare(self, structure, table_name, cluster):
self.client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
self.session = self.client.connect()
self.session.execute("create keyspace if not exists test with replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};")
self.session.execute('drop table if exists test."{}"'.format(table_name))
self.structure[table_name] = structure
columns = ['"' + col.name + '" ' + self.TYPE_MAPPING[col.field_type] for col in structure.get_all_fields()]
keys = ['"' + col.name + '"' for col in structure.keys]
query = 'create table test."{name}" ({columns}, primary key ({pk}));'.format(
name=table_name, columns=', '.join(columns), pk=', '.join(keys))
self.session.execute(query)
self.prepared = True
def get_value_to_insert(self, value, type):
if type == 'UUID':
return uuid.UUID(value)
elif type == 'DateTime':
local_datetime = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
return get_localzone().localize(local_datetime)
return value
def load_data(self, data, table_name):
names_and_types = [(field.name, field.field_type) for field in self.structure[table_name].get_all_fields()]
columns = ['"' + col[0] + '"' for col in names_and_types]
insert = 'insert into test."{table}" ({columns}) values ({args})'.format(
table=table_name, columns=','.join(columns), args=','.join(['%s']*len(columns)))
for row in data:
values = [self.get_value_to_insert(row.get_value_by_name(col[0]), col[1]) for col in names_and_types]
self.session.execute(insert, values)
class SourceRedis(ExternalSource):
def __init__(

View File

@ -4,7 +4,7 @@ import os
from helpers.cluster import ClickHouseCluster
from dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from external_sources import SourceMySQL, SourceClickHouse, SourceFile, SourceExecutableCache, SourceExecutableHashed
from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis
from external_sources import SourceMongo, SourceMongoURI, SourceHTTP, SourceHTTPS, SourceRedis, SourceCassandra
import math
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -117,6 +117,7 @@ LAYOUTS = [
]
SOURCES = [
SourceCassandra("Cassandra", "localhost", "9043", "cassandra1", "9042", "", ""),
SourceMongo("MongoDB", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMongoURI("MongoDB_URI", "localhost", "27018", "mongo1", "27017", "root", "clickhouse"),
SourceMySQL("MySQL", "localhost", "3308", "mysql1", "3306", "root", "clickhouse"),
@ -131,7 +132,7 @@ SOURCES = [
DICTIONARIES = []
# Key-value dictionaries with onle one possible field for key
# Key-value dictionaries with only one possible field for key
SOURCES_KV = [
SourceRedis("RedisSimple", "localhost", "6380", "redis1", "6379", "", "", storage_type="simple"),
SourceRedis("RedisHash", "localhost", "6380", "redis1", "6379", "", "", storage_type="hash_map"),
@ -183,7 +184,7 @@ def setup_module(module):
for fname in os.listdir(dict_configs_path):
main_configs.append(os.path.join(dict_configs_path, fname))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True)
node = cluster.add_instance('node', main_configs=main_configs, with_mysql=True, with_mongo=True, with_redis=True, with_cassandra=True)
cluster.add_instance('clickhouse1')

View File

@ -0,0 +1,2 @@
5 1
10 2

View File

@ -0,0 +1,26 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS lv;
DROP TABLE IF EXISTS lv2;
DROP TABLE IF EXISTS mt;
CREATE TABLE mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW lv AS SELECT sum(a) AS sum_a FROM mt PREWHERE a > 1;
CREATE LIVE VIEW lv2 AS SELECT sum(number) AS sum_number FROM system.numbers PREWHERE number > 1;
INSERT INTO mt VALUES (1),(2),(3);
SELECT *,_version FROM lv;
SELECT *,_version FROM lv PREWHERE sum_a > 5; -- { serverError 182 }
INSERT INTO mt VALUES (1),(2),(3);
SELECT *,_version FROM lv;
SELECT *,_version FROM lv PREWHERE sum_a > 10; -- { serverError 182 }
SELECT *,_version FROM lv2; -- { serverError 182 }
SELECT *,_version FROM lv2 PREWHERE sum_number > 10; -- { serverError 182 }
DROP TABLE lv;
DROP TABLE lv2;
DROP TABLE mt;

View File

@ -20,6 +20,14 @@ function execute_null()
function execute_group_by()
{
    # Peak memory usage for the main query (with GROUP BY) is ~100MiB (with
    # max_threads=2 as used here).
    # So set max_memory_usage_for_user to 150MiB; if the memory tracking
    # accounting is incorrect, the second query will fail.
    #
    # Note that we also need one query running for the user (sleep(3)), since
    # max_memory_usage_for_user is reset to 0 once there are no more
    # queries for that user.
local opts=(
--max_memory_usage_for_user=$((150<<20))
--max_threads=2

View File

@ -10,6 +10,11 @@ select * from system.distribution_queue;
select 'INSERT';
system stop distributed sends dist_01293;
insert into dist_01293 select * from numbers(10);
-- metrics are updated only after distributed_directory_monitor_sleep_time_ms
set distributed_directory_monitor_sleep_time_ms=10;
-- 1 second should guarantee the metrics update
-- XXX: this is a quirk, though; it would be much better to account for these metrics without any delays.
select sleep(1) format Null;
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue;
system flush distributed dist_01293;

View File

@ -0,0 +1 @@
Loaded 1 queries.

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --multiquery --query "
DROP TABLE IF EXISTS bug;
CREATE TABLE bug (UserID UInt64, Date Date) ENGINE = MergeTree ORDER BY Date;
INSERT INTO bug SELECT rand64(), '2020-06-07' FROM numbers(50000000);
OPTIMIZE TABLE bug FINAL;"
$CLICKHOUSE_BENCHMARK --database $CLICKHOUSE_DATABASE --iterations 10 --max_threads 100 --min_bytes_to_use_direct_io 1 <<< "SELECT sum(UserID) FROM bug PREWHERE NOT ignore(Date)" 1>/dev/null 2>$CLICKHOUSE_TMP/err
cat $CLICKHOUSE_TMP/err | grep Exception
cat $CLICKHOUSE_TMP/err | grep Loaded
$CLICKHOUSE_CLIENT --multiquery --query "
DROP TABLE bug;"

View File

@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0"
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_REDIS=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0 -D ENABLE_SSL=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_CASSANDRA=0"
[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin