Merge remote-tracking branch 'upstream/master' into better-read-buffers-2

This commit is contained in:
Ivan Lezhankin 2021-02-04 21:51:23 +03:00
commit 2b9909d396
80 changed files with 1654 additions and 318 deletions

View File

@ -278,6 +278,31 @@ public:
return res / 3600;
}
/** Calculate the offset from UTC in seconds.
* In other words: take the same literal time as "t", interpret it in UTC to get the corresponding timestamp,
* then subtract the former from the latter to obtain the offset.
* The boundaries around a DST (daylight saving time) change must be handled very carefully.
*/
inline time_t timezoneOffset(time_t t) const
{
DayNum index = findIndex(t);
/// Calculate daylight saving offset first.
/// Because "amount_of_offset_change" is stored in a LUT entry only for the day of the change, scanning it from the very beginning is costly.
/// Instead, all offsets accumulated from 1970-01-01 up to that day can be derived from the difference between lut[].date values,
/// and subtracting whole multiples of 86400 then yields the net DST offset (leap seconds are not considered here).
time_t res = (lut[index].date - lut[0].date) % 86400;
/// As far as is known, the maximal DST offset never exceeds 2 hours, so after the modulo operation the remainder
/// sits between [-offset --> 0 --> offset], which corresponds to moving the clock forward or backward respectively.
res = res > 43200 ? (86400 - res) : (0 - res);
/// Check whether there is an offset change during this day and add it once the change point is crossed.
if (lut[index].amount_of_offset_change != 0 && t >= lut[index].date + lut[index].time_at_offset_change)
res += lut[index].amount_of_offset_change;
return res + offset_at_start_of_epoch;
}
/** Only for time zones with/when offset from UTC is multiple of five minutes.
* This is true for all time zones: right now, all time zones have an offset that is multiple of 15 minutes.
*

View File

@ -168,14 +168,6 @@ public:
static_assert(sizeof(LocalDate) == 4);
inline std::ostream & operator<< (std::ostream & ostr, const LocalDate & date)
{
return ostr << date.year()
<< '-' << (date.month() / 10) << (date.month() % 10)
<< '-' << (date.day() / 10) << (date.day() % 10);
}
namespace std
{
inline string to_string(const LocalDate & date)

View File

@ -169,20 +169,6 @@ public:
static_assert(sizeof(LocalDateTime) == 8);
inline std::ostream & operator<< (std::ostream & ostr, const LocalDateTime & datetime)
{
ostr << std::setfill('0') << std::setw(4) << datetime.year();
ostr << '-' << (datetime.month() / 10) << (datetime.month() % 10)
<< '-' << (datetime.day() / 10) << (datetime.day() % 10)
<< ' ' << (datetime.hour() / 10) << (datetime.hour() % 10)
<< ':' << (datetime.minute() / 10) << (datetime.minute() % 10)
<< ':' << (datetime.second() / 10) << (datetime.second() % 10);
return ostr;
}
namespace std
{
inline string to_string(const LocalDateTime & datetime)

View File

@ -319,6 +319,7 @@ function run_tests
# In fasttest, ENABLE_LIBRARIES=0, so rocksdb engine is not enabled by default
01504_rocksdb
01686_rocksdb
# Looks at DistributedFilesToInsert, so it cannot run in parallel.
01460_DistributedFilesToInsert

View File

@ -46,7 +46,7 @@ toc_title: Adopters
| <a href="https://www.exness.com" class="favicon">Exness</a> | Trading | Metrics, Logging | — | — | [Talk in Russian, May 2019](https://youtu.be/_rpU-TvSfZ8?t=3215) |
| <a href="https://fastnetmon.com/" class="favicon">FastNetMon</a> | DDoS Protection | Main Product | | — | [Official website](https://fastnetmon.com/docs-fnm-advanced/fastnetmon-advanced-traffic-persistency/) |
| <a href="https://www.flipkart.com/" class="favicon">Flipkart</a> | e-Commerce | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=239) |
| <a href="https://fun.co/rp" class="favicon">FunCorp</a> | Games | | — | | [Article](https://www.altinity.com/blog/migrating-from-redshift-to-clickhouse) |
| <a href="https://fun.co/rp" class="favicon">FunCorp</a> | Games | | — | 14 bn records/day as of Jan 2021 | [Article](https://www.altinity.com/blog/migrating-from-redshift-to-clickhouse) |
| <a href="https://geniee.co.jp" class="favicon">Geniee</a> | Ad network | Main product | — | — | [Blog post in Japanese, July 2017](https://tech.geniee.co.jp/entry/2017/07/20/160100) |
| <a href="https://www.genotek.ru/" class="favicon">Genotek</a> | Bioinformatics | Main product | — | — | [Video, August 2020](https://youtu.be/v3KyZbz9lEE) |
| <a href="https://www.huya.com/" class="favicon">HUYA</a> | Video Streaming | Analytics | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/7.%20ClickHouse万亿数据分析实践%20李本旺(sundy-li)%20虎牙.pdf) |
@ -74,6 +74,7 @@ toc_title: Adopters
| <a href="https://getnoc.com/" class="favicon">NOC Project</a> | Network Monitoring | Analytics | Main Product | — | [Official Website](https://getnoc.com/features/big-data/) |
| <a href="https://www.nuna.com/" class="favicon">Nuna Inc.</a> | Health Data Analytics | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=170) |
| <a href="https://www.oneapm.com/" class="favicon">OneAPM</a> | Monitorings and Data Analysis | Main product | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/8.%20clickhouse在OneAPM的应用%20杜龙.pdf) |
| <a href="https://panelbear.com/" class="favicon">Panelbear</a> | Analytics | Monitoring and Analytics | — | — | [Tech Stack, November 2020](https://panelbear.com/blog/tech-stack/) |
| <a href="https://www.percent.cn/" class="favicon">Percent 百分点</a> | Analytics | Main Product | — | — | [Slides in Chinese, June 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup24/4.%20ClickHouse万亿数据双中心的设计与实践%20.pdf) |
| <a href="https://www.percona.com/" class="favicon">Percona</a> | Performance analysis | Percona Monitoring and Management | — | — | [Official website, Mar 2020](https://www.percona.com/blog/2020/03/30/advanced-query-analysis-in-percona-monitoring-and-management-with-direct-clickhouse-access/) |
| <a href="https://plausible.io/" class="favicon">Plausible</a> | Analytics | Main Product | — | — | [Blog post, June 2020](https://twitter.com/PlausibleHQ/status/1273889629087969280) |

View File

@ -12,7 +12,7 @@ Columns:
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Timestamp of the sampling moment.
- `event_time_microseconds` ([DateTime](../../sql-reference/data-types/datetime.md)) — Timestamp of the sampling moment with microseconds precision.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Timestamp of the sampling moment with microseconds precision.
- `timestamp_ns` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Timestamp of the sampling moment in nanoseconds.

View File

@ -79,6 +79,40 @@ Result:
└───────────────────────────────────────────────┘
```
# quantilesTimingWeighted {#quantilestimingweighted}
Same as `quantileTimingWeighted`, but accepts multiple parameters with quantile levels and returns an Array filled with the values of those quantiles.
**Example**
Input table:
``` text
┌─response_time─┬─weight─┐
│ 68 │ 1 │
│ 104 │ 2 │
│ 112 │ 3 │
│ 126 │ 2 │
│ 138 │ 1 │
│ 162 │ 1 │
└───────────────┴────────┘
```
Query:
``` sql
SELECT quantilesTimingWeighted(0.5, 0.99)(response_time, weight) FROM t
```
Result:
``` text
┌─quantilesTimingWeighted(0.5, 0.99)(response_time, weight)─┐
│ [112,162] │
└───────────────────────────────────────────────────────────┘
```
**See Also**
- [median](../../../sql-reference/aggregate-functions/reference/median.md#median)

View File

@ -1,9 +1,14 @@
# [development] Window Functions
---
toc_priority: 62
toc_title: Window Functions
---
# [experimental] Window Functions
!!! warning "Warning"
This is an experimental feature that is currently in development and is not ready
for general use. It will change in unpredictable backwards-incompatible ways in
the future releases.
the future releases. Set `allow_experimental_window_functions = 1` to enable it.
ClickHouse currently supports calculation of aggregate functions over a window.
Pure window functions such as `rank`, `lag`, `lead` and so on are not yet supported.
@ -11,9 +16,7 @@ Pure window functions such as `rank`, `lag`, `lead` and so on are not yet suppor
The window can be specified either with an `OVER` clause or with a separate
`WINDOW` clause.
Only two variants of frame are supported, `ROWS` and `RANGE`. The only supported
frame boundaries are `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
Only two variants of frame are supported, `ROWS` and `RANGE`. Offsets for the `RANGE` frame are not yet supported.
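
For illustration, here is a minimal sketch of the currently supported syntax (an aggregate computed over an `OVER` clause with a `ROWS` frame; assumes `allow_experimental_window_functions = 1` is set in the session):

``` sql
SET allow_experimental_window_functions = 1;

SELECT
    number,
    sum(number) OVER (PARTITION BY number % 2 ORDER BY number
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum
FROM numbers(6);
```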
## References
@ -28,6 +31,7 @@ https://github.com/ClickHouse/ClickHouse/blob/master/tests/performance/window_fu
https://github.com/ClickHouse/ClickHouse/blob/master/tests/queries/0_stateless/01591_window_functions.sql
### Postgres Docs
https://www.postgresql.org/docs/current/sql-select.html#SQL-WINDOW
https://www.postgresql.org/docs/devel/sql-expressions.html#SYNTAX-WINDOW-FUNCTIONS
https://www.postgresql.org/docs/devel/functions-window.html
https://www.postgresql.org/docs/devel/tutorial-window.html

View File

@ -325,6 +325,51 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
// the generic recursion into IAST.children.
}
void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
{
switch (fuzz_rand() % 40)
{
case 0:
{
const auto r = fuzz_rand() % 3;
frame.type = r == 0 ? WindowFrame::FrameType::Rows
: r == 1 ? WindowFrame::FrameType::Range
: WindowFrame::FrameType::Groups;
break;
}
case 1:
{
const auto r = fuzz_rand() % 3;
frame.begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
break;
}
case 2:
{
const auto r = fuzz_rand() % 3;
frame.end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
break;
}
case 3:
{
frame.begin_offset = getRandomField(0).get<Int64>();
break;
}
case 4:
{
frame.end_offset = getRandomField(0).get<Int64>();
break;
}
default:
break;
}
frame.is_default = (frame == WindowFrame{});
}
void QueryFuzzer::fuzz(ASTs & asts)
{
for (auto & ast : asts)
@ -409,6 +454,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
auto & def = fn->window_definition->as<ASTWindowDefinition &>();
fuzzColumnLikeExpressionList(def.partition_by.get());
fuzzOrderByList(def.order_by.get());
fuzzWindowFrame(def.frame);
}
fuzz(fn->children);
@ -421,6 +467,23 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
fuzz(select->children);
}
/*
* The time to fuzz the settings has not yet come.
* Apparently we don't have any infrastructure to validate the values of
* the settings, and the first query with max_block_size = -1 breaks
* because of overflows here and there.
*//*
* else if (auto * set = typeid_cast<ASTSetQuery *>(ast.get()))
* {
* for (auto & c : set->changes)
* {
* if (fuzz_rand() % 50 == 0)
* {
* c.value = fuzzField(c.value);
* }
* }
* }
*/
else if (auto * literal = typeid_cast<ASTLiteral *>(ast.get()))
{
// There is a caveat with fuzzing the children: many ASTs also keep the

View File

@ -14,6 +14,7 @@ namespace DB
class ASTExpressionList;
class ASTOrderByElement;
struct WindowFrame;
/*
* This is an AST-based query fuzzer that makes random modifications to query
@ -65,6 +66,7 @@ struct QueryFuzzer
void fuzzOrderByElement(ASTOrderByElement * elem);
void fuzzOrderByList(IAST * ast);
void fuzzColumnLikeExpressionList(IAST * ast);
void fuzzWindowFrame(WindowFrame & frame);
void fuzz(ASTs & asts);
void fuzz(ASTPtr & ast);
void collectFuzzInfoMain(const ASTPtr ast);

View File

@ -149,7 +149,6 @@ private:
UInt8 strict_order; // When the 'strict_order' is set, it doesn't allow interventions of other events.
// In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2.
// Loop through the entire events_list, update the event timestamp value
// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window.
// If found, returns the max event level, else return 0.

View File

@ -32,6 +32,8 @@ namespace ErrorCodes
* - a histogram (that is, value -> number), consisting of two parts
* -- for values from 0 to 1023 - in increments of 1;
* -- for values from 1024 to 30,000 - in increments of 16;
*
* NOTE: 64-bit integer weight can overflow, see also QuantileExactWeighted.h::get()
*/
#define TINY_MAX_ELEMS 31
@ -396,9 +398,9 @@ namespace detail
/// Get the value of the `level` quantile. The level must be between 0 and 1.
UInt16 get(double level) const
{
UInt64 pos = std::ceil(count * level);
double pos = std::ceil(count * level);
UInt64 accumulated = 0;
double accumulated = 0;
Iterator it(*this);
while (it.isValid())
@ -422,9 +424,9 @@ namespace detail
const auto * indices_end = indices + size;
const auto * index = indices;
UInt64 pos = std::ceil(count * levels[*index]);
double pos = std::ceil(count * levels[*index]);
UInt64 accumulated = 0;
double accumulated = 0;
Iterator it(*this);
while (it.isValid())

View File

@ -34,7 +34,15 @@ public:
std::optional<std::string> file;
std::optional<UInt64> line;
};
static constexpr size_t capacity = 32;
static constexpr size_t capacity =
#ifndef NDEBUG
/* The stacks are normally larger in debug version due to less inlining. */
64
#else
32
#endif
;
using FramePointers = std::array<void *, capacity>;
using Frames = std::array<Frame, capacity>;

View File

@ -121,7 +121,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
out = std::make_shared<PushingToViewsBlockOutputStream>(
dependent_table, dependent_metadata_snapshot, *insert_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr});
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr, 0 /* elapsed_ms */});
}
/// Do not push to destination table if the flag is set
@ -146,8 +146,6 @@ Block PushingToViewsBlockOutputStream::getHeader() const
void PushingToViewsBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
/** Throw an exception if the sizes of arrays - elements of nested data structures - don't match.
* We have to make this assertion before writing to the table, because the storage engine may assume that they have equal sizes.
* NOTE It'd be better to do this check during serialization of nested structures (in the place where this assumption is required),
@ -177,15 +175,15 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
{
// Push to views concurrently if enabled and more than one view is attached
ThreadPool pool(std::min(size_t(settings.max_threads), views.size()));
for (size_t view_num = 0; view_num < views.size(); ++view_num)
for (auto & view : views)
{
auto thread_group = CurrentThread::getGroup();
pool.scheduleOrThrowOnError([=, this]
pool.scheduleOrThrowOnError([=, &view, this]
{
setThreadName("PushingToViews");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
process(block, view_num);
process(block, view);
});
}
// Wait for concurrent view processing
@ -194,22 +192,14 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
else
{
// Process sequentially
for (size_t view_num = 0; view_num < views.size(); ++view_num)
for (auto & view : views)
{
process(block, view_num);
process(block, view);
if (views[view_num].exception)
std::rethrow_exception(views[view_num].exception);
if (view.exception)
std::rethrow_exception(view.exception);
}
}
UInt64 milliseconds = watch.elapsedMilliseconds();
if (views.size() > 1)
{
LOG_TRACE(log, "Pushing from {} to {} views took {} ms.",
storage->getStorageID().getNameForLogs(), views.size(),
milliseconds);
}
}
void PushingToViewsBlockOutputStream::writePrefix()
@ -257,12 +247,13 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (view.exception)
continue;
pool.scheduleOrThrowOnError([thread_group, &view]
pool.scheduleOrThrowOnError([thread_group, &view, this]
{
setThreadName("PushingToViews");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
Stopwatch watch;
try
{
view.out->writeSuffix();
@ -271,6 +262,12 @@ void PushingToViewsBlockOutputStream::writeSuffix()
{
view.exception = std::current_exception();
}
view.elapsed_ms += watch.elapsedMilliseconds();
LOG_TRACE(log, "Pushing from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.elapsed_ms);
});
}
// Wait for concurrent view processing
@ -290,6 +287,7 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (parallel_processing)
continue;
Stopwatch watch;
try
{
view.out->writeSuffix();
@ -299,10 +297,24 @@ void PushingToViewsBlockOutputStream::writeSuffix()
ex.addMessage("while write prefix to view " + view.table_id.getNameForLogs());
throw;
}
view.elapsed_ms += watch.elapsedMilliseconds();
LOG_TRACE(log, "Pushing from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
view.elapsed_ms);
}
if (first_exception)
std::rethrow_exception(first_exception);
UInt64 milliseconds = main_watch.elapsedMilliseconds();
if (views.size() > 1)
{
LOG_TRACE(log, "Pushing from {} to {} views took {} ms.",
storage->getStorageID().getNameForLogs(), views.size(),
milliseconds);
}
}
void PushingToViewsBlockOutputStream::flush()
@ -314,10 +326,9 @@ void PushingToViewsBlockOutputStream::flush()
view.out->flush();
}
void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_num)
void PushingToViewsBlockOutputStream::process(const Block & block, ViewInfo & view)
{
Stopwatch watch;
auto & view = views[view_num];
try
{
@ -379,11 +390,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
view.exception = std::current_exception();
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_TRACE(log, "Pushing from {} to {} took {} ms.",
storage->getStorageID().getNameForLogs(),
view.table_id.getNameForLogs(),
milliseconds);
view.elapsed_ms += watch.elapsedMilliseconds();
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Stopwatch.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage.h>
@ -44,6 +45,7 @@ private:
const Context & context;
ASTPtr query_ptr;
Stopwatch main_watch;
struct ViewInfo
{
@ -51,13 +53,14 @@ private:
StorageID table_id;
BlockOutputStreamPtr out;
std::exception_ptr exception;
UInt64 elapsed_ms = 0;
};
std::vector<ViewInfo> views;
std::unique_ptr<Context> select_context;
std::unique_ptr<Context> insert_context;
void process(const Block & block, size_t view_num);
void process(const Block & block, ViewInfo & view);
};

View File

@ -1291,7 +1291,6 @@ void CacheDictionary::update(UpdateUnitPtr & update_unit_ptr)
BlockInputStreamPtr stream = current_source_ptr->loadIds(update_unit_ptr->requested_ids);
stream->readPrefix();
while (true)
{
Block block = stream->read();

View File

@ -186,6 +186,9 @@ namespace
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
if (thread.joinable())
thread.join();
command->wait();
}

View File

@ -117,3 +117,6 @@ target_link_libraries(clickhouse_functions PRIVATE clickhouse_functions_array)
if (USE_STATS)
target_link_libraries(clickhouse_functions PRIVATE stats)
endif()
# Signed integer overflow on user-provided data inside boost::geometry - ignore.
set_source_files_properties("pointInPolygon.cpp" PROPERTIES COMPILE_FLAGS -fno-sanitize=signed-integer-overflow)

View File

@ -407,6 +407,23 @@ struct ToHourImpl
using FactorTransform = ToDateImpl;
};
struct TimezoneOffsetImpl
{
static constexpr auto name = "timezoneOffset";
static inline time_t execute(UInt32 t, const DateLUTImpl & time_zone)
{
return time_zone.timezoneOffset(t);
}
static inline time_t execute(UInt16, const DateLUTImpl &)
{
return dateIsNotSupported(name);
}
using FactorTransform = ToTimeImpl;
};
struct ToMinuteImpl
{
static constexpr auto name = "toMinute";

View File

@ -69,6 +69,8 @@ void registerFunctionFormatDateTime(FunctionFactory &);
void registerFunctionFromModifiedJulianDay(FunctionFactory &);
void registerFunctionDateTrunc(FunctionFactory &);
void registerFunctiontimezoneOffset(FunctionFactory &);
void registerFunctionsDateTime(FunctionFactory & factory)
{
registerFunctionToYear(factory);
@ -136,6 +138,7 @@ void registerFunctionsDateTime(FunctionFactory & factory)
registerFunctionFormatDateTime(factory);
registerFunctionFromModifiedJulianDay(factory);
registerFunctionDateTrunc(factory);
registerFunctiontimezoneOffset(factory);
}
}

View File

@ -0,0 +1,19 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
using FunctiontimezoneOffset = FunctionDateOrDateTimeToSomething<DataTypeInt32, TimezoneOffsetImpl>;
void registerFunctiontimezoneOffset(FunctionFactory & factory)
{
factory.registerFunction<FunctiontimezoneOffset>();
}
}
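
The registration above exposes the `DateLUTImpl::timezoneOffset` calculation described earlier as the SQL function `timezoneOffset`. A minimal usage sketch (the value shown in the comment assumes standard IANA data for Europe/Moscow, a zone without DST):

``` sql
SELECT
    toDateTime('2021-02-04 12:00:00', 'Europe/Moscow') AS t,
    timezoneOffset(t) AS offset_seconds;
-- offset_seconds is expected to be 10800 (UTC+3).
```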

View File

@ -452,6 +452,7 @@ SRCS(
timeSlot.cpp
timeSlots.cpp
timezone.cpp
timezoneOffset.cpp
toColumnTypeName.cpp
toCustomWeek.cpp
toDayOfMonth.cpp

View File

@ -4,6 +4,7 @@
#include <cstring>
#include <memory>
#include <iostream>
#include <cassert>
#include <Common/Exception.h>
#include <IO/BufferBase.h>
@ -37,7 +38,7 @@ public:
*/
inline void next()
{
if (!offset() && available())
if (!offset())
return;
bytes += offset();
@ -73,6 +74,9 @@ public:
{
size_t bytes_copied = 0;
/// An empty working buffer would make the loop below endless.
assert(working_buffer.size() > 0);
while (bytes_copied < n)
{
nextIfAtEnd();

View File

@ -6,6 +6,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
std::string WindowFunctionDescription::dump() const
{
WriteBufferFromOwnString ss;
@ -33,4 +38,95 @@ std::string WindowDescription::dump() const
return ss.str();
}
std::string WindowFrame::toString() const
{
WriteBufferFromOwnString buf;
toString(buf);
return buf.str();
}
void WindowFrame::toString(WriteBuffer & buf) const
{
buf << toString(type) << " BETWEEN ";
if (begin_type == BoundaryType::Current)
{
buf << "CURRENT ROW";
}
else if (begin_type == BoundaryType::Unbounded)
{
buf << "UNBOUNDED PRECEDING";
}
else
{
buf << abs(begin_offset);
buf << " "
<< (begin_offset > 0 ? "FOLLOWING" : "PRECEDING");
}
buf << " AND ";
if (end_type == BoundaryType::Current)
{
buf << "CURRENT ROW";
}
else if (end_type == BoundaryType::Unbounded)
{
buf << "UNBOUNDED FOLLOWING";
}
else
{
buf << abs(end_offset);
buf << " "
<< (end_offset > 0 ? "FOLLOWING" : "PRECEDING");
}
}
void WindowFrame::checkValid() const
{
if (begin_type == BoundaryType::Unbounded
|| end_type == BoundaryType::Unbounded)
{
return;
}
if (begin_type == BoundaryType::Current
&& end_type == BoundaryType::Offset
&& end_offset > 0)
{
return;
}
if (end_type == BoundaryType::Current
&& begin_type == BoundaryType::Offset
&& begin_offset < 0)
{
return;
}
if (end_type == BoundaryType::Current
&& begin_type == BoundaryType::Current)
{
// BETWEEN CURRENT ROW AND CURRENT ROW makes some sense for RANGE or
// GROUP frames, and is technically valid for ROWS frame.
return;
}
if (end_type == BoundaryType::Offset
&& begin_type == BoundaryType::Offset)
{
if (type == FrameType::Rows)
{
if (end_offset >= begin_offset)
{
return;
}
}
// For RANGE and GROUPS, we must check that end follows begin if sorted
// according to ORDER BY (we don't support them yet).
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Window frame '{}' is invalid",
toString());
}
}
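
As a concrete illustration of what `checkValid()` rejects, a frame whose start cannot precede its end should fail with the `BAD_ARGUMENTS` exception above (a hedged sketch; the exact error wording comes from `toString()`):

``` sql
-- Expected to be rejected: the frame start (1 FOLLOWING) lies after the end (CURRENT ROW).
SELECT count() OVER (ORDER BY number ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW)
FROM numbers(3);
```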

View File

@ -53,6 +53,13 @@ struct WindowFrame
int64_t end_offset = 0;
// Throws BAD_ARGUMENTS exception if the frame definition is incorrect, e.g.
// the frame start comes later than the frame end.
void checkValid() const;
std::string toString() const;
void toString(WriteBuffer & buf) const;
bool operator == (const WindowFrame & other) const
{
// We don't compare is_default because it's not a real property of the

View File

@ -145,6 +145,7 @@ SRCS(
TranslateQualifiedNamesVisitor.cpp
TreeOptimizer.cpp
TreeRewriter.cpp
WindowDescription.cpp
addMissingDefaults.cpp
addTypeConversionToAST.cpp
castColumn.cpp

View File

@ -46,6 +46,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SYNTAX_ERROR;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
@ -558,7 +559,24 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.begin_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.",
Field::Types::toString(value.getType()));
}
node->frame.begin_offset = value.get<Int64>();
node->frame.begin_type = WindowFrame::BoundaryType::Offset;
// We can easily get a UINT64_MAX here, which doesn't even fit into
// int64_t. Not sure what checks we are going to need here after we
// support floats and dates.
if (node->frame.begin_offset > INT_MAX || node->frame.begin_offset < INT_MIN)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset must be between {} and {}, but {} is given",
INT_MAX, INT_MIN, node->frame.begin_offset);
}
}
else
{
@ -567,7 +585,7 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (keyword_preceding.ignore(pos, expected))
{
node->frame.begin_offset = - node->frame.begin_offset;
node->frame.begin_offset = -node->frame.begin_offset;
}
else if (keyword_following.ignore(pos, expected))
{
@ -604,7 +622,22 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
}
else if (parser_literal.parse(pos, ast_literal, expected))
{
node->frame.end_offset = ast_literal->as<ASTLiteral &>().value.safeGet<Int64>();
const Field & value = ast_literal->as<ASTLiteral &>().value;
if (!isInt64FieldType(value.getType()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Only integer frame offsets are supported, '{}' is not supported.",
Field::Types::toString(value.getType()));
}
node->frame.end_offset = value.get<Int64>();
node->frame.end_type = WindowFrame::BoundaryType::Offset;
if (node->frame.end_offset > INT_MAX || node->frame.end_offset < INT_MIN)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame offset must be between {} and {}, but {} is given",
INT_MAX, INT_MIN, node->frame.end_offset);
}
}
else
{
@ -623,6 +656,7 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
}
else if (keyword_following.ignore(pos, expected))
{
// Positive offset or UNBOUNDED FOLLOWING.
}
else
{

View File

@ -147,12 +147,13 @@ namespace DB
/// We want to preallocate memory buffer (increase capacity)
/// and put the pointer at the beginning of the buffer
unit.segment.resize(DBMS_DEFAULT_BUFFER_SIZE);
/// The second invocation won't release memory, it only sets the size to 0.
unit.segment.resize(0);
unit.actual_memory_size = 0;
BufferWithOutsideMemory<WriteBuffer> out_buffer(unit.segment);
/// The second invocation won't release memory, it only sets the size to 0.
unit.segment.resize(0);
auto formatter = internal_formatter_creator(out_buffer);
switch (unit.type)

View File

@ -57,6 +57,7 @@ WindowStep::WindowStep(const DataStream & input_stream_,
{
// We don't remove any columns, only add, so probably we don't have to update
// the output DataStream::distinct_columns.
window_description.frame.checkValid();
}
void WindowStep::transformPipeline(QueryPipeline & pipeline)

View File

@ -165,16 +165,197 @@ void WindowTransform::advancePartitionEnd()
assert(!partition_ended && partition_end == blocksEnd());
}
void WindowTransform::advanceFrameStart() const
auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int offset) const
{
// Frame start is always UNBOUNDED PRECEDING for now, so we don't have to
// move it. It is initialized when the new partition starts.
if (window_description.frame.begin_type
!= WindowFrame::BoundaryType::Unbounded)
RowNumber x = _x;
if (offset > 0)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start type '{}' is not implemented",
WindowFrame::toString(window_description.frame.begin_type));
for (;;)
{
assertValid(x);
assert(offset >= 0);
const auto block_rows = blockRowsNumber(x);
x.row += offset;
if (x.row >= block_rows)
{
offset = x.row - block_rows;
x.row = 0;
x.block++;
if (x == blocksEnd())
{
break;
}
}
else
{
offset = 0;
break;
}
}
}
else if (offset < 0)
{
for (;;)
{
assertValid(x);
assert(offset <= 0);
if (x.row >= static_cast<uint64_t>(-offset))
{
x.row -= -offset;
offset = 0;
break;
}
// Move to the first row in current block. Note that the offset is
// negative.
offset += x.row;
x.row = 0;
// Move to the last row of the previous block, if we are not at the
// first one. Offset also is incremented by one, because we pass over
// the first row of this block.
if (x.block == first_block_number)
{
break;
}
--x.block;
offset += 1;
x.row = blockRowsNumber(x) - 1;
}
}
return std::tuple{x, offset};
}
auto WindowTransform::moveRowNumber(const RowNumber & _x, int offset) const
{
auto [x, o] = moveRowNumberNoCheck(_x, offset);
#ifndef NDEBUG
// Check that it was reversible.
auto [xx, oo] = moveRowNumberNoCheck(x, -(offset - o));
// fmt::print(stderr, "{} -> {}, result {}, {}, new offset {}, twice {}, {}\n",
// _x, offset, x, o, -(offset - o), xx, oo);
assert(xx == _x);
assert(oo == 0);
#endif
return std::tuple{x, o};
}
void WindowTransform::advanceFrameStartRowsOffset()
{
// Just recalculate it each time by walking blocks.
const auto [moved_row, offset_left] = moveRowNumber(current_row,
window_description.frame.begin_offset);
frame_start = moved_row;
assertValid(frame_start);
// fmt::print(stderr, "frame start {} left {} partition start {}\n",
// frame_start, offset_left, partition_start);
if (frame_start <= partition_start)
{
// Got to the beginning of partition and can't go further back.
frame_start = partition_start;
frame_started = true;
return;
}
if (partition_end <= frame_start)
{
// A FOLLOWING frame start ran into the end of partition.
frame_start = partition_end;
frame_started = partition_ended;
return;
}
// Handled the equality case above. Now the frame start is inside the
// partition, if we walked all the offset, it's final.
assert(partition_start < frame_start);
frame_started = offset_left == 0;
// If we ran into the start of data (offset left is negative), we won't be
// able to make progress. Should have handled this case above.
assert(offset_left >= 0);
}
void WindowTransform::advanceFrameStartChoose()
{
switch (window_description.frame.begin_type)
{
case WindowFrame::BoundaryType::Unbounded:
// UNBOUNDED PRECEDING, just mark it valid. It is initialized when
// the new partition starts.
frame_started = true;
return;
case WindowFrame::BoundaryType::Current:
// CURRENT ROW differs between frame types only in how the peer
// groups are accounted.
assert(partition_start <= peer_group_start);
assert(peer_group_start < partition_end);
assert(peer_group_start <= current_row);
frame_start = peer_group_start;
frame_started = true;
return;
case WindowFrame::BoundaryType::Offset:
switch (window_description.frame.type)
{
case WindowFrame::FrameType::Rows:
advanceFrameStartRowsOffset();
return;
default:
// Fallthrough to the "not implemented" error.
break;
}
break;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Frame start type '{}' for frame '{}' is not implemented",
WindowFrame::toString(window_description.frame.begin_type),
WindowFrame::toString(window_description.frame.type));
}
void WindowTransform::advanceFrameStart()
{
if (frame_started)
{
return;
}
const auto frame_start_before = frame_start;
advanceFrameStartChoose();
assert(frame_start_before <= frame_start);
if (frame_start == frame_start_before)
{
// If the frame start didn't move, this means we validated that the frame
// starts at the point we reached earlier but were unable to validate.
// This probably only happens in degenerate cases where the frame start
// is further than the end of partition, and the partition ends at the
// last row of the block, but we can only tell for sure after a new
// block arrives. We still have to update the state of aggregate
// functions when the frame start becomes valid, so we continue.
assert(frame_started);
}
assert(partition_start <= frame_start);
assert(frame_start <= partition_end);
if (partition_ended && frame_start == partition_end)
{
// Check that if the start of frame (e.g. FOLLOWING) runs into the end
// of partition, it is marked as valid -- we can't advance it any
// further.
assert(frame_started);
}
}
@ -257,18 +438,15 @@ void WindowTransform::advanceFrameEndCurrentRow()
// fmt::print(stderr, "first row {} last {}\n", frame_end.row, rows_end);
// We could retreat the frame_end here, but for some reason I am reluctant
// to do this... It would have better data locality.
auto reference = current_row;
// Advance frame_end while it is still peers with the current row.
for (; frame_end.row < rows_end; ++frame_end.row)
{
if (!arePeers(reference, frame_end))
if (!arePeers(current_row, frame_end))
{
// fmt::print(stderr, "{} and {} don't match\n", reference, frame_end);
frame_ended = true;
return;
}
reference = frame_end;
}
// Might have gotten to the end of the current block, have to properly
@ -291,6 +469,39 @@ void WindowTransform::advanceFrameEndUnbounded()
frame_ended = partition_ended;
}
void WindowTransform::advanceFrameEndRowsOffset()
{
// Walk the specified offset from the current row. The "+1" is needed
// because the frame_end is a past-the-end pointer.
const auto [moved_row, offset_left] = moveRowNumber(current_row,
window_description.frame.end_offset + 1);
if (partition_end <= moved_row)
{
// Clamp to the end of partition. It might not have ended yet, in which
// case wait for more data.
frame_end = partition_end;
frame_ended = partition_ended;
return;
}
if (moved_row <= partition_start)
{
// Clamp to the start of partition.
frame_end = partition_start;
frame_ended = true;
return;
}
// Frame end inside partition, if we walked all the offset, it's final.
frame_end = moved_row;
frame_ended = offset_left == 0;
// If we ran into the start of data (offset left is negative), we won't be
// able to make progress. Should have handled this case above.
assert(offset_left >= 0);
}
void WindowTransform::advanceFrameEnd()
{
// No reason for this function to be called again after it succeeded.
@ -301,16 +512,23 @@ void WindowTransform::advanceFrameEnd()
switch (window_description.frame.end_type)
{
case WindowFrame::BoundaryType::Current:
// The only frame end we have for now is CURRENT ROW.
advanceFrameEndCurrentRow();
break;
case WindowFrame::BoundaryType::Unbounded:
advanceFrameEndUnbounded();
break;
case WindowFrame::BoundaryType::Offset:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The frame end type '{}' is not implemented",
WindowFrame::toString(window_description.frame.end_type));
switch (window_description.frame.type)
{
case WindowFrame::FrameType::Rows:
advanceFrameEndRowsOffset();
break;
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The frame end type '{}' is not implemented",
WindowFrame::toString(window_description.frame.end_type));
}
break;
}
// fmt::print(stderr, "frame_end {} -> {}\n", frame_end_before, frame_end);
@ -321,44 +539,81 @@ void WindowTransform::advanceFrameEnd()
{
return;
}
}
// Add the rows over which we advanced the frame to the aggregate function
// states. We could have advanced over at most the entire last block.
uint64_t rows_end = frame_end.row;
if (frame_end.row == 0)
// Update the aggregation states after the frame has changed.
void WindowTransform::updateAggregationState()
{
// fmt::print(stderr, "update agg states [{}, {}) -> [{}, {})\n",
// prev_frame_start, prev_frame_end, frame_start, frame_end);
// Assert that the frame boundaries are known, have proper order wrt each
// other, and have not gone back wrt the previous frame.
assert(frame_started);
assert(frame_ended);
assert(frame_start <= frame_end);
assert(prev_frame_start <= prev_frame_end);
assert(prev_frame_start <= frame_start);
assert(prev_frame_end <= frame_end);
// We might have to reset aggregation state and/or add some rows to it.
// Figure out what to do.
bool reset_aggregation = false;
RowNumber rows_to_add_start;
RowNumber rows_to_add_end;
if (frame_start == prev_frame_start)
{
assert(frame_end == blocksEnd());
rows_end = blockRowsNumber(frame_end_before);
// The frame start didn't change, add the tail rows.
reset_aggregation = false;
rows_to_add_start = prev_frame_end;
rows_to_add_end = frame_end;
}
else
{
assert(frame_end_before.block == frame_end.block);
// The frame start changed, reset the state and aggregate over the
// entire frame. This can be made per-function after we learn to
// subtract rows from some types of aggregation states, but for now we
// always have to reset when the frame start changes.
reset_aggregation = true;
rows_to_add_start = frame_start;
rows_to_add_end = frame_end;
}
// Equality would mean "no data to process", for which we checked above.
assert(frame_end_before.row < rows_end);
for (auto & ws : workspaces)
{
if (frame_end_before.block != ws.cached_block_number)
{
const auto & block
= blocks[frame_end_before.block - first_block_number];
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(block.input_columns[i].get());
}
ws.cached_block_number = frame_end_before.block;
}
const auto * a = ws.window_function.aggregate_function.get();
auto * buf = ws.aggregate_function_state.data();
auto * columns = ws.argument_columns.data();
for (auto row = frame_end_before.row; row < rows_end; ++row)
if (reset_aggregation)
{
a->add(buf, columns, row, arena.get());
// fmt::print(stderr, "(2) reset aggregation\n");
a->destroy(buf);
a->create(buf);
}
for (auto row = rows_to_add_start; row < rows_to_add_end;
advanceRowNumber(row))
{
if (row.block != ws.cached_block_number)
{
const auto & block
= blocks[row.block - first_block_number];
ws.argument_columns.clear();
for (const auto i : ws.argument_column_indices)
{
ws.argument_columns.push_back(block.input_columns[i].get());
}
ws.cached_block_number = row.block;
}
// fmt::print(stderr, "(2) add row {}\n", row);
auto * columns = ws.argument_columns.data();
a->add(buf, columns, row.row, arena.get());
}
}
prev_frame_start = frame_start;
prev_frame_end = frame_end;
}
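
To make the two paths above concrete: with `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW` the frame start moves on every row, so the state is reset and the whole frame is re-aggregated, whereas with `UNBOUNDED PRECEDING` only the newly added tail rows are accumulated. A minimal sketch of the moving-frame case (assuming the experimental setting is enabled):

``` sql
SELECT
    number,
    sum(number) OVER (ORDER BY number
        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS moving_sum
FROM numbers(5);
```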
void WindowTransform::writeOutCurrentRow()
@ -414,8 +669,8 @@ void WindowTransform::appendChunk(Chunk & chunk)
for (;;)
{
advancePartitionEnd();
// fmt::print(stderr, "partition [?, {}), {}\n",
// partition_end, partition_ended);
// fmt::print(stderr, "partition [{}, {}), {}\n",
// partition_start, partition_end, partition_ended);
// Either we ran out of data or we found the end of partition (maybe
// both, but this only happens at the total end of data).
@ -430,15 +685,38 @@ void WindowTransform::appendChunk(Chunk & chunk)
// which is precisely the definition of `partition_end`.
while (current_row < partition_end)
{
// Advance the frame start, updating the state of the aggregate
// functions.
advanceFrameStart();
// Advance the frame end, updating the state of the aggregate
// functions.
advanceFrameEnd();
// fmt::print(stderr, "(1) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// fmt::print(stderr, "row {} frame [{}, {}) {}\n",
// current_row, frame_start, frame_end, frame_ended);
// We now know that the current row is valid, so we can update the
// peer group start.
if (!arePeers(peer_group_start, current_row))
{
peer_group_start = current_row;
}
// Advance the frame start.
advanceFrameStart();
if (!frame_started)
{
// Wait for more input data to find the start of frame.
assert(!input_is_finished);
assert(!partition_ended);
return;
}
// frame_end must be greater or equal than frame_start, so if the
// frame_start is already past the current frame_end, we can start
// from it to save us some work.
if (frame_end < frame_start)
{
frame_end = frame_start;
}
// Advance the frame end.
advanceFrameEnd();
if (!frame_ended)
{
@ -448,16 +726,34 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}
// The frame shouldn't be empty (probably?).
assert(frame_start < frame_end);
// fmt::print(stderr, "(2) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// The frame can be empty sometimes, e.g. the boundaries coincide
// or the start is after the partition end. But hopefully start is
// not after end.
assert(frame_started);
assert(frame_ended);
assert(frame_start <= frame_end);
// Now that we know the new frame boundaries, update the aggregation
// states. Theoretically we could do this simultaneously with moving
// the frame boundaries, but it would require some care not to
// perform unnecessary work while we are still looking for the frame
// start, so do it the simple way for now.
updateAggregationState();
// Write out the aggregation results.
writeOutCurrentRow();
// Move to the next row. The frame will have to be recalculated.
// The peer group start is updated at the beginning of the loop,
// because current_row might now be past-the-end.
advanceRowNumber(current_row);
first_not_ready_row = current_row;
frame_ended = false;
frame_started = false;
}
if (input_is_finished)
@ -478,15 +774,18 @@ void WindowTransform::appendChunk(Chunk & chunk)
}
// Start the next partition.
const auto new_partition_start = partition_end;
partition_start = partition_end;
advanceRowNumber(partition_end);
partition_ended = false;
// We have to reset the frame when the new partition starts. This is not a
// generally correct way to do so, but we don't really support moving frame
// for now.
frame_start = new_partition_start;
frame_end = new_partition_start;
assert(current_row == new_partition_start);
frame_start = partition_start;
frame_end = partition_start;
prev_frame_start = partition_start;
prev_frame_end = partition_start;
assert(current_row == partition_start);
peer_group_start = partition_start;
// fmt::print(stderr, "reinitialize agg data at start of {}\n",
// new_partition_start);
@ -534,6 +833,15 @@ IProcessor::Status WindowTransform::prepare()
return Status::Finished;
}
if (output_data.exception)
{
// An exception occurred during processing.
output.pushData(std::move(output_data));
output.finish();
input.close();
return Status::Finished;
}
assert(first_not_ready_row.block >= first_block_number);
// The first_not_ready_row might be past-the-end if we have already
// calculated the window functions for all input rows. That's why the
@ -665,6 +973,7 @@ void WindowTransform::work()
assert(next_output_block_number >= first_block_number);
assert(frame_start.block >= first_block_number);
assert(current_row.block >= first_block_number);
assert(peer_group_start.block >= first_block_number);
}
}

View File

@ -53,6 +53,11 @@ struct RowNumber
{
return block == other.block && row == other.row;
}
bool operator <= (const RowNumber & other) const
{
return *this < other || *this == other;
}
};
/*
@ -101,11 +106,15 @@ public:
private:
void advancePartitionEnd();
void advanceFrameStart() const;
void advanceFrameEnd();
void advanceFrameStart();
void advanceFrameStartChoose();
void advanceFrameStartRowsOffset();
void advanceFrameEndCurrentRow();
void advanceFrameEndUnbounded();
void advanceFrameEndRowsOffset();
void advanceFrameEnd();
bool arePeers(const RowNumber & x, const RowNumber & y) const;
void updateAggregationState();
void writeOutCurrentRow();
Columns & inputAt(const RowNumber & x)
@ -169,9 +178,28 @@ private:
#endif
}
auto moveRowNumber(const RowNumber & _x, int offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int offset) const;
void assertValid(const RowNumber & x) const
{
assert(x.block >= first_block_number);
if (x.block == first_block_number + blocks.size())
{
assert(x.row == 0);
}
else
{
assert(x.row < blockRowsNumber(x));
}
}
RowNumber blocksEnd() const
{ return RowNumber{first_block_number + blocks.size(), 0}; }
RowNumber blocksBegin() const
{ return RowNumber{first_block_number, 0}; }
public:
/*
* Data (formerly) inherited from ISimpleTransform, needed for the
@ -217,18 +245,26 @@ public:
// Used to determine which resulting blocks we can pass to the consumer.
RowNumber first_not_ready_row;
// We don't keep the pointer to start of partition, because we don't really
// need it, and we want to be able to drop the starting blocks to save memory.
// The `partition_end` is past-the-end, as usual. When partition_ended = false,
// it still haven't ended, and partition_end is the next row to check.
// Boundaries of the current partition.
// partition_start doesn't point to a valid block, because we want to drop
// the blocks early to save memory. We still have to track it so that we can
// cut off a PRECEDING frame at the partition start.
// The `partition_end` is past-the-end, as usual. When
// partition_ended = false, it still hasn't ended, and partition_end is the
// next row to check.
RowNumber partition_start;
RowNumber partition_end;
bool partition_ended = false;
// This is the row for which we are computing the window functions now.
// The row for which we are now computing the window functions.
RowNumber current_row;
// The start of current peer group, needed for CURRENT ROW frame start.
// For ROWS frame, always equal to the current row, and for RANGE and GROUP
// frames may be earlier.
RowNumber peer_group_start;
// The frame is [frame_start, frame_end) if frame_ended, and unknown
// otherwise. Note that when we move to the next row, both the
// The frame is [frame_start, frame_end) if frame_ended && frame_started,
// and unknown otherwise. Note that when we move to the next row, both the
// frame_start and the frame_end may jump forward by an unknown amount of
// blocks, e.g. if we use a RANGE frame. This means that sometimes we know
// neither frame_end nor frame_start.
@ -239,6 +275,13 @@ public:
RowNumber frame_start;
RowNumber frame_end;
bool frame_ended = false;
bool frame_started = false;
// The previous frame boundaries that correspond to the current state of the
// aggregate function. We use them to determine how to update the aggregation
// state after we find the new frame.
RowNumber prev_frame_start;
RowNumber prev_frame_end;
};
}

View File

@ -1,5 +1,7 @@
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h>
@ -184,6 +186,37 @@ namespace
return disk->getDirectorySyncGuard(path);
return nullptr;
}
void writeRemoteConvert(const DistributedHeader & header, RemoteBlockOutputStream & remote, ReadBufferFromFile & in, Poco::Logger * log)
{
if (remote.getHeader() && header.header != remote.getHeader().dumpStructure())
{
LOG_WARNING(log,
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
remote.getHeader().dumpStructure(), header.header);
CompressedReadBuffer decompressing_in(in);
/// The lack of a header requires reading the blocks one by one
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
block_in.readPrefix();
while (Block block = block_in.read())
{
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
remote.getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
remote.write(adopted_block);
}
block_in.readSuffix();
}
else
{
CheckingCompressedReadBuffer checking_in(in);
remote.writePrepared(checking_in);
}
}
}
@ -438,11 +471,8 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
auto connection = pool->get(timeouts, &header.insert_settings);
RemoteBlockOutputStream remote{*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info};
CheckingCompressedReadBuffer checking_in(in);
remote.writePrefix();
remote.writePrepared(checking_in);
writeRemoteConvert(header, remote, in, log);
remote.writeSuffix();
}
catch (const Exception & e)
@ -560,7 +590,6 @@ struct StorageDistributedDirectoryMonitor::Batch
try
{
std::unique_ptr<RemoteBlockOutputStream> remote;
bool first = true;
for (UInt64 file_idx : file_indices)
{
@ -575,16 +604,14 @@ struct StorageDistributedDirectoryMonitor::Batch
ReadBufferFromFile in(file_path->second);
const auto & header = readDistributedHeader(in, parent.log);
if (first)
if (!remote)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts,
header.insert_query, header.insert_settings, header.client_info);
remote->writePrefix();
}
CheckingCompressedReadBuffer checking_in(in);
remote->writePrepared(checking_in);
writeRemoteConvert(header, *remote, in, parent.log);
}
if (remote)

View File

@ -60,24 +60,26 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats)
static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
{
if (!blocksHaveEqualStructure(out->getHeader(), block))
{
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
out->getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name);
auto adopted_block = convert.read();
if (blocksHaveEqualStructure(header, block))
return block;
for (size_t i = 0; i < repeats; ++i)
out->write(adopted_block);
}
else
{
for (size_t i = 0; i < repeats; ++i)
out->write(block);
}
LOG_WARNING(log,
"Structure does not match (remote: {}, local: {}), implicit conversion will be done.",
header.dumpStructure(), block.dumpStructure());
ConvertingBlockInputStream convert(
std::make_shared<OneBlockInputStream>(block),
header,
ConvertingBlockInputStream::MatchColumnsMode::Name);
return convert.read();
}
static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & block, const size_t repeats, Poco::Logger * log)
{
Block adopted_block = adoptBlock(out->getHeader(), block, log);
for (size_t i = 0; i < repeats; ++i)
out->write(adopted_block);
}
@ -343,7 +345,9 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
job.stream->write(shard_block);
Block adopted_shard_block = adoptBlock(job.stream->getHeader(), shard_block, log);
job.stream->write(adopted_shard_block);
}
else // local
{
@ -367,7 +371,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
job.stream->writePrefix();
}
writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount());
writeBlockConvert(job.stream, shard_block, shard_info.getLocalNodeCount(), log);
}
job.blocks_written += 1;
@ -589,7 +593,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
auto block_io = interp.execute();
block_io.out->writePrefix();
writeBlockConvert(block_io.out, block, repeats);
writeBlockConvert(block_io.out, block, repeats, log);
block_io.out->writeSuffix();
}

View File

@ -6,11 +6,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CREATE_IO_BUFFER;
}
KafkaBlockOutputStream::KafkaBlockOutputStream(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -29,8 +24,6 @@ Block KafkaBlockOutputStream::getHeader() const
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer(getHeader());
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
auto format_settings = getFormatSettings(*context);
format_settings.protobuf.allow_many_rows_no_delimiters = true;

View File

@ -42,6 +42,8 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
timestamp_column_index = column_index;
}
}
reinitializeChunks();
}
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
@ -108,9 +110,7 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
break;
}
rows = 0;
chunks.clear();
set(nullptr, 0);
reinitializeChunks();
}
}
@ -135,10 +135,25 @@ void WriteBufferToKafkaProducer::flush()
}
void WriteBufferToKafkaProducer::nextImpl()
{
addChunk();
}
void WriteBufferToKafkaProducer::addChunk()
{
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
}
void WriteBufferToKafkaProducer::reinitializeChunks()
{
rows = 0;
chunks.clear();
/// We cannot leave the buffer in an undefined state (i.e. without any
/// underlying buffer), since in this case WriteBuffer::next() will
/// not call our nextImpl() (due to available() == 0).
addChunk();
}
}

View File

@ -30,6 +30,8 @@ public:
private:
void nextImpl() override;
void addChunk();
void reinitializeChunks();
ProducerPtr producer;
const std::string topic;

View File

@ -1266,7 +1266,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
}
}
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), query_id, data, " with order");
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), query_id, data, "with order");
if (input_order_info->direction != 1)
{

View File

@ -16,6 +16,10 @@ class ASTStorage;
struct Settings;
/** These settings are fine-tuning options for the internal details of MergeTree storages
* and should not be changed by the user without a reason.
*/
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
M(UInt64, min_compress_block_size, 0, "When granule is written, compress the data in buffer if the size of pending uncompressed data is larger or equal than the specified threshold. If this setting is not set, the corresponding global setting is used.", 0) \
M(UInt64, max_compress_block_size, 0, "Compress the pending uncompressed data in buffer if its size is larger or equal than the specified threshold. Block of data will be compressed even if the current granule is not finished. If this setting is not set, the corresponding global setting is used.", 0) \
@ -40,7 +44,7 @@ struct Settings;
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(UInt64, max_number_of_merges_with_ttl_in_pool, 2, "When there is more than specified number of merges with TTL entries in pool, do not assign new merge with TTL. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \
M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories. You should not lower this value because merges and mutations may not be able to work with a low value of this setting.", 0) \
M(Seconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \
M(UInt64, min_rows_to_fsync_after_merge, 0, "Minimal number of rows to do fsync for part after merge (0 - disabled)", 0) \
M(UInt64, min_compressed_bytes_to_fsync_after_merge, 0, "Minimal number of compressed bytes to do fsync for part after merge (0 - disabled)", 0) \

View File

@ -8,12 +8,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CREATE_IO_BUFFER;
}
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
@ -37,9 +31,6 @@ void RabbitMQBlockOutputStream::writePrefix()
storage.unbindExchange();
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create RabbitMQ producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
buffer->activateWriting();
auto format_settings = getFormatSettings(context);

View File

@ -55,7 +55,6 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
{
loop = std::make_unique<uv_loop_t>();
uv_loop_init(loop.get());
event_handler = std::make_unique<RabbitMQHandler>(loop.get(), log);
@ -85,6 +84,8 @@ WriteBufferToRabbitMQProducer::WriteBufferToRabbitMQProducer(
key_arguments[matching[0]] = matching[1];
}
}
reinitializeChunks();
}
@ -122,9 +123,7 @@ void WriteBufferToRabbitMQProducer::countRow()
payload.append(last_chunk, 0, last_chunk_size);
rows = 0;
chunks.clear();
set(nullptr, 0);
reinitializeChunks();
++payload_counter;
payloads.push(std::make_pair(payload_counter, payload));
@ -321,17 +320,32 @@ void WriteBufferToRabbitMQProducer::writingFunc()
setupChannel();
}
LOG_DEBUG(log, "Prodcuer on channel {} completed", channel_id);
LOG_DEBUG(log, "Producer on channel {} completed", channel_id);
}
void WriteBufferToRabbitMQProducer::nextImpl()
{
addChunk();
}
void WriteBufferToRabbitMQProducer::addChunk()
{
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
}
void WriteBufferToRabbitMQProducer::reinitializeChunks()
{
rows = 0;
chunks.clear();
/// We cannot leave the buffer in the undefined state (i.e. without any
/// underlying buffer), since in this case the WriteBuffer::next() will
/// not call our nextImpl() (due to available() == 0)
addChunk();
}
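The comment above describes the invariant this refactoring enforces: a write buffer must always have some underlying chunk, otherwise next() sees available() == 0 and never reaches nextImpl(). Below is a minimal standalone sketch of that chunking pattern; it is simplified for illustration, does not use the real ClickHouse WriteBuffer API, and all names in it are hypothetical.

#include <cstddef>
#include <list>
#include <string>

/// Simplified illustration of the chunking pattern above: the buffer always
/// points at a live chunk, so filling the current chunk triggers allocation
/// of the next one instead of silently dropping writes.
class ChunkedBuffer
{
public:
    explicit ChunkedBuffer(std::size_t chunk_size_) : chunk_size(chunk_size_) { addChunk(); }

    void write(const char * data, std::size_t size)
    {
        for (std::size_t i = 0; i < size; ++i)
        {
            if (pos == end)
                addChunk();   /// plays the role of nextImpl() in the real buffer
            *pos++ = data[i];
        }
    }

    void reset()
    {
        chunks.clear();
        addChunk();           /// never leave the buffer without underlying memory
    }

private:
    void addChunk()
    {
        chunks.emplace_back(chunk_size, '\0');
        pos = chunks.back().data();
        end = pos + chunk_size;
    }

    std::size_t chunk_size;
    std::list<std::string> chunks;
    char * pos = nullptr;
    char * end = nullptr;
};

int main()
{
    ChunkedBuffer buf(8);
    buf.write("hello, world", 12);   /// spans two chunks
    buf.reset();                     /// starts over with a fresh chunk
}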
void WriteBufferToRabbitMQProducer::iterateEventLoop()
{

View File

@ -41,6 +41,9 @@ public:
private:
void nextImpl() override;
void addChunk();
void reinitializeChunks();
void iterateEventLoop();
void writingFunc();
bool setupConnection(bool reconnecting);

View File

@ -24,6 +24,7 @@
#include <Interpreters/Set.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/convertFieldToType.h>
#include <Poco/File.h>
#include <Poco/Path.h>
@ -44,9 +45,12 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
using FieldVectorPtr = std::shared_ptr<FieldVector>;
// Collects into res the key values that may match the filter condition; returns whether the condition constrains the primary key.
static bool traverseASTFilter(const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res)
static bool traverseASTFilter(
const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVectorPtr & res)
{
const auto * function = elem->as<ASTFunction>();
if (!function)
@ -63,13 +67,9 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
else if (function->name == "or")
{
// make sure every child has the key filter condition
FieldVector child_res;
for (const auto & child : function->arguments->children)
{
if (!traverseASTFilter(primary_key, primary_key_type, child, sets, child_res))
if (!traverseASTFilter(primary_key, primary_key_type, child, sets, res))
return false;
}
res.insert(res.end(), child_res.begin(), child_res.end());
return true;
}
else if (function->name == "equals" || function->name == "in")
@ -108,9 +108,7 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
prepared_set->checkColumnsNumber(1);
const auto & set_column = *prepared_set->getSetElements()[0];
for (size_t row = 0; row < set_column.size(); ++row)
{
res.push_back(set_column[row]);
}
res->push_back(set_column[row]);
return true;
}
else
@ -125,10 +123,12 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
if (ident->name() != primary_key)
return false;
//function->name == "equals"
/// function->name == "equals"
if (const auto * literal = value->as<ASTLiteral>())
{
res.push_back(literal->value);
auto converted_field = convertFieldToType(literal->value, *primary_key_type);
if (!converted_field.isNull())
res->push_back(converted_field);
return true;
}
}
@ -140,14 +140,14 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause.
* TODO support key like search
*/
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info)
static std::pair<FieldVectorPtr, bool> getFilterKeys(
const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (!select.where())
{
return std::make_pair(FieldVector{}, true);
}
FieldVector res;
return {{}, true};
FieldVectorPtr res = std::make_shared<FieldVector>();
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, res);
return std::make_pair(res, !matched_keys);
}
@ -159,23 +159,19 @@ public:
EmbeddedRocksDBSource(
const StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const FieldVector & keys_,
const size_t start_,
const size_t end_,
FieldVectorPtr keys_,
FieldVector::const_iterator begin_,
FieldVector::const_iterator end_,
const size_t max_block_size_)
: SourceWithProgress(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, start(start_)
, keys(std::move(keys_))
, begin(begin_)
, end(end_)
, it(begin)
, max_block_size(max_block_size_)
{
// slice the keys
if (end > start)
{
keys.resize(end - start);
std::copy(keys_.begin() + start, keys_.begin() + end, keys.begin());
}
}
String getName() const override
@ -185,27 +181,34 @@ public:
Chunk generate() override
{
if (processed_keys >= keys.size() || (start == end))
if (it >= end)
return {};
std::vector<rocksdb::Slice> slices_keys;
slices_keys.reserve(keys.size());
std::vector<String> values;
std::vector<WriteBufferFromOwnString> wbs(keys.size());
size_t num_keys = end - begin;
std::vector<std::string> serialized_keys(num_keys);
std::vector<rocksdb::Slice> slices_keys(num_keys);
const auto & sample_block = metadata_snapshot->getSampleBlock();
const auto & key_column = sample_block.getByName(storage.primary_key);
auto columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key);
for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + max_block_size); ++i)
size_t rows_processed = 0;
while (it < end && rows_processed < max_block_size)
{
key_column.type->serializeBinary(keys[i], wbs[i]);
auto str_ref = wbs[i].stringRef();
slices_keys.emplace_back(str_ref.data, str_ref.size);
WriteBufferFromString wb(serialized_keys[rows_processed]);
key_column.type->serializeBinary(*it, wb);
wb.finalize();
slices_keys[rows_processed] = std::move(serialized_keys[rows_processed]);
++it;
++rows_processed;
}
std::vector<String> values;
auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i].ok())
@ -221,7 +224,6 @@ public:
}
}
}
processed_keys += max_block_size;
UInt64 num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
@ -231,12 +233,11 @@ private:
const StorageEmbeddedRocksDB & storage;
const StorageMetadataPtr metadata_snapshot;
const size_t start;
const size_t end;
FieldVectorPtr keys;
FieldVector::const_iterator begin;
FieldVector::const_iterator end;
FieldVector::const_iterator it;
const size_t max_block_size;
FieldVector keys;
size_t processed_keys = 0;
};
@ -289,7 +290,8 @@ Pipe StorageEmbeddedRocksDB::read(
unsigned num_streams)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
FieldVector keys;
FieldVectorPtr keys;
bool all_scan = false;
auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type;
@ -302,37 +304,34 @@ Pipe StorageEmbeddedRocksDB::read(
}
else
{
if (keys.empty())
if (keys->empty())
return {};
std::sort(keys.begin(), keys.end());
auto unique_iter = std::unique(keys.begin(), keys.end());
if (unique_iter != keys.end())
keys.erase(unique_iter, keys.end());
std::sort(keys->begin(), keys->end());
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
Pipes pipes;
size_t start = 0;
size_t end;
const size_t num_threads = std::min(size_t(num_streams), keys.size());
const size_t batch_per_size = ceil(keys.size() * 1.0 / num_threads);
size_t num_keys = keys->size();
size_t num_threads = std::min(size_t(num_streams), keys->size());
for (size_t t = 0; t < num_threads; ++t)
assert(num_keys <= std::numeric_limits<uint32_t>::max());
assert(num_threads <= std::numeric_limits<uint32_t>::max());
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
{
if (start >= keys.size())
start = end = 0;
else
end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size;
size_t begin = num_keys * thread_idx / num_threads;
size_t end = num_keys * (thread_idx + 1) / num_threads;
pipes.emplace_back(
std::make_shared<EmbeddedRocksDBSource>(*this, metadata_snapshot, keys, start, end, max_block_size));
start += batch_per_size;
pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
*this, metadata_snapshot, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
}
return Pipe::unitePipes(std::move(pipes));
}
}
BlockOutputStreamPtr StorageEmbeddedRocksDB::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
BlockOutputStreamPtr StorageEmbeddedRocksDB::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<EmbeddedRocksDBBlockOutputStream>(*this, metadata_snapshot);
}
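The read path above splits the sorted, deduplicated key set across up to num_streams sources with begin = num_keys * thread_idx / num_threads and end = num_keys * (thread_idx + 1) / num_threads, so every key is covered exactly once and the range sizes differ by at most one. A small standalone sketch of that partitioning follows; splitRanges is a hypothetical helper written only for illustration, not part of the patch.

#include <cstdio>
#include <cstddef>
#include <utility>
#include <vector>

/// Evenly split [0, num_keys) into num_threads half-open ranges.
/// Every key is covered exactly once and range sizes differ by at most one.
std::vector<std::pair<std::size_t, std::size_t>> splitRanges(std::size_t num_keys, std::size_t num_threads)
{
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
    {
        std::size_t begin = num_keys * thread_idx / num_threads;
        std::size_t end = num_keys * (thread_idx + 1) / num_threads;
        ranges.emplace_back(begin, end);
    }
    return ranges;
}

int main()
{
    /// 10 keys over 3 threads -> [0, 3) [3, 6) [6, 10)
    for (const auto & [begin, end] : splitRanges(10, 3))
        std::printf("[%zu, %zu)\n", begin, end);
}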

View File

@ -175,38 +175,43 @@ def test_insert_distributed_async_send_different_header(batch):
create_tables('insert_distributed_async_send_cluster_two_shards')
node = get_node(batch)
node.query("INSERT INTO dist VALUES (0, '')", settings={
node.query("INSERT INTO dist VALUES (0, 'f')", settings={
'prefer_localhost_replica': 0,
})
node.query('ALTER TABLE dist MODIFY COLUMN value Nullable(String)')
node.query("INSERT INTO dist VALUES (2, '')", settings={
node.query('ALTER TABLE dist MODIFY COLUMN value UInt64')
node.query("INSERT INTO dist VALUES (2, 1)", settings={
'prefer_localhost_replica': 0,
})
n1.query('ALTER TABLE data MODIFY COLUMN value UInt64', settings={
'mutations_sync': 1,
})
if batch:
# first batch with Nullable(String)
n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
'mutations_sync': 1,
})
# but only one batch will be sent
with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: String to Nullable\(String\)\. Stack trace:"):
# but only one batch will be sent, and the first batch has the UInt64 column,
# so one row is inserted, and for the string ('f') an exception will be thrown.
with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"):
node.query('SYSTEM FLUSH DISTRIBUTED dist')
assert int(n1.query('SELECT count() FROM data')) == 1
# second batch with String
n1.query('ALTER TABLE data MODIFY COLUMN value String', settings={
'mutations_sync': 1,
})
# but once the underlying column is String, implicit conversion will do the
# job and insert the remaining batch.
n1.query("""
DROP TABLE data SYNC;
CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key;
""")
node.query('SYSTEM FLUSH DISTRIBUTED dist')
assert int(n1.query('SELECT count() FROM data')) == 2
else:
# first send with String
with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: Nullable\(String\) to String\. Stack trace:"):
node.query('SYSTEM FLUSH DISTRIBUTED dist')
assert int(n1.query('SELECT count() FROM data')) == 1
# second send with Nullable(String)
n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
'mutations_sync': 1,
})
else:
# first send with String ('f'), so zero rows will be inserted
with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"):
node.query('SYSTEM FLUSH DISTRIBUTED dist')
assert int(n1.query('SELECT count() FROM data')) == 0
# but once the underlying column is String, implicit conversion will do the
# job and insert 2 rows (mixed UInt64 and String).
n1.query("""
DROP TABLE data SYNC;
CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key;
""")
node.query('SYSTEM FLUSH DISTRIBUTED dist')
assert int(n1.query('SELECT count() FROM data')) == 2

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<settings>
<allow_experimental_map_type>1</allow_experimental_map_type>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<create_query>DROP TABLE IF EXISTS perf_lc_str</create_query>
<create_query>CREATE TABLE perf_lc_str(
str LowCardinality(String),

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<settings>
<max_memory_usage>15G</max_memory_usage>
</settings>

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<query>SELECT count() FROM zeros(10000000) WHERE NOT ignore(if(rand() % 2, toDateTime('2019-02-04 01:24:31'), toDate('2019-02-04')))</query>
<query>SELECT count() FROM zeros(10000000) WHERE NOT ignore(multiIf(rand() % 2, toDateTime('2019-02-04 01:24:31'), toDate('2019-02-04')))</query>
<query>SELECT count() FROM zeros(10000000) WHERE NOT ignore(if(rand() % 2, [toDateTime('2019-02-04 01:24:31')], [toDate('2019-02-04')]))</query>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<settings>
<max_memory_usage>15G</max_memory_usage>
</settings>

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<substitutions>
<substitution>
<name>gp_hash_func</name>

View File

@ -1,19 +1,24 @@
<test max_ignored_relative_change="0.2">
<settings>
<max_insert_threads>8</max_insert_threads>
</settings>
<create_query>
CREATE TABLE a
(
d Date,
os String
)
ENGINE = MergeTree
PARTITION BY d
ORDER BY d
CREATE TABLE a
(
d Date,
os String,
n UInt64
)
ENGINE = MergeTree
PARTITION BY d
ORDER BY (d, n)
</create_query>
<fill_query>insert into a select '2000-01-01', ['aa','bb','cc','dd'][number % 4 + 1] from numbers(100000000)</fill_query>
<fill_query>insert into a select '2000-01-02', ['aa','bb','cc','dd'][number % 4 + 1] from numbers(100000000)</fill_query>
<fill_query>insert into a select '2000-01-03', ['aa','bb','cc','dd'][number % 4 + 1] from numbers(100000000)</fill_query>
<fill_query>insert into a select '2000-01-04', ['aa','bb','cc','dd'][number % 4 + 1] from numbers(100000000)</fill_query>
<fill_query>insert into a select '2000-01-01', ['aa','bb','cc','dd'][number % 4 + 1], number from numbers_mt(100000000)</fill_query>
<fill_query>insert into a select '2000-01-02', ['aa','bb','cc','dd'][number % 4 + 1], number from numbers_mt(100000000)</fill_query>
<fill_query>insert into a select '2000-01-03', ['aa','bb','cc','dd'][number % 4 + 1], number from numbers_mt(100000000)</fill_query>
<fill_query>insert into a select '2000-01-04', ['aa','bb','cc','dd'][number % 4 + 1], number from numbers_mt(100000000)</fill_query>
<fill_query>OPTIMIZE TABLE a FINAL</fill_query>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<query>
WITH
bitXor(number, 0x4CF2D2BAAE6DA887) AS x0,

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.9">
<test max_ignored_relative_change="1.3">
<create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query>
<settings>

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<substitutions>
<substitution>
<name>format</name>

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.3">
<query>SELECT sumOrNull(number) FROM numbers(100000000)</query>
<query>SELECT sumOrDefault(toNullable(number)) FROM numbers(100000000)</query>
<query>SELECT sumOrNull(number) FROM numbers(10000000) GROUP BY number % 1024</query>

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<create_query>
CREATE TABLE hits_wide AS hits_10m_single ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<create_query>
CREATE TABLE hits_wide AS hits_10m_single ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)

View File

@ -1,4 +1,4 @@
<test>
<test max_ignored_relative_change="0.2">
<create_query>
CREATE TABLE hits_wide AS hits_10m_single ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<settings>
<optimize_aggregation_in_order>1</optimize_aggregation_in_order>
<optimize_read_in_order>1</optimize_read_in_order>

View File

@ -230,11 +230,11 @@
toInt256(number) as d,
toString(number) as f,
toFixedString(f, 20) as g
FROM numbers_mt(200000000)
FROM numbers_mt(20000000)
SETTINGS max_threads = 8
FORMAT Null
</query>
<query>
SELECT
reinterpretAsFixedString(a),
@ -249,7 +249,7 @@
toInt256(number) as d,
toString(number) as f,
toFixedString(f, 20) as g
FROM numbers_mt(200000000)
FROM numbers_mt(100000000)
SETTINGS max_threads = 8
FORMAT Null
</query>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.2">
<test>
<create_query>CREATE TABLE test_in (`a` UInt32) ENGINE = MergeTree() ORDER BY a</create_query>
<fill_query>INSERT INTO test_in SELECT number FROM numbers(500000000)</fill_query>

View File

@ -1,4 +1,4 @@
<test max_ignored_relative_change="0.7">
<test>
<preconditions>
<table_exists>hits_100m_single</table_exists>

View File

@ -25,7 +25,31 @@
select *
from (
select CounterID, UserID, count(*) user_hits,
count() over (partition by CounterID order by user_hits desc)
count()
over (partition by CounterID order by user_hits desc
rows unbounded preceding)
user_rank
from hits_100m_single
where CounterID < 10000
group by CounterID, UserID
)
where user_rank <= 10
format Null
]]></query>
<!--
The RANGE version should give (almost) the same result, because counts
for the top ranking users are probably different, so the ranks won't be
influenced by grouping. But it is going to be slower than ROWS because
of the additional work of finding the group boundaries.
-->
<query><![CDATA[
select *
from (
select CounterID, UserID, count(*) user_hits,
count()
over (partition by CounterID order by user_hits desc
range unbounded preceding)
user_rank
from hits_100m_single
where CounterID < 10000

View File

@ -1,6 +1,9 @@
DROP TABLE IF EXISTS dist_00967;
DROP TABLE IF EXISTS underlying_00967;
-- To suppress "Structure does not match (...), implicit conversion will be done." message
SET send_logs_level='error';
CREATE TABLE dist_00967 (key UInt64) Engine=Distributed('test_shard_localhost', currentDatabase(), underlying_00967);
-- fails for TinyLog()/MergeTree()/... but not for Memory()
CREATE TABLE underlying_00967 (key Nullable(UInt64)) Engine=TinyLog();

View File

@ -18,6 +18,9 @@ DROP TABLE tmp;
DETACH DATABASE test_01457;
ATTACH DATABASE test_01457;
-- To suppress "Structure does not match (...), implicit conversion will be done." message
SET send_logs_level='error';
CREATE TABLE tmp (n Int8) ENGINE=Memory;
INSERT INTO test_01457.tf_remote_explicit_structure VALUES ('42');
SELECT * FROM tmp;

View File

@ -23,7 +23,7 @@ Expression (Projection)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
SELECT
timestamp,
key
@ -37,7 +37,7 @@ Expression (Projection)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
SELECT
timestamp,
key

View File

@ -35,18 +35,18 @@ Expression (Projection)
Expression ((Before ORDER BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
Expression (Projection)
Limit (preliminary LIMIT)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
optimize_aggregation_in_order
Expression ((Projection + Before ORDER BY))
Aggregating
@ -58,17 +58,17 @@ Expression ((Projection + Before ORDER BY))
Expression ((Before GROUP BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
second-index
1
1

View File

@ -516,3 +516,196 @@ settings max_block_size = 2;
27 27 29 29
27 27 29 29
30 30 30 30
-- ROWS offset frame start
select number, p,
count(*) over (partition by p order by number
rows between 1 preceding and unbounded following),
count(*) over (partition by p order by number
rows between current row and unbounded following),
count(*) over (partition by p order by number
rows between 1 following and unbounded following)
from (select number, intDiv(number, 5) p from numbers(31))
order by p, number
settings max_block_size = 2;
0 0 5 5 4
1 0 5 4 3
2 0 4 3 2
3 0 3 2 1
4 0 2 1 0
5 1 5 5 4
6 1 5 4 3
7 1 4 3 2
8 1 3 2 1
9 1 2 1 0
10 2 5 5 4
11 2 5 4 3
12 2 4 3 2
13 2 3 2 1
14 2 2 1 0
15 3 5 5 4
16 3 5 4 3
17 3 4 3 2
18 3 3 2 1
19 3 2 1 0
20 4 5 5 4
21 4 5 4 3
22 4 4 3 2
23 4 3 2 1
24 4 2 1 0
25 5 5 5 4
26 5 5 4 3
27 5 4 3 2
28 5 3 2 1
29 5 2 1 0
30 6 1 1 0
-- ROWS offset frame start and end
select number, p,
count(*) over (partition by p order by number
rows between 2 preceding and 2 following)
from (select number, intDiv(number, 7) p from numbers(71))
order by p, number
settings max_block_size = 2;
0 0 3
1 0 4
2 0 5
3 0 5
4 0 5
5 0 4
6 0 3
7 1 3
8 1 4
9 1 5
10 1 5
11 1 5
12 1 4
13 1 3
14 2 3
15 2 4
16 2 5
17 2 5
18 2 5
19 2 4
20 2 3
21 3 3
22 3 4
23 3 5
24 3 5
25 3 5
26 3 4
27 3 3
28 4 3
29 4 4
30 4 5
31 4 5
32 4 5
33 4 4
34 4 3
35 5 3
36 5 4
37 5 5
38 5 5
39 5 5
40 5 4
41 5 3
42 6 3
43 6 4
44 6 5
45 6 5
46 6 5
47 6 4
48 6 3
49 7 3
50 7 4
51 7 5
52 7 5
53 7 5
54 7 4
55 7 3
56 8 3
57 8 4
58 8 5
59 8 5
60 8 5
61 8 4
62 8 3
63 9 3
64 9 4
65 9 5
66 9 5
67 9 5
68 9 4
69 9 3
70 10 1
SELECT count(*) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM numbers(4);
1
2
3
3
-- frame boundaries that run into the partition end
select
count() over (partition by intDiv(number, 3)
rows between 100 following and unbounded following),
count() over (partition by intDiv(number, 3)
rows between current row and 100 following)
from numbers(10);
0 3
0 2
0 1
0 3
0 2
0 1
0 3
0 2
0 1
0 1
-- seen a use-after-free under MSan in this query once
SELECT number, max(number) OVER (PARTITION BY intDiv(number, 7) ORDER BY number ASC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM numbers(1024) SETTINGS max_block_size = 2 FORMAT Null;
-- a corner case
select count() over ();
1
-- RANGE CURRENT ROW frame start
select number, p, o,
count(*) over (partition by p order by o
range between current row and unbounded following)
from (select number, intDiv(number, 5) p, mod(number, 3) o
from numbers(31))
order by p, o, number
settings max_block_size = 2;
0 0 0 5
3 0 0 5
1 0 1 3
4 0 1 3
2 0 2 1
6 1 0 5
9 1 0 5
7 1 1 3
5 1 2 2
8 1 2 2
12 2 0 5
10 2 1 4
13 2 1 4
11 2 2 2
14 2 2 2
15 3 0 5
18 3 0 5
16 3 1 3
19 3 1 3
17 3 2 1
21 4 0 5
24 4 0 5
22 4 1 3
20 4 2 2
23 4 2 2
27 5 0 5
25 5 1 4
28 5 1 4
26 5 2 2
29 5 2 2
30 6 0 1
select
count(*) over (rows between current row and current row),
count(*) over (range between current row and current row)
from numbers(3);
1 3
1 3
1 3

View File

@ -163,3 +163,52 @@ window
rows between unbounded preceding and unbounded following)
settings max_block_size = 2;
-- ROWS offset frame start
select number, p,
count(*) over (partition by p order by number
rows between 1 preceding and unbounded following),
count(*) over (partition by p order by number
rows between current row and unbounded following),
count(*) over (partition by p order by number
rows between 1 following and unbounded following)
from (select number, intDiv(number, 5) p from numbers(31))
order by p, number
settings max_block_size = 2;
-- ROWS offset frame start and end
select number, p,
count(*) over (partition by p order by number
rows between 2 preceding and 2 following)
from (select number, intDiv(number, 7) p from numbers(71))
order by p, number
settings max_block_size = 2;
SELECT count(*) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM numbers(4);
-- frame boundaries that run into the partition end
select
count() over (partition by intDiv(number, 3)
rows between 100 following and unbounded following),
count() over (partition by intDiv(number, 3)
rows between current row and 100 following)
from numbers(10);
-- seen a use-after-free under MSan in this query once
SELECT number, max(number) OVER (PARTITION BY intDiv(number, 7) ORDER BY number ASC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM numbers(1024) SETTINGS max_block_size = 2 FORMAT Null;
-- a corner case
select count() over ();
-- RANGE CURRENT ROW frame start
select number, p, o,
count(*) over (partition by p order by o
range between current row and unbounded following)
from (select number, intDiv(number, 5) p, mod(number, 3) o
from numbers(31))
order by p, o, number
settings max_block_size = 2;
select
count(*) over (rows between current row and current row),
count(*) over (range between current row and current row)
from numbers(3);

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS tmp_01683;
DROP TABLE IF EXISTS dist_01683;
SET prefer_localhost_replica=0;
-- To suppress "Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done."
SET send_logs_level='error';
CREATE TABLE tmp_01683 (n Int8) ENGINE=Memory;
CREATE TABLE dist_01683 (n UInt64) Engine=Distributed(test_cluster_two_shards, currentDatabase(), tmp_01683, n);
SET insert_distributed_sync=1;
INSERT INTO dist_01683 VALUES (1),(2);
SET insert_distributed_sync=0;
INSERT INTO dist_01683 VALUES (1),(2);
SYSTEM FLUSH DISTRIBUTED dist_01683;
-- TODO: cover distributed_directory_monitor_batch_inserts=1
SELECT * FROM tmp_01683 ORDER BY n;
DROP TABLE tmp_01683;
DROP TABLE dist_01683;

View File

@ -0,0 +1,15 @@
123 Hello, world (123)
--
--
123 Hello, world (123)
4567 Hello, world (4567)
--
--
0 Hello, world (0)
--
123 Hello, world (123)
456 Hello, world (456)
--
99 Hello, world (99)
999 Hello, world (999)
9999 Hello, world (9999)

View File

@ -0,0 +1,27 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt64, value String) Engine=EmbeddedRocksDB PRIMARY KEY(key);
INSERT INTO test SELECT number, format('Hello, world ({})', toString(number)) FROM numbers(10000);
SELECT * FROM test WHERE key = 123;
SELECT '--';
SELECT * FROM test WHERE key = -123;
SELECT '--';
SELECT * FROM test WHERE key = 123 OR key = 4567 ORDER BY key;
SELECT '--';
SELECT * FROM test WHERE key = NULL;
SELECT '--';
SELECT * FROM test WHERE key = NULL OR key = 0;
SELECT '--';
SELECT * FROM test WHERE key IN (123, 456, -123) ORDER BY key;
SELECT '--';
SELECT * FROM test WHERE key = 'Hello'; -- { serverError 53 }
DETACH TABLE test NO DELAY;
ATTACH TABLE test;
SELECT * FROM test WHERE key IN (99, 999, 9999, -123) ORDER BY key;
DROP TABLE IF EXISTS test;

View File

@ -0,0 +1,2 @@
[0]
[0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1]

View File

@ -0,0 +1,31 @@
-- NOTE: due to overflows this gave a different result before
-- quantilesTimingWeighted() was converted to double:
--
-- Before:
--
-- SELECT quantilesTimingWeighted(1)(number, 9223372036854775807)
-- FROM numbers(2)
--
-- ┌─quantilesTimingWeighted(1)(number, 9223372036854775807)─┐
-- │ [1] │
-- └─────────────────────────────────────────────────────────┘
--
-- After:
--
-- SELECT quantilesTimingWeighted(1)(number, 9223372036854775807)
-- FROM numbers(2)
--
-- ┌─quantilesTimingWeighted(1)(number, 9223372036854775807)─┐
-- │ [0] │
-- └─────────────────────────────────────────────────────────┘
SELECT quantilesTimingWeighted(0.1)(number, 9223372036854775807) FROM numbers(2);
-- same UB, but in the inner loop
SELECT quantilesTimingWeighted(0, 0.001, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99, 0.999, 1)(number, 9223372036854775807)
FROM
(
SELECT number
FROM system.numbers
LIMIT 100
);

View File

@ -0,0 +1,183 @@
DST boundary test for Europe/Moscow:
0 1981-04-01 22:40:00 10800 355002000
1 1981-04-01 22:50:00 10800 355002600
2 1981-04-02 00:00:00 14400 355003200
3 1981-04-02 00:10:00 14400 355003800
0 1981-09-30 23:00:00 14400 370724400
1 1981-09-30 23:10:00 14400 370725000
2 1981-09-30 23:20:00 14400 370725600
3 1981-09-30 23:30:00 14400 370726200
4 1981-09-30 23:40:00 14400 370726800
5 1981-09-30 23:50:00 14400 370727400
6 1981-09-30 23:00:00 10800 370728000
7 1981-09-30 23:10:00 10800 370728600
8 1981-09-30 23:20:00 10800 370729200
9 1981-09-30 23:30:00 10800 370729800
10 1981-09-30 23:40:00 10800 370730400
11 1981-09-30 23:50:00 10800 370731000
12 1981-10-01 00:00:00 10800 370731600
13 1981-10-01 00:10:00 10800 370732200
14 1981-10-01 00:20:00 10800 370732800
15 1981-10-01 00:30:00 10800 370733400
16 1981-10-01 00:40:00 10800 370734000
17 1981-10-01 00:50:00 10800 370734600
DST boundary test for Asia/Tehran:
0 2020-03-21 22:40:00 12600 1584817800
1 2020-03-21 22:50:00 12600 1584818400
2 2020-03-22 00:00:00 16200 1584819000
3 2020-03-22 00:10:00 16200 1584819600
0 2020-09-20 23:00:00 16200 1600626600
1 2020-09-20 23:10:00 16200 1600627200
2 2020-09-20 23:20:00 16200 1600627800
3 2020-09-20 23:30:00 16200 1600628400
4 2020-09-20 23:40:00 16200 1600629000
5 2020-09-20 23:50:00 16200 1600629600
6 2020-09-20 23:00:00 12600 1600630200
7 2020-09-20 23:10:00 12600 1600630800
8 2020-09-20 23:20:00 12600 1600631400
9 2020-09-20 23:30:00 12600 1600632000
10 2020-09-20 23:40:00 12600 1600632600
11 2020-09-20 23:50:00 12600 1600633200
12 2020-09-21 00:00:00 12600 1600633800
13 2020-09-21 00:10:00 12600 1600634400
14 2020-09-21 00:20:00 12600 1600635000
15 2020-09-21 00:30:00 12600 1600635600
16 2020-09-21 00:40:00 12600 1600636200
17 2020-09-21 00:50:00 12600 1600636800
DST boundary test for Australia/Lord_Howe. This is a special timezone whose DST offset is 30 minutes and whose standard offset also lies at a half hour
37800
39600
DST boundary test for Australia/Lord_Howe:
0 2020-10-04 01:40:00 37800 1601737800
1 2020-10-04 01:50:00 37800 1601738400
2 2020-10-04 02:00:00 39600 1601739000
3 2020-10-04 02:10:00 39600 1601739600
0 2019-04-07 01:00:00 39600 1554559200
1 2019-04-07 01:10:00 39600 1554559800
2 2019-04-07 01:20:00 39600 1554560400
3 2019-04-07 01:30:00 39600 1554561000
4 2019-04-07 01:40:00 39600 1554561600
5 2019-04-07 01:50:00 39600 1554562200
6 2019-04-07 01:00:00 37800 1554562800
7 2019-04-07 01:10:00 37800 1554563400
8 2019-04-07 01:20:00 37800 1554564000
9 2019-04-07 02:30:00 37800 1554564600
10 2019-04-07 02:40:00 37800 1554565200
11 2019-04-07 02:50:00 37800 1554565800
12 2019-04-07 02:00:00 37800 1554566400
13 2019-04-07 02:10:00 37800 1554567000
14 2019-04-07 02:20:00 37800 1554567600
15 2019-04-07 03:30:00 37800 1554568200
16 2019-04-07 03:40:00 37800 1554568800
17 2019-04-07 03:50:00 37800 1554569400
4 days test in batch comparing with manually computation result for Europe/Moscow:
4 days test in batch comparing with manually computation result for Asia/Tehran:
The result may be wrong for toDateTime processing Australia/Lord_Howe
1601739000 2020-10-04 02:00:00 39600 37800
1601739600 2020-10-04 02:10:00 39600 37800
1601740200 2020-10-04 02:20:00 39600 37800
1601740800 2020-10-04 03:30:00 39600 41400
1601741400 2020-10-04 03:40:00 39600 41400
1601742000 2020-10-04 03:50:00 39600 41400
1601742600 2020-10-04 03:00:00 39600 37800
1601743200 2020-10-04 03:10:00 39600 37800
1601743800 2020-10-04 03:20:00 39600 37800
1601744400 2020-10-04 04:30:00 39600 41400
1601745000 2020-10-04 04:40:00 39600 41400
1601745600 2020-10-04 04:50:00 39600 41400
1601746200 2020-10-04 04:00:00 39600 37800
1601746800 2020-10-04 04:10:00 39600 37800
1601747400 2020-10-04 04:20:00 39600 37800
1601748000 2020-10-04 05:30:00 39600 41400
1554562800 2019-04-07 01:00:00 37800 36000
1554563400 2019-04-07 01:10:00 37800 36000
1554564000 2019-04-07 01:20:00 37800 36000
1554564600 2019-04-07 02:30:00 37800 39600
1554565200 2019-04-07 02:40:00 37800 39600
1554565800 2019-04-07 02:50:00 37800 39600
1554566400 2019-04-07 02:00:00 37800 36000
1554567000 2019-04-07 02:10:00 37800 36000
1554567600 2019-04-07 02:20:00 37800 36000
1554568200 2019-04-07 03:30:00 37800 39600
1554568800 2019-04-07 03:40:00 37800 39600
1554569400 2019-04-07 03:50:00 37800 39600
Moscow DST Years:
11 1981-06-01 00:00:00 14400
12 1982-06-01 00:00:00 14400
13 1983-06-01 00:00:00 14400
14 1984-06-01 00:00:00 14400
15 1985-06-01 00:00:00 14400
16 1986-06-01 00:00:00 14400
17 1987-06-01 00:00:00 14400
18 1988-06-01 00:00:00 14400
19 1989-06-01 00:00:00 14400
20 1990-06-01 00:00:00 14400
22 1992-06-01 00:00:00 14400
23 1993-06-01 00:00:00 14400
24 1994-06-01 00:00:00 14400
25 1995-06-01 00:00:00 14400
26 1996-06-01 00:00:00 14400
27 1997-06-01 00:00:00 14400
28 1998-06-01 00:00:00 14400
29 1999-06-01 00:00:00 14400
30 2000-06-01 00:00:00 14400
31 2001-06-01 00:00:00 14400
32 2002-06-01 00:00:00 14400
33 2003-06-01 00:00:00 14400
34 2004-06-01 00:00:00 14400
35 2005-06-01 00:00:00 14400
36 2006-06-01 00:00:00 14400
37 2007-06-01 00:00:00 14400
38 2008-06-01 00:00:00 14400
39 2009-06-01 00:00:00 14400
40 2010-06-01 00:00:00 14400
41 2011-06-01 00:00:00 14400
42 2012-06-01 00:00:00 14400
43 2013-06-01 00:00:00 14400
44 2014-06-01 00:00:00 14400
Moscow DST Years with permanent DST from 2011-2014:
2011-01-01 00:00:00 2011-03-27 00:00:00 86 2011_10800
2011-03-28 00:00:00 2011-12-31 00:00:00 279 2011_14400
2012-01-01 00:00:00 2012-12-31 00:00:00 366 2012_14400
2013-01-01 00:00:00 2013-12-31 00:00:00 365 2013_14400
2014-01-01 00:00:00 2014-10-26 00:00:00 299 2014_14400
2014-10-27 00:00:00 2014-12-31 00:00:00 66 2014_10800
Tehran DST Years:
8 1978-06-01 00:00:00 18000
9 1979-06-01 00:00:00 16200
10 1980-06-01 00:00:00 16200
21 1991-06-01 00:00:00 16200
22 1992-06-01 00:00:00 16200
23 1993-06-01 00:00:00 16200
24 1994-06-01 00:00:00 16200
25 1995-06-01 00:00:00 16200
26 1996-06-01 00:00:00 16200
27 1997-06-01 00:00:00 16200
28 1998-06-01 00:00:00 16200
29 1999-06-01 00:00:00 16200
30 2000-06-01 00:00:00 16200
31 2001-06-01 00:00:00 16200
32 2002-06-01 00:00:00 16200
33 2003-06-01 00:00:00 16200
34 2004-06-01 00:00:00 16200
35 2005-06-01 00:00:00 16200
38 2008-06-01 00:00:00 16200
39 2009-06-01 00:00:00 16200
40 2010-06-01 00:00:00 16200
41 2011-06-01 00:00:00 16200
42 2012-06-01 00:00:00 16200
43 2013-06-01 00:00:00 16200
44 2014-06-01 00:00:00 16200
45 2015-06-01 00:00:00 16200
46 2016-06-01 00:00:00 16200
47 2017-06-01 00:00:00 16200
48 2018-06-01 00:00:00 16200
49 2019-06-01 00:00:00 16200
50 2020-06-01 00:00:00 16200
Shanghai DST Years:
16 1986-08-01 00:00:00 32400
17 1987-08-01 00:00:00 32400
18 1988-08-01 00:00:00 32400
19 1989-08-01 00:00:00 32400
20 1990-08-01 00:00:00 32400
21 1991-08-01 00:00:00 32400

View File

@ -0,0 +1,46 @@
/* Test the DST (daylight saving time) offset change boundaries */
SELECT 'DST boundary test for Europe/Moscow:';
SELECT number,(toDateTime('1981-04-01 22:40:00', 'Europe/Moscow') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(4);
SELECT number,(toDateTime('1981-09-30 23:00:00', 'Europe/Moscow') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(18);
SELECT 'DST boundary test for Asia/Tehran:';
SELECT number,(toDateTime('2020-03-21 22:40:00', 'Asia/Tehran') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(4);
SELECT number,(toDateTime('2020-09-20 23:00:00', 'Asia/Tehran') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(18);
SELECT 'DST boundary test for Australia/Lord_Howe. This is a special timezone whose DST offset is 30 minutes and whose standard offset also lies at a half hour';
SELECT timezoneOffset(toDateTime('2018-08-21 22:20:00', 'Australia/Lord_Howe'));
SELECT timezoneOffset(toDateTime('2018-02-21 22:20:00', 'Australia/Lord_Howe'));
SELECT 'DST boundary test for Australia/Lord_Howe:';
SELECT number,(toDateTime('2020-10-04 01:40:00', 'Australia/Lord_Howe') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(4);
SELECT number,(toDateTime('2019-04-07 01:00:00', 'Australia/Lord_Howe') + INTERVAL number * 600 SECOND) AS k, timezoneOffset(k) AS t, toUnixTimestamp(k) as s FROM numbers(18);
/* The batch part. The test period covers 4 whole days. */
SELECT '4 days test in batch comparing with manually computation result for Europe/Moscow:';
SELECT toUnixTimestamp(x) as tt, (toDateTime('1981-04-01 00:00:00', 'Europe/Moscow') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(576) where res != calc;
SELECT toUnixTimestamp(x) as tt, (toDateTime('1981-09-30 00:00:00', 'Europe/Moscow') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(576) where res != calc;
SELECT '4 days test in batch comparing with manually computation result for Asia/Tehran:';
SELECT toUnixTimestamp(x) as tt, (toDateTime('2020-03-21 00:00:00', 'Asia/Tehran') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(576) where res != calc;
SELECT toUnixTimestamp(x) as tt, (toDateTime('2020-09-20 00:00:00', 'Asia/Tehran') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(576) where res != calc;
/* During this test we got an unexpected result from the toDateTime() function when processing the special time zone 'Australia/Lord_Howe', which may be some kind of bug. */
SELECT 'The result may be wrong for toDateTime processing Australia/Lord_Howe';
SELECT toUnixTimestamp(x) as tt, (toDateTime('2020-10-04 01:40:00', 'Australia/Lord_Howe') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(18) where res != calc;
SELECT toUnixTimestamp(x) as tt, (toDateTime('2019-04-07 01:00:00', 'Australia/Lord_Howe') + INTERVAL number * 600 SECOND) AS x, timezoneOffset(x) as res,(toDateTime(toString(x), 'UTC') - x ) AS calc FROM numbers(18) where res != calc;
/* Find all the years that observed DST during the given period */
SELECT 'Moscow DST Years:';
SELECT number, (toDateTime('1970-06-01 00:00:00', 'Europe/Moscow') + INTERVAL number YEAR) AS DST_Y, timezoneOffset(DST_Y) AS t FROM numbers(51) where t != 10800;
SELECT 'Moscow DST Years with permanent DST from 2011-2014:';
SELECT min((toDateTime('2011-01-01 00:00:00', 'Europe/Moscow') + INTERVAL number DAY) as day) as start, max(day) as end, count(1), concat(toString(toYear(day)),'_',toString(timezoneOffset(day)))as DST from numbers(365*4+1) group by DST order by start;
SELECT 'Tehran DST Years:';
SELECT number, (toDateTime('1970-06-01 00:00:00', 'Asia/Tehran') + INTERVAL number YEAR) AS DST_Y, timezoneOffset(DST_Y) AS t FROM numbers(51) where t != 12600;
SELECT 'Shanghai DST Years:';
SELECT number, (toDateTime('1970-08-01 00:00:00', 'Asia/Shanghai') + INTERVAL number YEAR) AS DST_Y, timezoneOffset(DST_Y) AS t FROM numbers(51) where t != 28800;
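The batch queries above rest on a simple identity: for a DateTime value x in some time zone, parsing its textual representation as UTC and subtracting x gives the zone's UTC offset at that moment, which is exactly what timezoneOffset(x) should return. A worked example using values from the Europe/Moscow reference output above (an illustration, not part of the test):

x = 1981-04-02 00:00:00 Europe/Moscow, toUnixTimestamp(x) = 355003200
toDateTime(toString(x), 'UTC') corresponds to Unix timestamp 355017600
355017600 - 355003200 = 14400 seconds = UTC+4, matching timezoneOffset(x) in the reference file.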

View File

@ -0,0 +1 @@
SELECT pointInPolygon((0, 0), [[(0, 0), (10, 10), (256, -9223372036854775808)]]) FORMAT Null;

View File

@ -200,3 +200,5 @@
01676_clickhouse_client_autocomplete
01671_aggregate_function_group_bitmap_data
01674_executable_dictionary_implicit_key
01686_rocksdb
01683_dist_INSERT_block_structure_mismatch