1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
|
from opendc.util import database, exceptions
class Model(object):
# MUST OVERRIDE IN DERIVED CLASS
JSON_TO_PYTHON_DICT = {
'Model': {
'jsonParameterName': 'python_parameter_name'
}
}
PATH = ''
PATH_PARAMETERS = {}
TABLE_NAME = ''
COLUMNS = []
COLUMNS_PRIMARY_KEY = []
# INITIALIZATION
def __init__(self, **kwargs):
"""Initialize a model from its keyword arguments."""
for name, value in kwargs.items():
setattr(self, name, value)
def __repr__(self):
"""Return a string representation of this object."""
identifiers = []
for attribute in self.COLUMNS_PRIMARY_KEY:
identifiers.append('{} = {}'.format(attribute, getattr(self, attribute)))
return '{} ({})'.format(
self.TABLE_NAME[:-1].title().replace('_', ''),
'; '.join(identifiers)
)
# JSON CONVERSION METHODS
@classmethod
def from_JSON(cls, json_object):
"""Initialize a Model from its JSON object representation."""
parameters = {}
parameter_map = cls.JSON_TO_PYTHON_DICT.values()[0]
for json_name in parameter_map:
python_name = parameter_map[json_name]
if json_name in json_object:
parameters[python_name] = json_object.get(json_name)
else:
parameters[python_name] = None
return cls(**parameters)
def to_JSON(self):
"""Return a JSON-serializable object representation of this Model."""
parameters = {}
parameter_map = self.JSON_TO_PYTHON_DICT.values()[0]
for json_name in parameter_map:
python_name = parameter_map[json_name]
if hasattr(self, python_name):
parameters[json_name] = getattr(self, python_name)
else:
parameters[json_name] = None
return parameters
# API CALL GENERATION
def generate_api_call(self, path_parameters, token):
"""Return a message that can be executed by a Request to recreate this object."""
return {
'id': 0,
'path': self.PATH,
'method': 'POST',
'parameters': {
'body': {
self.JSON_TO_PYTHON_DICT.keys()[0]: self.to_JSON()
},
'path': path_parameters,
'query': {}
},
'token': token
}
# SQL STATEMENT GENERATION METHODS
@classmethod
def _generate_insert_columns_string(cls):
"""Generate a SQLite insertion columns string for this Model"""
return ', '.join(cls.COLUMNS)
@classmethod
def _generate_insert_placeholders_string(cls):
"""Generate a SQLite insertion placeholders string for this Model."""
return ', '.join(['?'] * len(cls.COLUMNS))
@classmethod
def _generate_primary_key_string(cls):
"""Generate the SQLite primary key string for this Model."""
return ' AND '.join(['{} = ?'.format(x) for x in cls.COLUMNS_PRIMARY_KEY])
@classmethod
def _generate_update_columns_string(cls):
"""Generate a SQLite updatable columns string for this Model."""
return ', '.join(
['{} = ?'.format(x) for x in cls.COLUMNS if not x in cls.COLUMNS_PRIMARY_KEY]
)
# SQL TUPLE GENERATION METHODS
def _generate_insert_columns_tuple(self):
"""Generate the tuple of insertion column values for this object."""
value_list = []
for column in self.COLUMNS:
value_list.append(getattr(self, column, None))
return tuple(value_list)
def _generate_primary_key_tuple(self):
"""Generate the tuple of primary key values for this object."""
primary_key_list = []
for column in self.COLUMNS_PRIMARY_KEY:
primary_key_list.append(getattr(self, column, None))
return tuple(primary_key_list)
def _generate_update_columns_tuple(self):
"""Generate the tuple of updatable column values for this object."""
value_list = []
for column in [x for x in self.COLUMNS if not x in self.COLUMNS_PRIMARY_KEY]:
value_list.append(getattr(self, column, None))
return tuple(value_list)
# DATABASE HELPER METHODS
@classmethod
def _from_database(cls, statement, values):
"""Initialize a Model by fetching it from the database."""
parameters = {}
model_from_database = database.fetchone(statement, values)
if model_from_database is None:
return None
for i in range(len(cls.COLUMNS)):
parameters[cls.COLUMNS[i]] = model_from_database[i]
return cls(**parameters)
# PUBLIC DATABASE INTERACTION METHODS
@classmethod
def from_primary_key(cls, primary_key_tuple):
"""Initialize a Model by fetching it by its primary key tuple.
If the primary key does not exist in the database, return a stub.
"""
query = 'SELECT * FROM {} WHERE {}'.format(
cls.TABLE_NAME,
cls._generate_primary_key_string()
)
# Return an instantiation of the Model with values from the row if it exists
model = cls._from_database(query, primary_key_tuple)
if model is not None:
return model
# Return a stub instantiation of the Model with just the primary key if it does not
parameters = {}
for i, column in enumerate(cls.COLUMNS_PRIMARY_KEY):
parameters[column] = primary_key_tuple[i]
return cls(**parameters)
@classmethod
def query(cls, column_name=None, value=None):
"""Return all instances of the Model in the database where column_name = value."""
if column_name is not None and value is not None:
statement = 'SELECT * FROM {} WHERE {} = ?'.format(cls.TABLE_NAME, column_name)
database_models = database.fetchall(statement, (value,))
else:
statement = 'SELECT * FROM {}'.format(cls.TABLE_NAME)
database_models = database.fetchall(statement)
models = []
for database_model in database_models:
parameters = {}
for i, parameter in enumerate(cls.COLUMNS):
parameters[parameter] = database_model[i]
models.append(cls(**parameters))
return models
def delete(self):
"""Delete this Model from the database."""
self.read()
statement = 'DELETE FROM {} WHERE {}'.format(
self.TABLE_NAME,
self._generate_primary_key_string()
)
values = self._generate_primary_key_tuple()
database.execute(statement, values)
def exists(self, column=None):
"""Return True if this Model exists in the database.
Check the primary key by default, or a column if one is specified.
"""
query = """
SELECT EXISTS (
SELECT 1 FROM {}
WHERE {}
LIMIT 1
)
"""
if column is None:
query = query.format(
self.TABLE_NAME,
self._generate_primary_key_string()
)
values = self._generate_primary_key_tuple()
else:
query = query.format(
self.TABLE_NAME,
'{} = ?'.format(column)
)
values = (getattr(self, column),)
return database.fetchone(query, values)[0] == 1
def insert(self):
"""Insert this Model into the database."""
if hasattr(self, 'id'):
self.id = None
self.insert_with_id()
def insert_with_id(self):
"""Insert this Model into the database without removing its id."""
statement = 'INSERT INTO {} ({}) VALUES ({})'.format(
self.TABLE_NAME,
self._generate_insert_columns_string(),
self._generate_insert_placeholders_string()
)
values = self._generate_insert_columns_tuple()
try:
last_row_id = database.execute(statement, values)
except Exception as e:
raise exceptions.ForeignKeyError(e.message)
if 'id' in self.COLUMNS_PRIMARY_KEY:
setattr(self, 'id', last_row_id)
self.read()
def read(self):
"""Read this Model's non-primary key attributes from the database."""
if not self.exists():
raise exceptions.RowNotFoundError(self.TABLE_NAME)
database_model = self.from_primary_key(self._generate_primary_key_tuple())
for attribute in self.COLUMNS:
setattr(self, attribute, getattr(database_model, attribute))
return self
def update(self):
"""Update this Model's non-primary key attributes in the database."""
statement = 'UPDATE {} SET {} WHERE {}'.format(
self.TABLE_NAME,
self._generate_update_columns_string(),
self._generate_primary_key_string()
)
values = self._generate_update_columns_tuple() + self._generate_primary_key_tuple()
try:
database.execute(statement, values)
except Exception as e:
raise exceptions.ForeignKeyError(e.message)
|