Using this book as part of a course? Please let us know!

Implementing Callbacks

7.6. Implementing Callbacks#

When a callback is registered with a promise using bind or one of the other syntaxes, it is added to a list of callbacks that is stored with the promise. Eventually, if the promise is fulfilled, the Lwt resolution loop runs all the callbacks registered with the promise. There is no guarantee about the execution order of callbacks for a promise. In other words, the execution order is nondeterministic. If the order matters, the programmer needs to use the composition operators (such as bind and join) to enforce an ordering. If the promise never becomes fulfilled (or is rejected), none of its callbacks will ever be run.

Note

Lwt also supports registering functions that are run after a promise is rejected. Lwt.catch and try%lwt are used for this purpose. They are counterparts to Lwt.bind and let%lwt.

Once again, it’s important to keep track of where the concurrency really comes from: the OS. There might be many asynchronous I/O operations occurring at the OS level. But at the OCaml level, the resolution loop is sequential, meaning that only one callback can ever be running at a time.

Finally, the resolution loop never attempts to interrupt a callback. So if one callback goes into an infinite loop, no other callback will ever get to run. That makes Lwt a cooperative concurrency mechanism, rather than a preemptive one.

To better understand callback resolution, let’s implement it ourselves. We’ll use the Promise data structure we developed earlier. To start, we add a bind operator to the Promise signature:

module type PROMISE = sig
  ...

  (** [p >>= c] registers callback [c] with promise [p].
      When the promise is fulfilled, the callback will be run
      on the promises's contents.  If the promise is never
      fulfilled, the callback will never run. *)
  val ( >>= ) : 'a promise -> ('a -> 'b promise) -> 'b promise
end

Next, let’s re-develop the entire Promise structure. We start off just like before:

module Promise : PROMISE = struct
  type 'a state = Pending | Fulfilled of 'a | Rejected of exn
  ...

But now to implement the representation type of promises, we use a record with mutable fields. The first field is the state of the promise, and it corresponds to the ref we used before. The second field is more interesting and is discussed below.

  (** RI: the input may not be [Pending]. *)
  type 'a handler = 'a state -> unit

  (** RI: if [state <> Pending] then [handlers = []]. *)
  type 'a promise = {
    mutable state : 'a state;
    mutable handlers : 'a handler list
  }

A handler is a new abstraction: a function that takes a state. The primary use for a handler will be to run callbacks. It will be used to fulfill and reject promises when their state is ready to switch away from pending. This is why we ask, via a representation invariant, that the input state to a handler may not be pending.

We require that only pending promises may have handlers waiting in their list. Once the state becomes non-pending, i.e., either fulfilled or rejected, the handlers associated with the promise will all be processed and removed from the list. This is why we say, as a representation invariant, that if the state is not pending, then the handlers list must be empty.

This helper function that enqueues a handler on a promise’s handler list will be helpful later:

  let enqueue
      (handler : 'a state -> unit)
      (promise : 'a promise) : unit
    =
    promise.handlers <- handler :: promise.handlers

We continue to pun resolvers and promises internally:

  type 'a resolver = 'a promise

Because we changed the representation type from a ref to a record, we have to update a few of the functions in trivial ways:

  (** [write_once p s] changes the state of [p] to be [s].  If [p] and [s]
      are both pending, that has no effect.
      Raises: [Invalid_arg] if the state of [p] is not pending. *)
  let write_once p s =
    if p.state = Pending
    then p.state <- s
    else invalid_arg "cannot write twice"

  let make () =
    let p = {state = Pending; handlers = []} in
    (p, p)

  let return x =
    {state = Fulfilled x; handlers = []}

  let state p = p.state

Now we get to the trickier parts of the implementation.

The steps needed to reject a promise (with an exception) or fulfill a promise (with a value) are quite similar, so we implement a helper function resolve. This helper takes a resolver and a state, and it changes the state of the associated promise to the given state. We require that the state st that we are moving over to may not be the pending state. We mutate the handlers list to be empty to ensure that the RI holds, but we save the handlers in a local variable. Then we call write_once on the resolver to change its state. Finally, we process all the handlers that were waiting on this promise. Each of those handlers requires a state for an input, and we pass them the new state that the promise has just been set to.

  (** Requires: [st] may not be [Pending]. *)
  let resolve (r : 'a resolver) (st : 'a state) =
    assert (st <> Pending);
    let handlers = r.handlers in
    r.handlers <- [];
    write_once r st;
    List.iter (fun f -> f st) handlers

  let reject r e =
    resolve r (Rejected e)

  let fulfill r v =
    resolve r (Fulfilled v)

Finally, the implementation of >>= is the trickiest part. Recall that the bind function needs to immediately return a new promise. First, if the input promise is already fulfilled, let’s go ahead and immediately run the callback on it. The callback will yield a new promise, which we immediately return:

  let ( >>= )
      (input_promise : 'a promise)
      (callback : 'a -> 'b promise) : 'b promise
    =
    match input_promise.state with
    | Fulfilled x -> callback x

Second, if the promise is already rejected, then we quickly craft a new promise that is also rejected with the same exception as has no handlers waiting on it. We return that new promise to the user immediately:

    | Rejected exc -> {state = Rejected exc; handlers = []}

Third, if the input promise is pending, we need to do more work. Our task is delicate: we need to immediately return a new promise (which we will call the output promise) to the user, but we also need that output promise to become fulfilled when (or if) the input promise becomes fulfilled and the callback completes running, sometime in the future. Its contents will be whatever contents are contained within the promise that the callback itself returns.

So, we create a new promise and resolver called output_promise and output_resolver. That promise is what bind returns. Before returning it, we use a helper function handler_of_callback (described below) to transform the callback into a handler, and enqueue that handler on the promise. That ensures the handler will be run when the promise later becomes resolved:

    | Pending ->
      let output_promise, output_resolver = make () in
      enqueue (handler_of_callback callback output_resolver) input_promise;
      output_promise

All that’s left is to implement that helper function to create handlers out of callbacks. Recall that a handler’s type is itself a function type, 'a state -> unit. This is why our helper function’s output is actally an anonymous function. That anonymous function takes a state as its input:

  let handler_of_callback
      (callback : 'a -> 'b promise)
      (resolver : 'b resolver) : 'a handler =
      fun (state : 'a state) ->

We proceed by taking cases on that input state. The first two cases, below, are simple. It would violate the RI to call a handler on a pending state. And if the state is rejected, then the handler should propagate that rejection to the resolver, which causes the promise returned by bind to also be rejected.

  let handler_of_callback
      (callback : 'a -> 'b promise)
      (resolver : 'b resolver) : 'a handler =
      fun (state : 'a state) ->
      match state with
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc

But if the state is fulfiled, then the callback registered with the promise can—at last!—be run on the contents of the fulfilled promise. If the callback executes successfully it produces a new promise, but recall that the callback may itself raise an exception.

First, consider the optimistic case in which the callback executes successfully and produces a promise. That promise might already be rejected or fulfilled, in which case that state again propagates.

      | Fulfilled x ->
        let promise = callback x in
        match promise.state with
        | Fulfilled y -> resolve resolver y
        | Rejected exc -> reject resolver exc

But the promise might still be pending. In that case, we need to enqueue a new handler whose purpose is to do the propagation once the result is available:

        | Pending -> enqueue (copying_handler resolver) promise

where copying_handler is a new helper function that creates a very simple handler to do that propagation:

  let copying_handler (resolver : 'a resolver) : 'a handler
    = function
      | Pending -> failwith "handler RI violated"
      | Rejected exc -> reject resolver exc
      | Fulfilled x -> resolve resolver x

Second, consider the case in which the callback function itself raises some exception exc. In that case, we need to reject the promise with that exception. We do this by wrapping the execution of the callback in a try block:

      | Fulfilled x ->
        try
          let promise = callback x in
          match promise.state with
          | Fulfilled y -> resolve resolver y
          | Rejected exc -> reject resolver exc
          | Pending -> enqueue (copying_handler resolver) promise
        with exc -> reject resolver exc

The Lwt implementation of bind follows essentially the same algorithm as we just implemented. Note that there is no concurrency in bind: as we said above, it’s the OS that provides the concurrency.