compton_filter.py

  1. """
  2. Script to fill calibration database with filtering slowdb compton measurements
  3. """
  4. import argparse
  5. from configparser import ConfigParser
  6. from datetime import datetime, timedelta, timezone
  7. import sys
  8. from typing import Tuple, List, Dict, Union, Optional
  9. import warnings
  10. import logging
  11. try:
  12. import psycopg2
  13. from psycopg2.extras import execute_values
  14. except ImportError:
  15. sys.path = list(filter(lambda x: "python2.7" not in x, sys.path))
  16. import psycopg2
  17. from psycopg2.extras import execute_values


class PostgreSQLHandler():
    """A common class for processing PostgreSQL databases
    """

    def __init__(self, host: str = 'cmddb', database: str = 'slowdb', user: Optional[str] = None, password: Optional[str] = None):
        """
        Parameters
        ----------
        host : str
            host name (default is "cmddb")
        database : str
            database name (default is "slowdb")
        user : Optional[str]
            username (default is None)
        password : Optional[str]
            password (default is None)
        """
        self.conn = psycopg2.connect(host=host, database=database, user=user, password=password)
        self.cur = self.conn.cursor()
        logging.info("PostgreSQL Handler created")

    @property
    def list_tables(self) -> List[str]:
        """Returns the list of existing tables in the compton measurements slowdb

        Returns
        -------
        List[str]
            list of tables
        """
        logging.info("Get list of the slowdb tables")
        self.cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        return list(map(lambda x: x[0], self.cur.fetchall()))
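
# Usage sketch (hypothetical credentials; main() takes the real ones from a
# config file):
#   db = PostgreSQLHandler(host='cmddb', database='slowdb', user='reader', password='***')
#   print(db.list_tables)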


class SlowdbComptonHandler(PostgreSQLHandler):
    """A class for processing and filtering compton measurements from slowdb
    """

    def __is_overlapped_row(self, start_time_next: datetime, stop_time_prev: datetime) -> bool:
        gap = timedelta(seconds=2)
        if start_time_next < stop_time_prev:
            logging.debug(f'time gap {abs(start_time_next - stop_time_prev)}')
        return start_time_next < stop_time_prev - gap
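
    # The 2-second `gap` above is a tolerance assumption: rows that overlap by
    # less than 2 s are only logged (see the debug message) and are not
    # treated as overlapping.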

    def __drop_overlapping_rows_list(self, table: list) -> list:
        """Removes rows with overlapping time intervals from the table

        Parameters
        ----------
        table : list
            the table MUST BE ORDERED BY TIME, where the columns at indices
            5 and 6 (0-based) are start_time and stop_time

        Returns
        -------
        list
            cleaned table
        """
        n_rows = len(table)
        if n_rows == 0:
            logging.info("Empty list. No overlapping rows")
            return table
        table = table[::-1]  # iterate from the latest row back to the earliest
        min_time = table[0][6]
        overlapped_idxs = list()
        for idx, row in enumerate(table):
            start_time, stop_time = row[5], row[6]
            if self.__is_overlapped_row(min_time, stop_time):
                overlapped_idxs.append(idx)
            else:
                min_time = start_time
        for index in sorted(overlapped_idxs, reverse=True):  # pop from the end so remaining indices stay valid
            table.pop(index)
        logging.info(f"Drop overlapping rows in list representation. Survived {len(table)} from {n_rows}")
        return table[::-1]
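
    # Illustration (times hypothetical): for rows with (start, stop) =
    # (10:00, 10:05), (10:03, 10:08), (10:08, 10:12), the backward scan keeps
    # the last two rows (they touch exactly at 10:08, within the 2 s
    # tolerance) and drops the first one, whose stop time 10:05 runs more
    # than 2 s past the start of the next kept row (10:03).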

    def load_tables(self, tables: List[str], daterange: Optional[datetime] = None):
        """Returns tables containing compton energy measurements

        Parameters
        ----------
        tables : List[str]
            names of tables in the slowdb compton measurements database
            (the full list of available tables can be seen with the property list_tables)
        daterange : Optional[datetime]
            minimum time for the selection (should contain a timezone)

        Returns
        -------
        list
            table containing compton energy measurements with fields:
            write_time - time when the row was written (contains timezone)
            mean_energy - compton mean of the energy measurement [MeV]
            std_energy - compton std of the energy measurement [MeV]
            mean_spread - compton mean of the spread measurement [MeV]
            std_spread - compton std of the spread measurement [MeV]
            start_time - begin time of the compton measurement (contains timezone)
            end_time - end time of the compton measurement (contains timezone)
        """
        time_condition = "AND time>(%(date)s)" if daterange is not None else ""
        sql_query = lambda table: f"""SELECT
            time AS time,
            CAST(values_array[1] AS numeric) AS mean_energy,
            CAST(values_array[2] AS numeric) AS std_energy,
            ROUND(CAST(values_array[5]/1000 AS numeric), 6) AS mean_spread,
            ROUND(CAST(values_array[6]/1000 AS numeric), 6) AS std_spread,
            date_trunc('second', time + (values_array[8] * interval '1 second')) AS start_time,
            date_trunc('second', time + (values_array[8] * interval '1 second') + (values_array[7] * interval '1 second')) AS stop_time
        FROM {table} WHERE g_id=43 AND dt>0 {time_condition}"""
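        # Assumed layout of values_array, inferred from the query above:
        # [1]/[2] are the energy mean/std in MeV, [5]/[6] the spread mean/std
        # in keV (hence the /1000 conversion), [7] the measurement duration in
        # seconds, and [8] the offset of the measurement start relative to the
        # write time; g_id=43 is taken to select the compton channel.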
        full_sql_query = '\nUNION ALL\n'.join([sql_query(table) for table in tables]) + '\nORDER BY time;'
        logging.debug(f"Full sql query {full_sql_query}")
        self.cur.execute(full_sql_query, {'date': daterange})
        table = self.cur.fetchall()
        table = self.__drop_overlapping_rows_list(table)
        return table


class CalibrdbHandler(PostgreSQLHandler):
    """A class for processing the calibration database
    """

    def select_table(self, system: str, algo: str, name: str, version: str = 'Default') -> int:
        """Selects the table from the database

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)

        Returns
        -------
        sid : int
            id corresponding to the table
        """
        self.cur.execute("""SELECT * FROM clbrset
            WHERE system=%s AND algo=%s AND name=%s AND version=%s""", (system, algo, name, version))
        result = self.cur.fetchall()
        logging.debug(f"selected clbrset: {result}")
        if len(result) > 1:
            logging.warning('Multiple equal calibration sets. clbrset DB problem')
            return result[0][0]
        sid = result[0][0]
        return sid

    def load_table(self, system: str, algo: str, name: str, version: str = 'Default',
                   num_last_rows: Optional[int] = None, timerange: Optional[Tuple[datetime, datetime]] = None,
                   return_timezone: bool = False) -> Tuple[list, list]:
        """Loads the calibration table

        Parameters
        ----------
        system : str
            name of the system
        algo : str
            name of the algorithm
        name : str
            name of the calibration
        version : str
            name of the calibration version (default is Default)
        num_last_rows : Optional[int]
            the number of last rows of the table
        timerange : Optional[Tuple[datetime, datetime]]
            time range condition on the selection of the table (default is None)
        return_timezone : bool
            whether to return the timezone in output datetimes (default is False)

        Returns
        -------
        Tuple[list, list]
            the calibration table and the names of its fields
        """
        sid = self.select_table(system, algo, name, version)
        time_condition = "AND begintime BETWEEN %s AND %s" if timerange is not None else ""
        tzone = "AT TIME ZONE 'ALMST'" if return_timezone else ''
        sql_query = f"""SELECT
            cid, sid, createdby,
            time {tzone} AS time,
            begintime {tzone} AS begintime,
            endtime {tzone} AS endtime,
            comment, parameters, data
        FROM clbrdata WHERE sid={sid} {time_condition} ORDER BY time DESC """
        if num_last_rows is not None:
            sql_query += f"LIMIT {num_last_rows}"
        if timerange is None:
            self.cur.execute(sql_query)
        else:
            self.cur.execute(sql_query, timerange)
        fields_name = [i[0] for i in self.cur.description]
        table = self.cur.fetchall()
        return table, fields_name

    def update(self, new_rows: list, system: str = "Misc", algo: str = "RunHeader",
               name: str = "Compton_run", version: str = 'Default', handle_last_time_row: bool = False):
        """Writes new_rows into clbrdb (for raw compton measurements)

        Parameters
        ----------
        new_rows : list
            list of the data for writing
        handle_last_time_row : bool
            (DANGEROUS PLACE - keep the default False, or don't commit the changes if you don't know what you want)
            whether to update current values: replaces all values in the interval
            from min(begintime in new_rows) to max(endtime in new_rows)
        """
        logging.info(f"Update {system}/{algo}/{name} is running...")
        if len(new_rows) == 0:
            logging.info("Success. Nothing new.")
            return
        sid = self.select_table(system, algo, name, version)
        new_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[5], x[6], [x[1], x[2], x[3], x[4]]), new_rows))
        if handle_last_time_row:
            min_new_time, max_new_time = min(map(lambda x: x[3], new_rows)), max(map(lambda x: x[4], new_rows))
            self.delete_rows(sid=sid, createdby='lxeuser', time=(min_new_time, max_new_time))
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, data) VALUES %s;"""
        execute_values(self.cur, insert_query, new_rows, fetch=False)
        logging.info(f"Success. Inserted {len(new_rows)} new rows")
        return

    def insert(self, new_rows: list, system: str, algo: str, name: str, version: str,
               update: bool = True, comment: Optional[str] = None):
        """Inserts new_rows into the table (for averages by energy points)

        Parameters
        ----------
        new_rows : list
            list of new rows in the following format: (time, begintime, endtime, *data)
        update : bool
            whether to update the current calibration
        comment : Optional[str]
            common comment field
        """
        sid = self.select_table(system, algo, name, version)
        if update:
            update_query = """UPDATE clbrdata
                SET data = %(data)s, createdby = %(createdby)s, time = %(time)s, begintime = %(begintime)s, endtime = %(endtime)s
                WHERE sid = %(sid)s AND comment = %(comment)s
            """
            for x in new_rows:
                season_point = (comment if comment is not None else '') + '_' + str(x[4]) + '_' + str(x[3])
                dict_row = {
                    'sid': sid,
                    'createdby': 'lxeuser',
                    'time': x[0],
                    'begintime': x[1],
                    'endtime': x[2],
                    'comment': season_point,
                    'data': x[3:],
                }
                self.cur.execute(update_query, dict_row)
        insert_query = """INSERT INTO clbrdata (sid, createdby, time, begintime, endtime, comment, data) VALUES %s"""
        comment_creator = lambda x: f'{comment if comment is not None else ""}_{str(x[4])}_{str(x[3])}'
        insert_rows = list(map(lambda x: (sid, 'lxeuser', x[0], x[1], x[2], comment_creator(x), x[3:]), new_rows))
        execute_values(self.cur, insert_query, insert_rows, fetch=False)
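        # Drop older duplicates: the self-join below keeps, among rows of this
        # calibration set that share a begintime or an endtime, only the row
        # with the largest cid (i.e. the most recently inserted one).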
        drop_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid < b.cid
                AND a.sid = b.sid
                AND (a.begintime = b.begintime OR a.endtime = b.endtime)
        """
        # AND a.comment = b.comment
        self.cur.execute(drop_query)
        logging.info(f"Inserted {len(insert_rows)} rows into table: {system}/{algo}/{name}/{version}")
        return

    def clear_table(self, sid: int, createdby: str):
        delete_query = """DELETE FROM clbrdata WHERE sid = %s AND createdby = %s"""
        logging.info(f"Clear ({sid}, {createdby}) table")
        self.cur.execute(delete_query, (sid, createdby))
        return

    def delete_row(self, sid: int, createdby: str, time: datetime):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND time = %s
        """
        self.cur.execute(delete_query, (sid, createdby, time))
        logging.info(f"Deleted ({sid}, {createdby}, {time}) row")
        return

    def delete_rows(self, sid: int, createdby: str, time: Tuple[datetime, datetime]):
        delete_query = """DELETE FROM clbrdata
            WHERE sid = %s AND createdby = %s AND endtime > %s AND begintime < %s
        """
        self.cur.execute(delete_query, (sid, createdby, time[0], time[1]))
        logging.info(f"Deleted ({sid}, {createdby} from {time[0]} to {time[1]}) rows")
        return

    def remove_duplicates(self, system: str = "Misc", algo: str = "RunHeader", name: str = "Compton_run", version: str = 'Default', keep: str = 'last'):
        sid = self.select_table(system, algo, name, version)
        keep_rule = ''
        if keep == 'last':
            keep_rule = '<'
        elif keep == 'first':
            keep_rule = '>'
        else:
            raise ValueError("keep argument must be 'last' or 'first'")
        remove_query = f"""
            DELETE FROM clbrdata a
            USING clbrdata b
            WHERE
                a.sid = {sid}
                AND a.cid {keep_rule} b.cid
                AND a.sid = b.sid
                AND a.time = b.time
        """
        self.cur.execute(remove_query)
        return

    def commit(self):
        logging.info("Changes committed")
        self.conn.commit()
        return

    def rollback(self):
        logging.info("Changes aborted")
        self.conn.rollback()
        return

    def __del__(self):
        logging.info("del clbr class")
        self.cur.close()
        self.conn.close()


def processing_from_file(path):
    """Processes text files (with names like 'vepp2k.edge.txt') that represent compton reanalyses by N.Muchnoi

    Parameters
    ----------
    path : Union[str, list]
        path to the file or files; can be a glob mask if str

    Returns
    -------
    List[Tuple[datetime, Decimal, Decimal, Decimal, Decimal, datetime, datetime]]
        list of tuples representing a single compton measurement with fields:
        writetime, energy_mean, energy_err, spread_mean, spread_err, starttime, stoptime
    """
    logging.info("Reading from files is running...")
    from glob import glob
    from decimal import Decimal

    if isinstance(path, str):
        files = glob(path)
    else:
        files = path
    logging.info(f"Handle {len(files)} files")
    rows = []
    current_timezone = timedelta(hours=7)
    # colnames = ('t', 'dt', 'E', 'dE', 'S', 'dS', 'B', 'dB', 'I', 'dI')  # column names in the text files
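    # A data line is whitespace-separated in the column order above; e.g.
    # (values purely illustrative): "1633046400 1800 508.42 0.12 1230.0 45.0 ..."
    # where t is a unix timestamp of the measurement center and dt its
    # half-width in seconds.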

    def preprocess_row(row):
        row = row.strip().split()
        timestamp_mean, dt = int(row[0]), int(row[1])
        # build timezone-aware datetimes directly in the fixed UTC+7 offset
        timestamp_to_date = lambda timestamp: datetime.fromtimestamp(timestamp, tz=timezone(current_timezone))
        t_start = timestamp_to_date(timestamp_mean - dt)
        t_stop = timestamp_to_date(timestamp_mean + dt)
        t_write = t_stop
        e_mean, de = Decimal(row[2]), Decimal(row[3])
        s_mean, ds = Decimal(row[4]) * Decimal('0.001'), Decimal(row[5]) * Decimal('0.001')  # keV to MeV
        return (t_write, e_mean, de, s_mean, ds, t_start, t_stop)

    for file in files:
        with open(file, 'r') as f:
            for row in f:
                if not row.startswith('#'):
                    rows.append(preprocess_row(row))
    logging.info(f"Success files reading. {len(rows)} in result.")
    return rows


def main():
    log_format = '[%(asctime)s] %(levelname)s: %(message)s'
    logging.basicConfig(stream=sys.stdout, format=log_format, level=logging.INFO)  # filename="compton_filter.log"
    logging.info("Program started")

    parser = argparse.ArgumentParser(description='Filter compton energy measurements from slowdb')
    parser.add_argument('--config', help='Config file containing information for access to databases')
    parser.add_argument('--season', help='Name of the compton measurement table from slowdb')
    parser.add_argument('--update', action='store_true', help='Write only the newest values into the db')
    parser.add_argument('--files', nargs='*', help="""Mask of the path to files like vepp2k.edge.txt. It has a higher priority than season.
        The update flag is forced to True when this option is used.""")
    args = parser.parse_args()
    if args.files is not None:
        args.update = True
    logging.info(f"Arguments: config {args.config}, season: {args.season}, update {args.update}, files {args.files}")

    parser = ConfigParser()
    parser.read(args.config)
    logging.info("Config parsed")

    clbrdb = CalibrdbHandler(**parser['clbrDB'])
    last_written_row, _ = clbrdb.load_table('Misc', 'RunHeader', 'Compton_run', num_last_rows=1, return_timezone=True)
    last_time = last_written_row[0][3] if (len(last_written_row) > 0) and args.update else None

    if args.files is None:
        compton_slowdb = SlowdbComptonHandler(**parser['postgresql'])
        res = compton_slowdb.load_tables([args.season], last_time)
    else:
        res = processing_from_file(args.files)

    clbrdb.update(res, handle_last_time_row=args.update)
    clbrdb.commit()
    del clbrdb

# python scripts/compton_filter.py --season cmd3_2021_2 --config database.ini --update
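
# The config passed via --config is expected to contain the two sections
# unpacked into the handlers above; a minimal sketch (all values are
# placeholders, not real credentials):
#
#   [clbrDB]
#   host = ...
#   database = ...
#   user = ...
#   password = ...
#
#   [postgresql]
#   host = ...
#   database = ...
#   user = ...
#   password = ...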

if __name__ == "__main__":
    main()