Paging Doctor AWS

I've been using AWS for quite a while now, and one thing that is as inevitable as death and taxes is dealing with paging. All of the APIs that return multiple items support pagination in some way, and anytime you write code that needs to act on all items, you need to handle fetching pages. My quest to parse my own access logs, is no exception, since my logs are stored in S3 and my API needs to list all of the logs for a specific date, of which there could be thousands. The way I've typically handled paging in Clojure with with a loop / recur similar to this:

(defn get-items [{:keys [s3-client s3-bucket s3-page-size] :as logs-client}
                 prefix]
  (loop [items []
         token nil]
    (let [_ (println "Fetching page")
          res (aws/invoke s3-client
                          {:op :ListObjectsV2
                           :request (mk-s3-req s3-bucket prefix s3-page-size token)})
          items (concat items (:Contents res))]
      (if (:IsTruncated res)
        (recur items (:NextContinuationToken res))
        items))))

logs-client is a data structure containing the stuff I need to talk to S3:

{:aws-region "eu-west-1",
 :s3-client ; Cognitect aws-api client here
 :s3-bucket "logs.jmglov.net",
 :s3-prefix "logs/",
 :s3-page-size 25}

And prefix is the start of S3 keys for CloudFront access logs for a specific date, for example: logs/A1BCD23EFGHIJ4.2022-09-15-

Finally, mk-s3-req is a function that constructs a ListObjectsV2 request, adding the continuation token if it's non-nil:

(defn mk-s3-req [s3-bucket prefix s3-page-size continuation-token]
  (merge {:Bucket s3-bucket
          :Prefix prefix}
         (when s3-page-size
           {:MaxKeys s3-page-size})
         (when continuation-token
           {:ContinuationToken continuation-token})))

The loop / recur approach has the benefit of being fairly simple, but the drawback of being a lazily-ticking time bomb, as Stuart Sierra explains. It also has the drawback of fetching all of the items, regardless of how many I need. For example, if I need the first two items for some reason:

(take 2 (get-items))

I'll see something like this in my REPL buffer:

Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page

Now imagine instead of nine pages of items, I have hundreds. Yikes!

In practice, this hasn't been a problem for me, as I've seldom had enough pages for this to really matter, but it does feel yucky, to use a technical term. Surely we can do better, right?

Right. In fact, Clojure has an entire abstraction built around the concept of taking just what you need: the sequence abstraction. A sequence (often called a "seq" in the Clojure literature) is a logical list that is usually lazily realised, and many of the sequence library functions in clojure.core are lazy, such as map, filter, take, drop, etc. If we can find a way to generate a lazy sequence, the code above that takes the first two items in the sequence should only fetch the first page.

And where there's a will, Clojure has a way: the iterate function. According to the docs, it returns a lazy sequence generated by a function that is repeatedly called with the previous value returned by the function. This is easier to see than it is to explain:

(comment

  (->> (iterate inc 5)
       (take 5))
  ;; => (5 6 7 8 9)

)

If we unroll this in our heads, this is what's going on:

(comment

  (cons
   5
   (cons
    (inc 5)
    (cons
     (inc (inc 5))
     (cons
      (inc (inc (inc 5)))
      (cons
       (inc (inc (inc (inc 5))))
       nil)))))
  ;; => (5 6 7 8 9)

)

The cool thing about iterate is that our generator function f can do whatever it wants, as long as it takes one argument. So what if we write an f that fetches a page of results from S3? We have something similar to that in the body of the loop we wrote above, so let's adapt that a bit:

(defn get-s3-page [{:keys [s3-client s3-bucket s3-page-size]}
                   prefix
                   token]
  (let [_ (println "Fetching page")
        res (aws/invoke s3-client
                        {:op :ListObjectsV2
                         :request (mk-s3-req s3-bucket prefix s3-page-size token)})]
    :???))

Then we can do stuff like this:

(defn s3-page-iterator [logs-client prefix]
  (partial get-s3-page logs-client prefix))

