Merge branch 'master' into rename_materialize_mysql

This commit is contained in:
Alexander Tokmakov 2021-07-28 12:18:01 +03:00
commit fab4529c97
40 changed files with 868 additions and 65 deletions

View File

@ -14,10 +14,14 @@ services:
}
EOF
./docker-entrypoint.sh'
ports:
- 9020:9019
expose:
- 9019
healthcheck:
test: ["CMD", "curl", "-s", "localhost:9019/ping"]
interval: 5s
timeout: 3s
retries: 30
volumes:
- type: ${JDBC_BRIDGE_FS:-tmpfs}
source: ${JDBC_BRIDGE_LOGS:-}
target: /app/logs

View File

@ -143,15 +143,18 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
fi
tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||:
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
tar -chf /test_output/zookeeper_log_dump.tar /var/lib/clickhouse/data/system/zookeeper_log ||:
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
tar -chf /test_output/zookeeper_log_dump1.tar /var/lib/clickhouse1/data/system/zookeeper_log ||:
tar -chf /test_output/zookeeper_log_dump2.tar /var/lib/clickhouse2/data/system/zookeeper_log ||:
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
fi

View File

@ -42,6 +42,13 @@ CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user',
max_wait_time_when_mysql_unavailable=10000;
```
**Settings on MySQL-server side**
For `MaterializedMySQL` to work correctly, a few mandatory `MySQL`-side configuration settings must be set:
- `default_authentication_plugin = mysql_native_password` since `MaterializedMySQL` can only authenticate with this method.
- `gtid_mode = on` since GTID-based logging is mandatory for correct `MaterializedMySQL` replication. Note that when turning this mode `On` you should also specify `enforce_gtid_consistency = on`.
## Virtual columns {#virtual-columns}
When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables are used with virtual `_sign` and `_version` columns.
@ -70,6 +77,7 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](
| STRING | [String](../../sql-reference/data-types/string.md) |
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |
| BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) |
Other types are not supported. If a MySQL table contains a column of such a type, ClickHouse throws an exception "Unhandled data type" and stops replication.
@ -77,6 +85,14 @@ Other types are not supported. If MySQL table contains a column of such type, Cl
## Specifics and Recommendations {#specifics-and-recommendations}
### Compatibility restrictions
Apart from the data type limitations, there are a few restrictions compared to `MySQL` databases that should be resolved before replication is possible:
- Each table in `MySQL` should contain `PRIMARY KEY`.
- Replication will not work for tables containing rows with `ENUM` field values that are out of range (as specified in the `ENUM` signature).
### DDL Queries {#ddl-queries}
MySQL DDL queries are converted into the corresponding ClickHouse DDL queries ([ALTER](../../sql-reference/statements/alter/index.md), [CREATE](../../sql-reference/statements/create/index.md), [DROP](../../sql-reference/statements/drop.md), [RENAME](../../sql-reference/statements/rename.md)). If ClickHouse cannot parse some DDL query, the query is ignored.

View File

@ -7,48 +7,44 @@ toc_title: Testing Hardware
With these instructions you can run a basic ClickHouse performance test on any server without installing ClickHouse packages.
1. Go to “commits” page: https://github.com/ClickHouse/ClickHouse/commits/master
2. Click on the first green check mark or red cross with green “ClickHouse Build Check” and click on the “Details” link near “ClickHouse Build Check”. There is no such link in some commits, for example commits with documentation. In this case, choose the nearest commit having this link.
3. Copy the link to `clickhouse` binary for amd64 or aarch64.
4. ssh to the server and download it with wget:
1. ssh to the server and download the binary with wget:
```bash
# These links are outdated, please obtain the fresh link from the "commits" page.
# For amd64:
wget https://clickhouse-builds.s3.yandex.net/0/e29c4c3cc47ab2a6c4516486c1b77d57e7d42643/clickhouse_build_check/gcc-10_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse
wget https://builds.clickhouse.tech/master/amd64/clickhouse
# For aarch64:
wget https://clickhouse-builds.s3.yandex.net/0/e29c4c3cc47ab2a6c4516486c1b77d57e7d42643/clickhouse_special_build_check/clang-10-aarch64_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse
wget https://builds.clickhouse.tech/master/aarch64/clickhouse
# Then do:
chmod a+x clickhouse
```
5. Download benchmark files:
2. Download benchmark files:
```bash
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/benchmark-new.sh
chmod a+x benchmark-new.sh
wget https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/queries.sql
```
6. Download test data according to the [Yandex.Metrica dataset](../getting-started/example-datasets/metrica.md) instruction (“hits” table containing 100 million rows).
3. Download test data according to the [Yandex.Metrica dataset](../getting-started/example-datasets/metrica.md) instruction (“hits” table containing 100 million rows).
```bash
wget https://datasets.clickhouse.tech/hits/partitions/hits_100m_obfuscated_v1.tar.xz
tar xvf hits_100m_obfuscated_v1.tar.xz -C .
mv hits_100m_obfuscated_v1/* .
```
7. Run the server:
4. Run the server:
```bash
./clickhouse server
```
8. Check the data: ssh to the server in another terminal
5. Check the data: ssh to the server in another terminal
```bash
./clickhouse client --query "SELECT count() FROM hits_100m_obfuscated"
100000000
```
9. Edit the benchmark-new.sh, change `clickhouse-client` to `./clickhouse client` and add `--max_memory_usage 100000000000` parameter.
6. Edit the benchmark-new.sh, change `clickhouse-client` to `./clickhouse client` and add `--max_memory_usage 100000000000` parameter.
```bash
mcedit benchmark-new.sh
```
10. Run the benchmark:
7. Run the benchmark:
```bash
./benchmark-new.sh hits_100m_obfuscated
```
11. Send the numbers and the info about your hardware configuration to clickhouse-feedback@yandex-team.com
8. Send the numbers and the info about your hardware configuration to clickhouse-feedback@yandex-team.com
All the results are published here: https://clickhouse.tech/benchmark/hardware/

View File

@ -0,0 +1,60 @@
---
toc_priority: 38
toc_title: SETTING
---
# Table Settings Manipulations {#table_settings_manipulations}
There is a set of queries to change table settings. You can modify settings or reset them to default values. A single query can change several settings at once.
If a setting with the specified name does not exist, then the query raises an exception.
**Syntax**
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY|RESET SETTING ...
```
!!! note "Note"
These queries can be applied to [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md) tables only.
## MODIFY SETTING {#alter_modify_setting}
Changes table settings.
**Syntax**
```sql
MODIFY SETTING setting_name=value [, ...]
```
**Example**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id;
ALTER TABLE example_table MODIFY SETTING max_part_loading_threads=8, max_parts_in_total=50000;
```
## RESET SETTING {#alter_reset_setting}
Resets table settings to their default values. If a setting is in a default state, then no action is taken.
**Syntax**
```sql
RESET SETTING setting_name [, ...]
```
**Example**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id
SETTINGS max_part_loading_threads=8;
ALTER TABLE example_table RESET SETTING max_part_loading_threads;
```
**See Also**
- [MergeTree settings](../../../operations/settings/merge-tree-settings.md)

View File

@ -0,0 +1,60 @@
---
toc_priority: 38
toc_title: SETTING
---
# Изменение настроек таблицы {#table_settings_manipulations}
Существуют запросы, которые изменяют настройки таблицы или сбрасывают их в значения по умолчанию. В одном запросе можно изменить сразу несколько настроек.
Если настройка с указанным именем не существует, то генерируется исключение.
**Синтаксис**
``` sql
ALTER TABLE [db].name [ON CLUSTER cluster] MODIFY|RESET SETTING ...
```
!!! note "Примечание"
Эти запросы могут применяться только к таблицам на движке [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md).
## MODIFY SETTING {#alter_modify_setting}
Изменяет настройки таблицы.
**Синтаксис**
```sql
MODIFY SETTING setting_name=value [, ...]
```
**Пример**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id;
ALTER TABLE example_table MODIFY SETTING max_part_loading_threads=8, max_parts_in_total=50000;
```
## RESET SETTING {#alter_reset_setting}
Сбрасывает настройки таблицы в значения по умолчанию. Если настройка уже находится в состоянии по умолчанию, то никакие действия не выполняются.
**Синтаксис**
```sql
RESET SETTING setting_name [, ...]
```
**Пример**
```sql
CREATE TABLE example_table (id UInt32, data String) ENGINE=MergeTree() ORDER BY id
SETTINGS max_part_loading_threads=8;
ALTER TABLE example_table RESET SETTING max_part_loading_threads;
```
**Смотрите также**
- [Настройки MergeTree таблиц](../../../operations/settings/merge-tree-settings.md)

View File

@ -33,7 +33,7 @@ static std::string extractFromConfig(
{
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
*bootstrap_configuration, "zookeeper");
*bootstrap_configuration, "zookeeper", nullptr);
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache);
}

View File

@ -181,15 +181,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
else if (method == "loadIds")
{
params.read(request.getStream());
String ids_string;
readString(ids_string, request.getStream());
std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
if (!params.has("ids"))
{
processError(response, "No 'ids' in request URL");
return;
}
std::vector<uint64_t> ids = parseIdsFromBinary(params.get("ids"));
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
auto input = library_handler->loadIds(ids);

View File

@ -17,7 +17,7 @@ SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dic
if (library_handler != library_handlers.end())
return library_handler->second;
return nullptr;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found dictionary with id: {}", dictionary_id);
}

View File

@ -128,7 +128,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
return loadBase(uri, [ids_string](std::ostream & os) { os << "ids=" << ids_string; });
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
}

View File

@ -111,7 +111,8 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
identity_,
Poco::Timespan(0, session_timeout_ms_ * 1000),
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, operation_timeout_ms_ * 1000));
Poco::Timespan(0, operation_timeout_ms_ * 1000),
zk_log);
if (chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
@ -209,7 +210,8 @@ struct ZooKeeperArgs
std::string implementation;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
ZooKeeperArgs args(config, config_name);
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);

View File

@ -25,6 +25,10 @@ namespace CurrentMetrics
extern const Metric EphemeralNode;
}
namespace DB
{
class ZooKeeperLog;
}
namespace zkutil
{
@ -82,7 +86,7 @@ public:
<identity>user:password</identity>
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_);
/// Creates a new session with the same parameters. This method can be used for reconnecting
/// after the session has expired.
@ -298,6 +302,7 @@ private:
std::mutex mutex;
Poco::Logger * log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
};

View File

@ -537,6 +537,139 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
Coordination::write(server_id, out);
}
/// Appends one log element describing this request to the end of `elems`.
/// Fills xid, has_watch, op_num and path, and records the element's own index in `elems` as request_idx.
void ZooKeeperRequest::createLogElements(LogElements & elems) const
{
    /// The element is appended, so its index equals the container size before insertion.
    const size_t position = elems.size();
    auto & entry = elems.emplace_back();

    entry.xid = xid;
    entry.has_watch = has_watch;
    entry.op_num = static_cast<uint32_t>(getOpNum());
    entry.path = getPath();
    entry.request_idx = position;
}
/// Appends the common request element, then adds the Create-specific fields
/// (node data and the ephemeral / sequential flags) to that same element.
void ZooKeeperCreateRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);

    /// The base implementation has just pushed the element we need to extend.
    auto & entry = elems.back();
    entry.is_sequential = is_sequential;
    entry.is_ephemeral = is_ephemeral;
    entry.data = data;
}
/// Appends the common request element and records the version passed with the Remove request.
void ZooKeeperRemoveRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    /// The element appended by the base call is the last one.
    elems.back().version = version;
}
/// Appends the common request element and adds the Set-specific fields: new data and version.
void ZooKeeperSetRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);

    /// The base implementation has just pushed the element we need to extend.
    auto & entry = elems.back();
    entry.version = version;
    entry.data = data;
}
/// Appends the common request element and records the version passed with the Check request.
void ZooKeeperCheckRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    /// The element appended by the base call is the last one.
    elems.back().version = version;
}
/// Appends one element for the multi-request itself (with the number of sub-requests in requests_size),
/// followed by the elements produced by every sub-request, in order.
void ZooKeeperMultiRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    elems.back().requests_size = requests.size();

    for (const auto & generic_request : requests)
    {
        auto & sub_request = dynamic_cast<ZooKeeperRequest &>(*generic_request);
        /// A sub-request either has no xid yet or shares the xid of the enclosing multi-request.
        assert(!sub_request.xid || sub_request.xid == xid);
        sub_request.createLogElements(elems);
    }
}
/// Writes the fields common to all responses (xid, op_num, zxid, error) into `elems[idx]`.
/// The element may already hold request data; debug asserts check that xid and op_num agree with it.
void ZooKeeperResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    auto & entry = elems[idx];

    assert(!entry.xid || entry.xid == xid);
    entry.xid = xid;

    /// tryGetOpNum() is used instead of getOpNum() so that subclasses can override what is logged here.
    const int32_t op_from_response = tryGetOpNum();
    assert(!entry.op_num || entry.op_num == op_from_response || op_from_response < 0);
    entry.op_num = op_from_response;

    entry.zxid = zxid;
    entry.error = static_cast<Int32>(error);
}
/// Fills the common response fields of `elems[idx]`, then the watch-specific ones:
/// watch type, watch state and the path the watch fired for.
void ZooKeeperWatchResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.path = path;
    entry.watch_state = state;
    entry.watch_type = type;
}
/// Fills the common response fields of `elems[idx]` and records the path that was actually created.
void ZooKeeperCreateResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].path_created = path_created;
}
/// Fills the common response fields of `elems[idx]` and records the node stat returned by Exists.
void ZooKeeperExistsResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].stat = stat;
}
/// Fills the common response fields of `elems[idx]`, then the Get-specific ones: node data and stat.
void ZooKeeperGetResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.stat = stat;
    entry.data = data;
}
/// Fills the common response fields of `elems[idx]` and records the node stat returned by Set.
void ZooKeeperSetResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].stat = stat;
}
/// Fills the common response fields of `elems[idx]`, then the List-specific ones:
/// the list of child names and the node stat.
void ZooKeeperListResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.children = names;
    entry.stat = stat;
}
/// Fills `elems[idx]` with the multi-response itself and each following element with the matching sub-response.
/// Every sub-response gets the xid and zxid of the multi-response before it is logged.
void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    /// Debug-only expectations: the multi-response occupies slot 0 and there is one slot per sub-response after it.
    assert(idx == 0);
    assert(elems.size() == responses.size() + 1);

    ZooKeeperResponse::fillLogElements(elems, idx);

    for (size_t i = 0; i < responses.size(); ++i)
    {
        auto & sub_response = dynamic_cast<ZooKeeperResponse &>(*responses[i]);

        assert(!sub_response.xid || sub_response.xid == xid);
        assert(!sub_response.zxid || sub_response.zxid == zxid);
        sub_response.xid = xid;
        sub_response.zxid = zxid;

        /// Sub-response number i goes into the slot right after the previous one.
        sub_response.fillLogElements(elems, idx + i + 1);
    }
}
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)

View File

@ -2,6 +2,7 @@
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Interpreters/ZooKeeperLog.h>
#include <boost/noncopyable.hpp>
#include <IO/ReadBuffer.h>
@ -22,6 +23,8 @@
namespace Coordination
{
using LogElements = std::vector<ZooKeeperLogElement>;
struct ZooKeeperResponse : virtual Response
{
XID xid = 0;
@ -32,6 +35,8 @@ struct ZooKeeperResponse : virtual Response
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void write(WriteBuffer & out) const;
virtual OpNum getOpNum() const = 0;
virtual void fillLogElements(LogElements & elems, size_t idx) const;
virtual int32_t tryGetOpNum() const { return static_cast<int32_t>(getOpNum()); }
};
using ZooKeeperResponsePtr = std::shared_ptr<ZooKeeperResponse>;
@ -63,6 +68,8 @@ struct ZooKeeperRequest : virtual Request
virtual ZooKeeperResponsePtr makeResponse() const = 0;
virtual bool isReadRequest() const = 0;
virtual void createLogElements(LogElements & elems) const;
};
using ZooKeeperRequestPtr = std::shared_ptr<ZooKeeperRequest>;
@ -119,6 +126,9 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
{
throw Exception("OpNum for watch response doesn't exist", Error::ZRUNTIMEINCONSISTENCY);
}
void fillLogElements(LogElements & elems, size_t idx) const override;
int32_t tryGetOpNum() const override { return 0; }
};
struct ZooKeeperAuthRequest final : ZooKeeperRequest
@ -188,6 +198,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
@ -199,6 +211,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Create; }
size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
@ -214,6 +228,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
@ -244,6 +260,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Exists; }
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
@ -265,6 +283,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Get; }
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
@ -279,6 +299,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
@ -288,6 +310,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Set; }
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
@ -313,6 +337,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::List; }
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
@ -333,6 +359,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
bool isReadRequest() const override { return true; }
size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
@ -409,6 +437,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
bool isReadRequest() const override;
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
@ -433,6 +463,8 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
/// Fake internal coordination (keeper) response. Never received from client

View File

@ -311,10 +311,12 @@ ZooKeeper::ZooKeeper(
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_)
Poco::Timespan operation_timeout_,
std::shared_ptr<ZooKeeperLog> zk_log_)
: root_path(root_path_),
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
operation_timeout(std::min(operation_timeout_, session_timeout_)),
zk_log(std::move(zk_log_))
{
if (!root_path.empty())
{
@ -578,6 +580,8 @@ void ZooKeeper::sendThread()
info.request->probably_sent = true;
info.request->write(*out);
logOperationIfNeeded(info.request);
/// We sent close request, exit
if (info.request->xid == CLOSE_XID)
break;
@ -747,6 +751,9 @@ void ZooKeeper::receiveEvent()
if (!response)
response = request_info.request->makeResponse();
response->xid = xid;
response->zxid = zxid;
if (err != Error::ZOK)
{
response->error = err;
@ -785,6 +792,8 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR);
logOperationIfNeeded(request_info.request, response); //-V614
}
catch (...)
{
@ -802,6 +811,8 @@ void ZooKeeper::receiveEvent()
{
if (request_info.callback)
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response);
}
catch (...)
{
@ -880,17 +891,19 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
for (auto & op : operations)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
ZooKeeperResponsePtr response = request_info.request->makeResponse();
response->error = request_info.request->probably_sent
? Error::ZCONNECTIONLOSS
: Error::ZSESSIONEXPIRED;
response->xid = request_info.request->xid;
if (request_info.callback)
{
try
{
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response, true);
}
catch (...)
{
@ -942,13 +955,15 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
ZooKeeperResponsePtr response = info.request->makeResponse();
if (response)
{
response->error = Error::ZSESSIONEXPIRED;
response->xid = info.request->xid;
try
{
info.callback(*response);
logOperationIfNeeded(info.request, response, true);
}
catch (...)
{
@ -993,6 +1008,12 @@ void ZooKeeper::pushRequest(RequestInfo && info)
throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED);
if (info.request->xid < 0)
throw Exception("XID overflow", Error::ZSESSIONEXPIRED);
if (auto * multi_request = dynamic_cast<ZooKeeperMultiRequest *>(info.request.get()))
{
for (auto & request : multi_request->requests)
dynamic_cast<ZooKeeperRequest &>(*request).xid = multi_request->xid;
}
}
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
@ -1190,4 +1211,46 @@ void ZooKeeper::close()
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
}
/// Adds log elements for a request and/or its response to `zk_log`. Does nothing if `zk_log` is not set.
///
/// - `request` may be null; in that case only a response is logged (asserted to be a ping or watch response).
/// - `response` may be null; then only the request is logged, with type REQUEST.
/// - `finalize` forces the element type to FINALIZE regardless of what was passed.
///
/// Every produced element gets the same event time, peer address and session id.
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
{
    if (!zk_log)
        return;

    ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN;

    /// Microseconds since epoch, taken once so that all elements of one operation share the timestamp.
    Decimal64 event_time = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::system_clock::now().time_since_epoch()
    ).count();

    LogElements elems;
    if (request)
    {
        request->createLogElements(elems);
        log_type = ZooKeeperLogElement::REQUEST;
    }
    else
    {
        /// No request to derive elements from: reserve a single element for the response.
        assert(response);
        assert(response->xid == PING_XID || response->xid == WATCH_XID);
        elems.emplace_back();
    }

    if (response)
    {
        response->fillLogElements(elems, 0);
        log_type = ZooKeeperLogElement::RESPONSE;
    }

    if (finalize)
        log_type = ZooKeeperLogElement::FINALIZE;

    /// The peer address is the same for all elements, so query the socket once
    /// instead of once per element (a multi-request produces many elements).
    const auto peer_address = socket.peerAddress();

    for (auto & elem : elems)
    {
        elem.type = log_type;
        elem.event_time = event_time;
        elem.address = peer_address;
        elem.session_id = session_id;
        zk_log->add(elem);
    }
}
}

View File

@ -80,6 +80,10 @@ namespace CurrentMetrics
extern const Metric ZooKeeperSession;
}
namespace DB
{
class ZooKeeperLog;
}
namespace Coordination
{
@ -110,7 +114,8 @@ public:
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_);
Poco::Timespan operation_timeout_,
std::shared_ptr<ZooKeeperLog> zk_log_);
~ZooKeeper() override;
@ -258,7 +263,10 @@ private:
template <typename T>
void read(T &);
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log;
};
}

View File

@ -25,7 +25,7 @@ int main(int argc, char ** argv)
DB::ConfigProcessor processor(argv[1], false, true);
auto config = processor.loadConfig().configuration;
zkutil::ZooKeeper zk(*config, "zookeeper");
zkutil::ZooKeeper zk(*config, "zookeeper", nullptr);
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
/// NOTE: setting watches in multiple threads because doing it in a single thread is too slow.

View File

@ -40,7 +40,7 @@ try
}
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000});
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000}, nullptr);
Poco::Event event(true);

View File

@ -5,7 +5,7 @@
int main()
try
{
Coordination::ZooKeeper zookeeper({Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{"localhost:2181"}, false}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
Coordination::ZooKeeper zookeeper({Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{"localhost:2181"}, false}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000}, nullptr);
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
{

View File

@ -31,10 +31,6 @@ SRCS(
MySQL/PacketsProtocolText.cpp
MySQL/PacketsReplication.cpp
NamesAndTypes.cpp
PostgreSQL/Connection.cpp
PostgreSQL/PoolWithFailover.cpp
PostgreSQL/Utils.cpp
PostgreSQL/insertPostgreSQLValue.cpp
PostgreSQLProtocol.cpp
QueryProcessingStage.cpp
Settings.cpp

View File

@ -1745,7 +1745,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper");
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper", getZooKeeperLog());
else if (shared->zookeeper->expired())
shared->zookeeper = shared->zookeeper->startNewSession();
@ -1809,8 +1809,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
"config.xml",
name);
zookeeper
= shared->auxiliary_zookeepers.emplace(name, std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name)).first;
zookeeper = shared->auxiliary_zookeepers.emplace(name,
std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first;
}
else if (zookeeper->second->expired())
zookeeper->second = zookeeper->second->startNewSession();
@ -1824,14 +1824,15 @@ void Context::resetZooKeeper() const
shared->zookeeper.reset();
}
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk)
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk,
std::shared_ptr<ZooKeeperLog> zk_log)
{
if (!zk || zk->configChanged(*config, config_name))
{
if (zk)
zk->finalize();
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name);
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name, std::move(zk_log));
}
}
@ -1839,7 +1840,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
{
std::lock_guard lock(shared->zookeeper_mutex);
shared->zookeeper_config = config;
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper);
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog());
}
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
@ -1854,7 +1855,7 @@ void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr &
it = shared->auxiliary_zookeepers.erase(it);
else
{
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second);
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second, getZooKeeperLog());
++it;
}
}
@ -2137,6 +2138,17 @@ std::shared_ptr<OpenTelemetrySpanLog> Context::getOpenTelemetrySpanLog() const
}
/// Returns the ZooKeeper log held by the system logs, or an empty pointer if system logs are not set.
/// The context lock is held while reading the shared state.
std::shared_ptr<ZooKeeperLog> Context::getZooKeeperLog() const
{
    auto lock = getLock();

    if (shared->system_logs)
        return shared->system_logs->zookeeper_log;

    return {};
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

View File

@ -76,6 +76,7 @@ class TraceLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ZooKeeperLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -714,6 +715,7 @@ public:
std::shared_ptr<MetricLog> getMetricLog() const;
std::shared_ptr<AsynchronousMetricLog> getAsynchronousMetricLog() const;
std::shared_ptr<OpenTelemetrySpanLog> getOpenTelemetrySpanLog() const;
std::shared_ptr<ZooKeeperLog> getZooKeeperLog() const;
/// Returns an object used to log operations with parts if it possible.
/// Provide table name to make required checks.

View File

@ -31,6 +31,8 @@
#include <pcg_random.hpp>
#include <common/scope_guard_safe.h>
#include <Interpreters/ZooKeeperLog.h>
namespace fs = std::filesystem;
@ -371,7 +373,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
@ -1136,10 +1138,32 @@ void DDLWorker::runMainThread()
cleanup_event->set();
scheduleTasks(reinitialized);
LOG_DEBUG(log, "Waiting for queue updates");
LOG_DEBUG(log, "Waiting for queue updates (stat: {}, {}, {}, {})",
queue_node_stat.version, queue_node_stat.cversion, queue_node_stat.numChildren, queue_node_stat.pzxid);
/// FIXME It may hang for unknown reason. Timeout is just a hotfix.
constexpr int queue_wait_timeout_ms = 10000;
queue_updated_event->tryWait(queue_wait_timeout_ms);
bool updated = queue_updated_event->tryWait(queue_wait_timeout_ms);
if (!updated)
{
Coordination::Stat new_stat;
tryGetZooKeeper()->get(queue_dir, &new_stat);
bool queue_changed = memcmp(&queue_node_stat, &new_stat, sizeof(Coordination::Stat)) != 0;
bool watch_triggered = queue_updated_event->tryWait(0);
if (queue_changed && !watch_triggered)
{
/// It should never happen.
/// Maybe log message, abort() and system.zookeeper_log will help to debug it and remove timeout (#26036).
LOG_TRACE(
log,
"Queue was not updated (stat: {}, {}, {}, {})",
new_stat.version,
new_stat.cversion,
new_stat.numChildren,
new_stat.pzxid);
context->getZooKeeperLog()->flush();
abort();
}
}
}
catch (const Coordination::Exception & e)
{

View File

@ -3,6 +3,7 @@
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context.h>
@ -125,6 +126,7 @@ protected:
std::optional<String> first_failed_task_name;
std::list<DDLTaskPtr> current_tasks;
Coordination::Stat queue_node_stat;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
std::atomic<bool> initialized = false;

View File

@ -25,6 +25,7 @@
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
@ -416,7 +417,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
[&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
[&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); },
[&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); }
);
break;
}

View File

@ -8,6 +8,7 @@
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
@ -103,6 +104,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
global_context, "system", "opentelemetry_span_log", config,
"opentelemetry_span_log");
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -122,6 +124,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
if (zookeeper_log)
logs.emplace_back(zookeeper_log.get());
try
{

View File

@ -74,6 +74,7 @@ class CrashLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ZooKeeperLog;
class ISystemLog
@ -110,6 +111,8 @@ struct SystemLogs
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
/// Used to log all actions of ZooKeeper client
std::shared_ptr<ZooKeeperLog> zookeeper_log;
std::vector<ISystemLog *> logs;
};

View File

@ -0,0 +1,202 @@
#include <Interpreters/ZooKeeperLog.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Interpreters/QueryLog.h>
#include <Poco/Net/IPAddress.h>
#include <Common/IPv6ToBinary.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
namespace DB
{
/// Returns the names and data types of the columns of the ZooKeeper log table.
/// The order of the returned list must stay in sync with the order in which
/// ZooKeeperLogElement::appendToBlock() inserts values into the columns.
NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{
/// Kind of the logged record; values mirror ZooKeeperLogElement::Type (UNKNOWN is intentionally absent).
auto type_enum = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Request", static_cast<Int8>(REQUEST)},
{"Response", static_cast<Int8>(RESPONSE)},
{"Finalize", static_cast<Int8>(FINALIZE)},
});
/// Operation code of the request/response; values are taken from Coordination::OpNum.
auto op_num_enum = std::make_shared<DataTypeEnum16>(
DataTypeEnum16::Values
{
/// 0 is not taken from Coordination::OpNum; presumably it marks watch notifications,
/// which do not correspond to a request opcode - TODO confirm.
{"Watch", 0},
{"Close", static_cast<Int16>(Coordination::OpNum::Close)},
{"Error", static_cast<Int16>(Coordination::OpNum::Error)},
{"Create", static_cast<Int16>(Coordination::OpNum::Create)},
{"Remove", static_cast<Int16>(Coordination::OpNum::Remove)},
{"Exists", static_cast<Int16>(Coordination::OpNum::Exists)},
{"Get", static_cast<Int16>(Coordination::OpNum::Get)},
{"Set", static_cast<Int16>(Coordination::OpNum::Set)},
{"GetACL", static_cast<Int16>(Coordination::OpNum::GetACL)},
{"SetACL", static_cast<Int16>(Coordination::OpNum::SetACL)},
{"SimpleList", static_cast<Int16>(Coordination::OpNum::SimpleList)},
{"Sync", static_cast<Int16>(Coordination::OpNum::Sync)},
{"Heartbeat", static_cast<Int16>(Coordination::OpNum::Heartbeat)},
{"List", static_cast<Int16>(Coordination::OpNum::List)},
{"Check", static_cast<Int16>(Coordination::OpNum::Check)},
{"Multi", static_cast<Int16>(Coordination::OpNum::Multi)},
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
});
/// Error code of a response; values are taken from Coordination::Error.
auto error_enum = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"ZOK", static_cast<Int8>(Coordination::Error::ZOK)},
{"ZSYSTEMERROR", static_cast<Int8>(Coordination::Error::ZSYSTEMERROR)},
{"ZRUNTIMEINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZRUNTIMEINCONSISTENCY)},
{"ZDATAINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZDATAINCONSISTENCY)},
{"ZCONNECTIONLOSS", static_cast<Int8>(Coordination::Error::ZCONNECTIONLOSS)},
{"ZMARSHALLINGERROR", static_cast<Int8>(Coordination::Error::ZMARSHALLINGERROR)},
{"ZUNIMPLEMENTED", static_cast<Int8>(Coordination::Error::ZUNIMPLEMENTED)},
{"ZOPERATIONTIMEOUT", static_cast<Int8>(Coordination::Error::ZOPERATIONTIMEOUT)},
{"ZBADARGUMENTS", static_cast<Int8>(Coordination::Error::ZBADARGUMENTS)},
{"ZINVALIDSTATE", static_cast<Int8>(Coordination::Error::ZINVALIDSTATE)},
{"ZAPIERROR", static_cast<Int8>(Coordination::Error::ZAPIERROR)},
{"ZNONODE", static_cast<Int8>(Coordination::Error::ZNONODE)},
{"ZNOAUTH", static_cast<Int8>(Coordination::Error::ZNOAUTH)},
{"ZBADVERSION", static_cast<Int8>(Coordination::Error::ZBADVERSION)},
{"ZNOCHILDRENFOREPHEMERALS", static_cast<Int8>(Coordination::Error::ZNOCHILDRENFOREPHEMERALS)},
{"ZNODEEXISTS", static_cast<Int8>(Coordination::Error::ZNODEEXISTS)},
{"ZNOTEMPTY", static_cast<Int8>(Coordination::Error::ZNOTEMPTY)},
{"ZSESSIONEXPIRED", static_cast<Int8>(Coordination::Error::ZSESSIONEXPIRED)},
{"ZINVALIDCALLBACK", static_cast<Int8>(Coordination::Error::ZINVALIDCALLBACK)},
{"ZINVALIDACL", static_cast<Int8>(Coordination::Error::ZINVALIDACL)},
{"ZAUTHFAILED", static_cast<Int8>(Coordination::Error::ZAUTHFAILED)},
{"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)},
{"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)},
{"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
});
/// Type of a watch event; values are taken from Coordination::Event.
auto watch_type_enum = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"CREATED", static_cast<Int8>(Coordination::Event::CREATED)},
{"DELETED", static_cast<Int8>(Coordination::Event::DELETED)},
{"CHANGED", static_cast<Int8>(Coordination::Event::CHANGED)},
{"CHILD", static_cast<Int8>(Coordination::Event::CHILD)},
{"SESSION", static_cast<Int8>(Coordination::Event::SESSION)},
{"NOTWATCHING", static_cast<Int8>(Coordination::Event::NOTWATCHING)},
});
/// Session state reported with a watch event; values are taken from Coordination::State.
auto watch_state_enum = std::make_shared<DataTypeEnum16>(
DataTypeEnum16::Values
{
{"EXPIRED_SESSION", static_cast<Int16>(Coordination::State::EXPIRED_SESSION)},
{"AUTH_FAILED", static_cast<Int16>(Coordination::State::AUTH_FAILED)},
{"CONNECTING", static_cast<Int16>(Coordination::State::CONNECTING)},
{"ASSOCIATING", static_cast<Int16>(Coordination::State::ASSOCIATING)},
{"CONNECTED", static_cast<Int16>(Coordination::State::CONNECTED)},
{"NOTCONNECTED", static_cast<Int16>(Coordination::State::NOTCONNECTED)},
});
/// Column list. Keep the order identical to the insertion order in appendToBlock().
return
{
{"type", std::move(type_enum)},
{"event_date", std::make_shared<DataTypeDate>()},
/// DateTime64 with scale 6; appendToBlock() divides the stored value by 1000000 to obtain seconds.
{"event_time", std::make_shared<DataTypeDateTime64>(6)},
{"address", DataTypeFactory::instance().get("IPv6")},
{"port", std::make_shared<DataTypeUInt16>()},
{"session_id", std::make_shared<DataTypeInt64>()},
{"xid", std::make_shared<DataTypeInt32>()},
{"has_watch", std::make_shared<DataTypeUInt8>()},
{"op_num", op_num_enum},
{"path", std::make_shared<DataTypeString>()},
{"data", std::make_shared<DataTypeString>()},
{"is_ephemeral", std::make_shared<DataTypeUInt8>()},
{"is_sequential", std::make_shared<DataTypeUInt8>()},
/// Nullable columns correspond to std::optional fields of ZooKeeperLogElement.
{"version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>())},
{"requests_size", std::make_shared<DataTypeUInt32>()},
{"request_idx", std::make_shared<DataTypeUInt32>()},
{"zxid", std::make_shared<DataTypeInt64>()},
{"error", std::make_shared<DataTypeNullable>(error_enum)},
{"watch_type", std::make_shared<DataTypeNullable>(watch_type_enum)},
{"watch_state", std::make_shared<DataTypeNullable>(watch_state_enum)},
{"path_created", std::make_shared<DataTypeString>()},
/// Selected fields of Coordination::Stat.
{"stat_czxid", std::make_shared<DataTypeInt64>()},
{"stat_mzxid", std::make_shared<DataTypeInt64>()},
{"stat_pzxid", std::make_shared<DataTypeInt64>()},
{"stat_version", std::make_shared<DataTypeInt32>()},
{"stat_cversion", std::make_shared<DataTypeInt32>()},
{"stat_dataLength", std::make_shared<DataTypeInt32>()},
{"stat_numChildren", std::make_shared<DataTypeInt32>()},
{"children", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
};
}
/// Appends this element as one row to `columns`.
/// `columns` must be laid out as declared by getNamesAndTypes(): values are inserted
/// positionally, so the order of the statements below must match that column list.
void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
{
    assert(type != UNKNOWN);
    size_t i = 0;

    columns[i++]->insert(type);

    /// event_time is stored with 6 fractional digits; the date is derived from whole seconds.
    auto event_time_seconds = event_time / 1000000;
    columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
    columns[i++]->insert(event_time);

    /// Insert the binary form of the address with an explicit length: the buffer is not
    /// a NUL-terminated string and may contain zero bytes, so it must not be passed as a bare pointer.
    const auto address_binary = IPv6ToBinary(address.host());
    columns[i++]->insertData(address_binary.data(), address_binary.size());
    columns[i++]->insert(address.port());
    columns[i++]->insert(session_id);

    /// Common request info
    columns[i++]->insert(xid);
    columns[i++]->insert(has_watch);
    columns[i++]->insert(op_num);
    columns[i++]->insert(path);

    columns[i++]->insert(data);

    columns[i++]->insert(is_ephemeral);
    columns[i++]->insert(is_sequential);

    /// Unset optionals are written as NULL (default-constructed Field).
    columns[i++]->insert(version ? Field(*version) : Field());

    columns[i++]->insert(requests_size);
    columns[i++]->insert(request_idx);

    /// Common response info
    columns[i++]->insert(zxid);
    columns[i++]->insert(error ? Field(*error) : Field());

    columns[i++]->insert(watch_type ? Field(*watch_type) : Field());
    columns[i++]->insert(watch_state ? Field(*watch_state) : Field());

    columns[i++]->insert(path_created);

    columns[i++]->insert(stat.czxid);
    columns[i++]->insert(stat.mzxid);
    columns[i++]->insert(stat.pzxid);
    columns[i++]->insert(stat.version);
    columns[i++]->insert(stat.cversion);
    columns[i++]->insert(stat.dataLength);
    columns[i++]->insert(stat.numChildren);

    /// The list of child names goes into a single Array(String) value.
    Array children_array;
    for (const auto & c : children)
        children_array.emplace_back(c);
    columns[i++]->insert(children_array);
}
};

View File

@ -0,0 +1,76 @@
#pragma once
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Common/ZooKeeper/IKeeper.h>
namespace DB
{
/// One record of the ZooKeeper log: a request, a response or a finalize event
/// together with the fields that are written to the log table by appendToBlock().
struct ZooKeeperLogElement
{
/// Kind of the record. UNKNOWN is the default and must be replaced before the element
/// is appended to a block (appendToBlock() asserts on it).
enum Type
{
UNKNOWN = 0,
REQUEST = 1,
RESPONSE = 2,
FINALIZE = 3
};
Type type = UNKNOWN;
/// Written to a DateTime64(6) column; appendToBlock() divides it by 1000000 to get seconds.
Decimal64 event_time = 0;
/// Host and port are written to the `address` and `port` columns.
/// NOTE(review): presumably the address of the ZooKeeper server the session is connected to - confirm.
Poco::Net::SocketAddress address;
Int64 session_id = 0;
/// Common request info
Int32 xid = 0;
bool has_watch = false;
/// Written to an Enum16 column whose values come from Coordination::OpNum (plus 0 for "Watch").
Int32 op_num = 0;
String path;
/// create, set
String data;
/// create
bool is_ephemeral = false;
bool is_sequential = false;
/// remove, check, set
/// Written as NULL when not set.
std::optional<Int32> version;
/// multi
UInt32 requests_size = 0;
UInt32 request_idx = 0;
/// Common response info
Int64 zxid = 0;
/// Written as NULL when not set.
std::optional<Int32> error;
/// watch
/// Both are written as NULL when not set.
std::optional<Int32> watch_type;
std::optional<Int32> watch_state;
/// create
String path_created;
/// exists, get, set, list
Coordination::Stat stat = {};
/// list
Strings children;
/// Name of the element type.
static std::string name() { return "ZooKeeperLog"; }
/// Column names and types of the log table; must match the insertion order in appendToBlock().
static NamesAndTypesList getNamesAndTypes();
/// No column aliases are defined for this log.
static NamesAndAliases getNamesAndAliases() { return {}; }
/// Appends this element as one row to `columns`.
void appendToBlock(MutableColumns & columns) const;
};
/// System log whose rows are ZooKeeperLogElement records.
/// Adds nothing of its own: it only inherits the constructors of SystemLog<ZooKeeperLogElement>.
class ZooKeeperLog : public SystemLog<ZooKeeperLogElement>
{
using SystemLog<ZooKeeperLogElement>::SystemLog;
};
}

View File

@ -155,6 +155,7 @@ SRCS(
TreeOptimizer.cpp
TreeRewriter.cpp
WindowDescription.cpp
ZooKeeperLog.cpp
addMissingDefaults.cpp
addTypeConversionToAST.cpp
castColumn.cpp

View File

@ -12,7 +12,7 @@ RabbitMQSink::RabbitMQSink(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
ContextPtr context_)
: SinkToStorage(metadata_snapshot->getSampleBlockNonMaterialized())
: SinkToStorage(metadata_snapshot_->getSampleBlockNonMaterialized())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)

View File

@ -26,7 +26,7 @@ try
auto config = processor.loadConfig().configuration;
String root_path = argv[2];
zkutil::ZooKeeper zk(*config, "zookeeper");
zkutil::ZooKeeper zk(*config, "zookeeper", nullptr);
String temp_path = root_path + "/temp";
String blocks_path = root_path + "/block_numbers";

View File

@ -29,7 +29,7 @@ try
auto config = processor.loadConfig().configuration;
String zookeeper_path = argv[2];
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper");
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper", nullptr);
std::unordered_map<String, std::set<Int64>> current_inserts;

View File

@ -0,0 +1,7 @@
<yandex>
<!-- Settings of the ZooKeeper client log used in tests: records go to the table system.zookeeper_log. -->
<zookeeper_log>
<database>system</database>
<table>zookeeper_log</table>
<!-- Flush interval of the log, in milliseconds. -->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</zookeeper_log>
</yandex>

View File

@ -34,6 +34,7 @@ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/

View File

@ -392,6 +392,13 @@ class ClickHouseCluster:
self.zookeeper_instance_dir_prefix = p.join(self.instances_dir, "zk")
self.zookeeper_dirs_to_create = []
# available when with_jdbc_bridge == True
self.jdbc_bridge_host = "bridge1"
self.jdbc_bridge_ip = None
self.jdbc_bridge_port = 9019
self.jdbc_driver_dir = p.abspath(p.join(self.instances_dir, "jdbc_driver"))
self.jdbc_driver_logs_dir = os.path.join(self.jdbc_driver_dir, "logs")
self.docker_client = None
self.is_up = False
self.env = os.environ.copy()
@ -705,6 +712,8 @@ class ClickHouseCluster:
def setup_jdbc_bridge_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_jdbc_bridge = True
env_variables['JDBC_DRIVER_LOGS'] = self.jdbc_driver_logs_dir
env_variables['JDBC_DRIVER_FS'] = "bind"
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_jdbc_bridge.yml')])
self.base_jdbc_bridge_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_jdbc_bridge.yml')]
@ -1523,8 +1532,12 @@ class ClickHouseCluster:
self.wait_cassandra_to_start()
if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd:
os.makedirs(self.jdbc_driver_logs_dir)
os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXO)
subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d'])
self.wait_for_url("http://localhost:9020/ping")
self.jdbc_bridge_ip = self.get_instance_ip(self.jdbc_bridge_host)
self.wait_for_url(f"http://{self.jdbc_bridge_ip}:{self.jdbc_bridge_port}/ping")
clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
logging.debug(("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd))))

View File

@ -1,7 +1,6 @@
import contextlib
import logging
import os.path as p
import pytest
import time
import uuid
from helpers.cluster import ClickHouseCluster
@ -23,6 +22,14 @@ def started_cluster():
INSERT INTO test.ClickHouseTable(Num, Str)
SELECT number, toString(number) FROM system.numbers LIMIT {};
'''.format(records))
while True:
datasources = instance.query("select * from jdbc('', 'show datasources')")
if 'self' in datasources:
logging.debug(f"JDBC Driver self datasource initialized.\n{datasources}")
break
else:
logging.debug(f"Waiting JDBC Driver to initialize 'self' datasource.\n{datasources}")
yield cluster
finally:
cluster.shutdown()
@ -52,8 +59,9 @@ def test_jdbc_distributed_query(started_cluster):
def test_jdbc_insert(started_cluster):
"""Test insert query using JDBC table function"""
instance.query('DROP TABLE IF EXISTS test.test_insert')
instance.query('''
CREATE TABLE test.test_insert engine = Memory AS
CREATE TABLE test.test_insert ENGINE = Memory AS
SELECT * FROM test.ClickHouseTable;
SELECT *
FROM jdbc('{0}?mutation', 'INSERT INTO test.test_insert VALUES({1}, ''{1}'', ''{1}'')');
@ -67,8 +75,9 @@ def test_jdbc_insert(started_cluster):
def test_jdbc_update(started_cluster):
"""Test update query using JDBC table function"""
secrets = str(uuid.uuid1())
instance.query('DROP TABLE IF EXISTS test.test_update')
instance.query('''
CREATE TABLE test.test_update engine = Memory AS
CREATE TABLE test.test_update ENGINE = Memory AS
SELECT * FROM test.ClickHouseTable;
SELECT *
FROM jdbc(
@ -85,8 +94,9 @@ def test_jdbc_update(started_cluster):
def test_jdbc_delete(started_cluster):
"""Test delete query using JDBC table function"""
instance.query('DROP TABLE IF EXISTS test.test_delete')
instance.query('''
CREATE TABLE test.test_delete engine = Memory AS
CREATE TABLE test.test_delete ENGINE = Memory AS
SELECT * FROM test.ClickHouseTable;
SELECT *
FROM jdbc(
@ -102,6 +112,7 @@ def test_jdbc_delete(started_cluster):
def test_jdbc_table_engine(started_cluster):
"""Test query against a JDBC table"""
instance.query('DROP TABLE IF EXISTS test.jdbc_table')
actual = instance.query('''
CREATE TABLE test.jdbc_table(Str String)
ENGINE = JDBC('{}', 'test', 'ClickHouseTable');

View File

@ -0,0 +1,40 @@
log
Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0
Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
parts
Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 4 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 5 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 5 0 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
Response 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 ZOK \N \N 0 0 0 0
Response 0 Remove /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Response 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 5 ZOK \N \N /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0
Request 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 ZOK \N \N 0 0 96 0
blocks
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 3 0 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Response 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 ZOK \N \N /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 0 0
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 3 0 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Request 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 ZOK \N \N 0 0 9 0

View File

@ -0,0 +1,28 @@
-- Checks what system.zookeeper_log records for a ReplicatedMergeTree table:
-- operations on the replication log, on the inserted part and on the deduplication blocks.

DROP TABLE IF EXISTS rmt;

CREATE TABLE rmt (n int)
ENGINE = ReplicatedMergeTree('/test/01158/{database}/rmt', '1')
ORDER BY n;

SYSTEM SYNC REPLICA rmt;

-- The same row is inserted twice on purpose: the second insert runs into the already
-- existing block node (see ZNODEEXISTS in the reference output).
INSERT INTO rmt VALUES (1);
INSERT INTO rmt VALUES (1);

-- Make sure the log entries have reached the table before reading it.
SYSTEM FLUSH LOGS;

-- Operations on the replication log nodes.
-- op_num is filtered by numeric code; presumably 3, 4, 12 are Exists, Get, List - confirm against Coordination::OpNum.
SELECT 'log';
SELECT
    type,
    has_watch,
    op_num,
    path,
    is_ephemeral,
    is_sequential,
    version,
    requests_size,
    request_idx,
    error,
    watch_type,
    watch_state,
    path_created,
    stat_version,
    stat_cversion,
    stat_dataLength,
    stat_numChildren
FROM system.zookeeper_log
WHERE path LIKE '/test/01158/' || currentDatabase() || '/rmt/log%'
    AND op_num NOT IN (3, 4, 12)
ORDER BY xid, type, request_idx;

-- Every record that belongs to a request (session_id, xid) touching the inserted part node.
SELECT 'parts';
SELECT
    type,
    has_watch,
    op_num,
    path,
    is_ephemeral,
    is_sequential,
    version,
    requests_size,
    request_idx,
    error,
    watch_type,
    watch_state,
    path_created,
    stat_version,
    stat_cversion,
    stat_dataLength,
    stat_numChildren
FROM system.zookeeper_log
WHERE (session_id, xid) IN
(
    SELECT session_id, xid
    FROM system.zookeeper_log
    WHERE path = '/test/01158/' || currentDatabase() || '/rmt/replicas/1/parts/all_0_0_0'
)
ORDER BY xid, type, request_idx;

-- Every record that belongs to a request touching the deduplication block nodes.
-- op_num is filtered by numeric code; presumably 1 and 12 are Create and List - confirm against Coordination::OpNum.
SELECT 'blocks';
SELECT
    type,
    has_watch,
    op_num,
    path,
    is_ephemeral,
    is_sequential,
    version,
    requests_size,
    request_idx,
    error,
    watch_type,
    watch_state,
    path_created,
    stat_version,
    stat_cversion,
    stat_dataLength,
    stat_numChildren
FROM system.zookeeper_log
WHERE (session_id, xid) IN
(
    SELECT session_id, xid
    FROM system.zookeeper_log
    WHERE path LIKE '/test/01158/' || currentDatabase() || '/rmt/blocks%'
        AND op_num NOT IN (1, 12)
)
ORDER BY xid, type, request_idx;

DROP TABLE rmt;

View File

@ -137,6 +137,7 @@
"01532_execute_merges_on_single_replica",
"00652_replicated_mutations_default_database_zookeeper",
"00620_optimize_on_nonleader_replica_zookeeper",
"01158_zookeeper_log",
/// grep -c
"01018_ddl_dictionaries_bad_queries",
"00908_bloom_filter_index",
@ -182,7 +183,8 @@
],
"polymorphic-parts": [
"01508_partition_pruning_long", /// bug, should be fixed
"01482_move_to_prewhere_and_cast" /// bug, should be fixed
"01482_move_to_prewhere_and_cast", /// bug, should be fixed
"01158_zookeeper_log"
],
"parallel":
[