Skip to content

Commit 7d0dad2

Browse files
authored
chore: remove code for Kafka logging (#128)
1 parent 9f7b4d6 commit 7d0dad2

File tree

8 files changed

+13
-661
lines changed

8 files changed

+13
-661
lines changed

src/JuliaHub.jl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ include("PackageBundler/PackageBundler.jl")
3333
include("jobs/jobs.jl")
3434
include("jobs/request.jl")
3535
include("jobs/logging.jl")
36-
include("jobs/logging-kafka.jl")
3736
include("jobs/logging-legacy.jl")
3837
include("packages.jl")
3938
include("projects.jl")

src/jobs/logging-kafka.jl

Lines changed: 0 additions & 501 deletions
This file was deleted.

src/jobs/logging-legacy.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ function JobLogMessage(::_LegacyLogging, json::AbstractDict, offset::Integer)
2424
eventId = _get_json_or(json, "eventId", String, nothing)
2525
JobLogMessage(;
2626
_offset=offset, timestamp, message, _metadata=metadata, _keywords=keywords,
27-
_legacy_eventId=eventId, _kafka_stream=nothing, _json=json,
27+
_legacy_eventId=eventId, _json=json,
2828
)
2929
end
3030

src/jobs/logging.jl

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,5 @@
11
abstract type _JobLoggingAPIVersion end
22

3-
module _LoggingMode
4-
@enum T NOKAFKA AUTOMATIC FORCEKAFKA
5-
end
6-
const _OPTION_LoggingMode = Ref{_LoggingMode.T}(_LoggingMode.NOKAFKA)
7-
8-
function _job_logging_api_version(
9-
auth::Authentication, jobname::AbstractString
10-
)::_JobLoggingAPIVersion
11-
# For debugging, development, and testing purposes, we allow the user to force the
12-
# logging backend to use a particular endpoint. This is not a documented behaviour
13-
# and should not be relied on.
14-
if _OPTION_LoggingMode[] == _LoggingMode.NOKAFKA
15-
return _LegacyLogging()
16-
elseif _OPTION_LoggingMode[] == _LoggingMode.FORCEKAFKA
17-
return _KafkaLogging()
18-
elseif _OPTION_LoggingMode[] == _LoggingMode.AUTOMATIC
19-
query = Dict("jobname" => jobname)
20-
r = _restcall(auth, :HEAD, "juliaruncloud", "get_logs_v2"; query)
21-
# If HEAD /juliaruncloud/get_logs_v2 returns a 200, then we know we can try to fetch
22-
# the newer Kafka logs. If it returns anything else, we will try use the old endpoint.
23-
# However, it _should_ return a 404 in the latter case, and we'll warn if it returns a
24-
# different code.
25-
r.status == 200 && return _KafkaLogging()
26-
if r.status != 404
27-
@warn "Unexpected response from HEAD /juliaruncloud/get_logs_v2" r.status r.body
28-
end
29-
return _LegacyLogging()
30-
end
31-
error("Invalid _OPTION_LoggingMode: $(_OPTION_LoggingMode[])")
32-
end
33-
343
"""
354
struct JobLogMessage
365
@@ -53,7 +22,6 @@ Base.@kwdef struct JobLogMessage
5322
_metadata::Dict{String, Any} # `metadata :: Dict{String, Any}`: additional metadata with not guaranteed fields; may also be empty (TODO)
5423
_keywords::Dict{String, Any} # `keywords :: Dict{String, Any}`: additional metadata with not guaranteed fields; may also be empty (TODO)
5524
_legacy_eventId::Union{String, Nothing}
56-
_kafka_stream::Union{String, Nothing}
5725
_json::Dict
5826
end
5927

@@ -254,11 +222,7 @@ function job_logs_buffered(
254222
stream::Bool=false,
255223
auth::Authentication=__auth__(),
256224
)
257-
if _job_logging_api_version(auth, jobname) == _KafkaLogging()
258-
return KafkaLogsBuffer(f, auth; jobname, offset, stream)
259-
else
260-
return _LegacyLogsBuffer(f, auth; jobname, offset, stream)
261-
end
225+
return _LegacyLogsBuffer(f, auth; jobname, offset, stream)
262226
end
263227
job_logs_buffered(job::Union{AbstractString, Job}; kwargs...) =
264228
job_logs_buffered(_noop, job; kwargs...)

test/jet.jl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ function JET.configured_reports(::JuliaHubReportFilter, reports::Vector{JET.Infe
1515
if isa(report, JET.MethodErrorReport) && report.union_split > 1
1616
return false
1717
end
18-
# The Kafka code (which is currently not enabled) has a few bugs that JET actually
19-
# reveals. But we ignore them for now.
20-
endswith(string(report.vst[end].file), "logging-kafka.jl") && return false
2118
# We also ignore the _restput_mockable() error in restapi.jl, since JET seems to
2219
# assume that kwargs... must be non-empty
2320
contains(string(report.vst[end].linfo.def.name), "_restput_mockable") && return false

test/jobs.jl

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -656,9 +656,9 @@ end
656656
end
657657
end
658658

659-
function logging_mocking_wrapper(f::Base.Callable, testset_name::AbstractString; legacy=false)
659+
function logging_mocking_wrapper(f::Base.Callable, testset_name::AbstractString)
660660
global MOCK_JULIAHUB_STATE
661-
logengine = LogEngine(; kafkalogging=!legacy)
661+
logengine = LogEngine()
662662
MOCK_JULIAHUB_STATE[:logengine] = logengine
663663
try
664664
Mocking.apply(mocking_patch) do
@@ -671,40 +671,24 @@ function logging_mocking_wrapper(f::Base.Callable, testset_name::AbstractString;
671671
end
672672
end
673673

674-
JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.AUTOMATIC
675-
@testset "Job logs: legacy = $legacy" for legacy in [true, false]
676-
logging_mocking_wrapper("Basic logging"; legacy=legacy) do logengine
674+
@testset "Job logs" begin
675+
logging_mocking_wrapper("Basic logging") do logengine
677676
@testset "Invalid requests" begin
678677
# Negative offsets are not allowed
679678
@test_throws ArgumentError JuliaHub.job_logs_buffered("jr-test1"; offset=-1)
680679
# First, just double check that a missing job 403s on the backend
681680
@test_throws JuliaHub.PermissionError JuliaHub.job_logs_buffered("jr-test1"; offset=0)
682681
end
683-
# Let's check that we are using the correct backend
684-
@testset "Dispatching on backend" begin
685-
auth = JuliaHub.current_authentication()
686-
# If the job is not present, then the Kafka backend will always be disabled
687-
@test JuliaHub._job_logging_api_version(auth, "jr-test1") == JuliaHub._LegacyLogging()
688-
# Let's add a finished job without any logs. Since these jobs are marked as finished,
689-
# we should not add logs to the "backend" after the buffer has been constructed.
690-
logengine.jobs["jr-test1"] = LogEngineJob([])
691-
# After the job is added, the expected backend depends on whether we're testing for legacy
692-
# of the Kafka backend.
693-
expected_backend = legacy ? JuliaHub._LegacyLogging() : JuliaHub._KafkaLogging()
694-
@test JuliaHub._job_logging_api_version(auth, "jr-test1") == expected_backend
695-
# Let's also test the override variable
696-
JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.FORCEKAFKA
697-
@test JuliaHub._job_logging_api_version(auth, "jr-test1") == JuliaHub._KafkaLogging()
698-
JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.NOKAFKA
699-
@test JuliaHub._job_logging_api_version(auth, "jr-test1") == JuliaHub._LegacyLogging()
700-
JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.AUTOMATIC
701-
end
702682
# Test the fetching of logs
683+
auth = JuliaHub.current_authentication()
684+
# Let's add a finished job without any logs. Since these jobs are marked as finished,
685+
# we should not add logs to the "backend" after the buffer has been constructed.
686+
logengine.jobs["jr-test1"] = LogEngineJob([])
703687
@testset "Zero logs" begin
704688
let lb = JuliaHub.job_logs_buffered("jr-test1"; offset=0)
705689
# Just one check to make sure that the returned buffer actually matches the backend
706690
# that we're trying to test.
707-
@test lb isa (legacy ? JuliaHub._LegacyLogsBuffer : JuliaHub.KafkaLogsBuffer)
691+
@test lb isa JuliaHub._LegacyLogsBuffer
708692
@test length(lb.logs) == 0
709693
@test JuliaHub.hasfirst(lb)
710694
@test JuliaHub.haslast(lb)
@@ -937,7 +921,6 @@ JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.AUTOMATIC
937921
end
938922
end
939923
end
940-
JuliaHub._OPTION_LoggingMode[] = JuliaHub._LoggingMode.NOKAFKA
941924

942925
@testset "JuliaHub.submit_job: expose=" begin
943926
empty!(MOCK_JULIAHUB_STATE)

test/mocking.jl

Lines changed: 1 addition & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -475,15 +475,6 @@ function _restcall_mocked(method, url, headers, payload; query)
475475
else
476476
serve_legacy(logengine, Dict(query))
477477
end
478-
elseif in(method, (:HEAD, :GET)) && endswith(url, "/juliaruncloud/get_logs_v2")
479-
logengine = get(MOCK_JULIAHUB_STATE, :logengine, nothing)
480-
if isnothing(logengine)
481-
JuliaHub._RESTResponse(500, "MOCK_JULIAHUB_STATE[:logengine] not set up")
482-
elseif !logengine.kafkalogging
483-
JuliaHub._RESTResponse(404, "MOCK_JULIAHUB_STATE[:logengine]: Kafka disabled")
484-
else
485-
serve_kafka(logengine, method, Dict(query))
486-
end
487478
elseif (method == :GET) && occursin("/juliaruncloud/product_image_groups", url)
488479
Dict(
489480
"image_groups" => Dict(
@@ -532,91 +523,10 @@ end
532523

533524
Base.@kwdef mutable struct LogEngine
534525
jobs::Dict{String, LogEngineJob} = Dict()
535-
kafkalogging::Bool = false
536-
# The real legacy endpoint has a limit of 10k. Similarly, for the real
537-
# kafka endpoint this is determined by the max_bytes of the response,
538-
# so it can actually vary.
526+
# The real legacy endpoint has a limit of 10k
539527
max_response_size::Int = 10
540528
end
541529

542-
function serve_kafka(logengine::LogEngine, method::Symbol, query::AbstractDict)
543-
jobname = get(query, "jobname", nothing)
544-
job = get(logengine.jobs, jobname, nothing)
545-
# If the client is doing a HEAD request, then we immediately return the
546-
# approriate empty response.
547-
if method == :HEAD
548-
if isnothing(jobname)
549-
return JuliaHub._RESTResponse(400, "")
550-
elseif isnothing(job)
551-
return JuliaHub._RESTResponse(404, "")
552-
else
553-
return JuliaHub._RESTResponse(200, "")
554-
end
555-
end
556-
# Error handing for the GET requests (like for HEAD, but with a body)
557-
if isnothing(jobname)
558-
return JuliaHub._RESTResponse(400, "jobname is missing")
559-
elseif isnothing(job)
560-
return JuliaHub._RESTResponse(404, "No such job $jobname")
561-
end
562-
# Return the normal response
563-
offset = get(query, "offset", nothing)
564-
# We'll construct the full list of logs for the job, including the meta message
565-
# at the end if necessary.
566-
logs::Vector{Any} = map(enumerate(job.logs)) do (i, log)
567-
# Make the indexing start from 0, to match the Kafka offset logic
568-
(i - 1, log)
569-
end
570-
# We'll add the meta=bottom message, if needed.
571-
if !job.isrunning
572-
push!(logs, (length(logs), :bottom))
573-
end
574-
logs = if isnothing(offset)
575-
start = max(1, length(logs) - logengine.max_response_size + 1)
576-
logs[start:end]
577-
elseif offset + 1 <= length(logs)
578-
start = offset + 1
579-
stop = min(start + logengine.max_response_size - 1, length(logs))
580-
logs[start:stop]
581-
else
582-
[] # For out of range offsets we just return an empty list of logs
583-
end
584-
start_timestamp = Dates.now()
585-
logs = map(logs) do (i, log)
586-
value = if isa(log, Symbol)
587-
Dict("meta" => string(log))
588-
else
589-
Dict(
590-
"timestamp" =>
591-
JuliaHub._log_legacy_datetime_to_ms(start_timestamp + Dates.Second(i)),
592-
"log" => Dict(
593-
"message" => String(log),
594-
"keywords" => Dict(
595-
"typeof(logger)" => "LoggingExtras...",
596-
"jrun_hostname" => "jr-ecogm4cccn-x5z4g",
597-
"jrun_worker_id" => 1,
598-
"jrun_thread_id" => 1,
599-
"jrun_time" => start_timestamp,
600-
"jrun_process_id" => 27,
601-
),
602-
"metadata" => Dict(
603-
"line" => 141,
604-
"id" => "Main_JuliaRunJob_53527a33",
605-
"_module" => "Main.JuliaRunJob",
606-
"filepath" => "/opt/juliahub/master_startup.jl",
607-
"group" => "master_startup",
608-
"level" => "Info",
609-
"steam" => "stdout",
610-
),
611-
),
612-
)
613-
end
614-
Dict("offset" => i, "value" => value)
615-
end
616-
logs_json = JSON.json(Dict("consumer_id" => 1234, "logs" => logs))
617-
return JuliaHub._RESTResponse(200, logs_json)
618-
end
619-
620530
function serve_legacy(logengine::LogEngine, query::AbstractDict)
621531
jobname = get(query, "jobname", nothing)
622532
job = get(logengine.jobs, jobname, nothing)

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ end
209209
:extend_job,
210210
:interrupt!, :isdone, :job, :job_file, :job_files,
211211
:job_logs, :job_logs_buffered, :job_logs_newer!, :job_logs_older!,
212-
:AbstractJobLogsBuffer, :KafkaLogsBuffer,
212+
:AbstractJobLogsBuffer,
213213
:hasfirst, :haslast, :jobs, :kill_job,
214214
:nodespec, :nodespecs, :reauthenticate!, :submit_job,
215215
:update_dataset, :upload_dataset, :wait_job,

0 commit comments

Comments
 (0)