-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathutility.py
More file actions
472 lines (426 loc) · 17.9 KB
/
utility.py
File metadata and controls
472 lines (426 loc) · 17.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
from termcolor import colored
import requests
from rich.console import Console
from rich.markdown import Markdown
import sys as sys
# Required for Questions Panel
import os
import time
import locale
from collections import defaultdict
from simple_term_menu import TerminalMenu
import webbrowser
from error import SearchError
from save import SaveSearchResults
from markdown import MarkdownRenderer
from settings import PLAYBOOK_FILE
# Required for OAuth
import json
from oauthlib.oauth2 import MobileApplicationClient
from requests_oauthlib import OAuth2Session
# Required for Selenium script and for web_driver_manager
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
# Required for replacing html escape characters with their corresponding ascii characters
import html
console = Console()
def get_browser_driver():
    """
    Return a Selenium driver for the first supported browser that starts.

    Chrome, Firefox and Edge are tried in that order; the matching web
    driver is installed through webdriver_manager. A ValueError from one
    attempt moves on to the next browser. When every attempt raises
    ValueError a message is printed and None is returned implicitly.
    """
    # Each launcher is deferred so the driver is only installed when tried.
    launchers = (
        lambda: webdriver.Chrome(ChromeDriverManager().install()),
        lambda: webdriver.Firefox(executable_path=GeckoDriverManager().install()),
        lambda: webdriver.Edge(EdgeChromiumDriverManager().install()),
    )
    for launch in launchers:
        try:
            return launch()
        except ValueError:
            # This browser is unavailable; fall through to the next one.
            continue
    print(
        "You do not have one of these supported browsers:"
        + "Chrome, Firefox, Edge"
    )
class Playbook:
    """Store saved StackOverflow questions in a local JSON "playbook" file."""

    def __init__(self):
        # Name of the environment variable that holds the playbook path.
        self.key = "DYNAMIC"

    @property
    def playbook_path(self):
        """
        Return the path of the playbook file.

        The path is read from the 'DYNAMIC' environment variable; when the
        variable is unset or empty it is first set to PLAYBOOK_FILE.
        """
        if not os.getenv(self.key):
            os.environ[self.key] = PLAYBOOK_FILE
        return os.getenv(self.key)

    @property
    def playbook_template(self):
        """Return a fresh, empty playbook stamped with the current time."""
        return {"time_of_update": time.time(), "items_stackoverflow": []}

    @property
    def playbook_content(self):
        """
        Read the playbook from local storage and return it as a dict.

        When the file does not exist yet, it (and its parent directory, if
        the path has one) is created from the template and the template is
        returned.
        """
        path = self.playbook_path
        try:
            # The file is written with ensure_ascii=False, so pin the
            # encoding instead of depending on the locale default.
            with open(path, "r", encoding="utf-8") as playbook:
                return json.load(playbook)
        except FileNotFoundError:
            parent = os.path.dirname(path)
            # A bare file name has no directory part; os.makedirs("") raises.
            if parent:
                os.makedirs(parent, exist_ok=True)
            template = self.playbook_template
            with open(path, "w", encoding="utf-8") as playbook:
                json.dump(template, playbook, ensure_ascii=False)
            return template

    @playbook_content.setter
    def playbook_content(self, value):
        """
        Overwrite the playbook file with ``value``.

        Raises:
            ValueError: if ``value`` is not a dict.
        """
        if not isinstance(value, dict):
            raise ValueError("value should be of type dict")
        with open(self.playbook_path, "w", encoding="utf-8") as playbook:
            json.dump(value, playbook, ensure_ascii=False)

    def is_question_in_playbook(self, question_id):
        """Return True when a saved entry has the given question id."""
        wanted = int(question_id)
        return any(
            int(entry["question_id"]) == wanted
            for entry in self.playbook_content["items_stackoverflow"]
        )

    def add_to_playbook(self, stackoverflow_object, question_id):
        """
        Receives a QuestionsPanelStackoverflow object and
        saves data of a particular question into playbook
        Saves playbook in the following format
        {
            time_of_update: unix,
            items_stackoverflow:
            [
                {
                    time_of_creation: unix timestamp
                    question_id: 123456,
                    question_title: 'question_title',
                    question_link: 'link',
                    answer_body: 'body of the answer'
                },
                ...
            ]
        }
        Nothing is written when the question is already saved or when it
        is not present in stackoverflow_object.questions_data.
        """
        if self.is_question_in_playbook(question_id):
            console.print(
                "[red] Question is already in the playbook," + "No need to add"
            )
            return
        wanted = int(question_id)
        # Each row of questions_data is [title, question_id, link].
        for title, row_id, link in stackoverflow_object.questions_data:
            if int(row_id) != wanted:
                continue
            content = self.playbook_content
            now = time.time()
            content["time_of_update"] = now
            content["items_stackoverflow"].append(
                {
                    "time_of_creation": now,
                    "question_id": wanted,
                    "question_title": title,
                    "question_link": link,
                    "answer_body": stackoverflow_object.answer_data[wanted],
                }
            )
            self.playbook_content = content
            console.print("[green] Question added to the playbook")
            # Stop here so a duplicated row cannot insert the entry twice.
            return

    def _remove_question(self, question_id):
        """Delete the first entry with ``question_id``; return True if one was removed."""
        wanted = int(question_id)
        content = self.playbook_content
        entries = content["items_stackoverflow"]
        for position, entry in enumerate(entries):
            # Compare as ints, consistently with is_question_in_playbook.
            if int(entry["question_id"]) == wanted:
                del entries[position]
                self.playbook_content = content
                return True
        return False

    def delete_from_playbook(self, stackoverflow_object, question_id):
        """
        Remove a question from the playbook and show the panel again.

        ``stackoverflow_object`` is accepted for call compatibility and is
        not used. display_panel() exits the program when no entries remain.
        """
        self._remove_question(question_id)
        self.display_panel()

    def display_panel(self):
        """
        Show the saved questions in an interactive questions panel.

        Reports an error and exits the program when the playbook is empty.
        """
        playbook_data = self.playbook_content
        if not playbook_data["items_stackoverflow"]:
            SearchError(
                "You have no entries in the playbook",
                "Browse and save entries in playbook with 'p' key",
            )
            sys.exit()
        # Creates QuestionPanelStackoverflow object
        # populates its question_data and answer_data and displays it
        question_panel = QuestionsPanelStackoverflow()
        for item in playbook_data["items_stackoverflow"]:
            question_panel.questions_data.append(
                [item["question_title"], item["question_id"], item["question_link"]]
            )
            question_panel.answer_data[item["question_id"]] = item["answer_body"]
        question_panel.display_panel([], playbook=True)
class QuestionsPanelStackoverflow:
    """Interactive terminal menu of StackOverflow questions with answer previews."""

    # Box-drawing and bullet characters mapped to ASCII stand-ins, used when
    # the locale encoding is not a UTF one.
    _ASCII_BOX_TABLE = str.maketrans(
        {
            "─": "-",
            "═": "=",
            "║": "|",
            "│": "|",
            "┌": "+",
            "└": "+",
            "┐": "+",
            "┘": "+",
            "╔": "+",
            "╚": "+",
            "╗": "+",
            "╝": "+",
            "•": "*",
        }
    )

    def __init__(self):
        # list( list( question_title, question_id, question_link )... )
        self.questions_data = []
        # dict( question_id: answer body markdown ), keyed by the ids in
        # self.questions_data; a missing id reads as False
        self.answer_data = defaultdict(lambda: False)
        self.line_color = "bold red"
        self.heading_color = "bold blue"
        self.utility = Utility()
        self.playbook = Playbook()

    def replaceHtmlEscapeCharacters(self, question_title):
        """
        Function to replace HTML escape characters in question's title
        to their corresponding ASCII characters
        For example, if the title was something like this:
        '&quot;static const&quot; vs &quot;#define&quot; vs &quot;enum&quot;'
        the HTML escape characters would be replaced with their corresponding ASCII
        characters and the resulting string would be:
        '"static const" vs "#define" vs "enum"'
        """
        return html.unescape(question_title)

    @staticmethod
    def _to_ascii_boxes(text):
        """Replace box-drawing and bullet characters in ``text`` with ASCII ones."""
        return text.translate(QuestionsPanelStackoverflow._ASCII_BOX_TABLE)

    def populate_question_data(self, questions_list):
        """
        Function to populate question data property
        Creates batch request to stackexchange API and to get question
        details of questions with id in the list. Stores the returned
        data in the following format:
            list( list( question_title, question_id, question_link ) )
        Reports an error and exits the program when the request fails.
        """
        with console.status("Getting the questions..."):
            try:
                resp = requests.get(self.utility.get_batch_ques_url(questions_list))
            except requests.exceptions.RequestException:
                SearchError("Search Failed", "Try connecting to the internet")
                sys.exit()
        json_ques_data = resp.json()
        self.questions_data = [
            [
                # "|" is stripped because the menu options join fields with it.
                self.replaceHtmlEscapeCharacters(item["title"].replace("|", "")),
                item["question_id"],
                item["link"],
            ]
            for item in json_ques_data["items"]
        ]

    def populate_answer_data(self, questions_list):
        """
        Function to populate answer data property
        Creates batch request to stackexchange API
        to get ans of questions with question id
        in the list. Stores the returned data
        in the following format:
            dict( question_id: answer body markdown )
        Only the first answer returned for each question is kept.
        Questions in self.questions_data that still have no answer are
        requested again until a round brings nothing new.
        Reports an error and exits the program when a request fails.
        """
        pending = list(questions_list)
        while pending:
            with console.status("Searching answers..."):
                try:
                    resp = requests.get(self.utility.get_batch_ans_url(pending))
                except requests.exceptions.RequestException:
                    SearchError("Search Failed", "Try connecting to the internet")
                    sys.exit()
            gained_answer = False
            # A response without "items" is treated as an empty round.
            for item in resp.json().get("items", []):
                if not self.answer_data[item["question_id"]]:
                    self.answer_data[item["question_id"]] = item["body_markdown"]
                    gained_answer = True
            # Sometimes the StackExchange API fails to deliver some answers.
            # The below code is to fetch them
            pending = [
                question[1]
                for question in self.questions_data
                if not self.answer_data[question[1]]
            ]
            # Stop when a round added nothing: repeating the same request
            # would otherwise loop forever on questions with no answer body.
            if not gained_answer:
                break

    def return_formatted_ans(self, ques_id):
        """
        Return the stored answer for ``ques_id`` rendered with Rich Markdown.

        HTML entities in the body are decoded first. The text is rendered
        four columns narrower than the terminal. When the locale encoding
        is unknown or not a UTF one, box-drawing characters are replaced
        with ASCII equivalents.
        """
        body_markdown = html.unescape(str(self.answer_data[int(ques_id)]))
        width = os.get_terminal_size().columns
        # Separate console so the capture does not touch the shared one.
        preview_console = Console(width=width - 4)
        with preview_console.capture() as capture:
            preview_console.print(Markdown(body_markdown, hyperlinks=False))
        highlighted = capture.get()
        # getlocale() may report None for the encoding.
        encoding = locale.getlocale()[1] or ""
        if "UTF" not in encoding:
            highlighted = self._to_ascii_boxes(highlighted)
        return highlighted

    def navigate_questions_panel(self, playbook=False):
        """
        Run the interactive question menu until the user quits.

        'Enter' opens the selected question in a browser. Outside playbook
        mode 'p' saves the question to the playbook; in playbook mode 'd'
        deletes it. Quitting the menu returns, or exits the program when
        in playbook mode.
        """
        if playbook:
            message = "Playbook Questions"
            instructions = ". Press 'd' to delete from playbook"
            keys = ("enter", "d")
        else:
            message = "Relevant Questions"
            instructions = ". Press 'p' to save in playbook"
            keys = ("enter", "p")
        console.rule("[bold blue] {}".format(message), style="bold red")
        console.print(
            "[yellow] Use arrow keys to navigate."
            + "'q' or 'Esc' to quit. 'Enter' to open in a browser"
            + instructions
        )
        console.print()
        options = ["|".join(map(str, question)) for question in self.questions_data]
        question_menu = TerminalMenu(
            options,
            preview_command=self.return_formatted_ans,
            preview_size=0.75,
            accept_keys=keys,
        )
        while True:
            options_index = question_menu.show()
            # No selection (menu was quit): leave the panel.
            if options_index is None:
                if playbook:
                    sys.exit()
                return
            _, question_id, question_link = self.questions_data[options_index]
            chosen_key = question_menu.chosen_accept_key
            if chosen_key == "enter":
                webbrowser.open(question_link)
            elif chosen_key == "p":
                self.playbook.add_to_playbook(self, question_id)
            elif chosen_key == "d" and playbook:
                self.playbook.delete_from_playbook(self, question_id)

    def display_panel(self, questions_list, playbook=False):
        """
        Show the questions panel.

        Outside playbook mode the question and answer data are first
        fetched for ``questions_list``; in playbook mode the data already
        stored on the instance is shown as is.
        """
        if not playbook:
            self.populate_question_data(questions_list)
            self.populate_answer_data(questions_list)
        self.navigate_questions_panel(playbook=playbook)
