Wednesday, 18 November 2020

Fundis - FUNctional DIStributed processing

This is a technical post about a project I've been thinking about on-and-off for a while.  The idea is to use Redis as the middleware for a heterogeneous network of processing agents.  Clients would write requests for particular computations into Redis, servers would pick these up, do the computations and write the results back into Redis, from where the clients would pick them up.  I haven't yet written any code for this; I'm just trying to clarify the design at present.

This could be seen as a more ambitious successor to "Fundep" - a little library I developed while working at Bloomberg to connect and control a network of FUNctional DEPendencies within a single process.  "Fundis" aims to do a similar job, but working between multiple processes distributed over multiple machines.  

Background

This project was motivated by problems I faced as a software engineer at Bloomberg, from where I recently retired.  One might say that they are no longer my problems, but I found them interesting and spent quite a bit of time thinking about ways to overcome them, without ever having the time to implement these ideas.  So I'm reluctant to just abandon all these thoughts, and would still like to implement them in the hope that they may be useful somewhere.

In the early days, all the financial analysis/trading/management/etc functionality which Bloomberg provides its users was driven by a few huge monolithic executables running on huge monolithic "big iron" machines.  This was actually pretty efficient, but very inflexible and hard to scale up further.  So the trend has been to move specific chunks of functionality into lots of different executables, each providing specialised services, and distribute these across an increasing number of more cost-effective and energy-efficient "commodity" machines.

Bloomberg differs from other prominent providers of online services in that Google, Facebook, etc. provide a fairly narrow range of functionality on a very large scale, while Bloomberg provides thousands of different functions, each of interest to a different subset of its users.  (To give an extreme example, I once had to implement some special logic in one function which was only expected to be used by a single client, and then only a few times each year - however that client was the central bank of a country, so the work was considered justified.)

So the back-end of the Bloomberg system now consists of many thousands of different executables, each of which may be running tens or hundreds of instances spread over a cluster of machines dedicated to that particular area of functionality.  Responding to a single user click on a button may involve chains of calls to dozens of different services.

Clearly, efficient communication between all these services is critical to keep the whole system responsive.  I will not go into the details of how this works; it uses some proprietary protocols which always seemed rather heavyweight to me.  What is relevant is that there can be major mismatches between the speed of different operations, and updates that need to be fast can be held up waiting for operations that take significant time.  Sometimes this is simply unavoidable, but in many cases delays can be minimised by:

  • Caching results which may be needed more than once;
  • Pre-computing results which take time and are likely to be needed later;
  • Doing repeated similar operations in batches, thus saving communication and setup time;
  • Making requests to multiple services in parallel, as long as the operations are independent.

However, these optimisations are often easier said than done.  The natural way to write code tends to lead to making requests for remote data/operations as and when they are needed.  Implementing any of the optimisations above requires some refactoring and more complex code, so it tends not to get done when the priority is to deliver a working system quickly.

  • Caching and pre-computing are sometimes done, e.g. with Redis, but extra code has to be written for this in each case.  Note that caching within a client process or even one client machine is usually not ideal, as the next request involving the same data may be served on a different machine.
  • Due to the volume and complexity of the existing code, which has often been updated by dozens of developers over tens of years, it can be quite hard to get enough of an overview to see clearly where batching is possible.  Similarly, it can be very difficult to trace the interdependencies between sub-computations to see when it is safe to re-order them or do them in parallel.

All these forms of optimisation depend on decoupling the sequencing of calling computationally expensive remote operations from the sequencing of the code which consumes their results.  So rather than making such remote calls directly from the consuming code, we need some infrastructure to manage these calls and store their results.  Instead of building such infrastructure in ad-hoc fashion for each use, it seems worthwhile to create a generic infrastructure which can manage many such uses.

Note that if the sequencing of calls can be changed by the infrastructure, it is essential that such re-ordering has no side-effects.  This implies that the remote calls must operate as pure functions, whose only action is to produce a result which depends only on their inputs.  However if the result is expected to vary over time, we can add a timestamp or version number parameter to make this explicit.
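
To make the version idea concrete, here is a minimal Python sketch of how a request could be turned into a deterministic string, with the version passed as an explicit parameter.  The function name request_key, the "func:vN:params" layout and the example parameters are all my own assumptions for illustration, not part of any existing design.

```python
import json


def request_key(func, version, **params):
    """Build a deterministic key from a function name, an explicit
    version number and the call parameters (hypothetical layout)."""
    # sort_keys makes the key independent of the order in which the
    # caller happened to supply the parameters.
    payload = json.dumps(params, sort_keys=True, separators=(",", ":"))
    return f"{func}:v{version}:{payload}"


# Same inputs in a different order give the same key ...
k1 = request_key("bond_price", 7, isin="EXAMPLE", currency="USD")
k2 = request_key("bond_price", 7, currency="USD", isin="EXAMPLE")
# ... while a new version gives a new key, so an old result is never
# mistaken for a current one.
k3 = request_key("bond_price", 8, isin="EXAMPLE", currency="USD")
```

Since everything the result depends on appears in the key, two calls with the same key can be served in any order, or from a cache, without changing the answer.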

Also, it greatly simplifies the client code if it can just ask for what it wants without needing to specify which server its request should be routed to.  However in the Bloomberg environment the system for specifying how to route requests to the appropriate servers had become highly complicated, requiring considerable attention to configure and update correctly.  I believe it should be possible to manage this in a way which is both simpler and more dynamic.

Proposal


Redis is often used simply as a cache, but the Redis home page describes its uses more broadly as "database, cache and message broker".  I want to explore using Redis as a single integrated messaging and caching system which would handle all the communication between services in a distributed processing environment like that described above.

If we assume that processing results are going to be cached in Redis, we will need to have code to write and read input and output in the string key/value form which Redis supports.  The key here needs to include (or at least depend on) all the relevant inputs, otherwise we will get false hits.  So rather than re-encoding this same data in another format in order to call a remote service, we can use the Redis-compatible format to communicate with the remote service as well.  The procedure would be:
  1. Client formats the input parameters as a string that can be used as a Redis key.
  2. Client queries Redis for this key; if found, client gets the data in string form and decodes it.
  3. If the key was not present in Redis, client requests it by writing the key to a Redis queue.  Note that the query and request (when needed) can be done atomically by sending a Lua script to be executed on the Redis server.
  4. Servers for this data will be monitoring this Redis queue, so one of them will pick up the requested key, do the necessary computation and write the input-key/output-value back to Redis.
  5. Client then reads the result from Redis as it would have done at step 2 if it was already available.
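
The five steps above can be sketched in Python roughly as follows.  This is only an illustration under several assumptions of mine: each function gets one hash named "<func>:results" and one list named "<func>:requests", keys and values are JSON strings, and the InMemoryRedis class is a hypothetical stand-in which imitates the few redis-py calls used (hget, hset, rpush, lpop) so that the sketch runs without a Redis server.

```python
import json


class InMemoryRedis:
    """Hypothetical stand-in imitating a few redis-py methods."""

    def __init__(self):
        self.hashes = {}
        self.lists = {}

    def hget(self, name, field):
        return self.hashes.get(name, {}).get(field)

    def hset(self, name, field, value):
        self.hashes.setdefault(name, {})[field] = value

    def rpush(self, name, value):
        self.lists.setdefault(name, []).append(value)

    def lpop(self, name):
        items = self.lists.get(name)
        return items.pop(0) if items else None


def client_get(r, func, args):
    """Steps 1-3 (and 5): return the decoded result, or None after
    queueing a request for it."""
    key = json.dumps(args, sort_keys=True)        # step 1: format the key
    value = r.hget(f"{func}:results", key)        # step 2: query
    if value is None:
        r.rpush(f"{func}:requests", key)          # step 3: request it
        return None
    return json.loads(value)


def server_step(r, func, compute):
    """Step 4: serve one queued request, if there is one."""
    key = r.lpop(f"{func}:requests")  # a real server would block here
    if key is None:
        return False
    result = compute(**json.loads(key))
    r.hset(f"{func}:results", key, json.dumps(result))
    return True


r = InMemoryRedis()
first = client_get(r, "add", {"a": 2, "b": 3})       # miss: request queued
served = server_step(r, "add", lambda a, b: a + b)   # server computes
second = client_get(r, "add", {"a": 2, "b": 3})      # step 5: now a hit
```

Note that client_get makes its query and its request as two separate calls.  Against a real Redis server another client could slip in between them, which is why step 3 suggests doing both in one Lua script; the sketch does not show that part.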

Data structure / granularity

For a first version, I would represent each function in Redis by one hash for the key/data pairs and one list for a queue of keys being requested.  In a later version I would hope to support more sophisticated, possibly hierarchical structures.  When a client wants the data for a key which has not yet been computed, it will RPUSH the key to the relevant queue.  Each server will monitor the queues for the functions it supports with BLPOP - this ensures that each queued request is delivered to one and only one server.

Pre-computation

When a client knows that certain data is likely to be needed soon but not immediately, it could write requests for this data into a low-priority queue (represented as another Redis list).  When a server is idle and has no work waiting in the main (high priority) queue it would serve requests from the low-priority queue, writing those results into Redis so that when the client later needs them they are immediately available.
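
One way to get this behaviour relies on the fact that BLPOP accepts several lists and checks them in the order given, popping from the first one which is not empty.  The Python sketch below illustrates the idea; the list names and the InMemoryLists class are hypothetical, and the stand-in returns at once instead of blocking so that the example runs without a server.

```python
from collections import deque


class InMemoryLists:
    """Hypothetical stand-in for the two redis-py list calls used below."""

    def __init__(self):
        self.lists = {}

    def rpush(self, name, value):
        self.lists.setdefault(name, deque()).append(value)

    def blpop(self, names, timeout=0):
        # Like Redis BLPOP, check the lists in the order given and pop
        # from the first non-empty one.  Unlike Redis, never block:
        # return None straight away when every list is empty.
        for name in names:
            queue = self.lists.get(name)
            if queue:
                return (name, queue.popleft())
        return None


HIGH = "price:requests"   # hypothetical name of the main queue
LOW = "price:prefetch"    # hypothetical name of the low-priority queue


def next_request(r):
    """Take the next request, preferring the high-priority queue."""
    return r.blpop([HIGH, LOW], timeout=1)


r = InMemoryLists()
r.rpush(LOW, "k1")
r.rpush(HIGH, "k2")
r.rpush(LOW, "k3")

served = []
while True:
    item = next_request(r)
    if item is None:
        break
    served.append(item)
```

Here "k2" is served first even though it was queued after "k1".  With a real redis-py client the same blpop call would wait up to the timeout when both lists are empty, and would return bytes rather than str unless the client was created with decode_responses=True.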

Common parameter data

Sometimes several different computations will require the same input data, e.g. info about a user such as full name, address, organisation, privileges, etc.  Rather than passing each of these parameters individually to each function which needs them, the client could write a "user" record into Redis with all this info and then just pass each function a single identifier which enables a server to find this info.

Scaling

Note that, depending on load, extra instances of specific servers could not only be started or stopped on-the-fly but even moved to different machines, without needing any special routing configuration changes.  Redis itself could become a bottleneck, but if necessary multiple Redis instances could be used, along with some scheme for sharding data across instances.
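
As a rough illustration of one possible sharding scheme, the Python sketch below picks a Redis instance from the function name, so that the result hash and the request queue for a given function always live on the same instance.  The instance addresses and the name instance_for are made up for the example, and sharding by function name is just one assumption; other schemes may suit better.

```python
import hashlib

# Hypothetical addresses of the available Redis instances.
INSTANCES = ["redis-a:6379", "redis-b:6379", "redis-c:6379"]


def instance_for(func, instances=INSTANCES):
    """Choose the instance which holds all the data for one function."""
    # Use a stable hash: Python's built-in hash() of a string can differ
    # from one process to the next, so clients and servers would disagree.
    digest = hashlib.sha256(func.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(instances)
    return instances[index]


chosen = instance_for("bond_price")
```

Every client and server which computes instance_for("bond_price") gets the same answer without any shared routing table.  One known weakness of plain modulo sharding like this is that changing the number of instances moves most functions to a different instance.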

Side benefits

This system has the side effect that requests to services and their replies are automatically recorded in Redis.  Retention times may need to be tuned depending on the storage space available.  But this data can then be inspected and monitored by other tools for debugging, testing, system health checks etc.

Next steps


If anyone finds this interesting or has feedback, please post a comment.  I hope to start prototyping this scheme soon, and will post any results here.
 
Update 17/5/24 - after a ridiculously long delay, I have now made a Tcl implementation of this idea, see https://wiki.tcl-lang.org/page/DisTcl+%2D+Distributed+Programming+Infrastructure+for+Tcl.

Wednesday, 4 November 2020

More Autumn Photos

Not much time to post anything as we are now in Family Lockdown No. 2.  Every Monday evening my son John goes to an activity group run by Resources for Autism which he enjoys.  But a week ago they emailed to say that one of their staff who was at this group had been confirmed to have coronavirus, so everyone who had attended the group, including John,  needs to self-isolate for two weeks from that Monday, i.e. until Monday 9th November.  Since it's difficult to do any real isolation while John is at his usual Supported Living house with a whole team of staff coming and going, my wife and I have been looking after him back at our family house for the last week.

I have been able to get out for a walk now and then, so here are some pictures taken in and around Hadley Wood (the actual wood that is, not the suburb where the Porsches have stickers saying "My other car is a Bentley" 😄) :









Excavations in the Midden-Heap of my Memory

 And now for something completely different My secondary school was Morgan Academy, Dundee.  In my class there were two people who later bec...