O'Reilly    
 Published on O'Reilly (http://oreilly.com/)


Fast Prototyping of Telephony Applications with YATE

by Maciek Kaminski
06/27/2006

So you have an idea for a novel, non-trivial telephony application? If it is truly novel, you will probably have to extend your favorite open source PBX to try it. When implementing your idea, if you are really lucky and/or a perfect coder, you'll get it right on your first try. However, the rest of us will have to go through a series of prototypes.

At the moment, the most common way of developing non-trivial open source telephony extensions is to code them directly in C/C++. Transmitting, receiving, and processing voice data generates thousands of events per second, and it is often most efficient to implement such routines in a low-level language such as C/C++.

However, when prototyping it is often more desirable to work in a higher-level language. Experiments are more readily implemented in agile languages with extensive standard libraries that integrate seamlessly with databases, email, HTTP, etc. Also, for most applications, the limiting factor is user interaction, which means that in many situations the application only has to be as fast as a human.

In this article, I will present the YATE project (Yet Another Telephony Engine). YATE's API boundaries separate the parts of a telephony application that have to be "fast" from those that have to be just "fast enough." As a result, YATE allows developers to write scripts in higher-level languages, while leveraging the performance of native libraries without sacrificing too much efficiency.

Architecture

The YATE architecture owes a lot to the concept of a microkernel. Its core provides a minimal number of concepts and functions, delegating implementation of other functionality to modules. To communicate with each other, modules exchange messages.

A message contains four pieces of data (in order): a type, a list of character attributes, a return value, and one binary attribute. The binary attribute is used to carry a CallEndPoint, a concept that facilitates the manipulation of "media wiring." A CallEndPoint is simply a collection of DataEndPoints (at least one for each kind of media: audio, video, etc.), each of which can be connected to or disconnected from another (see Figure 1). A DataEndPoint comprises an incoming DataConsumer and an outgoing DataSource. When two CallEndPoints are connected, their corresponding DataEndPoints are connected, which means that the DataConsumer of DataEndPoint A gets connected to the DataSource of DataEndPoint B, and vice versa. If the source and consumer formats do not match, translators are inserted between them automatically.

Figure 1. CallEndPoints
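
The wiring described above can be sketched in a few lines of plain Python. This is only an illustrative model: the class names mirror the terms used in this article, but the attributes, the wire helper, and the tuple that stands in for a translator are assumptions made for the example, not YATE's native API.

```python
# Illustrative model of "media wiring"; not YATE's actual C++ classes.

class DataSource(object):
    def __init__(self, fmt):
        self.fmt = fmt              # media format, e.g. "alaw"


class DataConsumer(object):
    def __init__(self, fmt):
        self.fmt = fmt
        self.source = None          # DataSource currently feeding this consumer
        self.translator = None      # (from_fmt, to_fmt) when formats differ


class DataEndPoint(object):
    """One kind of media: an outgoing source plus an incoming consumer."""
    def __init__(self, fmt):
        self.source = DataSource(fmt)
        self.consumer = DataConsumer(fmt)


def wire(consumer, source):
    """Hypothetical helper: attach a source, noting a translator if needed."""
    consumer.source = source
    if source.fmt != consumer.fmt:
        consumer.translator = (source.fmt, consumer.fmt)
    else:
        consumer.translator = None


class CallEndPoint(object):
    """A set of DataEndPoints, one per kind of media."""
    def __init__(self, ident, formats):
        self.ident = ident
        self.data = dict((media, DataEndPoint(fmt))
                         for media, fmt in formats.items())
        self.peer = None

    def connect(self, other):
        self.peer, other.peer = other, self
        for media in self.data:
            if media in other.data:
                a, b = self.data[media], other.data[media]
                wire(a.consumer, b.source)   # A consumes what B produces
                wire(b.consumer, a.source)   # and vice versa


sip = CallEndPoint("sip/1", {"audio": "alaw"})
zap = CallEndPoint("zap/1", {"audio": "slin"})
sip.connect(zap)
print(zap.data["audio"].consumer.translator)
```

Because the two audio formats differ in this example, each consumer records the translation it would need; with matching formats the translator slot stays empty.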

Message Flows

Since this article is venturing into some of the more abstract parts of YATE, it may help to look at a concrete example where modules exchange messages in order to set up a call (Figure 2):

Figure 2. Message flow

What is going on here? There are two channel modules, SIP and ZAP, as well as a routing module. All three modules cooperate to handle an incoming call:

  1. An incoming call comes into SipChannel (1). To determine where to direct it, SipChannel sends a call.route message to RoutingModule (2).
  2. RoutingModule handles call.route (3) by mapping the called attribute (1234) to the call target (zap/1).
  3. Now SipChannel creates a CallEndPoint for the incoming call and adds it to a new call.execute message, which it sends to ZapChannel (4). ZapChannel creates a new CallEndPoint and connects it to the CallEndPoint previously created by SipChannel. It then returns the message (5) and tries to call the destination. While waiting for the destination to answer, it may send call.ringing (6).
  4. When the callee answers the call, ZapChannel sends a call.answered message (7) to SipChannel. Once the call is set up, media data flows between sources and consumers. DTMF events may be sent in both directions (8)(9). When one of the participating channels detects a hangup (10), it disconnects its CallEndPoint. That results in a chan.disconnected and eventually a call.hangup message being sent by both channels (not pictured).
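
The first three steps can be mimicked with a toy message bus. The sketch below is a simplification under stated assumptions: ToyEngine, the ROUTES table, and the handler functions are hypothetical names invented for illustration, and a string stands in for the CallEndPoint carried in the binary attribute.

```python
class Message(object):
    """Type, character attributes, return value, and one binary attribute."""
    def __init__(self, name, **attrs):
        self.name = name
        self.attrs = attrs
        self.retvalue = ""
        self.userdata = None


class ToyEngine(object):
    """Hands a message to installed handlers until one of them handles it."""
    def __init__(self):
        self.handlers = {}
        self.log = []

    def install(self, name, handler):
        self.handlers.setdefault(name, []).append(handler)

    def dispatch(self, msg):
        self.log.append(msg.name)
        for handler in self.handlers.get(msg.name, []):
            if handler(msg):
                return True
        return False


ROUTES = {"1234": "zap/1"}          # hypothetical routing table


def routing_module(msg):
    target = ROUTES.get(msg.attrs.get("called"))
    if target is None:
        return False                # not routed here
    msg.retvalue = target
    return True


def zap_channel(msg):
    if not msg.attrs.get("callto", "").startswith("zap/"):
        return False
    msg.attrs["targetid"] = msg.attrs["callto"]   # the outgoing channel's id
    return True


def incoming_sip_call(engine, callid, called):
    route = Message("call.route", id=callid, called=called)
    if not engine.dispatch(route):
        return None
    execute = Message("call.execute", id=callid, callto=route.retvalue)
    execute.userdata = "CallEndPoint(%s)" % callid   # stand-in for the endpoint
    if not engine.dispatch(execute):
        return None
    return execute.attrs["targetid"]


engine = ToyEngine()
engine.install("call.route", routing_module)
engine.install("call.execute", zap_channel)
print(incoming_sip_call(engine, "sip/1", "1234"))
```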

Modules

Besides SIP and ZAP, YATE has a few other modules that provide either VOIP or ISDN channels: h323, iax, and wanpipe. There are also pseudo channel modules that provide additional functions. For example, wave plays/records wave files; tone plays tones; moh plays "music on hold"; festival is used for doing text-to-speech (TTS) with the festival speech synthesis system; conference is used for conference calls; etc.

Apart from channel modules that handle media traffic, there are modules that handle other telephony tasks, such as routing, CDR, and user authorization.

One module that makes YATE a great prototyping framework is extmodule. It implements the YATE external protocol, a text-based protocol that allows message exchange over TCP. I won't go into the protocol details here, as they are beyond the scope of this article, but there is an official YATE wiki page dedicated to extmodule. Instead, I will present examples written in Python to show how flexible YATE scripting via the external protocol can be.

Yet Another YATE Python Module (YAYPM)

YAYPM is a Python external protocol client. Its API is built around one concept, Deferred, which comes from the Twisted framework. (The Twisted project is very well documented; check out the Twisted home page for in-depth explanations of Deferred and its asynchronous programming model.) For now, it is enough to say that a Deferred is a promise to carry out some processing when a specified event occurs. What executes when that event occurs is determined by success and failure callback functions. YAYPM allows you to create Deferreds that fire when specific messages are received from YATE or sent to it.
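
To make the idea concrete without pulling in Twisted, here is a deliberately tiny stand-in. MiniDeferred is a hypothetical class written for this illustration; it keeps only the success/failure chaining and leaves out most of what the real Deferred does (for example, running callbacks that are added after it has fired).

```python
class MiniDeferred(object):
    """Toy stand-in for a Deferred: a chain of success/failure callbacks."""

    def __init__(self):
        self._chain = []        # list of (on_success, on_failure) pairs
        self._fired = False
        self.result = None
        self.failed = False

    def addCallbacks(self, on_success, on_failure=None):
        self._chain.append((on_success, on_failure))
        return self

    def callback(self, result):
        self._run(result, False)

    def errback(self, error):
        self._run(error, True)

    def _run(self, value, failed):
        if self._fired:
            raise RuntimeError("already fired")
        self._fired = True
        for on_success, on_failure in self._chain:
            handler = on_failure if failed else on_success
            if handler is None:
                continue        # nothing registered for this outcome
            try:
                value = handler(value)
                failed = False  # a handler that returns normally recovers
            except Exception as exc:
                value = exc
                failed = True   # later pairs see the failure instead
        self.result, self.failed = value, failed


log = []
d = MiniDeferred()
d.addCallbacks(lambda msg: msg["called"])
d.addCallbacks(lambda called: log.append("routing %s" % called))
d.callback({"called": "ivr"})   # the "event" occurs; the chain runs
print(log)
```

Each callback receives the value returned by the previous one, which is the property the examples in the rest of this article rely on.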

A Trivial Example

Let's start by writing a toy IVR application that will echo DTMF events to the caller. TCPDispatcherFactory will create a TCPDispatcher instance and instruct it to start the route function upon successful connection to the YATE server:

f = TCPDispatcherFactory(route)
reactor.connectTCP("localhost", 5039, f)

The route function registers the on_route function as a handler for call.route messages whose called attribute is ivr:

def route(yate):
    def on_route(route):
        ...

    yate.onmsg("call.route",
        lambda m : m["called"] == "ivr").addCallback(on_route)

The onmsg method of a YateClientProtocol object creates Deferreds whose callbacks will be fired when a message comes from YATE:

def onmsg(self, name, guard = lambda _: True, until = None, autoreturn = False):

Where:

name
    The name of the YATE message that fires the Deferred.
guard
    A one-argument Boolean function that defines additional conditions
    that the message must meet in order to trigger the Deferred.
until
    A program that calls a function typically does not want to wait
    for an event forever.  The until parameter allows one to specify
    another Deferred which, when fired, will cancel the wait.
autoreturn
    A kind of syntactic sugar: when set to True, the message is
    returned automatically before the callbacks fire.

So, for example:

...
d = yate.onmsg("call.route",
    lambda m: m["called"] == "ivr")
...

creates a Deferred that will fire its callback when the script receives call.route messages whose called attribute is set to ivr, and:

...
end = yate.onmsg("chan.hangup",
    lambda m : m["id"] == "SIP/1", autoreturn = True)

d = yate.onmsg("call.answered",
    lambda m : m["targetid"] == "SIP/1", until = end)
...

creates a Deferred that will wait for a call.answered message with targetid set to SIP/1 (which is the YATE channel's ID) until call termination, i.e., the arrival of a chan.hangup message with id set to SIP/1.
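
The interplay of guard and until can be modelled with a toy dispatcher. Everything in the sketch below (ToyDispatcher, Wait, Abandoned, deliver) is a hypothetical stand-in written for this illustration; it only imitates the behavior described above and is not how YAYPM is implemented.

```python
class Abandoned(Exception):
    """Passed to the errbacks of a wait whose until condition fired first."""


class Wait(object):
    """One-shot wait with success callbacks and cancellation errbacks."""
    def __init__(self):
        self.done = False
        self.on_ok = []
        self.on_cancel = []

    def addCallback(self, fn):
        self.on_ok.append(fn)
        return self

    def addErrback(self, fn):
        self.on_cancel.append(fn)
        return self

    def fire(self, msg):
        if not self.done:
            self.done = True
            for fn in self.on_ok:
                fn(msg)

    def cancel(self):
        if not self.done:
            self.done = True
            for fn in self.on_cancel:
                fn(Abandoned())


class ToyDispatcher(object):
    def __init__(self):
        self.waits = []             # pending (name, guard, wait) entries

    def onmsg(self, name, guard=lambda _: True, until=None):
        wait = Wait()
        self.waits.append((name, guard, wait))
        if until is not None:
            until.addCallback(lambda _msg: wait.cancel())
        return wait

    def deliver(self, name, attrs):
        for entry in list(self.waits):
            wname, guard, wait = entry
            if wait.done:
                self.waits.remove(entry)        # cancelled earlier
            elif wname == name and guard(attrs):
                self.waits.remove(entry)        # one-shot: fire and forget
                wait.fire(attrs)


yate = ToyDispatcher()
events = []
end = yate.onmsg("chan.hangup", lambda m: m["id"] == "SIP/1")
d = yate.onmsg("call.answered", lambda m: m["targetid"] == "SIP/1", until=end)
d.addCallback(lambda m: events.append("answered"))
d.addErrback(lambda err: events.append("abandoned"))

yate.deliver("chan.hangup", {"id": "SIP/2"})            # guard rejects it
yate.deliver("chan.hangup", {"id": "SIP/1"})            # fires end, cancels d
yate.deliver("call.answered", {"targetid": "SIP/1"})    # too late, ignored
print(events)
```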

Now, let us look at the on_route handler. After telling the incoming call to connect to DumbChannel by returning the call.route message with dumb/, it registers a watch for call.execute that comes from the same channel as call.route. The difference between onwatch and onmsg is that a Deferred created with onwatch fires after some other handler performs the actual processing of the message. In this case, the dumbchan module will process the call.execute message. Then, since the script is only interested in the fact that somebody processed the call.execute, the onwatch Deferred will fire:

    def on_route(route):
        yate.ret(route, True, "dumb/")

        def on_execute(execute):
            ...

        execute = yate.onwatch("call.execute",
            lambda m : m["id"] == route["id"])
        execute.addCallback(on_execute)
        # Re-arm with the same guard, so only calls to "ivr" are handled.
        yate.onmsg("call.route",
            lambda m : m["called"] == "ivr").addCallback(on_route)

Since Deferreds are a one-shot deal, in the end, another Deferred for call.route must be created in order to handle another call in the future.
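
The difference between handling and watching a message can be shown with a small model. The WatchEngine class and its install and watch methods below are assumptions made for this sketch; they imitate only the ordering described above, in which watchers are notified after a handler has processed the message.

```python
class WatchEngine(object):
    """Toy engine: handlers process a message, watchers observe the outcome."""

    def __init__(self):
        self.handlers = []
        self.watchers = []

    def install(self, name, fn):
        self.handlers.append((name, fn))

    def watch(self, name, fn):
        self.watchers.append((name, fn))

    def dispatch(self, name, attrs):
        handled = False
        for hname, fn in self.handlers:
            if hname == name and fn(attrs):
                handled = True
                break                       # first handler to accept wins
        for wname, fn in self.watchers:
            if wname == name:
                fn(attrs, handled)          # always runs after the handlers
        return handled


trace = []


def dumbchan(attrs):
    """Plays the role of the module that really executes the call."""
    if attrs.get("callto") != "dumb/":
        return False
    attrs["targetid"] = "dumb/1"            # hypothetical channel id
    trace.append("handled")
    return True


def script_watch(attrs, handled):
    trace.append("watched targetid=%s handled=%s"
                 % (attrs.get("targetid"), handled))


engine = WatchEngine()
engine.watch("call.execute", script_watch)  # registered first on purpose
engine.install("call.execute", dumbchan)
engine.dispatch("call.execute", {"id": "sip/1", "callto": "dumb/"})
print(trace)
```

Even though the watcher was registered before the handler, it sees the targetid that the handler filled in, which is what the keyecho script depends on.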

The on_execute function sends a call.answered message to confirm that the call has been answered and connected to DumbChannel.

DumbChannel provides CallEndPoints that have empty DataEndPoints. This allows other modules to attach their data sources and consumers to them. The on_dtmf function uses this feature by attaching a wave file DataSource on every DTMF event:

        def on_execute(execute):
            yate.msg("call.answered",
                     {"id": execute["targetid"],
                      "targetid": execute["id"]}).enqueue()
            print "Call %s answered." % execute["id"]
            def on_dtmf(dtmf):
                print "Dtmf %s received." % dtmf["text"]
                yate.msg("chan.masquerade",
                    {"message" : "chan.attach",                    
                     "id": dtmf["targetid"],
                     "source": "wave/play/./sounds/digits/pl/%s.gsm" % \
                     dtmf["text"]}).enqueue()
                yate.onmsg("chan.dtmf",
                    lambda m : m["id"] == dtmf["id"]).addCallback(on_dtmf)
                dtmf.ret(True)
            dtmf = yate.onmsg("chan.dtmf",
                lambda m : m["id"] == execute["id"])
            dtmf.addCallback(on_dtmf)

The use of chan.masquerade above requires further explanation. In response to chan.masquerade, the receiving channel (the dumb channel, in this case) inserts its CallEndPoint and resends the message specified in the message attribute. chan.masquerade is a bit of a trick that allows you to manipulate native endpoints without actually touching them.
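
As a rough mental model, the masquerade step can be pictured as a function that rewrites one message into another. The sketch below is an assumption-laden illustration: handle_masquerade and the endpoints dictionary are hypothetical, and a plain string stands in for the native CallEndPoint.

```python
class Message(object):
    def __init__(self, name, attrs, userdata=None):
        self.name = name
        self.attrs = attrs
        self.userdata = userdata    # slot for the binary attribute


def handle_masquerade(msg, endpoints):
    """Build the message named by 'message', carrying the channel's endpoint."""
    attrs = dict(msg.attrs)
    real_name = attrs.pop("message")
    endpoint = endpoints.get(attrs.get("id"))
    if endpoint is None:
        return None                 # unknown channel: nothing to resend
    return Message(real_name, attrs, userdata=endpoint)


endpoints = {"dumb/1": "CallEndPoint(dumb/1)"}   # stand-in objects
request = Message("chan.masquerade",
                  {"message": "chan.attach",
                   "id": "dumb/1",
                   "source": "wave/play/./sounds/digits/pl/5.gsm"})
resent = handle_masquerade(request, endpoints)
print(resent.name)
```

The script never touches the endpoint itself; it only names the channel, and the channel supplies the endpoint when the message is resent.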

Flow

Although Deferreds provide a cleaner asynchronous programming model than bare event loops, programs composed of a large number of small callbacks are hard to read and write. The YAYPM flow module uses Python generators to connect callbacks of different Deferreds into a single control flow:

def doSomething():
    ...
    yield yate.onmsg("chan.dtmf", lambda m : m["id"] == callid)
    dtmf = flow.getResult()
    #do something with dtmf
    ...

The yield keyword turns doSomething into a generator. Yielding a Deferred suspends the generator's execution until the Deferred fires. Then the generator resumes execution and flow.getResult delivers the result of the Deferred that was yielded. If the Deferred's errback is triggered instead, flow.getResult raises an exception. To run a flow generator, you must use flow.go, which returns a Deferred that fires when the generator returns:

    ...
    yield go(doSomething())
    print getResult()
    ...
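
For readers curious how such a driver can work, here is a minimal sketch of the idea. It is not the YAYPM flow module: the Wait class and the _last dictionary are hypothetical, and the go and getResult functions below merely imitate the behavior just described.

```python
_last = {}      # module-level slot that getResult reads in this toy


class Wait(object):
    """Toy one-shot event; listeners are called with (value, error)."""
    def __init__(self):
        self.listeners = []

    def subscribe(self, fn):
        self.listeners.append(fn)

    def fire(self, value):
        for fn in list(self.listeners):
            fn(value, None)

    def fail(self, error):
        for fn in list(self.listeners):
            fn(None, error)


def getResult():
    """Return the value of the last wait, or raise the error it failed with."""
    if _last.get("error") is not None:
        raise _last["error"]
    return _last.get("value")


def go(gen):
    """Run a generator, resuming it each time the wait it yielded fires."""
    finished = Wait()

    def step(value, error):
        _last["value"], _last["error"] = value, error
        try:
            waited = next(gen)          # run until the next yield
        except StopIteration:
            finished.fire(None)         # the generator returned
            return
        waited.subscribe(step)          # resume when that wait fires

    step(None, None)
    return finished


dtmf_wait = Wait()
seen = []


def doSomething():
    yield dtmf_wait
    dtmf = getResult()
    seen.append(dtmf["text"])


go(doSomething()).subscribe(lambda value, error: seen.append("finished"))
dtmf_wait.fire({"text": "5"})
print(seen)
```

Because the result travels through a shared slot, getResult has to be called immediately after the yield, before any other generator is resumed.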

Sidenote

Using generators in this manner, as co-routines, is probably against generator designers' intentions. This will change as of Python version 2.5, when it will be possible to access return values when execution is resumed:

      ...
      dtmf = yield yate.onmsg(
          "chan.dtmf", lambda m : m["id"] == callid)
      ...

It will also remove the ugly global variable that is hidden behind getResult.
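
A driver built on that feature could look roughly like the following sketch. It assumes a generator's send method as available from Python 2.5 onward; the Wait class and this version of go are hypothetical illustrations, not YAYPM code.

```python
class Wait(object):
    """Toy one-shot event; listeners are called with the fired value."""
    def __init__(self):
        self.listeners = []

    def subscribe(self, fn):
        self.listeners.append(fn)

    def fire(self, value):
        for fn in list(self.listeners):
            fn(value)


def go(gen):
    """Resume the generator with each result; no shared state is needed."""
    def step(value):
        try:
            waited = gen.send(value)    # value becomes the yield's result
        except StopIteration:
            return
        waited.subscribe(step)

    step(None)                          # send(None) starts a fresh generator


first_key, second_key = Wait(), Wait()
digits = []


def collect_two():
    dtmf = yield first_key
    digits.append(dtmf["text"])
    dtmf = yield second_key
    digits.append(dtmf["text"])


go(collect_two())
first_key.fire({"text": "4"})
second_key.fire({"text": "2"})
print(digits)
```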

Another Trivial Example

Let's look at the keyecho toy ivr again, this time written with flow. First, the route generator directs calls to ivr to a dumb channel and starts the ivr generator for them. Notice that the ivr generator is started and forgotten (using go()). This allows the route generator to proceed to the next call.route message without delay:

    ...
    def route(yate):
        while True:
            yield yate.onmsg("call.route", lambda m : m["called"] == "ivr")
            route = getResult()
            go(ivr(yate, route["id"]))
            route.ret(True, "dumb/")
    ...

The ivr generator first answers the call, then handles chan.dtmf messages in a while loop, which is a more readable construct than adding callbacks to Deferreds. In the previous keyecho example, we ignored hangup events. In this example, chan.hangup is properly handled by the AbandonedException handler:

...
def ivr(yate, callid):
    try:
        end = yate.onwatch("chan.hangup", lambda m : m["id"] == callid)

        yield yate.onwatch("call.execute",
            lambda m : m["id"] == callid,
            until = end)
        execute = getResult()

        targetid = execute["targetid"]

        yate.msg("call.answered",
                 {"id": targetid,
                  "targetid": callid}).enqueue()

        print "Call %s answered." % callid

        while True:
            yield yate.onmsg(
                "chan.dtmf",
                lambda m : m["id"] == callid,
                end)
            dtmf = getResult()

            print "Dtmf %s received." % dtmf["text"]

            yate.msg("chan.masquerade",
                {"message" : "chan.attach",
                 "id": targetid,
                 "source": "wave/play/./sounds/digits/pl/%s.gsm" % \
                 dtmf["text"]}).enqueue()

            dtmf.ret(True)

    except AbandonedException, e:
        print "Call %s abandoned." % callid

...

A Non-Trivial Example

Keyecho is only a toy. Now let's look at a more realistic "blind transfer" example.

It's beyond the scope of this article to describe how a transfer is initiated using touch-tones on the handset. (The examples directory in YAYPM includes a complete example.) However, once a transfer is finally initiated, a call.execute message containing the CallEndPoint to be transferred is dispatched. The module that handles the target extension will treat it as a normal incoming call, and will connect its CallEndPoint and initiate a call:

def blind_transfer(yate, callid, targetid, transferto, returnto):
    try:
        yate.msg(
            "chan.masquerade",
            {"message" : "call.execute",                
             "id": targetid, "callto": transferto}).enqueue()

At this point, one of three things can happen: the endpoint being transferred hangs up, the call initiated by the target extension goes unanswered, or the call initiated by the target is answered:

        end = yate.onmsg(
            "chan.hangup",
            lambda m : m["id"] == targetid,
            autoreturn = True)

        notanswered = yate.onmsg(
            "chan.disconnected",
            lambda m : m["id"] == targetid,
            until = end)

        answered = yate.onwatch(
            "call.answered",
            lambda m : m["targetid"] == targetid,
            until = end)

In the case of the unanswered call, YATE sends a chan.disconnected message. This signals the last chance to reconnect the CallEndPoint.

Next, we use the YAYPM XOR function to combine the answered and notanswered Deferreds into a single Deferred that fires when either of the two fires and automatically cancels the other one. getResult() then returns both the index of the Deferred that fired and that Deferred's result. In the following code sample, an index of 0 means that answered fired, and an index of 1 means that notanswered fired:

        yield XOR(answered, notanswered)
        what, m = getResult()

Since chan.hangup was used as the until condition of both the answered and notanswered Deferreds, getResult will raise AbandonedException if the extension to be transferred hangs up.
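
The "first one wins" combination can be sketched independently of YAYPM. In the illustration below, xor_sketch and Wait are hypothetical names; the sketch shows only the index-plus-result convention and the cancellation of the losing wait, and leaves out the until handling.

```python
class Wait(object):
    """Toy one-shot event that can be cancelled before it fires."""
    def __init__(self):
        self.done = False
        self.listeners = []

    def subscribe(self, fn):
        self.listeners.append(fn)

    def fire(self, value):
        if self.done:
            return                      # already fired or cancelled
        self.done = True
        for fn in self.listeners:
            fn(value)

    def cancel(self):
        self.done = True


def xor_sketch(*waits):
    """Return a wait that fires with (index, value) of the first to fire."""
    combined = Wait()

    def make_listener(index):
        def listener(value):
            for other in waits:
                if other is not waits[index]:
                    other.cancel()      # the losers will never fire
            combined.fire((index, value))
        return listener

    for index, wait in enumerate(waits):
        wait.subscribe(make_listener(index))
    return combined


answered, notanswered = Wait(), Wait()
results = []
xor_sketch(answered, notanswered).subscribe(results.append)

notanswered.fire({"id": "sip/1"})       # index 1 wins
answered.fire({"targetid": "sip/1"})    # cancelled, so this is ignored
what, m = results[0]
print(what)
```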

If answered fires, we simply have to return the call.answered message that YATE sent:

        if what == 0:
            logger.debug("Blind transfer to: %s done" % transferto)
            m.ret(False)    
            return
        else:

If notanswered fires, the following code tries to get the routing module to resolve the returnto extension in order to return the call being transferred to the transfer initiator. If routing is possible (i.e., the route message is processed), we then try to execute the connection back to the initiator. If all goes well, the CallEndPoint being transferred is connected to the CallEndPoint of the initiator, and we can then return the chan.disconnected message:

            logger.debug(
                "Blind transfer to: %s failed. Returning to %s" % \
                (transferto, returnto))

            route = yate.msg("call.route",
                             {"called": returnto},
                             until = end)
            yield route.dispatch()

            if not getResult():
                logger.debug("Can't return to: %s" % returnto)
                m.ret(False)
                return

            yate.msg("chan.masquerade",
                     {"message" : "call.execute",                
                      "id": m["id"],
                      "callto": route.getRetValue(),
                      "called": returnto}).enqueue()
            yate.ret(m, True)

Not much can be done in the case of a hangup:

    except AbandonedException, e:
        logger.debug(
            "Blind transfer to: %s failed. Peer has disconnected" % \
            transferto)

Summary

YATE has a very minimalistic architecture and exposes most of its features through an external protocol. Since this external protocol is a simple text protocol, a client for it can be written quickly in any language. There are libraries for Perl, PHP, and Python. At the moment, YAYPM is the most advanced YATE connector library. YAYPM uses the Twisted framework and Python generators to allow programmers to write prototypes rapidly. Since Twisted is a huge protocol library, it is easy to mix telephony applications with technologies such as HTTP, SMTP, and SQL.

Resources

YAYPM
YATE main site
YATE messages
Twisted main site
Twisted O'Reilly book

Maciek Kaminski graduated from Warsaw University with a Computer Science degree.



Copyright © 2009 O'Reilly Media, Inc.