How to Use GenStage and WebSockets to Optimize Consumption of Rate-Limited APIs

For certain kinds of tasks it can be tricky to get around API rate limits. A prime example would be implementing an incremental search with a third-party API. Ed Ellison over at the Mint Digital blog shows you how you can do it with a little help from GenStage and WebSockets. 

The obvious way to handle incremental search is to send a query on every keystroke in the manner of autocomplete. If you’re rate-limited, however, this might be impossible. Ed suggests two common alternatives: debouncing, where you wait until the user pauses typing and then send a single request, or throttling, where you cap the number of requests sent in any time period. The downside of both is that they take away the instant autocomplete experience users like: requests are deliberately delayed, and nothing can be shown until the server responds.

Instead, we want to display results as the user types without blowing through our rate limit. A good solution would hold back a new request if the requests-per-second limit had already been reached, and would send several requests at once if none had been sent in the last few seconds. The benefit of this approach is that no calls are wasted: calls we “bank” by staying idle can be spent later, and a banked call can be used immediately the moment a new user query comes in.

Adding GenStage

Ed explains that you can build the required solution with GenStage, an Elixir library for building event-processing pipelines with back-pressure, designed for cases where an earlier stage in the chain can emit events faster than a later stage can handle them (i.e. the case we have here). A pipeline is made of stages that act as producers, which emit events, as consumers, which receive them, or as both. The stages we need here are: one that holds the latest user input, one that asks for the latest query at a fixed rate, and one that calls the API with that query.

Under this setup, the browser sends every keystroke to the Elixir server, where the GenStage pipeline processes it. The pipeline consists of three modules: Query, a producer that receives user input and buffers or emits events; QueryRateLimiter, a producer-consumer that asks Query for an event once per second and passes on whatever Query has ready; and QueryRunner, a consumer that receives events from QueryRateLimiter and sends the queries off to the third-party API we’re querying.
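
As a rough illustration (not code from the article), the three stages might be wired together per connection roughly like this, where socket stands for the connecting client’s WebSocket and the module names follow Ed’s naming:

# Illustrative wiring of the pipeline for one connection.
{:ok, query}   = Query.start_link(socket)            # producer: holds the latest query
{:ok, limiter} = QueryRateLimiter.start_link(%{})    # producer-consumer: paces demand
{:ok, runner}  = QueryRunner.start_link(%{})         # consumer: calls the external API

# Each downstream stage subscribes to the stage before it.
GenStage.sync_subscribe(limiter, to: query)
GenStage.sync_subscribe(runner, to: limiter)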

The promise of this setup is that it will limit the frequency of API queries in a more intelligent way than debounce or throttle. 

Introducing WebSockets

The downside of the above setup is that we need to make two jumps, one from the browser to the Elixir server, and then another from there to the API. The API response still needs to be sent back to the browser. We’ll manage this by connecting the GenStage pipeline with the browser via WebSockets. In this way, we’ll be able to process events in our own time and push the results back whenever they arrive. 
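
A minimal sketch of what the channel side could look like with Phoenix Channels; the module name MyAppWeb.TypeaheadChannel, the ‘new_query’ event, and the use of socket assigns are assumptions rather than details from the article:

defmodule MyAppWeb.TypeaheadChannel do
  use Phoenix.Channel

  # When a client joins, start a Query producer for this connection.
  # The downstream stages subscribe to it as in the wiring sketch above.
  def join("typeahead:public", _params, socket) do
    {:ok, query_pid} = Query.start_link(socket)
    {:ok, assign(socket, :query_pid, query_pid)}
  end

  # Every keystroke from the browser lands here and updates the producer.
  def handle_in("new_query", %{"query" => query}, socket) do
    Query.update(socket.assigns.query_pid, query)
    {:noreply, socket}
  end
end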

Final Implementation

To finish up, Ed walks through the main modules he’s created.

Query Module 

The Query module is started with ‘start_link(socket)’, which triggers ‘init/1’; this declares Query a producer and limits its buffer to a single event, so only the latest query is kept. When a query comes in over the socket, we call ‘update/2’ with the process’s pid and the incoming query. This triggers ‘handle_cast/2’, which tries to dispatch the event to a subscribed consumer (in this case, QueryRateLimiter).
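
A minimal sketch of what Query might look like based on that description (the shape of the event map is an assumption):

defmodule Query do
  use GenStage

  # One producer per connected client; the socket is kept in state so
  # results can later be routed back to the right browser.
  def start_link(socket), do: GenStage.start_link(__MODULE__, socket)

  # Called from the channel with the producer's pid and the new query.
  def update(pid, query), do: GenStage.cast(pid, {:update, query})

  # A producer whose buffer holds at most one event: only the latest query matters.
  def init(socket), do: {:producer, socket, buffer_size: 1, buffer_keep: :last}

  # Dispatch the query as an event; GenStage buffers it if no demand is pending.
  def handle_cast({:update, query}, socket) do
    {:noreply, [%{query: query, socket: socket}], socket}
  end

  # Demand is driven manually by QueryRateLimiter, so there is nothing to do here.
  def handle_demand(_demand, socket), do: {:noreply, [], socket}
end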

GenStage guarantees that if dispatched events outpace demand, the events are placed in the producer’s buffer (here capped at one, so only the latest query survives). If, on the other hand, demand outpaces the supply of events, the unused demand is stored instead, so that several future events can be dispatched in quick succession without being buffered.

QueryRateLimiter Module

This module is likewise started with ‘start_link/1’, which triggers ‘init/1’ and creates a producer-consumer. ‘handle_subscribe/4’ is called when QueryRateLimiter subscribes to the Query producer; it takes the producer’s pid and subscription reference and stores them in this stage’s state, then calls ‘ask_and_schedule/2’ with the updated state and the new producer before returning a tuple specifying that demand will be sent manually. This ensures that this module, not GenStage, decides when to ask the producer for new events. ‘handle_events/3’ takes care of the new events, emitting them unchanged. ‘ask_and_schedule/2’ also uses an interval to send a message to ‘self()’ after that interval has passed; the message is received by ‘handle_info/2’, which calls ‘ask_and_schedule/2’ again, and the loop starts over.
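
A sketch of QueryRateLimiter along those lines; the one-second interval and the state shape are assumptions:

defmodule QueryRateLimiter do
  use GenStage

  @interval 1_000  # assumed limit of one API call per second

  def start_link(state \\ %{}), do: GenStage.start_link(__MODULE__, state)

  def init(state), do: {:producer_consumer, state}

  # Called when a Query producer subscribes; remember it and switch to
  # manual demand so we control exactly when events are requested.
  def handle_subscribe(:producer, _opts, from, state) do
    state = Map.put(state, :producer, from)
    {:manual, ask_and_schedule(state, from)}
  end

  # Downstream (QueryRunner) subscriptions keep the default automatic mode.
  def handle_subscribe(:consumer, _opts, _from, state), do: {:automatic, state}

  # Pass events through unchanged; QueryRunner does the real work.
  def handle_events(events, _from, state), do: {:noreply, events, state}

  # Fires once per interval and restarts the ask/schedule loop.
  def handle_info({:ask, from}, state), do: {:noreply, [], ask_and_schedule(state, from)}

  # Ask the producer for one event now, and schedule the next ask.
  defp ask_and_schedule(state, from) do
    GenStage.ask(from, 1)
    Process.send_after(self(), {:ask, from}, @interval)
    state
  end
end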

QueryRunner Module

This module handles events as they arrive. When an incoming event has a query value, the query is sent off to the third-party API. When the results come back, they’re pushed over the WebSocket referenced in the event.
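
A sketch of QueryRunner under the same assumptions; ‘search_api/1’ stands in for the real HTTP call, and broadcasting on the socket’s topic via the Phoenix endpoint (with an assumed “results” event name) is one way to push results back from outside the channel process:

defmodule QueryRunner do
  use GenStage

  def start_link(state \\ %{}), do: GenStage.start_link(__MODULE__, state)

  def init(state), do: {:consumer, state}

  # Each event carries the query and the socket it arrived on.
  def handle_events(events, _from, state) do
    for %{query: query, socket: socket} <- events, query != "" do
      results = search_api(query)
      # Push the results back to the browser over the channel's topic.
      MyAppWeb.Endpoint.broadcast(socket.topic, "results", %{results: results})
    end

    {:noreply, [], state}
  end

  # Placeholder for the real HTTP call to the rate-limited API.
  defp search_api(_query), do: []
end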

Frontend

Ed built the frontend with Elm. When the Elm program loads, it initializes a model that stores the current query and any results received over the WebSocket. The code joins the ‘typeahead:public’ channel and creates a subscription that listens for new results. When new results come in, the subscription triggers a Results update message, which decodes the received JSON and updates the model with the results; the change in the model then updates the view.

Conclusion

Ed concludes that the resulting app is a good example of adding state to a WebSocket connection so you can do some work asynchronously while keeping your API calls within the rate limit.

 

Original Article

Stateful WebSockets with Elixir’s GenStage

Wendell Santos
 
