Skip to content

Commit 4267d92

Browse files
committed
[digger] add checker
1 parent fe0efd3 commit 4267d92

File tree

5 files changed

+623
-0
lines changed

5 files changed

+623
-0
lines changed

checkers/digger/api.py

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/usr/bin/env python3
2+
3+
import contextlib
4+
from typing import Iterator
5+
6+
import minipwn as pwn
7+
8+
9+
class API:
10+
def __init__(self, io: pwn.remote):
11+
self.io = io
12+
13+
def register(self, username: bytes, password: bytes, secret: bytes) -> bytes:
14+
self.io.sendlineafter(b'> ', b'REGISTER')
15+
16+
self.io.sendlineafter(b': ', username)
17+
self.io.sendlineafter(b': ', password)
18+
self.io.sendlineafter(b': ', secret)
19+
20+
return self.io.recvline().strip()
21+
22+
def login(self, username: bytes, password: bytes) -> bytes:
23+
self.io.sendlineafter(b'> ', b'LOGIN')
24+
25+
self.io.sendlineafter(b': ', username)
26+
self.io.sendlineafter(b': ', password)
27+
28+
return self.io.recvline().strip()
29+
30+
def logout(self) -> bytes:
31+
self.io.sendlineafter(b'> ', b'LOGOUT')
32+
33+
return self.io.recvline().strip()
34+
35+
def encrypt(self, plaintext: bytes, username: bytes = None) -> bytes:
36+
self.io.sendlineafter(b'> ', b'ENCRYPT')
37+
38+
if username is not None:
39+
self.io.sendlineafter(b': ', username)
40+
41+
self.io.sendlineafter(b': ', plaintext.hex().encode())
42+
43+
line = self.io.recvline().strip()
44+
45+
if line.startswith(b'error'):
46+
return line
47+
48+
ciphertext = self.io.recvline().strip().decode()
49+
50+
return bytes.fromhex(ciphertext)
51+
52+
def decrypt(self, ciphertext: bytes) -> bytes:
53+
self.io.sendlineafter(b'> ', b'DECRYPT')
54+
55+
self.io.sendlineafter(b': ', ciphertext.hex().encode())
56+
57+
line = self.io.recvline().strip()
58+
59+
if line.startswith(b'error'):
60+
return line
61+
62+
plaintext = self.io.recvline().strip().decode()
63+
64+
return bytes.fromhex(plaintext)
65+
66+
def exit(self) -> bytes:
67+
self.io.sendlineafter(b'> ', b'EXIT')
68+
69+
return self.io.recvline().strip()
70+
71+
72+
@contextlib.contextmanager
73+
def connect(hostname: str, port: int = 17171) -> Iterator[API]:
74+
io = pwn.remote(hostname, port)
75+
api = API(io)
76+
77+
try:
78+
yield api
79+
finally:
80+
io.s.close()

checkers/digger/checker.py

