compton_filter.py

  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
  4. import argparse
  5. from configparser import ConfigParser
  6. from datetime import datetime, timedelta, timezone
  7. import sys
  8. from typing import Tuple, List, Dict, Union, Optional
  9. import warnings
  10. import logging
  11. import psycopg2
  12. from psycopg2.extras import execute_values


class PostgreSQLHandler:
    """A common class for processing PostgreSQL databases
    """

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb',
                 user: Optional[str] = None, password: Optional[str] = None):
        """
        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : Optional[str]
            username (default is None)
        password : Optional[str]
            password (default is None)
        """
        self.conn = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns a list of existing tables in the compton measurements slowDB

        Returns
        -------
        List[str]
            list of tables
        """
        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))
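
    # Usage sketch (a reachable slowdb instance is assumed; the credentials
    # here are placeholders, not real accounts):
    #
    #   handler = PostgreSQLHandler(host='cmddb', database='slowdb',
    #                               user='reader', password='***')
    #   print(handler.list_tables)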


class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering compton measurements from slowdb
    """

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime):
        # Intervals count as overlapping only if they intersect by more than a
        # 2-second tolerance gap.
        gap = timedelta(seconds=2)
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME, where column 5 is start_time
            and column 6 is stop_time (0-indexed)

        Returns
        -------
        list
            cleaned table
        """
        n_rows = len(table)
        if n_rows == 0:
            logging.info("Empty list. No overlapping rows")
            return table
        table = table[::-1]  # reverse the table to iterate from the latest row to the earliest
        min_time = table[0][6]
        overlapped_idxs = list()
        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time
        for index in sorted(overlapped_idxs, reverse=True):  # pop from the back so the remaining indices stay valid
            table.pop(index)
        logging.info(f"Drop overlapping rows in list representation. Survived {len(table)} from {n_rows}")
        return table[::-1]
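
    # Worked example of the filter above (times are illustrative). Given rows
    # ordered by time with (start_time, stop_time) intervals
    #   r1: 10:00:00 - 10:30:00
    #   r2: 10:20:00 - 10:50:00
    #   r3: 10:49:00 - 11:10:00
    # the backward pass keeps r3, then drops r2 because its stop_time (10:50:00)
    # exceeds r3's start_time (10:49:00) by more than the 2-second gap, and
    # keeps r1, whose stop_time lies before the start of the next kept row.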

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns tables containing compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (the full list of available tables can be seen with the list_tables property)
        daterange : Optional[datetime]
            minimum time for the selection (must contain a timezone)

        Returns
        -------
        list
            table containing compton energy measurements with the fields:
            write_time - time when the row was written (contains timezone)
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - begin time of the compton measurement (contains timezone)
            stop_time - end time of the compton measurement (contains timezone)
        """
        time_condition = "AND time>(%(date)s)" if daterange is not None else ""
        sql_query = lambda table: f"""SELECT
            time AS time,
            CAST(values_array[1] AS numeric) AS mean_energy,
            CAST(values_array[2] AS numeric) AS std_energy,
            ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
            ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
            date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
            date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
        FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table
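
    # Usage sketch (the table name and the date are illustrative; the
    # 'postgresql' config section is the one parsed in main() below):
    #
    #   from datetime import datetime, timezone
    #   slowdb = SlowdbComptonHandler(**config['postgresql'])
    #   rows = slowdb.load_tables(['cmd3_2021_2'],
    #                             datetime(2021, 1, 1, tzinfo=timezone.utc))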


class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing the calibration database
    """

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
        """Selects the calibration set from the database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)

        Returns
        -------
        sid : int
            id of the selected calibration set
        """
        self.cur.execute("""SELECT * FROM clbrset
            WHERE system=%s AND algo=%s AND name=%s AND version=%s""",
            (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
        sid = result[0][0]
        return sid

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
                   num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None,
                   return_timezone: bool = False) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            the number of last rows of the table to select
        timerange : Optional[Tuple[datetime, datetime]]
            time range condition on the selection of the table (default is None)
        return_timezone : bool
            return timezone in output datetimes as a field or not (default is False)

        Returns
        -------
        Tuple[list, list]
            the calibration table and the names of its fields
        """
        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
        sql_query = f"""SELECT
            cid, sid, createdby,
            time {tzone} AS time,
            begintime {tzone} AS begintime,
            endtime {tzone} AS endtime,
            comment, parameters, data
        FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
        if num_last_rows is not None:
            sql_query += f"LIMIT {num_last_rows}"
        if timerange is None:
            self.cur.execute(sql_query)
        else:
            self.cur.execute(sql_query, timerange)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name
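
    # Usage sketch (the Misc/RunHeader/Compton_run set is the one used by this
    # script; clbrdb is a connected CalibrdbHandler):
    #
    #   table, fields = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run',
    #                                     num_last_rows=1, return_timezone=True)
    #   # fields == ['cid', 'sid', 'createdby', 'time', 'begintime', 'endtime',
    #   #            'comment', 'parameters', 'data']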

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
               name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = False):
        """Writes new_rows into clbrdb

        Parameters
        ----------
        new_rows : list
            list of the data for writing
        handle_last_time_row : bool
            whether to update the current values: replaces all rows in the interval
            from min(begintime in new_rows) to max(endtime in new_rows)
            (DANGEROUS - keep the default False, or don't commit the changes
            if you are not sure what you want)
        """
        if len(new_rows) == 0:
            return
        sid = self.select_table(system, algo, name, version)
        # Reshape slowdb rows (write_time, mean_energy, std_energy, mean_spread,
        # std_spread, start_time, stop_time) into clbrdata rows.
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
        if handle_last_time_row:
            min_new_time = min(map(lambda x: x[3], new_rows))
            max_new_time = max(map(lambda x: x[4], new_rows))
            self.delete_rows(sid=sid, createdby='lxeuser', time=(min_new_time, max_new_time))
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Inserted {len(new_rows)} new rows")
        return
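
    # Input rows are expected in the load_tables() output format; a single
    # illustrative call (all numbers are made up):
    #
    #   row = (write_time, 425.123, 0.045, 1.2e-3, 2.0e-4, start_time, stop_time)
    #   clbrdb.update([row])
    #   # -> one clbrdata row with begintime=start_time, endtime=stop_time and
    #   #    data=[425.123, 0.045, 1.2e-3, 2.0e-4]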

    def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
               update: bool = True, comment: Optional[str] = None):
        """Inserts new_rows into the table

        Parameters
        ----------
        new_rows : list
            list of new rows in the following format: (time, begintime, endtime, data...)
        update : bool
            update the current calibration
        comment : Optional[str]
            common comment field
        """
        sid = self.select_table(system, algo, name, version)
        if update:
            update_query = """UPDATE clbrdata
                SET data = %(data)s, createdby = %(createdby)s, time = %(time)s, begintime = %(begintime)s, endtime = %(endtime)s
                WHERE sid = %(sid)s AND comment = %(comment)s
            """
            for x in new_rows:
                season_point = (comment if comment is not None else '') + '_' + str(x[4]) + '_' + str(x[3])
                dict_row = {
                    'sid': sid,
                    'createdby': 'lxeuser',
                    'time': x[0],
                    'begintime': x[1],
                    'endtime': x[2],
                    'comment': season_point,
                    'data': x[3:],
                }
                self.cur.execute(update_query, dict_row)
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
        comment_creator = lambda x: f'{comment if comment is not None else ""}_{str(x[4])}_{str(x[3])}'
        insert_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[1], x[2], comment_creator(x), x[3:]), new_rows))
        execute_values(self.cur, insert_query, insert_rows, fetch=False)
        # Drop older duplicates sharing the same begintime within this set
        drop_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid < b.cid
                AND a.sid = b.sid
                AND a.begintime = b.begintime
        """
        # AND a.comment = b.comment
        self.cur.execute(drop_query)
        logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
        return

    def clear_table(self, sid: int, createdby: str):
        delete_query = """DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
        logging.info(f"Clear ({sid}, {createdby}) table")
        self.cur.execute(delete_query, (sid, createdby))
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def delete_rows(self, sid: int, createdby: str, time: Tuple[datetime, datetime]):
        # Deletes every row whose [begintime, endtime] interval intersects the given time range
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND endtime > %s AND begintime < %s
        """
        self.cur.execute(delete_query, (sid, createdby, time[0], time[1]))
        logging.info(f"Deleted ({sid}, {createdby} from {time[0]} to {time[1]}) rows")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run",
                          version: str = 'Default', keep: str = 'last'):
        sid = self.select_table(system, algo, name, version)
        if keep == 'last':
            keep_rule = '<'
        elif keep == 'first':
            keep_rule = '>'
        else:
            raise ValueError("keep argument must be 'last' or 'first'")
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid {keep_rule} b.cid
                AND a.sid = b.sid
                AND a.time = b.time
        """
        self.cur.execute(remove_query)
        return

    def commit(self):
        logging.info("Changes committed")
        self.conn.commit()
        return

    def rollback(self):
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("del clbr class")
        self.cur.close()
        self.conn.close()


def main():
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(stream=sys.stdout, format=log_format, level=logging.INFO)  # pass filename="compton_filter.log" instead of stream to log into a file
    logging.info("Program started")

    parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    parser.add_argument('--season', help='Name of the compton measurement table from slowdb')
    parser.add_argument('--config', help='Config file containing information for access to the databases')
    parser.add_argument('--update', action='store_true', help='Write only the newest values into the db')
    args = parser.parse_args()
    logging.info(f"Arguments: season: {args.season}, config {args.config}, update {args.update}")

    config = ConfigParser()
    config.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**config['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1, return_timezone=True)
    last_time = last_written_row[0][3] if (len(last_written_row) > 0) and args.update else None

    compton_slowdb = SlowdbComptonHandler(**config['postgresql'])
    res = compton_slowdb.load_tables([args.season], last_time)

    clbrdb.update(res, handle_last_time_row=args.update)
    clbrdb.commit()
    del clbrdb

# Usage:
# python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update

if __name__ == "__main__":
    main()
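
# A minimal database.ini sketch (the section names come from main() above and
# the keys match PostgreSQLHandler's constructor; all values are placeholders):
#
#   [postgresql]
#   host = cmddb
#   database = slowdb
#   user = ...
#   password = ...
#
#   [clbrDB]
#   host = ...
#   database = ...
#   user = ...
#   password = ...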