(->> (iterate (s3-page-iterator logs-client prefix) nil)
     (take 2))

The s3-page-iterator function transforms our get-s3-page function, which takes three parameters, into a function that takes one parameter so we can use it with iterate. It does this through the magic of partial, which takes a function and some arguments, and returns a new function with those arguments applied. For example, we can create a partial function from +, which takes two (or more) arguments, that adds 1 to its argument:

(comment

  (def plus1 (partial + 1))

  (plus1 2)
  ;; => 3

)

There's an open question here, though. What in the world should get-s3-page return? Since it's currently called with a continuation token, and the definition of iterate is that it calls the function with the return value of its previous invocation, we have no choice but to return the continuation token from the result of the ListObjectsV2 call:

(:NextContinuationToken res)

The only problem with this is that now we're throwing away the actual items, so what we'll get is something like this:

(comment

  (->> (iterate (s3-page-iterator logs-client prefix) nil)
       (take 2))
  ;; => (nil
  ;;     "1wfyyUjO9xuARj07VdnGBHFPmwEwTyDn7VTywaAH6L417g/fWqfeVJRrCV+nFPFHwLVJ3+CWT6BXTdzyKIPsmZm4U0VpZRffW")

)

That's not all that interesting, to be honest. The one piece of good news is that our REPL buffer looks like this:

Fetching page

We have at least succeeded in being lazy! 🏆

As things stand, our function takes the token as its argument, meaning have what we need in order to do the pagination, but not what we need in order to build up the sequence of items to return. If we think about where the token comes from, it's actually the same place as the items: the response of the ListObjectsV2 call. So what if we return the API response from the function, meaning that the function would then be called with that response on the next iteration? That could look something like this:

(defn get-s3-page [{:keys [s3-client s3-bucket s3-page-size]}
                   prefix prev]
  (let [{token :NextContinuationToken} prev
        _ (println "Fetching page")
        res (aws/invoke s3-client
                        {:op :ListObjectsV2
                         :request (mk-s3-req s3-bucket prefix s3-page-size token)})]
    res))

(comment

  (->> (iterate (s3-page-iterator logs-client prefix) nil)
       (take 2))
  ;; => (nil
  ;;     {:Prefix "logs/A1BCD23EFGHIJ4.2022-09-15-",
  ;;      :NextContinuationToken
  ;;      "1s4mtjEeNyCwY1wu7URgZz/kHpfSH3HhaG26VfcL8sgSNXZF/iYdAjSNQQpTiLF+TRoIB/tD93dC/QmqcmFYo+ZgxX0oZg7+v",
  ;;      :Contents
  ;;      [{:Key "logs/A1BCD23EFGHIJ4.2022-09-15-00.0125de6e.gz",
  ;;        :LastModified #inst "2022-09-15T00:39:56.000-00:00",
  ;;        :ETag "\"0fc8a817d4f9b742b6ae83292a181385\"",
  ;;        :Size 1496,
  ;;        :StorageClass "STANDARD"}
  ;;       ...
  ;;       {:Key "logs/A1BCD23EFGHIJ4.2022-09-15-02.95762404.gz",
  ;;        :LastModified #inst "2022-09-15T02:49:56.000-00:00",
  ;;        :ETag "\"617e1235db32c79c37f5776abc5ff3ec\"",
  ;;        :Size 800,
  ;;        :StorageClass "STANDARD"}],
  ;;      :MaxKeys 25,
  ;;      :IsTruncated true,
  ;;      :Name "logs.jmglov.net",
  ;;      :KeyCount 25})

)

This is definitely moving in the right direction, but there are a couple of issues. First things first, the first thing is nil, since that's the initial argument we gave to iterate. Second, the second thing is not a list of items, but rather a data structure containing a list of items under the :Contents key. We can address both of these issues by wrapping get-s3-page in a new function that handles turning a list of pages into a list of items:

(defn list-objects [logs-client prefix]
  (->> (iterate (s3-page-iterator logs-client prefix) nil)
       (drop 1)
       (mapcat (comp (partial map :Key) :Contents))))

(comment

  (->> (list-objects logs-client prefix)
       (take 2))
  ;; ...
  ;; ...
  ;; OMG what is going on here??? My REPL hasn't printed the result yet,
  ;; and I've been waiting several minutes! 😱

)

Something is deeply wrong here, clearly. Looking at our REPL buffer, we find a clue:

Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
Fetching page
...

This looks like an infinite loop, and given that we're looping on the continuation token, that is probably the issue. Let's have a look at the AWS documentation for ListObjectsV2 and see what we can learn about the continuation token. In the response:

NextContinuationToken

NextContinuationToken is sent when IsTruncated is true, which means there are more keys in the bucket that can be listed. The next list requests to Amazon S3 can be continued with this NextContinuationToken.

Type: string

Aha! What is this IsTruncated of which they speak?

IsTruncated

Set to false if all of the results were returned. Set to true if more keys are available to return.

Type: Boolean

Let's think about what our get-s3-page function is doing when called by iterate. Assuming we have five pages of objects with the prefix logs/A1BCD23EFGHIJ4.2022-09-15-, here's what our requests will look like:

 -> NextContinuationToken nil
<- IsTruncated true, NextContinuationToken "t1"
 -> NextContinuationToken "t1"
<- IsTruncated true, NextContinuationToken "t2"
 -> NextContinuationToken "t2"
<- IsTruncated true, NextContinuationToken "t3"
 -> NextContinuationToken "t3"
<- IsTruncated true, NextContinuationToken "t4"
 -> NextContinuationToken "t4"
<- IsTruncated false
 -> NextContinuationToken nil
<- IsTruncated true, NextContinuationToken "t1"
...

Right, so we'll just cycle through the pages until the heat death of the universe (assuming the universe continues expanding forever and thus atoms get too far apart to bang against each other and generate thermodynamic energy, an assumption which is frankly outside the scope of this blog post which is supposed to be about making S3 requests and not theoretical astrophysics). This is clearly A Bad Thing™ (cycling through pages, I mean, not the heat death of the universe, though that would also be A Bad Thing™), so let's see if we can't reason our way out of this with a little hammock time (this is actually an excuse for me to go walk Rover, since the poor fellow has been waiting 20 minutes for me to get to a good stopping point).

A dog walks in a grassy field

OK, speaking of stopping points, it seems like what we need in this function is a stopping point. We know that when IsTruncated is false, there is no next page, so we should stop right there. Let's see how we can do that in code:

(defn get-s3-page [{:keys [s3-client s3-bucket s3-page-size]}
                   prefix prev]
  (let [{token :NextContinuationToken
         truncated? :IsTruncated
         page-num :page-num} prev
        page-num (if page-num (inc page-num) 1)
        done? (false? truncated?)
        res (when-not done?
              (println "Fetching page" page-num)
              (-> (aws/invoke s3-client
                              {:op :ListObjectsV2
                               :request (mk-s3-req s3-bucket prefix s3-page-size token)})
                  (assoc :page-num page-num)))]
    res))

