Merge remote-tracking branch 'upstream/master' into remove-unused-code-1

This commit is contained in:
Anton Popov 2020-12-23 20:38:29 +03:00
commit c8cdaba180
67 changed files with 678 additions and 262 deletions

View File

@ -71,6 +71,10 @@
# define BOOST_USE_UCONTEXT 1
#endif
#if defined(ARCADIA_BUILD) && defined(BOOST_USE_UCONTEXT)
# undef BOOST_USE_UCONTEXT
#endif
/// TODO: Strange enough, there is no way to detect UB sanitizer.
/// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute.

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 08974cc024b2e748f5b1d45415396706b3521d0f
Subproject commit 2c32e17c7dfee1f8bf24227b697cdef5fddf0823

View File

@ -2,12 +2,6 @@
set(ROCKSDB_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb")
list(APPEND CMAKE_MODULE_PATH "${ROCKSDB_SOURCE_DIR}/cmake/modules/")
find_program(CCACHE_FOUND ccache)
if(CCACHE_FOUND)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
endif(CCACHE_FOUND)
if (SANITIZE STREQUAL "undefined")
set(WITH_UBSAN ON)
elseif (SANITIZE STREQUAL "address")

View File

@ -1855,6 +1855,18 @@ Default value: `0`.
- [Distributed Table Engine](../../engines/table-engines/special/distributed.md#distributed)
- [Managing Distributed Tables](../../sql-reference/statements/system.md#query-language-system-distributed)
## insert_distributed_one_random_shard {#insert_distributed_one_random_shard}
Enables or disables random shard insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table when there is no distributed key.
By default, when inserting data into a `Distributed` table with more than one shard, the ClickHouse server will any insertion request if there is no distributed key. When `insert_distributed_one_random_shard = 1`, insertions are allowed and data is forwarded randomly among all shards.
Possible values:
- 0 — Insertion is rejected if there are multiple shards and no distributed key is given.
- 1 — Insertion is done randomly among all available shards when no distributed key is given.
Default value: `0`.
## use_compact_format_in_distributed_parts_names {#use_compact_format_in_distributed_parts_names}

View File

@ -39,7 +39,7 @@ then
then
sleep 1m
# https://api.cloudflare.com/#zone-purge-files-by-cache-tags,-host-or-prefix
POST_DATA='{"hosts":"clickhouse.tech"}'
POST_DATA='{"hosts":["content.clickhouse.tech"]}'
curl -X POST "https://api.cloudflare.com/client/v4/zones/4fc6fb1d46e87851605aa7fa69ca6fe0/purge_cache" -H "Authorization: Bearer ${CLOUDFLARE_TOKEN}" -H "Content-Type:application/json" --data "${POST_DATA}"
fi
fi

View File

@ -87,7 +87,7 @@ public:
{
/// A more understandable error message.
if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value.", e.code());
throw DB::ParsingException("File " + path + " is empty. You must fill it manually with appropriate value.", e.code());
else
throw;
}

View File

@ -450,5 +450,49 @@ ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
ParsingException::ParsingException()
{
Exception::message(Exception::message() + "{}");
}
ParsingException::ParsingException(const std::string & msg, int code)
: Exception(msg, code)
{
Exception::message(Exception::message() + "{}");
}
ParsingException::ParsingException(int code, const std::string & message)
: Exception(message, code)
{
Exception::message(Exception::message() + "{}");
}
/// We use additional field formatted_message_ to make this method const.
std::string ParsingException::displayText() const
{
try
{
if (line_number_ == -1)
formatted_message_ = fmt::format(message(), "");
else
formatted_message_ = fmt::format(message(), fmt::format(": (at row {})\n", line_number_));
}
catch (...)
{}
if (!formatted_message_.empty())
{
std::string result = name();
result.append(": ");
result.append(formatted_message_);
return result;
}
else
{
return Exception::displayText();
}
}
}

View File

@ -96,6 +96,38 @@ private:
};
/// Special class of exceptions, used mostly in ParallelParsingInputFormat for
/// more convinient calculation of problem line number.
class ParsingException : public Exception
{
public:
ParsingException();
ParsingException(const std::string & msg, int code);
ParsingException(int code, const std::string & message);
// Format message with fmt::format, like the logging functions.
template <typename ...Args>
ParsingException(int code, const std::string & fmt, Args&&... args)
: Exception(fmt::format(fmt, std::forward<Args>(args)...), code)
{
Exception::message(Exception::message() + "{}");
}
std::string displayText() const override;
int getLineNumber() { return line_number_; }
void setLineNumber(int line_number) { line_number_ = line_number;}
private:
ssize_t line_number_{-1};
mutable std::string formatted_message_;
const char * name() const throw() override { return "DB::ParsingException"; }
const char * className() const throw() override { return "DB::ParsingException"; }
};
using Exceptions = std::vector<std::exception_ptr>;

View File

@ -55,6 +55,13 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
max_threads = value;
}
template <typename Thread>
size_t ThreadPoolImpl<Thread>::getMaxThreads() const
{
std::lock_guard lock(mutex);
return max_threads;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{

View File

@ -71,6 +71,7 @@ public:
void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);
void setQueueSize(size_t value);
size_t getMaxThreads() const;
private:
mutable std::mutex mutex;

View File

@ -490,7 +490,9 @@ class IColumn;
\
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0)
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.

View File

@ -106,7 +106,7 @@ Block NativeBlockInputStream::readImpl()
if (istr.eof())
{
if (use_index)
throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);
return res;
}

View File

@ -126,8 +126,11 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatu
// Segmentating the original input.
unit.segment.resize(0);
const bool have_more_data = file_segmentation_engine(original_buffer,
unit.segment, min_chunk_bytes);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(
original_buffer, unit.segment, min_chunk_bytes);
unit.offset = successfully_read_rows_count;
successfully_read_rows_count += currently_read_rows;
unit.is_last = !have_more_data;
unit.status = READY_TO_PARSE;
@ -142,7 +145,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatu
}
catch (...)
{
onBackgroundException();
onBackgroundException(successfully_read_rows_count);
}
}
@ -157,11 +160,11 @@ void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr
setThreadName("ChunkParser");
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
try
{
const auto current_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[current_unit_number];
/*
* This is kind of suspicious -- the input_process_creator contract with
* respect to multithreaded use is not clear, but we hope that it is
@ -195,19 +198,22 @@ void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr
}
catch (...)
{
onBackgroundException();
onBackgroundException(unit.offset);
}
}
void ParallelParsingBlockInputStream::onBackgroundException()
void ParallelParsingBlockInputStream::onBackgroundException(size_t offset)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
std::unique_lock<std::mutex> lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
if (ParsingException * e = exception_cast<ParsingException *>(background_exception))
if (e->getLineNumber() != -1)
e->setLineNumber(e->getLineNumber() + offset);
}
tryLogCurrentException(__PRETTY_FUNCTION__);
finished = true;
reader_condvar.notify_all();
segmentator_condvar.notify_all();

View File

@ -149,6 +149,8 @@ private:
BlockExt block_ext;
Memory<> segment;
std::atomic<ProcessingUnitStatus> status;
/// Needed for better exception message.
size_t offset = 0;
bool is_last{false};
};
@ -159,6 +161,10 @@ private:
std::deque<ProcessingUnit> processing_units;
/// Compute it to have a more understandable error message.
size_t successfully_read_rows_count{0};
void scheduleParserThreadForUnitWithNumber(size_t ticket_number);
void finishAndWait();
@ -169,7 +175,7 @@ private:
// threads. This function is used by segmentator and parsed threads.
// readImpl() is called from the main thread, so the exception handling
// is different.
void onBackgroundException();
void onBackgroundException(size_t offset);
};
}

View File

@ -6,6 +6,7 @@ LIBRARY()
PEERDIR(
clickhouse/src/Common
contrib/libs/poco/MongoDB
contrib/restricted/boost/libs
)
NO_COMPILER_WARNINGS()

View File

@ -5,6 +5,7 @@ LIBRARY()
PEERDIR(
clickhouse/src/Common
contrib/libs/poco/MongoDB
contrib/restricted/boost/libs
)
NO_COMPILER_WARNINGS()

View File

@ -272,7 +272,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
/// Check consistency between offsets and elements subcolumns.
/// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
if (!nested_column.empty() && nested_column.size() != last_offset)
throw Exception("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset),
throw ParsingException("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset),
ErrorCodes::CANNOT_READ_ALL_DATA);
}
@ -325,7 +325,7 @@ static void deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && r
if (*istr.position() == ',')
++istr.position();
else
throw Exception(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT,
throw ParsingException(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT,
"Cannot read array from text, expected comma or end of array, found '{}'",
*istr.position());
}

View File

@ -235,7 +235,7 @@ ReturnType DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer
/// Little tricky, because we cannot discriminate null from first character.
if (istr.eof())
throw Exception("Unexpected end of stream, while parsing value of Nullable type", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream, while parsing value of Nullable type", ErrorCodes::CANNOT_READ_ALL_DATA);
/// This is not null, surely.
if (*istr.position() != '\\')
@ -250,7 +250,7 @@ ReturnType DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer
++istr.position();
if (istr.eof())
throw Exception("Unexpected end of stream, while parsing value of Nullable type, after backslash", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream, while parsing value of Nullable type, after backslash", ErrorCodes::CANNOT_READ_ALL_DATA);
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr]
@ -405,11 +405,11 @@ ReturnType DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & i
/// or if someone uses 'U' or 'L' as delimiter in CSV.
/// In the first case we cannot continue reading anyway. The second case seems to be unlikely.
if (settings.csv.delimiter == 'U' || settings.csv.delimiter == 'L')
throw DB::Exception("Enabled setting input_format_csv_unquoted_null_literal_as_null may not work correctly "
throw DB::ParsingException("Enabled setting input_format_csv_unquoted_null_literal_as_null may not work correctly "
"with format_csv_delimiter = 'U' or 'L' for large input.", ErrorCodes::CANNOT_READ_ALL_DATA);
WriteBufferFromOwnString parsed_value;
nested_data_type->serializeAsTextCSV(nested, nested.size() - 1, parsed_value, settings);
throw DB::Exception("Error while parsing \"" + std::string(null_literal, null_prefix_len)
throw DB::ParsingException("Error while parsing \"" + std::string(null_literal, null_prefix_len)
+ std::string(istr.position(), std::min(size_t{10}, istr.available())) + "\" as Nullable(" + nested_data_type->getName()
+ ") at position " + std::to_string(istr.count()) + ": expected \"NULL\" or " + nested_data_type->getName()
+ ", got \"" + std::string(null_literal, buf.count()) + "\", which was deserialized as \""

View File

@ -163,7 +163,7 @@ BlockInputStreamPtr FormatFactory::getInput(
// (segmentator + two parsers + reader).
bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
if (settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
parallel_parsing = false;
if (parallel_parsing && name == "JSONEachRow")

View File

@ -54,7 +54,7 @@ public:
* Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
* Used in ParallelParsingBlockInputStream.
*/
using FileSegmentationEngine = std::function<bool(
using FileSegmentationEngine = std::function<std::pair<bool, size_t>(
ReadBuffer & buf,
DB::Memory<> & memory,
size_t min_chunk_bytes)>;

View File

@ -4,13 +4,14 @@
namespace DB
{
bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
skipWhitespaceIfAny(in);
char * pos = in.position();
size_t balance = 0;
bool quotes = false;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size))
{
@ -57,11 +58,14 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor
quotes = true;
++pos;
}
if (balance == 0)
++number_of_rows;
}
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
}

View File

@ -3,6 +3,6 @@
namespace DB
{
bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
}

View File

@ -96,7 +96,7 @@ void NO_INLINE throwAtAssertionFailed(const char * s, ReadBuffer & buf)
else
out << " before: " << quote << String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position()));
throw Exception(out.str(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
throw ParsingException(out.str(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
}
@ -503,7 +503,7 @@ static void readAnyQuotedStringInto(Vector & s, ReadBuffer & buf)
{
if (buf.eof() || *buf.position() != quote)
{
throw Exception(ErrorCodes::CANNOT_PARSE_QUOTED_STRING,
throw ParsingException(ErrorCodes::CANNOT_PARSE_QUOTED_STRING,
"Cannot parse quoted string: expected opening quote '{}', got '{}'",
std::string{quote}, buf.eof() ? "EOF" : std::string{*buf.position()});
}
@ -538,7 +538,7 @@ static void readAnyQuotedStringInto(Vector & s, ReadBuffer & buf)
parseComplexEscapeSequence(s, buf);
}
throw Exception("Cannot parse quoted string: expected closing quote",
throw ParsingException("Cannot parse quoted string: expected closing quote",
ErrorCodes::CANNOT_PARSE_QUOTED_STRING);
}
@ -716,7 +716,7 @@ ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf)
auto error = [](const char * message [[maybe_unused]], int code [[maybe_unused]])
{
if constexpr (throw_exception)
throw Exception(message, code);
throw ParsingException(message, code);
return ReturnType(false);
};
@ -861,7 +861,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D
s_pos[size] = 0;
if constexpr (throw_exception)
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
throw ParsingException(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
else
return false;
}
@ -899,7 +899,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D
else
{
if constexpr (throw_exception)
throw Exception("Cannot parse datetime", ErrorCodes::CANNOT_PARSE_DATETIME);
throw ParsingException("Cannot parse datetime", ErrorCodes::CANNOT_PARSE_DATETIME);
else
return false;
}

View File

@ -300,7 +300,7 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf)
else
{
if constexpr (throw_exception)
throw Exception("Unsigned type must not contain '-' symbol", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Unsigned type must not contain '-' symbol", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return ReturnType(false);
}
@ -648,7 +648,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf)
if constexpr (throw_exception)
{
throw Exception(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID);
throw ParsingException(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID);
}
else
{
@ -669,7 +669,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf)
if constexpr (throw_exception)
{
throw Exception(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID);
throw ParsingException(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID);
}
else
{
@ -824,7 +824,7 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf)
if (19 != size)
{
s[size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
throw ParsingException(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
}
datetime.year((s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0'));
@ -1016,7 +1016,7 @@ void readQuoted(std::vector<T> & x, ReadBuffer & buf)
if (*buf.position() == ',')
++buf.position();
else
throw Exception("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT);
throw ParsingException("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT);
}
first = false;
@ -1039,7 +1039,7 @@ void readDoubleQuoted(std::vector<T> & x, ReadBuffer & buf)
if (*buf.position() == ',')
++buf.position();
else
throw Exception("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT);
throw ParsingException("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT);
}
first = false;

View File

@ -99,7 +99,7 @@ ReturnType parseDateTimeBestEffortImpl(
auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]])
{
if constexpr (std::is_same_v<ReturnType, void>)
throw Exception(message, code);
throw ParsingException(message, code);
else
return false;
};

View File

@ -120,7 +120,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp
if (!tryReadIntText(addition_exp, buf))
{
if constexpr (_throw_on_error)
throw Exception("Cannot parse exponent while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot parse exponent while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
@ -133,7 +133,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp
if (digits_only)
{
if constexpr (_throw_on_error)
throw Exception("Unexpected symbol while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Unexpected symbol while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER);
return false;
}
stop = true;

View File

@ -160,7 +160,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf)
if (unlikely(res.ec != std::errc()))
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return ReturnType(false);
}
@ -243,7 +243,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf)
if (unlikely(res.ec != std::errc()))
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return ReturnType(false);
}
@ -331,7 +331,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
if (in.eof())
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
@ -387,7 +387,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
if (in.eof())
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value: nothing after exponent", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value: nothing after exponent", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
@ -425,7 +425,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
if (in.eof())
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value: no digits read", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value: no digits read", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
@ -436,14 +436,14 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
if (in.eof())
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value: nothing after plus sign", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value: nothing after plus sign", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
else if (negative)
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value: plus after minus sign", ErrorCodes::CANNOT_PARSE_NUMBER);
throw ParsingException("Cannot read floating point value: plus after minus sign", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}

View File

@ -911,15 +911,15 @@ template <typename Method>
Block Aggregator::convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
Method & method,
Arena * arena,
bool final,
size_t bucket) const
{
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &method, this] (
[bucket, &method, arena, this] (
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
convertToBlockImpl(method, method.data.impls[bucket],
@ -948,7 +948,7 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
return {}; \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket); \
}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -980,7 +980,7 @@ void Aggregator::writeToTemporaryFileImpl(
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
out.write(block);
update_max_sizes(block);
}
@ -1233,7 +1233,7 @@ Block Aggregator::prepareBlockAndFill(
}
}
filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final);
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
Block res = header.cloneEmpty();
@ -1300,7 +1300,6 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
@ -1315,7 +1314,8 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
}
else
{
insertAggregatesIntoColumns(data, final_aggregate_columns, arena);
/// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
}
if (params.overflow_row)
@ -1343,13 +1343,12 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@ -1383,11 +1382,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
bool final,
ThreadPool * thread_pool) const
{
size_t max_threads = thread_pool ? thread_pool->getMaxThreads() : 1;
if (max_threads > data_variants.aggregates_pools.size())
for (size_t i = data_variants.aggregates_pools.size(); i < max_threads; ++i)
data_variants.aggregates_pools.push_back(std::make_shared<Arena>());
auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
return convertOneBucketToBlock(data_variants, method, final, bucket);
/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket) % max_threads;
Arena * arena = data_variants.aggregates_pools.at(thread_number).get();
return convertOneBucketToBlock(data_variants, method, arena, final, bucket);
};
/// packaged_task is used to ensure that exceptions are automatically thrown into the main stream.

View File

@ -1206,6 +1206,7 @@ protected:
Block convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
Method & method,
Arena * arena,
bool final,
size_t bucket) const;

View File

@ -132,6 +132,26 @@ Chunk IRowInputFormat::generate()
}
}
}
catch (ParsingException & e)
{
String verbose_diagnostic;
try
{
verbose_diagnostic = getDiagnosticInfo();
}
catch (const Exception & exception)
{
verbose_diagnostic = "Cannot get verbose diagnostic: " + exception.message();
}
catch (...)
{
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
}
e.setLineNumber(total_rows);
e.addMessage(verbose_diagnostic);
throw;
}
catch (Exception & e)
{
if (!isParseError(e.code()))

View File

@ -48,12 +48,12 @@ Chunk ArrowBlockInputFormat::generate()
}
if (!batch_result.ok())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of Arrow data: {}", batch_result.status().ToString());
auto table_result = arrow::Table::FromRecordBatches({*batch_result});
if (!table_result.ok())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA,
"Error while reading batch of Arrow data: {}", table_result.status().ToString());
++record_batch_current;

