import pandas as pd
import warnings
import d6tcollect
d6tcollect.init(__name__)
class PrintLogger(object):
    """Minimal logger that writes everything it is given to stdout via `print`."""

    def send_log(self, msg, status):
        """
        Print a log message followed by its status.

        Args:
            msg (str): message text
            status (str): status label, printed after the message

        """
        print(msg, status)

    def send(self, data):
        """
        Print `data` to stdout.

        Args:
            data: object to print

        """
        print(data)
import os
@d6tcollect.collect
def pd_readsql_query_from_sqlengine(uri, sql, schema_name=None, connect_args=None):
    """
    Load SQL statement into pandas dataframe using `sql_engine.execute` making execution faster.

    Args:
        uri (str): postgres psycopg2 sqlalchemy database uri
        sql (str): sql query
        schema_name (str): name of schema. Only used when `connect_args` is None
        connect_args (dict): dictionary of connection arguments to pass to `sqlalchemy.create_engine`.
            Takes precedence over `schema_name`

    Returns:
        df: pandas dataframe

    Raises:
        NotImplementedError: if `schema_name` is given without `connect_args` and the uri is not a `psycopg2` uri

    """
    import sqlalchemy

    if connect_args is not None:
        # caller-supplied connection arguments win; schema_name is not applied in this branch
        sql_engine = sqlalchemy.create_engine(uri, connect_args=connect_args)
    elif schema_name is not None:
        if 'psycopg2' not in uri:
            raise NotImplementedError('only `psycopg2` supported with schema_name, pass connect_args for your db engine')
        # select the schema by setting the session search_path through the connection options
        sql_engine = sqlalchemy.create_engine(uri, connect_args={'options': '-csearch_path={}'.format(schema_name)})
    else:
        sql_engine = sqlalchemy.create_engine(uri)

    try:
        result = sql_engine.execute(sql)
        # NOTE(review): no `columns=` is passed, so the column labels depend on how pandas
        # interprets the returned row objects - confirm this is the intended output
        df = pd.DataFrame(result.fetchall())
    finally:
        # the engine is created per call; release its pooled connections instead of leaking them
        sql_engine.dispose()
    return df
@d6tcollect.collect
def pd_readsql_table_from_sqlengine(uri, table_name, schema_name=None, connect_args=None):
    """
    Load SQL table into pandas dataframe using `sql_engine.execute` making execution faster. Convenience function that returns full table.

    Args:
        uri (str): postgres psycopg2 sqlalchemy database uri
        table_name (str): table
        schema_name (str): name of schema
        connect_args (dict): dictionary of connection arguments to pass to `sqlalchemy.create_engine`

    Returns:
        df: pandas dataframe

    """
    # table_name is interpolated into the statement as-is, so it must come from a trusted source
    sql = "SELECT * FROM {};".format(table_name)
    return pd_readsql_query_from_sqlengine(uri, sql, schema_name=schema_name, connect_args=connect_args)
@d6tcollect.collect
def pd_to_psql(df, uri, table_name, schema_name=None, if_exists='fail', sep=','):
    """
    Load pandas dataframe into a sql table using native postgres COPY FROM.

    Args:
        df (dataframe): pandas dataframe
        uri (str): postgres psycopg2 sqlalchemy database uri
        table_name (str): table to store data in. Lower-cased before use
        schema_name (str): name of schema in db to write to. Lower-cased before use
        if_exists (str): {'fail', 'replace', 'append'}, default 'fail'. See `pandas.to_sql()` for details
        sep (str): separator for temp file, eg ',' or '\\t'

    Returns:
        bool: True if loader finished

    Raises:
        ValueError: if `uri` is not a psycopg2 uri

    """
    if 'psycopg2' not in uri:
        raise ValueError('need to use psycopg2 uri eg postgresql+psycopg2://psqlusr:psqlpwdpsqlpwd@localhost/psqltest. install with `pip install psycopg2-binary`')

    table_name = table_name.lower()
    if schema_name:
        schema_name = schema_name.lower()

    import sqlalchemy
    import io

    if schema_name is not None:
        # set the session search_path so the unqualified table name in COPY resolves to the schema
        sql_engine = sqlalchemy.create_engine(uri, connect_args={'options': '-csearch_path={}'.format(schema_name)})
    else:
        sql_engine = sqlalchemy.create_engine(uri)

    try:
        sql_cnxn = sql_engine.raw_connection()
        try:
            cursor = sql_cnxn.cursor()
            try:
                # writing the zero-row slice creates (or validates) the table without inserting data
                df[:0].to_sql(table_name, sql_engine, schema=schema_name, if_exists=if_exists, index=False)

                # stream the data through an in-memory buffer instead of a file on disk
                fbuf = io.StringIO()
                df.to_csv(fbuf, index=False, header=False, sep=sep)
                fbuf.seek(0)
                # empty fields are loaded as NULL
                cursor.copy_from(fbuf, table_name, sep=sep, null='')
                sql_cnxn.commit()
            finally:
                cursor.close()
        finally:
            # return the raw connection even when the load fails
            sql_cnxn.close()
    finally:
        sql_engine.dispose()

    return True