+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
#!/usr/bin/env python3
2+
3+
import sys
4+
import socket
5+
import string
6+
import secrets
7+
from typing import Any, Tuple, List
8+
9+
import checklib
10+
11+
import api
12+
import des
13+
14+
15+
def random_username(length: int = None) -> str:
16+
if length is None:
17+
length = 10 + secrets.randbelow(20)
18+
19+
alpha = string.ascii_letters + string.digits
20+
21+
return ''.join(secrets.choice(alpha) for _ in range(length))
22+
23+
24+
def random_password() -> str:
25+
length = 8
26+
alpha = string.ascii_letters + string.digits + string.punctuation
27+
28+
return ''.join(secrets.choice(alpha) for _ in range(length))
29+
30+
31+
def random_secret(length: int = None) -> str:
32+
if length is None:
33+
length = 32 + secrets.randbelow(64)
34+
35+
return secrets.token_urlsafe(length)[:length]
36+
37+
38+
def random_plaintext(blocks_count: int = None) -> List[bytes]:
39+
if blocks_count is None:
40+
blocks_count = 4 + secrets.randbelow(16)
41+
42+
plaintext = []
43+
44+
while len(plaintext) < blocks_count:
45+
block = secrets.token_bytes(8)
46+
47+
if b'error' not in block:
48+
plaintext.append(block)
49+
50+
return plaintext
51+
52+
53+
def des_encrypt(key: bytes, plaintext: List[bytes]) -> bytes:
54+
cipher = des.DES(key)
55+
ciphertext = []
56+
57+
for block in plaintext:
58+
ciphertext.append(cipher.encrypt(block))
59+
60+
return b''.join(ciphertext)
61+
62+
63+
class Checker(checklib.BaseChecker):
64+
vulns: int = 1
65+
timeout: int = 20
66+
uses_attack_data: bool = True
67+
68+
def __init__(self, *args, **kwargs):
69+
super(Checker, self).__init__(*args, **kwargs)
70+
71+
def action(self, action, *args, **kwargs):
72+
try:
73+
super(Checker, self).action(action, *args, **kwargs)
74+
except self.get_check_finished_exception():
75+
raise
76+
except socket.timeout as ex:
77+
self.cquit(
78+
checklib.Status.DOWN,
79+
'timeout error',
80+
f'timeout error: {ex}',
81+
)
82+
except ConnectionError as ex:
83+
self.cquit(
84+
checklib.Status.DOWN,
85+
'connection error',
86+
f'connection error: {ex}',
87+
)
88+
except socket.error as ex:
89+
self.cquit(
90+
checklib.Status.MUMBLE,
91+
'socket error',
92+
f'socket error: {ex}',
93+
)
94+
95+
def check(self):
96+
username = random_username()
97+
password = random_password()
98+
secret = random_secret()
99+
100+
plaintext1 = random_plaintext()
101+
ciphertext1 = des_encrypt(password.encode(), plaintext1)
102+
103+
plaintext2 = random_plaintext()
104+
ciphertext2 = des_encrypt(password.encode(), plaintext2)
105+
106+
with api.connect(self.host) as client:
107+
response = client.register(
108+
username.encode(),
109+
password.encode(),
110+
secret.encode(),
111+
)
112+
if response.startswith(b'error'):
113+
if b'user already exists' in response:
114+
self.throw_mumble('user already exists', response)
115+
116+
self.throw_mumble('failed to register', response)
117+
118+
client.exit()
119+
120+
with api.connect(self.host) as client:
121+
response = client.login(
122+
username.encode(),
123+
password.encode(),
124+
)
125+
if response.startswith(b'error'):
126+
if b'user does not exist' in response:
127+
self.throw_mumble('user does not exist', response)
128+
129+
self.throw_mumble('failed to login', response)
130+
131+
if not response.endswith(secret.encode()):
132+
self.throw_mumble('invalid secret', response)
133+
134+
def encrypt_scenario():
135+
response = client.encrypt(
136+
b''.join(plaintext1),
137+
)
138+
if response.startswith(b'error'):
139+
self.throw_mumble('failed to encrypt', response)
140+
141+
if response != ciphertext1:
142+
self.throw_mumble('invalid encryption', response)
143+
144+
def decrypt_scenario():
145+
response = client.decrypt(
146+
ciphertext1,
147+
)
148+
if response.startswith(b'error'):
149+
self.throw_mumble('failed to decrypt', response)
150+
151+
if response != b''.join(plaintext1):
152+
self.throw_mumble('invalid decryption', response)
153+
154+
if secrets.randbits(1) == 0:
155+
encrypt_scenario()
156+
decrypt_scenario()
157+
else:
158+
decrypt_scenario()
159+
encrypt_scenario()
160+
161+
client.logout()
162+
client.exit()
163+
164+
with api.connect(self.host) as client:
165+
response = client.encrypt(
166+
b''.join(plaintext2),
167+
username,
168+
)
169+
if response.startswith(b'error'):
170+
self.throw_mumble('failed to encrypt', response)
171+
172+
if response != ciphertext2:
173+
self.throw_mumble('invalid encryption', response)
174+
175+
client.exit()
176+
177+
self.cquit(checklib.Status.OK)
178+
179+
def put(self, flag_id: str, flag: str, vuln: str):
180+
username = random_username()
181+
password = random_password()
182+
183+
with api.connect(self.host) as client:
184+
response = client.register(
185+
username.encode(),
186+
password.encode(),
187+
flag.encode(),
188+
)
189+
if response.startswith(b'error'):
190+
if b'user already exists' in response:
191+
self.throw_mumble('user already exists', response)
192+
193+
self.throw_mumble('failed to register', response)
194+
195+
client.exit()
196+
197+
self.cquit(
198+
checklib.Status.OK,
199+
username,
200+
self.store_flag_id(username, password),
201+
)
202+
203+
def get(self, flag_id: str, flag: str, vuln: str):
204+
username, password = self.load_flag_id(flag_id)
205+
206+
with api.connect(self.host) as client:
207+
response = client.login(
208+
username.encode(),
209+
password.encode(),
210+
)
211+
if response.startswith(b'error'):
212+
if b'user does not exist' in response:
213+
self.throw_corrupt('user does not exist', response)
214+
215+
self.throw_mumble('failed to login', response)
216+
217+
if not response.endswith(flag.encode()):
218+
self.throw_corrupt('invalid secret', response)
219+
220+
client.logout()
221+
client.exit()
222+
223+
self.cquit(checklib.Status.OK)
224+
225+
def throw_mumble(self, message: str, reason: Any):
226+
self.cquit(
227+
checklib.Status.MUMBLE,
228+
message,
229+
f'{message}: {repr(reason)[:128]}',
230+
)
231+
232+
def throw_corrupt(self, message: str, reason: Any):
233+
self.cquit(
234+
checklib.Status.CORRUPT,
235+
message,
236+
f'{message}: {repr(reason)[:128]}',
237+
)
238+
239+
def store_flag_id(self, username: str, password: str) -> str:
240+
return f'{username} {password}'
241+
242+
def load_flag_id(self, flag_id: str) -> Tuple[str, str]:
243+
return flag_id.split(' ')
244+
245+
246+
if __name__ == "__main__":
247+
host = sys.argv[2]
248+
checker = Checker(host)
249+
250+
try:
251+
action = sys.argv[1]
252+
arguments = sys.argv[3:]
253+
254+
checker.action(action, *arguments)
255+
except checker.get_check_finished_exception():
256+
checklib.cquit(
257+
checklib.Status(checker.status),
258+
checker.public,
259+
checker.private,
260+
)

0 commit comments

Comments
 (0)