This is the article you need when your team lead says "we need to automate the Bruker." You have a Bruker FTIR spectrometer - an Alpha II, a Vertex, a Tensor - running OPUS on a Windows workstation. Right now someone walks over, presses buttons, waits, exports a file, and emails it. You need to replace that with code.
This tutorial walks through the entire automation chain: reading existing OPUS spectral files, understanding their internal structure, batch processing a directory of spectra, controlling OPUS programmatically via DDE, building an automated measurement loop, and preprocessing spectra for downstream analysis. Every code example runs. Every step builds on the previous one.
For the conceptual background on all five OPUS programmatic interfaces (DDE, Named Pipes, HTTP, OPC, COM/ActiveX), see our complete guide to Bruker OPUS interfaces. This tutorial focuses on the most practical path: Python + DDE.
Prerequisites
You need the following:
- Windows 10 or 11 - OPUS is Windows-only, and DDE is a Windows IPC protocol
- OPUS installed and licensed (v8.0 or later; v9.3 is current as of this writing)
- Python 3.10+ - we recommend 3.11 or 3.12 for best compatibility
- A Bruker FTIR spectrometer connected and operational - Alpha II, Vertex, Tensor, or Invenio
Install the Python dependencies:
```shell
pip install brukeropus matplotlib numpy pandas scipy
```
The brukeropus library handles both OPUS file reading and DDE instrument control. matplotlib is for visualization. numpy, pandas, and scipy are for data manipulation and preprocessing.
A note on architecture: your Python code and OPUS must run on the same Windows PC. DDE is a local IPC protocol. If you need remote control, see the HTTP Server section in our OPUS interfaces guide.
Step 1: Reading OPUS Spectral Files
Before you automate the instrument, learn to read its output. OPUS stores spectra in proprietary binary files with numeric extensions - .0, .1, .2, and so on. These files are not human-readable and are not documented by Bruker. The brukeropus library reverse-engineers the format.
```python
from brukeropus import read_opus
import matplotlib.pyplot as plt
import numpy as np

# Read an OPUS spectral file
opus_file = read_opus('C:/OPUS_Data/sample.0')

# The file contains multiple data blocks
# 'a' = absorbance, 't' = transmittance, etc.
print("Available data blocks:", opus_file.data_keys)

# Extract absorbance spectrum
wavenumbers = opus_file.a.x  # numpy array, typically 4000-400 cm^-1
absorbance = opus_file.a.y   # numpy array, absorbance units

# Plot it
plt.figure(figsize=(12, 5))
plt.plot(wavenumbers, absorbance, color='#ff2d55', linewidth=0.8)
plt.xlabel('Wavenumber (cm$^{-1}$)')
plt.ylabel('Absorbance')
plt.title('FTIR Absorbance Spectrum')
plt.gca().invert_xaxis()  # FTIR convention: high wavenumber on the left
plt.tight_layout()
plt.savefig('spectrum.png', dpi=150)
plt.show()
```
The invert_xaxis() call is not cosmetic - FTIR spectra are conventionally plotted with high wavenumber (4000 cm^-1) on the left and low wavenumber (400 cm^-1) on the right. If your plots look "backwards" compared to what you see in OPUS, this is why.
Step 2: Understanding OPUS Data Blocks
An OPUS file is not a single spectrum. It is a container holding multiple related data blocks, each representing a different view of the same measurement. Understanding these blocks is essential for extracting the right data.
```python
from brukeropus import read_opus

opus_file = read_opus('C:/OPUS_Data/sample.0')

# List all data blocks
print("Data keys:", opus_file.data_keys)
# Example output (the exact keys depend on which blocks OPUS saved):
# ['sm', 'rf', 'igsm', 'igrf', 'phsm', 'phrf', 'a']

# What each block contains:
# 'a'    - Absorbance spectrum (most common for analysis)
# 't'    - Transmittance spectrum (= 10^(-absorbance))
# 'r'    - Reflectance spectrum
# 'sm'   - Single-channel sample spectrum
# 'rf'   - Single-channel reference (background) spectrum
# 'igsm' - Sample interferogram (raw detector signal)
# 'igrf' - Reference (background) interferogram
# 'phsm' - Sample phase spectrum
# 'phrf' - Reference phase spectrum
```
The absorbance block (a) is what you almost always want for analytical work. OPUS computes it during acquisition: the sample and reference interferograms are Fourier-transformed and phase-corrected into single-channel spectra, and absorbance is -log10 of the sample/reference ratio.
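The ratio step itself is short once you have single-channel sample and reference spectra (the interferograms after Fourier transform and phase correction). A minimal sketch with synthetic arrays; `absorbance_from_single_channel` is a hypothetical helper, not part of brukeropus, and OPUS already performs this calculation when it writes the a block.

```python
import numpy as np


def absorbance_from_single_channel(sample_sc: np.ndarray,
                                   reference_sc: np.ndarray) -> np.ndarray:
    """
    Hypothetical helper: A = -log10(T), with T = sample / reference.

    Both inputs must be single-channel spectra on the same wavenumber axis.
    T is a fraction here (0-1), not a percentage.
    """
    transmittance = sample_sc / reference_sc
    return -np.log10(transmittance)


# Synthetic example: the sample passes 100%, 10% and 1% of the reference light
reference = np.array([2.0, 2.0, 2.0])
sample = np.array([2.0, 0.2, 0.02])

absorbance = absorbance_from_single_channel(sample, reference)
transmittance = 10 ** (-absorbance)  # inverse relation, as in the 't' block
print(absorbance)      # close to [0, 1, 2]
print(transmittance)   # close to [1.0, 0.1, 0.01]
```

Because the reference sits in the denominator, anything that changed between the background and the sample (atmosphere, residue on the ATR crystal) shows up directly in A. That is why a stale background corrupts the spectrum.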
You can also access measurement parameters and metadata:
```python
# Access file-level parameters
params = opus_file.params
print(f"Instrument: {params.get('INS', 'Unknown')}")
print(f"Source: {params.get('SRC', 'Unknown')}")
print(f"Detector: {params.get('DTC', 'Unknown')}")
print(f"Resolution: {params.get('RES', 'Unknown')} cm-1")
print(f"Scans: {params.get('NSS', 'Unknown')}")

# Spectral range from the absorbance block
print(f"Wavenumber range: {opus_file.a.x[0]:.1f} - {opus_file.a.x[-1]:.1f} cm-1")
print(f"Data points: {len(opus_file.a.x)}")
```
Understanding the block structure matters when you are debugging. If accessing opus_file.a raises an error, print opus_file.data_keys: the file might only contain transmittance data (t). If the absorbance looks wrong, check the reference interferogram (igrf) - a stale or contaminated background measurement is the most common cause of bad spectra.
Step 3: Batch Processing Spectral Files
A single spectrum is a demo. Real work means processing hundreds or thousands of files. Here is how to iterate over a directory of OPUS files, extract the absorbance spectra, and produce a structured dataset.
```python
from brukeropus import read_opus
import numpy as np
import pandas as pd
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def process_opus_directory(data_dir: str, output_csv: str) -> pd.DataFrame:
    """
    Read all OPUS files in a directory, extract absorbance spectra,
    and produce a CSV with one row per spectrum.
    """
    data_dir = Path(data_dir)

    # OPUS files have numeric extensions: .0, .1, .2, ...
    opus_files = sorted([
        f for f in data_dir.iterdir()
        if f.is_file() and f.suffix and f.suffix[1:].isdigit()
    ])
    logger.info(f"Found {len(opus_files)} OPUS files in {data_dir}")

    rows = []
    reference_wavenumbers = None

    for filepath in opus_files:
        try:
            opus_file = read_opus(str(filepath))
            if 'a' not in opus_file.data_keys:
                logger.warning(f"No absorbance data in {filepath.name}, skipping")
                continue

            wavenumbers = opus_file.a.x
            absorbance = opus_file.a.y

            # Verify consistent wavenumber axes across files:
            # same number of points AND the same values
            if reference_wavenumbers is None:
                reference_wavenumbers = wavenumbers
            elif (len(wavenumbers) != len(reference_wavenumbers)
                  or not np.allclose(wavenumbers, reference_wavenumbers)):
                logger.warning(
                    f"Wavenumber axis mismatch in {filepath.name}: "
                    f"expected {len(reference_wavenumbers)} points "
                    f"({reference_wavenumbers[0]:.1f}-{reference_wavenumbers[-1]:.1f}), "
                    f"got {len(wavenumbers)} points "
                    f"({wavenumbers[0]:.1f}-{wavenumbers[-1]:.1f}). Skipping."
                )
                continue

            # One column per wavenumber, labelled to 0.1 cm^-1
            row = {'filename': filepath.name}
            for wn, ab in zip(wavenumbers, absorbance):
                row[f'{wn:.1f}'] = ab
            rows.append(row)
            logger.info(f"Processed {filepath.name}: {len(wavenumbers)} points")

        except Exception as e:
            logger.error(f"Failed to read {filepath.name}: {e}")
            continue

    df = pd.DataFrame(rows)
    df.to_csv(output_csv, index=False)
    logger.info(f"Saved {len(df)} spectra to {output_csv}")
    return df


# Usage
df = process_opus_directory(
    data_dir='C:/OPUS_Data/experiment_2026_05/',
    output_csv='C:/OPUS_Data/spectra_export.csv'
)
print(f"Dataset shape: {df.shape}")
# Example output: Dataset shape: (150, 1869)
# 150 spectra, each with 1868 wavenumber columns + 1 filename column
```
This produces a CSV where each row is one spectrum and each column (after the filename) is a wavenumber. This is the standard "wide format" for spectral datasets and is directly consumable by scikit-learn, TensorFlow, or any ML framework. For guidance on building the end-to-end pipeline from raw spectra to model input, see our article on building a spectroscopy data pipeline.
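The loop above skips any file whose wavenumber axis differs from the first one. If you would rather keep those spectra, one alternative is to resample every spectrum onto a shared grid with linear interpolation. A minimal sketch: `resample_to_grid` is a hypothetical helper, and the 1800-900 cm^-1 grid at 2 cm^-1 spacing is an assumption. Because `np.interp` needs an increasing x-axis, descending OPUS axes are sorted first.

```python
import numpy as np


def resample_to_grid(wavenumbers: np.ndarray,
                     absorbance: np.ndarray,
                     grid: np.ndarray) -> np.ndarray:
    """Linearly interpolate one spectrum onto a shared wavenumber grid."""
    # np.interp needs an increasing x-axis; OPUS axes usually run 4000 -> 400
    order = np.argsort(wavenumbers)
    x = wavenumbers[order]
    y = absorbance[order]
    # Grid points outside the measured range become NaN instead of
    # silently repeating the edge value
    return np.interp(grid, x, y, left=np.nan, right=np.nan)


# Hypothetical shared grid: 1800 -> 900 cm^-1 in 2 cm^-1 steps (451 points)
grid = np.arange(1800.0, 899.0, -2.0)

# Two synthetic spectra recorded with different point spacings
wn_a = np.linspace(4000.0, 400.0, 1868)
wn_b = np.linspace(4000.0, 400.0, 3735)
spec_a = np.exp(-((wn_a - 1650.0) / 20.0) ** 2)
spec_b = np.exp(-((wn_b - 1650.0) / 20.0) ** 2)

matrix = np.vstack([
    resample_to_grid(wn_a, spec_a, grid),
    resample_to_grid(wn_b, spec_b, grid),
])
print(matrix.shape)  # (2, 451)
```

Interpolation cannot add detail that a coarser-resolution measurement never recorded. Keeping acquisition settings identical across a study is still the better fix.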
For very large datasets (thousands of files), consider writing to a NumPy .npy file or HDF5 instead of CSV:
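```python
# Save as numpy arrays for faster loading.
# The wavenumber axis is recovered from the CSV column headers (every
# column after 'filename', rounded to 0.1 cm^-1), because
# reference_wavenumbers only exists inside process_opus_directory().
wavenumbers = df.columns[1:].astype(float).to_numpy()
np.save('wavenumbers.npy', wavenumbers)
np.save('absorbance_matrix.npy', df.iloc[:, 1:].values.astype(np.float32))
```
Step 4: Controlling OPUS via DDE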
Now we move from reading files to controlling the instrument. The DDE interface lets you send commands to a running OPUS instance: trigger measurements, change parameters, and manage the experiment workflow.
```python
from brukeropus import opus_dde
import time

# Connect to the running OPUS instance
# OPUS must already be open on this machine
opus = opus_dde.OPUSDde()
print("Connected to OPUS via DDE")

# Trigger a background (reference) measurement
# This measures the empty ATR crystal or transmission cell
print("Measuring background...")
opus.measure_ref()
print("Background measurement complete")

# Trigger a sample measurement
# Uses whatever experiment is currently loaded in OPUS
print("Measuring sample...")
opus.measure_sample()
print("Sample measurement complete")

# You can also send raw OPUS command strings
# for operations not wrapped by the library
opus.raw_command('SaveAs', {
    'PATH': r'C:\OPUS_Data\automated_measurement.0',
    'FMT': '0'  # OPUS binary format
})
```
Important Timing Considerations
DDE commands to OPUS are synchronous by default - measure_sample() blocks until the measurement completes. However, the actual measurement duration depends on your experiment settings (number of scans, resolution, scan velocity). A typical clinical measurement (32 scans at 4 cm^-1 resolution) takes 15-30 seconds.
```python
import time

# Check how long measurements actually take
start = time.perf_counter()
opus.measure_sample()
elapsed = time.perf_counter() - start
print(f"Measurement took {elapsed:.1f} seconds")
```
If a measurement takes longer than expected, do not assume it failed. OPUS may be waiting for instrument stabilization, running internal diagnostics, or performing automatic atmosphere compensation. The DDE call will return when OPUS is done, not before.
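Once you have timed a few runs, you can derive a timeout from them instead of guessing one. A minimal sketch: `suggest_timeout` is a hypothetical helper, the multiplier of 3 and the 60-second floor are arbitrary assumptions, and the result is the kind of value you could pass as `timeout_seconds` to the `dde_with_timeout` wrapper in Common Pitfalls.

```python
import statistics


def suggest_timeout(durations_s: list[float],
                    multiplier: float = 3.0,
                    floor_s: float = 60.0) -> float:
    """
    Suggest a DDE timeout from previously observed measurement durations.
    Uses the median so a single slow run does not inflate the result.
    """
    if not durations_s:
        return floor_s
    return max(floor_s, multiplier * statistics.median(durations_s))


observed = [24.8, 25.1, 24.9, 41.3, 25.0]  # hypothetical timings in seconds
print(suggest_timeout(observed))  # 75.0
```

Rerun this whenever you change the experiment (more scans or higher resolution), since the typical duration changes with it.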
Step 5: Building a Measurement Automation Loop
Here is the complete automation pattern: trigger a measurement, wait for completion, read the result, and log it. This is the core of any automated FTIR workflow.
```python
from brukeropus import opus_dde, read_opus
from pathlib import Path
from datetime import datetime
import time
import json
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger(__name__)


class FTIRAutomation:
    """Automated FTIR measurement workflow using OPUS DDE."""

    def __init__(self, data_dir: str, background_interval_minutes: int = 30):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.background_interval = background_interval_minutes * 60
        self.last_background_time = 0  # forces a background before the first sample
        self.opus = opus_dde.OPUSDde()
        logger.info("Connected to OPUS via DDE")

    def _needs_background(self) -> bool:
        """Check if the background measurement is stale."""
        elapsed = time.time() - self.last_background_time
        return elapsed > self.background_interval

    def measure_background(self):
        """Take a fresh background measurement."""
        logger.info("Taking background measurement...")
        start = time.perf_counter()
        self.opus.measure_ref()
        elapsed = time.perf_counter() - start
        self.last_background_time = time.time()
        logger.info(f"Background complete ({elapsed:.1f}s)")

    def measure_sample(self, sample_id: str) -> dict:
        """
        Acquire a sample spectrum with full error handling.
        Returns a dict with metadata and file path.
        """
        # Generate a unique filename
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{sample_id}_{timestamp}.0"
        filepath = self.data_dir / filename

        try:
            # Refresh the background if it is stale. This sits inside the
            # try block so a failed background is recorded as an error for
            # this sample instead of aborting the whole batch.
            if self._needs_background():
                self.measure_background()

            logger.info(f"Measuring sample '{sample_id}'...")
            start = time.perf_counter()

            # Trigger the measurement
            self.opus.measure_sample()
            elapsed = time.perf_counter() - start

            # Save the result
            self.opus.raw_command('SaveAs', {
                'PATH': str(filepath),
                'FMT': '0'
            })
            logger.info(
                f"Measurement complete: {filename} ({elapsed:.1f}s)"
            )

            # Read back and validate the spectrum
            opus_file = read_opus(str(filepath))
            if 'a' not in opus_file.data_keys:
                raise ValueError("No absorbance data in measurement result")

            wavenumbers = opus_file.a.x
            absorbance = opus_file.a.y

            # Basic quality checks
            max_abs = float(absorbance.max())
            min_abs = float(absorbance.min())
            if max_abs > 3.0:
                logger.warning(
                    f"High absorbance ({max_abs:.2f}) - sample may be too thick"
                )
            if max_abs < 0.01:
                logger.warning(
                    f"Very low absorbance ({max_abs:.4f}) - "
                    "sample may not be in contact with ATR crystal"
                )

            result = {
                'sample_id': sample_id,
                'filename': filename,
                'filepath': str(filepath),
                'timestamp': timestamp,
                'measurement_time_s': round(elapsed, 1),
                'num_points': len(wavenumbers),
                'wavenumber_range': [
                    float(wavenumbers[0]),
                    float(wavenumbers[-1])
                ],
                'max_absorbance': max_abs,
                'min_absorbance': min_abs,
                'status': 'success'
            }

        except Exception as e:
            logger.error(f"Measurement failed for '{sample_id}': {e}")
            result = {
                'sample_id': sample_id,
                'timestamp': timestamp,
                'status': 'error',
                'error': str(e)
            }

        return result

    def run_batch(self, sample_ids: list[str]) -> list[dict]:
        """Run a batch of measurements sequentially."""
        results = []
        for i, sample_id in enumerate(sample_ids, 1):
            logger.info(f"--- Sample {i}/{len(sample_ids)} ---")
            result = self.measure_sample(sample_id)
            results.append(result)

            # Log to a JSON file after each measurement
            # so we don't lose data if the batch is interrupted
            log_path = self.data_dir / 'measurement_log.json'
            with open(log_path, 'w') as f:
                json.dump(results, f, indent=2)

        return results


# Usage
automation = FTIRAutomation(
    data_dir='C:/OPUS_Data/clinical_run_001/',
    background_interval_minutes=30
)

# Measure a batch of samples
sample_ids = ['SAMPLE_001', 'SAMPLE_002', 'SAMPLE_003']
results = automation.run_batch(sample_ids)

# Print summary
for r in results:
    status = r['status']
    sample = r['sample_id']
    if status == 'success':
        print(f"  {sample}: OK ({r['num_points']} pts, "
              f"max abs {r['max_absorbance']:.3f})")
    else:
        print(f"  {sample}: FAILED - {r['error']}")
```
This class handles the three problems that trip up every first-time FTIR automation project:
- Background staleness - atmospheric CO2 and H2O in the beam path drift over time, so an old background stops matching the conditions of the sample measurement
- Quality validation - catching obviously bad spectra before they enter the analysis pipeline
- Crash resilience - logging results after each measurement so a failure does not lose the entire batch
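The run_batch method above rewrites the whole JSON file after every sample, so a crash during that rewrite could still damage the log. One alternative with the same goal is an append-only JSON Lines file: one record per line, written and fsync'ed immediately, so an interruption can cost at most the line being written. A minimal sketch; `append_record`, `read_records`, and the .jsonl file name are illustrative.

```python
import json
import os
import tempfile
from pathlib import Path


def append_record(log_path: Path, record: dict) -> None:
    """Append one measurement record as a single JSON line."""
    with open(log_path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record) + '\n')
        f.flush()
        os.fsync(f.fileno())  # ask the OS to write it to disk now


def read_records(log_path: Path) -> list[dict]:
    """Read complete records, stopping at the first unparseable line."""
    records = []
    if not log_path.exists():
        return records
    with open(log_path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                break  # e.g. a half-written line left by a crash
    return records


# Usage: log two results, then simulate a crash in the middle of a write
log = Path(tempfile.mkdtemp()) / 'measurement_log.jsonl'
append_record(log, {'sample_id': 'SAMPLE_001', 'status': 'success'})
append_record(log, {'sample_id': 'SAMPLE_002', 'status': 'error'})
with open(log, 'a', encoding='utf-8') as f:
    f.write('{"sample_id": "SAMPLE_0')  # incomplete final line

recovered = read_records(log)
print([r['sample_id'] for r in recovered])  # ['SAMPLE_001', 'SAMPLE_002']
```

In FTIRAutomation, `append_record(log_path, result)` would take the place of the `json.dump(results, ...)` rewrite inside run_batch.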
Step 6: Spectral Preprocessing
Raw FTIR spectra straight from the instrument are rarely suitable for direct analysis or machine learning. They contain baseline drift, varying intensity scales, and high-frequency noise. Preprocessing corrects these artifacts.
Baseline Correction
Baseline drift is the most common artifact in FTIR spectra. The spectrum floats upward or curves due to scattering effects, ATR crystal contamination, or instrument drift. A simple rubber-band correction anchors the baseline to zero:
```python
import numpy as np
from scipy.signal import savgol_filter
from scipy.spatial import ConvexHull


def rubberband_baseline(wavenumbers: np.ndarray,
                        absorbance: np.ndarray) -> np.ndarray:
    """
    Rubber-band baseline correction.
    Fits the lower convex hull of the spectrum and subtracts it.
    Works for ascending or descending wavenumber axes.
    """
    # Sort by ascending wavenumber: OPUS usually stores 4000 -> 400,
    # but np.interp needs increasing x values
    order = np.argsort(wavenumbers)
    x = wavenumbers[order]
    y = absorbance[order]

    # Convex hull of the (x, y) points; for 2-D input scipy returns
    # the hull vertices in counterclockwise order
    hull = ConvexHull(np.column_stack([x, y]))
    v = hull.vertices

    # Keep only the lower hull: start at the leftmost point (index 0)
    # and walk counterclockwise to the rightmost point (index len(x) - 1)
    v = np.roll(v, -v.argmin())
    v = v[:v.argmax() + 1]

    # Interpolate the baseline across all wavenumbers and subtract it,
    # then restore the caller's original point order
    baseline = np.interp(x, x[v], y[v])
    corrected = np.empty_like(y)
    corrected[order] = y - baseline
    return corrected
```
Normalization
Normalization puts all spectra on the same intensity scale so that differences in sample thickness, contact pressure (for ATR), or instrument gain do not dominate the analysis:
```python
def normalize_spectrum(absorbance: np.ndarray,
                       method: str = 'vector') -> np.ndarray:
    """
    Normalize a spectrum.

    Methods:
        'vector' - L2 norm (unit vector normalization)
        'minmax' - Scale to [0, 1]
        'snv'    - Standard Normal Variate
    """
    if method == 'vector':
        norm = np.linalg.norm(absorbance)
        if norm == 0:
            return absorbance
        return absorbance / norm

    elif method == 'minmax':
        mn, mx = absorbance.min(), absorbance.max()
        if mx == mn:
            return np.zeros_like(absorbance)
        return (absorbance - mn) / (mx - mn)

    elif method == 'snv':
        # Standard Normal Variate: subtract mean, divide by std
        mean = absorbance.mean()
        std = absorbance.std()
        if std == 0:
            return np.zeros_like(absorbance)
        return (absorbance - mean) / std

    else:
        raise ValueError(f"Unknown normalization method: {method}")
```
Smoothing
Savitzky-Golay filtering removes high-frequency noise while preserving peak shapes. It is the standard smoothing method for spectroscopic data:
```python
from scipy.signal import savgol_filter


def smooth_spectrum(absorbance: np.ndarray,
                    window_length: int = 11,
                    polyorder: int = 3) -> np.ndarray:
    """
    Savitzky-Golay smoothing.

    window_length must be odd. Higher = more smoothing.
    polyorder must be less than window_length.
    """
    return savgol_filter(absorbance, window_length, polyorder)
```
Complete Preprocessing Pipeline
Combine all three steps into a reusable pipeline:
```python
def preprocess_spectrum(wavenumbers: np.ndarray,
                        absorbance: np.ndarray,
                        wavenumber_range: tuple = (1800, 900),
                        smooth_window: int = 11,
                        normalize_method: str = 'snv') -> tuple:
    """
    Full preprocessing pipeline for FTIR spectra.

    Steps:
        1. Crop to fingerprint region
        2. Baseline correction
        3. Smoothing
        4. Normalization

    Returns (cropped_wavenumbers, preprocessed_absorbance)
    """
    # 1. Crop to region of interest
    # The fingerprint region (1800-900 cm^-1) contains the most
    # diagnostically useful information for biological samples
    mask = (wavenumbers >= wavenumber_range[1]) & \
           (wavenumbers <= wavenumber_range[0])
    wn = wavenumbers[mask]
    ab = absorbance[mask]

    # 2. Baseline correction
    ab = rubberband_baseline(wn, ab)

    # 3. Smoothing
    ab = smooth_spectrum(ab, window_length=smooth_window)

    # 4. Normalization
    ab = normalize_spectrum(ab, method=normalize_method)

    return wn, ab


# Usage with a real file
from brukeropus import read_opus
import matplotlib.pyplot as plt

opus_file = read_opus('C:/OPUS_Data/sample.0')
wn_raw = opus_file.a.x
ab_raw = opus_file.a.y

wn_processed, ab_processed = preprocess_spectrum(wn_raw, ab_raw)

# Plot before and after
fig, axes = plt.subplots(2, 1, figsize=(12, 8))

axes[0].plot(wn_raw, ab_raw, color='#ff2d55', linewidth=0.8)
axes[0].set_title('Raw Spectrum')
axes[0].set_ylabel('Absorbance')
axes[0].invert_xaxis()

axes[1].plot(wn_processed, ab_processed, color='#00d4ff', linewidth=0.8)
axes[1].set_title('Preprocessed (baseline corrected, smoothed, SNV normalized)')
axes[1].set_xlabel('Wavenumber (cm$^{-1}$)')
axes[1].set_ylabel('Normalized Absorbance')
axes[1].invert_xaxis()

plt.tight_layout()
plt.savefig('preprocessing_comparison.png', dpi=150)
plt.show()
```
The fingerprint region (1800-900 cm^-1) is where most biological and pharmaceutical FTIR analysis happens. Peaks in this region correspond to C=O stretches, amide bands, C-O-C vibrations, and phosphodiester groups - the molecular signatures that differentiate healthy tissue from diseased tissue, genuine drugs from counterfeits, or compliant polymers from contaminated batches.
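When you apply this preprocessing to a whole dataset (for example the absorbance matrix saved in Step 3), the SNV step can process every spectrum at once instead of in a Python loop. A sketch assuming a 2-D array with one spectrum per row; `snv_matrix` is a hypothetical helper that uses the same population standard deviation as `normalize_spectrum` and, like that function, turns flat spectra into zeros.

```python
import numpy as np


def snv_matrix(spectra: np.ndarray) -> np.ndarray:
    """Row-wise Standard Normal Variate for a (n_spectra, n_points) array."""
    mean = spectra.mean(axis=1, keepdims=True)
    std = spectra.std(axis=1, keepdims=True)
    centered = spectra - mean
    # Replace zero std with 1 before dividing, then zero out the flat rows
    safe_std = np.where(std == 0, 1.0, std)
    return np.where(std == 0, 0.0, centered / safe_std)


# Row 2 is row 1 scaled by 10; row 3 is flat
X = np.array([[1.0, 2.0, 3.0, 4.0],
              [10.0, 20.0, 30.0, 40.0],
              [5.0, 5.0, 5.0, 5.0]])
X_snv = snv_matrix(X)
print(X_snv.round(3))
```

After SNV the first two rows are identical, which is the point of the method: a pure intensity scaling (thicker sample, firmer ATR contact) no longer separates the spectra.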
Common Pitfalls
After running FTIR automation in production environments, here are the problems you will hit and how to handle them.
DDE Connection Failures
Symptom: OPUSDde() raises an exception or hangs.
Cause: OPUS is not running, or another DDE client is already connected.
Fix: Check that OPUS is running and that no other automation script or macro has an active DDE connection. Only one DDE client can connect at a time.
```python
import subprocess
import time

# Adjust to where OPUS is installed on your workstation
OPUS_EXE = r'C:\Program Files\Bruker\OPUS\opus.exe'


def ensure_opus_running():
    """Check if OPUS is running, start it if not."""
    result = subprocess.run(
        ['tasklist', '/FI', 'IMAGENAME eq opus.exe'],
        capture_output=True, text=True
    )
    if 'opus.exe' not in result.stdout.lower():
        print("OPUS is not running. Starting OPUS...")
        # Pass the path as a list element without shell=True: with
        # shell=True the unquoted space in "Program Files" breaks the command
        subprocess.Popen([OPUS_EXE])
        # OPUS takes 10-20 seconds to fully initialize
        time.sleep(20)
        print("OPUS started. Attempting DDE connection...")
```
DDE Timeouts
Symptom: A DDE command hangs indefinitely or times out.
Cause: OPUS is waiting for user input (a dialog box is open), the instrument is in an error state, or OPUS crashed silently.
Fix: Check the OPUS window for dialog boxes. Implement a timeout wrapper:
```python
import threading


def dde_with_timeout(opus, command, timeout_seconds=120):
    """Run a DDE command with a timeout."""
    result = [None]
    error = [None]

    def target():
        try:
            result[0] = getattr(opus, command)()
        except Exception as e:
            error[0] = e

    # daemon=True so a hung call cannot keep the Python process alive
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        # The timeout only stops the wait: the DDE call itself keeps
        # running in the background thread until OPUS responds
        raise TimeoutError(
            f"OPUS DDE command '{command}' timed out after "
            f"{timeout_seconds}s. Check OPUS for dialog boxes."
        )
    if error[0] is not None:
        raise error[0]
    return result[0]


# Usage
dde_with_timeout(opus, 'measure_sample', timeout_seconds=90)
```
File Locking
Symptom: read_opus() fails or returns corrupted data when reading a file that OPUS just created.
Cause: OPUS may still have a write lock on the file. There is a brief window after a measurement completes where the file is being written to disk.
Fix: Add a small delay and retry:
```python
import time
from brukeropus import read_opus


def read_opus_with_retry(filepath: str,
                         max_retries: int = 5,
                         delay_seconds: float = 1.0):
    """Read an OPUS file with retry logic for file locking."""
    for attempt in range(max_retries):
        try:
            return read_opus(filepath)
        except OSError as e:  # also covers PermissionError
            if attempt < max_retries - 1:
                time.sleep(delay_seconds)
            else:
                raise RuntimeError(
                    f"Could not read {filepath} after "
                    f"{max_retries} attempts: {e}"
                ) from e
```
Atmospheric Interference
Symptom: A sharp doublet near 2350 cm^-1 (plus a band near 667 cm^-1) from CO2, and clusters of narrow, sharp lines roughly between 1300 and 1900 cm^-1 and between 3500 and 3900 cm^-1 from water vapor, appear in your spectra.
Cause: The background measurement was taken under different atmospheric conditions than the sample measurement. CO2 and water vapor concentrations in the lab air changed between measurements.
Fix:
- Take backgrounds more frequently - in a clinical setting, every 15-30 minutes is typical. The FTIRAutomation class above handles this with background_interval_minutes.
- Ensure the instrument's desiccant (if equipped) is fresh.
- Consider purging with dry nitrogen for critical measurements.
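To catch this automatically, for example alongside the quality checks in Step 5, one rough approach is to measure how far the spectrum departs from a straight line drawn across the CO2 band. This is a sketch, not a validated QC rule. The 2280-2400 cm^-1 window, the 0.01 absorbance threshold, and the `co2_residual` name are all assumptions to tune on your own instrument.

```python
import numpy as np


def co2_residual(wavenumbers: np.ndarray,
                 absorbance: np.ndarray,
                 band: tuple[float, float] = (2280.0, 2400.0)) -> float:
    """
    Largest deviation (absorbance units) inside the CO2 window from a
    straight line joining the window edges. Positive and negative bands
    both count: if the background held more CO2 than the sample
    measurement, the residual band points downward.
    """
    lo, hi = band
    mask = (wavenumbers >= lo) & (wavenumbers <= hi)
    x = wavenumbers[mask]
    y = absorbance[mask]
    if x.size < 3:
        raise ValueError("Spectrum does not cover the CO2 window")
    order = np.argsort(x)
    x, y = x[order], y[order]
    # Linear baseline from the first to the last point in the window
    baseline = y[0] + (y[-1] - y[0]) * (x - x[0]) / (x[-1] - x[0])
    return float(np.max(np.abs(y - baseline)))


# Synthetic check: a flat spectrum with and without a small band at 2350 cm^-1
wn = np.linspace(4000.0, 400.0, 1868)
clean = np.full_like(wn, 0.2)
with_co2 = clean + 0.03 * np.exp(-((wn - 2350.0) / 10.0) ** 2)

THRESHOLD = 0.01  # assumed; tune on your instrument
print(co2_residual(wn, clean) > THRESHOLD)     # False
print(co2_residual(wn, with_co2) > THRESHOLD)  # True
```

A flagged spectrum is a cue to take a fresh background and re-measure, rather than to correct the data afterwards.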
From Automation to Clinical Workflow
This tutorial automates the instrument. That is the foundation. But a clinical deployment needs several additional layers.
Patient context. Which patient does this spectrum belong to? Barcode scanning, MRN lookup, worklist integration.
Classification. The raw spectrum goes into a trained ML model - SVM, random forest, or CNN - that returns a diagnostic classification with confidence scores. See Building AI Pipelines for Spectral Classification for the full pipeline.
Result delivery. The classification reaches the clinician on screen and the hospital EHR via an HL7v2 ORU^R01 message. It cannot live in a CSV file on the instrument PC.
Audit trail. Every measurement, every classification, every result delivery must be logged immutably for regulatory compliance.
Uptime. The system must handle instrument errors, network failures, and OPUS crashes gracefully - not with a stack trace on the operator's screen.
These are the problems we solve at SpectraDx. The platform wraps the entire stack - from DDE instrument control through spectral preprocessing, ML classification, and HL7 output - into a single clinical-grade application. The automation patterns in this tutorial are the building blocks. The clinical workflow architecture article shows how they fit into a production system.
What Comes Next
You now have working code for the full Python-OPUS automation chain. From here:
- Build a spectral dataset. Use the batch processing code (Step 3) to process your existing OPUS files into a structured dataset.
- Train a classifier. Feed the preprocessed spectra into scikit-learn or TensorFlow. Start with PCA + SVM - it is surprisingly effective for spectral classification.
- Close the loop. Integrate the classifier into the measurement automation loop (Step 5) so that every acquired spectrum is immediately classified.
- Read the architecture guide. Our article on building clinical workflow software for spectroscopy-based diagnostics covers the full system design for taking this from a lab script to a deployed clinical tool.
SpectraDx is clinical workflow software for spectroscopy-based diagnostics. We handle the integration layer between your spectrometer and your clinician. Learn more or get in touch.

