diff options
author | umarcor <unai.martinezcorral@ehu.eus> | 2020-12-27 22:10:38 +0100 |
---|---|---|
committer | umarcor <unai.martinezcorral@ehu.eus> | 2020-12-27 23:13:21 +0100 |
commit | bf74a0983c2d534e217e7312ab559ca8929ff8a2 (patch) | |
tree | fb630d5b83afc2f112996ebb5f1eac44de868132 /python/vhdl_langserver/workspace.py | |
parent | 340fc792bba2ffdb4f930bc427a39ea3a1b659b2 (diff) | |
download | ghdl-bf74a0983c2d534e217e7312ab559ca8929ff8a2.tar.gz ghdl-bf74a0983c2d534e217e7312ab559ca8929ff8a2.tar.bz2 ghdl-bf74a0983c2d534e217e7312ab559ca8929ff8a2.zip |
rework 'python', rename to 'pyGHDL'
* Rename 'python' to 'pyGHDL'.
* Let 'thin' be 'libghdl'.
* Move move 'pyutils.py' from 'python/libghdl/vhdl' to a separate
package ('pyGHDL/libghdl/utils/').
* Update 'vhdl_langserver' accordingly.
* Rename 'vhdl_langserver' to 'lsp'.
* Move 'ghdl-ls' to 'pyGHDL/cli'.
Diffstat (limited to 'python/vhdl_langserver/workspace.py')
-rw-r--r-- | python/vhdl_langserver/workspace.py | 499 |
1 files changed, 0 insertions, 499 deletions
diff --git a/python/vhdl_langserver/workspace.py b/python/vhdl_langserver/workspace.py deleted file mode 100644 index 9d61225ee..000000000 --- a/python/vhdl_langserver/workspace.py +++ /dev/null @@ -1,499 +0,0 @@ -import logging -import os -import json -from ctypes import byref -import libghdl -import libghdl.thin.errorout_memory as errorout_memory -import libghdl.thin.flags -import libghdl.thin.errorout as errorout -import libghdl.thin.files_map as files_map -import libghdl.thin.libraries as libraries -import libghdl.thin.name_table as name_table -import libghdl.thin.vhdl.nodes as nodes -import libghdl.thin.vhdl.lists as lists -import libghdl.thin.vhdl.std_package as std_package -import libghdl.thin.vhdl.parse -import libghdl.thin.vhdl.pyutils as pyutils -import libghdl.thin.vhdl.sem_lib as sem_lib - -from . import lsp -from . import document, symbols - -log = logging.getLogger(__name__) - - -class ProjectError(Exception): - "Exception raised in case of unrecoverable error in the project file." - - def __init__(self, msg): - super().__init__() - self.msg = msg - - -class Workspace(object): - def __init__(self, root_uri, server): - self._root_uri = root_uri - self._server = server - self._root_path = lsp.path_from_uri(self._root_uri) - self._docs = {} # uri -> doc - self._fe_map = {} # fe -> doc - self._prj = {} - self._last_linted_doc = None - errorout_memory.Install_Handler() - libghdl.thin.flags.Flag_Elocations.value = True - # thin.Flags.Verbose.value = True - # We do analysis even in case of errors. - libghdl.thin.vhdl.parse.Flag_Parse_Parenthesis.value = True - # Force analysis to get more feedback + navigation even in case - # of errors. - libghdl.thin.flags.Flag_Force_Analysis.value = True - # Do not consider analysis order issues. - libghdl.thin.flags.Flag_Elaborate_With_Outdated.value = True - libghdl.thin.errorout.Enable_Warning(errorout.Msgid.Warnid_Unused, True) - self.read_project() - self.set_options_from_project() - libghdl.analyze_init() - self._diags_set = set() # Documents with at least one diagnostic. - self.read_files_from_project() - self.gather_diagnostics(None) - - @property - def documents(self): - return self._docs - - @property - def root_path(self): - return self._root_path - - @property - def root_uri(self): - return self._root_uri - - def _create_document(self, doc_uri, sfe, version=None): - """Create a document and put it in this workspace.""" - doc = document.Document(doc_uri, sfe, version) - self._docs[doc_uri] = doc - self._fe_map[sfe] = doc - return doc - - def create_document_from_sfe(self, sfe, abspath): - # A filename has been given without a corresponding document. - # Create the document. - # Common case: an error message was reported in a non-open document. - # Create a document so that it could be reported to the client. - doc_uri = lsp.path_to_uri(os.path.normpath(abspath)) - return self._create_document(doc_uri, sfe) - - def create_document_from_uri(self, doc_uri, source=None, version=None): - # A document is referenced by an uri but not known. Load it. - # We assume the path is correct. - path = lsp.path_from_uri(doc_uri) - if source is None: - source = open(path).read() - sfe = document.Document.load( - source, os.path.dirname(path), os.path.basename(path) - ) - return self._create_document(doc_uri, sfe) - - def get_or_create_document(self, doc_uri): - res = self.get_document(doc_uri) - if res is not None: - return res - res = self.create_document_from_uri(doc_uri) - res.parse_document() - return res - - def get_document(self, doc_uri): - """Get a document from :param doc_uri: Note that the document may not exist, - and this function may return None.""" - return self._docs.get(doc_uri) - - def put_document(self, doc_uri, source, version=None): - doc = self.get_document(doc_uri) - if doc is None: - doc = self.create_document_from_uri(doc_uri, source=source, version=version) - else: - # The document may already be present (loaded from a project) - # In that case, overwrite it as the client may have a more - # recent version. - doc.reload(source) - return doc - - def sfe_to_document(self, sfe): - """Get the document correspond to :param sfe: source file. - Can create the document if needed.""" - assert sfe != 0 - doc = self._fe_map.get(sfe, None) - if doc is None: - # Could be a document from outside... - filename = pyutils.name_image(files_map.Get_File_Name(sfe)) - if not os.path.isabs(filename): - dirname = pyutils.name_image(files_map.Get_Directory_Name(sfe)) - filename = os.path.join(dirname, filename) - doc = self.create_document_from_sfe(sfe, filename) - return doc - - def add_vhdl_file(self, name): - log.info("loading %s", name) - if os.path.isabs(name): - absname = name - else: - absname = os.path.join(self._root_path, name) - # Create a document for this file. - try: - fd = open(absname) - sfe = document.Document.load(fd.read(), self._root_path, name) - fd.close() - except OSError as err: - self._server.show_message( - lsp.MessageType.Error, "cannot load {}: {}".format(name, err.strerror) - ) - return - doc = self.create_document_from_sfe(sfe, absname) - doc.parse_document() - - def read_project(self): - prj_file = os.path.join(self.root_path, "hdl-prj.json") - if not os.path.exists(prj_file): - log.info("project file %s does not exist", prj_file) - return - try: - f = open(prj_file) - except OSError as err: - self._server.show_message( - lsp.MessageType.Error, - "cannot open project file {}: {}".format(prj_file, err.strerror), - ) - return - log.info("reading project file %s", prj_file) - try: - self._prj = json.load(f) - except json.decoder.JSONDecodeError as e: - log.info("error in project file") - self._server.show_message( - lsp.MessageType.Error, - "json error in project file {}:{}:{}".format( - prj_file, e.lineno, e.colno - ), - ) - f.close() - - def set_options_from_project(self): - try: - if self._prj is None: - return - if not isinstance(self._prj, dict): - raise ProjectError("project file is not a dictionnary") - opts = self._prj.get("options", None) - if opts is None: - return - if not isinstance(opts, dict): - raise ProjectError("'options' is not a dictionnary") - ghdl_opts = opts.get("ghdl_analysis", None) - if ghdl_opts is None: - return - log.info("Using options: %s", ghdl_opts) - for opt in ghdl_opts: - if not libghdl.set_option(opt.encode("utf-8")): - self._server.show_message( - lsp.MessageType.Error, "error with option: {}".format(opt) - ) - except ProjectError as e: - self._server.show_message( - lsp.MessageType.Error, "error in project file: {}".format(e.msg) - ) - - def read_files_from_project(self): - try: - files = self._prj.get("files", []) - if not isinstance(files, list): - raise ProjectError("'files' is not a list") - for f in files: - if not isinstance(f, dict): - raise ProjectError("an element of 'files' is not a dict") - name = f.get("file") - if not isinstance(name, str): - raise ProjectError("a 'file' is not a string") - lang = f.get("language", "vhdl") - if lang == "vhdl": - self.add_vhdl_file(name) - except ProjectError as e: - self._server.show_message( - lsp.MessageType.Error, "error in project file: {}".format(e.msg) - ) - - def get_configuration(self): - self._server.configuration( - [{"scopeUri": "", "section": "vhdl.maxNumberOfProblems"}] - ) - - def gather_diagnostics(self, doc): - # Gather messages (per file) - nbr_msgs = errorout_memory.Get_Nbr_Messages() - diags = {} - diag = {} - for i in range(nbr_msgs): - hdr = errorout_memory.Get_Error_Record(i + 1) - msg = errorout_memory.Get_Error_Message(i + 1).decode("utf-8") - if hdr.file == 0: - # Possible for error limit reached. - continue - err_range = { - "start": {"line": hdr.line - 1, "character": hdr.offset}, - "end": {"line": hdr.line - 1, "character": hdr.offset + hdr.length}, - } - if hdr.group <= errorout_memory.Msg_Main: - if hdr.id <= errorout.Msgid.Msgid_Note: - severity = lsp.DiagnosticSeverity.Information - elif hdr.id <= errorout.Msgid.Msgid_Warning: - severity = lsp.DiagnosticSeverity.Warning - else: - severity = lsp.DiagnosticSeverity.Error - diag = { - "source": "ghdl", - "range": err_range, - "message": msg, - "severity": severity, - } - if hdr.group == errorout_memory.Msg_Main: - diag["relatedInformation"] = [] - fdiag = diags.get(hdr.file, None) - if fdiag is None: - diags[hdr.file] = [diag] - else: - fdiag.append(diag) - else: - assert diag - if True: - doc = self.sfe_to_document(hdr.file) - diag["relatedInformation"].append( - { - "location": {"uri": doc.uri, "range": err_range}, - "message": msg, - } - ) - errorout_memory.Clear_Errors() - # Publish diagnostics - for sfe, diag in diags.items(): - doc = self.sfe_to_document(sfe) - self.publish_diagnostics(doc.uri, diag) - if doc is not None and doc._fe not in diags: - # Clear previous diagnostics for the doc. - self.publish_diagnostics(doc.uri, []) - - def obsolete_dependent_units(self, unit, antideps): - """Obsolete units that depends of :param unit:""" - udeps = antideps.get(unit, None) - if udeps is None: - # There are no units. - return - # Avoid infinite recursion - antideps[unit] = None - for un in udeps: - log.debug( - "obsolete %d %s", un, pyutils.name_image(nodes.Get_Identifier(un)) - ) - # Recurse - self.obsolete_dependent_units(un, antideps) - if nodes.Set_Date_State(un) == nodes.Date_State.Disk: - # Already obsolete! - continue - # FIXME: just de-analyze ? - nodes.Set_Date_State(un, nodes.Date_State.Disk) - sem_lib.Free_Dependence_List(un) - loc = nodes.Get_Location(un) - fil = files_map.Location_To_File(loc) - pos = files_map.Location_File_To_Pos(loc, fil) - line = files_map.Location_File_To_Line(loc, fil) - col = files_map.Location_File_Line_To_Offset(loc, fil, line) - nodes.Set_Design_Unit_Source_Pos(un, pos) - nodes.Set_Design_Unit_Source_Line(un, line) - nodes.Set_Design_Unit_Source_Col(un, col) - - def obsolete_doc(self, doc): - if doc._tree == nodes.Null_Iir: - return - # Free old tree - assert nodes.Get_Kind(doc._tree) == nodes.Iir_Kind.Design_File - if self._last_linted_doc == doc: - antideps = None - else: - antideps = self.compute_anti_dependences() - unit = nodes.Get_First_Design_Unit(doc._tree) - while unit != nodes.Null_Iir: - if antideps is not None: - self.obsolete_dependent_units(unit, antideps) - # FIXME: free unit; it is not referenced. - unit = nodes.Get_Chain(unit) - libraries.Purge_Design_File(doc._tree) - doc._tree = nodes.Null_Iir - - def lint(self, doc_uri): - doc = self.get_document(doc_uri) - self.obsolete_doc(doc) - doc.compute_diags() - self.gather_diagnostics(doc) - - def apply_changes(self, doc_uri, contentChanges, new_version): - doc = self.get_document(doc_uri) - assert doc is not None, "try to modify a non-loaded document" - self.obsolete_doc(doc) - prev_sfe = doc._fe - for change in contentChanges: - doc.apply_change(change) - if doc._fe != prev_sfe: - del self._fe_map[prev_sfe] - self._fe_map[doc._fe] = doc - # Like lint - doc.compute_diags() - self.gather_diagnostics(doc) - - def check_document(self, doc_uri, source): - self._docs[doc_uri].check_document(source) - - def rm_document(self, doc_uri): - pass - - def apply_edit(self, edit): - return self._server.request("workspace/applyEdit", {"edit": edit}) - - def publish_diagnostics(self, doc_uri, diagnostics): - self._server.notify( - "textDocument/publishDiagnostics", - params={"uri": doc_uri, "diagnostics": diagnostics}, - ) - - def show_message(self, message, msg_type=lsp.MessageType.Info): - self._server.notify( - "window/showMessage", params={"type": msg_type, "message": message} - ) - - def declaration_to_location(self, decl): - "Convert declaration :param decl: to an LSP Location" - decl_loc = nodes.Get_Location(decl) - if decl_loc == std_package.Std_Location.value: - # There is no real file for the std.standard package. - return None - if decl_loc == libraries.Library_Location.value: - # Libraries declaration are virtual. - return None - fe = files_map.Location_To_File(decl_loc) - doc = self.sfe_to_document(fe) - res = {"uri": doc.uri} - nid = nodes.Get_Identifier(decl) - res["range"] = { - "start": symbols.location_to_position(fe, decl_loc), - "end": symbols.location_to_position( - fe, decl_loc + name_table.Get_Name_Length(nid) - ), - } - return res - - def goto_definition(self, doc_uri, position): - decl = self._docs[doc_uri].goto_definition(position) - if decl is None: - return None - decl_loc = self.declaration_to_location(decl) - if decl_loc is None: - return None - res = [decl_loc] - if nodes.Get_Kind(decl) == nodes.Iir_Kind.Component_Declaration: - ent = libraries.Find_Entity_For_Component(nodes.Get_Identifier(decl)) - if ent != nodes.Null_Iir: - res.append(self.declaration_to_location(nodes.Get_Library_Unit(ent))) - return res - - def x_show_all_files(self): - res = [] - for fe in range(1, files_map.Get_Last_Source_File_Entry() + 1): - doc = self._fe_map.get(fe, None) - res.append( - { - "fe": fe, - "uri": doc.uri if doc is not None else None, - "name": pyutils.name_image(files_map.Get_File_Name(fe)), - "dir": pyutils.name_image(files_map.Get_Directory_Name(fe)), - } - ) - return res - - def x_get_all_entities(self): - res = [] - lib = libraries.Get_Libraries_Chain() - while lib != nodes.Null_Iir: - files = nodes.Get_Design_File_Chain(lib) - ents = [] - while files != nodes.Null_Iir: - units = nodes.Get_First_Design_Unit(files) - while units != nodes.Null_Iir: - unitlib = nodes.Get_Library_Unit(units) - if nodes.Get_Kind(unitlib) == nodes.Iir_Kind.Entity_Declaration: - ents.append(unitlib) - units = nodes.Get_Chain(units) - files = nodes.Get_Chain(files) - ents = [pyutils.name_image(nodes.Get_Identifier(e)) for e in ents] - lib_name = pyutils.name_image(nodes.Get_Identifier(lib)) - res.extend([{"name": n, "library": lib_name} for n in ents]) - lib = nodes.Get_Chain(lib) - return res - - def x_get_entity_interface(self, library, name): - def create_interfaces(inters): - res = [] - while inters != nodes.Null_Iir: - res.append( - { - "name": name_table.Get_Name_Ptr( - nodes.Get_Identifier(inters) - ).decode("latin-1") - } - ) - inters = nodes.Get_Chain(inters) - return res - - # Find library - lib_id = name_table.Get_Identifier(library.encode("utf-8")) - lib = libraries.Get_Library_No_Create(lib_id) - if lib == name_table.Null_Identifier: - return None - # Find entity - ent_id = name_table.Get_Identifier(name.encode("utf-8")) - unit = libraries.Find_Primary_Unit(lib, ent_id) - if unit == nodes.Null_Iir: - return None - ent = nodes.Get_Library_Unit(unit) - return { - "library": library, - "entity": name, - "generics": create_interfaces(nodes.Get_Generic_Chain(ent)), - "ports": create_interfaces(nodes.Get_Port_Chain(ent)), - } - - def compute_anti_dependences(self): - """Return a dictionnary of anti dependencies for design unit""" - res = {} - lib = libraries.Get_Libraries_Chain() - while lib != nodes.Null_Iir: - files = nodes.Get_Design_File_Chain(lib) - while files != nodes.Null_Iir: - units = nodes.Get_First_Design_Unit(files) - while units != nodes.Null_Iir: - if nodes.Get_Date_State(units) == nodes.Date_State.Analyze: - # The unit has been analyzed, so the dependencies are know. - deps = nodes.Get_Dependence_List(units) - assert deps != nodes.Null_Iir_List - deps_it = lists.Iterate(deps) - while lists.Is_Valid(byref(deps_it)): - el = lists.Get_Element(byref(deps_it)) - if nodes.Get_Kind(el) == nodes.Iir_Kind.Design_Unit: - if res.get(el, None): - res[el].append(units) - else: - res[el] = [units] - else: - assert False - lists.Next(byref(deps_it)) - units = nodes.Get_Chain(units) - files = nodes.Get_Chain(files) - lib = nodes.Get_Chain(lib) - return res |