Merge remote-tracking branch 'origin/master' into pr-right-joins

This commit is contained in:
Igor Nikonov 2024-11-14 21:11:25 +00:00
commit 5a04826fa5
46 changed files with 435 additions and 262 deletions

View File

@ -7,7 +7,6 @@
#include <random> #include <random>
#include <string_view> #include <string_view>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <Poco/UUID.h>
#include <Poco/UUIDGenerator.h> #include <Poco/UUIDGenerator.h>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
@ -152,8 +151,6 @@ public:
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME)); global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial(); global_context->setQueryKindInitial();
std::cerr << std::fixed << std::setprecision(3);
/// This is needed to receive blocks with columns of AggregateFunction data type /// This is needed to receive blocks with columns of AggregateFunction data type
/// (example: when using stage = 'with_mergeable_state') /// (example: when using stage = 'with_mergeable_state')
registerAggregateFunctions(); registerAggregateFunctions();
@ -226,6 +223,8 @@ private:
ContextMutablePtr global_context; ContextMutablePtr global_context;
QueryProcessingStage::Enum query_processing_stage; QueryProcessingStage::Enum query_processing_stage;
WriteBufferFromFileDescriptor log{STDERR_FILENO};
std::atomic<size_t> consecutive_errors{0}; std::atomic<size_t> consecutive_errors{0};
/// Don't execute new queries after timelimit or SIGINT or exception /// Don't execute new queries after timelimit or SIGINT or exception
@ -303,16 +302,16 @@ private:
} }
std::cerr << "Loaded " << queries.size() << " queries.\n"; log << "Loaded " << queries.size() << " queries.\n" << flush;
} }
void printNumberOfQueriesExecuted(size_t num) void printNumberOfQueriesExecuted(size_t num)
{ {
std::cerr << "\nQueries executed: " << num; log << "\nQueries executed: " << num;
if (queries.size() > 1) if (queries.size() > 1)
std::cerr << " (" << (num * 100.0 / queries.size()) << "%)"; log << " (" << (num * 100.0 / queries.size()) << "%)";
std::cerr << ".\n"; log << ".\n" << flush;
} }
/// Try push new query and check cancellation conditions /// Try push new query and check cancellation conditions
@ -339,19 +338,19 @@ private:
if (interrupt_listener.check()) if (interrupt_listener.check())
{ {
std::cout << "Stopping launch of queries. SIGINT received." << std::endl; std::cout << "Stopping launch of queries. SIGINT received.\n";
return false; return false;
} }
}
double seconds = delay_watch.elapsedSeconds(); double seconds = delay_watch.elapsedSeconds();
if (delay > 0 && seconds > delay) if (delay > 0 && seconds > delay)
{ {
printNumberOfQueriesExecuted(queries_executed); printNumberOfQueriesExecuted(queries_executed);
cumulative cumulative
? report(comparison_info_total, total_watch.elapsedSeconds()) ? report(comparison_info_total, total_watch.elapsedSeconds())
: report(comparison_info_per_interval, seconds); : report(comparison_info_per_interval, seconds);
delay_watch.restart(); delay_watch.restart();
}
} }
return true; return true;
@ -438,16 +437,16 @@ private:
catch (...) catch (...)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
std::cerr << "An error occurred while processing the query " << "'" << query << "'" log << "An error occurred while processing the query " << "'" << query << "'"
<< ": " << getCurrentExceptionMessage(false) << std::endl; << ": " << getCurrentExceptionMessage(false) << '\n';
if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors)) if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors))
{ {
shutdown = true; shutdown = true;
throw; throw;
} }
std::cerr << getCurrentExceptionMessage(print_stacktrace, log << getCurrentExceptionMessage(print_stacktrace,
true /*check embedded stack trace*/) << std::endl; true /*check embedded stack trace*/) << '\n' << flush;
size_t info_index = round_robin ? 0 : connection_index; size_t info_index = round_robin ? 0 : connection_index;
++comparison_info_per_interval[info_index]->errors; ++comparison_info_per_interval[info_index]->errors;
@ -504,7 +503,7 @@ private:
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
std::cerr << "\n"; log << "\n";
for (size_t i = 0; i < infos.size(); ++i) for (size_t i = 0; i < infos.size(); ++i)
{ {
const auto & info = infos[i]; const auto & info = infos[i];
@ -524,31 +523,31 @@ private:
connection_description += conn->getDescription(); connection_description += conn->getDescription();
} }
} }
std::cerr log
<< connection_description << ", " << connection_description << ", "
<< "queries: " << info->queries << ", "; << "queries: " << info->queries.load() << ", ";
if (info->errors) if (info->errors)
{ {
std::cerr << "errors: " << info->errors << ", "; log << "errors: " << info->errors << ", ";
} }
std::cerr log
<< "QPS: " << (info->queries / seconds) << ", " << "QPS: " << fmt::format("{:.3f}", info->queries / seconds) << ", "
<< "RPS: " << (info->read_rows / seconds) << ", " << "RPS: " << fmt::format("{:.3f}", info->read_rows / seconds) << ", "
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", " << "MiB/s: " << fmt::format("{:.3f}", info->read_bytes / seconds / 1048576) << ", "
<< "result RPS: " << (info->result_rows / seconds) << ", " << "result RPS: " << fmt::format("{:.3f}", info->result_rows / seconds) << ", "
<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "." << "result MiB/s: " << fmt::format("{:.3f}", info->result_bytes / seconds / 1048576) << "."
<< "\n"; << "\n";
} }
std::cerr << "\n"; log << "\n";
auto print_percentile = [&](double percent) auto print_percentile = [&](double percent)
{ {
std::cerr << percent << "%\t\t"; log << percent << "%\t\t";
for (const auto & info : infos) for (const auto & info : infos)
{ {
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t"; log << fmt::format("{:.3f}", info->sampler.quantileNearest(percent / 100.0)) << " sec.\t";
} }
std::cerr << "\n"; log << "\n";
}; };
for (int percent = 0; percent <= 90; percent += 10) for (int percent = 0; percent <= 90; percent += 10)
@ -559,13 +558,15 @@ private:
print_percentile(99.9); print_percentile(99.9);
print_percentile(99.99); print_percentile(99.99);
std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n"; log << "\n" << t_test.compareAndReport(confidence).second << "\n";
if (!cumulative) if (!cumulative)
{ {
for (auto & info : infos) for (auto & info : infos)
info->clear(); info->clear();
} }
log.next();
} }
public: public:
@ -741,7 +742,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
} }
catch (...) catch (...)
{ {
std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl; std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << '\n';
return getCurrentExceptionCode(); return getCurrentExceptionCode();
} }
} }

