Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions netforge_rl/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
DecoyTomcat,
Misinform,
ConfigureACL,
SecurityAwarenessTraining,
DeployHoneytoken,
RotateKerberos,
)
from .red import (
NetworkScan,
Expand All @@ -27,6 +30,11 @@
KillProcess,
ShareIntelligence,
OverloadPLC,
SpearPhishing,
)
from .red.post_exploitation import (
DumpLSASS,
PassTheTicket,
)

__all__ = [
Expand All @@ -42,6 +50,9 @@
'DecoyTomcat',
'Misinform',
'ConfigureACL',
'SecurityAwarenessTraining',
'DeployHoneytoken',
'RotateKerberos',
'NetworkScan',
'DiscoverRemoteSystems',
'DiscoverNetworkServices',
Expand All @@ -56,11 +67,7 @@
'KillProcess',
'ShareIntelligence',
'OverloadPLC',
'SecurityAwarenessTraining',
'DeployHoneytoken',
'SpearPhishing',
'DumpLSASS',
'PassTheTicket',
]

from .blue import SecurityAwarenessTraining
from .blue import DeployHoneytoken

__all__.extend(['SecurityAwarenessTraining', 'DeployHoneytoken'])
3 changes: 3 additions & 0 deletions netforge_rl/actions/blue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@
'ConfigureACL',
'SecurityAwarenessTraining',
'DeployHoneytoken',
'RotateKerberos',
]

from .identity import RotateKerberos
89 changes: 89 additions & 0 deletions netforge_rl/actions/blue/identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import random
import string
from netforge_rl.core.action import BaseAction, ActionEffect
from netforge_rl.core.registry import action_registry


@action_registry.register('RotateKerberos', 'blue')
class RotateKerberos(BaseAction):
"""
Apex Zero-Trust Action: Rotates Domain Kerberos TGT Keys globally.
This invalidates all currently held Enterprise Admin tokens, severing Red's ZTNA lateral movement.
It impacts the entirely network graph, but burns significant Business Downtime.
"""

def __init__(self, agent_id: str, target_ip: str):
# target_ip is effectively ignored since this is a global action, but retained for API parity.
super().__init__(agent_id, target_ip)
self.duration = 4
self.compute_cost = 80

def validate(self, global_state) -> bool:
# Global action; validate the blue agent has enough funds (highly expensive)
if self.agent_id in global_state.agent_funds:
if global_state.agent_funds[self.agent_id] < 5000:
return False
return True

def execute(self, global_state) -> ActionEffect:
class RotateKerberosCommand:
def __init__(self, agent_id):
self.agent_id = agent_id

def execute(self, state):
# 1. Burn the massive funding cost
if self.agent_id in state.agent_funds:
state.agent_funds[self.agent_id] -= 5000
state.business_downtime_score += 1500.0

# 2. Flush all Red Agent Inventories globally
for agent in state.agent_inventory:
state.agent_inventory[agent].clear()

# 3. Generate a new valid Domain Token string
random_suffix = ''.join(
random.choices(string.ascii_uppercase + string.digits, k=6)
)
new_token = f'Enterprise_Admin_Token_{random_suffix}'

# 4. Migrate the global environment physics to require the NEW token
for host in state.all_hosts.values():
# Update what the host requires
if 'Enterprise_Admin_Token' in host.system_tokens:
host.system_tokens.remove('Enterprise_Admin_Token')
host.system_tokens.append(new_token)

# Also update wildcard tokens from any previous rotations
old_tokens = [
t
for t in host.system_tokens
if t.startswith('Enterprise_Admin_Token_')
]
for t in old_tokens:
host.system_tokens.remove(t)
host.system_tokens.append(new_token)

# Update what the Domain Controllers hold in memory
if 'Enterprise_Admin_Token' in host.cached_credentials:
host.cached_credentials.remove('Enterprise_Admin_Token')
host.cached_credentials.append(new_token)