View File

@ -190,7 +190,7 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node
{
decoder.decodeString(tmp);
if (tmp.length() != 36)
throw Exception(std::string("Cannot parse uuid ") + tmp, ErrorCodes::CANNOT_PARSE_UUID);
throw ParsingException(std::string("Cannot parse uuid ") + tmp, ErrorCodes::CANNOT_PARSE_UUID);
UUID uuid;
parseUUID(reinterpret_cast<const UInt8 *>(tmp.data()), std::reverse_iterator<UInt8 *>(reinterpret_cast<UInt8 *>(&uuid) + 16));

View File

@ -424,11 +424,12 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
}
}
static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
char * pos = in.position();
bool quotes = false;
bool need_more_data = true;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
@ -458,6 +459,7 @@ static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory
}
else if (*pos == '\n')
{
++number_of_rows;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
++pos;
@ -470,13 +472,16 @@ static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory
need_more_data = false;
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
{
++pos;
++number_of_rows;
}
}
}
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
void registerFileSegmentationEngineCSV(FormatFactory & factory)

View File

@ -171,7 +171,7 @@ bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB::
skipWhitespaceIfAny(in);
if (in.eof())
throw Exception("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
if (file_column + 1 != column_indexes_for_input_fields.size())
{
assertChar(',', in);

View File

@ -173,7 +173,7 @@ inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index)
skipWhitespaceIfAny(in);
if (in.eof())
throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);
else if (*in.position() == '}')
{
++in.position();

View File

@ -38,7 +38,7 @@ Chunk ORCBlockInputFormat::generate()
std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->Read(&table);
if (!read_status.ok())
throw Exception{"Error while reading ORC data: " + read_status.ToString(),
throw ParsingException{"Error while reading ORC data: " + read_status.ToString(),
ErrorCodes::CANNOT_READ_ALL_DATA};
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC");

View File

@ -47,7 +47,7 @@ Chunk ParquetBlockInputFormat::generate()
std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
if (!read_status.ok())
throw Exception{"Error while reading Parquet data: " + read_status.ToString(),
throw ParsingException{"Error while reading Parquet data: " + read_status.ToString(),
ErrorCodes::CANNOT_READ_ALL_DATA};
++row_group_current;

View File

@ -173,10 +173,11 @@ void registerInputFormatProcessorRegexp(FormatFactory & factory)
});
}
static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
char * pos = in.position();
bool need_more_data = true;
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
@ -196,12 +197,12 @@ static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & mem
need_more_data = false;
++pos;
++number_of_rows;
}
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
void registerFileSegmentationEngineRegexp(FormatFactory & factory)

View File

@ -89,7 +89,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
}
}
throw Exception("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA);
}
@ -157,7 +157,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
if (in.eof())
{
throw Exception("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA);
throw ParsingException("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA);
}
else if (*in.position() == '\t')
{

View File

@ -423,10 +423,11 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
}
}
static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
{
bool need_more_data = true;
char * pos = in.position();
size_t number_of_rows = 0;
while (loadAtPosition(in, memory, pos) && need_more_data)
{
@ -443,6 +444,9 @@ static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<>
}
else if (*pos == '\n' || *pos == '\r')
{
if (*pos == '\n')
++number_of_rows;
if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
need_more_data = false;
++pos;
@ -451,7 +455,7 @@ static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<>
saveUpToPosition(in, memory, pos);
return loadAtPosition(in, memory, pos);
return {loadAtPosition(in, memory, pos), number_of_rows};
}
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)

View File

@ -489,7 +489,7 @@ void TemplateRowInputFormat::skipToNextDelimiterOrEof(const String & delimiter)
void TemplateRowInputFormat::throwUnexpectedEof()
{
throw Exception("Unexpected EOF while parsing row " + std::to_string(row_num) + ". "
throw ParsingException("Unexpected EOF while parsing row " + std::to_string(row_num) + ". "
"Maybe last row has wrong format or input doesn't contain specified suffix before EOF.",
ErrorCodes::CANNOT_READ_ALL_DATA);
}

View File

@ -138,11 +138,22 @@ void DistributedBlockOutputStream::write(const Block & block)
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
const Settings & settings = context.getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
writeAsyncImpl(block);
++inserted_blocks;
if (random_shard_insert)
{
writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo()));
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
writeAsyncImpl(block);
++inserted_blocks;
}
}
@ -175,18 +186,18 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block)
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context.getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
size_t num_shards = end - start;
remote_jobs_count = 0;
local_jobs_count = 0;
per_shard_jobs.resize(shards_info.size());
for (size_t shard_index : ext::range(0, shards_info.size()))
for (size_t shard_index : ext::range(start, end))
{
const auto & shard_info = shards_info[shard_index];
auto & shard_jobs = per_shard_jobs[shard_index];
@ -242,10 +253,11 @@ void DistributedBlockOutputStream::waitForJobs()
}
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
{
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block]()
return [this, thread_group, &job, &current_block, num_shards]()
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
@ -262,7 +274,6 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
});
const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
size_t num_shards = cluster->getShardsInfo().size();
auto & shard_job = per_shard_jobs[job.shard_index];
const auto & addresses = cluster->getShardsAddresses();
@ -356,12 +367,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0, end = shards_info.size();
if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block);
initWritingJobs(block, start, end);
pool.emplace(remote_jobs_count + local_jobs_count);
@ -394,7 +412,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block));
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
}
catch (...)
{

View File

@ -73,10 +73,10 @@ private:
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);
void initWritingJobs(const Block & first_block);
void initWritingJobs(const Block & first_block, size_t start, size_t end);
struct JobReplica;
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block);
ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
void waitForJobs();

View File

