generated from SalesforceAIResearch/oss-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcpu_processors.py
More file actions
119 lines (92 loc) · 5.95 KB
/
cpu_processors.py
File metadata and controls
119 lines (92 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import numpy as np
import time
from multiprocessing import Pool, cpu_count
from typing import List
# Import the PURE calculation functions from the utility file
from data_processing_utils import calculate_valid_mask, calculate_scene_flow, calculate_wrist_scene_flow
# --- Batch Processing Functions for Multiprocessing ---
def process_mask_batch(args):
    """Compute one valid mask per (flow, depth1, depth2) triple in a batch.

    ``args`` is a 3-tuple ``(flows, depths1, depths2)`` of parallel
    sequences, packed into a single argument so the function can be handed
    to ``Pool.map``. Pairing stops at the shortest sequence. This wrapper
    does no file I/O of its own.

    Returns:
        A list with the result of ``calculate_valid_mask`` for each triple,
        in input order.
    """
    flow_seq, first_depths, second_depths = args
    computed = []
    for flow, depth_a, depth_b in zip(flow_seq, first_depths, second_depths):
        computed.append(calculate_valid_mask(flow, depth_a, depth_b))
    return computed
def process_scene_flow_batch(args):
    """Compute one scene flow per (flow, mask, depth1, depth2) tuple in a batch.

    ``args`` is a 5-tuple ``(flows, masks, depths1, depths2,
    intrinsic_params)``. The first four are parallel sequences (pairing
    stops at the shortest); ``intrinsic_params`` is passed unchanged to
    every ``calculate_scene_flow`` call. This wrapper does no file I/O of
    its own.

    Returns:
        A list with the result of ``calculate_scene_flow`` for each tuple,
        in input order.
    """
    flow_seq, mask_seq, first_depths, second_depths, intrinsic_params = args
    computed = []
    for flow, mask, depth_a, depth_b in zip(flow_seq, mask_seq, first_depths, second_depths):
        # Note the callee's argument order: mask and intrinsics come first.
        computed.append(calculate_scene_flow(mask, intrinsic_params, flow, depth_a, depth_b))
    return computed
def process_wrist_scene_flow_batch(args):
    """Compute one wrist scene flow per item of a batch.

    ``args`` is a 7-tuple ``(flows, masks, depths1, depths2,
    intrinsic_params, wrist_extrinsics, reference_extrinsics)``. All entries
    except ``intrinsic_params`` are parallel sequences (pairing stops at the
    shortest); ``intrinsic_params`` is passed unchanged to every
    ``calculate_wrist_scene_flow`` call. This wrapper does no file I/O of
    its own.

    Returns:
        A list with the result of ``calculate_wrist_scene_flow`` for each
        item, in input order.
    """
    (flow_seq, mask_seq, first_depths, second_depths,
     intrinsic_params, wrist_extrinsics, reference_extrinsics) = args
    paired = zip(flow_seq, mask_seq, first_depths, second_depths,
                 wrist_extrinsics, reference_extrinsics)
    computed = []
    for flow, mask, depth_a, depth_b, wrist_pose, reference_pose in paired:
        computed.append(
            calculate_wrist_scene_flow(mask, intrinsic_params, flow, depth_a, depth_b,
                                       wrist_pose, reference_pose)
        )
    return computed
# --- Main CPU Processor Class ---
class CPUProcessor:
    """Manage CPU multiprocessing for mask and scene-flow generation.

    Inputs are split into contiguous batches, each batch is mapped onto a
    ``multiprocessing.Pool`` worker, and the per-batch results are
    concatenated back in input order. Each public ``process_*`` method
    prints a one-line timing summary when it finishes.
    """

    def __init__(self, num_workers: int = None, physical_gpu_id: int = None):
        """Store the worker count and an optional GPU id used in log output.

        Args:
            num_workers: Number of pool processes. A falsy value (``None``
                or ``0``) selects ``min(cpu_count(), 32)``.
            physical_gpu_id: Optional id appended to printed messages; it
                has no other effect inside this class.
        """
        self.num_workers = num_workers or min(cpu_count(), 32)
        self.physical_gpu_id = physical_gpu_id
        gpu_info = f" (Physical GPU {physical_gpu_id})" if physical_gpu_id is not None else ""
        print(f"🔧 CPU Processor initialized with {self.num_workers} workers{gpu_info}")

    def _execute_parallel(self, task_function, batches):
        """Run ``task_function`` over ``batches`` in a pool and flatten the results.

        Args:
            task_function: Picklable callable taking one batch and returning
                a list of results.
            batches: List of batch arguments, one per ``task_function`` call.

        Returns:
            A single flat list of all per-batch results, in batch order
            (``Pool.map`` preserves input order). Empty if ``batches`` is empty.
        """
        if not batches:
            return []
        # Never start more processes than there are batches to hand out.
        pool_size = min(self.num_workers, len(batches))
        with Pool(pool_size) as pool:
            batch_results = pool.map(task_function, batches)
        # Flatten the list of lists into a single list.
        return [item for sublist in batch_results for item in sublist]

    def _batch_size(self, n_items):
        """Return the per-batch item count for ``n_items`` inputs.

        Ceiling division yields at most ``num_workers`` batches, so every
        batch can run in the first wave; floor division could leave extra
        straggler batches (e.g. 10 items on 4 workers -> 5 batches of 2).
        """
        return max(1, -(-n_items // self.num_workers))

    def _run_batched(self, task_function, make_batch, n_pairs, label, unit):
        """Build batches, run them in parallel, print a timing line, return results.

        Args:
            task_function: Module-level batch function passed to the pool.
            make_batch: Callable taking a ``slice`` and returning the
                argument tuple for one batch (it runs in the parent process,
                so it does not need to be picklable).
            n_pairs: Total number of input items; must be positive.
            label: Text placed before "processing completed" in the summary.
            unit: Plural noun used for the throughput figure.

        Returns:
            The flat list of results in input order.
        """
        batch_size = self._batch_size(n_pairs)
        batches = [
            make_batch(slice(start, start + batch_size))
            for start in range(0, n_pairs, batch_size)
        ]

        # perf_counter is monotonic and high resolution, unlike time.time().
        started = time.perf_counter()
        results = self._execute_parallel(task_function, batches)
        elapsed = time.perf_counter() - started

        # Guard the throughput figure against a zero-length measured interval.
        rate = n_pairs / elapsed if elapsed > 0 else float("inf")
        gpu_info = f" [GPU {self.physical_gpu_id}]" if self.physical_gpu_id is not None else ""
        print(f"✅ {label} processing completed in {elapsed:.3f}s ({rate:.1f} {unit}/second){gpu_info}")
        return results

    def process_masks_parallel(self, flows: List[np.ndarray], depths1: List[np.ndarray],
                               depths2: List[np.ndarray]) -> List[np.ndarray]:
        """Calculate all valid masks in parallel.

        Args:
            flows: Flow inputs; ``len(flows)`` determines how many items are processed.
            depths1: Sequence parallel to ``flows``.
            depths2: Sequence parallel to ``flows``.

        Returns:
            One result of ``process_mask_batch`` per input item, in input
            order; an empty list if ``flows`` is empty.
        """
        n_pairs = len(flows)
        if n_pairs == 0:
            return []
        return self._run_batched(
            process_mask_batch,
            lambda window: (flows[window], depths1[window], depths2[window]),
            n_pairs, "Mask", "masks",
        )

    def process_scene_flows_parallel(self, flows: List[np.ndarray], masks: List[np.ndarray],
                                     depths1: List[np.ndarray], depths2: List[np.ndarray],
                                     intrinsic_params: List[float]) -> List[np.ndarray]:
        """Calculate all scene flows in parallel.

        Args:
            flows: Flow inputs; ``len(flows)`` determines how many items are processed.
            masks: Sequence parallel to ``flows``.
            depths1: Sequence parallel to ``flows``.
            depths2: Sequence parallel to ``flows``.
            intrinsic_params: Passed unchanged to every batch.

        Returns:
            One result of ``process_scene_flow_batch`` per input item, in
            input order; an empty list if ``flows`` is empty.
        """
        n_pairs = len(flows)
        if n_pairs == 0:
            return []
        return self._run_batched(
            process_scene_flow_batch,
            lambda window: (flows[window], masks[window], depths1[window],
                            depths2[window], intrinsic_params),
            n_pairs, "Scene flow", "flows",
        )

    def process_wrist_scene_flows_parallel(self, flows: List[np.ndarray], masks: List[np.ndarray],
                                           depths1: List[np.ndarray], depths2: List[np.ndarray],
                                           intrinsic_params: List[float],
                                           wrist_extrinsics: List[np.ndarray],
                                           reference_extrinsics: List[np.ndarray]) -> List[np.ndarray]:
        """Calculate all wrist scene flows in parallel.

        Args:
            flows: Flow inputs; ``len(flows)`` determines how many items are processed.
            masks: Sequence parallel to ``flows``.
            depths1: Sequence parallel to ``flows``.
            depths2: Sequence parallel to ``flows``.
            intrinsic_params: Passed unchanged to every batch.
            wrist_extrinsics: Sequence parallel to ``flows``.
            reference_extrinsics: Sequence parallel to ``flows``.

        Returns:
            One result of ``process_wrist_scene_flow_batch`` per input item,
            in input order; an empty list if ``flows`` is empty.
        """
        n_pairs = len(flows)
        if n_pairs == 0:
            return []
        return self._run_batched(
            process_wrist_scene_flow_batch,
            lambda window: (flows[window], masks[window], depths1[window],
                            depths2[window], intrinsic_params,
                            wrist_extrinsics[window], reference_extrinsics[window]),
            n_pairs, "Wrist scene flow", "flows",
        )