Openstack Cinder中建立volume过程的源码解析(9)
2014-04-15 23:37
387 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
我们在上一篇博客中可以看到,在建立新卷之前,需要获取建立卷的目标主机,经过分析源码我们可以知道,如果没有指定建立新卷的目标主机,需要通过调度器算法实现目标主机的确定,如果指定了建立新卷的目标主机,则直接获取目标主机,无论是哪种情况,都需要调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立。在这篇博客中,我们就具体来分析这个方法的实现过程。
我们先来看方法create_volume的源码实现:
self._create_func_mapping = {
'raw': self._create_raw_volume,
'snap': self._create_from_snapshot,
'source_vol': self._create_from_source_volume,
'image': self._create_from_image,
}
这里就指明了建立新卷的四种途径,即直接建立raw格式的新卷、从快照建立新卷、从已有的卷建立新卷和从镜像建立新卷。在上述类的__call__方法中,根据具体情况分别调用了不用的方法实现了新卷的建立,我们来看看这几个建立新卷的方法的源码:
OK!到此为止,cinder中建立新卷的整体流程的源码分析已经全部完成,其实我想说的一句话就是,如果真的把这个流程的实现过程搞清楚,那么cinder模块的源码也就基本掌握了。
谢谢大家的支持!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
我们在上一篇博客中可以看到,在建立新卷之前,需要获取建立卷的目标主机,经过分析源码我们可以知道,如果没有指定建立新卷的目标主机,需要通过调度器算法实现目标主机的确定,如果指定了建立新卷的目标主机,则直接获取目标主机,无论是哪种情况,都需要调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立。在这篇博客中,我们就具体来分析这个方法的实现过程。
我们先来看方法create_volume的源码实现:
def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None): """ 远程调用实现建立并导出卷; """ request_spec_p = jsonutils.to_primitive(request_spec) self.cast(ctxt, self.make_msg('create_volume', volume_id=volume['id'], request_spec=request_spec_p, filter_properties=filter_properties, allow_reschedule=allow_reschedule, snapshot_id=snapshot_id, image_id=image_id, source_volid=source_volid), # queue_get_for:根据给定的topic和host获取对应的队列名称; topic=rpc.queue_get_for(ctxt, self.topic, host), version='1.4')我们可以看到,这里也是应用了广播方法cast实现远程调用方法create_volume,即/cinder/volume/manager.py----class VolumeManager----def create_volume,我们具体来看这个方法的实现源码:
@utils.require_driver_initialized def create_volume(self, context, volume_id, request_spec=None, filter_properties=None, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None): """ Creates and exports the volume. 建立并导出卷; """ # 构建并返回用于通过管理器建立卷的flow; flow = create_volume.get_manager_flow( self.db, self.driver, self.scheduler_rpcapi, self.host, volume_id, request_spec=request_spec, filter_properties=filter_properties, allow_reschedule=allow_reschedule, snapshot_id=snapshot_id, image_id=image_id, source_volid=source_volid, reschedule_context=context.deepcopy()) assert flow, _('Manager volume flow not retrieved') # 进行flow的运行操作; flow.run(context.elevated()) if flow.state != states.SUCCESS: raise exception.CinderException(_("Failed to successfully complete" " manager volume workflow")) self._reset_stats() return volume_id可见,这里再一次应用taskflow模式来实现建立并导出卷的操作。我们具体来看方法get_manager_flow的源码实现:
def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id, request_spec=None, filter_properties=None, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None, reschedule_context=None): """ Constructs and returns the manager entrypoint flow. 构建并返回用于通过管理器建立卷的flow; flow将会做以下的事情: 1. 首先要确定我们是否允许进行重新调度,因为这影响了我们如何对出现错误的情况进行处理; 2. 为相关的task注入keys和values; 3. 对于出错的task进行处理,发送错误通知,记录错误信息等; 4. 实现了从输入的参数中提取建立卷的规范信息的操作; 5. 通知已经开始进行卷的建立操作; 6. 根据所获取的建立卷的规范信息实现卷的建立操作; 7. 当成功的建立卷之后,完成卷建立之后的通知操作; """ # flow_name:volume_create_manager; flow_name = ACTION.replace(":", "_") + "_manager" # 获取类Flow的实例化对象; volume_flow = linear_flow.Flow(flow_name) # Determine if we are allowed to reschedule since this affects how # failures will be handled. # 首先要确定我们是否允许进行重新调度,因为这影响了我们如何对出现错误的情况进行处理; if not filter_properties: filter_properties = {} if not request_spec and allow_reschedule: LOG.debug(_("No request spec, will not reschedule")) allow_reschedule = False if not filter_properties.get('retry', None) and allow_reschedule: LOG.debug(_("No retry filter property or associated " "retry info, will not reschedule")) allow_reschedule = False # 添加一个给定的task到flow; # 这个类实现了注入字典信息到flow中; volume_flow.add(base.InjectTask({ 'filter_properties': filter_properties, 'image_id': image_id, 'request_spec': request_spec, 'snapshot_id': snapshot_id, 'source_volid': source_volid, 'volume_id': volume_id, }, addons=[ACTION])) # 如果不允许进行重新调度的操作; if not allow_reschedule: # On failure ensure that we just set the volume status to error. LOG.debug(_("Retry info not present, will not reschedule")) # 添加一个给定的task到flow; # 这个task实现了当出现错误时,设置指定id的卷的状态为ERROR; volume_flow.add(OnFailureChangeStatusTask(db)) # 如果允许进行重新调度的操作; else: # 添加一个给定的task到flow; # 触发一个发送进行重新调度的请求,当进行task恢复回滚操作的时候; volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, scheduler_rpcapi)) # 添加一个给定的task到flow; # 提取一个用于建立卷的通用结构规范; volume_flow.add(ExtractVolumeSpecTask(db)) # 添加一个给定的task到flow; # 执行关于给定卷的相关通知操作,获取指定卷的使用率信息,并进行通知操作; volume_flow.add(NotifyVolumeActionTask(db, host, "create.start")) # 添加一个给定的task到flow; # 根据所提供的规范要求实现卷的建立操作; volume_flow.add(CreateVolumeFromSpecTask(db, host, driver)) # 添加一个给定的task到flow; # 当成功的建立卷之后,完成卷建立之后的通知操作; volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end")) # 获取flow的调试信息; return flow_utils.attach_debug_listeners(volume_flow)这里最重要的一个task就是CreateVolumeFromSpecTask,它所实现的操作就是根据所提供的规范要求实现卷的建立。我们具体来看这个类的源码实现:
class CreateVolumeFromSpecTask(base.CinderTask): """ 根据所提供的规范要求实现卷的建立操作; """ def __init__(self, db, host, driver): super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION]) self.db = db self.driver = driver self.requires.update(['volume_spec', 'volume_ref']) self._create_func_mapping = { 'raw': self._create_raw_volume, 'snap': self._create_from_snapshot, 'source_vol': self._create_from_source_volume, 'image': self._create_from_image, } self.host = host def __call__(self, context, volume_ref, volume_spec): """ 根据所提供的规范要求实现卷的建立操作; """ if not self.driver.initialized: LOG.error(_("Unable to create volume, driver not initialized")) driver_name = self.driver.__class__.__name__ raise exception.DriverNotInitialized(driver=driver_name) # 获取建立卷的类型信息; create_type = volume_spec.pop('type', None) # 根据具体的建立卷的类型,获取对应的建立卷的方法; # self._create_func_mapping = { # 'raw': self._create_raw_volume, # 'snap': self._create_from_snapshot, # 'source_vol': self._create_from_source_volume, # 'image': self._create_from_image, # } create_functor = self._create_func_mapping.get(create_type) if not create_functor: raise exception.VolumeTypeNotFound(volume_type_id=create_type) volume_spec = dict(volume_spec) volume_id = volume_spec.pop('volume_id', None) if not volume_id: volume_id = volume_ref['id'] LOG.info(_("Volume %(volume_id)s: being created using %(functor)s " "with specification: %(volume_spec)s") % {'volume_spec': volume_spec, 'volume_id': volume_id, 'functor': _make_pretty_name(create_functor)}) volume_ref['host'] = self.host # 根据确定的要调用的建立卷的方法,调用这个方法实现指定类型的卷的建立操作; model_update = create_functor(context, volume_ref=volume_ref, **volume_spec) try: if model_update: volume_ref = self.db.volume_update(context, volume_ref['id'], model_update) except exception.CinderException as ex: if model_update: LOG.exception(_("Failed updating model of volume %(volume_id)s" " with creation provided model %(model)s") % {'volume_id': volume_id, 'model': model_update}) raise exception.ExportFailure(reason=ex) model_update = None try: LOG.debug(_("Volume %s: creating export"), volume_ref['id']) # 为逻辑卷创建导出接口; model_update = self.driver.create_export(context, volume_ref) if model_update: self.db.volume_update(context, volume_ref['id'], model_update) except exception.CinderException as ex: if model_update: LOG.exception(_("Failed updating model of volume %(volume_id)s" " with driver provided model %(model)s") % {'volume_id': volume_id, 'model': model_update}) raise exception.ExportFailure(reason=ex)我们在这个类的初始化方法中可以看到:
self._create_func_mapping = {
'raw': self._create_raw_volume,
'snap': self._create_from_snapshot,
'source_vol': self._create_from_source_volume,
'image': self._create_from_image,
}
这里就指明了建立新卷的四种途径,即直接建立raw格式的新卷、从快照建立新卷、从已有的卷建立新卷和从镜像建立新卷。在上述类的__call__方法中,根据具体情况分别调用了不用的方法实现了新卷的建立,我们来看看这几个建立新卷的方法的源码:
def _create_raw_volume(self, context, volume_ref, **kwargs): """ 实现raw格式卷的建立; """ return self.driver.create_volume(volume_ref) def _create_from_snapshot(self, context, volume_ref, snapshot_id, **kwargs): """ 实现从快照建立卷的操作,并根据具体情况实现对指定卷的glance元数据进行更新操作; """ volume_id = volume_ref['id'] # 获取指定卷的快照; snapshot_ref = self.db.snapshot_get(context, snapshot_id) # 调用具体驱动中的create_volume_from_snapshot方法,实现从快照建立卷; model_update = self.driver.create_volume_from_snapshot(volume_ref, snapshot_ref) make_bootable = False try: # 根据volume_id获取volume; originating_vref = self.db.volume_get(context, snapshot_ref['volume_id']) make_bootable = originating_vref.bootable except exception.CinderException as ex: LOG.exception(_("Failed fetching snapshot %(snapshot_id)s bootable" " flag using the provided glance snapshot " "%(snapshot_ref_id)s volume reference") % {'snapshot_id': snapshot_id, 'snapshot_ref_id': snapshot_ref['volume_id']}) raise exception.MetadataUpdateFailure(reason=ex) if make_bootable: # 根据具体情况实现对指定卷的glance元数据进行更新操作; self._handle_bootable_volume_glance_meta(context, volume_id, snapshot_id=snapshot_id) return model_update def _create_from_source_volume(self, context, volume_ref, source_volid, **kwargs): """ 实现从源卷建立(实际上就是直接拷贝)卷的操作; """ # 根据source_volid获取卷的信息; srcvol_ref = self.db.volume_get(context, source_volid) # 创建指定卷的克隆; model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref) # 根据具体情况实现对指定卷的glance元数据进行更新操作; if srcvol_ref.bootable: self._handle_bootable_volume_glance_meta(context, volume_ref['id'], source_volid=source_volid) return model_update def _create_from_image(self, context, volume_ref, image_location, image_id, image_meta, image_service, **kwargs): """ 从镜像实现卷的建立; """ LOG.debug(_("Cloning %(volume_id)s from image %(image_id)s " " at location %(image_location)s") % {'volume_id': volume_ref['id'], 'image_location': image_location, 'image_id': image_id}) # 从现有的镜像有效的建立一个卷; model_update, cloned = self.driver.clone_image(volume_ref, image_location, image_id) # 如果没有实现克隆,说明没有指定的镜像; # 实现建立卷,并下载镜像数据到卷中; if not cloned: # 实现建立卷,并下载镜像数据到卷中; model_update = self.driver.create_volume(volume_ref) updates = dict(model_update or dict(), status='downloading') # 更新卷的状态; try: volume_ref = self.db.volume_update(context, volume_ref['id'], updates) except exception.CinderException: LOG.exception(_("Failed updating volume %(volume_id)s with " "%(updates)s") % {'volume_id': volume_ref['id'], 'updates': updates}) # 下载glance镜像数据到指定的卷; self._copy_image_to_volume(context, volume_ref, image_id, image_location, image_service) # 根据具体情况实现对指定卷的glance元数据进行更新操作; self._handle_bootable_volume_glance_meta(context, volume_ref['id'], image_id=image_id, image_meta=image_meta) return model_update再来看这几个方法的源码,不同的方法中会进一步调用不同的方法来实现新卷的建立,这就直接与/cinder/volume/drivers中的不同的块存储后端实现直接联系到一起了,具体调用的是那一种块存储器中的建立卷的方法,就是由self.driver所确定的。
OK!到此为止,cinder中建立新卷的整体流程的源码分析已经全部完成,其实我想说的一句话就是,如果真的把这个流程的实现过程搞清楚,那么cinder模块的源码也就基本掌握了。
谢谢大家的支持!
相关文章推荐
- Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
- Openstack Cinder中建立volume过程的源码解析(2)
- Openstack Cinder中建立volume过程的源码解析(3)
- Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析
- Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
- OpenStack Cinder服务启动过程中的资源加载和扩展源码解析之三
- Openstack Cinder中建立volume过程的源码解析(1)
- OpenStack建立实例完整过程源码详细分析(14)----依据AMQP通信架构实现消息接收机制解析之一
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- Openstack Cinder中建立volume过程的源码解析(8)
- OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二
- OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二
- OpenStack Cinder服务启动过程中的资源加载和扩展源码解析之二
- Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析
- OpenStack建立实例完整过程源码详细分析(4)
- openstack nova 源码解析 — Nova API 执行过程从(novaclient到Action)
- OpenStack建立实例完整过程源码详细分析(9)
- OpenStack源码分析之cinder-volume服务
- OpenStack建立实例完整过程源码详细分析(6)
- openstack_cinder里面的volume的简单解析