@ -49,7 +49,9 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
, columns_list(columns_list_)
, settings(settings_)
, index_granularity(index_granularity_)
, with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity){}
, with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
{
}
Columns IMergeTreeDataPartWriter::releaseIndexColumns()
{

View File

@ -93,12 +93,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
result.emplace_back(Granule{
.start_row = current_row,
.granularity_rows = expected_rows_in_mark,
.block_rows = std::min(rows_left_in_block, expected_rows_in_mark),
.rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark),
.mark_number = current_mark,
.mark_on_start = true
.mark_on_start = true,
.is_complete = (rows_left_in_block >= expected_rows_in_mark)
});
current_row += expected_rows_in_mark;
current_row += result.back().rows_to_write;
current_mark++;
}
@ -173,8 +173,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
{
for (const auto & granule : granules)
{
if (granule.granularity_rows)
data_written = true;
data_written = true;
auto name_and_type = columns_list.begin();
for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
@ -206,13 +205,13 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
writeIntBinary(plain_hashing.count(), marks);
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.granularity_rows);
writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write);
/// Each type always have at least one substream
prev_stream->hashing_buf.next(); //-V522
}
writeIntBinary(granule.block_rows, marks);
writeIntBinary(granule.rows_to_write, marks);
}
}
@ -222,11 +221,11 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
{
auto block = header.cloneWithColumns(columns_buffer.releaseColumns());
auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true);
if (!granules_to_write.back().isCompleted())
if (!granules_to_write.back().is_complete)
{
/// Correct last mark as it should contain exact amount of rows.
index_granularity.popMark();
index_granularity.appendMark(granules_to_write.back().block_rows);
index_granularity.appendMark(granules_to_write.back().rows_to_write);
}
writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write);
}

View File

@ -218,6 +218,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
auto & stream = *skip_indices_streams[i];
for (const auto & granule : granules_to_write)
{
if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_accumulated_marks[i] = 0;
}
if (skip_indices_aggregators[i]->empty() && granule.mark_on_start)
{
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
@ -234,18 +240,9 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
}
size_t pos = granule.start_row;
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows);
if (granule.isCompleted())
{
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.rows_to_write);
if (granule.is_complete)
++skip_index_accumulated_marks[i];
/// write index if it is filled
if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_accumulated_marks[i] = 0;
}
}
}
}
}

View File

@ -20,25 +20,18 @@ struct Granule
{
/// Start row in block for granule
size_t start_row;
/// Amount of rows which granule have to contain according to index
/// granularity.
/// NOTE: Sometimes it's not equal to actually written rows, for example
/// for the last granule if it's smaller than computed granularity.
size_t granularity_rows;
/// Amount of rows from block which have to be written to disk from start_row
size_t block_rows;
size_t rows_to_write;
/// Global mark number in the list of all marks (index_granularity) for this part
size_t mark_number;
/// Should writer write mark for the first of this granule to disk.
/// NOTE: Sometimes we don't write mark for the start row, because
/// this granule can be continuation of the previous one.
bool mark_on_start;
/// Is this granule contain amout of rows equal to the value in index granularity
bool isCompleted() const
{
return granularity_rows == block_rows;
}
/// if true: When this granule will be written to disk all rows for corresponding mark will
/// be wrtten. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number),
/// We may have a lot of small blocks between two marks and this may be the last one.
bool is_complete;
};
/// Multiple granules to write for concrete block.

View File

