From 5a6e41cab7a87f71c26df4089eae3496abe712de Mon Sep 17 00:00:00 2001 From: wires Date: Fri, 4 Apr 2025 16:08:38 -0400 Subject: [PATCH] add key hashing logic --- apps/kv/src/kv_ring.erl | 37 ++++++++++++++++++++++++++++++++ apps/kv/src/kv_vnode.erl | 6 +++--- apps/kv/src/kv_vnode_manager.erl | 15 +++++++------ apps/kv/src/kv_vnode_sup.erl | 2 +- 4 files changed, 50 insertions(+), 10 deletions(-) create mode 100644 apps/kv/src/kv_ring.erl diff --git a/apps/kv/src/kv_ring.erl b/apps/kv/src/kv_ring.erl new file mode 100644 index 0000000..0ba4ac0 --- /dev/null +++ b/apps/kv/src/kv_ring.erl @@ -0,0 +1,37 @@ +-module(kv_ring). + +-export([ + new/2, + key_to_index/1, + ordered_from/2 +]). + +-define(RING_MAX, trunc(math:pow(2, 160))). + +-export_type([ring/0]). + +-type index() :: non_neg_integer(). + +-opaque ring() :: {Partitions :: non_neg_integer(), [{Index :: index(), Owner :: node()}]}. + +%%% public api + +-spec new(Partitions :: non_neg_integer(), Node :: node()) -> ring(). +new(Partitions, Node) -> + {Partitions, [{Index, Node} || Index <- lists:seq(0, ?RING_MAX - 1, ring_step(Partitions))]}. + +-spec key_to_index(Key :: binary()) -> index(). +key_to_index(Key) -> + <> = crypto:hash(sha, Key), + Index. + +-spec ordered_from(Ring :: ring(), Index :: index()) -> [{Index :: index(), Owner :: node()}]. +ordered_from({Partitions, OwnerList}, Index) -> + Step = ring_step(Partitions), + {A, B} = lists:split(Index div Step, OwnerList), + B ++ A. + +%%% internal functions + +ring_step(RingSize) -> + ?RING_MAX div RingSize. diff --git a/apps/kv/src/kv_vnode.erl b/apps/kv/src/kv_vnode.erl index 8a83303..d3163fa 100644 --- a/apps/kv/src/kv_vnode.erl +++ b/apps/kv/src/kv_vnode.erl @@ -19,7 +19,7 @@ Holds data for a single partition in the ring. ]). -record(kv_vnode, { - index :: non_neg_integer(), + index :: kv_ring:index(), table :: ets:table() }). @@ -27,7 +27,7 @@ Holds data for a single partition in the ring. %%% public api --spec start_link(Index :: non_neg_integer()) -> gen_server:start_ret(). +-spec start_link(Index :: kv_ring:index()) -> gen_server:start_ret(). start_link(Index) -> gen_server:start_link(?MODULE, [Index], []). @@ -41,7 +41,7 @@ put(VNode, Key, Value) -> %%% gen_server callbacks --spec init([Index :: non_neg_integer()]) -> {ok, state()}. +-spec init([Index :: kv_ring:index()]) -> {ok, state()}. init([Index]) -> {ok, #kv_vnode{index = Index, table = ets:new(?MODULE, [set, private])}}. diff --git a/apps/kv/src/kv_vnode_manager.erl b/apps/kv/src/kv_vnode_manager.erl index 58cbfcd..ee8f07c 100644 --- a/apps/kv/src/kv_vnode_manager.erl +++ b/apps/kv/src/kv_vnode_manager.erl @@ -4,8 +4,8 @@ -export([ start_link/0, - get_local/2, - put_local/3 + get_local/1, + put_local/2 ]). %%% gen_server callbacks @@ -20,12 +20,14 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec get_local(Index :: non_neg_integer(), Key :: term()) -> {ok, Value :: term()} | none. -get_local(Index, Key) -> +-spec get_local(Key :: binary()) -> {ok, Value :: term()} | none. +get_local(Key) -> + Index = kv_ring:key_to_index(Key), gen_server:call(?MODULE, {get, Index, Key}). --spec put_local(Index :: non_neg_integer(), Key :: term(), Value :: term()) -> ok. -put_local(Index, Key, Value) -> +-spec put_local(Key :: binary(), Value :: term()) -> ok. +put_local(Key, Value) -> + Index = kv_ring:key_to_index(Key), gen_server:cast(?MODULE, {put, Index, Key, Value}). %%% gen_server callbacks @@ -45,6 +47,7 @@ handle_cast({put, Index, Key, Value}, State) -> %%% internal functions +-spec get_vnode(Index :: kv_ring:index(), state()) -> pid(). get_vnode(Index, #kv_vnode_manager{table = Table}) -> case ets:lookup(Table, Index) of [{_, Pid}] -> diff --git a/apps/kv/src/kv_vnode_sup.erl b/apps/kv/src/kv_vnode_sup.erl index 431e3d8..e9113ed 100644 --- a/apps/kv/src/kv_vnode_sup.erl +++ b/apps/kv/src/kv_vnode_sup.erl @@ -12,7 +12,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec start_vnode(Index :: non_neg_integer()) -> supervisor:startchild_ret(). +-spec start_vnode(Index :: kv_ring:index()) -> supervisor:startchild_ret(). start_vnode(Index) -> supervisor:start_child(?MODULE, [Index]).