Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.

Commit e1add41

Browse files
Merge pull request #27 from OpenFn/adaptor_service
AdaptorService
2 parents a63ffe6 + d42436d commit e1add41

19 files changed

+562
-62
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,7 @@ engine-*.tar
2727

2828
/tmp/
2929

30-
excoveralls.json
30+
excoveralls.json
31+
32+
node_modules
33+
package*.json

.tool-versions

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
nodejs 12.20.2
2+
elixir 1.11.3-otp-23
3+
erlang 23.2.7

lib/engine.ex

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,8 @@ defmodule Engine do
2121
@doc """
2222
DEPRECATED
2323
"""
24-
def execute_sync(%Message{} = message, %Job{} = job) do
24+
def execute_sync(%Message{}, %Job{}) do
2525
raise "execute_sync/2 is no longer supported"
26-
# {:ok, state_path} = Temp.path(%{prefix: "state", suffix: ".json"})
27-
# {:ok, final_state_path} = Temp.path(%{prefix: "final_state", suffix: ".json"})
28-
# {:ok, expression_path} = Temp.path(%{prefix: "expression", suffix: ".js"})
29-
30-
# File.write!(state_path, Jason.encode!(message.body))
31-
# File.write!(expression_path, job.expression || "")
32-
33-
# Engine.ShellRuntime.run(%RunSpec{
34-
# state_path: state_path,
35-
# final_state_path: final_state_path,
36-
# expression_path: expression_path,
37-
# adaptor: job.adaptor
38-
# })
3926
end
4027

4128
def handle_message(run_broadcaster, %Message{} = message) do

lib/engine/adaptor.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
defmodule Engine.Adaptor do
2+
@moduledoc false
3+
@type install_status :: :present | :installing
4+
5+
@type t :: %__MODULE__{
6+
name: binary(),
7+
version: binary(),
8+
path: binary(),
9+
status: install_status()
10+
}
11+
12+
@enforce_keys [:name, :version]
13+
defstruct @enforce_keys ++ [:status, :path]
14+
15+
def set_present(adaptor) do
16+
%{adaptor | status: :present}
17+
end
18+
end

