mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #28354 from qoega/rework-kafka-topic-creation
This commit is contained in:
commit
8a269d64d2
@ -13,7 +13,6 @@ import avro.io
|
||||
import avro.datafile
|
||||
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
|
||||
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
|
||||
from confluent_kafka import admin
|
||||
|
||||
import kafka.errors
|
||||
import pytest
|
||||
@ -71,6 +70,43 @@ def get_kafka_producer(port, serializer, retries):
|
||||
def producer_serializer(x):
|
||||
return x.encode() if isinstance(x, str) else x
|
||||
|
||||
def kafka_create_topic(admin_client, topic_name, num_partitions=1, replication_factor=1, max_retries=50, config=None):
|
||||
logging.debug(f"Kafka create topic={topic_name}, num_partitions={num_partitions}, replication_factor={replication_factor}")
|
||||
topics_list = [NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor, topic_configs=config)]
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
admin_client.create_topics(new_topics=topics_list, validate_only=False)
|
||||
logging.debug("Admin client succeed")
|
||||
return
|
||||
except Exception as e:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries < max_retries:
|
||||
logging.warning(f"Failed to create topic {e}")
|
||||
else:
|
||||
raise
|
||||
|
||||
def kafka_delete_topic(admin_client, topic, max_retries=50):
|
||||
result = admin_client.delete_topics([topic])
|
||||
for (topic, e) in result.topic_error_codes:
|
||||
if e == 0:
|
||||
logging.debug(f"Topic {topic} deleted")
|
||||
else:
|
||||
logging.error(f"Failed to delete topic {topic}: {e}")
|
||||
|
||||
retries = 0
|
||||
while True:
|
||||
topics_listed = admin_client.list_topics()
|
||||
logging.debug(f"TOPICS LISTED: {topics_listed}")
|
||||
if topic not in topics_listed:
|
||||
return
|
||||
else:
|
||||
retries += 1
|
||||
time.sleep(0.5)
|
||||
if retries > max_retries:
|
||||
raise Exception(f"Failed to delete topics {topic}, {result}")
|
||||
|
||||
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
|
||||
logging.debug("kafka_produce server:{}:{} topic:{}".format("localhost", kafka_cluster.kafka_port, topic))
|
||||
producer = get_kafka_producer(kafka_cluster.kafka_port, producer_serializer, retries)
|
||||
@ -289,6 +325,7 @@ def test_kafka_json_as_string(kafka_cluster):
|
||||
|
||||
def test_kafka_formats(kafka_cluster):
|
||||
schema_registry_client = CachedSchemaRegistryClient('http://localhost:{}'.format(kafka_cluster.schema_registry_port))
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
# data was dumped from clickhouse itself in a following manner
|
||||
# clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'
|
||||
@ -606,7 +643,7 @@ def test_kafka_formats(kafka_cluster):
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
logging.debug(('Checking {}'.format(format_name)))
|
||||
topic_name = 'format_tests_{}'.format(format_name)
|
||||
topic_name = f'format_tests_{format_name}'
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2]
|
||||
result = instance.query('SELECT * FROM test.kafka_{format_name}_mv;'.format(format_name=format_name))
|
||||
@ -630,7 +667,7 @@ def test_kafka_formats(kafka_cluster):
|
||||
0 0 AM 0.5 1 {topic_name} 0 {offset_2}
|
||||
'''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2])
|
||||
assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name)
|
||||
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
# Since everything is async and shaky when receiving messages from Kafka,
|
||||
# we may want to try and check results multiple times in a loop.
|
||||
@ -771,11 +808,10 @@ def test_kafka_issue4116(kafka_cluster):
|
||||
def test_kafka_consumer_hang(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="consumer_hang", num_partitions=8, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "consumer_hang"
|
||||
kafka_create_topic(admin_client, topic_name, num_partitions=8)
|
||||
|
||||
instance.query('''
|
||||
instance.query(f'''
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
DROP TABLE IF EXISTS test.consumer;
|
||||
@ -783,8 +819,8 @@ def test_kafka_consumer_hang(kafka_cluster):
|
||||
CREATE TABLE test.kafka (key UInt64, value UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = 'consumer_hang',
|
||||
kafka_group_name = 'consumer_hang',
|
||||
kafka_topic_list = '{topic_name}',
|
||||
kafka_group_name = '{topic_name}',
|
||||
kafka_format = 'JSONEachRow',
|
||||
kafka_num_consumers = 8;
|
||||
CREATE TABLE test.view (key UInt64, value UInt64) ENGINE = Memory();
|
||||
@ -818,13 +854,28 @@ def test_kafka_consumer_hang(kafka_cluster):
|
||||
# 'dr'||'op' to avoid self matching
|
||||
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
|
||||
|
||||
# cleanup unread messages so kafka will not wait reading consumers to delete topic
|
||||
instance.query(f'''
|
||||
CREATE TABLE test.kafka (key UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = '{topic_name}',
|
||||
kafka_group_name = '{topic_name}',
|
||||
kafka_format = 'JSONEachRow',
|
||||
kafka_num_consumers = 8;
|
||||
''')
|
||||
|
||||
num_read = int(instance.query('SELECT count() FROM test.kafka'))
|
||||
logging.debug(f"read {num_read} from {topic_name} before delete")
|
||||
instance.query('DROP TABLE test.kafka')
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_consumer_hang2(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="consumer_hang2", num_partitions=1, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "consumer_hang2"
|
||||
kafka_create_topic(admin_client, topic_name)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
@ -864,6 +915,7 @@ def test_kafka_consumer_hang2(kafka_cluster):
|
||||
# from a user perspective: we expect no hanging 'drop' queries
|
||||
# 'dr'||'op' to avoid self matching
|
||||
assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_csv_with_delimiter(kafka_cluster):
|
||||
@ -916,21 +968,21 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
|
||||
|
||||
def test_kafka_select_empty(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="empty", num_partitions=1, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "empty"
|
||||
kafka_create_topic(admin_client, topic_name)
|
||||
|
||||
instance.query('''
|
||||
instance.query(f'''
|
||||
CREATE TABLE test.kafka (key UInt64)
|
||||
ENGINE = Kafka
|
||||
SETTINGS kafka_broker_list = 'kafka1:19092',
|
||||
kafka_topic_list = 'empty',
|
||||
kafka_group_name = 'empty',
|
||||
kafka_topic_list = '{topic_name}',
|
||||
kafka_group_name = '{topic_name}',
|
||||
kafka_format = 'TSV',
|
||||
kafka_row_delimiter = '\\n';
|
||||
''')
|
||||
|
||||
assert int(instance.query('SELECT count() FROM test.kafka')) == 0
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_json_without_delimiter(kafka_cluster):
|
||||
@ -1153,9 +1205,8 @@ def test_kafka_recreate_kafka_table(kafka_cluster):
|
||||
# admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="recreate_kafka_table", num_partitions=6, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "recreate_kafka_table"
|
||||
kafka_create_topic(admin_client, topic_name, num_partitions=6)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -1211,6 +1262,7 @@ def test_kafka_recreate_kafka_table(kafka_cluster):
|
||||
DROP TABLE test.consumer;
|
||||
DROP TABLE test.view;
|
||||
''')
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_librdkafka_compression(kafka_cluster):
|
||||
@ -1253,12 +1305,9 @@ def test_librdkafka_compression(kafka_cluster):
|
||||
logging.debug(('Check compression {}'.format(compression_type)))
|
||||
|
||||
topic_name = 'test_librdkafka_compression_{}'.format(compression_type)
|
||||
admin_client = admin.AdminClient({'bootstrap.servers': "localhost:{}".format(kafka_cluster.kafka_port)})
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic = admin.NewTopic(topic=topic_name, num_partitions=1, replication_factor=1, config={
|
||||
'compression.type': compression_type,
|
||||
})
|
||||
admin_client.create_topics(new_topics=[topic], validate_only=False)
|
||||
kafka_create_topic(admin_client, topic_name, config={'compression.type': compression_type})
|
||||
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (key UInt64, value String)
|
||||
@ -1281,6 +1330,8 @@ def test_librdkafka_compression(kafka_cluster):
|
||||
|
||||
instance.query('DROP TABLE test.kafka SYNC')
|
||||
instance.query('DROP TABLE test.consumer SYNC')
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_materialized_view_with_subquery(kafka_cluster):
|
||||
instance.query('''
|
||||
@ -1652,11 +1703,9 @@ def test_kafka_commit_on_block_write(kafka_cluster):
|
||||
|
||||
def test_kafka_virtual_columns2(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="virt2_0", num_partitions=2, replication_factor=1))
|
||||
topic_list.append(NewTopic(name="virt2_1", num_partitions=2, replication_factor=1))
|
||||
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
kafka_create_topic(admin_client, "virt2_0", num_partitions=2)
|
||||
kafka_create_topic(admin_client, "virt2_1", num_partitions=2)
|
||||
|
||||
instance.query('''
|
||||
CREATE TABLE test.kafka (value UInt64)
|
||||
@ -1715,14 +1764,16 @@ def test_kafka_virtual_columns2(kafka_cluster):
|
||||
|
||||
assert TSV(result) == TSV(expected)
|
||||
|
||||
kafka_delete_topic(admin_client, "virt2_0")
|
||||
kafka_delete_topic(admin_client, "virt2_1")
|
||||
|
||||
|
||||
def test_kafka_produce_key_timestamp(kafka_cluster):
|
||||
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="insert3", num_partitions=1, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "insert3"
|
||||
kafka_create_topic(admin_client, topic_name)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -1774,12 +1825,13 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
|
||||
|
||||
assert TSV(result) == TSV(expected)
|
||||
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_flush_by_time(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="flush_by_time", num_partitions=1, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "flush_by_time"
|
||||
kafka_create_topic(admin_client, topic_name)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -1833,6 +1885,7 @@ def test_kafka_flush_by_time(kafka_cluster):
|
||||
''')
|
||||
|
||||
assert TSV(result) == TSV('1 1')
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_flush_by_block_size(kafka_cluster):
|
||||
@ -1892,13 +1945,11 @@ def test_kafka_flush_by_block_size(kafka_cluster):
|
||||
assert int(
|
||||
result) == 100, 'Messages from kafka should be flushed when block of size kafka_max_block_size is formed!'
|
||||
|
||||
|
||||
def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="topic_with_multiple_partitions2", num_partitions=10, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "topic_with_multiple_partitions2"
|
||||
kafka_create_topic(admin_client, topic_name, num_partitions=10)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.view;
|
||||
@ -1938,6 +1989,7 @@ def test_kafka_lot_of_partitions_partial_commit_of_bulk(kafka_cluster):
|
||||
DROP TABLE test.consumer;
|
||||
DROP TABLE test.view;
|
||||
''')
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_rebalance(kafka_cluster):
|
||||
@ -1964,9 +2016,8 @@ def test_kafka_rebalance(kafka_cluster):
|
||||
# time.sleep(2)
|
||||
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="topic_with_multiple_partitions", num_partitions=11, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "topic_with_multiple_partitions"
|
||||
kafka_create_topic(admin_client, topic_name, num_partitions=11)
|
||||
|
||||
cancel = threading.Event()
|
||||
|
||||
@ -2070,6 +2121,7 @@ def test_kafka_rebalance(kafka_cluster):
|
||||
kafka_thread.join()
|
||||
|
||||
assert result == 1, 'Messages from kafka get duplicated!'
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
|
||||
@ -2645,6 +2697,7 @@ def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_clus
|
||||
def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
# data was dumped from clickhouse itself in a following manner
|
||||
# clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g'
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
all_formats = {
|
||||
## Text formats ##
|
||||
@ -2814,8 +2867,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
|
||||
topic_name_prefix = 'format_tests_4_stream_'
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
print(('Set up {}'.format(format_name)))
|
||||
topic_name = topic_name_prefix + '{}'.format(format_name)
|
||||
logging.debug(f'Set up {format_name}')
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
data_sample = format_opts['data_sample']
|
||||
data_prefix = []
|
||||
raw_message = '_raw_message'
|
||||
@ -2855,8 +2908,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
extra_settings=format_opts.get('extra_settings') or ''))
|
||||
|
||||
for format_name, format_opts in list(all_formats.items()):
|
||||
print(('Checking {}'.format(format_name)))
|
||||
topic_name = topic_name_prefix + '{}'.format(format_name)
|
||||
logging.debug('Checking {format_name}')
|
||||
topic_name = f"{topic_name_prefix}{format_name}"
|
||||
# shift offsets by 1 if format supports empty value
|
||||
offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2]
|
||||
result = instance.query('SELECT * FROM test.kafka_data_{format_name}_mv;'.format(format_name=format_name))
|
||||
@ -2886,6 +2939,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
|
||||
# print(errors_result.strip())
|
||||
# print(errors_expected.strip())
|
||||
assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name)
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
def wait_for_new_data(table_name, prev_count = 0, max_retries = 120):
|
||||
retries = 0
|
||||
@ -2901,14 +2955,12 @@ def wait_for_new_data(table_name, prev_count = 0, max_retries = 120):
|
||||
raise Exception("No new data :(")
|
||||
|
||||
def test_kafka_consumer_failover(kafka_cluster):
|
||||
|
||||
# for backporting:
|
||||
# admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
|
||||
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
|
||||
|
||||
topic_list = []
|
||||
topic_list.append(NewTopic(name="kafka_consumer_failover", num_partitions=2, replication_factor=1))
|
||||
admin_client.create_topics(new_topics=topic_list, validate_only=False)
|
||||
topic_name = "kafka_consumer_failover"
|
||||
kafka_create_topic(admin_client, topic_name, num_partitions=2)
|
||||
|
||||
instance.query('''
|
||||
DROP TABLE IF EXISTS test.kafka;
|
||||
@ -3019,6 +3071,7 @@ def test_kafka_consumer_failover(kafka_cluster):
|
||||
producer.send(topic='kafka_consumer_failover', value=json.dumps({'key':8,'value': 8}), partition=1)
|
||||
producer.flush()
|
||||
prev_count = wait_for_new_data('test.destination', prev_count)
|
||||
kafka_delete_topic(admin_client, topic_name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -46,6 +46,8 @@ def test_simple_select(started_cluster):
|
||||
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
|
||||
|
||||
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
|
||||
node.query("DROP TABLE simple_mongo_table")
|
||||
simple_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
|
||||
@ -67,6 +69,8 @@ def test_complex_data_type(started_cluster):
|
||||
assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + '\n'
|
||||
|
||||
assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n'
|
||||
node.query("DROP TABLE incomplete_mongo_table")
|
||||
incomplete_mongo_table.drop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
|
||||
@ -95,7 +99,9 @@ def test_incorrect_data_type(started_cluster):
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node.query("SELECT bbbb FROM strange_mongo_table2")
|
||||
|
||||
node.query("DROP TABLE strange_mongo_table")
|
||||
node.query("DROP TABLE strange_mongo_table2")
|
||||
strange_mongo_table.drop()
|
||||
|
||||
@pytest.mark.parametrize('started_cluster', [True], indirect=['started_cluster'])
|
||||
def test_secure_connection(started_cluster):
|
||||
@ -116,3 +122,5 @@ def test_secure_connection(started_cluster):
|
||||
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
|
||||
|
||||
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
|
||||
node.query("DROP TABLE simple_mongo_table")
|
||||
simple_mongo_table.drop()
|
||||
|
Loading…
Reference in New Issue
Block a user