Automatic style fix

robot-clickhouse 2024-04-20 15:58:25 +00:00 committed by Sema Checherinda
parent 02c9a07778
commit ac27860b49
8 changed files with 128 additions and 56 deletions

View File

@ -15,9 +15,8 @@ void SinkToStorage::onConsume(Chunk chunk)
*/
Nested::validateArraySizes(getHeader().cloneWithColumns(chunk.getColumns()));
setDeduplicationTokenForChildren(chunk);
fillDeduplicationTokenForChildren(chunk);
consume(chunk);
fillDeduplicationTokenForChildren(chunk);
if (!lastBlockIsDuplicate()) // TODO: remove that
cur_chunk = std::move(chunk);
}
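
For orientation: the hunk above swaps the call order so the deduplication token is attached to the chunk before consume() hands it on to dependent views; previously the token was only filled afterwards, so children could not see it. A minimal standalone sketch of that ordering, where Chunk, TokenInfo and Sink are simplified stand-ins rather than the real ClickHouse types:

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    // Hypothetical stand-ins, not the ClickHouse Chunk / SinkToStorage classes.
    struct TokenInfo { std::string token; };

    struct Chunk
    {
        std::vector<int> rows;
        std::shared_ptr<TokenInfo> token_info;  // what downstream sinks will read
    };

    struct Sink
    {
        void onConsume(Chunk chunk)
        {
            fillDeduplicationTokenForChildren(chunk);  // attach the token first...
            consume(chunk);                            // ...then let children read it
        }

        void fillDeduplicationTokenForChildren(Chunk & chunk) const
        {
            if (!chunk.token_info)
                chunk.token_info = std::make_shared<TokenInfo>(TokenInfo{"hash-of-chunk"});
        }

        void consume(Chunk & chunk)
        {
            // A dependent view's sink would run here and reuse chunk.token_info.
            std::cout << "consumed " << chunk.rows.size()
                      << " rows, token=" << chunk.token_info->token << '\n';
        }
    };

    int main()
    {
        Sink{}.onConsume(Chunk{{1, 2, 3}, nullptr});
    }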

View File

@ -25,24 +25,12 @@ protected:
virtual void consume(Chunk & chunk) = 0;
virtual bool lastBlockIsDuplicate() const { return false; }
virtual std::shared_ptr<DedupTokenInfo> setDeduplicationTokenForChildren(Chunk & chunk) const
void fillDeduplicationTokenForChildren(Chunk & chunk) const
{
auto token_info = chunk.getChunkInfos().get<DedupTokenInfo>();
if (token_info)
return token_info;
return;
auto block_dedup_token_for_children = std::make_shared<DedupTokenInfo>("");
chunk.getChunkInfos().add(block_dedup_token_for_children);
return block_dedup_token_for_children;
}
virtual std::shared_ptr<DedupTokenInfo> getDeduplicationTokenForChildren(Chunk & chunk) const
{
return chunk.getChunkInfos().get<DedupTokenInfo>();
}
virtual void fillDeduplicationTokenForChildren(Chunk & chunk) const
{
SipHash hash;
for (const auto & colunm: chunk.getColumns())
{
@ -50,8 +38,9 @@ protected:
}
const auto hash_value = hash.get128();
chunk.getChunkInfos().get<DedupTokenInfo>()->addTokenPart(
fmt::format(":hash-{}", toString(hash_value.items[0]) + "_" + toString(hash_value.items[1])));
chunk.getChunkInfos().add(std::make_shared<DedupTokenInfo>(
fmt::format(":hash-{}", toString(hash_value.items[0]) + "_" + toString(hash_value.items[1]))
));
}
private:
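
The fill method above derives a token part from a SipHash over the chunk's columns when no token exists yet. A rough self-contained sketch of that idea, with std::hash and a crude mixing loop standing in for SipHash, and the ":hash-<lo>_<hi>" layout merely mirroring the fmt::format call in the diff; none of this is the real implementation:

    #include <functional>
    #include <iostream>
    #include <string>
    #include <vector>

    // Sketch: derive a deduplication token part from a chunk's column data.
    // std::hash plus ad-hoc mixing stands in for ClickHouse's 128-bit SipHash.
    std::string hashTokenPart(const std::vector<std::vector<int>> & columns)
    {
        std::size_t lo = 0, hi = 0;
        for (const auto & column : columns)
            for (int value : column)
            {
                lo = std::hash<int>{}(value) ^ (lo * 1099511628211ULL);       // crude mixing
                hi = std::hash<std::size_t>{}(lo) ^ (hi * 1099511628211ULL);
            }
        return ":hash-" + std::to_string(lo) + "_" + std::to_string(hi);
    }

    int main()
    {
        std::vector<std::vector<int>> columns{{1, 2, 3}, {4, 5, 6}};
        // Identical chunk data produces the same token part within a run,
        // which is what makes the hash usable for deduplication when no
        // user-supplied token is present.
        std::cout << hashTokenPart(columns) << '\n';
        std::cout << hashTokenPart(columns) << '\n';
    }

The design point is simply that a block's content, not its arrival time, determines the token, so a retried insert of the same data collapses onto the same token.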

View File

@ -49,6 +49,7 @@ namespace DB
class DedupTokenInfo : public ChunkInfoCloneable<DedupTokenInfo>
{
public:
DedupTokenInfo() = default;
DedupTokenInfo(const DedupTokenInfo & other) = default;
explicit DedupTokenInfo(String first_part)
{
@ -68,9 +69,15 @@ namespace DB
return result;
}
bool empty() const
{
return token_parts.empty();
}
void addTokenPart(String part)
{
token_parts.push_back(std::move(part));
if (!part.empty())
token_parts.push_back(std::move(part));
}
private:
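
In the hunk above, DedupTokenInfo becomes default-constructible, can report empty(), and silently drops empty parts in addTokenPart. A compact model of that container, not the real class, just to show how parts accumulate into one token:

    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified model of the token-parts container: parts are appended as the
    // chunk travels through sinks, empty parts are ignored, and the final token
    // is their concatenation.
    class TokenParts
    {
    public:
        void addTokenPart(std::string part)
        {
            if (!part.empty())
                parts.push_back(std::move(part));
        }

        bool empty() const { return parts.empty(); }

        std::string getToken() const
        {
            std::string result;
            for (const auto & part : parts)
                result += part;
            return result;
        }

    private:
        std::vector<std::string> parts;
    };

    int main()
    {
        TokenParts token;
        token.addTokenPart("user-token");        // e.g. insert_deduplication_token
        token.addTokenPart("");                  // skipped, keeps the token stable
        token.addTokenPart(":block_hash-abc");   // appended by the sink per block
        std::cout << token.getToken() << '\n';   // prints "user-token:block_hash-abc"
    }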

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/StorageMergeTree.h>
#include <Processors/Transforms/NumberBlocksTransform.h>
@ -84,6 +85,7 @@ void MergeTreeSink::consume(Chunk & chunk)
bool support_parallel_write = false;
String block_dedup_token;
std::shared_ptr<DedupTokenInfo> dedub_token_info_for_children = nullptr;
if (storage.getDeduplicationLog())
{
auto token_info = chunk.getChunkInfos().get<DedupTokenInfo>();
@ -102,6 +104,9 @@ void MergeTreeSink::consume(Chunk & chunk)
}
else
{
dedub_token_info_for_children = std::make_shared<DedupTokenInfo>();
chunk.getChunkInfos().add(dedub_token_info_for_children);
LOG_DEBUG(storage.log,
"dedup token from hash is caclulated");
}
@ -126,9 +131,9 @@ void MergeTreeSink::consume(Chunk & chunk)
current_block.block.clear();
current_block.partition.clear();
if (auto children_dedup_token = getDeduplicationTokenForChildren(chunk))
if (dedub_token_info_for_children)
{
children_dedup_token->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash());
dedub_token_info_for_children->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash());
}
/// If optimize_on_insert setting is true, current_block could become empty after merge
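
The MergeTreeSink change above keeps a local pointer to the token info only when the sink created it itself (i.e. no user-supplied insert_deduplication_token), and in that case appends a ":block_hash-..." part after each part is written. A simplified, standalone sketch of that control flow; all types and the consumeChunk helper are hypothetical stand-ins:

    #include <iostream>
    #include <memory>
    #include <optional>
    #include <string>

    // Stand-in for DedupTokenInfo: the sink keeps a pointer only when it created
    // the token itself, so it knows whether to append per-block hash parts later.
    struct TokenInfo
    {
        std::string token;
        void addTokenPart(const std::string & part) { token += part; }
    };

    struct Chunk
    {
        std::optional<std::string> user_dedup_token;  // insert_deduplication_token, if any
        std::shared_ptr<TokenInfo> token_info;
    };

    void consumeChunk(Chunk & chunk, const std::string & part_block_id_hash)
    {
        std::shared_ptr<TokenInfo> token_for_children;

        if (chunk.user_dedup_token)
        {
            // User supplied a token: pass it through unchanged.
            chunk.token_info = std::make_shared<TokenInfo>(TokenInfo{*chunk.user_dedup_token});
        }
        else
        {
            // No user token: the sink owns an empty token and fills it per block.
            token_for_children = std::make_shared<TokenInfo>();
            chunk.token_info = token_for_children;
        }

        // ... the block is written here and yields part_block_id_hash ...

        if (token_for_children)
            token_for_children->addTokenPart(":block_hash-" + part_block_id_hash);

        std::cout << "token for children: " << chunk.token_info->token << '\n';
    }

    int main()
    {
        Chunk with_user{std::string{"UDT"}, nullptr};
        Chunk without_user{std::nullopt, nullptr};
        consumeChunk(with_user, "deadbeef");      // token for children: UDT
        consumeChunk(without_user, "deadbeef");   // token for children: :block_hash-deadbeef
    }

The reason for holding the pointer is that a user-provided token must stay exactly as given, while a sink-generated token is free to keep absorbing per-block parts. The ReplicatedMergeTreeSink hunks further below apply the same pattern to the replicated path.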

View File

@ -41,7 +41,6 @@ private:
struct DelayedChunk;
std::unique_ptr<DelayedChunk> delayed_chunk;
void fillDeduplicationTokenForChildren(Chunk &) const override { /* For MergeTree we get the tokens from part checksums */ }
void finishDelayedChunk();
};

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
@ -293,6 +294,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
}
String block_dedup_token;
std::shared_ptr<DedupTokenInfo> dedub_token_info_for_children = nullptr;
if constexpr (!async_insert)
{
auto token_info = chunk.getChunkInfos().get<DedupTokenInfo>();
@ -314,6 +316,8 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
}
else
{
dedub_token_info_for_children = std::make_shared<DedupTokenInfo>();
chunk.getChunkInfos().add(dedub_token_info_for_children);
LOG_DEBUG(storage.log,
"dedup token from hash is caclulated");
}
@ -382,9 +386,9 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}
if (auto children_dedup_token = getDeduplicationTokenForChildren(chunk))
if (dedub_token_info_for_children)
{
children_dedup_token->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash());
dedub_token_info_for_children->addTokenPart(":block_hash-" + temp_part.part->getPartBlockIDHash());
}
}

View File

@ -139,7 +139,6 @@ private:
/// We can delay processing for previous chunk and start writing a new one.
std::unique_ptr<DelayedChunk> delayed_chunk;
void fillDeduplicationTokenForChildren(Chunk &) const override { /* For MergeTree we get the tokens from part checksums */ }
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
};

View File

@ -22,7 +22,14 @@ def __format(template, **params):
return template.format(**kv_args)
def instance_create_statement(table_name, table_columns, table_keys, table_engine, with_deduplication, no_merges=True):
def instance_create_statement(
table_name,
table_columns,
table_keys,
table_engine,
with_deduplication,
no_merges=True,
):
template = """
CREATE TABLE {table_name}
{table_columns}
@ -37,22 +44,36 @@ def instance_create_statement(table_name, table_columns, table_keys, table_engin
params["table_columns"] = table_columns
params["table_keys"] = table_keys
params["table_no_merges"] = f"SYSTEM STOP MERGES {table_name};" if no_merges else ""
params["table_engine"] = "MergeTree()" if table_engine == "MergeTree" else f"ReplicatedMergeTree('/clickhouse/tables/{{database}}/{table_name}', '1')"
params["table_engine"] = (
"MergeTree()"
if table_engine == "MergeTree"
else f"ReplicatedMergeTree('/clickhouse/tables/{{database}}/{table_name}', '1')"
)
deduplication_window_setting_name = "non_replicated_deduplication_window" if table_engine == "MergeTree" else "replicated_deduplication_window"
deduplication_window_setting_name = (
"non_replicated_deduplication_window"
if table_engine == "MergeTree"
else "replicated_deduplication_window"
)
deduplication_window_setting_value = 1000 if with_deduplication else 0
settings = list()
settings += [f"{deduplication_window_setting_name}={deduplication_window_setting_value}"]
settings += [
f"{deduplication_window_setting_name}={deduplication_window_setting_value}"
]
params["table_settings"] = "SETTINGS " + ",".join(settings)
return __format(template, **params)
def instance_insert_statement(table_name, count, insert_method, insert_unique_blocks, use_insert_token):
insert_settings = "" if not use_insert_token else "SETTINGS insert_deduplication_token='UDT'"
def instance_insert_statement(
table_name, count, insert_method, insert_unique_blocks, use_insert_token
):
insert_settings = (
"" if not use_insert_token else "SETTINGS insert_deduplication_token='UDT'"
)
if insert_method == 'InsertSelect':
if insert_method == "InsertSelect":
template = """
INSERT INTO {table_name}
SELECT {insert_columns}
@ -62,7 +83,9 @@ def instance_insert_statement(table_name, count, insert_method, insert_unique_bl
template,
table_name=table_name,
count=count,
insert_columns="'src_4', 4" if not insert_unique_blocks else "'src_' || toString(number), number",
insert_columns="'src_4', 4"
if not insert_unique_blocks
else "'src_' || toString(number), number",
insert_settings=insert_settings,
)
@ -74,7 +97,9 @@ def instance_insert_statement(table_name, count, insert_method, insert_unique_bl
values = []
for i in range(count):
values += [f"('src_{i}', {i})"] if insert_unique_blocks else ["('src_4', 4)"]
values += (
[f"('src_{i}', {i})"] if insert_unique_blocks else ["('src_4', 4)"]
)
insert_values = ", ".join(values)
return __format(
@ -86,7 +111,9 @@ def instance_insert_statement(table_name, count, insert_method, insert_unique_bl
def get_drop_tables_statements(tables):
return "".join([f"DROP TABLE IF EXISTS {table_name};\n" for table_name in tables[::-1]])
return "".join(
[f"DROP TABLE IF EXISTS {table_name};\n" for table_name in tables[::-1]]
)
def get_logs_statement(args):
@ -94,15 +121,17 @@ def get_logs_statement(args):
return "SET send_logs_level='test';"
return ""
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
raise argparse.ArgumentTypeError("Boolean value expected.")
class ArgsFactory:
def __init__(self, parser):
@ -110,29 +139,55 @@ class ArgsFactory:
def add_opt_engine(self):
self.__parser.add_argument(
"--table-engine", choices=["ReplicatedMergeTree", "MergeTree"], default="MergeTree")
"--table-engine",
choices=["ReplicatedMergeTree", "MergeTree"],
default="MergeTree",
)
def add_opt_user_token(self):
self.__parser.add_argument("--use-insert-token", type=str2bool, nargs='?', const=True, default=False)
self.__parser.add_argument(
"--use-insert-token", type=str2bool, nargs="?", const=True, default=False
)
def add_opt_single_thread(self):
self.__parser.add_argument("--single-thread", type=str2bool, nargs='?', const=True, default=True)
self.__parser.add_argument(
"--single-thread", type=str2bool, nargs="?", const=True, default=True
)
def add_opt_dedup_src(self):
self.__parser.add_argument("--deduplicate-src-table", type=str2bool, nargs='?', const=True, default=True)
self.__parser.add_argument(
"--deduplicate-src-table",
type=str2bool,
nargs="?",
const=True,
default=True,
)
def add_opt_dedup_dst(self):
self.__parser.add_argument("--deduplicate-dst-table", type=str2bool, nargs='?', const=True, default=True)
self.__parser.add_argument(
"--deduplicate-dst-table",
type=str2bool,
nargs="?",
const=True,
default=True,
)
def add_opt_get_logs(self):
self.__parser.add_argument("--get-logs", type=str2bool, nargs='?', const=True, default=False)
self.__parser.add_argument(
"--get-logs", type=str2bool, nargs="?", const=True, default=False
)
def add_opt_uniq_blocks(self):
self.__parser.add_argument("--insert-unique-blocks", type=str2bool, nargs='?', const=True, default=True)
self.__parser.add_argument(
"--insert-unique-blocks", type=str2bool, nargs="?", const=True, default=True
)
def add_opt_insert_method(self):
self.__parser.add_argument(
"--insert-method", choices=["InsertSelect", "InsertValues"], default="InsertSelect")
"--insert-method",
choices=["InsertSelect", "InsertValues"],
default="InsertSelect",
)
def add_all(self):
self.add_opt_engine()
@ -174,10 +229,16 @@ def test_insert_several_blocks(parser):
WHERE b % 2 = 0;
"""
drop_tables_statements = get_drop_tables_statements( ["table_a_b", "table_when_b_even", "mv_b_even"] )
drop_tables_statements = get_drop_tables_statements(
["table_a_b", "table_when_b_even", "mv_b_even"]
)
insert_statement = instance_insert_statement(
"table_a_b", 10, args.insert_method, args.insert_unique_blocks, args.use_insert_token
"table_a_b",
10,
args.insert_method,
args.insert_unique_blocks,
args.use_insert_token,
)
print_details_statements = f"""
@ -190,8 +251,6 @@ def test_insert_several_blocks(parser):
{"" if not args.get_logs else "SELECT _part, count() FROM table_when_b_even GROUP BY _part ORDER BY _part;"}
"""
if args.insert_unique_blocks:
assert_first_insert_statements = f"""
SELECT throwIf( count() != 10 )
@ -278,7 +337,12 @@ def test_mv_generates_several_blocks(parser):
ArgsFactory(parser).add_all()
def calle(args):
tables = ["table_for_join_with", "table_a_b", "table_when_b_even_and_joined", "mv_b_even"]
tables = [
"table_for_join_with",
"table_a_b",
"table_when_b_even_and_joined",
"mv_b_even",
]
drop_tables_statements = get_drop_tables_statements(tables)
details_print_for_table_for_join_with = ""
@ -305,7 +369,11 @@ def test_mv_generates_several_blocks(parser):
)
insert_statement = instance_insert_statement(
"table_a_b", 5, args.insert_method, args.insert_unique_blocks, args.use_insert_token
"table_a_b",
5,
args.insert_method,
args.insert_unique_blocks,
args.use_insert_token,
)
details_print_statements = f"""
@ -449,7 +517,11 @@ def test_several_mv_into_one_table(parser):
)
insert_statement = instance_insert_statement(
"table_src", 8, args.insert_method, args.insert_unique_blocks, args.use_insert_token
"table_src",
8,
args.insert_method,
args.insert_unique_blocks,
args.use_insert_token,
)
details_print_statements = f"""
@ -568,9 +640,7 @@ def parse_args():
test_mv_generates_several_blocks(
subparsers.add_parser("mv_generates_several_blocks")
)
test_several_mv_into_one_table(
subparsers.add_parser("several_mv_into_one_table")
)
test_several_mv_into_one_table(subparsers.add_parser("several_mv_into_one_table"))
args = parser.parse_args()
if args.test is None:
parser.print_help()