  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
import argparse
from configparser import ConfigParser
from datetime import datetime, timedelta, timezone
from typing import Tuple, List, Dict, Union, Optional
import warnings
import logging

import psycopg2
from psycopg2.extras import execute_values

class PostgreSQLHandler:
    """A common class for processing PostgreSQL databases
    """

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: str = None, password: str = None):
        """
        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : str
            username (default is None)
        password : str
            password (default is None)
        """
        self.conn = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns the list of existing tables in the compton measurements slowDB

        Returns
        -------
        List[str]
            list of tables
        """
        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))

    def table_columns(self, table: str) -> List[str]:
        """Returns the list of the fields of the table

        Parameters
        ----------
        table : str
            table from slowdb compton measurements

        Returns
        -------
        List[str]
            list of the fields
        """
        self.cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_name = %s
        """, (table,))
        return [row[0] for row in self.cur.fetchall()]
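
# A minimal interactive sketch (user and password are placeholders) for listing the
# slowdb tables that can be passed as --season; it relies only on the constructor
# defaults and the list_tables property defined above:
#
#   handler = PostgreSQLHandler(user='<user>', password='<password>')
#   print(handler.list_tables)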

class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering of compton measurements from slowdb
    """

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime):
        # the next measurement is considered overlapping if it starts more than
        # the 2-second tolerance before the previous one stops
        gap = timedelta(seconds=2)
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME; the columns with indexes 5 and 6
            are start_time and stop_time respectively

        Returns
        -------
        list
            cleaned table
        """
        if len(table) == 0:
            logging.info("Empty list. No overlapping rows")
            return table
        logging.info("Drop overlapping rows in list representation")
        table = table[::-1]  # reverse the table: iterate from the latest row to the earliest
        min_time = table[0][6]
        overlapped_idxs = list()
        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time
        for index in sorted(overlapped_idxs, reverse=True):  # pop from the back so earlier indexes stay valid
            table.pop(index)
        return table[::-1]
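
    # Illustrative example (times are made up): for two rows ordered by time,
    #   A: start 10:00:00, stop 10:05:00
    #   B: start 10:04:00, stop 10:09:00
    # __drop_overlapping_rows_list drops row A, because the later measurement B
    # starts more than the 2-second tolerance before A stops; the latest row of
    # an overlapping chain is always kept.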

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns tables containing compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (the full list of available tables can be seen with the property list_tables)
        daterange : Optional[datetime]
            minimum time for the selection (should contain timezone)

        Returns
        -------
        list
            table containing compton energy measurements with fields:
            write_time - time when the row was written (contains timezone)
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - beginning time of the compton measurement (contains timezone)
            stop_time - end time of the compton measurement (contains timezone)
        """
        time_condition = "AND time>(%(date)s)" if daterange is not None else ""
        sql_query = lambda table: f"""SELECT
            time AS time,
            CAST(values_array[1] AS numeric) AS mean_energy,
            CAST(values_array[2] AS numeric) AS std_energy,
            ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
            ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
            date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
            date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
            FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table

class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing of the calibration database
    """

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default', verbose: bool = True) -> int:
        """Selects the calibration set from the database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        verbose : bool
            print additional information or not (default is True)

        Returns
        -------
        sid : int
            id of the calibration set corresponding to the table
        """
        self.cur.execute("""SELECT * FROM clbrset
            WHERE system=%s AND algo=%s AND name=%s AND version=%s""", (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
        sid = result[0][0]
        return sid

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default', num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None, return_timezone: bool = False) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            the number of last rows of the table (default is None)
        timerange : Optional[Tuple[datetime, datetime]]
            time range condition on the selection of the table (default is None)
        return_timezone : bool
            return timezone in output datetimes as a field or not (default is False)

        Returns
        -------
        Tuple[list, list]
            the calibration table and the names of its fields
        """
        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
        sql_query = f"""SELECT
            cid, sid, createdby,
            time {tzone} AS time,
            begintime {tzone} AS begintime,
            endtime {tzone} AS endtime,
            comment, parameters, data
            FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
        if num_last_rows is not None:
            sql_query += f"LIMIT {num_last_rows}"
        if timerange is None:
            self.cur.execute(sql_query)
        else:
            self.cur.execute(sql_query, timerange)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = True):
        """Writes new compton measurement rows into the calibration table

        Parameters
        ----------
        new_rows : list
            rows in the format returned by SlowdbComptonHandler.load_tables
        handle_last_time_row : bool
            drop the last written row if it overlaps the first new row (default is True)
        """
        if len(new_rows) == 0:
            return
        sid = self.select_table(system, algo, name, version)
        # reorder a slowdb row (write_time, mean_energy, std_energy, mean_spread, std_spread, start_time, stop_time)
        # into a clbrdata row (sid, createdby, time, begintime, endtime, data)
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
        if handle_last_time_row:
            last_written_row, _ = self.load_table(system, algo, name, version, num_last_rows=1, return_timezone=True)
            if len(last_written_row) > 0:
                if last_written_row[0][5] > new_rows[0][3]:
                    logging.info('Removing the overlapping written row')
                    self.delete_row(sid=last_written_row[0][1], createdby=last_written_row[0][2], time=last_written_row[0][3])
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Inserted {len(new_rows)} new rows")
        return

    def insert(self, new_rows: List[Tuple[str, list]], system: str, algo: str, name: str, version: str, update: bool = True):
        """Inserts new_rows into the table

        Parameters
        ----------
        new_rows : List[Tuple[str, list]]
            list of new rows (tuples) in the following format (comment: str, data: list)
        update : bool
            update the current calibration, i.e. close still-open rows with the same comment (default is True)
        """
        sid = self.select_table(system, algo, name, version)
        time_now, dlt0 = datetime.utcnow(), timedelta(days=5000)
        if update:
            update_query = f"""UPDATE clbrdata SET endtime = %s
                WHERE comment = %s AND endtime > %s AND sid = {sid}
            """
            for row in new_rows:
                season = row[0]
                self.cur.execute(update_query, (time_now, season, time_now))
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s;"""
        insert_rows = list(map(lambda x: (sid, 'lxeuser', time_now, time_now, time_now + dlt0, x[0], x[1]), new_rows))
        execute_values(self.cur, insert_query, insert_rows, fetch=False)
        logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
        return

    def clear_table(self, sid: int, createdby: str):
        delete_query = """DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
        logging.info(f"Clear ({sid}, {createdby}) table")
        self.cur.execute(delete_query, (sid, createdby))
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default'):
        sid = self.select_table(system, algo, name, version)
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid < b.cid
                AND a.sid = b.sid
                AND a.time = b.time
        """
        self.cur.execute(remove_query)
        return

    def commit(self):
        logging.info("Changes committed")
        self.conn.commit()
        return

    def rollback(self):
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("Closing the calibration DB connection")
        self.cur.close()
        self.conn.close()

def main():
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(filename="compton_filter.log", format=log_format, level=logging.INFO)
    logging.info("Program started")

    parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    parser.add_argument('--season', help='Name of the compton measurement table from slowdb')
    parser.add_argument('--config', help='Config file containing information for access to the databases')
    parser.add_argument('--update', action='store_true', help='Only update the table with new values')
    args = parser.parse_args()
    logging.info(f"Arguments: season: {args.season}, config {args.config}, update {args.update}")

    config = ConfigParser()
    config.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**config['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1, return_timezone=True)
    last_time = last_written_row[0][3] if (len(last_written_row) > 0) and args.update else None

    compton_slowdb = SlowdbComptonHandler(**config['postgresql'])
    res = compton_slowdb.load_tables([args.season], last_time)

    clbrdb.update(res, handle_last_time_row=args.update)
    clbrdb.commit()
    del clbrdb

# Usage:
#   python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update

if __name__ == "__main__":
    main()
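
# The script expects an INI config with two sections, one per database connection,
# whose keys are passed directly to the handler constructors (host, database, user,
# password). A minimal sketch of a database.ini; all values below are placeholders,
# except host/database in [postgresql], which show the constructor defaults:
#
# [clbrDB]
# host = <calibration DB host>
# database = <calibration DB name>
# user = <user>
# password = <password>
#
# [postgresql]
# host = cmddb
# database = slowdb
# user = <user>
# password = <password>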