66 changes: 49 additions & 17 deletions tools/README.md
# Spark Eventlog Analyzer
The PySpark script to analyze Gazelle's eventlog.

## Prerequisites

### Jupyter Installation
```
pip3 install jupyter
```

### Notebook Installation
```
pip3 install notebook
```

### iPyKernel
```
pip3 install ipykernel
```
### FindSpark
```
pip3 install findspark
```

### Matplotlib
```
pip3 install matplotlib
```

### Seaborn
```
pip3 install seaborn
```

### Pandasql
```
pip3 install pandasql
```

### PyHDFS
```
pip3 install pyhdfs
```

### PyArrow
```
pip3 install pyarrow
```

# Eventlog Analyzer Tools
The eventlog analyzer consists of sparklog.ipynb and gazelle_analysis.ipynb.
Put both notebooks into the Jupyter root directory.
## sparklog.ipynb
sparklog.ipynb contains the function definitions for the Spark eventlog analyzer.

## gazelle_analysis.ipynb
gazelle_analysis.ipynb is the main notebook: it calls sparklog.ipynb and loads the eventlog from HDFS.

## How it works
Launch gazelle_analysis.ipynb as the main script.

## Parameters
Before running, update the following URLs (see the sketch below):
- In Analysis:generate_trace_view, the URL used for display.
- In App_Log_Analysis:get_basic_state, the URL used for display.
- In App_Log_Analysis:get_app_info, the URL used for display.
- In the show_rst function, the URL for the generated HTML.
- In pyhdfs, the URL of the HDFS hosts.
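A minimal sketch of the kind of values to set, assuming hypothetical variable names (the real cells live inside sparklog.ipynb and gazelle_analysis.ipynb):

```
# Hypothetical names for illustration only -- edit the corresponding notebook
# cells listed above rather than copying this verbatim.
import pyhdfs

display_url = "http://your-jupyter-host:8888"  # URL used by generate_trace_view, get_basic_state and get_app_info
html_url = "http://your-web-host/gazelle"      # URL used by show_rst for the generated HTML
hdfs_hosts = "your-namenode:50070"             # HDFS hosts handed to pyhdfs

fs = pyhdfs.HdfsClient(hosts=hdfs_hosts, user_name="root")
```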



## To run from the command line
jupyter nbconvert --execute --to notebook --inplace --allow-errors --ExecutePreprocessor.timeout=-1 ./gazelle_analysis.ipynb --template classic

## To convert to HTML
jupyter nbconvert --to html ./gazelle_analysis.ipynb --output ./gazelle_analysis.html --template classic

# Tools to collect sar information and generate a trace view (.json)
You can also use the files below to collect sar information. The tool generates a JSON file from the collected sar data; once the JSON has been generated, you can open it in Catapult to view the trace.

- monitor.py: the main program to collect sar information. It must be started before your application and stopped after it finishes.
- post_process.sh: post-processes the sar files after the monitor stops.
- run_example.sh: an example showing how to use monitor.py with your application.
- template.ipynb: a template notebook used to generate the trace view (.json).

## Usage
Before running the tool, configure the following settings (a sketch of the defaults follows this list).

In monitor.py:
- clients: the nodes in your cluster.
- base_dir: the base directory name for the logs.
- local_profile_dir: the local location for the logs.
- hdfs_address: the HDFS address to which all logs are copied.
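For reference, the corresponding defaults near the top of monitor.py in this change look like the following; replace the hostname and addresses with your own:

```
from pathlib import Path

# Defaults shipped in monitor.py -- edit for your cluster.
clients = ['sr124']                        # nodes in the cluster to monitor
home = str(Path.home())
base_dir = 'profile'                       # base directory name for the logs
local_profile_dir = home + "/" + base_dir  # local location for the collected logs
hdfs_address = '10.1.0.24:50070'           # HDFS address the logs are copied to
```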

In sparklog.ipynb:
- Replace sr124 with the master node of your cluster, which is used to process the logs.
- Replace sr525 with your Catapult server.

Check run_example.sh to see how the script is used to collect sar information. Add the following commands to your own script:
```
appid=`yarn application -list 2>&1 | tail -n 1 | awk -F"\t" '{print $1}'`
rm -f log/memory*.csv
python3 ./monitor.py start $appid
$run_your_query
python3 ./monitor.py stop $appid "spark_logs"
```
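The same monitoring steps can also be driven from Python; a minimal sketch, assuming monitor.py's settings have already been edited and using a placeholder application id:

```
from datetime import date
import monitor

appid = "application_1234567890123_0001"   # placeholder YARN application id
monitor.startmonitor(appid)                # starts sar on every client node
# ... run your query here ...
monitor.stopmonitor(appid, "spark_logs",
                    "profile/" + date.today().strftime("%Y_%m_%d"))
```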
146 changes: 146 additions & 0 deletions tools/monitor.py
import os
import pyhdfs
import subprocess
import sys
import time

from inspect import currentframe, getframeinfo
from pathlib import Path

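# Cluster-specific settings: the nodes to monitor, where to keep the collected
# logs locally, and the HDFS address they are copied to. Edit for your environment.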
clients=['sr124']
home = str(Path.home())
base_dir = 'profile'
local_profile_dir=home+"/"+base_dir
hdfs_address='10.1.0.24:50070'

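# Kill any leftover sar / pidstat / perf monitoring processes on all client nodes.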
def killsar():
    for proc in ["sar", "pidstat", "perf"]:
        for l in clients:
            try:
                # List the PIDs of the monitoring process on this node ...
                cmd = "ssh " + l + " ps aux | grep -w " + proc + " | grep -v grep | tr -s ' ' | cut -d' ' -f2"
                out = subprocess.check_output(cmd, shell=True).decode('ascii').strip().split("\n")
                # ... and kill each of them.
                for x in out:
                    cmd = "ssh " + l + " kill " + x + " > /dev/null 2>&1"
                    subprocess.call(cmd, shell=True)
            except Exception as e:
                print(e)

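# Start sar (and optionally pidstat) on every client node for the given application id;
# returns the local profile directory created for this run.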
def startmonitor(appid, **kwargs):
print("[monitor.py]Starting system monitoring ...")
print(clients)
appid_profile_dir=local_profile_dir+"/"+appid
cmd="mkdir -p "+appid_profile_dir
print("Launching CMD create application id profile dir: %s" % cmd)
subprocess.call(cmd,shell=True)

for l in clients:
cmd="ssh "+l+" date"
print(subprocess.check_output(cmd,shell=True).decode('ascii'))

killsar()

for l in clients:
print("[monitor.py]create profile directory")
client_profile_dir=appid_profile_dir+"/"+l
cmd="mkdir -p "+client_profile_dir
print("[monitor.py]Launching CMD create server profile dir: %s" % cmd)
subprocess.call(cmd,shell=True)
cmd="ssh "+l+" mkdir -p "+client_profile_dir
print("[monitor.py]Launching CMD create client profile dir: %s" % cmd)
subprocess.call(cmd,shell=True)
cmd="ssh "+l+" sar -o "+client_profile_dir+"/sar.bin -r -u -d -B -n DEV 1 >/dev/null 2>&1 &"
print("[monitor.py]Launching CMD create sar.bin file: %s" % cmd)
subprocess.call(cmd,shell=True)
if kwargs.get("collect_pid",False):
cmd="ssh "+l+" jps | grep CoarseGrainedExecutorBackend | head -n 1 | cut -d' ' -f 1 | xargs -I % pidstat -h -t -p % 1 > "+client_profile_dir+"/pidstat.out 2>/dev/null &"
print("Launching CMD collect pid: %s" % cmd)
subprocess.call(cmd,shell=True)
return appid_profile_dir

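# Stop monitoring: kill the collectors, convert the sar binary output into text reports,
# copy the per-node results and the Spark eventlog locally, then upload everything to HDFS.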
def stopmonitor(appid, eventlogdir, basedif):

appid_profile_dir=local_profile_dir+"/"+appid
cmd="mkdir -p "+appid_profile_dir
print("Launching CMD create application id profile dir: %s" % cmd)
subprocess.call(cmd,shell=True)

killsar()

with open("%s/starttime" % appid_profile_dir,"w") as f:
f.write("{:d}".format(int(time.time()*1000)))

hadoophome=os.environ["HADOOP_HOME"]
userlogdir="/opt/hadoop/yarn/logs"

for l in clients:
client_profile_dir=appid_profile_dir+"/"+l
cmd="ssh "+l+" sar -f "+client_profile_dir+"/sar.bin -r > "+client_profile_dir+"/sar_mem.sar;sar -f "+client_profile_dir+"/sar.bin -u > "+client_profile_dir+"/sar_cpu.sar;sar -f "+client_profile_dir+"/sar.bin -d > "+client_profile_dir+"/sar_disk.sar;sar -f "+client_profile_dir+"/sar.bin -n DEV > "+client_profile_dir+"/sar_nic.sar;sar -f "+client_profile_dir+"/sar.bin -B > "+client_profile_dir+"/sar_page.sar;"
print("Launching CMD: %s" % cmd)
subprocess.call(cmd,shell=True)
cmd="ssh "+l+" grep -rI xgbtck --no-filename "+userlogdir+"/"+appid+"/* | sed 's/^ //g' > "+client_profile_dir+"/xgbtck.txt"
print("Launching CMD: %s" % cmd)
subprocess.call(cmd,shell=True)
cmd="scp -r "+l+":"+client_profile_dir+" "+appid_profile_dir+"/ > /dev/null 2>&1"
print("Launching CMD: %s" % cmd)
subprocess.call(cmd, shell=True)
cmd="ssh "+l+" jps | grep CoarseGrainedExecutorBackend | head -n 2 | tail -n 1 | cut -d' ' -f 1 | xargs -I % ps -To tid p % > "+client_profile_dir+"/sched_threads.txt"
subprocess.call(cmd, shell=True)
cmd="ssh "+l+" sar -V > "+client_profile_dir+"/sarv.txt"
print("Launching CMD: %s" % cmd)
subprocess.call(cmd, shell=True)
cmd="test -f "+client_profile_dir+"/perfstat.txt && head -n 1 "+client_profile_dir+"/perfstat.txt > "+client_profile_dir+"/perfstarttime"
print("Launching CMD: %s" % cmd)
subprocess.call(cmd,shell=True)

logfile=eventlogdir+"/"+appid
cmd="hadoop fs -copyToLocal "+logfile+" "+appid_profile_dir+"/app.log"
print("Launching CMD hadoop fs copytolocal: %s" % cmd)
subprocess.call(cmd,shell=True)

fs = pyhdfs.HdfsClient(hosts=hdfs_address, user_name='root')

print("Launching CMD hadoop fs -mkdir /%s" % basedif)
fs.mkdirs("/" + basedif + "/")
v=[os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(local_profile_dir+"/"+appid)) for f in fn]
for f in v:
paths=os.path.split(f)
fs.mkdirs("/"+ basedif + paths[0][len(local_profile_dir):])
fs.copy_from_local(f,"/"+ basedif + paths[0][len(local_profile_dir):]+"/"+paths[1],overwrite=True)


if __name__ == '__main__':
if sys.argv[1]=="start":
startmonitor( sys.argv[2])
elif sys.argv[1]=="stop":
import datetime
from datetime import date
basedir=base_dir+"/"+date.today().strftime("%Y_%m_%d")
stopmonitor( sys.argv[2],sys.argv[3],basedir)

lastnightrun=["","",""]
with open("log/runs.txt") as f:
for l in f.readlines():
x=l.strip().split(" ")
if ( x[0]=="05" and x[2]!=sys.argv[2] ) or ( len(sys.argv)==5 and x[2]==sys.argv[4] ):
lastnightrun[0]=x[0]
lastnightrun[1]=x[1]
lastnightrun[2]=x[2]
os.system(("./post_process.sh {} {} {} {}").format(date.today().strftime("%Y_%m_%d"), sys.argv[2], lastnightrun[1],lastnightrun[2]))

35 changes: 35 additions & 0 deletions tools/post_process.sh
#!/bin/bash
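# Usage: ./post_process.sh <date YYYY_MM_DD> <appid> [<previous date> <previous appid>]
# Renders template.ipynb into tpcxx_<date>_<appid>.ipynb, executes it, and exports it to HTML.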
echo -e "Starting Post Processing ..."
echo -e "Start notebook processing - template.ipynb"
sed 's/BASEDIR_TEMP/profile\/'$1'/g' template.ipynb > tpcxx_$1_$2.ipynb
sed -i 's/APPID_TEMP/'$2'/g' tpcxx_$1_$2.ipynb
if [ $# -eq 4 ]
then
sed -i 's/LAST_BASEDIR/profile\/'$3'/g' tpcxx_$1_$2.ipynb
sed -i 's/LAST_APPID/'$4'/g' tpcxx_$1_$2.ipynb
fi

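# Keep a copy of the application eventlog on HDFS under /history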
hadoop fs -mkdir /history
hadoop fs -cp /profile/$1/$2/app.log /history/$2
echo -e "Finish notebook processing - template.ipynb"

echo -e "Start notebook execution - tpcxx.ipynb"
mkdir -p html
jupyter nbconvert --execute --to notebook --inplace --allow-errors --ExecutePreprocessor.timeout=-1 ./tpcxx_$1_$2.ipynb --template classic
jupyter nbconvert --to html ./tpcxx_$1_$2.ipynb --output html/tpcxx_$1_$2.html --template classic

#echo -e "notebook processing - tpch_summary.ipynb"
#sed 's/BASEDIR_TEMP/profile\/'$1'/g' tpch_template_summary.ipynb > tpch_summary_$1_$2.ipynb
#sed -i 's/APPID_TEMP/'$2'/g' tpch_summary_$1_$2.ipynb
#if [ $# -eq 4 ]
#then
# sed -i 's/LAST_BASEDIR/profile\/'$3'/g' tpch_summary_$1_$2.ipynb
# sed -i 's/LAST_APPID/'$4'/g' tpch_summary_$1_$2.ipynb
#fi

#jupyter nbconvert --execute --to notebook --inplace --allow-errors --ExecutePreprocessor.timeout=-1 ./tpch_summary_$1_$2.ipynb --template classic
#jupyter nbconvert --to html --no-input ./tpch_summary_$1_$2.ipynb --output html/tpch_summary_$1_$2.html --template classic
#echo -e "Finish notebook processing - tpch.ipynb"

#rm -rf ./tpch_summary_$1_$2.ipynb
echo -e "Finish Post Processing !!!"
41 changes: 41 additions & 0 deletions tools/run_example.sh
#!/bin/bash
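# Example end-to-end run: restart the Spark Thrift server, start monitor.py before the
# queries, run the TPC-H workload, then stop monitoring and write result links to log/link.html.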

#/home/spark-sql/collect_sar.sh
eventlogdir="spark_logs"

echo "Stopping Thrift Server ..."
./run_spark_thrift_server.sh stop
echo "Done"
sleep 20
echo "Cleaning Cache ..."
./clean_cache.sh
echo "Done"
sleep 1
echo "Starting Thrift Server ..."
./run_spark_thrift_server.sh start
echo "Done"
sleep 40
echo "Start Resource Monitoring ..."
appid=`yarn application -list 2>&1 | tail -n 1 | awk -F"\t" '{print $1}'`
echo `date +'%H %Y_%m_%d'` $appid ${1} >> log/runs.txt
rm -f log/memory*.csv
python3 ./monitor.py start $appid
echo "Running TPCH Query"
./run_tpch.py 2>&1 >> tpch_query.log | tee -a tpch_query.txt
echo "Done"
sleep 1
echo "Stop Thrift Server"
./run_spark_thrift_server.sh stop
sleep 10
python3 ./monitor.py stop $appid "spark_logs"
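# Write convenience links (history server, notebooks, trace view) to log/link.html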
echo '<font style="font-family: Courier New"">' > log/link.html
echo 'history event: <a href="http://10.1.0.24:18080/history/'$appid'/jobs/">http://10.1.0.24:18080/history/'$appid'/jobs/</a><br>' >> log/link.html
echo 'history on sr124: <a href="http://10.1.0.24:18080/history/'$appid'/jobs/">http://10.1.0.24:18080/history/'$appid'/jobs/</a><br>' >> log/link.html
echo 'notebook on sr124: <a href="http://10.1.0.24:8888/notebooks/jenkins/tpch_'`date +'%Y_%m_%d'`'_'$appid'.ipynb">http://10.1.0.24:8888/notebooks/jenkins/tpch_'`date +'%Y_%m_%d'`'_'$appid'.ipynb</a><br>' >> log/link.html
echo 'notebook html on sr124: <a href="http://10.1.0.24:8888/view/jenkins/html/tpch_'`date +'%Y_%m_%d'`'_'$appid'.html">http://10.1.0.24:8888/notebooks/jenkins/html/tpch_'`date +'%Y_%m_%d'`'_'$appid'.html</a><br>' >> log/link.html
echo 'traceview on sr124: <a href="http://10.1.0.24:1088/tracing_examples/trace_viewer.html#/tracing/test_data/'$appid'.json">http://10.1.0.24:1088/tracing_examples/trace_viewer.html#/tracing/test_data/'$appid'.json</a><br>' >> log/link.html

echo "</font><hr/>" >> log/link.html

echo "All Jobs Are Done."
#/home/spark-sql/stop_sar.sh