mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge branch 'master' into patch-1
This commit is contained in:
commit
9123eacdeb
2
.github/workflows/debug.yml
vendored
2
.github/workflows/debug.yml
vendored
@ -2,7 +2,7 @@
|
||||
name: Debug
|
||||
|
||||
'on':
|
||||
[push, pull_request, release, workflow_dispatch]
|
||||
[push, pull_request, release, workflow_dispatch, workflow_call]
|
||||
|
||||
jobs:
|
||||
DebugInfo:
|
||||
|
3
.github/workflows/nightly.yml
vendored
3
.github/workflows/nightly.yml
vendored
@ -10,6 +10,9 @@ env:
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
Debug:
|
||||
# The task for having a preserved ENV and event.json for later investigation
|
||||
uses: ./.github/workflows/debug.yml
|
||||
DockerHubPushAarch64:
|
||||
runs-on: [self-hosted, style-checker-aarch64]
|
||||
steps:
|
||||
|
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc
|
||||
Subproject commit e4e746a24eb56861a86f3672771e3308d8c40722
|
@ -1,7 +1,7 @@
|
||||
# docker build -t clickhouse/style-test .
|
||||
FROM ubuntu:20.04
|
||||
ARG ACT_VERSION=0.2.25
|
||||
ARG ACTIONLINT_VERSION=1.6.8
|
||||
ARG ACT_VERSION=0.2.33
|
||||
ARG ACTIONLINT_VERSION=1.6.22
|
||||
|
||||
# ARG for quick switch to a given ubuntu mirror
|
||||
ARG apt_archive="http://archive.ubuntu.com"
|
||||
|
@ -68,36 +68,57 @@ In the results of `SELECT` query, the values of `AggregateFunction` type have im
|
||||
|
||||
## Example of an Aggregated Materialized View {#example-of-an-aggregated-materialized-view}
|
||||
|
||||
`AggregatingMergeTree` materialized view that watches the `test.visits` table:
|
||||
We will create the table `test.visits` that contains the raw data:
|
||||
|
||||
``` sql
|
||||
CREATE MATERIALIZED VIEW test.basic
|
||||
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate)
|
||||
CREATE TABLE test.visits
|
||||
(
|
||||
StartDate DateTime64 NOT NULL,
|
||||
CounterID UInt64,
|
||||
Sign Nullable(Int32),
|
||||
UserID Nullable(Int32)
|
||||
) ENGINE = MergeTree ORDER BY (StartDate, CounterID);
|
||||
```
|
||||
|
||||
Next, we create an `AggregatingMergeTree` materialized view that watches the `test.visits` table and uses the `AggregateFunction` type:
|
||||
|
||||
``` sql
|
||||
CREATE MATERIALIZED VIEW test.mv_visits
|
||||
(
|
||||
StartDate DateTime64 NOT NULL,
|
||||
CounterID UInt64,
|
||||
Visits AggregateFunction(sum, Nullable(Int32)),
|
||||
Users AggregateFunction(uniq, Nullable(Int32))
|
||||
)
|
||||
ENGINE = AggregatingMergeTree() ORDER BY (StartDate, CounterID)
|
||||
AS SELECT
|
||||
CounterID,
|
||||
StartDate,
|
||||
sumState(Sign) AS Visits,
|
||||
CounterID,
|
||||
sumState(Sign) AS Visits,
|
||||
uniqState(UserID) AS Users
|
||||
FROM test.visits
|
||||
GROUP BY CounterID, StartDate;
|
||||
GROUP BY StartDate, CounterID;
|
||||
```
|
||||
|
||||
Inserting data into the `test.visits` table.
|
||||
|
||||
``` sql
|
||||
INSERT INTO test.visits ...
|
||||
INSERT INTO test.visits (StartDate, CounterID, Sign, UserID)
|
||||
VALUES (1667446031, 1, 3, 4)
|
||||
INSERT INTO test.visits (StartDate, CounterID, Sign, UserID)
|
||||
VALUES (1667446031, 1, 6, 3)
|
||||
```
|
||||
|
||||
The data are inserted in both the table and view `test.basic` that will perform the aggregation.
|
||||
The data are inserted in both the table and the materialized view `test.mv_visits`.
|
||||
|
||||
To get the aggregated data, we need to execute a query such as `SELECT ... GROUP BY ...` from the view `test.basic`:
|
||||
To get the aggregated data, we need to execute a query such as `SELECT ... GROUP BY ...` from the materialized view `test.mv_visits`:
|
||||
|
||||
``` sql
|
||||
SELECT
|
||||
StartDate,
|
||||
sumMerge(Visits) AS Visits,
|
||||
uniqMerge(Users) AS Users
|
||||
FROM test.basic
|
||||
FROM test.mv_visits
|
||||
GROUP BY StartDate
|
||||
ORDER BY StartDate;
|
||||
```
|
||||
|
@ -126,7 +126,7 @@ clickhouse keeper --config /etc/your_path_to_config/config.xml
|
||||
|
||||
ClickHouse Keeper also provides 4lw commands which are almost the same with Zookeeper. Each command is composed of four letters such as `mntr`, `stat` etc. There are some more interesting commands: `stat` gives some general information about the server and connected clients, while `srvr` and `cons` give extended details on server and connections respectively.
|
||||
|
||||
The 4lw commands has a white list configuration `four_letter_word_white_list` which has default value `conf,cons,crst,envi,ruok,srst,srvr,stat,wchc,wchs,dirs,mntr,isro`.
|
||||
The 4lw commands have a white list configuration `four_letter_word_white_list`, which has the default value `conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif`.
|
||||
|
||||
You can issue the commands to ClickHouse Keeper via telnet or nc, at the client port.
|
||||
|
||||
@ -309,6 +309,25 @@ Sessions with Ephemerals (1):
|
||||
/clickhouse/task_queue/ddl
|
||||
```
|
||||
|
||||
- `csnp`: Schedule a snapshot creation task. Returns the last committed log index of the scheduled snapshot on success, or `Failed to schedule snapshot creation task.` on failure. Note that the `lgif` command can help you determine whether the snapshot is done.
|
||||
|
||||
```
|
||||
100
|
||||
```
|
||||
|
||||
- `lgif`: Keeper log information. `first_log_idx` : my first log index in log store; `first_log_term` : my first log term; `last_log_idx` : my last log index in log store; `last_log_term` : my last log term; `last_committed_log_idx` : my last committed log index in state machine; `leader_committed_log_idx` : leader's committed log index from my perspective; `target_committed_log_idx` : target log index should be committed to; `last_snapshot_idx` : the largest committed log index in last snapshot.
|
||||
|
||||
```
|
||||
first_log_idx 1
|
||||
first_log_term 1
|
||||
last_log_idx 101
|
||||
last_log_term 1
|
||||
last_committed_log_idx 100
|
||||
leader_committed_log_idx 101
|
||||
target_committed_log_idx 101
|
||||
last_snapshot_idx 50
|
||||
```
|
||||
|
||||
## Migration from ZooKeeper {#migration-from-zookeeper}
|
||||
|
||||
Seamless migration from ZooKeeper to ClickHouse Keeper is impossible: you have to stop your ZooKeeper cluster, convert the data, and start ClickHouse Keeper. The `clickhouse-keeper-converter` tool allows converting ZooKeeper logs and snapshots to a ClickHouse Keeper snapshot. It works only with ZooKeeper > 3.4. Steps for migration:
|
||||
|
@ -1150,3 +1150,13 @@ A text with tags .
|
||||
The content within <b>CDATA</b>
|
||||
Do Nothing for 2 Minutes 2:00
|
||||
```
|
||||
|
||||
## ascii(s) {#ascii}
|
||||
|
||||
Returns the ASCII code point of the first character of s. The result type is Int32.
|
||||
|
||||
If s is empty, the result is 0. If the first character is not an ASCII character or not part of the Latin-1 Supplement range of UTF-16, the result is undefined.
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -36,7 +36,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
|
||||
}
|
||||
|
||||
|
||||
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv";
|
||||
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif";
|
||||
|
||||
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
|
||||
: server_id(NOT_EXIST)
|
||||
|
@ -136,6 +136,12 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
|
||||
FourLetterCommandPtr api_version_command = std::make_shared<ApiVersionCommand>(keeper_dispatcher);
|
||||
factory.registerCommand(api_version_command);
|
||||
|
||||
FourLetterCommandPtr create_snapshot_command = std::make_shared<CreateSnapshotCommand>(keeper_dispatcher);
|
||||
factory.registerCommand(create_snapshot_command);
|
||||
|
||||
FourLetterCommandPtr log_info_command = std::make_shared<LogInfoCommand>(keeper_dispatcher);
|
||||
factory.registerCommand(log_info_command);
|
||||
|
||||
factory.initializeAllowList(keeper_dispatcher);
|
||||
factory.setInitialize(true);
|
||||
}
|
||||
@ -472,4 +478,33 @@ String ApiVersionCommand::run()
|
||||
return toString(static_cast<uint8_t>(Coordination::current_keeper_api_version));
|
||||
}
|
||||
|
||||
/// Handler for the `csnp` command: asks the dispatcher to schedule a snapshot
/// and reports the outcome as text.
String CreateSnapshotCommand::run()
{
    const auto scheduled_idx = keeper_dispatcher.createSnapshot();

    /// A non-zero index is returned as a decimal string.
    if (scheduled_idx > 0)
        return std::to_string(scheduled_idx);

    return "Failed to schedule snapshot creation task.";
}
|
||||
|
||||
/// Handler for the `lgif` command.
/// Returns the Keeper log information as text, one "key<TAB>value<LF>" line per field.
String LogInfoCommand::run()
{
    KeeperLogInfo log_info = keeper_dispatcher.getKeeperLogInfo();
    StringBuffer ret;

    /// Writes a single "key<TAB>value<LF>" line into the result buffer.
    auto append = [&ret] (String key, uint64_t value) -> void
    {
        writeText(key, ret);
        writeText('\t', ret);
        writeText(std::to_string(value), ret);
        writeText('\n', ret);
    };

    append("first_log_idx", log_info.first_log_idx);
    /// Must be the term, not the index: the two are distinct fields of KeeperLogInfo.
    append("first_log_term", log_info.first_log_term);
    append("last_log_idx", log_info.last_log_idx);
    append("last_log_term", log_info.last_log_term);
    append("last_committed_log_idx", log_info.last_committed_log_idx);
    append("leader_committed_log_idx", log_info.leader_committed_log_idx);
    append("target_committed_log_idx", log_info.target_committed_log_idx);
    append("last_snapshot_idx", log_info.last_snapshot_idx);
    return ret.str();
}
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ using FourLetterCommandPtr = std::shared_ptr<DB::IFourLetterCommand>;
|
||||
/// Just like zookeeper Four Letter Words commands, CH Keeper responds to a small set of commands.
|
||||
/// Each command is composed of four letters, these commands are useful to monitor and issue system problems.
|
||||
/// The feature is based on Zookeeper 3.5.9; details are in https://zookeeper.apache.org/doc/r3.5.9/zookeeperAdmin.html#sc_zkCommands.
|
||||
/// Also we add some additional commands such as csnp, lgif etc.
|
||||
struct IFourLetterCommand
|
||||
{
|
||||
public:
|
||||
@ -327,4 +328,40 @@ struct ApiVersionCommand : public IFourLetterCommand
|
||||
String run() override;
|
||||
~ApiVersionCommand() override = default;
|
||||
};
|
||||
|
||||
/// Create snapshot manually
|
||||
struct CreateSnapshotCommand : public IFourLetterCommand
|
||||
{
|
||||
explicit CreateSnapshotCommand(KeeperDispatcher & keeper_dispatcher_)
|
||||
: IFourLetterCommand(keeper_dispatcher_)
|
||||
{
|
||||
}
|
||||
|
||||
String name() override { return "csnp"; }
|
||||
String run() override;
|
||||
~CreateSnapshotCommand() override = default;
|
||||
};
|
||||
|
||||
/** Raft log information:
|
||||
* first_log_idx 1
|
||||
* first_log_term 1
|
||||
* last_log_idx 101
|
||||
* last_log_term 1
|
||||
* last_committed_idx 100
|
||||
* leader_committed_log_idx 101
|
||||
* target_committed_log_idx 101
|
||||
* last_snapshot_idx 50
|
||||
*/
|
||||
struct LogInfoCommand : public IFourLetterCommand
|
||||
{
|
||||
explicit LogInfoCommand(KeeperDispatcher & keeper_dispatcher_)
|
||||
: IFourLetterCommand(keeper_dispatcher_)
|
||||
{
|
||||
}
|
||||
|
||||
String name() override { return "lgif"; }
|
||||
String run() override;
|
||||
~LogInfoCommand() override = default;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -47,4 +47,32 @@ struct Keeper4LWInfo
|
||||
}
|
||||
};
|
||||
|
||||
/// Keeper log information for 4lw commands.
/// Every counter is zero-initialized so a default-constructed object never
/// exposes indeterminate values.
struct KeeperLogInfo
{
    /// My first log index in log store.
    uint64_t first_log_idx{0};

