Эх сурвалжийг харах

Initial commit of openaq_ingestion. A small package to ingest an
OpenAQ SQS queue.

Aurelien 6 жил өмнө
commit
039686bb22

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+__pycache__
+settings.yaml

+ 21 - 0
README.md

@@ -0,0 +1,21 @@
+# Package to help ingest the openaq SQS queue into RDS.
+
+## Configuration
+
+Copy `settings.example.yaml` to `settings.yaml` and set a valid database uri for storage.
+Set up AWS credentials in `~/.aws/` by using for example `aws configure` (if you have the AWS CLI installed).
+This package expects an SQS queue named `openaq` in the `eu-west-1` region.
+
+## Usage
+
+Two tooling entry points are defined:
+
+ - `initdb`: initialize the database with the required table for openaq measurements.
+ - `processqueue`: a utility to ingest the messages of the openaq queue:
+
+    Usage: processqueue [OPTIONS]
+    
+    Options:
+      --amount INTEGER  Amount of messages to fetch.
+      --delete BOOLEAN  Delete messages from the queue after successful storage.
+      --help            Show this message and exit.

+ 0 - 0
openaq_ingestion/__init__.py


+ 144 - 0
openaq_ingestion/openaq_ingestion.py

@@ -0,0 +1,144 @@
import json
from datetime import datetime, timezone

import boto3
import click
from dynaconf import settings
from sqlalchemy import create_engine, Table, MetaData, Column, Float, String, \
    Integer, UniqueConstraint
from sqlalchemy.sql import insert, select
+
+
+__ENGINE = None
+
+
+def _get_engine():
+    """
+    Get the sqlalchemy engine
+    """
+    global __ENGINE
+    if __ENGINE is None:
+        __ENGINE = create_engine(settings.get('database_uri'))
+    return __ENGINE
+
+
+def _initialize_db():
+    """
+    Initialize the database
+    """
+    engine = _get_engine()
+    meta = MetaData()
+    Table(
+        'openaq_measurements', meta,
+        Column('id', Integer, primary_key=True),
+        Column('message_id', String),
+        Column('time_received', Integer),
+        Column('time_measurement', Integer),
+        Column('location', String),
+        Column('averaging_hours', Float),
+        Column('parameter', String),
+        Column('value_ug_m3', Float),
+        UniqueConstraint('message_id', name='unique_msgid')
+    )
+    meta.create_all(engine)
+
+
+def _get_averaging_hours(period):
+    """
+    Return the averaging period in hours
+    """
+    period_time = period['value']
+    period_unit = period['unit']
+    if period_unit == 'hours':
+        return period_time
+    raise ValueError(f'Invalid period unit encountered: {period_unit}')
+
+
+def _get_value(message):
+    """
+    Return the measurement value in message
+    """
+    measurement_unit = message['unit']
+    measurement = message['value']
+    if measurement_unit == "µg/m³":
+        return measurement
+    raise ValueError(f'Invalid measurement unit: {measurement_unit}')
+
+
+def _parse_date(strdate):
+    """
+    Parse date in ISO 8601 format to unix timestamp
+    """
+    return int(datetime.strptime(strdate, '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
+
+
+def store(message):
+    """
+    Store the message in the database.
+    """
+    engine = _get_engine()
+    i = insert(Table('openaq_measurements', MetaData(), autoload=True,
+                     autoload_with=engine))
+    content = json.loads(message['Message'])
+    statement = i.values({
+        'message_id': message['MessageId'],
+        'time_received': _parse_date(message['Timestamp']),
+        'time_measurement': _parse_date(content['date']['utc']),
+        'location': content['location'],
+        'averaging_hours': _get_averaging_hours(
+            content['averagingPeriod']
+        ),
+        'parameter': content['parameter'],
+        'value_ug_m3': _get_value(content)
+    })
+    engine.execute(statement)
+
+
+def get(message_id):
+    """
+    Get the message with id message_id.
+    """
+    engine = _get_engine()
+    table = Table('openaq_measurements', MetaData(), autoload=True,
+                  autoload_with=engine)
+    s = select([table]).where(table.c.message_id == message_id)
+    return engine.execute(s).fetchall()
+
+
+def process_queue(amount=10, delete=False):
+    """
+    Process messages in the queue
+    """
+    # Get the service resource
+    sqs = boto3.resource('sqs', region_name='eu-west-1')
+
+    # Get the queue
+    queue = sqs.get_queue_by_name(QueueName='openaq')
+
+    msgs = queue.receive_messages(MaxNumberOfMessages=10)
+    counter = 10
+    while len(msgs):
+        for msg in msgs:
+            data = json.loads(msg.body)
+            print(f'Treating message {data["MessageId"]}')
+            if not len(get(data['MessageId'])):
+                store(data)
+            if delete:
+                msg.delete()
+        counter += 10
+        if counter > amount:
+            break
+        msgs = queue.receive_messages(MaxNumberOfMessages=10)
+
+
+@click.command()
+@click.option('--amount', default=10, help='Amount of messages to fetch.')
+@click.option('--delete', default=False, type=click.BOOL,
+              help='Delete messages from the queue after successful storage.')
+def run_process_queue(amount, delete):
+    process_queue(amount, delete)
+
+
+@click.command()
+def init_db():
+    _initialize_db()

+ 5 - 0
requirements.txt

@@ -0,0 +1,5 @@
+boto3
+click
+dynaconf
+psycopg2
+sqlalchemy

+ 2 - 0
settings.example.yaml

@@ -0,0 +1,2 @@
+default:
+  database_uri: 'postgresql://postgres:password@my-db.host.com/mydbname'

+ 38 - 0
setup.py

@@ -0,0 +1,38 @@
+
+from setuptools import setup, find_packages
+from os import path
+
+here = path.abspath(path.dirname(__file__))
+
+# Get the long description from the README file
+with open(path.join(here, 'README.md'), encoding='utf-8') as f:
+    long_description = f.read()
+
+setup(
+    name='openaq_ingestion',
+    version='0.0.1',
+    description='Utils for SQS ingestion of openaq data',
+    long_description=long_description,
+    long_description_content_type='text/markdown',
+    author='Aurelien Vermylen',
+    classifiers=[
+        'Development Status :: 3 - Alpha',
+        'Intended Audience :: Developers',
+        'License :: OSI Approved :: MIT License',
+        'Programming Language :: Python :: 3.5',
+        'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
+        'Programming Language :: Python :: 3.8',
+    ],
+    packages=['openaq_ingestion'],
+    python_requires='!=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4',
+    install_requires=[],
+    package_data={
+    },
+    entry_points={
+        'console_scripts': [
+            'processqueue=openaq_ingestion.openaq_ingestion:run_process_queue',
+            'initdb=openaq_ingestion.openaq_ingestion:init_db'
+        ],
+    },
+)

+ 24 - 0
test.py

@@ -0,0 +1,24 @@
+import os
+import shutil
+import tempfile
+import unittest
+
+from openaq_ingestion.openaq_ingestion import process_queue, \
+    settings, _initialize_db
+
+
+class TestOpenaqIngestion(unittest.TestCase):
+    """
+    Test openaq_ingestion
+    """
+    def setUp(self):
+        self.test_dir = tempfile.mkdtemp()
+        settings['database_uri'] = 'sqlite:///' + \
+            os.path.join(self.test_dir, 'db.sqlite')
+        _initialize_db()
+
+    def tearDown(self):
+        shutil.rmtree(self.test_dir)
+
+    def test_process_queue(self):
+        process_queue()