您的位置:首页 > 其它

kafka权限控制-Acl

2017-07-27 00:00 302 查看

介绍

Acl表示对一个资源的访问权限。它由Resource和Acl组成。

Resource表示一个具体的资源。

Acl表示权限,由主体principal,是否允许permissionType,主机host,操作operation组成。

Resource

//  ResourceType表示资源类型,name则表示资源标识符
case class Resource(resourceType: ResourceType, name: String) {

override def toString: String = {
resourceType.name + Resource.Separator + name
}
}

以一个名为test的Topic为例,用Resource表示这个资源

new Resource(ResourceType.Topic, "test")

ResourceType

object ResourceType {

def fromString(resourceType: String): ResourceType = {
// 从values找到name相等的type
val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType))
rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
}
// 取值序列
def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId)

def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", ""))
}

ResourceType只有四种内置的类型

case object Topic extends ResourceType {
val name = "Topic"
val error = Errors.TOPIC_AUTHORIZATION_FAILED
val toJava = JResourceType.TOPIC
}

case object Group extends ResourceType {
val name = "Group"
val error = Errors.GROUP_AUTHORIZATION_FAILED
val toJava = JResourceType.GROUP
}

case object Cluster extends ResourceType {
val name = "Cluster"
val error = Errors.CLUSTER_AUTHORIZATION_FAILED
val toJava = JResourceType.CLUSTER
}

case object TransactionalId extends ResourceType {
val name = "TransactionalId"
val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
val toJava = JResourceType.TRANSACTIONAL_ID
}

Operation

object Operation {

def fromString(operation: String): Operation = {
// 从values找到name相等的值
val op = values.find(op => op.name.equalsIgnoreCase(operation))
op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
}

def fromJava(operation: AclOperation): Operation = fromString(operation.toString.replaceAll("_", ""))
// 取值集合
def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All)
}

Opearation只有下面几种内置的类型

// 读操作
case object Read extends Operation {
val name = "Read"
val toJava = AclOperation.READ
}
// 写操作
case object Write extends Operation {
val name = "Write"
val toJava = AclOperation.WRITE
}
// 新建操作
case object Create extends Operation {
val name = "Create"
val toJava = AclOperation.CREATE
}
// 删除操作
case object Delete extends Operation {
val name = "Delete"
val toJava = AclOperation.DELETE
}
// 修改操作
case object Alter extends Operation {
val name = "Alter"
val toJava = AclOperation.ALTER
}
// 描述操作
case object Describe extends Operation {
val name = "Describe"
val toJava = AclOperation.DESCRIBE
}
// 集群操作
case object ClusterAction extends Operation {
val name = "ClusterAction"
val toJava = AclOperation.CLUSTER_ACTION
}
// 描述配置操作
case object DescribeConfigs extends Operation {
val name = "DescribeConfigs"
val toJava = AclOperation.DESCRIBE_CONFIGS
}
// 修改配置操作
case object AlterConfigs extends Operation {
val name = "AlterConfigs"
val toJava = AclOperation.ALTER_CONFIGS
}
//
case object IdempotentWrite extends Operation {
val name = "IdempotentWrite"
val toJava = AclOperation.IDEMPOTENT_WRITE
}
// 表示所有的操作
case object All extends Operation {
val name = "All"
val toJava = AclOperation.ALL
}

PermissionType

object PermissionType {
def fromString(permissionType: String): PermissionType = {
val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType))
pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
}
// 从values找到name相等的值
def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString)
// 取值集合
def values: Seq[PermissionType] = List(Allow, Deny)
}

内置的PermissionType,只有两种,Allow表示允许,Deny表示拒绝。

case object Allow extends PermissionType {
val name = "Allow"
val toJava = AclPermissionType.ALLOW
}

case object Deny extends PermissionType {
val name = "Deny"
val toJava = AclPermissionType.DENY
}

KafkaPrincipal

KafkaPrincipal默认是以User类型,来区分的。

public class KafkaPrincipal implements Principal {
public static final String SEPARATOR = ":";
public static final String USER_TYPE = "User";
public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
// 主体类型
private String principalType;
// 标识符
private String name;

public KafkaPrincipal(String principalType, String name) {
if (principalType == null || name == null) {
throw new IllegalArgumentException("principalType and name can not be null");
}
this.principalType = principalType;
this.name = name;
}

public static KafkaPrincipal fromString(String str) {
if (str == null || str.isEmpty()) {
throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
}
// 以:字符切割
String[] split = str.split(SEPARATOR, 2);

if (split == null || split.length != 2) {
throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
}

return new KafkaPrincipal(split[0], split[1]);

public String toString() {
return principalType + SEPARATOR + name;
}
}

Acl

case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
// 转为map类型。后面会再转为json类型,存到zookeeper的节点中
def toMap(): Map[String, Any] = {
Map(Acl.PrincipalKey -> principal.toString,
Acl.PermissionTypeKey -> permissionType.name,
Acl.OperationKey -> operation.name,
Acl.HostsKey -> host)
}
}