View File

@ -22,13 +22,6 @@ namespace ErrorCodes
namespace namespace
{ {
/** Due to a lack of proper code review, this code was contributed with a multiplication of template instantiations
* over all pairs of data types, and we deeply regret that.
*
* We cannot remove all combinations, because the binary representation of serialized data has to remain the same,
* but we can partially heal the wound by treating unsigned and signed data types in the same way.
*/
template <typename ValueType, typename TimestampType> template <typename ValueType, typename TimestampType>
struct AggregationFunctionDeltaSumTimestampData struct AggregationFunctionDeltaSumTimestampData
{ {
@ -44,22 +37,23 @@ template <typename ValueType, typename TimestampType>
class AggregationFunctionDeltaSumTimestamp final class AggregationFunctionDeltaSumTimestamp final
: public IAggregateFunctionDataHelper< : public IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>, AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>> AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
>
{ {
public: public:
AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params) AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper< : IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>, AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{arguments, params, createResultType()} AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
{ >{arguments, params, createResultType()}
} {}
AggregationFunctionDeltaSumTimestamp() AggregationFunctionDeltaSumTimestamp()
: IAggregateFunctionDataHelper< : IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>, AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{} AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
{ >{}
} {}
bool allocatesMemoryInArena() const override { return false; } bool allocatesMemoryInArena() const override { return false; }
@ -69,8 +63,8 @@ public:
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{ {
auto value = unalignedLoad<ValueType>(columns[0]->getRawData().data() + row_num * sizeof(ValueType)); auto value = assert_cast<const ColumnVector<ValueType> &>(*columns[0]).getData()[row_num];
auto ts = unalignedLoad<TimestampType>(columns[1]->getRawData().data() + row_num * sizeof(TimestampType)); auto ts = assert_cast<const ColumnVector<TimestampType> &>(*columns[1]).getData()[row_num];
auto & data = this->data(place); auto & data = this->data(place);
@ -178,48 +172,10 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{ {
static_cast<ColumnFixedSizeHelper &>(to).template insertRawData<sizeof(ValueType)>( assert_cast<ColumnVector<ValueType> &>(to).getData().push_back(this->data(place).sum);
reinterpret_cast<const char *>(&this->data(place).sum));
} }
}; };
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
if (which.idx == TypeIndex::UInt32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
if (which.idx == TypeIndex::UInt64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
if (which.idx == TypeIndex::Float32) return new AggregateFunctionTemplate<FirstType, Float32>(args...);
if (which.idx == TypeIndex::Float64) return new AggregateFunctionTemplate<FirstType, Float64>(args...);
if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
if (which.idx == TypeIndex::UInt8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Float32) return createWithTwoTypesSecond<Float32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Float64) return createWithTwoTypesSecond<Float64, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp( AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
const String & name, const String & name,
const DataTypes & arguments, const DataTypes & arguments,
@ -237,7 +193,7 @@ AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, " throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, "
"must be Int, Float, Date, DateTime", arguments[1]->getName(), name); "must be Int, Float, Date, DateTime", arguments[1]->getName(), name);
return AggregateFunctionPtr(createWithTwoTypes<AggregationFunctionDeltaSumTimestamp>( return AggregateFunctionPtr(createWithTwoNumericOrDateTypes<AggregationFunctionDeltaSumTimestamp>(
*arguments[0], *arguments[1], arguments, params)); *arguments[0], *arguments[1], arguments, params));
} }
} }

View File

@ -184,8 +184,36 @@ static IAggregateFunction * createWithDecimalType(const IDataType & argument_typ
} }
/** For template with two arguments. /** For template with two arguments.
* This is an extremely dangerous for code bloat - do not use.
*/ */
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
return createWithTwoNumericTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8)
return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Enum16)
return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs> template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoBasicNumericTypesSecond(const IDataType & second_type, TArgs && ... args) static IAggregateFunction * createWithTwoBasicNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{ {
@ -209,6 +237,46 @@ static IAggregateFunction * createWithTwoBasicNumericTypes(const IDataType & fir
return nullptr; return nullptr;
} }
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
return createWithTwoNumericOrDateTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8)
return createWithTwoNumericOrDateTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Enum16)
return createWithTwoNumericOrDateTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
if (which.idx == TypeIndex::Date)
return createWithTwoNumericOrDateTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::DateTime)
return createWithTwoNumericOrDateTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
template <template <typename> class AggregateFunctionTemplate, typename... TArgs> template <template <typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args) static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args)
{ {

View File

@ -140,8 +140,6 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
/// We don't do highlighting for foreign dialects, such as PRQL and Kusto. /// We don't do highlighting for foreign dialects, such as PRQL and Kusto.
/// Only normal ClickHouse SQL queries are highlighted. /// Only normal ClickHouse SQL queries are highlighted.
/// Currently we highlight only the first query in the multi-query mode.
ParserQuery parser(end, false, context.getSettingsRef()[Setting::implicit_select]); ParserQuery parser(end, false, context.getSettingsRef()[Setting::implicit_select]);
ASTPtr ast; ASTPtr ast;
bool parse_res = false; bool parse_res = false;

View File

@ -87,6 +87,7 @@ APPLY_FOR_FAILPOINTS(M, M, M, M)
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointInjection::fail_point_wait_channels; std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointInjection::fail_point_wait_channels;
std::mutex FailPointInjection::mu; std::mutex FailPointInjection::mu;
class FailPointChannel : private boost::noncopyable class FailPointChannel : private boost::noncopyable
{ {
public: public:

View File

@ -15,6 +15,7 @@
#include <unordered_map> #include <unordered_map>
namespace DB namespace DB
{ {
@ -27,6 +28,7 @@ namespace DB
/// 3. in test file, we can use system failpoint enable/disable 'failpoint_name' /// 3. in test file, we can use system failpoint enable/disable 'failpoint_name'
class FailPointChannel; class FailPointChannel;
class FailPointInjection class FailPointInjection
{ {
public: public:

View File

@ -9,6 +9,7 @@
#include <mutex> #include <mutex>
#include <algorithm> #include <algorithm>
#include <Poco/Timespan.h>
namespace ProfileEvents namespace ProfileEvents
@ -49,16 +50,18 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis()
} }
HostResolver::HostResolver(String host_, Poco::Timespan history_) HostResolver::HostResolver(String host_, Poco::Timespan history_)
: host(std::move(host_)) : HostResolver(
, history(history_) [](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); },
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); }) host_,
{ history_)
update(); {}
}
HostResolver::HostResolver( HostResolver::HostResolver(
ResolveFunction && resolve_function_, String host_, Poco::Timespan history_) ResolveFunction && resolve_function_, String host_, Poco::Timespan history_)
: host(std::move(host_)), history(history_), resolve_function(std::move(resolve_function_)) : host(std::move(host_))
, history(history_)
, resolve_interval(history_.totalMicroseconds() / 3)
, resolve_function(std::move(resolve_function_))
{ {
update(); update();
} }
@ -203,7 +206,7 @@ bool HostResolver::isUpdateNeeded()
Poco::Timestamp now; Poco::Timestamp now;
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
return last_resolve_time + history < now || records.empty(); return last_resolve_time + resolve_interval < now || records.empty();
} }
void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddress> & next_gen) void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddress> & next_gen)

