Skip to content

rocksdb

Posted on: November 19, 2023 at 01:10 PM

RocksDB Get 的使用（调用栈分析）

(gdb) bt
#0  std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::assign (__n=6, __s=0x555555db8135 "value", this=0x7fffffffde90) at /usr/include/c++/12/bits/basic_string.h:1063
#1  rocksdb::SaveValue (arg=0x7fffffffd690, entry=<optimized out>) at db/memtable.cc:1068
#2  0x000055555588eed2 in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffd690, callback_func=0x555555769fe0 <rocksdb::SaveValue(void*, char const*)>)
    at memtable/skiplistrep.cc:90
#3  0x00005555557683c3 in rocksdb::MemTable::GetFromTable (this=this@entry=0x555555db7d70, key=..., max_covering_tombstone_seq=<optimized out>, do_merge=do_merge@entry=true, callback=callback@entry=0x0, 
    is_blob_index=is_blob_index@entry=0x0, value=0x7fffffffde90, columns=0x0, timestamp=0x0, s=0x7fffffffd950, merge_context=0x7fffffffd990, seq=0x7fffffffd8f8, found_final_value=0x7fffffffd7b6, 
    merge_in_progress=0x7fffffffd7b7) at db/memtable.cc:1329
#4  0x0000555555769215 in rocksdb::MemTable::Get (this=0x555555db7d70, key=..., value=0x7fffffffde90, columns=0x0, timestamp=timestamp@entry=0x0, s=s@entry=0x7fffffffd950, merge_context=<optimized out>, 
    max_covering_tombstone_seq=<optimized out>, seq=<optimized out>, read_opts=..., immutable_memtable=<optimized out>, callback=<optimized out>, is_blob_index=<optimized out>, do_merge=<optimized out>)
    at db/memtable.cc:1285
#5  0x000055555565e189 in rocksdb::MemTable::Get (do_merge=true, is_blob_index=<optimized out>, callback=<optimized out>, immutable_memtable=false, read_opts=..., max_covering_tombstone_seq=0x7fffffffd8f0, 
    merge_context=0x7fffffffd990, s=0x7fffffffd950, timestamp=0x0, columns=<optimized out>, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:279
#6  rocksdb::DBImpl::GetImpl (this=0x555555d7db40, read_options=..., key=..., get_impl_options=...) at db/db_impl/db_impl.cc:2293
#7  0x000055555565522b in rocksdb::DBImpl::GetImpl (this=this@entry=0x555555d7db40, read_options=..., column_family=column_family@entry=0x555555da3ba0, key=..., value=value@entry=0x7fffffffddc0, 
    timestamp=<optimized out>) at db/db_impl/db_impl.cc:2025
#8  0x0000555555655492 in rocksdb::DBImpl::Get (this=0x555555d7db40, _read_options=..., column_family=0x555555da3ba0, key=..., value=0x7fffffffddc0, timestamp=<optimized out>) at db/db_impl/db_impl.cc:2013
#9  0x000055555564cc57 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=..., value=<optimized out>) at db/db_impl/db_impl.cc:1985
#10 0x0000555555642bf2 in rocksdb::DB::Get (this=this@entry=0x555555d7db40, options=..., column_family=0x555555da3ba0, key=..., value=value@entry=0x7fffffffde90) at ./include/rocksdb/db.h:562
#11 0x0000555555626ed2 in rocksdb::DB::Get (value=0x7fffffffde90, key=..., options=..., this=0x555555d7db40) at ./include/rocksdb/db.h:573
#12 rocksdb_get (db=<optimized out>, options=0x555555dbdc70, key=<optimized out>, keylen=<optimized out>, vallen=0x7fffffffdf00, errptr=0x7fffffffdf10) at db/c.cc:1293
#13 0x00005555556240d7 in main (argc=1, argv=0x7fffffffe078) at c_simple_example.c:67
(gdb) up
#1  rocksdb::SaveValue (arg=0x7fffffffd690, entry=<optimized out>) at db/memtable.cc:1068

