@@ -13,99 +13,72 @@ import scala.concurrent.duration.Duration
1313import scala .util .{Failure , Success , Try }
1414
1515final case class Response (
16- `__ID__` : String ,
17- `__BODY__` : Option [String ] = None ,
18- `__HEADER__` : Map [String , Seq [String ]] = Map (),
19- `__STATUS_CODE__` : Option [Int ] = None ,
20- `__STATUS_MSG__` : Option [String ] = None ,
21- `__ERROR__` : Option [String ] = None ,
22- `__ELAPSED_TIME__` : Long ,
23- `__URL__` : String ,
24- `__DATA__` : String )
16+ `__ID__` : String ,
17+ `__BODY__` : Option [String ] = None ,
18+ `__HEADER__` : Map [String , Seq [String ]] = Map (),
19+ `__STATUS_CODE__` : Option [Int ] = None ,
20+ `__STATUS_MSG__` : Option [String ] = None ,
21+ `__ERROR__` : Option [String ] = None ,
22+ `__ELAPSED_TIME__` : Long ,
23+ `__URL__` : String ,
24+ `__DATA__` : String )
2525
2626final case class ResponseBatch (
27- `__ID__` : Seq [String ],
28- `__BODY__` : Option [String ] = None ,
29- `__HEADER__` : Map [String , Seq [String ]] = Map (),
30- `__STATUS_CODE__` : Option [Int ] = None ,
31- `__STATUS_MSG__` : Option [String ] = None ,
32- `__ERROR__` : Option [String ] = None ,
33- `__ELAPSED_TIME__` : Long ,
34- `__URL__` : String ,
35- `__DATA__` : String )
27+ `__ID__` : Seq [String ],
28+ `__BODY__` : Option [String ] = None ,
29+ `__HEADER__` : Map [String , Seq [String ]] = Map (),
30+ `__STATUS_CODE__` : Option [Int ] = None ,
31+ `__STATUS_MSG__` : Option [String ] = None ,
32+ `__ERROR__` : Option [String ] = None ,
33+ `__ELAPSED_TIME__` : Long ,
34+ `__URL__` : String ,
35+ `__DATA__` : String )
3636
3737
3838object Alias {
3939 val DataCol = " __DATA__"
4040 val IdCol = " __ID__"
4141 val UrlCol = " __URL__"
42- val ParamsCol = " __PARAMS__"
43- val HiddenParamsCol = " __HIDDEN_PARAMS__"
44- val HeadersCol = " __HEADERS__"
4542}
4643
4744
4845private [almaren] case class HTTP (
49- headers : Map [String , String ],
50- params : Map [String , String ],
51- hiddenParams : Map [String , String ],
52- method : String ,
53- requestHandler : (Row , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response ,
54- session : () => requests.Session ,
55- connectTimeout : Int ,
56- readTimeout : Int ,
57- threadPoolSize : Int ,
58- batchSize : Int ) extends Main {
59-
60- private def columnExists (df : DataFrame , colName : String ): Option [String ] =
61- Some (colName).filter(df.columns.contains)
62-
63- private def getRowParam (row : Row , colNameExists : Option [String ]): Map [String , String ] =
64- colNameExists.map(name => row.getAs[Map [String , String ]](name)).getOrElse(Map ())
46+ headers : Map [String , String ],
47+ params : Map [String , String ],
48+ hiddenParams : Map [String , String ],
49+ method : String ,
50+ requestHandler : (Row , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response ,
51+ session : () => requests.Session ,
52+ connectTimeout : Int ,
53+ readTimeout : Int ,
54+ threadPoolSize : Int ,
55+ batchSize : Int ) extends Main {
6556
6657 override def core (df : DataFrame ): DataFrame = {
6758 logger.info(s " headers:{ $headers},params:{ $params}, method:{ $method}, connectTimeout:{ $connectTimeout}, readTimeout{ $readTimeout}, threadPoolSize:{ $threadPoolSize}, batchSize:{ $batchSize} " )
6859
6960 import df .sparkSession .implicits ._
7061
71- val headersColExists = columnExists(df,Alias .HeadersCol )
72- val paramsColExists = columnExists(df,Alias .ParamsCol )
73- val hiddenParamsColExists = columnExists(df,Alias .HiddenParamsCol )
74-
7562 val result = df.mapPartitions(partition => {
7663
7764 implicit val ec : ExecutionContext = ExecutionContext .fromExecutor(Executors .newFixedThreadPool(threadPoolSize))
7865 val data : Iterator [Future [Seq [Response ]]] = partition.grouped(batchSize).map(rows => Future {
7966 val s = session()
80- rows.map(row => request(row, s, headersColExists, paramsColExists, hiddenParamsColExists ))
67+ rows.map(row => request(row, s))
8168 })
8269 val requests : Future [Iterator [Seq [Response ]]] = Future .sequence(data)
8370 Await .result(requests, Duration .Inf ).flatten
8471 })
8572 result.toDF
8673 }
8774
88- private def request (row : Row , session : Session , headersColExists : Option [ String ], paramsColExists : Option [ String ], hiddenParamsColExists : Option [ String ] ): Response = {
75+ private def request (row : Row , session : Session ): Response = {
8976 val url = row.getAs[Any ](Alias .UrlCol ).toString
9077 val startTime = System .currentTimeMillis()
91-
92- val headersRow = getRowParam(row, headersColExists)
93- val paramsRow = getRowParam(row, paramsColExists)
94- val hiddenParamsRow = getRowParam(row, hiddenParamsColExists)
95-
96- val allHeaders = headers ++ headersRow
97- val allParams = params ++ paramsRow
98- val allHiddenParams = hiddenParams ++ hiddenParamsRow
99-
100- logger.info(s " headers:{ $allHeaders},params:{ $allParams} " )
101- val response = Try (requestHandler(row, session, url, allHeaders, allParams ++ allHiddenParams, method, connectTimeout, readTimeout))
78+ val response = Try (requestHandler(row, session, url, headers, params ++ hiddenParams, method, connectTimeout, readTimeout))
10279 val elapsedTime = System .currentTimeMillis() - startTime
10380 val id = row.getAs[Any ](Alias .IdCol ).toString
104-
105- val data = method.toUpperCase match {
106- case " PUT" | " POST" | " DELETE" => row.getAs[Any ](Alias .DataCol ).toString
107- case _ => " "
108- }
81+ val data = row.getAs[Any ](Alias .DataCol ).toString
10982
11083 def getResponse (r : requests.Response ) = Response (
11184 id,
@@ -126,25 +99,25 @@ private[almaren] case class HTTP(
12699 case Failure (re : RequestFailedException ) => getResponse(re.response)
127100 case Failure (f) => {
128101 logger.error(" Almaren HTTP Request Error" , f)
129- Response (id, `__ERROR__` = Some (f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url,`__DATA__` = data)
102+ Response (id, `__ERROR__` = Some (f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url, `__DATA__` = data)
130103 }
131104 }
132105 }
133106}
134107
135108private [almaren] case class HTTPBatch (
136- url : String ,
137- headers : Map [String , String ],
138- params : Map [String , String ],
139- hiddenParams : Map [String , String ],
140- method : String ,
141- requestHandler : (String , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response ,
142- session : () => requests.Session ,
143- connectTimeout : Int ,
144- readTimeout : Int ,
145- batchSize : Int ,
146- batchDelimiter : (Seq [Row ]) => String
147- ) extends Main {
109+ url : String ,
110+ headers : Map [String , String ],
111+ params : Map [String , String ],
112+ hiddenParams : Map [String , String ],
113+ method : String ,
114+ requestHandler : (String , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response ,
115+ session : () => requests.Session ,
116+ connectTimeout : Int ,
117+ readTimeout : Int ,
118+ batchSize : Int ,
119+ batchDelimiter : (Seq [Row ]) => String
120+ ) extends Main {
148121
149122 override def core (df : DataFrame ): DataFrame = {
150123 logger.info(s " url:{ $url}, headers:{ $headers},params:{ $params}, method:{ $method}, connectTimeout:{ $connectTimeout}, readTimeout{ $readTimeout}, batchSize:{ $batchSize} " )
@@ -202,16 +175,16 @@ private[almaren] case class HTTPBatch(
202175private [almaren] trait HTTPConnector extends Core {
203176
204177 def http (
205- headers : Map [String , String ] = Map (),
206- params : Map [String , String ] = Map (),
207- hiddenParams : Map [String , String ] = Map (),
208- method : String ,
209- requestHandler : (Row , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response = HTTPConn .defaultHandler,
210- session : () => requests.Session = HTTPConn .defaultSession,
211- connectTimeout : Int = 60000 ,
212- readTimeout : Int = 1000 ,
213- threadPoolSize : Int = 1 ,
214- batchSize : Int = 5000 ): Option [Tree ] =
178+ headers : Map [String , String ] = Map (),
179+ params : Map [String , String ] = Map (),
180+ hiddenParams : Map [String , String ] = Map (),
181+ method : String ,
182+ requestHandler : (Row , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response = HTTPConn .defaultHandler,
183+ session : () => requests.Session = HTTPConn .defaultSession,
184+ connectTimeout : Int = 60000 ,
185+ readTimeout : Int = 1000 ,
186+ threadPoolSize : Int = 1 ,
187+ batchSize : Int = 5000 ): Option [Tree ] =
215188 HTTP (
216189 headers,
217190 params,
@@ -226,18 +199,18 @@ private[almaren] trait HTTPConnector extends Core {
226199 )
227200
228201 def httpBatch (
229- url : String ,
230- headers : Map [String , String ] = Map (),
231- params : Map [String , String ] = Map (),
232- hiddenParams : Map [String , String ] = Map (),
233- method : String ,
234- requestHandler : (String , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response = HTTPConn .defaultHandlerBatch,
235- session : () => requests.Session = HTTPConn .defaultSession,
236- connectTimeout : Int = 60000 ,
237- readTimeout : Int = 1000 ,
238- batchSize : Int = 5000 ,
239- batchDelimiter : (Seq [Row ]) => String = HTTPConn .defaultBatchDelimiter
240- ): Option [Tree ] =
202+ url : String ,
203+ headers : Map [String , String ] = Map (),
204+ params : Map [String , String ] = Map (),
205+ hiddenParams : Map [String , String ] = Map (),
206+ method : String ,
207+ requestHandler : (String , Session , String , Map [String , String ], Map [String , String ], String , Int , Int ) => requests.Response = HTTPConn .defaultHandlerBatch,
208+ session : () => requests.Session = HTTPConn .defaultSession,
209+ connectTimeout : Int = 60000 ,
210+ readTimeout : Int = 1000 ,
211+ batchSize : Int = 5000 ,
212+ batchDelimiter : (Seq [Row ]) => String = HTTPConn .defaultBatchDelimiter
213+ ): Option [Tree ] =
241214 HTTPBatch (
242215 url,
243216 headers,
@@ -283,4 +256,4 @@ object HTTPConn {
283256 val defaultSession = () => requests.Session ()
284257
285258 implicit class HTTPImplicit (val container : Option [Tree ]) extends HTTPConnector
286- }
259+ }
0 commit comments