object Acl {
val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
val WildCardHost: String = "*"
val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
val PrincipalKey = "principal"
val PermissionTypeKey = "permissionType"
val OperationKey = "operation"
val HostsKey = "host"
val VersionKey = "version"
val CurrentVersion = 1
val AclsKey = "acls"

/**
aclJson数据存储在zookeeper中,它的格式如下
{
"version": 1,
"acls": [
{
"host":"host1",
"permissionType": "Deny",
"operation": "Read",
"principal": "User:alice"
}
]
}

*/
def fromJson(aclJson: String): Set[Acl] = {
if (aclJson == null || aclJson.isEmpty)
return collection.immutable.Set.empty[Acl]

var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
Json.parseFull(aclJson) match {
case Some(m) =>
val aclMap = m.asInstanceOf[Map[String, Any]]
//the acl json version.
require(aclMap(VersionKey) == CurrentVersion)
// 获取aclJson的acls的值
val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
aclSet.foreach(item => {
val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
val host: String = item(HostsKey).asInstanceOf[String]
// 构建Acl,并且添加到acls里
acls += new Acl(principal, permissionType, host, operation)
})
case None =>
}
acls.toSet
}
}

SimpleAclAuthorizer

实现了Authorizer接口,主要提供了Acl的管理

class SimpleAclAuthorizer extends Authorizer with Logging {

private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
// 初始化配置
override def configure(javaConfigs: util.Map[String, _]) {
val configs = javaConfigs.asScala
val props = new java.util.Properties()
configs.foreach { case (key, value) => props.put(key, value.toString) }
// 从配置中获取super.users的值,这是一个字符串。
// 格式为User:user1;User:user2,用户之间用;隔开,一个用户是User:username的格式。
superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
}.getOrElse(Set.empty[KafkaPrincipal])

// 这个配置表示,当没有找到对应的Acl规则时,默认是否允许
shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)

// 初始化zookeeper连接
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
zkUtils = ZkUtils(zkUrl,
sessionTimeout = zkSessionTimeOutMs,
connectionTimeout = zkConnectionTimeoutMs,
kafkaConfig.zkEnableSecureAcls)

// 保证Acl节点存在
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
// 从zookeeper中读取数据,初始化
loadCache()
// 保证Acl节点存在aclCache
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)

// 注册监听时间,当节点有变动时,会自行调用AclChangedNotificationHandler回调函数,更新aclCache
aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
aclChangeListener.init()
}

private def loadCache()  {
inWriteLock(lock) {
// zkUtils.getChildren 返回子节点列表,子节点的数据类型为String
// 返回"/acls"节点的子节点
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
// 根据string实例化ResourceType
val resourceType = ResourceType.fromString(rType)
// 返回"/acls/resourceName"节点的子节点
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
// 根据type和name实例化Resource,然后从zookeeper中读取到对应的Acl列表
val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
// 更新aclCache
updateCache(new Resource(resourceType, resourceName), versionedAcls)
}
}
}
}

def toResourcePath(resource: Resource): String = {
// 根据Resource找到zookeeper中对应的节点路径
SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name
}

private def getAclsFromZk(resource: Resource): VersionedAcls = {
// 读取Resource对应节点的数据
val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
// 调用Acl.fromJson解析数据,返回VersionedAcls。VersionedAcls定义在下面
VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
}

// 更新aclCache
private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
if (versionedAcls.acls.nonEmpty) {
aclCache.put(resource, versionedAcls)
} else {
aclCache.remove(resource)
}
}

}

VersionedAcls的定义

object SimpleAclAuthorizer {
// VersionedAcls只是Acl的列表和zkVersion的版本号
private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
}

SimpleAclAuthorizer还有一个重要的方法authorize,用于检查权限

class SimpleAclAuthorizer extends Authorizer with Logging {
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
val principal = session.principal
val host = session.clientAddress.getHostAddress
// 获取resource对应的Acl列表和该resource的type的默认Acl列表
// WildCardResource表示匹配所有
val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))

// 从上面resource找到所有的Acls中,查找是否有deny的acl
val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)