在 rocksdb::SaveValue（db/memtable.cc:1068，对应上面 gdb 的 frame #1）中，kTypeValue 类型的记录会走到下面这个分支：

      case kTypeValue: {                                             // (1) kv type 的类型
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }

        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);   // (2) 获取值

        *(s->status) = Status::OK();

        if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          // TODO(yanqin) update MergeContext so that timestamps information
          // can also be retained.

          merge_context->PushOperand(
              v, s->inplace_update_support == false /* operand_pinned */);
        } else if (*(s->merge_in_progress)) {
          assert(s->do_merge);

          if (s->value || s->columns) {
            // `op_failure_scope` (an output parameter) is not provided (set to
            // nullptr) since a failure must be propagated regardless of its
            // value.
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(),
                MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(),
                s->logger, s->statistics, s->clock,
                /* update_num_ops_stats */ true, s->value, s->columns,
                /* op_failure_scope */ nullptr);
          }
        } else if (s->value) {
          s->value->assign(v.data(), v.size());         //  (3) 将取得的值塞到s->value
        } else if (s->columns) {
          s->columns->SetPlainValue(v);
        }
// DBImpl::GetImpl (db/db_impl/db_impl.cc) -- frame #6 in the backtrace above.
//
// Point lookup of `key` in `get_impl_options.column_family`. Lookup order:
//   1. the mutable memtable          (sv->mem->Get)
//   2. the immutable memtables       (sv->imm->Get / GetMergeOperands)
//   3. the SST files of the version  (sv->current->Get)
// When `get_impl_options.get_value` is true the result goes to
// `get_impl_options.value` (or `get_impl_options.columns`); otherwise the raw
// merge operands are handed back through `get_impl_options.merge_operands`.
//
// Returns the lookup Status. Once the SuperVersion has been acquired, every
// return path releases it via ReturnAndCleanupSuperVersion().
//
// NOTE(review): `get_impl_options` is a non-const reference. Its `callback`
// field is temporarily overwritten (restored by SaveAndRestore on exit), and
// its `merge_operands` pointer is advanced past the filled slots on the
// merge-operands path.
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       GetImplOptions& get_impl_options) {
  // At least one output destination must be supplied.
  assert(get_impl_options.value != nullptr ||
         get_impl_options.merge_operands != nullptr ||
         get_impl_options.columns != nullptr);

  assert(get_impl_options.column_family);

  // Timestamp sanity checks. With read_options.timestamp set,
  // FailIfTsMismatchCf() must accept it; without one, FailIfCfHasTs() must
  // pass. Any failure is returned before touching the SuperVersion.
  if (read_options.timestamp) {
    const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
                                        *(read_options.timestamp));
    if (!s.ok()) {
      return s;
    }
  } else {
    const Status s = FailIfCfHasTs(get_impl_options.column_family);
    if (!s.ok()) {
      return s;
    }
  }

  // Clear the timestamps for returning results so that we can distinguish
  // between tombstone or key that has never been written
  if (get_impl_options.timestamp) {
    get_impl_options.timestamp->clear();
  }

  // Only installed as the read callback (below) when the column family's
  // comparator uses timestamps.
  GetWithTimestampReadCallback read_cb(0);  // Will call Refresh

  // Timing instrumentation for this call: CPU time, DB_GET latency, and the
  // snapshot-acquisition phase (stopped explicitly after LookupKey is built).
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
      get_impl_options.column_family);
  auto cfd = cfh->cfd();

  // The first, unlocked check keeps the common "tracing disabled" path free
  // of the mutex. tracer_ is re-checked after taking trace_mutex_.
  if (tracer_) {
    // TODO: This mutex should be removed later, to improve performance when
    // tracing is enabled.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      // TODO: maybe handle the tracing status?
      tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
    }
  }

  // GetMergeOperands path: reset every caller-provided output slot before any
  // of them are filled.
  if (get_impl_options.get_merge_operands_options != nullptr) {
    for (int i = 0; i < get_impl_options.get_merge_operands_options
                            ->expected_max_number_of_operands;
         ++i) {
      get_impl_options.merge_operands[i].Reset();
    }
  }

  // Acquire SuperVersion
  // (a referenced SuperVersion; it must be handed back with
  // ReturnAndCleanupSuperVersion() on every exit path below).
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  // Reject a timestamped read whose history FailIfReadCollapsedHistory()
  // reports as no longer available.
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    const Status s =
        FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
    if (!s.ok()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }

  TEST_SYNC_POINT_CALLBACK("DBImpl::GetImpl:AfterAcquireSv", nullptr);
  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  // Pick the sequence number that bounds visibility for this read: the
  // caller's snapshot if given, otherwise the last published sequence.
  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    if (get_impl_options.callback) {
      // Already calculated based on read_options.snapshot
      snapshot = get_impl_options.callback->max_visible_seq();
    } else {
      snapshot =
          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    }
  } else {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    snapshot = GetLastPublishedSequence();
    if (get_impl_options.callback) {
      // The unprep_seqs are not published for write unprepared, so it could be
      // that max_visible_seq is larger. Seek to the std::max of the two.
      // However, we still want our callback to contain the actual snapshot so
      // that it can do the correct visibility filtering.
      get_impl_options.callback->Refresh(snapshot);

      // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
      // max_visible_seq = max(max_visible_seq, snapshot)
      //
      // Currently, the commented out assert is broken by
      // InvalidSnapshotReadCallback, but if write unprepared recovery followed
      // the regular transaction flow, then this special read callback would not
      // be needed.
      //
      // assert(callback->max_visible_seq() >= snapshot);
      snapshot = get_impl_options.callback->max_visible_seq();
    }
  }
  // If timestamp is used, we use read callback to ensure <key,t,s> is returned
  // only if t <= read_opts.timestamp and s <= snapshot.
  // HACK: temporarily overwrite input struct field but restore
  SaveAndRestore<ReadCallback*> restore_callback(&get_impl_options.callback);
  const Comparator* ucmp = get_impl_options.column_family->GetComparator();
  assert(ucmp);
  if (ucmp->timestamp_size() > 0) {
    assert(!get_impl_options
                .callback);  // timestamp with callback is not supported
    read_cb.Refresh(snapshot);
    get_impl_options.callback = &read_cb;
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot, read_options.timestamp);
  PERF_TIMER_STOP(get_snapshot_time);

  // With read_tier == kPersistedTier, the memtables are bypassed while there
  // is unpersisted data -- presumably so that only persisted data can be
  // returned; TODO confirm against the ReadTier documentation.
  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  // Timestamps are only reported back when the comparator is timestamp-aware.
  std::string* timestamp =
      ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
  if (!skip_memtable) {
    // Get value associated with key
    if (get_impl_options.get_value) {
      // GetSelf() hands MemTable::Get the PinnableSlice's own std::string
      // buffer. In the backtrace above, this is the string that
      // rocksdb::SaveValue fills via assign() in its kTypeValue case.
      // PinSelf() is then called, presumably to point the slice at that
      // buffer.
      if (sv->mem->Get(
              lkey,
              get_impl_options.value ? get_impl_options.value->GetSelf()
                                     : nullptr,
              get_impl_options.columns, timestamp, &s, &merge_context,
              &max_covering_tombstone_seq, read_options,
              false /* immutable_memtable */, get_impl_options.callback,
              get_impl_options.is_blob_index)) {
        done = true;

        if (get_impl_options.value) {
          get_impl_options.value->PinSelf();
        }

        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->Get(lkey,
                              get_impl_options.value
                                  ? get_impl_options.value->GetSelf()
                                  : nullptr,
                              get_impl_options.columns, timestamp, &s,
                              &merge_context, &max_covering_tombstone_seq,
                              read_options, get_impl_options.callback,
                              get_impl_options.is_blob_index)) {
        done = true;

        if (get_impl_options.value) {
          get_impl_options.value->PinSelf();
        }

        RecordTick(stats_, MEMTABLE_HIT);
      }
    } else {
      // Get Merge Operands associated with key, Merge Operands should not be
      // merged and raw values should be returned to the user.
      if (sv->mem->Get(lkey, /*value=*/nullptr, /*columns=*/nullptr,
                       /*timestamp=*/nullptr, &s, &merge_context,
                       &max_covering_tombstone_seq, read_options,
                       false /* immutable_memtable */, nullptr, nullptr,
                       false)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      } else if ((s.ok() || s.IsMergeInProgress()) &&
                 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
                                           &max_covering_tombstone_seq,
                                           read_options)) {
        done = true;
        RecordTick(stats_, MEMTABLE_HIT);
      }
    }
    // A definitive error from the memtables (anything other than OK or
    // MergeInProgress) ends the lookup without consulting the SST files.
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
  TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
  // Passed to sv->current->Get(). On the merge-operands path it is moved into
  // GetMergeOperandsState so that operands depending on it can outlive this
  // call.
  PinnedIteratorsManager pinned_iters_mgr;
  if (!done) {
    // Not resolved by the memtables: search the SST files of the current
    // version.
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(
        read_options, lkey, get_impl_options.value, get_impl_options.columns,
        timestamp, &s, &merge_context, &max_covering_tombstone_seq,
        &pinned_iters_mgr,
        get_impl_options.get_value ? get_impl_options.value_found : nullptr,
        nullptr, nullptr,
        get_impl_options.get_value ? get_impl_options.callback : nullptr,
        get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
        get_impl_options.get_value);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  // Post-processing covers three things:
  //   - read statistics;
  //   - the merge-operand threshold check;
  //   - on the GetMergeOperands path, handing the operands to the caller.
  {
    PERF_TIMER_GUARD(get_post_process_time);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = 0;
    if (s.ok()) {
      // Signal (with an OK-subcode status) that more merge operands were
      // encountered than read_options.merge_operand_count_threshold allows.
      const auto& merge_threshold = read_options.merge_operand_count_threshold;
      if (merge_threshold.has_value() &&
          merge_context.GetNumOperands() > merge_threshold.value()) {
        s = Status::OkMergeOperandThresholdExceeded();
      }

      if (get_impl_options.get_value) {
        if (get_impl_options.value) {
          size = get_impl_options.value->size();
        } else if (get_impl_options.columns) {
          size = get_impl_options.columns->serialized_size();
        }
      } else {
        // Return all merge operands for get_impl_options.key
        // number_of_operands is written before the capacity check, so the
        // caller learns the required count even when Incomplete is returned.
        *get_impl_options.number_of_operands =
            static_cast<int>(merge_context.GetNumOperands());
        if (*get_impl_options.number_of_operands >
            get_impl_options.get_merge_operands_options
                ->expected_max_number_of_operands) {
          s = Status::Incomplete(
              Status::SubCode::KMergeOperandsInsufficientCapacity);
        } else {
          // Each operand depends on one of the following resources: `sv`,
          // `pinned_iters_mgr`, or `merge_context`. It would be crazy expensive
          // to reference `sv` for each operand relying on it because `sv` is
          // (un)ref'd in all threads using the DB. Furthermore, we do not track
          // on which resource each operand depends.
          //
          // To solve this, we bundle the resources in a `GetMergeOperandsState`
          // and manage them with a `SharedCleanablePtr` shared among the
          // `PinnableSlice`s we return. This bundle includes one `sv` reference
          // and ownership of the `merge_context` and `pinned_iters_mgr`
          // objects.
          // NOTE(review): presumably ShouldReferenceSuperVersion() returns
          // true when some operand may point into sv-owned memory; confirm.
          bool ref_sv = ShouldReferenceSuperVersion(merge_context);
          if (ref_sv) {
            assert(!merge_context.GetOperands().empty());
            SharedCleanablePtr shared_cleanable;
            GetMergeOperandsState* state = nullptr;
            state = new GetMergeOperandsState();
            state->merge_context = std::move(merge_context);
            state->pinned_iters_mgr = std::move(pinned_iters_mgr);

            // Extra reference owned by the bundle. The ReturnAndCleanup call
            // at the end of this scope only drops the reference taken at the
            // top. NOTE(review): this one is presumably released by
            // CleanupGetMergeOperandsState via sv_handle; confirm.
            sv->Ref();

            state->sv_handle = new SuperVersionHandle(
                this, &mutex_, sv,
                immutable_db_options_.avoid_unnecessary_blocking_io);

            shared_cleanable.Allocate();
            shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState,
                                              state /* arg1 */,
                                              nullptr /* arg2 */);
            // Pin each operand in place (no copy). All slices share the
            // cleanup. The last slice takes over shared_cleanable's own
            // reference; earlier slices register a copy of it.
            for (size_t i = 0; i < state->merge_context.GetOperands().size();
                 ++i) {
              const Slice& sl = state->merge_context.GetOperands()[i];
              size += sl.size();

              get_impl_options.merge_operands->PinSlice(
                  sl, nullptr /* cleanable */);
              if (i == state->merge_context.GetOperands().size() - 1) {
                shared_cleanable.MoveAsCleanupTo(
                    get_impl_options.merge_operands);
              } else {
                shared_cleanable.RegisterCopyWith(
                    get_impl_options.merge_operands);
              }
              get_impl_options.merge_operands++;
            }
          } else {
            // No SuperVersion reference needed. PinSelf(sl) presumably copies
            // each operand into the slice's own buffer; confirm.
            for (const Slice& sl : merge_context.GetOperands()) {
              size += sl.size();
              get_impl_options.merge_operands->PinSelf(sl);
              get_impl_options.merge_operands++;
            }
          }
        }
      }
      RecordTick(stats_, BYTES_READ, size);
      PERF_COUNTER_ADD(get_read_bytes, size);
    }

    // Release the SuperVersion reference taken at the top of this function.
    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordInHistogram(stats_, BYTES_PER_READ, size);
  }
  return s;
}

相关阅读