Merge branch 'master' into system_table_add_column

Commit e82ae0ffce by chen, 2022-10-14 11:50:47 +08:00, committed by GitHub
40 changed files with 424 additions and 91 deletions


@@ -23,7 +23,7 @@ namespace
{
/// Trim trailing whitespace in place
void trim(String & s)
void rightTrim(String & s)
{
s.erase(std::find_if(s.rbegin(), s.rend(), [](unsigned char ch) { return !std::isspace(ch); }).base(), s.end());
}
@@ -441,7 +441,7 @@ LineReader::InputStatus ReplxxLineReader::readOneLine(const String & prompt)
return (errno != EAGAIN) ? ABORT : RESET_LINE;
input = cinput;
trim(input);
rightTrim(input);
return INPUT_LINE;
}
@@ -512,6 +512,9 @@ void ReplxxLineReader::openInteractiveHistorySearch()
/// NOTE: You can use one of the following to configure the behaviour additionally:
/// - SKIM_DEFAULT_OPTIONS
/// - FZF_DEFAULT_OPTS
///
/// Note also that fzf and skim are ~95% compatible (at least for the options
/// used here).
std::string fuzzy_finder_command = fmt::format(
"{} --read0 --tac --no-sort --tiebreak=index --bind=ctrl-r:toggle-sort --height=30% < {} > {}",
fuzzy_finder, history_file.getPath(), output_file.getPath());
@@ -521,7 +524,8 @@ void ReplxxLineReader::openInteractiveHistorySearch()
{
if (executeCommand(argv) == 0)
{
const std::string & new_query = readFile(output_file.getPath());
std::string new_query = readFile(output_file.getPath());
rightTrim(new_query);
rx.set_state(replxx::Replxx::State(new_query.c_str(), new_query.size()));
}
}

contrib/cctz vendored

@@ -1 +1 @@
Subproject commit 49c656c62fbd36a1bc20d64c476853bdb7cf7bb9
Subproject commit 05ec08ce61e4b5c44692cc2f1ce4b6d8596679bf

@@ -1 +1 @@
Subproject commit c7f7cfc85e4b81c1c76cdd633dd8808d2dfd6114
Subproject commit 3a39038345a400e7e767811b142a94355d511215


@@ -1,4 +1,4 @@
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined" OR NOT USE_STATIC_LIBRARIES)
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
else()
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
@@ -6,15 +6,16 @@ endif()
option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})
# If USE_STATIC_LIBRARIES=0 was passed to CMake, we'll still build LLVM statically to keep complexity minimal.
if (NOT ENABLE_EMBEDDED_COMPILER)
message(STATUS "Not using LLVM")
return()
endif()
# TODO: Enable shared library build
# TODO: Enable compilation on AArch64
set (LLVM_VERSION "14.0.0bundled")
set (LLVM_VERSION "15.0.0bundled")
set (LLVM_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include"
"${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm/include"
@@ -62,9 +63,6 @@ set (REQUIRED_LLVM_LIBRARIES
# list(APPEND REQUIRED_LLVM_LIBRARIES LLVMAArch64Info LLVMAArch64Desc LLVMAArch64CodeGen)
# endif ()
# ld: unknown option: --color-diagnostics
# set (LINKER_SUPPORTS_COLOR_DIAGNOSTICS 0 CACHE INTERNAL "")
set (CMAKE_INSTALL_RPATH "ON") # Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (LLVM_COMPILER_CHECKED 1 CACHE INTERNAL "") # Skip internal compiler selection
set (LLVM_ENABLE_EH 1 CACHE INTERNAL "") # With exception handling
@@ -97,9 +95,6 @@ set(LLVM_INCLUDE_DOCS 0 CACHE INTERNAL "")
set(LLVM_ENABLE_OCAMLDOC 0 CACHE INTERNAL "")
set(LLVM_ENABLE_BINDINGS 0 CACHE INTERNAL "")
# C++20 is currently not supported due to ambiguous operator != etc.
set (CMAKE_CXX_STANDARD 17)
set (LLVM_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm")
set (LLVM_BINARY_DIR "${ClickHouse_BINARY_DIR}/contrib/llvm-project/llvm")
add_subdirectory ("${LLVM_SOURCE_DIR}" "${LLVM_BINARY_DIR}")


@@ -38,13 +38,13 @@ For other Linux distributions, check the availability of the [prebuilt packages]
#### Use the latest clang for Builds
``` bash
export CC=clang-14
export CXX=clang++-14
export CC=clang-15
export CXX=clang++-15
```
In this example we use version 14 that is the latest as of Feb 2022.
In this example we use version 15 that is the latest as of Sept 2022.
Gcc can also be used though it is discouraged.
Gcc cannot be used.
### Checkout ClickHouse Sources {#checkout-clickhouse-sources}


@@ -1502,6 +1502,21 @@ If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored.
- Policy should have exactly one volume with local disks.
:::
## max_temporary_data_on_disk_size {#max_temporary_data_on_disk_size}
Limit the amount of disk space consumed by temporary files in `tmp_path` for the server.
Queries that exceed this limit will fail with an exception.
Default value: `0`.
**See also**
- [max_temporary_data_on_disk_size_for_user](../../operations/settings/query-complexity.md#settings_max_temporary_data_on_disk_size_for_user)
- [max_temporary_data_on_disk_size_for_query](../../operations/settings/query-complexity.md#settings_max_temporary_data_on_disk_size_for_query)
- [tmp_path](#tmp-path)
- [tmp_policy](#tmp-policy)
- [max_server_memory_usage](#max_server_memory_usage)
## uncompressed_cache_size {#server-settings-uncompressed_cache_size}
Cache size (in bytes) for uncompressed data used by table engines from the [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md).


@@ -313,4 +313,19 @@ When inserting data, ClickHouse calculates the number of partitions in the inser
> "Too many partitions for single INSERT block (more than <max_parts>). The limit is controlled by the 'max_partitions_per_insert_block' setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note that partitioning is not intended to speed up SELECT queries (the ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc)."
## max_temporary_data_on_disk_size_for_user {#settings_max_temporary_data_on_disk_size_for_user}
The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries.
Zero means unlimited.
Default value: 0.
## max_temporary_data_on_disk_size_for_query {#settings_max_temporary_data_on_disk_size_for_query}
The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries.
Zero means unlimited.
Default value: 0.
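Both settings can be changed per session; the server-wide counterpart `max_temporary_data_on_disk_size` is a server configuration setting, not a query-level one. A minimal sketch of the per-query limit in action (the values are illustrative, and the exact exception text may differ):

``` sql
-- Cap temporary disk usage of the current query at ~1 GB.
SET max_temporary_data_on_disk_size_for_query = 1000000000;

-- Force an external (on-disk) sort; if the spilled files exceed the
-- limit above, the query fails with an exception instead of filling tmp_path.
SET max_bytes_before_external_sort = 10000000;
SELECT number FROM numbers(100000000) ORDER BY rand() FORMAT Null;
```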
[Original article](https://clickhouse.com/docs/en/operations/settings/query_complexity/) <!--hide-->


@@ -971,10 +971,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Storage with temporary data for processing of heavy queries.
{
std::string tmp_path = config().getString("tmp_path", path / "tmp/");
std::string tmp_policy = config().getString("tmp_policy", "");
size_t tmp_max_size = config().getUInt64("tmp_max_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy, tmp_max_size);
std::string temporary_path = config().getString("tmp_path", path / "tmp/");
std::string temporary_policy = config().getString("tmp_policy", "");
size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(temporary_path, temporary_policy, max_size);
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}


@@ -1498,6 +1498,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
if (!old_settings)
old_settings.emplace(global_context->getSettingsRef());
global_context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
global_context->resetSettingsToDefaultValue(settings_ast.as<ASTSetQuery>()->default_settings);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
@@ -1543,6 +1544,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
else
global_context->applySettingChange(change);
}
global_context->resetSettingsToDefaultValue(set_query->default_settings);
}
if (const auto * use_query = parsed_query->as<ASTUseQuery>())
{


@@ -33,6 +33,7 @@
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
M(NetworkReceive, "Number of threads receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
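The new counter sits next to the existing per-purpose ones; a quick way to inspect them (a sketch, assuming a running server):

``` sql
-- List the temporary-file metrics declared above.
SELECT metric, value, description
FROM system.metrics
WHERE metric LIKE 'TemporaryFiles%';
```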


@@ -890,6 +890,8 @@ struct Settings : public BaseSettings<SettingsTraits>, public IHints<2, Settings
void set(std::string_view name, const Field & value) override;
void setDefaultValue(const String & name) { resetToDefault(name); }
private:
void applyCompatibilitySetting();


@@ -1,17 +1,31 @@
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <filesystem>
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace CurrentMetrics
{
extern const Metric TotalTemporaryFiles;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_)
: TemporaryFileOnDisk(disk_, disk_->getPath())
: TemporaryFileOnDisk(disk_, "")
{}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope)
@@ -20,33 +34,54 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::
sub_metric_increment.emplace(metric_scope);
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_)
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix)
: disk(disk_)
, metric_increment(CurrentMetrics::TotalTemporaryFiles)
{
/// Is it possible to use this with a disk other than DiskLocal?
disk->createDirectories(prefix_);
if (!disk)
throw Exception("Disk is not specified", ErrorCodes::LOGICAL_ERROR);
if (fs::path prefix_path(prefix); prefix_path.has_parent_path())
disk->createDirectories(prefix_path.parent_path());
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
/// Do not use the default temporary root path `/tmp/tmpXXXXXX`.
/// The `dummy_prefix` is used to know what to replace with the real prefix.
String dummy_prefix = "a/";
filepath = Poco::TemporaryFile::tempName(dummy_prefix);
relative_path = Poco::TemporaryFile::tempName(dummy_prefix);
dummy_prefix += "tmp";
/// a/tmpXXXXX -> <prefix>XXXXX
assert(filepath.starts_with(dummy_prefix));
filepath.replace(0, dummy_prefix.length(), prefix_);
assert(relative_path.starts_with(dummy_prefix));
relative_path.replace(0, dummy_prefix.length(), prefix);
if (relative_path.empty())
throw Exception("Temporary file name is empty", ErrorCodes::LOGICAL_ERROR);
}
String TemporaryFileOnDisk::getPath() const
{
return std::filesystem::path(disk->getPath()) / relative_path;
}
TemporaryFileOnDisk::~TemporaryFileOnDisk()
{
try
{
if (disk && !filepath.empty() && disk->exists(filepath))
disk->removeRecursive(filepath);
if (!disk || relative_path.empty())
return;
if (!disk->exists(relative_path))
{
LOG_WARNING(&Poco::Logger::get("TemporaryFileOnDisk"), "Temporary path '{}' does not exist in '{}'", relative_path, disk->getPath());
return;
}
disk->removeRecursive(relative_path);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}


@@ -5,12 +5,6 @@
#include <Disks/IDisk.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric TotalTemporaryFiles;
}
namespace DB
{
using DiskPtr = std::shared_ptr<IDisk>;
@@ -24,20 +18,21 @@ class TemporaryFileOnDisk
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix);
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
const String & getPath() const { return filepath; }
const String & path() const { return filepath; }
String getPath() const;
private:
DiskPtr disk;
String filepath;
/// Relative path on the disk to the temporary file or directory
String relative_path;
CurrentMetrics::Increment metric_increment;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TotalTemporaryFiles};
/// Specified if we know what the file is used for (sort/aggregate/join).
std::optional<CurrentMetrics::Increment> sub_metric_increment = {};
};


@@ -659,8 +659,8 @@ class FunctionBinaryArithmetic : public IFunction
static FunctionOverloadResolverPtr
getFunctionForIntervalArithmetic(const DataTypePtr & type0, const DataTypePtr & type1, ContextPtr context)
{
bool first_is_date_or_datetime = isDate(type0) || isDateTime(type0) || isDateTime64(type0);
bool second_is_date_or_datetime = isDate(type1) || isDateTime(type1) || isDateTime64(type1);
bool first_is_date_or_datetime = isDateOrDate32(type0) || isDateTime(type0) || isDateTime64(type0);
bool second_is_date_or_datetime = isDateOrDate32(type1) || isDateTime(type1) || isDateTime64(type1);
/// Exactly one argument must be Date or DateTime
if (first_is_date_or_datetime == second_is_date_or_datetime)
@@ -699,7 +699,7 @@ class FunctionBinaryArithmetic : public IFunction
}
else
{
if (isDate(type_time))
if (isDateOrDate32(type_time))
function_name = is_plus ? "addDays" : "subtractDays";
else
function_name = is_plus ? "addSeconds" : "subtractSeconds";
@@ -895,7 +895,7 @@ class FunctionBinaryArithmetic : public IFunction
ColumnsWithTypeAndName new_arguments = arguments;
/// Interval argument must be second.
if (isDate(arguments[1].type) || isDateTime(arguments[1].type) || isDateTime64(arguments[1].type))
if (isDateOrDate32(arguments[1].type) || isDateTime(arguments[1].type) || isDateTime64(arguments[1].type))
std::swap(new_arguments[0], new_arguments[1]);
/// Change interval argument type to its representation
@@ -1099,7 +1099,7 @@ public:
new_arguments[i].type = arguments[i];
/// Interval argument must be second.
if (isDate(new_arguments[1].type) || isDateTime(new_arguments[1].type) || isDateTime64(new_arguments[1].type))
if (isDateOrDate32(new_arguments[1].type) || isDateTime(new_arguments[1].type) || isDateTime64(new_arguments[1].type))
std::swap(new_arguments[0], new_arguments[1]);
/// Change interval argument to its representation
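With the switch to `isDateOrDate32`, plus/minus interval arithmetic now accepts `Date32` arguments on either side, matching the new tests later in this diff. For example:

``` sql
-- Both argument orders work; previously Date32 was rejected here.
SELECT toDate32('2217-01-01') + INTERVAL 1 MONTH AS x; -- 2217-02-01
SELECT INTERVAL 1 YEAR + toDate32('2217-01-01') AS x;  -- 2218-01-01
```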


@@ -91,15 +91,35 @@ static size_t getTypeDepth(const DataTypePtr & type)
return 0;
}
template <typename T>
static bool decimalEqualsFloat(Field field, Float64 float_value)
{
auto decimal_field = field.get<DecimalField<T>>();
auto decimal_to_float = DecimalUtils::convertTo<Float64>(decimal_field.getValue(), decimal_field.getScale());
return decimal_to_float == float_value;
}
/// Applies stricter rules than convertFieldToType:
Doesn't allow:
/// - loss of precision with `Decimals`
/// - loss of precision converting to Decimal
static bool convertFieldToTypeStrict(const Field & from_value, const IDataType & to_type, Field & result_value)
{
result_value = convertFieldToType(from_value, to_type);
if (Field::isDecimal(from_value.getType()) && Field::isDecimal(result_value.getType()))
return applyVisitor(FieldVisitorAccurateEquals{}, from_value, result_value);
if (from_value.getType() == Field::Types::Float64 && Field::isDecimal(result_value.getType()))
{
/// Convert back to Float64 and compare
if (result_value.getType() == Field::Types::Decimal32)
return decimalEqualsFloat<Decimal32>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal64)
return decimalEqualsFloat<Decimal64>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal128)
return decimalEqualsFloat<Decimal128>(result_value, from_value.get<Float64>());
if (result_value.getType() == Field::Types::Decimal256)
return decimalEqualsFloat<Decimal256>(result_value, from_value.get<Float64>());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown decimal type {}", result_value.getTypeName());
}
return true;
}
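In practice this means a float literal only matches a Decimal value when the conversion round-trips exactly, as the new tests near the end of this diff demonstrate:

``` sql
SELECT toDecimal32(1.555, 3) IN (1.5551);        -- 0: not representable at scale 3
SELECT toDecimal32(1.555, 3) IN (1.5551, 1.555); -- 1: the second literal converts exactly
```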


@@ -570,7 +570,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_)
, keys_positions(calculateKeysPositions(header, params_))
, params(params_)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope) : nullptr)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope, CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{
/// Use query-level memory tracker
@@ -1573,7 +1573,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
Stopwatch watch;
size_t rows = data_variants.size();
auto & out_stream = tmp_data->createStream(getHeader(false), CurrentMetrics::TemporaryFilesForAggregation, max_temp_file_size);
auto & out_stream = tmp_data->createStream(getHeader(false), max_temp_file_size);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.path());


@@ -1374,6 +1374,15 @@ void Context::clampToSettingsConstraints(SettingsChanges & changes) const
getSettingsConstraintsAndCurrentProfiles()->constraints.clamp(settings, changes);
}
void Context::resetSettingsToDefaultValue(const std::vector<String> & names)
{
auto lock = getLock();
for (const String & name: names)
{
settings.setDefaultValue(name);
}
}
std::shared_ptr<const SettingsConstraintsAndProfileIDs> Context::getSettingsConstraintsAndCurrentProfiles() const
{
auto lock = getLock();


@@ -647,6 +647,9 @@ public:
void checkSettingsConstraints(SettingsChanges & changes) const;
void clampToSettingsConstraints(SettingsChanges & changes) const;
/// Reset settings to their default values
void resetSettingsToDefaultValue(const std::vector<String> & names);
/// Returns the current constraints (can return null).
std::shared_ptr<const SettingsConstraintsAndProfileIDs> getSettingsConstraintsAndCurrentProfiles() const;


@@ -13,6 +13,7 @@ BlockIO InterpreterSetQuery::execute()
auto session_context = getContext()->getSessionContext();
session_context->applySettingsChanges(ast.changes);
session_context->addQueryParameters(ast.query_parameters);
session_context->resetSettingsToDefaultValue(ast.default_settings);
return {};
}
@@ -22,6 +23,7 @@ void InterpreterSetQuery::executeForCurrentContext()
const auto & ast = query_ptr->as<ASTSetQuery &>();
getContext()->checkSettingsConstraints(ast.changes);
getContext()->applySettingsChanges(ast.changes);
getContext()->resetSettingsToDefaultValue(ast.default_settings);
}
}


@@ -1033,7 +1033,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{
auto load_func = [&]() -> std::shared_ptr<Block>
{
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block));
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getPath(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read());
};


@@ -264,7 +264,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{
return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
return Pipe(std::make_shared<TemporaryFileLazySource>(file->getPath(), materializeBlock(sample_block)));
}


@@ -41,7 +41,7 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(int compressed_delta, int unco
stat.uncompressed_size += uncompressed_delta;
}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size)
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
{
DiskPtr disk;
if (max_file_size > 0)
@@ -56,7 +56,7 @@ TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, Cu
disk = volume->getDisk();
}
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, metric_scope);
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
@@ -94,9 +94,9 @@ struct TemporaryFileStream::OutputWriter
if (finalized)
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
out_writer.write(block);
num_rows += block.rows();
}
void finalize()
{
if (finalized)
@@ -127,6 +127,8 @@ struct TemporaryFileStream::OutputWriter
CompressedWriteBuffer out_compressed_buf;
NativeWriter out_writer;
std::atomic_size_t num_rows = 0;
bool finalized = false;
};
@@ -157,7 +159,7 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(file->path(), header))
, out_writer(std::make_unique<OutputWriter>(file->getPath(), header))
{
}
@@ -172,6 +174,9 @@ void TemporaryFileStream::write(const Block & block)
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
if (isWriteFinished())
return stat;
if (out_writer)
{
out_writer->finalize();
@@ -196,19 +201,19 @@ Block TemporaryFileStream::read()
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isFinalized())
if (isEof())
return {};
if (!in_reader)
{
in_reader = std::make_unique<InputReader>(file->path(), header);
in_reader = std::make_unique<InputReader>(file->getPath(), header);
}
Block block = in_reader->read();
if (!block)
{
/// finalize earlier to release resources, do not wait for the destructor
this->finalize();
this->release();
}
return block;
}
@@ -223,20 +228,21 @@ void TemporaryFileStream::updateAllocAndCheck()
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
file->path(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
file->getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
}
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
stat.num_rows = out_writer->num_rows;
}
bool TemporaryFileStream::isFinalized() const
bool TemporaryFileStream::isEof() const
{
return file == nullptr;
}
void TemporaryFileStream::finalize()
void TemporaryFileStream::release()
{
if (file)
{
@@ -258,7 +264,7 @@ TemporaryFileStream::~TemporaryFileStream()
{
try
{
finalize();
release();
}
catch (...)
{


@@ -5,6 +5,13 @@
#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesUnknown;
}
namespace DB
{
@@ -18,7 +25,6 @@ using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
/*
* Used to account the amount of temporary data written to disk.
* If a limit is set, an exception is thrown when it is exceeded.
@@ -65,15 +71,21 @@ protected:
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
public:
using TemporaryDataOnDiskScope::StatAtomic;
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), 0)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
{}
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
, current_metric_scope(metric_scope)
{}
/// If max_file_size > 0, check that there's enough space on the disk and throw an exception if there is not enough free space
TemporaryFileStream & createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size = 0);
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
@@ -83,6 +95,8 @@ public:
private:
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
typename CurrentMetrics::Value current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
};
/*
@@ -99,6 +113,7 @@ public:
/// Non-atomic because we don't allow reading from or writing to a single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
size_t num_rows = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
@@ -109,17 +124,19 @@ public:
Block read();
const String & path() const { return file->getPath(); }
const String path() const { return file->getPath(); }
Block getHeader() const { return header; }
/// Read finished and file released
bool isEof() const;
~TemporaryFileStream();
private:
void updateAllocAndCheck();
/// Finalize everything, close reader and writer, delete file
void finalize();
bool isFinalized() const;
/// Release everything, close reader and writer, delete file
void release();
TemporaryDataOnDisk * parent;


@@ -109,6 +109,21 @@ Field convertDecimalToDecimalType(const Field & from, const DataTypeDecimal<T> &
return DecimalField<T>(value, type.getScale());
}
template <typename From, typename T>
Field convertFloatToDecimalType(const Field & from, const DataTypeDecimal<T> & type)
{
From value = from.get<From>();
if (!type.canStoreWhole(value))
throw Exception("Number is too big to place in " + type.getName(), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
//String sValue = convertFieldToString(from);
//int fromScale = sValue.length()- sValue.find('.') - 1;
UInt32 scale = type.getScale();
auto scaled_value = convertToDecimal<DataTypeNumber<From>, DataTypeDecimal<T>>(value, scale);
return DecimalField<T>(scaled_value, scale);
}
template <typename To>
Field convertDecimalType(const Field & from, const To & type)
{
@@ -135,6 +150,9 @@ Field convertDecimalType(const Field & from, const To & type)
if (from.getType() == Field::Types::Decimal128)
return convertDecimalToDecimalType<Decimal128>(from, type);
if (from.getType() == Field::Types::Float64)
return convertFloatToDecimalType<Float64>(from, type);
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch in IN or VALUES section. Expected: {}. Got: {}",
type.getName(), from.getType());
}


@@ -37,6 +37,17 @@ void ASTSetQuery::formatImpl(const FormatSettings & format, FormatState &, Forma
format.ostr << " = " << applyVisitor(FieldVisitorToString(), change.value);
}
for (const auto & setting_name : default_settings)
{
if (!first)
format.ostr << ", ";
else
first = false;
formatSettingName(setting_name, format.ostr);
format.ostr << " = DEFAULT";
}
for (const auto & [name, value] : query_parameters)
{
if (!first)


@@ -23,6 +23,8 @@ public:
bool print_in_format = true;
SettingsChanges changes;
/// Settings that will be reset to their default values
std::vector<String> default_settings;
NameToNameMap query_parameters;
/** Get the text that identifies this element. */


@@ -118,6 +118,40 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
return true;
}
bool ParserSetQuery::parseNameValuePairWithDefault(SettingChange & change, String & default_settings, IParser::Pos & pos, Expected & expected)
{
ParserCompoundIdentifier name_p;
ParserLiteralOrMap value_p;
ParserToken s_eq(TokenType::Equals);
ASTPtr name;
ASTPtr value;
bool is_default = false;
if (!name_p.parse(pos, name, expected))
return false;
if (!s_eq.ignore(pos, expected))
return false;
if (ParserKeyword("TRUE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
else if (ParserKeyword("FALSE").ignore(pos, expected))
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
else if (ParserKeyword("DEFAULT").ignore(pos, expected))
is_default = true;
else if (!value_p.parse(pos, value, expected))
return false;
tryGetIdentifierNameInto(name, change.name);
if (is_default)
default_settings = change.name;
else
change.value = value->as<ASTLiteral &>().value;
return true;
}
bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
@@ -137,20 +171,24 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
SettingsChanges changes;
NameToNameMap query_parameters;
std::vector<String> default_settings;
while (true)
{
if ((!changes.empty() || !query_parameters.empty()) && !s_comma.ignore(pos))
if ((!changes.empty() || !query_parameters.empty() || !default_settings.empty()) && !s_comma.ignore(pos))
break;
/// Either a setting or a parameter for prepared statement (if name starts with QUERY_PARAMETER_NAME_PREFIX)
SettingChange current;
String name_of_default_setting;
if (!parseNameValuePair(current, pos, expected))
if (!parseNameValuePairWithDefault(current, name_of_default_setting, pos, expected))
return false;
if (current.name.starts_with(QUERY_PARAMETER_NAME_PREFIX))
query_parameters.emplace(convertToQueryParameter(std::move(current)));
else if (!name_of_default_setting.empty())
default_settings.emplace_back(std::move(name_of_default_setting));
else
changes.push_back(std::move(current));
}
@@ -161,6 +199,7 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->is_standalone = !parse_only_internals;
query->changes = std::move(changes);
query->query_parameters = std::move(query_parameters);
query->default_settings = std::move(default_settings);
return true;
}
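The new grammar accepts `DEFAULT` in place of a literal and records the setting name for resetting rather than as a value change. Usage, as covered by the new test near the end of this diff:

``` sql
SET max_insert_block_size = 100000;  -- marks the setting as changed
SET max_insert_block_size = DEFAULT; -- resets it; `changed` becomes 0 again
SELECT value, changed FROM system.settings WHERE name = 'max_insert_block_size';
```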


@@ -17,6 +17,7 @@ class ParserSetQuery : public IParserBase
public:
explicit ParserSetQuery(bool parse_only_internals_ = false) : parse_only_internals(parse_only_internals_) {}
static bool parseNameValuePair(SettingChange & change, IParser::Pos & pos, Expected & expected);
static bool parseNameValuePairWithDefault(SettingChange & change, String & default_settings, IParser::Pos & pos, Expected & expected);
protected:
const char * getName() const override { return "SET query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;


@@ -9,6 +9,12 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
@@ -197,7 +203,7 @@ void SortingStep::mergeSorting(QueryPipelineBuilder & pipeline, const SortDescri
max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort,
std::make_unique<TemporaryDataOnDisk>(tmp_data),
std::make_unique<TemporaryDataOnDisk>(tmp_data, CurrentMetrics::TemporaryFilesForSort),
min_free_disk_space);
});
}

View File

@@ -22,10 +22,6 @@ namespace ProfileEvents
extern const Event ExternalProcessingUncompressedBytesTotal;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
@@ -171,7 +167,7 @@ void MergeSortingTransform::consume(Chunk chunk)
{
/// If there's less free disk space than reserve_size, an exception will be thrown
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
auto & tmp_stream = tmp_data->createStream(header_without_constants, CurrentMetrics::TemporaryFilesForSort, reserve_size);
auto & tmp_stream = tmp_data->createStream(header_without_constants, reserve_size);
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);


@@ -5683,7 +5683,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
{
const auto & metadata_snapshot = storage_snapshot->metadata;
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections || query_info.is_projection_query)
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections || query_info.is_projection_query
|| settings.aggregate_functions_null_for_empty /* projections don't work correctly with this setting */)
return std::nullopt;
// Currently projections don't support parallel replicas reading yet.
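The guard reflects that `aggregate_functions_null_for_empty` rewrites aggregate functions to their `-OrNull` variants, which projections do not handle correctly; the new bash test at the end of this diff covers the regression. In essence:

``` sql
-- With the setting enabled, the projection optimization is skipped
-- and max(date) goes through the normal path.
SET aggregate_functions_null_for_empty = 1;
CREATE TABLE test_date (date Int32) ENGINE = MergeTree ORDER BY (date) AS SELECT 20220920;
SELECT max(date) FROM test_date;
```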


@@ -77,15 +77,6 @@ struct StorageInMemoryMetadata
/// Sets projections
void setProjections(ProjectionsDescription projections_);
/// Set partition key for storage (methods below, are just wrappers for this struct).
void setPartitionKey(const KeyDescription & partition_key_);
/// Set sorting key for storage (methods below, are just wrappers for this struct).
void setSortingKey(const KeyDescription & sorting_key_);
/// Set primary key for storage (methods below, are just wrappers for this struct).
void setPrimaryKey(const KeyDescription & primary_key_);
/// Set sampling key for storage (methods below, are just wrappers for this struct).
void setSamplingKey(const KeyDescription & sampling_key_);
/// Set common table TTLs
void setTableTTLs(const TTLTableDescription & table_ttl_);


@@ -166,3 +166,63 @@
2005-01-01
2004-01-01
2003-01-01
2216-09-23
2216-10-13
2216-11-02
2216-11-22
2216-12-12
2217-01-01
2217-01-21
2217-02-10
2217-03-02
2217-03-22
2217-04-11
2217-03-22
2217-03-02
2217-02-10
2217-01-21
2217-01-01
2216-12-12
2216-11-22
2216-11-02
2216-10-13
2215-05-01
2215-09-01
2216-01-01
2216-05-01
2216-09-01
2217-01-01
2217-05-01
2217-09-01
2218-01-01
2218-05-01
2218-09-01
2218-05-01
2218-01-01
2217-09-01
2217-05-01
2217-01-01
2216-09-01
2216-05-01
2216-01-01
2215-09-01
2197-01-01
2201-01-01
2205-01-01
2209-01-01
2213-01-01
2217-01-01
2221-01-01
2225-01-01
2229-01-01
2233-01-01
2237-01-01
2233-01-01
2229-01-01
2225-01-01
2221-01-01
2217-01-01
2213-01-01
2209-01-01
2205-01-01
2201-01-01


@@ -53,8 +53,18 @@ SELECT toDate('2017-01-01') - INTERVAL 1 YEAR AS x;
SELECT toDate('2017-01-01') - INTERVAL -1 YEAR AS x;
SELECT toDate('2017-01-01') + INTERVAL number - 15 MONTH AS x FROM system.numbers LIMIT 30;
SELECT INTERVAL number - 15 MONTH + toDate('2017-01-01') AS x FROM system.numbers LIMIT 30;
SELECT toDate('2017-01-01') - INTERVAL number - 15 MONTH AS x FROM system.numbers LIMIT 30;
SELECT toDate('2017-01-01') + INTERVAL number - 15 YEAR AS x FROM system.numbers LIMIT 30;
SELECT INTERVAL number - 15 YEAR + toDate('2017-01-01') AS x FROM system.numbers LIMIT 30;
SELECT toDate('2017-01-01') - INTERVAL number - 15 YEAR AS x FROM system.numbers LIMIT 30;
SELECT toDate32('2217-01-01') + INTERVAL number * 20 - 100 DAY AS x FROM system.numbers LIMIT 10;
SELECT INTERVAL 100 - number * 20 DAY + toDate32('2217-01-01') AS x FROM system.numbers LIMIT 10;
SELECT INTERVAL number * 4 - 20 MONTH + toDate32('2217-01-01') AS x FROM system.numbers LIMIT 10;
SELECT toDate32('2217-01-01') - INTERVAL number * 4 - 20 MONTH AS x FROM system.numbers LIMIT 10;
SELECT INTERVAL number * 4 - 20 YEAR + toDate32('2217-01-01') AS x FROM system.numbers LIMIT 10;
SELECT toDate32('2217-01-01') - INTERVAL number * 4 - 20 YEAR AS x FROM system.numbers LIMIT 10;


@@ -0,0 +1,21 @@
0
1
1
0
0
1
1
0
0
1
1
0
0
1
1
0
1
1
1
1
1


@@ -0,0 +1,32 @@
SELECT toDecimal32(1.555,3) IN (1.5551);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555);
SELECT toDecimal32(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal32(1.555,3) IN (1.550,1.5);
SELECT toDecimal64(1.555,3) IN (1.5551);
SELECT toDecimal64(1.555,3) IN (1.5551,1.555);
SELECT toDecimal64(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal64(1.555,3) IN (1.550,1.5);
SELECT toDecimal128(1.555,3) IN (1.5551);
SELECT toDecimal128(1.555,3) IN (1.5551,1.555);
SELECT toDecimal128(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal128(1.555,3) IN (1.550,1.5);
SELECT toDecimal256(1.555,3) IN (1.5551);
SELECT toDecimal256(1.555,3) IN (1.5551,1.555);
SELECT toDecimal256(1.555,3) IN (1.5551,1.555000);
SELECT toDecimal256(1.555,3) IN (1.550,1.5);
DROP TABLE IF EXISTS decimal_in_float_test;
CREATE TABLE decimal_in_float_test ( `a` Decimal(18, 0), `b` Decimal(36, 2) ) ENGINE = Memory;
INSERT INTO decimal_in_float_test VALUES ('33', '44.44');
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33);
SELECT count() == 1 FROM decimal_in_float_test WHERE a IN (33.0);
SELECT count() == 1 FROM decimal_in_float_test WHERE a NOT IN (33.333);
SELECT count() == 1 FROM decimal_in_float_test WHERE b IN (44.44);
SELECT count() == 1 FROM decimal_in_float_test WHERE b NOT IN (44.4,44.444);
DROP TABLE IF EXISTS decimal_in_float_test;


@@ -0,0 +1,5 @@
1048545
100000
1
1048545
0


@@ -0,0 +1,9 @@
-- Tags: no-parallel
SELECT value FROM system.settings where name='max_insert_block_size';
SET max_insert_block_size=100000;
SELECT value FROM system.settings where name='max_insert_block_size';
SELECT changed FROM system.settings where name='max_insert_block_size';
SET max_insert_block_size=DEFAULT;
SELECT value FROM system.settings where name='max_insert_block_size';
SELECT changed FROM system.settings where name='max_insert_block_size';


@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: depends on bzip2
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_LOCAL} --aggregate_functions_null_for_empty=1 --multiquery --query "create table test_date (date Int32) ENGINE = MergeTree ORDER BY (date) as select 20220920; SELECT max(date) FROM test_date";