ObjectReplicator Source Code Analysis

Author: 计算机知识

tony bai

0. Chromium source code analysis

I had originally planned to write this series on CSDN, but I accidentally deleted one blog post there and it was moved to the recycle bin. Surprisingly, CSDN offers no way to restore a post from the recycle bin. Combined with CSDN's earlier password-leak incident, its reliability is questionable, so I moved over to CNBlogs to write these articles.

Having stolen half a day of leisure from a busy schedule, I spent it reading the chromium code. After reading for a while I could not help marvelling: a first-rate company really does produce an extraordinary architecture. Even without reading the code, Chrome's speed on Windows already gives a feel for how efficient the chromium codebase is.

That I am lucky enough to read the chromium source at all, I owe to Google's support of open source. My own horizons are narrow, and having the chance to study the chromium code is real good fortune; I hope to use chromium to improve myself.

To be able to write this series, I first have to thank Google for open-sourcing it. In my memory, open-source software tended to be either crude in functionality, riddled with bugs, or completely uncommented. chromium is quite different: it is fast, the interface is clean, and bugs are few; looking at its code, the comments are plentiful as well.

Next I have to thank the predecessor who first extracted chromium's UI (a very early one; I can no longer find him through Baidu or Google, and I am ashamed I failed to record his name). He extracted and published the UI part of chromium as chromiumFrame, with the UI framework, message handling, base class library and so on all packaged up. One generation plants the trees and the next enjoys the shade; thanks to his published analysis, I am doing my further study on top of chromiumFrame.

There are many experts on CSDN who analyse Chromium. I write this series only to record my own notes and cannot be compared with them. I am also not a good writer, and much of the text may only make sense to me; since it sits on a blog, visitors will inevitably read it. If something is written unclearly, please do not dig too deep; if something is wrong, please point it out in a reply, and I will be very grateful.

Enough chit-chat. Prepare the tools and the code and take in chromium's style. First download the predecessor's chromiumFrame; please look for the link on his blog. As for tools, Source Insight and Visual Studio are required.

I am not compiling Chromium here. Three years ago, on an i5 machine with 8 GB of RAM, a full chromium build took me four hours, and debugging in VS was painfully slow; that kept me from ever studying chromium in depth. Three years later I have decided to start learning the framework from the 2011 code. I believe the chromium framework has not changed too much over these years.

 

Below are references to the experts' articles. My records are incomplete; I would never knowingly omit a reference I consulted, and I will fill them in later.

The predecessor mentioned above cannot be found at the moment; I will be sure to track him down and credit him later.


1、Detailed analysis of the Replicator run code

The previous article described in detail how the Replicator is started. This one walks through the implementation of its run code, starting with the replicate method:

def replicate(self, override_devices=None, override_partitions=None):
        """Run a replication pass"""
        self.start = time.time()
        self.suffix_count = 0
        self.suffix_sync = 0
        self.suffix_hash = 0
        self.replication_count = 0
        self.last_replication_count = -1
        self.partition_times = []

        if override_devices is None:
            override_devices = []
        if override_partitions is None:
            override_partitions = []
        # heartbeat logs periodic progress; the interval comes from the config,
        # default 300 seconds
        stats = eventlet.spawn(self.heartbeat)
        # detect_lockups watches for a stuck replication pass
        lockup_detector = eventlet.spawn(self.detect_lockups)
        eventlet.sleep()  # Give spawns a cycle

        try:
            # pool of replication workers, sized by the concurrency setting
            self.run_pool = GreenPool(size=self.concurrency)
            # Returns a sorted list of jobs (dictionaries) that specify the
            # partitions, nodes, etc to be synced.
            jobs = self.collect_jobs()
            for job in jobs:
                # honour the override_devices filter
                if override_devices and job['device'] not in override_devices:
                    continue
                # honour the override_partitions filter
                if override_partitions and \
                        job['partition'] not in override_partitions:
                    continue
                # the job passed the filters; make sure its device is mounted
                dev_path = join(self.devices_dir, job['device'])
                if self.mount_check and not ismount(dev_path):
                    self.logger.warn(_('%s is not mounted'), job['device'])
                    continue
                # abort the pass if the ring has changed
                if not self.check_ring():
                    self.logger.info(_("Ring change detected. Aborting "
                                       "current replication pass."))
                    return
                # handoff partition: push it away and then delete it locally
                if job['delete']:
                    self.run_pool.spawn(self.update_deleted, job)
                else:
                    # otherwise run a normal update
                    self.run_pool.spawn(self.update, job)
            with Timeout(self.lockup_timeout):
                self.run_pool.waitall()
        except (Exception, Timeout):
            self.logger.exception(_("Exception in top-level replication loop"))
            self.kill_coros()
        finally:
            stats.kill()
            lockup_detector.kill()
            self.stats_line()

In the replicate method the first part is preparation for the pass; the most important step is collecting the jobs to run via the collect_jobs method. Its implementation follows:

def collect_jobs(self):
        """
        Returns a sorted list of jobs (dictionaries) that specify the
        partitions, nodes, etc to be synced.
        """
        jobs = []
        ips = whataremyips()
        # replication_ip and replication_port are filled in when the ring is
        # loaded: self.object_ring = Ring(self.swift_dir, ring_name='object')
        for local_dev in [dev for dev in self.object_ring.devs
                          if dev and dev['replication_ip'] in ips and
                          dev['replication_port'] == self.port]:
            dev_path = join(self.devices_dir, local_dev['device'])
            obj_path = join(dev_path, 'objects')
            tmp_path = join(dev_path, 'tmp')
            if self.mount_check and not ismount(dev_path):
                self.logger.warn(_('%s is not mounted'), local_dev['device'])
                continue
            # Remove any file in tmp_path (e.g. /srv/1/node/sdb1/tmp) that was
            # last modified before the reclaim age.
            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
            if not os.path.exists(obj_path):
                try:
                    mkdirs(obj_path)
                except Exception:
                    self.logger.exception('ERROR creating %s' % obj_path)
                continue
            # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
            # 13069  133971  4799  58208  94238
            for partition in os.listdir(obj_path):
                try:
                    job_path = join(obj_path, partition)
                    # if the current path is a plain file, remove it
                    if isfile(job_path):
                        # Clean up any (probably zero-byte) files where a
                        # partition should be.
                        self.logger.warning('Removing partition directory '
                                            'which was a file: %s', job_path)
                        os.remove(job_path)
                        continue
                    # devices the ring assigns to this partition
                    part_nodes = \
                        self.object_ring.get_part_nodes(int(partition))
                    # nodes: the other replica nodes, i.e. everything assigned
                    # to this partition except the local device
                    nodes = [node for node in part_nodes
                             if node['id'] != local_dev['id']]
                    # every partition directory under objects/ yields one job
                    jobs.append(
                        dict(path=job_path,
                             device=local_dev['device'],
                             nodes=nodes,
                             # len(nodes) > len(part_nodes) - 1 means this
                             # device is no longer one of the partition's
                             # assigned replicas (e.g. after a rebalance)
                             delete=len(nodes) > len(part_nodes) - 1,
                             partition=partition))
                except (ValueError, OSError):
                    continue
        # randomize the order
        random.shuffle(jobs)
        if self.handoffs_first:
            # move the handoff (delete) jobs to the front of the list
            jobs.sort(key=lambda job: not job['delete'])
        self.job_count = len(jobs)
        return jobs

In the second-level for loop, os.listdir(obj_path) lists every partition under the objects directory. When an object is created, a directory named after the partition it maps to is created under objects; inside it a suffix directory named after the last three characters of the object's hash; inside that a directory named after the object's full hash; and the object itself is stored there as a file named after its upload timestamp with a .data extension.
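As a rough illustration, the layout can be sketched with the hypothetical helper below; it is not Swift's real API (the real path hashing also mixes in a configured hash-path suffix), just the shape of the directories described above.

import hashlib
import os

def example_object_path(devices_dir, device, partition, obj_name, timestamp):
    # The object path is hashed; the last three hex characters become the
    # "suffix" directory, the full hash the next directory, and the object
    # itself is stored as <timestamp>.data inside it.
    obj_hash = hashlib.md5(obj_name.encode('utf-8')).hexdigest()
    suffix = obj_hash[-3:]
    return os.path.join(devices_dir, device, 'objects', str(partition),
                        suffix, obj_hash, '%s.data' % timestamp)

print(example_object_path('/srv/1/node', 'sdb1', 94238,
                          '/AUTH_test/cont/obj', '1389724673.12345'))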

Knowing consistent hashing, after virtual nodes are introduced each device corresponds to a number of virtual nodes. If a device is assigned n partitions, the number of subdirectories under obj_path will be <= n, because the objects actually stored do not necessarily cover every partition this device is responsible for.
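For reference, the ring's partition mapping itself is only a few bits of an md5. The sketch below shows the idea; it is simplified (the real ring also appends a configured hash suffix, and part_power comes from the ring file, so the number used here is purely illustrative).

import hashlib
import struct

def example_partition(obj_path, part_power=18):
    # Keep the top part_power bits of the md5 of the object path.
    digest = hashlib.md5(obj_path.encode('utf-8')).digest()
    return struct.unpack_from('>I', digest)[0] >> (32 - part_power)

print(example_partition('/AUTH_test/cont/obj'))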

The for loop first checks whether the entry under obj_path is a plain file and removes it if so; otherwise it takes the partition number, asks the ring for the replica devices of that partition, collects into nodes those whose id differs from the local device's id, and appends nodes, path and the other fields to jobs. Finally it shuffles the job order, moves the handoff jobs to the front of the list, and returns jobs. Back in replicate, consider first the case where job['delete'] is False: the update method is spawned. A small sketch of the resulting job dicts is shown below.
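To make the shape of those job dicts concrete, here is a small stand-alone sketch with made-up device ids (not output of the real collect_jobs):

import random

jobs = [
    {'path': '/srv/1/node/sdb1/objects/94238', 'device': 'sdb1',
     'nodes': [{'id': 2}, {'id': 3}], 'delete': False, 'partition': '94238'},
    {'path': '/srv/1/node/sdb1/objects/13069', 'device': 'sdb1',
     'nodes': [{'id': 4}, {'id': 2}, {'id': 3}], 'delete': True,
     'partition': '13069'},
]
random.shuffle(jobs)
# handoffs_first: not job['delete'] is False (sorts first) for handoff jobs
jobs.sort(key=lambda job: not job['delete'])
print([job['partition'] for job in jobs])   # the delete=True job comes first

The update method itself: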

def update(self, job):
        """
        High-level method that replicates a single partition.

        :param job: a dict containing info about the partition to be replicated
        """
        self.replication_count += 1
        self.logger.increment('partition.update.count.%s' % (job['device'],))
        begin = time.time()
        try:
            # get_hashes reads hashes.pkl for this partition and refreshes it;
            # job['path'] is join(obj_path, partition), local_hash is the dict
            # unpickled from hashes.pkl, hashed is the number of suffix dirs
            # that were rehashed
            hashed, local_hash = tpool_reraise(
                get_hashes, job['path'],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)
            self.suffix_hash += hashed
            self.logger.update_stats('suffix.hashes', hashed)
            attempts_left = len(job['nodes'])
            # job['nodes'] excludes the local node; get_more_nodes() yields
            # every other node in the ring beyond this partition's assignment,
            # so nodes iterates over all nodes except this one
            nodes = itertools.chain(
                job['nodes'],
                self.object_ring.get_more_nodes(int(job['partition'])))
            # attempts_left is 2 when the replica count is 3
            while attempts_left > 0:
                # If this throws StopIterator it will be caught way below
                node = next(nodes)
                attempts_left -= 1
                try:
                    with Timeout(self.http_timeout):
                        # REPLICATE maps to the REPLICATE verb in the
                        # object server
                        resp = http_connect(
                            node['replication_ip'], node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '', headers=self.headers).getresponse()
                        if resp.status == HTTP_INSUFFICIENT_STORAGE:
                            self.logger.error(_('%(ip)s/%(device)s responded'
                                                ' as unmounted'), node)
                            attempts_left += 1
                            continue
                        if resp.status != HTTP_OK:
                            self.logger.error(_("Invalid response %(resp)s "
                                                "from %(ip)s"),
                                              {'resp': resp.status,
                                               'ip': node['replication_ip']})
                            continue
                        # remote_hash is unpickled from the REPLICATE response
                        remote_hash = pickle.loads(resp.read())
                        del resp
                    # find suffixes whose local hash differs from the remote
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    # nothing differs, so move on to the next node
                    if not suffixes:
                        continue

                    # recalculate the hashes for just the differing suffixes
                    hashed, recalc_hash = tpool_reraise(
                        get_hashes,
                        job['path'], recalculate=suffixes,
                        reclaim_age=self.reclaim_age)
                    self.logger.update_stats('suffix.hashes', hashed)
                    local_hash = recalc_hash
                    # e.g. if local_hash has suffixes 123 321 122 and
                    # remote_hash has 123 321 124, then 122 is the changed one
                    # (question to self: can the last three characters of a
                    # path hash collide?)
                    suffixes = [suffix for suffix in local_hash if
                                local_hash[suffix] !=
                                remote_hash.get(suffix, -1)]
                    # we now know which suffixes differ and which node needs
                    # them; replication is push-based, so we push local copies
                    self.sync(node, job, suffixes)  # sync the changed suffixes
                    with Timeout(self.http_timeout):
                        conn = http_connect(
                            node['replication_ip'], node['replication_port'],
                            node['device'], job['partition'], 'REPLICATE',
                            '/' + '-'.join(suffixes),
                            headers=self.headers)
                        conn.getresponse().read()
                    self.suffix_sync += len(suffixes)
                    self.logger.update_stats('suffix.syncs', len(suffixes))
                except (Exception, Timeout):
                    self.logger.exception(_("Error syncing with node: %s") %
                                          node)
            # number of suffixes, used when logging the stats line
            self.suffix_count += len(local_hash)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since('partition.update.timing', begin)

In update, the first step is to read from the local hashes.pkl of this device the hash value of every suffix of the current partition, something like {'a83': '0db7b416c9808517a1bb2157af20b09b'}: the key is the last three characters of the object hash (the hash of its path), and the value is an md5 computed over the names of all .data files under every hash directory in that suffix directory. You can think of it as a hash over all of those file names.
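A minimal sketch of how such a suffix hash can be computed (simplified; the real hash_suffix also reclaims old tombstones and handles several edge cases):

import hashlib
import os

def example_suffix_hash(suffix_dir):
    # md5 over the file names found under every hash directory in this suffix
    # directory; the file names are timestamps, so any update to an object
    # changes the resulting digest.
    md5 = hashlib.md5()
    for hash_dir in sorted(os.listdir(suffix_dir)):
        for fname in sorted(os.listdir(os.path.join(suffix_dir, hash_dir))):
            md5.update(fname.encode('utf-8'))
    return md5.hexdigest()

# local_hash would then look like:
# {'a83': example_suffix_hash('/srv/1/node/sdb1/objects/94238/a83')}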

            hashed, local_hash = tpool_reraise(
                get_hashes, job['path'],
                do_listdir=(self.replication_count % 10) == 0,
                reclaim_age=self.reclaim_age)

The code fragment above runs get_hashes and passes the parameters on to it:

def get_hashes(partition_dir, recalculate=None, do_listdir=False,
               reclaim_age=ONE_WEEK):
    """
    Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
    the hash cache for suffix existence at the (unexpectedly high) cost of a
    listdir.  reclaim_age is just passed on to hash_suffix.

    :param partition_dir: absolute path of partition to get hashes for
    :param recalculate: list of suffixes which should be recalculated when got,
      e.g. recalculate=['a83'] (a suffix is the last three characters of the
      object hash; e.g. under /srv/1/node/sdb1/objects/94238 the suffix dir
      310 sits next to hashes.pkl)
    :param do_listdir: force existence check for all hashes in the partition
    :param reclaim_age: age at which to remove tombstones

    :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
    """

Since recalculate is not passed in here, only do_listdir=True forces the hash over all file names in each suffix directory to be recomputed. The file names are timestamps, so a changed file name means the object has been updated and needs to be synced with the remote copies: we check whether both sides hold the same version, and if not, the local version must be pushed to the remote server.
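On disk this is just a pickled dict; for the partition shown earlier you could inspect it like this (illustrative only, assuming the SAIO paths used throughout this article):

import pickle

# hashes.pkl lives directly under the partition directory.
with open('/srv/1/node/sdb1/objects/94238/hashes.pkl', 'rb') as f:
    local_hash = pickle.load(f)
print(local_hash)   # e.g. {'310': '0db7b416c9808517a1bb2157af20b09b'}

Back in update, the next step builds the node iterator: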

attempts_left = len(job['nodes'])
# job['nodes'] excludes the local node; get_more_nodes() yields every other
# node in the ring, so nodes iterates over all nodes except this one
nodes = itertools.chain(
    job['nodes'],
    self.object_ring.get_more_nodes(int(job['partition'])))

In the fragment above, attempts_left is the number of replica nodes for this job's partition minus the local node. After that, nodes is built: get_more_nodes returns an iterator over all nodes in the ring beyond those assigned to this partition, so nodes becomes an iterator over every node except the local one, with the primary replicas first.
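The chaining behaviour is plain itertools; a tiny stand-alone sketch with made-up node dicts in place of the ring:

import itertools

primaries = [{'id': 2}, {'id': 3}]           # job['nodes']: replicas minus us
handoffs = iter([{'id': 7}, {'id': 9}])      # stand-in for get_more_nodes()

nodes = itertools.chain(primaries, handoffs)
attempts_left = len(primaries)               # 2 when the replica count is 3
while attempts_left > 0:
    node = next(nodes)                       # primaries first, then handoffs
    attempts_left -= 1
    print(node['id'])                        # prints 2, then 3

In the real loop attempts_left is also incremented when a node reports itself unmounted, which is what lets the iteration spill over into the handoff nodes.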

Next comes the while loop, which runs attempts_left times:

resp = http_connect(
    node['replication_ip'], node['replication_port'],
    node['device'], job['partition'], 'REPLICATE',
    '', headers=self.headers).getresponse()

The request is sent to the node obtained from the iterator; the replica nodes come first, so they are asked first. If the request succeeds, the response body is read to obtain remote_hash, the remote device's hashes for the same partition:

suffixes = [suffix for suffix in local_hash if
            local_hash[suffix] !=
            remote_hash.get(suffix, -1)]
# if nothing differs there is nothing to push; move on to the next node
if not suffixes:
    continue

This compares the hashes.pkl contents of the two devices for the same partition, collecting the keys whose local value differs from the remote one. If suffixes is empty, the local and remote copies are the same version and we continue with the next replica. If it is not empty it must be handled, and we first fetch our own hashes.pkl contents again, because another replica may have pushed new updates to this server since the last request; with the latest local hashes we compare once more and obtain the suffixes that differ for this partition.
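A worked toy example of that comparison (made-up hash values):

local_hash = {'a83': 'aaa1', '310': 'bbb2', '7f2': 'ccc3'}
remote_hash = {'a83': 'aaa1', '310': 'ddd4'}       # '7f2' missing remotely

suffixes = [suffix for suffix in local_hash
            if local_hash[suffix] != remote_hash.get(suffix, -1)]
print(sorted(suffixes))   # ['310', '7f2'] -> only these suffix dirs get pushed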
Then the sync is run:

self.sync(node, job, suffixes)  # sync the changed suffixes

For the actual synchronisation the current implementation uses the rsync method rather than ssync, but a hook for ssync is already in place: once ssync is stable it can replace rsync.

(stay tuned)

 def sync(self, node, job, suffixes):  # Just exists for doc anchor point
        """
        Synchronize local suffix directories from a partition with a remote
        node.

        :param node: the "dev" entry for the remote node to sync with
        :param job: information about the partition being synced
        :param suffixes: a list of suffixes which need to be pushed

        :returns: boolean indicating success or failure
        """
        # self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
        # if no sync_method is configured, the class's own rsync method is used
        return self.sync_method(node, job, suffixes)

sync_method is obtained as shown below; if nothing is configured, the rsync method is used:

  self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
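That getattr dispatch is easy to see in isolation (hypothetical Demo class, not Swift code):

class Demo(object):
    def rsync(self):
        return 'using rsync'

    def ssync(self):
        return 'using ssync'

conf = {}   # no sync_method configured
sync_method = getattr(Demo(), conf.get('sync_method') or 'rsync')
print(sync_method())   # 'using rsync'

The replicator's actual rsync implementation follows: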

def rsync(self, node, job, suffixes):
        """
        Uses rsync to implement the sync method. This was the first
        sync method in Swift.
        """
        if not os.path.exists(job['path']):
            return False
        args = [
            'rsync',
            '--recursive',
            '--whole-file',
            '--human-readable',
            '--xattrs',
            '--itemize-changes',
            '--ignore-existing',
            '--timeout=%s' % self.rsync_io_timeout,
            '--contimeout=%s' % self.rsync_io_timeout,
            '--bwlimit=%s' % self.rsync_bwlimit,
        ]
        node_ip = rsync_ip(node['replication_ip'])
        # build the rsync module string, which includes the node ip
        if self.vm_test_mode:
            rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
        else:
            rsync_module = '%s::object' % node_ip
        had_any = False
        for suffix in suffixes:
            spath = join(job['path'], suffix)
            if os.path.exists(spath):
                args.append(spath)
                had_any = True
        if not had_any:
            return False
        args.append(join(rsync_module, node['device'],
                    'objects', job['partition']))
        # args now carries everything for the transfer: device name, partition,
        # and the source suffix directories
        return self._rsync(args) == 0

The rsync method gathers its parameters into args and then runs _rsync:

    def _rsync(self, args):
        """
        Execute the rsync binary to replicate a partition.

        :returns: return code of rsync process. 0 is successful
        """
        start_time = time.time()
        ret_val = None
        try:
            with Timeout(self.rsync_timeout):
                # this is the actual sync: a push of local data via the rsync binary
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)
                results = proc.stdout.read()
                ret_val = proc.wait()
        except Timeout:
            self.logger.error(_("Killing long-running rsync: %s"), str(args))
            proc.kill()
            return 1  # failure response code
        total_time = time.time() - start_time
        for result in results.split('\n'):
            if result == '':
                continue
            if result.startswith('cd '):
                continue
            if not ret_val:
                self.logger.info(result)
            else:
                self.logger.error(result)
        if ret_val:
            error_line = _('Bad rsync return code: %(ret)d <- %(args)s') % \
                {'args': str(args), 'ret': ret_val}
            if self.rsync_error_log_line_length:
                error_line = error_line[:self.rsync_error_log_line_length]
            self.logger.error(error_line)
        elif results:
            self.logger.info(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {'src': args[-2], 'dst': args[-1], 'time': total_time})
        else:
            self.logger.debug(
                _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
                {'src': args[-2], 'dst': args[-1], 'time': total_time})
        return ret_val

The fragment below is where the actual push happens:

  # this is the actual sync, in push mode
                proc = subprocess.Popen(args,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)

job['delete'] is True in situations like this: devices were added or removed and the ring was rebalanced, so the current partition's replica set no longer contains this server's device id. For example, if partition 45678 mapped to device ids [1, 2, 3] before the rebalance and our device id is 1, and after the rebalance the partition maps to [4, 2, 3], then job['delete'] becomes True for that partition. A short sketch of that check follows.
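Using the numbers from that example, the check made in collect_jobs plays out like this (plain Python, made-up ids):

# After the rebalance, partition 45678 maps to devices [4, 2, 3];
# we are still device 1, so filtering our id out removes nothing.
part_nodes = [{'id': 4}, {'id': 2}, {'id': 3}]
local_dev_id = 1

nodes = [n for n in part_nodes if n['id'] != local_dev_id]
delete = len(nodes) > len(part_nodes) - 1
print(delete)   # True -> update_deleted() pushes the suffixes away, then rmtree

update_deleted handles these handoff partitions: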

    def update_deleted(self, job):
        """
        High-level method that replicates a single partition that doesn't
        belong on this node.

        :param job: a dict containing info about the partition to be replicated
        """
        # collect the suffix directories under this partition
        def tpool_get_suffixes(path):
            return [suff for suff in os.listdir(path)
                    if len(suff) == 3 and isdir(join(path, suff))]
        self.replication_count += 1
        self.logger.increment('partition.delete.count.%s' % (job['device'],))
        begin = time.time()
        try:
            responses = []
            suffixes = tpool.execute(tpool_get_suffixes, job['path'])
            if suffixes:
                for node in job['nodes']:
                    # push our copies to the remote node
                    success = self.sync(node, job, suffixes)
                    if success:
                        with Timeout(self.http_timeout):
                            conn = http_connect(
                                node['replication_ip'],
                                node['replication_port'],
                                node['device'], job['partition'], 'REPLICATE',
                                '/' + '-'.join(suffixes), headers=self.headers)
                            conn.getresponse().read()
                    responses.append(success)
            if self.handoff_delete:
                # delete handoff if we have had handoff_delete successes
                delete_handoff = len([resp for resp in responses if resp]) >= \
                    self.handoff_delete
            else:
                # delete handoff if all syncs were successful
                delete_handoff = len(responses) == len(job['nodes']) and \
                    all(responses)
            # once there are no suffixes, or enough remote nodes have
            # acknowledged, remove the local partition directory
            if not suffixes or delete_handoff:
                self.logger.info(_("Removing partition: %s"), job['path'])
                tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
        except (Exception, Timeout):
            self.logger.exception(_("Error syncing handoff partition"))
        finally:
            self.partition_times.append(time.time() - begin)
            self.logger.timing_since('partition.delete.timing', begin)

This completes the walkthrough of the replicate operation. If anything here is misunderstood or unreasonable, please point it out; thank you!

1 Detailed analysis of the Object Auditor

The previous article covered how the Object Auditor is started; the actual auditing is done by AuditorWorker. run_audit instantiates the AuditorWorker class and calls its audit_all_objects method, whose implementation follows:

def audit_all_objects(self, mode='once', device_dirs=None):
        # run_forever passes mode='forever'
        description = ''
        if device_dirs:
            device_dir_str = ','.join(sorted(device_dirs))
            description = _(' - %s') % device_dir_str
        self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
                        (mode, self.auditor_type, description))
        begin = reported = time.time()
        self.total_bytes_processed = 0
        self.total_files_processed = 0
        total_quarantines = 0
        total_errors = 0
        time_auditing = 0
        # all_locs iterates over every object file under device_dirs on this
        # device, yielding AuditLocation(hsh_path, device, partition) objects
        all_locs = self.diskfile_mgr.object_audit_location_generator(
            device_dirs=device_dirs)
        for location in all_locs:
            loop_time = time.time()
            # audit the objects one by one
            self.failsafe_object_audit(location)
            self.logger.timing_since('timing', loop_time)
            self.files_running_time = ratelimit_sleep(
                self.files_running_time, self.max_files_per_second)
            self.total_files_processed += 1
            now = time.time()
            if now - reported >= self.log_time:
                self.logger.info(_(
                    'Object audit (%(type)s). '
                    'Since %(start_time)s: Locally: %(passes)d passed, '
                    '%(quars)d quarantined, %(errors)d errors '
                    'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
                    'Total time: %(total).2f, Auditing time: %(audit).2f, '
                    'Rate: %(audit_rate).2f') % {
                        'type': '%s%s' % (self.auditor_type, description),
                        'start_time': time.ctime(reported),
                        'passes': self.passes, 'quars': self.quarantines,
                        'errors': self.errors,
                        'frate': self.passes / (now - reported),
                        'brate': self.bytes_processed / (now - reported),
                        'total': (now - begin), 'audit': time_auditing,
                        'audit_rate': time_auditing / (now - begin)})
                cache_entry = self.create_recon_nested_dict(
                    'object_auditor_stats_%s' % (self.auditor_type),
                    device_dirs,
                    {'errors': self.errors, 'passes': self.passes,
                     'quarantined': self.quarantines,
                     'bytes_processed': self.bytes_processed,
                     'start_time': reported, 'audit_time': time_auditing})
                dump_recon_cache(cache_entry, self.rcache, self.logger)
                reported = now
                total_quarantines += self.quarantines
                total_errors += self.errors
                self.passes = 0
                # number quarantined in this reporting interval
                self.quarantines = 0
                self.errors = 0
                self.bytes_processed = 0
            time_auditing += (now - loop_time)
        # Avoid divide by zero during very short runs
        elapsed = (time.time() - begin) or 0.000001
        self.logger.info(_(
            'Object audit (%(type)s) "%(mode)s" mode '
            'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
            'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
            'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
            'Rate: %(audit_rate).2f') % {
                'type': '%s%s' % (self.auditor_type, description),
                'mode': mode, 'elapsed': elapsed,
                'quars': total_quarantines + self.quarantines,
                'errors': total_errors + self.errors,
                'frate': self.total_files_processed / elapsed,
                'brate': self.total_bytes_processed / elapsed,
                'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
        # Clear recon cache entry if device_dirs is set
        if device_dirs:
            cache_entry = self.create_recon_nested_dict(
                'object_auditor_stats_%s' % (self.auditor_type),
                device_dirs, {})
            dump_recon_cache(cache_entry, self.rcache, self.logger)
        if self.stats_sizes:
            self.logger.info(
                _('Object audit stats: %s') % json.dumps(self.stats_buckets))

audit_all_objects first finds the locations of every object to be audited under device/objects; auditing means scanning all objects and quarantining any file found to be corrupt. object_audit_location_generator returns an iterator of AuditLocation(hsh_path, device, partition) objects, i.e. an AuditLocation instance is created for each object's hash directory (each normally holding one .data file) and passed to failsafe_object_audit, which carries out the next step. A sketch of such a location walk is shown below.
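A simplified stand-in for that walk (hypothetical code, not the real object_audit_location_generator, which also handles mount checks and partial device lists):

import os
from collections import namedtuple

AuditLocation = namedtuple('AuditLocation', 'path device partition')

def example_audit_locations(devices_dir):
    # Walk devices/<dev>/objects/<partition>/<suffix>/<hash>/ and yield one
    # location per hash directory (each normally holds a single .data file).
    for device in os.listdir(devices_dir):
        obj_path = os.path.join(devices_dir, device, 'objects')
        if not os.path.isdir(obj_path):
            continue
        for partition in os.listdir(obj_path):
            part_path = os.path.join(obj_path, partition)
            if not os.path.isdir(part_path):
                continue
            for suffix in os.listdir(part_path):
                suffix_path = os.path.join(part_path, suffix)
                if not os.path.isdir(suffix_path):
                    continue
                for hsh in os.listdir(suffix_path):
                    yield AuditLocation(os.path.join(suffix_path, hsh),
                                        device, partition)

failsafe_object_audit itself looks like this: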

def failsafe_object_audit(self, location):
        """
        Entrypoint to object_audit, with a failsafe generic exception handler.
        """
        try:
            # audit the object
            self.object_audit(location)
        except (Exception, Timeout):
            self.logger.increment('errors')
            self.errors += 1
            self.logger.exception(_('ERROR Trying to audit %s'), location)

This method mainly runs object_audit(), which performs the actual audit; its implementation is as follows:

def object_audit(self, location):
        """
        Audits the given object location.

        :param location: an audit location
                         (from diskfile.object_audit_location_generator)
        """
        def raise_dfq(msg):
            raise DiskFileQuarantined(msg)

        try:
            df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
            # df.open() is DiskFile's open method
            with df.open():
                metadata = df.get_metadata()
                obj_size = int(metadata['Content-Length'])
                if self.stats_sizes:
                    self.record_stats(obj_size)
                # in zero-byte-only mode, non-empty objects are counted as
                # passed without reading their data
                if self.zero_byte_only_at_fps and obj_size:
                    self.passes += 1
                    return
                # _quarantine_hook: the reader quarantines the file if its
                # size or md5 does not match; reader is a DiskFileReader
                reader = df.reader(_quarantine_hook=raise_dfq)
            # when the file is closed, DiskFileReader.close() calls
            # _handle_close_quarantine() to quarantine it if needed
            # (how does the replicator then pick up quarantined objects?)
            with closing(reader):
                for chunk in reader:
                    chunk_len = len(chunk)
                    # rate limiting:
                    # ratelimit_sleep(running_time, max_rate, incr_by=1,
                    #                 rate_buffer=5)
                    self.bytes_running_time = ratelimit_sleep(
                        self.bytes_running_time,
                        self.max_bytes_per_second,
                        incr_by=chunk_len)
                    self.bytes_processed += chunk_len
                    self.total_bytes_processed += chunk_len
        except DiskFileNotExist:
            return
        # a DiskFileQuarantined error was raised; bump the quarantine count
        except DiskFileQuarantined as err:
            self.quarantines += 1
            self.logger.error(_('ERROR Object %(obj)s failed audit and was'
                                ' quarantined: %(err)s'),
                              {'obj': location, 'err': err})
        self.passes += 1


Now let's look at this method in detail. First, the parameters passed in are used to instantiate a DiskFile, df:

self.diskfile_mgr.get_diskfile_from_audit_location(location), where location is an AuditLocation instance:

    def get_diskfile_from_audit_location(self, audit_location):
        dev_path = self.get_dev_path(audit_location.device, mount_check=False)
        return DiskFile.from_hash_dir(
            self, audit_location.path, dev_path,
            audit_location.partition)


    @classmethod
    def from_hash_dir(cls, mgr, hash_dir_path, device_path, partition):
        return cls(mgr, device_path, None, partition, _datadir=hash_dir_path)

The two functions above use the attributes of the AuditLocation instance to instantiate a DiskFile, the class that holds the concrete file-handling methods. With df in hand, the file that audit_location.path points to is opened, its metadata fetched, and then the file is read; reading requires instantiating a DiskFileReader, and this class is the key to how files get quarantined. The mechanism is fairly well hidden and deserves close attention.

 def reader(self, keep_cache=False,
               _quarantine_hook=lambda m: None):
        """
        Return a :class:`swift.common.swob.Response` class compatible
        "`app_iter`" object as defined by
        :class:`swift.obj.diskfile.DiskFileReader`.
        For this implementation, the responsibility of closing the open file
        is passed to the :class:`swift.obj.diskfile.DiskFileReader` object.

        :param keep_cache: caller's preference for keeping data read in the
                           OS buffer cache
        :param _quarantine_hook: 1-arg callable called when obj quarantined;
                                 the arg is the reason for quarantine.
                                 Default is to ignore it.
                                 Not needed by the REST layer.
        :returns: a :class:`swift.obj.diskfile.DiskFileReader` object
        """
        dr = DiskFileReader(
            self._fp, self._data_file, int(self._metadata['Content-Length']),
            self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
            self._mgr.keep_cache_size, self._device_path, self._logger,
            quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
        # At this point the reader object is now responsible for closing
        # the file pointer.
        self._fp = None
        return dr

DiskFileReader then reads the object data from the file; __iter__ iterates over it:

 def __iter__(self):
        """Returns an iterator over the data file."""
        try:
            dropped_cache = 0
            self._bytes_read = 0
            self._started_at_0 = False
            self._read_to_eof = False
            if self._fp.tell() == 0:
                self._started_at_0 = True
                self._iter_etag = hashlib.md5()
            while True:
                chunk = self._threadpool.run_in_thread(
                    self._fp.read, self._disk_chunk_size)
                if chunk:
                    if self._iter_etag:
                        self._iter_etag.update(chunk)
                    self._bytes_read += len(chunk)
                    if self._bytes_read - dropped_cache > (1024 * 1024):
                        self._drop_cache(self._fp.fileno(), dropped_cache,
                                         self._bytes_read - dropped_cache)
                        dropped_cache = self._bytes_read
                    yield chunk
                else:
                    self._read_to_eof = True
                    self._drop_cache(self._fp.fileno(), dropped_cache,
                                     self._bytes_read - dropped_cache)
                    break
        finally:
            if not self._suppress_file_closing:
                self.close()

__iter__ iterates over the file contents, recomputing the ETag as it reads. In the finally clause the file is closed, and it is at close time that the object is quarantined if necessary. First look at the close function:

def close(self):
        """
        Close the open file handle if present.

        For this specific implementation, this method will handle quarantining
        the file if necessary.
        """
        if self._fp:
            try:
                if self._started_at_0 and self._read_to_eof:     # the file was read from start to end
                    self._handle_close_quarantine()
            except DiskFileQuarantined:
                raise
            except (Exception, Timeout) as e:
                self._logger.error(_(
                    'ERROR DiskFile %(data_file)s'
                    ' close failure: %(exc)s : %(stack)s'),
                    {'exc': e, 'stack': ''.join(traceback.format_stack()),
                     'data_file': self._data_file})
            finally:
                fp, self._fp = self._fp, None
                fp.close()

Only when the file has been read from start to finish does close() check it; an incomplete or corrupted file is then quarantined. Now look at self._handle_close_quarantine():

 def _handle_close_quarantine(self):
        """Check if file needs to be quarantined"""
        if self._bytes_read != self._obj_size:
            self._quarantine(
                "Bytes read: %s, does not match metadata: %s" % (
                    self._bytes_read, self._obj_size))
        elif self._iter_etag and \
                self._etag != self._iter_etag.hexdigest():
            self._quarantine(
                "ETag %s and file's md5 %s do not match" % (
                    self._etag, self._iter_etag.hexdigest()))

It first checks whether the number of bytes read matches the object size recorded in the metadata; if not, _quarantine is passed a "bytes read does not match" message. Otherwise, if the stored ETag differs from the md5 computed while reading, a "md5 does not match" message is passed. A quick illustration of these two checks is sketched below.
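A quick stand-alone illustration of those two checks (plain Python, not the Swift code itself):

import hashlib

def needs_quarantine(bytes_read, obj_size, iter_etag_hex, stored_etag):
    # Mirrors the order above: size check first, then the ETag check.
    if bytes_read != obj_size:
        return "Bytes read: %s, does not match metadata: %s" % (
            bytes_read, obj_size)
    if stored_etag != iter_etag_hex:
        return "ETag %s and file's md5 %s do not match" % (
            stored_etag, iter_etag_hex)
    return None

body = b'object body'
etag = hashlib.md5(body).hexdigest()
print(needs_quarantine(len(body), len(body), etag, etag))      # None -> passes
print(needs_quarantine(len(body), len(body) + 1, etag, etag))  # size mismatch

Here is the implementation of _quarantine: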

 def _quarantine(self, msg):
        # move the file to a quarantine area
        self._quarantined_dir = self._threadpool.run_in_thread(
            quarantine_renamer, self._device_path, self._data_file)
        self._logger.warn("Quarantined object %s: %s" % (
            self._data_file, msg))
        self._logger.increment('quarantines')
        self._quarantine_hook(msg)

Inside _quarantine, quarantine_renamer performs the actual quarantining:

def quarantine_renamer(device_path, corrupted_file_path):
    """
    In the case that a file is corrupted, move it to a quarantined
    area to allow replication to fix it.

    :params device_path: The path to the device the corrupted file is on.
    :params corrupted_file_path: The path to the file you want quarantined.

    :returns: path (str) of directory the file was moved to
    :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
                     exceptions from rename
    """
    from_dir = dirname(corrupted_file_path)
    to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
    invalidate_hash(dirname(from_dir))
    try:
        renamer(from_dir, to_dir)
    except OSError as e:
        if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
        renamer(from_dir, to_dir)
    return to_dir

The file is quarantined into the device_path/quarantined/objects directory, and the directory it was moved to is returned. The path arithmetic is sketched below; after that comes the renamer helper it uses.
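The path arithmetic can be reproduced on its own (the hash value here is only illustrative):

from os.path import basename, dirname, join

device_path = '/srv/1/node/sdb1'
corrupted_file_path = ('/srv/1/node/sdb1/objects/94238/310/'
                       'd41d8cd98f00b204e9800998ecf8427e/'
                       '1389724673.12345.data')

from_dir = dirname(corrupted_file_path)          # the object's hash directory
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
print(to_dir)
# /srv/1/node/sdb1/quarantined/objects/d41d8cd98f00b204e9800998ecf8427e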

def renamer(old, new):
    """
    Attempt to fix / hide race conditions like empty object directories
    being removed by backend processes during uploads, by retrying.

    :param old: old path to be renamed
    :param new: new path to be renamed to
    """
    try:
        mkdirs(os.path.dirname(new))
        os.rename(old, new)
    except OSError:
        mkdirs(os.path.dirname(new))
        os.rename(old, new)

As the code shows, the file is moved into a new quarantine directory.

Since my understanding is limited, mistakes are inevitable; corrections and discussion are welcome. Thank you!
