Benchmarking IEnumerables in F# - Seq.timed
Feb 8, 2016It’s pretty straightforward to do basic benchmarking of a single, self-contained piece of code in .NET. You just make a Stopwatch sandwich (let sw = Stopwatch.StartNew(); <code goes here>; sw.Stop()), then read off the elapsed time from the Stopwatch.
What about measuring the throughput of a data pipeline? In this case one is less interested in timing a single block of code from start to finish, and more interested in bulk metrics like computations/sec or milliseconds/item. Oftentimes such pipelines are persistent or very long-running, so a useful benchmark would not be a one-time measurement, but rather something that samples repeatedly.
Furthermore, it’s sometimes difficult to determine where the bottleneck in a chain of computations lies. Is the root data source the culprit? Or is it perhaps an intermediate transformation that’s slow, or even the final consumer?
This kind of problem came up for me recently, so I put together a timing function Seq.timed. The complete code is at the bottom of the post.
Seq.timed
val timed : blockSize:int option -> f:(int -> TimeSpan -> float -> unit) -> source:seq<'t> -> seq<'t>
- blockSizeis the sample size - how many sequence elements should flow in between timing callbacks? Or specify None to only make a single callback when the source is exhausted.
- fis the timing callback - it is invoked once for every- blockSizeitems enumerated, and/or when the source sequence is exhausted. It is passed the count of items since the last callback, a- TimeSpanindicating how long it took for that many items to be enumerated, and a float indicating the proportion of that time spent upstream in the source sequence (as opposed to downstream in consumers of the resulting sequence).
- sourceis the input sequence.
The idea is that you can simply slip this into an existing F#-style collection-processing pipeline, or tag it on the end of any existing seq/IEnumerable. The resulting sequence yields exactly the same values as the source, but also keeps track of timing each block of items as they are enumerated, invoking the provided callback for each block.
Examples
Here’s a small example that benchmarks a toy sequence, showing measurement of overall throughput along with detection of upstream/downstream relative speed:
open System.Threading
seq{ 1 .. 100 }
|> Seq.timed None Scale.s                // add timing here
|> Seq.map (fun i -> Thread.Sleep(10); i)
|> Seq.map (fun i -> Thread.Sleep(25); i)
|> Seq.iter (fun i -> Thread.Sleep(15))
// output
// 100 items       5.15s      19.43 items/s       0.05 s/item ( 0% upstream | 100% downstream)
seq{ 1 .. 100 }
|> Seq.map (fun i -> Thread.Sleep(10); i)
|> Seq.timed None Scale.s                // move timing to here
|> Seq.map (fun i -> Thread.Sleep(25); i)
|> Seq.iter (fun i -> Thread.Sleep(15))
// output
// 100 items       5.13s      19.51 items/s       0.05 s/item (20% upstream | 80% downstream)
seq{ 1 .. 100 }
|> Seq.map (fun i -> Thread.Sleep(10); i)
|> Seq.map (fun i -> Thread.Sleep(25); i)
|> Seq.timed None Scale.s                // move timing to here
|> Seq.iter (fun i -> Thread.Sleep(15))
// output
// 100 items       5.15s      19.43 items/s       0.05 s/item (70% upstream | 30% downstream)
Here’s something a little more realistic - a tiny grep routine. Where is the time spent here - reading content from the disk or doing the regular expression matching?
open System.IO
open System.Text.RegularExpressions
let grep dir files patt =
    Directory.EnumerateFiles(dir, files, SearchOption.AllDirectories)
    |> Seq.collect (File.ReadAllLines)
    |> Seq.timed (Some 100000) Scale.ms
    |> Seq.filter (fun line -> Regex.IsMatch(line, patt))
