前言:
每个公司的网络环境大都划分 办公网络、线上网络,之所以划分的主要原因是为了保证线上操作安全;
对于外部用户而言也只能访问线上网络的特定开放端口,那么是什么控制了用户访问线上网络的呢?
防火墙过滤......!
对于内部员工而言对线上系统日常运维、代码部署如何安全访问线上业务系统呢?如何监控、记录技术人员的操作记录?
堡垒机策略:
1.回收所有远程登录Linux主机的用户名、密码;
2.中间设置堡垒机(保存所有线上Linux主机的用户名、密码);
3.所有技术人员都要通过堡垒机去获取用户名、密码,然后在再去连接 线上系统,并记录操作日志;
堡垒机策略优点:
1.记录用户操作;
2.实现远程操作权限集中管理;
一、堡垒机表结构设计
from django.db import models from django.contrib.auth.models import User # Create your models here. class IDC(models.Model): name = models.CharField(max_length=64,unique=True) def __str__(self): return self.name class Host(models.Model): """存储所有主机信息""" hostname = models.CharField(max_length=64,unique=True) ip_addr = models.GenericIPAddressField(unique=True) port = models.IntegerField(default=22) idc = models.ForeignKey("IDC") #host_groups = models.ManyToManyField("HostGroup") #host_users = models.ManyToManyField("HostUser") enabled = models.BooleanField(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr) class HostGroup(models.Model): """主机组""" name = models.CharField(max_length=64,unique=True) host_user_binds = models.ManyToManyField("HostUserBind") def __str__(self): return self.name class HostUser(models.Model): """存储远程主机的用户信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallIntegerField(choices=auth_type_choices) username = models.CharField(max_length=32) password = models.CharField(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password') class HostUserBind(models.Model): """绑定主机和用户""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user') class SessionLog(models.Model): ''' 记录每个用户登录操作,ID传给 shell生成文件命名 ''' account=models.ForeignKey('Account') host_user_bind=models.ForeignKey('HostUserBind') start_date=models.DateField(auto_now_add=True) end_date=models.DateField(blank=True,null=True) def __str__(self): return '%s-%s'%(self.account,self.host_user_bind) class AuditLog(models.Model): """审计日志""" class Account(models.Model): """堡垒机账户 1. 扩展 2. 继承 user.account.host_user_bind """ user = models.OneToOneField(User) name = models.CharField(max_length=64) host_user_binds = models.ManyToManyField("HostUserBind",blank=True) host_groups = models.ManyToManyField("HostGroup",blank=True)
二、通过堡垒机远程登录Linux主机
2种堡垒机登录方式:
命令行登录堡垒机方式:
方式1:通过 修改open_shh源码扩展-Z option生成唯一 ssh进程,使用Linux的strace 命令对唯一 ssh进程进行检测生成日志文件;
0.用户执行audit_shell出现交互界面,提示用户输入机组和主机;
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","zhanggen_audit.settings") django.setup() #在Django视图之外,调用Django功能设置环境变量! from audit.backend import user_interactive if __name__ == '__main__': shell_obj=user_interactive.UserShell(sys.argv) shell_obj.start()
from django.contrib.auth import authenticate class UserShell(object): '''用户登录堡垒机,启动自定制shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表认证失败,返回用户对象认证成功! if not user: count+=1 print('无效的用户名或者,密码!') else: self.user=user return True else: print('输入次数超过3次!') def start(self): """启动交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s. %s[%s]" % (index, group, group.host_user_binds.count())) print("%s. 未分组机器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 选择的未分组机器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s. %s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] print("selected host", selected_host) elif choice2 == 'b': break
知识点:
在Django视图之外,调用Django功能设置环境变量!(切记放在和Django manage.py 同级目录);
import sys,os,django os.environ.setdefault("DJANGO_SETTINGS_MODULE","Sensors_Data.settings") django.setup() # 在Django视图之外,调用Django功能设置环境变量! from app01 import models objs=models.AlarmInfo.objects.all() for row in objs: print(row.comment)
注意:在Django启动时会自动加载一些 文件,比如每个app中admin.py,不能在这些文件里面设置加载环境变量,因为已经加载完了,如果违反这个规则会导致Django程序启动失败;
1.实现ssh用户指令检测
1.0 修改open_shh源码,扩展 ssh -Z 唯一标识符;(这样每次ssh远程登录,都可以利用唯一标识符,分辨出 每个ssh会话进程;)
修改OpenSsh下的ssh.c文件的608和609行、935行增加; while ((opt = getopt(ac, av, "1246ab:c:e:fgi:kl:m:no:p:qstvxz:" "ACD:E:F:GI:J:KL:MNO:PQ:R:S:TVw:W:XYyZ:")) != -1) { case 'Z': break;
知识点:
OpenSSH 是 SSH (Secure SHell) 协议的免费开源实现项目。
1.1 修改openssh之后,编译、安装
chmod 755 configure ./configure --prefix=/usr/local/openssh make chmod 755 mkinstalldirs make install sshpass -p xxxxxx123 /usr/local/openssh/bin/ssh root@172.17.10.112 -Z s1123ssssd212
1.2 每个ssh会话进程可以唯一标识之后,在堡垒机使用会话脚本shell脚本检测 ssh会话进程;(strace命令进行监控,并生产 log日志文件);
#!/usr/bin/bash for i in $(seq 1 30);do echo $i $1 process_id=`ps -ef | grep $1 | grep -v 'ession_check.sh' | grep -v grep | grep -v sshpass | awk '{print $2}'` echo "process: $process_id" if [ ! -z "$process_id" ];then echo 'start run strace.....' strace -fp $process_id -t -o $2.log; break; fi sleep 5 done;
知识点:
strace 检测进程的IO调用,监控用户shell输入的命令字符;
sshpass无需提示输入密码登录
[root@localhost sshpass-1.06]# sshpass -p wsnb ssh root@172.16.22.1 -o StrictHostKeyChecking=no Last login: Tue Jul 10 16:39:53 2018 from 192.168.113.84 [root@ecdb ~]#
python生成唯一标识符
s=string.ascii_lowercase+string.digits random_tag=''.join(random.sample(s,10))
解决普通用户,无法执行 strace命令;
方式1:执行文件 +s权限
chmod u+s `which strace`
方式2:修改sudo配置文件,使普通用户sudo时无需输入密码!
修改sudo配置文件,防止修改出错,一定要切换到root用户; %普通用户 ALL=(ALL) NOPASSWD: ALL wq! #退出
#!/usr/bin/python3 # -*- coding: utf-8 -* from django.contrib.auth import authenticate import subprocess,string,random from audit import models from django.conf import settings class UserShell(object): '''用户登录堡垒机,启动自定制shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表认证失败,返回用户对象认证成功! if not user: count+=1 print('无效的用户名或者,密码!') else: self.user=user return True else: print('输入次数超过3次!') def start(self): """启动交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s. %s[%s]" % (index, group, group.host_user_binds.count())) print("%s. 未分组机器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_list = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_list = selected_group.host_user_binds.all() elif choice == len(host_groups): # 选择的未分组机器 # selected_group = self.user.account.host_user_binds.all() host_bind_list = self.user.account.host_user_binds.all() if host_bind_list: while True: for index, host in enumerate(host_bind_list): print("%s. %s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_list): selected_host = host_bind_list[choice2] s = string.ascii_lowercase + string.digits random_tag = ''.join(random.sample(s, 10)) session_obj=models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) session_tracker_scipt='/bin/sh %s %s %s'%(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.pk) session_tracker_process=subprocess.Popen(session_tracker_scipt,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) cmd='sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o stricthostkeychecking=no -Z %s' % (selected_host.host_user.password, selected_host.host_user.username, selected_host.host.ip_addr, selected_host.host.port,random_tag) subprocess.run(cmd,shell=True)#开启子进程交互 print(session_tracker_process.stdout.readlines(), session_tracker_process.stderr.readlines()) elif choice2 == 'b': break
2.shell远程登录程序检查日志文件,分析;
tab补全的命令,需要搜素write(5,该脚本实现思路,按键去尝试,循环多种条件判断;
import re class AuditLogHandler(object): '''分析audit log日志''' def __init__(self,log_file): self.log_file_obj = self._get_file(log_file) def _get_file(self,log_file): return open(log_file) def parse(self): cmd_list = [] cmd_str = '' catch_write5_flag = False #for tab complication for line in self.log_file_obj: #print(line.split()) line = line.split() try: pid,time_clock,io_call,char = line[0:4] if io_call.startswith('read(4'): if char == '"\177",':#回退 char = '[1<-del]' if char == '"\33OB",': #vim中下箭头 char = '[down 1]' if char == '"\33OA",': #vim中下箭头 char = '[up 1]' if char == '"\33OC",': #vim中右移 char = '[->1]' if char == '"\33OD",': #vim中左移 char = '[1<-]' if char == '"33[2;2R",': #进入vim模式 continue if char == '"\33[>1;95;0c",': # 进入vim模式 char = '[----enter vim mode-----]' if char == '"\33[A",': #命令行向上箭头 char = '[up 1]' catch_write5_flag = True #取到向上按键拿到的历史命令 if char == '"\33[B",': # 命令行向上箭头 char = '[down 1]' catch_write5_flag = True # 取到向下按键拿到的历史命令 if char == '"\33[C",': # 命令行向右移动1位 char = '[->1]' if char == '"\33[D",': # 命令行向左移动1位 char = '[1<-]' cmd_str += char.strip('"",') if char == '"\t",': catch_write5_flag = True continue if char == '"\r",': cmd_list.append([time_clock,cmd_str]) cmd_str = '' # 重置 if char == '"':#space cmd_str += ' ' if catch_write5_flag: #to catch tab completion if io_call.startswith('write(5'): if io_call == '"7",': #空键,不是空格,是回退不了就是这个键 pass else: cmd_str += char.strip('"",') catch_write5_flag = False except ValueError as e: print("