您的位置:首页 > 其它

hbase 权限控制源码分析

2016-07-26 11:48 435 查看

–hbase 权限控制源码分析

在hbase中可以通过

grant ‘alice’, ‘RW’, ‘hbase_perf.table20’

命令把一个读写权限分配给一个命名空间下面的一个表

其实是通过rpc 调用发送到 AccessController 这个类当中

可以通过查看该类的继续关系

public class AccessController extends BaseMasterAndRegionObserver
implements RegionServerObserver,
AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {


就可以大概分出具体的功能。通过CoprocessorService 类对外提供rpc的 protobuf 服务

其中比较重要的接口类为 BaseMasterAndRegionObserver,里面有很多像

preCreateTable、preDeleteTable 的接口,这样在进行表的创建和删除时,就可以进行权限回调校验了

AccessController 该类其实也只是一个普通的协处理器。

首先当regionServer打开一个协处理器时

public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
RegionCoprocessorEnvironment env = c.getEnvironment();
final Region region = env.getRegion();
if (region == null) {
LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
return;
}
//这里只会处理acl的表数据初始化
if (AccessControlLists.isAclRegion(region)) {
aclRegion = true;
// When this region is under recovering state, initialize will be handled by postLogReplay
if (!region.isRecovering()) {
try {
initialize(env);
} catch (IOException ex) {
// if we can't obtain permissions, it's better to fail
// than perform checks incorrectly
throw new RuntimeException("Failed to initialize permissions cache", ex);
}
}
} else {
initialized = true;
}
}


会读取 region是 acl 的表进行初始化存放到zk当中,

void initialize(RegionCoprocessorEnvironment e) throws IOException {
final Region region = e.getRegion();
Configuration conf = e.getConfiguration();
Map<byte[], ListMultimap<String,TablePermission>> tables =
AccessControlLists.loadAll(region);
// For each table, write out the table's permissions to the respective
// znode for that table.
for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
tables.entrySet()) {
byte[] entry = t.getKey();
ListMultimap<String,TablePermission> perms = t.getValue();
byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
//把数据序列化到zk进行存储了
this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
}
initialized = true;
}


扫描acl表中的数据出来,然后存放在zk中。这里的数据很重要,整个权限数据都是存入在acl表当中的,现在在acl region打开时

自动读取更新到zk当中,然后后面会说到会有一个zk watcher进行监听读取相关的数据到缓存当中去。

然后在AccessController进行了起动的调用

public void start(CoprocessorEnvironment env)


接着在方法里面初始化zk

ZooKeeperWatcher zk = null;
if (env instanceof MasterCoprocessorEnvironment) {
// if running on HMaster
MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
zk = mEnv.getMasterServices().getZooKeeper();
} else if (env instanceof RegionServerCoprocessorEnvironment) {
RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
zk = rsEnv.getRegionServerServices().getZooKeeper();
} else if (env instanceof RegionCoprocessorEnvironment) {
// if running at region
regionEnv = (RegionCoprocessorEnvironment) env;
conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
zk = regionEnv.getRegionServerServices().getZooKeeper();
compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
}


从上面的zk的初始化当中,可以分析出,其实regionserver的协处理器的初始化有三个实例对象环境

分别是hmaster,hregionserver,region。

然后初始化权限判断对象,这个对象就是进行直接的权限校验工作了

this.authManager = TableAuthManager.get(zk, env.getConfiguration());


现在以 preCreateTable 作为分析流程。

在hmaster中的preCreateTable 方法当中会调用

` cpHost.preCreateTable(hTableDescriptor, newRegions);`


然后cpHost 中的方法为,调用如下

public void preCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions)
throws IOException {
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
@Override
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
oserver.preCreateTable(ctx, htd, regions);
}
});
}


从上面可以看到其实回调了

MasterObserver 的 preCreateTable 方法,该方法的实现类就是为 AccessController

在 AccessController 实现类中,就会取出具体的关键信息

@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
Set<byte[]> families = desc.getFamiliesKeys();
Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
for (byte[] family: families) {
familyMap.put(family, null);
}
requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
desc.getTableName(), familyMap, Action.CREATE);
}


然后在 requireNamespacePermission 方法中,会拿

requireNamespacePermission

User user = getActiveUser();
用客户端的请求用户信息
private User getActiveUser() throws IOException {
User user = RpcServer.getRequestUser();
if (user == null) {
// for non-rpc handling, fallback to system user
user = userProvider.getCurrent();
}
return user;
}


这个信息就是之前文章提到的,客户端上传的请求头中包含的用户名,放在threadLocal对象当中

public void requireNamespacePermission(String request, String namespace, TableName tableName,
Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
throws IOException {
User user = getActiveUser();
AuthResult result = null;

for (Action permission : permissions) {
if (authManager.authorize(user, namespace, permission)) {
result = AuthResult.allow(request, "Namespace permission granted",
user, permission, namespace);
result.getParams().setTableName(tableName).setFamilies(familyMap);
break;
} else {
// rest of the world
result = AuthResult.deny(request, "Insufficient permissions", user,
permission, namespace);
result.getParams().setTableName(tableName).setFamilies(familyMap);
}
}
logResult(result);
if (authorizationEnabled && !result.isAllowed()) {
throw new AccessDeniedException("Insufficient permissions "
+ result.toContextString());
}
}


在上面中就是对用户的请求进行权限的判断。

上面判断用到的权限数据其实是下面对象创建时进行读取的

this.authManager = TableAuthManager.get(zk, env.getConfiguration());


在该构造方法当中

private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
throws IOException {
this.conf = conf;

// initialize global permissions based on configuration
globalCache = initGlobal(conf);

this.zkperms = new ZKPermissionWatcher(watcher, this, conf);
try {
this.zkperms.start();
} catch (KeeperException ke) {
LOG.error("ZooKeeper initialization failed", ke);
}
}


可以看到会初始化数据到 globalCache 中,连接到zk进行监听数据

public void start() throws KeeperException {
try {
watcher.registerListener(this);
if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
refreshNodes(existing);
}
}
} finally {
initialized.countDown();
}
}


起动时,会读取zk中的权限数据到缓存当中去.

现在我们回到最开始的地方,acl表的数据是如何存放进去的(前面说到acl region打开时,会读取对应的region中的数据

写数据到zk中,所以后面的监听进程就可以

读取的到zk中的数据了)

在 AccessController 类中继承了 AccessControlService.Interface 接口,所以在客户端进行 grant 赋权时

其实是调用到服务端的grant方面的

/**
* 给不同的权限了
*/
@Override
public void grant(RpcController controller,
AccessControlProtos.GrantRequest request,
RpcCallback<AccessControlProtos.GrantResponse> done) {
final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
AccessControlProtos.GrantResponse response = null;
try {
// verify it's only running at .acl.
if (aclRegion) {
if (!initialized) {
throw new CoprocessorException("AccessController not yet initialized");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to grant access permission " + perm.toString());
}
//判断当前用户是否具有赋权限的权限
switch(request.getUserPermission().getPermission().getType()) {
case Global :
case Table :
requirePermission("grant", perm.getTableName(), perm.getFamily(),
perm.getQualifier(), Action.ADMIN);
break;
case Namespace :
requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
break;
}

User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
return null;
}
});

if (AUDITLOG.isTraceEnabled()) {
// audit log should store permission changes in addition to auth results
AUDITLOG.trace("Granted permission " + perm.toString());
}
} else {
throw new CoprocessorException(AccessController.class, "This method "
+ "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
}
response = AccessControlProtos.GrantResponse.getDefaultInstance();
} catch (IOException ioe) {
// pass exception back up
ResponseConverter.setControllerException(controller, ioe);
}
done.run(response);
}


可以看到上面首先,拿到当前正在操作的用户是否具有赋权的权限

然后在

AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);


方法当中,把数据写到acl表当中

static void addUserPermission(Configuration conf, UserPermission userPerm)
throws IOException {
Permission.Action[] actions = userPerm.getActions();
byte[] rowKey = userPermissionRowKey(userPerm);
Put p = new Put(rowKey);
byte[] key = userPermissionKey(userPerm);

if ((actions == null) || (actions.length == 0)) {
String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
LOG.warn(msg);
throw new IOException(msg);
}

byte[] value = new byte[actions.length];
for (int i = 0; i < actions.length; i++) {
value[i] = actions[i].code();
}
p.addImmutable(ACL_LIST_FAMILY, key, value);
if (LOG.isDebugEnabled()) {
LOG.debug("Writing permission with rowKey "+
Bytes.toString(rowKey)+" "+
Bytes.toString(key)+": "+Bytes.toStringBinary(value)
);
}
// TODO: Pass in a Connection rather than create one each time. 把结果存放到acl表当中去
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(ACL_TABLE_NAME)) {
table.put(p);
}
}
}


这个acl表就有数据了

同时,在hbase运行过程上,往acl增加权限数据时,会调用如下的方法

public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit, final Durability durability) {
if (aclRegion) {
//如果是acl中的数据就同时往zk中写进去
updateACL(c.getEnvironment(), put.getFamilyCellMap());
}


}

把数据了同时写到zk当中去,

/**
* Writes all table ACLs for the tables in the given Map up into ZooKeeper
* znodes.  This is called to synchronize ACL changes following {@code _acl_}
* table updates.
* 如果是acl中的数据,就同时往zk中写进去
*/
void updateACL(RegionCoprocessorEnvironment e,
final Map<byte[], List<Cell>> familyMap) {
Set<byte[]> entries =
new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
List<Cell> cells = f.getValue();
for (Cell cell: cells) {
if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
AccessControlLists.ACL_LIST_FAMILY.length)) {
entries.add(CellUtil.cloneRow(cell));
}
}
}
ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
Configuration conf = regionEnv.getConfiguration();
for (byte[] entry: entries) {
try {
ListMultimap<String,TablePermission> perms =
AccessControlLists.getPermissions(conf, entry);
byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
zkw.writeToZookeeper(entry, serialized);
} catch (IOException ex) {
LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
ex);
}
}
}


总结流程

客户端grant 权限数据,提交到服务端的 AccessController.grant方法当中,

然后写数据到Acl 当中,然后当打开acl region时,会把这个region中的数据写到zk当中

zk中的监听进程就会监听到数据,就回写到 authManager 的缓存当中去了,如果是在运行当

中增加acl权限,会在region是 Acl 的potput方法中,同时写到zk当中去
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hbase