hbase 权限控制源码分析
2016-07-26 11:48
435 查看
–hbase 权限控制源码分析
在hbase中可以通过grant ‘alice’, ‘RW’, ‘hbase_perf.table20’
命令把一个读写权限分配给一个命名空间下面的一个表
其实是通过rpc 调用发送到 AccessController 这个类当中
可以通过查看该类的继续关系
public class AccessController extends BaseMasterAndRegionObserver implements RegionServerObserver, AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
就可以大概分出具体的功能。通过CoprocessorService 类对外提供rpc的 protobuf 服务
其中比较重要的接口类为 BaseMasterAndRegionObserver,里面有很多像
preCreateTable、preDeleteTable 的接口,这样在进行表的创建和删除时,就可以进行权限回调校验了
AccessController 该类其实也只是一个普通的协处理器。
首先当regionServer打开一个协处理器时
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) { RegionCoprocessorEnvironment env = c.getEnvironment(); final Region region = env.getRegion(); if (region == null) { LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()"); return; } //这里只会处理acl的表数据初始化 if (AccessControlLists.isAclRegion(region)) { aclRegion = true; // When this region is under recovering state, initialize will be handled by postLogReplay if (!region.isRecovering()) { try { initialize(env); } catch (IOException ex) { // if we can't obtain permissions, it's better to fail // than perform checks incorrectly throw new RuntimeException("Failed to initialize permissions cache", ex); } } } else { initialized = true; } }
会读取 region是 acl 的表进行初始化存放到zk当中,
void initialize(RegionCoprocessorEnvironment e) throws IOException { final Region region = e.getRegion(); Configuration conf = e.getConfiguration(); Map<byte[], ListMultimap<String,TablePermission>> tables = AccessControlLists.loadAll(region); // For each table, write out the table's permissions to the respective // znode for that table. for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t: tables.entrySet()) { byte[] entry = t.getKey(); ListMultimap<String,TablePermission> perms = t.getValue(); byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf); //把数据序列化到zk进行存储了 this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized); } initialized = true; }
扫描acl表中的数据出来,然后存放在zk中。这里的数据很重要,整个权限数据都是存入在acl表当中的,现在在acl region打开时
自动读取更新到zk当中,然后后面会说到会有一个zk watcher进行监听读取相关的数据到缓存当中去。
然后在AccessController进行了起动的调用
public void start(CoprocessorEnvironment env)
接着在方法里面初始化zk
ZooKeeperWatcher zk = null; if (env instanceof MasterCoprocessorEnvironment) { // if running on HMaster MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env; zk = mEnv.getMasterServices().getZooKeeper(); } else if (env instanceof RegionServerCoprocessorEnvironment) { RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env; zk = rsEnv.getRegionServerServices().getZooKeeper(); } else if (env instanceof RegionCoprocessorEnvironment) { // if running at region regionEnv = (RegionCoprocessorEnvironment) env; conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration()); zk = regionEnv.getRegionServerServices().getZooKeeper(); compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT, AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT); }
从上面的zk的初始化当中,可以分析出,其实regionserver的协处理器的初始化有三个实例对象环境
分别是hmaster,hregionserver,region。
然后初始化权限判断对象,这个对象就是进行直接的权限校验工作了
this.authManager = TableAuthManager.get(zk, env.getConfiguration());
现在以 preCreateTable 作为分析流程。
在hmaster中的preCreateTable 方法当中会调用
` cpHost.preCreateTable(hTableDescriptor, newRegions);`
然后cpHost 中的方法为,调用如下
public void preCreateTable(final HTableDescriptor htd, final HRegionInfo[] regions) throws IOException { execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { @Override public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException { oserver.preCreateTable(ctx, htd, regions); } }); }
从上面可以看到其实回调了
MasterObserver 的 preCreateTable 方法,该方法的实现类就是为 AccessController
在 AccessController 实现类中,就会取出具体的关键信息
@Override public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c, HTableDescriptor desc, HRegionInfo[] regions) throws IOException { Set<byte[]> families = desc.getFamiliesKeys(); Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR); for (byte[] family: families) { familyMap.put(family, null); } requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(), desc.getTableName(), familyMap, Action.CREATE); }
然后在 requireNamespacePermission 方法中,会拿
requireNamespacePermission User user = getActiveUser(); 用客户端的请求用户信息 private User getActiveUser() throws IOException { User user = RpcServer.getRequestUser(); if (user == null) { // for non-rpc handling, fallback to system user user = userProvider.getCurrent(); } return user; }
这个信息就是之前文章提到的,客户端上传的请求头中包含的用户名,放在threadLocal对象当中
public void requireNamespacePermission(String request, String namespace, TableName tableName, Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions) throws IOException { User user = getActiveUser(); AuthResult result = null; for (Action permission : permissions) { if (authManager.authorize(user, namespace, permission)) { result = AuthResult.allow(request, "Namespace permission granted", user, permission, namespace); result.getParams().setTableName(tableName).setFamilies(familyMap); break; } else { // rest of the world result = AuthResult.deny(request, "Insufficient permissions", user, permission, namespace); result.getParams().setTableName(tableName).setFamilies(familyMap); } } logResult(result); if (authorizationEnabled && !result.isAllowed()) { throw new AccessDeniedException("Insufficient permissions " + result.toContextString()); } }
在上面中就是对用户的请求进行权限的判断。
上面判断用到的权限数据其实是下面对象创建时进行读取的
this.authManager = TableAuthManager.get(zk, env.getConfiguration());
在该构造方法当中
private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf) throws IOException { this.conf = conf; // initialize global permissions based on configuration globalCache = initGlobal(conf); this.zkperms = new ZKPermissionWatcher(watcher, this, conf); try { this.zkperms.start(); } catch (KeeperException ke) { LOG.error("ZooKeeper initialization failed", ke); } }
可以看到会初始化数据到 globalCache 中,连接到zk进行监听数据
public void start() throws KeeperException { try { watcher.registerListener(this); if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { List<ZKUtil.NodeAndData> existing = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); if (existing != null) { refreshNodes(existing); } } } finally { initialized.countDown(); } }
起动时,会读取zk中的权限数据到缓存当中去.
现在我们回到最开始的地方,acl表的数据是如何存放进去的(前面说到acl region打开时,会读取对应的region中的数据
写数据到zk中,所以后面的监听进程就可以
读取的到zk中的数据了)
在 AccessController 类中继承了 AccessControlService.Interface 接口,所以在客户端进行 grant 赋权时
其实是调用到服务端的grant方面的
/** * 给不同的权限了 */ @Override public void grant(RpcController controller, AccessControlProtos.GrantRequest request, RpcCallback<AccessControlProtos.GrantResponse> done) { final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission()); AccessControlProtos.GrantResponse response = null; try { // verify it's only running at .acl. if (aclRegion) { if (!initialized) { throw new CoprocessorException("AccessController not yet initialized"); } if (LOG.isDebugEnabled()) { LOG.debug("Received request to grant access permission " + perm.toString()); } //判断当前用户是否具有赋权限的权限 switch(request.getUserPermission().getPermission().getType()) { case Global : case Table : requirePermission("grant", perm.getTableName(), perm.getFamily(), perm.getQualifier(), Action.ADMIN); break; case Namespace : requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN); break; } User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm); return null; } }); if (AUDITLOG.isTraceEnabled()) { // audit log should store permission changes in addition to auth results AUDITLOG.trace("Granted permission " + perm.toString()); } } else { throw new CoprocessorException(AccessController.class, "This method " + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table."); } response = AccessControlProtos.GrantResponse.getDefaultInstance(); } catch (IOException ioe) { // pass exception back up ResponseConverter.setControllerException(controller, ioe); } done.run(response); }
可以看到上面首先,拿到当前正在操作的用户是否具有赋权的权限
然后在
AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm);
方法当中,把数据写到acl表当中
static void addUserPermission(Configuration conf, UserPermission userPerm) throws IOException { Permission.Action[] actions = userPerm.getActions(); byte[] rowKey = userPermissionRowKey(userPerm); Put p = new Put(rowKey); byte[] key = userPermissionKey(userPerm); if ((actions == null) || (actions.length == 0)) { String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'"; LOG.warn(msg); throw new IOException(msg); } byte[] value = new byte[actions.length]; for (int i = 0; i < actions.length; i++) { value[i] = actions[i].code(); } p.addImmutable(ACL_LIST_FAMILY, key, value); if (LOG.isDebugEnabled()) { LOG.debug("Writing permission with rowKey "+ Bytes.toString(rowKey)+" "+ Bytes.toString(key)+": "+Bytes.toStringBinary(value) ); } // TODO: Pass in a Connection rather than create one each time. 把结果存放到acl表当中去 try (Connection connection = ConnectionFactory.createConnection(conf)) { try (Table table = connection.getTable(ACL_TABLE_NAME)) { table.put(p); } } }
这个acl表就有数据了
同时,在hbase运行过程上,往acl增加权限数据时,会调用如下的方法
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put, final WALEdit edit, final Durability durability) { if (aclRegion) { //如果是acl中的数据就同时往zk中写进去 updateACL(c.getEnvironment(), put.getFamilyCellMap()); }
}
把数据了同时写到zk当中去,
/** * Writes all table ACLs for the tables in the given Map up into ZooKeeper * znodes. This is called to synchronize ACL changes following {@code _acl_} * table updates. * 如果是acl中的数据,就同时往zk中写进去 */ void updateACL(RegionCoprocessorEnvironment e, final Map<byte[], List<Cell>> familyMap) { Set<byte[]> entries = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR); for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) { List<Cell> cells = f.getValue(); for (Cell cell: cells) { if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0, AccessControlLists.ACL_LIST_FAMILY.length)) { entries.add(CellUtil.cloneRow(cell)); } } } ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher(); Configuration conf = regionEnv.getConfiguration(); for (byte[] entry: entries) { try { ListMultimap<String,TablePermission> perms = AccessControlLists.getPermissions(conf, entry); byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf); zkw.writeToZookeeper(entry, serialized); } catch (IOException ex) { LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'", ex); } } }
总结流程
客户端grant 权限数据,提交到服务端的 AccessController.grant方法当中,
然后写数据到Acl 当中,然后当打开acl region时,会把这个region中的数据写到zk当中
zk中的监听进程就会监听到数据,就回写到 authManager 的缓存当中去了,如果是在运行当
中增加acl权限,会在region是 Acl 的potput方法中,同时写到zk当中去
相关文章推荐
- Facebook's New Real-time Messaging System: HBase to Store 135+ Billion Messages a Month
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- 基于HBase Thrift接口的一些使用问题及相关注意事项的详解
- 如何解决struts2日期类型转换
- java 保留两位小数的几种方法
- Java IO流 文件传输基础
- Eclipse中查看android工程代码出现"android.jar has no source attachment"的解决方案
- 基于Java实现杨辉三角 LeetCode Pascal's Triangle
- hbase shell基础和常用命令详解
- 手把手教你配置Hbase完全分布式环境
- 实战:在Java Web 项目中使用HBase
- HBase RowKey设计的那些事
- Spark中将对象序列化存储到hdfs
- HBase基本原理
- HBase中的基本概念
- 【原创】基于分布式存储的开源系统在实时数据库海量历史数据存储项目上的预研
- HBase0.96.x开发使用(一)--安装
- 基于外部ZooKeeper的GlusterFS作为分布式文件系统的完全分布式HBase集群安装指南
- 基于solr实现hbase的二级索引