Evaluating DSP operations in minimal buffer space by pipelining

Kragen Javier Sitaker, 2018-12-18 (updated 2018-12-19) (20 minutes)

I was thinking about how to implement a cascade of short-memory filters in C, how to manage all of their circular buffers, and it occurred to me that I can actually use the same circular buffer for all of them. Furthermore, especially if I’m willing to accept a few samples of extra latency, this allows me to move samples from one stage of the pipeline to the next with zero cost.

Suppose, for example, that I’m cascading the three FIR filters y(n) = x(n) - x(n-1), y(n) = x(n) - x(n-5), and y(n) = x(n) + x(n-15). Let’s suppose our circular buffer b, indexed by index bi, has a size that is a power of 2 with bitmask m, so the current input sample is at b[bi & m]. It’s convenient to invert the polarity of our first filter: y(n) = x(n-1) - x(n). We can implement this as follows:

b[(bi - 1) & m] -= b[bi & m];

This converts the sample at bi-1 from being x(n-1) to being y(n).

Now our second-stage filter, y(n) = x(n) - x(n-5), is being fed an inverted signal. So if it, too, is inverted, to y(n) = x(n-5) - x(n), we can implement it as follows:

b[(bi - 6) & m] -= b[(bi - 1) & m];

This converts the sample at bi-6 from being x(n-5) to being y(n). Note that this is the value we computed in the previous step, so it doesn’t need to be loaded from RAM. The inversion of sign in this step cancels the inversion of sign in the previous step.

Then, our final stage, y(n) = x(n) + x(n-15), can be implemented similarly:

b[(bi - 21) & m] += b[(bi - 6) & m];

This converts the sample at bi-21 from being x(n-15) to being y(n).

Finally, you need to delay the output from the pipeline for 21 samples. Here I’m also dividing it by 4 to compensate for the amplification by the above filters.

if (bi >= 21) buf[bi - 21] = b[(bi - 21) & m] >> 2;

Here’s a loop doing the above pipeline, although I haven’t verified that it performs the desired DSP function:

  for (bi = 0; bi != n; bi++) {
    b[bi & m] = buf[bi];
    b[(bi - 1) & m] -= b[bi & m];
    b[(bi - 6) & m] -= b[(bi - 1) & m];
    b[(bi - 21) & m] += b[(bi - 6) & m];
    if (bi >= 21) buf[bi - 21] = b[(bi - 21) & m] >> 2;
  }

amd64 assembly

