33import os
44import subprocess
55import sys
6- import time
76
# Column order for the findings written to positive_expected_result.json.
# collect_and_write_expected_results() re-keys every entry with
# `{k: entry[k] for k in FIELD_ORDER}`, so this list defines both which
# fields are kept and the exact key order in the output JSON.
FIELD_ORDER = [
    "queryName", "severity", "line", "fileName",
    "resourceType", "resourceName", "searchKey", "searchValue",
    "expectedValue", "actualValue", "issueType", "similarityID", "search_line",
]

# Return codes treated as a successful KICS scan: 0 (no findings) plus the
# nonzero codes KICS emits when findings exist.
# NOTE(review): presumably 20/30/40/50/60 map to severity-based result codes
# (info/low/medium/high/critical) — confirm against the KICS CLI docs.
KICS_RESULT_CODES = {0, 20, 30, 40, 50, 60}

# Resolve repository-relative paths from this script's own location:
# the script is assumed to live three directory levels below the repo root.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.normpath(os.path.join(SCRIPT_DIR, "../../.."))
QUERIES_DIR = os.path.join(REPO_ROOT, "assets", "queries")
@@ -40,8 +41,7 @@ def build_command(query_id: str, scan_path: str, payload_path: str, output_path:
4041 "--experimental-queries" ,
4142 "--bom" ,
4243 "--enable-openapi-refs" ,
43- "--kics_compute_new_simid"
44- #"--ignore-on-exit", "results"
44+ "--kics_compute_new_simid"
4545 ]
4646
4747
@@ -53,11 +53,10 @@ def run_scan(query_id: str, scan_path: str, payload_path: str, output_path: str,
5353 print ("-" * 60 )
5454
5555 try :
56- result = subprocess .run (command , cwd = REPO_ROOT , check = True )
56+ result = subprocess .run (command , cwd = REPO_ROOT )
57+ if result .returncode not in KICS_RESULT_CODES :
58+ print (f"\n [ERROR] Scan failed with return code { result .returncode } ." , file = sys .stderr )
5759 return result .returncode
58- except subprocess .CalledProcessError as e :
59- print (f"\n [ERROR] Scan failed with return code { e .returncode } ." , file = sys .stderr )
60- return e .returncode
6160 except FileNotFoundError :
6261 print ("\n [ERROR] 'go' not found. Make sure Go is installed and in your PATH." , file = sys .stderr )
6362 return 1
@@ -93,21 +92,21 @@ def find_positive_tests(query_path: str) -> list[tuple[str, str]]:
9392 continue
9493 positives .append ((label , file_path ))
9594 else :
96- # File: positiveX.<ext>
95+ # File: positive.<ext> or positiveX.<ext>
9796 suffix = entry [len ("positive" ):].split ("." )[0 ]
98- if not suffix .isdigit ():
99- continue
97+ if suffix and not suffix .isdigit ():
98+ continue # skip positive_expected_result.json etc.
10099 positives .append ((f"positive{ suffix } " , full_path ))
101100
102101 positives .sort (key = lambda x : x [0 ])
103102 return positives
104103
105104
106- def run_query_scans (query_id : str , query_path : str ) -> list [tuple [str , str , int ]]:
105+ def run_query_scans (query_id : str , query_path : str ) -> tuple [ list [tuple [str , str , int ]], bool ]:
107106 positives = find_positive_tests (query_path )
108107 if not positives :
109108 print (f"[WARN] No positive tests found in { query_path } /test, skipping." , file = sys .stderr )
110- return []
109+ return [], False
111110
112111 payloads_dir = os .path .join (query_path , "payloads" )
113112 os .makedirs (payloads_dir , exist_ok = True )
@@ -121,21 +120,22 @@ def run_query_scans(query_id: str, query_path: str) -> list[tuple[str, str, int]
121120 output_name = f"{ label } .json"
122121 print (f"\n -> { label } : { os .path .relpath (scan_path , REPO_ROOT )} " )
123122 rc = run_scan (query_id , scan_path , payload_path , output_path , output_name )
124- if rc != 0 :
123+ if rc not in KICS_RESULT_CODES :
125124 failed .append ((scan_path , payload_path , rc ))
126125
127- collect_and_write_expected_results (query_path )
128- return failed
126+ written = collect_and_write_expected_results (query_path )
127+ return failed , written
129128
130129
131- def collect_and_write_expected_results (query_path : str ) -> None :
130+ def collect_and_write_expected_results (query_path : str ) -> bool :
132131 """
133132 Read all positive*.json result files from results/, extract findings,
134- sort by (fileName, line), and write test/positive_expected_result.json.
133+ sort by (fileName, line, issueType, searchKey, similarityID), and write
134+ test/positive_expected_result.json. Returns True if the file was written.
135135 """
136136 results_dir = os .path .join (query_path , "results" )
137137 if not os .path .isdir (results_dir ):
138- return
138+ return False
139139
140140 entries = []
141141 for filename in sorted (os .listdir (results_dir )):
@@ -144,7 +144,8 @@ def collect_and_write_expected_results(query_path: str) -> None:
144144 with open (os .path .join (results_dir , filename ), encoding = "utf-8" ) as f :
145145 data = json .load (f )
146146
147- for query in data .get ("queries" , []):
147+ all_findings = data .get ("queries" , []) + data .get ("bill_of_materials" , [])
148+ for query in all_findings :
148149 query_name = query .get ("query_name" , "" )
149150 severity = query .get ("severity" , "" )
150151 for file_entry in query .get ("files" , []):
@@ -165,6 +166,9 @@ def collect_and_write_expected_results(query_path: str) -> None:
165166 }
166167 entries .append ({k : entry [k ] for k in FIELD_ORDER })
167168
169+ if not entries :
170+ return False
171+
168172 entries .sort (key = lambda x : (
169173 x ["fileName" ], x ["line" ], x ["issueType" ], x ["searchKey" ], x ["similarityID" ]
170174 ))
@@ -175,6 +179,7 @@ def collect_and_write_expected_results(query_path: str) -> None:
175179 f .write ("\n " )
176180
177181 print (f" -> Written { len (entries )} entries to { os .path .relpath (out_path , REPO_ROOT )} " )
182+ return True
178183
179184
180185def iter_queries ():
@@ -196,30 +201,35 @@ def main():
196201 args = parse_args ()
197202
198203 if args .run_all :
199- all_failed = []
204+ all_failed = []
205+ written_count = 0
200206 queries = list (iter_queries ())
201- print (f"Found { len (queries )} queries. Starting scans...\n " )
202- time .sleep (5 ) # mudar para menos, isto é só para efeitos de debug
203- for query_id , query_path in queries :
204- print (f"\n === { os .path .relpath (query_path , REPO_ROOT )} ({ query_id } ) ===" )
205- failed = run_query_scans (query_id , query_path )
207+ total = len (queries )
208+ width = len (str (total ))
209+ print (f"Found { total } queries. Starting scans...\n " )
210+ for idx , (query_id , query_path ) in enumerate (queries , start = 1 ):
211+ print (f"\n [{ idx :{width }d} /{ total } ] { os .path .relpath (query_path , REPO_ROOT )} " )
212+ failed , written = run_query_scans (query_id , query_path )
206213 all_failed .extend (failed )
214+ if written :
215+ written_count += 1
207216
208217 print ("\n " + "=" * 60 )
218+ print (f"[SUMMARY] { written_count } /{ total } positive_expected_result.json written" )
209219 if all_failed :
210- print (f"[SUMMARY] { len (all_failed )} scan(s) failed:" )
220+ print (f" { len (all_failed )} scan(s) failed:" )
211221 for scan_path , payload_path , rc in all_failed :
212222 print (f" - { os .path .relpath (scan_path , REPO_ROOT )} → exit { rc } " )
213223 sys .exit (1 )
214224 else :
215- print (f"[SUMMARY] All scans completed successfully." )
225+ print (" All scans completed successfully." )
216226 sys .exit (0 )
217227 else :
218228 if not args .queryPath :
219229 print ("[ERROR] --queryPath is required when not using --run-all." , file = sys .stderr )
220230 sys .exit (1 )
221231 query_path = os .path .normpath (os .path .join (REPO_ROOT , args .queryPath ))
222- failed = run_query_scans (args .queryID , query_path )
232+ failed , _ = run_query_scans (args .queryID , query_path )
223233 sys .exit (1 if failed else 0 )
224234
225235
0 commit comments