LDAP認証を組み込んでみる(その2)

昨日書いたコードで試したら、会社のActiveDirectoryではうまく動かず。
仕方ないので、hackしてみる。
とりあえず動いたので、参考になればと、書き散らかしたものを恥を忍んで載せます。

エラーチェックやバリデートチェックは適宜入れてください。

LDAPの簡易認証を利用しています。

app.cfg内で

identity.soldapprovider.autocreate=True

のように設定しておけば、LDAP上にはあるけどDBに無いユーザはDBに登録されます。

import logging

import ldap

from turbogears.identity.soprovider import *

log = logging.getLogger("turbogears.identity.soldapprovider")

hub = PackageHub("turbogears.identity")
__connection__ = hub

# Global class references -- these will be set when the Provider is initialised.
user_class= None
group_class= None
permission_class= None
visit_class = None

class SoLdapIdentityProvider(SqlObjectIdentityProvider):
    """
    IdentityProvider that uses LDAP for authentication.
    """

    def __init__(self):
        super(SoLdapIdentityProvider, self).__init__()
        get = turbogears.config.get

        global user_class, group_class, permission_class, visit_class

        user_class_path= get( "identity.soprovider.model.user", 
                              __name__ + ".TG_User" )
        user_class= load_class(user_class_path)
        if user_class:
            log.info("Succesfully loaded \"%s\"" % user_class_path)
        try:
            self.user_class_db_encoding= \
                user_class.sqlmeta.columns['user_name'].dbEncoding
        except (KeyError, AttributeError):
            self.user_class_db_encoding= 'UTF-8'
        group_class_path= get( "identity.soprovider.model.group",
                                __name__ + ".TG_Group" )
        group_class= load_class(group_class_path)
        if group_class:
            log.info("Succesfully loaded \"%s\"" % group_class_path)
            
        permission_class_path= get( "identity.soprovider.model.permission",
                                    __name__ + ".TG_Permission" )
        permission_class= load_class(permission_class_path)
        if permission_class:
            log.info("Succesfully loaded \"%s\"" % permission_class_path)
        
        visit_class_path= get( "identity.soprovider.model.visit",
                                __name__ + ".TG_VisitIdentity" )
        visit_class= load_class(visit_class_path)
        if visit_class:
            log.info("Succesfully loaded \"%s\"" % visit_class_path)

        self.host = get("identity.soldapprovider.host", "localhost")
        self.port = get("identity.soldapprovider.port", 389)
        self.basedn  = get("identity.soldapprovider.basedn", "dc=localhost")
        self.domain = get("identity.soldapprovider.domain", "")
        self.autocreate = get("identity.soldapprovider.autocreate", False)

        log.info("host :: %s" % self.host)
        log.info("port :: %d" % self.port)
        log.info("basedn :: %s" % self.basedn)
        log.info("domain :: %s" % self.domain)
        log.info("autocreate :: %s" % self.autocreate)

    def validate_identity( self, user_name, password, visit_key ):
        '''
        Look up the identity represented by user_name and determine whether the
        password is correct.
        
        Must return either None if the credentials weren't valid or an object
        with the following properties:
            user_name: original user name
            user: a provider dependant object (TG_User or similar)
            groups: a set of group IDs
            permissions: a set of permission IDs
        '''
        
        ldapcon = ldap.open(self.host, self.port)
        ldapresult = None
        try:
            ldapcon.simple_bind_s('%s@%s' % (user_name, self.domain), password)
            filter = "(sAMAccountName=%s)" % user_name
            rc = ldapcon.search(self.basedn, ldap.SCOPE_SUBTREE, filter)
            ldapresult = ldapcon.result(rc)[1]
        except ldap.LDAPError:
            log.error("Invalid password supplied for %s" % user_name)
            return None
        
        if(len(ldapresult) == 0):
            log.warning("No such LDAP user: %s" % user_name)
            return False
        elif(len(ldapresult) > 1):
            log.error("Too many users: %s" % user_name)
            return False
        
        try:
            log.info("user_class.by_user_name(%s)" % user_name)
            user = user_class.by_user_name(user_name)
        except SQLObjectNotFound:
            if(self.autocreate and ldapresult):
                hub.begin()
                
                # get user info
                user_info = dict()
                if len(ldapresult):
                    user_info = ldapresult[0][1]
                user = user_class(user_name=user_name,
                                  password='ldap',
                                  display_name=unicode(user_info.get('displayName', [user_name])[0], 'utf8'),
                                  email_address=user_info.get('mail', [''])[0])
                ldapcon.unbind_s()
                
                hub.commit()
                hub.end()
            else:
                log.error("No such database user: %s" % user_name)
                return None

        try:
            link= visit_class.by_visit_key(visit_key)
            link.user_id= user.id
        except SQLObjectNotFound:
            link= visit_class(visit_key=visit_key, user_id=user.id)
        return SqlObjectIdentity(visit_key, user)