Merge remote-tracking branch 'origin/master' into generated-file-cleanup

Robert Schulze, 2022-10-06 12:22:43 +00:00
commit 78be400ac0
GPG Key ID: 26703B55FB13728A (no known key found for this signature in database)
68 changed files with 1180 additions and 283 deletions

View File

@ -11,6 +11,7 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
awscli \
brotli \
lz4 \
expect \
golang \
lsof \

View File

@ -0,0 +1,23 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.6.9.11-stable (9ec61dcac49) FIXME as compared to v22.6.8.35-stable (b91dc59a565)
#### Improvement
* Backported in [#42089](https://github.com/ClickHouse/ClickHouse/issues/42089): Replace back `clickhouse su` command with `sudo -u` in start in order to respect limits in `/etc/security/limits.conf`. [#41847](https://github.com/ClickHouse/ClickHouse/pull/41847) ([Eugene Konkov](https://github.com/ekonkov)).
#### Build/Testing/Packaging Improvement
* Backported in [#41558](https://github.com/ClickHouse/ClickHouse/issues/41558): Add `source` field to deb packages, update `nfpm`. [#41531](https://github.com/ClickHouse/ClickHouse/pull/41531) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#41504](https://github.com/ClickHouse/ClickHouse/issues/41504): Writing data in Apache `ORC` format might lead to a buffer overrun. [#41458](https://github.com/ClickHouse/ClickHouse/pull/41458) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Build latest tags ONLY from master branch [#41567](https://github.com/ClickHouse/ClickHouse/pull/41567) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -11,6 +11,7 @@ Columns:
- `path` ([String](../../sql-reference/data-types/string.md)) — Path to the mount point in the file system.
- `free_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Free space on disk in bytes.
- `total_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Disk volume in bytes.
- `unreserved_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Free space which is not taken by reservations (`free_space` minus the size of reservations taken by merges, inserts, and other disk write operations currently running).
- `keep_free_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Amount of disk space that should stay free on disk in bytes. Defined in the `keep_free_space_bytes` parameter of disk configuration.
**Example**

View File

@ -11,5 +11,6 @@ Contains information about disks defined in the [co
- `path` ([String](../../sql-reference/data-types/string.md)) — Path to the mount point in the file system.
- `free_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Free space on the disk in bytes.
- `total_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Disk volume in bytes.
- `unreserved_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Free space which is not taken by reservations (`free_space` minus the size of reservations taken by merges, inserts, and other disk write operations currently running).
- `keep_free_space` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Amount of disk space that should stay free on the disk, in bytes. Defined by the `keep_free_space_bytes` parameter of the disk configuration.

View File

@ -1173,6 +1173,18 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</processors_profile_log>
<!-- Log of asynchronous inserts. It allows checking the status
of an insert query in fire-and-forget mode.
-->
<asynchronous_insert_log>
<database>system</database>
<table>asynchronous_insert_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<partition_by>event_date</partition_by>
<ttl>event_date + INTERVAL 3 DAY</ttl>
</asynchronous_insert_log>
<!-- <top_level_domains_path>/var/lib/clickhouse/top_level_domains/</top_level_domains_path> -->
<!-- Custom TLD lists.
Format: <name>/path/to/file</name>

View File

@ -210,7 +210,7 @@ public:
offsets.push_back(offsets.back() + 1);
}
virtual void insertManyDefaults(size_t length) override
void insertManyDefaults(size_t length) override
{
chars.resize_fill(chars.size() + length);
for (size_t i = 0; i < length; ++i)

View File

@ -55,3 +55,23 @@ private:
std::atomic<const DateLUTImpl *> default_impl;
};
inline UInt64 timeInMilliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 timeInMicroseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 timeInSeconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 timeInNanoseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(timepoint.time_since_epoch()).count();
}
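The four helpers above consolidate the per-file static time_in_* functions that later hunks in this commit delete. A minimal usage sketch, mirroring those updated call sites; `elem` is a stand-in for any log element that stores both a second-precision and a microsecond-precision timestamp:
#include <Common/DateLUT.h>   /// the header that now provides the timeIn* helpers
/// Derive both fields from the same time_point so they agree up to one second.
const auto now = std::chrono::system_clock::now();
elem.event_time = timeInSeconds(now);
elem.event_time_microseconds = timeInMicroseconds(now);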

View File

@ -381,9 +381,9 @@ public:
{
const LUTIndex i = toLUTIndex(v);
if constexpr (std::is_unsigned_v<DateOrTime> || std::is_same_v<DateOrTime, DayNum>)
return lut_saturated[i - lut[i].day_of_month + lut[i].days_in_month].date;
return lut_saturated[i + (lut[i].days_in_month - lut[i].day_of_month)].date;
else
return lut[i - lut[i].day_of_month + lut[i].days_in_month].date;
return lut[i + (lut[i].days_in_month - lut[i].day_of_month)].date;
}
template <typename DateOrTime>
@ -391,9 +391,9 @@ public:
{
const LUTIndex i = toLUTIndex(v);
if constexpr (std::is_unsigned_v<DateOrTime> || std::is_same_v<DateOrTime, DayNum>)
return toDayNum(LUTIndexWithSaturation(i - lut[i].day_of_month + lut[i].days_in_month));
return toDayNum(LUTIndexWithSaturation(i + (lut[i].days_in_month - lut[i].day_of_month)));
else
return toDayNum(LUTIndex(i - lut[i].day_of_month + lut[i].days_in_month));
return toDayNum(LUTIndex(i + (lut[i].days_in_month - lut[i].day_of_month)));
}
/// Round down to start of quarter.
@ -641,7 +641,7 @@ public:
{
const LUTIndex i = toLUTIndex(v);
/// We add 8 to avoid underflow at beginning of unix epoch.
return toDayNum(i + 8 - toDayOfWeek(i)) / 7;
return toDayNum(i + (8 - toDayOfWeek(i))) / 7;
}
/// Get year that contains most of the current week. Week begins at monday.
@ -650,7 +650,7 @@ public:
{
const LUTIndex i = toLUTIndex(v);
/// That's effectively the year of thursday of current week.
return toYear(toLUTIndex(i + 4 - toDayOfWeek(i)));
return toYear(toLUTIndex(i + (4 - toDayOfWeek(i))));
}
/// ISO year begins with a monday of the week that is contained more than by half in the corresponding calendar year.
@ -666,8 +666,8 @@ public:
auto first_day_of_week_of_year = lut[first_day_of_year].day_of_week;
return LUTIndex{first_day_of_week_of_year <= 4
? first_day_of_year + 1 - first_day_of_week_of_year
: first_day_of_year + 8 - first_day_of_week_of_year};
? first_day_of_year + (1 - first_day_of_week_of_year)
: first_day_of_year + (8 - first_day_of_week_of_year)};
}
template <typename DateOrTime>
@ -793,7 +793,7 @@ public:
const LUTIndex i = LUTIndex(v);
// Checking the week across the year
yw.first = toYear(i + 7 - toDayOfWeek(i + offset_day));
yw.first = toYear(i + (7 - toDayOfWeek(i + offset_day)));
auto first_day = makeLUTIndex(yw.first, 1, 1);
auto this_day = i;
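A likely reason why only the grouping changed in these hunks: the LUT index types are unsigned (and saturating in one branch), so an intermediate such as `i - lut[i].day_of_month` or `first_day_of_year + 8` can leave the valid index range before the next term pulls the value back. Regrouping evaluates the small day-of-month / day-of-week offset first and applies a single bounded adjustment to the index. Below is a standalone sketch of the underflow half of the problem, in plain standard C++ with invented values (nothing ClickHouse-specific):
#include <cstdio>
int main()
{
    /// Hypothetical entry near the start of a lookup table: index 3,
    /// which we pretend is the 5th day of a 31-day month.
    unsigned i = 3, day_of_month = 5, days_in_month = 31;
    unsigned a = i - day_of_month + days_in_month;   /// intermediate (i - 5) wraps to 4294967294
    unsigned b = i + (days_in_month - day_of_month); /// intermediate (31 - 5) is just 26
    /// Both forms end at 29 modulo 2^32, but only the second keeps every
    /// intermediate inside a sane index range, which is what a saturating or
    /// range-checked index type needs.
    std::printf("%u %u\n", a, b);   /// prints "29 29"
    return 0;
}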

View File

@ -13,6 +13,7 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>

View File

@ -27,7 +27,8 @@
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(FilesystemCacheLogElement)
M(FilesystemCacheLogElement) \
M(AsynchronousInsertLogElement)
namespace Poco
{

View File

@ -55,6 +55,30 @@ std::string toContentEncodingName(CompressionMethod method)
__builtin_unreachable();
}
CompressionMethod chooseHTTPCompressionMethod(const std::string & list)
{
/// The compression methods are ordered from most to least preferred.
if (std::string::npos != list.find("zstd"))
return CompressionMethod::Zstd;
else if (std::string::npos != list.find("br"))
return CompressionMethod::Brotli;
else if (std::string::npos != list.find("lz4"))
return CompressionMethod::Lz4;
else if (std::string::npos != list.find("snappy"))
return CompressionMethod::Snappy;
else if (std::string::npos != list.find("gzip"))
return CompressionMethod::Gzip;
else if (std::string::npos != list.find("deflate"))
return CompressionMethod::Zlib;
else if (std::string::npos != list.find("xz"))
return CompressionMethod::Xz;
else if (std::string::npos != list.find("bz2"))
return CompressionMethod::Bzip2;
else
return CompressionMethod::None;
}
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint)
{
std::string file_extension;

View File

@ -46,6 +46,10 @@ std::string toContentEncodingName(CompressionMethod method);
*/
CompressionMethod chooseCompressionMethod(const std::string & path, const std::string & hint);
/** Choose a compression method from HTTP header list of supported compression methods.
*/
CompressionMethod chooseHTTPCompressionMethod(const std::string & list);
/// Get a range of the valid compression levels for the compression method.
std::pair<uint64_t, uint64_t> getCompressionLevelRange(const CompressionMethod & method);
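A short usage sketch for the new helper (the Accept-Encoding value is invented; the include path is an assumption based on the declarations in this header): `chooseHTTPCompressionMethod` scans the list in the fixed preference order zstd, br, lz4, snappy, gzip, deflate, xz, bz2, so a typical browser header picks Brotli over gzip and deflate. This is the same call the two HTTP handler hunks near the end of this commit switch to, replacing their hand-rolled preference chains.
#include <IO/CompressionMethod.h>   /// assumed path of the header shown in this hunk
#include <string>
/// Hypothetical caller: pick the response codec from the client's Accept-Encoding header.
std::string accept_encoding = "gzip, deflate, br";
DB::CompressionMethod method = DB::chooseHTTPCompressionMethod(accept_encoding);
/// No "zstd" in the list, so the scan falls through to "br": method == CompressionMethod::Brotli.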

View File

@ -116,6 +116,8 @@ ReturnType parseDateTimeBestEffortImpl(
bool is_am = false;
bool is_pm = false;
bool has_comma_between_date_and_time = false;
auto read_alpha_month = [&month] (const auto & alpha)
{
if (0 == strncasecmp(alpha, "Jan", 3)) month = 1;
@ -137,6 +139,15 @@ ReturnType parseDateTimeBestEffortImpl(
while (!in.eof())
{
if ((year && !has_time) || (!year && has_time))
{
if (*in.position() == ',')
{
has_comma_between_date_and_time = true;
++in.position();
}
}
char digits[std::numeric_limits<UInt64>::digits10];
size_t num_digits = 0;
@ -552,6 +563,10 @@ ReturnType parseDateTimeBestEffortImpl(
}
}
/// A date like '2022/03/04, ' should fail to parse.
if (has_comma_between_date_and_time && (!has_time || !year || !month || !day_of_month))
return on_error("Cannot read DateTime: unexpected word after Date", ErrorCodes::CANNOT_PARSE_DATETIME);
/// If neither Date nor Time is parsed successfully, it should fail
if (!year && !month && !day_of_month && !has_time)
return on_error("Cannot read DateTime: neither Date nor Time was parsed successfully", ErrorCodes::CANNOT_PARSE_DATETIME);

View File

@ -0,0 +1,82 @@
#include <Interpreters/AsynchronousInsertLog.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/queryToString.h>
namespace DB
{
NamesAndTypesList AsynchronousInsertLogElement::getNamesAndTypes()
{
auto type_status = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Ok", static_cast<Int8>(Status::Ok)},
{"ParsingError", static_cast<Int8>(Status::ParsingError)},
{"FlushError", static_cast<Int8>(Status::FlushError)},
});
return
{
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"query", std::make_shared<DataTypeString>()},
{"database", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"table", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"format", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"query_id", std::make_shared<DataTypeString>()},
{"bytes", std::make_shared<DataTypeUInt64>()},
{"exception", std::make_shared<DataTypeString>()},
{"status", type_status},
{"flush_time", std::make_shared<DataTypeDateTime>()},
{"flush_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"flush_query_id", std::make_shared<DataTypeString>()},
};
}
void AsynchronousInsertLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
auto event_date = DateLUT::instance().toDayNum(event_time).toUnderType();
columns[i++]->insert(event_date);
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*query);
columns[i++]->insert(queryToString(insert_query));
if (insert_query.table_id)
{
columns[i++]->insert(insert_query.table_id.getDatabaseName());
columns[i++]->insert(insert_query.table_id.getTableName());
}
else
{
columns[i++]->insertDefault();
columns[i++]->insertDefault();
}
columns[i++]->insert(insert_query.format);
columns[i++]->insert(query_id);
columns[i++]->insert(bytes);
columns[i++]->insert(exception);
columns[i++]->insert(status);
columns[i++]->insert(flush_time);
columns[i++]->insert(flush_time_microseconds);
columns[i++]->insert(flush_query_id);
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include "Common/Exception.h"
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
struct AsynchronousInsertLogElement
{
enum Status : Int8
{
Ok = 0,
ParsingError = 1,
FlushError = 2,
};
time_t event_time{};
Decimal64 event_time_microseconds{};
ASTPtr query;
String query_id;
UInt64 bytes{};
String exception;
Status status{};
time_t flush_time{};
Decimal64 flush_time_microseconds{};
String flush_query_id;
static std::string name() { return "AsynchronousInsertLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
class AsynchronousInsertLog : public SystemLog<AsynchronousInsertLogElement>
{
public:
using SystemLog<AsynchronousInsertLogElement>::SystemLog;
/// This table is usually queried for a fixed table name.
static const char * getDefaultOrderBy() { return "(database, table, event_date, event_time)"; }
};
}

View File

@ -4,6 +4,7 @@
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Executors/StreamingFormatExecutor.h>
@ -18,6 +19,7 @@
#include <Storages/IStorage.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitorHash.h>
#include <Common/DateLUT.h>
#include <Access/Common/AccessFlags.h>
#include <Access/EnabledQuota.h>
#include <Formats/FormatFactory.h>
@ -89,7 +91,9 @@ bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
}
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_)
: bytes(std::move(bytes_)), query_id(std::move(query_id_))
: bytes(std::move(bytes_))
, query_id(std::move(query_id_))
, create_time(std::chrono::system_clock::now())
{
}
@ -395,6 +399,31 @@ void AsynchronousInsertQueue::cleanup()
}
static void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
std::chrono::time_point<std::chrono::system_clock> flush_time,
const String & flush_query_id,
const String & flush_exception)
try
{
using Status = AsynchronousInsertLogElement::Status;
for (auto & elem : elements)
{
elem.flush_time = timeInSeconds(flush_time);
elem.flush_time_microseconds = timeInMicroseconds(flush_time);
elem.flush_query_id = flush_query_id;
elem.exception = flush_exception;
elem.status = flush_exception.empty() ? Status::Ok : Status::FlushError;
log.add(elem);
}
}
catch (...)
{
tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try
@ -402,6 +431,8 @@ try
if (!data)
return;
SCOPE_EXIT(CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, data->entries.size()));
const auto * log = &Poco::Logger::get("AsynchronousInsertQueue");
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);
auto insert_context = Context::createCopy(global_context);
@ -424,11 +455,13 @@ try
size_t total_rows = 0;
InsertData::EntryPtr current_entry;
String current_exception;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{
current_exception = e.displayText();
LOG_ERROR(log, "Failed parsing for query '{}' with query id {}. {}",
queryToString(key.query), current_entry->query_id, e.displayText());
queryToString(key.query), current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
@ -448,6 +481,12 @@ try
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
auto insert_log = global_context->getAsynchronousInsertLog();
std::vector<AsynchronousInsertLogElement> log_elements;
if (insert_log)
log_elements.reserve(data->entries.size());
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
for (const auto & entry : data->entries)
@ -459,11 +498,40 @@ try
/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.
last_buffer = std::move(buffer);
if (insert_log)
{
AsynchronousInsertLogElement elem;
elem.event_time = timeInSeconds(entry->create_time);
elem.event_time_microseconds = timeInMicroseconds(entry->create_time);
elem.query = key.query;
elem.query_id = entry->query_id;
elem.bytes = entry->bytes.size();
elem.exception = current_exception;
current_exception.clear();
/// If there was a parsing error,
/// the entry won't be flushed anyway,
/// so add the log element immediately.
if (!elem.exception.empty())
{
elem.status = AsynchronousInsertLogElement::ParsingError;
insert_log->add(elem);
}
else
{
log_elements.push_back(elem);
}
}
}
format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();
if (total_rows)
if (total_rows == 0)
return;
try
{
auto chunk = Chunk(executor.getResultColumns(), total_rows);
size_t total_bytes = chunk.bytes();
@ -477,12 +545,28 @@ try
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'",
total_rows, total_bytes, queryToString(key.query));
}
catch (...)
{
if (!log_elements.empty())
{
auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, exception);
}
throw;
}
for (const auto & entry : data->entries)
{
if (!entry->isFinished())
entry->finish();
}
CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, data->entries.size());
if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
}
}
catch (const Exception & e)
{
@ -516,8 +600,6 @@ void AsynchronousInsertQueue::finishWithException(
entry->finish(std::make_exception_ptr(exception));
}
}
CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, entries.size());
}
}

View File

@ -47,6 +47,7 @@ private:
public:
const String bytes;
const String query_id;
std::chrono::time_point<std::chrono::system_clock> create_time;
Entry(String && bytes_, String && query_id_);

View File

@ -2665,6 +2665,16 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
return shared->system_logs->cache_log;
}
std::shared_ptr<AsynchronousInsertLog> Context::getAsynchronousInsertLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->asynchronous_insert_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

View File

@ -86,6 +86,7 @@ class BackupsWorker;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class AsynchronousInsertLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -891,8 +892,8 @@ public:
std::shared_ptr<SessionLog> getSessionLog() const;
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
/// Returns an object used to log operations with parts if it possible.
/// Provide table name to make required checks.

View File

@ -331,7 +331,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
if (query.select && settings.parallel_distributed_insert_select)
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());

View File

@ -33,6 +33,7 @@
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/TransactionLog.h>
#include <BridgeHelper/CatBoostLibraryBridgeHelper.h>
@ -523,7 +524,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] { if (auto session_log = getContext()->getSessionLog()) session_log->flush(true); },
[&] { if (auto transactions_info_log = getContext()->getTransactionsInfoLog()) transactions_info_log->flush(true); },
[&] { if (auto processors_profile_log = getContext()->getProcessorsProfileLog()) processors_profile_log->flush(true); },
[&] { if (auto cache_log = getContext()->getFilesystemCacheLog()) cache_log->flush(true); }
[&] { if (auto cache_log = getContext()->getFilesystemCacheLog()) cache_log->flush(true); },
[&] { if (auto asynchronous_insert_log = getContext()->getAsynchronousInsertLog()) asynchronous_insert_log->flush(true); }
);
break;
}

View File

@ -78,22 +78,6 @@ void MetricLog::shutdown()
}
static inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count();
}
static inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
static inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
void MetricLog::metricThreadFunction()
{
auto desired_timepoint = std::chrono::system_clock::now();
@ -109,8 +93,8 @@ void MetricLog::metricThreadFunction()
MetricLogElement elem;
elem.event_time = std::chrono::system_clock::to_time_t(current_time);
elem.event_time_microseconds = time_in_microseconds(current_time);
elem.milliseconds = time_in_milliseconds(current_time) - time_in_seconds(current_time) * 1000;
elem.event_time_microseconds = timeInMicroseconds(current_time);
elem.milliseconds = timeInMilliseconds(current_time) - timeInSeconds(current_time) * 1000;
elem.profile_events.resize(ProfileEvents::end());
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)

View File

@ -169,16 +169,6 @@ bool PartLog::addNewPart(
return addNewParts(current_context, {part}, elapsed_ns, execution_status);
}
static inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
static inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
bool PartLog::addNewParts(
ContextPtr current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
@ -209,8 +199,8 @@ bool PartLog::addNewParts(
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto time_now = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(time_now);
elem.event_time_microseconds = time_in_microseconds(time_now);
elem.event_time = timeInSeconds(time_now);
elem.event_time_microseconds = timeInMicroseconds(time_now);
elem.duration_ms = elapsed_ns / 1000000;
elem.database_name = table_id.database_name;

View File

@ -30,21 +30,11 @@ namespace
{
using namespace DB;
inline DateTime64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline time_t time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
auto eventTime()
{
const auto finish_time = std::chrono::system_clock::now();
return std::make_pair(time_in_seconds(finish_time), time_in_microseconds(finish_time));
return std::make_pair(timeInSeconds(finish_time), timeInMicroseconds(finish_time));
}
using AuthType = AuthenticationType;

View File

@ -13,6 +13,7 @@
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
@ -208,6 +209,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
transactions_info_log = createSystemLog<TransactionsInfoLog>(
global_context, "system", "transactions_info_log", config, "transactions_info_log");
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log");
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -242,6 +244,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(processors_profile_log.get());
if (cache_log)
logs.emplace_back(cache_log.get());
if (asynchronous_insert_log)
logs.emplace_back(asynchronous_insert_log.get());
try
{

View File

@ -47,6 +47,7 @@ class SessionLog;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class AsynchronousInsertLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -79,6 +80,7 @@ struct SystemLogs
std::shared_ptr<TransactionsInfoLog> transactions_info_log;
/// Used to log processors profiling
std::shared_ptr<ProcessorsProfileLog> processors_profile_log;
std::shared_ptr<AsynchronousInsertLog> asynchronous_insert_log;
std::vector<ISystemLog *> logs;
};

View File

@ -17,6 +17,7 @@
#include <Common/ThreadProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/noexcept_scope.h>
#include <Common/DateLUT.h>
#include <base/errnoToString.h>
#if defined(OS_LINUX)
@ -154,22 +155,6 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
setupState(thread_group_);
}
inline UInt64 time_in_nanoseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
void ThreadStatus::initPerformanceCounters()
{
performance_counters_finalized = false;
@ -184,9 +169,9 @@ void ThreadStatus::initPerformanceCounters()
// to ensure that they are all equal up to the precision of a second.
const auto now = std::chrono::system_clock::now();
query_start_time_nanoseconds = time_in_nanoseconds(now);
query_start_time = time_in_seconds(now);
query_start_time_microseconds = time_in_microseconds(now);
query_start_time_nanoseconds = timeInNanoseconds(now);
query_start_time = timeInSeconds(now);
query_start_time_microseconds = timeInMicroseconds(now);
++queries_started;
// query_start_time_nanoseconds cannot be used here since RUsageCounters expect CLOCK_MONOTONIC
@ -261,7 +246,7 @@ void ThreadStatus::finalizePerformanceCounters()
if (settings.log_queries && settings.log_query_threads)
{
const auto now = std::chrono::system_clock::now();
Int64 query_duration_ms = (time_in_microseconds(now) - query_start_time_microseconds) / 1000;
Int64 query_duration_ms = (timeInMicroseconds(now) - query_start_time_microseconds) / 1000;
if (query_duration_ms >= settings.log_queries_min_query_duration_ms.totalMilliseconds())
{
if (auto thread_log = global_context_ptr->getQueryThreadLog())
@ -378,14 +363,14 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String
// construct current_time and current_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
auto current_time = time_in_seconds(now);
auto current_time_microseconds = time_in_microseconds(now);
auto current_time = timeInSeconds(now);
auto current_time_microseconds = timeInMicroseconds(now);
elem.event_time = current_time;
elem.event_time_microseconds = current_time_microseconds;
elem.query_start_time = query_start_time;
elem.query_start_time_microseconds = query_start_time_microseconds;
elem.query_duration_ms = (time_in_nanoseconds(now) - query_start_time_nanoseconds) / 1000000U;
elem.query_duration_ms = (timeInNanoseconds(now) - query_start_time_nanoseconds) / 1000000U;
elem.read_rows = progress_in.read_rows.load(std::memory_order_relaxed);
elem.read_bytes = progress_in.read_bytes.load(std::memory_order_relaxed);
@ -447,8 +432,8 @@ void ThreadStatus::logToQueryViewsLog(const ViewRuntimeData & vinfo)
QueryViewsLogElement element;
element.event_time = time_in_seconds(vinfo.runtime_stats->event_time);
element.event_time_microseconds = time_in_microseconds(vinfo.runtime_stats->event_time);
element.event_time = timeInSeconds(vinfo.runtime_stats->event_time);
element.event_time_microseconds = timeInMicroseconds(vinfo.runtime_stats->event_time);
element.view_duration_ms = vinfo.runtime_stats->elapsed_ms;
element.initial_query_id = query_id;

View File

@ -234,17 +234,6 @@ static void logException(ContextPtr context, QueryLogElement & elem)
elem.stack_trace);
}
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr context, UInt64 current_time_us, ASTPtr ast, const std::shared_ptr<OpenTelemetry::SpanHolder> & query_span)
{
/// Exception before the query execution.
@ -379,8 +368,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
// example, the query is from an initiator that is running an old version of clickhouse.
if (!internal && client_info.initial_query_start_time == 0)
{
client_info.initial_query_start_time = time_in_seconds(current_time);
client_info.initial_query_start_time_microseconds = time_in_microseconds(current_time);
client_info.initial_query_start_time = timeInSeconds(current_time);
client_info.initial_query_start_time_microseconds = timeInMicroseconds(current_time);
}
assert(internal || CurrentThread::get().getQueryContext());
@ -448,7 +437,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
logQuery(query_for_logging, context, internal, stage);
if (!internal)
onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast, query_span);
onExceptionBeforeStart(query_for_logging, context, timeInMicroseconds(current_time), ast, query_span);
throw;
}
@ -742,10 +731,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.type = QueryLogElementType::QUERY_START; //-V1048
elem.event_time = time_in_seconds(current_time);
elem.event_time_microseconds = time_in_microseconds(current_time);
elem.query_start_time = time_in_seconds(current_time);
elem.query_start_time_microseconds = time_in_microseconds(current_time);
elem.event_time = timeInSeconds(current_time);
elem.event_time_microseconds = timeInMicroseconds(current_time);
elem.query_start_time = timeInSeconds(current_time);
elem.query_start_time_microseconds = timeInMicroseconds(current_time);
elem.current_database = context->getCurrentDatabase();
elem.query = query_for_logging;
@ -874,8 +863,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
elem.event_time = timeInSeconds(finish_time);
elem.event_time_microseconds = timeInMicroseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
@ -915,8 +904,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.event_time = timeInSeconds(finish_time);
processor_elem.event_time_microseconds = timeInMicroseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
@ -1018,8 +1007,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
// to ensure that both the times will be equal up to the precision of a second.
const auto time_now = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(time_now);
elem.event_time_microseconds = time_in_microseconds(time_now);
elem.event_time = timeInSeconds(time_now);
elem.event_time_microseconds = timeInMicroseconds(time_now);
elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
elem.exception_code = getCurrentExceptionCode();
elem.exception = getCurrentExceptionMessage(false);
@ -1084,7 +1073,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
if (!internal)
onExceptionBeforeStart(query_for_logging, context, time_in_microseconds(current_time), ast, query_span);
onExceptionBeforeStart(query_for_logging, context, timeInMicroseconds(current_time), ast, query_span);
throw;
}

View File

@ -266,17 +266,17 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
desc.is_agg_func_type = is_agg_func;
desc.column_numbers = {i};
desc.real_type = column.type;
desc.nested_type = recursiveRemoveLowCardinality(desc.real_type);
if (desc.real_type.get() == desc.nested_type.get())
desc.nested_type = nullptr;
if (simple)
{
// simple aggregate function
desc.init(simple->getFunction(), true);
if (desc.function->allocatesMemoryInArena())
def.allocates_memory_in_arena = true;
desc.real_type = column.type;
desc.nested_type = recursiveRemoveLowCardinality(desc.real_type);
if (desc.real_type.get() == desc.nested_type.get())
desc.nested_type = nullptr;
}
else if (!is_agg_func)
{
@ -395,14 +395,11 @@ static MutableColumns getMergedDataColumns(
columns.emplace_back(ColumnTuple::create(std::move(tuple_columns)));
}
else if (desc.is_simple_agg_func_type)
else
{
const auto & type = desc.nested_type ? desc.nested_type
: desc.real_type;
const auto & type = desc.nested_type ? desc.nested_type : desc.real_type;
columns.emplace_back(type->createColumn());
}
else
columns.emplace_back(header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty());
}
for (const auto & column_number : def.column_numbers_not_to_aggregate)
@ -421,7 +418,7 @@ static void preprocessChunk(Chunk & chunk, const SummingSortedAlgorithm::Columns
for (const auto & desc : def.columns_to_aggregate)
{
if (desc.is_simple_agg_func_type && desc.nested_type)
if (desc.nested_type)
{
auto & col = columns[desc.column_numbers[0]];
col = recursiveRemoveLowCardinality(col);
@ -453,7 +450,7 @@ static void postprocessChunk(
for (size_t i = 0; i < tuple_size; ++i)
res_columns[desc.column_numbers[i]] = assert_cast<const ColumnTuple &>(*column).getColumnPtr(i);
}
else if (desc.is_simple_agg_func_type && desc.nested_type)
else if (desc.nested_type)
{
const auto & from_type = desc.nested_type;
const auto & to_type = desc.real_type;

View File

@ -542,22 +542,7 @@ void HTTPHandler::processQuery(
CompressionMethod http_response_compression_method = CompressionMethod::None;
if (!http_response_compression_methods.empty())
{
/// If client supports brotli - it's preferred.
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("br"))
http_response_compression_method = CompressionMethod::Brotli;
else if (std::string::npos != http_response_compression_methods.find("gzip"))
http_response_compression_method = CompressionMethod::Gzip;
else if (std::string::npos != http_response_compression_methods.find("deflate"))
http_response_compression_method = CompressionMethod::Zlib;
else if (std::string::npos != http_response_compression_methods.find("xz"))
http_response_compression_method = CompressionMethod::Xz;
else if (std::string::npos != http_response_compression_methods.find("zstd"))
http_response_compression_method = CompressionMethod::Zstd;
}
http_response_compression_method = chooseHTTPCompressionMethod(http_response_compression_methods);
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;

View File

@ -41,18 +41,7 @@ responseWriteBuffer(HTTPServerRequest & request, HTTPServerResponse & response,
CompressionMethod http_response_compression_method = CompressionMethod::None;
if (!http_response_compression_methods.empty())
{
/// If client supports brotli - it's preferred.
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("br"))
http_response_compression_method = CompressionMethod::Brotli;
else if (std::string::npos != http_response_compression_methods.find("gzip"))
http_response_compression_method = CompressionMethod::Gzip;
else if (std::string::npos != http_response_compression_methods.find("deflate"))
http_response_compression_method = CompressionMethod::Zlib;
}
http_response_compression_method = chooseHTTPCompressionMethod(http_response_compression_methods);
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;

View File

@ -819,10 +819,18 @@ struct StorageDistributedDirectoryMonitor::Batch
}
else
{
std::vector<std::string> files(file_index_to_path.size());
std::vector<std::string> files;
for (const auto && file_info : file_index_to_path | boost::adaptors::indexed())
files[file_info.index()] = file_info.value().second;
e.addMessage(fmt::format("While sending batch {}", fmt::join(files, "\n")));
{
if (file_info.index() > 8)
{
files.push_back("...");
break;
}
files.push_back(file_info.value().second);
}
e.addMessage(fmt::format("While sending batch, nums: {}, files: {}", file_index_to_path.size(), fmt::join(files, "\n")));
throw;
}

View File

@ -1,4 +1,5 @@
#include "config.h"
#include "Interpreters/Context_fwd.h"
#if USE_HDFS
@ -41,7 +42,7 @@ StorageHDFSCluster::StorageHDFSCluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
: IStorage(table_id_)
: IStorageCluster(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
@ -74,13 +75,8 @@ Pipe StorageHDFSCluster::read(
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -117,7 +113,7 @@ Pipe StorageHDFSCluster::read(
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
extension);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -140,6 +136,18 @@ QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage(
}
ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, ContextPtr context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
NamesAndTypesList StorageHDFSCluster::getVirtuals() const
{
return NamesAndTypesList{

View File

@ -9,6 +9,7 @@
#include <Client/Connection.h>
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/HDFS/StorageHDFS.h>
namespace DB
@ -16,7 +17,7 @@ namespace DB
class Context;
class StorageHDFSCluster : public IStorage
class StorageHDFSCluster : public IStorageCluster
{
public:
StorageHDFSCluster(
@ -39,6 +40,9 @@ public:
NamesAndTypesList getVirtuals() const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
private:
String cluster_name;
String uri;

View File

@ -0,0 +1,29 @@
#pragma once
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
namespace DB
{
/**
* Base class for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
*/
class IStorageCluster : public IStorage
{
public:
explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {}
virtual ClusterPtr getCluster(ContextPtr context) const = 0;
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const = 0;
bool isRemote() const override { return true; }
};
}

View File

@ -1640,6 +1640,13 @@ bool KeyCondition::tryParseAtomFromAST(const Tree & node, ContextPtr context, Bl
}
else if (func.getArgumentAt(1).tryGetConstant(block_with_constants, const_value, const_type))
{
/// If the const operand is null, the atom will always be false
if (const_value.isNull())
{
out.function = RPNElement::ALWAYS_FALSE;
return true;
}
if (isKeyPossiblyWrappedByMonotonicFunctions(func.getArgumentAt(0), context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 0;
@ -1663,6 +1670,13 @@ bool KeyCondition::tryParseAtomFromAST(const Tree & node, ContextPtr context, Bl
}
else if (func.getArgumentAt(0).tryGetConstant(block_with_constants, const_value, const_type))
{
/// If the const operand is null, the atom will always be false
if (const_value.isNull())
{
out.function = RPNElement::ALWAYS_FALSE;
return true;
}
if (isKeyPossiblyWrappedByMonotonicFunctions(func.getArgumentAt(1), context, key_column_num, key_expr_type, chain))
{
key_arg_pos = 1;

View File

@ -193,16 +193,6 @@ static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
}
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
MergeTreeData::MergeTreeData(
const StorageID & table_id_,
const String & relative_data_path_,
@ -1817,8 +1807,8 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
part_log_elem.event_type = PartLogElement::REMOVE_PART;
const auto time_now = std::chrono::system_clock::now();
part_log_elem.event_time = time_in_seconds(time_now);
part_log_elem.event_time_microseconds = time_in_microseconds(time_now);
part_log_elem.event_time = timeInSeconds(time_now);
part_log_elem.event_time_microseconds = timeInMicroseconds(time_now);
part_log_elem.duration_ms = 0; //-V1048
@ -6516,8 +6506,8 @@ try
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto time_now = std::chrono::system_clock::now();
part_log_elem.event_time = time_in_seconds(time_now);
part_log_elem.event_time_microseconds = time_in_microseconds(time_now);
part_log_elem.event_time = timeInSeconds(time_now);
part_log_elem.event_time_microseconds = timeInMicroseconds(time_now);
/// TODO: Stop stopwatch in outer code to exclude ZK timings and so on
part_log_elem.duration_ms = elapsed_ns / 1000000;

View File

@ -59,6 +59,8 @@
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -759,55 +761,35 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const
{
QueryPipeline pipeline;
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
const auto & settings = local_context->getSettingsRef();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (select.list_of_selects->children.size() == 1)
/// Unwrap view() function.
if (src_distributed.remote_table_function_ptr)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
if (joined_tables.tablesCount() == 1)
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
/// Unwrap view() function.
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto * select = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children.at(0)->as<ASTSelectQuery>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
}
}
}
}
new_query->select = select_with_union_query;
}
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses();
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
@ -822,7 +804,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_distributed.getClusterName(),
src_addresses.size(),
getClusterName(),
dst_addresses.size());
@ -849,6 +831,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
@ -882,6 +865,120 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
/// Select query is needed for pruning by virtual columns
auto extension = src_storage_cluster.getTaskIteratorExtension(
select.list_of_selects->children.at(0)->as<ASTSelectQuery>()->clone(),
local_context);
auto dst_cluster = getCluster();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
/// Distributed write only works in the most trivial case INSERT ... SELECT
/// without any unions or joins on the right side
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(src_storage))
{
return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context);
}
if (auto src_storage_cluster = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context);
}
if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\
"Reason: distributed reading is supported only from Distributed engine or *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
@ -207,6 +208,9 @@ private:
void delayInsertOrThrowIfNeeded() const;
std::optional<QueryPipeline> distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const;
std::optional<QueryPipeline> distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;

View File

@ -46,11 +46,13 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
@ -60,6 +62,7 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -74,6 +77,7 @@
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/IBackup.h>
@ -136,6 +140,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int REPLICA_IS_NOT_IN_QUORUM;
extern const int TABLE_IS_READ_ONLY;
extern const int TABLE_IS_DROPPED;
extern const int NOT_FOUND_NODE;
extern const int NO_ACTIVE_REPLICAS;
extern const int NOT_A_LEADER;
@ -161,6 +166,7 @@ namespace ErrorCodes
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int NOT_INITIALIZED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -279,7 +285,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, restarting_thread(*this)
, part_moves_between_shards_orchestrator(*this)
, renaming_restrictions(renaming_restrictions_)
, replicated_fetches_pool_size(getContext()->getSettingsRef().background_fetches_pool_size)
, replicated_fetches_pool_size(getContext()->getFetchesExecutor()->getMaxTasksCount())
, replicated_fetches_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_fetches_network_bandwidth, getContext()->getReplicatedFetchesThrottler()))
, replicated_sends_throttler(std::make_shared<Throttler>(getSettings()->max_replicated_sends_network_bandwidth, getContext()->getReplicatedSendsThrottler()))
{
@ -443,7 +449,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
createNewZooKeeperNodes();
syncPinnedPartUUIDs();
createTableSharedID();
if (!has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper)
createTableSharedID();
initialization_done = true;
}
@ -4450,6 +4457,106 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
/// Here we won't check that the cluster formed from table replicas is a subset of the cluster specified in the s3Cluster/hdfsCluster table function
auto src_cluster = src_storage_cluster->getCluster(local_context);
/// Actually the query doesn't change, we just serialize it to string
String query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
query.IAST::format(ast_format_settings);
query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (const auto & replicas : src_cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
/// Do not enable parallel distributed INSERT SELECT when the query probably comes from another server
if (local_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
return {};
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(src_distributed, query, local_context);
}
else if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. Reason: distributed "
"reading into Replicated table is supported only from *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
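The Replicated counterpart is stricter: only *Cluster table functions are accepted as the source; otherwise the query either falls back to a regular INSERT or throws on the initiator. A minimal sketch of the supported form, mirroring the integration test further below (the target table name is hypothetical, the endpoint and credentials follow the test fixtures):
INSERT INTO replicated_target
SELECT * FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*.csv', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
SETTINGS parallel_distributed_insert_select = 1;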
bool StorageReplicatedMergeTree::optimize(
const ASTPtr &,
const StorageMetadataPtr &,
@ -7443,10 +7550,22 @@ String StorageReplicatedMergeTree::getTableSharedID() const
/// can be called only during table initialization
std::lock_guard lock(table_shared_id_mutex);
bool maybe_has_metadata_in_zookeeper = !has_metadata_in_zookeeper.has_value() || *has_metadata_in_zookeeper;
/// Can happen if table was partially initialized before drop by DatabaseCatalog
if (maybe_has_metadata_in_zookeeper && table_shared_id == UUIDHelpers::Nil)
createTableSharedID();
if (table_shared_id == UUIDHelpers::Nil)
{
if (has_metadata_in_zookeeper.has_value())
{
if (*has_metadata_in_zookeeper)
createTableSharedID();
else
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is already dropped", getStorageID().getNameForLogs());
}
else
{
throw Exception(ErrorCodes::NO_ZOOKEEPER, "No connection to ZooKeeper, cannot get shared table ID for table {}. "
"It will be resolve automatically when connection will be established", getStorageID().getNameForLogs());
}
}
return toString(table_shared_id);
}
@ -7622,6 +7741,10 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
return std::make_pair(true, NameSet{});
}
/// If table was completely dropped (no meta in zookeeper) we can safely remove parts
if (has_metadata_in_zookeeper.has_value() && !*has_metadata_in_zookeeper)
return std::make_pair(true, NameSet{});
/// We remove parts during table shutdown. If exception happen, restarting thread will be already turned
/// off and nobody will reconnect our zookeeper connection. In this case we use zookeeper connection from
/// context.
@ -7631,6 +7754,11 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedData(const IMer
else
zookeeper = getZooKeeper();
/// It can happen that we didn't have the connection to ZooKeeper during table creation, but actually
/// the table is completely dropped, so we can drop it without any additional checks.
if (!has_metadata_in_zookeeper.has_value() && !zookeeper->exists(zookeeper_path))
return std::make_pair(true, NameSet{});
return unlockSharedDataByID(part.getUniqueId(), getTableSharedID(), part.name, replica_name, part.data_part_storage->getDiskType(), zookeeper, *getSettings(), log,
zookeeper_path);
}

View File

@ -4,6 +4,7 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
@ -139,6 +140,8 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
@ -483,6 +486,8 @@ private:
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

View File

@ -51,7 +51,7 @@ StorageS3Cluster::StorageS3Cluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorage(table_id_)
: IStorageCluster(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
@ -101,11 +101,8 @@ Pipe StorageS3Cluster::read(
{
StorageS3::updateS3Configuration(context, s3_configuration);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -130,7 +127,6 @@ Pipe StorageS3Cluster::read(
node.secure
);
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
@ -142,7 +138,7 @@ Pipe StorageS3Cluster::read(
scalars,
Tables(),
processed_stage,
RemoteQueryExecutor::Extension{.task_iterator = callback});
extension);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -165,6 +161,19 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
}
ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
return virtual_columns;

View File

@ -10,6 +10,7 @@
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <IO/S3Common.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageS3.h>
namespace DB
@ -17,7 +18,7 @@ namespace DB
class Context;
class StorageS3Cluster : public IStorage
class StorageS3Cluster : public IStorageCluster
{
public:
StorageS3Cluster(
@ -37,9 +38,11 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
ClusterPtr getCluster(ContextPtr context) const override;
private:
StorageS3::S3Configuration s3_configuration;
String filename;
String cluster_name;
String format_name;

View File

@ -21,6 +21,7 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
{"path", std::make_shared<DataTypeString>()},
{"free_space", std::make_shared<DataTypeUInt64>()},
{"total_space", std::make_shared<DataTypeUInt64>()},
{"unreserved_space", std::make_shared<DataTypeUInt64>()},
{"keep_free_space", std::make_shared<DataTypeUInt64>()},
{"type", std::make_shared<DataTypeString>()},
{"is_encrypted", std::make_shared<DataTypeUInt8>()},
@ -44,6 +45,7 @@ Pipe StorageSystemDisks::read(
MutableColumnPtr col_path = ColumnString::create();
MutableColumnPtr col_free = ColumnUInt64::create();
MutableColumnPtr col_total = ColumnUInt64::create();
MutableColumnPtr col_unreserved = ColumnUInt64::create();
MutableColumnPtr col_keep = ColumnUInt64::create();
MutableColumnPtr col_type = ColumnString::create();
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
@ -55,6 +57,7 @@ Pipe StorageSystemDisks::read(
col_path->insert(disk_ptr->getPath());
col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace());
col_unreserved->insert(disk_ptr->getUnreservedSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace());
auto data_source_description = disk_ptr->getDataSourceDescription();
col_type->insert(toString(data_source_description.type));
@ -72,6 +75,7 @@ Pipe StorageSystemDisks::read(
res_columns.emplace_back(std::move(col_path));
res_columns.emplace_back(std::move(col_free));
res_columns.emplace_back(std::move(col_total));
res_columns.emplace_back(std::move(col_unreserved));
res_columns.emplace_back(std::move(col_keep));
res_columns.emplace_back(std::move(col_type));
res_columns.emplace_back(std::move(col_is_encrypted));
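With the new column wired into system.disks, unreserved space can be inspected next to free space; a simple check (output depends on the environment):
SELECT name, free_space, unreserved_space, keep_free_space FROM system.disks;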

View File

@ -1,5 +1,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
disk_types = {
"default": "local",
@ -28,18 +29,27 @@ def cluster():
def test_different_types(cluster):
node = cluster.instances["node"]
response = node.query("SELECT * FROM system.disks")
disks = response.split("\n")
for disk in disks:
if disk == "": # skip empty line (after split at last position)
continue
fields = disk.split("\t")
response = TSV.toMat(node.query("SELECT * FROM system.disks FORMAT TSVWithNames"))
assert len(response) > len(disk_types) # at least one extra line for header
name_col_ix = response[0].index("name")
type_col_ix = response[0].index("type")
encrypted_col_ix = response[0].index("is_encrypted")
for fields in response[1:]: # skip header
assert len(fields) >= 7
assert disk_types.get(fields[0], "UNKNOWN") == fields[5]
if "encrypted" in fields[0]:
assert fields[6] == "1"
assert (
disk_types.get(fields[name_col_ix], "UNKNOWN") == fields[type_col_ix]
), f"Wrong type ({fields[type_col_ix]}) for disk {fields[name_col_ix]}!"
if "encrypted" in fields[name_col_ix]:
assert (
fields[encrypted_col_ix] == "1"
), f"{fields[name_col_ix]} expected to be encrypted!"
else:
assert fields[6] == "0"
assert (
fields[encrypted_col_ix] == "0"
), f"{fields[name_col_ix]} expected to be non-encrypted!"
def test_select_by_type(cluster):

View File

@ -6,7 +6,6 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_object_metadata>true</send_object_metadata>
</s3>
</disks>
</storage_configuration>

View File

@ -1,7 +1,3 @@
<clickhouse>
<profiles>
<default>
<background_fetches_pool_size>3</background_fetches_pool_size>
</default>
</profiles>
<background_fetches_pool_size>3</background_fetches_pool_size>
</clickhouse>

View File

@ -11,10 +11,10 @@ import os
cluster = ClickHouseCluster(__file__)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
node1 = cluster.add_instance(
"node1", user_configs=["configs/custom_settings.xml"], with_zookeeper=True
"node1", main_configs=["configs/custom_settings.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
"node2", user_configs=["configs/custom_settings.xml"], with_zookeeper=True
"node2", main_configs=["configs/custom_settings.xml"], with_zookeeper=True
)
MAX_THREADS_FOR_FETCH = 3

View File

@ -6,7 +6,6 @@
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_object_metadata>true</send_object_metadata>
</s3>
</disks>
</storage_configuration>

View File

@ -20,8 +20,23 @@
</shard>
</cluster_simple>
<!-- A part of the cluster above, represents only one shard-->
<first_shard>
<shard>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
</first_shard>
</remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>
</macros>
</clickhouse>
</clickhouse>

View File

@ -1,5 +1,9 @@
from email.errors import HeaderParseError
import logging
import os
import csv
import shutil
import time
import pytest
from helpers.cluster import ClickHouseCluster
@ -19,6 +23,21 @@ S3_DATA = [
def create_buckets_s3(cluster):
minio = cluster.minio_client
for file_number in range(100):
file_name = f"data/generated/file_{file_number}.csv"
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
S3_DATA.append(file_name)
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
# a String, b UInt64
data = []
for number in range(100):
data.append([str(number) * 10, number])
writer = csv.writer(f)
writer.writerows(data)
for file in S3_DATA:
minio.fput_object(
bucket_name=cluster.minio_bucket,
@ -34,10 +53,24 @@ def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True
"s0_0_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "node1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"s0_0_1",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
)
cluster.add_instance(
"s0_1_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
)
cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"])
cluster.add_instance("s0_1_0", main_configs=["configs/cluster.xml"])
logging.info("Starting cluster...")
cluster.start()
@ -47,6 +80,7 @@ def started_cluster():
yield cluster
finally:
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
cluster.shutdown()
@ -55,17 +89,17 @@ def test_select_all(started_cluster):
pure_s3 = node.query(
"""
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
ORDER BY (name, value, polygon)"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)"""
)
# print(s3_distibuted)
@ -78,15 +112,15 @@ def test_count(started_cluster):
pure_s3 = node.query(
"""
SELECT count(*) from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT count(*) from s3Cluster(
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
@ -125,13 +159,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -143,13 +177,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -166,12 +200,12 @@ def test_wrong_cluster(started_cluster):
"""
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
"""
)
@ -184,14 +218,139 @@ def test_ambiguous_join(started_cluster):
result = node.query(
"""
SELECT l.name, r.value from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l
JOIN s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r
ON l.name = r.name
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_distributed_insert_select(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_second_shard = started_cluster.instances["s0_1_0"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_local ON CLUSTER 'cluster_simple' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select', '{replica}')
ORDER BY (a, b);
"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_distributed ON CLUSTER 'cluster_simple' as insert_select_local
ENGINE = Distributed('cluster_simple', default, insert_select_local, b % 2);
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_distributed SETTINGS insert_distributed_sync=1 SELECT * FROM s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1, insert_distributed_sync=1;
"""
)
for line in (
first_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
second_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
first_replica_second_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 1
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
ORDER BY (a, b);
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM STOP FETCHES;
"""
)
replica.query(
"""
SYSTEM STOP MERGES;
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster(
'first_shard',
'http://minio1:9001/root/data/generated_replicated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1;
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM FLUSH LOGS;
"""
)
second = int(
second_replica_first_shard.query(
"""SELECT count(*) FROM system.query_log WHERE not is_initial_query and query like '%s3Cluster%';"""
).strip()
)
assert second != 0
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)

View File

@ -78,17 +78,44 @@
7
8
9
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
< Content-Encoding: gzip
< Content-Encoding: deflate
< Content-Encoding: gzip
< Content-Encoding: br
< Content-Encoding: xz
< Content-Encoding: zstd
< Content-Encoding: lz4
< Content-Encoding: bz2
< Content-Encoding: snappy
1
1
1
1
1
1
1
Hello, world
Hello, world
Hello, world
Hello, world
Hello, world

View File

@ -18,6 +18,8 @@ ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' | brotli -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' | xz -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' | zstd -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: lz4' -d 'SELECT number FROM system.numbers LIMIT 10' | lz4 -d;
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: bz2' -d 'SELECT number FROM system.numbers LIMIT 10' | bzip2 -d;
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
@ -27,18 +29,25 @@ ${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: br' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: xz' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: zstd' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: lz4' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: bz2' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
${CLICKHOUSE_CURL} -vsS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: snappy' -d 'SELECT number FROM system.numbers LIMIT 10' 2>&1 | grep --text '< Content-Encoding';
echo "SELECT 1" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}";
echo "SELECT 1" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}";
echo "SELECT 1" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}";
echo "SELECT 1" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}";
echo "SELECT 1" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}";
echo "SELECT 1" | lz4 -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: lz4' "${CLICKHOUSE_URL}";
echo "SELECT 1" | bzip2 -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: bz2' "${CLICKHOUSE_URL}";
echo "'Hello, world'" | ${CLICKHOUSE_CURL} -sS --data-binary @- "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | gzip -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: gzip' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | brotli | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: br' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | xz -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: xz' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | zstd -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: zstd' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | lz4 -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: lz4' "${CLICKHOUSE_URL}&query=SELECT";
echo "'Hello, world'" | bzip2 -c | ${CLICKHOUSE_CURL} -sS --data-binary @- -H 'Content-Encoding: bz2' "${CLICKHOUSE_URL}&query=SELECT";
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&enable_http_compression=1" -H 'Accept-Encoding: gzip' -d 'SELECT number FROM system.numbers LIMIT 0' | wc -c;

View File

@ -3,5 +3,4 @@
2020-10-24 00:00:00 1.3619605237696326 0.16794469697335793 0.7637956767025532 0.8899329799574005 0.6227685185389797 0.30795997278638165 0.7637956767025532
2020-10-24 00:00:00 19 -1.9455094931672063 0.7759802460082872 0.6 0
2020-10-24 00:00:00 852 894
2 -1
999

View File

@ -136,7 +136,7 @@
61885
61885
17216
17217
17218
2686
-------toRelativeDayNum---------
39969

View File

@ -183,6 +183,7 @@ CREATE TABLE system.disks
`path` String,
`free_space` UInt64,
`total_space` UInt64,
`unreserved_space` UInt64,
`keep_free_space` UInt64,
`type` String,
`is_encrypted` UInt8,

View File

@ -1,4 +1,10 @@
#!/usr/bin/env bash
# Tags: no-debug
# no-debug: Query is canceled by timeout after max_execution_time,
# but sending an exception to the client may hang
# for more than MAX_PROCESS_WAIT seconds in a slow debug build,
# and test will fail.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -5,3 +5,4 @@
2021-12-31 2021-12-31 2021-12-31
2020-12-31 2020-12-31 2020-12-31
2020-12-31 2020-12-31
1970-01-31 1970-01-31 1900-01-31

View File

@ -44,3 +44,12 @@ SELECT toLastDayOfMonth(date_value), toLastDayOfMonth(date_time_value), toLastDa
WITH
toDate('2020-12-12') AS date_value
SELECT last_day(date_value), LAST_DAY(date_value);
-- boundaries
WITH
toDate('1970-01-01') AS date_value,
toDateTime('1970-01-01 11:22:33') AS date_time_value,
toDateTime64('1900-01-01 11:22:33', 3) AS date_time_64_value
SELECT toLastDayOfMonth(date_value), toLastDayOfMonth(date_time_value), toLastDayOfMonth(date_time_64_value)
SETTINGS enable_extended_results_for_datetime_functions = true;

View File

@ -0,0 +1,21 @@
-- From https://github.com/ClickHouse/ClickHouse/issues/41814
drop table if exists test;
create table test(a UInt64, m UInt64, d DateTime) engine MergeTree partition by toYYYYMM(d) order by (a, m, d);
insert into test select number, number, '2022-01-01 00:00:00' from numbers(1000000);
select count() from test where a = (select toUInt64(1) where 1 = 2) settings enable_early_constant_folding = 0, force_primary_key = 1;
drop table test;
-- From https://github.com/ClickHouse/ClickHouse/issues/34063
drop table if exists test_null_filter;
create table test_null_filter(key UInt64, value UInt32) engine MergeTree order by key;
insert into test_null_filter select number, number from numbers(10000000);
select count() from test_null_filter where key = null and value > 0 settings force_primary_key = 1;
drop table test_null_filter;

View File

@ -0,0 +1,7 @@
5
Values 21 1 Ok 1
t_async_inserts_logs JSONEachRow 39 1 Ok 1
t_async_inserts_logs Values 8 1 Ok 1
t_async_inserts_logs JSONEachRow 6 0 ParsingError 1
t_async_inserts_logs Values 6 0 ParsingError 1
t_async_inserts_logs Values 8 0 FlushError 1

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_inserts_logs"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_async_inserts_logs (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO t_async_inserts_logs FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}' &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_inserts_logs VALUES (1, 'a')" &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO t_async_inserts_logs FORMAT JSONEachRow qqqqqq' > /dev/null 2>&1 &
${CLICKHOUSE_CURL} -sS "$url" -d 'INSERT INTO t_async_inserts_logs VALUES qqqqqq' > /dev/null 2>&1 &
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO FUNCTION remote('127.0.0.1', currentDatabase(), t_async_inserts_logs) VALUES (1, 'aaa') (2, 'bbb')" &
wait
${CLICKHOUSE_CLIENT} -q "OPTIMIZE TABLE t_async_inserts_logs FINAL"
${CLICKHOUSE_CLIENT} -q "ALTER TABLE t_async_inserts_logs MODIFY SETTING parts_to_throw_insert = 1"
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO t_async_inserts_logs VALUES (1, 'a')" > /dev/null 2>&1 &
wait
${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_async_inserts_logs"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "
SELECT table, format, bytes, empty(exception), status,
status = 'ParsingError' ? flush_time_microseconds = 0 : flush_time_microseconds > event_time_microseconds AS time_ok
FROM system.asynchronous_insert_log
WHERE database = '$CLICKHOUSE_DATABASE' OR query ILIKE 'INSERT INTO FUNCTION%$CLICKHOUSE_DATABASE%'
ORDER BY table, status, format"
${CLICKHOUSE_CLIENT} -q "DROP TABLE t_async_inserts_logs"

View File

@ -0,0 +1,2 @@
1 6 2020-01-01 00:00:00
2 6 2020-01-02 00:00:00

View File

@ -0,0 +1,20 @@
SET allow_suspicious_low_cardinality_types = 1;
DROP TABLE IF EXISTS t_summing_lc;
CREATE TABLE t_summing_lc
(
`key` UInt32,
`val` LowCardinality(UInt32),
`date` DateTime
)
ENGINE = SummingMergeTree(val)
PARTITION BY date
ORDER BY key;
INSERT INTO t_summing_lc VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02');
OPTIMIZE TABLE t_summing_lc FINAL;
SELECT * FROM t_summing_lc ORDER BY key;
DROP TABLE t_summing_lc;

View File

@ -0,0 +1,10 @@
2017-12-01 18:31:44
2017-01-12 18:31:44
2017-12-01 18:31:44
2017-01-12 18:31:44
2017-12-01 18:31:44
2017-01-12 18:31:44
2015-12-31 18:31:44
2015-12-31 18:31:44
2015-12-31 18:31:44
2015-12-31 18:31:44

View File

@ -0,0 +1,16 @@
select parseDateTimeBestEffort('01/12/2017, 18:31:44');
select parseDateTimeBestEffortUS('01/12/2017, 18:31:44');
select parseDateTimeBestEffort('01/12/2017,18:31:44');
select parseDateTimeBestEffortUS('01/12/2017,18:31:44');
select parseDateTimeBestEffort('01/12/2017 , 18:31:44');
select parseDateTimeBestEffortUS('01/12/2017 ,18:31:44');
select parseDateTimeBestEffortUS('18:31:44, 31/12/2015');
select parseDateTimeBestEffortUS('18:31:44 , 31/12/2015');
select parseDateTimeBestEffort('18:31:44, 31/12/2015');
select parseDateTimeBestEffort('18:31:44 , 31/12/2015');
select parseDateTimeBestEffort('01/12/2017,'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTimeBestEffortUS('18:31:44,,,, 31/12/2015'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTimeBestEffortUS('18:31:44, 31/12/2015,'); -- { serverError CANNOT_PARSE_TEXT }
select parseDateTimeBestEffort('01/12/2017, 18:31:44,'); -- { serverError CANNOT_PARSE_TEXT }
select parseDateTimeBestEffort('01/12/2017, ,,,18:31:44'); -- { serverError CANNOT_PARSE_DATETIME }
select parseDateTimeBestEffort('18:31:44 ,,,,, 31/12/2015'); -- { serverError CANNOT_PARSE_DATETIME }

View File

@ -13,6 +13,7 @@ v22.7.4.16-stable 2022-08-23
v22.7.3.5-stable 2022-08-10
v22.7.2.15-stable 2022-08-03
v22.7.1.2484-stable 2022-07-21
v22.6.9.11-stable 2022-10-06
v22.6.8.35-stable 2022-09-19
v22.6.7.7-stable 2022-08-29
v22.6.6.16-stable 2022-08-23