@ -33,12 +33,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
size_t rows_left_in_block = block_rows - current_row;
result.emplace_back(Granule{
.start_row = current_row,
.granularity_rows = rows_left_in_last_mark,
.block_rows = std::min(rows_left_in_block, rows_left_in_last_mark),
.rows_to_write = std::min(rows_left_in_block, rows_left_in_last_mark),
.mark_number = current_mark,
.mark_on_start = false, /// Don't mark this granule because we have already marked it
.is_complete = (rows_left_in_block >= rows_left_in_last_mark),
});
current_row += rows_left_in_last_mark;
current_row += result.back().rows_to_write;
current_mark++;
}
@ -51,12 +51,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
/// save incomplete granule
result.emplace_back(Granule{
.start_row = current_row,
.granularity_rows = expected_rows_in_mark,
.block_rows = std::min(rows_left_in_block, expected_rows_in_mark),
.rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark),
.mark_number = current_mark,
.mark_on_start = true,
.is_complete = (rows_left_in_block >= expected_rows_in_mark),
});
current_row += expected_rows_in_mark;
current_row += result.back().rows_to_write;
current_mark++;
}
@ -136,11 +136,12 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
};
}
void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written)
{
auto last_granule = granules_written.back();
/// If we didn't finished last granule than we will continue to write it from new block
if (!last_granule.isCompleted())
if (!last_granule.is_complete)
{
/// Shift forward except last granule
setCurrentMark(getCurrentMark() + granules_written.size() - 1);
@ -148,9 +149,9 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri
/// We wrote whole block in the same granule, but didn't finished it.
/// So add written rows to rows written in last_mark
if (still_in_the_same_granule)
rows_written_in_last_mark += last_granule.block_rows;
rows_written_in_last_mark += last_granule.rows_to_write;
else
rows_written_in_last_mark = last_granule.block_rows;
rows_written_in_last_mark = last_granule.rows_to_write;
}
else
{
@ -167,6 +168,23 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
if (compute_granularity)
{
size_t index_granularity_for_block = computeIndexGranularity(block);
if (rows_written_in_last_mark > 0)
{
size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
/// Previous granularity was much bigger than our new block's
/// granularity let's adjust it, because we want add new
/// heavy-weight blocks into small old granule.
if (rows_left_in_last_mark > index_granularity_for_block)
{
/// We have already written more rows than granularity of our block.
/// adjust last mark rows and flush to disk.
if (rows_written_in_last_mark >= index_granularity_for_block)
adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
else /// We still can write some rows from new block into previous granule.
adjustLastMarkIfNeedAndFlushToDisk(index_granularity_for_block - rows_written_in_last_mark);
}
}
fillIndexGranularity(index_granularity_for_block, block.rows());
}
@ -281,10 +299,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
IDataType::SerializeBinaryBulkSettings & serialize_settings,
const Granule & granule)
{
if (granule.mark_on_start)
writeSingleMark(name, type, offset_columns, granule.granularity_rows, serialize_settings.path);
type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state);
type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@ -309,6 +324,9 @@ void MergeTreeDataPartWriterWide::writeColumn(
WrittenOffsetColumns & offset_columns,
const Granules & granules)
{
if (granules.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty granules for column {}, current mark {}", backQuoteIfNeed(name), getCurrentMark());
auto [it, inserted] = serialization_states.emplace(name, nullptr);
if (inserted)
@ -326,8 +344,14 @@ void MergeTreeDataPartWriterWide::writeColumn(
for (const auto & granule : granules)
{
if (granule.granularity_rows > 0)
data_written = true;
data_written = true;
if (granule.mark_on_start)
{
if (last_non_written_marks.count(name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
last_non_written_marks[name] = getCurrentMarksForColumn(name, type, offset_columns, serialize_settings.path);
}
writeSingleGranule(
name,
@ -338,6 +362,17 @@ void MergeTreeDataPartWriterWide::writeColumn(
serialize_settings,
granule
);
if (granule.is_complete)
{
auto marks_it = last_non_written_marks.find(name);
if (marks_it == last_non_written_marks.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name));
for (const auto & mark : marks_it->second)
flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number));
last_non_written_marks.erase(marks_it);
}
}
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@ -365,7 +400,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
bool must_be_last = false;
UInt64 offset_in_compressed_file = 0;
UInt64 offset_in_decompressed_block = 0;
UInt64 index_granularity_rows = 0;
UInt64 index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
size_t mark_num;
@ -379,7 +414,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
if (settings.can_use_adaptive_granularity)
DB::readBinary(index_granularity_rows, mrk_in);
else
index_granularity_rows = storage.getSettings()->index_granularity;
index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;
if (must_be_last)
{
@ -404,8 +439,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
if (index_granularity_rows != index_granularity.getMarkRows(mark_num))
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}",
mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows);
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
data_part->getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount());
auto column = type.createColumn();
@ -415,8 +450,13 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
{
must_be_last = true;
}
else if (column->size() != index_granularity_rows)
/// Now they must be equal
if (column->size() != index_granularity_rows)
{
if (must_be_last && !settings.can_use_adaptive_granularity)
break;
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), actually in bin file {}, in mrk file {}",
mark_num, offset_in_compressed_file, offset_in_decompressed_block, column->size(), index_granularity.getMarkRows(mark_num));
@ -445,6 +485,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
WrittenOffsetColumns offset_columns;
if (rows_written_in_last_mark > 0)
adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
bool write_final_mark = (with_final_mark && data_written);
@ -474,6 +516,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
serialization_states.clear();
#ifndef NDEBUG
/// Heavy weight validation of written data. Checks that we are able to read
/// data according to marks. Otherwise throws LOGICAL_ERROR (equal to about in debug mode)
for (const auto & column : columns_list)
{
if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes())
@ -537,4 +581,50 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_
rows_in_block);
}
void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark)
{
/// We can adjust marks only if we computed granularity for blocks.
/// Otherwise we cannot change granularity because it will differ from
/// other columns
if (compute_granularity && settings.can_use_adaptive_granularity)
{
if (getCurrentMark() != index_granularity.getMarksCount() - 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} (with {} rows) having rows offset {}, total marks {}",
getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount());
index_granularity.popMark();
index_granularity.appendMark(new_rows_in_last_mark);
}
/// Last mark should be filled, otherwise it's a bug
if (last_non_written_marks.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}",
getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount());
if (rows_written_in_last_mark == new_rows_in_last_mark)
{
for (const auto & [name, marks] : last_non_written_marks)
{
for (const auto & mark : marks)
flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark()));
}
last_non_written_marks.clear();
if (compute_granularity && settings.can_use_adaptive_granularity)
{
/// Also we add mark to each skip index because all of them
/// already accumulated all rows from current adjusting mark
for (size_t i = 0; i < skip_indices.size(); ++i)
++skip_index_accumulated_marks[i];
/// This mark completed, go further
setCurrentMark(getCurrentMark() + 1);
/// Without offset
rows_written_in_last_mark = 0;
}
}
}
}

View File

@ -99,6 +99,14 @@ private:
/// in our index_granularity array.
void shiftCurrentMark(const Granules & granules_written);
/// Change rows in the last mark in index_granularity to new_rows_in_last_mark.
/// Flush all marks from last_non_written_marks to disk and increment current mark if already written rows
/// (rows_written_in_last_granule) equal to new_rows_in_last_mark.
///
/// This function used when blocks change granularity drastically and we have unfinished mark.
/// Also useful to have exact amount of rows in last (non-final) mark.
void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
@ -108,6 +116,10 @@ private:
using ColumnStreams = std::map<String, StreamPtr>;
ColumnStreams column_streams;
/// Non written marks to disk (for each column). Waiting until all rows for
/// this marks will be written to disk.
using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;
MarksForColumns last_non_written_marks;
/// How many rows we have already written in the current mark.
/// More than zero when incoming blocks are smaller then their granularity.

View File