View File

@ -26,7 +26,7 @@
// a) it still occurs in resolve set after `history_` time or b) all other addresses are pessimized as well. // a) it still occurs in resolve set after `history_` time or b) all other addresses are pessimized as well.
// - resolve schedule // - resolve schedule
// Addresses are resolved through `DB::DNSResolver::instance()`. // Addresses are resolved through `DB::DNSResolver::instance()`.
// Usually it does not happen more often than once in `history_` time. // Usually it does not happen more often than 3 times in `history_` period.
// But also new resolve performed each `setFail()` call. // But also new resolve performed each `setFail()` call.
namespace DB namespace DB
@ -212,6 +212,7 @@ protected:
const String host; const String host;
const Poco::Timespan history; const Poco::Timespan history;
const Poco::Timespan resolve_interval;
const HostResolverMetrics metrics = getMetrics(); const HostResolverMetrics metrics = getMetrics();
// for tests purpose // for tests purpose
@ -245,4 +246,3 @@ private:
}; };
} }

View File

@ -3,7 +3,7 @@
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h> #include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB namespace DB
{ {

View File

@ -7,7 +7,6 @@
#include <Parsers/ExpressionElementParsers.h> #include <Parsers/ExpressionElementParsers.h>
#include <Parsers/IParser.h> #include <Parsers/IParser.h>
#include <Parsers/TokenIterator.h> #include <Parsers/TokenIterator.h>
#include <base/types.h>
#include <Common/PODArray.h> #include <Common/PODArray.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>

View File

@ -4565,7 +4565,7 @@ Possible values:
- 0 - Disable - 0 - Disable
- 1 - Enable - 1 - Enable
)", 0) \ )", 0) \
DECLARE(Bool, query_plan_merge_filters, true, R"( DECLARE(Bool, query_plan_merge_filters, false, R"(
Allow to merge filters in the query plan Allow to merge filters in the query plan
)", 0) \ )", 0) \
DECLARE(Bool, query_plan_filter_push_down, true, R"( DECLARE(Bool, query_plan_filter_push_down, true, R"(

View File

@ -77,7 +77,6 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_keeper_max_retries_while_initializing", 0, 20, "New setting."}, {"backup_restore_keeper_max_retries_while_initializing", 0, 20, "New setting."},
{"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."}, {"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."}, {"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"}, {"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"}, {"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"}, {"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
@ -127,7 +126,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"}, {"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
{"max_parts_to_move", 0, 1000, "New setting"}, {"max_parts_to_move", 0, 1000, "New setting"},
{"hnsw_candidate_list_size_for_search", 64, 256, "New setting. Previously, the value was optionally specified in CREATE INDEX and 64 by default."}, {"hnsw_candidate_list_size_for_search", 64, 256, "New setting. Previously, the value was optionally specified in CREATE INDEX and 64 by default."},
{"allow_reorder_prewhere_conditions", false, true, "New setting"}, {"allow_reorder_prewhere_conditions", true, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."}, {"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."}, {"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."},
} }

View File

@ -10,7 +10,6 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/parseDateTimeBestEffort.h> #include <IO/parseDateTimeBestEffort.h>
#include <Parsers/TokenIterator.h>
namespace DB namespace DB

View File

@ -8,6 +8,7 @@
#include <Interpreters/Cache/FileCacheSettings.h> #include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/LRUFileCachePriority.h> #include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/SLRUFileCachePriority.h> #include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <Interpreters/Cache/EvictionCandidates.h> #include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <base/hex.h> #include <base/hex.h>
@ -53,16 +54,6 @@ namespace ErrorCodes
namespace namespace
{ {
size_t roundDownToMultiple(size_t num, size_t multiple)
{
return (num / multiple) * multiple;
}
size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}
std::string getCommonUserID() std::string getCommonUserID()
{ {
auto user_from_context = DB::Context::getGlobalContextInstance()->getFilesystemCacheUser(); auto user_from_context = DB::Context::getGlobalContextInstance()->getFilesystemCacheUser();
@ -96,6 +87,7 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
: max_file_segment_size(settings.max_file_segment_size) : max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0) , bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment) , boundary_alignment(settings.boundary_alignment)
, background_download_max_file_segment_size(settings.background_download_max_file_segment_size)
, load_metadata_threads(settings.load_metadata_threads) , load_metadata_threads(settings.load_metadata_threads)
, load_metadata_asynchronously(settings.load_metadata_asynchronously) , load_metadata_asynchronously(settings.load_metadata_asynchronously)
, write_cache_per_user_directory(settings.write_cache_per_user_id_directory) , write_cache_per_user_directory(settings.write_cache_per_user_id_directory)
@ -103,7 +95,10 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
, keep_current_elements_to_max_ratio(1 - settings.keep_free_space_elements_ratio) , keep_current_elements_to_max_ratio(1 - settings.keep_free_space_elements_ratio)
, keep_up_free_space_remove_batch(settings.keep_free_space_remove_batch) , keep_up_free_space_remove_batch(settings.keep_free_space_remove_batch)
, log(getLogger("FileCache(" + cache_name + ")")) , log(getLogger("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit, settings.background_download_threads, write_cache_per_user_directory) , metadata(settings.base_path,
settings.background_download_queue_size_limit,
settings.background_download_threads,
write_cache_per_user_directory)
{ {
if (settings.cache_policy == "LRU") if (settings.cache_policy == "LRU")
{ {
@ -601,8 +596,8 @@ FileCache::getOrSet(
/// 2. max_file_segments_limit /// 2. max_file_segments_limit
FileSegment::Range result_range = initial_range; FileSegment::Range result_range = initial_range;
const auto aligned_offset = roundDownToMultiple(initial_range.left, boundary_alignment); const auto aligned_offset = FileCacheUtils::roundDownToMultiple(initial_range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1; auto aligned_end_offset = std::min(FileCacheUtils::roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= initial_range.left); chassert(aligned_offset <= initial_range.left);
chassert(aligned_end_offset >= initial_range.right); chassert(aligned_end_offset >= initial_range.right);
@ -1600,6 +1595,17 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
} }
} }
if (new_settings.background_download_max_file_segment_size != actual_settings.background_download_max_file_segment_size)
{
background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;
LOG_INFO(log, "Changed background_download_max_file_segment_size from {} to {}",
actual_settings.background_download_max_file_segment_size,
new_settings.background_download_max_file_segment_size);
actual_settings.background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;
}
if (new_settings.max_size != actual_settings.max_size if (new_settings.max_size != actual_settings.max_size
|| new_settings.max_elements != actual_settings.max_elements) || new_settings.max_elements != actual_settings.max_elements)
{ {

View File

@ -161,6 +161,10 @@ public:
size_t getMaxFileSegmentSize() const { return max_file_segment_size; } size_t getMaxFileSegmentSize() const { return max_file_segment_size; }
size_t getBackgroundDownloadMaxFileSegmentSize() const { return background_download_max_file_segment_size.load(); }
size_t getBoundaryAlignment() const { return boundary_alignment; }
bool tryReserve( bool tryReserve(
FileSegment & file_segment, FileSegment & file_segment,
size_t size, size_t size,
@ -199,6 +203,7 @@ private:
std::atomic<size_t> max_file_segment_size; std::atomic<size_t> max_file_segment_size;
const size_t bypass_cache_threshold; const size_t bypass_cache_threshold;
const size_t boundary_alignment; const size_t boundary_alignment;
std::atomic<size_t> background_download_max_file_segment_size;
size_t load_metadata_threads; size_t load_metadata_threads;
const bool load_metadata_asynchronously; const bool load_metadata_asynchronously;
std::atomic<bool> stop_loading_metadata = false; std::atomic<bool> stop_loading_metadata = false;

View File

@ -62,6 +62,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
if (has("background_download_queue_size_limit")) if (has("background_download_queue_size_limit"))
background_download_queue_size_limit = get_uint("background_download_queue_size_limit"); background_download_queue_size_limit = get_uint("background_download_queue_size_limit");
if (has("background_download_max_file_segment_size"))
background_download_max_file_segment_size = get_uint("background_download_max_file_segment_size");
if (has("load_metadata_threads")) if (has("load_metadata_threads"))
load_metadata_threads = get_uint("load_metadata_threads"); load_metadata_threads = get_uint("load_metadata_threads");

View File

@ -43,6 +43,8 @@ struct FileCacheSettings
double keep_free_space_elements_ratio = FILECACHE_DEFAULT_FREE_SPACE_ELEMENTS_RATIO; double keep_free_space_elements_ratio = FILECACHE_DEFAULT_FREE_SPACE_ELEMENTS_RATIO;
size_t keep_free_space_remove_batch = FILECACHE_DEFAULT_FREE_SPACE_REMOVE_BATCH; size_t keep_free_space_remove_batch = FILECACHE_DEFAULT_FREE_SPACE_REMOVE_BATCH;
size_t background_download_max_file_segment_size = FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE_WITH_BACKGROUND_DOWLOAD;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection); void loadFromCollection(const NamedCollection & collection);

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>
namespace FileCacheUtils
{
static size_t roundDownToMultiple(size_t num, size_t multiple)
{
return (num / multiple) * multiple;
}
static size_t roundUpToMultiple(size_t num, size_t multiple)
{
return roundDownToMultiple(num + multiple - 1, multiple);
}
}

View File

@ -6,6 +6,7 @@ namespace DB
static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 32 * 1024 * 1024; /// 32Mi static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 32 * 1024 * 1024; /// 32Mi
static constexpr int FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT = 4 * 1024 * 1024; /// 4Mi static constexpr int FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT = 4 * 1024 * 1024; /// 4Mi
static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE_WITH_BACKGROUND_DOWLOAD = 4 * 1024 * 1024; /// 4Mi
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 5; static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 5;
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT = 5000; static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT = 5000;
static constexpr int FILECACHE_DEFAULT_LOAD_METADATA_THREADS = 16; static constexpr int FILECACHE_DEFAULT_LOAD_METADATA_THREADS = 16;

View File

@ -4,6 +4,7 @@
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Interpreters/Cache/FileCache.h> #include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <base/hex.h> #include <base/hex.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
@ -360,11 +361,14 @@ void FileSegment::write(char * from, size_t size, size_t offset_in_file)
"Expected DOWNLOADING state, got {}", stateToString(download_state)); "Expected DOWNLOADING state, got {}", stateToString(download_state));
const size_t first_non_downloaded_offset = getCurrentWriteOffset(); const size_t first_non_downloaded_offset = getCurrentWriteOffset();
if (offset_in_file != first_non_downloaded_offset) if (offset_in_file != first_non_downloaded_offset)
{
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Attempt to write {} bytes to offset: {}, but current write offset is {}", "Attempt to write {} bytes to offset: {}, but current write offset is {}",
size, offset_in_file, first_non_downloaded_offset); size, offset_in_file, first_non_downloaded_offset);
}
const size_t current_downloaded_size = getDownloadedSize(); const size_t current_downloaded_size = getDownloadedSize();
chassert(reserved_size >= current_downloaded_size); chassert(reserved_size >= current_downloaded_size);
@ -375,8 +379,19 @@ void FileSegment::write(char * from, size_t size, size_t offset_in_file)
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size); "Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);
if (!is_unbound && current_downloaded_size == range().size()) if (!is_unbound)
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded"); {
if (current_downloaded_size == range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
if (current_downloaded_size + size > range().size())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot download beyond file segment boundaries: {}. Write offset: {}, size: {}, downloaded size: {}",
range().size(), first_non_downloaded_offset, size, current_downloaded_size);
}
}
if (!cache_writer && current_downloaded_size > 0) if (!cache_writer && current_downloaded_size > 0)
throw Exception( throw Exception(
@ -629,6 +644,36 @@ void FileSegment::completePartAndResetDownloader()
LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lk)); LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lk));
} }
size_t FileSegment::getSizeForBackgroundDownload() const
{
auto lk = lock();
return getSizeForBackgroundDownloadUnlocked(lk);
}
size_t FileSegment::getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const
{
if (!background_download_enabled
|| !downloaded_size
|| !remote_file_reader)
{
return 0;
}
chassert(downloaded_size <= range().size());
const size_t background_download_max_file_segment_size = cache->getBackgroundDownloadMaxFileSegmentSize();
size_t desired_size;
if (downloaded_size >= background_download_max_file_segment_size)
desired_size = FileCacheUtils::roundUpToMultiple(downloaded_size, cache->getBoundaryAlignment());
else
desired_size = FileCacheUtils::roundUpToMultiple(background_download_max_file_segment_size, cache->getBoundaryAlignment());
desired_size = std::min(desired_size, range().size());
chassert(desired_size >= downloaded_size);
return desired_size - downloaded_size;
}
void FileSegment::complete(bool allow_background_download) void FileSegment::complete(bool allow_background_download)
{ {
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentCompleteMicroseconds); ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentCompleteMicroseconds);
@ -708,7 +753,8 @@ void FileSegment::complete(bool allow_background_download)
if (is_last_holder) if (is_last_holder)
{ {
bool added_to_download_queue = false; bool added_to_download_queue = false;
if (allow_background_download && background_download_enabled && remote_file_reader) size_t background_download_size = allow_background_download ? getSizeForBackgroundDownloadUnlocked(segment_lock) : 0;
if (background_download_size)
{ {
ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundDownloadQueuePush); ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundDownloadQueuePush);
added_to_download_queue = locked_key->addToDownloadQueue(offset(), segment_lock); /// Finish download in background. added_to_download_queue = locked_key->addToDownloadQueue(offset(), segment_lock); /// Finish download in background.
@ -862,7 +908,12 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
chassert(downloaded_size == reserved_size); chassert(downloaded_size == reserved_size);
chassert(downloaded_size == range().size()); chassert(downloaded_size == range().size());
chassert(downloaded_size > 0); chassert(downloaded_size > 0);
chassert(fs::file_size(getPath()) > 0);
auto file_size = fs::file_size(getPath());
UNUSED(file_size);
chassert(file_size == range().size());
chassert(downloaded_size == range().size());
chassert(queue_iterator || on_delayed_removal); chassert(queue_iterator || on_delayed_removal);
check_iterator(queue_iterator); check_iterator(queue_iterator);
@ -884,7 +935,13 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
chassert(reserved_size >= downloaded_size); chassert(reserved_size >= downloaded_size);
chassert(downloaded_size > 0); chassert(downloaded_size > 0);
chassert(fs::file_size(getPath()) > 0);
auto file_size = fs::file_size(getPath());
UNUSED(file_size);
chassert(file_size > 0);
chassert(file_size <= range().size());
chassert(downloaded_size <= range().size());
chassert(queue_iterator); chassert(queue_iterator);
check_iterator(queue_iterator); check_iterator(queue_iterator);

View File

@ -185,6 +185,8 @@ public:
bool assertCorrectness() const; bool assertCorrectness() const;
size_t getSizeForBackgroundDownload() const;
/** /**
* ========== Methods that must do cv.notify() ================== * ========== Methods that must do cv.notify() ==================
*/ */
@ -230,6 +232,7 @@ private:
String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const; String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const;
bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
void resetDownloaderUnlocked(const FileSegmentGuard::Lock &); void resetDownloaderUnlocked(const FileSegmentGuard::Lock &);
size_t getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const;
void setDownloadState(State state, const FileSegmentGuard::Lock &); void setDownloadState(State state, const FileSegmentGuard::Lock &);
void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &); void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);

