Skip to content

Commit 6aed5f2

Browse files
authored
TEZ-4617 prevent resource leaks for Closeables (#402) (Dmitry Kryukov reviewed by Laszlo Bodor)
1 parent faa9bb9 commit 6aed5f2

File tree

5 files changed

+98
-74
lines changed

5 files changed

+98
-74
lines changed

tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,12 @@ public static class StaticAMView extends View {
894894
@Override
895895
public void render() {
896896
response().setContentType(MimeType.HTML);
897-
PrintWriter pw = writer();
897+
try (PrintWriter pw = writer()) {
898+
render(pw);
899+
}
900+
}
901+
902+
private void render(PrintWriter pw) {
898903
pw.write("<html>");
899904
pw.write("<head>");
900905
pw.write("<meta charset=\"utf-8\">");
@@ -903,11 +908,11 @@ public void render() {
903908
pw.write("<body>");
904909
if (historyUrl == null || historyUrl.isEmpty()) {
905910
pw.write("<h1>Tez UI Url is not defined.</h1>" +
906-
"<p>To enable tracking url pointing to Tez UI, set the config <b>" +
907-
TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
911+
"<p>To enable tracking url pointing to Tez UI, set the config <b>" +
912+
TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
908913
} else {
909914
pw.write("<h1>Redirecting to Tez UI</h1>. <p>If you are not redirected shortly, click " +
910-
"<a href='" + historyUrl + "'><b>here</b></a></p>"
915+
"<a href='" + historyUrl + "'><b>here</b></a></p>"
911916
);
912917
pw.write("<script type='text/javascript'>setTimeout(function() { " +
913918
"window.location.replace('" + historyUrl + "');" +

tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,13 @@ private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
129129

130130
int numBroadcastTasks = 2;
131131
int numOneToOneTasks = 3;
132+
int numNMs;
132133
if (doLocalityCheck) {
133-
YarnClient yarnClient = YarnClient.createYarnClient();
134-
yarnClient.init(tezConf);
135-
yarnClient.start();
136-
int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
137-
yarnClient.stop();
134+
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
135+
yarnClient.init(tezConf);
136+
yarnClient.start();
137+
numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
138+
}
138139
// create enough 1-1 tasks to run in parallel
139140
numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM
140141
if (numOneToOneTasks < 1) {

tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.apache.tez.mapreduce.examples.ExampleDriver.getTezDecoratedConfiguration;
2222

23+
import java.io.IOException;
2324
import java.util.ArrayList;
2425
import java.util.Date;
2526
import java.util.List;
@@ -82,45 +83,52 @@ static int printUsage() {
8283
@SuppressWarnings("deprecation")
8384
public int run(String[] args) throws Exception {
8485
Configuration conf = getConf();
85-
JobClient client = new JobClient(conf);
86+
try (JobClient client = new JobClient(conf)) {
87+
return run(client, conf, args);
88+
}
89+
}
90+
91+
private int run(JobClient client, Configuration conf, String[] args)
92+
throws IOException, ClassNotFoundException, InterruptedException {
8693
ClusterStatus cluster = client.getClusterStatus();
8794
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
8895
String join_reduces = conf.get(REDUCES_PER_HOST);
8996
if (join_reduces != null) {
90-
num_reduces = cluster.getTaskTrackers() *
91-
Integer.parseInt(join_reduces);
97+
num_reduces = cluster.getTaskTrackers() *
98+
Integer.parseInt(join_reduces);
9299
}
100+
93101
Job job = new Job(conf);
94102
job.setJobName("join");
95103
job.setJarByClass(Sort.class);
96104

97-
job.setMapperClass(Mapper.class);
105+
job.setMapperClass(Mapper.class);
98106
job.setReducerClass(Reducer.class);
99107

100-
Class<? extends InputFormat> inputFormatClass =
101-
SequenceFileInputFormat.class;
102-
Class<? extends OutputFormat> outputFormatClass =
103-
SequenceFileOutputFormat.class;
108+
Class<? extends InputFormat> inputFormatClass =
109+
SequenceFileInputFormat.class;
110+
Class<? extends OutputFormat> outputFormatClass =
111+
SequenceFileOutputFormat.class;
104112
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
105113
Class<? extends Writable> outputValueClass = TupleWritable.class;
106114
String op = "inner";
107115
List<String> otherArgs = new ArrayList<String>();
108-
for(int i=0; i < args.length; ++i) {
116+
for (int i = 0; i < args.length; ++i) {
109117
try {
110118
if ("-r".equals(args[i])) {
111119
num_reduces = Integer.parseInt(args[++i]);
112120
} else if ("-inFormat".equals(args[i])) {
113-
inputFormatClass =
114-
Class.forName(args[++i]).asSubclass(InputFormat.class);
121+
inputFormatClass =
122+
Class.forName(args[++i]).asSubclass(InputFormat.class);
115123
} else if ("-outFormat".equals(args[i])) {
116-
outputFormatClass =
117-
Class.forName(args[++i]).asSubclass(OutputFormat.class);
124+
outputFormatClass =
125+
Class.forName(args[++i]).asSubclass(OutputFormat.class);
118126
} else if ("-outKey".equals(args[i])) {
119-
outputKeyClass =
120-
Class.forName(args[++i]).asSubclass(WritableComparable.class);
127+
outputKeyClass =
128+
Class.forName(args[++i]).asSubclass(WritableComparable.class);
121129
} else if ("-outValue".equals(args[i])) {
122-
outputValueClass =
123-
Class.forName(args[++i]).asSubclass(Writable.class);
130+
outputValueClass =
131+
Class.forName(args[++i]).asSubclass(Writable.class);
124132
} else if ("-joinOp".equals(args[i])) {
125133
op = args[++i];
126134
} else {
@@ -131,7 +139,7 @@ public int run(String[] args) throws Exception {
131139
return printUsage();
132140
} catch (ArrayIndexOutOfBoundsException except) {
133141
System.out.println("ERROR: Required parameter missing from " +
134-
args[i-1]);
142+
args[i - 1]);
135143
return printUsage(); // exits
136144
}
137145
}
@@ -144,29 +152,29 @@ public int run(String[] args) throws Exception {
144152
return printUsage();
145153
}
146154

147-
FileOutputFormat.setOutputPath(job,
148-
new Path(otherArgs.remove(otherArgs.size() - 1)));
155+
FileOutputFormat.setOutputPath(job,
156+
new Path(otherArgs.remove(otherArgs.size() - 1)));
149157
List<Path> plist = new ArrayList<Path>(otherArgs.size());
150158
for (String s : otherArgs) {
151159
plist.add(new Path(s));
152160
}
153161

154162
job.setInputFormatClass(CompositeInputFormat.class);
155-
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
156-
CompositeInputFormat.compose(op, inputFormatClass,
157-
plist.toArray(new Path[0])));
163+
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
164+
CompositeInputFormat.compose(op, inputFormatClass,
165+
plist.toArray(new Path[0])));
158166
job.setOutputFormatClass(outputFormatClass);
159167

160168
job.setOutputKeyClass(outputKeyClass);
161169
job.setOutputValueClass(outputValueClass);
162170

163171
Date startTime = new Date();
164172
System.out.println("Job started: " + startTime);
165-
int ret = job.waitForCompletion(true) ? 0 : 1 ;
173+
int ret = job.waitForCompletion(true) ? 0 : 1;
166174
Date end_time = new Date();
167175
System.out.println("Job ended: " + end_time);
168-
System.out.println("The job took " +
169-
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
176+
System.out.println("The job took " +
177+
(end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
170178
return ret;
171179
}
172180

tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -180,70 +180,75 @@ public int run(String[] args) throws Exception {
180180
}
181181

182182
Configuration conf = getConf();
183-
JobClient client = new JobClient(conf);
183+
try (JobClient client = new JobClient(conf)) {
184+
return run(client, conf, args);
185+
}
186+
}
187+
188+
private int run(JobClient client, Configuration conf, String[] args)
189+
throws IOException, ClassNotFoundException, InterruptedException {
184190
ClusterStatus cluster = client.getClusterStatus();
185191
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
186-
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
187-
1*1024*1024*1024);
192+
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
188193
if (numBytesToWritePerMap == 0) {
189-
System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0");
194+
System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0");
190195
return -2;
191196
}
192-
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
193-
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
197+
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
198+
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
194199
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
195200
if (numMaps == 0 && totalBytesToWrite > 0) {
196201
numMaps = 1;
197202
conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
198203
}
199204
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
200-
205+
201206
Job job = new Job(conf);
202-
207+
203208
job.setJarByClass(RandomTextWriter.class);
204209
job.setJobName("random-text-writer");
205-
210+
206211
job.setOutputKeyClass(Text.class);
207212
job.setOutputValueClass(Text.class);
208-
213+
209214
job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
210-
job.setMapperClass(RandomTextMapper.class);
211-
212-
Class<? extends OutputFormat> outputFormatClass =
213-
SequenceFileOutputFormat.class;
215+
job.setMapperClass(RandomTextMapper.class);
216+
217+
Class<? extends OutputFormat> outputFormatClass =
218+
SequenceFileOutputFormat.class;
214219
List<String> otherArgs = new ArrayList<String>();
215-
for(int i=0; i < args.length; ++i) {
220+
for (int i = 0; i < args.length; ++i) {
216221
try {
217222
if ("-outFormat".equals(args[i])) {
218-
outputFormatClass =
219-
Class.forName(args[++i]).asSubclass(OutputFormat.class);
223+
outputFormatClass =
224+
Class.forName(args[++i]).asSubclass(OutputFormat.class);
220225
} else {
221226
otherArgs.add(args[i]);
222227
}
223228
} catch (ArrayIndexOutOfBoundsException except) {
224229
System.out.println("ERROR: Required parameter missing from " +
225-
args[i-1]);
230+
args[i - 1]);
226231
return printUsage(); // exits
227232
}
228233
}
229234

230235
job.setOutputFormatClass(outputFormatClass);
231236
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));
232-
237+
233238
System.out.println("Running " + numMaps + " maps.");
234-
239+
235240
// reducer NONE
236241
job.setNumReduceTasks(0);
237-
242+
238243
Date startTime = new Date();
239244
System.out.println("Job started: " + startTime);
240245
int ret = job.waitForCompletion(true) ? 0 : 1;
241246
Date endTime = new Date();
242247
System.out.println("Job ended: " + endTime);
243-
System.out.println("The job took " +
244-
(endTime.getTime() - startTime.getTime()) /1000 +
245-
" seconds.");
246-
248+
System.out.println("The job took " +
249+
(endTime.getTime() - startTime.getTime()) / 1000 +
250+
" seconds.");
251+
247252
return ret;
248253
}
249254

tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -246,17 +246,22 @@ public int run(String[] args) throws Exception {
246246

247247
Path outDir = new Path(args[0]);
248248
Configuration conf = getConf();
249-
JobClient client = new JobClient(conf);
249+
try (JobClient client = new JobClient(conf)) {
250+
return run(client, conf, outDir);
251+
}
252+
}
253+
254+
private int run(JobClient client, Configuration conf, Path outDir)
255+
throws IOException, ClassNotFoundException, InterruptedException {
250256
ClusterStatus cluster = client.getClusterStatus();
251257
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
252-
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
253-
1*1024*1024*1024);
258+
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
254259
if (numBytesToWritePerMap == 0) {
255260
System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
256261
return -2;
257262
}
258-
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
259-
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
263+
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
264+
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
260265
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
261266
if (numMaps == 0 && totalBytesToWrite > 0) {
262267
numMaps = 1;
@@ -265,31 +270,31 @@ public int run(String[] args) throws Exception {
265270
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
266271

267272
Job job = new Job(conf);
268-
273+
269274
job.setJarByClass(RandomWriter.class);
270275
job.setJobName("random-writer");
271276
FileOutputFormat.setOutputPath(job, outDir);
272277
job.setOutputKeyClass(BytesWritable.class);
273278
job.setOutputValueClass(BytesWritable.class);
274279
job.setInputFormatClass(RandomInputFormat.class);
275-
job.setMapperClass(RandomMapper.class);
280+
job.setMapperClass(RandomMapper.class);
276281
job.setReducerClass(Reducer.class);
277282
job.setOutputFormatClass(SequenceFileOutputFormat.class);
278-
283+
279284
System.out.println("Running " + numMaps + " maps.");
280-
285+
281286
// reducer NONE
282287
job.setNumReduceTasks(0);
283-
288+
284289
Date startTime = new Date();
285290
System.out.println("Job started: " + startTime);
286291
int ret = job.waitForCompletion(true) ? 0 : 1;
287292
Date endTime = new Date();
288293
System.out.println("Job ended: " + endTime);
289-
System.out.println("The job took " +
290-
(endTime.getTime() - startTime.getTime()) /1000 +
291-
" seconds.");
292-
294+
System.out.println("The job took " +
295+
(endTime.getTime() - startTime.getTime()) / 1000 +
296+
" seconds.");
297+
293298
return ret;
294299
}
295300

0 commit comments

Comments
 (0)