keystone 的pam认证方式
2012-08-30 12:26
141 查看
from __future__ import absolute_import import pdb try: import pam except ImportError: pam = None import PAM from keystone import identity def PAM_authenticate(username, password): def _pam_conv(auth, query_list): resp = [] for query, q_type in query_list: if q_type in [PAM.PAM_PROMPT_ECHO_ON, PAM.PAM_PROMPT_ECHO_OFF]: resp.append((password, 0)) elif q_type in [PAM.PAM_PROMPT_ERROR_MSG, PAM.PAM_PROMPT_TEXT_INFO]: resp.append(('', 0)) return resp auth = PAM.pam() auth.start('passwd') auth.set_item(PAM.PAM_USER, username) auth.set_item(PAM.PAM_CONV, _pam_conv) try: auth.authenticate() auth.acct_mgmt() except PAM.error: raise AssertionError('Invalid user / password') return True class PamIdentity(identity.Driver): """Very basic identity based on PAM. Tenant is always the same as User, root user has admin role. """ def authenticate(self, user_id, tenant_id, password): if user_id in ['nova', 'glance', 'cinder']: metadata = {} metadata['roles'] = ['admin'] tenant = {'enabled': True, 'description': None, 'name': 'service', 'id': 'service'} user = {'id': user_id, 'name': user_id, 'enabled': True, 'email':'openstack@openstack.com', 'tenantId':'service'} return (user, tenant, metadata) else: auth = pam.authenticate if pam else PAM_authenticate if auth(user_id, password): metadata = {} if user_id == 'root': metadata['is_admin'] = True tenant = None user = {'id': user_id, 'name': user_id, 'enabled': 'true', 'email':user_id, 'tenantId':None} return (user, tenant, metadata) def get_tenant(self, tenant_id): if tenant_id == 'service': return {'enabled': True, 'description': None, 'name': 'service', 'id': 'service'} else: return {'enabled': True, 'description': None, 'name': tenant_id, 'id': tenant_id} def get_tenant_by_name(self, tenant_name): if tenant_name == 'service': return {'enabled': True, 'description': None, 'name': 'service', 'id': 'service'} else: return {'enabled': True, 'description': None, 'name': tenant_name, 'id': tenant_name} def get_user(self, user_id): if user_id in ['nova', 'glance', 'cinder']: return {'id': user_id, 'name': user_id, 'enabled': 'True', 'email':'test@test.com', 'tenantId':'service'} else: return {'id': user_id, 'name': user_id, 'enabled': 'True', 'email':user_id, 'tenantId':user_id} def get_user_by_name(self, user_name): if user_name in ['nova', 'glance', 'cinder']: return {'id': user_name, 'name': user_name, 'enabled': 'True', 'email':'test@test.com', 'tenantId':'service'} else: return {'id': user_name, 'name': user_name, 'enabled': 'True', 'email':user_name, 'tenantId':user_name} def get_role(self, role_id): if role_id == 'admin': return {'id':role_id, 'name':'admin'} # return {'id':role_id, 'name':role_id} def list_users(self): raise NotImplementedError() def list_roles(self): raise NotImplementedError() def add_user_to_tenant(self, tenant_id, user_id): raise NotImplementedError() pass def remove_user_from_tenant(self, tenant_id, user_id): pass raise NotImplementedError() def get_tenants(self): return [{'enabled': True, 'description': None, 'name': 'service', 'id': 'service'}] def get_tenants_for_user(self, user_id): return [user_id] def get_roles_for_user_and_tenant(self, user_id, tenant_id): raise NotImplementedError() def add_role_to_user_and_tenant(self, user_id, tenant_id, role_id): raise NotImplementedError() def remove_role_from_user_and_tenant(self, user_id, tenant_id, role_id): raise NotImplementedError() def create_user(self, user_id, user): raise NotImplementedError() def update_user(self, user_id, user): raise NotImplementedError() def delete_user(self, user_id): raise NotImplementedError() def create_tenant(self, tenant_id, tenant): raise NotImplementedError() def update_tenant(self, tenant_id, tenant): raise NotImplementedError() def delete_tenant(self, tenant_id, tenant): raise NotImplementedError() def get_metadata(self, user_id, tenant_id): metadata = {} # if user_id == 'root':# or user_id == 'nova' or user_id == 'glance': # metadata['is_admin'] = True # metadata['roles'] = ['admin'] return metadata def create_metadata(self, user_id, tenant_id, metadata): raise NotImplementedError() def update_metadata(self, user_id, tenant_id, metadata): raise NotImplementedError() def delete_metadata(self, user_id, tenant_id, metadata): raise NotImplementedError() def create_role(self, role_id, role): raise NotImplementedError() def update_role(self, role_id, role): raise NotImplementedError() def delete_role(self, role_id): raise NotImplementedError()
keystone本身其实已经集成了pam认证方式,但是存在一些问题。
pam.py
相关文章推荐
- 基于PAM认证方式详解
- Linux-PAM认证方式
- linux的PAM认证和shadow文件中密码的加密方式
- vsftp 通过pam认证方式,添加虚拟用户
- /etc/pam.d/login Linux-PAM认证方式【转】
- Linux-PAM认证方式
- linux的PAM认证和shadow文件中密码的加密方式
- linux的PAM认证和shadow文件中密码的加密方式
- PAM认证方式
- vsftpd基于pam_mysql的认证和hash编码的方式配置虚拟用户
- Linux-PAM认证方式
- vsftp虚拟用户使用pam认证方式:
- linux的PAM认证和shadow文件中密码的加密方式
- Linux-PAM认证方式
- linux的PAM认证和shadow文件中密码的加密方式
- 在Centos7上使用vsftpd+pam_mysql实现虚拟用户认证
- LDAP认证的两种方式
- centos6.5下vsftpd服务的安装及配置并通过pam认证实现虚拟用户文件共享
- OAuth2.0学习(1-9)新浪开放平台微博认证-web应用授权(授权码方式)
- 实现基于AD的MOSS的FORM认证方式