Merge branch 'master' into feature-server-iface-metrics

This commit is contained in:
Alexey Milovidov 2023-12-30 15:44:42 +01:00
commit aad63c8273
79 changed files with 860 additions and 458 deletions

View File

@ -33,12 +33,7 @@ curl https://clickhouse.com/ | sh
## Upcoming Events
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/296488501/) - Nov 30
* [**ClickHouse Meetup in NYC**](https://www.meetup.com/clickhouse-new-york-user-group/events/296488779/) - Dec 11
* [**ClickHouse Meetup in Sydney**](https://www.meetup.com/clickhouse-sydney-user-group/events/297638812/) - Dec 12
* [**ClickHouse Meetup in Boston**](https://www.meetup.com/clickhouse-boston-user-group/events/296488840/) - Dec 12
Also, keep an eye out for upcoming meetups around the world. Somewhere else you want us to be? Please feel free to reach out to tyler <at> clickhouse <dot> com.
Keep an eye out for upcoming meetups around the world. Somewhere else you want us to be? Please feel free to reach out to tyler <at> clickhouse <dot> com.
## Recent Recordings
* **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Current featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments"

View File

@ -18,6 +18,7 @@
#ifndef POCO_UTIL_NO_XMLCONFIGURATION
#include "Poco/String.h"
#include "Poco/SAX/InputSource.h"
#include "Poco/DOM/DOMParser.h"
#include "Poco/DOM/Element.h"
@ -28,6 +29,8 @@
#include "Poco/NumberParser.h"
#include "Poco/NumberFormatter.h"
#include <unordered_map>
#include <algorithm>
#include <iterator>
namespace Poco {
@ -275,8 +278,9 @@ void XMLConfiguration::enumerate(const std::string& key, Keys& range) const
{
if (pChild->nodeType() == Poco::XML::Node::ELEMENT_NODE)
{
const std::string& nodeName = pChild->nodeName();
std::string nodeName = pChild->nodeName();
size_t& count = keys[nodeName];
replaceInPlace(nodeName, ".", "\\.");
if (count)
range.push_back(nodeName + "[" + NumberFormatter::format(count) + "]");
else
@ -379,7 +383,21 @@ Poco::XML::Node* XMLConfiguration::findNode(std::string::const_iterator& it, con
{
while (it != end && *it == _delim) ++it;
std::string key;
while (it != end && *it != _delim && *it != '[') key += *it++;
while (it != end)
{
if (*it == '\\' && std::distance(it, end) > 1)
{
// Skip backslash, copy only the char after it
std::advance(it, 1);
key += *it++;
continue;
}
if (*it == _delim)
break;
if (*it == '[')
break;
key += *it++;
}
return findNode(it, end, findElement(key, pNode, create), create);
}
}

View File

@ -4,7 +4,7 @@ sidebar_position: 63
sidebar_label: User Settings
---
# User Settings
# Users and Roles Settings
The `users` section of the `user.xml` configuration file contains user settings.
@ -187,3 +187,34 @@ The following configuration forces that user `user1` can only see the rows of `t
```
The `filter` can be any expression resulting in a [UInt8](../../sql-reference/data-types/int-uint.md)-type value. It usually contains comparisons and logical operators. Rows from `database_name.table1` where filter results to 0 are not returned for this user. The filtering is incompatible with `PREWHERE` operations and disables `WHERE→PREWHERE` optimization.
## Roles
You can create any predefined roles using the `roles` section of the `user.xml` configuration file.
Structure of the `roles` section:
```xml
<roles>
<test_role>
<grants>
<query>GRANT SHOW ON *.*</query>
<query>REVOKE SHOW ON system.*</query>
<query>GRANT CREATE ON *.* WITH GRANT OPTION</query>
</grants>
</test_role>
</roles>
```
These roles can also be granted to users from the `users` section:
```xml
<users>
<user_name>
...
<grants>
<query>GRANT test_role</query>
</grants>
</user_name>
<users>
```

View File

@ -9,11 +9,15 @@ Columns:
- `name` ([String](../../sql-reference/data-types/string.md)) — name of the error (`errorCodeToName`).
- `code` ([Int32](../../sql-reference/data-types/int-uint.md)) — code number of the error.
- `value` ([UInt64](../../sql-reference/data-types/int-uint.md)) — the number of times this error has been happened.
- `last_error_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — time when the last error happened.
- `value` ([UInt64](../../sql-reference/data-types/int-uint.md)) — the number of times this error happened.
- `last_error_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — the time when the last error happened.
- `last_error_message` ([String](../../sql-reference/data-types/string.md)) — message for the last error.
- `last_error_trace` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — A [stack trace](https://en.wikipedia.org/wiki/Stack_trace) which represents a list of physical addresses where the called methods are stored.
- `remote` ([UInt8](../../sql-reference/data-types/int-uint.md)) — remote exception (i.e. received during one of the distributed query).
- `last_error_trace` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — A [stack trace](https://en.wikipedia.org/wiki/Stack_trace) that represents a list of physical addresses where the called methods are stored.
- `remote` ([UInt8](../../sql-reference/data-types/int-uint.md)) — remote exception (i.e. received during one of the distributed queries).
:::note
Counters for some errors may increase during successful query execution. It's not recommended to use this table for server monitoring purposes unless you are sure that corresponding error can not be a false positive.
:::
**Example**

View File

@ -1,9 +1,9 @@
---
slug: /ru/getting-started/example-datasets/github-events
sidebar_label: GitHub Events
title: "GitHub Events Dataset"
title: "Набор данных о событиях на GitHub"
---
import Content from '@site/docs/en/getting-started/example-datasets/github-events.md';
Набор данных о событиях на GitHub с 2011 года по 6 декабря 2020 года содержит 3,1 млрд записей. Объём исходных данных — 75 ГБ, для загрузки в Clickhouse потребуется около 200 ГБ свободного пространства хранения (при использовании метода сжатия lz4).
<Content />
Полное описание набора, инструкции по загрузке и запросы к нему опубликованы на https://ghe.clickhouse.tech/

View File

@ -0,0 +1 @@
clickhouse

View File

@ -0,0 +1,2 @@
[[ -v $_CLICKHOUSE_COMPLETION_LOADED ]] || source "$(dirname "${BASH_SOURCE[0]}")/clickhouse-bootstrap"
_complete_clickhouse_generic chc

View File

@ -0,0 +1,2 @@
[[ -v $_CLICKHOUSE_COMPLETION_LOADED ]] || source "$(dirname "${BASH_SOURCE[0]}")/clickhouse-bootstrap"
_complete_clickhouse_generic chl

View File

@ -31,3 +31,4 @@ function _complete_for_clickhouse_entrypoint_bin()
}
_complete_clickhouse_generic clickhouse _complete_for_clickhouse_entrypoint_bin
_complete_clickhouse_generic ch _complete_for_clickhouse_entrypoint_bin

View File

@ -158,7 +158,6 @@ std::pair<std::string_view, MainFunc> clickhouse_applications[] =
std::pair<std::string_view, std::string_view> clickhouse_short_names[] =
{
#if ENABLE_CLICKHOUSE_LOCAL
{"ch", "local"},
{"chl", "local"},
#endif
#if ENABLE_CLICKHOUSE_CLIENT
@ -502,6 +501,17 @@ int main(int argc_, char ** argv_)
}
}
/// Interpret binary without argument or with arguments starts with dash
/// ('-') as clickhouse-local for better usability:
///
/// clickhouse # dumps help
/// clickhouse -q 'select 1' # use local
/// clickhouse # spawn local
/// clickhouse local # spawn local
///
if (main_func == printHelp && !argv.empty() && (argv.size() == 1 || argv[1][0] == '-'))
main_func = mainEntryClickHouseLocal;
return main_func(static_cast<int>(argv.size()), argv.data());
}
#endif

View File

@ -1,6 +1,8 @@
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <Functions/FunctionFactory.h>
@ -52,6 +54,13 @@ public:
if (!isTuple(rhs_argument_result_type))
return;
if (function_node->getResultType()->equals(DataTypeNullable(std::make_shared<DataTypeNothing>())))
/** The function `equals` can return Nullable(Nothing), e.g., in the case of (a, b) == (NULL, 1).
* On the other hand, `AND` returns Nullable(UInt8), so we would need to convert types.
* It's better to just skip this trivial case.
*/
return;
auto lhs_argument_node_type = lhs_argument->getNodeType();
auto rhs_argument_node_type = rhs_argument->getNodeType();

View File

@ -154,14 +154,14 @@ BackupCoordinationStageSync::State BackupCoordinationStageSync::readCurrentState
/// If the "alive" node doesn't exist then we don't have connection to the corresponding host.
/// This node is ephemeral so probably it will be recreated soon. We use zookeeper retries to wait.
/// In worst case when we won't manage to see the alive node for a long time we will just abort the backup.
String message;
const auto * const suffix = retries_ctl.isLastRetry() ? "" : ", will retry";
if (started)
message = fmt::format("Lost connection to host {}", host);
retries_ctl.setUserError(Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"Lost connection to host {}{}", host, suffix));
else
message = fmt::format("No connection to host {} yet", host);
if (!retries_ctl.isLastRetry())
message += ", will retry";
retries_ctl.setUserError(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE, message);
retries_ctl.setUserError(Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"No connection to host {} yet{}", host, suffix));
state.disconnected_host = host;
return state;
}

View File

@ -69,10 +69,15 @@ namespace
/// Requests in backups can be extremely long, set to one hour
client_configuration.requestTimeoutMs = 60 * 60 * 1000;
S3::ClientSettings client_settings{
.use_virtual_addressing = s3_uri.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
return S3::ClientFactory::instance().create(
client_configuration,
s3_uri.is_virtual_hosted_style,
local_settings.s3_disable_checksum,
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64,

View File

@ -28,25 +28,31 @@ namespace ErrorCodes
static thread_local char thread_name[THREAD_NAME_SIZE]{};
void setThreadName(const char * name)
void setThreadName(const char * name, bool truncate)
{
if (strlen(name) > THREAD_NAME_SIZE - 1)
size_t name_len = strlen(name);
if (!truncate && name_len > THREAD_NAME_SIZE - 1)
throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Thread name cannot be longer than 15 bytes");
size_t name_capped_len = std::min<size_t>(1 + name_len, THREAD_NAME_SIZE - 1);
char name_capped[THREAD_NAME_SIZE];
memcpy(name_capped, name, name_capped_len);
name_capped[name_capped_len] = '\0';
#if defined(OS_FREEBSD)
pthread_set_name_np(pthread_self(), name);
pthread_set_name_np(pthread_self(), name_capped);
if ((false))
#elif defined(OS_DARWIN)
if (0 != pthread_setname_np(name))
if (0 != pthread_setname_np(name_capped))
#elif defined(OS_SUNOS)
if (0 != pthread_setname_np(pthread_self(), name))
if (0 != pthread_setname_np(pthread_self(), name_capped))
#else
if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
if (0 != prctl(PR_SET_NAME, name_capped, 0, 0, 0))
#endif
if (errno != ENOSYS && errno != EPERM) /// It's ok if the syscall is unsupported or not allowed in some environments.
throw DB::ErrnoException(DB::ErrorCodes::PTHREAD_ERROR, "Cannot set thread name with prctl(PR_SET_NAME, ...)");
memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
memcpy(thread_name, name_capped, name_capped_len);
}
const char * getThreadName()

View File

@ -4,7 +4,9 @@
/** Sets the thread name (maximum length is 15 bytes),
* which will be visible in ps, gdb, /proc,
* for convenience of observation and debugging.
*
* @param truncate - if true, will truncate to 15 automatically, otherwise throw
*/
void setThreadName(const char * name);
void setThreadName(const char * name, bool truncate = false);
const char * getThreadName();

View File

@ -0,0 +1,30 @@
#include <Common/Config/ConfigHelper.h>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DOM/DOMParser.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, ConfigWithDotInKeys)
{
std::string xml(R"CONFIG(<clickhouse>
<foo.bar>1</foo.bar>
</clickhouse>)CONFIG");
Poco::XML::DOMParser dom_parser;
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
/// directly
EXPECT_EQ(ConfigHelper::getBool(*config, "foo.bar", false, false), false);
EXPECT_EQ(ConfigHelper::getBool(*config, "foo\\.bar", false, false), true);
/// via keys()
Poco::Util::AbstractConfiguration::Keys keys;
config->keys("", keys);
ASSERT_EQ(1, keys.size());
ASSERT_EQ("foo\\.bar", keys[0]);
}

View File

@ -23,7 +23,7 @@ int main(int, char **)
Stopwatch stopwatch;
{
DB::WriteBufferFromFile buf("test1", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::WriteBufferFromFile buf("test1", DB::DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::CompressedWriteBuffer compressed_buf(buf);
stopwatch.restart();

View File

@ -660,6 +660,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
switch (type)
{
case nuraft::cb_func::PreAppendLogLeader:
{
/// we cannot preprocess anything new as leader because we don't have up-to-date in-memory state
/// until we preprocess all stored logs
return nuraft::cb_func::ReturnCode::ReturnNull;
}
case nuraft::cb_func::InitialBatchCommited:
{
preprocess_logs();

View File

@ -13,6 +13,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/S3/PocoHTTPClient.h>
#include <IO/S3/Client.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Common/Macros.h>
@ -98,10 +99,15 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
client_configuration.endpointOverride = new_uri.endpoint;
S3::ClientSettings client_settings{
.use_virtual_addressing = new_uri.is_virtual_hosted_style,
.disable_checksum = false,
.gcs_issue_compose_request = false,
};
auto client = S3::ClientFactory::instance().create(
client_configuration,
new_uri.is_virtual_hosted_style,
/* disable_checksum= */ false,
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,

View File

@ -1000,7 +1000,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin" + params.extension));
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_11_15.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
"./logs/changelog_11_15.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
DB::KeeperLogStore changelog_reader(
@ -1073,7 +1073,7 @@ TEST_P(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin" + params.extension));
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
"./logs/changelog_1_20.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(30);
DB::KeeperLogStore changelog_reader(
@ -1130,7 +1130,7 @@ TEST_F(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
"./logs/changelog_1_20.bin", DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(plain_buf.size() - 30);
DB::KeeperLogStore changelog_reader(
@ -1733,7 +1733,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotBroken)
/// Let's corrupt file
DB::WriteBufferFromFile plain_buf(
"./snapshots/snapshot_50.bin" + params.extension, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
"./snapshots/snapshot_50.bin" + params.extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(34);
plain_buf.sync();
@ -2770,7 +2770,7 @@ TEST_P(CoordinationTest, TestDurableState)
{
SCOPED_TRACE("Read from corrupted file");
state_manager.reset();
DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY);
DB::WriteBufferFromFile write_buf("./state", DB::DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY);
write_buf.seek(20, SEEK_SET);
DB::writeIntBinary(31, write_buf);
write_buf.sync();
@ -2787,7 +2787,7 @@ TEST_P(CoordinationTest, TestDurableState)
SCOPED_TRACE("Read from file with invalid size");
state_manager.reset();
DB::WriteBufferFromFile write_buf("./state", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
DB::WriteBufferFromFile write_buf("./state", DB::DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
DB::writeIntBinary(20, write_buf);
write_buf.sync();
write_buf.close();

View File

@ -3,66 +3,70 @@
#include <base/defines.h>
#include <base/unit.h>
#define DBMS_DEFAULT_PORT 9000
#define DBMS_DEFAULT_SECURE_PORT 9440
#define DBMS_DEFAULT_CONNECT_TIMEOUT_SEC 10
#define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
namespace DB
{
static constexpr auto DBMS_DEFAULT_PORT = 9000;
static constexpr auto DBMS_DEFAULT_SECURE_PORT = 9440;
static constexpr auto DBMS_DEFAULT_CONNECT_TIMEOUT_SEC = 10;
static constexpr auto DBMS_DEFAULT_SEND_TIMEOUT_SEC = 300;
static constexpr auto DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC = 300;
/// Timeout for synchronous request-result protocol call (like Ping or TablesStatus).
#define DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC 5
#define DBMS_DEFAULT_POLL_INTERVAL 10
static constexpr auto DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC = 5;
static constexpr auto DBMS_DEFAULT_POLL_INTERVAL = 10;
/// The size of the I/O buffer by default.
#define DBMS_DEFAULT_BUFFER_SIZE 1048576ULL
static constexpr auto DBMS_DEFAULT_BUFFER_SIZE = 1048576ULL;
#define PADDING_FOR_SIMD 64
static constexpr auto PADDING_FOR_SIMD = 64;
/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
*/
#define DEFAULT_BLOCK_SIZE 65409 /// 65536 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
static constexpr auto DEFAULT_BLOCK_SIZE
= 65409; /// 65536 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in = arrays
/** Which blocks should be formed for insertion into the table, if we control the formation of blocks.
* (Sometimes the blocks are inserted exactly such blocks that have been read / transmitted from the outside, and this parameter does not affect their size.)
* More than DEFAULT_BLOCK_SIZE, because in some tables a block of data on the disk is created for each block (quite a big thing),
* and if the parts were small, then it would be costly then to combine them.
*/
#define DEFAULT_INSERT_BLOCK_SIZE \
1048449 /// 1048576 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
static constexpr auto DEFAULT_INSERT_BLOCK_SIZE
= 1048449; /// 1048576 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
#define DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC 60
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
static constexpr auto DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC = 60;
static constexpr auto SHOW_CHARS_ON_SYNTAX_ERROR = ptrdiff_t(160);
/// each period reduces the error counter by 2 times
/// too short a period can cause errors to disappear immediately after creation.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 60
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD = 60;
/// replica error max cap, this is to prevent replica from accumulating too many errors and taking to long to recover.
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT 1000
static constexpr auto DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT = 1000;
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
static constexpr auto DEFAULT_AIO_FILE_BLOCK_SIZE = 4096;
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 30
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
static constexpr auto DEFAULT_HTTP_READ_BUFFER_TIMEOUT = 30;
static constexpr auto DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT = 1;
/// Maximum number of http-connections between two endpoints
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
static constexpr auto DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT = 15;
#define DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT 30
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30;
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/";
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.
/// Check with IStorage class for the list of possible locks
#define DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC 120
static constexpr auto DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC = 120;
/// Default limit on recursion depth of recursive descend parser.
#define DBMS_DEFAULT_MAX_PARSER_DEPTH 1000
static constexpr auto DBMS_DEFAULT_MAX_PARSER_DEPTH = 1000;
/// Default limit on query size.
#define DBMS_DEFAULT_MAX_QUERY_SIZE 262144
static constexpr auto DBMS_DEFAULT_MAX_QUERY_SIZE = 262144;
/// Max depth of hierarchical dictionary
#define DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH 1000
static constexpr auto DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH = 1000;
/// Default maximum (total and entry) sizes and policies of various caches
static constexpr auto DEFAULT_UNCOMPRESSED_CACHE_POLICY = "SLRU";
@ -95,7 +99,9 @@ static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS = 30'000'000uz;
///
/// Look at compiler-rt/lib/sanitizer_common/sanitizer_stacktrace.h
#if !defined(SANITIZER)
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 1000000000
static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 1000000000;
#else
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 0
static constexpr auto QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS = 0;
#endif
}

View File

@ -122,7 +122,7 @@ struct CustomType
bool isSecret() const { return impl->isSecret(); }
const char * getTypeName() const { return impl->getTypeName(); }
String toString(bool show_secrets = true) const { return impl->toString(show_secrets); }
const CustomTypeImpl & getImpl() { return *impl; }
const CustomTypeImpl & getImpl() const { return *impl; }
bool operator < (const CustomType & rhs) const { return *impl < *rhs.impl; }
bool operator <= (const CustomType & rhs) const { return *impl <= *rhs.impl; }
@ -292,7 +292,7 @@ concept not_field_or_bool_or_stringlike
/** 32 is enough. Round number is used for alignment and for better arithmetic inside std::vector.
* NOTE: Actually, sizeof(std::string) is 32 when using libc++, so Field is 40 bytes.
*/
#define DBMS_MIN_FIELD_SIZE 32
static constexpr auto DBMS_MIN_FIELD_SIZE = 32;
/** Discriminated union of several types.

View File

@ -5,13 +5,16 @@
#if USE_ICU
#include <unicode/ucnv.h>
#define CHUNK_SIZE 1024
static const char * TARGET_CHARSET = "utf8";
#endif
namespace DB
{
#if USE_ICU
static constexpr auto CHUNK_SIZE = 1024;
static constexpr auto TARGET_CHARSET = "utf8";
#endif
namespace ErrorCodes
{
extern const int UNKNOWN_EXCEPTION;

View File

@ -1,77 +1,80 @@
#pragma once
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
namespace DB
{
static constexpr auto DBMS_MIN_REVISION_WITH_CLIENT_INFO = 54032;
static constexpr auto DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE = 54058;
static constexpr auto DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO = 54060;
static constexpr auto DBMS_MIN_REVISION_WITH_TABLES_STATUS = 54226;
static constexpr auto DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE = 54337;
static constexpr auto DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME = 54372;
static constexpr auto DBMS_MIN_REVISION_WITH_VERSION_PATCH = 54401;
static constexpr auto DBMS_MIN_REVISION_WITH_SERVER_LOGS = 54406;
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54448
#define DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 21
#define DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 4
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
static constexpr auto DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD = 54448;
static constexpr auto DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD = 21;
static constexpr auto DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA = 54410;
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
#define DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO 54420
static constexpr auto DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE = 54405;
static constexpr auto DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO = 54420;
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
#define DBMS_MIN_REVISION_WITH_SCALARS 54429
static constexpr auto DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS = 54429;
static constexpr auto DBMS_MIN_REVISION_WITH_SCALARS = 54429;
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
static constexpr auto DBMS_MIN_REVISION_WITH_OPENTELEMETRY = 54442;
#define DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING 54452
static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54452;
#define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
#define DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION 3
#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
#define DBMS_MERGE_TREE_PART_INFO_VERSION 1
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
static constexpr auto DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET = 54441;
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
static constexpr auto DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO = 54443;
static constexpr auto DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO = 54447;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH = 54448;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS 54451
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451;
#define DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION 54454
static constexpr auto DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION = 54454;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME = 54449;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT 54456
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT = 54456;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED 54457
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED = 54457;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM 54458
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY 54458
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY = 54458;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS 54459
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS = 54459;
/// The server will send query elapsed run time in the Progress packet.
#define DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS 54460
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS = 54460;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES 54461
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES = 54461;
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 54462
static constexpr auto DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 = 54462;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS 54463
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS = 54463;
#define DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES 54464
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES = 54464;
#define DBMS_MIN_REVISION_WITH_SPARSE_SERIALIZATION 54465
static constexpr auto DBMS_MIN_REVISION_WITH_SPARSE_SERIALIZATION = 54465;
#define DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION 54466
static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// Version of ClickHouse TCP protocol.
///
@ -80,4 +83,6 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54466
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54466;
}

View File

@ -4,6 +4,9 @@
#include "GeodataProviders/HierarchiesProvider.h"
#include "GeodataProviders/NamesProvider.h"
namespace DB
{
std::unique_ptr<RegionsHierarchies> GeoDictionariesLoader::reloadRegionsHierarchies(const Poco::Util::AbstractConfiguration & config)
{
static constexpr auto config_key = "path_to_regions_hierarchy_file";
@ -27,3 +30,5 @@ std::unique_ptr<RegionsNames> GeoDictionariesLoader::reloadRegionsNames(const Po
auto data_provider = std::make_unique<RegionsNamesDataProvider>(directory);
return std::make_unique<RegionsNames>(std::move(data_provider));
}
}

View File

@ -6,6 +6,9 @@
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
// Default implementation of geo dictionaries loader used by native server application
class GeoDictionariesLoader
{
@ -13,3 +16,5 @@ public:
static std::unique_ptr<RegionsHierarchies> reloadRegionsHierarchies(const Poco::Util::AbstractConfiguration & config);
static std::unique_ptr<RegionsNames> reloadRegionsNames(const Poco::Util::AbstractConfiguration & config);
};
}

View File

@ -3,6 +3,9 @@
#include <string>
#include "Types.h"
namespace DB
{
struct RegionEntry
{
RegionID id;
@ -17,3 +20,5 @@ struct RegionNameEntry
RegionID id;
std::string name;
};
}

View File

@ -9,6 +9,9 @@
namespace fs = std::filesystem;
namespace DB
{
bool RegionsHierarchyDataSource::isModified() const
{
return updates_tracker.isModified();
@ -17,7 +20,7 @@ bool RegionsHierarchyDataSource::isModified() const
IRegionsHierarchyReaderPtr RegionsHierarchyDataSource::createReader()
{
updates_tracker.fixCurrentVersion();
auto file_reader = std::make_shared<DB::ReadBufferFromFile>(path);
auto file_reader = std::make_shared<ReadBufferFromFile>(path);
return std::make_unique<RegionsHierarchyFormatReader>(std::move(file_reader));
}
@ -73,3 +76,5 @@ IRegionsHierarchyDataSourcePtr RegionsHierarchiesDataProvider::getHierarchySourc
throw Poco::Exception("Regions hierarchy '" + name + "' not found");
}
}

View File

@ -5,6 +5,8 @@
#include <unordered_map>
#include <Common/FileUpdatesTracker.h>
namespace DB
{
// Represents local file with regions hierarchy dump
class RegionsHierarchyDataSource : public IRegionsHierarchyDataSource
@ -50,3 +52,5 @@ public:
private:
void discoverFilesWithCustomHierarchies();
};
}

View File

@ -3,6 +3,8 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
bool RegionsHierarchyFormatReader::readNext(RegionEntry & entry)
{
@ -15,11 +17,11 @@ bool RegionsHierarchyFormatReader::readNext(RegionEntry & entry)
Int32 read_parent_id = 0;
Int8 read_type = 0;
DB::readIntText(read_region_id, *input);
DB::assertChar('\t', *input);
DB::readIntText(read_parent_id, *input);
DB::assertChar('\t', *input);
DB::readIntText(read_type, *input);
readIntText(read_region_id, *input);
assertChar('\t', *input);
readIntText(read_parent_id, *input);
assertChar('\t', *input);
readIntText(read_type, *input);
/** Then there can be a newline (old version)
* or tab, the region's population, line feed (new version).
@ -29,11 +31,11 @@ bool RegionsHierarchyFormatReader::readNext(RegionEntry & entry)
{
++input->position();
UInt64 population_big = 0;
DB::readIntText(population_big, *input);
readIntText(population_big, *input);
population = population_big > std::numeric_limits<RegionPopulation>::max() ? std::numeric_limits<RegionPopulation>::max()
: static_cast<RegionPopulation>(population_big);
}
DB::assertChar('\n', *input);
assertChar('\n', *input);
if (read_region_id <= 0 || read_type < 0)
continue;
@ -55,3 +57,5 @@ bool RegionsHierarchyFormatReader::readNext(RegionEntry & entry)
return false;
}
}

View File

@ -3,15 +3,19 @@
#include <IO/ReadBuffer.h>
#include "IHierarchiesProvider.h"
namespace DB
{
// Reads regions hierarchy in geoexport format
class RegionsHierarchyFormatReader : public IRegionsHierarchyReader
{
private:
DB::ReadBufferPtr input;
ReadBufferPtr input;
public:
explicit RegionsHierarchyFormatReader(DB::ReadBufferPtr input_) : input(std::move(input_)) {}
explicit RegionsHierarchyFormatReader(ReadBufferPtr input_) : input(std::move(input_)) {}
bool readNext(RegionEntry & entry) override;
};
}

View File

@ -5,6 +5,8 @@
#include <vector>
#include "Entries.h"
namespace DB
{
// Iterates over all regions in data source
class IRegionsHierarchyReader
@ -46,3 +48,5 @@ public:
};
using IRegionsHierarchiesDataProviderPtr = std::shared_ptr<IRegionsHierarchiesDataProvider>;
}

View File

@ -3,6 +3,8 @@
#include <memory>
#include "Entries.h"
namespace DB
{
// Iterates over all name entries in data source
class ILanguageRegionsNamesReader
@ -49,3 +51,5 @@ public:
};
using IRegionsNamesDataProviderPtr = std::unique_ptr<IRegionsNamesDataProvider>;
}

View File

@ -2,6 +2,8 @@
#include <IO/ReadHelpers.h>
namespace DB
{
bool LanguageRegionsNamesFormatReader::readNext(RegionNameEntry & entry)
{
@ -10,10 +12,10 @@ bool LanguageRegionsNamesFormatReader::readNext(RegionNameEntry & entry)
Int32 read_region_id;
std::string region_name;
DB::readIntText(read_region_id, *input);
DB::assertChar('\t', *input);
DB::readString(region_name, *input);
DB::assertChar('\n', *input);
readIntText(read_region_id, *input);
assertChar('\t', *input);
readString(region_name, *input);
assertChar('\n', *input);
if (read_region_id <= 0)
continue;
@ -25,3 +27,5 @@ bool LanguageRegionsNamesFormatReader::readNext(RegionNameEntry & entry)
return false;
}
}

View File

@ -3,15 +3,19 @@
#include <IO/ReadBuffer.h>
#include "INamesProvider.h"
namespace DB
{
// Reads regions names list in geoexport format
class LanguageRegionsNamesFormatReader : public ILanguageRegionsNamesReader
{
private:
DB::ReadBufferPtr input;
ReadBufferPtr input;
public:
explicit LanguageRegionsNamesFormatReader(DB::ReadBufferPtr input_) : input(std::move(input_)) {}
explicit LanguageRegionsNamesFormatReader(ReadBufferPtr input_) : input(std::move(input_)) {}
bool readNext(RegionNameEntry & entry) override;
};
}

View File

@ -6,6 +6,9 @@
namespace fs = std::filesystem;
namespace DB
{
bool LanguageRegionsNamesDataSource::isModified() const
{
return updates_tracker.isModified();
@ -19,7 +22,7 @@ size_t LanguageRegionsNamesDataSource::estimateTotalSize() const
ILanguageRegionsNamesReaderPtr LanguageRegionsNamesDataSource::createReader()
{
updates_tracker.fixCurrentVersion();
auto file_reader = std::make_shared<DB::ReadBufferFromFile>(path);
auto file_reader = std::make_shared<ReadBufferFromFile>(path);
return std::make_unique<LanguageRegionsNamesFormatReader>(std::move(file_reader));
}
@ -51,3 +54,5 @@ std::string RegionsNamesDataProvider::getDataFilePath(const std::string & langua
{
return directory + "/regions_names_" + language + ".txt";
}
}

View File

@ -3,6 +3,8 @@
#include <Common/FileUpdatesTracker.h>
#include "INamesProvider.h"
namespace DB
{
// Represents local file with list of regions ids / names
class LanguageRegionsNamesDataSource : public ILanguageRegionsNamesDataSource
@ -46,3 +48,5 @@ public:
private:
std::string getDataFilePath(const std::string & language) const;
};
}

View File

@ -2,6 +2,8 @@
#include <base/types.h>
namespace DB
{
using RegionID = UInt32;
using RegionDepth = UInt8;
@ -16,3 +18,5 @@ enum class RegionType : Int8
Area = 5,
City = 6,
};
}

View File

@ -3,6 +3,8 @@
#include <Poco/DirectoryIterator.h>
#include <Common/logger_useful.h>
namespace DB
{
RegionsHierarchies::RegionsHierarchies(IRegionsHierarchiesDataProviderPtr data_provider)
{
@ -19,3 +21,5 @@ RegionsHierarchies::RegionsHierarchies(IRegionsHierarchiesDataProviderPtr data_p
reload();
}
}

View File

@ -5,6 +5,8 @@
#include "GeodataProviders/IHierarchiesProvider.h"
#include "RegionsHierarchy.h"
namespace DB
{
/** Contains several hierarchies of regions.
* Used to support several different perspectives on the ownership of regions by countries.
@ -37,3 +39,5 @@ public:
return it->second;
}
};
}

View File

@ -12,7 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
}
extern const int LOGICAL_ERROR;
}
@ -54,9 +54,8 @@ void RegionsHierarchy::reload()
if (region_entry.id > max_region_id)
{
if (region_entry.id > max_size)
throw DB::Exception(DB::ErrorCodes::INCORRECT_DATA,
"Region id is too large: {}, should be not more than {}",
DB::toString(region_entry.id), DB::toString(max_size));
throw Exception(
ErrorCodes::INCORRECT_DATA, "Region id is too large: {}, should be not more than {}", region_entry.id, max_size);
max_region_id = region_entry.id;
@ -112,16 +111,18 @@ void RegionsHierarchy::reload()
++depth;
if (depth == std::numeric_limits<RegionDepth>::max())
throw Poco::Exception(
"Logical error in regions hierarchy: region " + DB::toString(current) + " possible is inside infinite loop");
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Logical error in regions hierarchy: region {} possible is inside infinite loop", current);
current = new_parents[current];
if (current == 0)
break;
if (current > max_region_id)
throw Poco::Exception(
"Logical error in regions hierarchy: region " + DB::toString(current) + " (specified as parent) doesn't exist");
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Logical error in regions hierarchy: region {} (specified as parent) doesn't exist",
current);
if (types[current] == RegionType::City)
new_city[i] = current;
@ -156,3 +157,5 @@ void RegionsHierarchy::reload()
populations.swap(new_populations);
depths.swap(new_depths);
}
}

View File

@ -6,6 +6,8 @@
#include "GeodataProviders/IHierarchiesProvider.h"
#include <Core/Defines.h>
namespace DB
{
class IRegionsHierarchyDataProvider;
@ -129,3 +131,5 @@ public:
return populations[region];
}
};
}

View File

@ -10,12 +10,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
}
RegionsNames::RegionsNames(IRegionsNamesDataProviderPtr data_provider)
{
@ -30,7 +30,7 @@ RegionsNames::RegionsNames(IRegionsNamesDataProviderPtr data_provider)
std::string RegionsNames::dumpSupportedLanguagesNames()
{
DB::WriteBufferFromOwnString out;
WriteBufferFromOwnString out;
for (size_t i = 0; i < total_languages; ++i)
{
if (i > 0)
@ -74,7 +74,8 @@ void RegionsNames::reload()
size_t old_size = new_chars.size();
if (new_chars.capacity() < old_size + name_entry.name.length() + 1)
throw Poco::Exception("Logical error. Maybe size estimate of " + names_source->getSourceName() + " is wrong.");
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Logical error. Maybe size estimate of {} is wrong", names_source->getSourceName());
new_chars.resize(old_size + name_entry.name.length() + 1);
memcpy(new_chars.data() + old_size, name_entry.name.c_str(), name_entry.name.length() + 1);
@ -84,9 +85,8 @@ void RegionsNames::reload()
max_region_id = name_entry.id;
if (name_entry.id > max_size)
throw DB::Exception(DB::ErrorCodes::INCORRECT_DATA,
"Region id is too large: {}, should be not more than {}",
DB::toString(name_entry.id), DB::toString(max_size));
throw Exception(
ErrorCodes::INCORRECT_DATA, "Region id is too large: {}, should be not more than {}", name_entry.id, max_size);
}
while (name_entry.id >= new_names_refs.size())
@ -102,3 +102,5 @@ void RegionsNames::reload()
for (size_t language_id = 0; language_id < total_languages; ++language_id)
names_refs[language_id].resize(max_region_id + 1, StringRef("", 0));
}
}

View File

@ -7,6 +7,8 @@
#include <base/types.h>
#include "GeodataProviders/INamesProvider.h"
namespace DB
{
/** A class that allows you to recognize by region id its text name in one of the supported languages.
*
@ -111,3 +113,5 @@ public:
void reload();
};
}

View File

@ -1,4 +1,5 @@
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include "IO/S3/Client.h"
#if USE_AWS_S3
@ -93,10 +94,15 @@ std::unique_ptr<S3::Client> getClient(
HTTPHeaderEntries headers = S3::getHTTPHeaders(config_prefix, config);
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
S3::ClientSettings client_settings{
.use_virtual_addressing = uri.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = config.getBool("s3.gcs_issue_compose_request", false),
};
return S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
local_settings.s3_disable_checksum,
client_settings,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),

View File

@ -39,6 +39,7 @@ public:
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override

View File

@ -91,19 +91,6 @@ namespace
const auto type_arr_from_nested = type_arr_from->getNestedType();
auto src = tryGetLeastSupertype(DataTypes{type_x, type_arr_from_nested});
if (!src
/// Compatibility with previous versions, that allowed even UInt64 with Int64,
/// regardless of ambiguous conversions.
&& !isNativeNumber(type_x) && !isNativeNumber(type_arr_from_nested))
{
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument and elements of array "
"of the second argument of function {} must have compatible types",
getName());
}
const DataTypeArray * type_arr_to = checkAndGetDataType<DataTypeArray>(arguments[2].get());
if (!type_arr_to)
@ -766,15 +753,18 @@ namespace
}
}
WhichDataType which(from_type);
/// Note: Doesn't check the duplicates in the `from` array.
/// Field may be of Float type, but for the purpose of bitwise equality we can treat them as UInt64
if (WhichDataType which(from_type); isNativeNumber(which) || which.isDecimal32() || which.isDecimal64())
if (isNativeNumber(which) || which.isDecimal32() || which.isDecimal64() || which.isEnum())
{
cache.table_num_to_idx = std::make_unique<Cache::NumToIdx>();
auto & table = *cache.table_num_to_idx;
for (size_t i = 0; i < size; ++i)
{
if (applyVisitor(FieldVisitorAccurateEquals(), (*cache.from_column)[i], (*from_column_uncasted)[i]))
if (which.isEnum() /// The correctness of strings are already checked by casting them to the Enum type.
|| applyVisitor(FieldVisitorAccurateEquals(), (*cache.from_column)[i], (*from_column_uncasted)[i]))
{
UInt64 key = 0;
auto * dst = reinterpret_cast<char *>(&key);

View File

@ -125,12 +125,11 @@ std::unique_ptr<Client> Client::create(
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing,
bool disable_checksum)
const ClientSettings & client_settings)
{
verifyClientConfiguration(client_configuration);
return std::unique_ptr<Client>(
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing, disable_checksum));
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, client_settings));
}
std::unique_ptr<Client> Client::clone() const
@ -160,14 +159,12 @@ Client::Client(
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const PocoHTTPClientConfiguration & client_configuration_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads_,
bool use_virtual_addressing_,
bool disable_checksum_)
: Aws::S3::S3Client(credentials_provider_, client_configuration_, sign_payloads_, use_virtual_addressing_)
const ClientSettings & client_settings_)
: Aws::S3::S3Client(credentials_provider_, client_configuration_, sign_payloads_, client_settings_.use_virtual_addressing)
, credentials_provider(credentials_provider_)
, client_configuration(client_configuration_)
, sign_payloads(sign_payloads_)
, use_virtual_addressing(use_virtual_addressing_)
, disable_checksum(disable_checksum_)
, client_settings(client_settings_)
, max_redirects(max_redirects_)
, sse_kms_config(std::move(sse_kms_config_))
, log(&Poco::Logger::get("S3Client"))
@ -207,13 +204,12 @@ Client::Client(
Client::Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration_)
: Aws::S3::S3Client(other.credentials_provider, client_configuration_, other.sign_payloads,
other.use_virtual_addressing)
other.client_settings.use_virtual_addressing)
, initial_endpoint(other.initial_endpoint)
, credentials_provider(other.credentials_provider)
, client_configuration(client_configuration_)
, sign_payloads(other.sign_payloads)
, use_virtual_addressing(other.use_virtual_addressing)
, disable_checksum(other.disable_checksum)
, client_settings(other.client_settings)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, provider_type(other.provider_type)
@ -417,7 +413,7 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(CompleteMu
outcome = Aws::S3::Model::CompleteMultipartUploadOutcome(Aws::S3::Model::CompleteMultipartUploadResult());
}
if (outcome.IsSuccess() && provider_type == ProviderType::GCS)
if (outcome.IsSuccess() && provider_type == ProviderType::GCS && client_settings.gcs_issue_compose_request)
{
/// For GCS we will try to compose object at the end, otherwise we cannot do a native copy
/// for the object (e.g. for backups)
@ -515,7 +511,7 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
addAdditionalAMZHeadersToCanonicalHeadersList(request, client_configuration.extra_headers);
const auto & bucket = request.GetBucket();
request.setApiMode(api_mode);
if (disable_checksum)
if (client_settings.disable_checksum)
request.disableChecksum();
if (auto region = getRegionForBucket(bucket); !region.empty())
@ -852,8 +848,7 @@ ClientFactory & ClientFactory::instance()
std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
const PocoHTTPClientConfiguration & cfg_,
bool is_virtual_hosted_style,
bool disable_checksum,
ClientSettings client_settings,
const String & access_key_id,
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,
@ -892,14 +887,17 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(client_configuration.s3_retry_attempts);
/// Use virtual addressing if endpoint is not specified.
if (client_configuration.endpointOverride.empty())
client_settings.use_virtual_addressing = true;
return Client::create(
client_configuration.s3_max_redirects,
std::move(sse_kms_config),
credentials_provider,
client_configuration, // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
is_virtual_hosted_style || client_configuration.endpointOverride.empty(), /// Use virtual addressing if endpoint is not specified.
disable_checksum
client_settings
);
}

View File

@ -92,6 +92,23 @@ private:
std::unordered_map<ClientCache *, std::weak_ptr<ClientCache>> client_caches;
};
struct ClientSettings
{
bool use_virtual_addressing;
/// Disable checksum to avoid extra read of the input stream
bool disable_checksum;
/// Should client send ComposeObject request after upload to GCS.
///
/// Previously ComposeObject request was required to make Copy possible,
/// but not anymore (see [1]).
///
/// [1]: https://cloud.google.com/storage/docs/release-notes#June_23_2023
///
/// Ability to enable it preserved since likely it is required for old
/// files.
bool gcs_issue_compose_request;
};
/// Client that improves the client from the AWS SDK
/// - inject region and URI into requests so they are rerouted to the correct destination if needed
/// - automatically detect endpoint and regions for each bucket and cache them
@ -116,8 +133,7 @@ public:
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing,
bool disable_checksum);
const ClientSettings & client_settings);
std::unique_ptr<Client> clone() const;
@ -195,7 +211,6 @@ public:
Model::DeleteObjectsOutcome DeleteObjects(DeleteObjectsRequest & request) const;
using ComposeObjectOutcome = Aws::Utils::Outcome<Aws::NoResult, Aws::S3::S3Error>;
ComposeObjectOutcome ComposeObject(ComposeObjectRequest & request) const;
using Aws::S3::S3Client::EnableRequestProcessing;
using Aws::S3::S3Client::DisableRequestProcessing;
@ -212,8 +227,7 @@ private:
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing,
bool disable_checksum_);
const ClientSettings & client_settings_);
Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration);
@ -236,6 +250,8 @@ private:
using Aws::S3::S3Client::DeleteObject;
using Aws::S3::S3Client::DeleteObjects;
ComposeObjectOutcome ComposeObject(ComposeObjectRequest & request) const;
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
doRequest(RequestType & request, RequestFn request_fn) const;
@ -258,8 +274,7 @@ private:
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
PocoHTTPClientConfiguration client_configuration;
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads;
bool use_virtual_addressing;
bool disable_checksum;
ClientSettings client_settings;
std::string explicit_region;
mutable bool detect_region = true;
@ -289,8 +304,7 @@ public:
std::unique_ptr<S3::Client> create(
const PocoHTTPClientConfiguration & cfg,
bool is_virtual_hosted_style,
bool disable_checksum,
ClientSettings client_settings,
const String & access_key_id,
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,

View File

@ -94,7 +94,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
client,
uri.bucket,
uri.key,
DBMS_DEFAULT_BUFFER_SIZE,
DB::DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
{}
);
@ -140,10 +140,15 @@ void testServerSideEncryption(
bool use_environment_credentials = false;
bool use_insecure_imds_request = false;
DB::S3::ClientSettings client_settings{
.use_virtual_addressing = uri.is_virtual_hosted_style,
.disable_checksum = disable_checksum,
.gcs_issue_compose_request = false,
};
std::shared_ptr<DB::S3::Client> client = DB::S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
disable_checksum,
client_settings,
access_key_id,
secret_access_key,
server_side_encryption_customer_key_base64,

View File

@ -19,7 +19,7 @@ try
{
auto buf
= std::make_unique<DB::WriteBufferFromFile>("test_lzma_buffers.xz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
= std::make_unique<DB::WriteBufferFromFile>("test_lzma_buffers.xz", DB::DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::LZMADeflatingWriteBuffer lzma_buf(std::move(buf), /*compression level*/ 3);
stopwatch.restart();

View File

@ -21,7 +21,7 @@ try
Stopwatch stopwatch;
{
auto buf = std::make_unique<DB::WriteBufferFromFile>("test_zlib_buffers.gz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
auto buf = std::make_unique<DB::WriteBufferFromFile>("test_zlib_buffers.gz", DB::DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZlibDeflatingWriteBuffer deflating_buf(std::move(buf), DB::CompressionMethod::Gzip, /* compression_level = */ 3);
stopwatch.restart();

View File

@ -21,7 +21,7 @@ try
{
auto buf
= std::make_unique<DB::WriteBufferFromFile>("test_zstd_buffers.zst", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
= std::make_unique<DB::WriteBufferFromFile>("test_zstd_buffers.zst", DB::DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZstdDeflatingWriteBuffer zstd_buf(std::move(buf), /*compression level*/ 3);
stopwatch.restart();

View File

@ -210,10 +210,13 @@ struct Client : DB::S3::Client
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>("", ""),
GetClientConfiguration(),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
/* use_virtual_addressing = */ true,
/* disable_checksum_= */ false)
DB::S3::ClientSettings{
.use_virtual_addressing = true,
.disable_checksum= false,
.gcs_issue_compose_request = false,
})
, store(mock_s3_store)
{ }
{}
static std::shared_ptr<Client> CreateClient(String bucket = "mock-s3-bucket")
{

View File

@ -12,14 +12,13 @@
namespace Poco { class Logger; namespace Util { class AbstractConfiguration; } }
namespace DB
{
class RegionsHierarchies;
class RegionsNames;
class GeoDictionariesLoader;
namespace DB
{
/// Metrica's Dictionaries which can be used in functions.
class EmbeddedDictionaries : WithContext

View File

@ -1335,8 +1335,7 @@ static void buildIndexes(
filter_actions_dag,
context,
primary_key_column_names,
primary_key.expression,
array_join_name_set}, {}, {}, {}, {}, false, {}});
primary_key.expression}, {}, {}, {}, {}, false, {}});
}
else
{
@ -1353,7 +1352,7 @@ static void buildIndexes(
auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key);
auto minmax_expression_actions = data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context));
indexes->minmax_idx_condition.emplace(filter_actions_dag, context, minmax_columns_names, minmax_expression_actions, NameSet());
indexes->minmax_idx_condition.emplace(filter_actions_dag, context, minmax_columns_names, minmax_expression_actions);
indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
}

View File

@ -374,7 +374,7 @@ Pipe ReadFromSystemNumbersStep::makePipe()
num_streams = 1;
/// Build rpn of query filters
KeyCondition condition(buildFilterDAG(), context, column_names, key_expression, NameSet{});
KeyCondition condition(buildFilterDAG(), context, column_names, key_expression);
Pipe pipe;
Ranges ranges;

View File

@ -41,8 +41,7 @@ protected:
filter_actions_dag,
context,
keys.getNames(),
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())),
NameSet{});
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>(keys.getColumnsWithTypeAndName())));
}
}

View File

@ -13,6 +13,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <base/defines.h>
namespace CurrentMetrics
{
@ -46,15 +47,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;
KafkaConsumer::KafkaConsumer(
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
const std::atomic<bool> & stopped_,
const Names & _topics)
: consumer(consumer_)
, log(log_)
: log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
@ -63,6 +62,25 @@ KafkaConsumer::KafkaConsumer(
, topics(_topics)
, exceptions_buffer(EXCEPTIONS_DEPTH)
{
}
void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
{
chassert(!consumer.get());
/// Using this should be safe, since cppkafka::Consumer can poll messages
/// (including statistics, which will trigger the callback below) only via
/// KafkaConsumer.
if (consumer_config.get("statistics.interval.ms") != "0")
{
consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json)
{
setRDKafkaStat(stat_json);
});
}
consumer = std::make_shared<cppkafka::Consumer>(consumer_config);
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
@ -133,8 +151,30 @@ KafkaConsumer::KafkaConsumer(
});
}
ConsumerPtr && KafkaConsumer::moveConsumer()
{
cleanUnprocessed();
if (!consumer->get_subscription().empty())
{
try
{
consumer->unsubscribe();
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error during unsubscribe: {}", e.what());
}
drain();
}
return std::move(consumer);
}
KafkaConsumer::~KafkaConsumer()
{
if (!consumer)
return;
cleanUnprocessed();
try
{
if (!consumer->get_subscription().empty())
@ -154,7 +194,6 @@ KafkaConsumer::~KafkaConsumer()
{
LOG_ERROR(log, "Error while destructing consumer: {}", e.what());
}
}
// Needed to drain rest of the messages / queued callback calls from the consumer
@ -568,6 +607,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr
*/
std::string KafkaConsumer::getMemberId() const
{
if (!consumer)
return "";
char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
std::string memberid_string = memberid_ptr;
rd_kafka_mem_free(nullptr, memberid_ptr);
@ -578,8 +620,14 @@ std::string KafkaConsumer::getMemberId() const
KafkaConsumer::Stat KafkaConsumer::getStat() const
{
KafkaConsumer::Stat::Assignments assignments;
auto cpp_assignments = consumer->get_assignment();
auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
cppkafka::TopicPartitionList cpp_assignments;
cppkafka::TopicPartitionList cpp_offsets;
if (consumer)
{
cpp_assignments = consumer->get_assignment();
cpp_offsets = consumer->get_offsets_position(cpp_assignments);
}
for (size_t num = 0; num < cpp_assignments.size(); ++num)
{
@ -591,7 +639,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
}
return {
.consumer_id = getMemberId() /* consumer->get_member_id() */ ,
.consumer_id = getMemberId(),
.assignments = std::move(assignments),
.last_poll_time = last_poll_timestamp_usec.load(),
.num_messages_read = num_messages_read.load(),
@ -601,11 +649,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
.num_commits = num_commits.load(),
.num_rebalance_assignments = num_rebalance_assignments.load(),
.num_rebalance_revocations = num_rebalance_revocations.load(),
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;}(),
.exceptions_buffer = [&]()
{
std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;
}(),
.in_use = in_use.load(),
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;}(),
.last_used_usec = last_used_usec.load(),
.rdkafka_stat = [&]()
{
std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;
}(),
};
}

View File

@ -57,12 +57,11 @@ public:
UInt64 num_rebalance_revocations;
KafkaConsumer::ExceptionsBuffer exceptions_buffer;
bool in_use;
UInt64 last_used_usec;
std::string rdkafka_stat;
};
public:
KafkaConsumer(
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
@ -72,6 +71,11 @@ public:
);
~KafkaConsumer();
void createConsumer(cppkafka::Configuration consumer_config);
bool hasConsumer() const { return consumer.get() != nullptr; }
ConsumerPtr && moveConsumer();
void commit(); // Commit all processed messages.
void subscribe(); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@ -113,11 +117,20 @@ public:
rdkafka_stat = stat_json_string;
}
void inUse() { in_use = true; }
void notInUse() { in_use = false; }
void notInUse()
{
in_use = false;
last_used_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
// For system.kafka_consumers
Stat getStat() const;
bool isInUse() const { return in_use; }
UInt64 getLastUsedUsec() const { return last_used_usec; }
std::string getMemberId() const;
private:
using Messages = std::vector<cppkafka::Message>;
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@ -132,6 +145,10 @@ private:
ERRORS_RETURNED
};
// order is important, need to be destructed *after* consumer
mutable std::mutex rdkafka_stat_mutex;
std::string rdkafka_stat;
ConsumerPtr consumer;
Poco::Logger * log;
const size_t batch_size = 1;
@ -145,11 +162,11 @@ private:
const std::atomic<bool> & stopped;
// order is important, need to be destructed before consumer
// order is important, need to be destructed *before* consumer
Messages messages;
Messages::const_iterator current;
// order is important, need to be destructed before consumer
// order is important, need to be destructed *before* consumer
std::optional<cppkafka::TopicPartitionList> assignment;
const Names topics;
@ -168,9 +185,8 @@ private:
std::atomic<UInt64> num_rebalance_assignments = 0;
std::atomic<UInt64> num_rebalance_revocations = 0;
std::atomic<bool> in_use = 0;
mutable std::mutex rdkafka_stat_mutex;
std::string rdkafka_stat;
/// Last used time (for TTL)
std::atomic<UInt64> last_used_usec = 0;
void drain();
void cleanUnprocessed();
@ -178,8 +194,6 @@ private:
/// Return number of messages with an error.
size_t filterMessageErrors();
ReadBufferPtr getNextMessage();
std::string getMemberId() const;
};
}

View File

@ -11,6 +11,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
@ -38,4 +39,15 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
}
}
void KafkaSettings::sanityCheck() const
{
if (kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less then rescheduled interval ({})",
kafka_consumers_pool_ttl_ms, KAFKA_RESCHEDULE_MS);
if (kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be too big (greater then {}), since this may cause live memory leaks",
kafka_consumers_pool_ttl_ms, KAFKA_CONSUMERS_POOL_TTL_MS_MAX);
}
}

