# compton_filter.py
  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
  4. import argparse
  5. from configparser import ConfigParser
  6. from datetime import datetime, timedelta, timezone
  7. from typing import Tuple, List, Dict, Union, Optional
  8. import warnings
  9. import logging
  10. import psycopg2
  11. from psycopg2.extras import execute_values
class PostgreSQLHandler:
    """A common class for processing PostgreSQL databases
    """

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: Optional[str] = None, password: Optional[str] = None):
        """
        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : Optional[str]
            username (default is None)
        password : Optional[str]
            password (default is None)
        """
        self.conn = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns a list of existing tables in the Compton measurements slowdb

        Returns
        -------
        List[str]
            list of table names
        """
        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))
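# Example (sketch): the base handler can also be used on its own to inspect the slowdb,
# e.g. to list the season tables accepted by SlowdbComptonHandler.load_tables below.
# The connection parameters here are placeholders; in main() they come from the config file.
#
#   db = PostgreSQLHandler(host='cmddb', database='slowdb', user='...', password='...')
#   print(db.list_tables)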
class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering of Compton measurements from slowdb
    """

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime) -> bool:
        gap = timedelta(seconds=2)
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME; column 5 is start_time and column 6
            is stop_time (0-indexed). For example, if an earlier measurement
            [10:00, 10:30] overlaps a later one [10:20, 10:40] by more than the
            2 second gap, the earlier row is dropped and the later one is kept.

        Returns
        -------
        list
            cleaned table
        """
        if len(table) == 0:
            logging.info("Empty list. No overlapping rows")
            return table
        logging.info("Drop overlapping rows in list representation")
        table = table[::-1]  # reverse the table so it runs from the latest row to the earliest
        min_time = table[0][6]
        overlapped_idxs = list()
        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time
        for index in sorted(overlapped_idxs, reverse=True):  # pop from the largest index so remaining indexes stay valid
            table.pop(index)
        return table[::-1]
    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns a table containing Compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb Compton measurements database
            (the full list of available tables can be seen with the list_tables property)
        daterange : Optional[datetime]
            minimum time for the selection (must contain a timezone)

        Returns
        -------
        list
            table containing Compton energy measurements with fields:
            write_time - time when the row was written (contains timezone)
            mean_energy - Compton mean of the energy measurement [MeV]
            std_energy - Compton std of the energy measurement [MeV]
            mean_spread - Compton mean of the spread measurement [MeV]
            std_spread - Compton std of the spread measurement [MeV]
            start_time - beginning time of the Compton measurement (contains timezone)
            stop_time - end time of the Compton measurement (contains timezone)
        """
        time_condition = "AND time>(%(date)s)" if daterange is not None else ""
        sql_query = lambda table: f"""SELECT
                time AS time,
                CAST(values_array[1] AS numeric) AS mean_energy,
                CAST(values_array[2] AS numeric) AS std_energy,
                ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
                ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
                date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
                date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
            FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table
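# Example (sketch): pulling Compton measurements for a single season table.
# The table name and the timezone-aware lower bound are illustrative only; in main()
# they come from --season and from the last row stored in the calibration database.
#
#   slowdb = SlowdbComptonHandler(host='cmddb', database='slowdb', user='...', password='...')
#   rows = slowdb.load_tables(['cmd3_2021_2'],
#                             daterange=datetime(2021, 1, 1, tzinfo=timezone.utc))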
class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing of the calibration database
    """

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
        """Selects the calibration set from the database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)

        Returns
        -------
        sid : int
            id of the calibration set
        """
        self.cur.execute("""SELECT * FROM clbrset
            WHERE system=%s AND algo=%s AND name=%s AND version=%s""", (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
        sid = result[0][0]
        return sid
    def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
                   num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None,
                   return_timezone: bool = False) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            number of last rows of the table to load (default is None, i.e. all rows)
        timerange : Optional[Tuple[datetime, datetime]]
            time range condition on the selection of the table (default is None)
        return_timezone : bool
            whether to return the timezone in the output datetimes (default is False)

        Returns
        -------
        Tuple[list, list]
            the calibration table and the names of its fields
        """
        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
        sql_query = f"""SELECT
            cid, sid, createdby,
            time {tzone} AS time,
            begintime {tzone} AS begintime,
            endtime {tzone} AS endtime,
            comment, parameters, data
        FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
        if num_last_rows is not None:
            sql_query += f"LIMIT {num_last_rows}"
        if timerange is None:
            self.cur.execute(sql_query)
        else:
            self.cur.execute(sql_query, timerange)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name
    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
               name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
        """Writes new Compton rows (as returned by SlowdbComptonHandler.load_tables) into
        the calibration table; optionally drops the last written row if it overlaps the
        first new one.
        """
        if len(new_rows) == 0:
            return
        sid = self.select_table(system, algo, name, version)
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
        if handle_last_time_row:
            last_written_row, _ = self.load_table(system, algo, name, version, num_last_rows=1, return_timezone=True)
            if len(last_written_row) > 0:
                if last_written_row[0][5] > new_rows[0][3]:
                    logging.info('Removing overlapping written row')
                    self.delete_row(sid=last_written_row[0][1], createdby=last_written_row[0][2], time=last_written_row[0][3])
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Inserted {len(new_rows)} new rows")
        return
    def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
               update: bool = True, comment: Optional[str] = None):
        """Inserts new_rows into the table

        Parameters
        ----------
        new_rows : list
            list of new rows in the format (time, begintime, endtime, data...)
        update : bool
            update the current calibration (default is True)
        comment : Optional[str]
            common comment field
        """
        sid = self.select_table(system, algo, name, version)
        if update:
            update_query = """UPDATE clbrdata
                SET data = %(data)s, createdby = %(createdby)s, time = %(time)s, begintime = %(begintime)s, endtime = %(endtime)s
                WHERE sid = %(sid)s AND comment = %(comment)s
            """
            for x in new_rows:
                season_point = (comment if comment is not None else '') + '_' + str(x[3])
                dict_row = {
                    'sid': sid,
                    'createdby': 'lxeuser',
                    'time': x[0],
                    'begintime': x[1],
                    'endtime': x[2],
                    'comment': season_point,
                    'data': x[3:],
                }
                self.cur.execute(update_query, dict_row)
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
        comment_creator = lambda x: f'{comment if comment is not None else ""}_{str(x[3])}'
        insert_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[1], x[2], comment_creator(x), x[3:]), new_rows))
        execute_values(self.cur, insert_query, insert_rows, fetch=False)
        # remove duplicates produced by the insert, keeping the smallest cid for each comment
        drop_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid > b.cid
                AND a.sid = b.sid
                AND a.comment = b.comment
        """
        self.cur.execute(drop_query)
        logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
        return
    def clear_table(self, sid: int, createdby: str):
        delete_query = """DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
        logging.info(f"Clear ({sid}, {createdby}) table")
        self.cur.execute(delete_query, (sid, createdby))
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return
    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', keep: str = 'last'):
        sid = self.select_table(system, algo, name, version)
        keep_rule = ''
        if keep == 'last':
            keep_rule = '<'
        elif keep == 'first':
            keep_rule = '>'
        else:
            raise ValueError("keep argument must be 'last' or 'first'")
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid {keep_rule} b.cid
                AND a.sid = b.sid
                AND a.time = b.time
        """
        self.cur.execute(remove_query)
        return
    def commit(self):
        logging.info("Changes committed")
        self.conn.commit()
        return

    def rollback(self):
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("del clbr class")
        self.cur.close()
        self.conn.close()
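# Example (sketch): reading back the most recently stored Compton_run calibration row,
# mirroring the load_table call used in main() below. Connection values are placeholders.
#
#   clbrdb = CalibrdbHandler(host='...', database='...', user='...', password='...')
#   rows, fields = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1)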
def main():
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(filename="compton_filter.log", format=log_format, level=logging.INFO)
    logging.info("Program started")

    parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    parser.add_argument('--season', help='Name of the compton measurement table from slowdb')
    parser.add_argument('--config', help='Config file containing information for access to databases')
    parser.add_argument('--update', action='store_true', help='Write only the newest values into the db')
    args = parser.parse_args()
    logging.info(f"Arguments: season: {args.season}, config {args.config}, update {args.update}")

    config = ConfigParser()
    config.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**config['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1, return_timezone=True)
    last_time = last_written_row[0][3] if (len(last_written_row) > 0) and (args.update) else None

    compton_slowdb = SlowdbComptonHandler(**config['postgresql'])
    res = compton_slowdb.load_tables([args.season], last_time)

    clbrdb.update(res, handle_last_time_row=args.update)
    clbrdb.commit()
    del clbrdb

# Usage: python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update
if __name__ == "__main__":
    main()
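# Example layout of the database.ini file passed via --config (sketch).
# Section names must match those read in main(); the keys of each section are passed
# straight to psycopg2.connect through the handlers' __init__, so only host, database,
# user and password are expected. The values below are placeholders.
#
# [postgresql]
# host = cmddb
# database = slowdb
# user = ...
# password = ...
#
# [clbrDB]
# host = ...
# database = ...
# user = ...
# password = ...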