June 6, 2014

How to Persist your Location

I had 3 different tries at doing this. Earlier this week, I got very big on spring-boot -- I still am, incidentally. Then I tried it, briefly, in PHP, before returning home, to Python, which is resembling Java more and more with every release.

Without further ado:
#!~/.virtualenvs/around-web/bin/python
import binascii
import cStringIO as StringIO
import csv
import json
import logging
import time
from flask import Flask, request, session, redirect, url_for, abort, render_template, flash
from sqlalchemy import create_engine, Column, Integer, Sequence, String, DateTime, Float, BIGINT, Table, MetaData, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import select, expression, text
from sqlalchemy.types import UserDefinedType, _Binary, TypeDecorator

Base = declarative_base()
# Python datatypes

class GisElement(object):
    """Represents a geometry value."""

    def __str__(self):
        return self.desc


class BinaryGisElement(GisElement, expression.FunctionElement):
    """Represents a Geometry value expressed as binary."""

    def __init__(self, data):
        self.data = data
        self.name = 'ST_GeomFromEWKB'
        self.is_literal = True
        expression.FunctionElement.__init__(self, "ST_GeomFromEWKB", data,
                                    type_=Geometry(coerce_="binary"))

    @property
    def desc(self):
        return self.as_hex

    @property
    def as_hex(self):
        return binascii.hexlify(self.data)

class TextualGisElement(GisElement, expression.FunctionElement):
    """Represents a Geometry value expressed as text."""
    name = 'ST_GeomFromText'
    table = None
    def __init__(self, desc, srid=-1):
        self.is_literal = True
        logging.debug('TextualGISElement constructor')
        desc = desc
        expression.FunctionElement.__init__(self, func.ST_GeomFromText, desc, srid,
                                    type_=Geometry)
        

# SQL datatypes.

class Geometry(UserDefinedType):
    """Base PostGIS Geometry column type."""

    name = "GEOMETRY"

    def __init__(self, dimension=None, srid=-1,
                coerce_="text"):
        self.dimension = dimension
        self.srid = srid
        self.coerce = coerce_

    class comparator_factory(UserDefinedType.Comparator):
        """Define custom operations for geometry types."""

        # override the __eq__() operator
        def __eq__(self, other):
            return self.op('~=')(other)

        # add a custom operator
        def intersects(self, other):
            return self.op('&&')(other)

        # any number of GIS operators can be overridden/added here
        # using the techniques above.

    def _coerce_compared_value(self, op, value):
        return self

    def get_col_spec(self):
        return 'Geometry'

    def bind_expression(self, bindvalue):
        if self.coerce == "text":
            return TextualGisElement(bindvalue)
        elif self.coerce == "binary":
            return BinaryGisElement(bindvalue)
        else:
            assert False

    def column_expression(self, col):
        if self.coerce == "text":
            return func.ST_AsText(col, type_=self)
        elif self.coerce == "binary":
            return func.ST_AsBinary(col, type_=self)
        else:
            assert False

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, GisElement):
                return value.desc
            else:
                return value
        return process

    def result_processor(self, dialect, coltype):
        if self.coerce == "text":
            fac = TextualGisElement
        elif self.coerce == "binary":
            fac = BinaryGisElement
        else:
            assert False
        def process(value):
            if value is not None:
                return fac(value)
            else:
                return value
        return process

    def adapt(self, impltype):
        return impltype(dimension=self.dimension,
                srid=self.srid, coerce_=self.coerce)

# SQL datatypes.

class Geometry(UserDefinedType):
    """Base PostGIS Geometry column type."""

    def __init__(self, dimension=None, srid=-1,
                coerce_="text"):
        self.dimension = dimension
        self.srid = srid
        self.name = 'GEOMETRY'
        self.coerce = coerce_

    class comparator_factory(UserDefinedType.Comparator):
        """Define custom operations for geometry types."""

        # override the __eq__() operator
        def __eq__(self, other):
            return self.op('~=')(other)

        # add a custom operator
        def intersects(self, other):
            return self.op('&&')(other)

        # any number of GIS operators can be overridden/added here
        # using the techniques above.

    def _coerce_compared_value(self, op, value):
        return self

    def get_col_spec(self):
        return self.coerce

    def bind_expression(self, bindvalue):
        if self.coerce == "text":
            return TextualGisElement(bindvalue)
        elif self.coerce == "binary":
            return BinaryGisElement(bindvalue)
        else:
            assert False

    def bind_processor(self, dialect):
        def process(value):
            if isinstance(value, GisElement):
                return value.desc
            else:
                return value
        return process

    def result_processor(self, dialect, coltype):
        if self.coerce == "text":
            fac = TextualGisElement
        elif self.coerce == "binary":
            fac = BinaryGisElement
        else:
            assert False
        def process(value):
            if value is not None:
                return fac(value)
            else:
                return value
        return process

    def adapt(self, impltype):
        return impltype(dimension=self.dimension,
                srid=self.srid, coerce_=self.coerce)

# other datatypes can be added as needed.

class Point(Geometry):
    name = 'POINT'