View File

@ -8,6 +8,12 @@ namespace DB
{
class ASTStorage;
const auto KAFKA_RESCHEDULE_MS = 500;
const auto KAFKA_CLEANUP_TIMEOUT_MS = 3000;
// once per minute leave do reschedule (we can't lock threads in pool forever)
const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
// 10min
const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
#define KAFKA_RELATED_SETTINGS(M, ALIAS) \
M(String, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
@ -25,6 +31,7 @@ class ASTStorage;
/* default is stream_poll_timeout_ms */ \
M(Milliseconds, kafka_poll_timeout_ms, 0, "Timeout for single poll from Kafka.", 0) \
M(UInt64, kafka_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \
M(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \
/* default is stream_flush_interval_ms */ \
M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
@ -53,6 +60,8 @@ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
struct KafkaSettings : public BaseSettings<KafkaSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
void sanityCheck() const;
};
}

View File

@ -27,10 +27,12 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <base/getFQDNOrHostName.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <cppkafka/configuration.h>
#include <librdkafka/rdkafka.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
@ -45,6 +47,7 @@
#include <Common/config_version.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <base/sleep.h>
#if USE_KRB5
#include <Access/KerberosInit.h>
@ -76,6 +79,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int QUERY_NOT_ALLOWED;
extern const int ABORTED;
}
struct StorageKafkaInterceptors
@ -172,10 +176,6 @@ struct StorageKafkaInterceptors
namespace
{
const auto RESCHEDULE_MS = 500;
const auto CLEANUP_TIMEOUT_MS = 3000;
const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute leave do reschedule (we can't lock threads in pool forever)
const String CONFIG_KAFKA_TAG = "kafka";
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_NAME_TAG = "name";
@ -262,17 +262,19 @@ StorageKafka::StorageKafka(
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value)
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
, semaphore(0, static_cast<int>(num_consumers))
, intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
, settings_adjustments(createSettingsAdjustments())
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
, collection_name(collection_name_)
{
kafka_settings->sanityCheck();
if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
{
kafka_settings->input_format_allow_errors_num = 0;
kafka_settings->input_format_allow_errors_ratio = 0;
}
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
@ -283,6 +285,18 @@ StorageKafka::StorageKafka(
task->deactivate();
tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
}
consumers.resize(num_consumers);
for (size_t i = 0; i < num_consumers; ++i)
consumers[i] = createKafkaConsumer(i);
cleanup_thread = std::make_unique<ThreadFromGlobalPool>([this]()
{
const auto & table = getStorageID().getTableName();
const auto & thread_name = std::string("KfkCln:") + table;
setThreadName(thread_name.c_str(), /*truncate=*/ true);
cleanConsumers();
});
}
SettingsChanges StorageKafka::createSettingsAdjustments()
@ -343,8 +357,8 @@ Pipe StorageKafka::read(
size_t /* max_block_size */,
size_t /* num_streams */)
{
if (num_created_consumers == 0)
return {};
if (shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Table is detached");
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
@ -357,12 +371,12 @@ Pipe StorageKafka::read(
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes;
pipes.reserve(num_created_consumers);
pipes.reserve(num_consumers);
auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments);
// Claim as many consumers as requested, but don't block
for (size_t i = 0; i < num_created_consumers; ++i)
for (size_t i = 0; i < num_consumers; ++i)
{
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
/// TODO: probably that leads to awful performance.
@ -412,21 +426,6 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
void StorageKafka::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
try
{
auto consumer = createConsumer(i);
pushConsumer(consumer);
all_consumers.push_back(consumer);
++num_created_consumers;
}
catch (const cppkafka::Exception &)
{
tryLogCurrentException(log);
}
}
// Start the reader thread
for (auto & task : tasks)
{
@ -437,21 +436,48 @@ void StorageKafka::startup()
void StorageKafka::shutdown(bool)
{
for (auto & task : tasks)
{
// Interrupt streaming thread
task->stream_cancelled = true;
shutdown_called = true;
cleanup_cv.notify_one();
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
{
LOG_TRACE(log, "Waiting for consumers cleanup thread");
Stopwatch watch;
if (cleanup_thread)
{
cleanup_thread->join();
cleanup_thread.reset();
}
LOG_TRACE(log, "Consumers cleanup thread finished in {} ms.", watch.elapsedMilliseconds());
}
LOG_TRACE(log, "Closing consumers");
for (size_t i = 0; i < num_created_consumers; ++i)
auto consumer = popConsumer();
LOG_TRACE(log, "Consumers closed");
{
LOG_TRACE(log, "Waiting for streaming jobs");
Stopwatch watch;
for (auto & task : tasks)
{
// Interrupt streaming thread
task->stream_cancelled = true;
rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
LOG_TEST(log, "Waiting for cleanup of a task");
task->holder->deactivate();
}
LOG_TRACE(log, "Streaming jobs finished in {} ms.", watch.elapsedMilliseconds());
}
{
std::lock_guard lock(mutex);
LOG_TRACE(log, "Closing {} consumers", consumers.size());
Stopwatch watch;
consumers.clear();
LOG_TRACE(log, "Consumers closed. Took {} ms.", watch.elapsedMilliseconds());
}
{
LOG_TRACE(log, "Waiting for final cleanup");
Stopwatch watch;
rd_kafka_wait_destroyed(KAFKA_CLEANUP_TIMEOUT_MS);
LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), KAFKA_CLEANUP_TIMEOUT_MS);
}
}
@ -459,8 +485,7 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
{
std::lock_guard lock(mutex);
consumer->notInUse();
consumers.push_back(consumer);
semaphore.set();
cv.notify_one();
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
}
@ -473,26 +498,88 @@ KafkaConsumerPtr StorageKafka::popConsumer()
KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
{
// Wait for the first free buffer
if (timeout == std::chrono::milliseconds::zero())
semaphore.wait();
else
std::unique_lock lock(mutex);
KafkaConsumerPtr ret_consumer_ptr;
std::optional<size_t> closed_consumer_index;
for (size_t i = 0; i < consumers.size(); ++i)
{
if (!semaphore.tryWait(timeout.count()))
return nullptr;
auto & consumer_ptr = consumers[i];
if (consumer_ptr->isInUse())
continue;
if (consumer_ptr->hasConsumer())
{
ret_consumer_ptr = consumer_ptr;
break;
}
if (!closed_consumer_index.has_value() && !consumer_ptr->hasConsumer())
{
closed_consumer_index = i;
}
}
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto consumer = consumers.back();
consumers.pop_back();
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
consumer->inUse();
return consumer;
/// 1. There is consumer available - return it.
if (ret_consumer_ptr)
{
/// Noop
}
/// 2. There is no consumer, but we can create a new one.
else if (!ret_consumer_ptr && closed_consumer_index.has_value())
{
ret_consumer_ptr = consumers[*closed_consumer_index];
cppkafka::Configuration consumer_config = getConsumerConfiguration(*closed_consumer_index);
/// It should be OK to create consumer under lock, since it should be fast (without subscribing).
ret_consumer_ptr->createConsumer(consumer_config);
LOG_TRACE(log, "Created #{} consumer", *closed_consumer_index);
}
/// 3. There is no free consumer and num_consumers already created, waiting @timeout.
else
{
cv.wait_for(lock, timeout, [&]()
{
/// Note we are waiting only opened, free, consumers, since consumer cannot be closed right now
auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr)
{
return !ptr->isInUse() && ptr->hasConsumer();
});
if (it != consumers.end())
{
ret_consumer_ptr = *it;
return true;
}
return false;
});
}
if (ret_consumer_ptr)
{
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
ret_consumer_ptr->inUse();
}
return ret_consumer_ptr;
}
KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
{
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
auto & stream_cancelled = thread_per_consumer ? tasks[consumer_number]->stream_cancelled : tasks.back()->stream_cancelled;
KafkaConsumerPtr kafka_consumer_ptr = std::make_shared<KafkaConsumer>(
log,
getPollMaxBatchSize(),
getPollTimeoutMillisecond(),
intermediate_commit,
stream_cancelled,
topics);
return kafka_consumer_ptr;
}
cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number)
{
cppkafka::Configuration conf;
@ -517,35 +604,66 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
/// a reference to the consumer is needed in statistic callback
/// although the consumer does not exist when callback is being registered
/// shared_ptr<weak_ptr<KafkaConsumer>> comes to the rescue
auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
updateConfiguration(conf, consumer_weak_ptr_ptr);
updateConfiguration(conf);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
// Create a consumer and subscribe to topics
auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
return conf;
}
KafkaConsumerPtr kafka_consumer_ptr;
void StorageKafka::cleanConsumers()
{
UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
if (thread_per_consumer)
std::unique_lock lock(mutex);
std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS);
while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; }))
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
/// Copy consumers for closing to a new vector to close them without a lock
std::vector<ConsumerPtr> consumers_to_close;
UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
{
for (size_t i = 0; i < consumers.size(); ++i)
{
auto & consumer_ptr = consumers[i];
UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
chassert(consumer_last_used_usec <= now_usec);
if (!consumer_ptr->hasConsumer())
continue;
if (consumer_ptr->isInUse())
continue;
if (now_usec - consumer_last_used_usec > ttl_usec)
{
LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId());
consumers_to_close.push_back(consumer_ptr->moveConsumer());
}
}
}
if (!consumers_to_close.empty())
{
lock.unlock();
Stopwatch watch;
size_t closed = consumers_to_close.size();
consumers_to_close.clear();
LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.",
closed, ttl_usec, watch.elapsedMilliseconds());
lock.lock();
}
ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
}
else
{
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
}
*consumer_weak_ptr_ptr = kafka_consumer_ptr;
return kafka_consumer_ptr;
LOG_TRACE(log, "Consumers cleanup thread finished");
}
size_t StorageKafka::getMaxBlockSize() const
@ -578,8 +696,7 @@ String StorageKafka::getConfigPrefix() const
return CONFIG_KAFKA_TAG;
}
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
{
// Update consumer configuration from the configuration. Example:
// <kafka>
@ -659,33 +776,12 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
});
if (kafka_consumer_weak_ptr_ptr)
/// NOTE: statistics should be consumed, otherwise it creates too much
/// entries in the queue, that leads to memory leak and slow shutdown.
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
/// NOTE: statistics should be consumed, otherwise it creates too much
/// entries in the queue, that leads to memory leak and slow shutdown.
///
/// This is the case when you have kafka table but no SELECT from it or
/// materialized view attached.
///
/// So for now it is disabled by default, until properly fixed.
#if 0
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
}
#endif
if (kafka_config.get("statistics.interval.ms") != "0")
{
kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
{
auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
if (kafka_consumer_ptr)
{
kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
}
});
}
// every 3 seconds by default. set to 0 to disable.
kafka_config.set("statistics.interval.ms", "3000");
}
// Configure interceptor to change thread name
@ -756,7 +852,7 @@ void StorageKafka::threadFunc(size_t idx)
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled && num_created_consumers > 0)
while (!task->stream_cancelled)
{
if (!checkDependencies(table_id))
break;
@ -773,7 +869,7 @@ void StorageKafka::threadFunc(size_t idx)
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
if (duration.count() > KAFKA_MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
break;
@ -793,13 +889,10 @@ void StorageKafka::threadFunc(size_t idx)
LOG_ERROR(log, "{} {}", __PRETTY_FUNCTION__, exception_str);
auto safe_consumers = getSafeConsumers();
for (auto const & consumer_ptr_weak : safe_consumers.consumers)
for (auto const & consumer_ptr : safe_consumers.consumers)
{
/// propagate materialized view exception to all consumers
if (auto consumer_ptr = consumer_ptr_weak.lock())
{
consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
}
consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
}
}
@ -807,7 +900,7 @@ void StorageKafka::threadFunc(size_t idx)
// Wait for attached views
if (!task->stream_cancelled)
task->holder->scheduleAfter(RESCHEDULE_MS);
task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
}
@ -844,7 +937,7 @@ bool StorageKafka::streamToViews()
std::vector<std::shared_ptr<KafkaSource>> sources;
Pipes pipes;
auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
auto stream_count = thread_per_consumer ? 1 : num_consumers;
sources.reserve(stream_count);
pipes.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ThreadPool_fwd.h>
#include <Common/Macros.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
@ -9,16 +10,11 @@
#include <Poco/Semaphore.h>
#include <condition_variable>
#include <mutex>
#include <list>
#include <atomic>
namespace cppkafka
{
class Configuration;
}
#include <cppkafka/cppkafka.h>
namespace DB
{
@ -28,7 +24,7 @@ class StorageSystemKafkaConsumers;
struct StorageKafkaInterceptors;
using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
using KafkaConsumerWeakPtr = std::weak_ptr<KafkaConsumer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
@ -84,10 +80,10 @@ public:
{
std::shared_ptr<IStorage> storage_ptr;
std::unique_lock<std::mutex> lock;
std::vector<KafkaConsumerWeakPtr> & consumers;
std::vector<KafkaConsumerPtr> & consumers;
};
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; }
private:
// Configuration and state
@ -102,20 +98,16 @@ private:
const String schema_name;
const size_t num_consumers; /// total number of consumers
Poco::Logger * log;
Poco::Semaphore semaphore;
const bool intermediate_commit;
const SettingsChanges settings_adjustments;
std::atomic<bool> mv_attached = false;
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.
std::vector<KafkaConsumerPtr> consumers; /// available consumers
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers
std::vector<KafkaConsumerPtr> consumers;
std::mutex mutex;
std::condition_variable cv;
std::condition_variable cleanup_cv;
// Stream thread
struct TaskContext
@ -129,12 +121,17 @@ private:
std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
/// For memory accounting in the librdkafka threads.
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
SettingsChanges createSettingsAdjustments();
KafkaConsumerPtr createConsumer(size_t consumer_number);
/// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
/// Returns consumer configuration with all changes that had been overwritten in config
cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
/// If named_collection is specified.
String collection_name;
@ -142,11 +139,7 @@ private:
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
void updateConfiguration(cppkafka::Configuration & kafka_config)
{
updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
}
void updateConfiguration(cppkafka::Configuration & kafka_config);
String getConfigPrefix() const;
void threadFunc(size_t idx);
@ -161,6 +154,7 @@ private:
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
void cleanConsumers();
};
}

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/getLeastSupertype.h>
@ -852,12 +853,10 @@ KeyCondition::KeyCondition(
ContextPtr context,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr_,
NameSet array_joined_column_names_,
bool single_point_,
bool strict_)
: key_expr(key_expr_)
, key_subexpr_names(getAllSubexpressionNames(*key_expr))
, array_joined_column_names(std::move(array_joined_column_names_))
, single_point(single_point_)
, strict(strict_)
{
@ -1642,6 +1641,15 @@ static void castValueToType(const DataTypePtr & desired_type, Field & src_value,
bool KeyCondition::extractAtomFromTree(const RPNBuilderTreeNode & node, RPNElement & out)
{
const auto * node_dag = node.getDAGNode();
if (node_dag && node_dag->result_type->equals(DataTypeNullable(std::make_shared<DataTypeNothing>())))
{
/// If the inferred result type is Nullable(Nothing) at the query analysis stage,
/// we don't analyze this node further as its condition will always be false.
out.function = RPNElement::ALWAYS_FALSE;
return true;
}
/** Functions < > = != <= >= in `notIn` isNull isNotNull, where one argument is a constant, and the other is one of columns of key,
* or itself, wrapped in a chain of possibly-monotonic functions,
* (for example, if the table has ORDER BY time, we will check the conditions like

View File

@ -69,7 +69,6 @@ public:
ContextPtr context,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr,
NameSet array_joined_column_names,
bool single_point_ = false,
bool strict_ = false);

View File

@ -159,13 +159,7 @@ namespace
KeyCondition buildCondition(const IndexDescription & index, const SelectQueryInfo & query_info, ContextPtr context)
{
if (context->getSettingsRef().allow_experimental_analyzer)
{
NameSet array_join_name_set;
if (query_info.syntax_analyzer_result)
array_join_name_set = query_info.syntax_analyzer_result->getArrayJoinSourceNameSet();
return KeyCondition{query_info.filter_actions_dag, context, index.column_names, index.expression, array_join_name_set};
}
return KeyCondition{query_info.filter_actions_dag, context, index.column_names, index.expression};
return KeyCondition{query_info, context, index.column_names, index.expression};
}

View File

@ -321,7 +321,7 @@ public:
{
const auto & primary_key = storage_snapshot->metadata->getPrimaryKey();
const Names & primary_key_column_names = primary_key.column_names;
KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{});
KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression);
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
if (!key_condition.alwaysFalse())

View File

@ -10,7 +10,7 @@ namespace
KeyCondition buildKeyCondition(const KeyDescription & partition_key, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
{
if (context->getSettingsRef().allow_experimental_analyzer)
return {query_info.filter_actions_dag, context, partition_key.column_names, partition_key.expression, {}, true /* single_point */, strict};
return {query_info.filter_actions_dag, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict};
return {query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict};
}
@ -26,7 +26,7 @@ PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, const Sele
PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAGPtr filter_actions_dag, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(filter_actions_dag, context, partition_key.column_names, partition_key.expression, {}, true /* single_point */, strict)
, partition_condition(filter_actions_dag, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
{
}

View File

@ -776,7 +776,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (!writing_existing_part)
{
retries_ctl.setUserError(
ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path);
Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path));
return CommitRetryContext::LOCK_AND_COMMIT;
}
}
@ -1075,10 +1075,10 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
new_retry_controller.actionAfterLastFailedRetry([&]
{
/// We do not know whether or not data has been inserted in other replicas
new_retry_controller.setUserError(
new_retry_controller.setUserError(Exception(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: {}",
new_retry_controller.getLastKeeperErrorMessage());
new_retry_controller.getLastKeeperErrorMessage()));
});
new_retry_controller.retryLoop([&]()

View File

@ -112,7 +112,7 @@ public:
return false;
}
void setUserError(std::exception_ptr exception, int code, std::string message)
void setUserError(std::exception_ptr exception, int code, const std::string & message)
{
if (logger)
LOG_TRACE(logger, "ZooKeeperRetriesControl: {}: setUserError: error={} message={}", name, code, message);
@ -127,21 +127,9 @@ public:
keeper_error = KeeperError{};
}
template <typename... Args>
void setUserError(std::exception_ptr exception, int code, fmt::format_string<Args...> fmt, Args &&... args)
void setUserError(const Exception & exception)
{
setUserError(exception, code, fmt::format(fmt, std::forward<Args>(args)...));
}
void setUserError(int code, std::string message)
{
setUserError(std::make_exception_ptr(Exception::createDeprecated(message, code)), code, message);
}
template <typename... Args>
void setUserError(int code, fmt::format_string<Args...> fmt, Args &&... args)
{
setUserError(code, fmt::format(fmt, std::forward<Args>(args)...));
setUserError(std::make_exception_ptr(exception), exception.code(), exception.message());
}
void setKeeperError(std::exception_ptr exception, Coordination::Error code, std::string message)
@ -159,23 +147,6 @@ public:
user_error = UserError{};
}
template <typename... Args>
void setKeeperError(std::exception_ptr exception, Coordination::Error code, fmt::format_string<Args...> fmt, Args &&... args)
{
setKeeperError(exception, code, fmt::format(fmt, std::forward<Args>(args)...));
}
void setKeeperError(Coordination::Error code, std::string message)
{
setKeeperError(std::make_exception_ptr(zkutil::KeeperException::createDeprecated(message, code)), code, message);
}
template <typename... Args>
void setKeeperError(Coordination::Error code, fmt::format_string<Args...> fmt, Args &&... args)
{
setKeeperError(code, fmt::format(fmt, std::forward<Args>(args)...));
}
void stopRetries() { stop_retries = true; }
bool isLastRetry() const { return total_failures >= retries_info.max_retries; }

