aboutsummaryrefslogtreecommitdiffstats
path: root/pyGHDL/lsp
diff options
context:
space:
mode:
Diffstat (limited to 'pyGHDL/lsp')
-rw-r--r--pyGHDL/lsp/README18
-rw-r--r--pyGHDL/lsp/__init__.py0
-rw-r--r--pyGHDL/lsp/document.py226
-rw-r--r--pyGHDL/lsp/lsp.py311
-rw-r--r--pyGHDL/lsp/lsptools.py41
-rw-r--r--pyGHDL/lsp/main.py129
-rw-r--r--pyGHDL/lsp/references.py100
-rw-r--r--pyGHDL/lsp/symbols.py177
-rw-r--r--pyGHDL/lsp/version.py1
-rw-r--r--pyGHDL/lsp/vhdl_ls.py150
-rw-r--r--pyGHDL/lsp/workspace.py499
11 files changed, 1652 insertions, 0 deletions
diff --git a/pyGHDL/lsp/README b/pyGHDL/lsp/README
new file mode 100644
index 000000000..c82ccc4d4
--- /dev/null
+++ b/pyGHDL/lsp/README
@@ -0,0 +1,18 @@
+pyghdl is a language server for VHDL based on ghdl.
+
+It implements the Language Server Protocol.
+The server is implemented in Python (3.x) but relies on libghdl for parsing.
+
+It also provides a python interface to libghdl, which could be used to
+develop tools around the parser and analyzer.
+
+To install:
+1) First install ghdl (add --enable-python during configuration).
+ This is needed so that the libraries are available
+2) In ghdl/python, install pyghdl. There is a setup.py script, so you can do:
+ $ pip install .
+ To install for development: pip install -e .
+ Add --user to install in your home directory.
+
+The executable is named 'ghdl-ls'. It uses stdin/stdout to communicate with
+its client.
diff --git a/pyGHDL/lsp/__init__.py b/pyGHDL/lsp/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pyGHDL/lsp/__init__.py
diff --git a/pyGHDL/lsp/document.py b/pyGHDL/lsp/document.py
new file mode 100644
index 000000000..1b988220c
--- /dev/null
+++ b/pyGHDL/lsp/document.py
@@ -0,0 +1,226 @@
+import ctypes
+import logging
+import os
+import pyGHDL.libghdl.name_table as name_table
+import pyGHDL.libghdl.files_map as files_map
+import pyGHDL.libghdl.files_map_editor as files_map_editor
+import pyGHDL.libghdl.libraries as libraries
+import pyGHDL.libghdl.vhdl.nodes as nodes
+import pyGHDL.libghdl.vhdl.sem_lib as sem_lib
+import pyGHDL.libghdl.vhdl.sem as sem
+import pyGHDL.libghdl.vhdl.formatters as formatters
+
+from . import symbols, references
+
+log = logging.getLogger(__name__)
+
+
+class Document(object):
+ # The encoding used for the files.
+ # Unfortunately this is not fully reliable. The client can read the
+ # file using its own view of the encoding. It then pass the document
+ # to the server using unicode(utf-8). Then the document is converted
+ # back to bytes using this encoding. And we hope the result would be
+ # the same as the file. Because VHDL uses the iso 8859-1 character
+ # set, we use the same encoding. The client should also use 8859-1.
+ encoding = "iso-8859-1"
+
+ initial_gap_size = 4096
+
+ def __init__(self, uri, sfe=None, version=None):
+ self.uri = uri
+ self.version = version
+ self._fe = sfe
+ self.gap_size = Document.initial_gap_size
+ self._tree = nodes.Null_Iir
+
+ @staticmethod
+ def load(source, dirname, filename):
+ # Write text to file buffer.
+ src_bytes = source.encode(Document.encoding, "replace")
+ src_len = len(src_bytes)
+ buf_len = src_len + Document.initial_gap_size
+ fileid = name_table.Get_Identifier(filename.encode("utf-8"))
+ if os.path.isabs(filename):
+ dirid = name_table.Null_Identifier
+ else:
+ dirid = name_table.Get_Identifier(dirname.encode("utf-8"))
+ sfe = files_map.Reserve_Source_File(dirid, fileid, buf_len)
+ files_map_editor.Fill_Text(sfe, ctypes.c_char_p(src_bytes), src_len)
+ return sfe
+
+ def reload(self, source):
+ """Reload the source of a document. """
+ src_bytes = source.encode(Document.encoding, "replace")
+ files_map_editor.Fill_Text(self._fe, ctypes.c_char_p(src_bytes), len(src_bytes))
+
+ def __str__(self):
+ return str(self.uri)
+
+ def apply_change(self, change):
+ """Apply a change to the document."""
+ text = change["text"]
+ change_range = change.get("range")
+
+ text_bytes = text.encode(Document.encoding, "replace")
+
+ if not change_range:
+ # The whole file has changed
+ raise AssertionError
+ # if len(text_bytes) < libghdl.Files_Map.Get_Buffer_Length(self._fe):
+ # xxxx_replace
+ # else:
+ # xxxx_free
+ # xxxx_allocate
+ # return
+
+ start_line = change_range["start"]["line"]
+ start_col = change_range["start"]["character"]
+ end_line = change_range["end"]["line"]
+ end_col = change_range["end"]["character"]
+
+ status = files_map_editor.Replace_Text(
+ self._fe,
+ start_line + 1,
+ start_col,
+ end_line + 1,
+ end_col,
+ ctypes.c_char_p(text_bytes),
+ len(text_bytes),
+ )
+ if status:
+ return
+
+ # Failed to replace text.
+ # Increase size
+ self.gap_size *= 2
+ fileid = files_map.Get_File_Name(self._fe)
+ dirid = files_map.Get_Directory_Name(self._fe)
+ buf_len = files_map.Get_File_Length(self._fe) + len(text_bytes) + self.gap_size
+ files_map.Discard_Source_File(self._fe)
+ new_sfe = files_map.Reserve_Source_File(dirid, fileid, buf_len)
+ files_map_editor.Copy_Source_File(new_sfe, self._fe)
+ files_map.Free_Source_File(self._fe)
+ self._fe = new_sfe
+ status = files_map_editor.Replace_Text(
+ self._fe,
+ start_line + 1,
+ start_col,
+ end_line + 1,
+ end_col,
+ ctypes.c_char_p(text_bytes),
+ len(text_bytes),
+ )
+ assert status
+
+ def check_document(self, text):
+ log.debug("Checking document: %s", self.uri)
+
+ text_bytes = text.encode(Document.encoding, "replace")
+
+ files_map_editor.Check_Buffer_Content(
+ self._fe, ctypes.c_char_p(text_bytes), len(text_bytes)
+ )
+
+ @staticmethod
+ def add_to_library(tree):
+ # Detach the chain of units.
+ unit = nodes.Get_First_Design_Unit(tree)
+ nodes.Set_First_Design_Unit(tree, nodes.Null_Iir)
+ # FIXME: free the design file ?
+ tree = nodes.Null_Iir
+ # Analyze unit after unit.
+ while unit != nodes.Null_Iir:
+ # Pop the first unit.
+ next_unit = nodes.Get_Chain(unit)
+ nodes.Set_Chain(unit, nodes.Null_Iir)
+ lib_unit = nodes.Get_Library_Unit(unit)
+ if (
+ lib_unit != nodes.Null_Iir
+ and nodes.Get_Identifier(unit) != name_table.Null_Identifier
+ ):
+ # Put the unit (only if it has a library unit) in the library.
+ libraries.Add_Design_Unit_Into_Library(unit, False)
+ tree = nodes.Get_Design_File(unit)
+ unit = next_unit
+ return tree
+
+ def parse_document(self):
+ """Parse a document and put the units in the library"""
+ assert self._tree == nodes.Null_Iir
+ tree = sem_lib.Load_File(self._fe)
+ if tree == nodes.Null_Iir:
+ return
+ self._tree = Document.add_to_library(tree)
+ log.debug("add_to_library(%u) -> %u", tree, self._tree)
+ if self._tree == nodes.Null_Iir:
+ return
+ nodes.Set_Design_File_Source(self._tree, self._fe)
+
+ def compute_diags(self):
+ log.debug("parse doc %d %s", self._fe, self.uri)
+ self.parse_document()
+ if self._tree == nodes.Null_Iir:
+ # No units, nothing to add.
+ return
+ # Semantic analysis.
+ unit = nodes.Get_First_Design_Unit(self._tree)
+ while unit != nodes.Null_Iir:
+ sem.Semantic(unit)
+ nodes.Set_Date_State(unit, nodes.Date_State.Analyze)
+ unit = nodes.Get_Chain(unit)
+
+ def flatten_symbols(self, syms, parent):
+ res = []
+ for s in syms:
+ s["location"] = {"uri": self.uri, "range": s["range"]}
+ del s["range"]
+ s.pop("detail", None)
+ if parent is not None:
+ s["containerName"] = parent
+ res.append(s)
+ children = s.pop("children", None)
+ if children is not None:
+ res.extend(self.flatten_symbols(children, s))
+ return res
+
+ def document_symbols(self):
+ log.debug("document_symbols")
+ if self._tree == nodes.Null_Iir:
+ return []
+ syms = symbols.get_symbols_chain(
+ self._fe, nodes.Get_First_Design_Unit(self._tree)
+ )
+ return self.flatten_symbols(syms, None)
+
+ def position_to_location(self, position):
+ pos = files_map.File_Line_To_Position(self._fe, position["line"] + 1)
+ return files_map.File_Pos_To_Location(self._fe, pos) + position["character"]
+
+ def goto_definition(self, position):
+ loc = self.position_to_location(position)
+ return references.goto_definition(self._tree, loc)
+
+ def format_range(self, rng):
+ first_line = rng["start"]["line"] + 1
+ last_line = rng["end"]["line"] + (1 if rng["end"]["character"] != 0 else 0)
+ if last_line < first_line:
+ return None
+ if self._tree == nodes.Null_Iir:
+ return None
+ hand = formatters.Allocate_Handle()
+ formatters.Indent_String(self._tree, hand, first_line, last_line)
+ buffer = formatters.Get_C_String(hand)
+ buf_len = formatters.Get_Length(hand)
+ newtext = buffer[:buf_len].decode(Document.encoding)
+ res = [
+ {
+ "range": {
+ "start": {"line": first_line - 1, "character": 0},
+ "end": {"line": last_line, "character": 0},
+ },
+ "newText": newtext,
+ }
+ ]
+ formatters.Free_Handle(hand)
+ return res
diff --git a/pyGHDL/lsp/lsp.py b/pyGHDL/lsp/lsp.py
new file mode 100644
index 000000000..60f32be76
--- /dev/null
+++ b/pyGHDL/lsp/lsp.py
@@ -0,0 +1,311 @@
+import os
+import logging
+import json
+import attr
+from attr.validators import instance_of
+
+try:
+ from urllib.parse import unquote, quote
+except ImportError:
+ from urllib2 import quote
+ from urlparse import unquote
+
+log = logging.getLogger("ghdl-ls")
+
+
+class ProtocolError(Exception):
+ pass
+
+
+class LSPConn:
+ def __init__(self, reader, writer):
+ self.reader = reader
+ self.writer = writer
+
+ def readline(self):
+ data = self.reader.readline()
+ return data.decode("utf-8")
+
+ def read(self, size):
+ data = self.reader.read(size)
+ return data.decode("utf-8")
+
+ def write(self, out):
+ self.writer.write(out.encode())
+ self.writer.flush()
+
+
+def path_from_uri(uri):
+ # Convert file uri to path (strip html like head part)
+ if not uri.startswith("file://"):
+ return uri
+ if os.name == "nt":
+ _, path = uri.split("file:///", 1)
+ else:
+ _, path = uri.split("file://", 1)
+ return os.path.normpath(unquote(path))
+
+
+def path_to_uri(path):
+ # Convert path to file uri (add html like head part)
+ if os.name == "nt":
+ return "file:///" + quote(path.replace("\\", "/"))
+ else:
+ return "file://" + quote(path)
+
+
+class LanguageProtocolServer(object):
+ def __init__(self, handler, conn):
+ self.conn = conn
+ self.handler = handler
+ if handler is not None:
+ handler.set_lsp(self)
+ self.running = True
+ self._next_id = 0
+
+ def read_request(self):
+ headers = {}
+ while True:
+ # Read a line
+ line = self.conn.readline()
+ # Return on EOF.
+ if not line:
+ return None
+ if line[-2:] != "\r\n":
+ raise ProtocolError("invalid end of line in header")
+ line = line[:-2]
+ if not line:
+ # End of headers.
+ log.debug("Headers: %r", headers)
+ length = headers.get("Content-Length", None)
+ if length is not None:
+ body = self.conn.read(int(length))
+ return body
+ else:
+ raise ProtocolError("missing Content-Length in header")
+ else:
+ key, value = line.split(": ", 1)
+ headers[key] = value
+
+ def run(self):
+ while self.running:
+ body = self.read_request()
+ if body is None:
+ # EOF
+ break
+
+ # Text to JSON
+ msg = json.loads(body)
+ log.debug("Read msg: %s", msg)
+
+ reply = self.handle(msg)
+ if reply is not None:
+ self.write_output(reply)
+
+ def handle(self, msg):
+ if msg.get("jsonrpc", None) != "2.0":
+ raise ProtocolError("invalid jsonrpc version")
+ tid = msg.get("id", None)
+ method = msg.get("method", None)
+ if method is None:
+ # This is a reply.
+ log.error("Unexpected reply for %s", tid)
+ return
+ params = msg.get("params", None)
+ fmethod = self.handler.dispatcher.get(method, None)
+ if fmethod:
+ if params is None:
+ params = {}
+ try:
+ response = fmethod(**params)
+ except Exception as e:
+ log.exception(
+ "Caught exception while handling %s with params %s:", method, params
+ )
+ self.show_message(
+ MessageType.Error,
+ (
+ "Caught exception while handling {}, "
+ + "see VHDL language server output for details."
+ ).format(method),
+ )
+ response = None
+ if tid is None:
+ # If this was just a notification, discard it
+ return None
+ log.debug("Response: %s", response)
+ rbody = {
+ "jsonrpc": "2.0",
+ "id": tid,
+ "result": response,
+ }
+ else:
+ # Unknown method.
+ log.error("Unknown method %s", method)
+ # If this was just a notification, discard it
+ if tid is None:
+ return None
+ # Otherwise create an error.
+ rbody = {
+ "jsonrpc": "2.0",
+ "id": tid,
+ "error": {
+ "code": JSONErrorCodes.MethodNotFound,
+ "message": "unknown method {}".format(method),
+ },
+ }
+ return rbody
+
+ def write_output(self, body):
+ output = json.dumps(body, separators=(",", ":"))
+ self.conn.write("Content-Length: {}\r\n".format(len(output)))
+ self.conn.write("\r\n")
+ self.conn.write(output)
+
+ def notify(self, method, params):
+ """Send a notification"""
+ body = {
+ "jsonrpc": "2.0",
+ "method": method,
+ "params": params,
+ }
+ self.write_output(body)
+
+ def send_request(self, method, params):
+ """Send a request"""
+ self._next_id += 1
+ body = {
+ "jsonrpc": "2.0",
+ "id": self._next_id,
+ "method": method,
+ "params": params,
+ }
+ self.write_output(body)
+
+ def shutdown(self):
+ """Prepare to shutdown the server"""
+ self.running = False
+
+ def show_message(self, typ, message):
+ self.notify("window/showMessage", {"type": typ, "message": message})
+
+ def configuration(self, items):
+ return self.send_request("workspace/configuration", {"items": items})
+
+
+# ----------------------------------------------------------------------
+# Standard defines and object types
+#
+
+
+class JSONErrorCodes(object):
+ # Defined by JSON RPC
+ ParseError = -32700
+ InvalidRequest = -32600
+ MethodNotFound = -32601
+ InvalidParams = -32602
+ InternalError = -32603
+ serverErrorStart = -32099
+ serverErrorEnd = -32000
+ ServerNotInitialized = -32002
+ UnknownErrorCode = -32001
+
+ # Defined by the protocol.
+ RequestCancelled = -32800
+ ContentModified = -32801
+
+
+class CompletionKind(object):
+ Text = 1
+ Method = 2
+ Function = 3
+ Constructor = 4
+ Field = 5
+ Variable = 6
+ Class = 7
+ Interface = 8
+ Module = 9
+ Property = 10
+ Unit = 11
+ Value = 12
+ Enum = 13
+ Keyword = 14
+ Snippet = 15
+ Color = 16
+ File = 17
+ Reference = 18
+
+
+class DiagnosticSeverity(object):
+ Error = 1
+ Warning = 2
+ Information = 3
+ Hint = 4
+
+
+class TextDocumentSyncKind(object):
+ NONE = (0,)
+ FULL = 1
+ INCREMENTAL = 2
+
+
+class MessageType(object):
+ Error = 1
+ Warning = 2
+ Info = 3
+ Log = 4
+
+
+class SymbolKind(object):
+ File = 1
+ Module = 2
+ Namespace = 3
+ Package = 4
+ Class = 5
+ Method = 6
+ Property = 7
+ Field = 8
+ Constructor = 9
+ Enum = 10
+ Interface = 11
+ Function = 12
+ Variable = 13
+ Constant = 14
+ String = 15
+ Number = 16
+ Boolean = 17
+ Array = 18
+
+
+@attr.s
+class HoverInfo(object):
+ language = attr.ib()
+ value = attr.ib()
+
+
+@attr.s
+class Completion(object):
+ label = attr.ib()
+ kind = attr.ib()
+ detail = attr.ib()
+ documentation = attr.ib()
+
+
+@attr.s
+class Position(object):
+ line = attr.ib()
+ character = attr.ib()
+
+
+@attr.s
+class Range(object):
+ start = attr.ib(validator=instance_of(Position))
+ end = attr.ib(validator=instance_of(Position))
+
+
+@attr.s
+class Diagnostic(object):
+ range = attr.ib(validator=instance_of(Range))
+ severity = attr.ib()
+ source = attr.ib()
+ message = attr.ib()
diff --git a/pyGHDL/lsp/lsptools.py b/pyGHDL/lsp/lsptools.py
new file mode 100644
index 000000000..648f0a8c0
--- /dev/null
+++ b/pyGHDL/lsp/lsptools.py
@@ -0,0 +1,41 @@
+import sys
+import argparse
+import json
+from . import lsp
+
+
+def lsp2json():
+ "Utility that transforms lsp log file to a JSON list"
+ conn = lsp.LSPConn(sys.stdin.buffer, sys.stdout.buffer)
+ ls = lsp.LanguageProtocolServer(None, conn)
+ res = []
+ while True:
+ req = ls.read_request()
+ if req is None:
+ break
+ res.append(json.loads(req))
+ print(json.dumps(res, indent=2))
+
+
+def json2lsp():
+ "Utility that transform a JSON list to an lsp file"
+ res = json.load(sys.stdin)
+ conn = lsp.LSPConn(sys.stdin.buffer, sys.stdout.buffer)
+ ls = lsp.LanguageProtocolServer(None, conn)
+ for req in res:
+ ls.write_output(req)
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ subparsers = parser.add_subparsers(help="sub-command help")
+ parser_l2j = subparsers.add_parser("lsp2json", help="convert lsp dump to JSON")
+ parser_l2j.set_defaults(func=lsp2json)
+ parser_j2l = subparsers.add_parser("json2lsp", help="convert JSON to lsp dump")
+ parser_j2l.set_defaults(func=json2lsp)
+ args = parser.parse_args()
+ args.func()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pyGHDL/lsp/main.py b/pyGHDL/lsp/main.py
new file mode 100644
index 000000000..7cfe06683
--- /dev/null
+++ b/pyGHDL/lsp/main.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+from __future__ import absolute_import
+
+import argparse
+import logging
+import sys
+import os
+
+import pyGHDL.libghdl as libghdl
+
+from . import version
+from . import lsp
+from . import vhdl_ls
+
+logger = logging.getLogger("ghdl-ls")
+
+
+class LSPConnTrace(object):
+ """Wrapper class to save in and out packets"""
+
+ def __init__(self, basename, conn):
+ self.conn = conn
+ self.trace_in = open(basename + ".in", "w")
+ self.trace_out = open(basename + ".out", "w")
+
+ def readline(self):
+ res = self.conn.readline()
+ self.trace_in.write(res)
+ return res
+
+ def read(self, size):
+ res = self.conn.read(size)
+ self.trace_in.write(res)
+ self.trace_in.flush()
+ return res
+
+ def write(self, out):
+ self.conn.write(out)
+ self.trace_out.write(out)
+ self.trace_out.flush()
+
+
+def rotate_log_files(basename, num):
+ for i in range(num, 0, -1):
+ oldfile = "{}.{}".format(basename, i - 1)
+ if os.path.isfile(oldfile):
+ os.rename(oldfile, "{}.{}".format(basename, i))
+ if os.path.isfile(basename):
+ os.rename(basename, "{}.0".format(basename))
+
+
+def main():
+ parser = argparse.ArgumentParser(description="VHDL Language Protocol Server")
+ parser.add_argument(
+ "--version", "-V", action="version", version="%(prog)s " + version.__version__
+ )
+ parser.add_argument(
+ "--verbose", "-v", action="count", default=0, help="Show debug output"
+ )
+ parser.add_argument(
+ "--log-file", help="Redirect logs to the given file instead of stderr"
+ )
+ parser.add_argument("--trace-file", help="Save rpc data to FILE.in and FILE.out")
+ parser.add_argument("--input", "-i", help="Read request from file")
+ parser.add_argument(
+ "--disp-config",
+ action="store_true",
+ help="Disp installation configuration and exit",
+ )
+
+ args = parser.parse_args()
+
+ if args.disp_config:
+ libghdl.errorout_console.Install_Handler()
+ libghdl.disp_config()
+ return
+
+ # Setup logging
+ if args.verbose >= 2:
+ loglevel = logging.DEBUG
+ elif args.verbose >= 1:
+ loglevel = logging.INFO
+ else:
+ loglevel = logging.ERROR
+
+ if args.log_file:
+ rotate_log_files(args.log_file, 5)
+ logstream = open(args.log_file, "w")
+ else:
+ logstream = sys.stderr
+ logging.basicConfig(
+ format="%(asctime)-15s [%(levelname)s] %(message)s",
+ stream=logstream,
+ level=loglevel,
+ )
+
+ if args.verbose != 0:
+ sys.stderr.write("Args: {}\n".format(sys.argv))
+ sys.stderr.write("Current directory: {}\n".format(os.getcwd()))
+
+ logger.info("Args: %s", sys.argv)
+ logger.info("Current directory is %s", os.getcwd())
+
+ # Connection
+ instream = sys.stdin.buffer
+ if args.input is not None:
+ instream = open(args.input, "rb")
+
+ conn = lsp.LSPConn(instream, sys.stdout.buffer)
+
+ trace_file = args.trace_file
+ if trace_file is None:
+ trace_file = os.environ.get("GHDL_LS_TRACE")
+ if trace_file is not None:
+ if args.input is None:
+ rotate_log_files(trace_file + ".in", 5)
+ rotate_log_files(trace_file + ".out", 5)
+ conn = LSPConnTrace(trace_file, conn)
+ else:
+ logger.info("Traces disabled when -i/--input")
+
+ handler = vhdl_ls.VhdlLanguageServer()
+
+ try:
+ server = lsp.LanguageProtocolServer(handler, conn)
+ server.run()
+ except Exception:
+ logger.exception("Uncaught error")
+ sys.exit(1)
diff --git a/pyGHDL/lsp/references.py b/pyGHDL/lsp/references.py
new file mode 100644
index 000000000..65bbeead4
--- /dev/null
+++ b/pyGHDL/lsp/references.py
@@ -0,0 +1,100 @@
+import logging
+import pyGHDL.libghdl.vhdl.nodes as nodes
+import pyGHDL.libghdl.vhdl.nodes_meta as nodes_meta
+import pyGHDL.libghdl.name_table as name_table
+import pyGHDL.libghdl.utils as pyutils
+
+log = logging.getLogger(__name__)
+
+
+def find_def_chain(first, loc):
+ n1 = first
+ while n1 != nodes.Null_Iir:
+ res = find_def(n1, loc)
+ if res is not None:
+ return res
+ n1 = nodes.Get_Chain(n1)
+ return None
+
+
+def find_def(n, loc):
+ "Return the node at location :param loc:, or None if not under :param n:"
+ if n == nodes.Null_Iir:
+ return None
+ k = nodes.Get_Kind(n)
+ if k in [
+ nodes.Iir_Kind.Simple_Name,
+ nodes.Iir_Kind.Character_Literal,
+ nodes.Iir_Kind.Operator_Symbol,
+ nodes.Iir_Kind.Selected_Name,
+ nodes.Iir_Kind.Attribute_Name,
+ nodes.Iir_Kind.Selected_Element,
+ ]:
+ n_loc = nodes.Get_Location(n)
+ if loc >= n_loc:
+ ident = nodes.Get_Identifier(n)
+ id_len = name_table.Get_Name_Length(ident)
+ if loc < n_loc + id_len:
+ return n
+ if k == nodes.Iir_Kind.Simple_Name:
+ return None
+ elif k == nodes.Iir_Kind.Design_File:
+ return find_def_chain(nodes.Get_First_Design_Unit(n), loc)
+ elif k == nodes.Iir_Kind.Design_Unit:
+ # if loc > elocations.Get_End_Location(unit):
+ # return None
+ res = find_def_chain(nodes.Get_Context_Items(n), loc)
+ if res is not None:
+ return res
+ unit = nodes.Get_Library_Unit(n)
+ return find_def(unit, loc)
+
+ # This is *much* faster than using node_iter!
+ for f in pyutils.fields_iter(n):
+ typ = nodes_meta.get_field_type(f)
+ if typ == nodes_meta.types.Iir:
+ attr = nodes_meta.get_field_attribute(f)
+ if attr == nodes_meta.Attr.ANone:
+ res = find_def(nodes_meta.Get_Iir(n, f), loc)
+ if res is not None:
+ return res
+ elif attr == nodes_meta.Attr.Chain:
+ res = find_def_chain(nodes_meta.Get_Iir(n, f), loc)
+ if res is not None:
+ return res
+ elif attr == nodes_meta.Attr.Maybe_Ref:
+ if not nodes.Get_Is_Ref(n, f):
+ res = find_def(nodes_meta.Get_Iir(n, f), loc)
+ if res is not None:
+ return res
+ elif typ == nodes_meta.types.Iir_List:
+ attr = nodes_meta.get_field_attribute(f)
+ if attr == nodes_meta.Attr.ANone:
+ for n1 in pyutils.list_iter(nodes_meta.Get_Iir_List(n, f)):
+ res = find_def(n1, loc)
+ if res is not None:
+ return res
+ elif typ == nodes_meta.types.Iir_Flist:
+ attr = nodes_meta.get_field_attribute(f)
+ if attr == nodes_meta.Attr.ANone:
+ for n1 in pyutils.flist_iter(nodes_meta.Get_Iir_Flist(n, f)):
+ res = find_def(n1, loc)
+ if res is not None:
+ return res
+
+ return None
+
+
+def goto_definition(n, loc):
+ "Return the declaration (as a node) under :param loc: or None"
+ ref = find_def(n, loc)
+ log.debug("for loc %u found node %s", loc, ref)
+ if ref is None:
+ return None
+ log.debug(
+ "for loc %u id=%s",
+ loc,
+ name_table.Get_Name_Ptr(nodes.Get_Identifier(ref)).decode("utf-8"),
+ )
+ ent = nodes.Get_Named_Entity(ref)
+ return None if ent == nodes.Null_Iir else ent
diff --git a/pyGHDL/lsp/symbols.py b/pyGHDL/lsp/symbols.py
new file mode 100644
index 000000000..3a0f1da30
--- /dev/null
+++ b/pyGHDL/lsp/symbols.py
@@ -0,0 +1,177 @@
+import pyGHDL.libghdl.name_table as name_table
+import pyGHDL.libghdl.files_map as files_map
+import pyGHDL.libghdl.vhdl.nodes as nodes
+import pyGHDL.libghdl.vhdl.nodes_meta as nodes_meta
+import pyGHDL.libghdl.vhdl.elocations as elocations
+import pyGHDL.libghdl.utils as pyutils
+
+from . import lsp
+
+SYMBOLS_MAP = {
+ nodes.Iir_Kind.Package_Declaration: {
+ "kind": lsp.SymbolKind.Package,
+ "detail": "(declaration)",
+ },
+ nodes.Iir_Kind.Package_Body: {"kind": lsp.SymbolKind.Package, "detail": "(body)"},
+ nodes.Iir_Kind.Entity_Declaration: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Architecture_Body: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Configuration_Declaration: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Package_Instantiation_Declaration: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Component_Declaration: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Context_Declaration: {"kind": lsp.SymbolKind.Module},
+ nodes.Iir_Kind.Use_Clause: {"kind": None},
+ nodes.Iir_Kind.Library_Clause: {"kind": None},
+ nodes.Iir_Kind.Procedure_Declaration: {"kind": lsp.SymbolKind.Function},
+ nodes.Iir_Kind.Function_Declaration: {"kind": lsp.SymbolKind.Function},
+ nodes.Iir_Kind.Interface_Procedure_Declaration: {"kind": lsp.SymbolKind.Function},
+ nodes.Iir_Kind.Interface_Function_Declaration: {"kind": lsp.SymbolKind.Function},
+ nodes.Iir_Kind.Procedure_Body: {
+ "kind": lsp.SymbolKind.Function,
+ "detail": "(body)",
+ },
+ nodes.Iir_Kind.Function_Body: {"kind": lsp.SymbolKind.Function, "detail": "(body)"},
+ nodes.Iir_Kind.Type_Declaration: {"kind": lsp.SymbolKind.Constructor},
+ nodes.Iir_Kind.Subtype_Declaration: {"kind": lsp.SymbolKind.Constructor},
+ nodes.Iir_Kind.Attribute_Declaration: {"kind": lsp.SymbolKind.Property},
+ nodes.Iir_Kind.Attribute_Specification: {"kind": None},
+ nodes.Iir_Kind.Disconnection_Specification: {"kind": None},
+ nodes.Iir_Kind.Anonymous_Type_Declaration: {"kind": None},
+ nodes.Iir_Kind.Variable_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Constant_Declaration: {"kind": lsp.SymbolKind.Constant},
+ nodes.Iir_Kind.Signal_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Signal_Attribute_Declaration: {"kind": None},
+ nodes.Iir_Kind.File_Declaration: {"kind": lsp.SymbolKind.File},
+ nodes.Iir_Kind.Interface_Variable_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Interface_Constant_Declaration: {"kind": lsp.SymbolKind.Constant},
+ nodes.Iir_Kind.Interface_Signal_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Interface_File_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.File_Declaration: {"kind": lsp.SymbolKind.File},
+ nodes.Iir_Kind.Object_Alias_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Non_Object_Alias_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Protected_Type_Body: {"kind": lsp.SymbolKind.Class},
+ nodes.Iir_Kind.Group_Template_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Group_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Concurrent_Simple_Signal_Assignment: {"kind": None},
+ nodes.Iir_Kind.Concurrent_Conditional_Signal_Assignment: {"kind": None},
+ nodes.Iir_Kind.Concurrent_Selected_Signal_Assignment: {"kind": None},
+ nodes.Iir_Kind.Concurrent_Procedure_Call_Statement: {"kind": None},
+ nodes.Iir_Kind.Concurrent_Assertion_Statement: {"kind": None},
+ nodes.Iir_Kind.Component_Instantiation_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Block_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.If_Generate_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.For_Generate_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Case_Generate_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Sensitized_Process_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Process_Statement: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Psl_Assert_Directive: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Psl_Assume_Directive: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Psl_Cover_Directive: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Psl_Restrict_Directive: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Psl_Endpoint_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Psl_Declaration: {"kind": lsp.SymbolKind.Variable},
+ nodes.Iir_Kind.Psl_Assert_Directive: {"kind": lsp.SymbolKind.Method},
+ nodes.Iir_Kind.Configuration_Specification: {"kind": None},
+}
+
+
+def location_to_position(fe, loc):
+ assert loc != files_map.No_Location
+ line = files_map.Location_File_To_Line(loc, fe)
+ off = files_map.Location_File_Line_To_Offset(loc, fe, line)
+ return {"line": line - 1, "character": off}
+
+
+def get_symbols_chain(fe, n):
+ res = [get_symbols(fe, el) for el in pyutils.chain_iter(n)]
+ return [e for e in res if e is not None]
+
+
+def get_symbols(fe, n):
+ if n == nodes.Null_Iir:
+ return None
+ k = nodes.Get_Kind(n)
+ if k == nodes.Iir_Kind.Design_Unit:
+ return get_symbols(fe, nodes.Get_Library_Unit(n))
+ m = SYMBOLS_MAP.get(k, None)
+ if m is None:
+ raise AssertionError("get_symbol: unhandled {}".format(pyutils.kind_image(k)))
+ kind = m["kind"]
+ if kind is None:
+ return None
+ if k in [nodes.Iir_Kind.Procedure_Declaration, nodes.Iir_Kind.Function_Declaration]:
+ # Discard implicit declarations.
+ if nodes.Get_Implicit_Definition(n) < nodes.Iir_Predefined.PNone:
+ return None
+ if nodes.Get_Has_Body(n):
+ # Use the body instead.
+ # FIXME: but get interface from the spec!
+ return None
+ res = {"kind": kind}
+ detail = m.get("detail")
+ if detail is not None:
+ res["detail"] = detail
+ # Get the name
+ if k in [nodes.Iir_Kind.Function_Body, nodes.Iir_Kind.Procedure_Body]:
+ nid = nodes.Get_Identifier(nodes.Get_Subprogram_Specification(n))
+ else:
+ nid = nodes.Get_Identifier(n)
+ if nid == name_table.Null_Identifier:
+ name = None
+ else:
+ name = pyutils.name_image(nid)
+ # Get the range. Use elocations when possible.
+ if k in (
+ nodes.Iir_Kind.Architecture_Body,
+ nodes.Iir_Kind.Entity_Declaration,
+ nodes.Iir_Kind.Package_Declaration,
+ nodes.Iir_Kind.Package_Body,
+ nodes.Iir_Kind.Component_Declaration,
+ nodes.Iir_Kind.Process_Statement,
+ nodes.Iir_Kind.Sensitized_Process_Statement,
+ nodes.Iir_Kind.If_Generate_Statement,
+ nodes.Iir_Kind.For_Generate_Statement,
+ ):
+ start_loc = elocations.Get_Start_Location(n)
+ end_loc = elocations.Get_End_Location(n)
+ if end_loc == files_map.No_Location:
+ # Can happen in case of parse error
+ end_loc = start_loc
+ else:
+ start_loc = nodes.Get_Location(n)
+ end_loc = start_loc + name_table.Get_Name_Length(nid)
+ res["range"] = {
+ "start": location_to_position(fe, start_loc),
+ "end": location_to_position(fe, end_loc),
+ }
+
+ # Gather children.
+ # FIXME: should we use a list of fields to inspect ?
+ children = []
+ # if nodes_meta.Has_Generic_Chain(k):
+ # children.extend(get_symbols_chain(fe, nodes.Get_Generic_Chain(n)))
+ # if nodes_meta.Has_Port_Chain(k):
+ # children.extend(get_symbols_chain(fe, nodes.Get_Port_Chain(n)))
+ # if nodes_meta.Has_Interface_Declaration_Chain(k):
+ # children.extend(get_symbols_chain(fe, nodes.Get_Interface_Declaration_Chain(n)))
+ if k in (nodes.Iir_Kind.Package_Declaration, nodes.Iir_Kind.Package_Body):
+ children.extend(get_symbols_chain(fe, nodes.Get_Declaration_Chain(n)))
+ if nodes_meta.Has_Concurrent_Statement_Chain(k):
+ children.extend(get_symbols_chain(fe, nodes.Get_Concurrent_Statement_Chain(n)))
+ if nodes_meta.Has_Generate_Statement_Body(k):
+ children.extend(
+ get_symbols_chain(
+ fe,
+ nodes.Get_Concurrent_Statement_Chain(
+ nodes.Get_Generate_Statement_Body(n)
+ ),
+ )
+ )
+
+ if children:
+ res["children"] = children
+ else:
+ # Discard anonymous symbols without children.
+ if name is None:
+ return None
+ res["name"] = name if name is not None else "<anon>"
+ return res
diff --git a/pyGHDL/lsp/version.py b/pyGHDL/lsp/version.py
new file mode 100644
index 000000000..4b0d124d5
--- /dev/null
+++ b/pyGHDL/lsp/version.py
@@ -0,0 +1 @@
+__version__ = "0.1dev"
diff --git a/pyGHDL/lsp/vhdl_ls.py b/pyGHDL/lsp/vhdl_ls.py
new file mode 100644
index 000000000..61c4aed23
--- /dev/null
+++ b/pyGHDL/lsp/vhdl_ls.py
@@ -0,0 +1,150 @@
+import logging
+
+from . import lsp
+from .workspace import Workspace
+
+log = logging.getLogger(__name__)
+
+
+class VhdlLanguageServer(object):
+ def __init__(self):
+ self.workspace = None
+ self.lsp = None
+ self._shutdown = False
+ self.dispatcher = {
+ "initialize": self.initialize,
+ "initialized": self.initialized,
+ "shutdown": self.shutdown,
+ "$/setTraceNotification": self.setTraceNotification,
+ "textDocument/didOpen": self.textDocument_didOpen,
+ "textDocument/didChange": self.textDocument_didChange,
+ "textDocument/didClose": self.textDocument_didClose,
+ "textDocument/didSave": self.textDocument_didSave,
+ # 'textDocument/hover': self.hover,
+ "textDocument/definition": self.textDocument_definition,
+ "textDocument/documentSymbol": self.textDocument_documentSymbol,
+ # 'textDocument/completion': self.completion,
+ "textDocument/rangeFormatting": self.textDocument_rangeFormatting,
+ "workspace/xShowAllFiles": self.workspace_xShowAllFiles,
+ "workspace/xGetAllEntities": self.workspace_xGetAllEntities,
+ "workspace/xGetEntityInterface": self.workspace_xGetEntityInterface,
+ }
+
+ def set_lsp(self, server):
+ self.lsp = server
+
+ def shutdown(self):
+ self.lsp.shutdown()
+
+ def setTraceNotification(self, value):
+ pass
+
+ def capabilities(self):
+ server_capabilities = {
+ "textDocumentSync": {
+ "openClose": True,
+ "change": lsp.TextDocumentSyncKind.INCREMENTAL,
+ "save": {"includeText": True},
+ },
+ "hoverProvider": False,
+ # 'completionProvider': False,
+ # 'signatureHelpProvider': {
+ # 'triggerCharacters': ['(', ',']
+ # },
+ "definitionProvider": True,
+ "referencesProvider": False,
+ "documentHighlightProvider": False,
+ "documentSymbolProvider": True,
+ "codeActionProvider": False,
+ "documentFormattingProvider": False,
+ "documentRangeFormattingProvider": True,
+ "renameProvider": False,
+ }
+ return server_capabilities
+
+ def initialize(
+ self,
+ processId,
+ rootPath,
+ capabilities,
+ rootUri=None,
+ initializationOptions=None,
+ **_
+ ):
+ log.debug(
+ "Language server initialized with %s %s %s %s",
+ processId,
+ rootUri,
+ rootPath,
+ initializationOptions,
+ )
+ if rootUri is None:
+ rootUri = lsp.path_to_uri(rootPath) if rootPath is not None else ""
+ self.workspace = Workspace(rootUri, self.lsp)
+
+ # Get our capabilities
+ return {"capabilities": self.capabilities()}
+
+ def initialized(self):
+ # Event when the client is fully initialized.
+ return None
+
+ def textDocument_didOpen(self, textDocument=None):
+ doc_uri = textDocument["uri"]
+ self.workspace.put_document(
+ doc_uri, textDocument["text"], version=textDocument.get("version")
+ )
+ self.lint(doc_uri)
+
+ def textDocument_didChange(self, textDocument=None, contentChanges=None, **_kwargs):
+ doc_uri = textDocument["uri"]
+ new_version = textDocument.get("version")
+ self.workspace.apply_changes(doc_uri, contentChanges, new_version)
+
+ def lint(self, doc_uri):
+ self.workspace.lint(doc_uri)
+
+ def textDocument_didClose(self, textDocument=None, **_kwargs):
+ self.workspace.rm_document(textDocument["uri"])
+
+ def textDocument_didSave(self, textDocument=None, text=None, **_kwargs):
+ if text is not None:
+ # Sanity check: check we have the same content for the document.
+ self.workspace.check_document(textDocument["uri"], text)
+ else:
+ log.debug("did save - no text")
+ self.lint(textDocument["uri"])
+
+ def textDocument_definition(self, textDocument=None, position=None):
+ return self.workspace.goto_definition(textDocument["uri"], position)
+
+ def textDocument_documentSymbol(self, textDocument=None):
+ doc = self.workspace.get_or_create_document(textDocument["uri"])
+ return doc.document_symbols()
+
+ def textDocument_rangeFormatting(self, textDocument=None, range=None, options=None):
+ doc_uri = textDocument["uri"]
+ doc = self.workspace.get_document(doc_uri)
+ assert doc is not None, "Try to format a non-loaded document"
+ res = doc.format_range(range)
+ if res is not None:
+ self.lint(doc_uri)
+ return res
+
+ def m_workspace__did_change_configuration(self, _settings=None):
+ for doc_uri in self.workspace.documents:
+ self.lint(doc_uri)
+
+ def m_workspace__did_change_watched_files(self, **_kwargs):
+ # Externally changed files may result in changed diagnostics
+ for doc_uri in self.workspace.documents:
+ self.lint(doc_uri)
+
+ def workspace_xShowAllFiles(self):
+ return self.workspace.x_show_all_files()
+
+ def workspace_xGetAllEntities(self):
+ return self.workspace.x_get_all_entities()
+
+ def workspace_xGetEntityInterface(self, library, name):
+ return self.workspace.x_get_entity_interface(library, name)
diff --git a/pyGHDL/lsp/workspace.py b/pyGHDL/lsp/workspace.py
new file mode 100644
index 000000000..19cdc5309
--- /dev/null
+++ b/pyGHDL/lsp/workspace.py
@@ -0,0 +1,499 @@
+import logging
+import os
+import json
+from ctypes import byref
+import pyGHDL.libghdl as libghdl
+import pyGHDL.libghdl.errorout_memory as errorout_memory
+import pyGHDL.libghdl.flags
+import pyGHDL.libghdl.errorout as errorout
+import pyGHDL.libghdl.files_map as files_map
+import pyGHDL.libghdl.libraries as libraries
+import pyGHDL.libghdl.name_table as name_table
+import pyGHDL.libghdl.vhdl.nodes as nodes
+import pyGHDL.libghdl.vhdl.lists as lists
+import pyGHDL.libghdl.vhdl.std_package as std_package
+import pyGHDL.libghdl.vhdl.parse
+import pyGHDL.libghdl.vhdl.sem_lib as sem_lib
+import pyGHDL.libghdl.utils as pyutils
+
+from . import lsp
+from . import document, symbols
+
+log = logging.getLogger(__name__)
+
+
+class ProjectError(Exception):
+ "Exception raised in case of unrecoverable error in the project file."
+
+ def __init__(self, msg):
+ super().__init__()
+ self.msg = msg
+
+
+class Workspace(object):
+ def __init__(self, root_uri, server):
+ self._root_uri = root_uri
+ self._server = server
+ self._root_path = lsp.path_from_uri(self._root_uri)
+ self._docs = {} # uri -> doc
+ self._fe_map = {} # fe -> doc
+ self._prj = {}
+ self._last_linted_doc = None
+ errorout_memory.Install_Handler()
+ libghdl.flags.Flag_Elocations.value = True
+ # libghdl.Flags.Verbose.value = True
+ # We do analysis even in case of errors.
+ libghdl.vhdl.parse.Flag_Parse_Parenthesis.value = True
+ # Force analysis to get more feedback + navigation even in case
+ # of errors.
+ libghdl.flags.Flag_Force_Analysis.value = True
+ # Do not consider analysis order issues.
+ libghdl.flags.Flag_Elaborate_With_Outdated.value = True
+ libghdl.errorout.Enable_Warning(errorout.Msgid.Warnid_Unused, True)
+ self.read_project()
+ self.set_options_from_project()
+ libghdl.analyze_init()
+ self._diags_set = set() # Documents with at least one diagnostic.
+ self.read_files_from_project()
+ self.gather_diagnostics(None)
+
+ @property
+ def documents(self):
+ return self._docs
+
+ @property
+ def root_path(self):
+ return self._root_path
+
+ @property
+ def root_uri(self):
+ return self._root_uri
+
+ def _create_document(self, doc_uri, sfe, version=None):
+ """Create a document and put it in this workspace."""
+ doc = document.Document(doc_uri, sfe, version)
+ self._docs[doc_uri] = doc
+ self._fe_map[sfe] = doc
+ return doc
+
+ def create_document_from_sfe(self, sfe, abspath):
+ # A filename has been given without a corresponding document.
+ # Create the document.
+ # Common case: an error message was reported in a non-open document.
+ # Create a document so that it could be reported to the client.
+ doc_uri = lsp.path_to_uri(os.path.normpath(abspath))
+ return self._create_document(doc_uri, sfe)
+
+ def create_document_from_uri(self, doc_uri, source=None, version=None):
+ # A document is referenced by an uri but not known. Load it.
+ # We assume the path is correct.
+ path = lsp.path_from_uri(doc_uri)
+ if source is None:
+ source = open(path).read()
+ sfe = document.Document.load(
+ source, os.path.dirname(path), os.path.basename(path)
+ )
+ return self._create_document(doc_uri, sfe)
+
+ def get_or_create_document(self, doc_uri):
+ res = self.get_document(doc_uri)
+ if res is not None:
+ return res
+ res = self.create_document_from_uri(doc_uri)
+ res.parse_document()
+ return res
+
+ def get_document(self, doc_uri):
+ """Get a document from :param doc_uri: Note that the document may not exist,
+ and this function may return None."""
+ return self._docs.get(doc_uri)
+
+ def put_document(self, doc_uri, source, version=None):
+ doc = self.get_document(doc_uri)
+ if doc is None:
+ doc = self.create_document_from_uri(doc_uri, source=source, version=version)
+ else:
+ # The document may already be present (loaded from a project)
+ # In that case, overwrite it as the client may have a more
+ # recent version.
+ doc.reload(source)
+ return doc
+
+ def sfe_to_document(self, sfe):
+ """Get the document correspond to :param sfe: source file.
+ Can create the document if needed."""
+ assert sfe != 0
+ doc = self._fe_map.get(sfe, None)
+ if doc is None:
+ # Could be a document from outside...
+ filename = pyutils.name_image(files_map.Get_File_Name(sfe))
+ if not os.path.isabs(filename):
+ dirname = pyutils.name_image(files_map.Get_Directory_Name(sfe))
+ filename = os.path.join(dirname, filename)
+ doc = self.create_document_from_sfe(sfe, filename)
+ return doc
+
+ def add_vhdl_file(self, name):
+ log.info("loading %s", name)
+ if os.path.isabs(name):
+ absname = name
+ else:
+ absname = os.path.join(self._root_path, name)
+ # Create a document for this file.
+ try:
+ fd = open(absname)
+ sfe = document.Document.load(fd.read(), self._root_path, name)
+ fd.close()
+ except OSError as err:
+ self._server.show_message(
+ lsp.MessageType.Error, "cannot load {}: {}".format(name, err.strerror)
+ )
+ return
+ doc = self.create_document_from_sfe(sfe, absname)
+ doc.parse_document()
+
+ def read_project(self):
+ prj_file = os.path.join(self.root_path, "hdl-prj.json")
+ if not os.path.exists(prj_file):
+ log.info("project file %s does not exist", prj_file)
+ return
+ try:
+ f = open(prj_file)
+ except OSError as err:
+ self._server.show_message(
+ lsp.MessageType.Error,
+ "cannot open project file {}: {}".format(prj_file, err.strerror),
+ )
+ return
+ log.info("reading project file %s", prj_file)
+ try:
+ self._prj = json.load(f)
+ except json.decoder.JSONDecodeError as e:
+ log.info("error in project file")
+ self._server.show_message(
+ lsp.MessageType.Error,
+ "json error in project file {}:{}:{}".format(
+ prj_file, e.lineno, e.colno
+ ),
+ )
+ f.close()
+
+ def set_options_from_project(self):
+ try:
+ if self._prj is None:
+ return
+ if not isinstance(self._prj, dict):
+ raise ProjectError("project file is not a dictionnary")
+ opts = self._prj.get("options", None)
+ if opts is None:
+ return
+ if not isinstance(opts, dict):
+ raise ProjectError("'options' is not a dictionnary")
+ ghdl_opts = opts.get("ghdl_analysis", None)
+ if ghdl_opts is None:
+ return
+ log.info("Using options: %s", ghdl_opts)
+ for opt in ghdl_opts:
+ if not libghdl.set_option(opt.encode("utf-8")):
+ self._server.show_message(
+ lsp.MessageType.Error, "error with option: {}".format(opt)
+ )
+ except ProjectError as e:
+ self._server.show_message(
+ lsp.MessageType.Error, "error in project file: {}".format(e.msg)
+ )
+
+ def read_files_from_project(self):
+ try:
+ files = self._prj.get("files", [])
+ if not isinstance(files, list):
+ raise ProjectError("'files' is not a list")
+ for f in files:
+ if not isinstance(f, dict):
+ raise ProjectError("an element of 'files' is not a dict")
+ name = f.get("file")
+ if not isinstance(name, str):
+ raise ProjectError("a 'file' is not a string")
+ lang = f.get("language", "vhdl")
+ if lang == "vhdl":
+ self.add_vhdl_file(name)
+ except ProjectError as e:
+ self._server.show_message(
+ lsp.MessageType.Error, "error in project file: {}".format(e.msg)
+ )
+
+ def get_configuration(self):
+ self._server.configuration(
+ [{"scopeUri": "", "section": "vhdl.maxNumberOfProblems"}]
+ )
+
+ def gather_diagnostics(self, doc):
+ # Gather messages (per file)
+ nbr_msgs = errorout_memory.Get_Nbr_Messages()
+ diags = {}
+ diag = {}
+ for i in range(nbr_msgs):
+ hdr = errorout_memory.Get_Error_Record(i + 1)
+ msg = errorout_memory.Get_Error_Message(i + 1).decode("utf-8")
+ if hdr.file == 0:
+ # Possible for error limit reached.
+ continue
+ err_range = {
+ "start": {"line": hdr.line - 1, "character": hdr.offset},
+ "end": {"line": hdr.line - 1, "character": hdr.offset + hdr.length},
+ }
+ if hdr.group <= errorout_memory.Msg_Main:
+ if hdr.id <= errorout.Msgid.Msgid_Note:
+ severity = lsp.DiagnosticSeverity.Information
+ elif hdr.id <= errorout.Msgid.Msgid_Warning:
+ severity = lsp.DiagnosticSeverity.Warning
+ else:
+ severity = lsp.DiagnosticSeverity.Error
+ diag = {
+ "source": "ghdl",
+ "range": err_range,
+ "message": msg,
+ "severity": severity,
+ }
+ if hdr.group == errorout_memory.Msg_Main:
+ diag["relatedInformation"] = []
+ fdiag = diags.get(hdr.file, None)
+ if fdiag is None:
+ diags[hdr.file] = [diag]
+ else:
+ fdiag.append(diag)
+ else:
+ assert diag
+ if True:
+ doc = self.sfe_to_document(hdr.file)
+ diag["relatedInformation"].append(
+ {
+ "location": {"uri": doc.uri, "range": err_range},
+ "message": msg,
+ }
+ )
+ errorout_memory.Clear_Errors()
+ # Publish diagnostics
+ for sfe, diag in diags.items():
+ doc = self.sfe_to_document(sfe)
+ self.publish_diagnostics(doc.uri, diag)
+ if doc is not None and doc._fe not in diags:
+ # Clear previous diagnostics for the doc.
+ self.publish_diagnostics(doc.uri, [])
+
+ def obsolete_dependent_units(self, unit, antideps):
+ """Obsolete units that depends of :param unit:"""
+ udeps = antideps.get(unit, None)
+ if udeps is None:
+ # There are no units.
+ return
+ # Avoid infinite recursion
+ antideps[unit] = None
+ for un in udeps:
+ log.debug(
+ "obsolete %d %s", un, pyutils.name_image(nodes.Get_Identifier(un))
+ )
+ # Recurse
+ self.obsolete_dependent_units(un, antideps)
+ if nodes.Set_Date_State(un) == nodes.Date_State.Disk:
+ # Already obsolete!
+ continue
+ # FIXME: just de-analyze ?
+ nodes.Set_Date_State(un, nodes.Date_State.Disk)
+ sem_lib.Free_Dependence_List(un)
+ loc = nodes.Get_Location(un)
+ fil = files_map.Location_To_File(loc)
+ pos = files_map.Location_File_To_Pos(loc, fil)
+ line = files_map.Location_File_To_Line(loc, fil)
+ col = files_map.Location_File_Line_To_Offset(loc, fil, line)
+ nodes.Set_Design_Unit_Source_Pos(un, pos)
+ nodes.Set_Design_Unit_Source_Line(un, line)
+ nodes.Set_Design_Unit_Source_Col(un, col)
+
+ def obsolete_doc(self, doc):
+ if doc._tree == nodes.Null_Iir:
+ return
+ # Free old tree
+ assert nodes.Get_Kind(doc._tree) == nodes.Iir_Kind.Design_File
+ if self._last_linted_doc == doc:
+ antideps = None
+ else:
+ antideps = self.compute_anti_dependences()
+ unit = nodes.Get_First_Design_Unit(doc._tree)
+ while unit != nodes.Null_Iir:
+ if antideps is not None:
+ self.obsolete_dependent_units(unit, antideps)
+ # FIXME: free unit; it is not referenced.
+ unit = nodes.Get_Chain(unit)
+ libraries.Purge_Design_File(doc._tree)
+ doc._tree = nodes.Null_Iir
+
+ def lint(self, doc_uri):
+ doc = self.get_document(doc_uri)
+ self.obsolete_doc(doc)
+ doc.compute_diags()
+ self.gather_diagnostics(doc)
+
+ def apply_changes(self, doc_uri, contentChanges, new_version):
+ doc = self.get_document(doc_uri)
+ assert doc is not None, "try to modify a non-loaded document"
+ self.obsolete_doc(doc)
+ prev_sfe = doc._fe
+ for change in contentChanges:
+ doc.apply_change(change)
+ if doc._fe != prev_sfe:
+ del self._fe_map[prev_sfe]
+ self._fe_map[doc._fe] = doc
+ # Like lint
+ doc.compute_diags()
+ self.gather_diagnostics(doc)
+
+ def check_document(self, doc_uri, source):
+ self._docs[doc_uri].check_document(source)
+
+ def rm_document(self, doc_uri):
+ pass
+
+ def apply_edit(self, edit):
+ return self._server.request("workspace/applyEdit", {"edit": edit})
+
+ def publish_diagnostics(self, doc_uri, diagnostics):
+ self._server.notify(
+ "textDocument/publishDiagnostics",
+ params={"uri": doc_uri, "diagnostics": diagnostics},
+ )
+
+ def show_message(self, message, msg_type=lsp.MessageType.Info):
+ self._server.notify(
+ "window/showMessage", params={"type": msg_type, "message": message}
+ )
+
+ def declaration_to_location(self, decl):
+ "Convert declaration :param decl: to an LSP Location"
+ decl_loc = nodes.Get_Location(decl)
+ if decl_loc == std_package.Std_Location.value:
+ # There is no real file for the std.standard package.
+ return None
+ if decl_loc == libraries.Library_Location.value:
+ # Libraries declaration are virtual.
+ return None
+ fe = files_map.Location_To_File(decl_loc)
+ doc = self.sfe_to_document(fe)
+ res = {"uri": doc.uri}
+ nid = nodes.Get_Identifier(decl)
+ res["range"] = {
+ "start": symbols.location_to_position(fe, decl_loc),
+ "end": symbols.location_to_position(
+ fe, decl_loc + name_table.Get_Name_Length(nid)
+ ),
+ }
+ return res
+
+ def goto_definition(self, doc_uri, position):
+ decl = self._docs[doc_uri].goto_definition(position)
+ if decl is None:
+ return None
+ decl_loc = self.declaration_to_location(decl)
+ if decl_loc is None:
+ return None
+ res = [decl_loc]
+ if nodes.Get_Kind(decl) == nodes.Iir_Kind.Component_Declaration:
+ ent = libraries.Find_Entity_For_Component(nodes.Get_Identifier(decl))
+ if ent != nodes.Null_Iir:
+ res.append(self.declaration_to_location(nodes.Get_Library_Unit(ent)))
+ return res
+
+ def x_show_all_files(self):
+ res = []
+ for fe in range(1, files_map.Get_Last_Source_File_Entry() + 1):
+ doc = self._fe_map.get(fe, None)
+ res.append(
+ {
+ "fe": fe,
+ "uri": doc.uri if doc is not None else None,
+ "name": pyutils.name_image(files_map.Get_File_Name(fe)),
+ "dir": pyutils.name_image(files_map.Get_Directory_Name(fe)),
+ }
+ )
+ return res
+
+ def x_get_all_entities(self):
+ res = []
+ lib = libraries.Get_Libraries_Chain()
+ while lib != nodes.Null_Iir:
+ files = nodes.Get_Design_File_Chain(lib)
+ ents = []
+ while files != nodes.Null_Iir:
+ units = nodes.Get_First_Design_Unit(files)
+ while units != nodes.Null_Iir:
+ unitlib = nodes.Get_Library_Unit(units)
+ if nodes.Get_Kind(unitlib) == nodes.Iir_Kind.Entity_Declaration:
+ ents.append(unitlib)
+ units = nodes.Get_Chain(units)
+ files = nodes.Get_Chain(files)
+ ents = [pyutils.name_image(nodes.Get_Identifier(e)) for e in ents]
+ lib_name = pyutils.name_image(nodes.Get_Identifier(lib))
+ res.extend([{"name": n, "library": lib_name} for n in ents])
+ lib = nodes.Get_Chain(lib)
+ return res
+
+ def x_get_entity_interface(self, library, name):
+ def create_interfaces(inters):
+ res = []
+ while inters != nodes.Null_Iir:
+ res.append(
+ {
+ "name": name_table.Get_Name_Ptr(
+ nodes.Get_Identifier(inters)
+ ).decode("latin-1")
+ }
+ )
+ inters = nodes.Get_Chain(inters)
+ return res
+
+ # Find library
+ lib_id = name_table.Get_Identifier(library.encode("utf-8"))
+ lib = libraries.Get_Library_No_Create(lib_id)
+ if lib == name_table.Null_Identifier:
+ return None
+ # Find entity
+ ent_id = name_table.Get_Identifier(name.encode("utf-8"))
+ unit = libraries.Find_Primary_Unit(lib, ent_id)
+ if unit == nodes.Null_Iir:
+ return None
+ ent = nodes.Get_Library_Unit(unit)
+ return {
+ "library": library,
+ "entity": name,
+ "generics": create_interfaces(nodes.Get_Generic_Chain(ent)),
+ "ports": create_interfaces(nodes.Get_Port_Chain(ent)),
+ }
+
+ def compute_anti_dependences(self):
+ """Return a dictionnary of anti dependencies for design unit"""
+ res = {}
+ lib = libraries.Get_Libraries_Chain()
+ while lib != nodes.Null_Iir:
+ files = nodes.Get_Design_File_Chain(lib)
+ while files != nodes.Null_Iir:
+ units = nodes.Get_First_Design_Unit(files)
+ while units != nodes.Null_Iir:
+ if nodes.Get_Date_State(units) == nodes.Date_State.Analyze:
+ # The unit has been analyzed, so the dependencies are know.
+ deps = nodes.Get_Dependence_List(units)
+ assert deps != nodes.Null_Iir_List
+ deps_it = lists.Iterate(deps)
+ while lists.Is_Valid(byref(deps_it)):
+ el = lists.Get_Element(byref(deps_it))
+ if nodes.Get_Kind(el) == nodes.Iir_Kind.Design_Unit:
+ if res.get(el, None):
+ res[el].append(units)
+ else:
+ res[el] = [units]
+ else:
+ assert False
+ lists.Next(byref(deps_it))
+ units = nodes.Get_Chain(units)
+ files = nodes.Get_Chain(files)
+ lib = nodes.Get_Chain(lib)
+ return res