Merge branch 'master' into fix-multiply-row-policies-on-same-column

This commit is contained in:
mergify[bot] 2021-12-15 00:05:49 +00:00 committed by GitHub
commit 003aeda627
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 249 additions and 73 deletions

View File

@ -111,19 +111,6 @@ function start_server
fi
echo "ClickHouse server pid '$server_pid' started and responded"
echo "
set follow-fork-mode child
handle all noprint
handle SIGSEGV stop print
handle SIGBUS stop print
handle SIGABRT stop print
continue
thread apply all backtrace
continue
" > script.gdb
gdb -batch -command script.gdb -p "$server_pid" &
}
function clone_root

View File

@ -155,17 +155,34 @@ function fuzz
kill -0 $server_pid
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode child
handle all noprint
handle SIGSEGV stop print
handle SIGBUS stop print
continue
thread apply all backtrace
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
info locals
detach
quit
" > script.gdb
sudo gdb -batch -command script.gdb -p $server_pid &
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
# Check connectivity after we attach gdb, because it might cause the server
# to freeze and the fuzzer will fail.

View File

@ -128,14 +128,26 @@ function start()
counter=$((counter + 1))
done
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode child
handle all noprint
handle SIGSEGV stop print
handle SIGBUS stop print
handle SIGABRT stop print
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
thread apply all backtrace
backtrace full
info locals
detach
quit
" > script.gdb
@ -143,7 +155,10 @@ quit
# FIXME Hung check may work incorrectly because of attached gdb
# 1. False positives are possible
# 2. We cannot attach another gdb to get stacktraces if some queries hung
sudo gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" >> /test_output/gdb.log &
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
}
configure
@ -214,6 +229,9 @@ zgrep -Fa " <Fatal> " /var/log/clickhouse-server/clickhouse-server.log* > /dev/n
zgrep -Fa "########################################" /test_output/* > /dev/null \
&& echo -e 'Killed by signal (output files)\tFAIL' >> /test_output/test_results.tsv
zgrep -Fa " received signal " /test_output/gdb.log > /dev/null \
&& echo -e 'Found signal in gdb.log\tFAIL' >> /test_output/test_results.tsv
# Put logs into /test_output/
for log_file in /var/log/clickhouse-server/clickhouse-server.log*
do

View File

@ -298,13 +298,16 @@ Note that elements emitted by a late firing should be treated as updated results
### Monitoring New Windows {#window-view-monitoring}
Window view supports the `WATCH` query to constantly append the processing results to the console or use `TO` syntax to output the results to a table.
Window view supports the [WATCH](../../../sql-reference/statements/watch.md) query to monitoring changes, or use `TO` syntax to output the results to a table.
``` sql
WATCH [db.]name [LIMIT n]
WATCH [db.]window_view
[EVENTS]
[LIMIT n]
[FORMAT format]
```
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query.
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query. The `EVENTS` clause can be used to obtain a short form of the `WATCH` query where instead of the query result you will just get the latest query watermark.
### Settings {#window-view-settings}

View File

@ -511,7 +511,7 @@ public:
virtual void shutdown() {}
/// Called before shutdown() to flush data to underlying storage
/// (for Buffer)
/// Data in memory need to be persistent
virtual void flush() {}
/// Asks table to stop executing some action identified by action_type

View File

@ -1525,6 +1525,24 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
}
}
void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (getSettings()->in_memory_parts_enable_wal)
return;
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVector();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
const auto & storage_relative_path = part_in_memory->storage.relative_data_path;
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->relative_path, metadata_snapshot);
}
}
}
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);

View File

@ -548,6 +548,9 @@ public:
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// When WAL is not enabled, the InMemoryParts need to be persistent.
void flushAllInMemoryPartsIfNeeded();
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);

View File

@ -139,6 +139,10 @@ void StorageMergeTree::startup()
}
}
void StorageMergeTree::flush()
{
flushAllInMemoryPartsIfNeeded();
}
void StorageMergeTree::shutdown()
{

View File

@ -31,6 +31,7 @@ class StorageMergeTree final : public shared_ptr_helper<StorageMergeTree>, publi
friend struct shared_ptr_helper<StorageMergeTree>;
public:
void startup() override;
void flush() override;
void shutdown() override;
~StorageMergeTree() override;

View File

@ -527,7 +527,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
for (auto & watch_stream : watch_streams)
{
if (auto watch_stream_ptr = watch_stream.lock())
watch_stream_ptr->addBlock(block);
watch_stream_ptr->addBlock(block, watermark);
}
}
if (!target_table_id.empty())
@ -910,7 +910,11 @@ Pipe StorageWindowView::watch(
}
auto reader = std::make_shared<WindowViewSource>(
*this, has_limit, limit,
std::static_pointer_cast<StorageWindowView>(shared_from_this()),
query.is_watch_events,
window_view_timezone,
has_limit,
limit,
local_context->getSettingsRef().window_view_heartbeat_interval.totalSeconds());
std::lock_guard lock(fire_signal_mutex);
@ -1077,7 +1081,8 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column #{} of time zone argument of function, must be constant string",
time_zone_arg_num);
time_zone = &DateLUT::instance(time_zone_ast->value.safeGet<String>());
window_view_timezone = time_zone_ast->value.safeGet<String>();
time_zone = &DateLUT::instance(window_view_timezone);
}
else
time_zone = &DateLUT::instance();
@ -1354,9 +1359,12 @@ Block & StorageWindowView::getHeader() const
sample_block = InterpreterSelectQuery(
select_query->clone(), window_view_context, getParentStorage(), nullptr,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
/// convert all columns to full columns
/// in case some of them are constant
for (size_t i = 0; i < sample_block.columns(); ++i)
{
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
}
}
return sample_block;
}

View File

@ -210,6 +210,7 @@ private:
BackgroundSchedulePool::TaskHolder clean_cache_task;
BackgroundSchedulePool::TaskHolder fire_task;
String window_view_timezone;
String function_now_timezone;
ASTPtr innerQueryParser(const ASTSelectQuery & query);

View File

@ -11,83 +11,109 @@ class WindowViewSource : public SourceWithProgress
{
public:
WindowViewSource(
StorageWindowView & storage_,
std::shared_ptr<StorageWindowView> storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: SourceWithProgress(storage_.getHeader())
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_->getHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
, has_limit(has_limit_)
, limit(limit_)
, heartbeat_interval_sec(heartbeat_interval_sec_) {}
, heartbeat_interval_sec(heartbeat_interval_sec_)
{
if (is_events)
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage->getHeader();
}
String getName() const override { return "WindowViewSource"; }
void addBlock(Block block_)
void addBlock(Block block_, UInt32 watermark)
{
std::lock_guard lock(blocks_mutex);
blocks.push_back(std::move(block_));
blocks_with_watermark.push_back({std::move(block_), watermark});
}
protected:
Block getHeader() const { return storage.getHeader(); }
Block getHeader() const { return header; }
Chunk generate() override
{
auto block = generateImpl();
return Chunk(block.getColumns(), block.rows());
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (is_events)
{
return Chunk(
{DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
block.rows());
}
else
{
return Chunk(block.getColumns(), block.rows());
}
}
Block generateImpl()
std::pair<Block, UInt32> generateImpl()
{
Block res;
if (has_limit && num_updates == static_cast<Int64>(limit))
return Block();
return {Block(), 0};
if (isCancelled() || storage.shutdown_called)
return Block();
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
std::unique_lock lock(blocks_mutex);
if (blocks.empty())
if (blocks_with_watermark.empty())
{
if (!end_of_blocks)
{
end_of_blocks = true;
num_updates += 1;
return getHeader();
return {getHeader(), 0};
}
storage.fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
storage->fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
if (isCancelled() || storage.shutdown_called)
if (isCancelled() || storage->shutdown_called)
{
return Block();
return {Block(), 0};
}
if (blocks.empty())
return getHeader();
if (blocks_with_watermark.empty())
return {getHeader(), 0};
else
{
end_of_blocks = false;
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
else
{
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
private:
StorageWindowView & storage;
std::shared_ptr<StorageWindowView> storage;
BlocksList blocks;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;
Block header;
const bool is_events;
String window_view_timezone;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;

View File

@ -189,7 +189,7 @@ if __name__ == "__main__":
logging.info("cache was not fetched, will create empty dir")
os.makedirs(ccache_path)
if build_config['package_type'] == "performance":
if build_config['package_type'] == "performance" and pr_info.number != 0:
# because perf tests store some information about git commits
subprocess.check_call(f"cd {repo_path} && git fetch origin master:master", shell=True)

View File

@ -1101,15 +1101,13 @@ class ClickHouseCluster:
info = self.mysql_client_container.client.api.inspect_container(self.mysql_client_container.name)
if info['State']['Health']['Status'] == 'healthy':
logging.debug("Mysql Client Container Started")
break
return
time.sleep(1)
return
except Exception as ex:
errors += [str(ex)]
time.sleep(1)
run_and_check(['docker-compose', 'ps', '--services', '--all'])
run_and_check(['docker', 'ps', '--all'])
logging.error("Can't connect to MySQL Client:{}".format(errors))
raise Exception("Cannot wait MySQL Client container")

View File

@ -54,3 +54,11 @@ def test_system_logs_non_empty_queue():
'log_queries_min_type': 'QUERY_START',
})
node.query('SYSTEM FLUSH LOGS')
def test_system_suspend():
node.query("CREATE TABLE t (x DateTime) ENGINE=Memory;")
node.query("INSERT INTO t VALUES (now());")
node.query("SYSTEM SUSPEND FOR 1 SECOND;")
node.query("INSERT INTO t VALUES (now());")
assert "1\n" == node.query("SELECT max(x) - min(x) >= 1 FROM t;")

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
# Tags: no-parallel
import os
import sys
import signal
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from client import client, prompt, end_of_block
log = None
# uncomment the line below for debugging
# log=sys.stdout
with client(name='client1>', log=log) as client1, client(name='client2>', log=log) as client2:
client1.expect(prompt)
client2.expect(prompt)
client1.send('SET allow_experimental_window_view = 1')
client1.expect(prompt)
client1.send('SET window_view_heartbeat_interval = 1')
client1.expect(prompt)
client2.send('SET allow_experimental_window_view = 1')
client2.expect(prompt)
client1.send('CREATE DATABASE IF NOT EXISTS 01070_window_view_watch_events')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS 01070_window_view_watch_events.mt NO DELAY')
client1.expect(prompt)
client1.send('DROP TABLE IF EXISTS 01070_window_view_watch_events.wv NO DELAY')
client1.expect(prompt)
client1.send("CREATE TABLE 01070_window_view_watch_events.mt(a Int32, timestamp DateTime('US/Samoa')) ENGINE=MergeTree ORDER BY tuple()")
client1.expect(prompt)
client1.send("CREATE WINDOW VIEW 01070_window_view_watch_events.wv WATERMARK=ASCENDING AS SELECT count(a) AS count, tumbleEnd(wid) AS w_end FROM 01070_window_view_watch_events.mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid")
client1.expect(prompt)
client1.send('WATCH 01070_window_view_watch_events.wv EVENTS')
client1.expect('Query id' + end_of_block)
client2.send("INSERT INTO 01070_window_view_watch_events.mt VALUES (1, '1990/01/01 12:00:00');")
client2.expect("Ok.")
client2.send("INSERT INTO 01070_window_view_watch_events.mt VALUES (1, '1990/01/01 12:00:06');")
client2.expect("Ok.")
client1.expect('1990-01-01 12:00:05' + end_of_block)
client1.expect('Progress: 1.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)
if match.groups()[1]:
client1.send(client1.command)
client1.expect(prompt)
client1.send('DROP TABLE 01070_window_view_watch_events.wv NO DELAY;')
client1.expect(prompt)
client1.send('DROP TABLE 01070_window_view_watch_events.mt;')
client1.expect(prompt)
client1.send('DROP DATABASE IF EXISTS 01070_window_view_watch_events;')
client1.expect(prompt)

View File

@ -1,5 +0,0 @@
CREATE TEMPORARY TABLE t (x DateTime);
INSERT INTO t VALUES (now());
SYSTEM SUSPEND FOR 1 SECOND;
INSERT INTO t VALUES (now());
SELECT max(x) - min(x) >= 1 FROM t;

View File

@ -0,0 +1,4 @@
before DETACH TABLE
500
after DETACH TABLE
500

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS mem_part_flush;
CREATE TABLE mem_part_flush
(
`key` UInt32,
`ts` DateTime,
`db_time` DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY (key, ts)
SETTINGS min_rows_for_compact_part = 1000000, min_bytes_for_compact_part = 200000000, in_memory_parts_enable_wal = 0;
INSERT INTO mem_part_flush(key, ts) SELECT number % 1000, now() + intDiv(number,1000) FROM numbers(500);
SELECT 'before DETACH TABLE';
SELECT count(*) FROM mem_part_flush;
DETACH TABLE mem_part_flush;
ATTACH TABLE mem_part_flush;
SELECT 'after DETACH TABLE';
SELECT count(*) FROM mem_part_flush;
DROP TABLE mem_part_flush;