LDAP認証を組み込んでみる(その2)
昨日書いたコードで試したら、会社のActiveDirectoryではうまく動かず。
仕方ないので、hackしてみる。
とりあえず動いたので、参考になればと、書き散らかしたものを恥を忍んで載せます。
エラーチェックやバリデートチェックは適宜入れてください。
LDAPの簡易認証を利用しています。
app.cfg内で
identity.soldapprovider.autocreate=True
のように設定しておけば、LDAP上にはあるけどDBに無いユーザはDBに登録されます。
import logging import ldap from turbogears.identity.soprovider import * log = logging.getLogger("turbogears.identity.soldapprovider") hub = PackageHub("turbogears.identity") __connection__ = hub # Global class references -- these will be set when the Provider is initialised. user_class= None group_class= None permission_class= None visit_class = None class SoLdapIdentityProvider(SqlObjectIdentityProvider): """ IdentityProvider that uses LDAP for authentication. """ def __init__(self): super(SoLdapIdentityProvider, self).__init__() get = turbogears.config.get global user_class, group_class, permission_class, visit_class user_class_path= get( "identity.soprovider.model.user", __name__ + ".TG_User" ) user_class= load_class(user_class_path) if user_class: log.info("Succesfully loaded \"%s\"" % user_class_path) try: self.user_class_db_encoding= \ user_class.sqlmeta.columns['user_name'].dbEncoding except (KeyError, AttributeError): self.user_class_db_encoding= 'UTF-8' group_class_path= get( "identity.soprovider.model.group", __name__ + ".TG_Group" ) group_class= load_class(group_class_path) if group_class: log.info("Succesfully loaded \"%s\"" % group_class_path) permission_class_path= get( "identity.soprovider.model.permission", __name__ + ".TG_Permission" ) permission_class= load_class(permission_class_path) if permission_class: log.info("Succesfully loaded \"%s\"" % permission_class_path) visit_class_path= get( "identity.soprovider.model.visit", __name__ + ".TG_VisitIdentity" ) visit_class= load_class(visit_class_path) if visit_class: log.info("Succesfully loaded \"%s\"" % visit_class_path) self.host = get("identity.soldapprovider.host", "localhost") self.port = get("identity.soldapprovider.port", 389) self.basedn = get("identity.soldapprovider.basedn", "dc=localhost") self.domain = get("identity.soldapprovider.domain", "") self.autocreate = get("identity.soldapprovider.autocreate", False) log.info("host :: %s" % self.host) log.info("port :: %d" % self.port) log.info("basedn :: %s" % self.basedn) log.info("domain :: %s" % self.domain) log.info("autocreate :: %s" % self.autocreate) def validate_identity( self, user_name, password, visit_key ): ''' Look up the identity represented by user_name and determine whether the password is correct. Must return either None if the credentials weren't valid or an object with the following properties: user_name: original user name user: a provider dependant object (TG_User or similar) groups: a set of group IDs permissions: a set of permission IDs ''' ldapcon = ldap.open(self.host, self.port) ldapresult = None try: ldapcon.simple_bind_s('%s@%s' % (user_name, self.domain), password) filter = "(sAMAccountName=%s)" % user_name rc = ldapcon.search(self.basedn, ldap.SCOPE_SUBTREE, filter) ldapresult = ldapcon.result(rc)[1] except ldap.LDAPError: log.error("Invalid password supplied for %s" % user_name) return None if(len(ldapresult) == 0): log.warning("No such LDAP user: %s" % user_name) return False elif(len(ldapresult) > 1): log.error("Too many users: %s" % user_name) return False try: log.info("user_class.by_user_name(%s)" % user_name) user = user_class.by_user_name(user_name) except SQLObjectNotFound: if(self.autocreate and ldapresult): hub.begin() # get user info user_info = dict() if len(ldapresult): user_info = ldapresult[0][1] user = user_class(user_name=user_name, password='ldap', display_name=unicode(user_info.get('displayName', [user_name])[0], 'utf8'), email_address=user_info.get('mail', [''])[0]) ldapcon.unbind_s() hub.commit() hub.end() else: log.error("No such database user: %s" % user_name) return None try: link= visit_class.by_visit_key(visit_key) link.user_id= user.id except SQLObjectNotFound: link= visit_class(visit_key=visit_key, user_id=user.id) return SqlObjectIdentity(visit_key, user)