Moderator note:
This post contains scripts, or links to scripts, that are untested by the Pi-hole team.
We encourage users to be creative, but we are unable to support any potential problems caused by running these scripts.
I am sure many have run into the same issue as I have: searching DNS queries on the web interface over a large time period is difficult, and it often times out even after adjusting settings to prevent that.
This may also be a niche use case, but I have multiple databases backed up over the years and at times want to go back and search those as well.
So, I came up with this Python script that searches your Pi-hole directory (/etc/pihole/ by default), lists all the .db files it finds there, and lets you select a database to search. You can then enter a domain, a client, or both to search for. The results can be saved to a CSV file in the current directory.
Hopefully this will be of use to someone else as well. Maybe I can adapt it to have a web interface for ease of use and reporting.
Enjoy!
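For anyone trying it out: the script has a single third-party dependency (tqdm), and the FTL databases under /etc/pihole/ are normally only readable by root, so something like the following should work (the filename search_pihole_db.py is just whatever you saved the script as):

```shell
# tqdm is the script's only non-stdlib dependency
pip3 install tqdm

# the FTL databases in /etc/pihole/ are usually root-owned
sudo python3 search_pihole_db.py
```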
```python
import sqlite3
import csv
import glob
from tqdm import tqdm
from datetime import datetime

while True:
    # Search for FTL databases in the /etc/pihole/ directory
    db_files = glob.glob('/etc/pihole/pihole-FTL*.db')

    # If no database files are found, exit the script
    if not db_files:
        print("No database files found in /etc/pihole/.")
        exit(1)

    # Print the found databases and ask the user to select one
    print("Select a database to search:")
    for i, db_file in enumerate(db_files, 1):
        print(f"{i}. {db_file}")
    db_index = int(input("Enter the number of the database: "))
    if not 1 <= db_index <= len(db_files):
        print("Invalid selection")
        continue
    db_path = db_files[db_index - 1]

    # Connect to the SQLite database and create a cursor
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()

    print("Enter search type: domain, client, or both")
    search_type = input()
    if search_type not in ['domain', 'client', 'both']:
        print("Invalid search type")
        conn.close()
        continue

    if search_type == 'both':
        print("Enter a domain search term:")
        domain_search_term = input()
        print("Enter a client search term:")
        client_search_term = input()
        # Query on both domain and client
        cur.execute("SELECT * FROM queries WHERE domain LIKE ? AND client LIKE ?",
                    ('%' + domain_search_term + '%', '%' + client_search_term + '%'))
    else:
        print(f"Enter a search term for {search_type}")
        search_term = input()
        # Query on either domain or client; interpolating search_type into the
        # SQL is safe here because it was validated against a fixed list above
        cur.execute(f"SELECT * FROM queries WHERE {search_type} LIKE ?",
                    ('%' + search_term + '%',))

    # Fetch all rows from the last executed statement
    rows = cur.fetchall()

    # Convert each row's Unix timestamp to an ISO 8601 string,
    # with a tqdm progress bar sized to the number of rows
    processed_rows = []
    progress_bar = tqdm(total=len(rows))
    for row in rows:
        timestamp = datetime.fromtimestamp(row[1]).isoformat()
        processed_rows.append((row[0], timestamp) + row[2:])
        progress_bar.update(1)
    progress_bar.close()

    print(f"Found {len(processed_rows)} items")

    # Only offer to save as CSV if the search returned rows
    if len(processed_rows) > 0:
        print("Would you like to save the results to a CSV file? (yes/no)")
        save = input()
        if save.lower() == 'yes':
            print("Enter a filename for the CSV file:")
            filename = input()
            with open(filename + '.csv', 'w', newline='') as f:
                writer = csv.writer(f)
                # Take the column names from the cursor so the header always
                # matches the schema of the selected database
                writer.writerow([col[0] for col in cur.description])
                writer.writerows(processed_rows)

    print("Would you like to perform another search? (yes/no)")
    again = input()

    # Close the connection before looping or exiting
    conn.close()
    if again.lower() != 'yes':
        break
```
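One caveat for anyone searching old backups: the `queries` table schema has changed across FTL versions, so the columns in a years-old database may not match a current one. It can help to inspect a database's actual columns before searching or exporting. A small standalone sketch using only the standard library (the helper name is my own):

```python
import sqlite3


def queries_columns(db_path):
    """Return the column names of the queries table in an FTL database."""
    conn = sqlite3.connect(db_path)
    try:
        # PRAGMA table_info yields one row per column:
        # (cid, name, type, notnull, dflt_value, pk)
        return [row[1] for row in conn.execute("PRAGMA table_info(queries)")]
    finally:
        conn.close()
```

Printing `queries_columns('/etc/pihole/pihole-FTL.db')` before a search shows exactly which fields each result row will contain.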