Changes Feed and Subscriptions¶
barrel_docdb tracks all document modifications with a changes feed. Changes are ordered using HLC (Hybrid Logical Clock) timestamps for consistent ordering across distributed systems.
Changes Feed¶
Getting Changes¶
Retrieve document changes since a timestamp:
%% Get all changes from the beginning
{ok, Changes, LastHlc} = barrel_docdb:get_changes(<<"mydb">>, first).
%% Get changes since a specific HLC timestamp
{ok, NewChanges, NewHlc} = barrel_docdb:get_changes(<<"mydb">>, LastHlc).
Change Structure¶
Each change is a map with these fields:
#{
id => <<"doc123">>, %% Document ID
hlc => #timestamp{...}, %% HLC timestamp
rev => <<"2-abc123">>, %% Current revision
deleted => false, %% Deletion flag
changes => [<<"2-abc123">>] %% List of new revisions
}
Options¶
{ok, Changes, _} = barrel_docdb:get_changes(<<"mydb">>, first, #{
limit => 100, %% Maximum changes to return
include_docs => true, %% Include full document body
descending => true, %% Reverse order (newest first)
doc_ids => [<<"doc1">>] %% Filter to specific documents
}).
Filtered Changes¶
Filter changes by path patterns or queries:
%% Filter by path pattern (MQTT-style)
{ok, Changes, _} = barrel_docdb:get_changes(<<"mydb">>, first, #{
paths => [<<"users/#">>]
}).
%% Filter by query
{ok, Changes, _} = barrel_docdb:get_changes(<<"mydb">>, first, #{
query => #{
where => [{path, [<<"type">>], <<"user">>}]
}
}).
Wildcard Performance: Path patterns with # wildcards (e.g., <<"users/#">>) use sharded posting list indexes for efficient HLC-ordered iteration. Changes are pre-indexed by path prefix and time bucket, enabling ~50x faster queries compared to full scan with deduplication.
HLC (Hybrid Logical Clock)¶
HLC provides distributed event ordering without synchronized clocks.
Why HLC?¶
- Causality: Events are ordered by cause-effect relationships
- Distributed: No central clock coordinator needed
- Monotonic: Timestamps always increase within a node
Using HLC¶
%% Get current HLC timestamp
Ts = barrel_docdb:get_hlc().
%% Generate a new timestamp (advances clock)
NewTs = barrel_docdb:new_hlc().
%% Synchronize with remote node's clock
RemoteTs = receive_from_remote(),
{ok, SyncedTs} = barrel_docdb:sync_hlc(RemoteTs).
Cross-Node Synchronization¶
When receiving data from another node:
handle_remote_message(#{hlc := RemoteHlc, data := Data}) ->
%% Sync local clock with remote
{ok, _} = barrel_docdb:sync_hlc(RemoteHlc),
%% Process data - subsequent writes will be ordered after remote events
process_data(Data).
Path Subscriptions¶
Subscribe to real-time notifications for documents matching path patterns.
Basic Subscription¶
%% Subscribe to all user document changes
{ok, SubRef} = barrel_docdb:subscribe(<<"mydb">>, <<"users/#">>).
%% Receive notifications
receive
{barrel_change, <<"mydb">>, Change} ->
#{id := DocId, rev := Rev, deleted := Deleted} = Change,
io:format("Document ~s changed (rev: ~s)~n", [DocId, Rev])
end.
%% Unsubscribe when done
ok = barrel_docdb:unsubscribe(SubRef).
Pattern Syntax¶
Patterns use MQTT-style wildcards:
| Pattern | Matches |
|---|---|
users/alice |
Exact path |
users/+ |
Any single segment: users/alice, users/bob |
users/# |
Any suffix: users/alice, users/alice/profile |
users/+/orders |
Single wildcard: users/alice/orders, users/bob/orders |
How Paths Work¶
Document structure is converted to paths:
%% This document:
#{
<<"id">> => <<"user:alice">>,
<<"type">> => <<"user">>,
<<"profile">> => #{
<<"name">> => <<"Alice">>,
<<"city">> => <<"Paris">>
}
}
%% Generates these paths:
%% - type/user
%% - profile/name/Alice
%% - profile/city/Paris
Notification Format¶
{barrel_change, DbName, #{
id => DocId, %% Document ID
rev => RevId, %% New revision
hlc => HlcTimestamp, %% HLC timestamp
deleted => boolean(), %% Deletion flag
paths => [binary()] %% Changed paths
}}
Query Subscriptions¶
Subscribe to changes for documents matching a query.
Basic Query Subscription¶
%% Subscribe to active user changes
Query = #{
where => [
{path, [<<"type">>], <<"user">>},
{path, [<<"active">>], true}
]
},
{ok, SubRef} = barrel_docdb:subscribe_query(<<"mydb">>, Query).
%% Receive notifications
receive
{barrel_query_change, <<"mydb">>, Change} ->
io:format("Matching document changed: ~p~n", [Change])
end.
%% Unsubscribe
ok = barrel_docdb:unsubscribe_query(SubRef).
Path-Optimized Evaluation¶
Query subscriptions are optimized using path extraction:
- Paths referenced in the query are extracted
- Changes are filtered by path intersection
- Full query evaluation only runs when paths overlap
This avoids evaluating the query for every document change.
Complete Examples¶
Real-Time Dashboard¶
-module(dashboard).
-export([start/1]).
start(DbName) ->
%% Subscribe to order changes
{ok, SubRef} = barrel_docdb:subscribe(DbName, <<"orders/#">>),
loop(SubRef).
loop(SubRef) ->
receive
{barrel_change, _Db, #{id := Id, deleted := false}} ->
update_dashboard(Id),
loop(SubRef);
{barrel_change, _Db, #{id := Id, deleted := true}} ->
remove_from_dashboard(Id),
loop(SubRef);
stop ->
barrel_docdb:unsubscribe(SubRef)
end.
Incremental Processing¶
process_changes(DbName, Since) ->
{ok, Changes, LastHlc} = barrel_docdb:get_changes(DbName, Since, #{
limit => 100,
include_docs => true
}),
%% Process batch
lists:foreach(fun process_change/1, Changes),
%% Continue if more changes exist
case length(Changes) of
100 -> process_changes(DbName, LastHlc);
_ -> {ok, LastHlc}
end.
Reactive Query Cache¶
-module(user_cache).
-behaviour(gen_server).
init([DbName]) ->
%% Subscribe to user changes
Query = #{where => [{path, [<<"type">>], <<"user">>}]},
{ok, SubRef} = barrel_docdb:subscribe_query(DbName, Query),
%% Build initial cache
{ok, Users, _} = barrel_docdb:find(DbName, Query),
Cache = maps:from_list([{maps:get(<<"id">>, U), U} || U <- Users]),
{ok, #{sub => SubRef, cache => Cache}}.
handle_info({barrel_query_change, _, #{id := Id, deleted := true}}, State) ->
{noreply, State#{cache := maps:remove(Id, maps:get(cache, State))}};
handle_info({barrel_query_change, _, #{id := Id}}, State = #{cache := Cache}) ->
%% Fetch updated document
{ok, Doc} = barrel_docdb:get_doc(<<"mydb">>, Id),
{noreply, State#{cache := Cache#{Id => Doc}}}.
API Reference¶
See the Erlang API Reference for complete function documentation.
Changes Feed¶
barrel_docdb:get_changes/2,3- Get changes since timestamp
HLC¶
barrel_docdb:get_hlc/0- Get current HLCbarrel_docdb:new_hlc/0- Generate new HLCbarrel_docdb:sync_hlc/1- Sync with remote HLC
Path Subscriptions¶
barrel_docdb:subscribe/2,3- Subscribe to path patternbarrel_docdb:unsubscribe/1- Unsubscribe
Query Subscriptions¶
barrel_docdb:subscribe_query/2,3- Subscribe to querybarrel_docdb:unsubscribe_query/1- Unsubscribe