View File

@ -676,13 +676,17 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
log, "Downloading {} bytes for file segment {}", log, "Downloading {} bytes for file segment {}",
file_segment.range().size() - file_segment.getDownloadedSize(), file_segment.getInfoForLog()); file_segment.range().size() - file_segment.getDownloadedSize(), file_segment.getInfoForLog());
size_t size_to_download = file_segment.getSizeForBackgroundDownload();
if (!size_to_download)
return;
auto reader = file_segment.getRemoteFileReader(); auto reader = file_segment.getRemoteFileReader();
if (!reader) if (!reader)
{ {
throw Exception( LOG_TEST(log, "No reader in {}:{} (state: {}, range: {}, downloaded size: {})",
ErrorCodes::LOGICAL_ERROR, "No reader. " file_segment.key(), file_segment.offset(), file_segment.state(),
"File segment should not have been submitted for background download ({})", file_segment.range().toString(), file_segment.getDownloadedSize());
file_segment.getInfoForLog()); return;
} }
/// If remote_fs_read_method == 'threadpool', /// If remote_fs_read_method == 'threadpool',
@ -690,7 +694,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
if (reader->internalBuffer().empty()) if (reader->internalBuffer().empty())
{ {
if (!memory) if (!memory)
memory.emplace(DBMS_DEFAULT_BUFFER_SIZE); memory.emplace(std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size_to_download));
reader->set(memory->data(), memory->size()); reader->set(memory->data(), memory->size());
} }
@ -701,9 +705,13 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
if (offset != static_cast<size_t>(reader->getPosition())) if (offset != static_cast<size_t>(reader->getPosition()))
reader->seek(offset, SEEK_SET); reader->seek(offset, SEEK_SET);
while (!reader->eof()) while (size_to_download && !reader->eof())
{ {
auto size = reader->available(); const auto available = reader->available();
chassert(available);
const auto size = std::min(available, size_to_download);
size_to_download -= size;
std::string failure_reason; std::string failure_reason;
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds, failure_reason)) if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds, failure_reason))
@ -713,7 +721,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
"for {}:{} (downloaded size: {}/{})", "for {}:{} (downloaded size: {}/{})",
file_segment.key(), file_segment.offset(), file_segment.key(), file_segment.offset(),
file_segment.getDownloadedSize(), file_segment.range().size()); file_segment.getDownloadedSize(), file_segment.range().size());
return; break;
} }
try try
@ -728,12 +736,14 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
if (code == /* No space left on device */28 || code == /* Quota exceeded */122) if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
{ {
LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText()); LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
return; break;
} }
throw; throw;
} }
} }
file_segment.resetRemoteFileReader();
LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog()); LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog());
} }
@ -1155,7 +1165,7 @@ std::vector<FileSegment::Info> LockedKey::sync()
actual_size, expected_size, file_segment->getInfoForLog()); actual_size, expected_size, file_segment->getInfoForLog());
broken.push_back(FileSegment::getInfo(file_segment)); broken.push_back(FileSegment::getInfo(file_segment));
it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */false); it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */true);
} }
return broken; return broken;
} }

View File

@ -210,6 +210,7 @@ public:
bool setBackgroundDownloadThreads(size_t threads_num); bool setBackgroundDownloadThreads(size_t threads_num);
size_t getBackgroundDownloadThreads() const { return download_threads.size(); } size_t getBackgroundDownloadThreads() const { return download_threads.size(); }
bool setBackgroundDownloadQueueSizeLimit(size_t size); bool setBackgroundDownloadQueueSizeLimit(size_t size);
bool isBackgroundDownloadEnabled(); bool isBackgroundDownloadEnabled();

View File

@ -5,10 +5,10 @@
#include <Parsers/ASTPartition.h> #include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Parsers/ASTQueryParameter.h> #include <Parsers/ASTQueryParameter.h>
namespace DB namespace DB
{ {
@ -61,7 +61,7 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else else
fields_count = 0; fields_count = 0;
} }
else if (const auto* literal_ast = value->as<ASTLiteral>(); literal_ast) else if (const auto * literal_ast = value->as<ASTLiteral>(); literal_ast)
{ {
if (literal_ast->value.getType() == Field::Types::Tuple) if (literal_ast->value.getType() == Field::Types::Tuple)
{ {

View File

@ -1,71 +0,0 @@
#pragma once
#include <base/types.h>
#include <Parsers/TokenIterator.h>
#include <map>
#include <memory>
#include <Common/SipHash.h>
namespace DB
{
struct StringRange
{
const char * first = nullptr;
const char * second = nullptr;
StringRange() = default;
StringRange(const char * begin, const char * end) : first(begin), second(end) {}
explicit StringRange(TokenIterator token) : first(token->begin), second(token->end) {}
StringRange(TokenIterator token_begin, TokenIterator token_end)
{
/// Empty range.
if (token_begin == token_end)
{
first = token_begin->begin;
second = token_begin->begin;
return;
}
TokenIterator token_last = token_end;
--token_last;
first = token_begin->begin;
second = token_last->end;
}
};
using StringPtr = std::shared_ptr<String>;
inline String toString(const StringRange & range)
{
return range.first ? String(range.first, range.second) : String();
}
/// Hashes only the values of pointers in StringRange. Is used with StringRangePointersEqualTo comparator.
struct StringRangePointersHash
{
UInt64 operator()(const StringRange & range) const
{
SipHash hash;
hash.update(range.first);
hash.update(range.second);
return hash.get64();
}
};
/// Ranges are equal only when they point to the same memory region.
/// It may be used when it's enough to compare substrings by their position in the same string.
struct StringRangePointersEqualTo
{
constexpr bool operator()(const StringRange &lhs, const StringRange &rhs) const
{
return std::tie(lhs.first, lhs.second) == std::tie(rhs.first, rhs.second);
}
};
}

View File

@ -317,8 +317,9 @@ void LimitTransform::splitChunk(PortsData & data)
length = offset + limit - (rows_read - num_rows) - start; length = offset + limit - (rows_read - num_rows) - start;
} }
/// check if other rows in current block equals to last one in limit /// Check if other rows in current block equals to last one in limit
if (with_ties && length) /// when rows read >= offset + limit.
if (with_ties && offset + limit <= rows_read && length)
{ {
UInt64 current_row_num = start + length; UInt64 current_row_num = start + length;
previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1); previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);

View File

@ -1045,7 +1045,6 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
MarkRanges res; MarkRanges res;
size_t marks_count = part->index_granularity.getMarksCount(); size_t marks_count = part->index_granularity.getMarksCount();
const auto & index = part->getIndex();
if (marks_count == 0) if (marks_count == 0)
return res; return res;
@ -1073,14 +1072,19 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
auto index_columns = std::make_shared<ColumnsWithTypeAndName>(); auto index_columns = std::make_shared<ColumnsWithTypeAndName>();
const auto & key_indices = key_condition.getKeyIndices(); const auto & key_indices = key_condition.getKeyIndices();
DataTypes key_types; DataTypes key_types;
for (size_t i : key_indices) if (!key_indices.empty())
{ {
if (i < index->size()) const auto & index = part->getIndex();
index_columns->emplace_back(index->at(i), primary_key.data_types[i], primary_key.column_names[i]);
else
index_columns->emplace_back(); /// The column of the primary key was not loaded in memory - we'll skip it.
key_types.emplace_back(primary_key.data_types[i]); for (size_t i : key_indices)
{
if (i < index->size())
index_columns->emplace_back(index->at(i), primary_key.data_types[i], primary_key.column_names[i]);
else
index_columns->emplace_back(); /// The column of the primary key was not loaded in memory - we'll skip it.
key_types.emplace_back(primary_key.data_types[i]);
}
} }
/// If there are no monotonic functions, there is no need to save block reference. /// If there are no monotonic functions, there is no need to save block reference.

View File

@ -30,8 +30,8 @@ namespace ErrorCodes
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \ DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \ DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \ DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \ DECLARE(UInt64, polling_max_timeout_ms, 10 * 60 * 1000, "Maximum timeout before next polling", 0) \
DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \ DECLARE(UInt64, polling_backoff_ms, 30 * 1000, "Polling backoff", 0) \
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \ DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \ DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \ DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \

View File

@ -395,7 +395,7 @@ def test_secure_connection_uri(started_cluster):
simple_mongo_table.insert_many(data) simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"] node = started_cluster.instances["node"]
node.query( node.query(
"CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true', 'test_secure_connection_uri')" "CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true&authSource=admin', 'test_secure_connection_uri')"
) )
assert node.query("SELECT COUNT() FROM test_secure_connection_uri") == "100\n" assert node.query("SELECT COUNT() FROM test_secure_connection_uri") == "100\n"

View File

@ -407,6 +407,8 @@ def test_failed_retry(started_cluster, mode, engine_name):
additional_settings={ additional_settings={
"s3queue_loading_retries": retries_num, "s3queue_loading_retries": retries_num,
"keeper_path": keeper_path, "keeper_path": keeper_path,
"polling_max_timeout_ms": 5000,
"polling_backoff_ms": 1000,
}, },
engine_name=engine_name, engine_name=engine_name,
) )
@ -852,6 +854,8 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
additional_settings={ additional_settings={
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_buckets": 2, "s3queue_buckets": 2,
"polling_max_timeout_ms": 2000,
"polling_backoff_ms": 1000,
**({"s3queue_processing_threads_num": 1} if mode == "ordered" else {}), **({"s3queue_processing_threads_num": 1} if mode == "ordered" else {}),
}, },
) )
@ -929,6 +933,8 @@ def test_max_set_age(started_cluster):
"cleanup_interval_min_ms": max_age / 3, "cleanup_interval_min_ms": max_age / 3,
"cleanup_interval_max_ms": max_age / 3, "cleanup_interval_max_ms": max_age / 3,
"loading_retries": 0, "loading_retries": 0,
"polling_max_timeout_ms": 5000,
"polling_backoff_ms": 1000,
"processing_threads_num": 1, "processing_threads_num": 1,
"loading_retries": 0, "loading_retries": 0,
}, },
@ -1423,6 +1429,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
"keeper_path": keeper_path, "keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads, "s3queue_processing_threads_num": processing_threads,
"s3queue_buckets": shards_num, "s3queue_buckets": shards_num,
"polling_max_timeout_ms": 1000,
"polling_backoff_ms": 0,
}, },
) )
i += 1 i += 1
@ -1673,6 +1681,8 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads)
"s3queue_processing_threads_num": processing_threads, "s3queue_processing_threads_num": processing_threads,
"s3queue_last_processed_path": f"{files_path}/test_5.csv", "s3queue_last_processed_path": f"{files_path}/test_5.csv",
"s3queue_buckets": 2, "s3queue_buckets": 2,
"polling_max_timeout_ms": 2000,
"polling_backoff_ms": 1000,
}, },
) )