View File

@ -1461,11 +1461,16 @@ void StorageS3::Configuration::connect(ContextPtr context)
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
S3::ClientSettings client_settings{
.use_virtual_addressing = url.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token);
client = S3::ClientFactory::instance().create(
client_configuration,
url.is_virtual_hosted_style,
local_settings.s3_disable_checksum,
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,

View File

@ -41,6 +41,7 @@ NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
{"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()},
{"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()},
{"is_currently_used", std::make_shared<DataTypeUInt8>()},
{"last_used", std::make_shared<DataTypeDateTime64>(6)},
{"rdkafka_stat", std::make_shared<DataTypeString>()},
};
return names_and_types;
@ -78,6 +79,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]);
auto & last_used = assert_cast<ColumnDateTime64 &>(*res_columns[index++]);
auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]);
const auto access = context->getAccess();
@ -96,57 +98,55 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
auto safe_consumers = storage_kafka_ptr->getSafeConsumers();
for (const auto & weak_consumer : safe_consumers.consumers)
for (const auto & consumer : safe_consumers.consumers)
{
if (auto consumer = weak_consumer.lock())
auto consumer_stat = consumer->getStat();
database.insertData(database_str.data(), database_str.size());
table.insertData(table_str.data(), table_str.size());
consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
const auto num_assignnemts = consumer_stat.assignments.size();
for (size_t num = 0; num < num_assignnemts; ++num)
{
auto consumer_stat = consumer->getStat();
const auto & assign = consumer_stat.assignments[num];
database.insertData(database_str.data(), database_str.size());
table.insertData(table_str.data(), table_str.size());
assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
const auto num_assignnemts = consumer_stat.assignments.size();
for (size_t num = 0; num < num_assignnemts; ++num)
{
const auto & assign = consumer_stat.assignments[num];
assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
assigments_partition_id.insert(assign.partition_id);
assigments_current_offset.insert(assign.current_offset);
}
last_assignment_num += num_assignnemts;
assigments_topics_offsets.push_back(last_assignment_num);
assigments_partition_id_offsets.push_back(last_assignment_num);
assigments_current_offset_offsets.push_back(last_assignment_num);
for (const auto & exc : consumer_stat.exceptions_buffer)
{
exceptions_text.insertData(exc.text.data(), exc.text.size());
exceptions_time.insert(exc.timestamp_usec);
}
exceptions_num += consumer_stat.exceptions_buffer.size();
exceptions_text_offset.push_back(exceptions_num);
exceptions_time_offset.push_back(exceptions_num);
last_poll_time.insert(consumer_stat.last_poll_time);
num_messages_read.insert(consumer_stat.num_messages_read);
last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
num_commits.insert(consumer_stat.num_commits);
last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
is_currently_used.insert(consumer_stat.in_use);
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
assigments_partition_id.insert(assign.partition_id);
assigments_current_offset.insert(assign.current_offset);
}
last_assignment_num += num_assignnemts;
assigments_topics_offsets.push_back(last_assignment_num);
assigments_partition_id_offsets.push_back(last_assignment_num);
assigments_current_offset_offsets.push_back(last_assignment_num);
for (const auto & exc : consumer_stat.exceptions_buffer)
{
exceptions_text.insertData(exc.text.data(), exc.text.size());
exceptions_time.insert(exc.timestamp_usec);
}
exceptions_num += consumer_stat.exceptions_buffer.size();
exceptions_text_offset.push_back(exceptions_num);
exceptions_time_offset.push_back(exceptions_num);
last_poll_time.insert(consumer_stat.last_poll_time);
num_messages_read.insert(consumer_stat.num_messages_read);
last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
num_commits.insert(consumer_stat.num_commits);
last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
is_currently_used.insert(consumer_stat.in_use);
last_used.insert(consumer_stat.last_used_usec);
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
}
};

