This is an idea for how to provide secondary index queries, similar to Riak 2i, on top of Amazon S3, using nothing but S3, boto3 and some Python.
This code has never been anywhere near a production environment, has never been benchmarked, has only processed trivial amounts of data and has been tested only against localstack. It's not even commented. As such, it should not be used by anybody for any reason - ever.
If you do give it a try, let me know how it went.
s32i.py
from concurrent.futures.thread import ThreadPoolExecutor
import logging
import os
import re

from botocore.exceptions import ClientError


class S32iDatastore():
    """Key/value store on S3 with Riak-2i-style secondary index queries.

    Every record is stored as an S3 object whose user metadata holds its
    indexes as ``{index_name: 'value1,value2,...'}``. For each
    (index, value) pair an empty marker object is written under
    ``indexes/<index>/<value>/<key>``; range queries are answered by listing
    those marker keys in lexicographic order.

    Index markers are created and removed on a background thread pool, and
    ``put``/``delete`` do not wait for that work, so a query issued right
    after a write may not reflect it yet.

    Limitation: marker keys are parsed by splitting on ``/``, so record keys
    and index values containing ``/`` are not reported correctly.
    """

    # cpu_count() may return None, and "count - 1" is 0 on a single-CPU host;
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1.
    __EXECUTOR = ThreadPoolExecutor(
        max_workers=max(1, (os.cpu_count() or 1) - 1))
    INDEXES_FOLDER = 'indexes'
    LIST_OBJECTS = 'list_objects_v2'

    def __init__(self, s3_resource, bucket_name):
        """Bind the datastore to a boto3 S3 resource and a bucket name."""
        self.s3_resource = s3_resource
        self.bucket_name = bucket_name

    @staticmethod
    def __log_failure(future):
        """Log the exception of a finished background task, if it had one."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logging.getLogger(__name__).error(
                'Secondary index update failed', exc_info=error)

    def __run_in_thread(self, fn, *args):
        """Submit ``fn(*args)`` to the shared executor and return its future.

        Callers discard the future, so a callback is attached to make sure a
        failed index update is at least logged instead of silently lost.
        """
        future = self.__EXECUTOR.submit(fn, *args)
        future.add_done_callback(self.__log_failure)
        return future

    def __index_prefix(self, index):
        """Return the key prefix shared by all markers of ``index``."""
        return f'{self.INDEXES_FOLDER}/{index}/'

    def __index_key(self, index, value, key):
        """Return the marker key for one (index, value, record key) entry."""
        return f'{self.__index_prefix(index)}{value}/{key}'

    def get(self, key):
        """Fetch a record.

        Returns:
            A ``(data, indexes)`` tuple: the result of reading the object
            body, and the object's metadata dict.
        """
        record = self.s3_resource.Object(self.bucket_name, key).get()
        indexes = record['Metadata']
        data = record['Body'].read()
        return data, indexes

    def head(self, key):
        """Return the metadata (indexes) of ``key`` without reading its body."""
        record = self.s3_resource.meta.client.head_object(
            Bucket=self.bucket_name, Key=key)
        return record['Metadata']

    def exists(self, key):
        """Return True if ``head(key)`` succeeds, False on any ClientError."""
        try:
            self.head(key)
            return True
        except ClientError:
            return False

    def put(self, key, data='', indexes=None):
        """Store ``data`` under ``key`` and schedule creation of its indexes.

        Args:
            key: Object key of the record.
            data: Object body.
            indexes: Optional ``{index_name: 'v1,v2,...'}`` mapping; it is
                stored as the object's metadata.

        Returns:
            The response of the S3 ``put`` call for the record itself.
        """
        # A fresh dict per call; a shared ``{}`` default could be mutated.
        if indexes is None:
            indexes = {}
        if indexes:
            self.__run_in_thread(self.create_secondary_indexes, key, indexes)
        return self.s3_resource.Object(self.bucket_name, key).put(
            Body=data, Metadata=indexes)

    def delete(self, key):
        """Delete ``key`` and schedule removal of its index markers.

        The record's indexes are read with ``head`` before the record is
        deleted, so an error from ``head`` (for example for a missing key)
        propagates to the caller.
        """
        self.__run_in_thread(
            self.remove_secondary_indexes, key, self.head(key))
        return self.s3_resource.Object(self.bucket_name, key).delete()

    def create_secondary_indexes(self, key, indexes):
        """Write one empty marker object per (index, value) pair of ``key``."""
        for index, values in indexes.items():
            for value in values.split(','):
                # Write the marker directly: going through put() would
                # schedule another (empty) background index task per marker.
                marker = self.s3_resource.Object(
                    self.bucket_name, self.__index_key(index, value, key))
                marker.put(Body='', Metadata={})

    def remove_secondary_indexes(self, key, indexes):
        """Delete the marker object of every (index, value) pair of ``key``."""
        for index, values in indexes.items():
            for value in values.split(','):
                self.s3_resource.Object(
                    self.bucket_name,
                    self.__index_key(index, value, key)).delete()

    def secondary_index_range_query(self, index, start, end=None,
                                    page_size=1000, max_results=10000,
                                    term_regex=None, return_terms=False):
        """Yield record keys whose ``index`` value lies in ``[start, end]``.

        Bounds are compared as string prefixes, so ``start='2|'`` matches
        every value beginning with ``2|``.

        Args:
            index: Name of the secondary index.
            start: Lower bound (prefix) of the index value.
            end: Upper bound (prefix); defaults to ``start``.
            page_size: Number of keys requested per listing page.
            max_results: Passed as the paginator's ``MaxItems``; it caps the
                number of listed marker keys, counted before ``term_regex``
                filtering.
            term_regex: Optional regular expression. It is anchored to the
                part of the marker key after ``indexes/<index>/``, i.e. it
                must match ``<value>/<key>`` in full.
            return_terms: If true, yield ``(key, value)`` tuples instead of
                bare keys.

        Yields:
            Record keys (or ``(key, value)`` tuples) in listing order.
        """
        if end is None:
            end = start
        index_prefix = self.__index_prefix(index)
        pattern = None
        if term_regex:
            # Escape the fixed prefix so regex metacharacters in the index
            # name are matched literally; only term_regex is a pattern.
            pattern = re.compile(f'^{re.escape(index_prefix)}{term_regex}$')
        start_key = index_prefix + start
        end_key = index_prefix + end
        paginator = self.s3_resource.meta.client.get_paginator(
            self.LIST_OBJECTS)
        pages = paginator.paginate(
            Bucket=self.bucket_name,
            # Restrict the listing to this index so keys of other indexes or
            # of records are never fetched.
            Prefix=index_prefix,
            StartAfter=start_key,
            PaginationConfig={
                'MaxItems': max_results,
                'PageSize': page_size})
        for page in pages:
            # 'Contents' is absent from a page that holds no keys.
            for result in page.get('Contents', []):
                result_key = result['Key']
                # Keys arrive sorted: once a key's prefix exceeds the upper
                # bound, nothing further can match.
                if result_key[0:len(end_key)] > end_key:
                    return
                if pattern is not None and not pattern.match(result_key):
                    continue
                parts = result_key.split('/')
                if return_terms:
                    yield (parts[-1], parts[-2])
                else:
                    yield parts[-1]
s32i_test.py
import json
import unittest

import boto3

from s32i import S32iDatastore


class S32iDatastoreTest(unittest.TestCase):
    """Integration tests for S32iDatastore against a localstack S3 endpoint.

    The fixture bucket is created and populated once per class and removed
    again in ``tearDownClass``.
    """

    LOCALSTACK_ENDPOINT_URL = "http://localhost.localstack.cloud:4566"
    TEST_BUCKET = 's32idatastore-test-bucket'

    @classmethod
    def setUpClass(cls):
        cls.s3_resource = cls.create_s3_resource()
        cls.bucket = cls.create_bucket(cls.TEST_BUCKET)
        cls.datastore = S32iDatastore(cls.s3_resource, cls.TEST_BUCKET)
        cls.create_test_data()

    @classmethod
    def tearDownClass(cls):
        cls.delete_bucket()

    @classmethod
    def create_s3_resource(cls, endpoint_url=LOCALSTACK_ENDPOINT_URL):
        """Create a boto3 S3 resource pointing at ``endpoint_url``."""
        return boto3.resource(
            's3',
            endpoint_url=endpoint_url)

    @classmethod
    def create_bucket(cls, bucket_name):
        """Create the test bucket and return its Bucket resource."""
        return cls.s3_resource.create_bucket(Bucket=bucket_name)

    @classmethod
    def delete_bucket(cls):
        """Empty the test bucket, then delete the bucket itself."""
        # A bucket must be empty before it can be deleted.
        cls.bucket.objects.all().delete()
        cls.bucket.delete()

    @classmethod
    def create_test_data(cls):
        """Populate the bucket with the records the tests query.

        NOTE(review): the tests query the index right after these writes;
        they presumably depend on the datastore's index entries being
        visible by then - confirm against S32iDatastore.put.
        """
        cls.datastore.put(
            'KEY0001',
            json.dumps({'name': 'Alice', 'dob': '19700101', 'gender': '2'}),
            {'idx-gender-dob': '2|19700101'})
        cls.datastore.put(
            'KEY0002',
            json.dumps({'name': 'Bob', 'dob': '19800101', 'gender': '1'}),
            {'idx-gender-dob': '1|19800101'})
        cls.datastore.put(
            'KEY0003',
            json.dumps({'name': 'Carol', 'dob': '19900101', 'gender': '2'}),
            {'idx-gender-dob': '2|19900101'})
        cls.datastore.put(
            'KEY0004',
            json.dumps({'name': 'Dan', 'dob': '20000101', 'gender': '1'}),
            {'idx-gender-dob': '1|20000101'})
        cls.datastore.put(
            'KEY0005',
            json.dumps({'name': 'Eve', 'dob': '20100101', 'gender': '2'}),
            {'idx-gender-dob': '2|20100101'})
        cls.datastore.put(
            'KEY0006',
            json.dumps({'name': ['Faythe', 'Grace'], 'dob': '20200101',
                        'gender': '2'}),
            {'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'})
        cls.datastore.put('KEY0007', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0008', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0009', indexes={'idx-same': 'same'})
        cls.datastore.put(
            'KEY9999',
            json.dumps({'name': 'DELETE ME', 'dob': '99999999',
                        'gender': '9'}),
            {'idx-gender-dob': '9|99999999'})

    def test_get_record(self):
        data, indexes = self.datastore.get('KEY0001')
        self.assertDictEqual(
            {'name': 'Alice', 'dob': '19700101', 'gender': '2'},
            json.loads(data))
        self.assertDictEqual({'idx-gender-dob': '2|19700101'}, indexes)

    def test_head_record(self):
        indexes = self.datastore.head('KEY0002')
        self.assertDictEqual({'idx-gender-dob': '1|19800101'}, indexes)

    def test_2i_no_results(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '3|30100101')
        self.assertListEqual([], list(keys))

    def test_2i_index_does_not_exist(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-does-not-exist', '3|30100101')
        self.assertListEqual([], list(keys))

    def test_2i_exact_value(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '2|20100101')
        self.assertListEqual(['KEY0005'], list(keys))

    def test_2i_gender_2(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '2|')
        self.assertListEqual(
            ['KEY0001', 'KEY0003', 'KEY0005', 'KEY0006'], sorted(keys))

    def test_2i_gender_2_max_results_2(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '2|', max_results=2)
        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(keys))

    def test_2i_gender_1_dob_19(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|19')
        self.assertListEqual(['KEY0002'], list(keys))

    def test_2i_gender_2_dob_19(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '2|19')
        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(keys))

    def test_2i_gender_2_dob_1990_2000(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '2|1990', '2|2000')
        self.assertListEqual(['KEY0003'], list(keys))

    def test_2i_term_regex(self):
        # Raw string: "\|" is not a valid escape in a normal string literal.
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|', term_regex=r'[1|2]\|20[1|2]0.*')
        self.assertListEqual(['KEY0005', 'KEY0006'], list(keys))

    def test_2i_return_terms(self):
        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|', return_terms=True)
        self.assertListEqual([
            ('KEY0001', '2|19700101'),
            ('KEY0002', '1|19800101'),
            ('KEY0003', '2|19900101'),
            ('KEY0004', '1|20000101'),
            ('KEY0005', '2|20100101'),
            ('KEY0006', '2|20200101')],
            sorted(key_terms))

    def test_2i_term_regex_return_terms(self):
        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            term_regex=r'[1|2]\|20[1|2]0.*', return_terms=True)
        self.assertListEqual(
            [('KEY0005', '2|20100101'), ('KEY0006', '2|20200101')],
            list(key_terms))

    def test_exists(self):
        self.assertTrue(self.datastore.exists('KEY0001'))
        self.assertFalse(self.datastore.exists('1000YEK'))

    def test_multiple_index_values(self):
        indexes = self.datastore.head('KEY0006')
        self.assertDictEqual(
            {'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'},
            indexes)
        keys = self.datastore.secondary_index_range_query(
            'idx-name', 'Faythe')
        self.assertListEqual(['KEY0006'], list(keys))
        keys = self.datastore.secondary_index_range_query('idx-name', 'Grace')
        self.assertListEqual(['KEY0006'], list(keys))

    def test_multiple_keys_same_index(self):
        keys = self.datastore.secondary_index_range_query('idx-same', 'same')
        self.assertListEqual(['KEY0007', 'KEY0008', 'KEY0009'], sorted(keys))

    def test_delete(self):
        self.assertTrue(self.datastore.exists('KEY9999'))
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '9|99999999')
        self.assertListEqual(['KEY9999'], list(keys))
        self.datastore.delete('KEY9999')
        self.assertFalse(self.datastore.exists('KEY9999'))
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '9|99999999')
        self.assertListEqual([], list(keys))