Skip to content

Building Custom Operations

AP3 separates the protocol contract (defined in the SDK) from protocol implementations (shipped as separate packages or binaries). This lets you build and distribute your own privacy-preserving protocols without modifying the core SDK.

The central concept is the Operation abstract class in ap3. It defines a minimal, generic contract that any two-party (or multi-party) protocol must implement.


Design Principles

Principle Description
Pluggable Protocols are plain Python classes. Install and import them like any package.
Session-local Each Operation instance owns its own session state. No shared registry is needed.
Role-symmetric The same class handles both sides (initiator and receiver) of a protocol.

Core Abstractions

Operation

from ap3 import Operation, OperationResult

Operation is an abstract base class. Every protocol subclass must:

  1. Set a unique operation_id class variable.
  2. Implement on_start() — called once to kick off a session.
  3. Implement on_process() — called for every subsequent protocol round.
class Operation(ABC):
    operation_id: ClassVar[str]          # Must be non-empty; validated at class definition time

    def start(role, inputs, config, context, session_id) -> Dict   # Initiator entry-point
    def receive(role, message, config, context, session_id) -> Dict # Responder entry-point
    def process(session_id, message, context) -> Dict               # Subsequent rounds
    def has_session(session_id) -> bool                             # Session existence check

    @abstractmethod
    def on_start(role, inputs, config, context) -> OperationResult  # Implement this

    @abstractmethod
    def on_process(role, state, message, config, context) -> OperationResult  # And this

Public methods

Method Who calls it Purpose
start(role, inputs, ...) Initiator agent Begins a new session, calls on_start()
receive(role, message, ...) Responder agent Handles first inbound message, calls on_process() with empty state and is_first_message=True
process(session_id, message, ...) Both agents Handles subsequent rounds for a known session
has_session(session_id) Framework code Returns True if the session is still active

All three dispatch methods return a dict (via OperationResult.to_dict()):

{
    "session_id": "...",
    "operation": "protocol.psi.sanction.v1",
    "done": False,
    "outgoing": { ... },   # Present when there is a message to send
    "result": { ... },     # Present when done=True and a result was produced
    "metadata": { ... },
}

OperationResult

from ap3 import OperationResult

Your on_start() and on_process() implementations return an OperationResult:

@dataclass
class OperationResult:
    next_state: Dict[str, Any] = field(default_factory=dict)
    done: bool = False
    outgoing: Optional[Dict[str, Any]] = None
    result: Optional[Any] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
Field Description
next_state Serialisable dict persisted by Operation and passed back to you on the next round via state. Ignored when done=True.
done Set to True to signal the protocol is complete. The session is cleaned up automatically.
outgoing The message payload to send to the other party. Omit (or None) if no message needs to be sent.
result Final protocol output returned to the caller. Only meaningful when done=True.
metadata Arbitrary extra info (round numbers, protocol name, etc.)

Tip

Keep next_state small and serialisable — it is stored in memory for each active session. Do not put binary blobs directly in state; base64-encode them first.


Session Lifecycle

Initiator                              Responder
─────────                              ─────────
op.start(role="initiator", inputs)
  └─ on_start() → OperationResult
        outgoing → msg1 ──────────────→ op.receive(role="receiver", message=msg1)
                                             └─ on_process(state={}, is_first_message=True)
                                                   → OperationResult
               msg2 ←────────────────────────── outgoing ← msg2
op.process(session_id, msg2)
  └─ on_process(state=prev, message=msg2)
        result → DONE                    ... (more rounds or done=True on receiver)
  • receive() always calls on_process() with an empty state and context["is_first_message"] = True.
  • process() always calls on_process() with the last saved state and context["is_first_message"] = False.
  • When done=True the session is deleted from memory.

Writing a Custom Operation

Step 1 — Choose an operation_id

Use a reverse-domain string that uniquely identifies your protocol and version:

operation_id = "com.example.my-protocol.v1"

Operation.__init_subclass__ enforces this at class-definition time — if the class variable is missing or empty, a TypeError is raised immediately.

Step 2 — Implement on_start

Called when the initiator starts a new session. Return an OperationResult with the first outgoing message and whatever state the initiator will need for the next round.

def on_start(self, role, inputs, config, context) -> OperationResult:
    if role != "initiator":
        raise ValueError("on_start is only for role='initiator'")

    proposal_value = int(inputs.get("proposal_value"))

    return OperationResult(
        next_state={"proposal_value": proposal_value},
        outgoing={"phase": "proposal", "proposal_value": proposal_value},
        metadata={"protocol": "threshold_approval"},
    )

Step 3 — Implement on_process

Called for every protocol round on both sides. Dispatch on role and the message contents.

def on_process(self, role, state, message, config, context) -> OperationResult:
    if role == "receiver":
        # Responder logic: receive proposal, emit decision
        threshold = int(config.get("threshold", 0))
        approved = int(message["proposal_value"]) >= threshold
        return OperationResult(
            done=True,
            outgoing={"phase": "decision", "approved": approved},
        )

    # Initiator logic: receive decision, return final result
    return OperationResult(
        done=True,
        result={
            "approved": bool(message["approved"]),
            "proposal_value": int(state["proposal_value"]),
        },
    )

Step 4 — Use the operation

op = ThresholdApprovalOperation()

# Initiator side
init_out = op.start(role="initiator", inputs={"proposal_value": 84})
session_id = init_out["session_id"]

# Responder side (different operation instance, or same in tests)
recv_out = op.receive(
    role="receiver",
    message=init_out["outgoing"],
    config={"threshold": 80},
)

# Initiator processes the response
final = op.process(session_id, recv_out["outgoing"])
print(final["result"])  # {"approved": True, "proposal_value": 84}

For a working end-to-end example, see the PSI demo in examples/psi/.


Reference Implementations: ap3operations

ap3operations is the canonical external package that ships production implementations of PSI. Study it when building your own binary-backed operation.

Installing

pip install ap3operations   # if published separately, or:
uv add ap3operations

When working inside the monorepo it is already available as a local package.

PSI Sanction Check

from ap3operations import PSIOperation

op = PSIOperation()

# Initiator
init_out = op.start(
    role="initiator",
    inputs={"customer_data": "John Doe,ID123,123 Main St"},
)

# Responder
recv_out = op.receive(
    role="receiver",
    message=init_out["outgoing"],
    config={
        "sanction_list": [
            "Jane Doe,ID999,777 River St",
            "John Doe,ID123,123 Main St",
        ]
    },
)

# Initiator finalises
result = op.process(init_out["session_id"], recv_out["outgoing"])
print(result["result"])  # {"is_match": True}

operation_id: "protocol.psi.sanction.v1"
Binary: libsanction_check_lib (PSI via ap3operations/psi/ffi.py)


Quick Reference

# Define a protocol
from ap3 import Operation, OperationResult

class MyOperation(Operation):
    operation_id = "com.example.my-protocol.v1"

    def on_start(self, role, inputs, config, context) -> OperationResult:
        ...  # called once by start()

    def on_process(self, role, state, message, config, context) -> OperationResult:
        ...  # called by receive() and process()


# Use it
op = MyOperation()
out = op.start("initiator", inputs={"key": "value"})
# session_id = out["session_id"]
# out["outgoing"] → send to peer
# out["done"]     → True when finished
# out["result"]   → final result (when done=True)