kafka 0.10.1.0 权限验证源码分析
2017-01-05 00:00
381 查看
初始化流程图
ChannelBuilders.create创建ChannelBuilder对应关系如下:
这里我们配置的listeners值是
listeners=SASL_PLAINTEXT://0.0.0.0:9092
故解析出来的protocol就是SASL_PLAINTEXT,相应的ChannelBuilder也就是SaslChannelBuilder
到这里,权限验证相关的组件算是构建完毕了,然后我们看当一个连接接进来的时候,这些组件是怎么工作的。
入口是Accepter.accept
接下来是Processor线程
+ SaslChannelBuilder.buildChannel:
* 这里会根据构造SaslChannelBuilder时传进来的mode参数的不同选择构造SaslServerAuthenticator还是SaslClientAuthenticator,我们这里是服务端,当然是构造SaslServerAuthenticator.
* 构造好SaslServerAuthenticator后会调用它的configure函数,进行一些初始化配置。
* 把SaslServerAuthenticator对象作为参数传给KafkaChannel返回。
+ 这里注册好新的kafkaChannel后会调用poll函数完成一些IO的读写操作,而权限验证的处理就以这里为入口。
+ pollSelectionKeys函数会处理所有可完成连接,可读或可写的KafkaChannel。而权限验证部分则出现在KafkaChannel的准备阶段。
+ authenticate这个函数内会根据目前握手所处的状态的不同而做不同的处理,
* 首先是HANDSHAKE_REQUEST状态,调用handleKafkaRequest处理第一阶段的握手请求,解析出客户端发来的mechanism,
根据mechanism创建SaslServer,代码如下:
这里具体根据我们在jaas配置文件中的配置,最后是返回了一个PlainSaslServer,具体为什么返回了一个PlainSaslServer稍后讲。
* 然后下一状态是AUTHENTICATE,验证客户端发来的明文用户名和密码,调用了PlainSaslServer的evaluateResponse,代码如下
所以这里可以根据我们的需求根据客户端传过来的username和password动态的去某一个数据源获取和匹配其合法性。
jaas配置文件:
看到这里我们配置了一个org.apache.kafka.common.security.plain.PlainLoginModule
上面讲到返回了一个PlainSaslServer,具体是怎么返回的呢?我们需要从SaslChannelBuilder的configure说起,上流程图:
+ LoginContext.init这里会初始化ConfigFile,读取jaas配置文件中的内容
+ LoginContext.login这里会实例化jaas配置文件中的PlainLoginModule,并依次调用其initialize和login函数,
其中initialize函数会把username和password配置到subject里面去,这个subject最终会通过LoginManager的subject函数在SaslChannelBuilder的buildChannel函数中获取到,设置到SaslClientAuthenticator中用于和其他服务器通讯验证使用。
+ 而PlainLoginModule可不仅仅只做了这一件事情,该类定义了一个静态块初始化代码,调用了PlainSaslServerProvider的initialize函数用于注册创建负责做PLAIN协议验证的类的工厂类PlainSaslServerFactory。代码如下:
我们看到注册的是一个SaslServerFactory.PLAIN -> org.apache.kafka.common.security.plain.PlainSaslServer.PlainSaslServerFactory的对应关系
而再看前面调用的Sasl的createSaslServer的代码:
其中mechFilter的值即是SaslServerFactory.PLAIN,这里取到PlainSaslServerFactory然后调用createSaslServer方法返回了一个PlainSaslServer
综上,kafka内部的整个权限验证的初始化流程和验证逻辑已经比较清晰了(可能讲的比较乱,反正我是清晰了),但是我们发现,跟网上的其他自己编写LoginModule模块做验证的方式不同,kafka的PlainLoginModule这个类本身并没有做什么跟验证有关的逻辑,
只是做了一些初始化和注册provider的工作,而真正做权限验证的是从provider间接生产出来的PlainSaslServer类。
ChannelBuilders.create创建ChannelBuilder对应关系如下:
switch (securityProtocol) { case SSL: requireNonNullMode(mode, securityProtocol); channelBuilder = new SslChannelBuilder(mode); break; case SASL_SSL: case SASL_PLAINTEXT: requireNonNullMode(mode, securityProtocol); if (loginType == null) throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); if (mode == Mode.CLIENT && clientSaslMechanism == null) throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`"); channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable); break; case PLAINTEXT: case TRACE: channelBuilder = new PlaintextChannelBuilder(); break; default: throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol); }
这里我们配置的listeners值是
listeners=SASL_PLAINTEXT://0.0.0.0:9092
故解析出来的protocol就是SASL_PLAINTEXT,相应的ChannelBuilder也就是SaslChannelBuilder
到这里,权限验证相关的组件算是构建完毕了,然后我们看当一个连接接进来的时候,这些组件是怎么工作的。
入口是Accepter.accept
接下来是Processor线程
+ SaslChannelBuilder.buildChannel:
* 这里会根据构造SaslChannelBuilder时传进来的mode参数的不同选择构造SaslServerAuthenticator还是SaslClientAuthenticator,我们这里是服务端,当然是构造SaslServerAuthenticator.
* 构造好SaslServerAuthenticator后会调用它的configure函数,进行一些初始化配置。
* 把SaslServerAuthenticator对象作为参数传给KafkaChannel返回。
+ 这里注册好新的kafkaChannel后会调用poll函数完成一些IO的读写操作,而权限验证的处理就以这里为入口。
+ pollSelectionKeys函数会处理所有可完成连接,可读或可写的KafkaChannel。而权限验证部分则出现在KafkaChannel的准备阶段。
+ authenticate这个函数内会根据目前握手所处的状态的不同而做不同的处理,
* 首先是HANDSHAKE_REQUEST状态,调用handleKafkaRequest处理第一阶段的握手请求,解析出客户端发来的mechanism,
根据mechanism创建SaslServer,代码如下:
saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { public SaslServer run() throws SaslException { return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler); } });
这里具体根据我们在jaas配置文件中的配置,最后是返回了一个PlainSaslServer,具体为什么返回了一个PlainSaslServer稍后讲。
* 然后下一状态是AUTHENTICATE,验证客户端发来的明文用户名和密码,调用了PlainSaslServer的evaluateResponse,代码如下
String[] tokens; try { tokens = new String(response, "UTF-8").split("\u0000"); } catch (UnsupportedEncodingException e) { throw new SaslException("UTF-8 encoding not supported", e); } if (tokens.length != 3) throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length); authorizationID = tokens[0]; String username = tokens[1]; String password = tokens[2]; if (username.isEmpty()) { throw new SaslException("Authentication failed: username not specified"); } if (password.isEmpty()) { throw new SaslException("Authentication failed: password not specified"); } if (authorizationID.isEmpty()) authorizationID = username; try { String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username); if (!password.equals(expectedPassword)) { throw new SaslException("Authentication failed: Invalid username or password"); } } catch (IOException e) { throw new SaslException("Authentication failed: Invalid JAAS configuration", e); }
所以这里可以根据我们的需求根据客户端传过来的username和password动态的去某一个数据源获取和匹配其合法性。
jaas配置文件:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx" password="yyyyy" user_xxxxx="yyyyy"; };
看到这里我们配置了一个org.apache.kafka.common.security.plain.PlainLoginModule
上面讲到返回了一个PlainSaslServer,具体是怎么返回的呢?我们需要从SaslChannelBuilder的configure说起,上流程图:
+ LoginContext.init这里会初始化ConfigFile,读取jaas配置文件中的内容
+ LoginContext.login这里会实例化jaas配置文件中的PlainLoginModule,并依次调用其initialize和login函数,
其中initialize函数会把username和password配置到subject里面去,这个subject最终会通过LoginManager的subject函数在SaslChannelBuilder的buildChannel函数中获取到,设置到SaslClientAuthenticator中用于和其他服务器通讯验证使用。
+ 而PlainLoginModule可不仅仅只做了这一件事情,该类定义了一个静态块初始化代码,调用了PlainSaslServerProvider的initialize函数用于注册创建负责做PLAIN协议验证的类的工厂类PlainSaslServerFactory。代码如下:
protected PlainSaslServerProvider() { super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka"); super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName()); }
我们看到注册的是一个SaslServerFactory.PLAIN -> org.apache.kafka.common.security.plain.PlainSaslServer.PlainSaslServerFactory的对应关系
而再看前面调用的Sasl的createSaslServer的代码:
String mechFilter = "SaslServerFactory." + mechanism; Provider[] provs = Security.getProviders(mechFilter); for (int j = 0; provs != null && j < provs.length; j++) { className = provs[j].getProperty(mechFilter); if (className == null) { throw new SaslException("Provider does not support " + mechFilter); } fac = (SaslServerFactory) loadFactory(provs[j], className); if (fac != null) { mech = fac.createSaslServer( mechanism, protocol, serverName, props, cbh); if (mech != null) { return mech; } } }
其中mechFilter的值即是SaslServerFactory.PLAIN,这里取到PlainSaslServerFactory然后调用createSaslServer方法返回了一个PlainSaslServer
综上,kafka内部的整个权限验证的初始化流程和验证逻辑已经比较清晰了(可能讲的比较乱,反正我是清晰了),但是我们发现,跟网上的其他自己编写LoginModule模块做验证的方式不同,kafka的PlainLoginModule这个类本身并没有做什么跟验证有关的逻辑,
只是做了一些初始化和注册provider的工作,而真正做权限验证的是从provider间接生产出来的PlainSaslServer类。
相关文章推荐
- HBase源码分析之Simple权限验证
- HBase源码分析之Ranger权限验证
- [zz]DEFAULT_KEYS_SHORTCUT 功能的验证 及其 源码实现分析
- asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证
- iwebshop源码分析--权限分析
- Kafka源码分析(2)
- Kafka源码分析(5)
- kafka源码分析
- Struts2拦截器权限验证(源码)!
- shiro学习笔记——从源码角度分析shiro身份验证过程
- Rails自带用户验证has_secure_password的使用与源码分析
- kafka源码分析之一server启动分析
- YII自带验证的源码分析
- Kafka Producer同步模式发送message源码分析
- JobTracker之作业恢复与权限管理机制(源码分析第四篇)
- openstack keystone分析之一---------登陆和权限验证
- asp.net mvc源码分析-DefaultModelBinder 自定义的普通数据类型的绑定和验证
- DEFAULT_KEYS_SHORTCUT 功能的验证 及其 源码实现分析
- JS页面验证必填源码及问题分析
- Spark集成Kafka源码分析——SparkStreaming从kafak中接收数据