Search DNS Queries from CLI and save to CSV

Moderator note:
This post contains scripts, or links to scripts, that are untested by the Pi-hole team.
We encourage users to be creative, but we are unable to support any potential problems caused by running these scripts

I am sure many have run into the same issue as me: it can be hard to search DNS queries in the web interface over a large time period. The search often times out, even after adjusting settings to prevent that.

This may also be a niche use case, but I have multiple databases backed up from over the years and at times want to go back and search those as well.

So, I came up with this Python script that searches your Pi-hole directory (/etc/pihole/ by default) and lists the pihole-FTL*.db database files it finds there, allowing you to select a database to search. You can then enter a domain, client, or both to search for. The results can be saved to a CSV file in the current directory.

Hopefully this will be of use to someone else as well. Maybe I can adapt it to have a web interface for ease of use and reporting.

Enjoy!

import sqlite3
import csv
import os
import glob
from tqdm import tqdm
from datetime import datetime

# Maps each accepted search type to the `queries` columns it filters on.
# Only these fixed column names are ever interpolated into SQL text; the
# user-supplied search terms are always passed as bound parameters.
SEARCH_COLUMNS = {
    'domain': ('domain',),
    'client': ('client',),
    'both': ('domain', 'client'),
}


def parse_db_choice(text, count):
    """Convert a 1-based menu selection into a 0-based list index.

    Args:
        text: Raw text typed by the user.
        count: Number of entries shown in the menu.

    Returns:
        The 0-based index, or None when ``text`` is not an integer in the
        range 1..count. Rejecting 0 and negative numbers matters because
        they would otherwise index the list from the end.
    """
    try:
        choice = int(text)
    except ValueError:
        return None
    if 1 <= choice <= count:
        return choice - 1
    return None


def build_query(search_type, terms):
    """Build the parameterized SELECT statement for a search.

    Args:
        search_type: One of the keys of SEARCH_COLUMNS.
        terms: One search term per column filtered by ``search_type``
            (for 'both': the domain term first, then the client term).

    Returns:
        A ``(sql, params)`` tuple suitable for ``execute``; every term is
        wrapped in ``%`` so it matches as a substring.

    Raises:
        KeyError: If ``search_type`` is not a known search type.
        ValueError: If the number of terms does not match the search type.
    """
    columns = SEARCH_COLUMNS[search_type]
    if len(terms) != len(columns):
        raise ValueError(
            f"expected {len(columns)} search term(s), got {len(terms)}")
    where = " AND ".join(f"{column} LIKE ?" for column in columns)
    params = tuple('%' + term + '%' for term in terms)
    return f"SELECT * FROM queries WHERE {where}", params


def format_row(row):
    """Return ``row`` with its second field (a Unix timestamp) as ISO 8601.

    The conversion uses the local timezone, as ``datetime.fromtimestamp``
    does when no timezone is given.
    """
    iso_timestamp = datetime.fromtimestamp(row[1]).isoformat()
    return (row[0], iso_timestamp) + tuple(row[2:])


def run_search(db_path, sql, params):
    """Run ``sql`` against the database at ``db_path``.

    Returns:
        A ``(column_names, rows)`` tuple. The column names are taken from
        the cursor so they always line up with the width of the rows.

    Raises:
        sqlite3.Error: If the database cannot be queried.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.execute(sql, params)
        column_names = [column[0] for column in cur.description]
        return column_names, cur.fetchall()
    finally:
        # Always release the connection, including when the query fails.
        conn.close()


def write_csv(filename, column_names, rows):
    """Write a header line and ``rows`` to ``filename`` + '.csv'."""
    with open(filename + '.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(column_names)
        writer.writerows(rows)


def main():
    """Interactively search Pi-hole query databases, optionally saving CSV.

    Repeats until the user declines another search. Exits with status 1
    when no database file is found.
    """
    while True:
        # Sorted so the menu numbering is stable from one search to the next.
        db_files = sorted(glob.glob('/etc/pihole/pihole-FTL*.db'))

        if not db_files:
            print("No database files found in /etc/pihole/.")
            raise SystemExit(1)

        print("Select a database to search:")
        for i, db_file in enumerate(db_files, 1):
            print(f"{i}. {db_file}")

        # Keep asking until the selection is a valid menu number.
        while True:
            db_index = parse_db_choice(
                input("Enter the number of the database: "), len(db_files))
            if db_index is not None:
                break
            print(f"Please enter a number between 1 and {len(db_files)}.")
        db_path = db_files[db_index]

        print("Enter search type: domain, client, or both")
        search_type = input()

        # Validated before any connection is opened, so starting over
        # cannot leave a connection behind.
        if search_type not in SEARCH_COLUMNS:
            print("Invalid search type")
            continue

        if search_type == 'both':
            print("Enter a domain search term:")
            domain_search_term = input()
            print("Enter a client search term:")
            client_search_term = input()
            terms = [domain_search_term, client_search_term]
        else:
            print(f"Enter a search term for {search_type}")
            terms = [input()]

        sql, params = build_query(search_type, terms)

        try:
            column_names, rows = run_search(db_path, sql, params)
        except sqlite3.Error as err:
            # e.g. the file is locked or has no `queries` table; report it
            # and let the user try another search instead of crashing.
            print(f"Database error: {err}")
        else:
            # tqdm shows progress while the timestamps are converted.
            processed_rows = [
                format_row(row) for row in tqdm(rows, total=len(rows))
            ]

            print(f"Found {len(processed_rows)} items")

            # Only ask to save as CSV if there are returned rows
            if processed_rows:
                print("Would you like to save the results to a CSV file? (yes/no)")
                save = input()

                if save.lower() == 'yes':
                    print("Enter a filename for the CSV file:")
                    filename = input()
                    write_csv(filename, column_names, processed_rows)

        print("Would you like to perform another search? (yes/no)")
        again = input()
        if again.lower() != 'yes':
            break


if __name__ == "__main__":
    main()