# DDL integration
# Postgis historically has required AddGeometryColumn/DropGeometryColumn
# and other management methods in order to create Postgis columns.  Newer
# versions don't appear to require these special steps anymore.  However,
# here we illustrate how to set up these features in any case.

def setup_ddl_events():
    @event.listens_for(Table, "before_create")
    def before_create(target, connection, **kw):
        dispatch("before-create", target, connection)

    @event.listens_for(Table, "after_create")
    def after_create(target, connection, **kw):
        dispatch("after-create", target, connection)

    @event.listens_for(Table, "before_drop")
    def before_drop(target, connection, **kw):
        dispatch("before-drop", target, connection)

    @event.listens_for(Table, "after_drop")
    def after_drop(target, connection, **kw):
        dispatch("after-drop", target, connection)

    def dispatch(event, table, bind):
        if event in ('before-create', 'before-drop'):
            regular_cols = [c for c in table.c if not
                                    isinstance(c.type, Geometry)]
            gis_cols = set(table.c).difference(regular_cols)
            table.info["_saved_columns"] = table.c

            # temporarily patch a set of columns not including the
            # Geometry columns
            table.columns = expression.ColumnCollection(*regular_cols)

            if event == 'before-drop':
                for c in gis_cols:
                    bind.execute(
                            select([
                                func.DropGeometryColumn(
                                    'public', table.name, c.name)],
                                    autocommit=True)
                            )

        elif event == 'after-create':
            table.columns = table.info.pop('_saved_columns')
            for c in table.c:
                if isinstance(c.type, Geometry):
                    bind.execute(
                            select([
                                    func.AddGeometryColumn(
                                        table.name, c.name,
                                        c.type.srid,
                                        c.type.name,
                                        c.type.dimension)],
                                autocommit=True)
                        )
        elif event == 'after-drop':
            table.columns = table.info.pop('_saved_columns')


metadata = MetaData()

class Location_history(Base):# Table('history', metadata, Column('history_id', Integer, primary_key = True), Column('device_id', String), Column('device_timestamp', DateTime), Column('location', Geometry))
    __tablename__ = 'history'
    history_id = Column(Integer, primary_key = True)
    device_id = Column(String)
    device_timestamp = Column(DateTime)
    latitude = Column(Float)
    longitude = Column(Float)

    def location(self):
        return ' ({}, {})'.format(self.latitude, self.longitude)

    def __str__(self):
        return '%s @ %s at timestamp %s'.format(self.device_id, self.location(), self.device_timestamp)

    def keys(self):
        return ['device_timestamp','device_id','location']

app = Flask(__name__)

def results():
    Session = sessionmaker()
    engine = create_engine(u'postgres://pgsql@localhost/Around')
    Session.configure(bind = engine)
    session = Session()
    rows = session.query(Location_history).all()
    return rows

@app.route('/json', methods=['GET'])
def json_out():
    directory = results()
    results_ = []
    for d in directory:
        result = {}
        result['Location'] = '({},{})'.format(d.latitude, d.longitude)
        result['Time'] = d.device_timestamp.strftime('%c')
        result['Device'] = d.device_id
        results_.append(result)
    logging.debug(results_)
    return json.dumps(results_)

@app.route('/xml', methods=['GET'])
def xml_out():
    directory = results()
    out = StringIO.StringIO()
    out.write('''''')
    out.write('')
    for entry in directory:
        out.write('\t\n')
        out.write('\t\t{}\n'.format(entry.device_timestamp))
        out.write('\t\t{}\n'.format(entry.location()))
        out.write('\t\t{}\n'.format(entry.device_id))
        out.write('\t\n')
    out.write('\n')
    return out.getvalue()

@app.route('/csv', methods=['GET'])
def csv_out():
    directory = results()
    out = StringIO.StringIO()
    try:
        writer = csv.DictWriter(out, fieldnames = directory[0].keys())
        for entry in directory:
            # ['device_timestamp',' device_id ','location']
            writer.writerow({'device_timestamp' : entry.device_timestamp, 'device_id' : entry.device_id, 'location': entry.location()})
        return out.getvalue()
    except IndexError, e:
        return "No values found"

@app.route('/new/', methods=['POST'])
def new():
    device_id = request.args.get('device')
    if device_id is None:
        device_id = 'test'
    latitude = request.args.get('latitude')
    longitude = request.args.get('longitude')
    timestamp = request.args.get('time')
    if timestamp is None:
        timestamp = time.time()
    timestamp = long(timestamp)
    latitude = float(latitude)
    longitude = float(longitude)
    location = Location_history()

    engine = create_engine(u'postgres://pgsql@localhost/Around')
    connection = engine.connect()
    cmd = 'INSERT INTO history (device_timestamp, device_id, latitude, longitude) VALUES (to_timestamp(:timestamp), :device_id, :latitude, :longitude)'
    connection.execute(text(cmd), device_id = device_id, timestamp = timestamp, latitude = latitude, longitude = longitude)
    return 'Success!'

if __name__ == '__main__':
    logging.basicConfig( level = logging.DEBUG )
    app.run()
Ideally, I'd like to make the device_id part of the url and remove the unnecessary code. Also would like to use postgis instead of using a view, but this was a quick and dirty implementation and the functionality will probably be heavier on the client side.

No comments:

Post a Comment