I had 3 different tries at doing this. Earlier this week, I got very big on spring-boot -- I still am, incidentally. Then I tried it, briefly, in PHP, before returning home, to Python, which is resembling Java more and more with every release.
Without further ado:
#!~/.virtualenvs/around-web/bin/python
import binascii
import cStringIO as StringIO
import csv
import json
import logging
import time
from flask import Flask, request, session, redirect, url_for, abort, render_template, flash
from sqlalchemy import create_engine, Column, Integer, Sequence, String, DateTime, Float, BIGINT, Table, MetaData, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import select, expression, text
from sqlalchemy.types import UserDefinedType, _Binary, TypeDecorator
Base = declarative_base()
# Python datatypes
class GisElement(object):
"""Represents a geometry value."""
def __str__(self):
return self.desc
class BinaryGisElement(GisElement, expression.FunctionElement):
"""Represents a Geometry value expressed as binary."""
def __init__(self, data):
self.data = data
self.name = 'ST_GeomFromEWKB'
self.is_literal = True
expression.FunctionElement.__init__(self, "ST_GeomFromEWKB", data,
type_=Geometry(coerce_="binary"))
@property
def desc(self):
return self.as_hex
@property
def as_hex(self):
return binascii.hexlify(self.data)
class TextualGisElement(GisElement, expression.FunctionElement):
"""Represents a Geometry value expressed as text."""
name = 'ST_GeomFromText'
table = None
def __init__(self, desc, srid=-1):
self.is_literal = True
logging.debug('TextualGISElement constructor')
desc = desc
expression.FunctionElement.__init__(self, func.ST_GeomFromText, desc, srid,
type_=Geometry)
# SQL datatypes.
class Geometry(UserDefinedType):
"""Base PostGIS Geometry column type."""
name = "GEOMETRY"
def __init__(self, dimension=None, srid=-1,
coerce_="text"):
self.dimension = dimension
self.srid = srid
self.coerce = coerce_
class comparator_factory(UserDefinedType.Comparator):
"""Define custom operations for geometry types."""
# override the __eq__() operator
def __eq__(self, other):
return self.op('~=')(other)
# add a custom operator
def intersects(self, other):
return self.op('&&')(other)
# any number of GIS operators can be overridden/added here
# using the techniques above.
def _coerce_compared_value(self, op, value):
return self
def get_col_spec(self):
return 'Geometry'
def bind_expression(self, bindvalue):
if self.coerce == "text":
return TextualGisElement(bindvalue)
elif self.coerce == "binary":
return BinaryGisElement(bindvalue)
else:
assert False
def column_expression(self, col):
if self.coerce == "text":
return func.ST_AsText(col, type_=self)
elif self.coerce == "binary":
return func.ST_AsBinary(col, type_=self)
else:
assert False
def bind_processor(self, dialect):
def process(value):
if isinstance(value, GisElement):
return value.desc
else:
return value
return process
def result_processor(self, dialect, coltype):
if self.coerce == "text":
fac = TextualGisElement
elif self.coerce == "binary":
fac = BinaryGisElement
else:
assert False
def process(value):
if value is not None:
return fac(value)
else:
return value
return process
def adapt(self, impltype):
return impltype(dimension=self.dimension,
srid=self.srid, coerce_=self.coerce)
# SQL datatypes.
class Geometry(UserDefinedType):
"""Base PostGIS Geometry column type."""
def __init__(self, dimension=None, srid=-1,
coerce_="text"):
self.dimension = dimension
self.srid = srid
self.name = 'GEOMETRY'
self.coerce = coerce_
class comparator_factory(UserDefinedType.Comparator):
"""Define custom operations for geometry types."""
# override the __eq__() operator
def __eq__(self, other):
return self.op('~=')(other)
# add a custom operator
def intersects(self, other):
return self.op('&&')(other)
# any number of GIS operators can be overridden/added here
# using the techniques above.
def _coerce_compared_value(self, op, value):
return self
def get_col_spec(self):
return self.coerce
def bind_expression(self, bindvalue):
if self.coerce == "text":
return TextualGisElement(bindvalue)
elif self.coerce == "binary":
return BinaryGisElement(bindvalue)
else:
assert False
def bind_processor(self, dialect):
def process(value):
if isinstance(value, GisElement):
return value.desc
else:
return value
return process
def result_processor(self, dialect, coltype):
if self.coerce == "text":
fac = TextualGisElement
elif self.coerce == "binary":
fac = BinaryGisElement
else:
assert False
def process(value):
if value is not None:
return fac(value)
else:
return value
return process
def adapt(self, impltype):
return impltype(dimension=self.dimension,
srid=self.srid, coerce_=self.coerce)
# other datatypes can be added as needed.
class Point(Geometry):
name = 'POINT'
# DDL integration
# Postgis historically has required AddGeometryColumn/DropGeometryColumn
# and other management methods in order to create Postgis columns. Newer
# versions don't appear to require these special steps anymore. However,
# here we illustrate how to set up these features in any case.
def setup_ddl_events():
@event.listens_for(Table, "before_create")
def before_create(target, connection, **kw):
dispatch("before-create", target, connection)
@event.listens_for(Table, "after_create")
def after_create(target, connection, **kw):
dispatch("after-create", target, connection)
@event.listens_for(Table, "before_drop")
def before_drop(target, connection, **kw):
dispatch("before-drop", target, connection)
@event.listens_for(Table, "after_drop")
def after_drop(target, connection, **kw):
dispatch("after-drop", target, connection)
def dispatch(event, table, bind):
if event in ('before-create', 'before-drop'):
regular_cols = [c for c in table.c if not
isinstance(c.type, Geometry)]
gis_cols = set(table.c).difference(regular_cols)
table.info["_saved_columns"] = table.c
# temporarily patch a set of columns not including the
# Geometry columns
table.columns = expression.ColumnCollection(*regular_cols)
if event == 'before-drop':
for c in gis_cols:
bind.execute(
select([
func.DropGeometryColumn(
'public', table.name, c.name)],
autocommit=True)
)
elif event == 'after-create':
table.columns = table.info.pop('_saved_columns')
for c in table.c:
if isinstance(c.type, Geometry):
bind.execute(
select([
func.AddGeometryColumn(
table.name, c.name,
c.type.srid,
c.type.name,
c.type.dimension)],
autocommit=True)
)
elif event == 'after-drop':
table.columns = table.info.pop('_saved_columns')
metadata = MetaData()
class Location_history(Base):# Table('history', metadata, Column('history_id', Integer, primary_key = True), Column('device_id', String), Column('device_timestamp', DateTime), Column('location', Geometry))
__tablename__ = 'history'
history_id = Column(Integer, primary_key = True)
device_id = Column(String)
device_timestamp = Column(DateTime)
latitude = Column(Float)
longitude = Column(Float)
def location(self):
return ' ({}, {})'.format(self.latitude, self.longitude)
def __str__(self):
return '%s @ %s at timestamp %s'.format(self.device_id, self.location(), self.device_timestamp)
def keys(self):
return ['device_timestamp','device_id','location']
app = Flask(__name__)
def results():
Session = sessionmaker()
engine = create_engine(u'postgres://pgsql@localhost/Around')
Session.configure(bind = engine)
session = Session()
rows = session.query(Location_history).all()
return rows
@app.route('/json', methods=['GET'])
def json_out():
directory = results()
results_ = []
for d in directory:
result = {}
result['Location'] = '({},{})'.format(d.latitude, d.longitude)
result['Time'] = d.device_timestamp.strftime('%c')
result['Device'] = d.device_id
results_.append(result)
logging.debug(results_)
return json.dumps(results_)
@app.route('/xml', methods=['GET'])
def xml_out():
directory = results()
out = StringIO.StringIO()
out.write('''''')
out.write('')
for entry in directory:
out.write('\t\n')
out.write('\t\t{} \n'.format(entry.device_timestamp))
out.write('\t\t{} \n'.format(entry.location()))
out.write('\t\t{} \n'.format(entry.device_id))
out.write('\t \n')
out.write(' \n')
return out.getvalue()
@app.route('/csv', methods=['GET'])
def csv_out():
directory = results()
out = StringIO.StringIO()
try:
writer = csv.DictWriter(out, fieldnames = directory[0].keys())
for entry in directory:
# ['device_timestamp',' device_id ','location']
writer.writerow({'device_timestamp' : entry.device_timestamp, 'device_id' : entry.device_id, 'location': entry.location()})
return out.getvalue()
except IndexError, e:
return "No values found"
@app.route('/new/', methods=['POST'])
def new():
device_id = request.args.get('device')
if device_id is None:
device_id = 'test'
latitude = request.args.get('latitude')
longitude = request.args.get('longitude')
timestamp = request.args.get('time')
if timestamp is None:
timestamp = time.time()
timestamp = long(timestamp)
latitude = float(latitude)
longitude = float(longitude)
location = Location_history()
engine = create_engine(u'postgres://pgsql@localhost/Around')
connection = engine.connect()
cmd = 'INSERT INTO history (device_timestamp, device_id, latitude, longitude) VALUES (to_timestamp(:timestamp), :device_id, :latitude, :longitude)'
connection.execute(text(cmd), device_id = device_id, timestamp = timestamp, latitude = latitude, longitude = longitude)
return 'Success!'
if __name__ == '__main__':
logging.basicConfig( level = logging.DEBUG )
app.run()
Ideally, I'd like to make the device_id part of the url and remove the unnecessary code. Also would like to use postgis instead of using a view, but this was a quick and dirty implementation and the functionality will probably be heavier on the client side.
No comments:
Post a Comment