Colemak: 0 to 40 WPM in 40 Hours

On April 1st my first child was born and I started a wonderful month of paternity leave. Holding a sleeping infant leaves you with lots of sleepy hours where its (sometimes) possible to do repetitive tasks, so I decided to follow the 10% of my Automattic colleagues that are using either Dvorak or Colemak. My love of natural language processing led me to build word lists based on English word frequency and word/character frequency of my code and command line history.Colemak_layout_2 I chose Colemak over Dvorak because only 17 keys change location and most of those only move slightly. A lot of the key combinations that are ingrained from 15 years of using emacs are still pretty much the same. Standard commands like Cmd-Q, Cmd-W,Cmd-Z, Cmd-X, Cmd-C, and Cmd-V are all in the same places.

Why Would You Do This?

Well, needless to say, a layout designed in 1878 is probably not optimized for computers. Colemak was actually designed to place the most frequent letters right at your fingers. The fluidity is unnerving. There is very sparse evidence that you can type any faster with Colemak if you are already a great QWERTY touch typist. If you want to read more this StackOverflow thread is interesting. I also know and work with a lot of folks who don’t regret moving to either Colemak or Dvorak.

For myself, I was not a great touch typist. I knew the theory. But practicing typing was never something I did. Before I started Colemak I had a QWERTY typing speed of about 60 words per minute when copying text using TypeRacer. That’s about average. I don’t like being average. And I’ve never practiced typing code for speed. My most common three character sequence when coding is not ‘the’, it is ‘( $’… sigh PHP. I bet I can be faster with some practice.

So, if I’m going to try and get faster why not go all out? I make my living by tapping keys in a precise order. Why not learn a modern layout that has been well designed? I’ve also occasionally had pain in my hands, and my knuckles like to crack in ominous ways sometimes. Altogether, now seemed like a good time to give it a try.

And the most important reason: Never stop learning.

Learning Strategy

My strategy evolved over time, but this is where I ended up and what I would recommend.

  • This article made me think about typing as analogous to learning a musical instrument. Research has shown that learning music requires: “accurate, consistent repetition, while maintaining perfect technique”. In short, strive for accuracy and focus on the parts that you are not doing well at to improve.
  • Your brain needs time to process and learn. I had a habit of practicing Colemak for at least one minute each day. Some days I practiced for an hour, rarely longer.
  • Start out by learning the keyboard layout. I used The Typing Cat for about two hours over the course of a week.
  • Get a software program that can take arbitrary lists of words, and track and analyze where you are slow. I used Amphetype. Its not a great UI, but worked well enough. When practicing word lists practice the same three words in a row repeated three times before moving on to the next (the, of, and, the, of, and, the, of, and, to, in, a, …). This just felt like a good mix of repetition and mixing words to me. Your mileage may vary.
  • Then focus on practicing frequent English key sequences (or whatever your preferred language).
  • The top 5 bi-grams (the two-letter sequences ‘th’, ‘he’, ‘in’, ‘er’, and ‘an’) comprise 10% of all bi-grams. You should be extraordinarily fast and accurate at the top 30 bigrams.
  • Similarly get fast at 3-grams, 4-grams, and 5-grams. I built my lists from Peter Norvig’s analysis of the Google N-Gram Corpus.
  • Learn the most frequent words. Also from the N-Gram Corpus, the top 50 English words are about 40% of all words. Get fast at those, and you are well on your way.
  • When you are typing the above lists at 30+ WPM start practicing the top 500 words.
  • Along the way, focus on your mistakes. With Amphetype you can analyze the words and tri-grams that you make the most mistakes with. Build new lists based on these, slow down, and practice them till you are doing them perfectly. Speed will come. Focus on not needing to make corrections.
  • Rinse and repeat. Take breaks.
  • Go cold turkey and switch over completely. This was a lot easier because I was on leave from work. It wasn’t really until a month of practice that I completely switched. My QWERTY speed is now about as slow as Colemak because my brain is confused.
  • I’ve also moved on beyond simply English words and am practicing the 200 most common terms in my code, the 40 most common unix command terms, and the most common 3, 4, and 5 grams in my code.

All of my word lists are available in this Github project. There are also instructions for building your own lists. Writing this post was my trigger for cleaning up my lists so I can be more efficient at getting from 40 WPM to 80 WPM.

Analysis of Time Spent

I use RescueTime to track all of my time on my computer. In April I spent a total of 46 hours on my computer. Looking at only the time I spent where I was typing (rather than editing adorable photos of my daughter):

In the first six days of May as I slowly ramped back up at work I spent 26 hours on my computer with the keyboard layout entirely set to Colemak. About an hour of that time was spent practicing in Amphetype (still doing at least a minute of practice per day). Total time spent with Colemak has been about 47 hours, but I’m pretty sure I am undercounting how often I switched back to Qwerty for writing email in April. On May 6th I reached 41 WPM on TypeRacer for the first time.

Forty WPM is not very impressive, but it is noticeably more fluid and continuing to improve steadily. At this point it is good enough that I can return to work and be productive (if a little terse).

2014 In Review

Another year at Automattic, another 3 million emails sent to users to point them at their annual reports. This is my fourth year helping to send the annual reports, and as usual it makes for an exhausting December for everyone involved. But its pretty rewarding seeing how much people like them on Twitter. This year I’m particularly excited to see tweets from non-English users because we’ve expanded our translation of the reports from 6 to 16 languages.

Our designer wrote up a summary of how he thinks of our report:

A billboard shines upon the magical city of Anyville, USA and tells the tale of one remarkable blogger’s dedication and their achievements throughout the year that was. As 2015 edges closer the citizens of Anyville stop and look up in awe at the greatness that is you.

Its so incredible working with clever designers who know how to make data speak to users.

Here’s what I took out of the 100,000 views from 58,000 visitors my blog had in 2014:

  • Multi-lingual indexing is a pain point for a lot of Elasticsearch users (ourselves included). I’ve learned so much from the comments on that post.
  • Everyone really likes that two year old post about things I did wrong with ES. I should write another one. I’ve done lots of other things wrong since then. :)
  • I am very inconsistent about when I post. I should try writing shorter posts. Writing a series of posts was a good idea, but a bit overwhelming.
  • About half of my traffic is from search engines.
  • I get a lot of traffic from folks posting links to me on Stack Overflow. Makes me really happy to see my posts helping answer so many questions.

