mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge branch 'master' into s3-streams-scheduler
This commit is contained in:
commit
3458087ee4
@ -0,0 +1,90 @@
|
||||
---
|
||||
slug: /en/sql-reference/aggregate-functions/reference/groupconcat
|
||||
sidebar_position: 363
|
||||
sidebar_label: groupConcat
|
||||
title: groupConcat
|
||||
---
|
||||
|
||||
Calculates a concatenated string from a group of strings, optionally separated by a delimiter, and optionally limited by a maximum number of elements.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
groupConcat(expression [, delimiter] [, limit]);
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `expression` — The expression or column name that outputs strings to be concatenated..
|
||||
- `delimiter` — A [string](../../../sql-reference/data-types/string.md) that will be used to separate concatenated values. This parameter is optional and defaults to an empty string if not specified.
|
||||
- `limit` — A positive [integer](../../../sql-reference/data-types/int-uint.md) specifying the maximum number of elements to concatenate. If more elements are present, excess elements are ignored. This parameter is optional.
|
||||
|
||||
:::note
|
||||
If delimiter is specified without limit, it must be the first parameter following the expression. If both delimiter and limit are specified, delimiter must precede limit.
|
||||
:::
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Returns a [string](../../../sql-reference/data-types/string.md) consisting of the concatenated values of the column or expression. If the group has no elements or only null elements, and the function does not specify a handling for only null values, the result is a nullable string with a null value.
|
||||
|
||||
**Examples**
|
||||
|
||||
Input table:
|
||||
|
||||
``` text
|
||||
┌─id─┬─name─┐
|
||||
│ 1 │ John│
|
||||
│ 2 │ Jane│
|
||||
│ 3 │ Bob│
|
||||
└────┴──────┘
|
||||
```
|
||||
|
||||
1. Basic usage without a delimiter:
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT groupConcat(Name) FROM Employees;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
JohnJaneBob
|
||||
```
|
||||
|
||||
This concatenates all names into one continuous string without any separator.
|
||||
|
||||
|
||||
2. Using comma as a delimiter:
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT groupConcat(Name, ', ', 2) FROM Employees;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
John, Jane, Bob
|
||||
```
|
||||
|
||||
This output shows the names separated by a comma followed by a space.
|
||||
|
||||
|
||||
3. Limiting the number of concatenated elements
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT groupConcat(Name, ', ', 2) FROM Employees;
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
John, Jane
|
||||
```
|
||||
|
||||
This query limits the output to the first two names, even though there are more names in the table.
|
@ -44,13 +44,12 @@
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTColumnDeclaration.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/Kusto/ParserKQLStatement.h>
|
||||
#include <Parsers/PRQL/ParserPRQLQuery.h>
|
||||
#include <Parsers/Kusto/ParserKQLStatement.h>
|
||||
#include <Parsers/Kusto/parseKQLQuery.h>
|
||||
|
||||
#include <Processors/Formats/Impl/NullFormat.h>
|
||||
#include <Processors/Formats/IInputFormat.h>
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
|
@ -11,10 +11,10 @@
|
||||
#include <Poco/Util/XMLConfiguration.h>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <boost/intrusive/list.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <queue>
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -30,6 +30,8 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
class ISchedulerNode;
|
||||
class EventQueue;
|
||||
using EventId = UInt64;
|
||||
|
||||
inline const Poco::Util::AbstractConfiguration & emptyConfig()
|
||||
{
|
||||
@ -82,6 +84,115 @@ struct SchedulerNodeInfo
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* Node of hierarchy for scheduling requests for resource. Base class for all
|
||||
* kinds of scheduling elements (queues, policies, constraints and schedulers).
|
||||
*
|
||||
* Root node is a scheduler, which has it's thread to dequeue requests,
|
||||
* execute requests (see ResourceRequest) and process events in a thread-safe manner.
|
||||
* Immediate children of the scheduler represent independent resources.
|
||||
* Each resource has it's own hierarchy to achieve required scheduling policies.
|
||||
* Non-leaf nodes do not hold requests, but keep scheduling state
|
||||
* (e.g. consumption history, amount of in-flight requests, etc).
|
||||
* Leafs of hierarchy are queues capable of holding pending requests.
|
||||
*
|
||||
* scheduler (SchedulerRoot)
|
||||
* / \
|
||||
* constraint constraint (SemaphoreConstraint)
|
||||
* | |
|
||||
* policy policy (PriorityPolicy)
|
||||
* / \ / \
|
||||
* q1 q2 q3 q4 (FifoQueue)
|
||||
*
|
||||
* Dequeueing request from an inner node will dequeue request from one of active leaf-queues in its subtree.
|
||||
* Node is considered to be active iff:
|
||||
* - it has at least one pending request in one of leaves of it's subtree;
|
||||
* - and enforced constraints, if any, are satisfied
|
||||
* (e.g. amount of concurrent requests is not greater than some number).
|
||||
*
|
||||
* All methods must be called only from scheduler thread for thread-safety.
|
||||
*/
|
||||
class ISchedulerNode : public boost::intrusive::list_base_hook<>, private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
explicit ISchedulerNode(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
|
||||
: event_queue(event_queue_)
|
||||
, info(config, config_prefix)
|
||||
{}
|
||||
|
||||
virtual ~ISchedulerNode() = default;
|
||||
|
||||
/// Checks if two nodes configuration is equal
|
||||
virtual bool equals(ISchedulerNode * other)
|
||||
{
|
||||
return info.equals(other->info);
|
||||
}
|
||||
|
||||
/// Attach new child
|
||||
virtual void attachChild(const std::shared_ptr<ISchedulerNode> & child) = 0;
|
||||
|
||||
/// Detach and destroy child
|
||||
virtual void removeChild(ISchedulerNode * child) = 0;
|
||||
|
||||
/// Get attached child by name
|
||||
virtual ISchedulerNode * getChild(const String & child_name) = 0;
|
||||
|
||||
/// Activation of child due to the first pending request
|
||||
/// Should be called on leaf node (i.e. queue) to propagate activation signal through chain to the root
|
||||
virtual void activateChild(ISchedulerNode * child) = 0;
|
||||
|
||||
/// Returns true iff node is active
|
||||
virtual bool isActive() = 0;
|
||||
|
||||
/// Returns number of active children
|
||||
virtual size_t activeChildren() = 0;
|
||||
|
||||
/// Returns the first request to be executed as the first component of resulting pair.
|
||||
/// The second pair component is `true` iff node is still active after dequeueing.
|
||||
virtual std::pair<ResourceRequest *, bool> dequeueRequest() = 0;
|
||||
|
||||
/// Returns full path string using names of every parent
|
||||
String getPath()
|
||||
{
|
||||
String result;
|
||||
ISchedulerNode * ptr = this;
|
||||
while (ptr->parent)
|
||||
{
|
||||
result = "/" + ptr->basename + result;
|
||||
ptr = ptr->parent;
|
||||
}
|
||||
return result.empty() ? "/" : result;
|
||||
}
|
||||
|
||||
/// Attach to a parent (used by attachChild)
|
||||
virtual void setParent(ISchedulerNode * parent_)
|
||||
{
|
||||
parent = parent_;
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Notify parents about the first pending request or constraint becoming satisfied.
|
||||
/// Postponed to be handled in scheduler thread, so it is intended to be called from outside.
|
||||
void scheduleActivation();
|
||||
|
||||
public:
|
||||
EventQueue * const event_queue;
|
||||
String basename;
|
||||
SchedulerNodeInfo info;
|
||||
ISchedulerNode * parent = nullptr;
|
||||
EventId activation_event_id = 0; // Valid for `ISchedulerNode` placed in EventQueue::activations
|
||||
|
||||
/// Introspection
|
||||
std::atomic<UInt64> dequeued_requests{0};
|
||||
std::atomic<UInt64> canceled_requests{0};
|
||||
std::atomic<ResourceCost> dequeued_cost{0};
|
||||
std::atomic<ResourceCost> canceled_cost{0};
|
||||
std::atomic<UInt64> busy_periods{0};
|
||||
};
|
||||
|
||||
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;
|
||||
|
||||
/*
|
||||
* Simple waitable thread-safe FIFO task queue.
|
||||
* Intended to hold postponed events for later handling (usually by scheduler thread).
|
||||
@ -89,57 +200,70 @@ struct SchedulerNodeInfo
|
||||
class EventQueue
|
||||
{
|
||||
public:
|
||||
using Event = std::function<void()>;
|
||||
using Task = std::function<void()>;
|
||||
|
||||
static constexpr EventId not_postponed = 0;
|
||||
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
using Duration = std::chrono::system_clock::duration;
|
||||
static constexpr UInt64 not_postponed = 0;
|
||||
|
||||
struct Event
|
||||
{
|
||||
const EventId event_id;
|
||||
Task task;
|
||||
|
||||
Event(EventId event_id_, Task && task_)
|
||||
: event_id(event_id_)
|
||||
, task(std::move(task_))
|
||||
{}
|
||||
};
|
||||
|
||||
struct Postponed
|
||||
{
|
||||
TimePoint key;
|
||||
UInt64 id; // for canceling
|
||||
std::unique_ptr<Event> event;
|
||||
EventId event_id; // for canceling
|
||||
std::unique_ptr<Task> task;
|
||||
|
||||
Postponed(TimePoint key_, UInt64 id_, Event && event_)
|
||||
Postponed(TimePoint key_, EventId event_id_, Task && task_)
|
||||
: key(key_)
|
||||
, id(id_)
|
||||
, event(std::make_unique<Event>(std::move(event_)))
|
||||
, event_id(event_id_)
|
||||
, task(std::make_unique<Task>(std::move(task_)))
|
||||
{}
|
||||
|
||||
bool operator<(const Postponed & rhs) const
|
||||
{
|
||||
return std::tie(key, id) > std::tie(rhs.key, rhs.id); // reversed for min-heap
|
||||
return std::tie(key, event_id) > std::tie(rhs.key, rhs.event_id); // reversed for min-heap
|
||||
}
|
||||
};
|
||||
|
||||
/// Add an `event` to be processed after `until` time point.
|
||||
/// Returns a unique id for canceling.
|
||||
[[nodiscard]] UInt64 postpone(TimePoint until, Event && event)
|
||||
/// Returns a unique event id for canceling.
|
||||
[[nodiscard]] EventId postpone(TimePoint until, Task && task)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
if (postponed.empty() || until < postponed.front().key)
|
||||
pending.notify_one();
|
||||
auto id = ++last_id;
|
||||
postponed.emplace_back(until, id, std::move(event));
|
||||
auto event_id = ++last_event_id;
|
||||
postponed.emplace_back(until, event_id, std::move(task));
|
||||
std::push_heap(postponed.begin(), postponed.end());
|
||||
return id;
|
||||
return event_id;
|
||||
}
|
||||
|
||||
/// Cancel a postponed event using its unique id.
|
||||
/// NOTE: Only postponed events can be canceled.
|
||||
/// NOTE: If you need to cancel enqueued event, consider doing your actions inside another enqueued
|
||||
/// NOTE: event instead. This ensures that all previous events are processed.
|
||||
bool cancelPostponed(UInt64 postponed_id)
|
||||
bool cancelPostponed(EventId postponed_event_id)
|
||||
{
|
||||
if (postponed_id == not_postponed)
|
||||
if (postponed_event_id == not_postponed)
|
||||
return false;
|
||||
std::unique_lock lock{mutex};
|
||||
for (auto i = postponed.begin(), e = postponed.end(); i != e; ++i)
|
||||
{
|
||||
if (i->id == postponed_id)
|
||||
if (i->event_id == postponed_event_id)
|
||||
{
|
||||
postponed.erase(i);
|
||||
// It is O(n), but we do not expect either big heaps or frequent cancels. So it is fine.
|
||||
// It is O(n), but we do not expect neither big heaps nor frequent cancels. So it is fine.
|
||||
std::make_heap(postponed.begin(), postponed.end());
|
||||
return true;
|
||||
}
|
||||
@ -148,11 +272,23 @@ public:
|
||||
}
|
||||
|
||||
/// Add an `event` for immediate processing
|
||||
void enqueue(Event && event)
|
||||
void enqueue(Task && task)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
bool was_empty = queue.empty();
|
||||
queue.emplace_back(event);
|
||||
bool was_empty = events.empty() && activations.empty();
|
||||
auto event_id = ++last_event_id;
|
||||
events.emplace_back(event_id, std::move(task));
|
||||
if (was_empty)
|
||||
pending.notify_one();
|
||||
}
|
||||
|
||||
/// Add an activation `event` for immediate processing. Activations use a separate queue for performance reasons.
|
||||
void enqueueActivation(ISchedulerNode * node)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
bool was_empty = events.empty() && activations.empty();
|
||||
node->activation_event_id = ++last_event_id;
|
||||
activations.push_back(*node);
|
||||
if (was_empty)
|
||||
pending.notify_one();
|
||||
}
|
||||
@ -163,7 +299,7 @@ public:
|
||||
bool forceProcess()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
if (!queue.empty())
|
||||
if (!events.empty() || !activations.empty())
|
||||
{
|
||||
processQueue(std::move(lock));
|
||||
return true;
|
||||
@ -181,7 +317,7 @@ public:
|
||||
bool tryProcess()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
if (!queue.empty())
|
||||
if (!events.empty() || !activations.empty())
|
||||
{
|
||||
processQueue(std::move(lock));
|
||||
return true;
|
||||
@ -205,7 +341,7 @@ public:
|
||||
std::unique_lock lock{mutex};
|
||||
while (true)
|
||||
{
|
||||
if (!queue.empty())
|
||||
if (!events.empty() || !activations.empty())
|
||||
{
|
||||
processQueue(std::move(lock));
|
||||
return;
|
||||
@ -269,141 +405,63 @@ private:
|
||||
|
||||
void processQueue(std::unique_lock<std::mutex> && lock)
|
||||
{
|
||||
Event event = std::move(queue.front());
|
||||
queue.pop_front();
|
||||
if (events.empty())
|
||||
return processActivation(std::move(lock));
|
||||
if (activations.empty())
|
||||
return processEvent(std::move(lock));
|
||||
if (activations.front().activation_event_id < events.front().event_id)
|
||||
return processActivation(std::move(lock));
|
||||
else
|
||||
return processEvent(std::move(lock));
|
||||
}
|
||||
|
||||
void processActivation(std::unique_lock<std::mutex> && lock)
|
||||
{
|
||||
ISchedulerNode * node = &activations.front();
|
||||
activations.pop_front();
|
||||
node->activation_event_id = 0;
|
||||
lock.unlock(); // do not hold queue mutex while processing events
|
||||
event();
|
||||
node->parent->activateChild(node);
|
||||
}
|
||||
|
||||
void processEvent(std::unique_lock<std::mutex> && lock)
|
||||
{
|
||||
Task task = std::move(events.front().task);
|
||||
events.pop_front();
|
||||
lock.unlock(); // do not hold queue mutex while processing events
|
||||
task();
|
||||
}
|
||||
|
||||
void processPostponed(std::unique_lock<std::mutex> && lock)
|
||||
{
|
||||
Event event = std::move(*postponed.front().event);
|
||||
Task task = std::move(*postponed.front().task);
|
||||
std::pop_heap(postponed.begin(), postponed.end());
|
||||
postponed.pop_back();
|
||||
lock.unlock(); // do not hold queue mutex while processing events
|
||||
event();
|
||||
task();
|
||||
}
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable pending;
|
||||
std::deque<Event> queue;
|
||||
|
||||
// `events` and `activations` logically represent one ordered queue. To preserve the common order we use `EventId`
|
||||
// Activations are stored in a separate queue for performance reasons (mostly to avoid any allocations)
|
||||
std::deque<Event> events;
|
||||
boost::intrusive::list<ISchedulerNode> activations;
|
||||
|
||||
std::vector<Postponed> postponed;
|
||||
UInt64 last_id = 0;
|
||||
EventId last_event_id = 0;
|
||||
|
||||
std::atomic<TimePoint> manual_time{TimePoint()}; // for tests only
|
||||
};
|
||||
|
||||
/*
|
||||
* Node of hierarchy for scheduling requests for resource. Base class for all
|
||||
* kinds of scheduling elements (queues, policies, constraints and schedulers).
|
||||
*
|
||||
* Root node is a scheduler, which has it's thread to dequeue requests,
|
||||
* execute requests (see ResourceRequest) and process events in a thread-safe manner.
|
||||
* Immediate children of the scheduler represent independent resources.
|
||||
* Each resource has it's own hierarchy to achieve required scheduling policies.
|
||||
* Non-leaf nodes do not hold requests, but keep scheduling state
|
||||
* (e.g. consumption history, amount of in-flight requests, etc).
|
||||
* Leafs of hierarchy are queues capable of holding pending requests.
|
||||
*
|
||||
* scheduler (SchedulerRoot)
|
||||
* / \
|
||||
* constraint constraint (SemaphoreConstraint)
|
||||
* | |
|
||||
* policy policy (PriorityPolicy)
|
||||
* / \ / \
|
||||
* q1 q2 q3 q4 (FifoQueue)
|
||||
*
|
||||
* Dequeueing request from an inner node will dequeue request from one of active leaf-queues in its subtree.
|
||||
* Node is considered to be active iff:
|
||||
* - it has at least one pending request in one of leaves of it's subtree;
|
||||
* - and enforced constraints, if any, are satisfied
|
||||
* (e.g. amount of concurrent requests is not greater than some number).
|
||||
*
|
||||
* All methods must be called only from scheduler thread for thread-safety.
|
||||
*/
|
||||
class ISchedulerNode : private boost::noncopyable
|
||||
inline void ISchedulerNode::scheduleActivation()
|
||||
{
|
||||
public:
|
||||
explicit ISchedulerNode(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
|
||||
: event_queue(event_queue_)
|
||||
, info(config, config_prefix)
|
||||
{}
|
||||
|
||||
virtual ~ISchedulerNode() = default;
|
||||
|
||||
/// Checks if two nodes configuration is equal
|
||||
virtual bool equals(ISchedulerNode * other)
|
||||
if (likely(parent))
|
||||
{
|
||||
return info.equals(other->info);
|
||||
// The same as `enqueue([this] { parent->activateChild(this); });` but faster
|
||||
event_queue->enqueueActivation(this);
|
||||
}
|
||||
|
||||
/// Attach new child
|
||||
virtual void attachChild(const std::shared_ptr<ISchedulerNode> & child) = 0;
|
||||
|
||||
/// Detach and destroy child
|
||||
virtual void removeChild(ISchedulerNode * child) = 0;
|
||||
|
||||
/// Get attached child by name
|
||||
virtual ISchedulerNode * getChild(const String & child_name) = 0;
|
||||
|
||||
/// Activation of child due to the first pending request
|
||||
/// Should be called on leaf node (i.e. queue) to propagate activation signal through chain to the root
|
||||
virtual void activateChild(ISchedulerNode * child) = 0;
|
||||
|
||||
/// Returns true iff node is active
|
||||
virtual bool isActive() = 0;
|
||||
|
||||
/// Returns number of active children
|
||||
virtual size_t activeChildren() = 0;
|
||||
|
||||
/// Returns the first request to be executed as the first component of resulting pair.
|
||||
/// The second pair component is `true` iff node is still active after dequeueing.
|
||||
virtual std::pair<ResourceRequest *, bool> dequeueRequest() = 0;
|
||||
|
||||
/// Returns full path string using names of every parent
|
||||
String getPath()
|
||||
{
|
||||
String result;
|
||||
ISchedulerNode * ptr = this;
|
||||
while (ptr->parent)
|
||||
{
|
||||
result = "/" + ptr->basename + result;
|
||||
ptr = ptr->parent;
|
||||
}
|
||||
return result.empty() ? "/" : result;
|
||||
}
|
||||
|
||||
/// Attach to a parent (used by attachChild)
|
||||
virtual void setParent(ISchedulerNode * parent_)
|
||||
{
|
||||
parent = parent_;
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Notify parents about the first pending request or constraint becoming satisfied.
|
||||
/// Postponed to be handled in scheduler thread, so it is intended to be called from outside.
|
||||
void scheduleActivation()
|
||||
{
|
||||
if (likely(parent))
|
||||
{
|
||||
event_queue->enqueue([this] { parent->activateChild(this); });
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
EventQueue * const event_queue;
|
||||
String basename;
|
||||
SchedulerNodeInfo info;
|
||||
ISchedulerNode * parent = nullptr;
|
||||
|
||||
/// Introspection
|
||||
std::atomic<UInt64> dequeued_requests{0};
|
||||
std::atomic<UInt64> canceled_requests{0};
|
||||
std::atomic<ResourceCost> dequeued_cost{0};
|
||||
std::atomic<ResourceCost> canceled_cost{0};
|
||||
std::atomic<UInt64> busy_periods{0};
|
||||
};
|
||||
|
||||
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;
|
||||
}
|
||||
|
||||
}
|
||||
|
143
src/Common/Scheduler/Nodes/tests/gtest_event_queue.cpp
Normal file
143
src/Common/Scheduler/Nodes/tests/gtest_event_queue.cpp
Normal file
@ -0,0 +1,143 @@
|
||||
#include <chrono>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <Common/Scheduler/ISchedulerNode.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
class FakeSchedulerNode : public ISchedulerNode
|
||||
{
|
||||
public:
|
||||
explicit FakeSchedulerNode(String & log_, EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
|
||||
: ISchedulerNode(event_queue_, config, config_prefix)
|
||||
, log(log_)
|
||||
{}
|
||||
|
||||
void attachChild(const SchedulerNodePtr & child) override
|
||||
{
|
||||
log += " +" + child->basename;
|
||||
}
|
||||
|
||||
void removeChild(ISchedulerNode * child) override
|
||||
{
|
||||
log += " -" + child->basename;
|
||||
}
|
||||
|
||||
ISchedulerNode * getChild(const String & /* child_name */) override
|
||||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void activateChild(ISchedulerNode * child) override
|
||||
{
|
||||
log += " A" + child->basename;
|
||||
}
|
||||
|
||||
bool isActive() override
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t activeChildren() override
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::pair<ResourceRequest *, bool> dequeueRequest() override
|
||||
{
|
||||
log += " D";
|
||||
return {nullptr, false};
|
||||
}
|
||||
|
||||
private:
|
||||
String & log;
|
||||
};
|
||||
|
||||
struct QueueTest {
|
||||
String log;
|
||||
EventQueue event_queue;
|
||||
FakeSchedulerNode root_node;
|
||||
|
||||
QueueTest()
|
||||
: root_node(log, &event_queue)
|
||||
{}
|
||||
|
||||
SchedulerNodePtr makeNode(const String & name)
|
||||
{
|
||||
auto node = std::make_shared<FakeSchedulerNode>(log, &event_queue);
|
||||
node->basename = name;
|
||||
node->setParent(&root_node);
|
||||
return std::static_pointer_cast<ISchedulerNode>(node);
|
||||
}
|
||||
|
||||
void process(EventQueue::TimePoint now, const String & expected_log, size_t limit = size_t(-1))
|
||||
{
|
||||
event_queue.setManualTime(now);
|
||||
for (;limit > 0; limit--)
|
||||
{
|
||||
if (!event_queue.tryProcess())
|
||||
break;
|
||||
}
|
||||
EXPECT_EQ(log, expected_log);
|
||||
log.clear();
|
||||
}
|
||||
|
||||
void activate(const SchedulerNodePtr & node)
|
||||
{
|
||||
event_queue.enqueueActivation(node.get());
|
||||
}
|
||||
|
||||
void event(const String & text)
|
||||
{
|
||||
event_queue.enqueue([this, text] { log += " " + text; });
|
||||
}
|
||||
|
||||
EventId postpone(EventQueue::TimePoint until, const String & text)
|
||||
{
|
||||
return event_queue.postpone(until, [this, text] { log += " " + text; });
|
||||
}
|
||||
|
||||
void cancel(EventId event_id)
|
||||
{
|
||||
event_queue.cancelPostponed(event_id);
|
||||
}
|
||||
};
|
||||
|
||||
TEST(SchedulerEventQueue, Smoke)
|
||||
{
|
||||
QueueTest t;
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
EventQueue::TimePoint start = std::chrono::system_clock::now();
|
||||
t.process(start, "", 0);
|
||||
|
||||
// Activations
|
||||
auto node1 = t.makeNode("1");
|
||||
auto node2 = t.makeNode("2");
|
||||
t.activate(node2);
|
||||
t.activate(node1);
|
||||
t.process(start + 42s, " A2 A1");
|
||||
|
||||
// Events
|
||||
t.event("E1");
|
||||
t.event("E2");
|
||||
t.process(start + 100s, " E1 E2");
|
||||
|
||||
// Postponed events
|
||||
t.postpone(start + 200s, "P200");
|
||||
auto p190 = t.postpone(start + 200s, "P190");
|
||||
t.postpone(start + 150s, "P150");
|
||||
t.postpone(start + 175s, "P175");
|
||||
t.process(start + 180s, " P150 P175");
|
||||
t.event("E3");
|
||||
t.cancel(p190);
|
||||
t.process(start + 300s, " E3 P200");
|
||||
|
||||
// Ordering of events and activations
|
||||
t.event("E1");
|
||||
t.activate(node1);
|
||||
t.event("E2");
|
||||
t.activate(node2);
|
||||
t.process(start + 300s, " E1 A1 E2 A2");
|
||||
}
|
@ -5,8 +5,6 @@
|
||||
|
||||
#include <Common/Scheduler/Nodes/FairPolicy.h>
|
||||
#include <Common/Scheduler/Nodes/ThrottlerConstraint.h>
|
||||
#include "Common/Scheduler/ISchedulerNode.h"
|
||||
#include "Common/Scheduler/ResourceRequest.h"
|
||||
|
||||
using namespace DB;
|
||||
|
||||
|
@ -535,7 +535,7 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
const static std::string_view needle = "<Error>";
|
||||
if (auto it = std::search(response_string.begin(), response_string.end(), std::default_searcher(needle.begin(), needle.end())); it != response_string.end())
|
||||
{
|
||||
LOG_WARNING(log, "Response for request contain <Error> tag in body, settings internal server error (500 code)");
|
||||
LOG_WARNING(log, "Response for the request contains an <Error> tag in the body, will treat it as an internal server error (code 500)");
|
||||
response->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
addMetric(request, S3MetricType::Errors);
|
||||
|
@ -2179,7 +2179,7 @@ public:
|
||||
|
||||
bool parse(IParser::Pos & pos, Expected & expected, Action & /*action*/) override
|
||||
{
|
||||
/// kql(table|project ...)
|
||||
/// kql('table|project ...')
|
||||
/// 0. Parse the kql query
|
||||
/// 1. Parse closing token
|
||||
if (state == 0)
|
||||
|
@ -853,7 +853,7 @@ Please note that the functions listed below only take constant parameters for no
|
||||
## KQL() function
|
||||
|
||||
- create table
|
||||
`CREATE TABLE kql_table4 ENGINE = Memory AS select *, now() as new_column From kql(Customers | project LastName,Age);`
|
||||
`CREATE TABLE kql_table4 ENGINE = Memory AS select *, now() as new_column From kql($$Customers | project LastName,Age$$);`
|
||||
verify the content of `kql_table`
|
||||
`select * from kql_table`
|
||||
|
||||
@ -867,12 +867,12 @@ Please note that the functions listed below only take constant parameters for no
|
||||
Age Nullable(UInt8)
|
||||
) ENGINE = Memory;
|
||||
```
|
||||
`INSERT INTO temp select * from kql(Customers|project FirstName,LastName,Age);`
|
||||
`INSERT INTO temp select * from kql($$Customers|project FirstName,LastName,Age$$);`
|
||||
verify the content of `temp`
|
||||
`select * from temp`
|
||||
|
||||
- Select from kql()
|
||||
`Select * from kql(Customers|project FirstName)`
|
||||
- Select from kql(...)
|
||||
`Select * from kql($$Customers|project FirstName$$)`
|
||||
|
||||
## KQL operators:
|
||||
- Tabular expression statements
|
||||
@ -993,4 +993,3 @@ Please note that the functions listed below only take constant parameters for no
|
||||
- dcount()
|
||||
- dcountif()
|
||||
- bin
|
||||
|
@ -301,8 +301,8 @@ String IParserKQLFunction::kqlCallToExpression(
|
||||
});
|
||||
|
||||
const auto kql_call = std::format("{}({})", function_name, params_str);
|
||||
DB::Tokens call_tokens(kql_call.c_str(), kql_call.c_str() + kql_call.length());
|
||||
DB::IParser::Pos tokens_pos(call_tokens, max_depth, max_backtracks);
|
||||
Tokens call_tokens(kql_call.data(), kql_call.data() + kql_call.length(), 0, true);
|
||||
IParser::Pos tokens_pos(call_tokens, max_depth, max_backtracks);
|
||||
return DB::IParserKQLFunction::getExpression(tokens_pos);
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,7 @@ bool ParserKQLDistinct::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
|
||||
expr = getExprFromToken(pos);
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(false).parse(new_pos, select_expression_list, expected))
|
||||
|
@ -22,7 +22,7 @@ bool ParserKQLExtend ::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
|
||||
String except_str;
|
||||
String new_extend_str;
|
||||
Tokens ntokens(extend_expr.c_str(), extend_expr.c_str() + extend_expr.size());
|
||||
Tokens ntokens(extend_expr.data(), extend_expr.data() + extend_expr.size(), 0, true);
|
||||
IParser::Pos npos(ntokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
String alias;
|
||||
@ -76,7 +76,7 @@ bool ParserKQLExtend ::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
apply_alias();
|
||||
|
||||
String expr = std::format("SELECT * {}, {} from prev", except_str, new_extend_str);
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserSelectQuery().parse(new_pos, select_query, expected))
|
||||
|
@ -13,7 +13,7 @@ bool ParserKQLFilter::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
String expr = getExprFromToken(pos);
|
||||
ASTPtr where_expression;
|
||||
|
||||
Tokens token_filter(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens token_filter(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos pos_filter(token_filter, pos.max_depth, pos.max_backtracks);
|
||||
if (!ParserExpressionWithOptionalAlias(false).parse(pos_filter, where_expression, expected))
|
||||
return false;
|
||||
|
@ -13,7 +13,7 @@ bool ParserKQLLimit::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
|
||||
auto expr = getExprFromToken(pos);
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserExpressionWithOptionalAlias(false).parse(new_pos, limit_length, expected))
|
||||
|
@ -298,7 +298,7 @@ bool ParserKQLMVExpand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
return false;
|
||||
|
||||
const String setting_str = "enable_unaligned_array_join = 1";
|
||||
Tokens token_settings(setting_str.c_str(), setting_str.c_str() + setting_str.size());
|
||||
Tokens token_settings(setting_str.data(), setting_str.data() + setting_str.size(), 0, true);
|
||||
IParser::Pos pos_settings(token_settings, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserSetQuery(true).parse(pos_settings, setting, expected))
|
||||
|
@ -173,7 +173,7 @@ bool ParserKQLMakeSeries ::makeSeries(KQLMakeSeries & kql_make_series, ASTPtr &
|
||||
|
||||
auto date_type_cast = [&](String & src)
|
||||
{
|
||||
Tokens tokens(src.c_str(), src.c_str() + src.size());
|
||||
Tokens tokens(src.data(), src.data() + src.size(), 0, true);
|
||||
IParser::Pos pos(tokens, max_depth, max_backtracks);
|
||||
String res;
|
||||
while (isValidKQLPos(pos))
|
||||
@ -200,7 +200,7 @@ bool ParserKQLMakeSeries ::makeSeries(KQLMakeSeries & kql_make_series, ASTPtr &
|
||||
auto get_group_expression_alias = [&]
|
||||
{
|
||||
std::vector<String> group_expression_tokens;
|
||||
Tokens tokens(group_expression.c_str(), group_expression.c_str() + group_expression.size());
|
||||
Tokens tokens(group_expression.data(), group_expression.data() + group_expression.size(), 0, true);
|
||||
IParser::Pos pos(tokens, max_depth, max_backtracks);
|
||||
while (isValidKQLPos(pos))
|
||||
{
|
||||
@ -413,7 +413,7 @@ bool ParserKQLMakeSeries ::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
|
||||
|
||||
makeSeries(kql_make_series, node, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
Tokens token_main_query(kql_make_series.main_query.c_str(), kql_make_series.main_query.c_str() + kql_make_series.main_query.size());
|
||||
Tokens token_main_query(kql_make_series.main_query.data(), kql_make_series.main_query.data() + kql_make_series.main_query.size(), 0, true);
|
||||
IParser::Pos pos_main_query(token_main_query, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(true).parse(pos_main_query, select_expression_list, expected))
|
||||
|
@ -1,20 +1,26 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/CommonParsers.h>
|
||||
#include <Parsers/Kusto/KustoFunctions/IParserKQLFunction.h>
|
||||
#include <Parsers/Kusto/KustoFunctions/KQLFunctionFactory.h>
|
||||
#include <Parsers/Kusto/ParserKQLOperators.h>
|
||||
#include <Parsers/Kusto/ParserKQLQuery.h>
|
||||
#include <Parsers/Kusto/ParserKQLStatement.h>
|
||||
#include <Parsers/Kusto/Utilities.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include "KustoFunctions/IParserKQLFunction.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
enum class KQLOperatorValue : uint16_t
|
||||
enum class KQLOperatorValue
|
||||
{
|
||||
none,
|
||||
between,
|
||||
@ -56,7 +62,8 @@ enum class KQLOperatorValue : uint16_t
|
||||
not_startswith_cs,
|
||||
};
|
||||
|
||||
const std::unordered_map<String, KQLOperatorValue> KQLOperator = {
|
||||
const std::unordered_map<String, KQLOperatorValue> KQLOperator =
|
||||
{
|
||||
{"between", KQLOperatorValue::between},
|
||||
{"!between", KQLOperatorValue::not_between},
|
||||
{"contains", KQLOperatorValue::contains},
|
||||
@ -96,44 +103,37 @@ const std::unordered_map<String, KQLOperatorValue> KQLOperator = {
|
||||
{"!startswith_cs", KQLOperatorValue::not_startswith_cs},
|
||||
};
|
||||
|
||||
void rebuildSubqueryForInOperator(DB::ASTPtr & node, bool useLowerCase)
|
||||
void rebuildSubqueryForInOperator(ASTPtr & node, bool useLowerCase)
|
||||
{
|
||||
//A sub-query for in operator in kql can have multiple columns, but only takes the first column.
|
||||
//A sub-query for in operator in ClickHouse can not have multiple columns
|
||||
//So only take the first column if there are multiple columns.
|
||||
//select * not working for subquery. (a tabular statement without project)
|
||||
|
||||
const auto selectColumns = node->children[0]->children[0]->as<DB::ASTSelectQuery>()->select();
|
||||
const auto selectColumns = node->children[0]->children[0]->as<ASTSelectQuery>()->select();
|
||||
while (selectColumns->children.size() > 1)
|
||||
selectColumns->children.pop_back();
|
||||
|
||||
if (useLowerCase)
|
||||
{
|
||||
auto args = std::make_shared<DB::ASTExpressionList>();
|
||||
auto args = std::make_shared<ASTExpressionList>();
|
||||
args->children.push_back(selectColumns->children[0]);
|
||||
auto func_lower = std::make_shared<DB::ASTFunction>();
|
||||
auto func_lower = std::make_shared<ASTFunction>();
|
||||
func_lower->name = "lower";
|
||||
func_lower->children.push_back(selectColumns->children[0]);
|
||||
func_lower->arguments = args;
|
||||
if (selectColumns->children[0]->as<DB::ASTIdentifier>())
|
||||
func_lower->alias = std::move(selectColumns->children[0]->as<DB::ASTIdentifier>()->alias);
|
||||
else if (selectColumns->children[0]->as<DB::ASTFunction>())
|
||||
func_lower->alias = std::move(selectColumns->children[0]->as<DB::ASTFunction>()->alias);
|
||||
if (selectColumns->children[0]->as<ASTIdentifier>())
|
||||
func_lower->alias = std::move(selectColumns->children[0]->as<ASTIdentifier>()->alias);
|
||||
else if (selectColumns->children[0]->as<ASTFunction>())
|
||||
func_lower->alias = std::move(selectColumns->children[0]->as<ASTFunction>()->alias);
|
||||
|
||||
auto funcs = std::make_shared<DB::ASTExpressionList>();
|
||||
auto funcs = std::make_shared<ASTExpressionList>();
|
||||
funcs->children.push_back(func_lower);
|
||||
selectColumns->children[0] = std::move(funcs);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SYNTAX_ERROR;
|
||||
}
|
||||
|
||||
String KQLOperators::genHasAnyAllOpExpr(std::vector<String> & tokens, IParser::Pos & token_pos, String kql_op, String ch_op)
|
||||
{
|
||||
@ -166,7 +166,7 @@ String KQLOperators::genHasAnyAllOpExpr(std::vector<String> & tokens, IParser::P
|
||||
return new_expr;
|
||||
}
|
||||
|
||||
String genEqOpExprCis(std::vector<String> & tokens, DB::IParser::Pos & token_pos, const String & ch_op)
|
||||
String genEqOpExprCis(std::vector<String> & tokens, IParser::Pos & token_pos, const String & ch_op)
|
||||
{
|
||||
String tmp_arg(token_pos->begin, token_pos->end);
|
||||
|
||||
@ -178,30 +178,30 @@ String genEqOpExprCis(std::vector<String> & tokens, DB::IParser::Pos & token_pos
|
||||
new_expr += ch_op + " ";
|
||||
++token_pos;
|
||||
|
||||
if (token_pos->type == DB::TokenType::StringLiteral || token_pos->type == DB::TokenType::QuotedIdentifier)
|
||||
new_expr += "lower('" + DB::IParserKQLFunction::escapeSingleQuotes(String(token_pos->begin + 1, token_pos->end - 1)) + "')";
|
||||
if (token_pos->type == TokenType::StringLiteral || token_pos->type == TokenType::QuotedIdentifier)
|
||||
new_expr += "lower('" + IParserKQLFunction::escapeSingleQuotes(String(token_pos->begin + 1, token_pos->end - 1)) + "')";
|
||||
else
|
||||
new_expr += "lower(" + DB::IParserKQLFunction::getExpression(token_pos) + ")";
|
||||
new_expr += "lower(" + IParserKQLFunction::getExpression(token_pos) + ")";
|
||||
|
||||
tokens.pop_back();
|
||||
return new_expr;
|
||||
}
|
||||
|
||||
String genInOpExprCis(std::vector<String> & tokens, DB::IParser::Pos & token_pos, const String & kql_op, const String & ch_op)
|
||||
String genInOpExprCis(std::vector<String> & tokens, IParser::Pos & token_pos, const String & kql_op, const String & ch_op)
|
||||
{
|
||||
DB::ParserKQLTableFunction kqlfun_p;
|
||||
DB::ParserToken s_lparen(DB::TokenType::OpeningRoundBracket);
|
||||
ParserKQLTableFunction kqlfun_p;
|
||||
ParserToken s_lparen(TokenType::OpeningRoundBracket);
|
||||
|
||||
DB::ASTPtr select;
|
||||
DB::Expected expected;
|
||||
ASTPtr select;
|
||||
Expected expected;
|
||||
String new_expr;
|
||||
|
||||
++token_pos;
|
||||
if (!s_lparen.ignore(token_pos, expected))
|
||||
throw DB::Exception(DB::ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
throw Exception(ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
|
||||
if (tokens.empty())
|
||||
throw DB::Exception(DB::ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
throw Exception(ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
|
||||
new_expr = "lower(" + tokens.back() + ") ";
|
||||
tokens.pop_back();
|
||||
@ -218,39 +218,39 @@ String genInOpExprCis(std::vector<String> & tokens, DB::IParser::Pos & token_pos
|
||||
--token_pos;
|
||||
|
||||
new_expr += ch_op;
|
||||
while (isValidKQLPos(token_pos) && token_pos->type != DB::TokenType::PipeMark && token_pos->type != DB::TokenType::Semicolon)
|
||||
while (isValidKQLPos(token_pos) && token_pos->type != TokenType::PipeMark && token_pos->type != TokenType::Semicolon)
|
||||
{
|
||||
auto tmp_arg = String(token_pos->begin, token_pos->end);
|
||||
if (token_pos->type != DB::TokenType::Comma && token_pos->type != DB::TokenType::ClosingRoundBracket
|
||||
&& token_pos->type != DB::TokenType::OpeningRoundBracket && token_pos->type != DB::TokenType::OpeningSquareBracket
|
||||
&& token_pos->type != DB::TokenType::ClosingSquareBracket && tmp_arg != "~" && tmp_arg != "dynamic")
|
||||
if (token_pos->type != TokenType::Comma && token_pos->type != TokenType::ClosingRoundBracket
|
||||
&& token_pos->type != TokenType::OpeningRoundBracket && token_pos->type != TokenType::OpeningSquareBracket
|
||||
&& token_pos->type != TokenType::ClosingSquareBracket && tmp_arg != "~" && tmp_arg != "dynamic")
|
||||
{
|
||||
if (token_pos->type == DB::TokenType::StringLiteral || token_pos->type == DB::TokenType::QuotedIdentifier)
|
||||
new_expr += "lower('" + DB::IParserKQLFunction::escapeSingleQuotes(String(token_pos->begin + 1, token_pos->end - 1)) + "')";
|
||||
if (token_pos->type == TokenType::StringLiteral || token_pos->type == TokenType::QuotedIdentifier)
|
||||
new_expr += "lower('" + IParserKQLFunction::escapeSingleQuotes(String(token_pos->begin + 1, token_pos->end - 1)) + "')";
|
||||
else
|
||||
new_expr += "lower(" + tmp_arg + ")";
|
||||
}
|
||||
else if (tmp_arg != "~" && tmp_arg != "dynamic" && tmp_arg != "[" && tmp_arg != "]")
|
||||
new_expr += tmp_arg;
|
||||
|
||||
if (token_pos->type == DB::TokenType::ClosingRoundBracket)
|
||||
if (token_pos->type == TokenType::ClosingRoundBracket)
|
||||
break;
|
||||
++token_pos;
|
||||
}
|
||||
return new_expr;
|
||||
}
|
||||
|
||||
std::string genInOpExpr(DB::IParser::Pos & token_pos, const std::string & kql_op, const std::string & ch_op)
|
||||
std::string genInOpExpr(IParser::Pos & token_pos, const std::string & kql_op, const std::string & ch_op)
|
||||
{
|
||||
DB::ParserKQLTableFunction kqlfun_p;
|
||||
DB::ParserToken s_lparen(DB::TokenType::OpeningRoundBracket);
|
||||
ParserKQLTableFunction kqlfun_p;
|
||||
ParserToken s_lparen(TokenType::OpeningRoundBracket);
|
||||
|
||||
DB::ASTPtr select;
|
||||
DB::Expected expected;
|
||||
ASTPtr select;
|
||||
Expected expected;
|
||||
|
||||
++token_pos;
|
||||
if (!s_lparen.ignore(token_pos, expected))
|
||||
throw DB::Exception(DB::ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
throw Exception(ErrorCodes::SYNTAX_ERROR, "Syntax error near {}", kql_op);
|
||||
|
||||
auto pos = token_pos;
|
||||
if (kqlfun_p.parse(pos, select, expected))
|
||||
|
@ -9,7 +9,7 @@ bool ParserKQLPrint::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
ASTPtr select_expression_list;
|
||||
const String expr = getExprFromToken(pos);
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(true).parse(new_pos, select_expression_list, expected))
|
||||
|
@ -11,7 +11,7 @@ bool ParserKQLProject ::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
|
||||
expr = getExprFromToken(pos);
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(false).parse(new_pos, select_expression_list, expected))
|
||||
|
@ -37,7 +37,7 @@ bool ParserKQLBase::parseByString(String expr, ASTPtr & node, uint32_t max_depth
|
||||
{
|
||||
Expected expected;
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos pos(tokens, max_depth, max_backtracks);
|
||||
return parse(pos, node, expected);
|
||||
}
|
||||
@ -45,7 +45,7 @@ bool ParserKQLBase::parseByString(String expr, ASTPtr & node, uint32_t max_depth
|
||||
bool ParserKQLBase::parseSQLQueryByString(ParserPtr && parser, String & query, ASTPtr & select_node, uint32_t max_depth, uint32_t max_backtracks)
|
||||
{
|
||||
Expected expected;
|
||||
Tokens token_subquery(query.c_str(), query.c_str() + query.size());
|
||||
Tokens token_subquery(query.data(), query.data() + query.size(), 0, true);
|
||||
IParser::Pos pos_subquery(token_subquery, max_depth, max_backtracks);
|
||||
if (!parser->parse(pos_subquery, select_node, expected))
|
||||
return false;
|
||||
@ -123,7 +123,7 @@ bool ParserKQLBase::setSubQuerySource(ASTPtr & select_query, ASTPtr & source, bo
|
||||
|
||||
String ParserKQLBase::getExprFromToken(const String & text, uint32_t max_depth, uint32_t max_backtracks)
|
||||
{
|
||||
Tokens tokens(text.c_str(), text.c_str() + text.size());
|
||||
Tokens tokens(text.data(), text.data() + text.size(), 0, true);
|
||||
IParser::Pos pos(tokens, max_depth, max_backtracks);
|
||||
|
||||
return getExprFromToken(pos);
|
||||
@ -522,7 +522,7 @@ bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
--last_pos;
|
||||
|
||||
String sub_query = std::format("({})", String(operation_pos.front().second->begin, last_pos->end));
|
||||
Tokens token_subquery(sub_query.c_str(), sub_query.c_str() + sub_query.size());
|
||||
Tokens token_subquery(sub_query.data(), sub_query.data() + sub_query.size(), 0, true);
|
||||
IParser::Pos pos_subquery(token_subquery, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserKQLSubquery().parse(pos_subquery, tables, expected))
|
||||
@ -543,7 +543,7 @@ bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
auto oprator = getOperator(op_str);
|
||||
if (oprator)
|
||||
{
|
||||
Tokens token_clause(op_calsue.c_str(), op_calsue.c_str() + op_calsue.size());
|
||||
Tokens token_clause(op_calsue.data(), op_calsue.data() + op_calsue.size(), 0, true);
|
||||
IParser::Pos pos_clause(token_clause, pos.max_depth, pos.max_backtracks);
|
||||
if (!oprator->parse(pos_clause, node, expected))
|
||||
return false;
|
||||
@ -576,7 +576,7 @@ bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
if (!node->as<ASTSelectQuery>()->select())
|
||||
{
|
||||
auto expr = String("*");
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
if (!std::make_unique<ParserKQLProject>()->parse(new_pos, node, expected))
|
||||
return false;
|
||||
|
@ -18,7 +18,7 @@ bool ParserKQLSort::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
|
||||
auto expr = getExprFromToken(pos);
|
||||
|
||||
Tokens tokens(expr.c_str(), expr.c_str() + expr.size());
|
||||
Tokens tokens(expr.data(), expr.data() + expr.size(), 0, true);
|
||||
IParser::Pos new_pos(tokens, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
auto pos_backup = new_pos;
|
||||
|
@ -2,13 +2,13 @@
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/CommonParsers.h>
|
||||
#include <Parsers/IParserBase.h>
|
||||
#include <Parsers/Kusto/KustoFunctions/KQLFunctionFactory.h>
|
||||
#include <Parsers/Kusto/ParserKQLQuery.h>
|
||||
#include <Parsers/Kusto/ParserKQLStatement.h>
|
||||
#include <Parsers/Kusto/Utilities.h>
|
||||
#include <Parsers/ParserSetQuery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -63,6 +63,8 @@ bool ParserKQLWithUnionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
|
||||
|
||||
bool ParserKQLTableFunction::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
/// TODO: This code is idiotic, see https://github.com/ClickHouse/ClickHouse/issues/61742
|
||||
|
||||
ParserToken lparen(TokenType::OpeningRoundBracket);
|
||||
|
||||
ASTPtr string_literal;
|
||||
@ -101,13 +103,16 @@ bool ParserKQLTableFunction::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
|
||||
++pos;
|
||||
}
|
||||
|
||||
Tokens token_kql(kql_statement.data(), kql_statement.data() + kql_statement.size());
|
||||
IParser::Pos pos_kql(token_kql, pos.max_depth, pos.max_backtracks);
|
||||
Tokens tokens_kql(kql_statement.data(), kql_statement.data() + kql_statement.size(), 0, true);
|
||||
IParser::Pos pos_kql(tokens_kql, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
Expected kql_expected;
|
||||
kql_expected.enable_highlighting = false;
|
||||
if (!ParserKQLWithUnionQuery().parse(pos_kql, node, kql_expected))
|
||||
return false;
|
||||
|
||||
++pos;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ protected:
|
||||
class ParserKQLTableFunction : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "KQL() function"; }
|
||||
const char * getName() const override { return "KQL function"; }
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
|
@ -194,7 +194,7 @@ bool ParserKQLSummarize::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
|
||||
String converted_columns = getExprFromToken(expr_columns, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
Tokens token_converted_columns(converted_columns.c_str(), converted_columns.c_str() + converted_columns.size());
|
||||
Tokens token_converted_columns(converted_columns.data(), converted_columns.data() + converted_columns.size(), 0, true);
|
||||
IParser::Pos pos_converted_columns(token_converted_columns, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(true).parse(pos_converted_columns, select_expression_list, expected))
|
||||
@ -206,7 +206,7 @@ bool ParserKQLSummarize::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
|
||||
{
|
||||
String converted_groupby = getExprFromToken(expr_groupby, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
Tokens token_converted_groupby(converted_groupby.c_str(), converted_groupby.c_str() + converted_groupby.size());
|
||||
Tokens token_converted_groupby(converted_groupby.data(), converted_groupby.data() + converted_groupby.size(), 0, true);
|
||||
IParser::Pos postoken_converted_groupby(token_converted_groupby, pos.max_depth, pos.max_backtracks);
|
||||
|
||||
if (!ParserNotEmptyExpressionList(false).parse(postoken_converted_groupby, group_expression_list, expected))
|
||||
|
@ -21,6 +21,7 @@ class Tokens
|
||||
{
|
||||
private:
|
||||
std::vector<Token> data;
|
||||
size_t max_pos = 0;
|
||||
Lexer lexer;
|
||||
bool skip_insignificant;
|
||||
|
||||
@ -35,10 +36,16 @@ public:
|
||||
while (true)
|
||||
{
|
||||
if (index < data.size())
|
||||
{
|
||||
max_pos = std::max(max_pos, index);
|
||||
return data[index];
|
||||
}
|
||||
|
||||
if (!data.empty() && data.back().isEnd())
|
||||
{
|
||||
max_pos = data.size() - 1;
|
||||
return data.back();
|
||||
}
|
||||
|
||||
Token token = lexer.nextToken();
|
||||
|
||||
@ -51,7 +58,12 @@ public:
|
||||
{
|
||||
if (data.empty())
|
||||
return (*this)[0];
|
||||
return data.back();
|
||||
return data[max_pos];
|
||||
}
|
||||
|
||||
void reset()
|
||||
{
|
||||
max_pos = 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Parsers/ParserQuery.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/ASTExplainQuery.h>
|
||||
#include <Parsers/CommonParsers.h>
|
||||
#include <Parsers/Lexer.h>
|
||||
#include <Parsers/TokenIterator.h>
|
||||
#include <Common/StringUtils.h>
|
||||
@ -285,6 +286,33 @@ ASTPtr tryParseQuery(
|
||||
}
|
||||
|
||||
Expected expected;
|
||||
|
||||
/** A shortcut - if Lexer found invalid tokens, fail early without full parsing.
|
||||
* But there are certain cases when invalid tokens are permitted:
|
||||
* 1. INSERT queries can have arbitrary data after the FORMAT clause, that is parsed by a different parser.
|
||||
* 2. It can also be the case when there are multiple queries separated by semicolons, and the first queries are ok
|
||||
* while subsequent queries have syntax errors.
|
||||
*
|
||||
* This shortcut is needed to avoid complex backtracking in case of obviously erroneous queries.
|
||||
*/
|
||||
IParser::Pos lookahead(token_iterator);
|
||||
if (!ParserKeyword(Keyword::INSERT_INTO).ignore(lookahead))
|
||||
{
|
||||
while (lookahead->type != TokenType::Semicolon && lookahead->type != TokenType::EndOfStream)
|
||||
{
|
||||
if (lookahead->isError())
|
||||
{
|
||||
out_error_message = getLexicalErrorMessage(query_begin, all_queries_end, *lookahead, hilite, query_description);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
++lookahead;
|
||||
}
|
||||
|
||||
/// We should not spoil the info about maximum parsed position in the original iterator.
|
||||
tokens.reset();
|
||||
}
|
||||
|
||||
ASTPtr res;
|
||||
const bool parse_res = parser.parse(token_iterator, res, expected);
|
||||
const auto last_token = token_iterator.max();
|
||||
|
@ -737,10 +737,10 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
|
||||
{
|
||||
/// Don't scare people with broken part error
|
||||
if (!isRetryableException(std::current_exception()))
|
||||
LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath());
|
||||
LOG_ERROR(storage.log, "Part {} is broken and needs manual correction", getDataPartStorage().getFullPath());
|
||||
|
||||
// There could be conditions that data part to be loaded is broken, but some of meta infos are already written
|
||||
// into meta data before exception, need to clean them all.
|
||||
// into metadata before exception, need to clean them all.
|
||||
metadata_manager->deleteAll(/*include_projection*/ true);
|
||||
metadata_manager->assertAllDeleted(/*include_projection*/ true);
|
||||
throw;
|
||||
|
@ -12,7 +12,6 @@
|
||||
#include <Common/Scheduler/Nodes/ThrottlerConstraint.h>
|
||||
#include <Common/Scheduler/Nodes/FifoQueue.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include "Common/Scheduler/ResourceRequest.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -1,8 +1,8 @@
|
||||
DROP TABLE IF EXISTS Customers;
|
||||
CREATE TABLE Customers
|
||||
(
|
||||
(
|
||||
FirstName Nullable(String),
|
||||
LastName String,
|
||||
LastName String,
|
||||
Occupation String,
|
||||
Education String,
|
||||
Age Nullable(UInt8)
|
||||
@ -10,20 +10,20 @@ CREATE TABLE Customers
|
||||
|
||||
INSERT INTO Customers VALUES ('Theodore','Diaz','Skilled Manual','Bachelors',28),('Stephanie','Cox','Management abcd defg','Bachelors',33),('Peter','Nara','Skilled Manual','Graduate Degree',26),('Latoya','Shen','Professional','Graduate Degree',25),('Apple','','Skilled Manual','Bachelors',28),(NULL,'why','Professional','Partial College',38);
|
||||
Select '-- test create table --' ;
|
||||
Select * from kql(Customers|project FirstName) limit 1;;
|
||||
Select * from kql($$Customers|project FirstName$$) limit 1;;
|
||||
DROP TABLE IF EXISTS kql_table1;
|
||||
CREATE TABLE kql_table1 ENGINE = Memory AS select *, now() as new_column From kql(Customers | project LastName | filter LastName=='Diaz');
|
||||
CREATE TABLE kql_table1 ENGINE = Memory AS select *, now() as new_column From kql($$Customers | project LastName | filter LastName=='Diaz'$$);
|
||||
select LastName from kql_table1 limit 1;
|
||||
DROP TABLE IF EXISTS kql_table2;
|
||||
CREATE TABLE kql_table2
|
||||
(
|
||||
(
|
||||
FirstName Nullable(String),
|
||||
LastName String,
|
||||
LastName String,
|
||||
Age Nullable(UInt8)
|
||||
) ENGINE = Memory;
|
||||
INSERT INTO kql_table2 select * from kql(Customers|project FirstName,LastName,Age | filter FirstName=='Theodore');
|
||||
INSERT INTO kql_table2 select * from kql($$Customers|project FirstName,LastName,Age | filter FirstName=='Theodore'$$);
|
||||
select * from kql_table2 limit 1;
|
||||
-- select * from kql(Customers | where FirstName !in ("test", "test2"));
|
||||
-- select * from kql($$Customers | where FirstName !in ("test", "test2")$$);
|
||||
DROP TABLE IF EXISTS Customers;
|
||||
DROP TABLE IF EXISTS kql_table1;
|
||||
DROP TABLE IF EXISTS kql_table2;
|
||||
DROP TABLE IF EXISTS kql_table2;
|
||||
|
@ -0,0 +1 @@
|
||||
Syntax error
|
7
tests/queries/0_stateless/03015_parser_shortcut_lexer_errors.sh
Executable file
7
tests/queries/0_stateless/03015_parser_shortcut_lexer_errors.sh
Executable file
@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_LOCAL --query "SELECT((((((((((SELECT(((((((((SELECT((((((((((SELECT(((((((((SELECT((((((((((SELECT(((((((((SELECT 1+)))))))))))))))))))))))))))))))))))))))))))))))))))))))))'" 2>&1 | grep -o -F 'Syntax error'
|
@ -1706,6 +1706,7 @@ groupBitmap
|
||||
groupBitmapAnd
|
||||
groupBitmapOr
|
||||
groupBitmapXor
|
||||
groupConcat
|
||||
groupUniqArray
|
||||
grouparray
|
||||
grouparrayinsertat
|
||||
@ -1722,6 +1723,7 @@ groupbitmapor
|
||||
groupbitmapxor
|
||||
groupbitor
|
||||
groupbitxor
|
||||
groupconcat
|
||||
groupuniqarray
|
||||
grpc
|
||||
grpcio
|
||||
|
Loading…
Reference in New Issue
Block a user