Design¶
This document describes the architecture and internals of barrel_embed.
Architecture Overview¶
barrel_embed uses a two-layer architecture that bridges Erlang and Python:
┌─────────────────────────────────────────────────────────┐
│ Erlang Application │
├─────────────────────────────────────────────────────────┤
│ barrel_embed (API) │
│ ↓ │
│ barrel_embed_local/fastembed/splade/... (Providers) │
│ ↓ │
│ barrel_embed_port_server (gen_server) │
│ ↓ JSON-lines over stdio │
├─────────────────────────────────────────────────────────┤
│ Python Process │
│ ↓ │
│ barrel_embed (asyncio server) │
│ ↓ │
│ providers/* (sentence_transformers, fastembed, etc.) │
└─────────────────────────────────────────────────────────┘
Key components:
- barrel_embed - Main API module that coordinates providers
- Provider modules - Implement the
barrel_embed_providerbehaviour - barrel_embed_port_server - gen_server managing Python port communication
- Python async server - Asyncio-based server handling embedding requests
- Python providers - Model-specific implementations
Request Multiplexing¶
The Problem¶
Without multiplexing, concurrent Erlang processes calling embed_batch would serialize - each would wait for the previous to complete. This is problematic because:
- Embedding generation is CPU-bound (model inference)
- Multiple Erlang processes often need embeddings concurrently
- Serializing requests significantly increases latency
The Solution¶
Request multiplexing allows concurrent requests to the same model:
- Erlang gen_server assigns a unique ID to each request
- All requests are sent immediately to Python (non-blocking)
- Python processes requests concurrently using a thread pool
- Python includes the request ID in each response
- gen_server routes responses to the correct caller
Sequence Diagram¶
Process A ──┐
Process B ──┼──► gen_server ──► Python ──► Model
Process C ──┘ │ │
│◄─────────────┘
│ (routes by ID)
┌────────┼────────┐
↓ ↓ ↓
Proc A Proc B Proc C
Each process receives its response as soon as it's ready, not waiting for other processes.
Erlang Port Server¶
barrel_embed_port_server is a gen_server that manages Python port communication.
State Structure¶
-record(state, {
port :: port(), % Erlang port to Python
pending = #{} :: #{integer() => {pid(), reference()}}, % Request ID to caller
next_id = 1 :: integer(), % Next request ID
timeout :: timeout(), % Default timeout
buffer = <<>> :: binary() % Partial line buffer
}).
Request Flow¶
- Receive call -
handle_call({embed_batch, Texts}, From, State) - Build request - JSON with action, texts, and unique ID
- Send to port -
port_command(Port, [Json, "\n"]) - Track caller - Store
Fromin pending map with ID as key - Return noreply - Caller blocks waiting for response
Response Flow¶
- Port data arrives -
handle_info({Port, {data, {eol, Line}}}, State) - Parse JSON - Extract response and ID
- Lookup caller - Find
Fromin pending map by ID - Reply -
gen_server:reply(From, Result) - Cleanup - Remove ID from pending map
Buffer Handling¶
Large embedding responses may exceed line buffer size:
handle_info({Port, {data, {noeol, Partial}}}, State) ->
%% Accumulate partial data
{noreply, State#state{buffer = <<Buffer/binary, Partial/binary>>}};
handle_info({Port, {data, {eol, Line}}}, State) ->
%% Combine buffer with complete line
FullLine = <<Buffer/binary, Line/binary>>,
handle_response(FullLine, State#state{buffer = <<>>});
Python Async Server¶
AsyncEmbedServer Base Class¶
The AsyncEmbedServer provides the async infrastructure:
class AsyncEmbedServer(ABC):
def __init__(self, max_workers: int = 4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.write_lock = asyncio.Lock()
async def run(self):
# Setup async stdin/stdout
# Main loop: read request, dispatch, write response
async def handle_request(self, line: bytes, writer):
# Parse JSON, dispatch, include ID in response
async def dispatch(self, request: dict) -> dict:
# Route to appropriate handler
async def handle_embed(self, texts: list) -> dict:
# Run embed_sync in thread pool
result = await loop.run_in_executor(self.executor, self.embed_sync, texts)
return result
Concurrency Model¶
- asyncio - Non-blocking I/O for stdin/stdout
- ThreadPoolExecutor - CPU-bound embedding work runs in threads
- write_lock - Prevents output interleaving from concurrent handlers
- uvloop (optional) - Higher performance event loop
Request Handling¶
async def handle_request(self, line: bytes, writer):
request_id = None
try:
request = json.loads(line.decode())
request_id = request.get("id")
response = await self.dispatch(request)
except Exception as e:
response = {"ok": False, "error": str(e)}
if request_id is not None:
response["id"] = request_id
async with self.write_lock:
output = json.dumps(response) + "\n"
writer.write(output.encode())
await writer.drain()
JSON Protocol¶
Communication uses newline-delimited JSON (JSON Lines).
Request Format¶
{"id": 1, "action": "embed", "texts": ["hello", "world"]}
{"id": 2, "action": "info"}
{"id": 3, "action": "embed_image", "images": ["base64..."]}
| Field | Type | Description |
|---|---|---|
id |
integer | Unique request identifier |
action |
string | One of: embed, embed_image, info |
texts |
array | Text strings to embed |
images |
array | Base64-encoded images |
Response Format¶
Success:
{"id": 1, "ok": true, "embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]]}
{"id": 2, "ok": true, "dimensions": 768, "model": "BAAI/bge-base-en-v1.5", "backend": "fastembed"}
Error:
| Field | Type | Description |
|---|---|---|
id |
integer | Matches request ID |
ok |
boolean | Success indicator |
embeddings |
array | List of embedding vectors |
error |
string | Error message (when ok=false) |
Provider Lifecycle¶
Initialization¶
- Erlang provider init -
barrel_embed_fastembed:init(Config) - Start port server -
barrel_embed_port_server:start_link(Python, Args, Opts) - Python starts - Parses args, imports provider module
- Load model -
server.load_model()downloads/loads model - Get info - Erlang calls
info/2to get dimensions - Ready - Provider stores server pid in config
Embedding Request¶
- User calls -
barrel_embed:embed(Text, State) - Provider dispatches -
barrel_embed_fastembed:embed_batch([Text], Config) - Port server sends - JSON request to Python
- Python embeds - Thread pool runs model inference
- Response returns - JSON with embeddings
- Result delivered - To original caller
Cleanup¶
When the Erlang process terminates:
1. Port server's terminate/2 called
2. port_close(Port) signals Python
3. Python's stdin closes, async loop exits
4. Thread pool shuts down
One Server Per Model¶
Each provider instance creates its own:
- gen_server process
- Python port/process
- Loaded model in memory
This is intentional:
- Different models need separate Python processes
- Model isolation prevents memory/state conflicts
- Multiplexing benefits concurrent calls to the SAME model
If you need multiple embedding models, initialize multiple provider states:
{ok, BgeState} = barrel_embed:init(#{embedder => {fastembed, #{model => "BAAI/bge-small-en-v1.5"}}}).
{ok, NomicState} = barrel_embed:init(#{embedder => {fastembed, #{model => "nomic-ai/nomic-embed-text-v1.5"}}}).
Error Handling¶
Python Errors¶
try:
embeddings = self.model.encode(texts)
return {"ok": True, "embeddings": embeddings.tolist()}
except Exception as e:
return {"ok": False, "error": str(e)}
Erlang Error Decoding¶
Port Exit Handling¶
If Python crashes:
handle_info({Port, {exit_status, Status}}, State) ->
%% Fail all pending requests
maps:foreach(fun(_, From) ->
gen_server:reply(From, {error, {port_exited, Status}})
end, Pending),
{stop, {port_exited, Status}, State}.
All pending callers receive an error tuple rather than hanging.
Performance Considerations¶
Thread Pool Sizing¶
Default max_workers=4 balances:
- CPU utilization - Multiple concurrent embeddings
- Memory - Each thread may hold model state
- Contention - Too many threads cause lock contention
Large Batches¶
For very large batches, consider:
- Using batch_size option to chunk requests
- The line buffer is 10MB but very large responses may need buffering
uvloop¶
Install uvloop for better async performance:
The server auto-detects and uses it when available.