Mirror of https://github.com/ClickHouse/ClickHouse.git

Commit e6c6217e87: Merge branch 'master' into fp16
@@ -22,13 +22,6 @@ namespace ErrorCodes
namespace
{

/** Due to a lack of proper code review, this code was contributed with a multiplication of template instantiations
  * over all pairs of data types, and we deeply regret that.
  *
  * We cannot remove all combinations, because the binary representation of serialized data has to remain the same,
  * but we can partially heal the wound by treating unsigned and signed data types in the same way.
  */

template <typename ValueType, typename TimestampType>
struct AggregationFunctionDeltaSumTimestampData
{
@@ -44,22 +37,23 @@ template <typename ValueType, typename TimestampType>
class AggregationFunctionDeltaSumTimestamp final
    : public IAggregateFunctionDataHelper<
          AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
          AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>
          AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
      >
{
public:
    AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params)
        : IAggregateFunctionDataHelper<
              AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
              AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{arguments, params, createResultType()}
    {
    }
              AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
          >{arguments, params, createResultType()}
    {}

    AggregationFunctionDeltaSumTimestamp()
        : IAggregateFunctionDataHelper<
              AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
              AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{}
    {
    }
              AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
          >{}
    {}

    bool allocatesMemoryInArena() const override { return false; }

@@ -69,8 +63,8 @@ public:

    void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        auto value = unalignedLoad<ValueType>(columns[0]->getRawData().data() + row_num * sizeof(ValueType));
        auto ts = unalignedLoad<TimestampType>(columns[1]->getRawData().data() + row_num * sizeof(TimestampType));
        auto value = assert_cast<const ColumnVector<ValueType> &>(*columns[0]).getData()[row_num];
        auto ts = assert_cast<const ColumnVector<TimestampType> &>(*columns[1]).getData()[row_num];

        auto & data = this->data(place);

@@ -178,48 +172,10 @@ public:

    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        static_cast<ColumnFixedSizeHelper &>(to).template insertRawData<sizeof(ValueType)>(
            reinterpret_cast<const char *>(&this->data(place).sum));
        assert_cast<ColumnVector<ValueType> &>(to).getData().push_back(this->data(place).sum);
    }
};


template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypesSecond(const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(second_type);

    if (which.idx == TypeIndex::UInt32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
    if (which.idx == TypeIndex::UInt64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
    if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
    if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
    if (which.idx == TypeIndex::Float32) return new AggregateFunctionTemplate<FirstType, Float32>(args...);
    if (which.idx == TypeIndex::Float64) return new AggregateFunctionTemplate<FirstType, Float64>(args...);
    if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
    if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);

    return nullptr;
}

template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(first_type);

    if (which.idx == TypeIndex::UInt8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::UInt16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::UInt32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::UInt64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Int8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Int16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Int32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Int64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Float32) return createWithTwoTypesSecond<Float32, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Float64) return createWithTwoTypesSecond<Float64, AggregateFunctionTemplate>(second_type, args...);

    return nullptr;
}

AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
    const String & name,
    const DataTypes & arguments,
@@ -237,7 +193,7 @@ AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
        throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, "
            "must be Int, Float, Date, DateTime", arguments[1]->getName(), name);

    return AggregateFunctionPtr(createWithTwoTypes<AggregationFunctionDeltaSumTimestamp>(
    return AggregateFunctionPtr(createWithTwoNumericOrDateTypes<AggregationFunctionDeltaSumTimestamp>(
        *arguments[0], *arguments[1], arguments, params));
}
}
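The add()/insertResultInto() change above swaps raw-byte access (unalignedLoad over getRawData(), insertRawData) for typed column access through ColumnVector. A minimal, self-contained sketch of the same idea, using a hypothetical ToyColumn type rather than ClickHouse's IColumn/ColumnVector API:

#include <cassert>
#include <cstring>
#include <vector>

// A toy typed column: values live in a contiguous std::vector<T>.
template <typename T>
struct ToyColumn
{
    std::vector<T> data;
};

// Raw-byte style: reinterpret the buffer and copy bytes at row * sizeof(T).
template <typename T>
T loadRaw(const ToyColumn<T> & col, size_t row)
{
    T value;
    std::memcpy(&value, reinterpret_cast<const char *>(col.data.data()) + row * sizeof(T), sizeof(T));
    return value;
}

// Typed style: index the vector directly - clearer, and the compiler checks the type.
template <typename T>
T loadTyped(const ToyColumn<T> & col, size_t row)
{
    return col.data[row];
}

int main()
{
    ToyColumn<double> col{{1.5, 2.5, 3.5}};
    assert(loadRaw(col, 1) == loadTyped(col, 1));  // both read 2.5
    col.data.push_back(4.5);                       // "insert result" as a typed push_back
}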
@@ -184,8 +184,36 @@ static IAggregateFunction * createWithDecimalType(const IDataType & argument_typ
}

/** For template with two arguments.
  * This is extremely dangerous for code bloat - do not use.
  */
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(second_type);
#define DISPATCH(TYPE) \
    if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
    if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);
    return nullptr;
}

template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(first_type);
#define DISPATCH(TYPE) \
    if (which.idx == TypeIndex::TYPE) \
        return createWithTwoNumericTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (which.idx == TypeIndex::Enum8)
        return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Enum16)
        return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);
    return nullptr;
}

template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoBasicNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
@@ -209,6 +237,46 @@ static IAggregateFunction * createWithTwoBasicNumericTypes(const IDataType & fir
    return nullptr;
}

template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(second_type);
#define DISPATCH(TYPE) \
    if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
    if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);

    /// expects that DataTypeDate is based on UInt16, DataTypeDateTime is based on UInt32
    if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
    if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);

    return nullptr;
}

template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
    WhichDataType which(first_type);
#define DISPATCH(TYPE) \
    if (which.idx == TypeIndex::TYPE) \
        return createWithTwoNumericOrDateTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
    FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    if (which.idx == TypeIndex::Enum8)
        return createWithTwoNumericOrDateTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::Enum16)
        return createWithTwoNumericOrDateTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);

    /// expects that DataTypeDate is based on UInt16, DataTypeDateTime is based on UInt32
    if (which.idx == TypeIndex::Date)
        return createWithTwoNumericOrDateTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
    if (which.idx == TypeIndex::DateTime)
        return createWithTwoNumericOrDateTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
    return nullptr;
}

template <template <typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args)
{
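The helpers above rely on an X-macro (FOR_NUMERIC_TYPES plus a locally defined DISPATCH) to generate one branch per supported type instead of hand-writing each if. A reduced sketch of that pattern with a hypothetical three-entry type list, not the real FOR_NUMERIC_TYPES macro:

#include <cstdint>
#include <cstdio>
#include <string>
#include <typeinfo>

// Hypothetical, reduced type list; the real macro covers every numeric type.
#define FOR_TOY_TYPES(M) \
    M(int32_t) \
    M(int64_t) \
    M(double)

// Returns a description of the matched type, or an empty string if unsupported.
std::string dispatchByName(const std::string & name)
{
#define DISPATCH(TYPE) \
    if (name == #TYPE) return std::string("matched ") + typeid(TYPE).name();
    FOR_TOY_TYPES(DISPATCH)
#undef DISPATCH
    return {};
}

int main()
{
    std::printf("%s\n", dispatchByName("int64_t").c_str());                      // matched <implementation-defined name>
    std::printf("%s\n", dispatchByName("float").empty() ? "no match" : "match"); // float is not in the list
}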
@@ -140,8 +140,6 @@ void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors
    /// We don't do highlighting for foreign dialects, such as PRQL and Kusto.
    /// Only normal ClickHouse SQL queries are highlighted.

    /// Currently we highlight only the first query in the multi-query mode.

    ParserQuery parser(end, false, context.getSettingsRef()[Setting::implicit_select]);
    ASTPtr ast;
    bool parse_res = false;
@@ -87,6 +87,7 @@ APPLY_FOR_FAILPOINTS(M, M, M, M)

std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointInjection::fail_point_wait_channels;
std::mutex FailPointInjection::mu;

class FailPointChannel : private boost::noncopyable
{
public:
@@ -15,6 +15,7 @@

#include <unordered_map>


namespace DB
{

@@ -27,6 +28,7 @@ namespace DB
/// 3. in test file, we can use system failpoint enable/disable 'failpoint_name'

class FailPointChannel;

class FailPointInjection
{
public:
@@ -9,6 +9,7 @@

#include <mutex>
#include <algorithm>
#include <Poco/Timespan.h>


namespace ProfileEvents
@@ -49,16 +50,18 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis()
}

HostResolver::HostResolver(String host_, Poco::Timespan history_)
    : host(std::move(host_))
    , history(history_)
    , resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); })
{
    update();
}
    : HostResolver(
        [](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); },
        host_,
        history_)
{}

HostResolver::HostResolver(
    ResolveFunction && resolve_function_, String host_, Poco::Timespan history_)
    : host(std::move(host_)), history(history_), resolve_function(std::move(resolve_function_))
    : host(std::move(host_))
    , history(history_)
    , resolve_interval(history_.totalMicroseconds() / 3)
    , resolve_function(std::move(resolve_function_))
{
    update();
}
@@ -203,7 +206,7 @@ bool HostResolver::isUpdateNeeded()
    Poco::Timestamp now;

    std::lock_guard lock(mutex);
    return last_resolve_time + history < now || records.empty();
    return last_resolve_time + resolve_interval < now || records.empty();
}

void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddress> & next_gen)
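Two things happen in the HostResolver hunks above: the convenience constructor now delegates to the full constructor so the member-initializer list exists only once, and a resolve_interval equal to one third of the history period is introduced and used by isUpdateNeeded(). A self-contained sketch of both ideas, using a hypothetical Resolver class and plain microsecond counts instead of Poco::Timespan:

#include <cstdint>
#include <functional>
#include <string>
#include <utility>

class Resolver
{
public:
    using ResolveFunction = std::function<void(const std::string &)>;

    // Convenience constructor delegates to the full one, so the init list is not duplicated.
    Resolver(std::string host_, uint64_t history_us_)
        : Resolver([](const std::string &) { /* default DNS lookup would go here */ }, std::move(host_), history_us_)
    {}

    Resolver(ResolveFunction && resolve_function_, std::string host_, uint64_t history_us_)
        : host(std::move(host_))
        , history_us(history_us_)
        , resolve_interval_us(history_us_ / 3)   // re-resolve roughly 3 times per history period
        , resolve_function(std::move(resolve_function_))
    {}

    bool isUpdateNeeded(uint64_t now_us, uint64_t last_resolve_us) const
    {
        return last_resolve_us + resolve_interval_us < now_us;
    }

    const std::string & getHost() const { return host; }

private:
    std::string host;
    uint64_t history_us;
    uint64_t resolve_interval_us;
    ResolveFunction resolve_function;
};

int main()
{
    Resolver r("example.com", 3'000'000);                            // 3 s history => ~1 s resolve interval
    return r.isUpdateNeeded(/*now*/ 1'500'000, /*last*/ 0) ? 0 : 1;  // 1.5 s since last resolve => update
}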
@@ -26,7 +26,7 @@
// a) it still occurs in resolve set after `history_` time or b) all other addresses are pessimized as well.
// - resolve schedule
// Addresses are resolved through `DB::DNSResolver::instance()`.
// Usually it does not happen more often than once in `history_` time.
// Usually it does not happen more often than 3 times in `history_` period.
// But also new resolve performed each `setFail()` call.

namespace DB
@@ -212,6 +212,7 @@ protected:

    const String host;
    const Poco::Timespan history;
    const Poco::Timespan resolve_interval;
    const HostResolverMetrics metrics = getMetrics();

    // for tests purpose
@@ -245,4 +246,3 @@ private:
};

}
@@ -3,7 +3,7 @@
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>


namespace DB
{
@@ -7,7 +7,6 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/IParser.h>
#include <Parsers/TokenIterator.h>
#include <base/types.h>
#include <Common/PODArray.h>
#include <Common/Stopwatch.h>

@@ -4565,7 +4565,7 @@ Possible values:
- 0 - Disable
- 1 - Enable
)", 0) \
    DECLARE(Bool, query_plan_merge_filters, true, R"(
    DECLARE(Bool, query_plan_merge_filters, false, R"(
Allow to merge filters in the query plan
)", 0) \
    DECLARE(Bool, query_plan_filter_push_down, true, R"(
@@ -77,7 +77,6 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
    {"backup_restore_keeper_max_retries_while_initializing", 0, 20, "New setting."},
    {"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
    {"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
    {"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
    {"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
    {"allow_experimental_bfloat16_type", false, false, "Add new experimental BFloat16 type"},
    {"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},
@@ -128,7 +127,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
    {"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
    {"max_parts_to_move", 0, 1000, "New setting"},
    {"hnsw_candidate_list_size_for_search", 64, 256, "New setting. Previously, the value was optionally specified in CREATE INDEX and 64 by default."},
    {"allow_reorder_prewhere_conditions", false, true, "New setting"},
    {"allow_reorder_prewhere_conditions", true, true, "New setting"},
    {"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
    {"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."},
}
@@ -10,7 +10,6 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/parseDateTimeBestEffort.h>
#include <Parsers/TokenIterator.h>


namespace DB
@@ -8,6 +8,7 @@
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
@@ -53,16 +54,6 @@ namespace ErrorCodes

namespace
{
size_t roundDownToMultiple(size_t num, size_t multiple)
{
    return (num / multiple) * multiple;
}

size_t roundUpToMultiple(size_t num, size_t multiple)
{
    return roundDownToMultiple(num + multiple - 1, multiple);
}

std::string getCommonUserID()
{
    auto user_from_context = DB::Context::getGlobalContextInstance()->getFilesystemCacheUser();
@@ -96,6 +87,7 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
    : max_file_segment_size(settings.max_file_segment_size)
    , bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
    , boundary_alignment(settings.boundary_alignment)
    , background_download_max_file_segment_size(settings.background_download_max_file_segment_size)
    , load_metadata_threads(settings.load_metadata_threads)
    , load_metadata_asynchronously(settings.load_metadata_asynchronously)
    , write_cache_per_user_directory(settings.write_cache_per_user_id_directory)
@@ -103,7 +95,10 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
    , keep_current_elements_to_max_ratio(1 - settings.keep_free_space_elements_ratio)
    , keep_up_free_space_remove_batch(settings.keep_free_space_remove_batch)
    , log(getLogger("FileCache(" + cache_name + ")"))
    , metadata(settings.base_path, settings.background_download_queue_size_limit, settings.background_download_threads, write_cache_per_user_directory)
    , metadata(settings.base_path,
        settings.background_download_queue_size_limit,
        settings.background_download_threads,
        write_cache_per_user_directory)
{
    if (settings.cache_policy == "LRU")
    {
@@ -601,8 +596,8 @@ FileCache::getOrSet(
    /// 2. max_file_segments_limit
    FileSegment::Range result_range = initial_range;

    const auto aligned_offset = roundDownToMultiple(initial_range.left, boundary_alignment);
    auto aligned_end_offset = std::min(roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;
    const auto aligned_offset = FileCacheUtils::roundDownToMultiple(initial_range.left, boundary_alignment);
    auto aligned_end_offset = std::min(FileCacheUtils::roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;

    chassert(aligned_offset <= initial_range.left);
    chassert(aligned_end_offset >= initial_range.right);
@@ -1600,6 +1595,17 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
        }
    }

    if (new_settings.background_download_max_file_segment_size != actual_settings.background_download_max_file_segment_size)
    {
        background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;

        LOG_INFO(log, "Changed background_download_max_file_segment_size from {} to {}",
            actual_settings.background_download_max_file_segment_size,
            new_settings.background_download_max_file_segment_size);

        actual_settings.background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;
    }

    if (new_settings.max_size != actual_settings.max_size
        || new_settings.max_elements != actual_settings.max_elements)
    {
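The applySettingsIfPossible() hunk above follows a small "reload one setting" pattern: compare the incoming value with the currently applied one, publish the new value through an atomic member so concurrent readers need no lock, log the change, and only then record it as applied. A reduced, self-contained sketch of that pattern with hypothetical names and printf instead of the logging framework:

#include <atomic>
#include <cstdio>

struct Settings { size_t background_download_max_file_segment_size = 4 * 1024 * 1024; };

class Cache
{
public:
    void applySettingsIfPossible(const Settings & new_settings, Settings & actual_settings)
    {
        if (new_settings.background_download_max_file_segment_size != actual_settings.background_download_max_file_segment_size)
        {
            // Publish the new limit; readers use .load() without taking a lock.
            background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;
            std::printf("Changed background_download_max_file_segment_size from %zu to %zu\n",
                        actual_settings.background_download_max_file_segment_size,
                        new_settings.background_download_max_file_segment_size);
            actual_settings.background_download_max_file_segment_size = new_settings.background_download_max_file_segment_size;
        }
    }

    size_t getBackgroundDownloadMaxFileSegmentSize() const { return background_download_max_file_segment_size.load(); }

private:
    std::atomic<size_t> background_download_max_file_segment_size{4 * 1024 * 1024};
};

int main()
{
    Cache cache;
    Settings applied, fresh;
    fresh.background_download_max_file_segment_size = 8 * 1024 * 1024;
    cache.applySettingsIfPossible(fresh, applied);
    return cache.getBackgroundDownloadMaxFileSegmentSize() == 8 * 1024 * 1024 ? 0 : 1;
}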
@@ -161,6 +161,10 @@ public:

    size_t getMaxFileSegmentSize() const { return max_file_segment_size; }

    size_t getBackgroundDownloadMaxFileSegmentSize() const { return background_download_max_file_segment_size.load(); }

    size_t getBoundaryAlignment() const { return boundary_alignment; }

    bool tryReserve(
        FileSegment & file_segment,
        size_t size,
@@ -199,6 +203,7 @@ private:
    std::atomic<size_t> max_file_segment_size;
    const size_t bypass_cache_threshold;
    const size_t boundary_alignment;
    std::atomic<size_t> background_download_max_file_segment_size;
    size_t load_metadata_threads;
    const bool load_metadata_asynchronously;
    std::atomic<bool> stop_loading_metadata = false;
@@ -62,6 +62,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
    if (has("background_download_queue_size_limit"))
        background_download_queue_size_limit = get_uint("background_download_queue_size_limit");

    if (has("background_download_max_file_segment_size"))
        background_download_max_file_segment_size = get_uint("background_download_max_file_segment_size");

    if (has("load_metadata_threads"))
        load_metadata_threads = get_uint("load_metadata_threads");

@@ -43,6 +43,8 @@ struct FileCacheSettings
    double keep_free_space_elements_ratio = FILECACHE_DEFAULT_FREE_SPACE_ELEMENTS_RATIO;
    size_t keep_free_space_remove_batch = FILECACHE_DEFAULT_FREE_SPACE_REMOVE_BATCH;

    size_t background_download_max_file_segment_size = FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE_WITH_BACKGROUND_DOWLOAD;

    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
    void loadFromCollection(const NamedCollection & collection);

src/Interpreters/Cache/FileCacheUtils.h (new file, 17 lines)
@@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>

namespace FileCacheUtils
{

static size_t roundDownToMultiple(size_t num, size_t multiple)
{
    return (num / multiple) * multiple;
}

static size_t roundUpToMultiple(size_t num, size_t multiple)
{
    return roundDownToMultiple(num + multiple - 1, multiple);
}

}
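The two helpers in the new FileCacheUtils.h are what FileCache::getOrSet() uses to align requested ranges to boundary_alignment. A worked example of what they return, as a standalone copy of the same arithmetic:

#include <cassert>
#include <cstddef>

static size_t roundDownToMultiple(size_t num, size_t multiple) { return (num / multiple) * multiple; }
static size_t roundUpToMultiple(size_t num, size_t multiple) { return roundDownToMultiple(num + multiple - 1, multiple); }

int main()
{
    constexpr size_t alignment = 4 * 1024 * 1024;                                  // 4 MiB boundary alignment
    assert(roundDownToMultiple(5 * 1024 * 1024, alignment) == 4 * 1024 * 1024);    // 5 MiB rounds down to 4 MiB
    assert(roundUpToMultiple(5 * 1024 * 1024, alignment) == 8 * 1024 * 1024);      // 5 MiB rounds up to 8 MiB
    assert(roundUpToMultiple(8 * 1024 * 1024, alignment) == 8 * 1024 * 1024);      // exact multiples are unchanged
}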
@@ -6,6 +6,7 @@ namespace DB

static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE = 32 * 1024 * 1024; /// 32Mi
static constexpr int FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT = 4 * 1024 * 1024; /// 4Mi
static constexpr int FILECACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE_WITH_BACKGROUND_DOWLOAD = 4 * 1024 * 1024; /// 4Mi
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 5;
static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_QUEUE_SIZE_LIMIT = 5000;
static constexpr int FILECACHE_DEFAULT_LOAD_METADATA_THREADS = 16;
@@ -4,6 +4,7 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheUtils.h>
#include <base/getThreadId.h>
#include <base/hex.h>
#include <Common/CurrentThread.h>
@@ -360,11 +361,14 @@ void FileSegment::write(char * from, size_t size, size_t offset_in_file)
            "Expected DOWNLOADING state, got {}", stateToString(download_state));

    const size_t first_non_downloaded_offset = getCurrentWriteOffset();

    if (offset_in_file != first_non_downloaded_offset)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Attempt to write {} bytes to offset: {}, but current write offset is {}",
            size, offset_in_file, first_non_downloaded_offset);
    }

    const size_t current_downloaded_size = getDownloadedSize();
    chassert(reserved_size >= current_downloaded_size);
@@ -375,8 +379,19 @@ void FileSegment::write(char * from, size_t size, size_t offset_in_file)
            ErrorCodes::LOGICAL_ERROR,
            "Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size);

    if (!is_unbound && current_downloaded_size == range().size())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");
    if (!is_unbound)
    {
        if (current_downloaded_size == range().size())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded");

        if (current_downloaded_size + size > range().size())
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Cannot download beyond file segment boundaries: {}. Write offset: {}, size: {}, downloaded size: {}",
                range().size(), first_non_downloaded_offset, size, current_downloaded_size);
        }
    }

    if (!cache_writer && current_downloaded_size > 0)
        throw Exception(
@@ -629,6 +644,36 @@ void FileSegment::completePartAndResetDownloader()
    LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lk));
}

size_t FileSegment::getSizeForBackgroundDownload() const
{
    auto lk = lock();
    return getSizeForBackgroundDownloadUnlocked(lk);
}

size_t FileSegment::getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const
{
    if (!background_download_enabled
        || !downloaded_size
        || !remote_file_reader)
    {
        return 0;
    }

    chassert(downloaded_size <= range().size());

    const size_t background_download_max_file_segment_size = cache->getBackgroundDownloadMaxFileSegmentSize();
    size_t desired_size;
    if (downloaded_size >= background_download_max_file_segment_size)
        desired_size = FileCacheUtils::roundUpToMultiple(downloaded_size, cache->getBoundaryAlignment());
    else
        desired_size = FileCacheUtils::roundUpToMultiple(background_download_max_file_segment_size, cache->getBoundaryAlignment());

    desired_size = std::min(desired_size, range().size());
    chassert(desired_size >= downloaded_size);

    return desired_size - downloaded_size;
}

void FileSegment::complete(bool allow_background_download)
{
    ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentCompleteMicroseconds);
@@ -708,7 +753,8 @@ void FileSegment::complete(bool allow_background_download)
    if (is_last_holder)
    {
        bool added_to_download_queue = false;
        if (allow_background_download && background_download_enabled && remote_file_reader)
        size_t background_download_size = allow_background_download ? getSizeForBackgroundDownloadUnlocked(segment_lock) : 0;
        if (background_download_size)
        {
            ProfileEvents::increment(ProfileEvents::FilesystemCacheBackgroundDownloadQueuePush);
            added_to_download_queue = locked_key->addToDownloadQueue(offset(), segment_lock); /// Finish download in background.
@@ -862,7 +908,12 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)
            chassert(downloaded_size == reserved_size);
            chassert(downloaded_size == range().size());
            chassert(downloaded_size > 0);
            chassert(fs::file_size(getPath()) > 0);

            auto file_size = fs::file_size(getPath());
            UNUSED(file_size);

            chassert(file_size == range().size());
            chassert(downloaded_size == range().size());

            chassert(queue_iterator || on_delayed_removal);
            check_iterator(queue_iterator);
@@ -884,7 +935,13 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock & lock)

            chassert(reserved_size >= downloaded_size);
            chassert(downloaded_size > 0);
            chassert(fs::file_size(getPath()) > 0);

            auto file_size = fs::file_size(getPath());
            UNUSED(file_size);

            chassert(file_size > 0);
            chassert(file_size <= range().size());
            chassert(downloaded_size <= range().size());

            chassert(queue_iterator);
            check_iterator(queue_iterator);

@@ -185,6 +185,8 @@ public:

    bool assertCorrectness() const;

    size_t getSizeForBackgroundDownload() const;

    /**
     * ========== Methods that must do cv.notify() ==================
     */
@@ -230,6 +232,7 @@ private:
    String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const;
    bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const;
    void resetDownloaderUnlocked(const FileSegmentGuard::Lock &);
    size_t getSizeForBackgroundDownloadUnlocked(const FileSegmentGuard::Lock &) const;

    void setDownloadState(State state, const FileSegmentGuard::Lock &);
    void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &);
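getSizeForBackgroundDownloadUnlocked() above caps how much more of a segment the background download may fetch: take the larger of the already-downloaded size and the configured per-segment limit, round it up to the boundary alignment, clamp it to the segment size, and return what remains beyond the downloaded part. A standalone re-implementation of just that arithmetic, for illustration only (the real method also checks the enable flag and the remote reader):

#include <algorithm>
#include <cassert>
#include <cstddef>

static size_t roundUpToMultiple(size_t num, size_t multiple) { return ((num + multiple - 1) / multiple) * multiple; }

size_t sizeForBackgroundDownload(size_t downloaded, size_t max_background_size, size_t alignment, size_t segment_size)
{
    size_t desired = (downloaded >= max_background_size)
        ? roundUpToMultiple(downloaded, alignment)
        : roundUpToMultiple(max_background_size, alignment);
    desired = std::min(desired, segment_size);
    return desired - downloaded;   // how many more bytes the background job may fetch
}

int main()
{
    // 1 MiB already downloaded, 4 MiB background limit, 4 MiB alignment, 32 MiB segment:
    // desired size is 4 MiB, so 3 MiB remain for the background download.
    assert(sizeForBackgroundDownload(1 << 20, 4 << 20, 4 << 20, 32 << 20) == 3u << 20);
    // Already past the limit (6 MiB downloaded): desired rounds up to 8 MiB, 2 MiB remain.
    assert(sizeForBackgroundDownload(6 << 20, 4 << 20, 4 << 20, 32 << 20) == 2u << 20);
}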
@@ -676,13 +676,17 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
        log, "Downloading {} bytes for file segment {}",
        file_segment.range().size() - file_segment.getDownloadedSize(), file_segment.getInfoForLog());

    size_t size_to_download = file_segment.getSizeForBackgroundDownload();
    if (!size_to_download)
        return;

    auto reader = file_segment.getRemoteFileReader();
    if (!reader)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "No reader. "
            "File segment should not have been submitted for background download ({})",
            file_segment.getInfoForLog());
        LOG_TEST(log, "No reader in {}:{} (state: {}, range: {}, downloaded size: {})",
            file_segment.key(), file_segment.offset(), file_segment.state(),
            file_segment.range().toString(), file_segment.getDownloadedSize());
        return;
    }

    /// If remote_fs_read_method == 'threadpool',
@@ -690,7 +694,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
    if (reader->internalBuffer().empty())
    {
        if (!memory)
            memory.emplace(DBMS_DEFAULT_BUFFER_SIZE);
            memory.emplace(std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), size_to_download));
        reader->set(memory->data(), memory->size());
    }

@@ -701,9 +705,13 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
    if (offset != static_cast<size_t>(reader->getPosition()))
        reader->seek(offset, SEEK_SET);

    while (!reader->eof())
    while (size_to_download && !reader->eof())
    {
        auto size = reader->available();
        const auto available = reader->available();
        chassert(available);

        const auto size = std::min(available, size_to_download);
        size_to_download -= size;

        std::string failure_reason;
        if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds, failure_reason))
@@ -713,7 +721,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
                "for {}:{} (downloaded size: {}/{})",
                file_segment.key(), file_segment.offset(),
                file_segment.getDownloadedSize(), file_segment.range().size());
            return;
            break;
        }

        try
@@ -728,12 +736,14 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
            if (code == /* No space left on device */28 || code == /* Quota exceeded */122)
            {
                LOG_INFO(log, "Insert into cache is skipped due to insufficient disk space. ({})", e.displayText());
                return;
                break;
            }
            throw;
        }
    }

    file_segment.resetRemoteFileReader();

    LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog());
}

@@ -1155,7 +1165,7 @@ std::vector<FileSegment::Info> LockedKey::sync()
            actual_size, expected_size, file_segment->getInfoForLog());

        broken.push_back(FileSegment::getInfo(file_segment));
        it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */false);
        it = removeFileSegment(file_segment->offset(), file_segment->lock(), /* can_be_broken */true);
    }
    return broken;
}

@@ -210,6 +210,7 @@ public:

    bool setBackgroundDownloadThreads(size_t threads_num);
    size_t getBackgroundDownloadThreads() const { return download_threads.size(); }

    bool setBackgroundDownloadQueueSizeLimit(size_t size);

    bool isBackgroundDownloadEnabled();
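The downloadImpl() hunks above turn an "until EOF" loop into a bounded one: the byte budget from getSizeForBackgroundDownload() is decremented on every iteration, and reservation or disk-space failures now break out of the loop (so the remote reader is still reset afterwards) instead of returning early. A reduced sketch of the loop shape, with a hypothetical Reader interface and no cache or exception handling:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

struct Reader
{
    std::vector<char> bytes;
    size_t pos = 0;
    bool eof() const { return pos >= bytes.size(); }
    size_t available() const { return std::min<size_t>(64, bytes.size() - pos); }  // pretend 64-byte internal buffer
    void skip(size_t n) { pos += n; }
};

size_t downloadBounded(Reader & reader, size_t size_to_download)
{
    size_t written = 0;
    while (size_to_download && !reader.eof())
    {
        const size_t available = reader.available();
        const size_t size = std::min(available, size_to_download);
        size_to_download -= size;
        // ... reserve and write `size` bytes here; on failure the real code breaks, not returns ...
        reader.skip(size);
        written += size;
    }
    return written;
}

int main()
{
    Reader reader{std::vector<char>(1000)};
    std::printf("downloaded %zu bytes\n", downloadBounded(reader, 200));  // stops at the 200-byte budget
}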
@@ -5,10 +5,10 @@
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTQueryParameter.h>


namespace DB
{

@@ -61,7 +61,7 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
    else
        fields_count = 0;
    }
    else if (const auto* literal_ast = value->as<ASTLiteral>(); literal_ast)
    else if (const auto * literal_ast = value->as<ASTLiteral>(); literal_ast)
    {
        if (literal_ast->value.getType() == Field::Types::Tuple)
        {
@@ -1,71 +0,0 @@
#pragma once

#include <base/types.h>
#include <Parsers/TokenIterator.h>
#include <map>
#include <memory>
#include <Common/SipHash.h>


namespace DB
{

struct StringRange
{
    const char * first = nullptr;
    const char * second = nullptr;

    StringRange() = default;
    StringRange(const char * begin, const char * end) : first(begin), second(end) {}
    explicit StringRange(TokenIterator token) : first(token->begin), second(token->end) {}

    StringRange(TokenIterator token_begin, TokenIterator token_end)
    {
        /// Empty range.
        if (token_begin == token_end)
        {
            first = token_begin->begin;
            second = token_begin->begin;
            return;
        }

        TokenIterator token_last = token_end;
        --token_last;

        first = token_begin->begin;
        second = token_last->end;
    }
};

using StringPtr = std::shared_ptr<String>;


inline String toString(const StringRange & range)
{
    return range.first ? String(range.first, range.second) : String();
}

/// Hashes only the values of pointers in StringRange. Is used with StringRangePointersEqualTo comparator.
struct StringRangePointersHash
{
    UInt64 operator()(const StringRange & range) const
    {
        SipHash hash;
        hash.update(range.first);
        hash.update(range.second);
        return hash.get64();
    }
};

/// Ranges are equal only when they point to the same memory region.
/// It may be used when it's enough to compare substrings by their position in the same string.
struct StringRangePointersEqualTo
{
    constexpr bool operator()(const StringRange &lhs, const StringRange &rhs) const
    {
        return std::tie(lhs.first, lhs.second) == std::tie(rhs.first, rhs.second);
    }
};

}
@@ -317,8 +317,9 @@ void LimitTransform::splitChunk(PortsData & data)
        length = offset + limit - (rows_read - num_rows) - start;
    }

    /// check if other rows in current block equals to last one in limit
    if (with_ties && length)
    /// Check if other rows in current block equals to last one in limit
    /// when rows read >= offset + limit.
    if (with_ties && offset + limit <= rows_read && length)
    {
        UInt64 current_row_num = start + length;
        previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);
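The LimitTransform fix above only looks for ties once the stream has actually reached the last row of the limit, i.e. when rows_read >= offset + limit; before that point the "last row in limit" is not final yet and must not be used as the tie value. A tiny self-contained check of the condition (hypothetical helper, not the transform itself):

#include <cassert>
#include <cstdint>

bool shouldCheckTies(bool with_ties, uint64_t offset, uint64_t limit, uint64_t rows_read, uint64_t length)
{
    return with_ties && offset + limit <= rows_read && length != 0;
}

int main()
{
    // LIMIT 8,6 WITH TIES: the cut-off row is row 14; with only 10 rows read the tie value is not known yet.
    assert(!shouldCheckTies(true, 8, 6, 10, 5));
    // Once 15 rows have been read, the block containing the cut-off row can be scanned for ties.
    assert(shouldCheckTies(true, 8, 6, 15, 5));
}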
@@ -1045,7 +1045,6 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
    MarkRanges res;

    size_t marks_count = part->index_granularity.getMarksCount();
    const auto & index = part->getIndex();
    if (marks_count == 0)
        return res;

@@ -1073,14 +1072,19 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
    auto index_columns = std::make_shared<ColumnsWithTypeAndName>();
    const auto & key_indices = key_condition.getKeyIndices();
    DataTypes key_types;
    for (size_t i : key_indices)
    if (!key_indices.empty())
    {
        if (i < index->size())
            index_columns->emplace_back(index->at(i), primary_key.data_types[i], primary_key.column_names[i]);
        else
            index_columns->emplace_back(); /// The column of the primary key was not loaded in memory - we'll skip it.
        const auto & index = part->getIndex();

        key_types.emplace_back(primary_key.data_types[i]);
        for (size_t i : key_indices)
        {
            if (i < index->size())
                index_columns->emplace_back(index->at(i), primary_key.data_types[i], primary_key.column_names[i]);
            else
                index_columns->emplace_back(); /// The column of the primary key was not loaded in memory - we'll skip it.

            key_types.emplace_back(primary_key.data_types[i]);
        }
    }

    /// If there are no monotonic functions, there is no need to save block reference.
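The markRangesFromPKRange() change above delays part->getIndex() until it is known that the key condition actually uses primary-key columns (key_indices is non-empty), so a part whose primary key was unloaded is not pulled back into memory for conditions that never touch it. A reduced sketch of the "load only if used" shape, with a hypothetical loader rather than the MergeTree API:

#include <cstdio>
#include <memory>
#include <vector>

struct Index { std::vector<int> columns; };

std::shared_ptr<Index> loadIndex()
{
    std::puts("index loaded");          // expensive: would read the primary key into memory
    return std::make_shared<Index>();
}

void buildKeyColumns(const std::vector<size_t> & key_indices)
{
    if (!key_indices.empty())           // only touch the index when some key column is actually used
    {
        auto index = loadIndex();
        for (size_t i : key_indices)
            (void)i;                    // ... would copy index->at(i) into index_columns here ...
    }
}

int main()
{
    buildKeyColumns({});                // prints nothing: the index stays unloaded
    buildKeyColumns({0, 1});            // prints "index loaded"
}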
@@ -30,8 +30,8 @@ namespace ErrorCodes
    DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
    DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
    DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
    DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
    DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \
    DECLARE(UInt64, polling_max_timeout_ms, 10 * 60 * 1000, "Maximum timeout before next polling", 0) \
    DECLARE(UInt64, polling_backoff_ms, 30 * 1000, "Polling backoff", 0) \
    DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
    DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
    DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
@@ -395,7 +395,7 @@ def test_secure_connection_uri(started_cluster):
    simple_mongo_table.insert_many(data)
    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true', 'test_secure_connection_uri')"
        "CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true&authSource=admin', 'test_secure_connection_uri')"
    )

    assert node.query("SELECT COUNT() FROM test_secure_connection_uri") == "100\n"
@@ -407,6 +407,8 @@ def test_failed_retry(started_cluster, mode, engine_name):
        additional_settings={
            "s3queue_loading_retries": retries_num,
            "keeper_path": keeper_path,
            "polling_max_timeout_ms": 5000,
            "polling_backoff_ms": 1000,
        },
        engine_name=engine_name,
    )
@@ -852,6 +854,8 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_buckets": 2,
            "polling_max_timeout_ms": 2000,
            "polling_backoff_ms": 1000,
            **({"s3queue_processing_threads_num": 1} if mode == "ordered" else {}),
        },
    )
@@ -929,6 +933,8 @@ def test_max_set_age(started_cluster):
            "cleanup_interval_min_ms": max_age / 3,
            "cleanup_interval_max_ms": max_age / 3,
            "loading_retries": 0,
            "polling_max_timeout_ms": 5000,
            "polling_backoff_ms": 1000,
            "processing_threads_num": 1,
            "loading_retries": 0,
        },
@@ -1423,6 +1429,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": processing_threads,
            "s3queue_buckets": shards_num,
            "polling_max_timeout_ms": 1000,
            "polling_backoff_ms": 0,
        },
    )
    i += 1
@@ -1673,6 +1681,8 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads)
            "s3queue_processing_threads_num": processing_threads,
            "s3queue_last_processed_path": f"{files_path}/test_5.csv",
            "s3queue_buckets": 2,
            "polling_max_timeout_ms": 2000,
            "polling_backoff_ms": 1000,
        },
    )
@@ -53,3 +53,5 @@
100
100
100
12
12
@@ -35,4 +35,24 @@ select count() from (select number, number < 100 from numbers(2000) order by num
SET max_block_size = 5;
select count() from (select number < 100, number from numbers(2000) order by number < 100 desc limit 10 with ties);

SELECT count() FROM (WITH data AS (
    SELECT * FROM numbers(0, 10)
    UNION ALL
    SELECT * FROM numbers(10, 10)
)
SELECT number div 10 AS ten, number
FROM data
ORDER BY ten
LIMIT 8,6 WITH TIES);

SELECT count() FROM (WITH data AS (
    SELECT * FROM numbers(0, 10)
    UNION ALL
    SELECT * FROM numbers(10, 10)
)
SELECT number div 11 AS eleven, number
FROM data
ORDER BY eleven
LIMIT 8,6 WITH TIES);

DROP TABLE ties;
@@ -163,6 +163,7 @@ Filter column: notEquals(__table1.y, 2_UInt8)
> filter is pushed down before CreatingSets
CreatingSets
Filter
Filter
1
3
> one condition of filter is pushed down before LEFT JOIN
@@ -332,12 +332,13 @@ SETTINGS optimize_aggregators_of_group_by_keys=0 -- avoid removing any() as it d
Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
Filter (((WHERE + (Projection + Before ORDER BY)) + HAVING))
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
Filter ((WHERE + (Projection + Before ORDER BY)))
Filter (HAVING)
Aggregating
Expression ((Before GROUP BY + Projection))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromSystemNumbers
-- execute
1
2
@@ -28,17 +28,21 @@ WHERE type_1 = \'all\'
(Expression)
ExpressionTransform × 2
(Filter)
FilterTransform × 6
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
FilterTransform × 2
(Filter)
FilterTransform × 2
(Filter)
FilterTransform × 2
(Aggregating)
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Filter)
@@ -64,10 +68,14 @@ ExpressionTransform × 2
ExpressionTransform × 2
AggregatingTransform × 2
Copy 1 → 2
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Filter)
FilterTransform
(Filter)
FilterTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
MergeTreeSelect(pool: ReadPoolInOrder, algorithm: InOrder) 0 → 1
(Expression)
ExpressionTransform × 2
(Aggregating)
@@ -1,4 +1,18 @@
100000000 100000000
0 0
Row 1:
──────
round(primary_key_bytes_in_memory, -7):           100000000 -- 100.00 million
round(primary_key_bytes_in_memory_allocated, -7): 100000000 -- 100.00 million
Row 1:
──────
primary_key_bytes_in_memory:           0
primary_key_bytes_in_memory_allocated: 0
1
100000000 100000000
Row 1:
──────
primary_key_bytes_in_memory:           0
primary_key_bytes_in_memory_allocated: 0
1
Row 1:
──────
round(primary_key_bytes_in_memory, -7):           100000000 -- 100.00 million
round(primary_key_bytes_in_memory_allocated, -7): 100000000 -- 100.00 million
@@ -3,17 +3,26 @@ CREATE TABLE test (s String) ENGINE = MergeTree ORDER BY s SETTINGS index_granul

SET optimize_trivial_insert_select = 1;
INSERT INTO test SELECT randomString(1000) FROM numbers(100000);
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test';
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;

DETACH TABLE test;
SET max_memory_usage = '50M';
ATTACH TABLE test;

SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test';
SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;

SET max_memory_usage = '200M';

-- Run a query that doesn't use indexes
SELECT s != '' FROM test LIMIT 1;

SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test';
-- Check that index was not loaded
SELECT primary_key_bytes_in_memory, primary_key_bytes_in_memory_allocated FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;

-- Run a query that uses PK index
SELECT s != '' FROM test WHERE s < '9999999999' LIMIT 1;

-- Check that index was loaded
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table = 'test' FORMAT Vertical;

DROP TABLE test;
@@ -5,9 +5,19 @@
100000000 100000000
0 0
0 0
Query that does not use index for table `test`
1
0 0
0 0
Query that uses index in for table `test`
1
100000000 100000000
0 0
Query that does not use index for table `test2`
1
100000000 100000000
0 0
Query that uses index for table `test2`
1
100000000 100000000
100000000 100000000
@@ -16,8 +16,18 @@ SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory
SYSTEM UNLOAD PRIMARY KEY {CLICKHOUSE_DATABASE:Identifier}.test2;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

SELECT 'Query that does not use index for table `test`';
SELECT s != '' FROM test LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

SELECT 'Query that uses index in for table `test`';
SELECT s != '' FROM test WHERE s < '99999999' LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

SELECT 'Query that does not use index for table `test2`';
SELECT s != '' FROM test2 LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');

SELECT 'Query that uses index for table `test2`';
SELECT s != '' FROM test2 WHERE s < '99999999' LIMIT 1;
SELECT round(primary_key_bytes_in_memory, -7), round(primary_key_bytes_in_memory_allocated, -7) FROM system.parts WHERE database = currentDatabase() AND table IN ('test', 'test2');
@@ -1 +1 @@
test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB
test 10000000 352 39 39
@@ -1,4 +1,4 @@
-- Tags: no-random-settings
-- Tags: no-random-settings, no-fasttest

set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
@@ -10,10 +10,10 @@ insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(

SELECT
    `table`,
    formatReadableQuantity(sum(rows)) AS rows,
    formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed,
    formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed,
    formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk
    sum(rows) AS rows,
    floor(sum(data_uncompressed_bytes) / (1024 * 1024)) AS data_size_uncompressed,
    floor(sum(data_compressed_bytes) / (1024 * 1024)) AS data_size_compressed,
    floor(sum(bytes_on_disk) / (1024 * 1024)) AS total_size_on_disk
FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table`
@@ -1,2 +1,2 @@
Condition: and((materialize(auid) in [1, 1]), (_CAST(toDate(ts)) in (-Inf, 1703980800]))
Granules: 1/3
Condition: (_CAST(toDate(ts)) in (-Inf, 1703980800])
Granules: 3/3
@@ -0,0 +1,2 @@
-0.07947094746692918 -1017248723 0
-0.07947094746692918 -1017248723 0
tests/queries/0_stateless/03271_sqllancer_having_issue.sql (new file, 10 lines)
@@ -0,0 +1,10 @@
-- https://s3.amazonaws.com/clickhouse-test-reports/0/a02b20a9813c6ba0880c67f079363ef1c5440109/sqlancer__debug_.html
-- Caused by enablement of query_plan_merge_filters. Will fail if the next line is uncommented
-- set query_plan_merge_filters=1;

CREATE TABLE IF NOT EXISTS t3 (c0 Int32) ENGINE = Memory() ;
INSERT INTO t3(c0) VALUES (1110866669);

-- These 2 queries are expected to return the same
SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 SETTINGS aggregate_functions_null_for_empty=1, enable_optimize_predicate_expression=0;
SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING ((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723)))) UNION ALL SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING (NOT (((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723)))))) UNION ALL SELECT (tan (t3.c0)), SUM(-1017248723), ((t3.c0)%(t3.c0)) FROM t3 GROUP BY t3.c0 HAVING ((((tan ((- (SUM(-1017248723)))))) and ((sqrt (SUM(-1017248723))))) IS NULL) SETTINGS aggregate_functions_null_for_empty=1, enable_optimize_predicate_expression=0;