Merge remote-tracking branch 'rschu1ze/master' into locate-mysql

Robert Schulze 2024-03-15 13:11:22 +00:00
commit 9546370544
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
29 changed files with 395 additions and 201 deletions

View File

@@ -45,62 +45,3 @@ jobs:
with:
data: "${{ needs.RunConfig.outputs.data }}"
set_latest: true
SonarCloud:
runs-on: [self-hosted, builder]
env:
SONAR_SCANNER_VERSION: 4.8.0.2856
SONAR_SERVER_URL: "https://sonarcloud.io"
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
CC: clang-17
CXX: clang++-17
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
filter: tree:0
submodules: true
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Download and set up sonar-scanner
env:
SONAR_SCANNER_DOWNLOAD_URL: https://binaries.sonarsource.com/Distribution/sonar-scanner-cli/sonar-scanner-cli-${{ env.SONAR_SCANNER_VERSION }}-linux.zip
run: |
mkdir -p "$HOME/.sonar"
curl -sSLo "$HOME/.sonar/sonar-scanner.zip" "${{ env.SONAR_SCANNER_DOWNLOAD_URL }}"
unzip -o "$HOME/.sonar/sonar-scanner.zip" -d "$HOME/.sonar/"
echo "$HOME/.sonar/sonar-scanner-${{ env.SONAR_SCANNER_VERSION }}-linux/bin" >> "$GITHUB_PATH"
- name: Download and set up build-wrapper
env:
BUILD_WRAPPER_DOWNLOAD_URL: ${{ env.SONAR_SERVER_URL }}/static/cpp/build-wrapper-linux-x86.zip
run: |
curl -sSLo "$HOME/.sonar/build-wrapper-linux-x86.zip" "${{ env.BUILD_WRAPPER_DOWNLOAD_URL }}"
unzip -o "$HOME/.sonar/build-wrapper-linux-x86.zip" -d "$HOME/.sonar/"
echo "$HOME/.sonar/build-wrapper-linux-x86" >> "$GITHUB_PATH"
- name: Set Up Build Tools
run: |
sudo apt-get update
sudo apt-get install -yq git cmake ccache ninja-build python3 yasm nasm
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
- name: Run build-wrapper
run: |
mkdir build
cd build
cmake ..
cd ..
build-wrapper-linux-x86-64 --out-dir ${{ env.BUILD_WRAPPER_OUT_DIR }} cmake --build build/
- name: Run sonar-scanner
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: |
sonar-scanner \
--define sonar.host.url="${{ env.SONAR_SERVER_URL }}" \
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
--define sonar.projectKey="ClickHouse_ClickHouse" \
--define sonar.organization="clickhouse-java" \
--define sonar.cfamily.cpp23.enabled=true \
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql"

View File

@@ -248,6 +248,9 @@ Some of the files might not download fully. Check the file sizes and re-download
``` bash
$ curl -O https://datasets.clickhouse.com/trips_mergetree/partitions/trips_mergetree.tar
# Validate the checksum
$ md5sum trips_mergetree.tar
# Checksum should be equal to: f3b8d469b41d9a82da064ded7245d12c
$ tar xvf trips_mergetree.tar -C /var/lib/clickhouse # path to ClickHouse data directory
$ # check permissions of unpacked data, fix if required
$ sudo service clickhouse-server restart

View File

@@ -26,7 +26,9 @@ priority: 0
is_active: 0
active_children: 0
dequeued_requests: 67
canceled_requests: 0
dequeued_cost: 4692272
canceled_cost: 0
busy_periods: 63
vruntime: 938454.1999999989
system_vruntime: ᴺᵁᴸᴸ
@@ -54,7 +56,9 @@ Columns:
- `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied.
- `active_children` (`UInt64`) - The number of children in active state.
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
- `canceled_requests` (`UInt64`) - The total number of resource requests canceled from this node.
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
- `canceled_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests canceled from this node.
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by the SFQ algorithm to select the next child to process in a max-min fair manner.
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.

View File

@@ -36,9 +36,9 @@ You can explicitly set a time zone for `DateTime`-type columns when creating a t
The [clickhouse-client](../../interfaces/cli.md) applies the server time zone by default if a time zone isn't explicitly set when initializing the data type. To use the client time zone, run `clickhouse-client` with the `--use_client_time_zone` parameter.
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings-formats.md#date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
ClickHouse outputs values depending on the value of the [date_time_output_format](../../operations/settings/settings.md#settings-date_time_output_format) setting. `YYYY-MM-DD hh:mm:ss` text format by default. Additionally, you can change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format) setting.
When inserting data into ClickHouse, you can use different formats of date and time strings, depending on the value of the [date_time_input_format](../../operations/settings/settings.md#settings-date_time_input_format) setting.
## Examples
@@ -147,8 +147,8 @@ Time shifts for multiple days. Some pacific islands changed their timezone offse
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#date_time_output_format)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#settings-date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#settings-date_time_output_format)
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)

View File

@@ -27,9 +27,9 @@ DateTime([timezone])
The ClickHouse console client uses the server time zone by default if a time zone was not set explicitly for the `DateTime` value when the data type was initialized. To use the client time zone, run [clickhouse-client](../../interfaces/cli.md) with the `--use_client_time_zone` parameter.
ClickHouse displays values depending on the value of the [date\_time\_output\_format](../../operations/settings/settings-formats.md#date_time_output_format) setting. The default text format is `YYYY-MM-DD hh:mm:ss`. You can also change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
ClickHouse displays values depending on the value of the [date\_time\_output\_format](../../operations/settings/index.md#settings-date_time_output_format) setting. The default text format is `YYYY-MM-DD hh:mm:ss`. You can also change the output with the [formatDateTime](../../sql-reference/functions/date-time-functions.md#formatdatetime) function.
When inserting data into ClickHouse, you can use different date and time formats depending on the value of the [date_time_input_format](../../operations/settings/settings-formats.md#date_time_input_format) setting.
When inserting data into ClickHouse, you can use different date and time formats depending on the value of the [date_time_input_format](../../operations/settings/index.md#settings-date_time_input_format) setting.
## Examples {#primery}
@@ -119,8 +119,8 @@ FROM dt
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#date_time_output_format)
- [The `date_time_input_format` setting](../../operations/settings/index.md#settings-date_time_input_format)
- [The `date_time_output_format` setting](../../operations/settings/index.md#settings-date_time_output_format)
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)

View File

@@ -9,6 +9,7 @@
#include <IO/MMappedFileCache.h>
#include <IO/ReadHelpers.h>
#include <base/errnoToString.h>
#include <base/find_symbols.h>
#include <base/getPageSize.h>
#include <sys/resource.h>
#include <chrono>
@@ -90,6 +91,9 @@ AsynchronousMetrics::AsynchronousMetrics(
openFileIfExists("/sys/fs/cgroup/cpu/cpu.cfs_quota_us", cgroupcpu_cfs_quota);
}
openFileIfExists("/proc/sys/vm/max_map_count", vm_max_map_count);
openFileIfExists("/proc/self/maps", vm_maps);
openSensors();
openBlockDevices();
openEDAC();
@@ -1423,6 +1427,55 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
}
}
if (vm_max_map_count)
{
try
{
vm_max_map_count->rewind();
uint64_t max_map_count = 0;
readText(max_map_count, *vm_max_map_count);
new_values["VMMaxMapCount"] = { max_map_count, "The maximum number of memory mappings a process may have (/proc/sys/vm/max_map_count)."};
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
openFileIfExists("/proc/sys/vm/max_map_count", vm_max_map_count);
}
}
if (vm_maps)
{
try
{
vm_maps->rewind();
uint64_t num_maps = 0;
while (!vm_maps->eof())
{
char * next_pos = find_first_symbols<'\n'>(vm_maps->position(), vm_maps->buffer().end());
vm_maps->position() = next_pos;
if (!vm_maps->hasPendingData())
continue;
if (*vm_maps->position() == '\n')
{
++num_maps;
++vm_maps->position();
}
}
new_values["VMNumMaps"] = { num_maps,
"The current number of memory mappings of the process (/proc/self/maps)."
" If it is close to the maximum (VMMaxMapCount), you should increase the limit for vm.max_map_count in /etc/sysctl.conf"};
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
openFileIfExists("/proc/self/maps", vm_maps);
}
}
try
{
for (size_t i = 0, size = thermal.size(); i < size; ++i)

View File

@@ -123,6 +123,9 @@ private:
std::optional<ReadBufferFromFilePRead> cgroupcpu_cfs_quota TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> cgroupcpu_max TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> vm_max_map_count TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> vm_maps TSA_GUARDED_BY(data_mutex);
std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal TSA_GUARDED_BY(data_mutex);
std::unordered_map<String /* device name */,

View File

@@ -387,7 +387,9 @@ public:
/// Introspection
std::atomic<UInt64> dequeued_requests{0};
std::atomic<UInt64> canceled_requests{0};
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<ResourceCost> canceled_cost{0};
std::atomic<UInt64> busy_periods{0};
};

View File

@@ -50,6 +50,12 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
/// Cancel previously enqueued request.
/// Returns `false` and does nothing if the request is unknown or already executed.
/// Returns `true` if the request has been found and canceled.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual bool cancelRequest(ResourceRequest * request) = 0;
/// For introspection
ResourceCost getBudget() const
{
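
For illustration, here is a minimal consumer-side sketch of the cancellation contract above. It is a sketch only: `queue` stands for any `ISchedulerQueue *`, and `MyRequest` is a `ResourceRequest` subclass like the one used in the scheduler tests further down; neither name comes from this file.

``` cpp
#include <Common/Scheduler/ISchedulerQueue.h>

// Hypothetical usage of the new cancelRequest() contract (not part of this commit).
void tryCancel(DB::ISchedulerQueue * queue, DB::ResourceRequest * request)
{
    queue->enqueueRequest(request);
    if (queue->cancelRequest(request))
    {
        // Successful cancel: execute() and finish() will never run for this request,
        // and the request may be destructed or reused right away.
    }
    else
    {
        // Too late: the request was already dequeued (or is unknown to the queue),
        // so it proceeds through execute(), and the consumer must call finish().
    }
}
```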

View File

@@ -134,56 +134,65 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (heap_size == 0)
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
if (child_active) // Put active child back in heap after vruntime update
// A loop is required to do deactivations in the case of canceled requests, when dequeueRequest() returns `nullptr`
while (true)
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember its vruntime for later activations
{
heap_size--;
if (heap_size == 0)
return {nullptr, false};
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
if (request)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
}
if (child_active) // Put active child back in heap after vruntime update
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember its vruntime for later activations
{
heap_size--;
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
}
}
}
bool isActive() override

View File

@@ -39,8 +39,7 @@ public:
void enqueueRequest(ResourceRequest * request) override
{
std::unique_lock lock(mutex);
request->enqueue_ns = clock_gettime_ns();
std::lock_guard lock(mutex);
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(request);
@@ -50,7 +49,7 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (requests.empty())
return {nullptr, false};
ResourceRequest * result = requests.front();
@@ -63,9 +62,29 @@ public:
return {result, !requests.empty()};
}
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
// TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
{
if (*i == request)
{
requests.erase(i);
if (requests.empty())
busy_periods++;
queue_cost -= request->cost;
canceled_requests++;
canceled_cost += request->cost;
return true;
}
}
return false;
}
bool isActive() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return !requests.empty();
}
@@ -98,14 +117,14 @@ public:
std::pair<UInt64, Int64> getQueueLengthAndCost()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return {requests.size(), queue_cost};
}
private:
std::mutex mutex;
Int64 queue_cost = 0;
std::deque<ResourceRequest *> requests;
std::deque<ResourceRequest *> requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
};
}
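
The TODO above names the follow-up optimization: keeping requests on an intrusive doubly linked list lets a canceled request be unlinked in O(1) instead of the O(N) scan. A rough sketch of that idea, with hypothetical `prev`/`next` fields that this commit does not add:

``` cpp
// Sketch of an intrusive queue node; ResourceRequest would embed these fields.
struct IntrusiveNode
{
    IntrusiveNode * prev = nullptr;
    IntrusiveNode * next = nullptr;
};

// O(1) unlink: no search and no allocation, regardless of queue length.
void unlink(IntrusiveNode *& head, IntrusiveNode *& tail, IntrusiveNode * node)
{
    (node->prev ? node->prev->next : head) = node->next;
    (node->next ? node->next->prev : tail) = node->prev;
    node->prev = node->next = nullptr;
}
```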

View File

@@ -102,25 +102,31 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (items.empty())
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
// Deactivate child if it is empty
if (!child_active)
// A loop is required to do deactivations in the case of canceled requests, when dequeueRequest() returns `nullptr`
while (true)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
return {nullptr, false};
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
// Deactivate child if it is empty
if (!child_active)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
}
}
}
bool isActive() override

View File

@@ -38,7 +38,6 @@ TEST(SchedulerDynamicResourceManager, Smoke)
{
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
gA.lock();
gA.setFailure();
gA.unlock();
ResourceGuard gB(cB->get("res1"));

View File

@@ -4,6 +4,7 @@
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
#include <barrier>
#include <future>
using namespace DB;
@@ -73,6 +74,22 @@ struct ResourceHolder
}
};
struct MyRequest : public ResourceRequest
{
std::function<void()> on_execute;
explicit MyRequest(ResourceCost cost_, std::function<void()> on_execute_)
: ResourceRequest(cost_)
, on_execute(on_execute_)
{}
void execute() override
{
if (on_execute)
on_execute();
}
};
TEST(SchedulerRoot, Smoke)
{
ResourceTest t;
@@ -111,3 +128,49 @@ TEST(SchedulerRoot, Smoke)
EXPECT_TRUE(fc2->requests.contains(&rg.request));
}
}
TEST(SchedulerRoot, Cancel)
{
ResourceTest t;
ResourceHolder r1(t);
auto * fc1 = r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
r1.add<PriorityPolicy>("/prio");
auto a = r1.addQueue("/prio/A", "<priority>1</priority>");
auto b = r1.addQueue("/prio/B", "<priority>2</priority>");
r1.registerResource();
std::barrier destruct_sync(2);
std::barrier sync(2);
std::thread consumer1([&]
{
MyRequest request(1, [&]
{
sync.arrive_and_wait(); // (A)
EXPECT_TRUE(fc1->requests.contains(&request));
sync.arrive_and_wait(); // (B)
request.finish();
destruct_sync.arrive_and_wait(); // (C)
});
a.queue->enqueueRequest(&request);
destruct_sync.arrive_and_wait(); // (C)
});
std::thread consumer2([&]
{
MyRequest request(1, [&]
{
FAIL() << "This request must be canceled, but it was executed instead";
});
sync.arrive_and_wait(); // (A) wait for consumer1's request to be inside execute(), so that the constraint is in a violated state and our request will not be executed immediately
b.queue->enqueueRequest(&request);
bool canceled = b.queue->cancelRequest(&request);
EXPECT_TRUE(canceled);
sync.arrive_and_wait(); // (B) release consumer1's request so it can finish
});
consumer1.join();
consumer2.join();
EXPECT_TRUE(fc1->requests.empty());
}

View File

@@ -71,8 +71,7 @@ public:
// lock(mutex) is not required because a `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (constraint)
constraint->finishRequest(this);
ResourceRequest::finish();
}
static Request & local()
@@ -126,12 +125,6 @@ public:
}
}
/// Mark request as unsuccessful; by default request is considered to be successful
void setFailure()
{
request.successful = false;
}
ResourceLink link;
Request & request;
};

View File

@@ -0,0 +1,13 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ISchedulerConstraint.h>
namespace DB
{
void ResourceRequest::finish()
{
if (constraint)
constraint->finishRequest(this);
}
}

View File

@@ -14,9 +14,6 @@ class ISchedulerConstraint;
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
/// Timestamps (nanoseconds since epoch)
using ResourceNs = UInt64;
/*
* Request for a resource consumption. The main moving part of the scheduling subsystem.
* Resource requests processing workflow:
@@ -31,7 +28,7 @@ using ResourceNs = UInt64;
* 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request.
* 4) Callback ResourceRequest::execute() is called to provide access to the resource.
* 5) The resource consumption is happening outside of the scheduling subsystem.
* 6) request->constraint->finishRequest() is called when consumption is finished.
* 6) ResourceRequest::finish() is called when consumption is finished.
*
* Steps (5) and (6) can be omitted if constraint is not used by the resource.
*
@@ -39,7 +36,10 @@ using ResourceNs = UInt64;
* Request ownership is done outside of the scheduling subsystem.
* After (6) request can be destructed safely.
*
* Request cancelling is not supported yet.
* Request can also be canceled before (3) using ISchedulerQueue::cancelRequest().
* Returning false means it is too late for the request to be canceled. It should be processed in the regular way.
* Returning true means a successful cancel: steps (4) and (5) are not going to happen
* and step (6) MUST be omitted.
*/
class ResourceRequest
{
@@ -48,32 +48,20 @@ public:
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;
/// Request outcome
/// Should be filled during resource consumption
bool successful;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint;
/// Timestamps for introspection
ResourceNs enqueue_ns;
ResourceNs execute_ns;
ResourceNs finish_ns;
explicit ResourceRequest(ResourceCost cost_ = 1)
{
reset(cost_);
}
/// A ResourceRequest object may be reused after reset()
void reset(ResourceCost cost_)
{
cost = cost_;
successful = true;
constraint = nullptr;
enqueue_ns = 0;
execute_ns = 0;
finish_ns = 0;
}
virtual ~ResourceRequest() = default;
@@ -83,6 +71,12 @@ public:
/// just triggering start of a consumption, not doing the consumption itself
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
/// Stop resource consumption and notify resource scheduler.
/// Should be called when resource consumption is finished by consumer.
/// ResourceRequest should not be destructed or reset before calling `finish()`.
/// WARNING: this function MUST NOT be called if the request was canceled.
void finish();
};
}
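
Taken together, the documented lifecycle looks roughly like the sketch below; `SleepRequest` is an illustrative stand-in, not a name from this commit.

``` cpp
#include <Common/Scheduler/ResourceRequest.h>

// Hypothetical request that follows steps (4)-(6) of the workflow comment above.
struct SleepRequest : public DB::ResourceRequest
{
    void execute() override // step (4): the scheduler grants access
    {
        // step (5): consume the resource outside of the scheduling subsystem...
        finish();           // step (6): notify the constraints; skipped on cancel
    }
};
```

After `finish()` returns, the request may be destructed or `reset()` for reuse; on the canceled path, `finish()` must not be called at all.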

View File

@@ -145,22 +145,27 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (current == nullptr) // No active resources
return {nullptr, false};
while (true)
{
if (current == nullptr) // No active resources
return {nullptr, false};
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
assert(request != nullptr);
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
if (request == nullptr) // Possible in case of request cancellation, just retry
continue;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
}
}
bool isActive() override
@@ -245,7 +250,6 @@ private:
void execute(ResourceRequest * request)
{
request->execute_ns = clock_gettime_ns();
request->execute();
}

View File

@@ -48,6 +48,12 @@ PlannerContext::PlannerContext(ContextMutablePtr query_context_, GlobalPlannerCo
, is_ast_level_optimization_allowed(!(query_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY || select_query_options_.ignore_ast_optimizations))
{}
PlannerContext::PlannerContext(ContextMutablePtr query_context_, PlannerContextPtr planner_context_)
: query_context(std::move(query_context_))
, global_planner_context(planner_context_->global_planner_context)
, is_ast_level_optimization_allowed(planner_context_->is_ast_level_optimization_allowed)
{}
TableExpressionData & PlannerContext::getOrCreateTableExpressionData(const QueryTreeNodePtr & table_expression_node)
{
auto [it, _] = table_expression_node_to_data.emplace(table_expression_node, TableExpressionData());

View File

@@ -75,12 +75,18 @@ private:
using GlobalPlannerContextPtr = std::shared_ptr<GlobalPlannerContext>;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class PlannerContext
{
public:
/// Create planner context with query context and global planner context
PlannerContext(ContextMutablePtr query_context_, GlobalPlannerContextPtr global_planner_context_, const SelectQueryOptions & select_query_options_);
/// Create planner context with a modified query_context
PlannerContext(ContextMutablePtr query_context_, PlannerContextPtr planner_context_);
/// Get planner context query context
ContextPtr getQueryContext() const
{
@@ -191,6 +197,4 @@ private:
PreparedSets prepared_sets;
};
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
}

View File

@@ -422,6 +422,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;
auto table_it = selected_tables.begin();
auto modified_context = Context::createCopy(context);
for (size_t i = 0; i < selected_tables.size(); ++i, ++table_it)
{
auto & child_plan = child_plans->at(i);
@@ -438,7 +439,7 @@
if (child_plan.row_policy_data_opt)
child_plan.row_policy_data_opt->extendNames(real_column_names);
auto modified_query_info = getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto modified_query_info = getModifiedQueryInfo(modified_context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto source_pipeline = createSources(
child_plan.plan,
@@ -547,9 +548,10 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
}
/// Settings will be modified when planning child tables.
auto modified_context = Context::createCopy(context);
for (const auto & table : selected_tables)
{
auto modified_context = Context::createCopy(context);
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
@@ -570,25 +572,25 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
auto & aliases = res.back().table_aliases;
auto & row_policy_data_opt = res.back().row_policy_data_opt;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, modified_context);
Names column_names_as_aliases;
Names real_column_names = column_names;
const auto & database_name = std::get<0>(table);
const auto & table_name = std::get<3>(table);
auto row_policy_filter_ptr = context->getRowPolicyFilter(
auto row_policy_filter_ptr = modified_context->getRowPolicyFilter(
database_name,
table_name,
RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter_ptr)
{
row_policy_data_opt = RowPolicyData(row_policy_filter_ptr, storage, context);
row_policy_data_opt = RowPolicyData(row_policy_filter_ptr, storage, modified_context);
row_policy_data_opt->extendNames(real_column_names);
}
auto modified_query_info
= getModifiedQueryInfo(context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
= getModifiedQueryInfo(modified_context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
if (!context->getSettingsRef().allow_experimental_analyzer)
{
@@ -657,10 +659,9 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt,
modified_context,
current_streams);
res.back().plan.addInterpreterContext(modified_context);
}
if (!res.empty())
res[0].plan.addInterpreterContext(modified_context);
return res;
}
@@ -681,8 +682,9 @@ public:
{
if (column->hasExpression())
{
auto column_name = column->getColumnName();
node = column->getExpressionOrThrow();
node->setAlias(column->getColumnName());
node->setAlias(column_name);
}
else
column->setColumnSource(replacement_table_expression);
@@ -863,7 +865,7 @@ QueryTreeNodePtr replaceTableExpressionAndRemoveJoin(
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_context,
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot_,
Names required_column_names,
@@ -877,6 +879,9 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextPtr & modified_
if (modified_query_info.optimized_prewhere_info && !modified_query_info.prewhere_info)
modified_query_info.prewhere_info = modified_query_info.optimized_prewhere_info;
if (modified_query_info.planner_context)
modified_query_info.planner_context = std::make_shared<PlannerContext>(modified_context, modified_query_info.planner_context);
if (modified_query_info.table_expression)
{
auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot_);

View File

@@ -192,7 +192,7 @@ private:
using Aliases = std::vector<AliasData>;
SelectQueryInfo getModifiedQueryInfo(const ContextPtr & modified_context,
SelectQueryInfo getModifiedQueryInfo(const ContextMutablePtr & modified_context,
const StorageWithLockAndName & storage_with_lock_and_name,
const StorageSnapshotPtr & storage_snapshot,
Names required_column_names,

View File

@@ -30,7 +30,9 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
{"is_active", std::make_shared<DataTypeUInt8>(), "Whether this node is currently active - has resource requests to be dequeued and constraints satisfied."},
{"active_children", std::make_shared<DataTypeUInt64>(), "The number of children in active state."},
{"dequeued_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests dequeued from this node."},
{"canceled_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests canceled from this node."},
{"dequeued_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."},
{"canceled_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."},
{"busy_periods", std::make_shared<DataTypeUInt64>(), "The total number of deactivations of this node."},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()),
"For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner."},
@@ -93,7 +95,9 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[i++]->insert(node->isActive());
res_columns[i++]->insert(node->activeChildren());
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->canceled_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->canceled_cost.load());
res_columns[i++]->insert(node->busy_periods.load());
Field vruntime;

View File

@@ -1,3 +1,2 @@
test_concurrent_backups_s3/test.py::test_concurrent_backups
test_distributed_type_object/test.py::test_distributed_type_object
test_merge_table_over_distributed/test.py::test_global_in

View File

@@ -12,20 +12,12 @@ cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node", with_zookeeper=True, main_configs=["configs/remote_servers.xml"]
)
node_1 = cluster.add_instance("node_1", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node_1.query_with_retry("DROP TABLE IF EXISTS replicated")
node_1.query_with_retry(
"""CREATE TABLE replicated (id UInt32, date Date) ENGINE =
ReplicatedMergeTree('/clickhouse/tables/replicated', 'node_1') ORDER BY id PARTITION BY toYYYYMM(date)"""
)
node.query_with_retry(
"CREATE TABLE distributed (id UInt32, date Date) ENGINE = Distributed('test_cluster', 'default', 'replicated')"
)
@@ -37,8 +29,6 @@ def started_cluster():
def test(started_cluster):
cluster.pause_container("node_1")
node.query("SYSTEM RELOAD CONFIG")
error = node.query_and_get_error(
"SELECT count() FROM distributed SETTINGS receive_timeout=1, handshake_timeout_ms=1"
@@ -67,5 +57,3 @@ def test(started_cluster):
assert recovery_time == 0
assert errors_count == 0
cluster.unpause_container("node_1")

View File

@@ -0,0 +1,10 @@
alias1
1 4 16 23
23 16 4 1
2020-02-02 1 4 2 16 3 23
alias2
1 3 4 4
4 4 3 1
23 16 4 1
2020-02-01 1 3 2 4 3 4
2020-02-02 1 4 2 16 3 23

View File

@@ -0,0 +1,60 @@
-- Tags: no-parallel
drop table if exists merge;
set allow_experimental_analyzer = 1;
create table merge
(
dt Date,
colAlias0 Int32,
colAlias1 Int32,
col2 Int32,
colAlias2 UInt32,
col3 Int32,
colAlias3 UInt32
)
engine = Merge(currentDatabase(), '^alias_');
drop table if exists alias_1;
drop table if exists alias_2;
create table alias_1
(
dt Date,
col Int32,
colAlias0 UInt32 alias col,
colAlias1 UInt32 alias col3 + colAlias0,
col2 Int32,
colAlias2 Int32 alias colAlias1 + col2 + 10,
col3 Int32,
colAlias3 Int32 alias colAlias2 + colAlias1 + col3
)
engine = MergeTree()
order by (dt);
insert into alias_1 (dt, col, col2, col3) values ('2020-02-02', 1, 2, 3);
select 'alias1';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_1;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge;
select * from merge;
create table alias_2
(
dt Date,
col Int32,
col2 Int32,
colAlias0 UInt32 alias col,
colAlias3 Int32 alias col3 + colAlias0,
colAlias1 UInt32 alias colAlias0 + col2,
colAlias2 Int32 alias colAlias0 + colAlias1,
col3 Int32
)
engine = MergeTree()
order by (dt);
insert into alias_2 (dt, col, col2, col3) values ('2020-02-01', 1, 2, 3);
select 'alias2';
select colAlias0, colAlias1, colAlias2, colAlias3 from alias_2;
select colAlias3, colAlias2, colAlias1, colAlias0 from merge order by dt;
select * from merge order by dt;

View File

@@ -0,0 +1,4 @@
-- Tags: no-replicated-database
SELECT least(value, 0) FROM system.asynchronous_metrics WHERE metric = 'VMMaxMapCount';
SELECT least(value, 0) FROM system.asynchronous_metrics WHERE metric = 'VMNumMaps';