
RocksDB Code Study: Write Flow, Part 1 (WriteBatch writes, WriteThread scheduling Writers)


1. Several related classes

1. Slice

    // Slice is mainly a container for data. It has just two member variables,
    // data_ and size_ (the bytes and length of a key or value), plus some
    // helper functions.
    class Slice {
     public:
      // Create an empty slice.
      Slice() : data_(""), size_(0) {}

      // Create a slice that refers to d[0,n-1].
      Slice(const char* d, size_t n) : data_(d), size_(n) {}

      // Create a slice that refers to the contents of "s".
      /* implicit */ Slice(const std::string& s) : data_(s.data()), size_(s.size()) {}

      // Create a slice that refers to s[0,strlen(s)-1].
      /* implicit */ Slice(const char* s) : data_(s), size_(strlen(s)) {}

      // Create a single slice from SliceParts using buf as storage.
      // buf must exist as long as the returned Slice exists.
      Slice(const struct SliceParts& parts, std::string* buf);

      // Return a pointer to the beginning of the referenced data.
      const char* data() const { return data_; }

      // Return the length (in bytes) of the referenced data.
      size_t size() const { return size_; }

      // Return true iff the length of the referenced data is zero.
      bool empty() const { return size_ == 0; }

      // Return the ith byte in the referenced data.
      // REQUIRES: n < size()
      char operator[](size_t n) const {
        assert(n < size());
        return data_[n];
      }

      // Change this slice to refer to an empty array.
      void clear() {
        data_ = "";
        size_ = 0;
      }

      // Drop the first "n" bytes from this slice.
      void remove_prefix(size_t n) {
        assert(n <= size());
        data_ += n;
        size_ -= n;
      }

      // Return a string that contains a copy of the referenced data.
      std::string ToString(bool hex = false) const;

      // Three-way comparison. Returns:
      //   <  0 iff "*this" <  "b"
      //   == 0 iff "*this" == "b"
      //   >  0 iff "*this" >  "b"
      int compare(const Slice& b) const;

      // Return true iff "x" is a prefix of "*this".
      bool starts_with(const Slice& x) const {
        return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
      }

      // Compare two slices and return the first byte where they differ.
      size_t difference_offset(const Slice& b) const;

      // private: made public for rocksdbjni access
      const char* data_;
      size_t size_;

      // Intentionally copyable
    };
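
A quick usage sketch of the class above (assuming the standard rocksdb/slice.h header). The point to notice is that a Slice never owns or copies memory; it only points into a buffer that must outlive it:

    #include <cassert>
    #include <string>
    #include "rocksdb/slice.h"

    int main() {
      std::string backing = "user_0001|payload";
      rocksdb::Slice s(backing);  // no copy: data_/size_ point into `backing`
      assert(s.starts_with(rocksdb::Slice("user_")));
      s.remove_prefix(5);         // now refers to "0001|payload"
      assert(s[0] == '0');
      // s stays valid only while `backing` is alive: Slice does not own memory.
      return 0;
    }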

2. WriteOptions

    struct WriteOptions {
      // Whether the write must be synced to stable storage before it is
      // considered complete. Default: false
      bool sync;

      // Whether to skip the write-ahead log. If true, writes will not first
      // go to the write ahead log, and the write may be lost after a crash.
      bool disableWAL;

      // The option is deprecated. It's not used anymore.
      // (It used to indicate a deadline for completing this write.)
      uint64_t timeout_hint_us;

      // If true and the user is trying to write to column families that
      // don't exist (they were dropped), ignore the write (don't return an
      // error). If there are multiple writes in a WriteBatch, the other
      // writes will succeed. Default: false
      bool ignore_missing_column_families;

      WriteOptions()
          : sync(false),
            disableWAL(false),
            timeout_hint_us(0),
            ignore_missing_column_families(false) {}
    };
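
For illustration, a minimal sketch of the two flags that matter most in the write flow below (assuming the standard rocksdb/options.h header):

    #include "rocksdb/options.h"

    // Durable write: the WAL is fsync'ed/fdatasync'ed before the write returns.
    rocksdb::WriteOptions DurableWrite() {
      rocksdb::WriteOptions opts;
      opts.sync = true;
      return opts;
    }

    // Fast but unsafe write: the WAL is skipped entirely, so the write may be
    // lost on a crash.
    rocksdb::WriteOptions FastUnsafeWrite() {
      rocksdb::WriteOptions opts;
      opts.disableWAL = true;
      return opts;
    }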

3. WriteBatch

RocksDB optimizes the write path by batching updates, which is what the WriteBatch class implements. A WriteBatch has a single member variable, rep_, a string that stores all the records of a batch operation serialized in a fixed format; when the records are needed again, they are parsed back out of this string one by one. The layout of rep_ is:

    WriteBatch::rep_ :=
       sequence: fixed64
       count: fixed32
       data: record[count]
    record :=
       kTypeValue varstring varstring
       kTypeDeletion varstring
       kTypeSingleDeletion varstring
       kTypeMerge varstring varstring
       kTypeColumnFamilyValue varint32 varstring varstring
       kTypeColumnFamilyDeletion varint32 varstring varstring
       kTypeColumnFamilySingleDeletion varint32 varstring varstring
       kTypeColumnFamilyMerge varint32 varstring varstring
    varstring :=
       len: varint32
       data: uint8[len]

The string thus begins with a 12-byte header, an 8-byte sequence number followed by a 4-byte record count, which is why the class defines `static const size_t kHeader = 12` as the minimum length of rep_. The records follow the header directly: an insert is kTypeValue + key length + key + value length + value, and a delete is kTypeDeletion + key length + key. The write and delete operations of this class are implemented by calling into the WriteBatchInternal class.
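
To make that layout concrete, here is a small, self-contained sketch that hand-encodes one kTypeValue record in this format. The fixed-width and varint helpers are simplified stand-ins for RocksDB's coding utilities (they assume a little-endian host), and kTypeValue = 0x1 matches the RocksDB value-type enum:

    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <string>

    static void PutFixed64(std::string* dst, uint64_t v) {  // little-endian
      char buf[8];
      std::memcpy(buf, &v, sizeof(buf));
      dst->append(buf, sizeof(buf));
    }
    static void PutFixed32(std::string* dst, uint32_t v) {  // little-endian
      char buf[4];
      std::memcpy(buf, &v, sizeof(buf));
      dst->append(buf, sizeof(buf));
    }
    static void PutVarint32(std::string* dst, uint32_t v) {
      while (v >= 0x80) {
        // 7 payload bits plus a continuation bit.
        dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
        v >>= 7;
      }
      dst->push_back(static_cast<char>(v));
    }
    static void PutLengthPrefixedSlice(std::string* dst, const std::string& s) {
      PutVarint32(dst, static_cast<uint32_t>(s.size()));
      dst->append(s);
    }

    int main() {
      const char kTypeValue = 0x1;
      std::string rep;
      PutFixed64(&rep, 0);        // sequence: filled in later by SetSequence()
      PutFixed32(&rep, 1);        // count: one record
      rep.push_back(kTypeValue);  // record type
      PutLengthPrefixedSlice(&rep, "key1");
      PutLengthPrefixedSlice(&rep, "value1");
      std::cout << "rep_ size = " << rep.size() << " bytes\n";
      return 0;
    }

Running it prints `rep_ size = 25 bytes`: the 12-byte header plus a 13-byte record (1 type byte, 1+4 bytes for "key1", 1+6 bytes for "value1").
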
    class WriteBatch : public WriteBatchBase {
     public:
      explicit WriteBatch(size_t reserved_bytes = 0);
      ~WriteBatch();

      using WriteBatchBase::Put;
      // Store the mapping "key->value" in the database.
      void Put(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value) override;
      void Put(const Slice& key, const Slice& value) override {
        Put(nullptr, key, value);
      }

      // Variant of Put() that gathers output like writev(2). The key and
      // value that will be written to the database are concatenations of
      // arrays of slices.
      void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
               const SliceParts& value) override;
      void Put(const SliceParts& key, const SliceParts& value) override {
        Put(nullptr, key, value);
      }

      using WriteBatchBase::Delete;
      // If the database contains a mapping for "key", erase it. Else do nothing.
      void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
      void Delete(const Slice& key) override { Delete(nullptr, key); }

      // Variant that takes SliceParts.
      void Delete(ColumnFamilyHandle* column_family, const SliceParts& key) override;
      void Delete(const SliceParts& key) override { Delete(nullptr, key); }

      using WriteBatchBase::SingleDelete;
      // If the database contains a mapping for "key", erase it. Expects that
      // the key was not overwritten. Else do nothing.
      void SingleDelete(ColumnFamilyHandle* column_family, const Slice& key) override;
      void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); }

      // Variant that takes SliceParts.
      void SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key) override;
      void SingleDelete(const SliceParts& key) override {
        SingleDelete(nullptr, key);
      }
      // ... (the rest of the class is omitted in this excerpt)
    };
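
A minimal usage sketch (assuming an already-opened rocksdb::DB* named db). All records of one batch go through a single Write() call, which is exactly the path traced in section 2 below:

    #include "rocksdb/db.h"
    #include "rocksdb/write_batch.h"

    rocksdb::Status AtomicUpdate(rocksdb::DB* db) {
      rocksdb::WriteBatch batch;
      batch.Put("k1", "v1");  // appended to rep_ as a kTypeValue record
      batch.Delete("k2");     // appended to rep_ as a kTypeDeletion record
      // One Write() call; the records share one sequence-number range.
      return db->Write(rocksdb::WriteOptions(), &batch);
    }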

4. WriteBatchInternal
This class's main job is to manipulate the WriteBatch string: getting and setting the sequence number, getting and setting the record count, inserting a WriteBatch into the memtable, and so on. Its member functions, all declared static, are:

    class WriteBatchInternal {
     public:
      // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
      static void Put(WriteBatch* batch, uint32_t column_family_id,
                      const Slice& key, const Slice& value);
      static void Put(WriteBatch* batch, uint32_t column_family_id,
                      const SliceParts& key, const SliceParts& value);
      static void Delete(WriteBatch* batch, uint32_t column_family_id,
                         const SliceParts& key);
      static void Delete(WriteBatch* batch, uint32_t column_family_id,
                         const Slice& key);
      static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                               const SliceParts& key);
      static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                               const Slice& key);
      static void Merge(WriteBatch* batch, uint32_t column_family_id,
                        const Slice& key, const Slice& value);
      static void Merge(WriteBatch* batch, uint32_t column_family_id,
                        const SliceParts& key, const SliceParts& value);

      // Return the number of entries in the batch.
      static int Count(const WriteBatch* batch);

      // Set the count for the number of entries in the batch.
      static void SetCount(WriteBatch* batch, int n);

      // Return the sequence number for the start of this batch.
      static SequenceNumber Sequence(const WriteBatch* batch);

      // Store the specified number as the sequence number for the start of
      // this batch.
      static void SetSequence(WriteBatch* batch, SequenceNumber seq);

      // Returns the offset of the first entry in the batch.
      // This offset is only valid if the batch is not empty.
      static size_t GetFirstOffset(WriteBatch* batch);

      static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); }

      static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); }

      static void SetContents(WriteBatch* batch, const Slice& contents);

      // Inserts batch entries into the memtable.
      // If dont_filter_deletes is false AND options.filter_deletes is true,
      // then deletes in the batch are dropped if db->KeyMayExist returns false.
      // If ignore_missing_column_families == true, a WriteBatch referencing a
      // non-existing column family is ignored. However, if
      // ignore_missing_column_families == false, any WriteBatch referencing a
      // non-existing column family will return an InvalidArgument() failure.
      //
      // If log_number is non-zero, the memtable will be updated only if
      // memtables->GetLogNumber() >= log_number.
      static Status InsertInto(const WriteBatch* batch,
                               ColumnFamilyMemTables* memtables,
                               bool ignore_missing_column_families = false,
                               uint64_t log_number = 0, DB* db = nullptr,
                               const bool dont_filter_deletes = true);

      static void Append(WriteBatch* dst, const WriteBatch* src);
    };
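
As the format section showed, Sequence() and Count() just read the fixed-width header fields out of rep_. A simplified sketch of that decoding (little-endian, mirroring the encoder sketch above; not RocksDB's actual coding.cc):

    #include <cstdint>
    #include <cstring>
    #include <string>

    // Stand-in for WriteBatchInternal::Sequence(): bytes [0,8) of rep_.
    uint64_t DecodeSequence(const std::string& rep) {
      uint64_t seq = 0;
      std::memcpy(&seq, rep.data(), sizeof(seq));
      return seq;
    }
    // Stand-in for WriteBatchInternal::Count(): bytes [8,12) of rep_.
    uint32_t DecodeCount(const std::string& rep) {
      uint32_t count = 0;
      std::memcpy(&count, rep.data() + 8, sizeof(count));
      return count;
    }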

5. MemTableInserter

    // This class adds normal key-value records and deletion records to the
    // memtable. An instance of it is handed to the rep_ parsing routine as
    // the handler.
    class MemTableInserter : public WriteBatch::Handler {
     public:
      SequenceNumber sequence_;
      ColumnFamilyMemTables* cf_mems_;
      bool ignore_missing_column_families_;
      uint64_t log_number_;
      DBImpl* db_;
      const bool dont_filter_deletes_;

      MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
                       bool ignore_missing_column_families, uint64_t log_number,
                       DB* db, const bool dont_filter_deletes)
          : sequence_(sequence),
            cf_mems_(cf_mems),
            ignore_missing_column_families_(ignore_missing_column_families),
            log_number_(log_number),
            db_(reinterpret_cast<DBImpl*>(db)),
            dont_filter_deletes_(dont_filter_deletes) {
        assert(cf_mems);
        if (!dont_filter_deletes_) {
          assert(db_);
        }
      }

      bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
        // We are only allowed to call this from a single-threaded write thread
        // (or while holding the DB mutex).
        bool found = cf_mems_->Seek(column_family_id);
        if (!found) {
          if (ignore_missing_column_families_) {
            *s = Status::OK();
          } else {
            *s = Status::InvalidArgument(
                "Invalid column family specified in write batch");
          }
          return false;
        }
        if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
          // This is true only in the recovery environment (log_number_ is
          // always 0 in the regular, non-recovery write code path).
          // If log_number_ < cf_mems_->GetLogNumber(), the column family
          // already contains updates from this log. We can't apply an update
          // twice because of update-in-place or merge workloads -- ignore
          // the update.
          *s = Status::OK();
          return false;
        }
        return true;
      }

      virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                           const Slice& value) override {
        Status seek_status;
        // If the column family referenced by the record is not found in the
        // memtable set, return immediately.
        if (!SeekToColumnFamily(column_family_id, &seek_status)) {
          ++sequence_;
          return seek_status;
        }
        // Fetch the memtable.
        MemTable* mem = cf_mems_->GetMemTable();
        auto* moptions = mem->GetMemTableOptions();
        if (!moptions->inplace_update_support) {
          // The memtable does not support in-place updates: just add the record.
          mem->Add(sequence_, kTypeValue, key, value);
        } else if (moptions->inplace_callback == nullptr) {
          // Update the record in place.
          mem->Update(sequence_, key, value);
          RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
        } else {
          if (mem->UpdateCallback(sequence_, key, value)) {
            // Updated in place through the callback.
          } else {
            // Key not found in memtable: do an sst get, update, then add.
            SnapshotImpl read_from_snapshot;
            read_from_snapshot.number_ = sequence_;
            ReadOptions ropts;
            ropts.snapshot = &read_from_snapshot;
            std::string prev_value;
            std::string merged_value;
            auto cf_handle = cf_mems_->GetColumnFamilyHandle();
            if (cf_handle == nullptr) {
              cf_handle = db_->DefaultColumnFamily();
            }
            // Call DB::Get to fetch the key's previous value as of the snapshot.
            Status s = db_->Get(ropts, cf_handle, key, &prev_value);
            char* prev_buffer = const_cast<char*>(prev_value.c_str());
            uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
            auto status = moptions->inplace_callback(
                s.ok() ? prev_buffer : nullptr, s.ok() ? &prev_size : nullptr,
                value, &merged_value);
            if (status == UpdateStatus::UPDATED_INPLACE) {
              // prev_value was updated in place with the final value.
              mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            } else if (status == UpdateStatus::UPDATED) {
              // merged_value contains the final value.
              mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
        }
        // Since all Puts are logged in the transaction log (if enabled),
        // always bump the sequence number, even if the update eventually
        // fails and does not result in a memtable add/update.
        sequence_++;
        cf_mems_->CheckMemtableFull();
        return Status::OK();
      }

      virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
        Status seek_status;
        if (!SeekToColumnFamily(column_family_id, &seek_status)) {
          ++sequence_;
          return seek_status;
        }
        MemTable* mem = cf_mems_->GetMemTable();
        auto* moptions = mem->GetMemTableOptions();
        if (!dont_filter_deletes_ && moptions->filter_deletes) {
          SnapshotImpl read_from_snapshot;
          read_from_snapshot.number_ = sequence_;
          ReadOptions ropts;
          ropts.snapshot = &read_from_snapshot;
          std::string value;
          auto cf_handle = cf_mems_->GetColumnFamilyHandle();
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          // Drop the delete if the key cannot possibly exist.
          if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
            RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
            return Status::OK();
          }
        }
        mem->Add(sequence_, kTypeDeletion, key, Slice());
        sequence_++;
        cf_mems_->CheckMemtableFull();
        return Status::OK();
      }

      virtual Status SingleDeleteCF(uint32_t column_family_id,
                                    const Slice& key) override {
        Status seek_status;
        if (!SeekToColumnFamily(column_family_id, &seek_status)) {
          ++sequence_;
          return seek_status;
        }
        MemTable* mem = cf_mems_->GetMemTable();
        auto* moptions = mem->GetMemTableOptions();
        if (!dont_filter_deletes_ && moptions->filter_deletes) {
          SnapshotImpl read_from_snapshot;
          read_from_snapshot.number_ = sequence_;
          ReadOptions ropts;
          ropts.snapshot = &read_from_snapshot;
          std::string value;
          auto cf_handle = cf_mems_->GetColumnFamilyHandle();
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
            RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
            return Status::OK();
          }
        }
        mem->Add(sequence_, kTypeSingleDeletion, key, Slice());
        sequence_++;
        cf_mems_->CheckMemtableFull();
        return Status::OK();
      }

      virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                             const Slice& value) override {
        Status seek_status;
        if (!SeekToColumnFamily(column_family_id, &seek_status)) {
          ++sequence_;
          return seek_status;
        }
        MemTable* mem = cf_mems_->GetMemTable();
        auto* moptions = mem->GetMemTableOptions();
        bool perform_merge = false;
        if (moptions->max_successive_merges > 0 && db_ != nullptr) {
          LookupKey lkey(key, sequence_);
          // Count the number of successive merges at the head of the key in
          // the memtable.
          size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
          if (num_merges >= moptions->max_successive_merges) {
            perform_merge = true;
          }
        }
        if (perform_merge) {
          // 1) Get the existing value.
          std::string get_value;
          // Pass in the sequence number so that we also include previous
          // merge operations in the same batch.
          SnapshotImpl read_from_snapshot;
          read_from_snapshot.number_ = sequence_;
          ReadOptions read_options;
          read_options.snapshot = &read_from_snapshot;
          auto cf_handle = cf_mems_->GetColumnFamilyHandle();
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          db_->Get(read_options, cf_handle, key, &get_value);
          Slice get_value_slice = Slice(get_value);
          // 2) Apply this merge.
          auto merge_operator = moptions->merge_operator;
          assert(merge_operator);
          std::deque<std::string> operands;
          operands.push_front(value.ToString());
          std::string new_value;
          bool merge_success = false;
          {
            StopWatchNano timer(Env::Default(), moptions->statistics != nullptr);
            PERF_TIMER_GUARD(merge_operator_time_nanos);
            merge_success = merge_operator->FullMerge(
                key, &get_value_slice, operands, &new_value, moptions->info_log);
            RecordTick(moptions->statistics, MERGE_OPERATION_TOTAL_TIME,
                       timer.ElapsedNanos());
          }
          if (!merge_success) {
            // Failed to merge! Store the delta in the memtable instead.
            RecordTick(moptions->statistics, NUMBER_MERGE_FAILURES);
            perform_merge = false;
          } else {
            // 3) Add the merged value to the memtable.
            mem->Add(sequence_, kTypeValue, key, new_value);
          }
        }
        if (!perform_merge) {
          // Add the merge operand to the memtable.
          mem->Add(sequence_, kTypeMerge, key, value);
        }
        sequence_++;
        cf_mems_->CheckMemtableFull();
        return Status::OK();
      }
    };
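
MemTableInserter is just one implementation of WriteBatch::Handler; the same rep_ parsing machinery is exposed publicly through WriteBatch::Iterate(). As a sketch, here is a trivial handler that counts the records in a batch (the PutCF/DeleteCF callbacks match the ones overridden above):

    #include <iostream>
    #include "rocksdb/write_batch.h"

    class CountingHandler : public rocksdb::WriteBatch::Handler {
     public:
      int puts = 0;
      int deletes = 0;
      rocksdb::Status PutCF(uint32_t /*cf_id*/, const rocksdb::Slice& /*key*/,
                            const rocksdb::Slice& /*value*/) override {
        ++puts;
        return rocksdb::Status::OK();
      }
      rocksdb::Status DeleteCF(uint32_t /*cf_id*/,
                               const rocksdb::Slice& /*key*/) override {
        ++deletes;
        return rocksdb::Status::OK();
      }
    };

    int main() {
      rocksdb::WriteBatch batch;
      batch.Put("k1", "v1");
      batch.Delete("k2");
      CountingHandler h;
      batch.Iterate(&h);  // walks rep_ record by record, like InsertInto does
      std::cout << h.puts << " puts, " << h.deletes << " deletes\n";
      return 0;
    }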

2. The write flow

    rocksdb_put(db, writeoptions, key, strlen(key), value, strlen(value) + 1, &err);
    // This calls
    //   SaveError(errptr, db->rep->Put(options->rep, Slice(key, keylen), Slice(val, vallen)));
    // which invokes db->rep->Put(options->rep, Slice(key, keylen), Slice(val, vallen)):

    Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& value) {
      // Pre-allocate size of write batch conservatively.
      // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
      // and we allocate 11 extra bytes for key length, as well as value length.
      WriteBatch batch(key.size() + value.size() + 24);
      batch.Put(column_family, key, value);  // fill in the batch
      return Write(opt, &batch);             // write the batch
    }

    // First the WriteBatch is filled in:
    void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
      WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
    }

    // What actually runs is WriteBatchInternal::Put:
    void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
      // Increment the batch's record count by one.
      WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
      if (column_family_id == 0) {
        b->rep_.push_back(static_cast<char>(kTypeValue));  // append the record type
      } else {
        b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
        PutVarint32(&b->rep_, column_family_id);
      }
      PutLengthPrefixedSlice(&b->rep_, key);    // append key length and bytes
      PutLengthPrefixedSlice(&b->rep_, value);  // append value length and bytes
    }

    // Then the WriteBatch is written out:
    virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

    Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
      return WriteImpl(write_options, my_batch, nullptr);
    }

    // The write is implemented here:
    Status DBImpl::WriteImpl(const WriteOptions& write_options,
                             WriteBatch* my_batch, WriteCallback* callback) {
      if (my_batch == nullptr) {
        return Status::Corruption("Batch is nullptr!");
      }
      if (write_options.timeout_hint_us != 0) {
        return Status::InvalidArgument("timeout_hint_us is deprecated");
      }
      Status status;
      bool callback_failed = false;
      bool xfunc_attempted_write = false;
      // First let the cross-functional test hook try the write.
      XFUNC_TEST("transaction", "transaction_xftest_write_impl",
                 xf_transaction_write1, xf_transaction_write, write_options,
                 db_options_, my_batch, callback, this, &status,
                 &xfunc_attempted_write);
      if (xfunc_attempted_write) {
        // The test already did the write.
        return status;
      }
      PERF_TIMER_GUARD(write_pre_and_post_process_time);
      WriteThread::Writer w;                    // build the write task
      w.batch = my_batch;                       // the data to write
      w.sync = write_options.sync;              // whether to fsync/fdatasync the WAL
      w.disableWAL = write_options.disableWAL;  // whether to skip the WAL
      w.in_batch_group = false;
      // in_batch_group is the interesting one. Internally, RocksDB batches
      // user writes as much as possible, using a queue of WriteThread::Writer*
      // inside write_thread_. When the task at the head of the queue is about
      // to run, BuildBatchGroup() assembles a batch group, pulling the write
      // tasks right behind the head into one BatchGroup so they are written
      // to the database in a single shot.
      w.done = false;  // set when the write completes
      w.has_callback = (callback != nullptr) ? true : false;
      if (!write_options.disableWAL) {
        // Record the number of Write calls that request WAL.
        RecordTick(stats_, WRITE_WITH_WAL);
      }
      // Time the whole write; used for the DB_WRITE statistics.
      StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE);

      // Enqueue the current write task @w on the write queue and sleep on
      // mutex_ until:
      // 1) the write has a timeout and the wait times out, or
      // 2) all tasks ahead of @w have finished and @w is at the head of the
      //    queue, or
      // 3) @w's write has been completed by another write thread.
      // Case 3 means the task was merged into a WriteBatchGroup by an earlier
      // write task, in which case @w is marked in_batch_group. Interestingly,
      // if JoinBatchGroup() wakes up on a timeout and finds in_batch_group is
      // true, it keeps waiting, because another thread has already picked the
      // task up and will write it to the database.
      write_thread_.JoinBatchGroup(&w);
      if (w.done) {
        // The write was done by someone else; no need to grab the mutex.
        RecordTick(stats_, WRITE_DONE_BY_OTHER);
        return w.status;
      }
      // Else we are the leader of the write batch group.
      WriteContext context;
      mutex_.Lock();
      if (!write_options.disableWAL) {
        // The WAL will be written.
        default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
      }
      // We are writing our own batch ourselves.
      RecordTick(stats_, WRITE_DONE_BY_SELF);
      default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

      // Once it reaches this point, the current writer "w" will try to do its
      // write job. It may also pick up some of the remaining writers in
      // "writers_" when it finds them suitable, and finish them in the same
      // write batch. This is how a write job can be done by another writer.
      assert(!single_column_family_mode_ ||
             versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

      // Compute the maximum total WAL size.
      uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
                                        ? 4 * max_total_in_memory_state_
                                        : db_options_.max_total_wal_size;
      if (UNLIKELY(!single_column_family_mode_) &&
          alive_log_files_.begin()->getting_flushed == false &&
          total_log_size_ > max_total_wal_size) {
        // If there are multiple column families, the memtable backed by the
        // oldest live WAL has not been flushed to disk yet, and the total log
        // size exceeds the configured maximum, then allocate new memtables
        // and flush the old immutable memtables to disk.
        uint64_t flush_column_family_if_log_file =
            alive_log_files_.begin()->number;  // number of the oldest live WAL
        alive_log_files_.begin()->getting_flushed = true;
        Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
            "Flushing all column families with data in WAL number %" PRIu64
            ". Total log size is %" PRIu64
            " while max_total_wal_size is %" PRIu64,
            flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
        // No need to refcount because the drop happens in the write thread,
        // so it can't happen while we're in the write thread.
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          if (cfd->IsDropped()) {
            continue;
          }
          // Every column family whose log number is <= the oldest live WAL's
          // number should switch to a new memtable.
          if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
            // Allocate a new memtable for the column family.
            status = SwitchMemtable(cfd, &context);
            if (!status.ok()) {
              break;
            }
            cfd->imm()->FlushRequested();
            // Schedule the pending flush.
            SchedulePendingFlush(cfd);
          }
        }
        // Schedule a flush or compaction.
        MaybeScheduleFlushOrCompaction();
      } else if (UNLIKELY(write_buffer_.ShouldFlush())) {
        // The write buffer says a flush is needed.
        Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
            "Flushing all column families. Write buffer is using %" PRIu64
            " bytes out of a total of %" PRIu64 ".",
            write_buffer_.memory_usage(), write_buffer_.buffer_size());
        // No need to refcount because the drop happens in the write thread,
        // so it can't happen while we're in the write thread.
        // Flush the column families of the current version.
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          if (cfd->IsDropped()) {
            continue;
          }
          if (!cfd->mem()->IsEmpty()) {
            status = SwitchMemtable(cfd, &context);
            if (!status.ok()) {
              break;
            }
            cfd->imm()->FlushRequested();
            SchedulePendingFlush(cfd);
          }
        }
        // Schedule a flush or compaction.
        MaybeScheduleFlushOrCompaction();
      }

      if (UNLIKELY(status.ok() && !bg_error_.ok())) {
        status = bg_error_;
      }
      // The flush scheduler has work queued.
      if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
        status = ScheduleFlushes(&context);
      }
      // write_controller_ decides whether writes must stop or be delayed.
      if (UNLIKELY(status.ok()) &&
          (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
        PERF_TIMER_STOP(write_pre_and_post_process_time);
        PERF_TIMER_GUARD(write_delay_time);
        // We don't know the size of the current batch, so we always use the
        // size of the previous one. It might create a fairness issue in that
        // expiration might happen for smaller writes while larger writes go
        // through. Can optimize it if it becomes an issue.
        status = DelayWrite(last_batch_group_size_);
        PERF_TIMER_START(write_pre_and_post_process_time);
      }

      uint64_t last_sequence = versions_->LastSequence();
      WriteThread::Writer* last_writer = &w;
      autovector<WriteBatch*> write_batch_group;
      // Whether the log and the log directory need syncing.
      bool need_log_sync = !write_options.disableWAL && write_options.sync;
      bool need_log_dir_sync = need_log_sync && !log_dir_synced_;

      if (status.ok()) {
        // Join the batch group as leader, carrying the batch to be written.
        last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(
            &w, &last_writer, &write_batch_group);
        if (need_log_sync) {
          // Wait for any in-flight WAL sync to finish.
          while (logs_.front().getting_synced) {
            log_sync_cv_.Wait();
          }
          for (auto& log : logs_) {
            assert(!log.getting_synced);
            log.getting_synced = true;
          }
        }
        // Add to log and apply to memtable. We can release the lock during
        // this phase since &w is currently responsible for logging and
        // protects against concurrent loggers and concurrent writes into
        // memtables.
        mutex_.Unlock();
        if (callback != nullptr) {
          // If this write has a validation callback, check whether this
          // write is able to proceed. Must be called on the write thread.
          status = callback->Callback(this);
          callback_failed = true;
        }
      } else {
        mutex_.Unlock();
      }

      // At this point the mutex is unlocked. The memtable write starts here.
      if (status.ok()) {
        // Merge the WriteBatches of write_batch_group into the single batch
        // `updates` that WriteBatchInternal will insert into the memtable
        // (named differently here to distinguish it from the earlier batches).
        WriteBatch* updates = nullptr;
        if (write_batch_group.size() == 1) {
          updates = write_batch_group[0];
        } else {
          updates = &tmp_batch_;
          for (size_t i = 0; i < write_batch_group.size(); ++i) {
            // Append to the combined WriteBatch.
            WriteBatchInternal::Append(updates, write_batch_group[i]);
          }
        }
        // The sequence number of `updates` is the version set's last
        // sequence number plus one.
        const SequenceNumber current_sequence = last_sequence + 1;
        WriteBatchInternal::SetSequence(updates, current_sequence);
        // Get the record count and advance last_sequence by it.
        int my_batch_count = WriteBatchInternal::Count(updates);
        last_sequence += my_batch_count;
        const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
        // Record statistics.
        RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
        RecordTick(stats_, BYTES_WRITTEN, batch_size);
        if (write_options.disableWAL) {
          flush_on_destroy_ = true;
        }
        PERF_TIMER_STOP(write_pre_and_post_process_time);

        uint64_t log_size = 0;
        if (!write_options.disableWAL) {
          // The WAL must be written.
          PERF_TIMER_GUARD(write_wal_time);
          // The log entry is simply the raw contents of `updates`.
          Slice log_entry = WriteBatchInternal::Contents(updates);
          // Append log_entry to the current log file.
          status = logs_.back().writer->AddRecord(log_entry);
          total_log_size_ += log_entry.size();
          alive_log_files_.back().AddSize(log_entry.size());  // grow the log size
          log_empty_ = false;
          log_size = log_entry.size();
          RecordTick(stats_, WAL_FILE_BYTES, log_size);
          // Sync the log if requested.
          if (status.ok() && need_log_sync) {
            RecordTick(stats_, WAL_FILE_SYNCED);
            StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
            // It's safe to access logs_ with an unlocked mutex_ here because:
            // - we've set getting_synced=true for all logs, so other threads
            //   won't pop from logs_ while we're here,
            // - only the writer thread can push to logs_, and we're in the
            //   writer thread, so no one will push to logs_,
            // - as long as other threads don't modify it, it's safe to read
            //   from a std::deque from multiple threads concurrently.
            for (auto& log : logs_) {
              status = log.writer->file()->Sync(db_options_.use_fsync);
              if (!status.ok()) {
                break;
              }
            }
            if (status.ok() && need_log_dir_sync) {
              // We only sync the WAL directory the first time WAL syncing is
              // requested, so that if users never turn on WAL sync we can
              // avoid the disk I/O in the write code path.
              status = directories_.GetWalDir()->Fsync();
            }
          }
        }

        // Everything above operated on the log; here the WriteBatch is
        // written into the memtable.
        if (status.ok()) {
          PERF_TIMER_GUARD(write_memtable_time);
          // This function inserts the WriteBatch into the memtables.
          status = WriteBatchInternal::InsertInto(
              updates, column_family_memtables_.get(),
              write_options.ignore_missing_column_families, 0, this, false);
          // A non-OK status here indicates iteration failure (either
          // in-memory writebatch corruption (very bad), or the client
          // specified an invalid column family). This will later trigger
          // bg_error_.
          //
          // Note that the existing logic was not sound. Any partial failure
          // writing into the memtable would result in a state where some
          // write ops might have succeeded in the memtable but Status
          // reports an error for all writes.
          SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
        }

        // Wrap-up work.
        PERF_TIMER_START(write_pre_and_post_process_time);
        if (updates == &tmp_batch_) {
          tmp_batch_.Clear();
        }
        mutex_.Lock();
        // Internal stats.
        default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN,
                                               batch_size);
        default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
                                               my_batch_count);
        if (!write_options.disableWAL) {
          if (write_options.sync) {
            default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
          }
          default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_BYTES,
                                                 log_size);
        }
        if (status.ok()) {
          versions_->SetLastSequence(last_sequence);
        }
      } else {
        // Operation failed. Make sure the mutex is held for the cleanup code
        // below.
        mutex_.Lock();
      }

      if (db_options_.paranoid_checks && !status.ok() && !callback_failed &&
          !status.IsBusy() && bg_error_.ok()) {
        bg_error_ = status;  // stop compaction & fail any further writes
      }
      mutex_.AssertHeld();
      if (need_log_sync) {
        MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
      }
      uint64_t writes_for_other = write_batch_group.size() - 1;
      if (writes_for_other > 0) {
        default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                                               writes_for_other);
        if (!write_options.disableWAL) {
          default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL,
                                                 writes_for_other);
        }
      }
      mutex_.Unlock();
      write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
      return status;
    }
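
To close the loop, a minimal end-to-end sketch through the C API entry point quoted at the top of this section (rocksdb/c.h; error handling abbreviated, and the database path is just an example):

    #include <cstring>
    #include "rocksdb/c.h"

    int main() {
      char* err = nullptr;
      rocksdb_options_t* options = rocksdb_options_create();
      rocksdb_options_set_create_if_missing(options, 1);
      rocksdb_t* db = rocksdb_open(options, "/tmp/rocksdb_write_demo", &err);
      // ... check err ...
      rocksdb_writeoptions_t* woptions = rocksdb_writeoptions_create();
      const char* key = "key";
      const char* value = "value";
      // Internally this runs DB::Put -> WriteBatch::Put -> DBImpl::Write ->
      // WriteImpl, i.e. the whole path traced above.
      rocksdb_put(db, woptions, key, strlen(key), value, strlen(value) + 1, &err);
      // ... check err ...
      rocksdb_writeoptions_destroy(woptions);
      rocksdb_options_destroy(options);
      rocksdb_close(db);
      return 0;
    }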