Merge branch 'master' into memory-tracking-overloads

mergify[bot] authored 2022-04-13 12:40:49 +00:00, committed by GitHub
commit 4fb66013fe
31 changed files with 271 additions and 80 deletions


@@ -153,13 +153,19 @@ jobs:
           EOF
       - name: Clear repository
         run: |
-          sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
-      - name: Check out repository code
-        uses: actions/checkout@v2
-      - name: Fast Test
-        run: |
+          sudo rm -fr "$GITHUB_WORKSPACE"
+          mkdir "$GITHUB_WORKSPACE"
           sudo rm -fr "$TEMP_PATH"
           mkdir -p "$TEMP_PATH"
+      - name: Check out repository code
+        uses: actions/checkout@v2
+      - name: Download changed images
+        uses: actions/download-artifact@v2
+        with:
+          name: changed_images
+          path: ${{ env.TEMP_PATH }}
+      - name: Fast Test
+        run: |
           cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
           cd "$REPO_COPY/tests/ci" && python3 fast_test_check.py
       - name: Cleanup


@@ -115,6 +115,7 @@ function start_server
 function clone_root
 {
+    git config --global --add safe.directory "$FASTTEST_SOURCE"
     git clone --depth 1 https://github.com/ClickHouse/ClickHouse.git -- "$FASTTEST_SOURCE" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/clone_log.txt"
     (


@@ -329,8 +329,8 @@ then
        -e "Code: 1000, e.code() = 111, Connection refused" \
        -e "UNFINISHED" \
        -e "Renaming unexpected part" \
-        /var/log/clickhouse-server/clickhouse-server.backward.*.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \
-        && echo -e 'Backward compatibility check: Error message in clickhouse-server.log (see bc_check_error_messages.txt)\tOK' >> /test_output/test_results.tsv \
+        /var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \
+        && echo -e 'Backward compatibility check: Error message in clickhouse-server.log (see bc_check_error_messages.txt)\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: No Error messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

    # Remove file bc_check_error_messages.txt if it's empty

@@ -346,7 +346,7 @@ then
    # OOM
    zgrep -Fa " <Fatal> Application: Child process was terminated by signal 9" /var/log/clickhouse-server/clickhouse-server.backward.*.log > /dev/null \
-        && echo -e 'Backward compatibility check: OOM killer (or signal 9) in clickhouse-server.log\tOK' >> /test_output/test_results.tsv \
+        && echo -e 'Backward compatibility check: OOM killer (or signal 9) in clickhouse-server.log\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: No OOM messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

    # Logical errors

@@ -366,7 +366,7 @@ then
    # It also checks for crash without stacktrace (printed by watchdog)
    echo "Check for Fatal message in server log:"
    zgrep -Fa " <Fatal> " /var/log/clickhouse-server/clickhouse-server.backward.*.log > /test_output/bc_check_fatal_messages.txt \
-        && echo -e 'Backward compatibility check: Fatal message in clickhouse-server.log (see bc_check_fatal_messages.txt)\tOK' >> /test_output/test_results.tsv \
+        && echo -e 'Backward compatibility check: Fatal message in clickhouse-server.log (see bc_check_fatal_messages.txt)\tFAIL' >> /test_output/test_results.tsv \
        || echo -e 'Backward compatibility check: No fatal messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

    # Remove file bc_check_fatal_messages.txt if it's empty


@@ -47,7 +47,7 @@ Optional parameters:
 - `kafka_row_delimiter` — Delimiter character, which ends the message.
 - `kafka_schema` — Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
-- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
+- `kafka_num_consumers` — The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition, and must not be greater than the number of physical cores on the server where ClickHouse is deployed.
 - `kafka_max_block_size` — The maximum batch size (in messages) for poll (default: `max_block_size`).
 - `kafka_skip_broken_messages` — Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
 - `kafka_commit_every_batch` — Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
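For context, a minimal illustrative Kafka engine definition that sets `kafka_num_consumers`. The broker address, topic, group, and column names below are placeholders for this sketch and do not come from the diff:

    CREATE TABLE queue
    (
        timestamp UInt64,
        level String,
        message String
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'localhost:9092',
             kafka_topic_list = 'topic',
             kafka_group_name = 'group1',
             kafka_format = 'JSONEachRow',
             -- keep this at or below both the topic's partition count and, per the
             -- wording added above, the number of physical cores on the server
             kafka_num_consumers = 4;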


@@ -137,14 +137,14 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
     auto & dst_column_host_name = typeid_cast<ColumnString &>(*mutable_columns[name_pos["host_name"]]);
     auto & dst_array_current_time = typeid_cast<ColumnUInt32 &>(*mutable_columns[name_pos["current_time"]]).getData();
-    auto & dst_array_thread_id = typeid_cast<ColumnUInt64 &>(*mutable_columns[name_pos["thread_id"]]).getData();
+    // auto & dst_array_thread_id = typeid_cast<ColumnUInt64 &>(*mutable_columns[name_pos["thread_id"]]).getData();
     auto & dst_array_type = typeid_cast<ColumnInt8 &>(*mutable_columns[name_pos["type"]]).getData();
     auto & dst_column_name = typeid_cast<ColumnString &>(*mutable_columns[name_pos["name"]]);
     auto & dst_array_value = typeid_cast<ColumnInt64 &>(*mutable_columns[name_pos["value"]]).getData();

     const auto & src_column_host_name = typeid_cast<const ColumnString &>(*src.getByName("host_name").column);
     const auto & src_array_current_time = typeid_cast<const ColumnUInt32 &>(*src.getByName("current_time").column).getData();
-    // const auto & src_array_thread_id = typeid_cast<const ColumnUInt64 &>(*src.getByName("thread_id").column).getData();
+    const auto & src_array_thread_id = typeid_cast<const ColumnUInt64 &>(*src.getByName("thread_id").column).getData();
     const auto & src_column_name = typeid_cast<const ColumnString &>(*src.getByName("name").column);
     const auto & src_array_value = typeid_cast<const ColumnInt64 &>(*src.getByName("value").column).getData();

@@ -169,6 +169,16 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
         rows_by_name[id] = src_row;
     }

+    /// Filter out snapshots
+    std::set<size_t> thread_id_filter_mask;
+    for (size_t i = 0; i < src_array_thread_id.size(); ++i)
+    {
+        if (src_array_thread_id[i] != 0)
+        {
+            thread_id_filter_mask.emplace(i);
+        }
+    }
+
     /// Merge src into dst.
     for (size_t dst_row = 0; dst_row < dst_rows; ++dst_row)
     {

@@ -180,6 +190,11 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
         if (auto it = rows_by_name.find(id); it != rows_by_name.end())
         {
             size_t src_row = it->second;
+            if (thread_id_filter_mask.contains(src_row))
+            {
+                continue;
+            }
+
             dst_array_current_time[dst_row] = src_array_current_time[src_row];

             switch (dst_array_type[dst_row])

@@ -199,24 +214,18 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
     /// Copy rows from src that dst does not contains.
     for (const auto & [id, pos] : rows_by_name)
     {
+        if (thread_id_filter_mask.contains(pos))
+        {
+            continue;
+        }
+
         for (size_t col = 0; col < src.columns(); ++col)
         {
             mutable_columns[col]->insert((*src.getByPosition(col).column)[pos]);
         }
     }

-    /// Filter out snapshots
-    std::set<size_t> thread_id_filter_mask;
-    for (size_t i = 0; i < dst_array_thread_id.size(); ++i)
-    {
-        if (dst_array_thread_id[i] != 0)
-        {
-            thread_id_filter_mask.emplace(i);
-        }
-    }
-
     dst.setColumns(std::move(mutable_columns));
-    dst.erase(thread_id_filter_mask);
 }

@@ -394,8 +403,16 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
     if (need_render_progress && (stdout_is_a_tty || is_interactive) && !select_into_file)
         progress_indication.clearProgressOutput();

-    output_format->write(materializeBlock(block));
-    written_first_block = true;
+    try
+    {
+        output_format->write(materializeBlock(block));
+        written_first_block = true;
+    }
+    catch (const Exception &)
+    {
+        /// Catch client errors like NO_ROW_DELIMITER
+        throw LocalFormatError(getCurrentExceptionMessage(print_stack_trace), getCurrentExceptionCode());
+    }

     /// Received data block is immediately displayed to the user.
     output_format->flush();

@@ -1413,6 +1430,8 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
             progress_indication.clearProgressOutput();
             logs_out_stream->writeProfileEvents(profile_events.last_block);
             logs_out_stream->flush();
+
+            profile_events.last_block = {};
         }

         if (is_interactive)

@@ -1838,7 +1857,7 @@ void ClientBase::runInteractive()
     }

     LineReader::Patterns query_extenders = {"\\"};
-    LineReader::Patterns query_delimiters = {";", "\\G"};
+    LineReader::Patterns query_delimiters = {";", "\\G", "\\G;"};

 #if USE_REPLXX
     replxx::Replxx::highlighter_callback_t highlight_callback{};

@@ -1860,9 +1879,13 @@ void ClientBase::runInteractive()
             break;

         has_vertical_output_suffix = false;
-        if (input.ends_with("\\G"))
+        if (input.ends_with("\\G") || input.ends_with("\\G;"))
         {
-            input.resize(input.size() - 2);
+            if (input.ends_with("\\G"))
+                input.resize(input.size() - 2);
+            else if (input.ends_with("\\G;"))
+                input.resize(input.size() - 3);
             has_vertical_output_suffix = true;
         }


@@ -201,9 +201,6 @@ void LocalConnection::finishQuery()
 {
     next_packet_type = Protocol::Server::EndOfStream;

-    if (!state)
-        return;
-
     if (state->executor)
     {
         state->executor.reset();

@@ -219,6 +216,7 @@ void LocalConnection::finishQuery()
     state->io.onFinish();
     state.reset();
+    last_sent_snapshots.clear();
 }

 bool LocalConnection::poll(size_t)

@@ -326,6 +324,21 @@ bool LocalConnection::poll(size_t)
         }
     }

+    if (state->is_finished && !state->sent_profile_events)
+    {
+        state->sent_profile_events = true;
+
+        if (send_profile_events && state->executor)
+        {
+            Block block;
+            state->after_send_profile_events.restart();
+            next_packet_type = Protocol::Server::ProfileEvents;
+            getProfileEvents(block);
+            state->block.emplace(std::move(block));
+            return true;
+        }
+    }
+
     if (state->is_finished)
     {
         finishQuery();


@@ -47,6 +47,7 @@ struct LocalQueryState
     bool sent_extremes = false;
     bool sent_progress = false;
     bool sent_profile_info = false;
+    bool sent_profile_events = false;

     /// To output progress, the difference after the previous sending of progress.
     Progress progress;


@@ -57,6 +57,8 @@ struct ZooKeeperRequest : virtual Request
     bool restored_from_zookeeper_log = false;

     UInt64 request_created_time_ns = 0;
+    UInt64 thread_id = 0;
+    String query_id;

     ZooKeeperRequest() = default;
     ZooKeeperRequest(const ZooKeeperRequest &) = default;


@@ -8,6 +8,7 @@
 #include <IO/Operators.h>
 #include <IO/WriteBufferFromString.h>
 #include <base/logger_useful.h>
+#include <base/getThreadId.h>
 #include <Common/config.h>

@@ -1016,6 +1017,11 @@ void ZooKeeper::pushRequest(RequestInfo && info)
     try
     {
         info.time = clock::now();
+        if (zk_log)
+        {
+            info.request->thread_id = getThreadId();
+            info.request->query_id = String(CurrentThread::getQueryId());
+        }

         if (!info.request->xid)
         {

@@ -1269,6 +1275,11 @@ void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const
         elem.event_time = event_time;
         elem.address = socket_address;
         elem.session_id = session_id;
+        if (request)
+        {
+            elem.thread_id = request->thread_id;
+            elem.query_id = request->query_id;
+        }
         maybe_zk_log->add(elem);
     }
 }


@@ -33,6 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
     auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));

+    ++active_working_reader;
     schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });

     return true;

@@ -203,11 +204,6 @@ bool ParallelReadBuffer::nextImpl()
 void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
 {
-    {
-        std::lock_guard lock{mutex};
-        ++active_working_reader;
-    }
-
     SCOPE_EXIT({
         std::lock_guard lock{mutex};
         --active_working_reader;


@@ -250,21 +250,6 @@ bool MergeTreeTransaction::rollback() noexcept
     /// Discard changes in active parts set
     /// Remove parts that were created, restore parts that were removed (except parts that were created by this transaction too)
-    for (const auto & part : parts_to_remove)
-    {
-        if (part->version.isRemovalTIDLocked())
-        {
-            /// Don't need to remove part from working set if it was created and removed by this transaction
-            assert(part->version.removal_tid_lock == tid.getHash());
-            continue;
-        }
-        /// FIXME do not lock removal_tid when rolling back part creation, it's ugly
-        const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part}, true);
-    }
-
-    for (const auto & part : parts_to_activate)
-        if (part->version.getCreationTID() != tid)
-            const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);

     /// Kind of optimization: cleanup thread can remove these parts immediately
     for (const auto & part : parts_to_remove)

@@ -274,6 +259,18 @@ bool MergeTreeTransaction::rollback() noexcept
         part->appendCSNToVersionMetadata(VersionMetadata::CREATION);
     }

+    for (const auto & part : parts_to_remove)
+    {
+        /// NOTE It's possible that part is already removed from working set in the same transaction
+        /// (or, even worse, in a separate non-transactional query with PrehistoricTID),
+        /// but it's not a problem: removePartsFromWorkingSet(...) will do nothing in this case.
+        const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part}, true);
+    }
+
+    for (const auto & part : parts_to_activate)
+        if (part->version.getCreationTID() != tid)
+            const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
+
     for (const auto & part : parts_to_activate)
     {
         /// Clear removal_tid from version metadata file, so we will not need to distinguish TIDs that were not committed


@@ -105,7 +105,7 @@ void getProfileEvents(
         {"value", std::make_shared<DataTypeInt64>()},
     };

     ColumnsWithTypeAndName temp_columns;
     for (auto const & name_and_type : column_names_and_types)
         temp_columns.emplace_back(name_and_type.type, name_and_type.name);


@@ -212,7 +212,7 @@ void TransactionLog::runUpdatingThread()
            if (stop_flag.load())
                return;

-            if (!zookeeper)
+            if (getZooKeeper()->expired())
            {
                auto new_zookeeper = global_context->getZooKeeper();
                std::lock_guard lock{mutex};

@@ -222,16 +222,11 @@ void TransactionLog::runUpdatingThread()
            loadNewEntries();
            removeOldEntries();
        }
-        catch (const Coordination::Exception & e)
+        catch (const Coordination::Exception &)
        {
            tryLogCurrentException(log);
            /// TODO better backoff
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-            if (Coordination::isHardwareError(e.code))
-            {
-                std::lock_guard lock{mutex};
-                zookeeper.reset();
-            }
            log_updated_event->set();
        }
        catch (...)


@@ -116,6 +116,8 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
         {"type", std::move(type_enum)},
         {"event_date", std::make_shared<DataTypeDate>()},
         {"event_time", std::make_shared<DataTypeDateTime64>(6)},
+        {"thread_id", std::make_shared<DataTypeUInt64>()},
+        {"query_id", std::make_shared<DataTypeString>()},
         {"address", DataTypeFactory::instance().get("IPv6")},
         {"port", std::make_shared<DataTypeUInt16>()},
         {"session_id", std::make_shared<DataTypeInt64>()},

@@ -164,6 +166,8 @@ void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
     auto event_time_seconds = event_time / 1000000;
     columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
     columns[i++]->insert(event_time);
+    columns[i++]->insert(thread_id);
+    columns[i++]->insert(query_id);
     columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
     columns[i++]->insert(address.port());
     columns[i++]->insert(session_id);


@@ -22,6 +22,8 @@ struct ZooKeeperLogElement
     Type type = UNKNOWN;
     Decimal64 event_time = 0;
+    UInt64 thread_id = 0;
+    String query_id;
     Poco::Net::SocketAddress address;
     Int64 session_id = 0;
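These additions expose `thread_id` and `query_id` in the `system.zookeeper_log` system table. A minimal sketch of how the new columns might be inspected; the column selection and LIMIT are chosen only for this illustration, every column named is one that appears in the hunks above:

    SELECT event_time, session_id, thread_id, query_id, type
    FROM system.zookeeper_log
    ORDER BY event_time DESC
    LIMIT 10;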


@@ -3,6 +3,7 @@
 #include <Processors/QueryPlan/LimitStep.h>
 #include <Processors/QueryPlan/TotalsHavingStep.h>
 #include <Processors/QueryPlan/SortingStep.h>
+#include <Processors/QueryPlan/WindowStep.h>
 #include <Common/typeid_cast.h>

 namespace DB::QueryPlanOptimizations

@@ -66,6 +67,11 @@ size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
     if (typeid_cast<const TotalsHavingStep *>(child.get()))
         return 0;

+    /// Disable for WindowStep.
+    /// TODO: we can push down limit in some cases if increase the limit value.
+    if (typeid_cast<const WindowStep *>(child.get()))
+        return 0;
+
     /// Now we should decide if pushing down limit possible for this step.
     const auto & transform_traits = transforming->getTransformTraits();
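The guard added above matters because a window aggregate must see every row of its partition; pushing LIMIT below the window would change its result. A small illustration of the query shape being protected (chosen here for the example, in the spirit of the test added later in this commit):

    -- cnt must be 10 in both queries; if LIMIT were pushed below the window step,
    -- the window would only see 3 rows and cnt would wrongly become 3
    SELECT number, count() OVER () AS cnt FROM numbers(10) LIMIT 3;
    SELECT * FROM (SELECT number, count() OVER () AS cnt FROM numbers(10)) LIMIT 3;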


@@ -728,6 +728,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
             return;

         sendData({});
+        last_sent_snapshots.clear();
     }

     sendProgress();


@@ -76,4 +76,5 @@ def get_images_with_versions(

 def get_image_with_version(reports_path, image, pull=True, version=None):
+    logging.info("Looking for images file in %s", reports_path)
     return get_images_with_versions(reports_path, [image], pull, version=version)[0]


@@ -1407,3 +1407,24 @@ def test_insert_select_schema_inference(started_cluster):
         f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')"
     )
     assert int(result) == 1
+
+
+def test_parallel_reading_with_memory_limit(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+
+    instance.query(
+        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(100000)"
+    )
+
+    result = instance.query_and_get_error(
+        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') settings max_memory_usage=10000"
+    )
+
+    assert "Memory limit (for query) exceeded" in result
+
+    sleep(5)
+
+    # Check that server didn't crash
+    result = instance.query("select 1")
+    assert int(result) == 1


@@ -1,5 +1,7 @@
 #!/usr/bin/env bash
+# Tags: no-parallel
+
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CURDIR"/../shell_config.sh

@@ -12,7 +14,7 @@ function run_selects()
 {
     thread_num=$1
     readarray -t tables_arr < <(${CLICKHOUSE_CLIENT} -q "SELECT database || '.' || name FROM system.tables
        WHERE database in ('system', 'information_schema', 'INFORMATION_SCHEMA') and name!='zookeeper' and name!='merge_tree_metadata_cache'
        AND sipHash64(name || toString($RAND)) % $THREADS = $thread_num")

     for t in "${tables_arr[@]}"


@@ -6,7 +6,7 @@
 3 all_1_1_0 0
 3 all_3_3_0 1
 4 all_1_1_0 1 (0,0,'00000000-0000-0000-0000-000000000000') 0
-4 all_2_2_0 18446744073709551615 (1,1,'00000000-0000-0000-0000-000000000000') 0
+4 all_2_2_0 18446744073709551615 (0,0,'00000000-0000-0000-0000-000000000000') 0
 4 all_3_3_0 0 (0,0,'00000000-0000-0000-0000-000000000000') 0
 5 1
 6 all_1_1_0 0

@@ -19,7 +19,6 @@
 1 1 AddPart 1 1 1 1 all_1_1_0
 2 1 Begin 1 1 1 1
 2 1 AddPart 1 1 1 1 all_2_2_0
-1 1 LockPart 1 1 1 1 all_2_2_0
 2 1 Rollback 1 1 1 1
 3 1 Begin 1 1 1 1
 3 1 AddPart 1 1 1 1 all_3_3_0


@ -27,6 +27,11 @@ expect "Row 1:"
expect "1: 1" expect "1: 1"
expect ":) " expect ":) "
send -- "SELECT 1\\G;\r"
expect "Row 1:"
expect "1: 1"
expect ":) "
send -- "SELECT 1\\\r" send -- "SELECT 1\\\r"
expect ":-] " expect ":-] "
send -- ", 2\r" send -- ", 2\r"
@ -41,6 +46,14 @@ expect "1: 1"
expect "2: 2" expect "2: 2"
expect ":) " expect ":) "
send -- "SELECT 1\\\r"
expect ":-] "
send -- ", 2\\G;\r"
expect "Row 1:"
expect "1: 1"
expect "2: 2"
expect ":) "
send -- "" send -- ""
expect eof expect eof
@ -56,6 +69,11 @@ expect "Row 1:"
expect "1: 1" expect "1: 1"
expect ":) " expect ":) "
send -- "SELECT 1\\G;\r"
expect "Row 1:"
expect "1: 1"
expect ":) "
send -- "SELECT 1; \r" send -- "SELECT 1; \r"
expect "│ 1 │" expect "│ 1 │"
expect ":) " expect ":) "
@ -65,6 +83,11 @@ expect "Row 1:"
expect "1: 1" expect "1: 1"
expect ":) " expect ":) "
send -- "SELECT 1\\G; \r"
expect "Row 1:"
expect "1: 1"
expect ":) "
send -- "SELECT 1\r" send -- "SELECT 1\r"
expect ":-] " expect ":-] "
send -- ";\r" send -- ";\r"
@ -78,6 +101,13 @@ expect "Row 1:"
expect "1: 1" expect "1: 1"
expect ":) " expect ":) "
send -- "SELECT 1\r"
expect ":-] "
send -- "\\G;\r"
expect "Row 1:"
expect "1: 1"
expect ":) "
send -- "SELECT 1\r" send -- "SELECT 1\r"
expect ":-] " expect ":-] "
send -- ", 2;\r" send -- ", 2;\r"
@ -92,5 +122,14 @@ expect "1: 1"
expect "2: 2" expect "2: 2"
expect ":) " expect ":) "
send -- "SELECT 1\r"
expect ":-] "
send -- ", 2\\G;\r"
expect "Row 1:"
expect "1: 1"
expect "2: 2"
expect ":) "
send -- "" send -- ""
expect eof expect eof


@ -23,6 +23,12 @@ expect "Row 1:"
expect "1: 1" expect "1: 1"
expect ":) " expect ":) "
send -- "SELECT 1\\G;\r"
expect "Row 1:"
expect "1: 1"
expect ":) "
send -- "SELECT 1\\\r" send -- "SELECT 1\\\r"
expect ":-] " expect ":-] "
send -- ", 2\r" send -- ", 2\r"
@ -37,5 +43,14 @@ expect "1: 1"
expect "2: 2" expect "2: 2"
expect ":) " expect ":) "
send -- "SELECT 1\\\r"
expect ":-] "
send -- ", 2\\G;\r"
expect "Row 1:"
expect "1: 1"
expect "2: 2"
expect ":) "
send -- "" send -- ""
expect eof expect eof


@@ -1,5 +1,18 @@
+do not print any ProfileEvents packets
 0
-100000
+print only last (and also number of rows to provide more info in case of failures)
 [ 0 ] SelectedRows: 131010 (increment)
+regression test for incorrect filtering out snapshots
+0
+regression test for overlap profile events snapshots between queries
+[ 0 ] SelectedRows: 1 (increment)
+[ 0 ] SelectedRows: 1 (increment)
+regression test for overlap profile events snapshots between queries (clickhouse-local)
+[ 0 ] SelectedRows: 1 (increment)
+[ 0 ] SelectedRows: 1 (increment)
+print everything
 OK
+print each 100 ms
+OK
+check that ProfileEvents is new for each query
 OK


@@ -4,13 +4,30 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CURDIR"/../shell_config.sh

-# do not print any ProfileEvents packets
+echo 'do not print any ProfileEvents packets'
 $CLICKHOUSE_CLIENT -q 'select * from numbers(1e5) format Null' |& grep -c 'SelectedRows'
-# print only last (and also number of rows to provide more info in case of failures)
-$CLICKHOUSE_CLIENT --max_block_size=65505 --print-profile-events --profile-events-delay-ms=-1 -q 'select * from numbers(1e5)' 2> >(grep -o -e '\[ 0 \] SelectedRows: .*$' -e Exception) 1> >(wc -l)
-# print everything
+
+echo 'print only last (and also number of rows to provide more info in case of failures)'
+$CLICKHOUSE_CLIENT --max_block_size=65505 --print-profile-events --profile-events-delay-ms=-1 -q 'select * from numbers(1e5)' |& grep -o -e '\[ 0 \] SelectedRows: .*$' -e Exception
+
+echo 'regression test for incorrect filtering out snapshots'
+$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -n -q 'select 1; select 1' >& /dev/null
+echo $?
+
+echo 'regression test for overlap profile events snapshots between queries'
+$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -n -q 'select 1; select 1' |& grep -F -o '[ 0 ] SelectedRows: 1 (increment)'
+
+echo 'regression test for overlap profile events snapshots between queries (clickhouse-local)'
+$CLICKHOUSE_LOCAL --print-profile-events --profile-events-delay-ms=-1 -n -q 'select 1; select 1' |& grep -F -o '[ 0 ] SelectedRows: 1 (increment)'
+
+echo 'print everything'
 profile_events="$($CLICKHOUSE_CLIENT --max_block_size 1 --print-profile-events -q 'select sleep(1) from numbers(2) format Null' |& grep -c 'SelectedRows')"
 test "$profile_events" -gt 1 && echo OK || echo "FAIL ($profile_events)"
-# print each 100 ms
+
+echo 'print each 100 ms'
 profile_events="$($CLICKHOUSE_CLIENT --max_block_size 1 --print-profile-events --profile-events-delay-ms=100 -q 'select sleep(1) from numbers(2) format Null' |& grep -c 'SelectedRows')"
 test "$profile_events" -gt 1 && echo OK || echo "FAIL ($profile_events)"
+
+echo 'check that ProfileEvents is new for each query'
+sleep_function_calls=$($CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -n -q 'select sleep(1); select 1' |& grep -c 'SleepFunctionCalls')
+test "$sleep_function_calls" -eq 1 && echo OK || echo "FAIL ($sleep_function_calls)"


@@ -0,0 +1,10 @@
+0 1
+1 2
+2 3
+a 2
+0 10000000
+1 10000000
+2 10000000
+0 10000000
+1 10000000
+2 10000000


@@ -0,0 +1,16 @@
+SELECT
+    number,
+    leadInFrame(number) OVER w AS W
+FROM numbers(10)
+WINDOW w AS (ORDER BY number ASC Rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
+LIMIT 3;
+
+WITH arrayJoin(['a', 'a', 'b', 'b']) AS field
+SELECT
+    field,
+    count() OVER (PARTITION BY field)
+ORDER BY field ASC
+LIMIT 1;
+
+select * from ( ( select *, count() over () cnt from ( select * from numbers(10000000) ) ) ) limit 3 ;
+select * from ( ( select *, count() over () cnt from ( select * from numbers(10000000) ) ) ) order by number limit 3 ;


@@ -9,11 +9,11 @@ FORMATS=('CSV' 'CSVWithNames')
 for format in "${FORMATS[@]}"
 do
     echo "$format, false";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
     "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum

     echo "$format, true";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=true -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \
     "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c Format $format" | md5sum
 done


@@ -11,10 +11,10 @@ FORMATS=('JSONEachRow' 'JSONCompactEachRow' 'JSONCompactStringsEachRow' 'JSONCom
 for format in "${FORMATS[@]}"
 do
     echo "$format, false";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
     "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum

     echo "$format, true";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=true -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=true -q \
     "SELECT ClientEventTime::DateTime('Asia/Dubai') as a, MobilePhoneModel as b, ClientIP6 as c FROM test.hits ORDER BY a, b, c LIMIT 3000000 Format $format" | md5sum
 done


@@ -13,9 +13,9 @@ do
     $CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(c FixedString(16), a DateTime('Asia/Dubai'), b String) ENGINE=Memory()"
     echo "$format, false";
-    $CLICKHOUSE_CLIENT --max_block_size=65505 --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --max_block_size=65505 --output_format_parallel_formatting=false -q \
     "SELECT URLRegions as d, toTimeZone(ClientEventTime, 'Asia/Dubai') as a, MobilePhoneModel as b, ParamPrice as e, ClientIP6 as c FROM test.hits LIMIT 50000 Format $format" | \
-    $CLICKHOUSE_CLIENT --max_block_size=65505 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=false -q "INSERT INTO parsing_with_names FORMAT $format"
+    $CLICKHOUSE_CLIENT --max_threads=0 --max_block_size=65505 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=false -q "INSERT INTO parsing_with_names FORMAT $format"
     $CLICKHOUSE_CLIENT -q "SELECT * FROM parsing_with_names;" | md5sum
     $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"

@@ -23,9 +23,9 @@ do
     $CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(c FixedString(16), a DateTime('Asia/Dubai'), b String) ENGINE=Memory()"
     echo "$format, true";
-    $CLICKHOUSE_CLIENT --max_block_size=65505 --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --max_block_size=65505 --output_format_parallel_formatting=false -q \
     "SELECT URLRegions as d, toTimeZone(ClientEventTime, 'Asia/Dubai') as a, MobilePhoneModel as b, ParamPrice as e, ClientIP6 as c FROM test.hits LIMIT 50000 Format $format" | \
-    $CLICKHOUSE_CLIENT --max_block_size=65505 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=true -q "INSERT INTO parsing_with_names FORMAT $format"
+    $CLICKHOUSE_CLIENT --max_threads=0 --max_block_size=65505 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=true -q "INSERT INTO parsing_with_names FORMAT $format"
     $CLICKHOUSE_CLIENT -q "SELECT * FROM parsing_with_names;" | md5sum
     $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"


@@ -13,9 +13,9 @@ do
     $CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(c FixedString(16), a DateTime('Asia/Dubai'), b String) ENGINE=Memory()"
     echo "$format, false";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
     "SELECT URLRegions as d, toTimeZone(ClientEventTime, 'Asia/Dubai') as a, MobilePhoneModel as b, ParamPrice as e, ClientIP6 as c FROM test.hits LIMIT 5000 Format $format" | \
-    $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=false -q "INSERT INTO parsing_with_names SETTINGS input_format_null_as_default=0 FORMAT $format"
+    $CLICKHOUSE_CLIENT --max_threads=0 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=false -q "INSERT INTO parsing_with_names SETTINGS input_format_null_as_default=0 FORMAT $format"
     $CLICKHOUSE_CLIENT -q "SELECT * FROM parsing_with_names;" | md5sum
     $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"

@@ -23,9 +23,9 @@ do
     $CLICKHOUSE_CLIENT -q "CREATE TABLE parsing_with_names(c FixedString(16), a DateTime('Asia/Dubai'), b String) ENGINE=Memory()"
     echo "$format, true";
-    $CLICKHOUSE_CLIENT --output_format_parallel_formatting=false -q \
+    $CLICKHOUSE_CLIENT --max_threads=0 --output_format_parallel_formatting=false -q \
     "SELECT URLRegions as d, toTimeZone(ClientEventTime, 'Asia/Dubai') as a, MobilePhoneModel as b, ParamPrice as e, ClientIP6 as c FROM test.hits LIMIT 5000 Format $format" | \
-    $CLICKHOUSE_CLIENT --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=true -q "INSERT INTO parsing_with_names SETTINGS input_format_null_as_default=0 FORMAT $format"
+    $CLICKHOUSE_CLIENT --max_threads=0 --input_format_skip_unknown_fields=1 --input_format_parallel_parsing=true -q "INSERT INTO parsing_with_names SETTINGS input_format_null_as_default=0 FORMAT $format"
     $CLICKHOUSE_CLIENT -q "SELECT * FROM parsing_with_names;" | md5sum
     $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS parsing_with_names"