@d6tcollect.collect
def pd_to_mysql(df, uri, table_name, if_exists='fail', tmpfile='mysql.csv', sep=',', newline='\n'):
    """
    Load dataframe into a sql table using native MySQL LOAD DATA LOCAL INFILE.

    Args:
        df (dataframe): pandas dataframe
        uri (str): mysql mysqlconnector sqlalchemy database uri
        table_name (str): table to store data in. Lower-cased before use
        if_exists (str): {'fail', 'replace', 'append'}, default 'fail'. See `pandas.to_sql()` for details
        tmpfile (str): filename for temporary file to load from. Removed when the function exits
        sep (str): separator for temp file, eg ',' or '\\t'
        newline (str): line terminator for temp file, eg '\\n'

    Returns:
        bool: True if loader finished

    Raises:
        ValueError: if `uri` is not a mysql+mysqlconnector uri

    """
    if 'mysql+mysqlconnector' not in uri:
        raise ValueError('need to use mysql+mysqlconnector uri eg mysql+mysqlconnector://testusr:testpwd@localhost/testdb. install with `pip install mysql-connector`')

    table_name = table_name.lower()

    import sqlalchemy

    sql_engine = sqlalchemy.create_engine(uri)

    # writing the zero-row slice creates (or validates) the table without inserting data
    df[:0].to_sql(table_name, sql_engine, if_exists=if_exists, index=False)

    logger = PrintLogger()
    try:
        logger.send_log('creating ' + tmpfile, 'ok')
        with open(tmpfile, mode='w', newline=newline) as fhandle:
            # missing values are written as \N; the header row is skipped by IGNORE 1 LINES below
            df.to_csv(fhandle, na_rep='\\N', index=False, sep=sep)
        logger.send_log('loading ' + tmpfile, 'ok')
        sql_load = "LOAD DATA LOCAL INFILE '{}' INTO TABLE {} FIELDS TERMINATED BY '{}' LINES TERMINATED BY '{}' IGNORE 1 LINES;".format(tmpfile, table_name, sep, newline)
        sql_engine.execute(sql_load)
    finally:
        # do not leave the temp file behind when writing or loading fails
        if os.path.exists(tmpfile):
            os.remove(tmpfile)

    return True
@d6tcollect.collect
def pd_to_mssql(df, uri, table_name, schema_name=None, if_exists='fail', tmpfile='mysql.csv'):
    """
    Load dataframe into a sql table using native MS SQL BULK INSERT. Experimental.

    Args:
        df (dataframe): pandas dataframe
        uri (str): mssql pymssql sqlalchemy database uri
        table_name (str): table to store data in. Lower-cased before use
        schema_name (str): name of schema in db to write to. Lower-cased before use
        if_exists (str): {'fail', 'replace', 'append'}, default 'fail'. See `pandas.to_sql()` for details
        tmpfile (str): filename for temporary file to load from. Removed when the function exits

    Returns:
        bool: True if loader finished

    Raises:
        ValueError: if `uri` is not a mssql+pymssql uri

    """
    if 'mssql+pymssql' not in uri:
        raise ValueError('need to use mssql+pymssql uri (conda install -c prometeia pymssql)')

    table_name = table_name.lower()
    if schema_name:
        schema_name = schema_name.lower()

    warnings.warn('`.pd_to_mssql()` is experimental, if any problems please raise an issue on https://github.com/d6t/d6tstack/issues or make a pull request')

    import sqlalchemy

    sql_engine = sqlalchemy.create_engine(uri)

    # create the empty table in the same schema that BULK INSERT targets below
    df[:0].to_sql(table_name, sql_engine, schema=schema_name, if_exists=if_exists, index=False)

    logger = PrintLogger()
    try:
        logger.send_log('creating ' + tmpfile, 'ok')
        df.to_csv(tmpfile, na_rep='\\N', index=False)
        logger.send_log('loading ' + tmpfile, 'ok')
        if schema_name is not None:
            table_name = '{}.{}'.format(schema_name, table_name)
        # NOTE(review): the file is written with a header row and the default comma separator,
        # while the statement passes no FIRSTROW/FIELDTERMINATOR options - confirm against the
        # BULK INSERT defaults and that the path is readable from the database server
        sql_load = "BULK INSERT {} FROM '{}';".format(table_name, tmpfile)
        sql_engine.execute(sql_load)
    finally:
        # do not leave the temp file behind when writing or loading fails
        if os.path.exists(tmpfile):
            os.remove(tmpfile)

    return True