add key hashing logic
This commit is contained in:
parent
155c24a535
commit
5a6e41cab7
4 changed files with 50 additions and 10 deletions
37
apps/kv/src/kv_ring.erl
Normal file
37
apps/kv/src/kv_ring.erl
Normal file
|
@ -0,0 +1,37 @@
|
|||
-module(kv_ring).
|
||||
|
||||
-export([
|
||||
new/2,
|
||||
key_to_index/1,
|
||||
ordered_from/2
|
||||
]).
|
||||
|
||||
-define(RING_MAX, trunc(math:pow(2, 160))).
|
||||
|
||||
-export_type([ring/0]).
|
||||
|
||||
-type index() :: non_neg_integer().
|
||||
|
||||
-opaque ring() :: {Partitions :: non_neg_integer(), [{Index :: index(), Owner :: node()}]}.
|
||||
|
||||
%%% public api
|
||||
|
||||
-spec new(Partitions :: non_neg_integer(), Node :: node()) -> ring().
|
||||
new(Partitions, Node) ->
|
||||
{Partitions, [{Index, Node} || Index <- lists:seq(0, ?RING_MAX - 1, ring_step(Partitions))]}.
|
||||
|
||||
-spec key_to_index(Key :: binary()) -> index().
|
||||
key_to_index(Key) ->
|
||||
<<Index:160/integer>> = crypto:hash(sha, Key),
|
||||
Index.
|
||||
|
||||
-spec ordered_from(Ring :: ring(), Index :: index()) -> [{Index :: index(), Owner :: node()}].
|
||||
ordered_from({Partitions, OwnerList}, Index) ->
|
||||
Step = ring_step(Partitions),
|
||||
{A, B} = lists:split(Index div Step, OwnerList),
|
||||
B ++ A.
|
||||
|
||||
%%% internal functions
|
||||
|
||||
ring_step(RingSize) ->
|
||||
?RING_MAX div RingSize.
|
|
@ -19,7 +19,7 @@ Holds data for a single partition in the ring.
|
|||
]).
|
||||
|
||||
-record(kv_vnode, {
|
||||
index :: non_neg_integer(),
|
||||
index :: kv_ring:index(),
|
||||
table :: ets:table()
|
||||
}).
|
||||
|
||||
|
@ -27,7 +27,7 @@ Holds data for a single partition in the ring.
|
|||
|
||||
%%% public api
|
||||
|
||||
-spec start_link(Index :: non_neg_integer()) -> gen_server:start_ret().
|
||||
-spec start_link(Index :: kv_ring:index()) -> gen_server:start_ret().
|
||||
start_link(Index) ->
|
||||
gen_server:start_link(?MODULE, [Index], []).
|
||||
|
||||
|
@ -41,7 +41,7 @@ put(VNode, Key, Value) ->
|
|||
|
||||
%%% gen_server callbacks
|
||||
|
||||
-spec init([Index :: non_neg_integer()]) -> {ok, state()}.
|
||||
-spec init([Index :: kv_ring:index()]) -> {ok, state()}.
|
||||
init([Index]) ->
|
||||
{ok, #kv_vnode{index = Index, table = ets:new(?MODULE, [set, private])}}.
|
||||
|
||||
|
|
|
@ -4,8 +4,8 @@
|
|||
|
||||
-export([
|
||||
start_link/0,
|
||||
get_local/2,
|
||||
put_local/3
|
||||
get_local/1,
|
||||
put_local/2
|
||||
]).
|
||||
|
||||
%%% gen_server callbacks
|
||||
|
@ -20,12 +20,14 @@
|
|||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec get_local(Index :: non_neg_integer(), Key :: term()) -> {ok, Value :: term()} | none.
|
||||
get_local(Index, Key) ->
|
||||
-spec get_local(Key :: binary()) -> {ok, Value :: term()} | none.
|
||||
get_local(Key) ->
|
||||
Index = kv_ring:key_to_index(Key),
|
||||
gen_server:call(?MODULE, {get, Index, Key}).
|
||||
|
||||
-spec put_local(Index :: non_neg_integer(), Key :: term(), Value :: term()) -> ok.
|
||||
put_local(Index, Key, Value) ->
|
||||
-spec put_local(Key :: binary(), Value :: term()) -> ok.
|
||||
put_local(Key, Value) ->
|
||||
Index = kv_ring:key_to_index(Key),
|
||||
gen_server:cast(?MODULE, {put, Index, Key, Value}).
|
||||
|
||||
%%% gen_server callbacks
|
||||
|
@ -45,6 +47,7 @@ handle_cast({put, Index, Key, Value}, State) ->
|
|||
|
||||
%%% internal functions
|
||||
|
||||
-spec get_vnode(Index :: kv_ring:index(), state()) -> pid().
|
||||
get_vnode(Index, #kv_vnode_manager{table = Table}) ->
|
||||
case ets:lookup(Table, Index) of
|
||||
[{_, Pid}] ->
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
-spec start_vnode(Index :: non_neg_integer()) -> supervisor:startchild_ret().
|
||||
-spec start_vnode(Index :: kv_ring:index()) -> supervisor:startchild_ret().
|
||||
start_vnode(Index) ->
|
||||
supervisor:start_child(?MODULE, [Index]).
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue