-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinsein.py
More file actions
136 lines (105 loc) · 4.78 KB
/
insein.py
File metadata and controls
136 lines (105 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
[ INSEIN - MAIN EXECUTABLE ]
AUTHOR: R H A Ashan Imalka (scxr-dev)
DESCRIPTION:
Entry point for the INSEIN Advanced Stealth Scanner.
Orchestrates the kernel injection, logic engine, and UI.
Requires ROOT privileges to function (for raw socket access).
USAGE:
sudo insein <TARGET_IP> [--ports START-END]
"""
import sys
import os
import subprocess
import time
import argparse
import asyncio
import traceback
from rich.live import Live
from insein_core.ghost_engine import GhostScanner
from insein_core.insane_logic import InsaneBrain
from insein_ui.cyberpunk import CyberpunkDashboard
from insein_modules.decoy import DecoyGenerator
from insein_modules.time_travel import TimeTraveler
def check_root():
    """Exit with status 1 unless the effective user ID is 0 (root).

    When the process is not running as root, a two-line hint is printed to
    stdout and the interpreter is terminated via ``sys.exit(1)``.
    Returns ``None`` when the check passes.
    """
    running_as_root = os.geteuid() == 0
    if running_as_root:
        return

    hint_lines = (
        "\n[!] CRITICAL ERROR: INSEIN requires ROOT access.",
        " Please run with: sudo insein <TARGET>\n",
    )
    for hint in hint_lines:
        print(hint)
    sys.exit(1)
async def main_loop(target_ip, ports):
    """Scan ``ports`` on ``target_ip`` and show progress on a live dashboard.

    Ports are scanned in chunks of five. After each chunk the dashboard is
    updated with any newly open ports, the logic engine's verdict is logged,
    and, when port 80 was part of the chunk but is not open, the history
    lookup is consulted. When scanning ends (normally or after a honeypot
    verdict) the coroutine waits indefinitely so the final screen stays
    visible until the user interrupts it.

    Args:
        target_ip: Target address passed to every component.
        ports: Sequence of port numbers; must support ``len()`` and slicing.

    Raises:
        Exception: Errors from the scanner, logic engine or dashboard
            propagate unchanged to the caller. Errors from the history
            lookup are logged to the dashboard instead.
    """
    dashboard = CyberpunkDashboard(target_ip)
    decoy_gen = DecoyGenerator()
    scanner = GhostScanner(target_ip, decoy_generator=decoy_gen)
    brain = InsaneBrain(target_ip)
    time_machine = TimeTraveler(target_ip)

    dashboard.update_state(new_log=("Initializing Kernel Injector...", "INFO"))
    active_decoys = decoy_gen.generate_batch(5)
    scanner.load_decoys(active_decoys)
    dashboard.update_state(new_log=(f"Generated {len(active_decoys)} Decoy IPs.", "INFO"))
    dashboard.update_state(decoy_count=len(active_decoys))

    def report_sniffer_failure(future):
        """Log an exception raised by the background sniffer, if any."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            dashboard.update_state(new_log=(f"Sniffer Error: {error}", "CRIT"))

    try:
        with Live(dashboard.render(), refresh_per_second=4, screen=True) as live:
            # Inside a coroutine the running loop is the one we want;
            # get_event_loop() is the legacy spelling.
            loop = asyncio.get_running_loop()

            # Keep the future so a crash in the sniffer thread is reported
            # instead of vanishing with the discarded return value.
            # NOTE(review): nothing visible here stops sniff_responses; if it
            # never returns, executor shutdown at exit may block - confirm.
            sniffer = loop.run_in_executor(None, scanner.sniff_responses)
            sniffer.add_done_callback(report_sniffer_failure)
            dashboard.update_state(new_log=("Ghost Protocol: ENGAGED.", "SUCCESS"))

            chunk_size = 5
            total_scanned = 0
            honeypot_abort = False

            for offset in range(0, len(ports), chunk_size):
                chunk = ports[offset : offset + chunk_size]
                await scanner.run_scan(chunk)
                total_scanned += len(chunk)

                open_ports = scanner.open_ports
                for port in open_ports:
                    if port not in dashboard.ports_found:
                        dashboard.update_state(
                            new_port=port,
                            new_log=(f"Port {port} OPEN", "SUCCESS"),
                        )

                status = brain.analyze_results(open_ports, total_scanned)["status"]
                if status == "GHOST_BLOCK":
                    dashboard.update_state(new_log=("FIREWALL DETECTED!", "CRIT"))
                    dashboard.update_state(waf_status="BLOCKING")
                    evasion = brain.suggest_evasion()
                    dashboard.update_state(new_log=(f"Switching Strategy: {evasion}", "WARN"))
                elif status == "HONEYPOT_DETECTED":
                    dashboard.update_state(new_log=("HONEYPOT DETECTED! Aborting.", "CRIT"))
                    honeypot_abort = True
                    break

                if 80 in chunk and 80 not in open_ports:
                    dashboard.update_state(new_log=("Port 80 closed. Checking History...", "INFO"))
                    try:
                        history = await time_machine.get_history()
                        if history['status'] == 'FOUND':
                            dashboard.update_state(
                                new_log=(f"HISTORY: Was open on {history['last_seen']}", "WARN")
                            )
                    except Exception as exc:
                        # Best-effort lookup: a failure here must not stop the scan.
                        dashboard.update_state(new_log=(f"OSINT Error: {exc}", "WARN"))

                live.update(dashboard.render())
                await asyncio.sleep(0.05)

            # Do not claim success when the loop was cut short by a honeypot verdict.
            if honeypot_abort:
                dashboard.update_state(new_log=("Scan Aborted. SYSTEM HOLD.", "WARN"))
            else:
                dashboard.update_state(new_log=("Scan Complete. SYSTEM HOLD.", "SUCCESS"))
            dashboard.update_state(new_log=("Press Ctrl+C to exit.", "INFO"))
            live.update(dashboard.render())

            # Wait on an event that is never set: holds the final screen
            # until the task is interrupted or cancelled.
            await asyncio.Event().wait()
    except KeyboardInterrupt:
        # Deliberate: Ctrl+C is the documented way to leave the hold screen.
        pass
def _parse_port_range(spec):
    """Turn ``"START-END"`` or a single ``"PORT"`` into a list of ports.

    Args:
        spec: Text from the ``--ports`` option.

    Returns:
        Ascending list of every port from START to END inclusive.

    Raises:
        ValueError: If a bound is not an integer, START is greater than END,
            or either bound lies outside 0-65535.
    """
    first, dash, last = spec.partition("-")
    start = int(first)
    end = int(last) if dash else start
    if not 0 <= start <= end <= 65535:
        raise ValueError(f"port range out of bounds: {spec!r}")
    return list(range(start, end + 1))


def run():
    """Command-line entry point: check privileges, parse arguments, scan.

    Exits via ``check_root`` when not running as root. An unusable ``--ports``
    value prints "[!] Invalid port range." and returns without scanning.
    Ctrl+C during the scan prints an abort notice; any other exception is
    reported with a full traceback and the terminal is held open until the
    user presses Enter.
    """
    check_root()

    parser = argparse.ArgumentParser(description="INSEIN - Advanced Stealth Scanner")
    parser.add_argument("target", help="Target IP Address")
    parser.add_argument(
        "--ports",
        help="Port range (e.g., 1-1000) or a single port",
        default="1-1000",
    )
    args = parser.parse_args()

    # Parse separately from the scan so a ValueError raised while scanning
    # is not misreported as a bad port range.
    try:
        target_ports = _parse_port_range(args.ports)
    except ValueError:
        print("[!] Invalid port range.")
        return

    try:
        asyncio.run(main_loop(args.target, target_ports))
    except KeyboardInterrupt:
        print("\n[!] User Aborted.")
    except Exception:
        # Top-level boundary: show the traceback rather than dying silently.
        print("\n" + "="*50)
        print("[!] INSEIN CRASHED! REPORT THIS TO SCXR-DEV")
        print("="*50)
        traceback.print_exc()
        print("="*50)
        try:
            input("\n[PRESS ENTER TO CLOSE TERMINAL]")
        except (EOFError, KeyboardInterrupt):
            # No interactive stdin, or the user hit Ctrl+C at the prompt:
            # the report is already printed, so just finish.
            pass
# Start the scanner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()