@ -17,6 +17,7 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h>
@ -373,6 +374,7 @@ StorageDistributed::StorageDistributed(
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
, rng(randomSeed())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -543,7 +545,8 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
}
/// If sharding key is not specified, then you can only write to a shard containing only one shard
if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
if (!settings.insert_distributed_one_random_shard && !has_sharding_key
&& ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
{
throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
@ -890,6 +893,32 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
}
size_t StorageDistributed::getRandomShardIndex(const Cluster::ShardsInfo & shards)
{
UInt32 total_weight = 0;
for (const auto & shard : shards)
total_weight += shard.weight;
assert(total_weight > 0);
size_t res;
{
std::lock_guard lock(rng_mutex);
res = std::uniform_int_distribution<size_t>(0, total_weight - 1)(rng);
}
for (auto i = 0ul, s = shards.size(); i < s; ++i)
{
if (shards[i].weight > res)
return i;
res -= shards[i].weight;
}
__builtin_unreachable();
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : data_volume->getDisks())

View File

@ -10,7 +10,9 @@
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Interpreters/Cluster.h>
#include <pcg_random.hpp>
namespace DB
{
@ -24,9 +26,6 @@ using VolumePtr = std::shared_ptr<IVolume>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
/** A distributed table that resides on multiple servers.
* Uses data from the specified database and tables on each server.
*
@ -126,6 +125,8 @@ public:
NamesAndTypesList getVirtuals() const override;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
@ -198,6 +199,9 @@ protected:
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;
// For random shard index generation
mutable std::mutex rng_mutex;
pcg64 rng;
};
}

View File

@ -68,121 +68,97 @@ void StorageSystemSettingsProfileElements::fillData(MutableColumns & res_columns
auto add_rows_for_single_element = [&](const String & owner_name, EntityType owner_type, const SettingsProfileElement & element, size_t & index)
{
switch (owner_type)
size_t old_num_rows = column_profile_name.size();
size_t new_num_rows = old_num_rows + 1;
size_t current_index = index++;
bool inserted_value = false;
if (!element.value.isNull() && !element.setting_name.empty())
{
case EntityType::SETTINGS_PROFILE:
{
column_user_name.insertDefault();
column_user_name_null_map.push_back(true);
column_role_name.insertDefault();
column_role_name_null_map.push_back(true);
column_profile_name.insertData(owner_name.data(), owner_name.length());
column_profile_name_null_map.push_back(false);
break;
}
case EntityType::USER:
{
column_user_name.insertData(owner_name.data(), owner_name.length());
column_user_name_null_map.push_back(false);
column_profile_name.insertDefault();
column_profile_name_null_map.push_back(true);
column_role_name.insertDefault();
column_role_name_null_map.push_back(true);
break;
}
case EntityType::ROLE:
{
column_user_name.insertDefault();
column_user_name_null_map.push_back(true);
column_role_name.insertData(owner_name.data(), owner_name.length());
column_role_name_null_map.push_back(false);
column_profile_name.insertDefault();
column_profile_name_null_map.push_back(true);
break;
}
default:
assert(false);
String str = Settings::valueToStringUtil(element.setting_name, element.value);
column_value.insertData(str.data(), str.length());
column_value_null_map.push_back(false);
inserted_value = true;
}
bool inserted_min = false;
if (!element.min_value.isNull() && !element.setting_name.empty())
{
String str = Settings::valueToStringUtil(element.setting_name, element.min_value);
column_min.insertData(str.data(), str.length());
column_min_null_map.push_back(false);
inserted_min = true;
}
bool inserted_max = false;
if (!element.max_value.isNull() && !element.setting_name.empty())
{
String str = Settings::valueToStringUtil(element.setting_name, element.max_value);
column_max.insertData(str.data(), str.length());
column_max_null_map.push_back(false);
inserted_max = true;
}
bool inserted_readonly = false;
if (element.readonly && !element.setting_name.empty())
{
column_readonly.push_back(*element.readonly);
column_readonly_null_map.push_back(false);
inserted_readonly = true;
}
bool inserted_setting_name = false;
if (inserted_value || inserted_min || inserted_max || inserted_readonly)
{
const auto & setting_name = element.setting_name;
column_setting_name.insertData(setting_name.data(), setting_name.size());
column_setting_name_null_map.push_back(false);
inserted_setting_name = true;
}
bool inserted_inherit_profile = false;
if (element.parent_profile)
{
auto parent_profile = access_control.tryReadName(*element.parent_profile);
if (parent_profile)
{
column_index.push_back(index++);
column_setting_name.insertDefault();
column_setting_name_null_map.push_back(true);
column_value.insertDefault();
column_value_null_map.push_back(true);
column_min.insertDefault();
column_min_null_map.push_back(true);
column_max.insertDefault();
column_max_null_map.push_back(true);
column_readonly.push_back(0);
column_readonly_null_map.push_back(true);
const String & parent_profile_str = *parent_profile;
column_inherit_profile.insertData(parent_profile_str.data(), parent_profile_str.length());
column_inherit_profile_null_map.push_back(false);
inserted_inherit_profile = true;
}
}
if (!element.setting_name.empty()
&& (!element.value.isNull() || !element.min_value.isNull() || !element.max_value.isNull() || element.readonly))
if (inserted_setting_name || inserted_inherit_profile)
{
const auto & setting_name = element.setting_name;
column_index.push_back(index++);
column_setting_name.insertData(setting_name.data(), setting_name.size());
column_setting_name_null_map.push_back(false);
if (element.value.isNull())
switch (owner_type)
{
column_value.insertDefault();
column_value_null_map.push_back(true);
}
else
{
String str = Settings::valueToStringUtil(setting_name, element.value);
column_value.insertData(str.data(), str.length());
column_value_null_map.push_back(false);
case EntityType::SETTINGS_PROFILE:
{
column_profile_name.insertData(owner_name.data(), owner_name.length());
column_profile_name_null_map.push_back(false);
break;
}
case EntityType::USER:
{
column_user_name.insertData(owner_name.data(), owner_name.length());
column_user_name_null_map.push_back(false);
break;
}
case EntityType::ROLE:
{
column_role_name.insertData(owner_name.data(), owner_name.length());
column_role_name_null_map.push_back(false);
break;
}
default:
assert(false);
}
if (element.min_value.isNull())
{
column_min.insertDefault();
column_min_null_map.push_back(true);
}
else
{
String str = Settings::valueToStringUtil(setting_name, element.min_value);
column_min.insertData(str.data(), str.length());
column_min_null_map.push_back(false);
}
column_index.push_back(current_index);
if (element.max_value.isNull())
{
column_max.insertDefault();
column_max_null_map.push_back(true);
}
else
{
String str = Settings::valueToStringUtil(setting_name, element.max_value);
column_max.insertData(str.data(), str.length());
column_max_null_map.push_back(false);
}
if (element.readonly)
{
column_readonly.push_back(*element.readonly);
column_readonly_null_map.push_back(false);
}
else
{
column_readonly.push_back(0);
column_readonly_null_map.push_back(true);
}
column_inherit_profile.insertDefault();
column_inherit_profile_null_map.push_back(true);
for (auto & res_column : res_columns)
res_column->insertManyDefaults(new_num_rows - res_column->size());
}
};

View File

@ -225,6 +225,18 @@
"with_coverage": false
}
},
"Functional stateless tests (ANTLR debug)": {
"required_build_properties": {
"compiler": "clang-11",
"package_type": "deb",
"build_type": "debug",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (release)": {
"required_build_properties": {
"compiler": "clang-11",

View File

@ -52,7 +52,7 @@
23.00
24.00
=== Try load data from datapage_v2.snappy.parquet
Code: 33. DB::Ex---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: data for INSERT was parsed from stdin
Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: data for INSERT was parsed from stdin
=== Try load data from dict-page-offset-zero.parquet
1552

View File

@ -18,7 +18,7 @@ INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-0
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
SELECT '===test merge===';
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@ -27,7 +27,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
SELECT '===test alter===';
ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String);
@ -38,7 +38,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
SELECT '===test mutation===';
ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1 SETTINGS mutations_sync = 2;
@ -58,7 +58,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
DROP TABLE IF EXISTS mt_with_pk;
@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();
INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL;
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();
DROP TABLE IF EXISTS mt_without_pk;
@ -149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();
INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL;
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();
DROP TABLE IF EXISTS mt_with_small_granularity;

View File

@ -1 +1 @@
20000101_1_1_0 test_00961 5f2e2d4bbc14336f44037e3ac667f247 ed226557cd4e18ecf3ae06c6d5e6725c da96ff1e527a8a1f908ddf2b1d0af239
20000101_1_1_0 test_00961 b5fce9c4ef1ca42ce4ed027389c208d2 fc3b062b646cd23d4c23d7f5920f89ae da96ff1e527a8a1f908ddf2b1d0af239

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g')
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS check;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE check (x UInt64) ENGINE = Memory;"
(seq 1 2000000; echo 'hello'; seq 1 20000000) | $CLICKHOUSE_CLIENT --input_format_parallel_parsing=1 --min_chunk_bytes_for_parallel_parsing=1000 --query="INSERT INTO check(x) FORMAT TSV " 2>&1 | grep -q "(at row 2000001)" && echo 'OK' || echo 'FAIL' ||:
$CLICKHOUSE_CLIENT --query="DROP TABLE check;"

View File

@ -0,0 +1,2 @@
849
102400

View File

@ -0,0 +1,28 @@
DROP TABLE IF EXISTS adaptive_table;
--- If granularity of consequent blocks differs a lot, then adaptive
--- granularity will adjust amout of marks correctly. Data for test empirically
--- derived, it's quite hard to get good parameters.
CREATE TABLE adaptive_table(
key UInt64,
value String
) ENGINE MergeTree()
ORDER BY key
SETTINGS index_granularity_bytes=1048576, min_bytes_for_wide_part = 0, enable_vertical_merge_algorithm = 0;
SET max_block_size=900;
-- There are about 900 marks for our settings.
INSERT INTO adaptive_table SELECT number, if(number > 700, randomPrintableASCII(102400), randomPrintableASCII(1)) FROM numbers(10000);
OPTIMIZE TABLE adaptive_table FINAL;
SELECT marks FROM system.parts WHERE table = 'adaptive_table' and database=currentDatabase() and active;
-- If we have computed granularity incorrectly than we will exceed this limit.
SET max_memory_usage='30M';
SELECT max(length(value)) FROM adaptive_table;
DROP TABLE IF EXISTS adaptive_table;

View File

@ -0,0 +1,20 @@
['0']
['1']
['2']
['3']
['4']
['5']
['6']
['7']
['8']
['9']
test.com ['foo3223','foo6455','foo382','foo5566','foo1037']
test.com0 ['foo0']
test.com0.0001 ['foo1']
test.com0.0002 ['foo2']
test.com0.0003 ['foo3']
test.com0.0004 ['foo4']
test.com0.0005 ['foo5']
test.com0.0006 ['foo6']
test.com0.0007 ['foo7']
test.com0.0008 ['foo8']

View File

@ -0,0 +1,25 @@
SET group_by_two_level_threshold_bytes = 1;
SET group_by_two_level_threshold = 1;
SELECT groupArray(DISTINCT toString(number % 10)) FROM numbers_mt(50000)
GROUP BY number ORDER BY number LIMIT 10
SETTINGS max_threads = 2, max_block_size = 2000;
DROP TABLE IF EXISTS dictinct_two_level;
CREATE TABLE dictinct_two_level (
time DateTime64(3),
domain String,
subdomain String
) ENGINE = MergeTree ORDER BY time;
INSERT INTO dictinct_two_level SELECT 1546300800000, 'test.com', concat('foo', toString(number % 10000)) from numbers(10000);
INSERT INTO dictinct_two_level SELECT 1546300800000, concat('test.com', toString(number / 10000)) , concat('foo', toString(number % 10000)) from numbers(10000);
SELECT
domain, groupArraySample(5, 11111)(DISTINCT subdomain) AS example_subdomains
FROM dictinct_two_level
GROUP BY domain ORDER BY domain, example_subdomains
LIMIT 10;
DROP TABLE IF EXISTS dictinct_two_level;

View File

@ -0,0 +1,2 @@
\N test_01605 \N 0 \N \N \N \N \N test_01605
PROFILE DROPPED

View File

@ -0,0 +1,8 @@
CREATE USER OR REPLACE 'test_01605';
CREATE SETTINGS PROFILE OR REPLACE 'test_01605';
ALTER USER 'test_01605' SETTINGS PROFILE 'test_01605';
SELECT * FROM system.settings_profile_elements WHERE user_name='test_01605' OR profile_name='test_01605';
DROP SETTINGS PROFILE 'test_01605';
SELECT 'PROFILE DROPPED';
SELECT * FROM system.settings_profile_elements WHERE user_name='test_01605' OR profile_name='test_01605';
DROP USER 'test_01605';

View File

@ -0,0 +1,8 @@
0
0
1
1
2
2
3
3

View File

@ -0,0 +1,22 @@
drop table if exists shard;
drop table if exists distr;
create table shard (id Int32) engine = MergeTree order by cityHash64(id);
create table distr as shard engine Distributed (test_cluster_two_shards_localhost, currentDatabase(), shard);
insert into distr (id) values (0), (1); -- { serverError 55; }
set insert_distributed_sync = 1;
insert into distr (id) values (0), (1); -- { serverError 55; }
set insert_distributed_sync = 0;
set insert_distributed_one_random_shard = 1;
insert into distr (id) values (0), (1);
insert into distr (id) values (2), (3);
select * from distr order by id;
drop table if exists shard;
drop table if exists distr;