您的位置:首页 > 其它

spark源码阅读(十五)--securityManager

2016-04-07 13:27 417 查看
SecurityManager 主要用于权限设置,比如在使用 YARN 作为资源调度框架时,用于生成 secret key 进行登录。该类默认只有一个实例,所有的 app 使用同一个实例,下面是该类的所有源代码:

/**
 * Holds the security-related settings derived from a `SparkConf`:
 *
 *  - whether authentication is on (`spark.authenticate`) and the shared secret used for it;
 *  - whether ACLs are on (`spark.acls.enable`, falling back to `spark.ui.acls.enable`) and the
 *    admin / view / modify user sets;
 *  - the SSL options for the file server and Akka, plus the socket factory and hostname
 *    verifier built from the file-server options.
 *
 * Constructing an instance has side effects: it logs the effective settings and, when
 * authentication is enabled, installs a JVM-wide default `java.net.Authenticator`.
 *
 * NOTE: the order of the member initializers below matters. The admin ACLs must be set before
 * the view/modify ACLs are computed, and `authOn` must be set before the secret key is
 * generated.
 *
 * @param sparkConf configuration the settings are read from
 */
private[spark] class SecurityManager(sparkConf: SparkConf)
  extends Logging with SecretKeyHolder {

  // key used to store the spark secret in the Hadoop UGI
  private val sparkSecretLookupKey = "sparkCookie"

  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
  // keep spark.ui.acls.enable for backwards compatibility with 1.0
  private var aclsOn =
    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))

  // admin acls should be set before view or modify acls
  private var adminAcls: Set[String] =
    stringToSet(sparkConf.get("spark.admin.acls", ""))

  // users who have permission to view the application; assigned by setViewAcls below
  private var viewAcls: Set[String] = _

  // list of users who have permission to modify the application. This should
  // apply to both UI and CLI for things like killing the application.
  private var modifyAcls: Set[String] = _

  // always add the current user and SPARK_USER to the viewAcls
  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
    Utils.getCurrentUserName())

  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

  private val secretKey = generateSecretKey()
  logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
    "; users with view permissions: " + viewAcls.toString() +
    "; users with modify permissions: " + modifyAcls.toString())

  // Set our own authenticator to properly negotiate user/password for HTTP connections.
  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
  // only set once.
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        /**
         * Extracts `user:password` from the user-info part of the requesting URL.
         *
         * @return the credentials, or null (meaning "no credentials available") when there is
         *         no requesting URL, the URL carries no user info, or the user info has no
         *         `:`-separated password part
         */
        override def getPasswordAuthentication(): PasswordAuthentication = {
          val userInfoParts = for {
            url <- Option(getRequestingURL())
            userInfo <- Option(url.getUserInfo())
          } yield userInfo.split(":", 2)

          userInfoParts match {
            // limit = 2 keeps any further ':' characters inside the password
            case Some(Array(user, password)) =>
              new PasswordAuthentication(user, password.toCharArray())
            // No user info, or user info without a password: previously the latter threw
            // ArrayIndexOutOfBoundsException on parts(1).
            case _ => null
          }
        }
      }
    )
  }

  // the default SSL configuration - it will be used by all communication layers unless overwritten
  private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

  // SSL configuration for different communication layers - they can override the default
  // configuration at a specified namespace. The namespace *must* start with spark.ssl.
  val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
  val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))

  logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
  logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")

  // Socket factory and hostname verifier for file-server connections; both are None when SSL is
  // not enabled for the file server.
  val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
    // Trust managers loaded from the configured trust store, if one is configured.
    val trustStoreManagers =
      for (trustStore <- fileServerSSLOptions.trustStore) yield {
        val input = Files.asByteSource(trustStore).openStream()

        try {
          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
          // NOTE(review): `.get` fails if a trust store is configured without a password -
          // confirm whether SSLOptions guarantees the two are always set together.
          ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray)

          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
          tmf.init(ks)
          tmf.getTrustManagers
        } finally {
          input.close()
        }
      }

    // Fallback used only when no trust store is configured. It is lazy so that the warning is
    // logged only if the fallback is actually used.
    // NOTE(review): this accepts every certificate; together with the always-true hostname
    // verifier below, peers are not authenticated in that configuration.
    lazy val credulousTrustStoreManagers = Array({
      logWarning("Using 'accept-all' trust manager for SSL connections.")
      new X509TrustManager {
        // The X509TrustManager contract requires a non-null (possibly empty) array.
        override def getAcceptedIssuers: Array[X509Certificate] = Array.empty[X509Certificate]

        override def checkClientTrusted(
            x509Certificates: Array[X509Certificate], s: String): Unit = {}

        override def checkServerTrusted(
            x509Certificates: Array[X509Certificate], s: String): Unit = {}
      }: TrustManager
    })

    val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default"))
    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)

    // Accepts any hostname for any session.
    val hostVerifier = new HostnameVerifier {
      override def verify(s: String, sslSession: SSLSession): Boolean = true
    }

    (Some(sslContext.getSocketFactory), Some(hostVerifier))
  } else {
    (None, None)
  }

  /**
   * Split a comma separated String, filter out any empty items, and return a Set of strings
   *
   * @param list comma separated items; each item is trimmed before the emptiness check
   * @return the distinct non-empty items
   */
  private def stringToSet(list: String): Set[String] = {
    list.split(',').map(_.trim).filter(_.nonEmpty).toSet
  }

  /**
   * Replaces the view acls with the union of the admin acls, `defaultUsers` and the users
   * listed in `allowedUsers`.
   *
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   *
   * @param defaultUsers users that always get view access
   * @param allowedUsers comma separated list of additional users
   */
  def setViewAcls(defaultUsers: Set[String], allowedUsers: String): Unit = {
    viewAcls = adminAcls ++ defaultUsers ++ stringToSet(allowedUsers)
    logInfo("Changing view acls to: " + viewAcls.mkString(","))
  }

  /**
   * Single-default-user convenience overload of `setViewAcls`.
   *
   * @param defaultUser user that always gets view access
   * @param allowedUsers comma separated list of additional users
   */
  def setViewAcls(defaultUser: String, allowedUsers: String): Unit = {
    setViewAcls(Set[String](defaultUser), allowedUsers)
  }

  /** @return the current view acls as a comma separated string */
  def getViewAcls: String = viewAcls.mkString(",")

  /**
   * Replaces the modify acls with the union of the admin acls, `defaultUsers` and the users
   * listed in `allowedUsers`.
   *
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   *
   * @param defaultUsers users that always get modify access
   * @param allowedUsers comma separated list of additional users
   */
  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String): Unit = {
    modifyAcls = adminAcls ++ defaultUsers ++ stringToSet(allowedUsers)
    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
  }

  /** @return the current modify acls as a comma separated string */
  def getModifyAcls: String = modifyAcls.mkString(",")

  /**
   * Replaces the admin acls. This does NOT recompute the view or modify acls.
   *
   * Admin acls should be set before the view or modify acls.  If you modify the admin
   * acls you should also set the view and modify acls again to pick up the changes.
   *
   * @param adminUsers comma separated list of admin users
   */
  def setAdminAcls(adminUsers: String): Unit = {
    adminAcls = stringToSet(adminUsers)
    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
  }

  /**
   * Turns acl checking on or off.
   *
   * @param aclSetting true to enable acl checks
   */
  def setAcls(aclSetting: Boolean): Unit = {
    aclsOn = aclSetting
    logInfo("Changing acls enabled to: " + aclsOn)
  }

  /**
   * Generates or looks up the secret key.
   *
   * The way the key is stored depends on the Spark deployment mode. Yarn
   * uses the Hadoop UGI.
   *
   * For non-Yarn deployments, If the config variable is not set
   * we throw an exception.
   *
   * @return the secret key, or null when authentication is disabled
   * @throws Exception if authentication is enabled outside Yarn mode and
   *                   `spark.authenticate.secret` is not set
   */
  private def generateSecretKey(): String = {
    if (!isAuthenticationEnabled()) {
      null
    } else if (SparkHadoopUtil.get.isYarnMode) {
      // first check to see if the secret is already set, else generate a new one if on yarn
      val storedSecret =
        SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
      if (storedSecret != null) {
        logDebug("in yarn mode, getting secret from credentials")
        new Text(storedSecret).toString
      } else {
        logDebug("getSecretKey: yarn mode, secret key from credentials is null")
        val cookie = akka.util.Crypt.generateSecureCookie
        // if we generated the secret then we must be the first so lets set it so it
        // gets used by everyone else
        SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
        logInfo("adding secret to credentials in yarn mode")
        cookie
      }
    } else {
      // user must have set spark.authenticate.secret config
      sparkConf.getOption("spark.authenticate.secret").getOrElse {
        throw new Exception("Error: a secret key must be specified via the " +
          "spark.authenticate.secret config")
      }
    }
  }

  /**
   * Check to see if Acls for the UI are enabled
   * @return true if UI authentication is enabled, otherwise false
   */
  def aclsEnabled(): Boolean = aclsOn

  /**
   * Checks the given user against the view acl list to see if they have
   * authorization to view the UI. If the UI acls are disabled
   * via spark.acls.enable, all users have view access. If the user is null
   * it is assumed authentication is off and all users have access.
   *
   * @param user to see if is authorized
   * @return true is the user has permission, otherwise false
   */
  def checkUIViewPermissions(user: String): Boolean = {
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
      viewAcls.mkString(","))
    !aclsEnabled() || user == null || viewAcls.contains(user)
  }

  /**
   * Checks the given user against the modify acl list to see if they have
   * authorization to modify the application. If the UI acls are disabled
   * via spark.acls.enable, all users have modify access. If the user is null
   * it is assumed authentication isn't turned on and all users have access.
   *
   * @param user to see if is authorized
   * @return true is the user has permission, otherwise false
   */
  def checkModifyPermissions(user: String): Boolean = {
    logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
      modifyAcls.mkString(","))
    !aclsEnabled() || user == null || modifyAcls.contains(user)
  }

  /**
   * Check to see if authentication for the Spark communication protocols is enabled
   * @return true if authentication is enabled, otherwise false
   */
  def isAuthenticationEnabled(): Boolean = authOn

  /**
   * Gets the user used for authenticating HTTP connections.
   * For now use a single hardcoded user.
   * @return the HTTP user as a String
   */
  def getHttpUser(): String = "sparkHttpUser"

  /**
   * Gets the user used for authenticating SASL connections.
   * For now use a single hardcoded user.
   * @return the SASL user as a String
   */
  def getSaslUser(): String = "sparkSaslUser"

  /**
   * Gets the secret key.
   * @return the secret key as a String if authentication is enabled, otherwise returns null
   */
  def getSecretKey(): String = secretKey

  // Default SecurityManager only has a single secret key, so ignore appId.
  override def getSaslUser(appId: String): String = getSaslUser()
  override def getSecretKey(appId: String): String = getSecretKey()
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: