# compton_filter.py
  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
  4. import argparse
  5. from configparser import ConfigParser
  6. from datetime import datetime, timedelta, timezone
  7. import sys
  8. from typing import Tuple, List, Dict, Union, Optional
  9. import warnings
  10. import logging
  11. import psycopg2
  12. from psycopg2.extras import execute_values
  13. class PostgreSQLHandler():
  14. """A common class for processing postgresql databases
  15. """
  16. def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: str = None, password: str = None):
  17. """
  18. Parameters
  19. ----------
  20. host : str
  21. host name (default is "cmddb")
  22. database : str
  23. database name (default is "slowdb")
  24. user : str
  25. username (default is None)
  26. password : str
  27. password (default is None)
  28. """
  29. self.conn = psycopg2.connect(host = host, database = database, user = user, password = password)
  30. self.cur = self.conn.cursor()
  31. logging.info("PostgreSQL Hander created")
  32. @property
  33. def list_tables(self) -> List[str]:
  34. """Returns list of existed tables in the compton measurements slowDB
  35. Returns
  36. -------
  37. List[str]
  38. list of tables
  39. """
  40. logging.info("Get list of the slowdb tables")
  41. self.cur.execute("""
  42. SELECT table_name FROM information_schema.tables
  43. WHERE table_schema = 'public'
  44. """)
  45. return list(map(lambda x: x[0], self.cur.fetchall()))
  46. class SlowdbComptonHandler(PostgreSQLHandler):
  47. """A class for processing and filtering of compton measurements from slowdb
  48. """
  49. def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime):
  50. gap = timedelta(seconds=2)
  51. if(start_time_next < stop_time_prev):
  52. logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
  53. return start_time_next < stop_time_prev - gap
  54. def __drop_overlapping_rows_list(self, table: list) -> list:
  55. """Removes rows with overlapping time intervals from the table
  56. Parameters
  57. ----------
  58. table : list
  59. the table MUST BE ORDERED BY TIME where 5th column is start_time, 6th column is end_time
  60. Returns
  61. -------
  62. list
  63. clear table
  64. """
  65. if len(table) == 0:
  66. logging.info("Empty list. No overlapping rows")
  67. return table
  68. logging.info("Drop overlapping rows in list representation")
  69. table = table[::-1] # sort table by time from last to past
  70. min_time = table[0][6]
  71. overlapped_idxs = list()
  72. for idx, row in enumerate(table):
  73. start_time, stop_time = row[5], row[6]
  74. if self.__is_overlapped_row(min_time, stop_time):
  75. overlapped_idxs.append(idx)
  76. else:
  77. min_time = start_time
  78. for index in sorted(overlapped_idxs, reverse=True): # strict condition of the backward loop
  79. table.pop(index)
  80. return table[::-1]
  81. def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
  82. """Returns tables containing compton energy measurements
  83. Parameters
  84. ----------
  85. tables : List[str]
  86. names of tables in the slowdb compton measurements database
  87. (full list of available tables can be seen with the property tables)
  88. daterange : Optional[datetime]
  89. minimum time for selection (should contain timezone)
  90. Returns
  91. -------
  92. Union[pd.DataFrame, list]
  93. table containing compton energy measurements with fields:
  94. write_time - time when the row was written (contains timezone)
  95. mean_energy - compton mean of the energy measurement [MeV]
  96. std_energy - compton std of the energy measurement [MeV]
  97. mean_spread - compton mean of the spread measurement [MeV]
  98. std_spread - compton std of the spread measurement [MeV]
  99. start_time - beginning time of the compton measurement (contains timezone)
  100. end_time - end time of the compton measurement (contains timezone)
  101. """
  102. time_condition = f"AND time>(%(date)s)" if daterange is not None else ""
  103. sql_query = lambda table: f"""SELECT
  104. time AS time,
  105. CAST(values_array[1] AS numeric) AS mean_energy,
  106. CAST(values_array[2] AS numeric) AS std_energy,
  107. ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
  108. ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
  109. date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
  110. date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
  111. FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
  112. full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
  113. logging.debug(f"Full sql query {full_sql_query}")
  114. self.cur.execute(full_sql_query, {'date': daterange})
  115. table = self.cur.fetchall()
  116. table = self.__drop_overlapping_rows_list(table)
  117. return table
  118. class CalibrdbHandler(PostgreSQLHandler):
  119. """A class for processing of calibration database
  120. """
  121. def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
  122. """Selects the table from database
  123. Parameters
  124. ----------
  125. system : str
  126. name of the system
  127. algo : str
  128. name of the algorithm
  129. name : str
  130. name of the calibration
  131. version : str
  132. name of the calibration version (default is Default)
  133. Returns
  134. -------
  135. sid : int
  136. value corresponding the table
  137. """
  138. self.cur.execute(f"""SELECT * FROM clbrset
  139. WHERE system='{system}' AND algo='{algo}' AND name='{name}' AND version='{version}'""")
  140. result = self.cur.fetchall()
  141. logging.debug(f"selected clbrset: {result}")
  142. if len(result) > 1:
  143. logging.warning('Multiple equal calibration sets. clbrset DB problem')
  144. return result[0]
  145. sid = result[0][0]
  146. return sid
  147. def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
  148. num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None,
  149. return_timezone: bool = False) -> Tuple[list, list]:
  150. """Loads the calibration table
  151. Parameters
  152. ----------
  153. system : str
  154. name of the system
  155. algo : str
  156. name of the algorithm
  157. name : str
  158. name of the calibration
  159. version : str
  160. name of the calibration version (default is Default)
  161. num_last_rows : Optional[int]
  162. the number of last rows of the table
  163. timerange : Optional[Tuple[datetime, datetime]]
  164. time range condition on the selection of the table (default is None)
  165. return_timezone : bool
  166. return timezone in output datetimes as a field or not (default is False)
  167. Returns
  168. -------
  169. Tuple[list, list]
  170. the calibration table and name of fields
  171. """
  172. sid = self.select_table(system, algo, name, version)
  173. time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
  174. tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
  175. sql_query = f"""SELECT
  176. cid, sid, createdby,
  177. time {tzone} AS time,
  178. begintime {tzone} AS begintime,
  179. endtime {tzone} AS endtime,
  180. comment, parameters, data
  181. FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
  182. if num_last_rows is not None:
  183. sql_query += f"LIMIT {num_last_rows}"
  184. if timerange is None:
  185. self.cur.execute(sql_query)
  186. else:
  187. self.cur.execute(sql_query, timerange)
  188. fields_name = [i[0] for i in self.cur.description]
  189. table = self.cur.fetchall()
  190. return table, fields_name
  191. def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
  192. name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = False):
  193. """Writes new_rows in clbrdb
  194. Parameters
  195. ----------
  196. new_rows : list
  197. list of the data for writing
  198. handle_last_time_row : bool
  199. (DANGEROUS PLACE - keep default False or don't commit changes if you don't know what you want)
  200. update current values or not: replace all values in interval from min(begintime in new_rows) to max(endtime in new_rows)
  201. """
  202. if len(new_rows) == 0:
  203. return
  204. sid = self.select_table(system, algo, name, version)
  205. new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
  206. if handle_last_time_row:
  207. min_new_time, max_new_time = min(map(lambda x: x[3], new_rows)), max(map(lambda x: x[4], new_rows))
  208. self.delete_rows(sid = sid, createdby = 'lxeuser', time = (min_new_time, max_new_time))
  209. insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
  210. execute_values(self.cur, insert_query, new_rows, fetch=False)
  211. logging.info(f"Inserted {len(new_rows)} new rows")
  212. return
  213. def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
  214. update: bool = True, comment: Optional[str] = None):
  215. """Insert new_rows in the table
  216. Parameters
  217. ----------
  218. new_rows : list
  219. list of new rows in the follwing format
  220. update : bool
  221. update current calibration
  222. comment : Optional[str]
  223. common comment field
  224. """
  225. sid = self.select_table(system, algo, name, version)
  226. if update:
  227. update_query = f"""UPDATE clbrdata
  228. SET data = %(data)s, createdby = %(createdby)s, time = %(time)s, begintime = %(begintime)s, endtime = %(endtime)s
  229. WHERE sid = %(sid)s AND comment = %(comment)s
  230. """
  231. for x in new_rows:
  232. season_point = (comment if comment is not None else '') + '_' + str(x[4]) + '_' + str(x[3])
  233. dict_row = {
  234. 'sid': sid,
  235. 'createdby': 'lxeuser',
  236. 'time': x[0],
  237. 'begintime': x[1],
  238. 'endtime': x[2],
  239. 'comment': season_point,
  240. 'data': x[3:],
  241. }
  242. self.cur.execute(update_query, dict_row)
  243. insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
  244. comment_creator = lambda x: f'{comment if comment is not None else ""}_{str(x[4])}_{str(x[3])}'
  245. insert_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[1], x[2], comment_creator(x), x[3:]), new_rows))
  246. execute_values(self.cur, insert_query, insert_rows, fetch=False)
  247. drop_query = f"""
  248. DELETE FROM clbrdata a
  249. USING clbrdata b
  250. WHERE
  251. a.sid = {sid}
  252. AND a.cid < b.cid
  253. AND a.sid = b.sid
  254. AND a.begintime = b.begintime
  255. """
  256. #AND a.comment = b.comment
  257. self.cur.execute(drop_query)
  258. logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
  259. return
  260. def clear_table(self, sid: int, createdby: str):
  261. delete_query = f"""DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
  262. logging.info(f"Clear ({sid}, {createdby}) table")
  263. self.cur.execute(delete_query, (sid, createdby))
  264. return
  265. def delete_row(self, sid: int, createdby: str, time: datetime):
  266. delete_query = f"""DELETE FROM clbrdata
  267. WHERE sid = %s AND createdby = %s AND time = %s
  268. """
  269. self.cur.execute(delete_query, (sid, createdby, time))
  270. logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
  271. return
  272. def delete_rows(self, sid: int, createdby: str, time: Tuple[datetime, datetime]):
  273. delete_query = f"""DELETE FROM clbrdata
  274. WHERE sid = %s AND createdby = %s AND endtime > %s AND begintime < %s
  275. """
  276. self.cur.execute(delete_query, (sid, createdby, time[0], time[1]))
  277. logging.info(f"Deleted ({sid}, {createdby} from {time[0]} to {time[1]}) rows")
  278. return
  279. def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', keep: str = 'last'):
  280. sid = self.select_table(system, algo, name, version)
  281. keep_rule = ''
  282. if keep == 'last':
  283. keep_rule = '<'
  284. elif keep == 'first':
  285. keep_rule = '>'
  286. else:
  287. raise ValueError("keep argument must be 'last' or 'first'")
  288. remove_query = f"""
  289. DELETE FROM clbrdata a
  290. USING clbrdata b
  291. WHERE
  292. a.sid = {sid}
  293. AND a.cid {keep_rule} b.cid
  294. AND a.sid = b.sid
  295. AND a.time = b.time
  296. """
  297. self.cur.execute(remove_query)
  298. pass
  299. def commit(self):
  300. logging.info("Changes commited")
  301. self.conn.commit()
  302. return
  303. def rollback(self):
  304. logging.info("Changes aborted")
  305. self.conn.rollback()
  306. return
  307. def __del__(self):
  308. logging.info("del clbr class")
  309. self.cur.close()
  310. self.conn.close()
  311. def main():
  312. log_format = '[%(asctime)s] %(levelname)s: %(message)s'
  313. logging.basicConfig(stream=sys.stdout, format=log_format, level=logging.INFO) #"filename=compton_filter.log"
  314. logging.info("Program started")
  315. parser = argparse.ArgumentParser(description = 'Filter compton energy measurements from slowdb')
  316. parser.add_argument('--season', help = 'Name of compton measurement table from slowdb')
  317. parser.add_argument('--config', help = 'Config file containing information for access to databases')
  318. parser.add_argument('--update', action = 'store_true', help = 'Writes only newest values into the db')
  319. args = parser.parse_args()
  320. logging.info(f"Arguments: season: {args.season}, config {args.config}, update {args.update}")
  321. parser = ConfigParser()
  322. parser.read(args.config);
  323. logging.info("Config parsed")
  324. clbrdb = CalibrdbHandler(**parser['clbrDB'])
  325. last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows = 1, return_timezone = True)
  326. last_time = last_written_row[0][3] if (len(last_written_row) > 0) and (args.update) else None
  327. compton_slowdb = SlowdbComptonHandler(**parser['postgresql'])
  328. res = compton_slowdb.load_tables([args.season], last_time)
  329. clbrdb.update(res, handle_last_time_row = args.update)
  330. clbrdb.commit()
  331. del clbrdb
  332. # python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update
  333. if __name__ == "__main__":
  334. main()