// a cold search only reads ~150K lines/sec and large
// majority of time is spent reading from disk
grep @"C:\src\visualfsharp" "*.fs*" @"\blet\b" |> Seq.length
// 100000 items     552.14ms     181.11 items/ms       0.01 ms/item (89% upstream | 11% downstream)
// 100000 items     559.99ms     178.58 items/ms       0.01 ms/item (88% upstream | 12% downstream)
// 100000 items     777.57ms     128.61 items/ms       0.01 ms/item (91% upstream |  9% downstream)
// 100000 items     708.15ms     141.21 items/ms       0.01 ms/item (92% upstream |  8% downstream)
// 100000 items    3372.02ms      29.66 items/ms       0.03 ms/item (98% upstream |  2% downstream)
// 100000 items    3788.68ms      26.39 items/ms       0.04 ms/item (98% upstream |  2% downstream)
// 6740 items      75.73ms      89.00 items/ms       0.01 ms/item (95% upstream |  5% downstream)
// but for a second invocation the disk is warm and results are from the cache,
// so the time is more evenly split (and overall throughput is much higher)
grep @"C:\src\visualfsharp" "*.fs*" @"\blet\b" |> Seq.length
// 100000 items      99.67ms    1003.26 items/ms       0.00 ms/item (44% upstream | 56% downstream)
// 100000 items     175.45ms     569.95 items/ms       0.00 ms/item (59% upstream | 41% downstream)
// 100000 items     109.65ms     911.98 items/ms       0.00 ms/item (39% upstream | 61% downstream)
// 100000 items      92.56ms    1080.36 items/ms       0.00 ms/item (35% upstream | 65% downstream)
// 100000 items     193.58ms     516.59 items/ms       0.00 ms/item (70% upstream | 30% downstream)
// 100000 items     217.47ms     459.84 items/ms       0.00 ms/item (73% upstream | 27% downstream)
// 6740 items       8.71ms     774.07 items/ms       0.00 ms/item (49% upstream | 51% downstream)
How much overhead does Seq.timed add? That’s the real test - can Seq.timed benchmark itself?
// 1000000 items 80442000.00ns       0.01 items/ns      80.44 ns/item (46% upstream | 54% downstream)
Enumerable.Range(1, 1000000)
|> Seq.timed None Scale.ns
|> Seq.length
// 1000000 items 170637400.00ns       0.01 items/ns     170.64 ns/item (81% upstream | 19% downstream)
Enumerable.Range(1, 1000000)
|> Seq.timed None (fun _ _ _ -> ())
|> Seq.timed None Scale.ns
|> Seq.length
// 1000000 items 242433000.00ns       0.00 items/ns     242.43 ns/item (88% upstream | 12% downstream)
Enumerable.Range(1, 1000000)
|> Seq.timed None (fun _ _ _ -> ())
|> Seq.timed None (fun _ _ _ -> ())
|> Seq.timed None Scale.ns
|> Seq.length
Our baseline is about 80ns/item. Adding in one layer of Seq.timed slows throughput to 170ns/item, and two layers slows throughput to about 240ns/item. Looking at these deltas and rounding up, we make a rough estimate that Seq.timed adds a cost of ~100ns per item to a sequence (in my environment). So for data pipelines that are processing fewer than 1M items/sec, Seq.timed adds less than 1% overhead. For anything faster than that, it starts to stick out a bit.
Most of the work being done within Seq.timed is the stopping and starting of timers used to calculate the upstream/downstream ratio. We can speed things up significantly by removing that feature and measuring only the total throughput. I’ve called that version Seq.timedSlim.
// 1000000 items 8791000.00ns       0.11 items/ns       8.79 ns/item
Enumerable.Range(1, 1000000)
|> Seq.timedSlim None Scale.ns
|> Seq.length
// 1000000 items 13693300.00ns       0.07 items/ns      13.69 ns/item
Enumerable.Range(1, 1000000)
|> Seq.timedSlim None (fun _ _ _ -> ())
|> Seq.timedSlim None Scale.ns
|> Seq.length
// 1000000 items 18784500.00ns       0.05 items/ns      18.78 ns/item
Enumerable.Range(1, 1000000)
|> Seq.timedSlim None (fun _ _ _ -> ())
|> Seq.timedSlim None (fun _ _ _ -> ())
|> Seq.timedSlim None Scale.ns
|> Seq.length
Once again considering the deltas and rounding up, it appears this version is about 10x faster, slowing throughput by only <10ns/item. One needs to be processing 10M items/sec before Seq.timedSlim makes more than 1% impact.
Code
Full implementation is below. A few notes/gotchas:
- The final callback won’t be invoked if the downstream consumer stops before the upstream source is finished. e.g. when the sequence is shortened downstream by Seq.take.
- Beware of transformations like Seq.sortwhich slurp in the entire upstream sequence at full speed before doing any work to contribute downstream.
- Beware of sequences that do their work not in MoveNext()but inCurrent. Whether to assign the cost ofCurrentto the upstream or downstream sequence is debatable, but I’ve left it assigned to downstream.
open System
open System.Collections
open System.Collections.Generic
open System.Diagnostics
module Seq =
    type private TimedEnumerator<'t>(source : IEnumerator<'t>, blockSize, f) =
        let swUpstream = Stopwatch()
        let swDownstream = Stopwatch()
        let swTotal = Stopwatch()
        let mutable count = 0
        interface IEnumerator<'t> with
            member __.Reset() = failwith "not implemented"
            member __.Dispose() = source.Dispose()
            member __.Current = source.Current
            member __.Current = (source :> IEnumerator).Current
            member __.MoveNext() =
                // downstream consumer has finished processing the previous item
                swDownstream.Stop()
                // start the total throughput timer if this is the first item
                if not swTotal.IsRunning then
                    swTotal.Start()
                // measure upstream MoveNext() call
                swUpstream.Start()
                let result = source.MoveNext()
                swUpstream.Stop()
                if result then
                    count <- count + 1
                // blockSize reached or upstream is complete - invoke callback and reset timers
                if (result && count = blockSize) || (not result && count > 0) then
                    swTotal.Stop()
                    let upDownTotal = swUpstream.Elapsed + swDownstream.Elapsed
                    f count swTotal.Elapsed (swUpstream.Elapsed.TotalMilliseconds / upDownTotal.TotalMilliseconds)
                    count <- 0
                    swUpstream.Reset()
                    swDownstream.Reset()
                    swTotal.Restart()
                // clock starts for downstream consumer
                swDownstream.Start()
                result
    type private SlimTimedEnumerator<'t>(source : IEnumerator<'t>, blockSize, f) =
        let swTotal = Stopwatch()
        let mutable count = 0
        interface IEnumerator<'t> with
            member __.Current = source.Current
            member __.Current = (source :> IEnumerator).Current
            member __.Reset() = failwith "not implemented"
            member __.Dispose() = source.Dispose()
            member __.MoveNext() =
                // start the timer on the first MoveNext call
                if not swTotal.IsRunning then
                    swTotal.Start()
                let result = source.MoveNext()
                if result then
                    count <- count + 1
                // blockSize reached or upstream is complete - invoke callback and reset timer
                if (result && count = blockSize) || (not result && count > 0) then
                    swTotal.Stop()
                    f count swTotal.Elapsed Double.NaN
                    count <- 0
                    swTotal.Restart()
                result
    let mkTimed blockSize getEnum =
        let block =
            match blockSize with
            | Some(bs) when bs <= 0 -> invalidArg "blockSize" "blockSize must be positive"
            | Some(bs) -> bs
            | None -> -1
        { new IEnumerable<'a> with
              member __.GetEnumerator() = upcast (getEnum block)
          interface IEnumerable with
              member __.GetEnumerator() = upcast (getEnum block) }
    let timed blockSize f (source : seq<'t>) =
        mkTimed blockSize (fun block -> new TimedEnumerator<'t>(source.GetEnumerator(), block, f))
    let timedSlim blockSize f (source : seq<'t>) =
        mkTimed blockSize (fun block -> new SlimTimedEnumerator<'t>(source.GetEnumerator(), block, f))
// sample callbacks that scale time units
module Scale =
    let private stats unitString getUnit blockSize elapsedTotal upstreamRatio =
        let totalScaled = getUnit (elapsedTotal : TimeSpan)
        let itemsPerTime = (float blockSize) / totalScaled
        let timePerItem = totalScaled / (float blockSize)
        let upDownPercentStr =
            if Double.IsNaN(upstreamRatio) then ""
            else sprintf " (%2.0f%% upstream | %2.0f%% downstream)" (100. * upstreamRatio) (100. * (1. - upstreamRatio))
        Console.WriteLine(
            sprintf "%d items %10.2f{0} %10.2f items/{0} %10.2f {0}/item%s"
                blockSize totalScaled itemsPerTime timePerItem upDownPercentStr,
            (unitString : string)
        )
    let s = stats "s" (fun ts -> ts.TotalSeconds)
    let ms = stats "ms" (fun ts -> ts.TotalMilliseconds)
    let μs = stats "μs" (fun ts -> ts.TotalMilliseconds * 1e3)
    let ns = stats "ns" (fun ts -> ts.TotalMilliseconds * 1e6)