import cx_Oracle
def fetch_column_names(cursor, table_name, owner_name):
query = """
SELECT column_name
FROM sys.all_tab_cols
WHERE table_name = :table_name AND owner = :owner_name
ORDER BY column_id
"""
cursor.execute(query, table_name=table_name.upper(), owner_name=owner_name.upper())
columns = [row[0] for row in cursor.fetchall()]
return columns
def transfer_data(src_conn, dest_conn, table_name, owner_name):
# Create cursors
src_cursor = src_conn.cursor()
dest_cursor = dest_conn.cursor()
try:
# Fetch column names
columns = fetch_column_names(src_cursor, table_name, owner_name)
cols_list = ', '.join(columns)
# Generate SELECT query for source
select_query = f"SELECT {cols_list} FROM {owner_name}.{table_name}"
# Fetch data from source
src_cursor.execute(select_query)
data = src_cursor.fetchall()
# Generate INSERT query for destination
bind_vars = ', '.join([':' + str(i + 1) for i in range(len(columns))])
insert_query = f"INSERT INTO {owner_name}.{table_name} ({cols_list}) VALUES ({bind_vars})"
# Insert data to destination
dest_cursor.executemany(insert_query, data)
dest_conn.commit()
except cx_Oracle.DatabaseError as e:
print("Error occurred:", e)
finally:
src_cursor.close()
dest_cursor.close()
# Main Execution
if __name__ == "__main__":
src_connection = cx_Oracle.connect('user/password@production_db')
dest_connection = cx_Oracle.connect('user/password@dev_db')
table_name = 'YOUR_TABLE_NAME'
owner_name = 'YOUR_OWNER_NAME'
transfer_data(src_connection, dest_connection, table_name, owner_name)
src_connection.close()
dest_connection.close()