Here’s what it looks like compiled with GCC, 92 bytes:

  400653:   74 5a                   je     4006af <filter+0x89>
  400655:   4d 89 c1                mov    %r8,%r9
  400658:   ba 00 00 00 00          mov    $0x0,%edx
         ; LOOP START
  40065d:   41 0f be 01             movsbl (%r9),%eax             ; load buf[bi]
  400661:   0f b6 ca                movzbl %dl,%ecx               ; bi & m
  400664:   89 04 8c                mov    %eax,(%rsp,%rcx,4)     ; b[bi & m] = ...
  400667:   8d 4a ff                lea    -0x1(%rdx),%ecx        ; bi - 1
  40066a:   0f b6 c9                movzbl %cl,%ecx               ; & m
  40066d:   8b 1c 8c                mov    (%rsp,%rcx,4),%ebx     ; load b[(bi-1) & m]
  400670:   29 c3                   sub    %eax,%ebx              ; - b[bi & m]
  400672:   89 d8                   mov    %ebx,%eax
  400674:   89 1c 8c                mov    %ebx,(%rsp,%rcx,4)     ; store b[(bi-1) & m]
  400677:   8d 4a fa                lea    -0x6(%rdx),%ecx        ; bi - 6
  40067a:   0f b6 c9                movzbl %cl,%ecx               ; & m
  40067d:   8b 1c 8c                mov    (%rsp,%rcx,4),%ebx     ; load b[(bi-6) & m]
  400680:   29 c3                   sub    %eax,%ebx              ; - b[(bi-1) & m]
  400682:   89 d8                   mov    %ebx,%eax
  400684:   89 1c 8c                mov    %ebx,(%rsp,%rcx,4)     ; store b[(bi-6) & m]
  400687:   8d 4a eb                lea    -0x15(%rdx),%ecx       ; bi - 21
  40068a:   44 0f b6 d1             movzbl %cl,%r10d              ; & m (saving entire)
  40068e:   42 03 04 94             add    (%rsp,%r10,4),%eax     ; b[(bi-21) & m] + ...
  400692:   42 89 04 94             mov    %eax,(%rsp,%r10,4)     ; store it
  400696:   83 fa 14                cmp    $0x14,%edx             ; bi >= 21?
  400699:   76 09                   jbe    4006a4 <filter+0x7e>
  40069b:   89 c9                   mov    %ecx,%ecx              ; 2-insn NOP
  40069d:   c1 f8 02                sar    $0x2,%eax              ; b[...] >> 2
  4006a0:   41 88 04 08             mov    %al,(%r8,%rcx,1)       ; buf[
  4006a4:   83 c2 01                add    $0x1,%edx              ; bi++
  4006a7:   49 83 c1 01             add    $0x1,%r9               ; ++&buf[bi]
  4006ab:   39 fa                   cmp    %edi,%edx              ; bi != n?
  4006ad:   75 ae                   jne    40065d <filter+0x37>

It seems like %edx is being used as our loop counter bi, %r9 is being used as the input counter &buf[bi], and %edi is being used for n, the limit. My index mask m is 255, so GCC is implementing the masking with movzbl instructions. Each stage of the pipeline requires 6 instructions, for whatever that’s worth in 2018, and there’s an additional 4 instructions or so of loop overhead, for a total of 28 instructions inside the loop.

ARM assembly

The Cortex-M4 equivalent is as follows, 80 bytes and 29 instructions, 25 inside the loop:

  22:   b334        cbz r4, 72 <filter+0x72>
  24:   f105 3eff   add.w   lr, r5, #4294967295 ; 0xffffffff
  28:   2300        movs    r3, #0
  2a:   aa01        add r2, sp, #4
    ; LOOP START
  2c:   f81e 1f01   ldrb.w  r1, [lr, #1]!         ; buf[bi]
  30:   b2d8        uxtb    r0, r3                ; bi & m?
  32:   f842 1020   str.w   r1, [r2, r0, lsl #2]  ; b[bi & m] = ...
  36:   1e58        subs    r0, r3, #1            ; bi - 1
  38:   b2c0        uxtb    r0, r0                ; & m?
  3a:   f852 6020   ldr.w   r6, [r2, r0, lsl #2]  ; load b[(bi-1) & m]
  3e:   1a76        subs    r6, r6, r1            ; subtract r1 from it
  40:   f842 6020   str.w   r6, [r2, r0, lsl #2]  ; store b[(bi-1) & m]
  44:   1f98        subs    r0, r3, #6            ; bi - 6
  46:   b2c0        uxtb    r0, r0                ; & m?
  48:   f852 1020   ldr.w   r1, [r2, r0, lsl #2]  ; load b[bi-6 & m]
  4c:   1b8e        subs    r6, r1, r6            ; subtract r6 from it
  4e:   f842 6020   str.w   r6, [r2, r0, lsl #2]  ; store b[bi-6 & m]
  52:   f1a3 0015   sub.w   r0, r3, #21           ; bi - 21
  56:   b2c0        uxtb    r0, r0                ; & m?
  58:   f852 1020   ldr.w   r1, [r2, r0, lsl #2]  ; load b[bi-21 & m]
  5c:   4431        add r1, r6                ; add r6 to it
  5e:   f842 1020   str.w   r1, [r2, r0, lsl #2]  ; save it
  62:   2b14        cmp r3, #20               ; bi > 20?
  64:   bf84        itt hi                    ; conditional, unsigned greater
  66:   1089        asrhi   r1, r1, #2            ; r1 >>= 2
  68:   f80e 1c15   strbhi.w    r1, [lr, #-21]; store buf[bi-21]?
  6c:   3301        adds    r3, #1
  6e:   42bb        cmp r3, r7
  70:   d1dc        bne.n   2c <filter+0x2c>

This uses 5 instructions for each stage of the pipeline, against AMD64’s 6, and unlike implementations of AMD64, I think that number actually does mean 5 clock cycles under normal circumstances.

Actually, on further thought, I think my concern about the 21 samples of latency was unfounded. There’s a data path straight through the loop from buf[bi] to what ought to be buf[bi] but is instead buf[bi-21]. There’s additionally a delayed path through the FIFOs, which is as it should be.

(I fixed that bug and removed the >> 2 and verified that the impulse response was as it should be.)

Recursive filters

Recursive filters can also be implemented with this structure. Suppose we have a cascade of two humble integrators, y(n) = x(n) + y(n-1), as we might for the beginning of a Hogenauer filter. Of course, we could implement this as follows:

y1 += x;
y2 += y1;

But if we stick to this staged-FIFO structure so that we can implement arbitrary lags, it looks like this:

void filter2(char *buf, int n)
{
  enum { bufsiz = 256, m = bufsiz-1 };
  int b[bufsiz] = {0};

  for (unsigned bi = 0; bi != n; bi++) {
    b[bi & m] = buf[bi];               
    b[bi & m] += b[(bi - 1) & m];          // ∫1
    b[(bi - 1) & m] += b[(bi - 2) & m];    // ∫2
    b[(bi - 7) & m] -= b[(bi - 1) & m];
    b[(bi - 12) & m] -= b[(bi - 7) & m];
    buf[bi] = b[(bi - 12) & m] >> 3;       // compensate for 25× amplfier
  }
}

The ∫1 line computes the new y(n) value from the new x(n) value and the previous y(n) value; the ∫2 line does the same thing, but for the second integrator. Absent any further transmogrification, it leaves a trail of second-order integrator values behind in the buffer. Then, immediately below, we have two feedforward comb filter lines, which tame the wild integrators and convert them into a mild-mannered triangular-kernel FIR filter, completing the structure of a second-order (CIC) Hogenauer low-pass filter, although in this case without the usual decimation step.

In ∫1, because we don’t need to preserve x(n), we can transform it into y(n), saving a memory load. But in the ∫2 stage, we do need to preserve x(n) (the previous stage’s y(n)), so we just overwrite y(n-1) instead.

Again, in the comb filters, we’re inverting the sign here for convenience — instead of computing y(n) = x(n) - x(n-5), we’re computing x(n-5) - x(n), which requires no extra work because we have an even number of such inverting comb filters in the pipeline.

This filter function ends up as follows on amd64 with -Os:

  9b:   48 81 ec 18 04 00 00    sub    $0x418,%rsp
  a2:   48 89 fa                mov    %rdi,%rdx
  a5:   b9 00 01 00 00          mov    $0x100,%ecx               ; buffer size to zero
  aa:   64 48 8b 04 25 28 00    mov    %fs:0x28,%rax
  b1:   00 00 
  b3:   48 89 84 24 08 04 00    mov    %rax,0x408(%rsp)
  ba:   00 
  bb:   31 c0                   xor    %eax,%eax
  bd:   48 8d 7c 24 08          lea    0x8(%rsp),%rdi
  c2:   f3 ab                   rep stos %eax,%es:(%rdi)         ; zero the buffer
  c4:   48 89 d7                mov    %rdx,%rdi
    ;; LOOP START
  c7:   39 f0                   cmp    %esi,%eax                 ; bi (%eax) != n
  c9:   74 64                   je     12f <filter2+0x94>
  cb:   0f be 0f                movsbl (%rdi),%ecx               ; buf[bi]
  ce:   8d 50 ff                lea    -0x1(%rax),%edx           ; bi - 1
  d1:   44 0f b6 c0             movzbl %al,%r8d                  ; bi & m
  d5:   48 ff c7                inc    %rdi                      ; ++&buf[bi]
  d8:   0f b6 d2                movzbl %dl,%edx                  ; (bi - 1) & m
  db:   42 89 4c 84 08          mov    %ecx,0x8(%rsp,%r8,4)      ; b[(bi-1) & m] = …
  e0:   03 4c 94 08             add    0x8(%rsp,%rdx,4),%ecx     ; … + b[bi & m]
  e4:   42 89 4c 84 08          mov    %ecx,0x8(%rsp,%r8,4)      ; b[bi & m] = …
  e9:   8d 48 fe                lea    -0x2(%rax),%ecx           ; bi - 2
  ec:   44 8b 44 94 08          mov    0x8(%rsp,%rdx,4),%r8d  ; XXX redundant load
  f1:   0f b6 c9                movzbl %cl,%ecx
  f4:   44 03 44 8c 08          add    0x8(%rsp,%rcx,4),%r8d  ; XXX WTF
  f9:   8d 48 f9                lea    -0x7(%rax),%ecx           ; bi - 7
  fc:   0f b6 c9                movzbl %cl,%ecx                  ; & m
  ff:   44 89 44 94 08          mov    %r8d,0x8(%rsp,%rdx,4)
 104:   8b 54 8c 08             mov    0x8(%rsp,%rcx,4),%edx
 108:   44 29 c2                sub    %r8d,%edx
 10b:   89 54 8c 08             mov    %edx,0x8(%rsp,%rcx,4)
 10f:   8d 48 f4                lea    -0xc(%rax),%ecx
 112:   ff c0                   inc    %eax
 114:   0f b6 c9                movzbl %cl,%ecx
 117:   44 8b 4c 8c 08          mov    0x8(%rsp,%rcx,4),%r9d
 11c:   41 29 d1                sub    %edx,%r9d
 11f:   44 89 ca                mov    %r9d,%edx
 122:   44 89 4c 8c 08          mov    %r9d,0x8(%rsp,%rcx,4)
 127:   c1 fa 03                sar    $0x3,%edx
 12a:   88 57 ff                mov    %dl,-0x1(%rdi)
 12d:   eb 98                   jmp    c7 <filter2+0x2c>
 12f:   48 8b 84 24 08 04 00    mov    0x408(%rsp),%rax
 136:   00 
 137:   64 48 33 04 25 28 00    xor    %fs:0x28,%rax        ; stack canary‽
 13e:   00 00 
 140:   74 05                   je     147 <filter2+0xac>
 142:   e8 00 00 00 00          callq  147 <filter2+0xac>
 147:   48 81 c4 18 04 00 00    add    $0x418,%rsp
 14e:   c3                      retq

Looks like it has 6 stores, 5 fetches, and 30 instructions inside the loop to realize this four-stage pipeline, but again only 5 or 6 instructions per stage of the pipeline. I was hoping to reach some kind of conclusion here about whether recursive filtering was going to be slightly costlier, but I really have no idea.

It looks to me like GCC has given up on register allocation in part of this code and is just emitting redundant loads, which is weird.

However, this is definitely less costly:

y1 += x;
y2 += y1;

And if you’re doing a Hogenauer filter, you should probably not only do that, but you should also decimate the stuff downstream from the integrators.

But you can use this approach for other kinds of recursive filtering, too.

On one core of my laptop, this code filters 100 mebibytes (of malloced, mostly uninitialized memory) in 785 ms. That’s about 134 million samples per second, or about 12 clock cycles per sample, or about 3 clock cycles per sample per pipeline stage. Presumably the number on a Cortex-M4 would be more like 6 clock cycles per sample per pipeline stage, due to in-order execution.

This suggests that processing a signal at 44.1 ksps would require about 132 kHz of amd64 clock per stage or 266 kHz of Cortex-M4 clock per stage. This seems like it might come in at the sub-MIPS level I was hoping for in the Bleep modem.

Multirate processing

So here’s a Hogenauer filter with a twist — it’s tuned to detect one particular frequency (19110 Hz) and null another one (17640 Hz). At least, that was the intention. I haven’t tested this C code, just code in Python that purports to be equivalent, except that it uses floating-point (!!).

unsigned filter3(signed char *buf, int n)
{
  enum { bufsiz = 256, m = bufsiz-1 };
  int b[bufsiz] = {0};
  uint64_t d1=0, d2=0, d3=0, d4=0, i1=0, i2=0, i3=0, tmp, tmp2, c2=0, c3=0;
  int bi5=0, bo=0, odd=0;

  for (unsigned bi = 0; bi != n; bi++, bi5++) {
    tmp = buf[bi];
    tmp2 = tmp - d1;            /* Differentiator 1 */
    d1 = tmp;
    tmp = tmp2 - d2;            /* Differentiator 2 */
    d2 = tmp2;
    tmp2 = tmp - d3;            /* Differentiator 3 */
    d3 = tmp;
    tmp = tmp2 - d4;            /* Differentiator 4 */
    d4 = tmp2;
    i1 += odd ? -tmp : tmp;     /* Integrator 1 */
    odd = ~odd;
    i2 += i1;                   /* Integrator 2 */
    i3 += i2;                   /* Integrator 3 */

    if (bi5 == 5) {
      bi5 = 0;
      /* First comb filter: y(n) = x(n-2) - x(n); note inversion */
      b[bi & m] = i3;
      b[(bi - 2) & m] -= i3;
      /* Second comb filter: y(n) = x(n-1) - x(n); canceling inversion */
      tmp = c2 - b[(bi - 2) & m];
      c2 = b[(bi - 2) & m];
      /* Third comb filter: y(n) = x(n) - x(n-1), this time not inverting; but now into the output buffer */
      buf[bo++] = tmp - c3;
      c3 = tmp;
    }
  }
  return bo;
}

This rather alarming C comes out to this perhaps even more alarming assembly:

000000000000014f <filter3>:
 14f:   41 57                   push   %r15
 151:   41 56                   push   %r14
 153:   b9 00 01 00 00          mov    $0x100,%ecx
 158:   41 55                   push   %r13
 15a:   41 54                   push   %r12
 15c:   49 89 fc                mov    %rdi,%r12
 15f:   55                      push   %rbp
 160:   53                      push   %rbx
 161:   31 d2                   xor    %edx,%edx
 163:   45 31 db                xor    %r11d,%r11d
 166:   45 31 c0                xor    %r8d,%r8d
 169:   45 31 ff                xor    %r15d,%r15d
 16c:   48 81 ec 38 04 00 00    sub    $0x438,%rsp
 173:   45 31 d2                xor    %r10d,%r10d
 176:   45 31 c9                xor    %r9d,%r9d
 179:   64 48 8b 04 25 28 00    mov    %fs:0x28,%rax
 180:   00 00 
 182:   48 89 84 24 28 04 00    mov    %rax,0x428(%rsp)
 189:   00 
 18a:   31 c0                   xor    %eax,%eax
 18c:   48 8d 7c 24 28          lea    0x28(%rsp),%rdi
 191:   89 74 24 1c             mov    %esi,0x1c(%rsp)
 195:   45 31 f6                xor    %r14d,%r14d
 198:   45 31 ed                xor    %r13d,%r13d
 19b:   31 ed                   xor    %ebp,%ebp
 19d:   31 db                   xor    %ebx,%ebx
 19f:   f3 ab                   rep stos %eax,%es:(%rdi)
 1a1:   31 ff                   xor    %edi,%edi
    ; LOOP START
 1a3:   3b 54 24 1c             cmp    0x1c(%rsp),%edx
 1a7:   89 54 24 18             mov    %edx,0x18(%rsp)
 1ab:   0f 84 a0 00 00 00       je     251 <filter3+0x102>
 1b1:   49 0f be 34 14          movsbq (%r12,%rdx,1),%rsi     ; sign-extending byte to quad?
 1b6:   48 89 34 24             mov    %rsi,(%rsp)
 1ba:   48 29 de                sub    %rbx,%rsi              ; d1
 1bd:   48 89 74 24 08          mov    %rsi,0x8(%rsp)
 1c2:   48 29 ee                sub    %rbp,%rsi              ; d2
 1c5:   48 89 74 24 10          mov    %rsi,0x10(%rsp)
 1ca:   4c 29 ee                sub    %r13,%rsi              ; d3
 1cd:   48 89 f5                mov    %rsi,%rbp
 1d0:   49 89 f5                mov    %rsi,%r13
 1d3:   4c 29 f5                sub    %r14,%rbp              ; d4
 1d6:   48 89 eb                mov    %rbp,%rbx
 1d9:   48 f7 db                neg    %rbx
 1dc:   45 85 db                test   %r11d,%r11d            ; odd
 1df:   41 f7 d3                not    %r11d
 1e2:   48 0f 44 dd             cmove  %rbp,%rbx
 1e6:   49 01 d9                add    %rbx,%r9               ; i1
 1e9:   4d 01 ca                add    %r9,%r10               ; i2
 1ec:   4c 01 d7                add    %r10,%rdi              ; i3
 1ef:   41 83 f8 05             cmp    $0x5,%r8d
 1f3:   75 40                   jne    235 <filter3+0xe6>
    ; DECIMATED CONDITIONAL START
 1f5:   8b 5c 24 18             mov    0x18(%rsp),%ebx
 1f9:   44 0f b6 44 24 18       movzbl 0x18(%rsp),%r8d
 1ff:   83 eb 02                sub    $0x2,%ebx              ; bi - 2
 202:   0f b6 db                movzbl %bl,%ebx               ; & m
 205:   42 89 7c 84 28          mov    %edi,0x28(%rsp,%r8,4)  ; b[bi & m] = i3
 20a:   44 8b 44 9c 28          mov    0x28(%rsp,%rbx,4),%r8d ; b[bi-2 & m]
 20f:   41 29 f8                sub    %edi,%r8d              ; x(n-2) - x(n)
 212:   44 89 44 9c 28          mov    %r8d,0x28(%rsp,%rbx,4) ; ‽
 217:   4d 63 c0                movslq %r8d,%r8
 21a:   48 63 d8                movslq %eax,%rbx
 21d:   4c 29 c1                sub    %r8,%rcx
 220:   ff c0                   inc    %eax
 222:   40 88 cd                mov    %cl,%bpl
 225:   44 29 fd                sub    %r15d,%ebp
 228:   49 89 cf                mov    %rcx,%r15
 22b:   4c 89 c1                mov    %r8,%rcx
 22e:   41 88 2c 1c             mov    %bpl,(%r12,%rbx,1)
 232:   45 31 c0                xor    %r8d,%r8d
    ; DECIMATED CONDITIONAL END
 235:   4d 89 ee                mov    %r13,%r14
 238:   41 ff c0                inc    %r8d
 23b:   48 ff c2                inc    %rdx
 23e:   4c 8b 6c 24 10          mov    0x10(%rsp),%r13
 243:   48 8b 6c 24 08          mov    0x8(%rsp),%rbp
 248:   48 8b 1c 24             mov    (%rsp),%rbx
 24c:   e9 52 ff ff ff          jmpq   1a3 <filter3+0x54>
    ; LOOP END
 251:   48 8b 94 24 28 04 00    mov    0x428(%rsp),%rdx
 258:   00 
 259:   64 48 33 14 25 28 00    xor    %fs:0x28,%rdx
 260:   00 00 
 262:   74 05                   je     269 <filter3+0x11a>
 264:   e8 00 00 00 00          callq  269 <filter3+0x11a>
 269:   48 81 c4 38 04 00 00    add    $0x438,%rsp
 270:   5b                      pop    %rbx
 271:   5d                      pop    %rbp
 272:   41 5c                   pop    %r12
 274:   41 5d                   pop    %r13
 276:   41 5e                   pop    %r14
 278:   41 5f                   pop    %r15
 27a:   c3                      retq

So, this has an 8-stage per-sample pipeline (which will be shared with the other ultrasound signals I want to detect), followed by a decimator and a three-stage per-output-sample pipeline. The loop overhead is 7 instructions per sample at the end and 3 instructions per sample at the beginning; the 9-stage per-sample code is 20 instructions; then the decimated code is 18 instructions, averaging 3.6 instructions per loop. All in all this should run in 33.6 instructions per sample, which would be about 17 amd64 clock cycles per sample if the same ratio held as for the previous code whose performance I actually measured. Also that works out to 12 pipeline stages, which means about 3 instructions or 1.5 clock cycles per stage, definitely an improvement over the earlier numbers.

It probably isn’t necessary to use 64-bit math in here, and definitely not for all the variables it’s being used for. But I haven’t done the analysis yet to be sure I understand the Hogenauer overflows.

This suggests that doing the full demodulation this way will require a bit over a Cortex-M4 MIPS.

The differentiators on the front end might not be necessary; they're there to filter out the (commonly much more powerful) low-frequency noise.

Topics