Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 54 additions & 33 deletions pyomo/common/formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
StreamIndenter
"""

import io
import re
import types

from typing import Iterable

from pyomo.common.sorting import sorted_robust


Expand Down Expand Up @@ -204,42 +208,59 @@ class StreamIndenter:
StreamIndenter objects may be arbitrarily nested.
"""

def __init__(self, ostream, indent=' ' * 4):
self.os = ostream
self.indent = indent
self.stripped_indent = indent.rstrip()
self.newline = True

def __getattr__(self, name):
return getattr(self.os, name)

def write(self, data):
if not len(data):
return
lines = data.split('\n')
if self.newline:
if lines[0]:
self.os.write(self.indent + lines[0])
else:
self.os.write(self.stripped_indent)
_newline_re = re.compile('\n([^\n])')
_blankline_re = re.compile('\n\n')

def __init__(self, ostream: io.TextIOBase, indent: str = ' ' * 4):
super().__setattr__('wrapped_os', ostream)
# The following is a "cute" trick: because of the __getattr__ /
# __setattr__ overloads, nested StreamIndenter instances all
# print directly to the underlying stream object, and all share
# a common `newline` flag.
if isinstance(ostream, StreamIndenter):
super().__setattr__('target_os', ostream.target_os)
indent = ostream.indent + indent
else:
self.os.write(lines[0])
if len(lines) < 2:
self.newline = False
return
for line in lines[1:-1]:
if line:
self.os.write("\n" + self.indent + line)
else:
self.os.write("\n" + self.stripped_indent)
if lines[-1]:
self.os.write("\n" + self.indent + lines[-1])
self.newline = False
super().__setattr__('target_os', ostream)
# We will assume the last thing written to the stream we
# are wrapping ended with a newline
super().__setattr__('newline', True)
super().__setattr__('indent', indent)
super().__setattr__('indent_match', f'\n{indent}\\1')
super().__setattr__('stripped_indent', indent.rstrip())
if self.stripped_indent:
super().__setattr__('blankline_match', f'\n{self.stripped_indent}\n')

def __getattr__(self, name: str):
return getattr(self.wrapped_os, name)

def __setattr__(self, name: str, val):
if name in self.__dict__:
super().__setattr__(name, val)
else:
self.os.write("\n")
self.newline = True
self.wrapped_os.__setattr__(name, val)

def writelines(self, sequence):
def write(self, data: str) -> int:
if not data:
return 0
written = 0
if self.newline:
if data[0] != '\n':
written += self.target_os.write(self.indent)
elif self.stripped_indent:
written += self.target_os.write(self.stripped_indent)
data = self._newline_re.sub(self.indent_match, data)
if self.stripped_indent:
data, n = self._blankline_re.subn(self.blankline_match, data)
# If we replaced any blank lines, then we need to check
# again to catch cases like "\n\n\n"
if n:
data = self._blankline_re.sub(self.blankline_match, data)
written += self.target_os.write(data)
self.newline = data.endswith('\n')
return written

def writelines(self, sequence: Iterable[str]) -> None:
for x in sequence:
self.write(x)

Expand Down
19 changes: 11 additions & 8 deletions pyomo/common/tee.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ def _retry(self, fcn, *args, retries=10):
failCount = 0
while 1:
try:
fcn(*args)
break
return fcn(*args)
except (OSError, BlockingIOError):
failCount += 1
if failCount >= retries:
Expand All @@ -89,10 +88,12 @@ def flush(self):
self._retry(self._ostream.flush)
self._handle.flush = True

def write(self, data):
def write(self, data: str) -> int:
ans = 0
chunksize = _pipe_buffersize >> 1 # 1/2 the buffer size
for i in range(0, len(data), chunksize):
self._retry(self._ostream.write, data[i : i + chunksize])
ans += self._retry(self._ostream.write, data[i : i + chunksize])
return ans

def writelines(self, data):
for line in data:
Expand All @@ -116,19 +117,21 @@ class _AutoFlush(_SignalFlush):
# Because we define write() and writelines() under windows, we
# need to make sure that _AutoFlush calls them