(comment

  (->> (iterate (partial get-s3-page
                         (assoc logs-client :s3-page-size 100) prefix)
                nil)
       (take 6))
  ;; => (nil
  ;;     {:Prefix "logs/E1HJS54CQLFQU4.2022-09-15-",
  ;;      :page-num 1,
  ;;      :Contents [...],
  ;;      :NextContinuationToken "1kkaMk4RnoZHxnSRa5TBnVMb9NECfmmq...",
  ;;      :MaxKeys 100,
  ;;      :IsTruncated true,
  ;;      :Name "logs.jmglov.net",
  ;;      :KeyCount 100}
  ;;     {:Prefix "logs/E1HJS54CQLFQU4.2022-09-15-",
  ;;      :page-num 2,
  ;;      :Contents [...],
  ;;      :NextContinuationToken
  ;;      "1Jz5W+GkccdkP6Jc7tDsdqqrHFlgThRSt2ZhgHh1uYPA1gIzR4aer2l...",
  ;;      :ContinuationToken
  ;;      "1kkaMk4RnoZHxnSRa5TBnVMb9NECfmmqFfDLlcxdn6GdCiMc8ZzNQRj...",
  ;;      :MaxKeys 100,
  ;;      :IsTruncated true,
  ;;      :Name "logs.jmglov.net",
  ;;      :KeyCount 100}
  ;;     {:Prefix "logs/E1HJS54CQLFQU4.2022-09-15-",
  ;;      :page-num 3,
  ;;      :Contents [...],
  ;;      :ContinuationToken
  ;;      "1Jz5W+GkccdkP6Jc7tDsdqqrHFlgThRSt2ZhgHh1uYPA1gIzR4aer2l...",
  ;;      :MaxKeys 100,
  ;;      :IsTruncated false,
  ;;      :Name "logs.jmglov.net",
  ;;      :KeyCount 36}
  ;;     nil
  ;;     {:Prefix "logs/E1HJS54CQLFQU4.2022-09-15-",
  ;;      :page-num 1,
  ;;      :Contents [...],
  ;;      :NextContinuationToken
  ;;      "1EPegB1wcwtgRoGRiJpM4YYjiTvLHYYVjKr+ghX30LhowHoHMAepdcR...",
  ;;      :MaxKeys 100,
  ;;      :IsTruncated true,
  ;;      :Name "logs.jmglov.net",
  ;;      :KeyCount 100})
       
)

This is much better! Now we can see that we fetch page 1, which contains 100 items (:KeyCount); page 2, which also contains 100 items; page 3, which is the last page and contains only 36 items; then since :IsTruncated is false the next time iterate calls get-s3-page, we don't fetch the page and just return nil; then we start over again with page 1, and would continue cycling forever if it hadn't been for the (take 6).

This experiment has accomplished something very important, however. Now we have a marker for when we've reached the last page and should therefore break the cycle! We can update our list-objects wrapper function to look for this marker:

(defn list-objects [logs-client prefix]
  (->> (iterate (s3-page-iterator logs-client prefix) nil)
       (drop 1)
       (take-while :Contents)
       (mapcat (comp (partial map :Key) :Contents))))

(comment

  (->> (list-objects logs-client prefix)
       (take 5))
  ;; => ("logs/E1HJS54CQLFQU4.2022-09-15-00.0125de6e.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-00.3b36a099.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-00.54775acb.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-00.6c612378.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-00.73072440.gz")

)

Victory!

We can test out our laziness by taking 105 items, which should require fetching the first two pages of results but nothing more:

(comment

  (->> (list-objects logs-client prefix)
       (take 105))
  ;; => ("logs/E1HJS54CQLFQU4.2022-09-15-00.0125de6e.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-00.3b36a099.gz"
  ;;     ...
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-10.ae86e512.gz"
  ;;     "logs/E1HJS54CQLFQU4.2022-09-15-10.b4a720f9.gz")

)

Strangely, our REPL buffer reports that we fetched three pages:

Fetching page 1
Fetching page 2
Fetching page 3

In fact, we see the same thing even when we took just 5 items. What in the world is going on here?

Well, it turns out that Clojure realises lazy sequences in chunks; in other words, it optimistically calls the function producing the lazy sequence a few times to ensure that you have enough items in the sequence. In our case, our sequence seems to be realised in chunks of 3?

I have to admit that my knowledge of Clojure internals doesn't go this deep, and I hope someone reading this can explain things to me on Twitter.

In any case, at least we've accomplished our original goal of not fetching all the things when we only need some of the things. So that's something. 🤷

Discuss this post here.

Published: 2022-09-22

Tagged: clojure

Archive