diff --git a/src/graphviz/__init__.py b/src/graphviz/__init__.py new file mode 100644 index 0000000..66760eb --- /dev/null +++ b/src/graphviz/__init__.py @@ -0,0 +1,63 @@ +# graphviz - create dot, save, render, view + +"""Assemble DOT source code and render it with Graphviz. + +>>> dot = Digraph(comment='The Round Table') + +>>> dot.node('A', 'King Arthur') +>>> dot.node('B', 'Sir Bedevere the Wise') +>>> dot.node('L', 'Sir Lancelot the Brave') + +>>> dot.edges(['AB', 'AL']) + +>>> dot.edge('B', 'L', constraint='false') + +>>> print(dot) #doctest: +NORMALIZE_WHITESPACE +// The Round Table +digraph { + A [label="King Arthur"] + B [label="Sir Bedevere the Wise"] + L [label="Sir Lancelot the Brave"] + A -> B + A -> L + B -> L [constraint=false] +} +""" + +from .dot import Graph, Digraph +from .files import Source +from .lang import escape, nohtml +from .backend import (render, pipe, version, view, + ENGINES, FORMATS, RENDERERS, FORMATTERS, + ExecutableNotFound, RequiredArgumentError) + +__all__ = [ + 'Graph', 'Digraph', + 'Source', + 'escape', 'nohtml', + 'render', 'pipe', 'version', 'view', + 'ENGINES', 'FORMATS', 'RENDERERS', 'FORMATTERS', + 'ExecutableNotFound', 'RequiredArgumentError', +] + +__title__ = 'graphviz' +__version__ = '0.14.1.dev0' +__author__ = 'Sebastian Bank ' +__license__ = 'MIT, see LICENSE.txt' +__copyright__ = 'Copyright (c) 2013-2020 Sebastian Bank' + +#: Set of known layout commands used for rendering (``'dot'``, ``'neato'``, ...) +ENGINES = ENGINES + +#: Set of known output formats for rendering (``'pdf'``, ``'png'``, ...) +FORMATS = FORMATS + +#: Set of known output formatters for rendering (``'cairo'``, ``'gd'``, ...) +FORMATTERS = FORMATTERS + +#: Set of known output renderers for rendering (``'cairo'``, ``'gd'``, ...) +RENDERERS = RENDERERS + +ExecutableNotFound = ExecutableNotFound + +RequiredArgumentError = RequiredArgumentError diff --git a/src/graphviz/_compat.py b/src/graphviz/_compat.py new file mode 100644 index 0000000..7d6fc7b --- /dev/null +++ b/src/graphviz/_compat.py @@ -0,0 +1,69 @@ +# _compat.py - Python 2/3 compatibility + +import os +import sys +import operator +import subprocess + +PY2 = (sys.version_info.major == 2) + + +if PY2: + string_classes = (str, unicode) # needed individually for sublassing + text_type = unicode + + iteritems = operator.methodcaller('iteritems') + + def makedirs(name, mode=0o777, exist_ok=False): + try: + os.makedirs(name, mode) + except OSError: + if not exist_ok or not os.path.isdir(name): + raise + + def stderr_write_bytes(data, flush=False): + """Write data str to sys.stderr (flush if requested).""" + sys.stderr.write(data) + if flush: + sys.stderr.flush() + + def Popen_stderr_devnull(*args, **kwargs): # noqa: N802 + with open(os.devnull, 'w') as f: + return subprocess.Popen(*args, stderr=f, **kwargs) + + class CalledProcessError(subprocess.CalledProcessError): + + def __init__(self, returncode, cmd, output=None, stderr=None): + super(CalledProcessError, self).__init__(returncode, cmd, output) + self.stderr = stderr + + @property # pragma: no cover + def stdout(self): + return self.output + + @stdout.setter # pragma: no cover + def stdout(self, value): + self.output = value + + +else: + string_classes = (str,) + text_type = str + + def iteritems(d): + return iter(d.items()) + + def makedirs(name, mode=0o777, exist_ok=False): # allow os.makedirs mocking + return os.makedirs(name, mode, exist_ok=exist_ok) + + def stderr_write_bytes(data, flush=False): + """Encode data str and write to sys.stderr (flush if requested).""" + encoding = sys.stderr.encoding or sys.getdefaultencoding() + sys.stderr.write(data.decode(encoding)) + if flush: + sys.stderr.flush() + + def Popen_stderr_devnull(*args, **kwargs): # noqa: N802 + return subprocess.Popen(*args, stderr=subprocess.DEVNULL, **kwargs) + + CalledProcessError = subprocess.CalledProcessError diff --git a/src/graphviz/backend.py b/src/graphviz/backend.py new file mode 100644 index 0000000..6f4cc0c --- /dev/null +++ b/src/graphviz/backend.py @@ -0,0 +1,312 @@ +# backend.py - execute rendering, open files in viewer + +import os +import re +import errno +import logging +import platform +import subprocess + +from . import _compat + +from . import tools + +__all__ = [ + 'render', 'pipe', 'version', 'view', + 'ENGINES', 'FORMATS', 'RENDERERS', 'FORMATTERS', + 'ExecutableNotFound', 'RequiredArgumentError', +] + +ENGINES = { # http://www.graphviz.org/pdf/dot.1.pdf + 'dot', 'neato', 'twopi', 'circo', 'fdp', 'sfdp', 'patchwork', 'osage', +} + +FORMATS = { # http://www.graphviz.org/doc/info/output.html + 'bmp', + 'canon', 'dot', 'gv', 'xdot', 'xdot1.2', 'xdot1.4', + 'cgimage', + 'cmap', + 'eps', + 'exr', + 'fig', + 'gd', 'gd2', + 'gif', + 'gtk', + 'ico', + 'imap', 'cmapx', + 'imap_np', 'cmapx_np', + 'ismap', + 'jp2', + 'jpg', 'jpeg', 'jpe', + 'json', 'json0', 'dot_json', 'xdot_json', # Graphviz 2.40 + 'pct', 'pict', + 'pdf', + 'pic', + 'plain', 'plain-ext', + 'png', + 'pov', + 'ps', + 'ps2', + 'psd', + 'sgi', + 'svg', 'svgz', + 'tga', + 'tif', 'tiff', + 'tk', + 'vml', 'vmlz', + 'vrml', + 'wbmp', + 'webp', + 'xlib', + 'x11', +} + +RENDERERS = { # $ dot -T: + 'cairo', + 'dot', + 'fig', + 'gd', + 'gdiplus', + 'map', + 'pic', + 'pov', + 'ps', + 'svg', + 'tk', + 'vml', + 'vrml', + 'xdot', +} + +FORMATTERS = {'cairo', 'core', 'gd', 'gdiplus', 'gdwbmp', 'xlib'} + +PLATFORM = platform.system().lower() + + +log = logging.getLogger(__name__) + + +class ExecutableNotFound(RuntimeError): + """Exception raised if the Graphviz executable is not found.""" + + _msg = ('failed to execute %r, ' + 'make sure the Graphviz executables are on your systems\' PATH') + + def __init__(self, args): + super(ExecutableNotFound, self).__init__(self._msg % args) + + +class RequiredArgumentError(Exception): + """Exception raised if a required argument is missing.""" + + +class CalledProcessError(_compat.CalledProcessError): + + def __str__(self): + s = super(CalledProcessError, self).__str__() + return '%s [stderr: %r]' % (s, self.stderr) + + +def command(engine, format_, filepath=None, renderer=None, formatter=None): + """Return args list for ``subprocess.Popen`` and name of the rendered file.""" + if formatter is not None and renderer is None: + raise RequiredArgumentError('formatter given without renderer') + + if engine not in ENGINES: + raise ValueError('unknown engine: %r' % engine) + if format_ not in FORMATS: + raise ValueError('unknown format: %r' % format_) + if renderer is not None and renderer not in RENDERERS: + raise ValueError('unknown renderer: %r' % renderer) + if formatter is not None and formatter not in FORMATTERS: + raise ValueError('unknown formatter: %r' % formatter) + + output_format = [f for f in (format_, renderer, formatter) if f is not None] + cmd = [engine, '-T%s' % ':'.join(output_format)] + + if filepath is None: + rendered = None + else: + cmd.extend(['-O', filepath]) + suffix = '.'.join(reversed(output_format)) + rendered = '%s.%s' % (filepath, suffix) + + return cmd, rendered + + +if PLATFORM == 'windows': # pragma: no cover + def get_startupinfo(): + """Return subprocess.STARTUPINFO instance hiding the console window.""" + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + startupinfo.wShowWindow = subprocess.SW_HIDE + return startupinfo +else: + def get_startupinfo(): + """Return None for startupinfo argument of ``subprocess.Popen``.""" + return None + + +def run(cmd, input=None, capture_output=False, check=False, encoding=None, + quiet=False, **kwargs): + """Run the command described by cmd and return its (stdout, stderr) tuple.""" + log.debug('run %r', cmd) + + if input is not None: + kwargs['stdin'] = subprocess.PIPE + if encoding is not None: + input = input.encode(encoding) + + if capture_output: + kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE + + try: + proc = subprocess.Popen(cmd, startupinfo=get_startupinfo(), **kwargs) + except OSError as e: + if e.errno == errno.ENOENT: + raise ExecutableNotFound(cmd) + else: + raise + + out, err = proc.communicate(input) + + if not quiet and err: + _compat.stderr_write_bytes(err, flush=True) + + if encoding is not None: + if out is not None: + out = out.decode(encoding) + if err is not None: + err = err.decode(encoding) + + if check and proc.returncode: + raise CalledProcessError(proc.returncode, cmd, + output=out, stderr=err) + + return out, err + + +def render(engine, format, filepath, renderer=None, formatter=None, quiet=False): + """Render file with Graphviz ``engine`` into ``format``, return result filename. + + Args: + engine: The layout commmand used for rendering (``'dot'``, ``'neato'``, ...). + format: The output format used for rendering (``'pdf'``, ``'png'``, ...). + filepath: Path to the DOT source file to render. + renderer: The output renderer used for rendering (``'cairo'``, ``'gd'``, ...). + formatter: The output formatter used for rendering (``'cairo'``, ``'gd'``, ...). + quiet (bool): Suppress ``stderr`` output from the layout subprocess. + Returns: + The (possibly relative) path of the rendered file. + Raises: + ValueError: If ``engine``, ``format``, ``renderer``, or ``formatter`` are not known. + graphviz.RequiredArgumentError: If ``formatter`` is given but ``renderer`` is None. + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + + The layout command is started from the directory of ``filepath``, so that + references to external files (e.g. ``[image=...]``) can be given as paths + relative to the DOT source file. + """ + dirname, filename = os.path.split(filepath) + del filepath + + cmd, rendered = command(engine, format, filename, renderer, formatter) + if dirname: + cwd = dirname + rendered = os.path.join(dirname, rendered) + else: + cwd = None + + run(cmd, capture_output=True, cwd=cwd, check=True, quiet=quiet) + return rendered + + +def pipe(engine, format, data, renderer=None, formatter=None, quiet=False): + """Return ``data`` piped through Graphviz ``engine`` into ``format``. + + Args: + engine: The layout commmand used for rendering (``'dot'``, ``'neato'``, ...). + format: The output format used for rendering (``'pdf'``, ``'png'``, ...). + data: The binary (encoded) DOT source string to render. + renderer: The output renderer used for rendering (``'cairo'``, ``'gd'``, ...). + formatter: The output formatter used for rendering (``'cairo'``, ``'gd'``, ...). + quiet (bool): Suppress ``stderr`` output from the layout subprocess. + Returns: + Binary (encoded) stdout of the layout command. + Raises: + ValueError: If ``engine``, ``format``, ``renderer``, or ``formatter`` are not known. + graphviz.RequiredArgumentError: If ``formatter`` is given but ``renderer`` is None. + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + """ + cmd, _ = command(engine, format, None, renderer, formatter) + out, _ = run(cmd, input=data, capture_output=True, check=True, quiet=quiet) + return out + + +def version(): + """Return the version number tuple from the ``stderr`` output of ``dot -V``. + + Returns: + Two, three, or four ``int`` version ``tuple``. + Raises: + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + RuntimmeError: If the output cannot be parsed into a version number. + """ + cmd = ['dot', '-V'] + out, _ = run(cmd, check=True, encoding='ascii', + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + + ma = re.search(r'graphviz version (\d+\.\d+(?:\.\d+){,2}) ', out) + if ma is None: + raise RuntimeError('cannot parse %r output: %r' % (cmd, out)) + + return tuple(int(d) for d in ma.group(1).split('.')) + + +def view(filepath, quiet=False): + """Open filepath with its default viewing application (platform-specific). + + Args: + filepath: Path to the file to open in viewer. + quiet (bool): Suppress ``stderr`` output from the viewer process + (ineffective on Windows). + Raises: + RuntimeError: If the current platform is not supported. + """ + try: + view_func = getattr(view, PLATFORM) + except AttributeError: + raise RuntimeError('platform %r not supported' % PLATFORM) + view_func(filepath, quiet) + + +@tools.attach(view, 'darwin') +def view_darwin(filepath, quiet): + """Open filepath with its default application (mac).""" + cmd = ['open', filepath] + log.debug('view: %r', cmd) + popen_func = _compat.Popen_stderr_devnull if quiet else subprocess.Popen + popen_func(cmd) + + +@tools.attach(view, 'linux') +@tools.attach(view, 'freebsd') +def view_unixoid(filepath, quiet): + """Open filepath in the user's preferred application (linux, freebsd).""" + cmd = ['xdg-open', filepath] + log.debug('view: %r', cmd) + popen_func = _compat.Popen_stderr_devnull if quiet else subprocess.Popen + popen_func(cmd) + + +@tools.attach(view, 'windows') +def view_windows(filepath, quiet): + """Start filepath with its associated application (windows).""" + # TODO: implement quiet=True + filepath = os.path.normpath(filepath) + log.debug('view: %r', filepath) + os.startfile(filepath) diff --git a/src/graphviz/dot.py b/src/graphviz/dot.py new file mode 100644 index 0000000..40cf1ae --- /dev/null +++ b/src/graphviz/dot.py @@ -0,0 +1,287 @@ +# dot.py - create dot code + +r"""Assemble DOT source code objects. + +>>> dot = Graph(comment=u'M\xf8nti Pyth\xf8n ik den H\xf8lie Grailen') + +>>> dot.node(u'M\xf8\xf8se') +>>> dot.node('trained_by', u'trained by') +>>> dot.node('tutte', u'TUTTE HERMSGERVORDENBROTBORDA') + +>>> dot.edge(u'M\xf8\xf8se', 'trained_by') +>>> dot.edge('trained_by', 'tutte') + +>>> dot.node_attr['shape'] = 'rectangle' + +>>> print(dot.source.replace(u'\xf8', '0')) #doctest: +NORMALIZE_WHITESPACE +// M0nti Pyth0n ik den H0lie Grailen +graph { + node [shape=rectangle] + "M00se" + trained_by [label="trained by"] + tutte [label="TUTTE HERMSGERVORDENBROTBORDA"] + "M00se" -- trained_by + trained_by -- tutte +} + +>>> dot.view('test-output/m00se.gv') # doctest: +SKIP +'test-output/m00se.gv.pdf' +""" + +from . import lang +from . import files + +__all__ = ['Graph', 'Digraph'] + + +class Dot(files.File): + """Assemble, save, and render DOT source code, open result in viewer.""" + + _comment = '// %s' + _subgraph = 'subgraph %s{' + _subgraph_plain = '%s{' + _node = _attr = '\t%s%s' + _attr_plain = _attr % ('%s', '') + _tail = '}' + + _quote = staticmethod(lang.quote) + _quote_edge = staticmethod(lang.quote_edge) + + _a_list = staticmethod(lang.a_list) + _attr_list = staticmethod(lang.attr_list) + + def __init__(self, name=None, comment=None, + filename=None, directory=None, + format=None, engine=None, encoding=files.ENCODING, + graph_attr=None, node_attr=None, edge_attr=None, body=None, + strict=False): + self.name = name + self.comment = comment + + super(Dot, self).__init__(filename, directory, format, engine, encoding) + + self.graph_attr = dict(graph_attr) if graph_attr is not None else {} + self.node_attr = dict(node_attr) if node_attr is not None else {} + self.edge_attr = dict(edge_attr) if edge_attr is not None else {} + + self.body = list(body) if body is not None else [] + + self.strict = strict + + def _kwargs(self): + result = super(Dot, self)._kwargs() + result.update(name=self.name, + comment=self.comment, + graph_attr=dict(self.graph_attr), + node_attr=dict(self.node_attr), + edge_attr=dict(self.edge_attr), + body=list(self.body), + strict=self.strict) + return result + + def clear(self, keep_attrs=False): + """Reset content to an empty body, clear graph/node/egde_attr mappings. + + Args: + keep_attrs (bool): preserve graph/node/egde_attr mappings + """ + if not keep_attrs: + for a in (self.graph_attr, self.node_attr, self.edge_attr): + a.clear() + del self.body[:] + + def __iter__(self, subgraph=False): + """Yield the DOT source code line by line (as graph or subgraph).""" + if self.comment: + yield self._comment % self.comment + + if subgraph: + if self.strict: + raise ValueError('subgraphs cannot be strict') + head = self._subgraph if self.name else self._subgraph_plain + else: + head = self._head_strict if self.strict else self._head + yield head % (self._quote(self.name) + ' ' if self.name else '') + + for kw in ('graph', 'node', 'edge'): + attrs = getattr(self, '%s_attr' % kw) + if attrs: + yield self._attr % (kw, self._attr_list(None, attrs)) + + for line in self.body: + yield line + + yield self._tail + + def __str__(self): + """The DOT source code as string.""" + return '\n'.join(self) + + source = property(__str__, doc=__str__.__doc__) + + def node(self, name, label=None, _attributes=None, **attrs): + """Create a node. + + Args: + name: Unique identifier for the node inside the source. + label: Caption to be displayed (defaults to the node ``name``). + attrs: Any additional node attributes (must be strings). + """ + name = self._quote(name) + attr_list = self._attr_list(label, attrs, _attributes) + line = self._node % (name, attr_list) + self.body.append(line) + + def edge(self, tail_name, head_name, label=None, _attributes=None, **attrs): + """Create an edge between two nodes. + + Args: + tail_name: Start node identifier. + head_name: End node identifier. + label: Caption to be displayed near the edge. + attrs: Any additional edge attributes (must be strings). + """ + tail_name = self._quote_edge(tail_name) + head_name = self._quote_edge(head_name) + attr_list = self._attr_list(label, attrs, _attributes) + line = self._edge % (tail_name, head_name, attr_list) + self.body.append(line) + + def edges(self, tail_head_iter): + """Create a bunch of edges. + + Args: + tail_head_iter: Iterable of ``(tail_name, head_name)`` pairs. + """ + edge = self._edge_plain + quote = self._quote_edge + lines = (edge % (quote(t), quote(h)) for t, h in tail_head_iter) + self.body.extend(lines) + + def attr(self, kw=None, _attributes=None, **attrs): + """Add a general or graph/node/edge attribute statement. + + Args: + kw: Attributes target (``None`` or ``'graph'``, ``'node'``, ``'edge'``). + attrs: Attributes to be set (must be strings, may be empty). + + See the :ref:`usage examples in the User Guide `. + """ + if kw is not None and kw.lower() not in ('graph', 'node', 'edge'): + raise ValueError('attr statement must target graph, node, or edge: ' + '%r' % kw) + if attrs or _attributes: + if kw is None: + a_list = self._a_list(None, attrs, _attributes) + line = self._attr_plain % a_list + else: + attr_list = self._attr_list(None, attrs, _attributes) + line = self._attr % (kw, attr_list) + self.body.append(line) + + def subgraph(self, graph=None, name=None, comment=None, + graph_attr=None, node_attr=None, edge_attr=None, body=None): + """Add the current content of the given sole ``graph`` argument as subgraph \ + or return a context manager returning a new graph instance created \ + with the given (``name``, ``comment``, etc.) arguments whose content is \ + added as subgraph when leaving the context manager's ``with``-block. + + Args: + graph: An instance of the same kind (:class:`.Graph`, :class:`.Digraph`) + as the current graph (sole argument in non-with-block use). + name: Subgraph name (``with``-block use). + comment: Subgraph comment (``with``-block use). + graph_attr: Subgraph-level attribute-value mapping (``with``-block use). + node_attr: Node-level attribute-value mapping (``with``-block use). + edge_attr: Edge-level attribute-value mapping (``with``-block use). + body: Verbatim lines to add to the subgraph ``body`` (``with``-block use). + + See the :ref:`usage examples in the User Guide `. + + .. note:: + If the ``name`` of the subgraph begins with ``'cluster'`` (all lowercase) + the layout engine will treat it as a special cluster subgraph. + """ + if graph is None: + return SubgraphContext(self, {'name': name, + 'comment': comment, + 'graph_attr': graph_attr, + 'node_attr': node_attr, + 'edge_attr': edge_attr, + 'body': body}) + + args = [name, comment, graph_attr, node_attr, edge_attr, body] + if not all(a is None for a in args): + raise ValueError('graph must be sole argument of subgraph()') + + if graph.directed != self.directed: + raise ValueError('%r cannot add subgraph of different kind:' + ' %r' % (self, graph)) + + lines = ['\t' + line for line in graph.__iter__(subgraph=True)] + self.body.extend(lines) + + +class SubgraphContext(object): + """Return a blank instance of the parent and add as subgraph on exit.""" + + def __init__(self, parent, kwargs): + self.parent = parent + self.graph = parent.__class__(**kwargs) + + def __enter__(self): + return self.graph + + def __exit__(self, type_, value, traceback): + if type_ is None: + self.parent.subgraph(self.graph) + + +class Graph(Dot): + """Graph source code in the DOT language. + + Args: + name: Graph name used in the source code. + comment: Comment added to the first line of the source. + filename: Filename for saving the source (defaults to ``name`` + ``'.gv'``). + directory: (Sub)directory for source saving and rendering. + format: Rendering output format (``'pdf'``, ``'png'``, ...). + engine: Layout command used (``'dot'``, ``'neato'``, ...). + encoding: Encoding for saving the source. + graph_attr: Mapping of ``(attribute, value)`` pairs for the graph. + node_attr: Mapping of ``(attribute, value)`` pairs set for all nodes. + edge_attr: Mapping of ``(attribute, value)`` pairs set for all edges. + body: Iterable of verbatim lines to add to the graph ``body``. + strict (bool): Rendering should merge multi-edges. + + Note: + All parameters are optional and can be changed under their + corresponding attribute name after instance creation. + """ + + _head = 'graph %s{' + _head_strict = 'strict %s' % _head + _edge = '\t%s -- %s%s' + _edge_plain = _edge % ('%s', '%s', '') + + @property + def directed(self): + """``False``""" + return False + + +class Digraph(Dot): + """Directed graph source code in the DOT language.""" + + if Graph.__doc__ is not None: + __doc__ += Graph.__doc__.partition('.')[2] + + _head = 'digraph %s{' + _head_strict = 'strict %s' % _head + _edge = '\t%s -> %s%s' + _edge_plain = _edge % ('%s', '%s', '') + + @property + def directed(self): + """``True``""" + return True diff --git a/src/graphviz/files.py b/src/graphviz/files.py new file mode 100644 index 0000000..bf1585f --- /dev/null +++ b/src/graphviz/files.py @@ -0,0 +1,311 @@ +# files.py - save, render, view + +"""Save DOT code objects, render with Graphviz dot, and open in viewer.""" + +import os +import io +import codecs +import locale +import logging + +from ._compat import text_type + +from . import backend +from . import tools + +__all__ = ['File', 'Source'] + +ENCODING = 'utf-8' + + +log = logging.getLogger(__name__) + + +class Base(object): + + _format = 'pdf' + _engine = 'dot' + _encoding = ENCODING + + @property + def format(self): + """The output format used for rendering (``'pdf'``, ``'png'``, ...).""" + return self._format + + @format.setter + def format(self, format): + format = format.lower() + if format not in backend.FORMATS: + raise ValueError('unknown format: %r' % format) + self._format = format + + @property + def engine(self): + """The layout commmand used for rendering (``'dot'``, ``'neato'``, ...).""" + return self._engine + + @engine.setter + def engine(self, engine): + engine = engine.lower() + if engine not in backend.ENGINES: + raise ValueError('unknown engine: %r' % engine) + self._engine = engine + + @property + def encoding(self): + """The encoding for the saved source file.""" + return self._encoding + + @encoding.setter + def encoding(self, encoding): + if encoding is None: + encoding = locale.getpreferredencoding() + codecs.lookup(encoding) # raise early + self._encoding = encoding + + def copy(self): + """Return a copied instance of the object. + + Returns: + An independent copy of the current object. + """ + kwargs = self._kwargs() + return self.__class__(**kwargs) + + def _kwargs(self): + ns = self.__dict__ + return {a[1:]: ns[a] for a in ('_format', '_engine', '_encoding') + if a in ns} + + +class File(Base): + + directory = '' + + _default_extension = 'gv' + + def __init__(self, filename=None, directory=None, + format=None, engine=None, encoding=ENCODING): + if filename is None: + name = getattr(self, 'name', None) or self.__class__.__name__ + filename = '%s.%s' % (name, self._default_extension) + self.filename = filename + + if directory is not None: + self.directory = directory + + if format is not None: + self.format = format + + if engine is not None: + self.engine = engine + + self.encoding = encoding + + def _kwargs(self): + result = super(File, self)._kwargs() + result['filename'] = self.filename + if 'directory' in self.__dict__: + result['directory'] = self.directory + return result + + def _repr_svg_(self): + return self.pipe(format='svg').decode(self._encoding) + + def pipe(self, format=None, renderer=None, formatter=None, quiet=False): + """Return the source piped through the Graphviz layout command. + + Args: + format: The output format used for rendering (``'pdf'``, ``'png'``, etc.). + renderer: The output renderer used for rendering (``'cairo'``, ``'gd'``, ...). + formatter: The output formatter used for rendering (``'cairo'``, ``'gd'``, ...). + quiet (bool): Suppress ``stderr`` output from the layout subprocess. + Returns: + Binary (encoded) stdout of the layout command. + Raises: + ValueError: If ``format``, ``renderer``, or ``formatter`` are not known. + graphviz.RequiredArgumentError: If ``formatter`` is given but ``renderer`` is None. + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + """ + if format is None: + format = self._format + + data = text_type(self.source).encode(self._encoding) + + out = backend.pipe(self._engine, format, data, + renderer=renderer, formatter=formatter, + quiet=quiet) + + return out + + @property + def filepath(self): + return os.path.join(self.directory, self.filename) + + def save(self, filename=None, directory=None): + """Save the DOT source to file. Ensure the file ends with a newline. + + Args: + filename: Filename for saving the source (defaults to ``name`` + ``'.gv'``) + directory: (Sub)directory for source saving and rendering. + Returns: + The (possibly relative) path of the saved source file. + """ + if filename is not None: + self.filename = filename + if directory is not None: + self.directory = directory + + filepath = self.filepath + tools.mkdirs(filepath) + + data = text_type(self.source) + + log.debug('write %d bytes to %r', len(data), filepath) + with io.open(filepath, 'w', encoding=self.encoding) as fd: + fd.write(data) + if not data.endswith(u'\n'): + fd.write(u'\n') + + return filepath + + def render(self, filename=None, directory=None, view=False, cleanup=False, + format=None, renderer=None, formatter=None, + quiet=False, quiet_view=False): + """Save the source to file and render with the Graphviz engine. + + Args: + filename: Filename for saving the source (defaults to ``name`` + ``'.gv'``) + directory: (Sub)directory for source saving and rendering. + view (bool): Open the rendered result with the default application. + cleanup (bool): Delete the source file after rendering. + format: The output format used for rendering (``'pdf'``, ``'png'``, etc.). + renderer: The output renderer used for rendering (``'cairo'``, ``'gd'``, ...). + formatter: The output formatter used for rendering (``'cairo'``, ``'gd'``, ...). + quiet (bool): Suppress ``stderr`` output from the layout subprocess. + quiet_view (bool): Suppress ``stderr`` output from the viewer process + (implies ``view=True``, ineffective on Windows). + Returns: + The (possibly relative) path of the rendered file. + Raises: + ValueError: If ``format``, ``renderer``, or ``formatter`` are not known. + graphviz.RequiredArgumentError: If ``formatter`` is given but ``renderer`` is None. + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + RuntimeError: If viewer opening is requested but not supported. + + The layout command is started from the directory of ``filepath``, so that + references to external files (e.g. ``[image=...]``) can be given as paths + relative to the DOT source file. + """ + filepath = self.save(filename, directory) + + if format is None: + format = self._format + + rendered = backend.render(self._engine, format, filepath, + renderer=renderer, formatter=formatter, + quiet=quiet) + + if cleanup: + log.debug('delete %r', filepath) + os.remove(filepath) + + if quiet_view or view: + self._view(rendered, self._format, quiet_view) + + return rendered + + def view(self, filename=None, directory=None, cleanup=False, + quiet=False, quiet_view=False): + """Save the source to file, open the rendered result in a viewer. + + Args: + filename: Filename for saving the source (defaults to ``name`` + ``'.gv'``) + directory: (Sub)directory for source saving and rendering. + cleanup (bool): Delete the source file after rendering. + quiet (bool): Suppress ``stderr`` output from the layout subprocess. + quiet_view (bool): Suppress ``stderr`` output from the viewer process + (ineffective on Windows). + Returns: + The (possibly relative) path of the rendered file. + Raises: + graphviz.ExecutableNotFound: If the Graphviz executable is not found. + subprocess.CalledProcessError: If the exit status is non-zero. + RuntimeError: If opening the viewer is not supported. + + Short-cut method for calling :meth:`.render` with ``view=True``. + """ + return self.render(filename=filename, directory=directory, + view=True, cleanup=cleanup, + quiet=quiet, quiet_view=quiet_view) + + def _view(self, filepath, format, quiet): + """Start the right viewer based on file format and platform.""" + methodnames = [ + '_view_%s_%s' % (format, backend.PLATFORM), + '_view_%s' % backend.PLATFORM, + ] + for name in methodnames: + view_method = getattr(self, name, None) + if view_method is not None: + break + else: + raise RuntimeError('%r has no built-in viewer support for %r' + ' on %r platform' % (self.__class__, format, + backend.PLATFORM)) + view_method(filepath, quiet) + + _view_darwin = staticmethod(backend.view.darwin) + _view_freebsd = staticmethod(backend.view.freebsd) + _view_linux = staticmethod(backend.view.linux) + _view_windows = staticmethod(backend.view.windows) + + +class Source(File): + """Verbatim DOT source code string to be rendered by Graphviz. + + Args: + source: The verbatim DOT source code string. + filename: Filename for saving the source (defaults to ``'Source.gv'``). + directory: (Sub)directory for source saving and rendering. + format: Rendering output format (``'pdf'``, ``'png'``, ...). + engine: Layout command used (``'dot'``, ``'neato'``, ...). + encoding: Encoding for saving the source. + + Note: + All parameters except ``source`` are optional. All of them can be changed + under their corresponding attribute name after instance creation. + """ + + @classmethod + def from_file(cls, filename, directory=None, + format=None, engine=None, encoding=ENCODING): + """Return an instance with the source string read from the given file. + + Args: + filename: Filename for loading/saving the source. + directory: (Sub)directory for source loading/saving and rendering. + format: Rendering output format (``'pdf'``, ``'png'``, ...). + engine: Layout command used (``'dot'``, ``'neato'``, ...). + encoding: Encoding for loading/saving the source. + """ + filepath = os.path.join(directory or '', filename) + if encoding is None: + encoding = locale.getpreferredencoding() + log.debug('read %r with encoding %r', filepath, encoding) + with io.open(filepath, encoding=encoding) as fd: + source = fd.read() + return cls(source, filename, directory, format, engine, encoding) + + def __init__(self, source, filename=None, directory=None, + format=None, engine=None, encoding=ENCODING): + super(Source, self).__init__(filename, directory, + format, engine, encoding) + self.source = source #: The verbatim DOT source code string. + + def _kwargs(self): + result = super(Source, self)._kwargs() + result['source'] = self.source + return result diff --git a/src/graphviz/lang.py b/src/graphviz/lang.py new file mode 100644 index 0000000..8969700 --- /dev/null +++ b/src/graphviz/lang.py @@ -0,0 +1,195 @@ +# lang.py - dot language creation helpers + +"""Quote strings to be valid DOT identifiers, assemble attribute lists.""" + +import re +import collections +import functools + +from . import _compat + +from . import tools + +__all__ = ['quote', 'quote_edge', 'a_list', 'attr_list', 'escape', 'nohtml'] + +# https://www.graphviz.org/doc/info/lang.html +# https://www.graphviz.org/doc/info/attrs.html#k:escString + +HTML_STRING = re.compile(r'<.*>$', re.DOTALL) + +ID = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*|-?(\.[0-9]+|[0-9]+(\.[0-9]*)?))$') + +KEYWORDS = {'node', 'edge', 'graph', 'digraph', 'subgraph', 'strict'} + +COMPASS = {'n', 'ne', 'e', 'se', 's', 'sw', 'w', 'nw', 'c', '_'} # TODO + +QUOTE_OPTIONAL_BACKSLASHES = re.compile(r'(?P(?:\\\\)*)' + r'\\?(?P")') + +ESCAPE_UNESCAPED_QUOTES = functools.partial(QUOTE_OPTIONAL_BACKSLASHES.sub, + r'\g\\\g') + + +def quote(identifier, + is_html_string=HTML_STRING.match, + is_valid_id=ID.match, dot_keywords=KEYWORDS, + escape_unescaped_quotes=ESCAPE_UNESCAPED_QUOTES): + r"""Return DOT identifier from string, quote if needed. + + >>> quote('') + '""' + + >>> quote('spam') + 'spam' + + >>> quote('spam spam') + '"spam spam"' + + >>> quote('-4.2') + '-4.2' + + >>> quote('.42') + '.42' + + >>> quote('<spam>') + '<spam>' + + >>> quote(nohtml('<>')) + '"<>"' + + >>> print(quote('"')) + "\"" + + >>> print(quote('\\"')) + "\"" + + >>> print(quote('\\\\"')) + "\\\"" + + >>> print(quote('\\\\\\"')) + "\\\"" + """ + if is_html_string(identifier) and not isinstance(identifier, NoHtml): + pass + elif not is_valid_id(identifier) or identifier.lower() in dot_keywords: + return '"%s"' % escape_unescaped_quotes(identifier) + return identifier + + +def quote_edge(identifier): + """Return DOT edge statement node_id from string, quote if needed. + + >>> quote_edge('spam') + 'spam' + + >>> quote_edge('spam spam:eggs eggs') + '"spam spam":"eggs eggs"' + + >>> quote_edge('spam:eggs:s') + 'spam:eggs:s' + """ + node, _, rest = identifier.partition(':') + parts = [quote(node)] + if rest: + port, _, compass = rest.partition(':') + parts.append(quote(port)) + if compass: + parts.append(compass) + return ':'.join(parts) + + +def a_list(label=None, kwargs=None, attributes=None): + """Return assembled DOT a_list string. + + >>> a_list('spam', {'spam': None, 'ham': 'ham ham', 'eggs': ''}) + 'label=spam eggs="" ham="ham ham"' + """ + result = ['label=%s' % quote(label)] if label is not None else [] + if kwargs: + items = ['%s=%s' % (quote(k), quote(v)) + for k, v in tools.mapping_items(kwargs) if v is not None] + result.extend(items) + if attributes: + if hasattr(attributes, 'items'): + attributes = tools.mapping_items(attributes) + items = ['%s=%s' % (quote(k), quote(v)) + for k, v in attributes if v is not None] + result.extend(items) + return ' '.join(result) + + +def attr_list(label=None, kwargs=None, attributes=None): + """Return assembled DOT attribute list string. + + Sorts ``kwargs`` and ``attributes`` if they are plain dicts (to avoid + unpredictable order from hash randomization in Python 3 versions). + + >>> attr_list() + '' + + >>> attr_list('spam spam', kwargs={'eggs': 'eggs', 'ham': 'ham ham'}) + ' [label="spam spam" eggs=eggs ham="ham ham"]' + + >>> attr_list(kwargs={'spam': None, 'eggs': ''}) + ' [eggs=""]' + """ + content = a_list(label, kwargs, attributes) + if not content: + return '' + return ' [%s]' % content + + +def escape(s): + r"""Return ``s`` as literal disabling special meaning of backslashes and ``'<...>'``. + + see also https://www.graphviz.org/doc/info/attrs.html#k:escString + + Args: + s: String in which backslashes and ``'<...>'`` should be treated as literal. + Raises: + TypeError: If ``s`` is not a ``str`` on Python 3, or a ``str``/``unicode`` on Python 2. + + >>> print(escape(r'\l')) + \\l + """ + return nohtml(s.replace('\\', '\\\\')) + + +class NoHtml(object): + """Mixin for string subclasses disabling fall-through of ``'<...>'``.""" + + __slots__ = () + + _doc = "%s subclass that does not treat ``'<...>'`` as DOT HTML string." + + @classmethod + def _subcls(cls, other): + name = '%s_%s' % (cls.__name__, other.__name__) + bases = (other, cls) + ns = {'__doc__': cls._doc % other.__name__} + return type(name, bases, ns) + + +NOHTML = collections.OrderedDict((c, NoHtml._subcls(c)) for c in _compat.string_classes) + + +def nohtml(s): + """Return copy of ``s`` that will not treat ``'<...>'`` as DOT HTML string in quoting. + + Args: + s: String in which leading ``'<'`` and trailing ``'>'`` should be treated as literal. + Raises: + TypeError: If ``s`` is not a ``str`` on Python 3, or a ``str``/``unicode`` on Python 2. + + >>> quote('<>-*-<>') + '<>-*-<>' + + >>> quote(nohtml('<>-*-<>')) + '"<>-*-<>"' + """ + try: + subcls = NOHTML[type(s)] + except KeyError: + raise TypeError('%r does not have one of the required types:' + ' %r' % (s, list(NOHTML))) + return subcls(s) diff --git a/src/graphviz/tools.py b/src/graphviz/tools.py new file mode 100644 index 0000000..ef435b7 --- /dev/null +++ b/src/graphviz/tools.py @@ -0,0 +1,47 @@ +# tools.py - generic helpers + +import os + +from . import _compat + +__all__ = ['attach', 'mkdirs', 'mapping_items'] + + +def attach(object, name): + """Return a decorator doing ``setattr(object, name)`` with its argument. + + >>> spam = type('Spam', (object,), {})() + >>> @attach(spam, 'eggs') + ... def func(): + ... pass + >>> spam.eggs # doctest: +ELLIPSIS + + """ + def decorator(func): + setattr(object, name, func) + return func + return decorator + + +def mkdirs(filename, mode=0o777): + """Recursively create directories up to the path of ``filename`` as needed.""" + dirname = os.path.dirname(filename) + if not dirname: + return + _compat.makedirs(dirname, mode=mode, exist_ok=True) + + +def mapping_items(mapping): + """Return an iterator over the ``mapping`` items, sort if it's a plain dict. + + >>> list(mapping_items({'spam': 0, 'ham': 1, 'eggs': 2})) + [('eggs', 2), ('ham', 1), ('spam', 0)] + + >>> from collections import OrderedDict + >>> list(mapping_items(OrderedDict(enumerate(['spam', 'ham', 'eggs'])))) + [(0, 'spam'), (1, 'ham'), (2, 'eggs')] + """ + result = _compat.iteritems(mapping) + if type(mapping) is dict: + result = iter(sorted(result)) + return result