如何对在特定数值点回绕的编码器值或角度进行解绕

在信号处理中,处理回绕量(角度、计数器、取模流)非常常见。本文通过两个简单、独立的示例介绍静态 unwrap() 函数和有状态的 OnlineUnwrapper(用于流式/在线场景)。两者都接受一维输入,并保持样本数量不变。

解绕示例

使用 unwrap() —— 静态(批量)解绕

适用于已有完整序列的场景。 提示:如果数值或噪声需要不同的回绕检测灵敏度,可以设置 threshold 参数。

unwrap_example.py
# 简单示例(独立运行)
import numpy as np
from UliEngineering.SignalProcessing.WrappedValues import unwrap

# 以度为单位的角,在 360 处回绕
wrapped = np.array([350, 355,   1,   3])   # 从 355 跳到 1(回绕)
un = unwrap(wrapped, wrap_value=360)

print(un)  # -> [350. 355. 361. 363.]  (连续递增的角)

使用 OnlineUnwrapper —— 在线/流式解绕

适用于数据逐个样本或分块到达的场景。内部维护状态。

注意:两种方法都保持样本数量不变,且仅支持一维输入。批量处理请使用 unwrap(),处理实时流或标量/分块混合输入请使用 OnlineUnwrapper

online_unwrapper_example.py
# 在线标量和分块用法
import numpy as np
from UliEngineering.SignalProcessing.WrappedValues import OnlineUnwrapper

u = OnlineUnwrapper(wrap_value=100)   # 在 100 处回绕的计数器

# 输入标量
print(u(10))   # -> 10
print(u(95))   # -> 95
print(u(2))    # -> 102  (在线处理正向回绕)

# 或输入一个分块(一维数组)
chunk = np.array([98, 3, 5])  # 从之前的状态继续
print(u(chunk))  # -> array([ 98., 103., 105.])

生成上方图表的代码

解绕示例

plot_wrapped_unwrapped.py
#!/usr/bin/env python3
# SPDX-License-Identifier: CC0-1.0
"""示例:绘制回绕与解绕后的相位信号。

许可证:CC0 1.0 Universal —— 公共领域 dedication(见 examples/LICENSE-CC0-1.0.txt)

作为脚本运行:
    python examples/unwrap_plot.py        # 显示图表
    python examples/unwrap_plot.py --save out.png  # 保存到文件

本脚本创建一个合成的连续相位信号,将其回绕到 [0,2*pi),
然后用批量 `unwrap()` 函数和有状态的 `OnlineUnwrapper` 恢复,
以演示两者结果一致。
"""
import argparse

import numpy as np
import matplotlib.pyplot as plt
plt.style.use("ggplot")

from UliEngineering.SignalProcessing.WrappedValues import unwrap, OnlineUnwrapper

def make_data(n=1000, wrap_value=2 * np.pi, seed=0):
    """创建一个在中间*反向*的连续相位信号。

    前半部分具有正角速度,后半部分具有负角速度,
    从而使相位反向。返回的 `wrapped` 序列位于 [0, wrap_value)。
    """
    np.random.seed(seed)
    t = np.linspace(0.0, 10.0, n)
    mid = n // 2
    # 角速度(弧度每单位时间)
    omega1 = 1.2 * 2 * np.pi
    omega2 = -0.8 * 2 * np.pi

    true_phase = np.empty(n, dtype=float)
    # 前半部分:正斜率 + 小幅振荡
    true_phase[:mid] = omega1 * t[:mid] + 0.8 * np.sin(2 * np.pi * 0.3 * t[:mid])
    # 后半部分:从前半部分最后一个值开始以保持连续,
    # 然后对负角速度进行积分
    start_phase = true_phase[mid - 1]
    true_phase[mid:] = (
        start_phase
        + omega2 * (t[mid:] - t[mid - 1])
        + 0.8 * np.sin(2 * np.pi * 0.3 * t[mid:])
    )

    # 回绕到 [0, wrap_value)
    wrapped = np.mod(true_phase, wrap_value)
    return t, true_phase, wrapped


def plot_example(save_path=None):
    wrap_value = 2 * np.pi
    t, true_phase, wrapped = make_data(n=1200, wrap_value=wrap_value)

    # 批量解绕
    static_unwrapped = unwrap(wrapped, wrap_value=wrap_value)

    # 在线解绕(流式风格)
    u = OnlineUnwrapper(wrap_value=wrap_value)
    online_unwrapped = np.array([u(x) for x in wrapped])

    # 快速合理性检查
    maxdiff = np.max(np.abs(static_unwrapped - online_unwrapped))
    print(f"unwrap() 与 OnlineUnwrapper 之间的最大差异:{maxdiff:.3e}")

    fig, axs = plt.subplots(2, 1, figsize=(10, 6), sharex=True)

    # 在图中标记方向变化点
    mid_idx = len(t) // 2
    axs[0].axvline(t[mid_idx], color="#444444", ls=":", lw=0.8)
    axs[1].axvline(t[mid_idx], color="#444444", ls=":", lw=0.8, label="direction change")

    axs[0].plot(t, wrapped, color="#1f77b4", lw=1)
    axs[0].set_title("Wrapped signal (0 .. 2π)")
    axs[0].set_ylabel("phase (rad)")
    axs[0].grid(True)

    axs[1].plot(t, static_unwrapped, color="#2ca02c", lw=1, label="unwrap()")
    axs[1].plot(t, online_unwrapped, color="#ff7f0e", lw=1, ls="--", label="OnlineUnwrapper")
    axs[1].plot(t, true_phase, color="#7f7f7f", lw=0.8, ls=":", label="true phase")
    axs[1].set_title("Unwrapped signal (continuous phase)")
    axs[1].set_xlabel("time")
    axs[1].set_ylabel("phase (rad)")
    axs[1].legend()
    axs[1].grid(True)

    plt.tight_layout()
    if save_path:
        fig.savefig(save_path, dpi=150)
        print(f"已将图表保存到 {save_path}")
    else:
        plt.show()


if __name__ == "__main__":
    p = argparse.ArgumentParser(description="绘制回绕与解绕后的相位信号。")
    p.add_argument("--save", dest="save", help="将图表保存到此路径而非显示")
    args = p.parse_args()
    plot_example(save_path=args.save)

Check out similar posts by category: UliEngineering, Python