您的位置:首页 > 其它

Keystone 的 PAM 认证方式

2012-08-30 12:26 141 查看
from __future__ import absolute_import
import pdb

try:
import pam
except ImportError:
pam = None
import PAM

from keystone import identity

def PAM_authenticate(username, password):
    """Check a username / password pair against the PAM ``passwd`` service.

    This variant drives the ``PAM`` bindings directly; the module selects it
    when the simpler ``pam`` module could not be imported.

    Returns True when both the authentication step and the account
    management step complete without error.

    Raises AssertionError('Invalid user / password') when either step
    raises ``PAM.error``.
    """
    def _answer_prompts(handle, prompts):
        # Conversation callback handed to PAM. Input prompts (echoed or not)
        # are answered with the password, error/info messages with an empty
        # string; any other prompt type gets no entry in the reply list.
        replies = []
        for _text, kind in prompts:
            if kind in [PAM.PAM_PROMPT_ECHO_ON, PAM.PAM_PROMPT_ECHO_OFF]:
                replies.append((password, 0))
                continue
            if kind in [PAM.PAM_PROMPT_ERROR_MSG, PAM.PAM_PROMPT_TEXT_INFO]:
                replies.append(('', 0))
        return replies

    session = PAM.pam()
    session.start('passwd')
    session.set_item(PAM.PAM_USER, username)
    session.set_item(PAM.PAM_CONV, _answer_prompts)

    try:
        session.authenticate()
        session.acct_mgmt()
    except PAM.error:
        raise AssertionError('Invalid user / password')
    else:
        return True

class PamIdentity(identity.Driver):
    """Very basic, read-only identity backend based on PAM.

    Behaviour implemented by this driver:

    * The accounts listed in ``_SERVICE_USERS`` are accepted without any
      password check and are placed in the ``service`` tenant with the
      ``admin`` role.
    * Every other account is verified through PAM. A successful login
      yields no tenant; ``root`` additionally gets ``is_admin`` metadata.
    * Tenants and users are synthesised from the id / name that is asked
      for; nothing is stored.
    * Every mutating operation raises NotImplementedError.
    """

    # Accounts used by the OpenStack services themselves.
    _SERVICE_USERS = ('nova', 'glance', 'cinder')
    _SERVICE_TENANT = 'service'

    @staticmethod
    def _tenant_ref(tenant_id):
        """Return a fresh, always-enabled tenant dict named after its id."""
        return {'enabled': True, 'description': None,
                'name': tenant_id, 'id': tenant_id}

    @classmethod
    def _user_ref(cls, user_id):
        """Return a fresh user dict as reported by the lookup methods."""
        if user_id in cls._SERVICE_USERS:
            email, tenant_id = 'test@test.com', cls._SERVICE_TENANT
        else:
            email, tenant_id = user_id, user_id
        return {'id': user_id, 'name': user_id, 'enabled': True,
                'email': email, 'tenantId': tenant_id}

    def authenticate(self, user_id, tenant_id, password):
        """Authenticate a user and return ``(user, tenant, metadata)``.

        ``tenant_id`` is accepted for interface compatibility but is not
        consulted.

        Raises AssertionError('Invalid user / password') when PAM rejects
        the credentials.
        """
        if user_id in self._SERVICE_USERS:
            # SECURITY NOTE(review): service accounts are granted the admin
            # role without the password ever being checked, so anyone who
            # can reach this driver can log in as them. Kept as-is because
            # no credential source for these accounts is visible here --
            # restrict access or add a real check before production use.
            user = {'id': user_id, 'name': user_id, 'enabled': True,
                    'email': 'openstack@openstack.com',
                    'tenantId': self._SERVICE_TENANT}
            metadata = {'roles': ['admin']}
            return (user, self._tenant_ref(self._SERVICE_TENANT), metadata)

        auth = pam.authenticate if pam else PAM_authenticate
        if not auth(user_id, password):
            # pam.authenticate() signals a bad login by returning a false
            # value instead of raising. Without this check the method fell
            # through and returned None, which callers cannot unpack. Raise
            # the same error PAM_authenticate uses so both back ends agree.
            raise AssertionError('Invalid user / password')

        metadata = {}
        if user_id == 'root':
            metadata['is_admin'] = True

        user = {'id': user_id, 'name': user_id, 'enabled': True,
                'email': user_id, 'tenantId': None}
        return (user, None, metadata)

    def get_tenant(self, tenant_id):
        """Return a synthesised, enabled tenant whose name equals its id."""
        return self._tenant_ref(tenant_id)

    def get_tenant_by_name(self, tenant_name):
        """Return a synthesised, enabled tenant whose id equals its name."""
        return self._tenant_ref(tenant_name)

    def get_user(self, user_id):
        """Return a synthesised, enabled user for ``user_id``.

        Service accounts belong to the ``service`` tenant; any other user
        belongs to a tenant named after the user.
        """
        return self._user_ref(user_id)

    def get_user_by_name(self, user_name):
        """Return the same record as ``get_user``; id and name coincide."""
        return self._user_ref(user_name)

    def get_role(self, role_id):
        """Return the ``admin`` role dict, or None for any other role id."""
        if role_id == 'admin':
            return {'id': role_id, 'name': 'admin'}
        return None

    def list_users(self):
        raise NotImplementedError()

    def list_roles(self):
        raise NotImplementedError()

    def add_user_to_tenant(self, tenant_id, user_id):
        raise NotImplementedError()

    def remove_user_from_tenant(self, tenant_id, user_id):
        raise NotImplementedError()

    def get_tenants(self):
        """Return the list of known tenants: only the ``service`` tenant."""
        return [self._tenant_ref(self._SERVICE_TENANT)]

    def get_tenants_for_user(self, user_id):
        """Return the tenant ids of a user: a single id equal to the user id.

        NOTE(review): service accounts report ``tenantId`` 'service' from
        the lookup methods but still get their own id here -- confirm this
        mismatch is intended.
        """
        return [user_id]

    def get_roles_for_user_and_tenant(self, user_id, tenant_id):
        raise NotImplementedError()

    def add_role_to_user_and_tenant(self, user_id, tenant_id, role_id):
        raise NotImplementedError()

    def remove_role_from_user_and_tenant(self, user_id, tenant_id, role_id):
        raise NotImplementedError()

    def create_user(self, user_id, user):
        raise NotImplementedError()

    def update_user(self, user_id, user):
        raise NotImplementedError()

    def delete_user(self, user_id):
        raise NotImplementedError()

    def create_tenant(self, tenant_id, tenant):
        raise NotImplementedError()

    def update_tenant(self, tenant_id, tenant):
        raise NotImplementedError()

    def delete_tenant(self, tenant_id, tenant):
        raise NotImplementedError()

    def get_metadata(self, user_id, tenant_id):
        """Return the stored metadata for a user/tenant pair: always empty."""
        return {}

    def create_metadata(self, user_id, tenant_id, metadata):
        raise NotImplementedError()

    def update_metadata(self, user_id, tenant_id, metadata):
        raise NotImplementedError()

    def delete_metadata(self, user_id, tenant_id, metadata):
        raise NotImplementedError()

    def create_role(self, role_id, role):
        raise NotImplementedError()

    def update_role(self, role_id, role):
        raise NotImplementedError()

    def delete_role(self, role_id):
        raise NotImplementedError()


Keystone 本身其实已经集成了 PAM 认证方式,但是存在一些问题。

pam.py
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: