From 95853a24ef16b31da4ce13e3e6b10ab8c0250113 Mon Sep 17 00:00:00 2001
From: Andrew Asseily <77591070+AndrewAsseily@users.noreply.github.com>
Date: Thu, 12 Feb 2026 13:43:34 -0500
Subject: [PATCH] Tighten history file permissions (#10028)

* Tighten history file permissions

* Pre-commit formatting

* add changelog entry

* Update file permissions fix

* Show warning instead of failing when history database cannot be opened
---
 .../enhancement-clihistory-71745.json         |   5 +
 awscli/customizations/history/__init__.py     |  57 +-
 awscli/customizations/history/db.py           |  61 +-
 tests/integration/test_cli.py                 |   9 +
 tests/unit/customizations/history/test_db.py  | 530 +++++++++++-------
 .../customizations/history/test_history.py    | 108 ++--
 6 files changed, 487 insertions(+), 283 deletions(-)
 create mode 100644 .changes/next-release/enhancement-clihistory-71745.json

diff --git a/.changes/next-release/enhancement-clihistory-71745.json b/.changes/next-release/enhancement-clihistory-71745.json
new file mode 100644
index 000000000..8bc65bcbf
--- /dev/null
+++ b/.changes/next-release/enhancement-clihistory-71745.json
@@ -0,0 +1,5 @@
+{
+  "type": "enhancement",
+  "category": "cli-history",
+  "description": "Create local history files with specific permissions"
+}
diff --git a/awscli/customizations/history/__init__.py b/awscli/customizations/history/__init__.py
index a3e580827..30091f036 100644
--- a/awscli/customizations/history/__init__.py
+++ b/awscli/customizations/history/__init__.py
@@ -10,37 +10,40 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+import logging
 import os
 import sys
-import logging
 
-from botocore.history import get_global_history_recorder
 from botocore.exceptions import ProfileNotFound
+from botocore.history import get_global_history_recorder
 
 from awscli.compat import sqlite3
 from awscli.customizations.commands import BasicCommand
-from awscli.customizations.history.constants import HISTORY_FILENAME_ENV_VAR
-from awscli.customizations.history.constants import DEFAULT_HISTORY_FILENAME
-from awscli.customizations.history.db import DatabaseConnection
-from awscli.customizations.history.db import DatabaseRecordWriter
-from awscli.customizations.history.db import RecordBuilder
-from awscli.customizations.history.db import DatabaseHistoryHandler
-from awscli.customizations.history.show import ShowCommand
+from awscli.customizations.history.constants import (
+    DEFAULT_HISTORY_FILENAME,
+    HISTORY_FILENAME_ENV_VAR,
+)
+from awscli.customizations.history.db import (
+    DatabaseConnection,
+    DatabaseHistoryHandler,
+    DatabaseRecordWriter,
+    RecordBuilder,
+)
 from awscli.customizations.history.list import ListCommand
-
+from awscli.customizations.history.show import ShowCommand
 
 LOG = logging.getLogger(__name__)
 HISTORY_RECORDER = get_global_history_recorder()
 
 
 def register_history_mode(event_handlers):
-    event_handlers.register(
-        'session-initialized', attach_history_handler)
+    event_handlers.register('session-initialized', attach_history_handler)
 
 
 def register_history_commands(event_handlers):
     event_handlers.register(
-        "building-command-table.main", add_history_commands)
+        "building-command-table.main", add_history_commands
+    )
 
 
 def attach_history_handler(session, parsed_args, **kwargs):
@@ -48,11 +51,21 @@ def attach_history_handler(session, parsed_args, **kwargs):
         LOG.debug('Enabling CLI history')
 
         history_filename = os.environ.get(
-            HISTORY_FILENAME_ENV_VAR, DEFAULT_HISTORY_FILENAME)
-        if not os.path.isdir(os.path.dirname(history_filename)):
-            os.makedirs(os.path.dirname(history_filename))
-
-        connection = DatabaseConnection(history_filename)
+            HISTORY_FILENAME_ENV_VAR, DEFAULT_HISTORY_FILENAME
+        )
+        history_dir = os.path.dirname(history_filename)
+        if not os.path.isdir(history_dir):
+            os.makedirs(history_dir)
+
+        try:
+            connection = DatabaseConnection(history_filename)
+        except Exception as e:
+            LOG.debug('Unable to open history database: %s', e)
+            sys.stderr.write(
+                'Warning: Unable to record CLI history. '
+                'Check file permissions for %s\n' % history_filename
+            )
+            return
         writer = DatabaseRecordWriter(connection)
         record_builder = RecordBuilder()
         db_handler = DatabaseHistoryHandler(writer, record_builder)
@@ -98,10 +111,12 @@ class HistoryCommand(BasicCommand):
     )
     SUBCOMMANDS = [
         {'name': 'show', 'command_class': ShowCommand},
-        {'name': 'list', 'command_class': ListCommand}
+        {'name': 'list', 'command_class': ListCommand},
     ]
 
     def _run_main(self, parsed_args, parsed_globals):
         if parsed_args.subcommand is None:
-            raise ValueError("usage: aws [options] <command> <subcommand> "
-                             "[parameters]\naws: error: too few arguments")
+            raise ValueError(
+                "usage: aws [options] <command> <subcommand> "
+                "[parameters]\naws: error: too few arguments"
+            )
diff --git a/awscli/customizations/history/db.py b/awscli/customizations/history/db.py
index bdb96d1dc..bd1a292da 100644
--- a/awscli/customizations/history/db.py
+++ b/awscli/customizations/history/db.py
@@ -10,24 +10,22 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
-import uuid
-import time
-import json
 import datetime
-import threading
+import json
 import logging
-from awscli.compat import collections_abc
+import os
+import threading
+import time
+import uuid
 
 from botocore.history import BaseHistoryHandler
 
-from awscli.compat import sqlite3
-from awscli.compat import binary_type
-
+from awscli.compat import binary_type, collections_abc, sqlite3
 
 LOG = logging.getLogger(__name__)
 
 
-class DatabaseConnection(object):
+class DatabaseConnection:
     _CREATE_TABLE = """
         CREATE TABLE IF NOT EXISTS records (
           id TEXT,
@@ -40,13 +38,26 @@ class DatabaseConnection(object):
     _ENABLE_WAL = 'PRAGMA journal_mode=WAL'
 
     def __init__(self, db_filename):
+        self._db_filename = db_filename
         self._connection = sqlite3.connect(
-            db_filename, check_same_thread=False, isolation_level=None)
+            db_filename, check_same_thread=False, isolation_level=None
+        )
+        self._set_file_permissions()
         self._ensure_database_setup()
 
     def close(self):
         self._connection.close()
 
+    def _set_file_permissions(self):
+        for suffix in ('', '-wal', '-shm'):
+            path = self._db_filename + suffix
+            if not os.path.exists(path):
+                continue
+            try:
+                os.chmod(path, 0o600)
+            except OSError as e:
+                LOG.debug('Unable to set file permissions for %s: %s', path, e)
+
     def execute(self, query, *parameters):
         return self._connection.execute(query, *parameters)
 
@@ -92,8 +103,9 @@ class PayloadSerializer(json.JSONEncoder):
         if isinstance(obj, str):
             obj = self._try_decode_bytes(obj)
         elif isinstance(obj, dict):
-            obj = dict((k, self._remove_non_unicode_stings(v)) for k, v
-                       in obj.items())
+            obj = dict(
+                (k, self._remove_non_unicode_stings(v)) for k, v in obj.items()
+            )
         elif isinstance(obj, (list, tuple)):
             obj = [self._remove_non_unicode_stings(o) for o in obj]
         return obj
@@ -132,7 +144,7 @@ class PayloadSerializer(json.JSONEncoder):
             return repr(obj)
 
 
-class DatabaseRecordWriter(object):
+class DatabaseRecordWriter:
     _WRITE_RECORD = """
         INSERT INTO records(
             id, request_id, source, event_type, timestamp, payload)
@@ -152,26 +164,30 @@ class DatabaseRecordWriter(object):
 
     def _create_db_record(self, record):
         event_type = record['event_type']
-        json_serialized_payload = json.dumps(record['payload'],
-                                             cls=PayloadSerializer)
+        json_serialized_payload = json.dumps(
+            record['payload'], cls=PayloadSerializer
+        )
         db_record = (
             record['command_id'],
             record.get('request_id'),
             record['source'],
             event_type,
             record['timestamp'],
-            json_serialized_payload
+            json_serialized_payload,
         )
         return db_record
 
 
-class DatabaseRecordReader(object):
+class DatabaseRecordReader:
     _ORDERING = 'ORDER BY timestamp'
-    _GET_LAST_ID_RECORDS = """
+    _GET_LAST_ID_RECORDS = (
+        """
         SELECT * FROM records
         WHERE id =
         (SELECT id FROM records WHERE timestamp =
-        (SELECT max(timestamp) FROM records)) %s;""" % _ORDERING
+        (SELECT max(timestamp) FROM records)) %s;"""
+        % _ORDERING
+    )
     _GET_RECORDS_BY_ID = 'SELECT * from records where id = ? %s' % _ORDERING
     _GET_ALL_RECORDS = (
         'SELECT a.id AS id_a, '
@@ -218,9 +234,10 @@ class DatabaseRecordReader(object):
             yield row
 
 
-class RecordBuilder(object):
+class RecordBuilder:
     _REQUEST_LIFECYCLE_EVENTS = set(
-        ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE'])
+        ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE']
+    )
     _START_OF_REQUEST_LIFECYCLE_EVENT = 'API_CALL'
 
     def __init__(self):
@@ -254,7 +271,7 @@ class RecordBuilder(object):
             'event_type': event_type,
             'payload': payload,
             'source': source,
-            'timestamp': int(time.time() * 1000)
+            'timestamp': int(time.time() * 1000),
         }
         request_id = self._get_request_id(event_type)
         if request_id:
diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py
index bf63d45c7..1ff435155 100644
--- a/tests/integration/test_cli.py
+++ b/tests/integration/test_cli.py
@@ -267,6 +267,15 @@ class TestBasicCommandFunctionality(unittest.TestCase):
             p.stdout.startswith('aws-cli')
         self.assertTrue(version_output, p.stderr)
 
+    def test_version_does_not_create_cache_directory(self):
+        # Regression test: --version should not create any files/directories.
+        with tempfile.TemporaryDirectory() as tmpdir:
+            env = os.environ.copy()
+            env['HOME'] = tmpdir
+            aws('--version', env_vars=env)
+            aws_dir = os.path.join(tmpdir, '.aws')
+            self.assertFalse(os.path.exists(aws_dir))
+
     def test_traceback_printed_when_debug_on(self):
         p = aws('ec2 describe-instances --filters BADKEY=foo --debug')
         self.assertIn('Traceback (most recent call last):', p.stderr, p.stderr)
diff --git a/tests/unit/customizations/history/test_db.py b/tests/unit/customizations/history/test_db.py
index c481d06cd..1375d942e 100644
--- a/tests/unit/customizations/history/test_db.py
+++ b/tests/unit/customizations/history/test_db.py
@@ -10,25 +10,28 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+import datetime
+import json
+import numbers
 import os
 import re
-import json
+import stat
 import threading
-import datetime
-import numbers
 
 from awscli.compat import queue
-from awscli.customizations.history.db import DatabaseConnection
-from awscli.customizations.history.db import DatabaseHistoryHandler
-from awscli.customizations.history.db import DatabaseRecordWriter
-from awscli.customizations.history.db import DatabaseRecordReader
-from awscli.customizations.history.db import PayloadSerializer
-from awscli.customizations.history.db import RecordBuilder
-from awscli.testutils import mock, unittest, FileCreator
+from awscli.customizations.history.db import (
+    DatabaseConnection,
+    DatabaseHistoryHandler,
+    DatabaseRecordReader,
+    DatabaseRecordWriter,
+    PayloadSerializer,
+    RecordBuilder,
+)
+from awscli.testutils import FileCreator, mock, skip_if_windows, unittest
 from tests import CaseInsensitiveDict
 
 
-class FakeDatabaseConnection(object):
+class FakeDatabaseConnection:
     def __init__(self):
         self.execute = mock.MagicMock()
         self.closed = False
@@ -46,13 +49,21 @@ class TestGetHistoryDBFilename(unittest.TestCase):
 
 
 class TestDatabaseConnection(unittest.TestCase):
+    @mock.patch(
+        'awscli.customizations.history.db.os.path.exists', return_value=True
+    )
+    @mock.patch('awscli.customizations.history.db.os.chmod')
     @mock.patch('awscli.compat.sqlite3.connect')
-    def test_can_connect_to_argument_file(self, mock_connect):
-        expected_location = os.path.expanduser(os.path.join(
-            '~', 'foo', 'bar', 'baz.db'))
+    def test_can_connect_to_argument_file(
+        self, mock_connect, mock_chmod, mock_exists
+    ):
+        expected_location = os.path.expanduser(
+            os.path.join('~', 'foo', 'bar', 'baz.db')
+        )
         DatabaseConnection(expected_location)
         mock_connect.assert_called_with(
-            expected_location, check_same_thread=False, isolation_level=None)
+            expected_location, check_same_thread=False, isolation_level=None
+        )
 
     @mock.patch('awscli.compat.sqlite3.connect')
     def test_does_try_to_enable_wal(self, mock_connect):
@@ -82,10 +93,32 @@ class TestDatabaseConnection(unittest.TestCase):
         self.assertTrue(connection.close.called)
 
 
+@skip_if_windows
+class TestDatabaseConnectionPermissions:
+    def setup_method(self):
+        self.files = FileCreator()
+
+    def teardown_method(self):
+        self.files.remove_all()
+
+    def test_create_new_file_with_secure_permissions(self):
+        db_path = self.files.full_path('history.db')
+        DatabaseConnection(db_path)
+        file_mode = stat.S_IMODE(os.stat(db_path).st_mode)
+        assert file_mode == 0o600
+
+    def test_tighten_existing_file_permissions(self):
+        db_path = self.files.full_path('history.db')
+        open(db_path, 'a').close()
+        os.chmod(db_path, 0o644)
+        DatabaseConnection(db_path)
+        file_mode = stat.S_IMODE(os.stat(db_path).st_mode)
+        assert file_mode == 0o600
+
+
 class TestDatabaseHistoryHandler(unittest.TestCase):
     UUID_PATTERN = re.compile(
-        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$',
-        re.I
+        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', re.I
     )
 
     def test_emit_does_write_cli_rc_record(self):
@@ -94,13 +127,16 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         handler = DatabaseHistoryHandler(writer, record_builder)
         handler.emit('CLI_RC', 0, 'CLI')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'event_type': 'CLI_RC',
-                    'payload': 0,
-                    'source': 'CLI',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'event_type': 'CLI_RC',
+                'payload': 0,
+                'source': 'CLI',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertIsInstance(call['timestamp'], numbers.Number)
 
@@ -110,13 +146,16 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         handler = DatabaseHistoryHandler(writer, record_builder)
         handler.emit('CLI_VERSION', 'Version Info', 'CLI')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'event_type': 'CLI_VERSION',
-                    'payload': 'Version Info',
-                    'source': 'CLI',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'event_type': 'CLI_VERSION',
+                'payload': 'Version Info',
+                'source': 'CLI',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertIsInstance(call['timestamp'], numbers.Number)
 
@@ -127,14 +166,17 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         payload = {'foo': 'bar'}
         handler.emit('API_CALL', payload, 'BOTOCORE')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'request_id': mock.ANY,
-                    'event_type': 'API_CALL',
-                    'payload': payload,
-                    'source': 'BOTOCORE',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'request_id': mock.ANY,
+                'event_type': 'API_CALL',
+                'payload': payload,
+                'source': 'BOTOCORE',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertTrue(self.UUID_PATTERN.match(call['request_id']))
 
@@ -148,14 +190,17 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         handler.emit('API_CALL', '', 'BOTOCORE')
         handler.emit('HTTP_REQUEST', payload, 'BOTOCORE')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'request_id': mock.ANY,
-                    'event_type': 'HTTP_REQUEST',
-                    'payload': payload,
-                    'source': 'BOTOCORE',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'request_id': mock.ANY,
+                'event_type': 'HTTP_REQUEST',
+                'payload': payload,
+                'source': 'BOTOCORE',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertTrue(self.UUID_PATTERN.match(call['request_id']))
 
@@ -169,14 +214,17 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         handler.emit('API_CALL', '', 'BOTOCORE')
         handler.emit('HTTP_RESPONSE', payload, 'BOTOCORE')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'request_id': mock.ANY,
-                    'event_type': 'HTTP_RESPONSE',
-                    'payload': payload,
-                    'source': 'BOTOCORE',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'request_id': mock.ANY,
+                'event_type': 'HTTP_RESPONSE',
+                'payload': payload,
+                'source': 'BOTOCORE',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertTrue(self.UUID_PATTERN.match(call['request_id']))
 
@@ -190,14 +238,17 @@ class TestDatabaseHistoryHandler(unittest.TestCase):
         handler.emit('API_CALL', '', 'BOTOCORE')
         handler.emit('PARSED_RESPONSE', payload, 'BOTOCORE')
         call = writer.write_record.call_args[0][0]
-        self.assertEqual(call, {
-                    'command_id': mock.ANY,
-                    'request_id': mock.ANY,
-                    'event_type': 'PARSED_RESPONSE',
-                    'payload': payload,
-                    'source': 'BOTOCORE',
-                    'timestamp': mock.ANY
-        })
+        self.assertEqual(
+            call,
+            {
+                'command_id': mock.ANY,
+                'request_id': mock.ANY,
+                'event_type': 'PARSED_RESPONSE',
+                'payload': payload,
+                'source': 'BOTOCORE',
+                'timestamp': mock.ANY,
+            },
+        )
         self.assertTrue(self.UUID_PATTERN.match(call['command_id']))
         self.assertTrue(self.UUID_PATTERN.match(call['request_id']))
 
@@ -207,13 +258,12 @@ class BaseDatabaseRecordTester(unittest.TestCase):
         for line in lines:
             self.assertIn(line, contents)
             beginning = contents.find(line)
-            contents = contents[(beginning + len(line)):]
+            contents = contents[(beginning + len(line)) :]
 
 
 class BaseDatabaseRecordWriterTester(BaseDatabaseRecordTester):
     UUID_PATTERN = re.compile(
-        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$',
-        re.I
+        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', re.I
     )
 
     def setUp(self):
@@ -237,180 +287,233 @@ class TestDatabaseRecordWriter(BaseDatabaseRecordWriterTester):
         self.assertTrue(connection.close.called)
 
     def test_can_write_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'FOO',
-            'payload': 'bar',
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'FOO',
+                'payload': 'bar',
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         # Now that we have verified the order of the fields in the insert
         # statement we can verify that the record values are in the correct
         # order in the tuple.
         # (command_id, request_id, source, event_type, timestamp, payload)
         written_record = self._read_last_record()
-        self.assertEqual(written_record,
-                         ('command', None, 'TEST', 'FOO', 1234, '"bar"'))
+        self.assertEqual(
+            written_record, ('command', None, 'TEST', 'FOO', 1234, '"bar"')
+        )
 
     def test_commit_count_matches_write_count(self):
         records_to_write = 10
         for _ in range(records_to_write):
-            self.writer.write_record({
-                'command_id': 'command',
-                'event_type': 'foo',
-                'payload': '',
-                'source': 'TEST',
-                'timestamp': 1234
-            })
+            self.writer.write_record(
+                {
+                    'command_id': 'command',
+                    'event_type': 'foo',
+                    'payload': '',
+                    'source': 'TEST',
+                    'timestamp': 1234,
+                }
+            )
         cursor = self.db.execute('SELECT COUNT(*) FROM records')
         record_count = cursor.fetchone()[0]
 
         self.assertEqual(record_count, records_to_write)
 
     def test_can_write_cli_version_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'CLI_VERSION',
-            'payload': ('aws-cli/1.11.184 Python/3.6.2 Darwin/15.6.0 '
-                        'botocore/1.7.42'),
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'CLI_VERSION',
+                'payload': (
+                    'aws-cli/1.11.184 Python/3.6.2 Darwin/15.6.0 '
+                    'botocore/1.7.42'
+                ),
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
         written_record = self._read_last_record()
 
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'CLI_VERSION', 1234,
-             '"aws-cli/1.11.184 Python/3.6.2 Darwin/15.6.0 botocore/1.7.42"')
+            (
+                'command',
+                None,
+                'TEST',
+                'CLI_VERSION',
+                1234,
+                '"aws-cli/1.11.184 Python/3.6.2 Darwin/15.6.0 botocore/1.7.42"',
+            ),
         )
 
     def test_can_write_cli_arguments_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'CLI_ARGUMENTS',
-            'payload': ['s3', 'ls'],
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'CLI_ARGUMENTS',
+                'payload': ['s3', 'ls'],
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'CLI_ARGUMENTS', 1234, '["s3", "ls"]')
+            ('command', None, 'TEST', 'CLI_ARGUMENTS', 1234, '["s3", "ls"]'),
         )
 
     def test_can_write_api_call_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'API_CALL',
-            'payload': {
-                'service': 's3',
-                'operation': 'ListBuckets',
-                'params': {},
-            },
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'API_CALL',
+                'payload': {
+                    'service': 's3',
+                    'operation': 'ListBuckets',
+                    'params': {},
+                },
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'API_CALL', 1234, json.dumps({
-                'service': 's3',
-                'operation': 'ListBuckets',
-                'params': {},
-            }))
+            (
+                'command',
+                None,
+                'TEST',
+                'API_CALL',
+                1234,
+                json.dumps(
+                    {
+                        'service': 's3',
+                        'operation': 'ListBuckets',
+                        'params': {},
+                    }
+                ),
+            ),
         )
 
     def test_can_write_http_request_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'HTTP_REQUEST',
-            'payload': {
-                'method': 'GET',
-                'headers': CaseInsensitiveDict({}),
-                'body': '...',
-            },
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'HTTP_REQUEST',
+                'payload': {
+                    'method': 'GET',
+                    'headers': CaseInsensitiveDict({}),
+                    'body': '...',
+                },
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'HTTP_REQUEST', 1234, json.dumps({
-                'method': 'GET',
-                'headers': {},
-                'body': '...',
-            }))
+            (
+                'command',
+                None,
+                'TEST',
+                'HTTP_REQUEST',
+                1234,
+                json.dumps(
+                    {
+                        'method': 'GET',
+                        'headers': {},
+                        'body': '...',
+                    }
+                ),
+            ),
         )
 
     def test_can_write_http_response_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'HTTP_RESPONSE',
-            'payload': {
-                'streaming': False,
-                'headers': {},
-                'body': '...',
-                'status_code': 200,
-                'request_id': '1234abcd'
-            },
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'HTTP_RESPONSE',
+                'payload': {
+                    'streaming': False,
+                    'headers': {},
+                    'body': '...',
+                    'status_code': 200,
+                    'request_id': '1234abcd',
+                },
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'HTTP_RESPONSE', 1234, json.dumps({
-                'streaming': False,
-                'headers': {},
-                'body': '...',
-                'status_code': 200,
-                'request_id': '1234abcd'
-            }))
+            (
+                'command',
+                None,
+                'TEST',
+                'HTTP_RESPONSE',
+                1234,
+                json.dumps(
+                    {
+                        'streaming': False,
+                        'headers': {},
+                        'body': '...',
+                        'status_code': 200,
+                        'request_id': '1234abcd',
+                    }
+                ),
+            ),
         )
 
     def test_can_write_parsed_response_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'PARSED_RESPONSE',
-            'payload': {},
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'PARSED_RESPONSE',
+                'payload': {},
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
             written_record,
-            ('command', None, 'TEST', 'PARSED_RESPONSE', 1234, '{}')
+            ('command', None, 'TEST', 'PARSED_RESPONSE', 1234, '{}'),
         )
 
     def test_can_write_cli_rc_record(self):
-        self.writer.write_record({
-            'command_id': 'command',
-            'event_type': 'CLI_RC',
-            'payload': 0,
-            'source': 'TEST',
-            'timestamp': 1234
-        })
+        self.writer.write_record(
+            {
+                'command_id': 'command',
+                'event_type': 'CLI_RC',
+                'payload': 0,
+                'source': 'TEST',
+                'timestamp': 1234,
+            }
+        )
 
         written_record = self._read_last_record()
         self.assertEqual(
-            written_record,
-            ('command', None, 'TEST', 'CLI_RC', 1234, '0')
+            written_record, ('command', None, 'TEST', 'CLI_RC', 1234, '0')
         )
 
 
-class ThreadedRecordBuilder(object):
+class ThreadedRecordBuilder:
     def __init__(self, tracker):
         self._read_q = queue.Queue()
         self._write_q = queue.Queue()
         self._thread = threading.Thread(
-            target=self._threaded_request_tracker,
-            args=(tracker,))
+            target=self._threaded_request_tracker, args=(tracker,)
+        )
 
     def _threaded_request_tracker(self, builder):
         while True:
@@ -498,8 +601,9 @@ class TestDatabaseRecordReader(BaseDatabaseRecordTester):
         self.assertTrue(self.fake_connection.closed)
 
     def test_row_factory_set(self):
-        self.assertEqual(self.fake_connection.row_factory,
-                         self.reader._row_factory)
+        self.assertEqual(
+            self.fake_connection.row_factory, self.reader._row_factory
+        )
 
     def test_iter_latest_records_performs_correct_query(self):
         expected_query = (
@@ -511,27 +615,32 @@ class TestDatabaseRecordReader(BaseDatabaseRecordTester):
         [_ for _ in self.reader.iter_latest_records()]
         self.assertEqual(
             self.fake_connection.execute.call_args[0][0].strip(),
-            expected_query.strip())
+            expected_query.strip(),
+        )
 
     def test_iter_latest_records_does_iter_records(self):
         records_to_get = [1, 2, 3]
         self.fake_connection.execute.return_value.__iter__.return_value = iter(
-            records_to_get)
+            records_to_get
+        )
         records = [r for r in self.reader.iter_latest_records()]
         self.assertEqual(records, records_to_get)
 
     def test_iter_records_performs_correct_query(self):
-        expected_query = ('SELECT * from records where id = ? '
-                          'ORDER BY timestamp')
+        expected_query = (
+            'SELECT * from records where id = ? ' 'ORDER BY timestamp'
+        )
         [_ for _ in self.reader.iter_records('fake_id')]
         self.assertEqual(
             self.fake_connection.execute.call_args[0][0].strip(),
-            expected_query.strip())
+            expected_query.strip(),
+        )
 
     def test_iter_records_does_iter_records(self):
         records_to_get = [1, 2, 3]
         self.fake_connection.execute.return_value.__iter__.return_value = iter(
-            records_to_get)
+            records_to_get
+        )
         records = [r for r in self.reader.iter_records('fake_id')]
         self.assertEqual(records, records_to_get)
 
@@ -542,10 +651,8 @@ class TestPayloadSerialzier(unittest.TestCase):
             'string': 'foo',
             'int': 4,
             'list': [1, 2, 'bar'],
-            'dict': {
-                'sun': 'moon'
-            },
-            'float': 1.2
+            'dict': {'sun': 'moon'},
+            'float': 1.2,
         }
         string_value = json.dumps(original, cls=PayloadSerializer)
         reloaded = json.loads(string_value)
@@ -559,9 +666,7 @@ class TestPayloadSerialzier(unittest.TestCase):
         self.assertEqual(iso_now, reloaded)
 
     def test_can_serialize_case_insensitive_dict(self):
-        original = CaseInsensitiveDict({
-            'fOo': 'bar'
-        })
+        original = CaseInsensitiveDict({'fOo': 'bar'})
         string_value = json.dumps(original, cls=PayloadSerializer)
         reloaded = json.loads(string_value)
         self.assertEqual(original, reloaded)
@@ -615,8 +720,8 @@ class TestPayloadSerialzier(unittest.TestCase):
             'list': ['foo', b'\xfe\xed'],
             'more_nesting': {
                 'bytes': b'\xfe\xed',
-                'tuple': ('bar', 'baz', b'\xfe\ed')
-            }
+                'tuple': ('bar', 'baz', b'\xfe\ed'),
+            },
         }
         encoded = {
             'foo': 'bar',
@@ -624,8 +729,8 @@ class TestPayloadSerialzier(unittest.TestCase):
             'list': ['foo', '<Byte sequence>'],
             'more_nesting': {
                 'bytes': '<Byte sequence>',
-                'tuple': ['bar', 'baz', '<Byte sequence>']
-            }
+                'tuple': ['bar', 'baz', '<Byte sequence>'],
+            },
         }
         string_value = json.dumps(original, cls=PayloadSerializer)
         reloaded = json.loads(string_value)
@@ -634,8 +739,7 @@ class TestPayloadSerialzier(unittest.TestCase):
 
 class TestRecordBuilder(unittest.TestCase):
     UUID_PATTERN = re.compile(
-        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$',
-        re.I
+        '^[0-9a-f]{8}-([0-9a-f]{4}-){3}[0-9a-f]{12}$', re.I
     )
 
     def setUp(self):
@@ -684,7 +788,8 @@ class TestRecordBuilder(unittest.TestCase):
     def test_does_get_id_for_http_request_with_api_call(self):
         call_identifier = self._get_request_id_for_event_type('API_CALL')
         request_identifier = self._get_request_id_for_event_type(
-            'HTTP_REQUEST')
+            'HTTP_REQUEST'
+        )
 
         self.assertEqual(call_identifier, request_identifier)
         self.assertTrue(self.UUID_PATTERN.match(call_identifier))
@@ -692,7 +797,8 @@ class TestRecordBuilder(unittest.TestCase):
     def test_does_get_id_for_http_response_with_api_call(self):
         call_identifier = self._get_request_id_for_event_type('API_CALL')
         response_identifier = self._get_request_id_for_event_type(
-            'HTTP_RESPONSE')
+            'HTTP_RESPONSE'
+        )
 
         self.assertEqual(call_identifier, response_identifier)
         self.assertTrue(self.UUID_PATTERN.match(call_identifier))
@@ -700,7 +806,8 @@ class TestRecordBuilder(unittest.TestCase):
     def test_does_get_id_for_parsed_response_with_api_call(self):
         call_identifier = self._get_request_id_for_event_type('API_CALL')
         response_identifier = self._get_request_id_for_event_type(
-            'PARSED_RESPONSE')
+            'PARSED_RESPONSE'
+        )
 
         self.assertEqual(call_identifier, response_identifier)
         self.assertTrue(self.UUID_PATTERN.match(call_identifier))
@@ -725,29 +832,38 @@ class TestIdentifierLifecycles(unittest.TestCase):
     def _get_multiple_request_ids(self, events):
         fake_payload = {'body': b''}
         request_ids = [
-            self.builder.build_record(
-                event,
-                fake_payload.copy(),
-                ''
-            )['request_id']
+            self.builder.build_record(event, fake_payload.copy(), '')[
+                'request_id'
+            ]
             for event in events
         ]
         return request_ids
 
     def test_multiple_http_lifecycle_writes_have_same_request_id(self):
         request_ids = self._get_multiple_request_ids(
-             ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE']
-         )
+            ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE']
+        )
         # All request_ids should match since this is one request lifecycle
         unique_request_ids = set(request_ids)
         self.assertEqual(len(unique_request_ids), 1)
 
     def test_request_id_reset_on_api_call(self):
         request_ids = self._get_multiple_request_ids(
-             ['API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE',
-              'API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE',
-              'API_CALL', 'HTTP_REQUEST', 'HTTP_RESPONSE', 'PARSED_RESPONSE']
-         )
+            [
+                'API_CALL',
+                'HTTP_REQUEST',
+                'HTTP_RESPONSE',
+                'PARSED_RESPONSE',
+                'API_CALL',
+                'HTTP_REQUEST',
+                'HTTP_RESPONSE',
+                'PARSED_RESPONSE',
+                'API_CALL',
+                'HTTP_REQUEST',
+                'HTTP_RESPONSE',
+                'PARSED_RESPONSE',
+            ]
+        )
 
         # There should be three distinct requet_ids since there are three
         # distinct calls that end with a parsed response.
diff --git a/tests/unit/customizations/history/test_history.py b/tests/unit/customizations/history/test_history.py
index 5e364b98e..a96c48026 100644
--- a/tests/unit/customizations/history/test_history.py
+++ b/tests/unit/customizations/history/test_history.py
@@ -13,16 +13,18 @@
 import argparse
 import os
 
-from botocore.session import Session
-from botocore.history import HistoryRecorder
 from botocore.exceptions import ProfileNotFound
+from botocore.history import HistoryRecorder
+from botocore.session import Session
 
-from awscli.testutils import unittest, mock, FileCreator
 from awscli.compat import StringIO
-from awscli.customizations.history import attach_history_handler
-from awscli.customizations.history import add_history_commands
-from awscli.customizations.history import HistoryCommand
+from awscli.customizations.history import (
+    HistoryCommand,
+    add_history_commands,
+    attach_history_handler,
+)
 from awscli.customizations.history.db import DatabaseHistoryHandler
+from awscli.testutils import FileCreator, mock, unittest
 
 
 class TestAttachHistoryHandler(unittest.TestCase):
@@ -34,9 +36,12 @@ class TestAttachHistoryHandler(unittest.TestCase):
 
     @mock.patch('awscli.customizations.history.sqlite3')
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
-    def test_attach_history_handler(self, mock_recorder, mock_db_sqlite3, mock_sqlite3):
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
+    def test_attach_history_handler(
+        self, mock_recorder, mock_db_sqlite3, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.return_value = {
             'cli_history': 'enabled'
@@ -48,15 +53,18 @@ class TestAttachHistoryHandler(unittest.TestCase):
         attach_history_handler(session=mock_session, parsed_args=parsed_args)
         self.assertEqual(mock_recorder.add_handler.call_count, 1)
         self.assertIsInstance(
-            mock_recorder.add_handler.call_args[0][0], DatabaseHistoryHandler)
+            mock_recorder.add_handler.call_args[0][0], DatabaseHistoryHandler
+        )
         self.assertTrue(mock_db_sqlite3.connect.called)
 
     @mock.patch('awscli.customizations.history.sqlite3')
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
     def test_no_attach_history_handler_when_history_not_configured(
-            self, mock_recorder, mock_db_sqlite3, mock_sqlite3):
+        self, mock_recorder, mock_db_sqlite3, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.return_value = {}
 
@@ -69,10 +77,12 @@ class TestAttachHistoryHandler(unittest.TestCase):
 
     @mock.patch('awscli.customizations.history.sqlite3')
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
     def test_no_attach_history_handler_when_command_is_history(
-            self, mock_recorder, mock_db_sqlite3, mock_sqlite3):
+        self, mock_recorder, mock_db_sqlite3, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.return_value = {
             'cli_history': 'enabled'
@@ -87,10 +97,12 @@ class TestAttachHistoryHandler(unittest.TestCase):
 
     @mock.patch('awscli.customizations.history.sqlite3', None)
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
     def test_no_attach_history_handler_when_no_sqlite3(
-            self, mock_recorder, mock_sqlite3):
+        self, mock_recorder, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.return_value = {
             'cli_history': 'enabled'
@@ -101,18 +113,22 @@ class TestAttachHistoryHandler(unittest.TestCase):
 
         with mock.patch('sys.stderr', StringIO()) as mock_stderr:
             attach_history_handler(
-                session=mock_session, parsed_args=parsed_args)
+                session=mock_session, parsed_args=parsed_args
+            )
             self.assertIn(
-                'enabled but sqlite3 is unavailable', mock_stderr.getvalue())
+                'enabled but sqlite3 is unavailable', mock_stderr.getvalue()
+            )
         self.assertFalse(mock_recorder.add_handler.called)
         self.assertFalse(mock_sqlite3.connect.called)
 
     @mock.patch('awscli.customizations.history.sqlite3')
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
-    def test_create_directory_no_exists(self, mock_recorder,
-                                        mock_db_sqlite3, mock_sqlite3):
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
+    def test_create_directory_no_exists(
+        self, mock_recorder, mock_db_sqlite3, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.return_value = {
             'cli_history': 'enabled'
@@ -125,7 +141,8 @@ class TestAttachHistoryHandler(unittest.TestCase):
         db_filename = os.path.join(directory_to_create, 'name.db')
         with mock.patch('os.environ', {'AWS_CLI_HISTORY_FILE': db_filename}):
             attach_history_handler(
-                session=mock_session, parsed_args=parsed_args)
+                session=mock_session, parsed_args=parsed_args
+            )
             self.assertEqual(mock_recorder.add_handler.call_count, 1)
             # Is should create any missing parent directories of the
             # file as well.
@@ -134,13 +151,16 @@ class TestAttachHistoryHandler(unittest.TestCase):
 
     @mock.patch('awscli.customizations.history.sqlite3')
     @mock.patch('awscli.customizations.history.db.sqlite3')
-    @mock.patch('awscli.customizations.history.HISTORY_RECORDER',
-                spec=HistoryRecorder)
-    def test_profile_does_not_exist(self, mock_recorder, mock_db_sqlite3,
-                                    mock_sqlite3):
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
+    def test_profile_does_not_exist(
+        self, mock_recorder, mock_db_sqlite3, mock_sqlite3
+    ):
         mock_session = mock.Mock(Session)
         mock_session.get_scoped_config.side_effect = ProfileNotFound(
-            profile='no-exist')
+            profile='no-exist'
+        )
 
         parsed_args = argparse.Namespace()
         parsed_args.command = 'configure'
@@ -149,13 +169,35 @@ class TestAttachHistoryHandler(unittest.TestCase):
         self.assertFalse(mock_recorder.add_handler.called)
         self.assertFalse(mock_db_sqlite3.connect.called)
 
+    @mock.patch('awscli.customizations.history.DatabaseConnection')
+    @mock.patch(
+        'awscli.customizations.history.HISTORY_RECORDER', spec=HistoryRecorder
+    )
+    def test_warning_when_database_connection_fails(
+        self, mock_recorder, mock_db_connection
+    ):
+        mock_db_connection.side_effect = Exception('Permission denied')
+        mock_session = mock.Mock(Session)
+        mock_session.get_scoped_config.return_value = {
+            'cli_history': 'enabled'
+        }
+
+        parsed_args = argparse.Namespace()
+        parsed_args.command = 's3'
+
+        with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
+            attach_history_handler(
+                session=mock_session, parsed_args=parsed_args
+            )
+            self.assertIn('Warning: Unable to record CLI history', mock_stderr.getvalue())
+        self.assertFalse(mock_recorder.add_handler.called)
+
 
 class TestAddHistoryCommand(unittest.TestCase):
     def test_add_history_command(self):
         command_table = {}
         mock_session = mock.Mock(Session)
-        add_history_commands(
-            command_table=command_table, session=mock_session)
+        add_history_commands(command_table=command_table, session=mock_session)
         self.assertIn('history', command_table)
         self.assertIsInstance(command_table['history'], HistoryCommand)
 
-- 
2.53.0