class Utility:
    """Build StackExchange API URLs, run searches and obtain an OAuth token."""

    def __init__(self):
        # the parent url
        self.search_content_url = "https://api.stackexchange.com/"

    def _api_url(self, path):
        """Join ``path`` onto the parent url without producing a double slash."""
        return f"{self.search_content_url.rstrip('/')}/{path}"

    @staticmethod
    def _join_ids(ques_id_list):
        """Return the ids as the semicolon-separated string used in batch URLs."""
        return ";".join(str(question_id) for question_id in ques_id_list)

    def __get_search_url(self, question, tags):
        """
        This function returns the url that contains all the custom
        data provided by the user such as tags and question, which
        can finally be used to get answers
        """
        # Imported locally so this class does not depend on a file-level import.
        from urllib.parse import urlencode

        # The question and tags are free text from the user; percent-encode
        # them so characters such as '&', '#' or spaces cannot break the query.
        query = urlencode(
            {
                "order": "desc",
                "sort": "relevance",
                "tagged": tags,
                "q": question,
                "site": "stackoverflow",
            }
        )
        return self._api_url(f"2.2/search/advanced?{query}")

    def get_batch_ques_url(self, ques_id_list):
        """
        Returns URL which contains ques_ids which can be used to
        get the details of all the corresponding questions
        """
        return self._api_url(
            f"2.2/questions/{self._join_ids(ques_id_list)}"
            "?order=desc&sort=votes&site=stackoverflow&filter=!--1nZwsgqvRX"
        )

    def get_batch_ans_url(self, ques_id_list):
        """
        Returns URL which contains ques_ids which can be used to
        get the answers of all the corresponding questions
        """
        return self._api_url(
            f"2.2/questions/{self._join_ids(ques_id_list)}/answers"
            "?order=desc&sort=votes&site=stackoverflow&filter=!--1nZwsgqvRX"
        )

    def make_request(self, que, tag: str):
        """
        This function uses the requests library to make
        the rest api call to the stackexchange server.
        :param que: The user questions that servers as
                a question in the api.
        :type que: String
        :param tag: The tags that user wants for searching the relevant
                answers. For e.g. TypeError might be for multiple
                languages so is tag is used as "Python" then the
                api will return answers based on the tags and question.
        :type tag: String
        :return: Json response from the api call.
        :rtype: Json format data

        Reports an error and exits the program when the request fails.
        """
        url = self.__get_search_url(que, tag)
        with console.status("Searching..."):
            try:
                resp = requests.get(url)
            except requests.exceptions.RequestException:
                SearchError(
                    "\U0001F613 Search Failed",
                    "\U0001F4BB Try connecting to the internet",
                )
                sys.exit()
        return resp.json()

    def get_que(self, json_data):
        """
        This function returns the list of ids of the questions
        that have been answered, from the response that we get
        from the make_request function.
        """
        return [
            data["question_id"]
            for data in json_data["items"]
            if data["answer_count"]
        ]

    def get_ans(self, questions_list):
        """
        This Function creates QuestionsPanel_stackoverflow class which supports
        Rendering, navigation, searching and redirecting capabilities
        """
        stackoverflow_panel = QuestionsPanelStackoverflow()
        stackoverflow_panel.display_panel(questions_list)
        # Support for reddit searching can also be implemented from here

    # Get an access token and extract to a JSON file "access_token.json"
    @classmethod
    def setCustomKey(cls):
        """
        Run the browser OAuth flow and save the token to "access_token.json".

        scopes possible values:
        read_inbox - access a user's global inbox
        no_expiry - access_token's with this scope do not expire
        write_access - perform write operations as a user
        private_info - access full history of a user's private
                        actions on the site

        Returns without writing anything when no supported browser driver
        is available.
        """
        client_id = 20013
        scopes = "read_inbox"
        authorization_url = "https://stackoverflow.com/oauth/dialog"
        redirect_uri = "https://stackexchange.com/oauth/login_success"
        # Create an OAuth session and open the auth_url in a browser
        # for the user to authenticate
        stackApps = OAuth2Session(
            client=MobileApplicationClient(client_id=client_id),
            scope=scopes,
            redirect_uri=redirect_uri,
        )
        auth_url, state = stackApps.authorization_url(authorization_url)
        driver = get_browser_driver()
        if driver is None:
            # get_browser_driver() has already told the user why.
            return
        try:
            # Open auth_url in one of the supported browsers
            driver.get(auth_url)
            # Give the user 30 seconds to log in before reading the result
            time.sleep(30)
            # Wait (up to 1s) for an <h2> element, then read the callback URL
            WebDriverWait(driver, 1).until(
                EC.presence_of_element_located((By.TAG_NAME, "h2"))
            )
            callback_url = driver.current_url
        finally:
            # Always close the browser, even when the wait fails
            driver.quit()
        # Extract access token data from callback_url
        accessTokenData = stackApps.token_from_fragment(callback_url)
        # Store the access token data in a dictionary
        jsonDict = {
            "access_token": accessTokenData["access_token"],
            "expires": accessTokenData["expires"],
            "state": state,
        }
        with open("access_token.json", "w") as jsonFile:
            json.dump(jsonDict, jsonFile)