old_cache = [
t
for t in host.cached_credentials
if t.startswith('Enterprise_Admin_Token_')
]
for t in old_cache:
host.cached_credentials.remove(t)
host.cached_credentials.append(new_token)

deltas = {'identity_flush': RotateKerberosCommand(self.agent_id)}

return ActionEffect(
success=True,
state_deltas=deltas,
observation_data={
'alert': 'CRITICAL: Global Domain Keys Rotated. Enterprise Network re-verified.'
},
eta=self.duration,
)
3 changes: 3 additions & 0 deletions netforge_rl/actions/red/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
'ShareIntelligence',
'OverloadPLC',
'SpearPhishing',
'DumpLSASS',
'PassTheTicket',
]

from .social_engineering import SpearPhishing
from .post_exploitation import DumpLSASS, PassTheTicket
154 changes: 102 additions & 52 deletions netforge_rl/actions/red/exploits.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
return global_state.can_route_to(self.target_ip)
return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)

def execute(self, global_state) -> ActionEffect:
import random
Expand All @@ -53,21 +53,37 @@ def execute(self, global_state) -> ActionEffect:
if not host or not host.vulnerabilities:
return ActionEffect(success=False, state_deltas=[], observation_data={})

# CVSS-Weighted Stochastics (1.0 = 100% success on 10.0 CVSS, 0.2 = 20% on 2.0 CVSS)
cvss = getattr(host, 'cvss_score', 5.0) # Default average vulnerability logic
probability_of_success = cvss / 10.0

if host.decoy == 'active' or random.random() > probability_of_success:
return ActionEffect(
success=False,
state_deltas=[],
observation_data={
'failed_exploit': self.target_ip,
'reason': 'stochastic_cvss_failure',
},
# --- Sim2Real Bridge dispatch ---
bridge = getattr(global_state, 'sim2real_bridge', None)
if bridge is not None:
hw_result = bridge.dispatch(
'ExploitRemoteService', self.target_ip, getattr(host, 'os', 'Unknown')
)
reward_delta = bridge.reward_delta(hw_result)
if not hw_result.success:
return ActionEffect(
success=False,
state_deltas=[],
observation_data={
'failed_exploit': self.target_ip,
'reason': 'sim2real_failure',
'sim2real_stdout': hw_result.stdout,
'sim2real_reward_delta': reward_delta,
'sim2real_latency_ms': hw_result.latency_ms,
},
)
else:
# Fallback: CVSS-weighted random roll (legacy training path)
cvss = getattr(host, 'cvss_score', 5.0)
if host.decoy == 'active' or random.random() > cvss / 10.0:
return ActionEffect(
success=False,
state_deltas=[],
observation_data={'failed_exploit': self.target_ip},
)
hw_result = None
reward_delta = 0.0

# Build OOP Delta List
deltas = [
UpdateHostPrivilegeCommand(
self.target_ip, 'User', compromised_by=self.agent_id
Expand All @@ -79,6 +95,9 @@ def execute(self, global_state) -> ActionEffect:
'exploit': self.target_ip,
'status': 'User_Access_Gained',
'active_session_established': True,
'sim2real_stdout': hw_result.stdout if hw_result else None,
'sim2real_reward_delta': reward_delta,
'sim2real_latency_ms': hw_result.latency_ms if hw_result else None,
}

return ActionEffect(
Expand Down Expand Up @@ -122,19 +141,10 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
return global_state.can_route_to(self.target_ip)
return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)

def execute(self, global_state) -> ActionEffect:
"""Completes the pre-auth RCE and commits the privilege transformation

delta. Fails if target is patched against CVE-2019-0708.

Args:
global_state: Simulator context.

Returns:
ActionEffect: System delta upgrading access rights to 'User'.
"""
"""Completes the pre-auth RCE. Uses Sim2RealBridge when available."""
import random

host = global_state.all_hosts.get(self.target_ip)
Expand All @@ -147,27 +157,51 @@ def execute(self, global_state) -> ActionEffect:
},
)