lib/engine/adaptor/repo.ex

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
defmodule Engine.Adaptor.Repo do
2+
@callback list_local(path :: String.t()) :: list(Engine.Adaptor.t())
3+
def list_local(path, depth \\ 4) when is_binary(path) do
4+
System.cmd("find", ~w[#{path} -maxdepth #{depth} -type f -name package.json])
5+
|> case do
6+
{stdout, 0} ->
7+
stdout
8+
|> String.trim()
9+
|> String.split("\n")
10+
|> filter_parent_paths()
11+
|> Enum.map(fn package_json ->
12+
res = Jason.decode!(File.read!(package_json))
13+
get = &Map.get(res, &1)
14+
15+
%Engine.Adaptor{
16+
name: get.("name"),
17+
version: get.("version"),
18+
path: String.replace_suffix(package_json, "/package.json", ""),
19+
status: :present
20+
}
21+
end)
22+
23+
{stdout, _} ->
24+
raise "Failed to list adaptors from path: #{path}\n#{stdout}"
25+
end
26+
end
27+
28+
@doc """
29+
```
30+
|------------ alias ---------| |----- source &|| version -------|
31+
@openfn/language-common-v1.2.6@npm:@openfn/language-common@1.2.6
32+
```
33+
"""
34+
@callback install(adaptors :: list(String.t()) | String.t(), dir :: String.t()) ::
35+
{Collectable.t(), exit_status :: non_neg_integer}
36+
@spec install(adaptors :: list(String.t()) | String.t(), dir :: String.t()) ::
37+
{Collectable.t(), exit_status :: non_neg_integer}
38+
def install(adaptor, dir) when is_binary(adaptor),
39+
do: install([adaptor], dir)
40+
41+
def install(adaptors, dir) when is_list(adaptors) do
42+
System.cmd(
43+
"/usr/bin/env",
44+
[
45+
"sh",
46+
"-c",
47+
"""
48+
npm install \
49+
--no-save \
50+
--ignore-scripts \
51+
--no-fund \
52+
--no-audit \
53+
--no-package-lock \
54+
--global-style \
55+
--prefix #{dir} \
56+
#{Enum.join(adaptors, " ")}
57+
"""
58+
],
59+
stderr_to_stdout: true
60+
)
61+
end
62+
63+
@doc """
64+
Given a list of _potentially_ nested package.json files (i.e. dependancies of
65+
our adaptors), `filter_parent_paths/1` reduces the list down to the parent
66+
directories by grouping directory names by their shortest common path.
67+
"""
68+
def filter_parent_paths(paths) when is_list(paths) do
69+
paths
70+
|> Enum.sort(:desc)
71+
|> Enum.reduce([], fn path, acc ->
72+
base = path |> String.replace("package.json", "")
73+
74+
parent =
75+
acc
76+
|> Enum.find(base, fn parent -> String.contains?(base, parent) end)
77+
78+
acc ++ [parent]
79+
end)
80+
|> Enum.uniq()
81+
|> Enum.map(fn folder -> "#{folder}package.json" end)
82+
end
83+
end

lib/engine/adaptor/service.ex

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
defmodule Engine.Adaptor.Service do
2+
@moduledoc """
3+
The Adaptor Service is use to query and install adaptors in order to run jobs.
4+
5+
On startup, it queries the filesystem for `package.json` files and builds up
6+
a list of available adaptors.
7+
8+
## Configuration
9+
10+
The service requires at least `:adaptors_path`, which is used to both query
11+
which adaptors are installed and when to install new adaptors.
12+
13+
Another optional setting is: `:repo`, which must point at a module that will be
14+
used to do the querying and installing.
15+
16+
## Installing Adaptors
17+
18+
Using the `install/3` function an adaptor can be installed, which will also
19+
add it to the list of available adaptors.
20+
21+
The adaptor is marked as `:installing`, to allow for conditional behaviour
22+
elsewhere such as delaying or rejecting processing until the adaptor becomes
23+
available.
24+
25+
## Looking up adaptors
26+
27+
The module leans on Elixir's built-in `Version` module to provide version
28+
lookups.
29+
30+
When looking up an adaptor, either a string or a tuple can be used.
31+
In the case of requesting the latest version, any one of these will return
32+
the latest version the service is aware of.
33+
34+
- `@openfn/language-http`
35+
- `@openfn/language-http@latest`
36+
- `{"@openfn/language-http", nil}`
37+
- `{"@openfn/language-http", "latest"}`
38+
- `{~r/language-http/, "latest"}`
39+
40+
You can also request a specific version, or use a range specification:
41+
42+
- `@openfn/language-http@1.2.3`
43+
- `{"@openfn/language-http", "~> 1.2.0"}`
44+
- `{"@openfn/language-http", "< 2.0.0"}`
45+
46+
> **NOTE**
47+
> More complex npm style install strings like: `">=0.1.0 <0.2.0"`
48+
> are not supported.
49+
> Generally the tuple style is preferred when using range specifications as
50+
> the npm style strings have a simplistic regex splitter.
51+
52+
See [Version](https://hexdocs.pm/elixir/Version.html) for more details on
53+
matching versions.
54+
"""
55+
56+
@type package_spec :: {name :: String.t() | Regex.t(), version :: String.t() | nil}
57+
58+
use Agent
59+
require Logger
60+
61+
defmodule State do
62+
@moduledoc false
63+
64+
@type t :: %__MODULE__{
65+
name: GenServer.server(),
66+
adaptors: [Engine.Adaptor.t()],
67+
adaptors_path: binary(),
68+
repo: module()
69+
}
70+
71+
@enforce_keys [:adaptors_path]
72+
defstruct @enforce_keys ++ [:name, adaptors: [], repo: Engine.Adaptor.Repo]
73+
74+
def find_adaptor(%{adaptors: adaptors}, fun) when is_function(fun) do
75+
Enum.find(adaptors, fun)
76+
end
77+
78+
def find_adaptor(state, {package_name, version}) do
79+
find_adaptor(state, fn %{name: n, version: v} ->
80+
n == package_name && (v == version || is_nil(version))
81+
end)
82+
end
83+
84+
def refresh_list(state) do
85+
%{state | adaptors: state.repo.list_local(state.adaptors_path)}
86+
end
87+
88+
def add_adaptor(state, adaptor) do
89+
%{state | adaptors: state.adaptors ++ [adaptor]}
90+
end
91+
92+
def remove_adaptor(state, fun) do
93+
%{state | adaptors: Enum.reject(state.adaptors, fun)}
94+
end
95+
end
96+
97+
def start_link(opts) do
98+
state = struct!(State, opts) |> State.refresh_list()
99+
100+
Agent.start_link(fn -> state end, name: state.name || __MODULE__)
101+
end
102+
103+
def get_adaptors(agent) do
104+
Agent.get(agent, fn state -> state.adaptors end)
105+
end
106+
107+
@spec find_adaptor(Agent.agent(), package :: String.t()) :: Adaptor.t() | nil
108+
def find_adaptor(agent, package) when is_binary(package) do
109+
find_adaptor(agent, resolve_package_name(package))
110+
end
111+
112+
@spec find_adaptor(Agent.agent(), package_spec()) :: Adaptor.t() | nil
113+
def find_adaptor(agent, {package_name, version}) do
114+
requirement = version_to_requirement(version)
115+
116+
get_adaptors(agent)
117+
|> Enum.filter(&by_name_and_requirement(&1, package_name, requirement))
118+
|> Enum.max_by(
119+
fn %{version: version} ->
120+
Version.parse!(version)
121+
end,
122+
Version,
123+
fn -> nil end
124+
)
125+
end
126+
127+
defp by_name_and_requirement(adaptor, matcher = %Regex{}, requirement) do
128+
!!(Regex.match?(matcher, adaptor.name) &&
129+
Version.match?(adaptor.version, requirement))
130+
end
131+
132+
defp by_name_and_requirement(adaptor, name, requirement) do
133+
!!(match?(%{name: ^name}, adaptor) &&
134+
Version.match?(adaptor.version, requirement))
135+
end
136+
137+
defp version_to_requirement(version) do
138+
cond do
139+
version in ["latest", nil] ->
140+
"> 0.0.0"
141+
142+
String.match?(version, ~r/[<=>]/) ->
143+
raise ArgumentError, message: "Version specs not implemented yet."
144+
145+
true ->
146+
version
147+
end
148+
|> Version.parse_requirement!()
149+
end
150+
151+
def installed?(agent, package_spec) do
152+
!!find_adaptor(agent, package_spec)
153+
end
154+
155+
@spec install(Agent.agent(), binary()) ::
156+
{:ok, Engine.Adaptor.t()} | {:error, {Collectable.t(), exit_status :: non_neg_integer}}
157+
def install(agent, package) when is_binary(package) do
158+
install(agent, resolve_package_name(package))
159+
end
160+
161+
@spec install(Agent.agent(), package_spec()) ::
162+
{:ok, Engine.Adaptor.t()} | {:error, {Collectable.t(), exit_status :: non_neg_integer}}
163+
def install(agent, package_spec) do
164+
agent
165+
|> find_adaptor(package_spec)
166+
|> case do
167+
nil -> install!(agent, package_spec)
168+
existing -> {:ok, existing}
169+
end
170+
end
171+
172+
def install!(agent, {package_name, version} = package_spec) do
173+
new_adaptor = %Engine.Adaptor{name: package_name, version: version, status: :installing}
174+
175+
agent |> Agent.update(&State.add_adaptor(&1, new_adaptor))
176+
177+
{repo, adaptors_path} =
178+
agent
179+
|> Agent.get(fn state ->
180+
{state.repo, state.adaptors_path}
181+
end)
182+
183+
repo.install(build_aliased_name(package_spec), adaptors_path)
184+
|> case do
185+
{_stdout, 0} ->
186+
Logger.info("Refreshing Adaptor list")
187+
agent |> Agent.update(&State.refresh_list/1, 30_000)
188+
{:ok, agent |> Agent.get(&State.find_adaptor(&1, {package_name, version}))}
189+
190+
{stdout, code} ->
191+
agent
192+
|> Agent.update(fn state ->
193+
State.remove_adaptor(state, &match?(^new_adaptor, &1))
194+
end)
195+
196+
{:error, {stdout, code}}
197+
end
198+
end
199+
200+
def resolve_package_name(package_name) when is_binary(package_name) do
201+
~r/(@?[\/\d\n\w-]+)(?:@([\d\.\w]+))?$/
202+
|> Regex.run(package_name)
203+
|> case do
204+
[_, name, version] ->
205+
{name, version}
206+
207+
[_, _name] ->
208+
{package_name, nil}
209+
210+
_ ->
211+
raise ArgumentError, "Only npm style package names are currently supported"
212+
end
213+
end
214+
215+
@doc """
216+
Turns a package name and version into a string for NPM.
217+
218+
Since multiple versions of the same package can be installed, it's important
219+
to rely on npms built-in package aliasing.
220+
221+
E.g. `@openfn/language-http@1.2.8` turns into:
222+
`@openfn/language-http-v1.2.8@npm:@openfn/language-http@1.2.8`
223+
224+
Which is pretty long winded but necessary for the reason above.
225+
226+
If using this module as a base, it's likely you would need to adaptor this
227+
to suit your particular naming strategy.
228+
"""
229+
def build_aliased_name({package, version}) do
230+
"#{package}-v#{version}@npm:#{package}@#{version}"
231+
end
232+
end

lib/engine/run/handler.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ defmodule Engine.Run.Handler do
6565

6666
@impl Handler
6767
def log_callback(log_agent, context, args) do
68-
Engine.LogAgent.process_chunk(log_agent, args)
69-
|> __MODULE__.on_log_emit(context)
68+
res = Engine.LogAgent.process_chunk(log_agent, args)
69+
res |> __MODULE__.on_log_emit(context)
7070

7171
true
7272
end

0 commit comments

Comments
 (0)