View File

@ -53,3 +53,5 @@
100 100
100 100
100 100
12
12

View File

@ -35,4 +35,24 @@ select count() from (select number, number < 100 from numbers(2000) order by num
SET max_block_size = 5; SET max_block_size = 5;
select count() from (select number < 100, number from numbers(2000) order by number < 100 desc limit 10 with ties); select count() from (select number < 100, number from numbers(2000) order by number < 100 desc limit 10 with ties);
SELECT count() FROM (WITH data AS (
SELECT * FROM numbers(0, 10)
UNION ALL
SELECT * FROM numbers(10, 10)
)
SELECT number div 10 AS ten, number
FROM data
ORDER BY ten
LIMIT 8,6 WITH TIES);
SELECT count() FROM (WITH data AS (
SELECT * FROM numbers(0, 10)
UNION ALL
SELECT * FROM numbers(10, 10)
)
SELECT number div 11 AS eleven, number
FROM data
ORDER BY eleven
LIMIT 8,6 WITH TIES);
DROP TABLE ties; DROP TABLE ties;

View File

@ -163,6 +163,7 @@ Filter column: notEquals(__table1.y, 2_UInt8)
> filter is pushed down before CreatingSets > filter is pushed down before CreatingSets
CreatingSets CreatingSets
Filter Filter
Filter
1 1
3 3
> one condition of filter is pushed down before LEFT JOIN > one condition of filter is pushed down before LEFT JOIN

View File

@ -332,12 +332,13 @@ SETTINGS optimize_aggregators_of_group_by_keys=0 -- avoid removing any() as it d
Expression (Projection) Expression (Projection)
Sorting (Sorting for ORDER BY) Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY) Expression (Before ORDER BY)
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING)) Filter ((WHERE + (Projection + Before ORDER BY)))
Aggregating Filter (HAVING)
Expression ((Before GROUP BY + Projection)) Aggregating
Sorting (Sorting for ORDER BY) Expression ((Before GROUP BY + Projection))
Expression ((Before ORDER BY + (Projection + Before ORDER BY))) Sorting (Sorting for ORDER BY)
ReadFromSystemNumbers Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
-- execute -- execute
1 1
2 2