roll = random.random()
if roll < 0.15:
return ActionEffect(
success=False,
state_deltas={},
observation_data={'exploit': 'failed silently'},
)
elif roll < 0.25:
return ActionEffect(
success=False,
state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'},
observation_data={'exploit': 'failed - kernel panic'},
# --- Sim2Real Bridge dispatch ---
bridge = getattr(global_state, 'sim2real_bridge', None)
if bridge is not None:
hw_result = bridge.dispatch(
'ExploitBlueKeep', self.target_ip, getattr(host, 'os', 'Unknown')
)
reward_delta = bridge.reward_delta(hw_result)
if not hw_result.success:
return ActionEffect(
success=False,
state_deltas={},
observation_data={
'exploit': 'BlueKeep failed',
'sim2real_stdout': hw_result.stdout,
'sim2real_reward_delta': reward_delta,
},
)
else:
roll = random.random()
if roll < 0.15:
return ActionEffect(
success=False,
state_deltas={},
observation_data={'exploit': 'failed silently'},
)
elif roll < 0.25:
return ActionEffect(
success=False,
state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'},
observation_data={'exploit': 'failed - kernel panic'},
)
hw_result = None
reward_delta = 0.0

return ActionEffect(
success=True,
state_deltas={
f'hosts/{self.target_ip}/privilege': 'User',
f'hosts/{self.target_ip}/compromised_by': self.agent_id,
},
observation_data={'exploit': 'BlueKeep success'},
observation_data={
'exploit': 'BlueKeep success',
'sim2real_stdout': hw_result.stdout if hw_result else None,
'sim2real_reward_delta': reward_delta,
},
)


Expand Down Expand Up @@ -206,7 +240,7 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
return global_state.can_route_to(self.target_ip)
return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)

def execute(self, global_state) -> ActionEffect:
"""Calculates impact deltas following the SMB buffer overflow
Expand Down Expand Up @@ -282,20 +316,11 @@ def validate(self, global_state) -> bool:
"""Requires valid routing to the web interface."""
if not super().validate(global_state):
return False
return global_state.can_route_to(self.target_ip)
return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)

def execute(self, global_state) -> ActionEffect:
"""Executes the RFI request. Automatically evaluates failure states if

interacting with simulated High-Interaction Honeypots (e.g.,
DecoyApache, DecoyTomcat).

Args:
global_state (GlobalNetworkState): State snapshot.

Returns:
ActionEffect: Success logic containing failure telemetry if targeting a Decoy,
else structural access upgrades to 'User'.
"""Executes the RFI request. Uses Sim2RealBridge when available.
Automatically fails against honeypot decoys.
"""
host = global_state.all_hosts.get(self.target_ip)
if host and host.decoy in ['Apache', 'Tomcat', 'active']:
Expand All @@ -305,11 +330,36 @@ def execute(self, global_state) -> ActionEffect:
observation_data={'exploit': 'Failed against Decoy'},
)

# --- Sim2Real Bridge dispatch ---
bridge = getattr(global_state, 'sim2real_bridge', None)
if bridge is not None:
hw_result = bridge.dispatch(
'ExploitHTTP_RFI', self.target_ip, getattr(host, 'os', 'Unknown')
)
reward_delta = bridge.reward_delta(hw_result)
if not hw_result.success:
return ActionEffect(
success=False,
state_deltas={},
observation_data={
'exploit': 'HTTP_RFI failed',
'sim2real_stdout': hw_result.stdout,
'sim2real_reward_delta': reward_delta,
},
)
else:
hw_result = None
reward_delta = 0.0

return ActionEffect(
success=True,
state_deltas={
f'hosts/{self.target_ip}/privilege': 'User',
f'hosts/{self.target_ip}/compromised_by': self.agent_id,
},
observation_data={'exploit': 'HTTP_RFI success'},
observation_data={
'exploit': 'HTTP_RFI success',
'sim2real_stdout': hw_result.stdout if hw_result else None,
'sim2real_reward_delta': reward_delta,
},
)
Loading
Loading