val allowOps = operation match {
// Read, Write, Delete, Alter动作包含了Describe
case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
// AlterConfigs包含了DescribeConfigs
case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
// 其余的不修改
case _ => Set[Operation](operation)
}
// 遍历allowOps列表,查找是否有明确指定Allow的acl
val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))

// 有以下三种条件,满足其一,则认为有权限
// 是否是super user
val authorized = isSuperUser(operation, resource, principal, host) ||
// 如果acls没有找到,查看默认配置
isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
// 如果没有找到deny的acl,并且还有allow的acl
(!denyMatch && allowMatch)

logAuditMessage(principal, authorized, operation, resource, host)
authorized
}

private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal, host:
String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
acls.find { acl =>
// // permissionType相等
acl.permissionType == permissionType &&
// principal相等,或者principal为WildCardPrincipal,表示匹配所有
(acl.principal == principal || acl.principal == Acl.WildCardPrincipal) &&
// operation相等,或者operation为All,表示匹配所有
(operations == acl.operation || acl.operation == All) &&
// host相等,或者host为WildCardHost,表示匹配所有
(acl.host == host || acl.host == Acl.WildCardHost)
}.exists { acl =>
authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl")
// 找到后,则返回true。没有,则返回false
true
}
}

// 是否principal为super user
def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
// superUsers是否包含principal
if (superUsers.contains(principal)) {
authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
true
} else false
}

def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal,
host: String, acls: Set[Acl]): Boolean = {
if (acls.isEmpty) {
authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
// 返回配置的值
shouldAllowEveryoneIfNoAclIsFound
} else false
}
}

因为acl数据持久化到zookeeper中,所以当zookeeper中的数据发生改变时,应该还有监听的作用。这个是通过zookeeper的watch来实现的。

acl的更新涉及到zookeeper的两个地方。一个是Resource节点,存储acls。另一个是持久顺序节点,它的子节点记录了每次Resource的更新。

aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
aclChangeListener.init()

class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private val seqNodeRoot: String,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val isClosed = new AtomicBoolean(false)

// 初始化
def init() {
zkUtils.makeSurePersistentPathExists(seqNodeRoot)
// 监听seqNodeRoot节点的子节点变化。
// seqNodeRoot就是上面所说的顺序节点
zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
processAllNotifications()
}

def processAllNotifications() {
// 获取seqNodeRoot的子节点。子节点存储了resource
val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
// 并且从小到大排序
processNotifications(changes.asScala.sorted)
}

private def processNotifications(notifications: Seq[String]) {
if (notifications.nonEmpty) {
info(s"Processing notification(s) to $seqNodeRoot")
try {
val now = time.milliseconds
for (notification <- notifications) {
// 获取当前节点的顺序号
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
// 如果changeId比上次更新的id大,则表示这是新的纪录
val changeZnode = seqNodeRoot + "/" + notification
// 读取当前节点的数据,表示Resource的字符串
val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
// 调用notificationHandler的processNotification方法
data.map(notificationHandler.processNotification(_)).getOrElse {
logger.warn(s"read null data from $changeZnode when processing notification $notification")
}
}
// 更新lastExecutedChange
lastExecutedChange = changeId
}
purgeObsoleteNotifications(now, notifications)
} catch {
case e: ZkInterruptedException =>
if (!isClosed.get)
throw e
}
}
}

// 因为它是顺序节点的子节点,所以名称后缀会有自增数字
// 类似于acl_changes_0000000001, acl_changes_0000000002
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

// 监听子节点变化
object NodeChangeListener extends IZkChildListener {

override def handleChildChange(path: String, notifications: java.util.List[String]) {
try {
import scala.collection.JavaConverters._
if (notifications != null)
// 调用processNotifications方法
processNotifications(notifications.asScala.sorted)
} catch {
case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e)
}
}
}
}

上面processNotifications方法,调用了AclChangedNotificationHandler 的processNotification方法。

object AclChangedNotificationHandler extends NotificationHandler {
override def processNotification(notificationMessage: String) {
// 通过字符串,实例化Resource
val resource: Resource = Resource.fromString(notificationMessage)
inWriteLock(lock) {
// 从zookeeper中读取该resource的acls
val versionedAcls = getAclsFromZk(resource)
// 更新aclCache
updateCache(resource, versionedAcls)
}
}

概括

本章先介绍了与acl相关的数据结构。Resource代表资源,Acl表示访问规则。

然后介绍了acl的管理,SimpleAclAuthorizer类。其中涉及到了zookeeper的数据持久化,aclCache的更新。两者之间的同步,通过了zookeeper的watch机制来实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka acl authorizer