View File

@ -28,17 +28,21 @@ WHERE type_1 = \'all\'
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Filter) (Filter)
FilterTransform × 6 FilterTransform × 2
(Aggregating) (Filter)
ExpressionTransform × 2 FilterTransform × 2
AggregatingTransform × 2 (Filter)
Copy 1 → 2 FilterTransform × 2
(Expression) (Aggregating)
ExpressionTransform ExpressionTransform × 2
(Expression) AggregatingTransform × 2
ExpressionTransform Copy 1 → 2
(ReadFromMergeTree) (Expression)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Filter) (Filter)
@ -64,10 +68,14 @@ ExpressionTransform × 2
ExpressionTransform × 2 ExpressionTransform × 2
AggregatingTransform × 2 AggregatingTransform × 2
Copy 1 → 2 Copy 1 → 2
(Expression) (Filter)
ExpressionTransform FilterTransform
(ReadFromMergeTree) (Filter)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1 FilterTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression) (Expression)
ExpressionTransform × 2 ExpressionTransform × 2
(Aggregating) (Aggregating)

View File

@ -1,4 +1,18 @@
100000000 100000000 Row 1:
0 0 ──────
round(primary_key_bytes_in_memory, -7): 100000000 -- 100.00 million
round(primary_key_bytes_in_memory_allocated, -7): 100000000 -- 100.00 million
Row 1:
──────
primary_key_bytes_in_memory: 0
primary_key_bytes_in_memory_allocated: 0
1 1
100000000 100000000 Row 1:
──────
primary_key_bytes_in_memory: 0
primary_key_bytes_in_memory_allocated: 0
1
Row 1:
──────
round(primary_key_bytes_in_memory, -7): 100000000 -- 100.00 million
round(primary_key_bytes_in_memory_allocated, -7): 100000000 -- 100.00 million

