# -*- coding: utf-8 -*-
import os
import csv
import StringIO
import hashlib
import zipfile
from . import HAVE_GEOJSON
if HAVE_GEOJSON:
import geojson
import bng_to_latlon # https://github.com/fmalina/bng_latlon
from . import FORMATS
import ogt.ags4
import ogt.ogt_group
import ogt.utils
class OGTDocument:
"""Class :class:`~ogt.ogt_doc.OGTDocument` represents an ags file and
contains the groups (:class:`~ogt.ogt_group.OGTGroup`).
.. code-block:: python
from ogt import ogt_doc
doc = ogt_doc.OGTDocument()
err = doc.load_ags4_file("/path/to/my.ags")
if err:
print err
else:
# print the groups index
print doc.groups_index()
# Headings in the SAMP group
print doc.group("SAMP").headings()
        # The UNIT group used in the document
print doc.units()
"""
def __init__(self):
self.source_file_path = None
"""Full path to original source file, if any"""
self.source = ""
"""The original source files contents as string"""
self.groups = {}
"""A `dict` of group code to :class:`~ogt.ogt_group.OGTGroup` instances"""
self.lines = []
"""A `list` of strings with original source lines"""
        self.csv_rows = []
        """A `list` of lists, one per parsed csv row"""
        self.error_rows = {}
        """A `dict` mapping line numbers to rows with errors"""
    def hash(self):
"""Calculate the `sha1` hash
:rtype: str
:return: A **`str`** with the hash
.. seealso:: See also
- https://en.wikipedia.org/wiki/SHA-1
- https://docs.python.org/2/library/sha.html
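
        A minimal usage sketch (the file path is illustrative):

        .. code-block:: python

            doc = ogt_doc.OGTDocument()
            doc.load_ags4_file("/path/to/my.ags")
            print doc.hash()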
"""
hasher = hashlib.sha1()
hasher.update(self.source)
return hasher.hexdigest()
    def groups_sort(self):
"""Return a list of group_codes in preferred order (see :func:`~ogt.ogt_group.groups_sort`)"""
return ogt.ogt_group.groups_sort(self.groups.keys())
    def groups_count(self):
        """Returns the number of groups in the document
:rtype: int
:return: Groups count
"""
        return len(self.groups)
    def append_group(self, grp):
        """Appends an :class:`~ogt.ogt_group.OGTGroup` instance to this document

        :param grp: The group object to add
        :type grp: ~ogt.ogt_group.OGTGroup
        :return: An `Error` message if the group already exists, else `None`
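
        A minimal sketch; the `LOCA` group code is illustrative:

        .. code-block:: python

            grp = ogt.ogt_group.OGTGroup("LOCA")
            err = doc.append_group(grp)
            if err:
                print err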
"""
if grp.group_code in self.groups:
return "Error: Group already exists in doc"
grp.docParent = self
#self.groups_sort.append(grp.group_code)
self.groups[grp.group_code] = grp
return None
    def group(self, group_code):
"""
:param group_code: Four character group code
:type group_code: str
:return: An instance of :class:`~ogt.ogt_group.OGTGroup` if exists, else `None`
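
        For example, assuming the document contains a `SAMP` group:

        .. code-block:: python

            samp = doc.group("SAMP")
            if samp:
                print samp.headings()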
"""
return self.groups.get(group_code)
    def proj(self):
"""Shortcut to `PROJ` group object
:return: An instance of :class:`~ogt.ogt_group.OGTGroup` if exists, else `None`
"""
return self.group("PROJ")
    def proj_dict(self):
"""Shortcut to `PROJ` group data
:return: A dict with data if exists, else `None`
"""
grpOb = self.group("PROJ")
if not grpOb:
return None
#print grpOb.data[0]
if len(grpOb.data) > 0:
return grpOb.data[0]
return None
    def units(self):
        """Shortcut to the `UNIT` group

        :return: An instance of :class:`~ogt.ogt_group.OGTGroup` if it exists, else `None`
        """
        return self.group("UNIT")
    def types(self):
        """Shortcut to the `TYPE` group

        :return: An instance of :class:`~ogt.ogt_group.OGTGroup` if it exists, else `None`
        """
        return self.group("TYPE")
    def write(self, ext="json", beside=False, file_path=None,
include_source=False, edit_mode=False, minify=False,
zip=False, overwrite=False, include_stats=False):
"""Write out the data to file in the selected format
:param ext: The file format, see :data:`~ogt.__init__.FORMATS`
:type ext: str
:type beside: bool
        :param beside: Save the output file alongside the original with the extension appended, e.g.
- Source = `/path/to/myproject.ags`
- Output = `/path/to/myproject.ags.json`
        :param file_path: Relative or absolute path to write to, including the extension
:type file_path: str
:param include_source: If `True`, the original ags source is also included.
:type include_source: bool
:param zip: If `True`, the original and converted file are packaged in a zip
:type zip: bool
:param minify: If `True`, all white space is removed from output file
:type minify: bool
:param overwrite: If `True`, the target file is overwritten, otherwise an error is returned
:type overwrite: bool
        :return: A tuple with
                 - A `Message` string if no errors, else `None`
                 - Any `Error` that occurred, otherwise `None`

        .. note::

            - Either **`beside=True`** or a **`file_path`** is required, otherwise an error occurs
            - If both are provided, an error is returned
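
        A minimal sketch, assuming the document was loaded from a file:

        .. code-block:: python

            msg, err = doc.write(ext="json", beside=True, overwrite=True)
            if err:
                print err
            else:
                print msg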
"""
## Do some validations
        if ext not in FORMATS:
            return None, "Error: Invalid format specified - `%s`. Use %s" % (ext, ", ".join(FORMATS))
        if not beside and file_path is None:
            return None, "Error: need an output, either -b or -w"
        if beside and file_path is not None:
            return None, "Error: conflict in options, either -b or -w, not BOTH"
        ## make target filenames
base_name = os.path.basename(self.source_file_path)
target_file_path = None
if beside:
# File is beside the original
if zip:
target_file_path = self.source_file_path + ".zip"
else:
target_file_path = self.source_file_path + ".%s" % ext
else:
# file is from argument
target_file_path = file_path
base_name = os.path.basename(file_path)
if len(base_name) == 0:
# directory given only
return None, "Error: Invalid file name `%s`" % target_file_path
            parts = base_name.split(".")
            if len(parts) == 1:
                # no extension
                return None, "Error: Invalid file name `%s`" % target_file_path
            # Check the extension is what we expect
            gext = parts[-1]
            if not zip and gext != ext:
                return None, "Error: Conflict in file name extension, expected '%s' `%s`" % (ext, target_file_path)
            elif zip and gext != "zip":
                # extensions mismatched, e.g. json != zip
                return None, "Error: Conflict in file name extension, expected 'zip' `%s`" % target_file_path
## warn if not overwrite
        if not overwrite:
if os.path.exists(target_file_path):
return None, "Error: Target file exists - `%s` " % target_file_path
## convert the file to target format string blob
blob = None
err = None
if ext in ["js", "json"]:
blob, err = self.to_json(include_source=include_source, edit_mode=edit_mode, minify=minify, include_stats=include_stats)
elif ext == "geojson":
blob, err = self.to_geojson(minify=minify)
elif ext == "yaml":
blob, err = self.to_yaml(include_source=include_source, edit_mode=edit_mode, include_stats=include_stats)
elif ext == "ags4":
            blob, err = ogt.ags4.doc_to_ags4_csv(self)
else:
return None, "Error: No valid output format specified - `%s` % ext"
if err:
return None, err
if zip:
# create zip
try:
zipee = zipfile.ZipFile(target_file_path, mode="w")
# add source file
zipee.writestr( base_name, self.source)
# add converted file
zipee.writestr( "%s.%s" % (base_name, ext), blob)
# write out and done
zipee.close()
siz = ogt.utils.file_size(target_file_path, human=True)
return "Wrote: %s `%s`" % (siz, target_file_path), None
except Exception as e:
return None, "Error: %s" % str(e)
else:
try:
                with open(target_file_path, "w") as f:
                    f.write(blob)
siz = ogt.utils.file_size(target_file_path, human=True)
return "Wrote: %s `%s`" % (siz, target_file_path), None
except Exception as e:
return None, "Error: %s" % str(e)
return None, "Error: OOPS unexpected error"
    def to_dict(self, include_source=False, edit_mode=False, include_stats=False):
        """Return the document data

        :param include_source: If `True` then the source string is included in the **source:** key.
        :type include_source: bool
        :param edit_mode: see :ref:`edit_mode`
        :type edit_mode: bool
        :param include_stats: If `True` then the document statistics are included in the **stats:** key.
        :type include_stats: bool
        :rtype: dict
        :return: A `dict` with the data
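
        A minimal sketch of typical usage:

        .. code-block:: python

            dic = doc.to_dict()
            print dic['file_name'], dic['hash']
            print dic['groups'].keys()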
"""
# base dict to return
dic = dict(file_name=self.source_file_path,
version="ags4",
groups={},
hash=self.hash())
# loop groups and add struct based on edit_mode
for k, g in self.groups.iteritems():
dic['groups'][k] = g.to_dict(edit_mode=edit_mode)
# include source raw source
if include_source:
dic['source'] = self.source
# include statistics
if include_stats:
dic['stats'] = self.stats()
return dic
    def to_json(self, include_source=False, edit_mode=False, minify=False, include_stats=False):
        """Return the document data in :ref:`json` format

        :param include_source: If `True` then the source string is included in the **source:** key.
        :type include_source: bool
        :param edit_mode: see :ref:`edit_mode`
        :type edit_mode: bool
        :rtype: tuple
        :return: A tuple with:
                 - `None` if error, else a `str` with :ref:`json` encoded data
                 - An `Error` string if an error occurred, else `None`
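
        A minimal sketch, assuming `doc` is a loaded document:

        .. code-block:: python

            blob, err = doc.to_json(minify=True)
            if err:
                print err
            else:
                print blob[:80]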
"""
return ogt.utils.to_json( self.to_dict(include_source=include_source,
include_stats=include_stats,
edit_mode=edit_mode),
minify=minify)
    def to_yaml(self, include_source=False, edit_mode=False, include_stats=False):
        """Return the document data in :ref:`yaml` format

        :param include_source: If `True` then the source string is included in the **source:** key.
        :type include_source: bool
        :param edit_mode: see :ref:`edit_mode`
        :type edit_mode: bool
        :rtype: tuple
        :return: A tuple with:
                 - `None` if error, else a `str` with :ref:`yaml` encoded data
                 - An `Error` string if an error occurred, else `None`
"""
return ogt.utils.to_yaml( self.to_dict(include_source=include_source,
include_stats=include_stats,
edit_mode=edit_mode) )
    def to_geojson(self, minify=False):
        """Return the document's `LOCA` locations as GeoJSON

        :param minify: If `True`, all white space is removed from the output
        :type minify: bool
        :return: A tuple with:
                 - `None` if error, else a `str` with geojson encoded data
                 - An `Error` message if an error occurred, else `None`
        """
        loca = self.group("LOCA")
        if loca is None:
            return None, "No `LOCA` Group"
def make_feature(rec, lat, lon):
props = dict(PointID=rec.get("LOCA_ID"), Type=rec.get("LOCA_TYPE"), GroundLevel=rec.get("LOCA_GL"))
return geojson.Feature(geometry=geojson.Point((lon, lat)), properties=props)
features = []
        ## WGS84
if "LOCA_LAT" in loca.headings and "LOCA_LON" in loca.headings:
for rec in loca.data:
                lat_s = rec.get("LOCA_LAT")
                lon_s = rec.get("LOCA_LON")
                if lat_s and lon_s:
                    # cast to float so the geojson coordinates are numeric
                    features.append(make_feature(rec, float(lat_s), float(lon_s)))
## BNG British National grid
elif "LOCA_NATE" in loca.headings and "LOCA_NATN" in loca.headings:
for rec in loca.data:
east = ogt.utils.to_int(rec.get("LOCA_NATE"))
north = ogt.utils.to_int(rec.get("LOCA_NATN"))
if east and north:
lat, lon = bng_to_latlon.OSGB36toWGS84(east, north)
features.append(make_feature(rec, lat, lon))
if len(features) > 0:
f = geojson.FeatureCollection(features)
return ogt.utils.to_json(f, minify=minify)
        return None, "Error: No location data found in `LOCA` group"
    def write_excel(self):
        """Experimental writing to xlsx (requires the `openpyxl` package)"""
        import openpyxl
        wbook = openpyxl.Workbook()
        for idx, ki in enumerate(self.groups_sort()):
            if idx == 0:
                ## By default an empty workbook has a first sheet
                sheet = wbook.active
                sheet.title = ki
            else:
                sheet = wbook.create_sheet(title=ki)
            # TODO: write the group's rows to the sheet; this is where groups order goes mad
        wbook.save(self.source_file_path + ".xlsx")
    def stats(self):
        """Return a `dict` of document statistics: locations, data row counts, sample types, site geometry and unused groups"""
dic = {}
## Number of locations
locaGrp = self.group("LOCA")
        if locaGrp is None:
dic['locations'] = None
else:
recs = locaGrp.data_column("LOCA_ID")
dic['locations'] = dict(count=len(recs), data=recs)
## Data rows
lst = []
for gc in sorted(self.groups.keys()):
grp = self.group(gc)
lst.append(dict(GROUP=gc, count=len(grp.data)))
dic['data'] = lst
## Sample Types
grp = self.group("SAMP")
if not grp:
dic['sample_types'] = None
else:
d = {}
recs = grp.data_column("SAMP_TYPE")
for st in sorted(recs):
                if st not in d:
d[st] = 0
d[st] += 1
dic['sample_types'] = d
## Site Geom
d = {}
# TODO X.Y.Z
d['LOCA_LOCX'] = "todo"
d['LOCA_LOCY'] = "todo"
d['LOCA_LOCZ'] = "todo"
# National Grid
        def calc_ng_stats(recs):
            # TODO - values are strings, so min/max compare lexically; needs type casting?
            if recs is None:
                return None
ds = {}
ds['min'] = min(recs)
ds['max'] = max(recs)
ds['row_count'] = len(recs)
ds['rows_with_data'] = 0
ds['rows_without_data'] = 0
for rec in recs:
if rec == "":
ds['rows_without_data'] += 1
else:
ds['rows_with_data'] += 1
return ds
recs = locaGrp.data_column("LOCA_NATE")
d['LOCA_NATE'] = calc_ng_stats(recs)
recs = locaGrp.data_column("LOCA_NATN")
d['LOCA_NATN'] = calc_ng_stats(recs)
recs = locaGrp.data_column("LOCA_GL")
d['LOCA_GL'] = calc_ng_stats(recs)
dic['site_geometry'] = d
# GEOL
grp = self.group("GEOL")
if not grp:
dic['geol'] = None
else:
recs = grp.data_column("LOCA_ID")
locs = dic['locations']['data']
ll = []
for l in locs:
if not l in recs:
if not l in ll:
ll.append(l)
dic['geol'] = dict(no_entries=ll if len(ll) > 0 else None)
# SAMP
grp = self.group("SAMP")
if not grp:
dic['samp'] = None
else:
recs = grp.data_column("LOCA_ID")
locs = dic['locations']['data']
ll = []
for l in locs:
if not l in recs:
if not l in ll:
ll.append(l)
dic['samp'] = dict(no_entries=ll if len(ll) > 0 else None)
        ## Unused Groups
        all_g = ogt.ags4.groups()
        ags_groups = all_g.keys()
        dic['unused_groups'] = sorted(list(set(ags_groups) - set(self.groups.keys())))
return dic
    def load_ags4_file(self, ags4_file_path):
"""Loads document from an :term:`ags4` formatted file
:param ags4_file_path: absolute or relative path to file, will be at source_file_path
:type ags4_file_path: str
:rtype: str
:return: A String if an error else None
.. todo:: Ensure we can read ascii
"""
try:
# TODO ensure asccii ??
self.source_file_path = ags4_file_path
with open(ags4_file_path, "r") as f:
err = self.load_ags4_string( f.read() )
if err:
return err
return None
        except IOError as e:
            return "Error: %s" % str(e)
        # should never happen
        return "Error: unexpected fall-through in `load_ags4_file`"
    def load_ags4_string(self, ags4_str):
        """Load document from an :term:`ags4` formatted string

        Hacker's guide: this is a three step parsing process:

        - split the source into lines and decode each line as csv
        - walk the decoded rows, marking each group's start and end index
        - parse each group's csv rows into headings, units, types and data

        :param ags4_str: string to load
        :type ags4_str: str
        :rtype: str
        :return: An `Error` message if the string is not loaded, else `None`
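
        A minimal sketch; the file path is illustrative:

        .. code-block:: python

            doc = OGTDocument()
            err = doc.load_ags4_string(open("/path/to/my.ags").read())
            if err:
                print err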
"""
## Copy source as a string into mem here
self.source = ags4_str
# first:
# - split ags_string into lines
# - and parse each line into csv
# - and add to the doc
for lidx, line in enumerate(self.source.split("\n")):
            # remove any trailing whitespace, e.g. \r
            # we're in *nix land, so assemble with CRLF when dumping to ags
stripped = line.strip()
if stripped == "":
# blank line
self.lines.append([])
self.csv_rows.append([])
continue
# decode the csv line
reader = csv.reader( StringIO.StringIO(stripped) )
row = reader.next() # first row of reader
self.lines.append(line)
self.csv_rows.append(row)
        # second:
        # - walk the decoded rows and recognise the groups
        # - we mark the start index and end index of each group
curr_grp = None
for lidx, row in enumerate(self.csv_rows):
line_no = lidx + 1
lenny = len(row)
#print row
if lenny == 0:
# blank row so reset groups
if curr_grp:
curr_grp.csv_end_index = lidx
#print "idx=", curr_grp.csv_start_index, curr_grp.csv_end_index
#print curr_grp.csv_rows()
curr_grp = None
continue
if lenny < 2:
# min of two items, so add to errors
self.error_rows[lidx + 1] = row
else:
typ = row[0] # first item is row type
#xrow = row[1:] # row without data descriptor
if typ == ogt.ags4.AGS4_DESCRIPTOR.group:
## we got a new group
curr_grp = ogt.ogt_group.OGTGroup(row[1])
#curr_grp.csv_rows.append(row)
curr_grp.csv_start_index = lidx
self.append_group(curr_grp)
else:
                    if curr_grp is None:
self.error_rows[line_no] = row
#else:
# curr_grp.csv_rows.append(row)
        # third:
        # - parse each group's csv rows into their parts
for group_code, grp in self.groups.items():
#print group_code, "<<<<<<<<<"
#print grp.csv_rows()
            for ridx, row in enumerate(grp.csv_rows()):
                # source line number for error reporting
                line_no = grp.csv_start_index + ridx + 1
                typ = row[0]
xrow = row[1:] # row without data descriptor
if typ == ogt.ags4.AGS4_DESCRIPTOR.group:
pass
elif typ == ogt.ags4.AGS4_DESCRIPTOR.heading:
grp.headings_source_sort = xrow
for idx, head_code in enumerate(grp.headings_source_sort):
grp.headings[head_code] = xrow[idx]
elif typ == ogt.ags4.AGS4_DESCRIPTOR.unit:
                    if grp.headings_source_sort is None:
self.error_rows[line_no] = row
else:
for idx, head_code in enumerate(grp.headings_source_sort):
grp.units[head_code] = xrow[idx]
elif typ == ogt.ags4.AGS4_DESCRIPTOR.type:
                    if grp.headings_source_sort is None:
self.error_rows[line_no] = row
else:
for idx, head_code in enumerate(grp.headings_source_sort):
grp.types[head_code] = xrow[idx]
elif typ == ogt.ags4.AGS4_DESCRIPTOR.data:
                    if grp.headings_source_sort is None:
self.error_rows[line_no] = row
else:
dic = {}
for idx, head_code in enumerate(grp.headings_source_sort):
dic[head_code] = xrow[idx]
grp.data.append( dic )
return None
def create_doc_from_ags4_file(ags_file_path):
"""Convenience function to create and load an OGTDocument from an ags file
.. code-block:: python
doc, err = ogt_doc.create_doc_from_ags4_file("/path/to/my.ags")
if err:
print err
else:
print doc.group("PROJ")
"""
doc = OGTDocument()
err = doc.load_ags4_file(ags_file_path)
return doc, err
def create_doc_from_json_file(json_file_path):
"""Creates a document from a :ref:`json` formatted file
.. code-block:: python
doc, err = ogt_doc.create_doc_from_json_file("/path/to/my.json")
if err:
print err
:param json_file_path: absolute or relative path to file
:type json_file_path: str
:rtype: tuple
:return: A `tuple` containing
- An :class:`~ogt.ogt_doc.OGTDocument` object on success, else `None`
- An `Error` message if error, otherwise `None`
"""
data, err = ogt.utils.read_json_file(json_file_path)
if err:
return None, err
groups = data.get('groups')
    if groups is None:
return None, "Error: no `groups` key in json file"
    doc = OGTDocument()
doc.source_file_path = json_file_path
for group_code in groups.keys():
group = groups[group_code]
grp = ogt.ogt_group.OGTGroup(group_code)
doc.append_group(grp)
## add units + also headings
for head_code in group['UNIT'].keys():
valu = str(group['UNIT'][head_code])
grp.units[head_code] = valu
grp.headings[head_code] = valu
## add TYPE
for head_code in group['TYPE'].keys():
valu = str(group['TYPE'][head_code])
grp.types[head_code] = valu
## add data
for rec in group['DATA']:
dic = {}
for head_code in rec.keys():
dic[head_code] = str(rec[head_code])
grp.data.append(dic)
return doc, None