Building Custom Operations
AP3 separates the protocol contract (defined in the SDK) from protocol implementations (shipped as separate packages or binaries). This lets you build and distribute your own privacy-preserving protocols without modifying the core SDK.
The central concept is the Operation abstract class in ap3. It defines a minimal, generic contract that any two-party (or multi-party) protocol must implement.
Design Principles
| Principle | Description |
|---|---|
| Pluggable | Protocols are plain Python classes. Install and import them like any package. |
| Session-local | Each Operation instance owns its own session state. No shared registry is needed. |
| Role-symmetric | The same class handles both sides (initiator and receiver) of a protocol. |
Core Abstractions
Operation
from ap3 import Operation, OperationResult
Operation is an abstract base class. Every protocol subclass must:
- Set a unique
operation_idclass variable. - Implement
on_start()— called once to kick off a session. - Implement
on_process()— called for every subsequent protocol round.
class Operation(ABC):
operation_id: ClassVar[str] # Must be non-empty; validated at class definition time
def start(role, inputs, config, context, session_id) -> Dict # Initiator entry-point
def receive(role, message, config, context, session_id) -> Dict # Responder entry-point
def process(session_id, message, context) -> Dict # Subsequent rounds
def has_session(session_id) -> bool # Session existence check
@abstractmethod
def on_start(role, inputs, config, context) -> OperationResult # Implement this
@abstractmethod
def on_process(role, state, message, config, context) -> OperationResult # And this
Public methods
| Method | Who calls it | Purpose |
|---|---|---|
start(role, inputs, ...) |
Initiator agent | Begins a new session, calls on_start() |
receive(role, message, ...) |
Responder agent | Handles first inbound message, calls on_process() with empty state and is_first_message=True |
process(session_id, message, ...) |
Both agents | Handles subsequent rounds for a known session |
has_session(session_id) |
Framework code | Returns True if the session is still active |
All three dispatch methods return a dict (via OperationResult.to_dict()):
{
"session_id": "...",
"operation": "protocol.psi.sanction.v1",
"done": False,
"outgoing": { ... }, # Present when there is a message to send
"result": { ... }, # Present when done=True and a result was produced
"metadata": { ... },
}
OperationResult
from ap3 import OperationResult
Your on_start() and on_process() implementations return an OperationResult:
@dataclass
class OperationResult:
next_state: Dict[str, Any] = field(default_factory=dict)
done: bool = False
outgoing: Optional[Dict[str, Any]] = None
result: Optional[Any] = None
metadata: Dict[str, Any] = field(default_factory=dict)
| Field | Description |
|---|---|
next_state |
Serialisable dict persisted by Operation and passed back to you on the next round via state. Ignored when done=True. |
done |
Set to True to signal the protocol is complete. The session is cleaned up automatically. |
outgoing |
The message payload to send to the other party. Omit (or None) if no message needs to be sent. |
result |
Final protocol output returned to the caller. Only meaningful when done=True. |
metadata |
Arbitrary extra info (round numbers, protocol name, etc.) |
Tip
Keep next_state small and serialisable — it is stored in memory for each active session. Do not put binary blobs directly in state; base64-encode them first.
Session Lifecycle
Initiator Responder
───────── ─────────
op.start(role="initiator", inputs)
└─ on_start() → OperationResult
outgoing → msg1 ──────────────→ op.receive(role="receiver", message=msg1)
└─ on_process(state={}, is_first_message=True)
→ OperationResult
msg2 ←────────────────────────── outgoing ← msg2
op.process(session_id, msg2)
└─ on_process(state=prev, message=msg2)
result → DONE ... (more rounds or done=True on receiver)
receive()always callson_process()with an emptystateandcontext["is_first_message"] = True.process()always callson_process()with the last savedstateandcontext["is_first_message"] = False.- When
done=Truethe session is deleted from memory.
Writing a Custom Operation
Step 1 — Choose an operation_id
Use a reverse-domain string that uniquely identifies your protocol and version:
operation_id = "com.example.my-protocol.v1"
Operation.__init_subclass__ enforces this at class-definition time — if the class variable is missing or empty, a TypeError is raised immediately.
Step 2 — Implement on_start
Called when the initiator starts a new session. Return an OperationResult with the first outgoing message and whatever state the initiator will need for the next round.
def on_start(self, role, inputs, config, context) -> OperationResult:
if role != "initiator":
raise ValueError("on_start is only for role='initiator'")
proposal_value = int(inputs.get("proposal_value"))
return OperationResult(
next_state={"proposal_value": proposal_value},
outgoing={"phase": "proposal", "proposal_value": proposal_value},
metadata={"protocol": "threshold_approval"},
)
Step 3 — Implement on_process
Called for every protocol round on both sides. Dispatch on role and the message contents.
def on_process(self, role, state, message, config, context) -> OperationResult:
if role == "receiver":
# Responder logic: receive proposal, emit decision
threshold = int(config.get("threshold", 0))
approved = int(message["proposal_value"]) >= threshold
return OperationResult(
done=True,
outgoing={"phase": "decision", "approved": approved},
)
# Initiator logic: receive decision, return final result
return OperationResult(
done=True,
result={
"approved": bool(message["approved"]),
"proposal_value": int(state["proposal_value"]),
},
)
Step 4 — Use the operation
op = ThresholdApprovalOperation()
# Initiator side
init_out = op.start(role="initiator", inputs={"proposal_value": 84})
session_id = init_out["session_id"]
# Responder side (different operation instance, or same in tests)
recv_out = op.receive(
role="receiver",
message=init_out["outgoing"],
config={"threshold": 80},
)
# Initiator processes the response
final = op.process(session_id, recv_out["outgoing"])
print(final["result"]) # {"approved": True, "proposal_value": 84}
For a working end-to-end example, see the PSI demo in examples/psi/.
Reference Implementations: ap3operations
ap3operations is the canonical external package that ships production implementations of PSI. Study it when building your own binary-backed operation.
Installing
pip install ap3operations # if published separately, or:
uv add ap3operations
When working inside the monorepo it is already available as a local package.
PSI Sanction Check
from ap3operations import PSIOperation
op = PSIOperation()
# Initiator
init_out = op.start(
role="initiator",
inputs={"customer_data": "John Doe,ID123,123 Main St"},
)
# Responder
recv_out = op.receive(
role="receiver",
message=init_out["outgoing"],
config={
"sanction_list": [
"Jane Doe,ID999,777 River St",
"John Doe,ID123,123 Main St",
]
},
)
# Initiator finalises
result = op.process(init_out["session_id"], recv_out["outgoing"])
print(result["result"]) # {"is_match": True}
operation_id: "protocol.psi.sanction.v1"
Binary: libsanction_check_lib (PSI via ap3operations/psi/ffi.py)
Quick Reference
# Define a protocol
from ap3 import Operation, OperationResult
class MyOperation(Operation):
operation_id = "com.example.my-protocol.v1"
def on_start(self, role, inputs, config, context) -> OperationResult:
... # called once by start()
def on_process(self, role, state, message, config, context) -> OperationResult:
... # called by receive() and process()
# Use it
op = MyOperation()
out = op.start("initiator", inputs={"key": "value"})
# session_id = out["session_id"]
# out["outgoing"] → send to peer
# out["done"] → True when finished
# out["result"] → final result (when done=True)