View File

@ -3,17 +3,26 @@ CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granul
SET optimize_trivial_insert_select = 1; SET optimize_trivial_insert_select = 1;
INSERT INTO test SELECT randomString(1000) FROM numbers(100000); INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test'; SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;
DETACH TABLE test; DETACH TABLE test;
SET max_memory_usage = '50M'; SET max_memory_usage = '50M';
ATTACH TABLE test; ATTACH TABLE test;
SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test'; SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;
SET max_memory_usage = '200M'; SET max_memory_usage = '200M';
-- Run a query that doesn use indexes
SELECT s != '' FROM test LIMIT 1; SELECT s != '' FROM test LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test'; -- Check that index was not loaded
SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;
-- Run a query that uses PK index
SELECT s != '' FROM test WHERE s < '9999999999' LIMIT 1;
-- Check that index was loaded
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;
DROP TABLE test; DROP TABLE test;

View File

@ -5,9 +5,19 @@
100000000 100000000 100000000 100000000
0 0 0 0
0 0 0 0
Query that does not use index for table `test`
1
0 0
0 0
Query that uses index in for table `test`
1 1
100000000 100000000 100000000 100000000
0 0 0 0
Query that does not use index for table `test2`
1
100000000 100000000
0 0
Query that uses index for table `test2`
1 1
100000000 100000000 100000000 100000000
100000000 100000000 100000000 100000000

View File

@ -16,8 +16,18 @@ SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory
SYSTEM UNLOAD PRIMARY KEY {CLICKHOUSE_DATABASE:Identifier}.test2; SYSTEM UNLOAD PRIMARY KEY {CLICKHOUSE_DATABASE:Identifier}.test2;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2'); SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
SELECT 'Query that does not use index for table `test`';
SELECT s != '' FROM test LIMIT 1; SELECT s != '' FROM test LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2'); SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
SELECT 'Query that uses index in for table `test`';
SELECT s != '' FROM test WHERE s < '99999999' LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
SELECT 'Query that does not use index for table `test2`';
SELECT s != '' FROM test2 LIMIT 1; SELECT s != '' FROM test2 LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2'); SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
SELECT 'Query that uses index for table `test2`';
SELECT s != '' FROM test2 WHERE s < '99999999' LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

View File

@ -1 +1 @@
test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB test 10000000 352 39 39

View File

@ -1,4 +1,4 @@
-- Tags: no-random-settings -- Tags: no-random-settings, no-fasttest
set allow_experimental_dynamic_type = 1; set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1; set allow_experimental_json_type = 1;
@ -10,10 +10,10 @@ insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(
SELECT SELECT
`table`, `table`,
formatReadableQuantity(sum(rows)) AS rows, sum(rows) AS rows,
formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed, floor(sum(data_uncompressed_bytes) / (1024 * 1024)) AS data_size_uncompressed,
formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed, floor(sum(data_compressed_bytes) / (1024 * 1024)) AS data_size_compressed,
formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk floor(sum(bytes_on_disk) / (1024 * 1024)) AS total_size_on_disk
FROM system.parts FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test') WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table` GROUP BY `table`

View File

@ -1,2 +1,2 @@
Condition: and((materialize(auid) in [1, 1]), (_CAST(toDate(ts)) in (-Inf, 1703980800])) Condition: (_CAST(toDate(ts)) in (-Inf, 1703980800])
Granules: 1/3 Granules: 3/3

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# A query with two seconds sleep cannot be processed with QPS > 0.5
$CLICKHOUSE_BENCHMARK --query "SELECT sleep(2)" 2>&1 | grep -m1 -o -P 'QPS: \d+\.\d+' | $CLICKHOUSE_LOCAL --query "SELECT throwIf(extract(line, 'QPS: (.+)')::Float64 > 0.75) FROM table" --input-format LineAsString

View File

@ -0,0 +1,2 @@
-0.07947094746692918 -1017248723 0
-0.07947094746692918 -1017248723 0

View File

@ -0,0 +1,10 @@
-- https://s3.amazonaws.com/clickhouse-test-reports/0/a02b20a9813c6ba0880c67f079363ef1c5440109/sqlancer__debug_.html
-- Caused by enablement of query_plan_merge_filters. Will fail if the next line is uncommented
-- set query_plan_merge_filters=1;
CREATE TABLE IF NOT EXISTS t3 (c0 Int32) ENGINE = Memory() ;
INSERT INTO t3(c0) VALUES (1110866669);
-- These 2 queries are expected to return the same
SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 SETTINGS aggregate_functions_null_for_empty=1, enable_optimize_predicate_expression=0;
SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING ((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723)))) UNION ALL SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING (NOT (((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723)))))) UNION ALL SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING ((((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723))))) IS NULL) SETTINGS aggregate_functions_null_for_empty=1, enable_optimize_predicate_expression=0;