Click here to see the complete report.

Elasticsearch: The Broken Bits

A post like this carries a heavy risk of this:

So, some friendly caveats:

  • I wrote the first draft of this a bit after midnight in the middle of a week where we weathered 3 ES outages on two different clusters, none of which were the fault of ES, but all of which could have been much less painful.
  • We’re currently running ES 1.3.4. We’ve been using ES since 0.17, and it has gotten immensely better along the way. We couldn’t build many features without it.
  • I know many of these problems are being actively worked on, I’m publishing this in the hopes of providing more context to the series of github issues I’ve submitted.

With that out of the way, I think ES has some significant problems when it comes to operating and iterating on a large cluster with terabytes of data. I think these problems contribute to misunderstandings about ES and end up giving developers and sys admins a bad experience. I think many of the fixes (though not all) require rethinking default configurations rather than implementing complex algorithms.

Shard Recovery When a Node Fails

This sort of complaint about ES seems to come up regularly:

Reading the original issue, the confusion is around how the “elastic” part of ES moves shards around. I’ll quote:

  1. Cluster has 3 data nodes, A, B, and C. The index has a replica count of 1…
  2. A crashes. B takes over as master, and then starts transferring data to C as a new replica.
  3. B crashes. C is now master with an impartial dataset.
  4. There is a write to the index.
  5. A and B finally reboot, and they are told that they are now stale. Both A and B delete their local data.
  6. all the data A and B had which C never got is lost forever.

Two of three nodes failed back to back, and there were only 2 replicas of the data. ES devs correctly pointed out that you should expect to lose data. However, the user lost almost all of the existing data because the shard was moved from A to C. Almost all of the data existed on the disk of A, but it was deleted.

Elasticsearch is being too elastic. Its great to reshuffle data when nodes get added to a cluster. When a node fails though, reshuffling data is almost always a terrible idea. Let’s go through a timeline we see when an ES node fails in production:

  1. Cluster is happily processing millions of queries and index ops an hour…
  2. A node fails and drops out of the cluster
  3. The master allocates all the shards that were on that node to another node and starts recovery on those nodes.
  4. Meanwhile: alerts are fired to ops, pagers go off, people start fixing things.
  5. The shards being recovered start pulling terabytes of data over 1Gbit connections. Recovering all shards will take hours.
  6. Ops fixes whatever minor hardware problem occurred and the down node rejoins the cluster.
  7. All of the data on the down node gets tossed away. That node has nothing to do.
  8. Eventually all of the data is recovered.
  9. Slowly, shards are relocated to the down node
  10. About 10-16 hours later the cluster has finally recovered.

I’ve seen this happen at least a half dozen times. A single node going down triggers a half day event. Many nodes going down can take a day or two to recover.

There’s at least a couple of ways things can go very wrong because of the above flow:

  • Because it takes days to recover all the shards of your data, if there is another hardware failure (or human error) during this time then you are at risk of not having enough redundancy of your data. As it currently stands eventually we’ll catch the triple event within a week that will cause us to lose data.
  • Moving that much data across the network puts a huge strain on your infrastructure. You can easily cause other problems when there is already something going wrong. A router going down and bringing down part of your ES cluster is exactly the wrong time to move around lots of data and further increase your network load.
  • When a node fails its shards get allocated elsewhere. Very often this means that if enough nodes fail at once, then you can end up running out of disk space. When I’ve got 99 shards and very little disk space on a node why the %&#^ are you trying to put 40 more on that node? I’ve watched in slow motion horror as my disk space shrinks as I try and delete shards to force them onto other nodes with very little success.
  • Sometimes you won’t have enough replicas – oops, you made a mistake – but losing data does not need to be a binary event. Losing 5% of your data is not the same as losing 95%. Yes, you need to reindex in either case, but for many search applications you can work off a slightly stale index and users will never notice while you rebuild a new one.
  • Because a down node triggers moving shards around, you can’t just do a restart of your cluster, you need a procedure to restart your cluster. In our case that’s evolved into a 441 line bash script!

I could complain about this problem for a while, but you get the idea.

Why don’t we just recover the local data? By default ES should not move shards around when a node goes down. If something is wrong with the cluster, why take actions that will make the situation worse? I’ve slowly come to the conclusion that moving data around is the wrong approach.

Cluster Restart Times

Actual IRC transcript between me and our head of systems:

gibrown – restarting ES cluster
gibrown – will take three days, cluster will be yellow during that time
bazza – maybe we should make `deploy wpcom` take 3 days :)
bazza – then we could call ourselves Elastic WordPress!

Ouch… that stung. At we do about 100-200 deploys a day and each updates thousands of servers in less than 20 seconds. We optimize for iteration speed and fixing problems quickly because performance problems in particular are hard to predict. Elasticsearch development in comparison has been excruciatingly slow.

We recently did an upgrade from 1.2.1 to 1.3.4. It took 6 or 7 days to complete because ES replicas do not stay in sync with each other. As real time indexing continues the replicas each do their own indexing so when a node get restarted its shard recovery requires moving all of the modified segments from another node. After bulk indexing, this means moving your entire data set across the network once for each replica you have.

Recently, we’ve been expanding our indexing from the 800 mil docs we had to about 5 billion. I am not exaggerating when I say that we’ve spent at least two months out of the last twelve waiting for ES to restart after upgrading versions, changing our indexing, or trying to improve our configuration.

Resyncing across the network completely kills our productivity and how fast we can iterate. I’d go so far as to say it is our fundamental limitation to using ES in more applications. It feels like this syncing should at least partially happen in the backgorund whenever the cluster is green.

Field Data and Heap

So you want to sort 1000 of your 5 billion documents by date? Sure we can do that, just hold on a second while I load all 5 billion dates into memory…

I didn’t fully appreciate this limitation until just recently. Field data has been a thorn in ES developers sides from the beginning. It is the single easiest way to crash your cluster, and I bet everyone has done it. The new-ish breakers help a lot, but they still don’t let me do what I’m trying to do. I don’t need 5 billion dates to sort 1000 docs, I only need 1000.

I shouldn’t complain too loudly about this, because there is actually a solution! doc_values will store your fields on disk so that you don’t need to load all values into memory, only those that you need. They are a little slower, but slower is far better than non-functional. I think they should be the default.


Elasticsearch has evolved a lot over the past years, I think its time to rethink a number of default settings to improve the configuration when dealing with large amounts of data. A lot of this comes down to thinking about the user experience of Elasticsearch. A lot of thought has clearly been put into the user experience for a new user just starting out. That’s what makes ES so addictive. I think some more thought could be put into the experience of sys admins and developers who are iterating on large data sets.

Scaling Elasticsearch Part 3: Queries

See part 1 and part 2 for an overview of our system and how we scale our indexing. Originally I was planning a separate post for global queries and related posts queries, but it was hard to break into two posts and contributed to me taking forever to write them.

Two types of queries run on the Elasticsearch index: global queries across all posts and local queries that search posts within a single blog. Over 90% of our queries (23 million/day) are local, and the remainder are global (2 million/day).

In this post we’ll show some performance data for global and local queries, and discuss the tradeoffs we made to improve their performance.

An Aside about Query Testing

We used JMeter for most of our query testing. Since we are comparing global and local queries, it was important to get a variety of different queries to run and to run them on mostly full indices (though sometimes ongoing development made that impractical).

Generally we ran real user queries randomly sampled from our search logs. For local queries we were running related posts type queries with the posts pseudo-randomly selected from the index. There had to be a fair bit of hand curation of the queries due to errors in our sampling techniques and the occasional bad query type that would get through. Generating a good mix of queries is a combination of art and deciding that things are good enough.

A Few Mapping Details

I’ve already written a post on how we map WordPress posts to ES documents (and wpes-lib is on github), and there’s a description of how we handle multi-lingual data. Mostly those details are irrelevant for scaling our queries, but there are two important details:

  1. All post documents are child documents of blog documents. The blog documents record meta information about the blog.
  2. Child documents are always stored on the same shard as their parent document. They all use the same routing value. This provides us with a huge optimization. Local searches can use routing on the query so it gets executed on a single node and accesses only a single shard.

The Global Query

The original plan with our global queries was to use the parent blog documents to keep track of some meta information about the blogs to determine whether a blog’s posts were globally searchable (whether they were publicly accessible, had mature content, spam, etc). By using the blog document we’d be able to update a single document and not having to reindex all the posts on the blog. Then we’d use a has_parent filter on all global queries.

Unfortunately we found that parent/child filters did not scale well for our purposes. All parent documents ids had to be loaded into memory (about 5.3 GB of data per node for our 60 million blog documents). We had plenty of RAM available for ES (30 GB) and the id cache seemed to hold steady, but we still saw much slower queries and higher server load when using the has_parent filter. Here’s the data for the final parent/child performance test we ran (note this was tested on a 16 node cluster):

Using has_parent JMeter threads Reqs/Sec Median Latency(ms) Mean Latency(ms) Max Latency(ms) CPU % Load
yes 2 5.3 538 365 2400 45 12
no 2 10 338 190 1500 30 8
yes 4 7.6 503 514 2500 75 22
no 4 17.5 207 222 2100 50 15

Avoiding using the has_parent filter saved us a lot of servers and gave us a lot more overhead, even though it means we have to bulk reindex posts more often.

In the end our global query looks something like:

POST global-*/post/_search
   "query": {
      "filtered": {
         "query": {
            "multi_match": {
              "fields": ["content","title"],
              "query": "Can I haz query?"
         "filter": {
            "and": [
                  "term": {
                     "lang": "en"
                  "and": [
                       "term": {
                          "spam": false
                       "term": {
                          "private": false
                       "term": {
                          "mature": false

One last side note on ES global queries because it cannot be said strongly enough: FILTERS ARE EPIC. If you don’t understand why I am shouting this at you, then you need to go read about and understand Bitsets and how filters are cached.

Sidenote: after re-reading that post I realized we may be able to improve our performance a bit by switching from using AND filters shown above to bool filters. This is why blogging is good. This change cut our median query time in half:

Global Query Performance With Increasing Numbers of Shards

Global queries require gathering results from all shards and then combining the results to give the final result. A search for 10 results across 10 shards requires running the query on each of the shards, and then combining those 100 results to get the final 10 results. More shards means more processing across the cluster, but also more results that need to be combined. This gets even more interesting when you start paging results. To get results 90-100 of a search across ten shards requires requiring combining 1000 results to satisfy the search.

We did some testing of our query performance across a few million blogs as we varied the number of shards using a fixed number of JMeter threads.

Shards/Index Requests/sec Median(ms) Mean(ms)
5 320 191 328
10 320 216 291
25 289 345 332
50 183 467 531

There’s a pretty clear relationship between query latency and number of shards. This result pushed us to try and minimize the total number of shards in our cluster. We can probably also improve query performance by increasing our replication.

The Local Query

Most of our local queries are used for finding related posts. We run a combination of mlt_query queries and multi_match queries to send the text of the current post and find posts that are similar. For a post with the title “The Best”, and content of “This is the best post in the world” the query would look like:

POST global-0m-10m/post/_search?routing=12345
 "query": {
   "filtered": {
     "query": {
       "multi_match": {
         "query": "The Best This is the best post in the world.",
         "fields": ["mlt_content"]
     "filter": {
       "and": [
           "term": {
             "blog_id": 12345
           "not": {
             "term": {
               "post_id": 3

Looks simple, but there’s a lot of interesting optimizations to discuss.


All of our local queries use the search routing parameter to limit the search to a single shard. Organizing our indices and shards so that an entire blog will always be in a single shard is one of the most important optimizations in our system. Without it we would not be able to scale and handle millions of queries because we would be wasting a lot of cycles searching shards that had no or very few documents that were actually related to our search.

Query Type

In the above example, we use the multi_match query. If the content was longer (more than 100 words) we would use the mlt_query. We originally started out with all queries using mlt_query to make the queries faster and ensure good relevancy. However, using mlt_query does not guarantee that the resulting query will actually have any terms in the final search query. A lot will depend on the frequency of the terms in the post and their frequency in the index. Changing to using multi_match for short content gave us a big improvement in the relevancy of our results (as measured by click through rate).


We started building related posts by using the MLT API for running the query. In that case we would only send the document id to ES and trust ES to get the post, analyze it, and construct the appropriate mlt_query for it. This worked pretty well, but did not scale as well as building our own query on the client. Switching to mlt_query gave us a 10x improvement in number of requests we could handle and reduced the query response time.

Operation Requests/sec Median(ms) Mean(ms)
MLT API 150 270 306
mlt_query 1200 77 167

From what I could tell the big bottleneck was getting the original document. Of course this change moved a lot of processing off of the ES cluster and onto the web hosts, but web hosts are easier to scale than ES nodes (or at least is a problem our systems team is very good at).

mlt_content Field

The query is going to a single field called mlt_content rather than to separate title and content fields. Searching fewer fields gives us a significant performance boost, and helps us search for words that occur in different fields in different posts. The fairly new multi_match cross_fields option could probably help here, but I assume would not be as performant as a single field.

For a while we were also storing the mlt_content field, but recent work has determined that storing the field did not speed up the mlt_query queries.

The history of how we ended up using mlt_content is also instructive. We started using the mlt_content field and storing it while we were still using the MLT API. Originally we were using the post title and content fields which were getting extracted from the document’s _source. Switching to a stored mlt_content field reduced the average time to get a document before building the query from about 500ms to 100ms. In the end this turned out to not be enough of a performance boost for our application, but is worth looking into for anyone using the MLT API.

Improving Relevancy with Rescoring

We’ve run a couple of tests to improve the relevancy of our related posts. Our strategy here has mostly been to use the mlt_query/multi_match queries as our basic query and then use rescoring to adjust the query results. For instance we build a query that slightly reranks our top 50 results based on the commonality between who has liked the current post and whether they liked the posts that were similar. Running this using the rescoring option had almost no impact on query performance and yet gave a nice bump to the click through rate of our related posts.

Shard Size and Local Query performance

While global queries perform best with a small number of  larger shards, local queries are fastest when the shard is smaller. Here’s some data we collected comparing the number of shards to routed query speed when running mlt_query searches:

Shards/Index Requests/sec Median(ms) Mean(ms) Max(ms)
10 1200 77 167 5900
30 1600 67 112 1100

Less data in each shard (more total shards) has a very significant impact on the number of slow queries and how slow they are.

Final Trade Off of Shard Size and Query Performance

Based off the global and local query results above we decided to go with 25 shards per index as a tradeoff between decent performance for both query types. This was one of the trickier decisions, but it worked reasonably well for us for a while. About 6 months after making this decision though we decided that we were ending up with too many slow queries (queries taking longer than 1 second).

I guess that’s my cue to tease the next post in this (possibly) never-ending series: rebuilding our indices to add 6-7x as many documents while reducing slow queries and speeding up all queries in general. We went from the 99th percentile of our queries taking 1.7 seconds down to 800ms and the median time dropping from 180ms to 50ms. All these improvements while going from 800 million docs in our cluster to 5.5 billion docs. Details coming up in my next post.

Presentation: Elasticsearch at Automattic

I gave a presentation at the Elasticsearch Denver meetup last night covering some of the bigger changes we had to make to scale a cluster to handle all posts. This is a cherry picked list from my more comprehensive ongoing series of posts chronicling our experience scaling Elasticsearch.

Presentation slides don’t always translate well to the web since there is a lot said in person that is not on the slides. Hopefully they’ll still be useful for the broader points I covered.


Elasticsearch, Open Source, and the Future

This essay started as a response to a comment on my multilingual indexing post. The comment is mostly an advertisement, but brings up some interesting points so I decided to publish it and turn my response into a full post.

For some context here’s the key part of the comment:

I thought readers might be interested in Rosette Search Essentials for Elasticsearch, from Basis Technologies, which we launched last night at hack/reduce in Cambridge, MA. It’s a plugin that does three neat things that improve multilingual search quality:

– Intelligent CJKT tokenization/segmentation
– Lemmatization: performs morphological analysis to find the “lemma” or dictionary form of the words in your documents, which is far superior to stemming.
– Decompounding: languages like German contain compound words that don’t always make great index terms. We break these up into their constituents so you can index them too.

There are many areas where Elasticsearch could benefit from better handling of multi-lingual text. And the NLP geek in me would love to see ES get some more modern Natural Language Processing techniques – such as these – applied within it.

Unfortunately I’m a lot less excited about this system because it is closed source, limiting its impact on the overall ES ecosystem. I love that Basis Technologies has whitepapers and it seems to be doing great evaluation of their system, but even the whitepapers require registering with them. This just seems silly.

Search engine technology has been around for a while, a big part of Elasticsearch’s success is due to it being open source. And particularly due to Lucene being an open source, collaborative effort over the past 15 years. I think ES has the potential to become a phenomenal NLP platform over the next five years, bringing many amazing NLP technologies coming out of academia to a massive number of developers. NLP researchers have done tremendous science in an open and collaborative manner. We should work to scale that technology in open and collaborative ways as well.

Building a platform on closed source solutions is not sustainable.

Humans express their dreams, opinions, and ideas in hundreds of languages. Bridging that gap between humans and computers – and ultimately between humans – is a noble endeavor that will subtly shape the next century. I’d like to see Elasticsearch be a force in democratizing the use of natural language processing and machine learning. These methods will impact how we understand the world, how we communicate with each other, and ultimately our democracy. We should not build that future on licensing that explicitly prevents citizens of some countries from participating.

I recognize that working for a successful open source company makes me luckily immune to certain the pressures of business, investors, and government contracts. But I have seen the unfortunate cycle of cool NLP technology getting trapped within a closed source company and eventually being completely shut down. I’ve recently been reading “The Theory That Would Not Die”. Where would we be now if the efficacy of Bayesian probability had not been locked inside classified government organizations for 40 years after World War II?

Basis Technologies has been around a long time, and has far more NLP talent and experience than I do, but the popularity of my very brief multi-lingual post tells me that there is also huge opportunity for the community to improve the multi-lingual capabilities of Elasticsearch. A company leading the way could build a strong business providing all the support that inevitably will be needed when dealing with multiple languages. I’d be happy to talk with anyone about how’s rather large set of multi-lingual data could help in such an endeavor. I bet other organizations that would be interested also.

Scaling Elasticsearch Part 2: Indexing

In part 1 I gave an overview of our cluster configuration. In this part we’ll dig into:

  • How our data is partitioned into indices to scale over time
  • Optimizing bulk indexing
  • Scaling real time indexing
  • How we manage indexing failures and downtime.

The details of our document mappings are mostly irrelevant for our indexing scaling discussion, so we’ll skip them until part 3.

Data Partitioning

Since data is constantly growing we need an indexing structure that can grow over time as well. A well-known limitation of ES is that once an index is created you cannot change the number of shards. The common solution to this problem is to recognize that searching across an index with 10 shards is identical to searching across 10 indices with 1 shard each, and indices can be created at will.

In our case we create one index for every 10 million blogs, with 25 shards per index. We use index templates so that as our system tries to index to a non-existent index the index is created dynamically.

There are a few factors that led to our index and sharding sizes:

  1. Uniform shard sizes: Shards should be of similar sizes so that you get mostly uniform response times. Larger shards take longer to query. We tried one index per 1 million blogs and found too much variation. For instance, when we migrated Microsoft’s LiveSpaces to we got a million or so blogs added to our DBs in a row created and they have remained pretty active. This variation drove us to put many blogs into each index. We rely on the hashing algorithm to spread the blogs across all the shards in the index.
  2. Limit number of shards per index: New shards are not instantly created when a new index is created. Technically, I guess, the cluster state is red for a very short period. At one point we tested 200 shards per index. In those cases we sometimes saw a few document indexing failures in our real time indexing because the primary shards were still being allocated across the cluster. There’s probably some other ways around this, but its something to look out for.
  3. Upper limit of shard size: Early on we tried indexing 10 million blogs per index with only 5 shards per index. Our initial testing went well, but then we found that the indices with the larger shards (the older blogs) were experiencing much longer query latencies. This was us starting to hit the upper limits of our shard sizes. The upper limit on shard size varies by what kind of data you have and is difficult to predict so it’s not surprising that we hit it.
  4. Minimize total number of shards: We’ll discuss this further in our next post on global queries, but as the number of shards increases the efficiency of the search decreases, so reducing the number of shards helps make global queries faster.

Like all fun engineering problems, there is no easy or obvious answer and the solution comes by guessing, testing, and eventually deciding that things are good enough. In our case we figured out that our maximum shard size was around 30 GB. We then created shards that were fairly large but which we don’t think would be able to grow to that maximum for many years.

As I’m writing this, and after a few months in production, we’re actually wondering if our shards are still too large. We didn’t take into account that deleted documents would also negatively affect shard size, and every time we reindex or update a document we effectively delete the old version. Investigation into this is still ongoing, so I’m not going to try to go into the details. The number of deleted documents in your shards is related to how much real time indexing you are doing, and the merge policy settings.

Bulk Indexing Practicalities

Bulk indexing speed is a major limit in how quickly we can iterate during development, and indexing will probably be one of our limiting factors in launching new features in the future. Faster bulk indexing means faster iteration time, more testing of different shard/index configurations, and more testing of query scaling.

For these reason we ended up putting a lot of effort into speeding up our bulk indexing. We went from bulk indexing taking about two months (estimated, we never actually ran this) to taking less than a week. Improving bulk indexing speed is very iterative and application specific. There are some obvious things to pay attention to like using the bulk indexing API, and testing different numbers of docs in each bulk API request. There were also a few things that surprised me:

  • Infrastructure Load: ES Indexing puts a heavy load on certain parts of the infrastructure because it pulls data from so many places. In the end, our indexing bottleneck is not ES itself, but actually other pieces of our infrastructure. I suppose we could throw more infrastructure at the problem, but that’s a trade off between how often you are going to bulk reindex vs how much that infrastructure will cost.
  • Extreme Corner Cases: For instance has millions of followers, likers, and lots of commenters. Building a list of these (and keeping it up to date with real time indexing) can be very costly – like, “oh $%#@ are we really trying to index a list of 2.3 million ids” costly (and then update it every few seconds).
  • Selective Bulk Indexing: Adding fields to the index requires updating the mappings and bulk reindexing all of our data. Having a way to selectively bulk index (find all blogs of a particular type) can speed up bulk indexing a lot.
  • Cluster Restarts: After bulk indexing we need to do a full rolling restart of the cluster.

I wish we had spent more time finding and implementing bulk indexing optimizations earlier in the project.

Scaling Real Time Indexing

During normal operation, our rate of indexing (20m+ document changes a day) has never really been a problem for our Elasticsearch cluster. Our real time indexing problems have mostly stemmed from combining so many pieces of information into each document that gathering the data can be a high load on our database tables.

Creating the correct triggers to detect when a user has changed a field is often non-trivial to implement in a way that won’t over index documents. There are posts that get new comments or likes every second. Rebuilding the entire document in those cases is a complete waste of time. We make heavy use of the Update API mostly to reduce the load of recreating ES documents from scratch.

The other times when real time indexing became a problem was when something went wrong with the cluster. A couple of examples:

  • Occasionally when shards are getting relocated or initialized on a number of nodes the network can get swamped which backs up the indexing jobs or can cause a high proportion of them to start failing.
  • Query load can become too high if a non-performant query is released into production.
  • We make mistakes (shocking!) and break a few servers.
  • Occasionally we find ES bugs in production. Particularly these have been around deleteByQuery, probably because we run a lot of them.
  • A router or server fails.

Real time indexing couples other portions of our infrastructure to our indexing. If indexing gets slowed down for some reason we can create a heavy load on our DBs, or on our jobs system that runs the ES indexing (and a lot of other, more important things).

In my opinion, scaling real time indexing comes down to two pieces:

  1. How do we manage downtime and performance problems in Elasticsearch and decouple it from our other systems.
  2. When indexing fails (which it will), how do we recover and avoid bulk indexing the whole data set.

Managing Downtime

We mentioned in the first post of this series that we mark ES nodes as down if we receive particular errors from them (such as a connection error). Naturally, if a node is down, then the system has less capacity for handling indexing operations. The more nodes that are down the less indexing we can handle.

We implemented some simple heuristics to reduce the indexing load when we start to detect that a server has been down for more than a few minutes. Once triggered, we queue certain indexing jobs for later processing by just storing the blog IDs in a DB table. The longer any nodes are down, the fewer types of jobs we allow. As soon as any problems are found we disable bulk indexing of entire blogs. If problems persist for more than 5 minutes we start to disable reindexing of entire posts, and eventually we also turn off any updating of documents or deletions.

Before implementing this indexing delay mechanism we had some cases where the real time indexing overwhelmed our system. Since implementing it we haven’t seen any, and we actually smoothly weathered a failure of one of the ES network routers while maintaining our full query load.

We of course also have some switches we can throw if we want to completely disable indexing and push all blog ids that need to be reindexed into our indexing queue.

Managing Indexing Failures

Managing failures means you need to define your goals for indexing:

  • Eventually ES will have the same data as the canonical data.
  • Minimize needing to bulk re-index everything.
  • Under normal operation the index is up to date within a minute.

There are a couple of different ways our indexing can fail:

  1. An individual indexing job crashes (eg an out of memory error when we try to index 2.3 million ids :) ).
  2. An individual indexing job gets an error trying to index to ES.
  3. Our heuristics delay indexing by adding it to a queue
  4. We introduce a bug to our indexing that affects a small subset of posts.
  5. We turn off real time indexing.

We have a few mechanisms that deal with these different problems.

  • Problems 1, 3, and 5: The previously mentioned indexing queue. This lets us pick up the pieces after any failure and prevent bulk reindexing everything.
  • Problem 2: When indexing jobs fail they are retried using an exponential back off mechanism (5 min, 10 min, 20 min, 40 min, …).
  • Problem 4: We run scrolling queries against the index to find the set of blogs that would have been affected by a bug, and bulk index only those blogs.

It’s All About the Failures and Iteration

Looking back on what I wish we had done better, I think a recognition that indexing is all about handling the error conditions would have been the best advice I could have gotten. Getting systems for handling failures and for faster bulk indexing in place sooner would have helped a lot.

In the next part of the series I’ll talk about query performance and balancing global and local queries.

Scaling Elasticsearch Part 1: Overview

We recently launched Related Posts across, so its time to pop the hood and take a look at what ended up in our engine.

There’s a lot of good information spread across the web on how to use Elasticsearch, but I haven’t seen too many detailed discussions of what it looks like to scale an ES cluster for a large application. Being an open source company means we get to talk about these details. Keep in mind though that scaling is very dependent on the end application so not all of our methods will be applicable for you.

I’m going to spread our scaling story out over four posts:

Scale is in the top 20 sites on the Internet. We host very big clients all the way down the long tail where this blog resides. The primary project goal was to provide related posts on all of those page views (14 billion a month and growing). We also needed to replace our aging search engine that was running for global searches, and we have plans for many more applications in the future.

I’m only going to focus on the related posts queries (searches within a single blog) and the global queries (searches across all blogs). They illustrate some really nice features of ES.

Currently every day we average:

  • 23m queries for related posts within a single shard
  • 2m global queries across all shards
  • 13m docs indexed
  • 10m docs updated
  • 2.5m docs deleted
  • 250k delete by query operations

Our index has about 600 million documents in it, with 1.5 million added every day. Including replication there is about 9 TB of data.

System Overview

We mostly rely on the default ES settings, but there are a few key places where we override them. I’m going to leave the details of how we partition data to the post on indexing.

  • The system is running 0.90.8.
  • We have a single cluster spread across 3 data centers. There are risks with doing this. You need low latency links, and some longer timeouts to prevent communication problems within the cluster. Even with very good links between DCs local nodes are still 10x faster to reach than remote nodes:
discovery.zen.fd.ping_interval: 15s
discovery.zen.fd.ping_timeout: 60s
discovery.zen.fd.ping_retries: 5

This also helps prevent nodes from being booted if they experience long GC cycles.

  • 10 data nodes and one dedicated master node in each DC (30 data nodes total)
  • Currently 175 shards with 3 copies of each shard.
  • Disable multicast, and only list the master nodes in the unicast list. Don’t waste time pinging servers that can’t be masters.
  • Dedicated hardware (96GB RAM, SSDs, fast CPUs) – CPUs definitely seem to the bottleneck that drives us to add more nodes.
  • ES is configured to use 30GB of the RAM with indices.fielddata.cache.size set to 20GB. This is still not ideal, but better than our previous setting of index.fielddata.cache: soft. 1.0 is going to have an interesting new feature that applies a “circuit breaker” to try and detect and squash out of memory problems. Elasticsearch has come a long way with preventing OutOfMemory exceptions, but I still have painful memories from when we were running into them fairly often in 0.18 and 0.19.
  • Use shard allocation awareness to spread replicas across data centers and within data centers.
cluster.routing.allocation.awareness.attributes: dc, parity

dc is the name of the data center, parity we set to 0 or 1 based on the es host number. Even hosts are on one router and odd on another.

  • We use fixed size thread pools, very deep for indexing because we don’t want to lose index operations, much shorter for search and get since if an operation has been queued that deeply the client will probably time out by the time it gets a response.
    type: fixed
    size: 30
    queue_size: 1000
    type: fixed
    size: 30
    queue_size: 1000
    type: fixed
    size: 100
    queue_size: 200
    type: fixed
    size: 100
    queue_size: 200
  • Elastica PHP Client. I’ll add the disclaimer that I think this client is slightly over object-oriented which makes it pretty big. We use it mostly because it has great error handling, and we mostly just set raw queries rather than building a sequence of objects. This limits how much of the code we have to load because we use some PHP autoloading magic:
//Autoloader for Elastica interface client to ElasticSearch
function elastica_autoload( $class_name ) {
	if ( 'Elastica' === substr( $class_name, 0, strlen( 'Elastica' ) ) ) {
		$path = str_replace( '\\', '/', $class_name );
		include_once dirname( __FILE__ ) . '/Elastica/lib/' . $path . '.php';
spl_autoload_register( 'elastica_autoload' );
  • Custom round robin ES node selection built by extending the Elastica client. We track stats on errors, number of operations, and mark nodes as down for a few minutes if certain errors occur (eg connection errors).
  • Some customizations for node recovery. See my post on speeding up restart time.

Well, that’s not so hard…

If you’ve worked with Elasticsearch a bit then most of those settings probably seem fairly expected.

Elasticsearch scaling is deceptively easy when you first start out. Before building this cluster we ran a two node cluster for over a year and were indexing a few hundred very active sites. We threw more and more data at that small cluster, and more and more queries, and the cluster hardly blinked. It has about 50 million documents on it now, and gets 1-2 million queries a day.

It is still pretty shocking to me how much harder it was to scale a 10-100x larger cluster. There is a substantial and humbling difference between running a small cluster with 10s of GB vs a cluster with 9 TB of data.

In the next part in this series I’ll talk about how we overcame the indexing problems we ran into: how our index structure scales over time, our optimizations for real time indexing, and how we handle indexing failures.

2013 in review

My personal blogging goal for this past year was to actually have an Annual Report that I would not be ashamed to look at. Goal Achieved!

A pretty large part of my traffic is from search engines for some phrase with the word “Elasticsearch” in it, but I also surpassed 200 followers recently. I only managed to publish 9 times this year though. Will have to work on that.

You can check out the full report below.

Here’s an excerpt:

The concert hall at the Sydney Opera House holds 2,700 people. This blog was viewed about 24,000 times in 2013. If it were a concert at Sydney Opera House, it would take about 9 sold-out performances for that many people to see it.

Click here to see the complete report.

Managing Elasticsearch Cluster Restart Time

While building a fairly large index (8TB total for 500 million docs), I ran into some very long restart times for the cluster. That prompted me to start a discussion about long restart times. There’s some good discussion in that thread, and I wanted to write a post to summarize what we are doing to deal with long restart times.

By “long restart times”, I don’t mean that Elasticsearch didn’t start up quickly, but rather it spent a very long time recovering shards. In my logs I would see messages such as:

recovered_files [399] with total_size of [42.2gb], took [12.4m], throttling_wait [0s]#012         : reusing_files   [0] with total_size of [0b]

All of the data for a 42 GB shard was being recovered from one of the peer nodes rather than from the local disk.

In that ES user group thread, Zachary Tong has a good example and description of why Elasticsearch nodes can have such long restart times. The key point is:

The segment creation and merging process is not deterministic between nodes.

This means that as indexing occurs the segments for the same shard on different nodes will necessarily diverge from each other. When shard recovery occurs (such as during a restart) the segments that are different will need to be copied over the network rather than recovered from the disk.

We’ve put in place a couple of practices to try and minimize how much impact these slow restarts have on us.

First, some background on our system:

  • 500 million documents in 175 shards (about 8TB including replication)
  • 1.5 million new docs a day.
  • Heavy reindexing/updating of newer documents running 24/7, but 99% of older documents never change.
  • Bulk indexing is only ever performed when adding new fields to the index or adding new features.
  • With our recent launch of Related Posts we peak at about a million queries an hour. (More on scaling in a future post.)

Our current methods of minimizing cluster restart times:

1. After bulk indexing we perform the following:
  • Optimize all indices and set the max segments to 5.
  • Perform a rolling restart of the cluster (last one took 38 hours to complete)

By optimizing the index into a smaller number of segments we significantly decrease query time for the older documents. Also, since most of our data never changes we ensure that most of our data will be in large segments that should stay in sync across the cluster. They are less likely to be merged because they are already big.

The rolling restart of the cluster after bulk indexing ensures that all nodes have identical segments. By incurring the cost of restarting just after bulk indexing, we ensure that if a real issue comes up later that requires restarting then we will be able to restart more quickly.

Our current rolling restart only restarts a single node at a time. Because we are using shard allocation awareness we could increase the number of nodes we restart at once if we want to reduce the total time to restart the cluster, but that would also reduce our capacity for servicing incoming queries.

2. When doing a rolling restart, disable allocation
curl -XPUT localhost:9200/_cluster/settings -d '{"transient":{"cluster.routing.allocation.disable_allocation": true}}'

This ensures that there will not be a lot of thrashing of shards around the cluster as the nodes are restarted. Otherwise when we shutdown a node the cluster will try and try and allocate the shards on that node onto other nodes. Once the node is back up, you can then set this value to false.

3. Use noop Linux scheduling  on SSDs rather then CFQ for a significant speed up

When we tested making this update we saw the node restart time (from shutting the node down to all shards being recovered) drop from an average of 150 seconds/node to 96 seconds/node. This was for the case where there was very little difference between the shards on different nodes. When you are doing a rolling restart of 30 nodes, that’s a really big difference. Props to Github for investigating the performance impact of the scheduler.

4. Increase the default recovery limits
cluster.routing.allocation.node_initial_primaries_recoveries: 4
cluster.routing.allocation.node_concurrent_recoveries: 15
indices.recovery.max_bytes_per_sec: 100mb
indices.recovery.concurrent_streams: 5

We’ve tried increasing the max_bytes_per_sec above 100mb, but that runs us into cases where the network traffic starts interfering with the query traffic. You will of course get different results depending on what hardware you are using. In general the ES defaults are set for Amazon EC2, so you can increase your limits a lot if you have your own hardware.

5. Periodic rolling restarts?

One thing I am considering is periodically doing a rolling restart of the cluster. Every few months or so. The only real reason to do this is that it will help me recover faster if I really have to do a restart due to some cluster or hardware failure. Though with the rate that new ES releases occur we’ll probably have a reason to perform such a restart periodically anyways. Not to mention the possibility of bulk reindexing in order to add new features.

I am curious how our restart time will change over time. I would theorize that since most of our data doesn’t change, that data will slowly get accumulated in the older, larger segments while the newer posts will be in the smaller, newer segments. For the most part it will be these newer segments that need to get recovered from the primary shard.


Get every new post delivered to your Inbox.

Join 1,448 other followers