generated from C4T-BuT-S4D/ad-boilerplate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchecker.py
executable file
·134 lines (99 loc) · 4.73 KB
/
checker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
import random
import sys
import requests
from checklib import *
from mole_lib import CheckMachine
class Checker(BaseChecker):
    """Service checker implementing the ``check``, ``put`` and ``get`` actions.

    All requests to the service go through ``CheckMachine`` (``self.c``);
    this class decides what to send and which responses count as a failure.
    """

    # Checker metadata; values are kept exactly as originally declared.
    vulns: int = 1
    timeout: int = 20
    uses_attack_data: bool = True

    def __init__(self, *args, **kwargs):
        """Initialise the base checker and attach the service client."""
        super().__init__(*args, **kwargs)
        self.c = CheckMachine(self)

    def check(self):
        """Run 3 to 6 randomly chosen functional scenarios, then report OK.

        The scenarios deliberately do not call ``cquit`` themselves: ``cquit``
        is expected to end the run by raising the checker's check-finished
        exception, which would stop this loop after the first scenario.
        """
        scenarios = [self._check_scenario1, self._check_scenario2]
        for _ in range(random.randint(3, 6)):
            random.choice(scenarios)()
        self.cquit(Status.OK)

    def put(self, _flag_id: str, flag: str, _vuln: str):
        """Store ``flag`` as a secret of a freshly registered user.

        The flag is added with the third ``add`` argument set to ``False``
        (``get`` later verifies ``is_public`` is ``False``), followed by
        1-20 random filler secrets. Reports OK with ``username:secret_id`` as
        the public part and ``username:secret_id:password`` as the private
        part.
        """
        sess = get_initialized_session()
        username, password = rnd_username(), rnd_password()
        self.c.register(sess, username, password)

        secret_id = self.c.add(sess, flag, False)

        # Surround the flag with noise secrets of random length/visibility.
        for _ in range(random.randint(1, 20)):
            self.c.add(
                sess,
                rnd_string(random.randint(1, 100)),
                random.choice([True, False]),
            )

        self.cquit(
            Status.OK,
            f"{username}:{secret_id}",
            f"{username}:{secret_id}:{password}",
        )

    def get(self, flag_id: str, flag: str, vuln: str):
        """Log in as the flag owner and verify the stored flag is intact.

        ``flag_id`` must be the three-field ``username:secret_id:password``
        string. Every failure here is reported as ``Status.CORRUPT``.
        """
        username, secret_id, password = flag_id.split(':')

        s1 = get_initialized_session()
        self.c.login(s1, username, password, status=Status.CORRUPT)

        secret = self.c.get(s1, secret_id, status=Status.CORRUPT)
        self.assert_eq(secret.get('author'), username, 'Invalid author', status=Status.CORRUPT)
        self.assert_eq(secret.get('content'), flag, 'Invalid content', status=Status.CORRUPT)
        self.assert_eq(secret.get('is_public'), False, 'Invalid is_public', status=Status.CORRUPT)

        self.cquit(Status.OK)

    def _check_scenario1(self):
        """Check secret creation/reading by the owner and by another user.

        One user works through two sessions (one registered, one logged in)
        and creates 10 secrets. Each secret is read back by the owner; public
        ones are also read by a second user, who must see the content but an
        empty/missing author.
        """
        s1 = get_initialized_session()
        username, password = rnd_username(), rnd_password()
        self.c.register(s1, username, password)

        s2 = get_initialized_session()
        self.c.login(s2, username, password)

        s3 = get_initialized_session()
        other_username, other_password = rnd_username(), rnd_password()
        self.c.register(s3, other_username, other_password)

        def rnds():
            # Pick either of the owner's two sessions at random.
            return random.choice([s1, s2])

        secrets = []
        for _ in range(10):
            content = rnd_string(random.randint(1, 100))
            public = random.choice([True, False])
            secrets.append((self.c.add(rnds(), content, public), content, public))

        for secret_id, content, public in secrets:
            secret = self.c.get(rnds(), secret_id)
            self.assert_eq(secret.get('author'), username, 'Invalid author')
            self.assert_eq(secret.get('is_public'), public, 'Invalid is_public')
            self.assert_eq(secret.get('content'), content, 'Invalid content')

            if public:
                other_secret = self.c.get(s3, secret_id)
                self.assert_eq(other_secret.get('author', ''), '', 'Public secret author check')
                self.assert_eq(other_secret.get('content'), content, 'Public secret content check')

    def _check_scenario2(self):
        """Check listing and pagination of a user's secrets.

        Creates 60 secrets, expects the first listing to contain 50 entries,
        then collects up to two pages (each continuing from the last id of the
        previous page) and compares them with what was created.
        """
        s1 = get_initialized_session()
        username, password = rnd_username(), rnd_password()
        self.c.register(s1, username, password)

        s2 = get_initialized_session()
        self.c.login(s2, username, password)

        def rnds():
            # Pick either of the user's two sessions at random.
            return random.choice([s1, s2])

        secrets = []
        for _ in range(60):
            content = rnd_string(random.randint(1, 100))
            secrets.append((self.c.add(rnds(), content, False), content))
        secrets.sort(key=lambda item: item[0])

        first_page = self.c.list(rnds())
        self.assert_eq(len(first_page), 50, 'Invalid list length')

        got_secrets = []
        start = ''
        for _ in range(2):
            page = self.c.list(rnds(), start=start)
            if not page:
                break
            got_secrets.extend(page)
            start = page[-1].get('id')
        got_secrets.sort(key=lambda item: item.get('id'))

        # zip() stops at the shorter sequence, so without this check a listing
        # that is missing secrets would pass the comparison below.
        self.assert_eq(len(got_secrets), len(secrets), 'Invalid list length')

        for need_secret, got_secret in zip(secrets, got_secrets):
            self.assert_eq(need_secret[0], got_secret.get('id'), 'Invalid id')
            self.assert_eq(need_secret[1], got_secret.get('content'), 'Invalid content')
            self.assert_eq(False, got_secret.get('is_public'), 'Invalid is_public')
            self.assert_eq(username, got_secret.get('author'), 'Invalid author')

    def action(self, action, *args, **kwargs):
        """Dispatch ``action`` via the base class; connection errors mean DOWN."""
        try:
            super().action(action, *args, **kwargs)
        except requests.exceptions.ConnectionError:
            self.cquit(Status.DOWN, 'Connection error', 'Got requests connection error')
if __name__ == '__main__':
    # Command line layout: <script> <action> <host> [action arguments...].
    checker = Checker(sys.argv[2])
    action_name = sys.argv[1]
    action_args = sys.argv[3:]

    try:
        checker.action(action_name, *action_args)
    except checker.get_check_finished_exception():
        # Report the verdict read back from the checker's status/public/private
        # attributes through the module-level cquit helper.
        cquit(Status(checker.status), checker.public, checker.private)