This textbook was written for the clinical research community at Johns Hopkins that uses the Precision Medicine Analytics Platform (PMAP). These notebooks are available in HTML form on the Precision Medicine portal as well as in computational form in the CAMP-share folder on Crunchr (Crunchr.pm.jh.edu).
Last Updated: April 1st, 2019
Publication Attribution:
In this notebook, we will look at defining patient cohorts by exploring the medications table.
Pandas
Regular Expressions
- to define cohorts of patients as defined by Asthma guidelines
Set Operators
- to compare patient cohorts
#Libraries needed
import sqlalchemy
import urllib.parse
import pandas as pd
import getpass
from SciServer import Authentication
myUserName = Authentication.getKeystoneUserWithToken(Authentication.getToken()).userName
passwd = getpass.getpass('Password for ' + myUserName + ': ')
user = "win\\" + myUserName
#SQL Driver
driver="FreeTDS"
tds_ver="8.0"
# Database
host_ip="jhepicdwdbprd1.WIN.AD.JHU.EDU" # Update this accordingly
db_port="1433"
db="CDW" # Update this accordingly
# Create Connection String
conn_str=("DRIVER={};Server={};PORT={};DATABASE={};UID={};PWD={};TDS_VERSION={}"
          .format(driver, host_ip, db_port, db, user, passwd, tds_ver)
         )
# Create Engine
engine = sqlalchemy.create_engine('mssql+pyodbc:///?odbc_connect=' +
                                  urllib.parse.quote(conn_str)
                                  )
Pandas is a widely used third-party Python library (it is not part of the Python standard library) for manipulating tabular datasets. It is a good tool for data exploration, data cleaning, and preparing data for model building.
Note: we are running a SQL query that retrieves all the medications, which is over 500k rows. This will take a few seconds, and the output displays only the first few rows as an example.
Note: if this query doesn't run, make sure you have requested access to the asthma training dataset.
query=("SELECT * from meds")
df = pd.read_sql_query(query, engine)
df.head(5)
OperationalError: (pyodbc.OperationalError) ('08S01', '[08S01] [FreeTDS][SQL Server]Unable to connect: Adaptive Server is unavailable or does not exist (20009) (SQLDriverConnect)') (Background on this error at: http://sqlalche.me/e/e3q8)
query=("""SELECT gender,count(*) FROM patients
Inner Join encounters
On patients.osler_id=encounters.osler_id
Where encounter_type='Hospital Encounter'
group by gender
order by count(*) DESC""")
df = pd.read_sql_query(query, engine)
df.head(5)
| | gender | |
|---|---|---|
| 0 | Female | 58344 |
| 1 | Male | 28445 |
query=("SELECT distinct osler_id FROM encounters where encounter_type='Hospital Encounter' ")
encs = pd.read_sql_query(query, engine)
query=("SELECT * from patients")
pats = pd.read_sql_query(query, engine)
df=pd.merge(pats,encs, on='osler_id')
df.gender.value_counts()
Female    13229
Male       6222
Name: gender, dtype: int64
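The SQL join and the Pandas merge above report different totals (58344 versus 13229 for Female). A likely explanation is that the SQL version counts one row per hospital encounter, while the Pandas version selects distinct osler_id values first and therefore counts each patient once. The sketch below reproduces that effect on made-up data; the IDs and counts are hypothetical and only the column names are taken from the cells above.

```python
import pandas as pd

# Hypothetical toy data: three patients with a varying number of hospital encounters
pats = pd.DataFrame({
    "osler_id": ["a", "b", "c"],
    "gender": ["Female", "Female", "Male"],
})
encounters = pd.DataFrame({
    "osler_id": ["a", "a", "a", "b", "c", "c"],
    "encounter_type": ["Hospital Encounter"] * 6,
})

# Merging against every encounter row counts encounters (like the SQL GROUP BY)
per_encounter = pd.merge(pats, encounters, on="osler_id").gender.value_counts()

# De-duplicating osler_id first counts patients (like the SELECT DISTINCT version)
encs = encounters[["osler_id"]].drop_duplicates()
per_patient = pd.merge(pats, encs, on="osler_id").gender.value_counts()

print(per_encounter)
print(per_patient)
```

Which number is the right one depends on the question: encounters per gender, or patients per gender.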
If you get an error here, make sure you have been granted access to the Asthma dataset.
For the Pandas dataframe named df, here are some useful ways to explore the dataframe.
df.shape
- The dimensions of the dataframe as (rows, columns)
df.size
- The number of data elements in the dataframe
df.columns
- A list of the names of the columns
df.dtypes
- A list of the data types for each column
df.info()
- A list of the columns with data types as well as the number of non-null values
df.describe(include='all')
- Descriptive statistics for each column such as min, mean, median, and max
df.head(n)
- Returns the first n records. Defaults to 5
len(df)
- The len function returns the number of rows in the dataframe
Try these commands here to get a feel for this dataset
len(df)
631022
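If you do not have database access yet, the exploration commands listed above can be tried on a small stand-in dataframe. This is a sketch only: the rows are invented, and the column names are assumed from the later cells of this notebook.

```python
import pandas as pd

# Hypothetical stand-in for the medications dataframe (values are made up)
df = pd.DataFrame({
    "osler_id": ["a", "a", "b", "c"],
    "medication_name": ["MED ONE", "MED TWO", "MED TWO", None],
    "pharmaceutical_class": ["GLUCOCORTICOIDS", "CLASS X", "CLASS X", "CLASS Y"],
})

print(df.shape)                     # (rows, columns)
print(df.size)                      # rows * columns
print(list(df.columns))             # column names
print(df.dtypes)                    # data type of each column
df.info()                           # dtypes plus non-null counts
print(df.describe(include="all"))   # summary statistics per column
print(df.head(2))                   # first two records
print(len(df))                      # number of rows
```

Note that df.info() prints directly and returns None, so it does not need to be wrapped in print.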
Regular Expressions are useful tools for matching patterns of characters in text.
For our dataset of medications, there is a pharmaceutical class field that we are going to explore to see which patients have prescriptions for rescue inhalers, inhaled corticoids, and oral corticoids.
df.loc[:,:]
- return all the rows and all the columns
df.loc[:,'pharmaceutical_class']
- return all the rows but only the pharmaceutical_class column
df.loc[:,'pharmaceutical_class']=='GLUCOCORTICOIDS'
- return a Boolean mask marking the rows that are an exact match to GLUCOCORTICOIDS
df[df.loc[:,'pharmaceutical_class']=='GLUCOCORTICOIDS']
- filter the dataset to only the rows that are an exact match to GLUCOCORTICOIDS
df[df.loc[:,'pharmaceutical_class'].str.contains(r'CORT.+INH', regex=True)]
- return rows whose pharmaceutical_class contains the characters CORT followed by one or more characters and then the characters INH
df[df.loc[:,'pharmaceutical_class'].str.contains(r'BETA.+INH.+SHORT', regex=True)]
- return rows whose pharmaceutical_class contains the characters BETA followed by one or more characters, then the characters INH, then one or more characters, and then the characters SHORT
df.head()
NameError: name 'df' is not defined
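The filtering patterns listed above can be checked on a handful of rows before running them on the full table. The class labels below are invented for illustration and may not match the real values in the meds table. The na=False argument is an addition not used in this notebook: it treats missing class values as non-matches so the mask stays strictly Boolean.

```python
import pandas as pd

# Made-up class labels for illustration; the real values may differ
df = pd.DataFrame({
    "osler_id": ["a", "b", "c", "d", "e"],
    "pharmaceutical_class": [
        "GLUCOCORTICOIDS",
        "GLUCOCORTICOIDS, ORALLY INHALED",
        "BETA-ADRENERGIC AGENTS, INHALED, SHORT ACTING",
        "BETA-ADRENERGIC AGENTS, INHALED, LONG ACTING",
        None,
    ],
})

cls = df.loc[:, "pharmaceutical_class"]

# Exact match: one True/False value per row
exact_mask = cls == "GLUCOCORTICOIDS"

# Regex match: CORT, then one or more characters, then INH
ics_mask = cls.str.contains(r"CORT.+INH", regex=True, na=False)

# Regex match: BETA ... INH ... SHORT, in that order
saba_mask = cls.str.contains(r"BETA.+INH.+SHORT", regex=True, na=False)

# Use a mask inside df[...] to keep only the matching rows
print(df[exact_mask])
print(df[ics_mask])
print(df[saba_mask])
```

The exact match keeps only the plain GLUCOCORTICOIDS row, while the regex patterns pick up longer labels that contain the keywords in the required order.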
#patients with prescriptions based on pharmaceutical class
#Markers of an exacerbation (An OCS prescription)
pts_ocs=df[df.loc[:,'pharmaceutical_class']=='GLUCOCORTICOIDS']
#Markers of an inhaled steroid (An ICS prescription)
pts_ics=df[df.loc[:,'pharmaceutical_class'].str.contains(r'CORT.+INH', regex=True)]
#Markers of a SABA (short-acting beta agonist)
pts_saba=df[df.loc[:,'pharmaceutical_class'].str.contains(r'BETA.+INH.+SHORT', regex=True)]
print('Number of prescriptions ',len(df))
print('Number of unique patients with a prescription ',len(df.osler_id.unique()))
print('Number of patients with SABA prescription ',len(pts_saba.osler_id.unique()))
print('Number of patients with Inhaled CS prescription ',len(pts_ics.osler_id.unique()))
print('Number of patients with Oral CS prescription ',len(pts_ocs.osler_id.unique()))
Number of prescriptions 631022
Number of unique patients with a prescription 49084
Number of patients with SABA prescription 26951
Number of patients with Inhaled CS prescription 16475
Number of patients with Oral CS prescription 7065
A Python set is an unordered collection of unique values. Like lists, sets are mutable, but unlike lists they keep no order and hold no duplicates, which makes them convenient for comparing groups of patient IDs.
set(mylist)
- convert a list of values into a set of unique values
set_a.union(set_b)
- return the union of set_a and set_b
set_a.intersection(set_b)
- return only the values that are in both sets
set_a.difference(set_b)
- return elements that are in set_a but not in set_b
set_all_pts=set(df.osler_id)
set_saba=set(pts_saba.osler_id)
set_ics=set(pts_ics.osler_id)
set_ocs=set(pts_ocs.osler_id)
print('Of {} patients with prescriptions, {:.0%} have a prescription for an inhaler '.format(len(set_all_pts),len(set_saba)/len(set_all_pts)))
print('Of {} patients with prescriptions, {:.0%} have a prescription for an inhaled corticoid '.format(len(set_all_pts),len(set_ics)/len(set_all_pts)))
print('Of {} patients with prescriptions, {:.0%} have a prescription for an oral corticoid '.format(len(set_all_pts),len(set_ocs)/len(set_all_pts)))
print('{} patients have had at least one of the three prescriptions '.format(len(set_saba.union(set_ics,set_ocs))))
print('{} patients have had all three prescriptions '.format(len(set_saba.intersection(set_ics,set_ocs))))
Of 49084 patients with prescriptions, 55% have a prescription for an inhaler
Of 49084 patients with prescriptions, 34% have a prescription for an inhaled corticoid
Of 49084 patients with prescriptions, 14% have a prescription for an oral corticoid
32233 patients have had at least one of the three prescriptions
3493 patients have had all three prescriptions
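The set operations used above can be tried on a few hand-made sets to see exactly what each one returns. The patient IDs here are hypothetical, and saba_only is an extra illustration of difference that the notebook itself does not compute.

```python
# Hypothetical patient IDs, for illustration only
set_saba = {"a", "b", "c", "d"}
set_ics = {"b", "c", "e"}
set_ocs = {"c", "f"}

# Converting a list to a set drops the duplicates
unique_ids = set(["a", "a", "b"])

# Patients with at least one of the three prescriptions
any_of_three = set_saba.union(set_ics, set_ocs)

# Patients with all three prescriptions
all_three = set_saba.intersection(set_ics, set_ocs)

# Patients with a SABA prescription but neither corticoid
saba_only = set_saba.difference(set_ics, set_ocs)

# Sets are unordered, so sort before printing for a stable display
print(sorted(unique_ids))
print(sorted(any_of_three))
print(sorted(all_three))
print(sorted(saba_only))
```

union, intersection, and difference all accept more than one argument, which is why the three cohorts can be combined in a single call.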
In case you were wondering whether you could do the string filtering in SQL instead of Pandas, you can. SQL has the LIKE operator, which accepts % as a wildcard. It is not as flexible as regex, but it works in this case. Here is the code.
#Define cohort who is getting most of their care here
#(All with outpatients, only inpatients/ED who have outpatients)
#Markers of an exacerbation (An OCS prescription)
query=("SELECT medication_name,count(*) FROM meds "
"where pharmaceutical_class = 'GLUCOCORTICOIDS' group by medication_name")
df3 = pd.read_sql_query(query, engine)
#Markers of an inhaled steroid (An ICS prescription)
query=("SELECT distinct pharmaceutical_class FROM meds "
"where pharmaceutical_class like '%CORT%INH%' ")
df3 = pd.read_sql_query(query, engine)
#Markers of a SABA (short-acting beta agonist)
query=("SELECT medication_name, count(*) FROM meds "
"where pharmaceutical_class like '%BETA%INH%SHORT%' group by medication_name order by count(*) DESC")
df3 = pd.read_sql_query(query, engine)
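One subtle difference between the two approaches: in LIKE, % matches zero or more characters, while the regex .+ used earlier requires at least one. The sketch below shows this with an in-memory SQLite table rather than the SQL Server database, so it runs without credentials. The class labels are made up, and CORTINH is a contrived value chosen only to expose the difference. Case sensitivity of LIKE also varies by database and collation, which this sketch does not cover.

```python
import re
import sqlite3

# Made-up class labels; "CORTINH" is contrived to show the edge case
classes = [
    "GLUCOCORTICOIDS",
    "GLUCOCORTICOIDS, ORALLY INHALED",
    "BETA-ADRENERGIC AGENTS, INHALED, SHORT ACTING",
    "CORTINH",
]

# In-memory SQLite stand-in for the meds table (the real table is in SQL Server)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE meds (pharmaceutical_class TEXT)")
conn.executemany("INSERT INTO meds VALUES (?)", [(c,) for c in classes])

# LIKE: % matches zero or more characters
like_hits = [row[0] for row in conn.execute(
    "SELECT pharmaceutical_class FROM meds "
    "WHERE pharmaceutical_class LIKE '%CORT%INH%' "
    "ORDER BY rowid"
)]
conn.close()

# Regex: .+ requires at least one character between CORT and INH
regex_hits = [c for c in classes if re.search(r"CORT.+INH", c)]

print(like_hits)
print(regex_hits)
```

To make the regex behave like the LIKE pattern, .* could be used in place of .+.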