def write(self, data):
super().write(data)
def write(self, data: str) -> int:
ans = super().write(data)
self.flush()
return ans

def writelines(self, data):
super().writelines(data)
self.flush()

else:

def write(self, data):
self._ostream.write(data)
def write(self, data: str) -> int:
ans = self._ostream.write(data)
self.flush()
return ans

def writelines(self, data):
self._ostream.writelines(data)
Expand Down
53 changes: 49 additions & 4 deletions pyomo/common/tests/test_formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,27 +195,72 @@ def _data_gen(i, j):


class TestStreamIndenter(unittest.TestCase):
def test_empty(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
self.assertEqual(0, OUT2.write(''))
self.assertEqual('', OUT2.getvalue())

def test_noprefix(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
OUT2.write('Hello?\nHello, world!')
self.assertEqual(28, OUT2.write('Hello?\nHello, world!'))
self.assertEqual(' Hello?\n Hello, world!', OUT2.getvalue())

def test_prefix(self):
prefix = 'foo:'
prefix = 'foo: '
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1, prefix)
OUT2.write('Hello?\nHello, world!')
self.assertEqual('foo:Hello?\nfoo:Hello, world!', OUT2.getvalue())
OUT2.write('Hello?\nText\n\nHello, world!')
self.assertEqual(
'foo: Hello?\nfoo: Text\nfoo:\nfoo: Hello, world!', OUT2.getvalue()
)

def test_blank_lines(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
OUT2.write('Hello?\n\nText\n\nHello, world!')
self.assertEqual(' Hello?\n\n Text\n\n Hello, world!', OUT2.getvalue())

def test_blank_lines_nonwhitespace_indent(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1, " | ")
OUT2.write('Hello?\n\nText\n')
OUT2.write('\n')
OUT2.write('Hello, world!')
self.assertEqual(
' | Hello?\n |\n | Text\n |\n | Hello, world!', OUT2.getvalue()
)

def test_writelines(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
OUT2.writelines(['Hello?\n', '\n', 'Text\n', '\n', 'Hello, world!'])
self.assertEqual(' Hello?\n\n Text\n\n Hello, world!', OUT2.getvalue())

def test_nested(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
OUT3 = StreamIndenter(OUT2)
self.assertIs(OUT3.target_os, OUT2.target_os)
self.assertIs(OUT3.target_os, OUT1)
OUT3.write('Hello?\n\nText\n\nHello, world!')
self.assertEqual(
' Hello?\n\n Text\n\n Hello, world!', OUT1.getvalue()
)

def test_nested_interleave(self):
OUT1 = StringIO()
OUT2 = StreamIndenter(OUT1)
OUT3 = StreamIndenter(OUT2)
self.assertIs(OUT3.target_os, OUT2.target_os)
self.assertIs(OUT3.target_os, OUT1)
OUT3.write('Hello?')
OUT2.write('\n\n')
OUT3.write('Text\n')
OUT2.write('Hi\n')
OUT3.write('Hello, world!')
self.assertEqual(
' Hello?\n\n Text\n Hi\n Hello, world!',
OUT1.getvalue(),
)
10 changes: 3 additions & 7 deletions pyomo/core/base/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,24 +366,20 @@ def _pprint_base_impl(
if not _attr and self.parent_block() is None:
_name = ''

# We only indent everything if we printed the header
if _attr or _name or _doc:
ostream = StreamIndenter(ostream, self._PPRINT_INDENT)
# The first line should be a hanging indent (i.e., not indented)
ostream.newline = False

if self.is_reference():
_attr = list(_attr) if _attr else []
_attr.append(('ReferenceTo', self.referent))

if _name:
ostream.write(_name + " : ")
if _doc:
ostream.write(_doc + '\n')
ostream.write(_doc + '\n' + self._PPRINT_INDENT)
if _attr:
ostream.write(", ".join("%s=%s" % (k, v) for k, v in _attr))
if _attr or _name or _doc:
ostream.write("\n")
# We only indent everything if we printed the header
ostream = StreamIndenter(ostream, self._PPRINT_INDENT)

if not _constructed:
# HACK: for backwards compatibility, Abstract blocks will
Expand Down