View File

@ -33,6 +33,7 @@ SET force_primary_key = 0;
SELECT * FROM test_tuple_filter WHERE (1, value) = (id, 'A');
SELECT * FROM test_tuple_filter WHERE tuple(id) = tuple(1);
SELECT * FROM test_tuple_filter WHERE (id, (id, id) = (1, NULL)) == (NULL, NULL);
SELECT * FROM test_tuple_filter WHERE (log_date, value) = tuple('2021-01-01'); -- { serverError 43 }
SELECT * FROM test_tuple_filter WHERE (id, value) = tuple(1); -- { serverError 43 }

View File

@ -3,7 +3,7 @@ DROP TABLE IF EXISTS tab;
SET allow_experimental_inverted_index=1;
CREATE TABLE tab (`k` UInt64, `s` Map(String, String), INDEX af mapKeys(s) TYPE inverted(2) GRANULARITY 1) ENGINE = MergeTree ORDER BY k SETTINGS index_granularity = 2, index_granularity_bytes = '10Mi';
INSERT INTO tab (k) VALUES (0);
SELECT * FROM tab PREWHERE (s[NULL]) = 'Click a03' SETTINGS allow_experimental_analyzer=1; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
SELECT * FROM tab PREWHERE (s[NULL]) = 'Click a03' SETTINGS allow_experimental_analyzer=1;
SELECT * FROM tab PREWHERE (s[1]) = 'Click a03' SETTINGS allow_experimental_analyzer=1; -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT * FROM tab PREWHERE (s['foo']) = 'Click a03' SETTINGS allow_experimental_analyzer=1;
DROP TABLE tab;

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1 @@
SELECT isConstant(format('{}, world', 'Hello'));

View File

@ -0,0 +1,4 @@
Hello 123
world 456
Hello test
world best

View File

@ -0,0 +1,3 @@
WITH arrayJoin(['Hello', 'world'])::Enum('Hello', 'world') AS x SELECT x, transform(x, ['Hello', 'world'], [123, 456], 0);
WITH arrayJoin(['Hello', 'world'])::Enum('Hello', 'world') AS x SELECT x, transform(x, ['Hello', 'world', 'goodbye'], [123, 456], 0); -- { serverError UNKNOWN_ELEMENT_OF_ENUM }
WITH arrayJoin(['Hello', 'world'])::Enum('Hello', 'world') AS x SELECT x, transform(x, ['Hello', 'world'], ['test', 'best']::Array(Enum('test' = 123, 'best' = 456, '' = 0)), ''::Enum('test' = 123, 'best' = 456, '' = 0)) AS y;