reduce messaging to vnode manager

This commit is contained in:
wires 2025-04-07 11:20:18 -04:00
parent 5a6e41cab7
commit 66db32f2e7
Signed by: wires
SSH key fingerprint: SHA256:9GtP+M3O2IivPDlw1UY872UPUuJH2gI0yG6ExBxaaiM

View file

@ -11,9 +11,7 @@
%%% gen_server callbacks %%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2]). -export([init/1, handle_call/3, handle_cast/2]).
-record(kv_vnode_manager, {table :: ets:table()}). -type state() :: {}.
-type state() :: #kv_vnode_manager{}.
%%% public api %%% public api
@ -23,37 +21,44 @@ start_link() ->
-spec get_local(Key :: binary()) -> {ok, Value :: term()} | none. -spec get_local(Key :: binary()) -> {ok, Value :: term()} | none.
get_local(Key) -> get_local(Key) ->
Index = kv_ring:key_to_index(Key), Index = kv_ring:key_to_index(Key),
gen_server:call(?MODULE, {get, Index, Key}). VNode = get_vnode(Index),
kv_vnode:get(VNode, Key).
-spec put_local(Key :: binary(), Value :: term()) -> ok. -spec put_local(Key :: binary(), Value :: term()) -> ok.
put_local(Key, Value) -> put_local(Key, Value) ->
Index = kv_ring:key_to_index(Key), Index = kv_ring:key_to_index(Key),
gen_server:cast(?MODULE, {put, Index, Key, Value}). VNode = get_vnode(Index),
kv_vnode:put(VNode, Key, Value).
%%% gen_server callbacks %%% gen_server callbacks
-spec init([]) -> {ok, state()}. -spec init([]) -> {ok, state()}.
init([]) -> init([]) ->
{ok, #kv_vnode_manager{table = ets:new(?MODULE, [set])}}. ?MODULE = ets:new(?MODULE, [set, protected, named_table, {read_concurrency, true}]),
{ok, {}}.
handle_call({get, Index, Key}, _From, State) -> handle_call({get_vnode, Index}, _From, State) ->
VNode = get_vnode(Index, State), VNode =
{reply, kv_vnode:get(VNode, Key), State}. case ets:lookup(?MODULE, Index) of
handle_cast({put, Index, Key, Value}, State) ->
VNode = get_vnode(Index, State),
ok = kv_vnode:put(VNode, Key, Value),
{noreply, State}.
%%% internal functions
-spec get_vnode(Index :: kv_ring:index(), state()) -> pid().
get_vnode(Index, #kv_vnode_manager{table = Table}) ->
case ets:lookup(Table, Index) of
[{_, Pid}] -> [{_, Pid}] ->
Pid; Pid;
[] -> [] ->
{ok, Pid} = kv_vnode_sup:start_vnode(Index), {ok, Pid} = kv_vnode_sup:start_vnode(Index),
ets:insert(Table, {Index, Pid}), ets:insert(?MODULE, {Index, Pid}),
Pid Pid
end,
{reply, VNode, State}.
handle_cast(_, _) ->
error(not_implemented).
%%% internal functions
-spec get_vnode(Index :: kv_ring:index()) -> pid().
get_vnode(Index) ->
case ets:lookup(?MODULE, Index) of
[{_, Pid}] ->
Pid;
[] ->
gen_server:call(?MODULE, {get_vnode, Index})
end. end.