-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmake_images.py
More file actions
executable file
·194 lines (159 loc) · 5.5 KB
/
make_images.py
File metadata and controls
executable file
·194 lines (159 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import os
import numpy as np
import pandas as pd
import re
from PIL import Image
import torch
import torchvision.transforms.functional as F
# constants
ROOT = os.path.join("home", "niche", "gsformer")
DIR_DATA = os.path.join(ROOT, "data")
DIR_IMAGES = os.path.join(DIR_DATA, "images")
PATH_gparents = os.path.join(DIR_DATA, "g_parents.csv")
PATH_splits = os.path.join(DIR_DATA, "splits.csv")
PATH_ec_train = os.path.join(
DIR_DATA, "raw", "Training_Data", "6_Training_EC_Data_2014_2021.csv"
)
PATH_ec_test = os.path.join(
DIR_DATA, "raw", "Testing_Data", "6_Testing_EC_Data_2022.csv"
)
def main():
# read data
dt_g = pd.read_csv(PATH_gparents)
dt_split = pd.read_csv(PATH_splits)
dt_ec = pd.concat(
[
pd.read_csv(PATH_ec_train),
pd.read_csv(PATH_ec_test),
],
axis=0,
)
# standardize each env variables
dt_ec = standardize_dt_ec(dt_ec)
dt_ec = dt_ec.dropna(axis=1, how="any")
# get the column names (seq and non-seq)
cols_seq, cols_nsq = obtain_ec_cols(dt_ec)
# create folders
make_folders(DIR_IMAGES)
# iterate over each data point and generate images for genotypes, ec seq, and ec non-seq
for i, query in dt_split.iterrows():
print("processing %d / %d" % (i, len(dt_split)))
y = query["Yield_Mg_ha"]
name = str(i) + ".jpg"
if "train" in query["split"]:
split = "train"
elif "test" in query["split"]:
split = "test"
else:
split = "val"
# obtain images for genotypes and ec
try:
img_g = query_g(query, dt_g)
ec_img_sq, ec_img_nsq = query_ec(query, dt_ec, cols_seq, cols_nsq)
except Exception as e:
print(e)
continue
# concatenate images
img = np.concatenate([img_g, ec_img_sq, ec_img_nsq], axis=0)
# save image (X) and annotation (y)
Image.fromarray(img).save(os.path.join(DIR_IMAGES, split, name))
with open(os.path.join(DIR_IMAGES, split, "annotation.txt"), "a") as f:
f.write("%s,%.6f\n" % (name, y))
# convert variable to images ----------------------------------------------------
def query_ec(
query: pd.Series, dt_ec: pd.DataFrame, cols_seq: list[str], cols_nsq: list[str]
) -> tuple[np.ndarray, np.ndarray]:
# seq
ec_seq = (
dt_ec.loc[dt_ec["Env"] == query["Env"], cols_seq]
.values.reshape((-1, 3, 3))
.swapaxes(0, 1)
)
ec_img_seq = ec_seq.swapaxes(0, 1).swapaxes(1, 2)
ec_img_seq = to_uint8(ec_img_seq)
ec_img_seq = resize(ec_img_seq, dim=384)
# non-seq (140 features)
ec_img_nsq = dt_ec.loc[dt_ec["Env"] == query["Env"], cols_nsq].values.reshape(
(10, 14)
)
ec_img_nsq = np.repeat(ec_img_nsq[:, :, np.newaxis], 3, axis=2)
ec_img_nsq = resize(ec_img_nsq)
ec_img_nsq = to_uint8(ec_img_nsq)
# retur
return ec_img_seq, ec_img_nsq
def query_g(query: pd.Series, dt_g: pd.DataFrame) -> np.ndarray:
"""
Query the genotype of a given sample.
Args:
query: a row of the dt_split dataframe
dt_g: the genotype of parents
Returns:
a 3D array of shape (3, 384, 384) representing the genotype of the sample
"""
# get the genotype of parents
p1 = dt_g.loc[dt_g["line"] == query["inbreds"], :].values[:, 1:]
p2 = dt_g.loc[dt_g["line"] == query["testers"], :].values[:, 1:]
# get the genotype of offspring
f1 = p1 + p2
# concatenate the genotype of parents and offspring
g = np.concatenate([p1, p2, f1], axis=0)
# normalize the genotype
g = to_uint8(g)
g = pad_g(g)
# return the genotype
return g
# misc ------------------------------------------------------------------
def standardize_dt_ec(dt_ec: pd.DataFrame):
dt_ec.iloc[:, 1:] = dt_ec.iloc[:, 1:].apply(lambda x: (x - x.mean()) / x.std())
return dt_ec
def obtain_ec_cols(dt_ec: pd.DataFrame):
cols_ec = dt_ec.columns
cols_seq = [col for col in cols_ec if re.findall(r".*[1-9]{1}$", col)]
cols_seq.sort()
cols_nsq = [col for col in cols_ec if re.findall(r".*[a-zA-Z]$", col)][1:]
return cols_seq, cols_nsq
def make_folders(dir_images: str):
# init folders and annotations
if not os.path.exists(dir_images):
os.mkdir(dir_images)
os.mkdir(os.path.join(dir_images, "train"))
os.mkdir(os.path.join(dir_images, "test"))
os.mkdir(os.path.join(dir_images, "val"))
for split in ["train", "test", "val"]:
with open(os.path.join(dir_images, split, "annotation.txt"), "w") as f:
f.write("")
def resize(array, dim=384):
"""
Resize an array to a given dimension.
"""
array_ts = torch.tensor(array).permute(2, 0, 1)
return F.resize(array_ts, (dim, dim)).permute(1, 2, 0).numpy()
def to_uint8(array: np.ndarray) -> np.ndarray:
"""
Convert an array to uint8.
Args:
array: an array of any type
Returns:
an array of type uint8
"""
array = (array - array.min()) / (array.max() - array.min()) * 255
array = array.astype(np.uint8)
return array
def pad_g(img: np.ndarray, dim: int = 384) -> np.ndarray:
"""
Args:
img: a 2D array of shape (3, p)
dim: the dimension of the output image
Returns:
a 3D array of shape (dim, dim, 3)
"""
c, p = img.shape
img = (
np.repeat(img, (dim**2) // p + 1, axis=1)[:, : dim**2]
.reshape(3, dim, dim)
.swapaxes(0, 2)
.swapaxes(0, 1)
)
return img
if __name__ == "__main__":
main()