Accesso riservato

Libreria Digital Signal Processing

Autore/Autrice

Paolo Rossi, 235651

Data di Pubblicazione

30 gennaio 2026

# This file contains some main function related to the digital signal processing

# --- Signal operations ---

# Scalar product
`%ps%` <- function(a, b) mean(a*b)

# Norm of a vector
norm_ps <- function(a) sqrt(a %ps% a)

# Scalar normalized product between n decimated vectors: i.e. cosine similarity
psDnN <- function(a, b, n = 1) {
  stopifnot(n >= 1, n %% 1 == 0)
  
  idx <-  seq(1, min(length(a), length(b)), by = n)
  
  a_d <- a[idx]
  b_d <- b[idx]
  
  mean(a_d * b_d) / (norm_ps(a_d)*norm_ps(b_d))
}

# --- Fourier Transform ---

# Harmonic components
cos_k <- function(t, f, k=1) cos(2*pi*f*k*t)
sin_k <- function(t, f, k=1) sin(2*pi*f*k*t)

compute_fft <- function(df, Ta, name_sig, monolateral = TRUE) {
  
  # Compute the FFT of a signal
  # df: data frame
  # Ta: acquisition time
  # name_sig: name of the signal contained in `df`
  
  N <- nrow(df)
  
  df <- df %>% mutate(
    n = row_number(),
    f = n/Ta,
    fft = fft({{ name_sig }}),
    mod = ifelse(monolateral, (2-(f==0)), 1) * Mod(fft) / length(n),
    phase = Arg(fft)/pi*180
  )
  
  if(monolateral)
    return(df %>% slice_head(n = floor(N/2)))
  
  df
}

# Upgraded version of `compute_fft`
spectrum_fft <- function(df, fs = NULL, name_sig, monolateral = TRUE, DC = TRUE) {
  
  # Compute the spectrum of a signal using FFT
  # df: signal data frame, it's required the time `t` vector!
  # fs: sampling frequency, it's required a uniform sampling!
  # name_sig: name of the signal contained in `df`. Must be a string!
  # monolateral: defines whether monolateral or bilateral spectrum
  # DC: defines whether consider or not the DC component
  
  # Note aboute the scale factor of the module:
  # To preserve the energy content, when the spectrum is monolateral
  # it's needed to double the module of the trasform, excluding
  # DC component and component at Nyquist frequency
  
  if (!is.character(name_sig) || length(name_sig) != 1 || nchar(name_sig) == 0)
    stop("`name_sig` must be a non-empty string of length 1.")
  
  if (!name_sig %in% names(df))
    stop(sprintf("Column '%s' not found in data frame.", name_sig))
  
  
  N <- nrow(df)
  
  if (is.null(fs)) {
    dt <- diff(df$t)
    dt_mean <- mean(dt)
    
    if (sd(dt) / dt_mean > 1e-6) {
      stop("Non-uniform sampling detected.")
    }
    
    fs <- 1 / dt_mean
  }
  
  out <- df %>% mutate(
    k = 0:(N-1),
    f = k*fs/N,
    fft = fft(df[[name_sig]]),
    mod = Mod(fft) / N * if (monolateral) {           # scale factor
      ifelse(k == 0 | (N %% 2 == 0 & k == N/2), 1, 2)
    } else {
      1
    },
    phase = Arg(fft)/pi*180
  )
  if(!DC)
    out <- out[-1,]
  
  if(monolateral)
    return(out %>% slice_head(n = floor(N/2) + 1))
  
  return(out)
}

dyn_compensation <- function(df, tf, name_sig = "y", padding = TRUE, alpha = 2) {
  
  # This function provides the dynamic compensation of an
  # LTI proper system
  
  # df: contains the signal(t, `name_sig`)
  # alpha: scale factor of padding
  # tf: transfer function, it must be a class `tf`
  
  if (!is.character(name_sig) || length(name_sig) != 1 || nchar(name_sig) == 0)
    stop("`name_sig` must be a non-empty string of length 1.")
  
  if (!name_sig %in% names(df))
    stop(sprintf("Column '%s' not found in data frame.", name_sig))
  
  # Sampling parameters
  dt <- diff(df$t)
  Ts <- mean(dt)            # sampling period
  
  if (sd(dt) / Ts > 1e-6) 
    stop("Non-uniform sampling detected.")
  
  fs <- 1 / Ts              # sampling frequency
  
  y <- df[[name_sig]]
  
  if(padding) {
    N <- nrow(df)
    M <- alpha*N            # padding length
    
    pad_left  <- floor((M - N)/2)
    pad_right <- ceiling((M - N)/2)
    t_start <- df$t[1] - pad_left * Ts
    
    out <- tibble(
      t = seq(from = t_start, by = Ts, length.out = M),
      y = c(
        rep(0, pad_left),
        y,
        rep(0, pad_right)
      )
    )
    
    stopifnot(isTRUE(all.equal(           # checks whether the signal is centered after padding
      out$y[(pad_left+1):(pad_left+N)],
      y,
      tolerance = 1e-10
      )))
  }
  
  freq <- tibble(
    k = 0:(M-1),
    f = ifelse(k <= M/2, k, k-M) * fs/M,   # FFT is symmetric respect to the frequency!
    w = 2*pi*f
  )
  
  Hiw <- freqresp(H, freq$w)
  
  out <- out %>% mutate(
    Y = fft(y),
    U = Y/Hiw,
    u = Re(fft(U, inverse = TRUE)) / M      # Re() to avoid numerical residuals in Im() component
                                            # fft() is not normalized, so it's divided by `M`
  )
  
  return(out[ (pad_left+1):(pad_left+N), ])
}

# --- Filters ---

gaussian_kernel <- function(
    x, sigma, radius = ceiling(3 * sigma), pad = c("none", "replicate", "reflect")
    ) {
  
  # Gaussian mask: law-pass filter, FIR and linear
  # sigma: number of samples to consider in the gauss. kernel
  
  stopifnot(is.numeric(x), length(x) > 0, is.numeric(sigma), sigma > 0)
  pad <- match.arg(pad)
  
  # Kernels calculation
  k <- seq(-radius, radius)
  h <- exp(-(k^2) / (2 * sigma^2))
  h <- h / sum(h)  
  
  if (pad == "none") {
    y <- stats::filter(x, h, sides = 2) # convolution
    return(as.numeric(y))
  }
  
  # Padding of signal tails
  if (pad == "replicate") {
    x_pad <- c(rep(x[1], radius), x, rep(x[length(x)], radius))
  } else { # reflect
    
    left  <- rev(x[2:(radius + 1)])
    right <- rev(x[(length(x) - radius):(length(x) - 1)])
    x_pad <- c(left, x, right)
  }
  
  # Convolution and truncation of the excess tails
  y_pad <- stats::filter(x_pad, h, sides = 2)
  y <- y_pad[(radius + 1):(radius + length(x))]
  
  as.numeric(y)
}