  • OpenStack_Swift source code analysis: Object-auditor source code analysis (2)

    1 Detailed analysis of the Object-auditor audit

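    The previous article covered how Object-auditor starts up; the actual audit work is implemented by AuditorWorker. run_audit instantiates the AuditorWorker class and calls its audit_all_objects method. Here is the code for that method: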

    def audit_all_objects(self, mode='once', device_dirs=None):
            # the mode passed in from run_forever is 'forever'
            description = ''
            if device_dirs:
                device_dir_str = ','.join(sorted(device_dirs))
                description = _(' - %s') % device_dir_str
            self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
                            (mode, self.auditor_type, description))
            begin = reported = time.time()
            self.total_bytes_processed = 0
            self.total_files_processed = 0
            total_quarantines = 0
            total_errors = 0
            time_auditing = 0
            # object_audit_location_generator yields the hash directories under device_dirs;
            # each item carries hsh_path, device and partition.
            # all_locs covers every object under device_dirs on the devices, as AuditLocation(hsh_path, device, partition) objects
            all_locs = self.diskfile_mgr.object_audit_location_generator(
                device_dirs=device_dirs)
            for location in all_locs:
                loop_time = time.time()
                # audit the objects one by one
                self.failsafe_object_audit(location)
                self.logger.timing_since('timing', loop_time)
                self.files_running_time = ratelimit_sleep(
                    self.files_running_time, self.max_files_per_second)
                self.total_files_processed += 1
                now = time.time()
                if now - reported >= self.log_time:
                    self.logger.info(_(
                        'Object audit (%(type)s). '
                        'Since %(start_time)s: Locally: %(passes)d passed, '
                        '%(quars)d quarantined, %(errors)d errors '
                        'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
                        'Total time: %(total).2f, Auditing time: %(audit).2f, '
                        'Rate: %(audit_rate).2f') % {
                            'type': '%s%s' % (self.auditor_type, description),
                            'start_time': time.ctime(reported),
                            'passes': self.passes, 'quars': self.quarantines,
                            'errors': self.errors,
                            'frate': self.passes / (now - reported),
                            'brate': self.bytes_processed / (now - reported),
                            'total': (now - begin), 'audit': time_auditing,
                            'audit_rate': time_auditing / (now - begin)})
                    cache_entry = self.create_recon_nested_dict(
                        'object_auditor_stats_%s' % (self.auditor_type),
                        device_dirs,
                        {'errors': self.errors, 'passes': self.passes,
                         'quarantined': self.quarantines,
                         'bytes_processed': self.bytes_processed,
                         'start_time': reported, 'audit_time': time_auditing})
                    dump_recon_cache(cache_entry, self.rcache, self.logger)
                    reported = now
                    total_quarantines += self.quarantines
                    total_errors += self.errors
                    self.passes = 0
                    # number of quarantined objects
                    self.quarantines = 0
                    self.errors = 0
                    self.bytes_processed = 0
                time_auditing += (now - loop_time)
            # Avoid divide by zero during very short runs
            elapsed = (time.time() - begin) or 0.000001
            self.logger.info(_(
                'Object audit (%(type)s) "%(mode)s" mode '
                'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
                'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
                'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
                'Rate: %(audit_rate).2f') % {
                    'type': '%s%s' % (self.auditor_type, description),
                    'mode': mode, 'elapsed': elapsed,
                    'quars': total_quarantines + self.quarantines,
                    'errors': total_errors + self.errors,
                    'frate': self.total_files_processed / elapsed,
                    'brate': self.total_bytes_processed / elapsed,
                    'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
            # Clear recon cache entry if device_dirs is set
            if device_dirs:
                cache_entry = self.create_recon_nested_dict(
                    'object_auditor_stats_%s' % (self.auditor_type),
                    device_dirs, {})
                dump_recon_cache(cache_entry, self.rcache, self.logger)
            if self.stats_sizes:
                self.logger.info(
                    _('Object audit stats: %s') % json.dumps(self.stats_buckets))
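
The loop above throttles itself with ratelimit_sleep, once per file here and once per chunk of bytes later in object_audit. The idea can be sketched as a pure function that tracks the earliest time the next unit of work may start. This is a simplified illustration, not Swift's implementation: the name throttle is hypothetical, the rate_buffer parameter is left out, and the function returns the pause instead of sleeping.

```python
def throttle(running_time, max_rate, now, incr_by=1):
    """Return (new_running_time, seconds_to_sleep) for a simple rate limiter.

    running_time is the earliest moment the next unit of work may start;
    max_rate is the allowed number of units per second.
    """
    if max_rate <= 0 or incr_by <= 0:
        # No limit configured: never pause.
        return running_time, 0.0
    time_per_unit = 1.0 / max_rate
    # Never start in the past, so idle periods do not build up credit.
    start = max(running_time, now)
    seconds_to_sleep = start - now
    return start + time_per_unit * incr_by, seconds_to_sleep
```

A caller would sleep for the returned pause and keep the returned running time for the next call, in the same way audit_all_objects stores the result in self.files_running_time.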
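    self.diskfile_mgr.object_audit_location_generator(...) locates every object to be audited under device/objects: an audit has to scan all objects and quarantine any that turn out to be damaged. It returns an iterator of AuditLocation(hsh_path, device, partition) objects, that is, one AuditLocation is instantiated for each object (the hash directory holding its .data file), and each one is passed to failsafe_object_audit, which carries out the next step. Here is the implementation of failsafe_object_audit: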
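
To make the shape of those audit locations concrete, here is a minimal sketch of a generator that walks a devices directory and yields one record per hash directory. It assumes the usual on-disk layout device/objects/partition/suffix/hash; the namedtuple and the function names iter_audit_locations and demo are hypothetical stand-ins, not Swift's own code.

```python
import os
import tempfile
from collections import namedtuple

# Hypothetical stand-in for Swift's AuditLocation; only the three fields
# mentioned in the text are modelled.
AuditLocation = namedtuple('AuditLocation', ['path', 'device', 'partition'])


def _subdirs(path):
    """Return the sorted names of the directories directly under path."""
    return sorted(name for name in os.listdir(path)
                  if os.path.isdir(os.path.join(path, name)))


def iter_audit_locations(devices_root):
    """Yield one AuditLocation per hash directory under <device>/objects."""
    for device in _subdirs(devices_root):
        objects_dir = os.path.join(devices_root, device, 'objects')
        if not os.path.isdir(objects_dir):
            continue
        for partition in _subdirs(objects_dir):
            part_dir = os.path.join(objects_dir, partition)
            for suffix in _subdirs(part_dir):
                suffix_dir = os.path.join(part_dir, suffix)
                for hsh in _subdirs(suffix_dir):
                    yield AuditLocation(os.path.join(suffix_dir, hsh),
                                        device, partition)


def demo():
    """Build a tiny fake device tree and list its audit locations."""
    root = tempfile.mkdtemp()
    os.makedirs(os.path.join(root, 'sdb1', 'objects', '1024', 'abc', 'deadbeef'))
    os.makedirs(os.path.join(root, 'sdb1', 'objects', '2048', 'def', 'cafef00d'))
    return list(iter_audit_locations(root))
```

Because this is a generator, the caller can audit each location as it is produced instead of building the full list first.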

    def failsafe_object_audit(self, location):
            """
            Entrypoint to object_audit, with a failsafe generic exception handler.
            """
            try:
                # audit the object
                self.object_audit(location)
            except (Exception, Timeout):
                self.logger.increment('errors')
                self.errors += 1
                self.logger.exception(_('ERROR Trying to audit %s'), location)
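
The point of the failsafe wrapper is that one unreadable object must not abort the whole pass. A minimal sketch of the same pattern, with a hypothetical MiniAuditor class and a hypothetical per-item check callable standing in for object_audit:

```python
import logging

logger = logging.getLogger('audit-sketch')


class MiniAuditor(object):
    """Toy auditor; `check` is a hypothetical per-item audit callable."""

    def __init__(self, check):
        self.check = check
        self.errors = 0
        self.processed = 0

    def failsafe_audit(self, item):
        try:
            self.check(item)
        except Exception:
            # Count and log the failure, then let the caller move on.
            self.errors += 1
            logger.exception('ERROR Trying to audit %s', item)

    def audit_all(self, items):
        for item in items:
            self.failsafe_audit(item)
            self.processed += 1


def picky_check(item):
    """Fail on one specific item to show that the loop keeps going."""
    if item == 'bad':
        raise ValueError('cannot read %s' % item)
```

Running MiniAuditor(picky_check).audit_all(['a', 'bad', 'c']) processes all three items and records a single error.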
    
    This method mainly just calls object_audit(), which performs the actual audit. Its code is as follows:

    def object_audit(self, location):
            """
            Audits the given object location.
    
            :param location: an audit location
                             (from diskfile.object_audit_location_generator)
            """
            def raise_dfq(msg):
                raise DiskFileQuarantined(msg)
    
            try:
                df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
                # df.open() calls the open method of DiskFile
                with df.open():
                    metadata = df.get_metadata()
                    obj_size = int(metadata['Content-Length'])
                    if self.stats_sizes:
                        self.record_stats(obj_size)
                    # in zero-byte-only mode a non-empty object is not read; it counts as a pass
                    if self.zero_byte_only_at_fps and obj_size:
                        self.passes += 1
                        return
                    # _quarantine_hook: the quarantine hook; the reader quarantines the file
                    # if its size or MD5 no longer matches the metadata
                    # reader is a DiskFileReader object
                    reader = df.reader(_quarantine_hook=raise_dfq)
                # When the file is closed, DiskFileReader.close() is called and
                # self._handle_close_quarantine() handles the quarantine.
                # (Open question: how does replication pick up the quarantine?)
                with closing(reader):
                    for chunk in reader:
                        chunk_len = len(chunk)
                        # rate limiting: ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5)
                        self.bytes_running_time = ratelimit_sleep(
                            self.bytes_running_time,
                            self.max_bytes_per_second,
                            incr_by=chunk_len)
                        self.bytes_processed += chunk_len
                        self.total_bytes_processed += chunk_len
            except DiskFileNotExist:
                return
            # a quarantine error was raised: quarantine count + 1
            except DiskFileQuarantined as err:
                self.quarantines += 1
                self.logger.error(_('ERROR Object %(obj)s failed audit and was'
                                    ' quarantined: %(err)s'),
                                  {'obj': location, 'err': err})
            self.passes += 1
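
The interplay between the hook and closing() is easy to miss: the reader decides at close time that something is wrong, calls the hook, and the hook raises, so the exception surfaces in the auditor's own try block. Below is a minimal sketch of that control flow. ToyReader, Quarantined and audit are hypothetical stand-ins; only a size check is modelled.

```python
from contextlib import closing


class Quarantined(Exception):
    """Hypothetical stand-in for DiskFileQuarantined."""


class ToyReader(object):
    """Yields chunks; on close, reports a size mismatch through the hook."""

    def __init__(self, chunks, expected_size,
                 quarantine_hook=lambda msg: None):
        self._chunks = chunks
        self._expected_size = expected_size
        self._quarantine_hook = quarantine_hook
        self._bytes_read = 0

    def __iter__(self):
        for chunk in self._chunks:
            self._bytes_read += len(chunk)
            yield chunk

    def close(self):
        if self._bytes_read != self._expected_size:
            self._quarantine_hook('read %d bytes, expected %d' % (
                self._bytes_read, self._expected_size))


def audit(chunks, expected_size):
    """Read everything, then report whether close() triggered the hook."""
    def raise_q(msg):
        raise Quarantined(msg)

    reader = ToyReader(chunks, expected_size, quarantine_hook=raise_q)
    try:
        # closing() calls reader.close() on exit, which may invoke the hook.
        with closing(reader):
            for _chunk in reader:
                pass
    except Quarantined:
        return 'quarantined'
    return 'passed'
```

With the default hook (which ignores the message) the same reader would stay silent, which matches the "Default is to ignore it" behaviour described in the reader docstring.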

    Let us walk through this method in detail. First, the argument passed in is used to instantiate a DiskFile: df = self.diskfile_mgr.get_diskfile_from_audit_location(location), where location is an AuditLocation instance:
        def get_diskfile_from_audit_location(self, audit_location):
            dev_path = self.get_dev_path(audit_location.device, mount_check=False)
            return DiskFile.from_hash_dir(
                self, audit_location.path, dev_path,
                audit_location.partition)
    
    
        @classmethod
        def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
            return cls(mgr, device_path, None, partition, _datadir=hash_dir_path)
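    Together, these two functions use the attributes of the AuditLocation instance to instantiate a DiskFile, which provides the concrete file operations. Once df is available, the file that audit_location.path points to has to be opened and its metadata fetched. The file is then read, which requires instantiating a dedicated DiskFileReader. This class is the key to quarantining files, and the way it does so is fairly well hidden, so it deserves close attention.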

     def reader(self, keep_cache=False,
                   _quarantine_hook=lambda m: None):
            """
            Return a :class:`swift.common.swob.Response` class compatible
            "`app_iter`" object as defined by
            :class:`swift.obj.diskfile.DiskFileReader`.
            For this implementation, the responsibility of closing the open file
            is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.
    
            :param keep_cache: caller's preference for keeping data read in the
                               OS buffer cache
            :param _quarantine_hook: 1-arg callable called when obj quarantined;
                                     the arg is the reason for quarantine.
                                     Default is to ignore it.
                                     Not needed by the REST layer.
            :returns: a :class:`swift.obj.diskfile.DiskFileReader` object
            """
            dr = DiskFileReader(
                self._fp, self._data_file, int(self._metadata['Content-Length']),
                self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
                self._mgr.keep_cache_size, self._device_path, self._logger,
                quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
            # At this point the reader object is now responsible for closing
            # the file pointer.
            self._fp = None
            return dr

    DiskFileReader reads the object's contents from the file:

     def __iter__(self):
            """Returns an iterator over the data file."""
            try:
                dropped_cache = 0
                self._bytes_read = 0
                self._started_at_0 = False
                self._read_to_eof = False
                if self._fp.tell() == 0:
                    self._started_at_0 = True
                    self._iter_etag = hashlib.md5()
                while True:
                    chunk = self._threadpool.run_in_thread(
                        self._fp.read, self._disk_chunk_size)
                    if chunk:
                        if self._iter_etag:
                            self._iter_etag.update(chunk)
                        self._bytes_read += len(chunk)
                        if self._bytes_read - dropped_cache > (1024 * 1024):
                            self._drop_cache(self._fp.fileno(), dropped_cache,
                                             self._bytes_read - dropped_cache)
                            dropped_cache = self._bytes_read
                        yield chunk
                    else:
                        self._read_to_eof = True
                        self._drop_cache(self._fp.fileno(), dropped_cache,
                                         self._bytes_read - dropped_cache)
                        break
            finally:
                if not self._suppress_file_closing:
                    self.close()
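    __iter__ iterates over the file's contents. While reading, it computes a fresh etag value (an MD5 over the chunks). The finally block closes the file, and if the object needs to be quarantined, it is quarantined at close time. Let us look at the implementation of close first: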
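
The core of that iteration, hashing each chunk as it is handed to the consumer, can be shown in isolation. This is a minimal sketch with a hypothetical HashingReader class; it leaves out the thread pool, the cache dropping and the close handling of the real DiskFileReader.

```python
import hashlib
import io


class HashingReader(object):
    """Iterate over a file object in chunks while computing its MD5."""

    def __init__(self, fp, chunk_size=65536):
        self._fp = fp
        self._chunk_size = chunk_size
        self.bytes_read = 0
        self.read_to_eof = False
        self._md5 = hashlib.md5()

    def __iter__(self):
        while True:
            chunk = self._fp.read(self._chunk_size)
            if not chunk:
                # An empty read means end of file.
                self.read_to_eof = True
                break
            self._md5.update(chunk)
            self.bytes_read += len(chunk)
            yield chunk

    def hexdigest(self):
        """MD5 of everything yielded so far, as a hex string."""
        return self._md5.hexdigest()
```

For example, after b''.join(HashingReader(io.BytesIO(data))) has consumed the whole stream, hexdigest() equals hashlib.md5(data).hexdigest(), so the data is read only once for both purposes.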

    def close(self):
            """
            Close the open file handle if present.
    
            For this specific implementation, this method will handle quarantining
            the file if necessary.
            """
            if self._fp:
                try:
                    if self._started_at_0 and self._read_to_eof:     # the file was read from start to end
                        self._handle_close_quarantine()
                except DiskFileQuarantined:
                    raise
                except (Exception, Timeout) as e:
                    self._logger.error(_(
                        'ERROR DiskFile %(data_file)s'
                        ' close failure: %(exc)s : %(stack)s'),
                        {'exc': e, 'stack': ''.join(traceback.format_stack()),
                         'data_file': self._data_file})
                finally:
                    fp, self._fp = self._fp, None
                    fp.close()

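    Once the file has been read from start to end, closing it triggers a check, and a file that is incomplete or corrupted gets quarantined. Next, the self._handle_close_quarantine() method: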

     def _handle_close_quarantine(self):
            """Check if file needs to be quarantined"""
            if self._bytes_read != self._obj_size:
                self._quarantine(
                    "Bytes read: %s, does not match metadata: %s" % (
                        self._bytes_read, self._obj_size))
            elif self._iter_etag and \
                    self._etag != self._iter_etag.hexdigest():
                self._quarantine(
                    "ETag %s and file's md5 %s do not match" % (
                        self._etag, self._iter_etag.hexdigest()))
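    It first checks whether the number of bytes read equals the object size recorded in the metadata; if not, _quarantine is called with a message saying the length read does not match. If the etags differ, the message says the MD5 values do not match. Here is the implementation of _quarantine: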
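
The two checks are ordered: length first, checksum second. A minimal sketch as a standalone function follows; the name quarantine_reason and its arguments are hypothetical, while the messages mirror the ones in the listing above.

```python
import hashlib


def quarantine_reason(bytes_read, obj_size, stored_etag, computed_etag):
    """Return a reason string if the object should be quarantined, else None."""
    if bytes_read != obj_size:
        return 'Bytes read: %s, does not match metadata: %s' % (
            bytes_read, obj_size)
    # computed_etag is None when the read did not start at offset 0.
    if computed_etag is not None and stored_etag != computed_etag:
        return "ETag %s and file's md5 %s do not match" % (
            stored_etag, computed_etag)
    return None


body = b'hello'
etag = hashlib.md5(body).hexdigest()
```

Here quarantine_reason(len(body), 5, etag, etag) returns None, whereas a short read or a digest computed over altered bytes yields the corresponding message.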

     def _quarantine(self, msg):
            # move it to the quarantine area
            self._quarantined_dir = self._threadpool.run_in_thread(
                quarantine_renamer, self._device_path, self._data_file)
            self._logger.warn("Quarantined object %s: %s" % (
                self._data_file, msg))
            self._logger.increment('quarantines')
            self._quarantine_hook(msg)
    
    Inside _quarantine, quarantine_renamer performs the actual quarantine:

    def quarantine_renamer(device_path, corrupted_file_path):
        """
        In the case that a file is corrupted, move it to a quarantined
        area to allow replication to fix it.
    
        :params device_path: The path to the device the corrupted file is on.
        :params corrupted_file_path: The path to the file you want quarantined.
    
        :returns: path (str) of directory the file was moved to
        :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                         exceptions from rename
        """
        from_dir = dirname(corrupted_file_path)
        to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
        invalidate_hash(dirname(from_dir))
        try:
            renamer(from_dir, to_dir)
        except OSError as e:
            if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
                raise
            to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
            renamer(from_dir, to_dir)
        return to_dir
    The object is moved under device_path/quarantined/objects, and the quarantine directory is returned. The renamer function it relies on:

    def renamer(old, new):
        """
        Attempt to fix / hide race conditions like empty object directories
        being removed by backend processes during uploads, by retrying.
    
        :param old: old path to be renamed
        :param new: new path to be renamed to
        """
        try:
            mkdirs(os.path.dirname(new))
            os.rename(old, new)
        except OSError:
            mkdirs(os.path.dirname(new))
            os.rename(old, new)
    As the code shows, the object's directory is moved into the new quarantine directory.
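
The move itself, including the fallback to a unique name when the target already exists, can be tried out on a temporary directory. This is a minimal sketch under stated assumptions: quarantine_dir and make_object are hypothetical helpers, the invalidate_hash step is left out, and the file name and directory names are made up for the demonstration.

```python
import errno
import os
import tempfile
import uuid


def quarantine_dir(device_path, data_file_path):
    """Move the directory holding data_file_path into the quarantine area."""
    from_dir = os.path.dirname(data_file_path)
    to_dir = os.path.join(device_path, 'quarantined', 'objects',
                          os.path.basename(from_dir))
    os.makedirs(os.path.dirname(to_dir), exist_ok=True)
    try:
        os.rename(from_dir, to_dir)
    except OSError as e:
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        # Name collision: fall back to a unique suffix.
        to_dir = '%s-%s' % (to_dir, uuid.uuid4().hex)
        os.rename(from_dir, to_dir)
    return to_dir


def make_object(device_path, partition):
    """Create a fake object directory with one .data file; return its path."""
    hash_dir = os.path.join(device_path, 'objects', partition,
                            'abc', 'deadbeef')
    os.makedirs(hash_dir)
    data_file = os.path.join(hash_dir, '1400000000.00000.data')
    with open(data_file, 'wb') as f:
        f.write(b'corrupt')
    return data_file
```

Quarantining two objects whose hash directories share a name leaves the first at quarantined/objects/deadbeef and the second at the same path with a random hex suffix appended.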

    My own understanding is limited, so the article may well contain mistakes. Corrections and discussion are welcome. Thank you!


  • Original article: https://www.cnblogs.com/zhchoutai/p/7048391.html