zoukankan      html  css  js  c++  java
  • Python通过LDAP验证、查找用户(class,logging)

    定义一个类,用于初始化ldap连接,验证、查找用户等功能

    # -*- coding: UTF-8 -*-
    import sys 
    reload(sys) 
    sys.setdefaultencoding('utf-8') 
    
    import ldap,logging,time
    logfile = 'e:\a.txt'
    # logging.basicConfig(filename=logfile,level=logging.INFO)
    # logging.basicConfig(format='%(time.asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.basicConfig(level=logging.INFO,  
                        #format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', #返回值:Thu, 26 May 2016 15:09:31 t11.py[line:92] INFO 
                        format='%(asctime)s %(levelname)s %(message)s', 
                        #datefmt='%a, %d %b %Y %H:%M:%S',  
                        #datefmt='%Y/%m/%d %I:%M:%S %p', #返回2016/05/26 03:12:56 PM
                        datefmt='%Y-%m-%d %H:%M:%S', #返回2016/05/26 03:12:56 PM
                        filename=logfile#,  
                        #filemode='a' #默认为a
                       ) 
    #logging输出结果:
    #2016-05-26 15:22:29 INFO liu1 valid passed.
    #2016-05-26 15:22:37 INFO liu1 valid passed.
    
    
    class ldapc:
        def __init__(self,ldap_path,baseDN,domainname,ldap_authuser,ldap_authpass):
            self.baseDN = baseDN
            self.ldap_error = None
            ldap_authduser = '%s\%s' %(domainname,ldap_authuser)
            self.l=ldap.initialize(ldap_path)
            self.l.protocol_version = ldap.VERSION3
            try:
                self.l.simple_bind_s(ldap_authduser,ldap_authpass)
            except ldap.LDAPError,err:
                self.ldap_error = 'Connect to %s failed, Error:%s.' %(ldap_path,err.message['desc'])
                print self.ldap_error
            # finally:
            #     self.l.unbind_s()
            #     del self.l
    
        def search_users(self,username): #模糊查找,返回一个list,使用search_s()
            if self.ldap_error is None:
                try:
                    searchScope = ldap.SCOPE_SUBTREE
                    searchFiltername = "sAMAccountName" #通过samaccountname查找用户
                    retrieveAttributes = None
                    searchFilter = '(' + searchFiltername + "=" + username +'*)'
                    ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)
                    if len(ldap_result) == 0: #ldap_result is a list.
                        return "%s doesn't exist." %username
                    else:
                        # result_type, result_data = self.l.result(ldap_result, 0)  
                        # return result_type, ldap_result
                        return ldap_result
                except ldap.LDAPError,err:
                    return err
    
        def search_user(self,username): #精确查找,返回值为list,使用search()
            if self.ldap_error is None:
                try:
                    searchScope = ldap.SCOPE_SUBTREE
                    searchFiltername = "sAMAccountName" #通过samaccountname查找用户
                    retrieveAttributes = None
                    searchFilter = '(' + searchFiltername + "=" + username +')'
                    ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
                    result_type, result_data = self.l.result(ldap_result_id, 0)
                    if result_type == ldap.RES_SEARCH_ENTRY:
                        return result_data
                    else:
                        return "%s doesn't exist." %username
                except ldap.LDAPError,err:
                    return err
    
        def search_userDN(self,username): #精确查找,最后返回该用户的DN值
            if self.ldap_error is None:
                try:
                    searchScope = ldap.SCOPE_SUBTREE
                    searchFiltername = "sAMAccountName" #通过samaccountname查找用户
                    retrieveAttributes = None
                    searchFilter = '(' + searchFiltername + "=" + username +')'
                    ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)
                    result_type, result_data = self.l.result(ldap_result_id, 0)
                    if result_type == ldap.RES_SEARCH_ENTRY:
                        return result_data[0][0] #list第一个值为用户的DN,第二个值是一个dict,包含了用户属性信息
                    else:
                        return "%s doesn't exist." %username
                except ldap.LDAPError,err:
                    return err
    
        def valid_user(self,username,userpassword): #验证用户密码是否正确
            if self.ldap_error is None:
                target_user = self.search_userDN(username) #使用前面定义的search_userDN函数获取用户的DN
                if target_user.find("doesn't exist") == -1:
                    try:
                        self.l.simple_bind_s(target_user,userpassword)
                        logging.info('%s valid passed.
    '%(username)) #logging会自动在每行log后面添加"00"换行,windows下未自动换行
                        return True
                    except ldap.LDAPError,err:
                        return err
                else:
                    return target_user
    
        def update_pass(self,username,oldpassword,newpassword): #####未测试#########
            if self.ldap_error is None:
                target_user = self.search_userDN(username) 
                if target_user.find("doesn't exist") == -1:
                    try:
                        self.l.simple_bind_s(target_user,oldpassword)
                        self.l.passwd_s(target_user,oldpassword,newpassword)
                        return 'Change password success.'
                    except ldap.LDAPError,err:
                        return err
                else:
                    return target_user
    
    
    
    ldap_authuser='liu1'
    ldap_authpass='pass'
    domainname='uu'
    ldappath='ldap://192.168.200.25:389'
    
    baseDN='OU=优优,DC=uu,DC=yuu,DC=com' #ldap_authuser在连接到LDAP的时候不会用到baseDN,在验证其他用户的时候才需要使用
    username = 'liu1' #要查找/验证的用户
    p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)
    print p.valid_user('Lily','lpass') #调用valid_user()方法验证用户是否为合法用户

    遍历OU下的用户函数:

    def search_OU(self): #精确查找,最后返回该用户的DN值
            if self.ldap_error is None:
                try:
                    searchScope = ldap.SCOPE_SUBTREE
                    #searchFiltername = "sAMAccountName" #通过samaccountname查找用户
                    retrieveAttributes = None
                    searchFilter = '(&(objectClass=person))'
                    ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)
                    if ldap_result is not None:
                        udict = {}
                        usersinfor = []
                        for pinfor in ldap_result:
                        #pinfor是一个tuple,第一个元素是该用户的CN,第二个元素是一个dict,包含有用户的所有属性
                            if pinfor[1]:
                                p=pinfor[1]
                                sAMAccountName = p['sAMAccountName'][0] #返回值是一个list
                                displayName = p['displayName'][0]
                                #如果用户的某个属性为空,则dict中不会包含有相应的key
                                if 'department' in p:
                                    department = p['department'][0]
                                else:
                                    department = None
                                #print sAMAccountName,displayName,department
                                udict['sAMAccountName'] = sAMAccountName
                                udict['displayName'] = displayName
                                udict['department'] = department
                                usersinfor.append(udict)
                                # print udict
                        return usersinfor
                except ldap.LDAPError,err:
                    return err
                finally:
                    self.l.unbind_s()
                    del self.l
    
    

    baseDN='OU=Admin,DC=u,DC=y,DC=com' #需要遍历的OU
    p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)
    users = p.search_OU()
    print users[0]['department']

    #retrieveAttributes = None
    searchFilter = '(&(objectClass=person))'
    attrs = ['cn','uid','mail']
    ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, attrs) #只过滤attrs属性,如果为*,则过滤所有属性

    分页返回LDAP查询结果:

    import ldap  
    from ldap.controls import SimplePagedResultsControl  
      
    l = ldap.initialize('ldap://1.1.1.16')  
    l.simple_bind_s('user', 'Password')  
    baseDN=unicode('OU=集团,DC=uxin,DC=youxinpai,DC=com','utf8')
    PAGE_SIZE = 500  #设置每页返回的条数  
    ATTRLIST = ['sAMAccountName','name', 'mail','department','title']  #设置返回的属性值  
    # ATTRLIST = None #设置为None则返回用户的所有属性
    searchFilter =  '(&(objectCategory=person)(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))' #查询Enabled的用户
    pg_ctrl = SimplePagedResultsControl(True, size=PAGE_SIZE, cookie="")  
    userdata = []  
      
    while True:  
        msgid = l.search_ext(baseDN,ldap.SCOPE_SUBTREE, searchFilter, ATTRLIST, serverctrls=[pg_ctrl])  
        _a, res_data, _b, srv_ctrls = l.result3(msgid)  
        # print 'res_data', len(res_data) ,msgid
        userdata.extend(res_data)  
        cookie = srv_ctrls[0].cookie  
        if cookie:  
            pg_ctrl.cookie = cookie  
        else:  
            break  
      
    print 'totalnum:', len(userdata)  
    print userdata[0]

    ObjectClass类型如下:详细参考:http://www.cnblogs.com/dreamer-fish/p/5832735.html(LDAP查询过滤语法)http://www.morgantechspace.com/2013/05/ldap-search-filter-examples.html

    objectClass: person
    objectClass: organizationalPerson
    objectClass: inetOrgPerson

  • 相关阅读:
    Delphi 日期函数列表
    Delphi Copy 函数 和 Pos函数
    delphi xe10 手机程序事件服务操作、退出键操作
    delphi xe10 安卓设备信息
    delphi xe10 获取屏幕截图
    Battery electric vehicles (BEVs) 快充技术
    短波红外(SWIR)相机camera
    多核片上系统(SoC)架构的嵌入式DSP软件设计
    工业4.0是个白日梦吗?
    电子设计搜索引擎引入分析和见解
  • 原文地址:https://www.cnblogs.com/dreamer-fish/p/5531339.html
Copyright © 2011-2022 走看看