Merge branch 'master' into tighten-limits-functional-tests

Alexey Milovidov 2024-08-04 21:44:38 +02:00
commit f08b32d528
71 changed files with 598 additions and 330 deletions

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit cb5dc3c906e80f253e9ce9535807caef827cc2e0
Subproject commit c2b0811f164a7948208489562dab4f186eb305ce

View File

@ -4,9 +4,7 @@ else ()
option(ENABLE_ICU "Enable ICU" 0)
endif ()
# Temporarily disabled s390x because the ICU build links a blob (icudt71b_dat.S) and our friends from IBM did not explain how they generated
# the blob on s390x: https://github.com/ClickHouse/icudata/pull/2#issuecomment-2226957255
if (NOT ENABLE_ICU OR ARCH_S390X)
if (NOT ENABLE_ICU)
message(STATUS "Not using ICU")
return()
endif()

contrib/icudata vendored

@ -1 +1 @@
Subproject commit d345d6ac22f381c882420de9053d30ae1ff38d75
Subproject commit 4904951339a70b4814d2d3723436b20d079cb01b

View File

@ -212,6 +212,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--shared-catalog')
fi
if [[ "$USE_DISTRIBUTED_CACHE" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--distributed-cache')
fi
if [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
# Too many tests fail for DatabaseReplicated in parallel.

View File

@ -43,7 +43,7 @@ Result:
## mapFromArrays
Creates a map from an array of keys and an array of values.
Creates a map from an array or map of keys and an array or map of values.
The function is a convenient alternative to syntax `CAST([...], 'Map(key_type, value_type)')`.
For example, instead of writing
@ -62,8 +62,8 @@ Alias: `MAP_FROM_ARRAYS(keys, values)`
**Arguments**
- `keys` — Array of keys to create the map from. [Array(T)](../data-types/array.md) where `T` can be any type supported by [Map](../data-types/map.md) as key type.
- `values` - Array or map of values to create the map from. [Array](../data-types/array.md) or [Map](../data-types/map.md).
- `keys` — Array or map of keys to create the map from. [Array](../data-types/array.md) or [Map](../data-types/map.md). If `keys` is an array, `Array(Nullable(T))` or `Array(LowCardinality(Nullable(T)))` is also accepted as its type, as long as it does not contain NULL values (see the last example below).
- `values` - Array or map of values to create the map from [Array](../data-types/array.md) or [Map](../data-types/map.md).
**Returned value**
@ -99,6 +99,18 @@ Result:
└───────────────────────────────────────────────────────┘
```
```sql
SELECT mapFromArrays(map('a', 1, 'b', 2, 'c', 3), [1, 2, 3])
```
Result:
```
┌─mapFromArrays(map('a', 1, 'b', 2, 'c', 3), [1, 2, 3])─┐
│ {('a',1):1,('b',2):2,('c',3):3} │
└───────────────────────────────────────────────────────┘
```
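If the key array is nullable but contains no NULL values, it is accepted as well, and the `Nullable` wrapper is stripped from the resulting key type. The query below is an illustrative sketch of that behavior rather than an example taken from the existing documentation; the expected output is inferred from the implementation in this commit:
```sql
-- Keys arrive as Array(Nullable(String)); this is accepted because no element is NULL.
-- An array that actually contains a NULL key is rejected with a BAD_ARGUMENTS error.
SELECT mapFromArrays(CAST(['a', 'b', 'c'], 'Array(Nullable(String))'), [1, 2, 3]) AS m
```
The expected result is `{'a':1,'b':2,'c':3}` of type `Map(String, UInt8)`.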
## extractKeyValuePairs
Converts a string of key-value pairs to a [Map(String, String)](../data-types/map.md).

View File

@ -849,7 +849,7 @@ try
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers"
LOG_INFO(log, "Query Profiler is disabled because it cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif

View File

@ -1130,8 +1130,7 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</query_views_log>
<!-- Uncomment if use part log.
Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads).-->
<!-- Part log contains information about all actions with parts in MergeTree tables (creation, deletion, merges, downloads). -->
<part_log>
<database>system</database>
<table>part_log</table>
@ -1143,9 +1142,9 @@
<flush_on_crash>false</flush_on_crash>
</part_log>
<!-- Uncomment to write text log into table.
Text log contains all information from usual server log but stores it in structured and efficient way.
<!-- Text log contains all information from usual server log but stores it in structured and efficient way.
The level of the messages that goes to the table can be limited (<level>), if not specified all messages will go to the table.
-->
<text_log>
<database>system</database>
<table>text_log</table>
@ -1154,9 +1153,8 @@
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
<level></level>
<level>trace</level>
</text_log>
-->
<!-- Metric log contains rows with current values of ProfileEvents, CurrentMetrics collected with "collect_interval_milliseconds" interval. -->
<metric_log>

View File

@ -17,7 +17,7 @@
--input-shadow-color: rgba(0, 255, 0, 1);
--error-color: red;
--global-error-color: white;
--legend-background: rgba(255, 255, 255, 0.75);
--legend-background: rgba(255, 255, 0, 0.75);
--title-color: #666;
--text-color: black;
--edit-title-background: #FEE;
@ -41,7 +41,7 @@
--moving-shadow-color: rgba(255, 255, 255, 0.25);
--input-shadow-color: rgba(255, 128, 0, 0.25);
--error-color: #F66;
--legend-background: rgba(255, 255, 255, 0.25);
--legend-background: rgba(0, 96, 128, 0.75);
--title-color: white;
--text-color: white;
--edit-title-background: #364f69;
@ -218,6 +218,7 @@
#chart-params .param {
width: 6%;
font-family: monospace;
}
input {
@ -256,6 +257,7 @@
font-weight: bold;
user-select: none;
cursor: pointer;
margin-bottom: 1rem;
}
#run:hover {
@ -309,7 +311,7 @@
color: var(--param-text-color);
display: inline-block;
box-shadow: 1px 1px 0 var(--shadow-color);
margin-bottom: 1rem;
margin-bottom: 0.5rem;
}
input:focus {
@ -657,6 +659,10 @@ function insertParam(name, value) {
param_value.value = value;
param_value.spellcheck = false;
let setWidth = e => { e.style.width = (e.value.length + 1) + 'ch' };
if (value) { setWidth(param_value); }
param_value.addEventListener('input', e => setWidth(e.target));
param_wrapper.appendChild(param_name);
param_wrapper.appendChild(param_value);
document.getElementById('chart-params').appendChild(param_wrapper);
@ -945,6 +951,7 @@ function showMassEditor() {
let editor = document.getElementById('mass-editor-textarea');
editor.value = JSON.stringify({params: params, queries: queries}, null, 2);
editor.focus();
mass_editor_active = true;
}
@ -1004,14 +1011,14 @@ function legendAsTooltipPlugin({ className, style = { background: "var(--legend-
className && legendEl.classList.add(className);
uPlot.assign(legendEl.style, {
textAlign: "left",
textAlign: "right",
pointerEvents: "none",
display: "none",
position: "absolute",
left: 0,
top: 0,
zIndex: 100,
boxShadow: "2px 2px 10px rgba(0,0,0,0.1)",
boxShadow: "2px 2px 10px rgba(0, 0, 0, 0.1)",
...style
});
@ -1051,8 +1058,10 @@ function legendAsTooltipPlugin({ className, style = { background: "var(--legend-
function update(u) {
let { left, top } = u.cursor;
left -= legendEl.clientWidth / 2;
top -= legendEl.clientHeight / 2;
/// This will place the balloon to the right of the cursor when the cursor is on the left side, and vice versa,
/// avoiding the borders of the chart.
left -= legendEl.clientWidth * (left / u.width);
top -= legendEl.clientHeight;
legendEl.style.transform = "translate(" + left + "px, " + top + "px)";
if (multiline) {
@ -1139,7 +1148,7 @@ async function draw(idx, chart, url_params, query) {
let {reply, error} = await doFetch(query, url_params);
if (!error) {
if (reply.rows.length == 0) {
if (reply.rows == 0) {
error = "Query returned empty result.";
} else if (reply.meta.length < 2) {
error = "Query should return at least two columns: unix timestamp and value.";
@ -1229,14 +1238,53 @@ async function draw(idx, chart, url_params, query) {
let sync = uPlot.sync("sync");
let axis = {
function formatDateTime(t) {
return (new Date(t * 1000)).toISOString().replace('T', '\n').replace('.000Z', '');
}
function formatDateTimes(self, ticks) {
return ticks.map((t, idx) => {
let res = formatDateTime(t);
if (idx == 0 || res.substring(0, 10) != formatDateTime(ticks[idx - 1]).substring(0, 10)) {
return res;
} else {
return res.substring(11);
}
});
}
function formatValue(v) {
const a = Math.abs(v);
if (a >= 1000000000000000) { return (v / 1000000000000000) + 'P'; }
if (a >= 1000000000000) { return (v / 1000000000000) + 'T'; }
if (a >= 1000000000) { return (v / 1000000000) + 'G'; }
if (a >= 1000000) { return (v / 1000000) + 'M'; }
if (a >= 1000) { return (v / 1000) + 'K'; }
if (a > 0 && a < 0.001) { return (v * 1000000) + "μ"; }
return v;
}
let axis_x = {
stroke: axes_color,
grid: { width: 1 / devicePixelRatio, stroke: grid_color },
ticks: { width: 1 / devicePixelRatio, stroke: grid_color }
ticks: { width: 1 / devicePixelRatio, stroke: grid_color },
values: formatDateTimes,
space: 80,
incrs: [1, 5, 10, 15, 30,
60, 60 * 5, 60 * 10, 60 * 15, 60 * 30,
3600, 3600 * 2, 3600 * 3, 3600 * 4, 3600 * 6, 3600 * 12,
3600 * 24],
};
let axes = [axis, axis];
let series = [{ label: "x" }];
let axis_y = {
stroke: axes_color,
grid: { width: 1 / devicePixelRatio, stroke: grid_color },
ticks: { width: 1 / devicePixelRatio, stroke: grid_color },
values: (self, ticks) => ticks.map(formatValue)
};
let axes = [axis_x, axis_y];
let series = [{ label: "time", value: (self, t) => formatDateTime(t) }];
let data = [reply.data[reply.meta[0].name]];
// Treat every column as series
@ -1254,9 +1302,10 @@ async function draw(idx, chart, url_params, query) {
const opts = {
width: chart.clientWidth,
height: chart.clientHeight,
scales: { x: { time: false } }, /// Because we want to split and format time on our own.
axes,
series,
padding: [ null, null, null, (Math.round(max_value * 100) / 100).toString().length * 6 - 10 ],
padding: [ null, null, null, 3 ],
plugins: [ legendAsTooltipPlugin() ],
cursor: {
sync: {

View File

@ -67,6 +67,9 @@ struct UniqVariadicHash<false, true>
{
static UInt64 apply(size_t num_args, const IColumn ** columns, size_t row_num)
{
if (!num_args)
return 0;
UInt64 hash;
const auto & tuple_columns = assert_cast<const ColumnTuple *>(columns[0])->getColumns();

View File

@ -43,6 +43,12 @@ size_t getCompoundTypeDepth(const IDataType & type)
const auto & tuple_elements = assert_cast<const DataTypeTuple &>(*current_type).getElements();
if (!tuple_elements.empty())
current_type = tuple_elements.at(0).get();
else
{
/// Special case: tuple with no element - tuple(). In this case, what's the compound type depth?
/// I'm not certain about the theoretical answer, but from experiment, 1 is the most reasonable choice.
return 1;
}
++result;
}

View File

@ -323,7 +323,7 @@ bool RestoreCoordinationRemote::hasConcurrentRestores(const std::atomic<size_t>
return false;
bool result = false;
std::string path = zookeeper_path +"/stage";
std::string path = zookeeper_path + "/stage";
auto holder = with_retries.createRetriesControlHolder("createRootNodes");
holder.retries_ctl.retryLoop(

View File

@ -61,8 +61,6 @@ private:
void createRootNodes();
void removeAllNodes();
class ReplicatedDatabasesMetadataSync;
/// get_zookeeper will provide a zookeeper client without any fault injection
const zkutil::GetZooKeeper get_zookeeper;
const String root_zookeeper_path;

View File

@ -222,10 +222,19 @@ void RestorerFromBackup::setStage(const String & new_stage, const String & messa
if (restore_coordination)
{
restore_coordination->setStage(new_stage, message);
if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
else
restore_coordination->waitForStage(new_stage);
/// The initiator of a RESTORE ON CLUSTER query waits for other hosts to complete their work (see waitForStage(Stage::COMPLETED) in BackupsWorker::doRestore),
/// but other hosts shouldn't wait for each other's completion. (That's simply unnecessary and also
/// the initiator may start cleaning up (e.g. removing restore-coordination ZooKeeper nodes) once all other hosts are in Stage::COMPLETED.)
bool need_wait = (new_stage != Stage::COMPLETED);
if (need_wait)
{
if (new_stage == Stage::FINDING_TABLES_IN_BACKUP)
restore_coordination->waitForStage(new_stage, on_cluster_first_sync_timeout);
else
restore_coordination->waitForStage(new_stage);
}
}
}

View File

@ -19,7 +19,7 @@ Epoll::Epoll() : events_count(0)
{
epoll_fd = epoll_create1(0);
if (epoll_fd == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot open epoll descriptor");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot open epoll descriptor");
}
Epoll::Epoll(Epoll && other) noexcept : epoll_fd(other.epoll_fd), events_count(other.events_count.load())
@ -47,7 +47,7 @@ void Epoll::add(int fd, void * ptr, uint32_t events)
++events_count;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot add new descriptor to epoll");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot add new descriptor to epoll");
}
void Epoll::remove(int fd)
@ -55,7 +55,7 @@ void Epoll::remove(int fd)
--events_count;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == -1)
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Cannot remove descriptor from epoll");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Cannot remove descriptor from epoll");
}
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, int timeout) const
@ -82,7 +82,7 @@ size_t Epoll::getManyReady(int max_events, epoll_event * events_out, int timeout
continue;
}
else
throw DB::ErrnoException(DB::ErrorCodes::EPOLL_ERROR, "Error in epoll_wait");
throw ErrnoException(ErrorCodes::EPOLL_ERROR, "Error in epoll_wait");
}
else
break;

View File

@ -253,18 +253,18 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
}
}
for (auto & rec : merged)
for (auto & record : merged)
{
if (!rec.failed)
continue;
if (!record.failed || !record.consecutive_fail_count)
continue;
/// Exponential increased time for each consecutive fail
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (rec.consecutive_fail_count - 1)));
if (rec.fail_time < banned_until)
{
rec.failed = false;
CurrentMetrics::sub(metrics.banned_count);
}
/// Exponentially increased time for each consecutive fail
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (record.consecutive_fail_count - 1)));
if (record.fail_time < banned_until)
{
record.failed = false;
CurrentMetrics::sub(metrics.banned_count);
}
}
chassert(std::is_sorted(merged.begin(), merged.end()));

View File

@ -19,6 +19,10 @@ TaskTracker::TaskTracker(ThreadPoolCallbackRunnerUnsafe<void> scheduler_, size_t
TaskTracker::~TaskTracker()
{
/// Tasks should be waited for outside of the dtor.
/// Important for WriteBufferFromS3/AzureBlobStorage, where TaskTracker is currently used.
chassert(finished_futures.empty() && futures.empty());
safeWaitAll();
}
@ -170,4 +174,3 @@ bool TaskTracker::isAsync() const
}
}

View File

@ -2,6 +2,7 @@
#include <Common/TimerDescriptor.h>
#include <Common/Exception.h>
#include <Common/Epoll.h>
#include <Common/logger_useful.h>
#include <sys/timerfd.h>
@ -75,10 +76,22 @@ void TimerDescriptor::drain() const
/// or since the last successful read(2), then the buffer given to read(2) returns an unsigned 8-byte integer (uint64_t)
/// containing the number of expirations that have occurred.
/// (The returned value is in host byte order—that is, the native byte order for integers on the host machine.)
/// Due to a bug in the Linux kernel, reading from timerfd in non-blocking mode can still be blocking.
/// Avoid it with polling.
Epoll epoll;
epoll.add(timer_fd);
epoll_event event;
event.data.fd = -1;
size_t ready_count = epoll.getManyReady(1, &event, 0);
if (!ready_count)
return;
uint64_t buf;
while (true)
{
ssize_t res = ::read(timer_fd, &buf, sizeof(buf));
if (res < 0)
{
/// man timerfd_create:

View File

@ -1127,6 +1127,7 @@ class IColumn;
M(Bool, input_format_json_throw_on_bad_escape_sequence, true, "Throw an exception if JSON string contains bad escape sequence in JSON input formats. If disabled, bad escape sequences will remain as is in the data", 0) \
M(Bool, input_format_json_ignore_unnecessary_fields, true, "Ignore unnecessary fields and not parse them. Enabling this may not throw exceptions on json strings of invalid format or with duplicated fields", 0) \
M(Bool, input_format_json_case_insensitive_column_matching, false, "Ignore case when matching JSON keys with CH columns", 0) \
M(UInt64, input_format_json_max_depth, 1000, "Maximum depth of a field in JSON. This is not a strict limit, it does not have to be applied precisely.", 0) \
M(Bool, input_format_try_infer_integers, true, "Try to infer integers instead of floats while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_dates, true, "Try to infer dates from string fields while schema inference in text formats", 0) \
M(Bool, input_format_try_infer_datetimes, true, "Try to infer datetimes from string fields while schema inference in text formats", 0) \

View File

@ -75,6 +75,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
{"24.8",
{
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
}

View File

@ -44,7 +44,7 @@ namespace ErrorCodes
DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, ContextPtr context_)
: DatabaseOnDisk(name_, metadata_path_, "data/" + escapeForFileName(name_) + "/", "DatabaseLazy (" + name_ + ")", context_)
: DatabaseOnDisk(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseLazy (" + name_ + ")", context_)
, expiration_time(expiration_time_)
{
}

View File

@ -12,7 +12,7 @@ class DatabaseLazyIterator;
class Context;
/** Lazy engine of databases.
* Works like DatabaseOrdinary, but stores in memory only cache.
* Works like DatabaseOrdinary, but stores in memory only the cache.
* Can be used only with *Log engines.
*/
class DatabaseLazy final : public DatabaseOnDisk

View File

@ -135,8 +135,11 @@ bool CachedOnDiskReadBufferFromFile::nextFileSegmentsBatch()
else
{
CreateFileSegmentSettings create_settings(FileSegmentKind::Regular);
file_segments = cache->getOrSet(cache_key, file_offset_of_buffer_end, size, file_size.value(), create_settings, settings.filesystem_cache_segments_batch_size, user);
file_segments = cache->getOrSet(
cache_key, file_offset_of_buffer_end, size, file_size.value(),
create_settings, settings.filesystem_cache_segments_batch_size, user);
}
return !file_segments->empty();
}
@ -158,8 +161,8 @@ void CachedOnDiskReadBufferFromFile::initialize()
LOG_TEST(
log,
"Having {} file segments to read: {}, current offset: {}",
file_segments->size(), file_segments->toString(), file_offset_of_buffer_end);
"Having {} file segments to read: {}, current read range: [{}, {})",
file_segments->size(), file_segments->toString(), file_offset_of_buffer_end, read_until_position);
initialized = true;
}
@ -1043,6 +1046,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
if (file_segments->size() == 1)
{
size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1;
LOG_TEST(log, "Remaining size to read: {}, read: {}. Resizing buffer to {}",
remaining_size_to_read, size, nextimpl_working_buffer_offset + std::min(size, remaining_size_to_read));
size = std::min(size, remaining_size_to_read);
chassert(implementation_buffer->buffer().size() >= nextimpl_working_buffer_offset + size);
implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size);
@ -1055,8 +1062,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
chassert(
file_offset_of_buffer_end <= read_until_position,
fmt::format("Expected {} <= {} (size: {}, read range: {})",
file_offset_of_buffer_end, read_until_position, size, current_read_range.toString()));
fmt::format("Expected {} <= {} (size: {}, read range: {}, hold file segments: {} ({}))",
file_offset_of_buffer_end, read_until_position, size, current_read_range.toString(), file_segments->size(), file_segments->toString(true)));
}
swap(*implementation_buffer);

View File

@ -123,6 +123,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;
format_settings.input_allow_errors_ratio = settings.input_format_allow_errors_ratio;
format_settings.json.max_depth = settings.input_format_json_max_depth;
format_settings.json.array_of_rows = settings.output_format_json_array_of_rows;
format_settings.json.escape_forward_slashes = settings.output_format_json_escape_forward_slashes;
format_settings.json.write_named_tuples_as_objects = settings.output_format_json_named_tuples_as_objects;

View File

@ -205,6 +205,7 @@ struct FormatSettings
struct JSON
{
size_t max_depth = 1000;
bool array_of_rows = false;
bool quote_64bit_integers = true;
bool quote_64bit_floats = false;

View File

@ -1,9 +1,9 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <base/arithmeticOverflow.h>
namespace DB
@ -15,7 +15,8 @@ namespace ErrorCodes
extern const int TOO_LARGE_ARRAY_SIZE;
}
/// Reasonable threshold.
/// Reasonable thresholds.
static constexpr Int64 max_array_size_in_columns_bytes = 1000000000;
static constexpr size_t max_arrays_size_in_columns = 1000000000;
@ -63,12 +64,19 @@ public:
auto array_size = col_num->getInt(i);
if (unlikely(array_size < 0))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size cannot be negative: while executing function {}", getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
Int64 estimated_size = 0;
if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
if (unlikely(estimated_size > max_array_size_in_columns_bytes))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
offset += array_size;
if (unlikely(offset > max_arrays_size_in_columns))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size while executing function {}", getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size {} (will generate at least {} elements) while executing function {}", array_size, offset, getName());
offsets.push_back(offset);
}

View File

@ -1,14 +1,17 @@
#include <Functions/IFunction.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/getLeastSupertype.h>
#include <Columns/ColumnMap.h>
#include <Interpreters/castColumn.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>
#include <Common/HashTable/HashSet.h>
#include <Core/Settings.h>
@ -21,6 +24,7 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SIZES_OF_ARRAYS_DONT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
}
namespace
@ -155,7 +159,7 @@ private:
bool use_variant_as_common_type = false;
};
/// mapFromArrays(keys, values) is a function that allows you to make key-value pair from a pair of arrays
/// mapFromArrays(keys, values) is a function that allows you to make key-value pair from a pair of arrays or maps
class FunctionMapFromArrays : public IFunction
{
public:
@ -179,21 +183,28 @@ public:
getName(),
arguments.size());
/// The first argument should always be Array.
/// Because key type can not be nested type of Map, which is Tuple
DataTypePtr key_type;
if (const auto * keys_type = checkAndGetDataType<DataTypeArray>(arguments[0].get()))
key_type = keys_type->getNestedType();
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be an Array", getName());
auto get_nested_type = [&](const DataTypePtr & type)
{
DataTypePtr nested;
if (const auto * type_as_array = checkAndGetDataType<DataTypeArray>(type.get()))
nested = type_as_array->getNestedType();
else if (const auto * type_as_map = checkAndGetDataType<DataTypeMap>(type.get()))
nested = std::make_shared<DataTypeTuple>(type_as_map->getKeyValueTypes());
else
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} must be Array or Map, but {} is given",
getName(),
type->getName());
DataTypePtr value_type;
if (const auto * value_array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get()))
value_type = value_array_type->getNestedType();
else if (const auto * value_map_type = checkAndGetDataType<DataTypeMap>(arguments[1].get()))
value_type = std::make_shared<DataTypeTuple>(value_map_type->getKeyValueTypes());
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Second argument for function {} must be Array or Map", getName());
return nested;
};
auto key_type = get_nested_type(arguments[0]);
auto value_type = get_nested_type(arguments[1]);
/// We accept Array(Nullable(T)) or Array(LowCardinality(Nullable(T))) as key types as long as the actual array doesn't contain NULL values (this is checked in executeImpl).
key_type = removeNullableOrLowCardinalityNullable(key_type);
DataTypes key_value_types{key_type, value_type};
return std::make_shared<DataTypeMap>(key_value_types);
@ -202,44 +213,59 @@ public:
ColumnPtr executeImpl(
const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* result_type */, size_t /* input_rows_count */) const override
{
bool is_keys_const = isColumnConst(*arguments[0].column);
ColumnPtr holder_keys;
const ColumnArray * col_keys;
if (is_keys_const)
auto get_array_column = [&](const ColumnPtr & column) -> std::pair<const ColumnArray *, ColumnPtr>
{
holder_keys = arguments[0].column->convertToFullColumnIfConst();
col_keys = checkAndGetColumn<ColumnArray>(holder_keys.get());
}
else
bool is_const = isColumnConst(*column);
ColumnPtr holder = is_const ? column->convertToFullColumnIfConst() : column;
const ColumnArray * col_res = nullptr;
if (const auto * col_array = checkAndGetColumn<ColumnArray>(holder.get()))
col_res = col_array;
else if (const auto * col_map = checkAndGetColumn<ColumnMap>(holder.get()))
col_res = &col_map->getNestedColumn();
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Argument columns of function {} must be Array or Map, but {} is given",
getName(),
holder->getName());
return {col_res, holder};
};
auto [col_keys, key_holder] = get_array_column(arguments[0].column);
auto [col_values, values_holder] = get_array_column(arguments[1].column);
/// Nullable(T) or LowCardinality(Nullable(T)) are okay as nested key types but actual NULL values are not okay.
ColumnPtr data_keys = col_keys->getDataPtr();
if (isColumnNullableOrLowCardinalityNullable(*data_keys))
{
col_keys = checkAndGetColumn<ColumnArray>(arguments[0].column.get());
const NullMap * null_map = nullptr;
if (const auto * nullable = checkAndGetColumn<ColumnNullable>(data_keys.get()))
{
null_map = &nullable->getNullMapData();
data_keys = nullable->getNestedColumnPtr();
}
else if (const auto * low_cardinality = checkAndGetColumn<ColumnLowCardinality>(data_keys.get()))
{
if (const auto * nullable_dict = checkAndGetColumn<ColumnNullable>(low_cardinality->getDictionaryPtr().get()))
{
null_map = &nullable_dict->getNullMapData();
data_keys = ColumnLowCardinality::create(nullable_dict->getNestedColumnPtr(), low_cardinality->getIndexesPtr());
}
}
if (null_map && !memoryIsZero(null_map->data(), 0, null_map->size()))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "The nested column of first argument in function {} must not contain NULLs", getName());
}
if (!col_keys)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The first argument of function {} must be Array", getName());
bool is_values_const = isColumnConst(*arguments[1].column);
ColumnPtr holder_values;
if (is_values_const)
holder_values = arguments[1].column->convertToFullColumnIfConst();
else
holder_values = arguments[1].column;
const ColumnArray * col_values;
if (const auto * col_values_array = checkAndGetColumn<ColumnArray>(holder_values.get()))
col_values = col_values_array;
else if (const auto * col_values_map = checkAndGetColumn<ColumnMap>(holder_values.get()))
col_values = &col_values_map->getNestedColumn();
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The second arguments of function {} must be Array or Map", getName());
if (!col_keys->hasEqualOffsets(*col_values))
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Two arguments for function {} must have equal sizes", getName());
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Two arguments of function {} must have equal sizes", getName());
const auto & data_keys = col_keys->getDataPtr();
const auto & data_values = col_values->getDataPtr();
const auto & offsets = col_keys->getOffsetsPtr();
auto nested_column = ColumnArray::create(ColumnTuple::create(Columns{data_keys, data_values}), offsets);
auto nested_column = ColumnArray::create(ColumnTuple::create(Columns{std::move(data_keys), data_values}), offsets);
return ColumnMap::create(nested_column);
}
};
@ -250,10 +276,7 @@ public:
static constexpr auto name = "mapUpdate";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMapUpdate>(); }
String getName() const override
{
return name;
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
@ -262,9 +285,11 @@ public:
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 2",
getName(), arguments.size());
getName(),
arguments.size());
const auto * left = checkAndGetDataType<DataTypeMap>(arguments[0].type.get());
const auto * right = checkAndGetDataType<DataTypeMap>(arguments[1].type.get());
@ -380,7 +405,6 @@ public:
return ColumnMap::create(nested_column);
}
};
}
REGISTER_FUNCTION(Map)

View File

@ -3,13 +3,13 @@
#include <Common/PODArray.h>
#include <Common/StringUtils.h>
#include <Common/memcpySmall.h>
#include <Common/checkStackSize.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBufferFromString.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/readFloatText.h>
#include <IO/Operators.h>
#include <base/find_symbols.h>
#include <cstdlib>
#include <bit>
@ -39,6 +39,7 @@ namespace ErrorCodes
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int TOO_DEEP_RECURSION;
}
template <size_t num_bytes, typename IteratorSrc, typename IteratorDst>
@ -1494,10 +1495,20 @@ template bool readDateTimeTextFallback<bool, true>(time_t &, ReadBuffer &, const
template <typename ReturnType>
ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings)
ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings, size_t current_depth)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
if (unlikely(current_depth > settings.max_depth))
{
if constexpr (throw_exception)
throw Exception(ErrorCodes::TOO_DEEP_RECURSION, "JSON is too deep for key '{}'", name_of_field.toString());
return ReturnType(false);
}
if (unlikely(current_depth > 0 && current_depth % 1024 == 0))
checkStackSize();
if (buf.eof())
{
if constexpr (throw_exception)
@ -1560,8 +1571,8 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
while (true)
{
if constexpr (throw_exception)
skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings);
else if (!skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings))
skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings, current_depth + 1);
else if (!skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings, current_depth + 1))
return ReturnType(false);
skipWhitespaceIfAny(buf);
@ -1619,8 +1630,8 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
skipWhitespaceIfAny(buf);
if constexpr (throw_exception)
skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings);
else if (!skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings))
skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings, current_depth + 1);
else if (!skipJSONFieldImpl<ReturnType>(buf, name_of_field, settings, current_depth + 1))
return ReturnType(false);
skipWhitespaceIfAny(buf);
@ -1659,12 +1670,12 @@ ReturnType skipJSONFieldImpl(ReadBuffer & buf, StringRef name_of_field, const Fo
void skipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings)
{
skipJSONFieldImpl<void>(buf, name_of_field, settings);
skipJSONFieldImpl<void>(buf, name_of_field, settings, 0);
}
bool trySkipJSONField(ReadBuffer & buf, StringRef name_of_field, const FormatSettings::JSON & settings)
{
return skipJSONFieldImpl<bool>(buf, name_of_field, settings);
return skipJSONFieldImpl<bool>(buf, name_of_field, settings, 0);
}

View File

@ -277,12 +277,10 @@ WriteBufferFromS3::~WriteBufferFromS3()
"The file might not be written to S3. "
"{}.",
getVerboseLogDetails());
return;
}
/// That destructor could be call with finalized=false in case of exceptions
if (!finalized && !canceled)
else if (!finalized)
{
/// That destructor could be called with finalized=false in case of exceptions
LOG_INFO(
log,
"WriteBufferFromS3 is not finalized in destructor. "
@ -291,9 +289,10 @@ WriteBufferFromS3::~WriteBufferFromS3()
getVerboseLogDetails());
}
/// Wait for all tasks, because they contain reference to this write buffer.
task_tracker->safeWaitAll();
if (!multipart_upload_id.empty() && !multipart_upload_finished)
if (!canceled && !multipart_upload_id.empty() && !multipart_upload_finished)
{
LOG_WARNING(log, "WriteBufferFromS3 was neither finished nor aborted, try to abort upload in destructor. {}.", getVerboseLogDetails());
tryToAbortMultipartUpload();

View File

@ -316,14 +316,36 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
return result;
}
std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size)
std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size, size_t aligned_size)
{
assert(size > 0);
chassert(size > 0);
chassert(size <= aligned_size);
/// Consider this example to understand why we need to account here for both `size` and `aligned_size`.
/// [________________]__________________] <-- requested range
/// ^ ^
/// right offset aligned_right_offset
/// [_________] <-- last cached file segment, e.g. we have uncovered suffix of the requested range
/// [________________]
/// size
/// [____________________________________]
/// aligned_size
///
/// So it is possible that we split this hole range into sub-segments by `max_file_segment_size`
/// and get something like this:
///
/// [________________________]
/// ^ ^
/// right_offset right_offset + max_file_segment_size
/// e.g. there is no need to create sub-segment for range (right_offset + max_file_segment_size, aligned_right_offset].
/// Because its left offset would be bigger than right_offset.
/// Therefore, we set end_pos_non_included as offset+size, but remaining_size as aligned_size.
std::vector<FileSegment::Range> ranges;
size_t current_pos = offset;
size_t end_pos_non_included = offset + size;
size_t remaining_size = size;
size_t remaining_size = aligned_size;
FileSegments file_segments;
const size_t max_size = max_file_segment_size.load();
@ -339,43 +361,30 @@ std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size
return ranges;
}
FileSegments FileCache::splitRangeIntoFileSegments(
FileSegments FileCache::createFileSegmentsFromRanges(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
const std::vector<FileSegment::Range> & ranges,
size_t & file_segments_count,
size_t file_segments_limit,
const CreateFileSegmentSettings & create_settings)
{
assert(size > 0);
auto current_pos = offset;
auto end_pos_non_included = offset + size;
size_t current_file_segment_size;
size_t remaining_size = size;
FileSegments file_segments;
const size_t max_size = max_file_segment_size.load();
while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit))
FileSegments result;
for (const auto & r : ranges)
{
current_file_segment_size = std::min(remaining_size, max_size);
remaining_size -= current_file_segment_size;
auto file_segment_metadata_it = addFileSegment(
locked_key, current_pos, current_file_segment_size, state, create_settings, nullptr);
file_segments.push_back(file_segment_metadata_it->second->file_segment);
current_pos += current_file_segment_size;
if (file_segments_limit && file_segments_count >= file_segments_limit)
break;
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
result.push_back(metadata_it->second->file_segment);
++file_segments_count;
}
return file_segments;
return result;
}
void FileCache::fillHolesWithEmptyFileSegments(
LockedKey & locked_key,
FileSegments & file_segments,
const FileSegment::Range & range,
size_t non_aligned_right_offset,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & create_settings)
@ -442,18 +451,9 @@ void FileCache::fillHolesWithEmptyFileSegments(
}
else
{
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
if (is_limit_reached())
break;
}
file_segments.splice(it, std::move(hole));
const auto ranges = splitRange(current_pos, hole_size, hole_size);
auto hole_segments = createFileSegmentsFromRanges(locked_key, ranges, processed_count, file_segments_limit, create_settings);
file_segments.splice(it, std::move(hole_segments));
}
if (is_limit_reached())
@ -479,7 +479,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
chassert(!file_segments_limit || file_segments.size() < file_segments_limit);
if (current_pos <= range.right)
if (current_pos <= non_aligned_right_offset)
{
/// ________] -- requested range
/// _____]
@ -487,28 +487,20 @@ void FileCache::fillHolesWithEmptyFileSegments(
/// segmentN
auto hole_size = range.right - current_pos + 1;
auto non_aligned_hole_size = non_aligned_right_offset - current_pos + 1;
if (fill_with_detached_file_segments)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
locked_key.getKey(), current_pos, non_aligned_hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(file_segments.end(), file_segment);
}
else
{
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
if (is_limit_reached())
break;
}
file_segments.splice(it, std::move(hole));
const auto ranges = splitRange(current_pos, non_aligned_hole_size, hole_size);
auto hole_segments = createFileSegmentsFromRanges(locked_key, ranges, processed_count, file_segments_limit, create_settings);
file_segments.splice(it, std::move(hole_segments));
if (is_limit_reached())
erase_unprocessed();
@ -541,8 +533,9 @@ FileSegmentsHolderPtr FileCache::set(
}
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, create_settings);
const auto ranges = splitRange(offset, size, size);
size_t file_segments_count = 0;
file_segments = createFileSegmentsFromRanges(*locked_key, ranges, file_segments_count, /* file_segments_limit */0, create_settings);
}
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -562,23 +555,27 @@ FileCache::getOrSet(
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
FileSegment::Range initial_range(offset, offset + size - 1);
/// result_range is the initial range, which will be adjusted according to
/// 1. aligned_offset, aligned_end_offset
/// 2. max_file_segments_limit
FileSegment::Range result_range = initial_range;
const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
const auto aligned_offset = roundDownToMultiple(initial_range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(initial_range.right + 1, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= range.left);
chassert(aligned_end_offset >= range.right);
chassert(aligned_offset <= initial_range.left);
chassert(aligned_end_offset >= initial_range.right);
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, user);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
auto file_segments = getImpl(*locked_key, initial_range, file_segments_limit);
if (file_segments_limit)
{
chassert(file_segments.size() <= file_segments_limit);
if (file_segments.size() == file_segments_limit)
range.right = aligned_end_offset = file_segments.back()->range().right;
result_range.right = aligned_end_offset = file_segments.back()->range().right;
}
/// Check case if we have uncovered prefix, e.g.
@ -590,11 +587,11 @@ FileCache::getOrSet(
/// [ ]
/// ^----^
/// uncovered prefix.
const bool has_uncovered_prefix = file_segments.empty() || range.left < file_segments.front()->range().left;
const bool has_uncovered_prefix = file_segments.empty() || result_range.left < file_segments.front()->range().left;
if (aligned_offset < range.left && has_uncovered_prefix)
if (aligned_offset < result_range.left && has_uncovered_prefix)
{
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? range.left - 1 : file_segments.front()->range().left - 1);
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? result_range.left - 1 : file_segments.front()->range().left - 1);
auto prefix_file_segments = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);
if (prefix_file_segments.empty())
@ -603,7 +600,7 @@ FileCache::getOrSet(
/// ^ ^ ^
/// aligned_offset range.left range.right
/// [___] [__________] <-- current cache (example)
range.left = aligned_offset;
result_range.left = aligned_offset;
}
else
{
@ -614,10 +611,10 @@ FileCache::getOrSet(
/// ^
/// prefix_file_segments.back().right
chassert(prefix_file_segments.back()->range().right < range.left);
chassert(prefix_file_segments.back()->range().right < result_range.left);
chassert(prefix_file_segments.back()->range().right >= aligned_offset);
range.left = prefix_file_segments.back()->range().right + 1;
result_range.left = prefix_file_segments.back()->range().right + 1;
}
}
@ -630,11 +627,11 @@ FileCache::getOrSet(
/// [___]
/// ^---^
/// uncovered_suffix
const bool has_uncovered_suffix = file_segments.empty() || file_segments.back()->range().right < range.right;
const bool has_uncovered_suffix = file_segments.empty() || file_segments.back()->range().right < result_range.right;
if (range.right < aligned_end_offset && has_uncovered_suffix)
if (result_range.right < aligned_end_offset && has_uncovered_suffix)
{
auto suffix_range = FileSegment::Range(range.right, aligned_end_offset);
auto suffix_range = FileSegment::Range(result_range.right, aligned_end_offset);
/// We need to get 1 file segment, so file_segments_limit = 1 here.
auto suffix_file_segments = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);
@ -645,7 +642,7 @@ FileCache::getOrSet(
/// range.left range.right aligned_end_offset
/// [___] [___] <-- current cache (example)
range.right = aligned_end_offset;
result_range.right = aligned_end_offset;
}
else
{
@ -655,31 +652,33 @@ FileCache::getOrSet(
/// [___] [___] [_________] <-- current cache (example)
/// ^
/// suffix_file_segments.front().left
range.right = suffix_file_segments.front()->range().left - 1;
result_range.right = suffix_file_segments.front()->range().left - 1;
}
}
if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, create_settings);
auto ranges = splitRange(result_range.left, initial_range.size() + (initial_range.left - result_range.left), result_range.size());
size_t file_segments_count = file_segments.size();
file_segments.splice(file_segments.end(), createFileSegmentsFromRanges(*locked_key, ranges, file_segments_count, file_segments_limit, create_settings));
}
else
{
chassert(file_segments.front()->range().right >= range.left);
chassert(file_segments.back()->range().left <= range.right);
chassert(file_segments.front()->range().right >= result_range.left);
chassert(file_segments.back()->range().left <= result_range.right);
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, create_settings);
*locked_key, file_segments, result_range, offset + size - 1, file_segments_limit, /* fill_with_detached */false, create_settings);
if (!file_segments.front()->range().contains(offset))
if (!file_segments.front()->range().contains(result_range.left))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} to include {} "
"(end offset: {}, aligned offset: {}, aligned end offset: {})",
file_segments.front()->range().toString(), offset, range.right, aligned_offset, aligned_end_offset);
file_segments.front()->range().toString(), offset, result_range.right, aligned_offset, aligned_end_offset);
}
}
chassert(file_segments_limit ? file_segments.back()->range().left <= range.right : file_segments.back()->range().contains(range.right));
chassert(file_segments_limit ? file_segments.back()->range().left <= result_range.right : file_segments.back()->range().contains(result_range.right));
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -713,7 +712,7 @@ FileSegmentsHolderPtr FileCache::get(
}
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */true, CreateFileSegmentSettings{});
*locked_key, file_segments, range, offset + size - 1, file_segments_limit, /* fill_with_detached */true, CreateFileSegmentSettings{});
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));

View File

@ -263,17 +263,12 @@ private:
/// Split range into subranges by max_file_segment_size,
/// each subrange size must be less or equal to max_file_segment_size.
std::vector<FileSegment::Range> splitRange(size_t offset, size_t size);
std::vector<FileSegment::Range> splitRange(size_t offset, size_t size, size_t aligned_size);
/// Split range into subranges by max_file_segment_size (same as in splitRange())
/// and create a new file segment for each subrange.
/// If `file_segments_limit` > 0, create no more than first file_segments_limit
/// file segments.
FileSegments splitRangeIntoFileSegments(
FileSegments createFileSegmentsFromRanges(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
const std::vector<FileSegment::Range> & ranges,
size_t & file_segments_count,
size_t file_segments_limit,
const CreateFileSegmentSettings & create_settings);
@ -281,6 +276,7 @@ private:
LockedKey & locked_key,
FileSegments & file_segments,
const FileSegment::Range & range,
size_t non_aligned_right_offset,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings);

View File

@ -1008,7 +1008,12 @@ FileSegment & FileSegmentsHolder::add(FileSegmentPtr && file_segment)
return *file_segments.back();
}
String FileSegmentsHolder::toString()
String FileSegmentsHolder::toString(bool with_state)
{
return DB::toString(file_segments, with_state);
}
String toString(const FileSegments & file_segments, bool with_state)
{
String ranges;
for (const auto & file_segment : file_segments)
@ -1018,6 +1023,8 @@ String FileSegmentsHolder::toString()
ranges += file_segment->range().toString();
if (file_segment->isUnbound())
ranges += "(unbound)";
if (with_state)
ranges += "(" + FileSegment::stateToString(file_segment->state()) + ")";
}
return ranges;
}

View File

@ -291,7 +291,7 @@ struct FileSegmentsHolder : private boost::noncopyable
size_t size() const { return file_segments.size(); }
String toString();
String toString(bool with_state = false);
void popFront() { completeAndPopFrontImpl(); }
@ -317,4 +317,6 @@ private:
using FileSegmentsHolderPtr = std::unique_ptr<FileSegmentsHolder>;
String toString(const FileSegments & file_segments, bool with_state = false);
}

View File

@ -657,7 +657,7 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
{
if (auto ctx = context.lock())
{
res.query_settings = std::make_shared<Settings>(ctx->getSettingsRef());
res.query_settings = std::make_shared<Settings>(ctx->getSettingsCopy());
res.current_database = ctx->getCurrentDatabase();
}
}

View File

@ -113,13 +113,13 @@ std::stack<const QueryNode *> getSupportingParallelReplicasQuery(const IQueryTre
return res;
}
class ReplaceTableNodeToDummyVisitor : public InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>
class ReplaceTableNodeToDummyVisitor : public InDepthQueryTreeVisitorWithContext<ReplaceTableNodeToDummyVisitor>
{
public:
using Base = InDepthQueryTreeVisitor<ReplaceTableNodeToDummyVisitor, true>;
using Base = InDepthQueryTreeVisitorWithContext<ReplaceTableNodeToDummyVisitor>;
using Base::Base;
void visitImpl(const QueryTreeNodePtr & node)
void enterImpl(QueryTreeNodePtr & node)
{
auto * table_node = node->as<TableNode>();
auto * table_function_node = node->as<TableFunctionNode>();
@ -134,21 +134,19 @@ public:
ColumnsDescription(storage_snapshot->getColumns(get_column_options)),
storage_snapshot);
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), context);
auto dummy_table_node = std::make_shared<TableNode>(std::move(storage_dummy), getContext());
dummy_table_node->setAlias(node->getAlias());
replacement_map.emplace(node.get(), std::move(dummy_table_node));
}
}
ContextPtr context;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
};
QueryTreeNodePtr replaceTablesWithDummyTables(const QueryTreeNodePtr & query, const ContextPtr & context)
QueryTreeNodePtr replaceTablesWithDummyTables(QueryTreeNodePtr query, const ContextPtr & context)
{
ReplaceTableNodeToDummyVisitor visitor;
visitor.context = context;
ReplaceTableNodeToDummyVisitor visitor(context);
visitor.visit(query);
return query->cloneAndReplace(visitor.replacement_map);

View File

@ -13,7 +13,7 @@ using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
struct SelectQueryOptions;
/// Find a qury which can be executed with parallel replicas up to WithMergableStage.
/// Find a query which can be executed with parallel replicas up to WithMergableStage.
/// Returned query will always contain some (>1) subqueries, possibly with joins.
const QueryNode * findQueryForParallelReplicas(const QueryTreeNodePtr & query_tree_node, SelectQueryOptions & select_query_options);

View File

@ -40,10 +40,10 @@ ColumnsDescription StorageSystemClusters::getColumnsDescription()
return description;
}
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8> columns_mask) const
{
for (const auto & name_and_cluster : context->getClusters())
writeCluster(res_columns, name_and_cluster, {});
writeCluster(res_columns, columns_mask, name_and_cluster, /* replicated= */ nullptr);
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & name_and_database : databases)
@ -52,18 +52,15 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr co
{
if (auto database_cluster = replicated->tryGetCluster())
writeCluster(res_columns, {name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
writeCluster(res_columns, columns_mask, {name_and_database.first, database_cluster}, replicated);
if (auto database_cluster = replicated->tryGetAllGroupsCluster())
writeCluster(res_columns, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster},
replicated->tryGetAreReplicasActive(database_cluster));
writeCluster(res_columns, columns_mask, {DatabaseReplicated::ALL_GROUPS_CLUSTER_PREFIX + name_and_database.first, database_cluster}, replicated);
}
}
}
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster,
const std::vector<UInt8> & is_active)
void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const std::vector<UInt8> & columns_mask, const NameAndCluster & name_and_cluster, const DatabaseReplicated * replicated)
{
const String & cluster_name = name_and_cluster.first;
const ClusterPtr & cluster = name_and_cluster.second;
@ -79,30 +76,55 @@ void StorageSystemClusters::writeCluster(MutableColumns & res_columns, const Nam
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
{
size_t i = 0;
size_t src_index = 0, res_index = 0;
const auto & address = shard_addresses[replica_index];
res_columns[i++]->insert(cluster_name);
res_columns[i++]->insert(shard_info.shard_num);
res_columns[i++]->insert(shard_info.weight);
res_columns[i++]->insert(shard_info.has_internal_replication);
res_columns[i++]->insert(replica_index + 1);
res_columns[i++]->insert(address.host_name);
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String());
res_columns[i++]->insert(address.port);
res_columns[i++]->insert(address.is_local);
res_columns[i++]->insert(address.user);
res_columns[i++]->insert(address.default_database);
res_columns[i++]->insert(pool_status[replica_index].error_count);
res_columns[i++]->insert(pool_status[replica_index].slowdown_count);
res_columns[i++]->insert(pool_status[replica_index].estimated_recovery_time.count());
res_columns[i++]->insert(address.database_shard_name);
res_columns[i++]->insert(address.database_replica_name);
if (is_active.empty())
res_columns[i++]->insertDefault();
else
res_columns[i++]->insert(is_active[replica_idx++]);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(cluster_name);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(shard_info.shard_num);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(shard_info.weight);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(shard_info.has_internal_replication);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(replica_index + 1);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.host_name);
if (columns_mask[src_index++])
{
auto resolved = address.getResolvedAddress();
res_columns[res_index++]->insert(resolved ? resolved->host().toString() : String());
}
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.port);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.is_local);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.user);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.default_database);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(pool_status[replica_index].error_count);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(pool_status[replica_index].slowdown_count);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(pool_status[replica_index].estimated_recovery_time.count());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.database_shard_name);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(address.database_replica_name);
if (columns_mask[src_index++])
{
std::vector<UInt8> is_active;
if (replicated)
is_active = replicated->tryGetAreReplicasActive(name_and_cluster.second);
if (is_active.empty())
res_columns[res_index++]->insertDefault();
else
res_columns[res_index++]->insert(is_active[replica_idx++]);
}
}
}
}

View File

@ -10,6 +10,7 @@ namespace DB
class Context;
class Cluster;
class DatabaseReplicated;
/** Implements system table 'clusters'
* that allows to obtain information about available clusters
@ -26,8 +27,9 @@ protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
using NameAndCluster = std::pair<String, std::shared_ptr<Cluster>>;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
static void writeCluster(MutableColumns & res_columns, const NameAndCluster & name_and_cluster, const std::vector<UInt8> & is_active);
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8> columns_mask) const override;
static void writeCluster(MutableColumns & res_columns, const std::vector<UInt8> & columns_mask, const NameAndCluster & name_and_cluster, const DatabaseReplicated * replicated);
bool supportsColumnsMask() const override { return true; }
};
}

View File

@ -681,6 +681,7 @@ class FailureReason(enum.Enum):
BUILD = "not running for current build"
NO_PARALLEL_REPLICAS = "something is not supported with parallel replicas"
SHARED_MERGE_TREE = "no-shared-merge-tree"
DISTRIBUTED_CACHE = "distributed-cache"
# UNKNOWN reasons
NO_REFERENCE = "no reference file"
@ -1191,6 +1192,9 @@ class TestCase:
elif tags and ("no-replicated-database" in tags) and args.replicated_database:
return FailureReason.REPLICATED_DB
elif tags and ("no-distributed-cache" in tags) and args.distributed_cache:
return FailureReason.DISTRIBUTED_CACHE
elif (
tags
and ("atomic-database" in tags)
@ -2218,7 +2222,6 @@ def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool
args, test_suite, client_options, server_logs_level
)
test_result = test_case.process_result(test_result, MESSAGES)
break
except TimeoutError:
break
finally:
@ -3204,6 +3207,12 @@ def parse_args():
default=False,
help="Run tests over s3 storage",
)
parser.add_argument(
"--distributed-cache",
action="store_true",
default=False,
help="Run tests with enabled distributed cache",
)
parser.add_argument(
"--azure-blob-storage",
action="store_true",

View File

@ -20,6 +20,7 @@
<disk>s3_disk</disk>
<path>s3_cache/</path>
<max_size>104857600</max_size>
<max_file_segment_size>5Mi</max_file_segment_size>
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<cache_policy>LRU</cache_policy>

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
rabbitmq1:
image: rabbitmq:3.12.6-management-alpine
image: rabbitmq:3.12.6-alpine
hostname: rabbitmq1
expose:
- ${RABBITMQ_PORT:-5672}

View File

@ -2371,7 +2371,7 @@ class ClickHouseCluster:
time.sleep(0.5)
raise Exception("Cannot wait PostgreSQL Java Client container")
def wait_rabbitmq_to_start(self, timeout=30):
def wait_rabbitmq_to_start(self, timeout=60):
self.print_all_docker_pieces()
self.rabbitmq_ip = self.get_instance_ip(self.rabbitmq_host)
@ -2399,7 +2399,7 @@ class ClickHouseCluster:
)
rabbitmq_debuginfo(self.rabbitmq_docker_id, self.rabbitmq_cookie)
except Exception as e:
logging.debug("Unable to get logs from docker.")
logging.debug(f"Unable to get logs from docker: {e}.")
raise Exception("Cannot wait RabbitMQ container")
def wait_nats_is_available(self, max_retries=5):
@ -2748,11 +2748,13 @@ class ClickHouseCluster:
images_pull_cmd = self.base_cmd + ["pull"]
# sometimes dockerhub/proxy can be flaky
retry(
log_function=lambda exception: logging.info(
"Got exception pulling images: %s", exception
),
)(run_and_check)(images_pull_cmd)
def logging_pulling_images(**kwargs):
if "exception" in kwargs:
logging.info(
"Got exception pulling images: %s", kwargs["exception"]
)
retry(log_function=logging_pulling_images)(run_and_check)(images_pull_cmd)
if self.with_zookeeper_secure and self.base_zookeeper_cmd:
logging.debug("Setup ZooKeeper Secure")
@ -3025,11 +3027,17 @@ class ClickHouseCluster:
"Trying to create Azurite instance by command %s",
" ".join(map(str, azurite_start_cmd)),
)
retry(
log_function=lambda exception: logging.info(
def logging_azurite_initialization(exception, retry_number, sleep_time):
logging.info(
f"Azurite initialization failed with error: {exception}"
),
)(run_and_check)(azurite_start_cmd)
)
retry(
log_function=logging_azurite_initialization,
)(
run_and_check
)(azurite_start_cmd)
self.up_called = True
logging.info("Trying to connect to Azurite")
self.wait_azurite_to_start()

View File

@ -8,7 +8,7 @@ def retry(
delay: float = 1,
backoff: float = 1.5,
jitter: float = 2,
log_function=lambda *args, **kwargs: None,
log_function=None, # should take **kwargs or arguments: `retry_number`, `exception` and `sleep_time`
retriable_expections_list: List[Type[BaseException]] = [Exception],
):
def inner(func):
@ -26,8 +26,11 @@ def retry(
break
if not should_retry or (retry == retries - 1):
raise e
log_function(retry=retry, exception=e)
sleep_time = current_delay + random.uniform(0, jitter)
if log_function is not None:
log_function(
retry_number=retry, exception=e, sleep_time=sleep_time
)
time.sleep(sleep_time)
current_delay *= backoff
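
A minimal usage sketch of the reworked helper above, in the spirit of the cluster.py call sites: the callback now receives retry_number, exception and sleep_time as keyword arguments and is only invoked when one is supplied. The import path and the flaky_pull function are assumptions for illustration, not part of this commit.

import logging

from helpers.retry_decorator import retry  # assumed location of the helper changed above


def log_attempt(retry_number, exception, sleep_time):
    # Signature matches the new keyword-argument contract of log_function.
    logging.info(
        "Attempt %s failed with %s, retrying in %.1fs", retry_number, exception, sleep_time
    )


def flaky_pull():
    ...  # hypothetical stand-in for run_and_check(images_pull_cmd)


# Same call shape as in cluster.py: retry(...)(callable)(args).
retry(log_function=log_attempt)(flaky_pull)()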

View File

@ -13,9 +13,8 @@ node = cluster.add_instance(
)
system_logs = [
# disabled by default
("system.text_log", 0),
# enabled by default
("system.text_log", 1),
("system.query_log", 1),
("system.query_thread_log", 1),
("system.part_log", 1),

View File

@ -76,7 +76,7 @@ def get_used_disks_for_table(node, table_name, partition=None):
)
def check_used_disks_with_retry(node, table_name, expected_disks, retries):
def check_used_disks_with_retry(node, table_name, expected_disks, retries=1):
for _ in range(retries):
used_disks = get_used_disks_for_table(node, table_name)
if set(used_disks).issubset(expected_disks):
@ -1613,7 +1613,7 @@ def test_alter_with_merge_work(started_cluster, name, engine, positive):
ALTER TABLE {name} MODIFY
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
d1 + INTERVAL 5 SECOND TO VOLUME 'external',
d1 + INTERVAL 10 SECOND DELETE
d1 + INTERVAL 30 SECOND DELETE
""".format(
name=name
)
@ -1635,11 +1635,19 @@ def test_alter_with_merge_work(started_cluster, name, engine, positive):
optimize_table(20)
if positive:
assert check_used_disks_with_retry(node1, name, set(["external"]), 100)
assert check_used_disks_with_retry(
node1, name, set(["external"])
), "Parts: " + node1.query(
f"SELECT disk_name, name FROM system.parts WHERE table = '{name}' AND active = 1"
)
else:
assert check_used_disks_with_retry(node1, name, set(["jbod1", "jbod2"]), 50)
assert check_used_disks_with_retry(
node1, name, set(["jbod1", "jbod2"])
), "Parts: " + node1.query(
f"SELECT disk_name, name FROM system.parts WHERE table = '{name}' AND active = 1"
)
time.sleep(5)
time.sleep(25)
optimize_table(20)

View File

@ -2,23 +2,16 @@
import os, itertools, urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse, sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
def get_ch_answer(query):
return (
urllib.request.urlopen(
os.environ.get(
"CLICKHOUSE_URL",
"http://localhost:" + os.environ.get("CLICKHOUSE_PORT_HTTP", "8123"),
),
data=query.encode(),
)
.read()
.decode()
)
from pure_http_client import ClickHouseClient
client = ClickHouseClient()
def check_answers(query, answer):
ch_answer = get_ch_answer(query)
ch_answer = client.query(query)
if ch_answer.strip() != answer.strip():
print("FAIL on query:", query)
print("Expected answer:", answer)

View File

@ -8,6 +8,18 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail
# Wait until the dictionary updates the value for 13 on its own:
function wait_for_dict_update()
{
for ((i = 0; i < 100; ++i)); do
if [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" != -1 ]; then
return 0
fi
sleep 0.5
done
return 1
}
$CLICKHOUSE_CLIENT <<EOF
CREATE TABLE ${CLICKHOUSE_DATABASE}.table(x Int64, y Int64, insert_time DateTime) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (12, 102, now());
@ -19,7 +31,7 @@ CREATE DICTIONARY ${CLICKHOUSE_DATABASE}.dict
insert_time DateTime
)
PRIMARY KEY x
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table' DB '${CLICKHOUSE_DATABASE}' UPDATE_FIELD 'insert_time'))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table' DB '${CLICKHOUSE_DATABASE}' UPDATE_FIELD 'insert_time' UPDATE_LAG 60))
LAYOUT(FLAT())
LIFETIME(1);
EOF
@ -29,11 +41,10 @@ $CLICKHOUSE_CLIENT --query "SELECT '12 -> ', dictGetInt64('${CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (13, 103, now())"
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (14, 104, now() - INTERVAL 1 DAY)"
# Wait when the dictionary will update the value for 13 on its own:
while [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" = -1 ]
do
sleep 0.5
done
if ! wait_for_dict_update; then
echo "Dictionary had not been reloaded" >&2
exit 1
fi
$CLICKHOUSE_CLIENT --query "SELECT '13 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"

View File

@ -1,5 +1,5 @@
-- Tags: no-tsan, no-msan, long
-- too long.
-- Tags: long, no-tsan, no-msan, no-distributed-cache
-- Too long for TSan and MSan
set enable_filesystem_cache=0;
set enable_filesystem_cache_on_write_operations=0;

View File

@ -1,4 +1,4 @@
-- Tags: no-random-merge-tree-settings, no-random-settings, no-tsan, no-debug, no-object-storage, long
-- Tags: long, no-random-merge-tree-settings, no-random-settings, no-tsan, no-debug, no-object-storage, no-distributed-cache
-- no-tsan: too slow
-- no-object-storage: for remote tables we use thread pool even when reading with one stream, so memory consumption is higher

View File

@ -52,3 +52,10 @@
{1:4,2:5}
{1:4,2:5}
{1:4,2:5}
{1:3,2:4}
{1:3,2:4}
{1:3,2:4} {(1,3):'a',(2,4):'b'}
{(1,'a'):'c',(2,'b'):'d'}
{(1,'a'):'c',(2,'b'):'d'}
{(1,'a'):'c',(2,'b'):'d'}
{(1,'a'):'c',(2,'b'):'d'}

View File

@ -67,12 +67,20 @@ select mapFromArrays(['aa', 'bb'], [4, 5, 6]); -- { serverError SIZES_OF_ARRAYS_
select mapFromArrays([[1,2], [3,4]], [4, 5, 6]); -- { serverError SIZES_OF_ARRAYS_DONT_MATCH }
select mapFromArrays(['a', 2], [4, 5]); -- { serverError NO_COMMON_TYPE}
select mapFromArrays([1, 2], [4, 'a']); -- { serverError NO_COMMON_TYPE}
select mapFromArrays(['aa', 'bb'], map('a', 4)); -- { serverError SIZES_OF_ARRAYS_DONT_MATCH }
select mapFromArrays([1,null]::Array(Nullable(UInt8)), [3,4]); -- { serverError BAD_ARGUMENTS }
select mapFromArrays(['aa', 'bb'], map('a', 4, 'b', 5));
select mapFromArrays(['aa', 'bb'], materialize(map('a', 4, 'b', 5))) from numbers(2);
select mapFromArrays(map('a', 4, 'b', 4), ['aa', 'bb']) from numbers(2); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
select mapFromArrays(['aa', 'bb'], map('a', 4)); -- { serverError SIZES_OF_ARRAYS_DONT_MATCH }
select mapFromArrays([toLowCardinality(1), toLowCardinality(2)], [4, 5]);
select mapFromArrays([toLowCardinality(1), toLowCardinality(2)], materialize([4, 5])) from numbers(2);
select mapFromArrays([1,2], [3,4]);
select mapFromArrays([1,2]::Array(Nullable(UInt8)), [3,4]);
select mapFromArrays([1,2], [3,4]) as x, mapFromArrays(x, ['a', 'b']);
select mapFromArrays(map(1, 'a', 2, 'b'), array('c', 'd'));
select mapFromArrays(materialize(map(1, 'a', 2, 'b')), array('c', 'd'));
select mapFromArrays(map(1, 'a', 2, 'b'), materialize(array('c', 'd')));
select mapFromArrays(materialize(map(1, 'a', 2, 'b')), materialize(array('c', 'd')));

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-random-settings, no-distributed-cache
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -6,7 +6,7 @@ import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from pure_http_client import ClickHouseClient
from pure_http_client import ClickHouseClient, requests_session_with_retries
class Tester:
@ -195,7 +195,7 @@ def main():
default_index_granularity = 10
total_rows = 7 * default_index_granularity
step = default_index_granularity
session = requests.Session()
session = requests_session_with_retries()
for index_granularity in [
default_index_granularity - 1,
default_index_granularity,

View File

@ -6,7 +6,7 @@ import sys
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
from pure_http_client import ClickHouseClient
from pure_http_client import ClickHouseClient, requests_session_with_retries
class Tester:
@ -161,7 +161,7 @@ def main():
default_index_granularity = 10
total_rows = 8 * default_index_granularity
step = default_index_granularity
session = requests.Session()
session = requests_session_with_retries()
for index_granularity in [default_index_granularity - 1, default_index_granularity]:
tester = Tester(session, url, index_granularity, total_rows)
# Test combinations of ranges of columns c and d

View File

@ -1,3 +1,5 @@
-- Tags: no-distributed-cache
SET min_bytes_to_use_direct_io='1Gi'; -- It does not work (fixme)
SET local_filesystem_read_method='pread'; -- io_uring local_fs_method does not work here (fixme)

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-random-settings
# Tags: no-fasttest, no-random-settings, no-distributed-cache
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage, no-distributed-cache
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage, no-distributed-cache
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage, no-distributed-cache
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage
# Tags: long, no-debug, no-asan, no-tsan, no-msan, no-ubsan, no-sanitize-coverage, no-distributed-cache
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,4 +1,4 @@
-- Tags: long, no-random-merge-tree-settings
-- Tags: long, no-random-merge-tree-settings, no-distributed-cache
-- no-random-merge-tree-settings - times out in private
DROP TABLE IF EXISTS build;

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,4 @@
-- Tags: no-fasttest
-- https://github.com/ClickHouse/ClickHouse/issues/67303
SELECT uniqTheta(tuple());
SELECT uniq(tuple());

View File

@ -0,0 +1 @@
SELECT tuple() IN tuple(1) SETTINGS allow_experimental_map_type = 1; -- { serverError INCORRECT_ELEMENT_OF_SET }

View File

@ -0,0 +1,5 @@
-- The default limit works.
SELECT * FROM format("JSONCompactEachRow", 'x UInt32, y UInt32', REPEAT('[1,1,', 100000)) SETTINGS input_format_json_compact_allow_variable_number_of_columns = 1; -- { serverError TOO_DEEP_RECURSION, INCORRECT_DATA }
-- Even if we relax the limit, it is still safe.
SET input_format_json_max_depth = 100000;
SELECT * FROM format("JSONCompactEachRow", 'x UInt32, y UInt32', REPEAT('[1,1,', 100000)) SETTINGS input_format_json_compact_allow_variable_number_of_columns = 1; -- { serverError TOO_DEEP_RECURSION, INCORRECT_DATA }

View File

@ -0,0 +1,15 @@
create table t (number UInt64) engine MergeTree order by number;
SELECT 1
FROM
(
SELECT number IN (
SELECT number
FROM view(
SELECT number
FROM numbers(1)
)
)
FROM t
)
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, max_parallel_replicas = 2, allow_experimental_analyzer = 1; -- { serverError CLUSTER_DOESNT_EXIST }

View File

@ -0,0 +1 @@
10000000

View File

@ -0,0 +1,3 @@
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));

View File

@ -1,7 +1,8 @@
import os
import io
import sys
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import time
import pandas as pd
@ -18,7 +19,7 @@ class ClickHouseClient:
self.host = host
def query(
self, query, connection_timeout=1500, settings=dict(), binary_result=False
self, query, connection_timeout=500, settings=dict(), binary_result=False
):
NUMBER_OF_TRIES = 30
DELAY = 10
@ -47,12 +48,12 @@ class ClickHouseClient:
else:
raise ValueError(r.text)
def query_return_df(self, query, connection_timeout=1500):
def query_return_df(self, query, connection_timeout=500):
data = self.query(query, connection_timeout)
df = pd.read_csv(io.StringIO(data), sep="\t")
return df
def query_with_data(self, query, data, connection_timeout=1500, settings=dict()):
def query_with_data(self, query, data, connection_timeout=500, settings=dict()):
params = {
"query": query,
"timeout_before_checking_execution_speed": 120,
@ -77,3 +78,17 @@ class ClickHouseClient:
return result
else:
raise ValueError(r.text)
def requests_session_with_retries(retries=3, timeout=180):
session = requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.timeout = timeout
return session
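
A short sketch of how the new helper and the trimmed-down client are meant to be used by the stateless tests above; the URL fallback and the SELECT are illustrative only.

import os

from pure_http_client import ClickHouseClient, requests_session_with_retries

client = ClickHouseClient()  # host defaults to the test server configured in the module
print(client.query("SELECT 1"))

# The session retries connect/read failures up to `retries` times via the mounted HTTPAdapter.
session = requests_session_with_retries(retries=3, timeout=180)
url = os.environ.get("CLICKHOUSE_URL", "http://localhost:8123")  # fallback URL is an assumption
print(session.get(url, params={"query": "SELECT 1"}).text)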