    /// My first log term.
    uint64_t first_log_term{0};

    /// My last log index in log store.
    uint64_t last_log_idx{0};

    /// My last log term.
    uint64_t last_log_term{0};

    /// My last committed log index in state machine.
    uint64_t last_committed_log_idx{0};

    /// Leader's committed log index from my perspective.
    uint64_t leader_committed_log_idx{0};

    /// Target log index should be committed to.
    uint64_t target_committed_log_idx{0};

    /// The largest committed log index in last snapshot.
    uint64_t last_snapshot_idx{0};
};
|
||||
|
||||
}
|
||||
|
@ -203,6 +203,18 @@ public:
|
||||
{
|
||||
keeper_stats.reset();
|
||||
}
|
||||
|
||||
/// Create snapshot manually, return the last committed log index in the snapshot
|
||||
uint64_t createSnapshot()
|
||||
{
|
||||
return server->createSnapshot();
|
||||
}
|
||||
|
||||
/// Get Raft information
|
||||
KeeperLogInfo getKeeperLogInfo()
|
||||
{
|
||||
return server->getKeeperLogInfo();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -907,4 +907,29 @@ Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t KeeperServer::createSnapshot()
|
||||
{
|
||||
uint64_t log_idx = raft_instance->create_snapshot();
|
||||
if (log_idx != 0)
|
||||
LOG_INFO(log, "Snapshot creation scheduled with last committed log index {}.", log_idx);
|
||||
else
|
||||
LOG_WARNING(log, "Failed to schedule snapshot creation task.");
|
||||
return log_idx;
|
||||
}
|
||||
|
||||
/// Collects the log-related counters exposed by the `lgif` command.
/// The first index/term come from the log store; the rest are read from the Raft instance.
KeeperLogInfo KeeperServer::getKeeperLogInfo()
{
    auto store = state_manager->load_log_store();

    /// The first term is looked up by the first index, so fetch the index once up front.
    const uint64_t first_idx = store->start_index();

    /// Initializers are evaluated top to bottom, matching the original order of calls.
    return KeeperLogInfo{
        .first_log_idx = first_idx,
        .first_log_term = store->term_at(first_idx),
        .last_log_idx = raft_instance->get_last_log_idx(),
        .last_log_term = raft_instance->get_last_log_term(),
        .last_committed_log_idx = raft_instance->get_committed_log_idx(),
        .leader_committed_log_idx = raft_instance->get_leader_committed_log_idx(),
        .target_committed_log_idx = raft_instance->get_target_committed_log_idx(),
        .last_snapshot_idx = raft_instance->get_last_snapshot_idx(),
    };
}
|
||||
|
||||
}
|
||||
|
@ -131,6 +131,10 @@ public:
|
||||
/// Wait configuration update for action. Used by followers.
|
||||
/// Return true if update was successfully received.
|
||||
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
|
||||
|
||||
uint64_t createSnapshot();
|
||||
|
||||
KeeperLogInfo getKeeperLogInfo();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -377,6 +377,9 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
{
|
||||
assert(deltas.empty() || deltas.front().zxid >= commit_zxid);
|
||||
|
||||
// collect nodes that have no further modification in the current transaction
|
||||
std::unordered_set<std::string> modified_nodes;
|
||||
|
||||
while (!deltas.empty() && deltas.front().zxid == commit_zxid)
|
||||
{
|
||||
if (std::holds_alternative<SubDeltaEnd>(deltas.front().operation))
|
||||
@ -393,7 +396,17 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
assert(path_deltas.front() == &front_delta);
|
||||
path_deltas.pop_front();
|
||||
if (path_deltas.empty())
|
||||
{
|
||||
deltas_for_path.erase(front_delta.path);
|
||||
|
||||
// no more deltas for path -> no modification
|
||||
modified_nodes.insert(std::move(front_delta.path));
|
||||
}
|
||||
else if (path_deltas.front()->zxid > commit_zxid)
|
||||
{
|
||||
// next delta has a zxid from a different transaction -> no modification in this transaction
|
||||
modified_nodes.insert(std::move(front_delta.path));
|
||||
}
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&front_delta.operation))
|
||||
{
|
||||
@ -409,9 +422,12 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
}
|
||||
|
||||
// delete all cached nodes that were not modified after the commit_zxid
|
||||
// the commit can end on SubDeltaEnd so we don't want to clear cached nodes too soon
|
||||
if (deltas.empty() || deltas.front().zxid > commit_zxid)
|
||||
std::erase_if(nodes, [commit_zxid](const auto & node) { return node.second.zxid == commit_zxid; });
|
||||
// we only need to check the nodes that were modified in this transaction
|
||||
for (const auto & node : modified_nodes)
|
||||
{
|
||||
if (nodes[node].zxid == commit_zxid)
|
||||
nodes.erase(node);
|
||||
}
|
||||
}
|
||||
|
||||
void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
|
@ -443,6 +443,11 @@ ASTPtr DatabasePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) c
|
||||
if (which.isArray())
|
||||
return makeASTFunction("Array", getColumnDeclaration(typeid_cast<const DataTypeArray *>(data_type.get())->getNestedType()));
|
||||
|
||||
if (which.isDateTime64())
|
||||
{
|
||||
return makeASTFunction("DateTime64", std::make_shared<ASTLiteral>(static_cast<UInt32>(6)));
|
||||
}
|
||||
|
||||
return std::make_shared<ASTIdentifier>(data_type->getName());
|
||||
}
|
||||
|
||||
|
86
src/Functions/ascii.cpp
Normal file
86
src/Functions/ascii.cpp
Normal file
@ -0,0 +1,86 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionStringOrArrayToT.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
struct AsciiName
|
||||
{
|
||||
static constexpr auto name = "ascii";
|
||||
};
|
||||
|
||||
|
||||
struct AsciiImpl
|
||||
{
|
||||
static constexpr auto is_fixed_to_constant = false;
|
||||
using ReturnType = Int32;
|
||||
|
||||
|
||||
static void vector(const ColumnString::Chars & data, const ColumnString::Offsets & offsets, PaddedPODArray<ReturnType> & res)
|
||||
{
|
||||
size_t size = offsets.size();
|
||||
|
||||
ColumnString::Offset prev_offset = 0;
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
res[i] = doAscii(data, prev_offset, offsets[i] - prev_offset - 1);
|
||||
prev_offset = offsets[i];
|
||||
}
|
||||
}
|
||||
|
||||
[[noreturn]] static void vectorFixedToConstant(const ColumnString::Chars & /*data*/, size_t /*n*/, Int32 & /*res*/)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "vectorFixedToConstant not implemented for function {}", AsciiName::name);
|
||||
}
|
||||
|
||||
static void vectorFixedToVector(const ColumnString::Chars & data, size_t n, PaddedPODArray<ReturnType> & res)
|
||||
{
|
||||
size_t size = data.size() / n;
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
res[i] = doAscii(data, i * n, n);
|
||||
}
|
||||
}
|
||||
|
||||
[[noreturn]] static void array(const ColumnString::Offsets & /*offsets*/, PaddedPODArray<ReturnType> & /*res*/)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Cannot apply function {} to Array argument", AsciiName::name);
|
||||
}
|
||||
|
||||
[[noreturn]] static void uuid(const ColumnUUID::Container & /*offsets*/, size_t /*n*/, PaddedPODArray<ReturnType> & /*res*/)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Cannot apply function {} to UUID argument", AsciiName::name);
|
||||
}
|
||||
|
||||
private:
|
||||
static Int32 doAscii(const ColumnString::Chars & buf, size_t offset, size_t size)
|
||||
{
|
||||
return size ? static_cast<ReturnType>(buf[offset]) : 0;
|
||||
}
|
||||
};
|
||||
|
||||
using FunctionAscii = FunctionStringOrArrayToT<AsciiImpl, AsciiName, AsciiImpl::ReturnType>;
|
||||
|
||||
/// Registers `ascii` in the function factory together with its embedded documentation.
REGISTER_FUNCTION(Ascii)
{
    /// Registered with FunctionFactory::CaseInsensitive.
    factory.registerFunction<FunctionAscii>(
        {
        R"(
Returns the ASCII code point of the first character of s. The result type is Int32.

If s is empty, the result is 0. If the first character is not an ASCII character or not part of the Latin-1 Supplement range of UTF-16, the result is undefined.
)",
        Documentation::Examples{{"ascii", "SELECT ascii('234')"}},
        Documentation::Categories{"String"}
        }, FunctionFactory::CaseInsensitive);
}
|
||||
|
||||
}
|
@ -1197,6 +1197,9 @@ public:
|
||||
if (!mergeElement())
|
||||
return false;
|
||||
|
||||
if (elements.size() != 2)
|
||||
return false;
|
||||
|
||||
elements = {makeASTFunction("CAST", elements[0], elements[1])};
|
||||
finished = true;
|
||||
return true;
|
||||
@ -1406,7 +1409,7 @@ public:
|
||||
protected:
|
||||
bool getResultImpl(ASTPtr & node) override
|
||||
{
|
||||
if (state == 2)
|
||||
if (state == 2 && elements.size() == 2)
|
||||
std::swap(elements[1], elements[0]);
|
||||
|
||||
node = makeASTFunction("position", std::move(elements));
|
||||
|
@ -5426,6 +5426,7 @@ static void selectBestProjection(
|
||||
|
||||
auto projection_result_ptr = reader.estimateNumMarksToRead(
|
||||
projection_parts,
|
||||
candidate.prewhere_info,
|
||||
candidate.required_columns,
|
||||
storage_snapshot->metadata,
|
||||
candidate.desc->metadata,
|
||||
@ -5449,6 +5450,7 @@ static void selectBestProjection(
|
||||
{
|
||||
auto normal_result_ptr = reader.estimateNumMarksToRead(
|
||||
normal_parts,
|
||||
query_info.prewhere_info,
|
||||
required_columns,
|
||||
storage_snapshot->metadata,
|
||||
storage_snapshot->metadata,
|
||||
@ -5783,7 +5785,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
||||
const auto & analysis_result = select.getAnalysisResult();
|
||||
|
||||
query_info.prepared_sets = select.getQueryAnalyzer()->getPreparedSets();
|
||||
query_info.prewhere_info = analysis_result.prewhere_info;
|
||||
|
||||
const auto & before_where = analysis_result.before_where;
|
||||
const auto & where_column_name = analysis_result.where_column_name;
|
||||
@ -6060,6 +6061,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
||||
{
|
||||
auto normal_result_ptr = reader.estimateNumMarksToRead(
|
||||
normal_parts,
|
||||
query_info.prewhere_info,
|
||||
analysis_result.required_columns,
|
||||
metadata_snapshot,
|
||||
metadata_snapshot,
|
||||
@ -6092,6 +6094,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
||||
{
|
||||
query_info.merge_tree_select_result_ptr = reader.estimateNumMarksToRead(
|
||||
parts,
|
||||
query_info.prewhere_info,
|
||||
analysis_result.required_columns,
|
||||
metadata_snapshot,
|
||||
metadata_snapshot,
|
||||
@ -6173,8 +6176,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
|
||||
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
|
||||
}
|
||||
|
||||
/// Just in case, reset prewhere info calculated from projection.
|
||||
query_info.prewhere_info.reset();
|
||||
return *selected_candidate;
|
||||
}
|
||||
|
||||
|
@ -214,6 +214,14 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
/// Previous part only in boundaries of partition frame
|
||||
const MergeTreeData::DataPartPtr * prev_part = nullptr;
|
||||
|
||||
/// collect min_age for each partition while iterating parts
|
||||
struct PartitionInfo
|
||||
{
|
||||
time_t min_age{std::numeric_limits<time_t>::max()};
|
||||
};
|
||||
|
||||
std::unordered_map<std::string, PartitionInfo> partitions_info;
|
||||
|
||||
size_t parts_selected_precondition = 0;
|
||||
for (const MergeTreeData::DataPartPtr & part : data_parts)
|
||||
{
|
||||
@ -277,6 +285,9 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
|
||||
part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;
|
||||
|
||||
auto & partition_info = partitions_info[partition_id];
|
||||
partition_info.min_age = std::min(partition_info.min_age, part_info.age);
|
||||
|
||||
++parts_selected_precondition;
|
||||
|
||||
parts_ranges.back().emplace_back(part_info);
|
||||
@ -333,7 +344,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
SimpleMergeSelector::Settings merge_settings;
|
||||
/// Override value from table settings
|
||||
merge_settings.max_parts_to_merge_at_once = data_settings->max_parts_to_merge_at_once;
|
||||
merge_settings.min_age_to_force_merge = data_settings->min_age_to_force_merge_seconds;
|
||||
if (!data_settings->min_age_to_force_merge_on_partition_only)
|
||||
merge_settings.min_age_to_force_merge = data_settings->min_age_to_force_merge_seconds;
|
||||
|
||||
if (aggressive)
|
||||
merge_settings.base = 1;
|
||||
@ -347,6 +359,20 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
|
||||
|
||||
if (parts_to_merge.empty())
|
||||
{
|
||||
if (data_settings->min_age_to_force_merge_on_partition_only && data_settings->min_age_to_force_merge_seconds)
|
||||
{
|
||||
auto best_partition_it = std::max_element(
|
||||
partitions_info.begin(),
|
||||
partitions_info.end(),
|
||||
[](const auto & e1, const auto & e2) { return e1.second.min_age < e2.second.min_age; });
|
||||
|
||||
assert(best_partition_it != partitions_info.end());
|
||||
|
||||
if (static_cast<size_t>(best_partition_it->second.min_age) >= data_settings->min_age_to_force_merge_seconds)
|
||||
return selectAllPartsToMergeWithinPartition(
|
||||
future_part, can_merge_callback, best_partition_it->first, true, metadata_snapshot, txn, out_disable_reason);
|
||||
}
|
||||
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
|
@ -1294,6 +1294,7 @@ static void selectColumnNames(
|
||||
|
||||
MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
const PrewhereInfoPtr & prewhere_info,
|
||||
const Names & column_names_to_return,
|
||||
const StorageMetadataPtr & metadata_snapshot_base,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
@ -1318,7 +1319,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
|
||||
|
||||
return ReadFromMergeTree::selectRangesToRead(
|
||||
std::move(parts),
|
||||
query_info.prewhere_info,
|
||||
prewhere_info,
|
||||
added_filter_nodes,
|
||||
metadata_snapshot_base,
|
||||
metadata_snapshot,
|
||||
|
@ -56,6 +56,7 @@ public:
|
||||
/// This method is used to select best projection for table.
|
||||
MergeTreeDataSelectAnalysisResultPtr estimateNumMarksToRead(
|
||||
MergeTreeData::DataPartsVector parts,
|
||||
const PrewhereInfoPtr & prewhere_info,
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot_base,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
|
@ -63,6 +63,7 @@ struct Settings;
|
||||
M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
|
||||
M(UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30, "Remove old broken detached parts in the background if they remained intouched for a specified by this setting period of time.", 0) \
|
||||
M(UInt64, min_age_to_force_merge_seconds, 0, "If all parts in a certain range are older than this value, range will be always eligible for merging. Set to 0 to disable.", 0) \
|
||||
M(Bool, min_age_to_force_merge_on_partition_only, false, "Whether min_age_to_force_merge_seconds should be applied only on the entire partition and not on subset.", false) \
|
||||
M(UInt64, merge_tree_enable_clear_old_broken_detached, false, "Enable clearing old broken detached parts operation in background.", 0) \
|
||||
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
|
||||
\
|
||||
|
@ -87,7 +87,7 @@ class PRInfo:
|
||||
self.body = ""
|
||||
self.diff_urls = []
|
||||
self.release_pr = 0
|
||||
ref = github_event.get("ref", "refs/head/master")
|
||||
ref = github_event.get("ref", "refs/heads/master")
|
||||
if ref and ref.startswith("refs/heads/"):
|
||||
ref = ref[11:]
|
||||
|
||||
|
@ -596,3 +596,48 @@ def test_cmd_wchp(started_cluster):
|
||||
assert "/test_4lw_normal_node_1" in list_data
|
||||
finally:
|
||||
destroy_zk_client(zk)
|
||||
|
||||
|
||||
def test_cmd_csnp(started_cluster):
    """Check that the `csnp` 4lw command answers with an integer log index."""
    zk = None
    try:
        wait_nodes()
        zk = get_fake_zk(node1.name, timeout=30.0)
        data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="csnp")

        # On failure Keeper answers with a text message instead of a number,
        # so a response that does not parse as an integer fails the test.
        try:
            int(data)
        except ValueError:
            raise AssertionError(f"csnp returned a non-numeric response: {data!r}")
    finally:
        destroy_zk_client(zk)
|
||||
|
||||
|
||||
def test_cmd_lgif(started_cluster):
    """Check that the `lgif` 4lw command reports every log field with a plausible value."""
    zk = None
    try:
        wait_nodes()
        clear_znodes()

        zk = get_fake_zk(node1.name, timeout=30.0)
        do_some_action(zk, create_cnt=100)

        data = keeper_utils.send_4lw_cmd(cluster, node1, cmd="lgif")
        print(data)

        # The response is parsed as tab-separated "key<TAB>value" rows; empty rows are skipped.
        rows = csv.reader(data.split("\n"), delimiter="\t")
        result = {row[0]: row[1] for row in rows if len(row) != 0}

        assert int(result["first_log_idx"]) == 1
        assert int(result["first_log_term"]) == 1
        assert int(result["last_log_idx"]) >= 1
        assert int(result["last_log_term"]) == 1
        assert int(result["last_committed_log_idx"]) >= 1
        assert int(result["leader_committed_log_idx"]) >= 1
        assert int(result["target_committed_log_idx"]) >= 1
        assert int(result["last_snapshot_idx"]) >= 1
    finally:
        destroy_zk_client(zk)
|
||||
|
@ -1,8 +0,0 @@
|
||||
<clickhouse>
|
||||
<zookeeper>
|
||||
<node index="1">
|
||||
<host>zoo1</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
</zookeeper>
|
||||
</clickhouse>
|
@ -1,88 +0,0 @@
|
||||
import pytest
|
||||
import time
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/zookeeper_config.xml"],
|
||||
with_zookeeper=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_part_number(table_name):
|
||||
return TSV(
|
||||
node.query(
|
||||
f"SELECT count(*) FROM system.parts where table='{table_name}' and active=1"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def check_expected_part_number(seconds, table_name, expected):
    """Poll the active part count of `table_name` until it equals `expected`.

    Makes up to `int(seconds) * 2` attempts, sleeping one second after each
    mismatch, and fails with an AssertionError if no attempt matched.
    """
    attempts = int(seconds) * 2
    for _ in range(attempts):
        if get_part_number(table_name) == expected:
            return
        time.sleep(1)
    assert False
|
||||
|
||||
|
||||
def test_without_force_merge_old_parts(start_cluster):
|
||||
node.query(
|
||||
"CREATE TABLE test_without_merge (i Int64) ENGINE = MergeTree ORDER BY i;"
|
||||
)
|
||||
node.query("INSERT INTO test_without_merge SELECT 1")
|
||||
node.query("INSERT INTO test_without_merge SELECT 2")
|
||||
node.query("INSERT INTO test_without_merge SELECT 3")
|
||||
|
||||
expected = TSV("""3\n""")
|
||||
# verify that the parts don't get merged
|
||||
for i in range(10):
|
||||
if get_part_number("test_without_merge") != expected:
|
||||
assert False
|
||||
time.sleep(1)
|
||||
|
||||
node.query("DROP TABLE test_without_merge;")
|
||||
|
||||
|
||||
def test_force_merge_old_parts(start_cluster):
|
||||
node.query(
|
||||
"CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;"
|
||||
)
|
||||
node.query("INSERT INTO test_with_merge SELECT 1")
|
||||
node.query("INSERT INTO test_with_merge SELECT 2")
|
||||
node.query("INSERT INTO test_with_merge SELECT 3")
|
||||
|
||||
expected = TSV("""1\n""")
|
||||
check_expected_part_number(10, "test_with_merge", expected)
|
||||
|
||||
node.query("DROP TABLE test_with_merge;")
|
||||
|
||||
|
||||
def test_force_merge_old_parts_replicated_merge_tree(start_cluster):
    """Same scenario as the plain MergeTree test, but on a ReplicatedMergeTree table."""
    create_query = "CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/testing/test', 'node') ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;"
    node.query(create_query)
    # One INSERT per value, so every value lands in its own part.
    for value in (1, 2, 3):
        node.query(f"INSERT INTO test_replicated SELECT {value}")

    single_part = TSV("""1\n""")
    check_expected_part_number(10, "test_replicated", single_part)

    node.query("DROP TABLE test_replicated;")
|
@ -693,6 +693,19 @@ def test_auto_close_connection(started_cluster):
|
||||
assert count == 2
|
||||
|
||||
|
||||
def test_datetime(started_cluster):
    """A PostgreSQL `timestamp` column must show up as DateTime64(6), also after detach/attach."""
    pg_cursor = started_cluster.postgres_conn.cursor()
    for statement in ("drop table if exists test", "create table test (u timestamp)"):
        pg_cursor.execute(statement)

    node1.query("drop database if exists pg")
    node1.query("create database pg engine = PostgreSQL(postgres1)")

    expected_type = "DateTime64(6)"
    assert expected_type in node1.query("show create table pg.test")

    # The column type must be the same once the table is detached and attached again.
    node1.query("detach table pg.test")
    node1.query("attach table pg.test")
    assert expected_type in node1.query("show create table pg.test")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
@ -1,2 +1,3 @@
|
||||
1 1 1
|
||||
2 2 2
|
||||
1
|
||||
|
@ -9,3 +9,13 @@ set allow_experimental_projection_optimization = 1, max_rows_to_read = 3;
|
||||
select * from t where i < 5 and j in (1, 2);
|
||||
|
||||
drop table t;
|
||||
|
||||
drop table if exists test;
|
||||
|
||||
create table test (name String, time Int64) engine MergeTree order by time;
|
||||
|
||||
insert into test values ('hello world', 1662336000241);
|
||||
|
||||
select count() from (select fromUnixTimestamp64Milli(time, 'UTC') time_fmt, name from test where time_fmt > '2022-09-05 00:00:00');
|
||||
|
||||
drop table test;
|
||||
|
14
tests/queries/0_stateless/02353_ascii.reference
Normal file
14
tests/queries/0_stateless/02353_ascii.reference
Normal file
@ -0,0 +1,14 @@
|
||||
50
|
||||
0
|
||||
50
|
||||
0
|
||||
48
|
||||
49
|
||||
50
|
||||
51
|
||||
52
|
||||
53
|
||||
54
|
||||
55
|
||||
56
|
||||
57
|
5
tests/queries/0_stateless/02353_ascii.sql
Normal file
5
tests/queries/0_stateless/02353_ascii.sql
Normal file
@ -0,0 +1,5 @@
|
||||
SELECT ascii('234');
|
||||
SELECT ascii('');
|
||||
SELECT ascii(materialize('234'));
|
||||
SELECT ascii(materialize(''));
|
||||
SELECT ascii(toString(number) || 'abc') from numbers(10);
|
12
tests/queries/0_stateless/02473_optimize_old_parts.reference
Normal file
12
tests/queries/0_stateless/02473_optimize_old_parts.reference
Normal file
@ -0,0 +1,12 @@
|
||||
Without merge
|
||||
3
|
||||
With merge any part range
|
||||
1
|
||||
With merge partition only
|
||||
1
|
||||
With merge replicated any part range
|
||||
1
|
||||
With merge replicated partition only
|
||||
1
|
||||
With merge partition only and new parts
|
||||
3
|
87
tests/queries/0_stateless/02473_optimize_old_parts.sql
Normal file
87
tests/queries/0_stateless/02473_optimize_old_parts.sql
Normal file
@ -0,0 +1,87 @@
|
||||
-- Tags: long
|
||||
|
||||
DROP TABLE IF EXISTS test_without_merge;
|
||||
DROP TABLE IF EXISTS test_with_merge;
|
||||
DROP TABLE IF EXISTS test_replicated;
|
||||
|
||||
SELECT 'Without merge';
|
||||
|
||||
CREATE TABLE test_without_merge (i Int64) ENGINE = MergeTree ORDER BY i;
|
||||
INSERT INTO test_without_merge SELECT 1;
|
||||
INSERT INTO test_without_merge SELECT 2;
|
||||
INSERT INTO test_without_merge SELECT 3;
|
||||
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_without_merge' AND active;
|
||||
|
||||
DROP TABLE test_without_merge;
|
||||
|
||||
SELECT 'With merge any part range';
|
||||
|
||||
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
|
||||
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=false;
|
||||
INSERT INTO test_with_merge SELECT 1;
|
||||
INSERT INTO test_with_merge SELECT 2;
|
||||
INSERT INTO test_with_merge SELECT 3;
|
||||
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
|
||||
|
||||
DROP TABLE test_with_merge;
|
||||
|
||||
SELECT 'With merge partition only';
|
||||
|
||||
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
|
||||
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
|
||||
INSERT INTO test_with_merge SELECT 1;
|
||||
INSERT INTO test_with_merge SELECT 2;
|
||||
INSERT INTO test_with_merge SELECT 3;
|
||||
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
|
||||
|
||||
DROP TABLE test_with_merge;
|
||||
|
||||
SELECT 'With merge replicated any part range';
|
||||
|
||||
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473', 'node') ORDER BY i
|
||||
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=false;
|
||||
INSERT INTO test_replicated SELECT 1;
|
||||
INSERT INTO test_replicated SELECT 2;
|
||||
INSERT INTO test_replicated SELECT 3;
|
||||
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_replicated' AND active;
|
||||
|
||||
DROP TABLE test_replicated;
|
||||
|
||||
SELECT 'With merge replicated partition only';
|
||||
|
||||
CREATE TABLE test_replicated (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test02473_partition_only', 'node') ORDER BY i
|
||||
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
|
||||
INSERT INTO test_replicated SELECT 1;
|
||||
INSERT INTO test_replicated SELECT 2;
|
||||
INSERT INTO test_replicated SELECT 3;
|
||||
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_replicated' AND active;
|
||||
|
||||
DROP TABLE test_replicated;
|
||||
|
||||
SELECT 'With merge partition only and new parts';
|
||||
|
||||
CREATE TABLE test_with_merge (i Int64) ENGINE = MergeTree ORDER BY i
|
||||
SETTINGS min_age_to_force_merge_seconds=3, min_age_to_force_merge_on_partition_only=true;
|
||||
SYSTEM STOP MERGES test_with_merge;
|
||||
-- These two parts will already be older than min_age_to_force_merge_seconds when merges are started again
|
||||
INSERT INTO test_with_merge SELECT 1;
|
||||
INSERT INTO test_with_merge SELECT 2;
|
||||
SELECT sleepEachRow(1) FROM numbers(9) FORMAT Null;
|
||||
-- This part will have min_age=0 at the time of merge
|
||||
-- and so, nothing will be merged.
|
||||
INSERT INTO test_with_merge SELECT 3;
|
||||
SYSTEM START MERGES test_with_merge;
|
||||
|
||||
SELECT count(*) FROM system.parts WHERE database = currentDatabase() AND table='test_with_merge' AND active;
|
||||
|
||||
DROP TABLE test_with_merge;
|
1
tests/queries/0_stateless/02476_fix_cast_parser_bug.sql
Normal file
1
tests/queries/0_stateless/02476_fix_cast_parser_bug.sql
Normal file
@ -0,0 +1 @@
|
||||
SELECT CAST(a, b -> c) ++; -- { clientError SYNTAX_ERROR }
|
@ -20,17 +20,13 @@ add_subdirectory (report)
|
||||
# Not used in package
|
||||
if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
|
||||
add_subdirectory (compressor)
|
||||
add_subdirectory (iotest)
|
||||
add_subdirectory (corrector_utf8)
|
||||
add_subdirectory (zookeeper-cli)
|
||||
add_subdirectory (zookeeper-dump-tree)
|
||||
add_subdirectory (zookeeper-remove-by-list)
|
||||
add_subdirectory (zookeeper-create-entry-to-download-part)
|
||||
add_subdirectory (zookeeper-adjust-block-numbers-to-parts)
|
||||
add_subdirectory (wikistat-loader)
|
||||
add_subdirectory (check-marks)
|
||||
add_subdirectory (checksum-for-compressed-block)
|
||||
add_subdirectory (db-generator)
|
||||
add_subdirectory (wal-dump)
|
||||
add_subdirectory (check-mysql-binlog)
|
||||
add_subdirectory (keeper-bench)
|
||||
|
@ -1,2 +0,0 @@
|
||||
clickhouse_add_executable (query_db_generator query_db_generator.cpp)
|
||||
target_link_libraries(query_db_generator PRIVATE clickhouse_parsers boost::program_options)
|
@ -1,35 +0,0 @@
|
||||
# Clickhouse query analysis
|
||||
|
||||
Here we will consider only `SELECT` queries, i.e. those queries that get data from the table.
|
||||
The built-in ClickHouse parser accepts a query string as input. Among the 14 main clauses of a `SELECT` statement (`WITH`, `SELECT`, `TABLES`, `PREWHERE`, `WHERE`, `GROUP_BY`, `HAVING`, `ORDER_BY`, `LIMIT_BY_OFFSET`, `LIMIT_BY_LENGTH`, `LIMIT_BY`, `LIMIT_OFFSET`, `LIMIT_LENGTH`, `SETTINGS`), we will analyze the `SELECT`, `TABLES`, `WHERE`, `GROUP_BY`, `HAVING` and `ORDER_BY` clauses, because most of the data is there. We need this data to analyze the structure and to identify values. After parsing a query, the parser produces a tree in which each node is a specific query execution operation, a function over values, a constant, a designation, etc. Nodes also have subtrees where their arguments or suboperations are located. We will extract the data we need by traversing this tree.
|
||||
|
||||
## Scheme analysis
|
||||
|
||||
It is necessary to determine possible tables by a query. Having a query string, you can understand which parts of it represent the names of the tables, so you can determine their number in our database.
|
||||
In the ClickHouse parser, `TABLES` (Figure 1) is the query subtree responsible for the tables from which we get data. It contains the main table where the columns come from, as well as the `JOIN` operations that are performed in the query. Traversing all nodes in the subtree, we collect the names of the tables and of the databases where they are located, as well as their aliases, i.e. the shortened names chosen by the query author. We may need these names to determine the ownership of a column in the future.
|
||||
Thus, we get a set of databases for the query, as well as tables and their aliases, with the help of them a query is made.
|
||||
|
||||
Then we need to define the set of columns that are in the query and the tables they can refer to. The set of columns in each table is already known during the query execution. Therefore, the program automatically links the column and table at runtime. However, in our case, it is impossible to unambiguously interpret the belonging of a column to a specific table, for example, in the following query `SELECT column1, column2, column3 FROM table1 JOIN table2 on table1.column2 = table2.column3`. In this case, we can say which table `column2` and `column3` belong to. However, `column1` can belong to either the first or the second table. We will refer undefined columns to the main table, on which a query is made, for unambiguous interpretation of such cases. For example, in this case, it will be `table1`.
|
||||
All columns in the tree are in `IDENTIFIER` type nodes, which are in the `SELECT`, `TABLES`, `WHERE`, `GROUP_BY`, `HAVING`, `ORDER_BY` subtrees. We form a set of all tables by recursively traversing the subtrees, then we split each column into its constituents: the table (if it is explicitly specified with a dot) and the name. Then, since the table can be an alias, we replace the alias with the original table name. We now have a list of all the columns and the tables they belong to. Columns without an explicit table are assigned to the main table of the query.
|
||||
|
||||
## Column analysis
|
||||
|
||||
Then we need to exactly define data types for columns that have a value in the query. An example is the boolean `WHERE` clause where we test boolean expressions in its attributes. If the query specifies `column > 5`, then we can conclude that this column contains a numeric value, or if the `LIKE` expression is applied to the attribute, then the attribute has a string type.
|
||||
In this part, you need to learn how to extract such expressions from a query and match data types for columns, where it is possible. At the same time, it is clear that it is not always possible to make an unambiguous decision about the type of a particular attribute from the available values. For example, `column > 5` can mean many numeric types such as `UINT8`, `UINT32`, `INT32`, `INT64`, etc. It is necessary to determine the interpretation of certain values since searching through all possible values can be quite large and long.
|
||||
It can take a long time to iterate over all possible values, so we use `INT64` and `FLOAT64` types for numeric values, `STRING` for strings, `DATE` and `DATETIME` for dates, and `ARRAY`.
|
||||
We can determine column values using boolean, arithmetic and other functions on the column values that are specified in the query. Such functions are in the `SELECT` and `WHERE` subtrees. The function parameter can be a constant, a column or another function (Figure 2). Thus, the following parameters can help to understand the type of the column:
|
||||
- The types of arguments that a function can take. For example, the `TOSTARTOFMINUTE` function (rounds a time down to the start of the minute) can only accept `DATETIME`, so if the argument of this function is a column, then this column has the `DATETIME` type.
|
||||
- The types of the remaining arguments in this function. For example, the `EQUALS` function means equality of its argument types, so if a constant and a column are present in this function, then we can define the type of the column as the type of the constant.
|
||||
|
||||
Thus, we define the possible argument types, the return type, the parameter for each function, and the function arguments of the identical type. The recursive function handler will determine the possible types of columns used in these functions by the values of the arguments, and then return the possible types of the function's result.
|
||||
Now, for each column, we have many possible types of values. We will choose one specific type from this set to interpret the query unambiguously.
|
||||
|
||||
## Column values definition
|
||||
|
||||
At this stage, we already have a certain structure of the database tables, we need to fill this table with values. We should understand which columns depend on each other when executing the function (for example, the join is done according to two columns, which means that they must have the same values). We also need to understand what values the columns must have to fulfill various conditions during execution.
|
||||
We search for all comparison operations in our query to achieve the goal. If the arguments of the operation are two columns, then we consider them linked. If the arguments are a column and a value, then we add that value to the possible values of the column, and also add the value with some noise. The noise is a random number for a numeric type, a random number of days for a date, etc. Each comparison operation therefore needs a handler that generates at least two values: one that satisfies the condition and one that does not. For example, for the operation `column1 > 5`, `column1` must be assigned one value greater than 5 and one value less than or equal to 5. The same is true for the operation `column2 LIKE some% string`: `column2` must be assigned one value that satisfies the expression and one that does not.
|
||||
Now we have many associated columns and many values. We know that the connectivity of columns is symmetric, but we need to add transitivity for a complete definition, because if `column1 = column2` and `column2 = column3`, then `column1 = column3`, but this does not follow from the construction. Accordingly, we need to extend the connectivity across all columns. We combine multiple values for each column with the values associated with it. If we have columns with no values, then we generate random values.
|
||||
|
||||
## Generation
|
||||
|
||||
We have a complete view of the database schema as well as many values for each table now. We will generate data by cartesian product of the value set of each column for a specific table. Thus, we get a set for each table, consisting of sets of values for each column. We start generating queries that create this table and fill it with data. We generate the `CREATE QUERY` that creates this table based on the structure of the table and the types of its columns, and then we generate the `INSERT QUERY` over the set of values, which fills the table with data.
|
File diff suppressed because it is too large
Load Diff
@ -1,9 +0,0 @@
|
||||
|
||||
clickhouse_add_executable (iotest iotest.cpp ${SRCS})
|
||||
target_link_libraries (iotest PRIVATE clickhouse_common_io)
|
||||
|
||||
clickhouse_add_executable (iotest_nonblock iotest_nonblock.cpp ${SRCS})
|
||||
target_link_libraries (iotest_nonblock PRIVATE clickhouse_common_io)
|
||||
|
||||
clickhouse_add_executable (iotest_aio iotest_aio.cpp ${SRCS})
|
||||
target_link_libraries (iotest_aio PRIVATE clickhouse_common_io)
|
@ -1,197 +0,0 @@
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Poco/Exception.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <base/getPageSize.h>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <vector>
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <ctime>
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int CANNOT_CLOSE_FILE;
|
||||
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
||||
extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Access flags of the benchmark; they are OR-ed together into a plain `int` bitmask.
enum Mode
{
    MODE_NONE    = 0,
    MODE_READ    = 1 << 0,
    MODE_WRITE   = 1 << 1,
    MODE_ALIGNED = 1 << 2,
    MODE_DIRECT  = 1 << 3,
    MODE_SYNC    = 1 << 4,
};
|
||||
|
||||
|
||||
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t count)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
Memory<> direct_buf(block_size, ::getPageSize());
|
||||
std::vector<char> simple_buf(block_size);
|
||||
|
||||
char * buf;
|
||||
if ((mode & MODE_DIRECT))
|
||||
buf = direct_buf.data();
|
||||
else
|
||||
buf = simple_buf.data();
|
||||
|
||||
pcg64 rng(randomSeed());
|
||||
|
||||
for (size_t i = 0; i < count; ++i)
|
||||
{
|
||||
uint64_t rand_result1 = rng();
|
||||
uint64_t rand_result2 = rng();
|
||||
uint64_t rand_result3 = rng();
|
||||
|
||||
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
|
||||
size_t offset;
|
||||
if ((mode & MODE_DIRECT) || (mode & MODE_ALIGNED))
|
||||
offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
|
||||
else
|
||||
offset = min_offset + rand_result % (max_offset - min_offset - block_size + 1);
|
||||
|
||||
if (mode & MODE_READ)
|
||||
{
|
||||
if (static_cast<int>(block_size) != pread(fd, buf, block_size, offset))
|
||||
throwFromErrno("Cannot read", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (static_cast<int>(block_size) != pwrite(fd, buf, block_size, offset))
|
||||
throwFromErrno("Cannot write", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int mainImpl(int argc, char ** argv)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
const char * file_name = nullptr;
|
||||
int mode = MODE_NONE;
|
||||
UInt64 min_offset = 0;
|
||||
UInt64 max_offset = 0;
|
||||
UInt64 block_size = 0;
|
||||
UInt64 threads = 0;
|
||||
UInt64 count = 0;
|
||||
|
||||
if (argc != 8)
|
||||
{
|
||||
std::cerr << "Usage: " << argv[0] << " file_name (r|w)[a][d][s] min_offset max_offset block_size threads count" << std::endl <<
|
||||
"a - aligned, d - direct, s - sync" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
file_name = argv[1];
|
||||
min_offset = parse<UInt64>(argv[3]);
|
||||
max_offset = parse<UInt64>(argv[4]);
|
||||
block_size = parse<UInt64>(argv[5]);
|
||||
threads = parse<UInt64>(argv[6]);
|
||||
count = parse<UInt64>(argv[7]);
|
||||
|
||||
for (int i = 0; argv[2][i]; ++i)
|
||||
{
|
||||
char c = argv[2][i];
|
||||
switch (c)
|
||||
{
|
||||
case 'r':
|
||||
mode |= MODE_READ;
|
||||
break;
|
||||
case 'w':
|
||||
mode |= MODE_WRITE;
|
||||
break;
|
||||
case 'a':
|
||||
mode |= MODE_ALIGNED;
|
||||
break;
|
||||
case 'd':
|
||||
mode |= MODE_DIRECT;
|
||||
break;
|
||||
case 's':
|
||||
mode |= MODE_SYNC;
|
||||
break;
|
||||
default:
|
||||
throw Poco::Exception("Invalid mode");
|
||||
}
|
||||
}
|
||||
|
||||
ThreadPool pool(threads);
|
||||
|
||||
#ifndef OS_DARWIN
|
||||
int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_DIRECT) ? O_DIRECT : 0) | ((mode & MODE_SYNC) ? O_SYNC : 0));
|
||||
#else
|
||||
int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_SYNC) ? O_SYNC : 0));
|
||||
#endif
|
||||
if (-1 == fd)
|
||||
throwFromErrno("Cannot open file", ErrorCodes::CANNOT_OPEN_FILE);
|
||||
#ifdef OS_DARWIN
|
||||
if (mode & MODE_DIRECT)
|
||||
if (fcntl(fd, F_NOCACHE, 1) == -1)
|
||||
throwFromErrno("Cannot open file", ErrorCodes::CANNOT_CLOSE_FILE);
|
||||
#endif
|
||||
Stopwatch watch;
|
||||
|
||||
for (size_t i = 0; i < threads; ++i)
|
||||
pool.scheduleOrThrowOnError([=]{ thread(fd, mode, min_offset, max_offset, block_size, count); });
|
||||
pool.wait();
|
||||
|
||||
#if defined(OS_DARWIN)
|
||||
fsync(fd);
|
||||
#else
|
||||
fdatasync(fd);
|
||||
#endif
|
||||
|
||||
watch.stop();
|
||||
|
||||
if (0 != close(fd))
|
||||
throwFromErrno("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
|
||||
|
||||
std::cout << std::fixed << std::setprecision(2)
|
||||
<< "Done " << count << " * " << threads << " ops";
|
||||
if (mode & MODE_ALIGNED)
|
||||
std::cout << " (aligned)";
|
||||
if (mode & MODE_DIRECT)
|
||||
std::cout << " (direct)";
|
||||
if (mode & MODE_SYNC)
|
||||
std::cout << " (sync)";
|
||||
std::cout << " in " << watch.elapsedSeconds() << " sec."
|
||||
<< ", " << count * threads / watch.elapsedSeconds() << " ops/sec."
|
||||
<< ", " << count * threads * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
|
||||
<< std::endl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
return mainImpl(argc, argv);
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.message() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
@ -1,203 +0,0 @@
|
||||
#if !defined(OS_LINUX)
|
||||
int main(int, char **) { return 0; }
|
||||
#else
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <cstdlib>
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <vector>
|
||||
#include <Poco/Exception.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <base/getPageSize.h>
|
||||
#include <pcg_random.hpp>
|
||||
#include <IO/BufferWithOwnMemory.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <cstdio>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <IO/AIO.h>
|
||||
#include <malloc.h>
|
||||
#include <sys/syscall.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int CANNOT_CLOSE_FILE;
|
||||
extern const int CANNOT_IO_SUBMIT;
|
||||
extern const int CANNOT_IO_GETEVENTS;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Direction of the benchmark: the file is either only read or only written.
enum Mode
{
    MODE_READ  = 1,
    MODE_WRITE = 2,
};
|
||||
|
||||
|
||||
void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block_size, size_t buffers_count, size_t count)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
AIOContext ctx;
|
||||
|
||||
std::vector<Memory<>> buffers(buffers_count);
|
||||
for (size_t i = 0; i < buffers_count; ++i)
|
||||
buffers[i] = Memory<>(block_size, ::getPageSize());
|
||||
|
||||
pcg64_fast rng(randomSeed());
|
||||
|
||||
size_t in_progress = 0;
|
||||
size_t blocks_sent = 0;
|
||||
std::vector<bool> buffer_used(buffers_count, false);
|
||||
std::vector<iocb> iocbs(buffers_count);
|
||||
std::vector<iocb*> query_cbs;
|
||||
std::vector<io_event> events(buffers_count);
|
||||
|
||||
while (blocks_sent < count || in_progress > 0)
|
||||
{
|
||||
/// Prepare queries.
|
||||
query_cbs.clear();
|
||||
for (size_t i = 0; i < buffers_count; ++i)
|
||||
{
|
||||
if (blocks_sent >= count || in_progress >= buffers_count)
|
||||
break;
|
||||
|
||||
if (buffer_used[i])
|
||||
continue;
|
||||
|
||||
buffer_used[i] = true;
|
||||
++blocks_sent;
|
||||
++in_progress;
|
||||
|
||||
char * buf = buffers[i].data();
|
||||
|
||||
uint64_t rand_result1 = rng();
|
||||
uint64_t rand_result2 = rng();
|
||||
uint64_t rand_result3 = rng();
|
||||
|
||||
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
|
||||
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
|
||||
|
||||
iocb & cb = iocbs[i];
|
||||
memset(&cb, 0, sizeof(cb));
|
||||
cb.aio_buf = reinterpret_cast<UInt64>(buf);
|
||||
cb.aio_fildes = fd;
|
||||
cb.aio_nbytes = block_size;
|
||||
cb.aio_offset = offset;
|
||||
cb.aio_data = static_cast<UInt64>(i);
|
||||
|
||||
if (mode == MODE_READ)
|
||||
{
|
||||
cb.aio_lio_opcode = IOCB_CMD_PREAD;
|
||||
}
|
||||
else
|
||||
{
|
||||
cb.aio_lio_opcode = IOCB_CMD_PWRITE;
|
||||
}
|
||||
|
||||
query_cbs.push_back(&cb);
|
||||
}
|
||||
|
||||
/// Send queries.
|
||||
if (io_submit(ctx.ctx, query_cbs.size(), query_cbs.data()) < 0)
|
||||
throwFromErrno("io_submit failed", ErrorCodes::CANNOT_IO_SUBMIT);
|
||||
|
||||
/// Receive answers. If we have something else to send, then receive at least one answer (after that send them), otherwise wait all answers.
|
||||
memset(events.data(), 0, buffers_count * sizeof(events[0]));
|
||||
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, events.data(), nullptr);
|
||||
if (evs < 0)
|
||||
throwFromErrno("io_getevents failed", ErrorCodes::CANNOT_IO_GETEVENTS);
|
||||
|
||||
for (int i = 0; i < evs; ++i)
|
||||
{
|
||||
int b = static_cast<int>(events[i].data);
|
||||
if (events[i].res != static_cast<int>(block_size))
|
||||
throw Poco::Exception("read/write error");
|
||||
--in_progress;
|
||||
buffer_used[b] = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int mainImpl(int argc, char ** argv)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
const char * file_name = nullptr;
|
||||
int mode = MODE_READ;
|
||||
UInt64 min_offset = 0;
|
||||
UInt64 max_offset = 0;
|
||||
UInt64 block_size = 0;
|
||||
UInt64 buffers_count = 0;
|
||||
UInt64 threads_count = 0;
|
||||
UInt64 count = 0;
|
||||
|
||||
if (argc != 9)
|
||||
{
|
||||
std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
file_name = argv[1];
|
||||
if (argv[2][0] == 'w')
|
||||
mode = MODE_WRITE;
|
||||
min_offset = parse<UInt64>(argv[3]);
|
||||
max_offset = parse<UInt64>(argv[4]);
|
||||
block_size = parse<UInt64>(argv[5]);
|
||||
threads_count = parse<UInt64>(argv[6]);
|
||||
buffers_count = parse<UInt64>(argv[7]);
|
||||
count = parse<UInt64>(argv[8]);
|
||||
|
||||
int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
|
||||
if (-1 == fd)
|
||||
throwFromErrno("Cannot open file", ErrorCodes::CANNOT_OPEN_FILE);
|
||||
|
||||
ThreadPool pool(threads_count);
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
for (size_t i = 0; i < threads_count; ++i)
|
||||
pool.scheduleOrThrowOnError([=]{ thread(fd, mode, min_offset, max_offset, block_size, buffers_count, count); });
|
||||
pool.wait();
|
||||
|
||||
watch.stop();
|
||||
|
||||
if (0 != close(fd))
|
||||
throwFromErrno("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
|
||||
|
||||
std::cout << std::fixed << std::setprecision(2)
|
||||
<< "Done " << count << " * " << threads_count << " ops";
|
||||
std::cout << " in " << watch.elapsedSeconds() << " sec."
|
||||
<< ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
|
||||
<< ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
|
||||
<< std::endl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
return mainImpl(argc, argv);
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.message() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
#endif
|
@ -1,177 +0,0 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <pcg_random.hpp>
|
||||
#include <Poco/Exception.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/randomSeed.h>
|
||||
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <vector>
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <poll.h>
|
||||
#include <cstdlib>
|
||||
#include <ctime>
|
||||
#include <unistd.h>
|
||||
|
||||
#if defined (OS_LINUX)
|
||||
# include <malloc.h>
|
||||
#endif
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int CANNOT_CLOSE_FILE;
|
||||
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
||||
extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
|
||||
extern const int CANNOT_FSYNC;
|
||||
extern const int SYSTEM_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Direction of the benchmark, selected by the "r" / "w" command line argument.
enum Mode
{
    MODE_READ = 0,
    MODE_WRITE = 1,
};
|
||||
|
||||
|
||||
int mainImpl(int argc, char ** argv)
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
const char * file_name = nullptr;
|
||||
Mode mode = MODE_READ;
|
||||
UInt64 min_offset = 0;
|
||||
UInt64 max_offset = 0;
|
||||
UInt64 block_size = 0;
|
||||
UInt64 descriptors = 0;
|
||||
UInt64 count = 0;
|
||||
|
||||
if (argc != 8)
|
||||
{
|
||||
std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size descriptors count" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
file_name = argv[1];
|
||||
min_offset = parse<UInt64>(argv[3]);
|
||||
max_offset = parse<UInt64>(argv[4]);
|
||||
block_size = parse<UInt64>(argv[5]);
|
||||
descriptors = parse<UInt64>(argv[6]);
|
||||
count = parse<UInt64>(argv[7]);
|
||||
|
||||
if (!strcmp(argv[2], "r"))
|
||||
mode = MODE_READ;
|
||||
else if (!strcmp(argv[2], "w"))
|
||||
mode = MODE_WRITE;
|
||||
else
|
||||
throw Poco::Exception("Invalid mode");
|
||||
|
||||
std::vector<int> fds(descriptors);
|
||||
for (size_t i = 0; i < descriptors; ++i)
|
||||
{
|
||||
fds[i] = open(file_name, O_SYNC | ((mode == MODE_READ) ? O_RDONLY : O_WRONLY));
|
||||
if (-1 == fds[i])
|
||||
throwFromErrno("Cannot open file", ErrorCodes::CANNOT_OPEN_FILE);
|
||||
}
|
||||
|
||||
std::vector<char> buf(block_size);
|
||||
|
||||
pcg64 rng(randomSeed());
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
std::vector<pollfd> polls(descriptors);
|
||||
|
||||
for (size_t i = 0; i < descriptors; ++i)
|
||||
{
|
||||
polls[i].fd = fds[i];
|
||||
polls[i].events = (mode == MODE_READ) ? POLLIN : POLLOUT;
|
||||
polls[i].revents = 0;
|
||||
}
|
||||
|
||||
size_t ops = 0;
|
||||
while (ops < count)
|
||||
{
|
||||
if (poll(polls.data(), static_cast<nfds_t>(descriptors), -1) <= 0)
|
||||
throwFromErrno("poll failed", ErrorCodes::SYSTEM_ERROR);
|
||||
for (size_t i = 0; i < descriptors; ++i)
|
||||
{
|
||||
if (!polls[i].revents)
|
||||
continue;
|
||||
|
||||
if (polls[i].revents != polls[i].events)
|
||||
throw Poco::Exception("revents indicates error");
|
||||
polls[i].revents = 0;
|
||||
++ops;
|
||||
|
||||
uint64_t rand_result1 = rng();
|
||||
uint64_t rand_result2 = rng();
|
||||
uint64_t rand_result3 = rng();
|
||||
|
||||
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
|
||||
size_t offset;
|
||||
offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
|
||||
|
||||
if (mode == MODE_READ)
|
||||
{
|
||||
if (static_cast<int>(block_size) != pread(fds[i], buf.data(), block_size, offset))
|
||||
throwFromErrno("Cannot read", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (static_cast<int>(block_size) != pwrite(fds[i], buf.data(), block_size, offset))
|
||||
throwFromErrno("Cannot write", ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < descriptors; ++i)
|
||||
{
|
||||
#if defined(OS_DARWIN)
|
||||
if (fsync(fds[i]))
|
||||
throwFromErrno("Cannot fsync", ErrorCodes::CANNOT_FSYNC);
|
||||
#else
|
||||
if (fdatasync(fds[i]))
|
||||
throwFromErrno("Cannot fdatasync", ErrorCodes::CANNOT_FSYNC);
|
||||
#endif
|
||||
}
|
||||
|
||||
watch.stop();
|
||||
|
||||
for (size_t i = 0; i < descriptors; ++i)
|
||||
{
|
||||
if (0 != close(fds[i]))
|
||||
throwFromErrno("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
|
||||
}
|
||||
|
||||
std::cout << std::fixed << std::setprecision(2)
|
||||
<< "Done " << count << " ops" << " in " << watch.elapsedSeconds() << " sec."
|
||||
<< ", " << count / watch.elapsedSeconds() << " ops/sec."
|
||||
<< ", " << count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
|
||||
<< std::endl;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
return mainImpl(argc, argv);
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.message() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
@ -1,3 +0,0 @@
|
||||
clickhouse_add_executable (zookeeper-adjust-block-numbers-to-parts main.cpp ${SRCS})
|
||||
target_compile_options(zookeeper-adjust-block-numbers-to-parts PRIVATE -Wno-format)
|
||||
target_link_libraries (zookeeper-adjust-block-numbers-to-parts PRIVATE clickhouse_aggregate_functions dbms clickhouse_common_zookeeper boost::program_options)
|
@ -1,286 +0,0 @@
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
|
||||
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/program_options.hpp>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
#include <unordered_map>
|
||||
#include <cmath>
|
||||
|
||||
|
||||
std::vector<std::string> getAllShards(zkutil::ZooKeeper & zk, const std::string & root)
|
||||
{
|
||||
return zk.getChildren(root);
|
||||
}
|
||||
|
||||
|
||||
std::vector<std::string> removeNotExistingShards(zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards)
|
||||
{
|
||||
auto existing_shards = getAllShards(zk, root);
|
||||
std::vector<std::string> filtered_shards;
|
||||
filtered_shards.reserve(shards.size());
|
||||
for (const auto & shard : shards)
|
||||
if (std::find(existing_shards.begin(), existing_shards.end(), shard) == existing_shards.end())
|
||||
std::cerr << "Shard " << shard << " not found." << std::endl;
|
||||
else
|
||||
filtered_shards.emplace_back(shard);
|
||||
return filtered_shards;
|
||||
}
|
||||
|
||||
|
||||
std::vector<std::string> getAllTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard)
|
||||
{
|
||||
return zk.getChildren(root + "/" + shard);
|
||||
}
|
||||
|
||||
|
||||
std::vector<std::string> removeNotExistingTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard, const std::vector<std::string> & tables)
|
||||
{
|
||||
auto existing_tables = getAllTables(zk, root, shard);
|
||||
std::vector<std::string> filtered_tables;
|
||||
filtered_tables.reserve(tables.size());
|
||||
for (const auto & table : tables)
|
||||
if (std::find(existing_tables.begin(), existing_tables.end(), table) == existing_tables.end())
|
||||
std::cerr << "\tTable " << table << " not found on shard " << shard << "." << std::endl;
|
||||
else
|
||||
filtered_tables.emplace_back(table);
|
||||
return filtered_tables;
|
||||
}
|
||||
|
||||
|
||||
/// Returns the largest `max_block` among all parts of partition `partition_name`
/// that are registered under `<replica_path>/replicas/<host>/parts` for any replica host.
/// Returns 0 when no matching part is found.
///
/// Despite its name, `replica_path` is the path that contains the `replicas` node
/// (the caller in this file passes the table path).
/// Part names that cannot be parsed with `format_version` (DB::Exception) are reported
/// on stderr and skipped, so a single malformed name does not abort the scan.
Int64 getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk,
    const std::string & replica_path,
    const std::string & partition_name,
    const DB::MergeTreeDataFormatVersion & format_version)
{
    auto replicas_path = replica_path + "/replicas";
    auto replica_hosts = zk.getChildren(replicas_path);
    Int64 max_block_num = 0;
    for (const auto & replica_host : replica_hosts)
    {
        auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts");
        for (const auto & part : parts)
        {
            try
            {
                auto info = DB::MergeTreePartInfo::fromPartName(part, format_version);
                if (info.partition_id == partition_name)
                    max_block_num = std::max<Int64>(info.max_block, max_block_num);
            }
            catch (const DB::Exception & ex)
            {
                /// The leading space before "skipped." keeps the part name readable in the log.
                std::cerr << ex.displayText() << ", Part " << part << " skipped." << std::endl;
            }
        }
    }
    return max_block_num;
}
|
||||
|
||||
|
||||
/// Estimates the next sequential number for children of `part_path` from the node's
/// stat: (cversion + numChildren) / 2. Only the stat is used; the node data is discarded.
///
/// Background on the formula:
///   https://stackoverflow.com/a/10347910
///   https://bowenli86.github.io/2016/07/07/distributed%20system/zookeeper/How-does-ZooKeeper-s-persistent-sequential-id-work/
Int64 getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path)
{
    Coordination::Stat node_stat;
    zk.get(part_path, &node_stat);

    const auto estimate = (node_stat.cversion + node_stat.numChildren) / 2;
    return estimate;
}
|
||||
|
||||
|
||||
/// Walks `<root>/<shard>/<table>/block_numbers/<partition>` and collects every partition
/// whose estimated current block number is not greater than the largest block number found in
/// the parts of its replicas.
///
/// `shards` / `tables` restrict the walk; when empty, everything found in ZooKeeper is used.
/// Returns a map from the partition's `block_numbers` path to the maximum block number seen.
/// Tables and partitions that raise DB::Exception are reported on stderr and skipped.
/// Progress is printed to stdout.
std::unordered_map<std::string, Int64> getPartitionsNeedAdjustingBlockNumbers(
    zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards, const std::vector<std::string> & tables)
{
    std::unordered_map<std::string, Int64> to_adjust;

    std::vector<std::string> shard_list;
    if (shards.empty())
        shard_list = getAllShards(zk, root);
    else
        shard_list = removeNotExistingShards(zk, root, shards);

    for (const auto & shard : shard_list)
    {
        std::cout << "Shard: " << shard << std::endl;

        std::vector<std::string> table_list;
        if (tables.empty())
            table_list = getAllTables(zk, root, shard);
        else
            table_list = removeNotExistingTables(zk, root, shard, tables);

        for (const auto & table : table_list)
        {
            std::cout << "\tTable: " << table << std::endl;

            const std::string table_path = root + "/" + shard + "/" + table;
            const std::string blocks_path = table_path + "/block_numbers";

            std::vector<std::string> partitions;
            DB::MergeTreeDataFormatVersion format_version;
            try
            {
                /// The format version is needed to parse part names of this table.
                format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version;
                partitions = zk.getChildren(blocks_path);
            }
            catch (const DB::Exception & ex)
            {
                std::cerr << ex.displayText() << ", table " << table << " skipped." << std::endl;
                continue;
            }

            for (const auto & partition : partitions)
            {
                try
                {
                    const std::string part_path = blocks_path + "/" + partition;
                    const Int64 max_in_parts = getMaxBlockNumberForPartition(zk, table_path, partition, format_version);
                    const Int64 current = getCurrentBlockNumberForPartition(zk, part_path);

                    /// Nothing to do when the counter is already past every existing block.
                    if (current >= max_in_parts + 1)
                        continue;

                    std::cout << "\t\tPartition: " << partition << ": current block_number: " << current
                              << ", max block number: " << max_in_parts << ". Adjusting is required." << std::endl;
                    to_adjust.emplace(part_path, max_in_parts);
                }
                catch (const DB::Exception & ex)
                {
                    std::cerr << ex.displayText() << ", partition " << partition << " skipped." << std::endl;
                }
            }
        }
    }

    return to_adjust;
}
|
||||
|
||||
|
||||
/// Advances the sequential-node counter under `path` until the estimated current block
/// number reaches `new_current_block_number`, by creating ephemeral sequential
/// `block-` children (one first, then batches of 50, then the remainder).
///
/// Does nothing when the estimated current number is already >= the target.
/// Stops at the first failed or unexpected creation: in that case the local counter no
/// longer advances, and continuing would create nodes indefinitely without converging.
/// Paths of created nodes are printed to stdout, problems to stderr.
void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int64 new_current_block_number)
{
    Int64 current_block_number = getCurrentBlockNumberForPartition(zk, path);

    /// Creates `count` ephemeral sequential nodes in one multi-request and verifies that
    /// their numbers continue from `current_block_number`. Returns false on any mismatch.
    auto create_ephemeral_nodes = [&](size_t count)
    {
        std::string block_prefix = path + "/block-";
        Coordination::Requests requests;
        requests.reserve(count);
        for (size_t i = 0; i != count; ++i)
            requests.emplace_back(zkutil::makeCreateRequest(block_prefix, "", zkutil::CreateMode::EphemeralSequential));
        auto responses = zk.multi(requests);

        std::vector<std::string> paths_created;
        paths_created.reserve(responses.size());
        for (const auto & response : responses)
        {
            const auto * create_response = dynamic_cast<Coordination::CreateResponse*>(response.get());
            if (!create_response)
            {
                std::cerr << "\tCould not create ephemeral node " << block_prefix << std::endl;
                return false;
            }
            paths_created.emplace_back(create_response->path_created);
        }

        /// Sequential suffixes are zero-padded, so lexicographic order is numeric order.
        std::sort(paths_created.begin(), paths_created.end());
        for (const auto & path_created : paths_created)
        {
            Int64 number = DB::parse<Int64>(path_created.c_str() + block_prefix.size(), path_created.size() - block_prefix.size());
            if (number != current_block_number)
            {
                /// The buffer is large enough for any 64-bit value; the length is still
                /// clamped because snprintf reports the untruncated length.
                char suffix[32] = "";
                const int written = snprintf(suffix, sizeof(suffix), "%010lld", static_cast<long long>(current_block_number));
                const size_t size = written < 0 ? 0 : std::min<size_t>(static_cast<size_t>(written), sizeof(suffix) - 1);
                std::string expected_path = block_prefix + std::string(suffix, size);
                std::cerr << "\t" << path_created << ": Ephemeral node has been created with an unexpected path (expected something like "
                          << expected_path << ")." << std::endl;
                return false;
            }
            std::cout << "\t" << path_created << std::endl;
            ++current_block_number;
        }

        return true;
    };

    if (current_block_number >= new_current_block_number)
        return;

    std::cout << "Creating ephemeral sequential nodes:" << std::endl;

    /// Firstly try to create just a single node; give up if even that does not match.
    if (!create_ephemeral_nodes(1))
        return;

    /// Create other nodes in batches of 50 nodes.
    while (current_block_number + 50 <= new_current_block_number) // NOLINT: clang-tidy thinks that the loop is infinite
    {
        if (!create_ephemeral_nodes(50))
            return;
    }

    /// Remainder that does not fill a whole batch.
    if (current_block_number < new_current_block_number)
        create_ephemeral_nodes(static_cast<size_t>(new_current_block_number - current_block_number));
}
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
try
|
||||
{
|
||||
/// Parse the command line.
|
||||
namespace po = boost::program_options;
|
||||
po::options_description desc("Allowed options");
|
||||
desc.add_options()
|
||||
("help,h", "show help")
|
||||
("zookeeper,z", po::value<std::string>(), "Addresses of ZooKeeper instances, comma-separated. Example: example01e.clickhouse.com:2181")
|
||||
("path,p", po::value<std::string>(), "[optional] Path of replica queue to insert node (without trailing slash). By default it's /clickhouse/tables")
|
||||
("shard,s", po::value<std::string>(), "[optional] Shards to process, comma-separated. If not specified then the utility will process all the shards.")
|
||||
("table,t", po::value<std::string>(), "[optional] Tables to process, comma-separated. If not specified then the utility will process all the tables.")
|
||||
("dry-run", "[optional] Specify if you want this utility just to analyze block numbers without any changes.");
|
||||
|
||||
po::variables_map options;
|
||||
po::store(po::parse_command_line(argc, argv, desc), options);
|
||||
|
||||
auto show_usage = [&]
|
||||
{
|
||||
std::cout << "Usage: " << std::endl;
|
||||
std::cout << " " << argv[0] << " [options]" << std::endl;
|
||||
std::cout << desc << std::endl;
|
||||
};
|
||||
|
||||
if (options.count("help") || (argc == 1))
|
||||
{
|
||||
std::cout << "This utility adjusts the /block_numbers zookeeper nodes to the correct block number in partition." << std::endl;
|
||||
std::cout << "It might be useful when incorrect block numbers stored in zookeeper don't allow you to insert data into a table or drop/detach a partition." << std::endl;
|
||||
show_usage();
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!options.count("zookeeper"))
|
||||
{
|
||||
std::cerr << "Option --zookeeper should be set." << std::endl;
|
||||
show_usage();
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::string root = options.count("path") ? options.at("path").as<std::string>() : "/clickhouse/tables";
|
||||
|
||||
std::vector<std::string> shards, tables;
|
||||
if (options.count("shard"))
|
||||
boost::split(shards, options.at("shard").as<std::string>(), boost::algorithm::is_any_of(","));
|
||||
if (options.count("table"))
|
||||
boost::split(tables, options.at("table").as<std::string>(), boost::algorithm::is_any_of(","));
|
||||
|
||||
/// Check if the adjusting of the block numbers is required.
|
||||
std::cout << "Checking if adjusting of the block numbers is required:" << std::endl;
|
||||
zkutil::ZooKeeper zookeeper(options.at("zookeeper").as<std::string>());
|
||||
auto part_paths_with_max_block_numbers = getPartitionsNeedAdjustingBlockNumbers(zookeeper, root, shards, tables);
|
||||
|
||||
if (part_paths_with_max_block_numbers.empty())
|
||||
{
|
||||
std::cout << "No adjusting required." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::cout << "Required adjusting of " << part_paths_with_max_block_numbers.size() << " block numbers." << std::endl;
|
||||
|
||||
/// Adjust the block numbers.
|
||||
if (options.count("dry-run"))
|
||||
{
|
||||
std::cout << "This is a dry-run, exiting." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::cout << std::endl << "Adjusting the block numbers:" << std::endl;
|
||||
for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers)
|
||||
setCurrentBlockNumber(zookeeper, part_path, max_block_number + 1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
|
||||
throw;
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
clickhouse_add_executable (zookeeper-create-entry-to-download-part main.cpp ${SRCS})
|
||||
target_link_libraries (zookeeper-create-entry-to-download-part PRIVATE dbms clickhouse_common_zookeeper boost::program_options)
|
@ -1,47 +0,0 @@
|
||||
#include <list>
|
||||
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
try
|
||||
{
|
||||
boost::program_options::options_description desc("Allowed options");
|
||||
desc.add_options()
|
||||
("help,h", "produce help message")
|
||||
("address,a", boost::program_options::value<std::string>()->required(),
|
||||
"addresses of ZooKeeper instances, comma separated. Example: example01e.clickhouse.com:2181")
|
||||
("path,p", boost::program_options::value<std::string>()->required(), "path of replica queue to insert node (without trailing slash)")
|
||||
("name,n", boost::program_options::value<std::string>()->required(), "name of part to download")
|
||||
;
|
||||
|
||||
boost::program_options::variables_map options;
|
||||
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
|
||||
|
||||
if (options.count("help"))
|
||||
{
|
||||
std::cout << "Insert log entry to replication queue to download part from any replica." << std::endl;
|
||||
std::cout << "Usage: " << argv[0] << " [options]" << std::endl;
|
||||
std::cout << desc << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
std::string path = options.at("path").as<std::string>();
|
||||
std::string name = options.at("name").as<std::string>();
|
||||
|
||||
zkutil::ZooKeeper zookeeper(options.at("address").as<std::string>());
|
||||
|
||||
DB::ReplicatedMergeTreeLogEntry entry;
|
||||
entry.type = DB::ReplicatedMergeTreeLogEntry::MERGE_PARTS;
|
||||
entry.source_parts = {name};
|
||||
entry.new_part_name = name;
|
||||
|
||||
zookeeper.create(path + "/queue-", entry.toString(), zkutil::CreateMode::PersistentSequential);
|
||||
return 0;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
|
||||
throw;
|
||||
}
|
Loading…
Reference in New Issue
Block a user