-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathutils.py
More file actions
146 lines (124 loc) · 4.98 KB
/
utils.py
File metadata and controls
146 lines (124 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import collections
import logging
import sys
import numpy as np
import pandas as pd
import scipy
import six
import sklearn.preprocessing as pre
import torch
import tqdm
import yaml
def threshold(x, threshold=0.5):
# Batch_wise
if x.ndim == 3:
return np.array([pre.binarize(sub, threshold=threshold) for sub in x])
else:
return pre.binarize(x, threshold=threshold)
def double_threshold(x, high_thres, low_thres, n_connect=1):
"""double_threshold
Helper function to calculate double threshold for n-dim arrays
:param x: input array
:param high_thres: high threshold value
:param low_thres: Low threshold value
:param n_connect: Distance of <= n clusters will be merged
"""
assert x.ndim <= 3, "Whoops something went wrong with the input ({}), check if its <= 3 dims".format(
x.shape)
if x.ndim == 3:
apply_dim = 1
elif x.ndim < 3:
apply_dim = 0
# x is assumed to be 3d: (batch, time, dim)
# Assumed to be 2d : (time, dim)
# Assumed to be 1d : (time)
# time axis is therefore at 1 for 3d and 0 for 2d (
return np.apply_along_axis(lambda x: _double_threshold(
x, high_thres, low_thres, n_connect=n_connect),
axis=apply_dim,
arr=x)
def _double_threshold(x, high_thres, low_thres, n_connect=1, return_arr=True):
"""_double_threshold
Computes a double threshold over the input array
:param x: input array, needs to be 1d
:param high_thres: High threshold over the array
:param low_thres: Low threshold over the array
:param n_connect: Postprocessing, maximal distance between clusters to connect
:param return_arr: By default this function returns the filtered indiced, but if return_arr = True it returns an array of tsame size as x filled with ones and zeros.
"""
assert x.ndim == 1, "Input needs to be 1d"
high_locations = np.where(x > high_thres)[0]
locations = x > low_thres
encoded_pairs = find_contiguous_regions(locations)
filtered_list = list(
filter(
lambda pair:
((pair[0] <= high_locations) & (high_locations <= pair[1])).any(),
encoded_pairs))
filtered_list = connect_(filtered_list, n_connect)
if return_arr:
zero_one_arr = np.zeros_like(x, dtype=int)
for sl in filtered_list:
zero_one_arr[sl[0]:sl[1]] = 1
return zero_one_arr
return filtered_list
def connect_(pairs, n=1):
"""connect_
Connects two adjacent clusters if their distance is <= n
:param pairs: Clusters of iterateables e.g., [(1,5),(7,10)]
:param n: distance between two clusters
"""
if len(pairs) == 0:
return []
start_, end_ = pairs[0]
new_pairs = []
for i, (next_item, cur_item) in enumerate(zip(pairs[1:], pairs[0:])):
end_ = next_item[1]
if next_item[0] - cur_item[1] <= n:
pass
else:
new_pairs.append((start_, cur_item[1]))
start_ = next_item[0]
new_pairs.append((start_, end_))
return new_pairs
def find_contiguous_regions(activity_array):
"""Find contiguous regions from bool valued numpy.array.
Copy of https://dcase-repo.github.io/dcase_util/_modules/dcase_util/data/decisions.html#DecisionEncoder
Reason is:
1. This does not belong to a class necessarily
2. Import DecisionEncoder requires sndfile over some other imports..which causes some problems on clusters
"""
# Find the changes in the activity_array
change_indices = np.logical_xor(activity_array[1:],
activity_array[:-1]).nonzero()[0]
# Shift change_index with one, focus on frame after the change.
change_indices += 1
if activity_array[0]:
# If the first element of activity_array is True add 0 at the beginning
change_indices = np.r_[0, change_indices]
if activity_array[-1]:
# If the last element of activity_array is True, add the length of the array
change_indices = np.r_[change_indices, activity_array.size]
# Reshape the result into two columns
return change_indices.reshape((-1, 2))
def _decode_with_timestamps(encoder, labels):
result_labels = []
for i, label_column in enumerate(labels.T):
change_indices = find_contiguous_regions(label_column)
# append [onset, offset] in the result list
for row in change_indices:
result_labels.append((encoder.classes_[i], row[0], row[1]))
return result_labels
def decode_with_timestamps(encoder: pre.MultiLabelBinarizer, labels: np.array):
"""decode_with_timestamps
Decodes the predicted label array (2d) into a list of
[(Labelname, onset, offset), ...]
:param encoder: Encoder during training
:type encoder: pre.MultiLabelBinarizer
:param labels: n-dim array
:type labels: np.array
"""
if labels.ndim == 3:
return [_decode_with_timestamps(encoder, lab) for lab in labels]
else:
return _decode_with_timestamps(encoder, labels)