zoukankan      html  css  js  c++  java
  • Python paramiko 修改源码实现用户命令抓取

    paramiko 源码修改

      paramiko主要用来实现ssh客户端、服务端链接,上一节我们说到了堡垒机,堡垒机内有一个需求是“用户行为审计”,在这里我们就可以通过修改paramiko内文件的源码来实现相关要求。

    paramiko 源码包安装测试
    • 下载地址https://github.com/ 下载源码包paramiko
    • 解压源码包 paramiko-master.zip
    • 使用目录:paramiko-masterparamiko-masterdemos
    • python3启动:python paramiko-masterparamiko-masterdemosdemos.py

    :paramiko demos.py启动 不支持python3 ,由于python3默认是bays类型,需要修改源码。

    demosinteractive.py",
     line 84, in writeall
        sys.stdout.write(data)
    TypeError: write() argument must be str, not bytes
    修改源代码(demos.py 添加 python3 支持)
    :interactive.py文件内(77~85 行)
    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('
    *** EOF ***
    
    ')
                sys.stdout.flush()
                break
          # 修改为编译为UTF-8
            sys.stdout.write(data.decode())
            sys.stdout.flush()
    C:UsersAdministratordemos>python3 demo.py
    Hostname: 192.168.1.100
    *** Unable to open host keys file
    *** WARNING: Unknown host key!
    Username [Administrator]: root
    Auth by (p)assword, (r)sa key, or (d)ss key? [p] p
    Password for root@192.168.1.100:
    *** Here we go!
    
    Line-buffered terminal emulation. Press F6 or ^Z to send EOF.
    
    Last login: Mon Jan 22 11:24:35 2018 from 192.168.1.150
    
    [root@localhost ~]# 
    注:登陆成功
    测试使用

    修改源码(实现抓取用户操作指令)

    注:paramiko-masterparamiko-masterdemosdemos.py 文件源码、简要说明
    import base64
    from binascii import hexlify
    import getpass
    import os
    import select
    import socket
    import sys
    import time
    import traceback
    from paramiko.py3compat import input
    
    import paramiko
    try:
        import interactive
    except ImportError:
        from . import interactive
    
    
    def agent_auth(transport, username):
        """
        Attempt to authenticate to the given transport using any of the private
        keys available from an SSH agent.
        """
        
        agent = paramiko.Agent()
        agent_keys = agent.get_keys()
        if len(agent_keys) == 0:
            return
            
        for key in agent_keys:
            # 尝试将本地key验证,验证不了就用密码
            print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint()))
            try:
                transport.auth_publickey(username, key)
                print('... success!')
                return
            except paramiko.SSHException:
                print('... nope.')
    
    
    def manual_auth(username, hostname):
        default_auth = 'p'
        auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)
        if len(auth) == 0:
            auth = default_auth
    
        # r非对称秘钥
        if auth == 'r':
            default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
            path = input('RSA key [%s]: ' % default_path)
            if len(path) == 0:
                path = default_path
            try:
                key = paramiko.RSAKey.from_private_key_file(path)
            except paramiko.PasswordRequiredException:
                password = getpass.getpass('RSA key password: ')
                key = paramiko.RSAKey.from_private_key_file(path, password)
            t.auth_publickey(username, key)
        # 使用DSA 秘钥
        elif auth == 'd':
            default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa')
            path = input('DSS key [%s]: ' % default_path)
            if len(path) == 0:
                path = default_path
            try:
                key = paramiko.DSSKey.from_private_key_file(path)
            except paramiko.PasswordRequiredException:
                password = getpass.getpass('DSS key password: ')
                key = paramiko.DSSKey.from_private_key_file(path, password)
            t.auth_publickey(username, key)
        else:
            pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
            t.auth_password(username, pw)
    
    
    # setup logging
    paramiko.util.log_to_file('demo.log')
    
    username = ''
    if len(sys.argv) > 1:
        hostname = sys.argv[1]
        if hostname.find('@') >= 0:
            username, hostname = hostname.split('@')
    else:
        # 输入IP
        hostname = input('Hostname: ')
    if len(hostname) == 0:
        # 不输入打印错误
        print('*** Hostname required.')
        sys.exit(1)
    # 端口号22
    port = 22
    if hostname.find(':') >= 0:
        hostname, portstr = hostname.split(':')
        port = int(portstr)
    
    # now connect
    try:
        # 创建socket对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 链接IP端口号
        sock.connect((hostname, port))
    except Exception as e:
        # 登陆错误就报错
        print('*** Connect failed: ' + str(e))
        # 打印错误行
        traceback.print_exc()
        sys.exit(1)
    
    try:
        # ssh实例化传入socket
        t = paramiko.Transport(sock)
        try:
            t.start_client()
        except paramiko.SSHException:
            print('*** SSH negotiation failed.')
            sys.exit(1)
    
        try:
            keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
        except IOError:
            try:
                # 生成rsa 防止证书校验
                keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))
            except IOError:
                print('*** Unable to open host keys file')
                keys = {}
    
        # check server's host key -- this is important.
        key = t.get_remote_server_key()
        if hostname not in keys:
            print('*** WARNING: Unknown host key!')
        elif key.get_name() not in keys[hostname]:
            print('*** WARNING: Unknown host key!')
        elif keys[hostname][key.get_name()] != key:
            print('*** WARNING: Host key has changed!!!')
            sys.exit(1)
        else:
            print('*** Host key OK.')
    
        # get username
        if username == '':
            # getuser取出当前用户名
            default_username = getpass.getuser()
            username = input('Username [%s]: ' % default_username)
            if len(username) == 0:
                username = default_username
    
        agent_auth(t, username)
        # key验证 没通过继续往下走
        if not t.is_authenticated():
            manual_auth(username, hostname)
        if not t.is_authenticated():
            print('*** Authentication failed. :(')
            # 认证成功就关闭
            t.close()
            sys.exit(1)
    
        # ssh_session交互
        chan = t.open_session()
        # get终端  vt100 终端模式
        chan.get_pty()
        chan.invoke_shell()
        print('*** Here we go!
    ')
    
        # 交互
        # chan是双方打开的一个通道
     # 调用interactive文件
        interactive.interactive_shell(chan)
        chan.close()
        t.close()
    
    # 表村错误数据关闭链接
    except Exception as e:
        print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))
        traceback.print_exc()
        try:
            t.close()
        except:
            pass
        sys.exit(1)
    文件源码

    注:paramiko-masterparamiko-masterdemosinteractive.py 文件源码修改,实现用户审计

    import socket
    import sys
    from paramiko.py3compat import u
    
    # windows does not have termios...
    try:
        # 不出错代表这是一个linux系统
        import termios
        import tty
        has_termios = True
    except ImportError:
        has_termios = False
    
    
    def interactive_shell(chan):
        if has_termios:
            # linux链接
            posix_shell(chan)
        else:
            windows_shell(chan)
    
    # chan是传入的通道
    def posix_shell(chan):
        import select
        
        oldtty = termios.tcgetattr(sys.stdin)
        try:
            tty.setraw(sys.stdin.fileno())
            tty.setcbreak(sys.stdin.fileno())
            chan.settimeout(0.0)
    
    
    #-----------------1.源码修改---------------------#

    # 创建列表用来存放客户端输入命令 cmd = []
    #----------------------------------------------# while True: # stdin标准输入,stdout标准输出,标准错误 r, w, e = select.select([chan, sys.stdin], [], []) # 如果chan在r里面就代表有数据过来了 if chan in r: try: # 数据过来就收数据 x = u(chan.recv(1024)) # 如果数据等于0,代表断开链接 if len(x) == 0: sys.stdout.write(' *** EOF ') break # 发送数据 sys.stdout.write(x) # 刷新数据 sys.stdout.flush() except socket.timeout: pass # 没进行一次动作就select就执行返回一次 if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break
    #-----------------2.源码修改---------------------#

    # 判断用户是否使用回车键 if x == " ": # 将列表字符拼接 cmd_str="".join(cmd) # 获取出用户执行命令 print("-->",cmd_str) # 清空列表 cmd = [] else: # 没有回车就将字符添加列表内 cmd.append(x)
    #----------------------------------------------# chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) # thanks to Mike Looijmans for this code def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF. ") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write(' *** EOF *** ') sys.stdout.flush() break sys.stdout.write(data.decode()) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass
  • 相关阅读:
    IDEA调试快捷键
    视频预览
    文件上传:简单的demo
    Java 运行时优化
    Java 类加载
    Java StringTable
    Java 为什么不用Vector
    C++ 查找函数
    JVM 垃圾回收
    JVM 直接内存
  • 原文地址:https://www.cnblogs.com/xiangsikai/p/8337344.html
Copyright © 2011-2022 走看看