本篇文章为大家展示了如何利用sqlmapapi发起扫描,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
sqlmap可谓是sql注入探测的神器,但是利用sqlmap测试SQL注入的效率很低,每一个url都需要手动测试。sqlmap的开发者新加了sqlmapapi.py,可以直接通过接口调用来操作,简化了sqlmap命令执行方式。
sqlmap api分为服务端和客户端,sqlmap api有两种模式,一种是基于HTTP协议的接口模式,一种是基于命令行的接口模式。
sqlmap源码下载地址:https://github.com/sqlmapproject/sqlmap/
python sqlmapapi.py -h
无论是基于HTTP协议的接口模式还是基于命令行的接口模式,首先都是需要开启api服务端的。通过输入以下命令即可开启api服务端:python sqlmapapi.py -s
命令成功后,在命令行中会返回一些信息。以下命令大概的意思是api服务端在本地8775端口上运行,admin token为c6bbb0c1f86b7d7bc2ed6ce3e3bbdcb5等
但是通过上面的这种方式开启api服务端有一个缺点,当服务端和客户端不是一台主机会连接不上,因此如果要解决这个问题,可以通过输入以下命令来开启api服务端:python sqlmapapi.py -s -H "0.0.0.0" -p 8775
命令成功后,远程客户端就可以通过指定远程主机IP和端口来连接到API服务端。
python sqlmapapi.py -c
如果是客户端和服务端不是同一台计算机的话,输入以下命令:
python sqlmapapi.py -c -H "192.168.1.101" -p 8775
help 显示帮助信息new ARGS 开启一个新的扫描任务use TASKID 切换taskid data 获取当前任务返回的数据log 获取当前任务的扫描日志status 获取当前任务的扫描状态option OPTION 获取当前任务的选项options 获取当前任务的所有配置信息stop 停止当前任务 kill 杀死当前任务list 显示所有任务列表flush 清空所有任务exit 退出客户端
3.3.1.new命令
new -u "url"
实例:new -u "http://www.baidu.com"
虽然我们仅仅只指定了-u参数,但是从返回的信息中可以看出,输入new命令后,首先先请求了/task/new,来创建一个新的taskid,后又发起了一个请求去开始任务,因此可以发现该模式实质也是基于HTTP协议的。
3.3.2. status 命令
获取该任务的扫描状态,若返回内容中的status字段为terminated,说明扫描完成,若返回内容中的status字段为run,说明扫描还在进行中。下图是扫描完成的截图:
3.3.3. data 命令
获取扫描完成后注入出来的信息,若返回的内容中data字段不为空就说明存在注入。下图是存在SQL注入返回的内容,可以看到返回的内容有数据库类型、payload、注入的参数等等。
简单介绍下sqlmapapi.py的h基于http接口调用模式的主要函数,进入到lib/utils/api.py的server类,可以发现通过向server提交数据进行与服务的交互。 一共分为3种类型。
Users' methods 用户方法
Admin function 管理函数
sqlmap core interact functions 核心交互函数
可以提交数据的种类如下:
@get("/task/new")
@get("/task/new")
def task_new():
"""
Create a new task
"""
taskid = encodeHex(os.urandom(8), binary=False)
remote_addr = request.remote_addr
DataStore.tasks[taskid] = Task(taskid, remote_addr)
logger.debug("Created new task: '%s'" % taskid)
return jsonize({"success": True, "taskid": taskid})
@get("/task/delete")
@get("/task/<taskid>/delete")
def task_delete(taskid):
"""
Delete an existing task
"""
if taskid in DataStore.tasks:
DataStore.tasks.pop(taskid)
logger.debug("(%s) Deleted task" % taskid)
return jsonize({"success": True})
else:
response.status = 404
logger.warning("[%s] Non-existing task ID provided to task_delete()" % taskid)
return jsonize({"success": False, "message": "Non-existing task ID"})
@get("/option/list")
@post("/option/get")
@post("/option/set")
@post("/option/<taskid>/set")
def option_set(taskid):
"""
Set value of option(s) for a certain task ID
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to option_set()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if request.json is None:
logger.warning("[%s] Invalid JSON options provided to option_set()" % taskid)
return jsonize({"success": False, "message": "Invalid JSON options"})
for option, value in request.json.items():
DataStore.tasks[taskid].set_option(option, value)
logger.debug("(%s) Requested to set options" % taskid)
return jsonize({"success": True})
@post("/scan/start")
@post("/scan/<taskid>/start")
def scan_start(taskid):
"""
Launch a scan
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_start()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if request.json is None:
logger.warning("[%s] Invalid JSON options provided to scan_start()" % taskid)
return jsonize({"success": False, "message": "Invalid JSON options"})
# Initialize sqlmap engine's options with user's provided options, if any
for option, value in request.json.items():
DataStore.tasks[taskid].set_option(option, value)
# Launch sqlmap engine in a separate process
DataStore.tasks[taskid].engine_start()
logger.debug("(%s) Started scan" % taskid)
return jsonize({"success": True, "engineid": DataStore.tasks[taskid].engine_get_id()})
@get("/scan/stop")
@get("/scan/<taskid>/stop")
def scan_stop(taskid):
"""
Stop a scan
"""
if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()):
logger.warning("[%s] Invalid task ID provided to scan_stop()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
DataStore.tasks[taskid].engine_stop()
logger.debug("(%s) Stopped scan" % taskid)
return jsonize({"success": True})
@get("/scan/kill")
@get("/scan/<taskid>/kill")
def scan_kill(taskid):
"""
Kill a scan
"""
if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()):
logger.warning("[%s] Invalid task ID provided to scan_kill()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
DataStore.tasks[taskid].engine_kill()
logger.debug("(%s) Killed scan" % taskid)
return jsonize({"success": True})
@get("/scan/status")
@get("/scan/<taskid>/status")
def scan_status(taskid):
"""
Returns status of a scan
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_status()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if DataStore.tasks[taskid].engine_process() is None:
status = "not running"
else:
status = "terminated" if DataStore.tasks[taskid].engine_has_terminated() is True else "running"
logger.debug("(%s) Retrieved scan status" % taskid)
return jsonize({
"success": True,
"status": status,
"returncode": DataStore.tasks[taskid].engine_get_returncode()
})
@get("/scan/data")
@get("/scan/<taskid>/data")
def scan_data(taskid):
"""
Retrieve the data of a scan
"""
json_data_message = list()
json_errors_message = list()
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
# Read all data from the IPC database for the taskid
for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)})
# Read all error messages from the IPC database
for error in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_errors_message.append(error)
logger.debug("(%s) Retrieved scan data and error messages" % taskid)
return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})
@get("/scan/log")
@get("/download/")
@get("/admin/list")
@get("/admin/list")
@get("/admin/<token>/list")
def task_list(token=None):
"""
Pull task list
"""
tasks = {}
for key in DataStore.tasks:
if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
tasks[key] = dejsonize(scan_status(key))["status"]
logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})
@get("/admin//flush")
@get("/admin/flush")
@get("/admin/<token>/flush")
def task_flush(token=None):
"""
Flush task spool (delete all tasks)
"""
for key in list(DataStore.tasks):
if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
DataStore.tasks[key].engine_kill()
del DataStore.tasks[key]
logger.debug("(%s) Flushed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
return jsonize({"success": True})
从sqlmapapi.py文件分析,提取调用关系。不难发现这些操作可以完全满足我们的测试需求,因此利用这些就可以批量了。
sqlmapapi.py很方便的提供了http请求入口,但是使用起来只能得到最终是否注入的结果,每个接口进行注入扫描时具体发起了什么样的请求,多少个请求就难以得到,下面分享一下个人梳理sqlmap源码后记录的流程图。从图中可以定位到具体发起payload级别的请求位置,从而想要获取发起了什么样的请求,多少个请求,只需要在此处增加自定义代码即可。
上述内容就是如何利用sqlmapapi发起扫描,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://www.freebuf.com/articles/web/263144.html