Elasticsearch at Intercom

In the beginning there was MongoDB

Intercom allows our customers to store custom data about their customers. This data is incredibly free-form, and ranges from integer values to large tracts of text. MongoDB was our first solution to this. It worked well, but as Intercom scaled it caused issues with the UserList for some customers. We decided to keep MongoDB as the source of truth, and use Elasticsearch as our way of accessing the data. Here’s what we learnt as we started heavily using Elasticsearch.

The first steps

In late 2014, we started implementing the UserList in Elasticsearch, and by early 2015 it was in production. This was a huge improvement for our larger customers, and a slight improvement for our smaller customers. One of the major features this enabled was OR filtering, one of our most requested features.

The ingestion of data into Elasticsearch was the first big problem we hit. The provider we were using for MongoDB at this time did not provide a way to access the Oplog, so we could not ingest that way.

We ended up with the following: when a Rails callback was triggered for a save, create, etc. it would take the document from MongoDB, convert it to JSON, and send it to SQS. At the other end, Logstash would read from SQS, batch the updates, and send them to Elasticsearch. We kept this setup in place until recently.

Initial issues

There is no point migrating to a new service if we're presenting the wrong data to the customer. One of the major issues was trying to get an Elasticsearch mapping that would work correctly. After a large amount of cross-team work, we got a mapping that worked.

A worker was created that would check the Elasticsearch and MongoDB counts and reingest if they didn't match, as well as doing a full reingest of all user data from MongoDB to Elasticsearch every week. We believed this was a good way of mitigating the issues we had.

When we tried to model the MongoDB data in our Elasticsearch mapping, we made a mistake. We decided to create a new field for each piece of custom data as well as each type it could be. Each time a new piece of custom data was added the mapping would grow.

After several months, we noticed our Elasticsearch cluster was having long, blocking GC pauses. These resulted in Elasticsearch stalling, and at one point the cluster was unavailable for upwards of 1 hour a day, in 1 minute pauses. This was causing issues for our customers, as the UserList would take a long time to load, or in bad cases timeout and not load at all!

It was at this time we engaged Elastic support. We sent on diagnostics, and they informed us that the issue was caused by unbounded mapping growth. Our mapping had hit 150MB, and was growing. Each time it was updated it needed to be sent to every node in the cluster. We had 10 nodes, so this was 1.5GB of data being transferred every time.

Our solution was to create a mapping that would reuse existing fields. A table in MySQL that would map custom data from the customer’s app to a field in Elasticsearch. Now our mapping only grows when the customer with the largest number of custom data adds a new one.

More issues

After using our ingestion for nearly a year, we began to worry that we were having more data consistency problems than we had initially anticipated. We were dealing with a slow but steady number of reports from customers about how their data was mismatched in the UserList. As we had now moved MongoDB in-house, and had access to the MongoDB Oplogs, we decided to work on a new ingestion pipeline using the MongoDB Oplog as the source of truth.

This led us to creating Argus. Argus is a Go service that tails the MongoDB Oplog, batches the updates and sends them to two SQS queues. One queue is serviced by a fleet of workers that ingest the data into Elasticsearch, the other just performs a comparison of the documents in MongoDB and Elasticsearch.

Lately we have been suffering major cluster instability linked to shard size. Our original Elasticsearch infrastructure was a single index with 24 shards, an alias per customer's app, and routing based on the customer's ID. However, it did mean that we ended up with hotspotting. Elastic recommend that a shard does not exceed 30-50 GB; we had shards that were over 300 GB. Our solution was to split our index out into 10 indices, each with 24 shards.

We created a migration worker that would enable dual writing for a customer's app. All new updates would be ingested to the old and new indices, and while this happened a worker would get all the users for a customer's app and reindex them onto the new index. It would determine which shard would land on each index, and then find the smallest shard to better balance the data across the indexes.
Bar a few teething issues, this worked flawlessly, with none of our customers noticing.

The future

We have several plans to further improve and scale Elasticsearch in the future. Now that we have the ability to support multiple clusters, we are now thinking about sharding our customers on to different clusters. This allows us to: reduce the blast radius in case of issues, try different hardware configurations, and test different versions of Elasticsearch and its plugins.

Unlike routing and its subsequent hotspotting, our setup now means we can easily try remove routing for a subsection of our customer's apps and see if there is there is anything to gain from it. This becomes especially useful if we move to having a single index for a large customer, as we would like their data spread across the shards in the index.

Currently we do a lot of our work with Elasticsearch through a gem in our Rails app. As we move to a more service orientated architecture, it makes more and more sense for us to move a lot of talking to Elasticsearch to a standalone service that is language agnostic of our clients.