Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d12c16b7 authored by Jakub Pawłowski's avatar Jakub Pawłowski Committed by Automerger Merge Worker
Browse files

Merge "LC3 codec: Add tools, tests, table, README.md" am: ff9eaa6d am: 0c8330f6 am: 6842b295

parents 5bd0a252 6842b295
Loading
Loading
Loading
Loading
+102 −0
Original line number Diff line number Diff line
# Low Complexity Communication Codec (LC3)

The LC3 is an efficient low latency audio codec.

[_Low Complexity Communication Codec_](https://www.bluetooth.org/DocMan/handlers/DownloadDoc.ashx?doc_id=502107&vId=542963)

## Overview

The directory layout is as follows :
- include:      Library interface
- src:          Source files
- tools:        Standalone encoder/decoder tools
- test:         Python implentation, used as reference for unit testing
- build:        Building outputs
- bin:          Compilation output

## How to build

The default toolchain used is GCC. Invoke `make` to build the library.

```sh
$ make -j
```

Compiled library `liblc3.a` will be found in `bin` directory.

#### Cross compilation

The cc, as, ld and ar can be selected with respective Makefile variables `CC`,
`AS`, `LD` and `AR`. The `AS` and `LD` selections are optionnal, and fallback
to `CC` selection when not defined.

The `LIBC` must be set to `bionic` for android cross-compilation. This switch
prevent link with `pthread` and `rt` libraries, that is included in the
bionic libc.

Following example build for android, using NDK toolset.

```sh
$ make -j CC=path_to_android_ndk_prebuilt/toolchain-prefix-clang LIBC=bionic
```

Compiled library will be found in `bin` directory.

## Tools

Tools can be all compiled, while involking `make` as follows :

```sh
$ make tools
```

The standalone encoder `elc3` take a `wave` file as input and encode it
according given parameter. The LC3 binary file format used is the non
standard format described by the reference encoder / decoder tools.
The standalone decoder `dlc3` do the inverse operation.

Refer to `elc3 -h` or `dlc3 -h` for options.

Note that `elc3` output bitstream to standard output when output file is
omitted. On the other side `dlc3` read from standard input when input output
file are omitted.
In such way you can easly test encoding / decoding loop with :

```sh
$ ./elc3 <in.wav> -b <bitrate> | ./dlc3 > <out.wav>
```

Adding Linux `aplay` tools, you will be able to instant hear the result :

```sh
$ ./elc3 <in.wav> -b <bitrate> | ./dlc3 | aplay
```

## Test

A python implementation of the encoder is provided in `test` diretory.
The C implementation is unitary validated against this implementation and
intermediate values given in Appendix C of the specification.

#### Prerequisite

```sh
# apt install python3 python3-dev python3-pip
$ pip3 install scipy numpy
```

#### Running test suite

```sh
$ make test
```


## Conformance

The proposed encoder and decoder implementation have been fully tested and
validated.

For more detail on conformance, refer to [_Bluetooth Conformance
Documents and scripts_](https://www.bluetooth.com/specifications/specs/low-complexity-communication-codec-1-0/)
+118 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np


def print_table(t, m=4):

    for (i, v) in enumerate(t):
        print('{:14.8e},'.format(v), end = '\n' if i%m == m-1 else ' ')

    if len(t) % 4:
        print('')


def mdct_fft_twiddles():

    for n in (10, 20, 30, 40, 60, 80, 90, 120, 160, 180, 240):

        print('\n--- fft bf2 twiddles {:3d} ---'.format(n))

        kv = -2 * np.pi * np.arange(n // 2) / n
        for (i, k) in enumerate(kv):
            print('{{ {:14.7e}, {:14.7e} }},'.format(np.cos(k), np.sin(k)),
                  end = '\n' if i%2 == 1 else ' ')

    for n in (15, 45):

        print('\n--- fft bf3 twiddles {:3d} ---'.format(n))

        kv = -2 * np.pi * np.arange(n) / n
        for k in kv:
            print(('{{ {{ {:14.7e}, {:14.7e} }},' +
                     ' {{ {:14.7e}, {:14.7e} }} }},').format(
                np.cos(k), np.sin(k), np.cos(2*k), np.sin(2*k)))


def mdct_rot_twiddles():

    for n in (120, 160, 240, 320, 360, 480, 640, 720, 960):

        print('\n--- mdct rot twiddles {:3d} ---'.format(n))

        kv = 2 * np.pi * (np.arange(n // 4) + 1/8) / n
        for (i, k) in enumerate(kv):
            print('{{ {:14.7e}, {:14.7e} }},'.format(np.cos(k), np.sin(k)),
                  end = '\n' if i%2 == 1 else ' ')


def mdct_scaling():

    print('\n--- mdct scaling ---')
    ns = np.array([ [ 60, 120, 180, 240, 360], [ 80, 160, 240, 320, 480] ])
    print_table(np.sqrt(2 / ns[0]))
    print_table(np.sqrt(2 / ns[1]))


def tns_lag_window():

    print('\n--- tns lag window ---')
    print_table(np.exp(-0.5 * (0.02 * np.pi * np.arange(9)) ** 2))


def tns_quantization_table():

    print('\n--- tns quantization table ---')
    print_table(np.sin((np.arange(8) + 0.5) * (np.pi / 17)))
    print_table(np.sin((np.arange(8)) * (np.pi / 17)))


def quant_iq_table():

    print('\n--- quantization iq table ---')
    print_table(10 ** (np.arange(65) / 28))


def sns_ge_table():

    g_tilt_table = [ 14, 18, 22, 26, 30 ]

    for (sr, g_tilt) in enumerate(g_tilt_table):
        print('\n--- sns ge table, sr:{} ---'.format(sr))
        print_table(10 ** ((np.arange(64) * g_tilt) / 630))


def inv_table():

    print('\n--- inv table ---')
    print_table(np.append(np.zeros(1), 1 / np.arange(1, 28)))


if __name__ == '__main__':

    mdct_fft_twiddles()
    mdct_rot_twiddles()
    mdct_scaling()

    inv_table()
    sns_ge_table()
    tns_lag_window()
    tns_quantization_table()
    quant_iq_table()

    print('')
+4083 −0

File added.

Preview size limit exceeded, changes collapsed.

+185 −0
Original line number Diff line number Diff line
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np

import build.lc3 as lc3
import tables as T, appendix_c as C


### ------------------------------------------------------------------------ ###

class AttackDetector:

    def __init__(self, dt, sr):

        self.dt = dt
        self.sr = sr
        self.ms = T.DT_MS[dt]

        self.xn1 = 0
        self.xn2 = 0
        self.en1 = 0
        self.an1 = 0
        self.p_att = 0

    def is_enabled(self, nbytes):

        c1 = self.dt == T.DT_10M and \
             self.sr == T.SRATE_32K and nbytes > 80

        c2 = self.dt == T.DT_10M and \
             self.sr >= T.SRATE_48K and nbytes >= 100

        c3 = self.dt == T.DT_7M5 and \
             self.sr == T.SRATE_32K and nbytes >= 61 and nbytes < 150

        c4 = self.dt == T.DT_7M5 and \
             self.sr >= T.SRATE_48K and nbytes >= 75 and nbytes < 150

        return c1 or c2 or c3 or c4

    def run(self, nbytes, x):

        ### 3.3.6.2 Downsampling and filtering input

        mf = int(16 * self.ms)

        r = len(x) // mf
        x_att = np.array([ np.sum(x[i*r:(i+1)*r]) for i in range(mf) ])

        x_hp = np.empty(mf)
        x_hp[0 ] = 0.375 * x_att[0 ] - 0.5 * self.xn1    + 0.125 * self.xn2
        x_hp[1 ] = 0.375 * x_att[1 ] - 0.5 * x_att[0   ] + 0.125 * self.xn1
        x_hp[2:] = 0.375 * x_att[2:] - 0.5 * x_att[1:-1] + 0.125 * x_att[0:-2]
        self.xn2 = x_att[-2]
        self.xn1 = x_att[-1]

        ### 3.3.6.3 Energy calculation

        nb = int(self.ms / 2.5)

        e_att = np.array([ np.sum(np.square(x_hp[40*i:40*(i+1)]))
                           for i in range(nb) ])

        a_att = np.empty(nb)
        a_att[0] = np.maximum(0.25 * self.an1, self.en1)
        for i in range(1,nb):
            a_att[i] = np.maximum(0.25 * a_att[i-1], e_att[i-1])
        self.en1 = e_att[-1]
        self.an1 = a_att[-1]

        ### 3.3.6.4 Attack Detection

        p_att = -1
        flags = [ (e_att[i] > 8.5 * a_att[i]) for i in range(nb) ]

        for (i, f) in enumerate(flags):
            if f: p_att = i

        f_att = p_att >= 0 or self.p_att - 1 >= nb // 2
        self.p_att = 1 + p_att

        return self.is_enabled(nbytes) and f_att


def initial_state():
    return { 'en1': 0.0, 'an1': 0.0, 'p_att': 0 }

### ------------------------------------------------------------------------ ###

def check_enabling(rng, dt):

    ok = True

    for sr in range(T.SRATE_16K, T.NUM_SRATE):

        attdet = AttackDetector(dt, sr)

        for nbytes in [ 61, 61-1, 75-1, 75, 80, 80+1, 100-1, 100, 150-1, 150 ]:

            f_att = lc3.attdet_run(dt, sr, nbytes,
                initial_state(), 2 * rng.random(T.NS[dt][sr]+6) - 1)

            ok = ok and f_att == attdet.is_enabled(nbytes)

    return ok

def check_unit(rng, dt, sr):

    ns = T.NS[dt][sr]
    ok = True

    attdet = AttackDetector(dt, sr)

    state_c = initial_state()
    x_c = np.zeros(ns+6)

    for run in range(100):

        ### Generate noise, and an attack at random point

        x = (2 * rng.random(ns)) - 1
        x[(ns * rng.random()).astype(int)] *= 100

        ### Check Implementation

        f_att = attdet.run(100, x)

        x_c = np.append(x_c[-6:], x)
        f_att_c = lc3.attdet_run(dt, sr, 100, state_c, x_c)

        ok = ok and f_att_c == f_att
        ok = ok and np.amax(np.abs(1 - state_c['en1']/attdet.en1)) < 1e-6
        ok = ok and np.amax(np.abs(1 - state_c['an1']/attdet.an1)) < 1e-6
        ok = ok and state_c['p_att'] == attdet.p_att

    return ok

def check_appendix_c(dt):

    sr = T.SRATE_48K

    state = initial_state()

    x = np.append(np.zeros(6), C.X_PCM_ATT[dt][0])
    f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
    ok = f_att == C.F_ATT[dt][0]

    x = np.append(x[-6:], C.X_PCM_ATT[dt][1])
    f_att = lc3.attdet_run(dt, sr, C.NBYTES_ATT[dt], state, x)
    ok = f_att == C.F_ATT[dt][1]

    return ok

def check():

    rng = np.random.default_rng(1234)
    ok = True

    for dt in range(T.NUM_DT):
        ok and check_enabling(rng, dt)

    for dt in range(T.NUM_DT):
        for sr in range(T.SRATE_32K, T.NUM_SRATE):
            ok = ok and check_unit(rng, dt, sr)

    for dt in range(T.NUM_DT):
        ok = ok and check_appendix_c(dt)

    return ok

### ------------------------------------------------------------------------ ###
+62 −0
Original line number Diff line number Diff line
/******************************************************************************
 *
 *  Copyright 2021 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#include <Python.h>
#include <numpy/ndarrayobject.h>

#include <attdet.c>
#include "ctypes.h"

static PyObject *attdet_run_py(PyObject *m, PyObject *args)
{
    unsigned dt, sr, nbytes;
    PyObject *attdet_obj, *x_obj;
    struct lc3_attdet_analysis attdet = { 0 };
    float *x;

    if (!PyArg_ParseTuple(args, "IIIOO",
                &dt, &sr, &nbytes, &attdet_obj, &x_obj))
        return NULL;

    CTYPES_CHECK("dt", (unsigned)dt < LC3_NUM_DT);
    CTYPES_CHECK("sr", (unsigned)sr < LC3_NUM_SRATE);
    CTYPES_CHECK(NULL, attdet_obj = to_attdet_analysis(attdet_obj, &attdet));

    int ns = LC3_NS(dt, sr);

    CTYPES_CHECK("x", x_obj = to_1d_ptr(x_obj, NPY_FLOAT, ns+6, &x));

    int att = lc3_attdet_run(dt, sr, nbytes, &attdet, x+6);

    from_attdet_analysis(attdet_obj, &attdet);
    return Py_BuildValue("i", att);
}

static PyMethodDef methods[] = {
    { "attdet_run", attdet_run_py, METH_VARARGS },
    { NULL },
};

PyMODINIT_FUNC lc3_attdet_py_init(PyObject *m)
{
    import_array();

    PyModule_AddFunctions(m, methods);

    return m;
}
Loading