From fe0eaff75d61a78801b7921517a78c4d9d41a082 Mon Sep 17 00:00:00 2001 From: LcfherShell Date: Tue, 3 Jun 2025 17:26:42 +0700 Subject: [PATCH 1/2] Upload file baru --- README.md | 333 ++++++- main.py | 1175 ------------------------- memoryawarestruct/__init__.py | 6 + memoryawarestruct/core/__init__.py | 4 + memoryawarestruct/core/memory.py | 1163 ++++++++++++++++++++++++ memoryawarestruct/test/test.py | 78 ++ memoryawarestruct/utils/__init__.py | 2 + memoryawarestruct/utils/middleware.py | 353 ++++++++ memoryawarestruct/utils/typedata.py | 768 ++++++++++++++++ 9 files changed, 2676 insertions(+), 1206 deletions(-) delete mode 100644 main.py create mode 100644 memoryawarestruct/__init__.py create mode 100644 memoryawarestruct/core/__init__.py create mode 100644 memoryawarestruct/core/memory.py create mode 100644 memoryawarestruct/test/test.py create mode 100644 memoryawarestruct/utils/__init__.py create mode 100644 memoryawarestruct/utils/middleware.py create mode 100644 memoryawarestruct/utils/typedata.py diff --git a/README.md b/README.md index 4fb5c57..646d495 100644 --- a/README.md +++ b/README.md @@ -1,48 +1,319 @@ -# MemoryAwareStruct -MemoryAwareStruct is a class designed to manage data in the form of a dictionary in a safe and efficient way in terms of memory usage. It also supports multi-threaded access. +# 📚 MemoryAwareStruct Documentation -## Features +## 🔒 Overview -1. Memory Management: MemoryAwareStruct features effective memory management, so as to prevent memory exhaustion and ensure that the stored data does not 2. exceed the specified memory limit. -3. Multi-Threaded Access: MemoryAwareStruct can be accessed by multiple threads securely, so it can be used in applications that require fast and secure data access. -4. Data Management: This class has effective data management features, making it easy to manage stored data. +MemoryAwareStruct is a secure and protected data structure system with high-level protection against unauthorized modification. This library provides the `memory` class that can protect attributes and methods from direct access, with various integrated security mechanisms. -## Use Cases +## ✨ Key Features -MemoryAwareStruct can be used as temporary data storage, such as: +- 🛡️ **Attribute Protection**: Prevent direct modification of attributes from outside the class +- 🔐 **Dictionary Protection**: Protect `__dict__` access with an authorization system +- 🎯 **Method Registry**: Protect important methods from being overridden +- 🔄 **Smart Conversion**: Automatic conversion of mixed lists to dictionaries +- 💾 **Backup System**: Backup and restore system for data safety +- 🏗️ **Factory Pattern**: memory builder with special configuration +--- -1. Cache: MemoryAwareStruct can be used as a cache to store frequently accessed data. -2. Session Data: MemoryAwareStruct can be used to store session data that does not need to be stored persistently. -3. Temporary Data: MemoryAwareStruct can be used to store data that only needs to be stored temporarily. +## 📖 Class Documentation -## Usage +### 🏛️ Class `memory` +The main class that provides secure data structures with various protection mechanisms. + +#### Constructor + +```python +memory( + __config_id: str = "default", + __allow_unsafe_operations: bool = False, + __dict_protection: bool = True, + __attr_protection: bool = True, + **entries: Union[int, str, float, list, tuple, dict, bool, bytes] +) +``` + +**Parameters:** +- `__config_id`: Unique identifier for the instance configuration +- `__allow_unsafe_operations`: Allow unsafe operations (not recommended) +- `__dict_protection`: Enable dictionary protection +- `__attr_protection`: Enable attribute protection +- `**entries`: Initial data for memory + +#### 🔧 Properties & Methods + +##### Properties +```python +config # Get a configuration instance +``` + +#### Safe Operations Methods +```python +def safe_get(key: str, default: Any = None) -> Any +"""Safely get an attribute value""" + +def safe_set(key: str, value: Any, allow_override: bool = True) -> bool +"""Safely set an attribute value""" + +def dell_dict(params: str) -> bool +"""Safely delete a dictionary key""" +``` + +##### Dictionary Operations +```python +@property +def update_dict # Getter for update dictionary + +@update_dict.setter +def update_dict(dict_new: Union[dict, list, tuple]) -> None +"""Update an existing dictionary""" + +@property +def insert_dict # Getter for insert dictionary + +@insert_dict.setter +def insert_dict(dict_new: Union[dict, list, tuple]) -> None +"""Adds a new key to the dictionary""" +``` + +##### Backup & Restore Methods +```python +def restore_backup() -> bool +"""Restores data from the last backup""" + +def reset_to_original() -> None +"""Resets to the original data at creation""" +``` + +##### Utility Methods +```python +def get_user_attributes() -> dict +"""Gets only user-defined attributes""" + +def get_protection_status() -> dict +"""Gets the protection status of a struct""" + +def set_struct_name(name: str) -> None +"""Sets the struct name for this instance""" + +def get_struct_name() -> str +"""Getting struct name""" +``` + +#### 📝 Usage Examples + +##### Basic Usage +```python +# Create a simple memory +person = memory(name="John", age=30, city="Jakarta") +print(person) # memory('John', 'age(30)', 'city(Jakarta)') + +# Accessing data +print(person.name) # "John" +print(person["age"]) # 30 +``` + +##### Safe Operations +```python +# Using safe methods +person.safe_set("email", "john@example.com") +old_city = person.safe_get("city", "Unknown") + +# Update multiple values +person.update_dict = {"age": 31, "city": "Bandung"} + +# Insert new values +person.insert_dict = {"phone": "08123456789", "country": "Indonesia"} +``` + +##### Working with Nested Data +```python +# Nested structure +company = memory( + "company1", + name="TechCorp", + employees=[ + {"name": "Alice", "role": "Developer"}, + {"name": "Bob", "role": "Designer"} + ], + location={"city": "Jakarta", "country": "Indonesia"} +) + +print(company.location.city) # "Jakarta" +``` + +--- + +### 🏭 Function ` create_secure_memory` + +Factory function to create a Struct with a special configuration. + +```python +def create_secure_memory( + config_id: str = "default", + struct_name: str = "Struct", + dict_protection: bool = True, + attr_protection: bool = True +) -> callable ``` -def functions(): - return 3 -memory = MemoryAwareStruct() -memory.insert = {"key2": open("index.py", "rb").read()} -memory.update = {"key2": ["value2", 1, "ffffffffff"]} -memory -memory.get("key2") -memory.pop("key2") -memory.insert_function("functions", functions) -memory.clear() +**Parameters:** +- `config_id`: Unique configuration ID +- `struct_name`: Default name for struct +- `dict_protection`: Enable dictionary protection +- `attr_protection`: Enable attribute protection + +**Returns:** +- Factory function that can be called to create struct + +#### 📝 Usage Example + +```python +# Create a factory with a custom configuration +UserFactory = create_secure_memory( +config_id="user_config", +struct_name="User", +dict_protection=True, +attr_protection=True +) + +# Using factory +user1 = UserFactory(name="Alice", role="Admin") +user2 = UserFactory(name="Bob", role="User") + +print(user1) # User('Alice', 'role(Admin)') ``` -## Keep in mind +--- + +### 🎭 Class ` SecureMemoryContext` + +Context manager for making temporary changes with security controls. + +```python +class SecureMemoryContext: +def __init__(self, struct_instance, allow_unsafe: bool = False) +``` + +**Parameters:** +- `struct_instance`: Struct instance to modify +- `allow_unsafe`: Allow unsafe operations temporarily + +#### 📝 Usage Example + +```python +person = Struct("temp", name="John", age=30) + +# Using context manager (DISABLED for security) +with SecureMemoryContext(person, allow_unsafe=True) as temp_struct: +# Operations that are normally blocked may be allowed +# (However this feature is disabled for security) +pass + +# After leaving the context, protection is back on +``` + +--- + +## 🔐 Security Features -MemoryAwareStruct is not designed for persistent data storage. -If you need persistent data storage, then you may need to use another data storage solution, such as a database or file system. +### Attribute Protection +- ✅ Prevent direct modification of user attributes +- ✅ Protect internal attributes from external access +- ✅ Authorization system for internal methods + +### Dictionary Protection +- ✅ Block dictionary-style operations (`obj[key] = value`) +- ✅ Protected dictionary with lock system + +### Method Protection +- ✅ Registry methods to prevent replacement +- ✅ Protect important methods from replacement +- ✅ Validate callers for internal operations + +--- + +## ⚠️ Important Notes + +### Security +- 🚫 Dictionary unlock is permanently disabled +- 🚫 Context manager unsafe operations are disabled +- 🚫 Maintenance unlock is disabled for security +- ✅ All modifications must be made through safe methods + +### Best Practices +- 🎯 Always use `safe_set()` for attribute modification +- 🔄 Use `update_dict` to update existing data +- ➕ Use `insert_dict` to add new data +- 💾 Take advantage of backup system for data safety + +### Error Handling +```python +try: + person.name = "Direct modification" # Will error +except AttributeError as e: + print(f"Modification blocked: {e}") + +# The correct way +success = person.safe_set("name", "Safe modification") +if success: + print("Modification successful") +# Or use insert_dict or update_dict +person.update_dict = {"name": "Safe modification"} +``` + +--- + +## 🚀 Advanced Usage + +### Custom Configuration +```python +# Create custom configurations +AdminFactory = create_secure_memory( + config_id="admin_system", + struct_name="AdminUser", + dict_protection=True, + attr_protection=True +) + +admin = AdminFactory( + username="admin", + permissions=["read", "write", "delete"], + settings={"theme": "dark", "notifications": True} +) +``` + +### Monitoring Protection Status +```python +status = person.get_protection_status() +print(f"Protected attributes: {status['protected_attrs']}") +print(f"Protected methods: {status['protected_methods']}") +print(f"Dictionary protection: {status['dict_protection']}") +``` + +### Working with Complex Data +```python +# Smart list to dict conversion +data = Struct("complex", +users=[ +["admin", {"role": "administrator", "active": True}], +["user1", {"role": "user", "active": False}] +] +) + +# Automatically converted to an accessible structure +print(data.users.admin.role) # "administrator" +``` -## Advantages +--- -1. Effective Memory Management: MemoryAwareStruct can prevent memory exhaustion and ensure that the stored data does not exceed the specified memory limit. -2. Secure Multi-Threaded Access: MemoryAwareStruct can be accessed by multiple threads securely, so it can be used in applications that require fast and secure data access. -3. Effective Data Management: This class features effective data management, making it easy to manage the stored data. +## 📋 Summary -## Disadvantages +MemoryAwareStruct provides a comprehensive solution for secure data structures with: -1. Not Designed for Persistent Data Storage: MemoryAwareStruct is not designed for persistent data storage, so it cannot be used as a persistent data storage solution. +- 🛡️ **Multi-layer Protection**: Protection at attribute, method, and dictionary levels +- 🔒 **Security First**: Security as the top priority with unsafe features disabled +- 🎯 **Safe Operations**: Safe and intuitive API for data manipulation +- 🏭 **Flexible Factory**: Struct creation with customizable configurations +- 💾 **Data Integrity**: Backup and restore system to maintain data integrity +This library is ideal for applications that require data structures with high levels of security and strict access control. \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 5424e90..0000000 --- a/main.py +++ /dev/null @@ -1,1175 +0,0 @@ -import threading -import json -import sys -import re -import psutil -import asyncio -import time -import signal - -try: - import resource -except: - resource = None - -import os - -version = int(str(sys.version_info.major) + str(sys.version_info.minor)) -if version > 39: - from typing import Any, Dict, Union, TypeAlias - - class SelectType: - Union_: TypeAlias = Union[Dict, str, tuple, int, float, bool] - String_: TypeAlias = str - Any_: TypeAlias = Any - Dict_: TypeAlias = dict - Numeric_: TypeAlias = Union[int, float] - Boolean_: TypeAlias = bool - List_: TypeAlias = Union[list, tuple] - -else: - from typing import Any, Dict, Union - - class SelectType: - Union_ = Union[Dict, str, tuple, int, float, bool] - String_ = str - Any_ = Any - Dict_ = dict - Numeric_ = Union[int, float] - Boolean_ = bool - List_ = Union[list, tuple] - -class AwareData: - def __init__(self, entries: Union[Dict[str, Any], Any]): - """Initialize AwareData with the provided entries. - - Args: - entries (Union[Dict[str, Any], Any]): A dictionary of initial entries - or a single value (int or str). - """ - self._data = entries - - def __setitem__(self, key: str, value: Any) -> None: - """Block direct modification of the data. - - Raises: - AttributeError: If an attempt is made to modify the data. - """ - raise AttributeError( - "Direct modification is not allowed. Use the update() method." - ) - - def __delitem__(self, key: str) -> None: - """Block direct deletion of the data. - - Raises: - AttributeError: If an attempt is made to delete an item from the data. - """ - raise AttributeError("Direct deletion is not allowed. Use the update() method.") - - def __dir__(self): - """Block the dir() function. - - Raises: - AttributeError: If dir() is called on this class. - """ - raise AttributeError("The use of dir() on this class is not allowed.") - - def __repr__(self): - """Return a string representation of the stored data.""" - return repr(self._data) - -class ReadOnlyJSON: - def __init__(self, initial_data: SelectType.Any_) -> None: - """ - Initialize the ReadOnlyJSON with a dictionary. - - Args: - initial_data (Any): The initial data to be secured. - """ - # Store a deep copy of the initial data to prevent external modifications - try: - self._data = json.loads(json.dumps(initial_data)) - except: - self._data = initial_data - - def __setitem__(self, key: str, value: any) -> None: - raise AttributeError("Direct modification is not allowed.") - - def __delitem__(self, key: str) -> None: - raise AttributeError("Direct deletion is not allowed.") - - def __dir__(self): - """Block the dir() function.""" - raise AttributeError("The use of dir() on this class is not allowed.") - - @property - def data(self) -> Dict[str, Any]: - """ - Get a copy of the stored JSON data. - - Returns: - Dict[str, Any]: A deep copy of the internal data. - """ - return json.loads(json.dumps(self._data)) - - @property - def to_json(self) -> SelectType.String_: - """ - Convert the stored data to a JSON string. - - Returns: - str: JSON representation of the internal data. - """ - return json.dumps(self._data) - - def __repr__(self) -> SelectType.String_: - """ - Return a string representation of the ReadOnlyJSON object. - - Returns: - str: String representation. - """ - return f"ReadOnlyJSON({self.to_json})" - - def __str__(self) -> SelectType.String_: - return f"ReadOnlyJSON({self.to_json})" - - -class RestrictedDict: - """A dictionary that restricts certain keys and only allows specific operations.""" - - def __init__(self, **entries: SelectType.Dict_): - self._data = {} - self.mainsession = {} - for key, value in entries.items(): - if not self.is_restricted(key): - self._data[key] = value - else: - raise KeyError(f"The key '{key}' is restricted.") - - def __setitem__(self, key: str, value: any) -> None: - raise AttributeError( - "Direct modification is not allowed. Use the update() method." - ) - - def __getitem__(self, key: str) -> any: - return self._data[key] - - def __delitem__(self, key: str) -> None: - raise AttributeError("Direct deletion is not allowed. Use the update() method.") - - def __contains__(self, key: str) -> bool: - return key in self._data - - def __dir__(self): - """Block the dir() function.""" - raise AttributeError("The use of dir() on this class is not allowed.") - - def __contains__(self, key: SelectType.String_) -> SelectType.Boolean_: - return key in self._data - - def keys(self): - return self._data.keys() - - def items(self): - return self._data.items() - - def values(self): - return self._data.values() - - def is_restricted(self, key: SelectType.String_) -> SelectType.Boolean_: - """Defines restricted keys.""" - return key in ["__struct_name", "__lock"] - - def pop( - self, key: SelectType.String_, default: SelectType.Any_ = None - ) -> SelectType.Any_: - """Remove a key and return its value or a default value.""" - if self.is_restricted(key): - raise KeyError(f"The key '{key}' is restricted.") - return self._data.pop(key, default) - - def update(self, other: SelectType.Dict_) -> None: - """Update the dictionary with the provided key-value pairs.""" - for key, value in other.items(): - if isinstance(value, dict): - # If a branch is a dictionary, wrap it in RestrictedDict - value = RestrictedDict(**value) - if self.is_restricted(key): - raise KeyError(f"The key '{key}' is restricted.") - self._data[key] = value # Use internal storage - - def clear(self): - self._data.clear() - - def __repr__(self) -> SelectType.String_: - return f"{self._data}" - - def get(self, key: SelectType.String_, default: Any = None): - """Retrieve items matching the given pattern or string.""" - if key.startswith("%") and key.endswith("%"): - # Convert SQL LIKE to regex - regex_pattern = key[1:-1].replace("%", ".*") # Mengganti % dengan .* - regex_pattern = regex_pattern.replace("?", ".") # Mengganti ? dengan . - regex = re.compile(f"^{regex_pattern}$") - - # Find the first item that matches - for key, value in self.items(): - if regex.match(key): - return value # Return the first matching value - - return self._data.get(key, default) # Return default if no match is found - - -memory_warning_triggered: SelectType.Boolean_ = False -max_memory_usage: SelectType.Numeric_ = 0 - - -def MemoryUsage(): - global max_memory_usage, memory_warning_triggered - total_memory = psutil.virtual_memory().total - satuan = ["bytes", "KB", "MB", "GB"] - i = 0 - while total_memory >= 1024 and i < len(satuan) - 1: - total_memory /= 1024 - i += 1 - print(f"Total memory: {total_memory:.2f} {satuan[i]}") - - -def getMemory(total_memory: SelectType.Numeric_) -> SelectType.String_: - satuan = ["bytes", "KB", "MB", "GB"] - i = 0 - while total_memory >= 1024 and i < len(satuan) - 1: - total_memory /= 1024 - i += 1 - return satuan[i] - - -class MemoryAwareStruct(SelectType): - """ - A class designed to manage structured data with memory awareness. - - This class allows for the secure manipulation of data in a dictionary-like structure - while ensuring that memory usage is monitored and controlled. The data can be accessed - and functions can be executed dynamically, with thread safety in mind. - - Args: - memory_default (int, optional): An optional parameter to specify the default maximum - memory usage for this instance. If not provided, - the instance will rely on global memory settings. - **entries (SelectType.Dict_): Key-value pairs to initialize the internal data structure. - This allows for flexible initialization with multiple entries. - """ - __slots__: SelectType.List_ = [ - "__struct_name", - "_lock", - "__data", - "max_memory_usage", - "memory_warning_triggered", - ] - - def __init__(self, memory_default: int = None, **entries: SelectType.Dict_) -> None: - """ - Initializes the MemoryAwareStruct with optional memory constraints and initial entries. - - This constructor sets up the instance of the MemoryAwareStruct class, allowing for the - configuration of memory usage limits and populating the internal data structure with - provided entries. It also handles thread safety and initializes a global memory - monitoring system. - - Args: - memory_default (int, optional): An optional parameter to specify the default maximum - memory usage for this instance. If not provided, - the instance will rely on global memory settings. - **entries (SelectType.Dict_): Key-value pairs to initialize the internal data structure. - This allows for flexible initialization with multiple entries. - - Behavior: - - Initializes the instance variable __struct_name with the name of the class. - - If a memory warning has not been triggered, it invokes MemoryUsage() to start monitoring. - - If memory_default is provided, it sets the instance's max_memory_usage and initializes - memory_warning_triggered to False. - - Creates an instance of RestrictedDict to hold the provided entries, ensuring that only - allowed operations can be performed on the data. - - Assigns a threading lock (self.__data.mainsession) to the __data attribute to manage concurrency - and ensure thread-safe operations. - - If the instance does not specify a max_memory_usage, it uses a global memory limit. - - If there are existing entries in the internal data structure, it adjusts the max_memory_usage - by deducting the size of the currently stored data. - - Raises: - ValueError: If the provided entries exceed the allowed memory limits when the instance is created.""" - global max_memory_usage, memory_warning_triggered - self.__struct_name = self.__class__.__name__ # Private variable - if memory_warning_triggered == False: - MemoryUsage() - - if memory_default: # Memisahkan memori instance dari memori global - self.max_memory_usage: SelectType.Numeric_ = memory_default - self.memory_warning_triggered: SelectType.Boolean_ = False - - self.__data = RestrictedDict(**entries) # Gunakan RestrictedDict - self.__data.mainsession = threading.Lock() # Lock untuk concurrency - - # Jika instance tidak memiliki batas memori, gunakan batas memori global - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - if self.__data._data.__len__() > 0: - self.max_memory_usage = self.max_memory_usage - self.__get_total_size__( - self.__data._data - ) - # Kurangi batas memori global dengan memori instance - # if not memory_warning_triggered and max_memory_usage: - # max_memory_usage = max_memory_usage - self.max_memory_usage - # elif max_memory_usage<=0: - # max_memory_usage = self.__get_max_allowed_memory__() - self.max_memory_usage - - def __setattr__(self, name: SelectType.String_, value: SelectType.Any_) -> None: - if name in ["__dict__"]: - raise AttributeError( - f"Direct updates to '{name}' are not allowed. Please use insert or update." - ) - super().__setattr__(name, value) - - def __delattr__(self, name: SelectType.String_) -> None: - """Prevent deletion of __dict__.""" - if name in ["__dict__", "__struct_name"]: - raise AttributeError(f"Deleting '{name}' is not allowed.") - super().__delattr__(name) - - def __dir__(self): - """Block the dir() function.""" - raise AttributeError("The use of dir() on this class is not allowed.") - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type: - print(f"Error: {exc_val}") - else: - print("No errors occurred.") - - @property - def __dict__(self): - raise AttributeError("Cannot access __dict__ because not allowed.") - - @property - def struct_name(self) -> SelectType.String_: - """ - Property to retrieve the structure name. - - This property returns the name of the structure stored in the instance. - It ensures thread-safe access to the structure name. - - Returns: - SelectType.String_: The name of the structure. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe access when reading the structure name. - """ - with self.__data.mainsession: # Lock saat akses - return self.__struct_name - - - def set_name(self, params: SelectType.String_) -> None: - """ - Function to set the name of the structure. - - This function allows setting the name of the structure only once. If the structure name is - currently set to "Struct", it can be modified. Otherwise, a ValueError is raised. - - Args: - params (SelectType.String_): The new name to set for the structure. - - Raises: - ValueError: If the structure name has already been set to a value other than "Struct". - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe modification of the structure name. - - Checks if the current structure name is "Struct" before allowing the modification. - """ - with self.__data.mainsession: # Lock saat modifikasi - if ( - self.__struct_name == "Struct" and self.__struct_name - ): # Can only set once - self.__struct_name = params - else: - raise ValueError("Struct name can only be set once.") - - - def get( - self, key: SelectType.String_, default: SelectType.Any_ = None - ) -> SelectType.Any_: - """ - Function to retrieve a value from the dictionary based on the provided key. - - This function returns the value associated with the specified key in the dictionary. - If the key does not exist, it returns a default value. - - Args: - key (SelectType.String_): The key whose value is to be retrieved from the dictionary. - default (SelectType.Any_, optional): The value to return if the key is not found. Default is None. - - Returns: - SelectType.Any_: The value associated with the key, or the default value if the key is not found. - - Behavior: - - Utilizes a lock (`self.__data.mainsession`) to ensure thread-safe access when reading data. - """ - with self.__data.mainsession: # Lock saat membaca data - data = self.__data.get(key, default) - if isinstance(data, (dict, tuple, list)): - return AwareData(data) - return data - - @property - def update(self) -> None: - pass - - - @update.setter - def update(self, dict_new: SelectType.Dict_) -> None: - """ - Function to update values in the dictionary based on the provided new dictionary. - - This function updates existing keys in the dictionary with new values from the provided dictionary. - It checks for memory limits before updating the dictionary. - - Args: - dict_new (SelectType.Dict_): The new dictionary containing values to update in the existing dictionary. - - Raises: - TypeError: If dict_new is not of dictionary type. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe modification of the dictionary. - - Calculates the total memory usage before and after the update to prevent exceeding memory limits. - - If memory is full or the new memory exceeds the maximum allowed, a warning is printed and the update is restricted. - """ - global max_memory_usage, memory_warning_triggered - if isinstance(dict_new, self.Dict_): - with self.__data.mainsession: # Lock saat modifikasi dictionary - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__(dict_new) - - # Hitung memori total setelah insert - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ): - for key in dict_new.keys(): - if key in self.__data: - # Jika key sudah ada, lakukan update - old_value = self.__data[key] - old_value_size = sys.getsizeof(old_value) - new_value_size = sys.getsizeof(dict_new) - # Jika instance tidak memiliki batas memori, gunakan batas memori global - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage += old_value_size - new_value_size - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - - self.max_memory_usage - ) - time.sleep(0.02) - if self.__check_max_memory_usage__() > 0: - self.__data.update( - {key: dict_new[key]} - ) # Gunakan RestrictedDict - - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - self.memory_warning_triggered = True - print("Warning: Memory full, updates restricted!") - else: - raise TypeError("Not Type Dict Error") - - - async def async_update(self, dict_new: SelectType.Dict_) -> None: - """ - Asynchronous function to update values in the dictionary based on the provided new dictionary. - - This function updates existing keys in the dictionary with new values from the provided dictionary asynchronously. - It checks for memory limits before updating the dictionary. - - Args: - dict_new (SelectType.Dict_): The new dictionary containing values to update in the existing dictionary. - - Raises: - TypeError: If dict_new is not of dictionary type. - - Behavior: - - Uses an asynchronous lock to ensure thread-safe modification of the dictionary. - - Calculates the total memory usage before and after the update to prevent exceeding memory limits. - - If memory is full or the new memory exceeds the maximum allowed, a warning is printed and the update is restricted. - """ - global max_memory_usage, memory_warning_triggered - - if isinstance(dict_new, self.Dict_): - async with asyncio.Lock(): # Menggunakan Lock saat modifikasi dictionary - # Kunci lock untuk memastikan hanya satu thread yang dapat mengakses data - with self.__data.mainsession: - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__(dict_new) - - # Hitung memori total setelah update - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ) and self.__can_insert_or_update__(new_dict_size): - await asyncio.sleep( - 1 - ) # Simulasi penundaan untuk operasi asinkron - for key in dict_new.keys(): - if key in self.__data: - # Jika key sudah ada, lakukan update - old_value = self.__data[key] - old_value_size = sys.getsizeof(old_value) - new_value_size = sys.getsizeof(dict_new[key]) - - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage += ( - old_value_size - new_value_size - ) - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - - self.max_memory_usage - ) - time.sleep(0.02) - if self.__check_max_memory_usage__() > 0: - self.__data.update({key: dict_new[key]}) - - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - # print(self.max_memory_usage, self.memory_warning_triggered) - self.memory_warning_triggered = True - - print("Warning: Memory full, updates restricted!") - - else: - raise TypeError("Not Type Dict Error") - - @property - def insert(self): - pass - - - @insert.setter - def insert(self, dict_new: SelectType.Dict_) -> None: - """ - Function to insert values into the dictionary based on the provided new dictionary. - - This function adds new items to the existing dictionary from the provided dictionary. - It checks for memory limits before inserting new items. - - Args: - dict_new (SelectType.Dict_): The new dictionary containing values to be inserted into the existing dictionary. - - Raises: - TypeError: If dict_new is not of dictionary type. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe modification of the dictionary. - - Calculates the total memory usage before and after the insertion to prevent exceeding memory limits. - - If memory is full or the new memory exceeds the maximum allowed, a warning is printed and the insertion is restricted. - """ - global max_memory_usage, memory_warning_triggered - if isinstance(dict_new, self.Dict_): - with self.__data.mainsession: # Lock saat modifikasi dictionary - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__(dict_new) - - # Hitung memori total setelah insert - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ) and self.__can_insert_or_update__(new_dict_size): - self.__data.update(dict_new) # Menggunakan RestrictedDict - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage -= new_dict_size - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - self.max_memory_usage - ) - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - self.memory_warning_triggered = True - else: - raise TypeError("Not Type Dict Error") - - - async def async_insert(self, dict_new: SelectType.Dict_) -> None: - """ - Asynchronous function to insert values into the dictionary based on the provided new dictionary. - - This function adds new items to the existing dictionary from the provided dictionary asynchronously. - It checks for memory limits before inserting new items. - - Args: - dict_new (SelectType.Dict_): The new dictionary containing values to be inserted into the existing dictionary. - - Raises: - TypeError: If dict_new is not of dictionary type. - - Behavior: - - Uses an asynchronous lock to ensure thread-safe modification of the dictionary. - - Calculates the total memory usage before and after the insertion to prevent exceeding memory limits. - - If memory is full or the new memory exceeds the maximum allowed, a warning is printed and the insertion is restricted. - """ - global max_memory_usage, memory_warning_triggered - if isinstance(dict_new, self.Dict_): - async with asyncio.Lock(): # Menggunakan Lock saat modifikasi dictionary - # Kunci lock untuk memastikan hanya satu thread yang dapat mengakses data - with self.__data.mainsession: # Lock saat modifikasi dictionary - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__(dict_new) - - # Hitung memori total setelah insert - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ) and self.__can_insert_or_update__(new_dict_size): - await asyncio.sleep( - 1 - ) # Simulasi penundaan untuk operasi asinkron - self.__data.update(dict_new) # Menggunakan RestrictedDict - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage -= new_dict_size - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - - self.max_memory_usage - ) - - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - # print(self.max_memory_usage, self.memory_warning_triggered) - self.memory_warning_triggered = True - - else: - raise TypeError("Not Type Dict Error") - - - def insert_function(self, key: SelectType.String_, func: SelectType.Any_) -> None: - """ - Function to insert a callable function into the dictionary, with memory usage checks. - - This function inserts a key-function pair into the dictionary, provided that the function - is callable and memory constraints are not exceeded. - - Args: - key (SelectType.String_): The key to associate with the function. - func (SelectType.Any_): The function to be inserted into the dictionary. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe access to the dictionary - during the insert operation. - - Calculates the memory usage of the current dictionary and the potential new size after - the insertion. - - Checks if the memory limit is exceeded, and whether inserting the new function is allowed - based on the available memory. - - If the insertion is possible, the function is added to the dictionary, and memory usage is - adjusted. - - If the memory usage exceeds the limit, a warning (`memory_warning_triggered`) is raised. - - Raises a `TypeError` if the provided function is not callable. - - Raises: - TypeError: If `func` is not a callable function. - """ - global max_memory_usage, memory_warning_triggered - if callable(func): - with self.__data.mainsession: # Lock saat menambahkan fungsi - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__({key: func}) - - # Hitung memori total setelah insert - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ) and self.__can_insert_or_update__(new_dict_size): - self.__data[key] = func # Menyimpan fungsi dalam RestrictedDict - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage -= new_dict_size - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - self.max_memory_usage - ) - - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - self.memory_warning_triggered = True - else: - raise TypeError("The parameter must be a callable function.") - - - async def async_insert_function( - self, key: SelectType.String_, func: SelectType.Any_ - ) -> None: - """ - Asynchronous function to insert a key-function pair into the dictionary, with memory usage checks. - - This function inserts a function into the dictionary, provided the memory constraints are met and - the function is callable. It also tracks memory usage and triggers warnings when necessary. - - Args: - key (SelectType.String_): The key to associate with the function. - func (SelectType.Any_): The function to be inserted into the dictionary. - - Behavior: - - Uses `self.__data.mainsession` to ensure thread-safe access to the dictionary during the insert operation. - - Calculates the potential memory usage after insertion and checks if it is below the allowed memory threshold. - - If memory usage is within the limit, the function is inserted into the dictionary. - - If memory exceeds the limit, a memory warning is triggered and the insertion is prevented. - - Simulates asynchronous operation with `await asyncio.sleep(0.001)`. - - Raises: - TypeError: If `func` is not a callable function. - """ - global max_memory_usage, memory_warning_triggered - if callable(func): - async with asyncio.Lock(): # Menggunakan Lock saat modifikasi dictionary - # Kunci lock untuk memastikan hanya satu thread yang dapat mengakses data - with self.__data.mainsession: # Lock saat modifikasi dictionary - current_dict_size = self.__get_total_size__() - new_dict_size = self.__get_total_size__({key: func}) - - # Hitung memori total setelah insert - potential_used_memory = current_dict_size + new_dict_size - time.sleep(0.001) - if ( - self.__h_Data__() - and not self.__is_memory_full__() - and potential_used_memory < self.__check_max_memory_usage__() - and not self.__check_memory_warning_triggered__() - ) and self.__can_insert_or_update__(new_dict_size): - await asyncio.sleep( - 1 - ) # Simulasi penundaan untuk operasi asinkron - self.__data[key] = func # Menyimpan fungsi dalam RestrictedDict - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage = self.__get_max_allowed_memory__() - else: - self.max_memory_usage -= new_dict_size - if self.max_memory_usage <= 0: - self.max_memory_usage = 0 - max_memory_usage = ( - self.__get_max_allowed_memory__() - - self.max_memory_usage - ) - - else: - if not self.__get_attribute__("max_memory_usage"): - memory_warning_triggered = True - else: - # print(self.max_memory_usage, self.memory_warning_triggered) - self.memory_warning_triggered = True - # print(self.max_memory_usage, memory_warning_triggered) - else: - raise TypeError("Not Type Dict Error") - - - def pop(self, params: SelectType.String_) -> None: - """ - Function to remove a key from the dictionary, adjusting memory usage accordingly. - - This function removes an item from the dictionary if the key exists, and adjusts the memory usage - based on the size of the removed item. - - Args: - params (SelectType.String_): The key of the item to be removed. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe removal of the item. - - Calculates the size of the item to be removed and adjusts both the instance and global memory usage limits. - - If the key exists, it removes the item and prints "success", otherwise prints "failed". - """ - global max_memory_usage - with self.__data.mainsession: # Lock saat penghapusan data - if params in self.__data: - # kembalikan ukuran sesuai size dict dipop - curentsize_old = self.__get_total_size__(self.__data[params]) - time.sleep(0.001) - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage += curentsize_old - else: - self.max_memory_usage += curentsize_old - max_memory_usage += curentsize_old - time.sleep(0.02) - self.__data.pop(params) # Menggunakan pop dari RestrictedDict - print("success") - else: - print("failed") - - - async def async_pop(self, params: SelectType.String_) -> None: - """ - Asynchronous function to remove an item from the dictionary based on the given key. - - - This function uses an `self.__data.mainsession` to ensure that the dictionary is modified in a thread-safe manner. - - If the key exists in the dictionary, it removes the item and adjusts the memory usage accordingly. - - The function simulates a delay using `asyncio.sleep(0.001)` to represent asynchronous operations. - - Args: - params (SelectType.String_): The key of the item to be removed from the dictionary. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure that the dictionary is not modified by multiple threads concurrently. - - Checks if the key exists, and if found, calculates the memory size of the item to be removed and adjusts both the instance's and global memory usage limits. - - Removes the item from the dictionary using the `pop` method. - - If the key does not exist, it prints "failed". - """ - global max_memory_usage - async with asyncio.Lock(): # Menggunakan Lock saat modifikasi dictionary - # Kunci lock untuk memastikan hanya satu thread yang dapat mengakses data - with self.__data.mainsession: # Lock saat penghapusan data - if params in self.__data: - await asyncio.sleep(0.001) # Simulasi penundaan untuk operasi asinkro - - # kembalikan ukuran sesuai size dict dipop - curentsize_old = self.__get_total_size__(self.__data[params]) - if not self.__get_attribute__("max_memory_usage"): - max_memory_usage += curentsize_old - else: - self.max_memory_usage += curentsize_old - max_memory_usage += curentsize_old - time.sleep(0.02) - self.__data.pop(params) # Menggunakan pop dari RestrictedDict - print("success") - else: - print("failed") - - - def clear(self): - """ - Function to clear all items from the dictionary. - - - This function uses a lock (`self.__data.mainsession`) to ensure that only one thread can clear the dictionary at a time. - - Introduces a delay of 0.06 seconds to simulate the clearing operation. - - Behavior: - - Acquires the `self.__data.mainsession` lock to prevent simultaneous access to the dictionary. - - Clears all items from the dictionary using the `clear` method of `RestrictedDict`. - """ - with self.__data.mainsession: # Lock saat penghapusan data - time.sleep(0.06) - self.__data.clear() - - - def reset(self): - """ - Function to reset the dictionary by clearing all items. - - - Similar to `clear()`, but uses a shorter delay of 0.05 seconds to simulate a faster operation. - - This function also locks the dictionary during the reset operation to ensure thread safety. - - Behavior: - - Acquires the `self.__data.mainsession` lock to ensure thread safety. - - Clears the dictionary using the `clear` method, effectively resetting it. - """ - with self.__data.mainsession: # Lock saat penghapusan data - time.sleep(0.001) - self.__data.clear() - - - def execute_function( - self, key: SelectType.String_, *args, **kwargs - ) -> SelectType.Any_: - """ - Function to execute a callable function stored in the dictionary. - - This method retrieves a function by its key from the internal data structure - and executes it with the provided arguments. It supports both synchronous - and asynchronous functions. - - Args: - key (SelectType.String_): The key of the function to be executed. - *args: Positional arguments to be passed to the function. - **kwargs: Keyword arguments to be passed to the function. - - Returns: - SelectType.Any_: The result of the executed function. - - Raises: - KeyError: If the specified key is not found in the dictionary. - TypeError: If the retrieved item is not callable. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe execution of the function. - - Checks if the function associated with the key is callable and executes it with the provided arguments. - - If the function is asynchronous, it handles it accordingly using `asyncio.run()`. - """ - with self.__data.mainsession: # Lock saat eksekusi fungsi - if key in self.__data: - func = self.__data[key] - if callable(func): - if asyncio.iscoroutinefunction(func): - return asyncio.run( - func(*args, **kwargs) - ) # Handle async functions - return func(*args, **kwargs) # Handle sync functions - else: - raise TypeError(f"{key} is not a callable function.") - else: - raise KeyError(f"{key} is not found.") - - def json(self)->SelectType.Any_: - """ - Function to convert the internal dictionary to a JSON string. - - This method retrieves a copy of the internal dictionary and converts it to - a JSON-formatted string for easy serialization and transfer. - - Returns: - SelectType.Any_: A JSON string representation of the internal dictionary. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe reading of the data. - - Returns a copy of the internal data to avoid unintended modifications. - """ - with self.__data.mainsession: # Lock saat membaca data - return ReadOnlyJSON(self.__data._data.copy()) - - def from_json(self, json_data: SelectType.String_) -> None: - """ - Function to populate the internal dictionary using a JSON string. - - This method takes a JSON string, parses it, and inserts the resulting - dictionary into the internal data structure. - - Args: - json_data (SelectType.String_): A JSON string representing the data to be inserted. - - Behavior: - - Uses the `insert` setter to add the parsed data into the internal dictionary. - - Raises a TypeError if the JSON string cannot be parsed into a dictionary. - """ - data = json.loads(json_data) - self.insert = data - - def __repr__(self) -> SelectType.String_: - """ - Function to provide a string representation of the object. - - This method generates a string that includes the class name and its - attributes in a format suitable for debugging and logging. The - attributes are represented as key-value pairs, with the values - using their `repr` representation for more informative output. - - Returns: - SelectType.String_: A string representation of the object in the form - 'ClassName(key1=value1, key2=value2, ...)'. - - Behavior: - - Uses a lock (`self.__data.mainsession`) to ensure thread-safe - reading of the internal data. - - Iterates through the internal dictionary to construct the output - string, ensuring all items are included. - """ - with self.__data.mainsession: # Lock saat membaca data - output_dictory = tuple( - f"{k}={repr(v)}" # Using repr for more informative output - for k, v in self.__data.items() - ) - return f"{self.__struct_name}({', '.join(output_dictory)})" - - def __str__(self) -> SelectType.String_: - return self.__repr__() - - def __get_max_allowed_memory__(self) -> SelectType.Numeric_: - """Restores 3/4 of total remaining memory.""" - memory_info = psutil.virtual_memory() - memory_dict_size = self.__get_total_size__(self.__data._data) - if self.__get_attribute__("max_memory_usage") and not self.__get_attribute__( - "passessionX" - ): - self.max_memory_usage = self.max_memory_usage * 1.024 - self.passessionX = 1 - # Calculate nu based on max_memory_usage - nu = self.__get_attribute__("max_memory_usage", 0) + memory_dict_size - # Calculate the maximum allowed memory - max_allowed_memory = (memory_info.total * 0.75) - round(memory_dict_size) - nu - return round(max_allowed_memory * 1.024) - # return round( - # (((memory_info.total * 0.75) - round(memory_dict_size)) - nu) * 1.024 - # ) - - def __is_memory_full__(self) -> SelectType.Boolean_: - """Check if the memory is full.""" - memory_info = psutil.virtual_memory() - memory_dict_size = self.__get_total_size__(self.__data._data) - - if not self.__get_attribute__("max_memory_usage"): - used_memory = (memory_info.used - memory_info.available) + round( - memory_dict_size - ) - else: - used_memory = (memory_info.used - memory_info.available) + ( - round(memory_dict_size) + self.max_memory_usage - ) - return ( - (max_memory_usage <= 0) - or (used_memory >= max_memory_usage) - or (used_memory >= memory_info.total) - or self.__check_memory_warning_triggered__() - ) - - def __get_total_size__(self, data=None) -> SelectType.Numeric_: - """Function to get size of data.""" - if data is None: - data = self.__data._data - - total_size = 0 - seen = set() - - # Recursive inner function to help with caching - def _recursive_size(obj): - if id(obj) in seen: - return 0 - seen.add(id(obj)) - obj_size = sys.getsizeof(obj) - size = obj_size - - if hasattr(obj, "__dict__"): - for val in obj.__dict__.values(): - size += _recursive_size(val) - - elif hasattr(obj, "__iter__"): - if hasattr(obj, "keys"): # for dictionary - for key in obj: - size += _recursive_size(obj[key]) - elif not isinstance(obj, str): # Other iterable, not string - for item in obj: - size += _recursive_size(item) - - return size - - total_size = _recursive_size(data) - return total_size - - - def __check_max_memory_usage__(self): - """Restores the remaining allowed memory.""" - if self.__get_attribute__("max_memory_usage"): - return self.max_memory_usage - return max_memory_usage - - def __check_memory_warning_triggered__(self): - """Checks whether a memory warning has been triggered.""" - if self.__get_attribute__("max_memory_usage"): - return self.memory_warning_triggered - return memory_warning_triggered - - def __h_Data__(self): - sizemax = self.__check_max_memory_usage__() - getmeterBytes = getMemory(sizemax) - if getmeterBytes in ["bytes", "KB"]: - if getmeterBytes in ["KB"] and sizemax <= 120: - return False - elif getmeterBytes in ["bytes"]: - return False - else: - return True - return True - - def __get_attribute__( - self, attr_name: SelectType.String_, valuedef: SelectType.Any_ = None - ): - # Menggunakan getattr untuk mendapatkan atribut dengan nama yang diberikan - return getattr(self, attr_name, valuedef) - - def __can_insert_or_update__(self, size_to_add): - """Function to check if we can insert or update based on memory limits.""" - if self.__check_max_memory_usage__() - size_to_add <= 0: - print("Not enough memory to proceed with operation.") - return False - return True - - def exit_handler(self, signum, frame): - print("Exiting program...") - sys.exit(0) - - - def set_max_runtime(self, seconds: SelectType.Numeric_) -> None: - """ - Set a maximum runtime for the program. After the specified time (in seconds) has passed, - the program will trigger the `exit_handler` to terminate or perform a cleanup. - """ - if sys.platform == "win32": - # Windows - timer = threading.Timer(seconds, self.exit_handler) - timer.start() - else: - # Linux - signal.signal(signal.SIGALRM, self.exit_handler) - signal.alarm(seconds) - - - def set_max_memory_usage(self, megabytes: SelectType.Numeric_): - """ - Set a maximum memory usage limit for the program, specified in megabytes. If the program - exceeds this memory limit, the operating system will take appropriate action to terminate or restrict the process. - """ - - if sys.platform == "win32": - # Windows - import ctypes - - ctypes.cdll.msvcrt.setlimit(ctypes.c_int(2), megabytes * 1024 * 1024) - else: - # Linux - resource.setrlimit( - resource.RLIMIT_AS, (megabytes * 1024 * 1024, megabytes * 1024 * 1024) - ) - - -# method chaining -__all__ = ["MemoryAwareStruct"] diff --git a/memoryawarestruct/__init__.py b/memoryawarestruct/__init__.py new file mode 100644 index 0000000..0b94c2a --- /dev/null +++ b/memoryawarestruct/__init__.py @@ -0,0 +1,6 @@ +import os, sys +path, filename = os.path.split(__file__) +sys.path.insert(0, path) +from .utils.typedata import * +from .utils.middleware import ProtectedDict, ReadOnlyDictView, ProtectedAttribute, metaClass, replace_special_chars, smart_list_to_dict +from .core.memory import memory, create_secure_memory, SecureMemoryContext \ No newline at end of file diff --git a/memoryawarestruct/core/__init__.py b/memoryawarestruct/core/__init__.py new file mode 100644 index 0000000..8618f0a --- /dev/null +++ b/memoryawarestruct/core/__init__.py @@ -0,0 +1,4 @@ + +import os, sys +from .memory import memory, create_secure_memory, SecureMemoryContext +__all__ = ["memory", "create_secure_memory", "SecureMemoryContext"] \ No newline at end of file diff --git a/memoryawarestruct/core/memory.py b/memoryawarestruct/core/memory.py new file mode 100644 index 0000000..fc9b0bf --- /dev/null +++ b/memoryawarestruct/core/memory.py @@ -0,0 +1,1163 @@ +import os, re, abc, copy, inspect +import difflib +from threading import Lock +import sys, json +import logging +try: + from memoryawarestruct.utils.typedata import TypeVar, Generic, Any, Union, Optional, Set, OrderedDict + from memoryawarestruct.utils.middleware import ProtectedDict, ReadOnlyDictView, metaClass, replace_special_chars, smart_list_to_dict +except: + from utils.typedata import TypeVar, Generic, Any, Union, Optional, Set, OrderedDict + from utils.middleware import ProtectedDict, ReadOnlyDictView, metaClass, replace_special_chars, smart_list_to_dict + + +__all__ = ["memory", "create_secure_memory", "SecureMemoryContext"] + +# Membuat konfigurasi logger yang lebih canggih tanpa FileHandler +def setup_logger(name): + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) # Set level logging ke DEBUG + + # Membuat format log + formatter = logging.Formatter('%(levelname)s - %(message)s') + + # Handler untuk menampilkan log ke konsol + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.CRITICAL) # Menonaktifkan semua log, hanya menunjukkan CRITICAL + console_handler.setFormatter(formatter) + + # Menambahkan handler ke logger + logger.addHandler(console_handler) + + return logger + +logger = setup_logger('MemoryAwareStruct') +def __main(): + + SecureStructMeta = metaClass() + class SelectType: + Union_ = Union[int, str, float, list, tuple, dict, bool, bytes] + Any_ = Any + Dict_ = dict + List_ = Union[list, tuple] + Ordered_ = OrderedDict() + + + class StructConfig: + """Configuration class untuk menghindari bentrok global variable""" + + _instances = {} + _lock = Lock() + + def __init__(self, instance_id: str = "default"): + self.instance_id = instance_id + self._name = "MemoryAwareStruct" + self._defaults = { + "name": "MemoryAwareStruct", + "case_sensitive": True, + "allow_override": True, + } + self._locked = False + + def __dir__(self): + return [] + + @classmethod + def get_instance(cls, instance_id: str = "default"): + """Get or create config instance""" + with cls._lock: + if instance_id not in cls._instances: + cls._instances[instance_id] = cls(instance_id) + return cls._instances[instance_id] + + def set_name(self, name: str): + """Set memoryawarestruct name for this instance""" + if not self._locked: + self._name = name + else: + raise RuntimeError("Configuration is locked") + + def get_name(self): + """Get memoryawarestruct name for this instance""" + return self._name + + def lock(self): + """Lock configuration to prevent changes""" + self._locked = True + + def unlock(self): + """Unlock configuration""" + self._locked = False + + def reset_to_defaults(self): + """Reset configuration to defaults""" + if not self._locked: + self._name = self._defaults["name"] + + class ImmutableList(tuple): + def __new__(cls, iterable): + # Konversi elemen list ke bentuk yang sudah diamankan + return super().__new__(cls, [deep_struct_freeze(item) for item in iterable]) + + def __init__(self, iterable): + pass # Tidak ada init + + def __setitem__(self, *args): + raise TypeError("ImmutableList does not support item assignment") + + def append(self, *args): + raise TypeError("ImmutableList does not support append") + + def extend(self, *args): + raise TypeError("ImmutableList does not support extend") + + def pop(self, *args): + raise TypeError("ImmutableList does not support pop") + + def remove(self, *args): + raise TypeError("ImmutableList does not support remove") + + def insert(self, *args): + raise TypeError("ImmutableList does not support insert") + + + class ImmutableDict: + def __init__(self, data): + self._data = {k: deep_struct_freeze(v) for k, v in data.items()} + + def __getitem__(self, key): + return self._data[key] + + def __iter__(self): + return iter(self._data) + + def __len__(self): + return len(self._data) + + def __contains__(self, key): + return key in self._data + + def items(self): + return self._data.items() + + def keys(self): + return self._data.keys() + + def values(self): + return self._data.values() + + def __setitem__(self, *args): + raise TypeError("ImmutableDict does not support item assignment") + + def __delitem__(self, *args): + raise TypeError("ImmutableDict does not support deletion") + + def __repr__(self): + return f"ImmutableDict({self._data})" + + def deep_struct_freeze(obj, blacklist: list = None, config_id="default", existing_keys=None): + if blacklist is None: + blacklist = ['get_struct_name', 'set_struct_name', 'restore_backup', 'reset_to_original', 'safe_get', 'safe_set', 'dell_dict', 'lock_dict', 'unlock_dict', 'get_dict_protection_status'] + + if existing_keys is None: + existing_keys = set() + + def generate_unique_key(base, used): + counter = 1 + new_key = base + while new_key in used: + new_key = f"{base}{counter}" + counter += 1 + used.add(new_key) + return new_key + + if isinstance(obj, memoryawarestruct): + return obj + + if isinstance(obj, dict): + frozen_items = {} + used_keys = set(existing_keys) + + for k, v in obj.items(): + original_key = replace_special_chars(str(k)) + + # Deteksi key mirip blacklist + similar = difflib.get_close_matches(original_key, blacklist, n=1, cutoff=0.8) + if similar: + original_key = generate_unique_key(original_key, used_keys) + elif original_key in used_keys: + original_key = generate_unique_key(original_key, used_keys) + else: + used_keys.add(original_key) + + # Tangani nilai rekursif + if isinstance(v, dict): + frozen_items[original_key] = memoryawarestruct( + original_key, + **{ + replace_special_chars(str(subk)): deep_struct_freeze( + subv, blacklist=blacklist, config_id=original_key + ) + for subk, subv in v.items() + }, + ) + else: + frozen_items[original_key] = deep_struct_freeze(v, blacklist=blacklist, config_id=original_key) + return memoryawarestruct(config_id, **frozen_items) + + elif isinstance(obj, list): + return ImmutableList(obj) + + elif isinstance(obj, tuple): + return ImmutableList(obj) + + elif isinstance(obj, set): + return frozenset(deep_struct_freeze(i, blacklist=blacklist, config_id=config_id) for i in obj) + + return obj + + + class memoryawarestruct(metaclass=SecureStructMeta): + """ + memoryawarestruct is a highly secure and extensible memory-aware data structure designed to prevent + unauthorized access or modification of internal state. It enables controlled dynamic attribute + storage while enforcing protection on both attributes and internal dictionaries. + + This class is suitable for secure runtime environments or sensitive data models that + require robust encapsulation and anti-tampering mechanisms. + + Features: + - Dynamic attribute storage and protection + - Configurable dictionary and attribute protection + - Safe interface for reading, writing, and deleting keys + - Method and attribute registry with access control + - Built-in backup and restoration mechanism + + Parameters: + __config_id (str): Configuration identifier used to load predefined security config. Default is 'default'. + __allow_unsafe_operations (bool): If True, allows unsafe modifications. Use with caution. + __dict_protection (bool): Enables or disables internal dictionary access protection. + __attr_protection (bool): Enables or disables protection of internal attributes. + **entries: Initial user-defined attributes to set on the memoryawarestruct. + """ + def __init__( + self, + __config_id: str = "default", + __allow_unsafe_operations: bool = False, + __dict_protection: bool = True, + __attr_protection: bool = True, + **entries: SelectType.Union_, + ) -> None: + """ + Initialize the secure memory with optional configuration and initial attributes. + + Internal states such as protection flags, method registries, and original entries + are set up during this phase. If dictionary protection is enabled, a `ProtectedDict` + is used to secure all user-defined attributes. + + Args: + __config_id (str): The identifier used to retrieve a shared config instance. + __allow_unsafe_operations (bool): Allows direct changes if set to True. Default is False. + __dict_protection (bool): Enables protection on the internal dictionary. Default is True. + __attr_protection (bool): Enables protection on internal attributes. Default is True. + **entries (dict): Initial key-value pairs to be added to the memoryawarestruct as dynamic attributes. + """ + # Initialize basic attributes first to avoid AttributeError + object.__setattr__(self, "_method_registry", {}) + object.__setattr__(self, "_allow_unsafe_operations", __allow_unsafe_operations) + object.__setattr__(self, "_dict_protection_enabled", __dict_protection) + object.__setattr__(self, "_attr_protection_enabled", __attr_protection) + object.__setattr__(self, "_user_attributes", set()) + object.__setattr__( + self, "_dict_access_blocked", True + ) # Block all dict access by default + + # Protected attributes - tidak bisa diubah dari luar + object.__setattr__(self, "_config_id", __config_id) + object.__setattr__(self, "_config", StructConfig.get_instance(__config_id)) + object.__setattr__(self, "_original_entries", copy.deepcopy(entries)) + object.__setattr__( + self, + "_protected_attrs", + { + "_config_id", + "_config", + "_original_entries", + "_backup_dict", + "_protected_attrs", + "_allow_unsafe_operations", + "_method_registry", + "_protected_dict", + "_dict_protection_enabled", + "_attr_protection_enabled", + "_user_attributes", + "_dict_access_blocked", + }, + ) + + # Register original methods + self._register_original_methods() + + # Buat protected dictionary sebagai pengganti __dict__ + if __dict_protection: + object.__setattr__(self, "_protected_dict", ProtectedDict(self)) + # Authorize ONLY internal methods to modify the dict - VERY LIMITED + self._protected_dict.authorize_method( + "_internal_dict_update", max_access=1000 + ) # Allow multiple internal updates + else: + object.__setattr__(self, "_protected_dict", None) + if entries.__len__() >= 1: + entries = smart_list_to_dict([entries]) + + # Update dengan entries yang diberikan + for key, value in entries.items(): + key = replace_special_chars(key) + if not key.startswith("_"): + self._internal_dict_update(key, value) + self._user_attributes.add(key) + + def __dir__(self): + return [ + "config", + "lock_dict", + "get_dict_protection_status", + "set_struct_name", + "insert_dict", + "update_dict", + "dell_dict", + "restore_backup", + "reset_to_original", + "safe_set", + "safe_get", + "get_protection_status", + "__repr__", + "__str__", + ] + + def _is_internal_call(self) -> bool: + """Check if the call is coming from internal methods""" + frame = inspect.currentframe() + try: + # Check multiple frames up the call stack + current_frame = frame.f_back + internal_call_found = False + + while current_frame and not internal_call_found: + caller_locals = current_frame.f_locals + method_name = current_frame.f_code.co_name + + # List of internal methods allowed to modify attributes + internal_methods = { + "__init__", + "_internal_dict_update", + "safe_set", + "restore_backup", + "reset_to_original", + "update_dict", + "insert_dict", + "dell_dict", + } + + # If we find an internal method in the call stack + if method_name in internal_methods and "self" in caller_locals: + if caller_locals["self"] is self: + internal_call_found = True + break + + current_frame = current_frame.f_back + + return internal_call_found + except Exception: + return False + finally: + del frame + + def _internal_dict_update(self, key: str, value: Any): + """Internal method to update dictionary safely with reserved name protection""" + + # Define method blacklist + protected_names = set([ name for name, obj in inspect.getmembers(self, inspect.ismethod)]) + + # Sanitize key + original_key = replace_special_chars(str(key)) + + # Periksa kemiripan dengan blacklist + similar = difflib.get_close_matches(original_key, protected_names, n=1, cutoff=0.9) + + # Jika mirip, atau key sudah ada, modifikasi nama + def generate_unique_key(base_key, existing_keys): + counter = 1 + new_key = base_key + while new_key in existing_keys or new_key in protected_names: + new_key = f"{base_key}{counter}" + counter += 1 + return new_key + + existing_keys = set(self.get_user_attributes().keys()) if hasattr(self, "get_user_attributes") else set() + if similar or original_key in existing_keys or original_key in protected_names: + original_key = generate_unique_key(original_key, existing_keys) + + # Bekukan nilainya + frozen_value = deep_struct_freeze(value, blacklist=list(protected_names), config_id=self._config_id) + + # Set sebagai atribut + object.__setattr__(self, original_key, frozen_value) + + if self._protected_dict is not None: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + try: + self._protected_dict[original_key] = frozen_value + finally: + self._protected_dict._locked = was_locked + + if not original_key.startswith("_"): + self._user_attributes.add(original_key) + + + def _register_original_methods(self): + """Register original methods are used for protection""" + methods_to_protect = [ + "get_struct_name", + "set_struct_name", + "restore_backup", + "reset_to_original", + "safe_get", + "safe_set", + "dell_dict", + "lock_dict", + "unlock_dict", + "get_dict_protection_status", + ] + + for method_name in methods_to_protect: + if hasattr(self, method_name): + original_method = getattr(self, method_name) + self._method_registry[method_name] = original_method + + def __setattr__(self, name: str, value: Any) -> None: + # IZINKAN jika ini adalah property dengan setter + cls_attr = getattr(type(self), name, None) + if isinstance(cls_attr, property) and cls_attr.fset: + return cls_attr.fset(self, value) + if hasattr(self, "_protected_attrs") and name in self._protected_attrs: + raise AttributeError(f"Cannot modify protected attribute '{name}'") + # Cek apakah ini user attribute + is_user_attr = ( + hasattr(self, "_user_attributes") and name in self._user_attributes + ) + is_internal = self._is_internal_call() + allow_unsafe = getattr(self, "_allow_unsafe_operations", False) + if not is_internal and not allow_unsafe: + raise AttributeError( + f"Attribute access denied: Cannot add modify attribute '{name}' directly. " + "Use safe_set, insert_dict, or update_dict instead." + ) + # Safe to set attribute + object.__setattr__(self, name, value) + # Update ke protected_dict jika tidak diawali underscore + if ( + hasattr(self, "_protected_dict") + and self._protected_dict is not None + and not name.startswith("_") + ): + try: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + self._protected_dict[name] = value + self._protected_dict._locked = was_locked + except AttributeError: + pass + # Track user attribute + if hasattr(self, "_user_attributes") and not name.startswith("_"): + self._user_attributes.add(name) + + def __delattr__(self, name: str) -> None: + """Override delattr for protection""" + if hasattr(self, "_protected_attrs") and name in self._protected_attrs: + raise AttributeError(f"Cannot delete protected attribute '{name}'") + + # Cek proteksi atribut user - HANYA method internal yang boleh menghapus + if ( + hasattr(self, "_attr_protection_enabled") + and getattr(self, "_attr_protection_enabled", True) + and hasattr(self, "_user_attributes") + and name in getattr(self, "_user_attributes", set()) + ): + + # Periksa apakah ini panggilan internal yang sah + is_internal = self._is_internal_call() + allow_unsafe = getattr(self, "_allow_unsafe_operations", False) + + if not is_internal and not allow_unsafe: + raise AttributeError( + f"Attribute access denied: Cannot delete user attribute '{name}' directly. Use dell_dict() method instead." + ) + + if name in getattr(self, "_method_registry", {}): + raise AttributeError( + f"Method deletion denied: Cannot delete protected method '{name}'" + ) + + if name == "__dict__": + raise AttributeError( + "Dictionary access denied: Cannot delete __dict__ in protected memoryawarestruct" + ) + + # Update protected dict jika ada + if ( + hasattr(self, "_protected_dict") + and self._protected_dict is not None + and not name.startswith("_") + ): + try: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + del self._protected_dict[name] + self._protected_dict._locked = was_locked + except (AttributeError, KeyError): + pass + + # Remove from user attributes tracking + if hasattr(self, "_user_attributes") and name in self._user_attributes: + self._user_attributes.discard(name) + + object.__delattr__(self, name) + + + def __getattribute__(self, name: str) -> Any: + """Override getattribute for method protection and controlled access""" + if name == "__dict__": + try: + dict_protection = object.__getattribute__(self, "_dict_protection_enabled") + if dict_protection: + user_attrs = { + k: v + for k, v in object.__getattribute__(self, "__dict__").items() + if not k.startswith("_") + } + return ReadOnlyDictView(user_attrs) + except AttributeError: + pass # Ignore if attributes not yet initialized + + try: + attr = object.__getattribute__(self, name) + + # Protection for critical method overrides + try: + method_registry = object.__getattribute__(self, "_method_registry") + if method_registry and name in method_registry: + if attr != method_registry[name]: + return method_registry[name] + except AttributeError: + pass + + return attr + + except AttributeError as e: + # Suggest similar attribute if not found + try: + user_attrs = object.__getattribute__(self, "_user_attributes") + suggestions = difflib.get_close_matches(name, user_attrs, n=1, cutoff=0.7) + if suggestions: + suggestion_msg = f" Did you mean '{suggestions[0]}'?" + else: + suggestion_msg = "" + except Exception: + suggestion_msg = "" + + raise AttributeError(f"Attribute '{name}' not found.{suggestion_msg}".strip()) from e + + + def __getitem__(self, key: str) -> Any: + """Allow dictionary-style access - READ ONLY""" + # Block dictionary-style access completely + # if getattr(self, '_dict_access_blocked', True): + # raise AttributeError("Dictionary access denied: Dictionary-style access is completely blocked") + + try: + return object.__getattribute__(self, replace_special_chars(key)) + except AttributeError: + raise KeyError(key) + + def __setitem__(self, key: str, value: Any) -> None: + """Block dictionary-style setting completely""" + raise AttributeError( + "Dictionary access denied: Dictionary-style modification is completely blocked" + ) + + def __delitem__(self, key: str) -> None: + """Block dictionary-style deletion completely""" + raise AttributeError( + "Dictionary access denied: Dictionary-style deletion is completely blocked" + ) + + @property + def config(self): + """ + Read-only property to access the internal configuration instance. + + Returns: + Any: The configuration object stored in the internal `_config` attribute. + + This property provides controlled read access to the `_config` attribute, + which is assumed to hold the configuration data for this object. Direct + access to `_config` is discouraged outside this interface. + + Notes: + - This property is read-only. To modify the configuration, use specific + methods or setters provided elsewhere (if available). + - Bypasses `__getattr__` and other overrides by using `object.__getattribute__`. + + Raises: + AttributeError: If the `_config` attribute is not set on the object. + + Example: + >>> obj.config # returns the configuration instance + """ + return object.__getattribute__(self, "_config") + + def lock_dict(self) -> None: + """Lock the internal dictionary to prevent modifications""" + if self._protected_dict is not None: + self._protected_dict.lock() + + def unlock_dict(self) -> None: + """Permanently disabled - dictionary cannot be unlocked""" + logger.warning("Dictionary unlock is permanently disabled for security") + + def get_dict_protection_status(self) -> dict: + """ + Get the current dictionary protection status. + + Notes: + If the internal `_protected_dict` is not initialized (None), + only basic access-blocking status is returned. + + Returns: + dict: A dictionary describing the current protection state, including: + - protection_enabled (bool): Whether protection is enabled. + - locked (bool, optional): Whether the protected dictionary is locked. + - authorized_methods (int, optional): Number of authorized methods. + - dict_access_blocked (bool): Whether dictionary access is globally blocked. + """ + if self._protected_dict is not None: + return { + "protection_enabled": True, + "locked": self._protected_dict._locked, + "authorized_methods": len(self._protected_dict._authorized_methods), + "dict_access_blocked": self._dict_access_blocked, + } + else: + return { + "protection_enabled": False, + "dict_access_blocked": self._dict_access_blocked, + } + + def set_struct_name(self, name: str) -> None: + """ + Set the memoryawarestruct name for the current instance only. + + Args: + name (str): The new name to assign to this instance's configuration. + + Raises: + AttributeError: If the internal configuration object `_config` is not available. + """ + self._config.set_name(name) + + def get_struct_name(self) -> str: + """ + Get the memoryawarestruct name assigned to this instance. + + Returns: + str: The name currently set in this instance's configuration. + + Raises: + AttributeError: If the internal configuration object `_config` is not available. + """ + return self._config.get_name() + + @staticmethod + def set_global_name(name: str, config_id: str = "default") -> None: + """ + Set the memoryawarestruct name globally for a specific configuration ID. + + Args: + name (str): The new global memoryawarestruct name. + config_id (str): The ID of the config instance to apply the name to. Default is "default". + + Raises: + KeyError: If the specified configuration ID does not exist. + """ + config = StructConfig.get_instance(config_id) + config.set_name(name) + + @property + def insert_dict(self): + """ + Write-only property for inserting new key-value pairs as object attributes. + + This property allows you to assign a dictionary or a list of key-value pairs + to add new attributes dynamically to the current object. + + Constraints: + - Only keys that do not start with an underscore `_` are processed. + - If a key already exists as an attribute, insertion is denied to avoid overwriting. + + Raises: + - AttributeError: When attempting to read the property, or if a key already exists. + - TypeError: If the assigned value is neither a dictionary nor a list. + + Example usage: + obj.insert_dict = {"name": "Alice", "age": 30} + """ + raise AttributeError("insert_dict is write-only and cannot be read.") + + + @insert_dict.setter + def insert_dict(self, dict_new:SelectType.Union_) -> None: + + if isinstance(dict_new, SelectType.Dict_): + for key, value in dict_new.items(): + key = replace_special_chars(key) + if not key.startswith("_"): + if hasattr(self, key): + raise AttributeError( + f"Insert failed: Key '{key}' already exists. Use update_dict instead. Use update_dict instead." + ) + + self.safe_set(key, value, allow_override=False) + elif isinstance(dict_new, SelectType.List_): + data = smart_list_to_dict(dict_new) + self.insert_dict = data + else: + raise TypeError("insert_dict expects a dictionary") + + @insert_dict.deleter + def insert_dict(self, dict_new:SelectType.Union_) -> None: + raise AttributeError("update_dict is write-only and cannot be read or delete.") + + @property + def update_dict(self): + """ + Write-only property for updating existing public attributes using a dictionary. + + This property allows you to assign a dictionary or a list of key-value pairs + to update existing attributes of the object. It ensures that only attributes + which already exist and are public (i.e., not prefixed with `_`) can be updated. + + Constraints: + - Only existing public keys can be updated. + - Keys starting with an underscore `_` are ignored. + + Side Effects: + - Creates a backup of current public attributes into `_backup_dict` before update. + + Raises: + - AttributeError: When attempting to read the property, or if a key does not exist. + - TypeError: If the assigned value is neither a dictionary nor a list. + + Example usage: + obj.update_dict = {"name": "Bob"} # Only if 'name' already exists + """ + raise AttributeError("update_dict is write-only and cannot be read.") + + + @update_dict.setter + def update_dict(self, dict_new:SelectType.Union_) -> None: + if isinstance(dict_new, SelectType.Dict_): + public_attrs = { + k: copy.deepcopy(v) + for k, v in self.__dict__.items() + if not k.startswith("_") + } + object.__setattr__(self, "_backup_dict", public_attrs) + for key, value in dict_new.items(): + key = replace_special_chars(key) + if not key.startswith("_"): + if not hasattr(self, key): + raise AttributeError( + f"Update failed: Key '{key}' does not exist. Use insert_dict instead." + ) + self.safe_set(key, value, allow_override=True) + elif isinstance(dict_new, SelectType.List_): + data = smart_list_to_dict(dict_new) + self.update_dict = data + else: + raise TypeError("update_dict expects a dictionary") + + @update_dict.deleter + def update_dict(self, dict_new:SelectType.Union_) -> None: + raise AttributeError("update_dict is write-only and cannot be read or delete.") + + + def dell_dict(self, params: str) -> bool: + """ + Delete a user-defined attribute from the dictionary. + + Args: + params (str): The key (attribute name) to delete. + + Notes: + - Internal attributes (starting with `_`) and protected methods cannot be deleted. + - Also updates the protected dictionary if applicable. + + Returns: + bool: True if deletion succeeded, False otherwise. + """ + + getallmethod = [name for name, func in inspect.getmembers(self, inspect.ismethod)] + user_attrs = object.__getattribute__(self, "_user_attributes") + suggestions = difflib.get_close_matches(params, user_attrs, n=1, cutoff=0.7) + if params in getallmethod: + raise AttributeError(f"Attribute '{params}' not found.{suggestions}".strip()) + + # Prevent deletion of internal attributes + params = replace_special_chars(params) + if params.startswith("_"): + logger.warning(f"Cannot delete internal attribute: {params}") + return False + + # Prevent deletion of protected methods + if params in getattr(self, "_method_registry", {}): + logger.warning(f"Cannot delete protected method: {params}") + return False + + user_attrs = self.get_user_attributes() + + if params in user_attrs: + try: + # Update protected dict first if exists + if self._protected_dict is not None: + try: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + del self._protected_dict[params] + self._protected_dict._locked = was_locked + except (AttributeError, KeyError): + pass + + # Remove from user attributes tracking + if hasattr(self, "_user_attributes"): + self._user_attributes.discard(params) + + object.__delattr__(self, params) + logger.info(f"Successfully deleted: {params}") + return True + except AttributeError: + logger.error(f"Cannot delete: {params}") + return False + else: + logger.error(f"Key not found: {params}") + return False + + def restore_backup(self) -> bool: + """ + Restore instance attributes from the last backup. + + Notes: + - This only restores user-defined attributes (non-internal). + - Also attempts to unlock and update protected dictionary if needed. + - Clears current user-defined attributes before restoring. + + Returns: + bool: True if restoration was successful, False otherwise. + """ + if hasattr(self, "_backup_dict"): + backup = object.__getattribute__(self, "_backup_dict") + + # Clear user attributes only + user_attrs = list(self.get_user_attributes().keys()) + for attr in user_attrs: + try: + object.__delattr__(self, attr) + if self._protected_dict is not None: + try: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + del self._protected_dict[attr] + self._protected_dict._locked = was_locked + except (AttributeError, KeyError): + pass + except: + pass + + # Clear user attributes tracking + if hasattr(self, "_user_attributes"): + self._user_attributes.clear() + + # Restore non-protected attributes from backup + for key, value in backup.items(): + if not key.startswith("_"): + self._internal_dict_update(key, value) + + logger.info("Restored from backup") + return True + else: + logger.warning("No backup available") + return False + + def reset_to_original(self) -> None: + """ + Reset all user-defined attributes to the original state. + """ + # Clear user attributes + user_attrs = list(self.get_user_attributes().keys()) + for attr in user_attrs: + try: + object.__delattr__(self, attr) + if self._protected_dict is not None: + try: + was_locked = self._protected_dict._locked + self._protected_dict._locked = False + del self._protected_dict[attr] + self._protected_dict._locked = was_locked + except (AttributeError, KeyError): + pass + except: + pass + + # Clear user attributes tracking + if hasattr(self, "_user_attributes"): + self._user_attributes.clear() + + # Restore original entries + original_entries = object.__getattribute__(self, "_original_entries") + for key, value in original_entries.items(): + self._internal_dict_update(key, value) + + logger.info("Reset to original state") + + def safe_get(self, key: str, default: Any = None) -> Any: + """ + Safely retrieve an attribute with a fallback default value. + + Args: + key (str): Attribute name to fetch. + default (Any): Value to return if attribute is not found. + + Returns: + Any: Attribute value if found, otherwise the `default` value. + """ + try: + return object.__getattribute__(self, replace_special_chars(key)) + except AttributeError: + return default + + def safe_set(self, key: str, value: Any, allow_override: bool = True) -> bool: + """ + Safely set a user-defined attribute. + + Args: + key (str): Attribute name to set. + value (Any): Value to assign. + allow_override (bool): Whether to override existing attribute. Default is True. + + Returns: + bool: True if set succeeded, False otherwise. + """ + key = replace_special_chars(key) + user_attrs = object.__getattribute__(self, "_user_attributes") + suggestions = difflib.get_close_matches(key, user_attrs, n=1, cutoff=1) + if key in getattr(self, "_method_registry", {}): + raise AttributeError(f"Attribute '{key}' not found.{suggestions}".strip()) + + if key.startswith("_"): + logger.warning(f"Cannot set internal attribute: {key}") + return False + + if not allow_override and hasattr(self, key): + logger.warning(f"Attribute {key} already exists and override not allowed") + return False + + try: + frozen_value = deep_struct_freeze(value, config_id=key) + self._internal_dict_update(key, frozen_value) + return True + except AttributeError as e: + logger.error(f"Cannot set attribute {key}: {str(e)}") + return False + + def get_user_attributes(self) -> dict: + """ + Get all user-defined attributes, excluding internal ones. + + Returns: + dict: A dictionary of user-defined attributes (key-value pairs). + """ + return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + + def unlock_for_maintenance(self, confirmation: str = None) -> None: + """ + Permanently disabled method to prevent unauthorized access. + + Args: + confirmation (str, optional): No longer accepted. + + Notes: + This method is intentionally disabled for security reasons. + """ + logger.warning("SECURITY: Maintenance unlock is permanently disabled") + + def lock_after_maintenance(self) -> None: + """ + Lock the memoryawarestruct again after maintenance (already locked by default). + + Notes: + Currently a no-op as the memoryawarestruct is always in a secured state. + """ + logger.info("memoryawarestruct is already fully locked and secured") + + def get_protection_status(self) -> dict: + """ + Get the complete protection status of the memoryawarestruct instance. + + Returns: + dict: A dictionary containing: + - protected_attrs (int): Number of protected attributes. + - protected_methods (int): Number of registered protected methods. + - unsafe_operations_allowed (bool): If unsafe ops are enabled. + - user_attributes (int): Number of current user-defined attributes. + - user_attributes_count (int): Size of user attribute tracking set. + - dict_protection (dict): Status from `get_dict_protection_status`. + - attribute_protection (bool): Whether attribute protection is active. + - dict_access_completely_blocked (bool): If dict access is fully blocked. + """ + dict_status = self.get_dict_protection_status() + return { + "protected_attrs": len(getattr(self, "_protected_attrs", {})), + "protected_methods": len(getattr(self, "_method_registry", {})), + "unsafe_operations_allowed": getattr( + self, "_allow_unsafe_operations", False + ), + "user_attributes": len(self.get_user_attributes()), + "dict_protection": dict_status, + "attribute_protection": getattr(self, "_attr_protection_enabled", True), + "user_attributes_count": len(getattr(self, "_user_attributes", set())), + "dict_access_completely_blocked": getattr( + self, "_dict_access_blocked", True + ), + } + + def __repr__(self) -> str: + # Only show user attributes in representation + user_attrs = self.get_user_attributes() + + if not user_attrs: + return f"{self.get_struct_name()}()" + + output_dictionary = tuple( + [ + ( + f"{k}({v})" + if isinstance(v, (list, dict)) + else f"{k}({list(v)})" if isinstance(v, tuple) else f"{k}('{v}')" + ) + for k, v in user_attrs.items() + ] + ) + return f"{self.get_struct_name()}" + str(output_dictionary).replace('"', "") + + + # Factory function untuk membuat memoryawarestruct dengan konfigurasi khusus + def create_secure_memory( + config_id: str = "default", + struct_name: str = "MemoryAwareStruct", + dict_protection: bool = True, + attr_protection: bool = True, + ): + """ + Factory function to create a customized Secure memoryawarestruct factory. + + This function sets up a reusable factory that generates instances of `memoryawarestruct` with + predefined security configurations such as dictionary protection, attribute protection, + and a specific configuration ID and name. Useful for creating scoped secure data objects + with uniform behavior across different parts of an application. + + Args: + config_id (str): The identifier for the shared configuration instance. Default is "default". + struct_name (str): A label or name for this memoryawarestruct configuration, used for tracking or debugging. + dict_protection (bool): Whether to enable protection of the internal dictionary (default: True). + attr_protection (bool): Whether to enable protection of internal attributes (default: True). + + Returns: + Callable[..., memoryawarestruct]: A function that, when called with keyword arguments, returns a `memoryawarestruct` instance + initialized with the predefined configuration. + + Example: + >>> MySecureStruct = create_secure_memory("session", "UserSession") + >>> obj = MySecureStruct(username="alice", role="admin") + >>> print(obj.safe_get("username")) # "alice" + """ + StructConfig.get_instance(config_id).set_name(struct_name) + + def struct_factory(**kwargs): + return memoryawarestruct( + config_id=config_id, + _dict_protection=dict_protection, + _attr_protection=attr_protection, + **kwargs, + ) + + return struct_factory + + + # Context manager untuk temporary changes - DISABLED + class SecureMemoryContext: + """ + Context manager for safely enabling and disabling unsafe operations + on a memoryawarestruct instance temporarily. + + This is useful for making temporary modifications to a protected memoryawarestruct + while ensuring the original protection settings are restored afterward. + + Attributes: + memoryawarestruct (Any): The struct-like object being wrapped. + allow_unsafe (bool): Whether to allow unsafe operations within context. + original_state (bool): Backup of the original unsafe operations flag. + original_dict_state (bool): Backup of the protected dictionary lock state. + """ + def __init__(self, struct_instance, allow_unsafe: bool = False): + """ + Initialize the context manager with the memoryawarestruct instance. + + Args: + struct_instance (Any): The struct-like object to manage. + allow_unsafe (bool): Whether to enable unsafe operations during the context. + """ + self.struct = struct_instance + self.allow_unsafe = allow_unsafe + self.original_state = None + self.original_dict_state = None + + def __enter__(self): + # Backup state + self.original_state = getattr(self.struct, "_allow_unsafe_operations", False) + if self.struct._protected_dict is not None: + self.original_dict_state = self.struct._protected_dict._locked + + if self.allow_unsafe: + object.__setattr__(self.struct, "_allow_unsafe_operations", True) + if self.struct._protected_dict is not None: + self.struct._protected_dict.unlock() + + return self.struct + + def __exit__(self, exc_type, exc_val, exc_tb): + # Restore original state + object.__setattr__(self.struct, "_allow_unsafe_operations", self.original_state) + if ( + self.struct._protected_dict is not None + and self.original_dict_state is not None + ): + if self.original_dict_state: + self.struct._protected_dict.lock() + + return ImmutableList([memoryawarestruct, create_secure_memory, SecureMemoryContext]) + +__main__ = __main() +create_secure_memory:Any = __main__[1] +SecureMemoryContext:Any = __main__[2] +memory:Any = __main__[0] \ No newline at end of file diff --git a/memoryawarestruct/test/test.py b/memoryawarestruct/test/test.py new file mode 100644 index 0000000..efb016d --- /dev/null +++ b/memoryawarestruct/test/test.py @@ -0,0 +1,78 @@ +from memoryawarestruct.core.memory import memory + +if __name__ == "__main__": + print("=== Testing Enhanced Secure Struct with Attribute Protection ===") + + # Test 1: Normal usage + print("\n1. Normal Usage:") + struct = memory(name="John", age=30, city="Jakarta") + print(f"Created: {struct}") + print(f"Protection status: {struct.get_protection_status()}") + + # Test 2: Try to modify attribute directly (should fail) + print("\n2. Trying to modify attribute directly:") + try: + struct.name = "HACKED" + print(f"After direct modification: {struct.name}") + print("ERROR: Direct attribute modification succeeded (THIS SHOULD NOT HAPPEN)") + except AttributeError as e: + print(f"SUCCESS: Direct attribute modification blocked: {e}") + + # Test 3: Try to delete attribute directly (should fail) + print("\n3. Trying to delete attribute directly:") + try: + del struct.age + print("ERROR: Direct attribute deletion succeeded (THIS SHOULD NOT HAPPEN)") + except AttributeError as e: + print(f"SUCCESS: Direct attribute deletion blocked: {e}") + + # Test 4: Using safe methods (should work) + print("\n4. Using safe methods:") + success = struct.safe_set("name", "Jane") + print(f"safe_set result: {success}") + print(f"Name after safe_set: {struct.name}") + + success = struct.dell_dict("city") + print(f"dell_dict result: {success}") + print(f"Struct after dell_dict: {struct}") + + # Test 5: Try to modify __dict__ directly (should fail) + print("\n5. Trying to modify __dict__ directly:") + try: + struct.__dict__["name"] = "HACKED" + print("ERROR: Dict modification succeeded (THIS SHOULD NOT HAPPEN)") + except (AttributeError, TypeError) as e: + print(f"SUCCESS: Dict modification blocked: {e}") + + # Test 6: Additional protection tests + print("\n6. Additional protection tests:") + + # Try to add new attribute directly (should fail if it becomes user attribute) + print("6a. Trying to add new attribute directly:") + try: + struct.new_attr = "test" + print(f"New attribute added: {struct.new_attr}") + except AttributeError as e: + print(f"New attribute addition blocked: {e}") + + # But safe_set should work + print("6b. Adding new attribute via safe_set:") + success = struct.safe_set("new_attr", "test_safe") + print(f"safe_set new attribute result: {success}") + if hasattr(struct, "new_attr"): + print(f"New attribute value: {struct.new_attr}") + + # Now try to modify the new attribute directly (should fail) + print("6c. Trying to modify new attribute directly:") + try: + struct["new_attr"] = "HACKED" + print("ERROR: New attribute modification succeeded") + except AttributeError as e: + print(f"SUCCESS: New attribute modification blocked: {e}") + # Final test + print("\nFinal test") + struct.insert_dict = [{"pangkat": [{"bintang": {"angkatan": 12}}, [8, 9], "hallo"]}] + + # struct.pangkat[0].bintang = 98 + print(struct.pangkat[0].bintang.angkatan) + print(struct["name"]) diff --git a/memoryawarestruct/utils/__init__.py b/memoryawarestruct/utils/__init__.py new file mode 100644 index 0000000..0227fac --- /dev/null +++ b/memoryawarestruct/utils/__init__.py @@ -0,0 +1,2 @@ +from .typedata import * +from .middleware import ProtectedDict, ReadOnlyDictView, ProtectedAttribute, metaClass, replace_special_chars, smart_list_to_dict \ No newline at end of file diff --git a/memoryawarestruct/utils/middleware.py b/memoryawarestruct/utils/middleware.py new file mode 100644 index 0000000..7f77b6f --- /dev/null +++ b/memoryawarestruct/utils/middleware.py @@ -0,0 +1,353 @@ +import os, sys, re, inspect, weakref +from functools import wraps + +def replace_special_chars(input_string): + if not isinstance(input_string, str): + raise TypeError("Input must be a string.") + + # Remove extra spaces and replace with underscore + input_string = "_".join(input_string.strip().split()) + + if not input_string: + raise ValueError("Input string is empty after trimming and space replacement.") + + # Separate digits at the beginning + match = re.match(r"^(\d+)([a-zA-Z_].*)?", input_string) + if match: + digits = match.group(1) + rest = match.group(2) or "" + input_string = rest + "_" + digits + + # If result is only digits, it's invalid + if input_string.isdigit(): + raise ValueError( + "Resulting string contains only digits, which is not a valid name." + ) + + # Replace all non-alphanumeric and non-underscore characters with "_" + output_string = re.sub(r"[^a-zA-Z0-9_]", "_", input_string) + + if output_string.replace("_", "").__len__() <= 0: + raise ValueError("Input string is empty after trimming and space replacement.") + + # Strip leading/trailing underscores + return output_string.strip("_") + + +def is_all_list(data): + return all(isinstance(i, list) for i in data) + + +def clean_json(raw_data: dict): + # Bersihkan key-key menggunakan fungsi + cleaned_data = {} + for key, value in raw_data.items(): + try: + cleaned_key = replace_special_chars(key) + cleaned_data[cleaned_key] = value + except ValueError as e: + pass + except TypeError as e: + pass + return cleaned_data + + +def smart_list_to_dict(data): + """ + Smartly convert mixed list to dict. + Can detect patterns such as: + - ["key", value] + - [{"key": value}] + - ["key", value, {"key2": val2}] + - [{"key": [ {"nested": val}, ... ] }, value] + """ + result = {} + i = 0 + length = len(data) + + while i < length: + item = data[i] + + if isinstance(item, dict): + # Kasus dict langsung, gabungkan key-nya + for k, v in item.items(): + key = replace_special_chars(str(k)) + if key in result: + # Jadikan list jika duplikat + if not isinstance(result[key], list): + result[key] = [result[key]] + result[key].append(v) + else: + result[key] = v + i += 1 + + elif isinstance(item, list): + # Jika list dalam list: konversi rekursif + sub_dict = smart_list_to_dict(item) + result.update(sub_dict) + i += 1 + + elif isinstance(item, str) and (i + 1 < length): + # Pola ["key", value] + key = replace_special_chars(item) + val = data[i + 1] + if key in result: + if not isinstance(result[key], list): + result[key] = [result[key]] + result[key].append(val) + else: + result[key] = val + i += 2 + + else: + i += 1 # lewati jika tidak cocok pola + + return result + + +class ProtectedDict(dict): + """Protected dictionary that prevents external modification""" + + def __init__(self, owner_instance, *args, **kwargs): + super().__init__(*args, **kwargs) + self._owner = weakref.ref(owner_instance) + self._locked = True # Default to unlocked + self._authorized_methods = set() + self._access_count = {} # Track access count per method + self._max_access_per_method = 1 # Maximum access allowed per method + + def __dir__(self): + return [] + + def _check_authorization(self): + """Check if caller is authorized to modify this dict""" + + frame = inspect.currentframe() + try: + # Go up the call stack to find the caller + caller_frame = frame.f_back.f_back if frame.f_back else None + if not caller_frame: + return False + + caller_locals = caller_frame.f_locals + method_name = caller_frame.f_code.co_name + + # Check if caller is the owner instance + owner = self._owner() + if owner and "self" in caller_locals: + if caller_locals["self"] is owner: + # Check if method is authorized + if method_name in self._authorized_methods: + # Check access count limit + current_count = self._access_count.get(method_name, 0) + if current_count < self._max_access_per_method: + self._access_count[method_name] = current_count + 1 + return True + else: + return False + return False + + return False + except Exception: + return False + finally: + del frame + + def authorize_method(self, method_name: str, max_access: int = 1): + """Authorize a method to modify this dict with access limit""" + self._authorized_methods.add(method_name) + self._max_access_per_method = max_access + + def reset_access_count(self): + """Reset access count for all methods""" + self._access_count.clear() + + def lock(self): + """Lock the dictionary to prevent modifications""" + self._locked = True + + def unlock(self): + """Unlock the dictionary (NOT RECOMMENDED)""" + self._locked = False + + def __setitem__(self, key, value): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot modify protected dictionary from external context" + ) + # super().__setitem__(key, value) + + def __delitem__(self, key): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot delete from protected dictionary from external context" + ) + # super().__delitem__(key) + + def clear(self): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot clear protected dictionary from external context" + ) + super().clear() + + def pop(self, *args, **kwargs): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot pop from protected dictionary from external context" + ) + return super().pop(*args, **kwargs) + + def popitem(self): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot popitem from protected dictionary from external context" + ) + return super().popitem() + + def update(self, *args, **kwargs): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot update protected dictionary from external context" + ) + super().update(*args, **kwargs) + + def setdefault(self, key, default=None): + if self._locked or not self._check_authorization(): + raise AttributeError( + "Dictionary access denied: Cannot setdefault on protected dictionary from external context" + ) + return super().setdefault(key, default) + + +class ReadOnlyDictView: + """Read-only view of dictionary that prevents all modifications""" + + def __init__(self, source_dict): + self._source = source_dict + + def __dir__(self): + return [] + + def __getitem__(self, key): + return self._source[key] + + def __iter__(self): + return iter(self._source) + + def __len__(self): + return len(self._source) + + def __contains__(self, key): + return key in self._source + + def get(self, key, default=None): + return self._source.get(key, default) + + def keys(self): + return self._source.keys() + + def values(self): + return self._source.values() + + def items(self): + return self._source.items() + + def __setitem__(self, key, value): + raise AttributeError( + "Dictionary access denied: Cannot modify dictionary through __dict__ access" + ) + + def __delitem__(self, key): + raise AttributeError( + "Dictionary access denied: Cannot delete from dictionary through __dict__ access" + ) + + def __repr__(self): + return f"ReadOnlyDictView({dict(self._source)})" + +class ProtectedAttribute: + """Descriptor to protect critical attributes""" + + def __init__(self, initial_value, read_only=False, validator=None): + self.initial_value = initial_value + self.read_only = read_only + self.validator = validator + self.name = None + + def __dir__(self): + return [] + + def __set_name__(self, owner, name): + self.name = f"_{name}_protected" + + def __get__(self, obj, objtype=None): + if obj is None: + return self + return getattr(obj, self.name, self.initial_value) + + def __set__(self, obj, value): + if self.read_only and hasattr(obj, self.name): + raise AttributeError(f"Cannot modify read-only attribute") + + if self.validator and not self.validator(value): + raise ValueError(f"Invalid value for protected attribute") + + setattr(obj, self.name, value) + + +def metaClass(): + def protect_method(func): + """Decorator to protect method from replacement""" + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # Mark as protected + wrapper._is_protected = True + wrapper._original_func = func + return wrapper + + + class SecureStructMeta(type): + """Metaclass to secure Struct class""" + + def __new__(mcs, name, bases, namespace, **kwargs): + # Daftar method dan atribut yang dilindungi + protected_items = { + "__init__", + "__setattr__", + "__getattribute__", + "__delattr__", + "__dict__", + "__getitem__", + "__setitem__", + "__delitem__", + "_config", + "_config_id", + "_original_entries", + "_backup_dict", + "get_struct_name", + "set_struct_name", + "restore_backup", + "reset_to_original", + "safe_get", + "safe_set", + "_protected_attrs", + "_protected_dict", + "lock_dict", + "unlock_dict", + "_is_internal_call", + } + + # Tandai method yang dilindungi + for key, value in namespace.items(): + if key in protected_items and callable(value): + namespace[key] = protect_method(value) + + cls = super().__new__(mcs, name, bases, namespace) + cls._protected_items = protected_items + return cls + return SecureStructMeta + \ No newline at end of file diff --git a/memoryawarestruct/utils/typedata.py b/memoryawarestruct/utils/typedata.py new file mode 100644 index 0000000..519b234 --- /dev/null +++ b/memoryawarestruct/utils/typedata.py @@ -0,0 +1,768 @@ +import inspect +import sys + +# Python version compatibility check +PY35_PLUS = sys.version_info >= (3, 5) +PY36_PLUS = sys.version_info >= (3, 6) +PY39_PLUS = sys.version_info >= (3, 9) + +# Import collections.abc with fallback for older Python +try: + from collections.abc import Iterable as AbcIterable, Mapping, Sequence +except ImportError: + # Python < 3.3 + from collections import Iterable as AbcIterable, Mapping, Sequence + +# ===== Version Info ===== +__version__ = "1.0.0" +TYPE_CHECKING = False + +# ===== Compatibility Utilities ===== +def _format_string(template, *args, **kwargs): + """F-string compatibility for Python < 3.6""" + if PY36_PLUS: + # Use actual f-string formatting when available + return template.format(*args, **kwargs) + else: + return template.format(*args, **kwargs) + +def _get_type_name(tp): + """Get type name with compatibility""" + if hasattr(tp, '__name__'): + return tp.__name__ + elif hasattr(tp, '_name'): + return tp._name + else: + return str(tp) + +# ===== Special Forms ===== +class _SpecialForm: + def __init__(self, name, doc=None): + self._name = name + self.__doc__ = doc + + def __repr__(self): + return _format_string("typing.{}", self._name) + + def __reduce__(self): + return self._name + +# ===== Any ===== +class AnyType(_SpecialForm): + def __init__(self): + super(AnyType, self).__init__("Any", "Special type indicating an unconstrained type.") + + def __instancecheck__(self, instance): + return True + + def __subclasscheck__(self, subclass): + return True + +Any = AnyType() + +# ===== NoReturn ===== +class NoReturnType(_SpecialForm): + def __init__(self): + super(NoReturnType, self).__init__("NoReturn", "Special type indicating functions that never return.") + +NoReturn = NoReturnType() + +# ===== ClassVar ===== +class ClassVar: + def __class_getitem__(cls, param): + return ClassVarAlias(param) + +class ClassVarAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return _format_string("ClassVar[{}]", self.param) + +# ===== Final ===== +class Final: + def __class_getitem__(cls, param): + return FinalAlias(param) + +class FinalAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return _format_string("Final[{}]", self.param) + +# ===== Union ===== +class Union: + def __class_getitem__(cls, params): + if not isinstance(params, tuple): + params = (params,) + + # Flatten nested unions + flattened = [] + for param in params: + if isinstance(param, UnionAlias): + flattened.extend(param.types) + else: + flattened.append(param) + + # Remove duplicates while preserving order + unique_types = [] + for t in flattened: + if t not in unique_types: + unique_types.append(t) + + if len(unique_types) == 1: + return unique_types[0] + + return UnionAlias(tuple(unique_types)) + +class UnionAlias: + def __init__(self, types): + self.types = types + + def __repr__(self): + type_reprs = [repr(t) for t in self.types] + return _format_string("Union[{}]", ', '.join(type_reprs)) + + def __instancecheck__(self, instance): + return any(isinstance(instance, t) for t in self.types) + + def __eq__(self, other): + return isinstance(other, UnionAlias) and set(self.types) == set(other.types) + + def __hash__(self): + return hash(tuple(sorted(_get_type_name(t) for t in self.types))) + +# ===== Optional ===== +class Optional: + def __class_getitem__(cls, param): + return Union[param, type(None)] + +# ===== Generic Collections ===== +class List: + def __class_getitem__(cls, param): + return ListAlias(param) + +class ListAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return _format_string("List[{}]", self.param) + + def __instancecheck__(self, instance): + if not isinstance(instance, list): + return False + # For empty list, always return True as it's compatible with any List type + if len(instance) == 0: + return True + if self.param is Any: + return True + return all(isinstance(item, self.param) for item in instance) + +class Dict: + def __class_getitem__(cls, params): + if not isinstance(params, tuple) or len(params) != 2: + raise TypeError("Dict requires two type parameters: key, value") + return DictAlias(*params) + +class DictAlias: + def __init__(self, key_type, value_type): + self.key_type = key_type + self.value_type = value_type + + def __repr__(self): + return _format_string("Dict[{}, {}]", self.key_type, self.value_type) + + def __instancecheck__(self, instance): + if not isinstance(instance, dict): + return False + # For empty dict, always return True as it's compatible with any Dict type + if len(instance) == 0: + return True + if self.key_type is Any and self.value_type is Any: + return True + return all( + isinstance(k, self.key_type) and isinstance(v, self.value_type) + for k, v in instance.items() + ) + +class Tuple: + def __class_getitem__(cls, params): + if not isinstance(params, tuple): + params = (params,) + return TupleAlias(params) + +class TupleAlias: + def __init__(self, types): + self.types = types + + def __repr__(self): + if len(self.types) == 2 and self.types[1] is ...: + return _format_string("Tuple[{}, ...]", self.types[0]) + type_reprs = [repr(t) for t in self.types] + return _format_string("Tuple[{}]", ', '.join(type_reprs)) + + def __instancecheck__(self, instance): + if not isinstance(instance, tuple): + return False + + # Handle Tuple[type, ...] (variable length) + if len(self.types) == 2 and self.types[1] is ...: + element_type = self.types[0] + return all(isinstance(item, element_type) for item in instance) + + # Handle fixed length tuple + if len(instance) != len(self.types): + return False + return all(isinstance(i, t) for i, t in zip(instance, self.types)) + +class Set: + def __class_getitem__(cls, param): + return SetAlias(param) + +class SetAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return _format_string("Set[{}]", self.param) + + def __instancecheck__(self, instance): + if not isinstance(instance, set): + return False + # For empty set, always return True as it's compatible with any Set type + if len(instance) == 0: + return True + if self.param is Any: + return True + return all(isinstance(item, self.param) for item in instance) + +# ===== DefaultDict ===== +class DefaultDict: + def __class_getitem__(cls, params): + if not isinstance(params, tuple) or len(params) != 2: + raise TypeError("DefaultDict requires two type parameters: key, value") + return DefaultDictAlias(*params) + +class DefaultDictAlias: + def __init__(self, key_type, value_type): + self.key_type = key_type + self.value_type = value_type + + def __repr__(self): + return _format_string("DefaultDict[{}, {}]", self.key_type, self.value_type) + + def __instancecheck__(self, instance): + try: + from collections import defaultdict + except ImportError: + # Fallback if defaultdict not available + return isinstance(instance, dict) + + # Allow empty dict to be compatible with DefaultDict + if isinstance(instance, dict) and len(instance) == 0: + return True + if not isinstance(instance, defaultdict): + return False + if self.key_type is Any and self.value_type is Any: + return True + return all( + isinstance(k, self.key_type) and isinstance(v, self.value_type) + for k, v in instance.items() + ) + +# ===== Literal ===== +class Literal: + def __class_getitem__(cls, values): + if not isinstance(values, tuple): + values = (values,) + return LiteralAlias(values) + +class LiteralAlias: + def __init__(self, values): + self.values = values + + def __repr__(self): + value_reprs = [repr(v) for v in self.values] + return _format_string("Literal[{}]", ', '.join(value_reprs)) + + def __instancecheck__(self, instance): + return instance in self.values + + def __eq__(self, other): + return isinstance(other, LiteralAlias) and set(self.values) == set(other.values) + + def __hash__(self): + try: + return hash(tuple(sorted(self.values, key=lambda x: (type(x).__name__, x)))) + except TypeError: + # Handle unhashable types + return hash(tuple(str(v) for v in self.values)) + +# ===== Callable ===== +class Callable: + def __class_getitem__(cls, params): + if params is ...: + return CallableAlias((...,), Any) + if not isinstance(params, tuple) or len(params) != 2: + raise TypeError("Callable[[args], return_type] expected") + arg_types, return_type = params + return CallableAlias(arg_types, return_type) + +class CallableAlias: + def __init__(self, arg_types, return_type): + self.arg_types = arg_types + self.return_type = return_type + + def __repr__(self): + if self.arg_types == (...,): + return _format_string("Callable[..., {}]", self.return_type) + arg_reprs = [repr(t) for t in self.arg_types] + return _format_string("Callable[[{}], {}]", ', '.join(arg_reprs), self.return_type) + + def __instancecheck__(self, instance): + return callable(instance) + +# ===== TypeVar ===== +class TypeVar: + def __init__(self, name, *constraints, **kwargs): + self.name = name + self.constraints = constraints + # Handle keyword arguments with compatibility + self.bound = kwargs.get('bound', None) + self.covariant = kwargs.get('covariant', False) + self.contravariant = kwargs.get('contravariant', False) + + if self.covariant and self.contravariant: + raise ValueError("Bivariant type variables are not supported") + + def __repr__(self): + return self.name + + def __instancecheck__(self, instance): + if self.bound: + return isinstance(instance, self.bound) + if self.constraints: + return any(isinstance(instance, constraint) for constraint in self.constraints) + return True + +# ===== Generic ===== +if PY36_PLUS: + # Use modern metaclass syntax for Python 3.6+ + class GenericMeta(type): + def __getitem__(cls, params): + if not isinstance(params, tuple): + params = (params,) + + class_name = "{name}[{params}]".format( + name=cls.__name__, + params=', '.join(repr(p) for p in params) + ) + new_class = type(class_name, (cls,), { + '__type_params__': params, + '__origin__': cls, + '__args__': params + }) + return new_class + + class Generic(object): + __metaclass__ = GenericMeta +else: + # Fallback for older Python versions + class GenericMeta(type): + def __getitem__(cls, params): + if not isinstance(params, tuple): + params = (params,) + return cls + + class Generic(object): + __metaclass__ = GenericMeta + +# ===== Type Checking Utilities ===== +def _is_compatible_type(value, expected_type): + """Check if value is compatible with expected type, with special handling for generic types.""" + if expected_type is Any: + return True + + # Handle None/Optional cases + if value is None: + if hasattr(expected_type, 'types') and type(None) in expected_type.types: + return True + return expected_type is type(None) + + # Handle Union types + if isinstance(expected_type, UnionAlias): + return any(_is_compatible_type(value, t) for t in expected_type.types) + + # Handle generic aliases with special compatibility rules + if hasattr(expected_type, '__instancecheck__'): + try: + return expected_type.__instancecheck__(value) + except (TypeError, AttributeError): + pass + + # Handle regular type checking + try: + return isinstance(value, expected_type) + except (TypeError, AttributeError): + # Fallback for complex types + return True + +# ===== Type Decorators ===== +def enforce_types(func): + """Decorator to enforce type checking at runtime.""" + if not PY35_PLUS: + # Skip type checking for very old Python + return func + + try: + sig = inspect.signature(func) + except (ValueError, TypeError): + # Fallback if signature inspection fails + return func + + def wrapper(*args, **kwargs): + try: + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + except TypeError: + # If binding fails, just call the original function + return func(*args, **kwargs) + + # Check argument types + annotations = getattr(func, '__annotations__', {}) + for name, value in bound.arguments.items(): + expected = annotations.get(name) + if expected and expected != inspect.Parameter.empty: + if not _is_compatible_type(value, expected): + error_msg = "Argument '{name}' must be {expected}, got {actual}".format( + name=name, expected=expected, actual=_get_type_name(type(value)) + ) + raise TypeError(error_msg) + + # Call original function + result = func(*args, **kwargs) + + # Check return type + return_annotation = annotations.get('return') + if return_annotation and return_annotation != inspect.Parameter.empty: + if not _is_compatible_type(result, return_annotation): + error_msg = "Return value must be {expected}, got {actual}".format( + expected=return_annotation, actual=_get_type_name(type(result)) + ) + raise TypeError(error_msg) + + return result + + wrapper.__wrapped__ = func + wrapper.__annotations__ = getattr(func, '__annotations__', {}) + return wrapper + +def enforce_all_types(cls): + """Class decorator to enforce type checking on all methods.""" + if not PY35_PLUS: + return cls + + for name in dir(cls): + try: + method = getattr(cls, name) + if callable(method) and hasattr(method, '__annotations__'): + setattr(cls, name, enforce_types(method)) + except (AttributeError, TypeError): + continue + return cls + +# ===== Utility Functions ===== +def cast(typ, val): + """Cast a value to the given type.""" + return val + +def overload(func): + """Decorator for overloaded functions.""" + if not hasattr(func, '_overloaded_signatures'): + func._overloaded_signatures = [] + return func + +def get_type_hints(obj, globalns=None, localns=None): + """Get type hints for an object.""" + if hasattr(obj, '__annotations__'): + return dict(obj.__annotations__) + return {} + +def get_origin(tp): + """Get the unsubscripted version of a type.""" + return getattr(tp, '__origin__', None) + +def get_args(tp): + """Get type arguments with all substitutions performed.""" + return getattr(tp, '__args__', ()) + +# ===== NewType ===== +def NewType(name, tp): + def new_type(x): + return x + + new_type.__name__ = name + new_type.__qualname__ = name + try: + new_type.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__') + except (AttributeError, ValueError): + new_type.__module__ = '__main__' + new_type.__supertype__ = tp + + return new_type + +# ===== TypedDict ===== +def TypedDict(typename, fields, **kwargs): + """Create a typed dictionary class.""" + total = kwargs.get('total', True) + + if isinstance(fields, dict): + annotations = fields + else: + # Support list of (name, type) tuples + annotations = dict(fields) + + namespace = { + '__annotations__': annotations, + '__total__': total, + '__required_keys__': frozenset(annotations.keys()) if total else frozenset(), + '__optional_keys__': frozenset() if total else frozenset(annotations.keys()), + } + + return type(typename, (dict,), namespace) + +# ===== NamedTuple ===== +def NamedTuple(typename, fields): + """Create a named tuple with type annotations.""" + try: + from collections import namedtuple + except ImportError: + # Very old Python fallback + def namedtuple(name, fields): + return type(name, (tuple,), {}) + + if isinstance(fields, dict): + field_names = list(fields.keys()) + field_types = list(fields.values()) + else: + field_names = [name for name, _ in fields] + field_types = [tp for _, tp in fields] + + base_class = namedtuple(typename, field_names) + + # Add type annotations + base_class.__annotations__ = dict(zip(field_names, field_types)) + + return base_class + +if PY39_PLUS: + # ===== Generic ===== + class GenericMeta(type): + def __getitem__(cls, params): + if not isinstance(params, tuple): + params = (params,) + + # Create new class with type parameters + class_name = f"{cls.__name__}[{', '.join(repr(p) for p in params)}]" + new_class = type(class_name, (cls,), { + '__type_params__': params, + '__origin__': cls, + '__args__': params + }) + return new_class + + def __instancecheck__(cls, instance): + # Check if instance is of the generic type or its parameterized versions + return isinstance(instance, cls.__bases__[0] if cls.__bases__ else object) + + class Generic(metaclass=GenericMeta): + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + # Store type parameters for runtime introspection + if hasattr(cls, '__orig_bases__'): + cls.__type_params__ = getattr(cls.__orig_bases__[0], '__args__', ()) + + class AsyncIterable: + def __class_getitem__(cls, param): + return AsyncIterableAlias(param) + + class AsyncIterableAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return f"AsyncIterable[{self.param}]" + + def __instancecheck__(self, instance): + return hasattr(instance, '__aiter__') + + class AsyncIterator: + def __class_getitem__(cls, param): + return AsyncIteratorAlias(param) + + class AsyncIteratorAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return f"AsyncIterator[{self.param}]" + + def __instancecheck__(self, instance): + return hasattr(instance, '__aiter__') and hasattr(instance, '__anext__') + + class AsyncGenerator: + def __class_getitem__(cls, params): + if not isinstance(params, tuple): + params = (params, type(None)) + return AsyncGeneratorAlias(*params) + + class AsyncGeneratorAlias: + def __init__(self, yield_type, send_type=None): + self.yield_type = yield_type + self.send_type = send_type or type(None) + + def __repr__(self): + return f"AsyncGenerator[{self.yield_type}, {self.send_type}]" + + def __instancecheck__(self, instance): + import types + return isinstance(instance, types.AsyncGeneratorType) + + class ContextManager: + def __class_getitem__(cls, param): + return ContextManagerAlias(param) + + class ContextManagerAlias: + def __init__(self, param): + self.param = param + + def __repr__(self): + return f"ContextManager[{self.param}]" + + def __instancecheck__(self, instance): + return hasattr(instance, '__enter__') and hasattr(instance, '__exit__') + + class Generator: + def __class_getitem__(cls, params): + if not isinstance(params, tuple): + params = (params, type(None), type(None)) + elif len(params) == 1: + params = (params[0], type(None), type(None)) + return GeneratorAlias(*params) + + class GeneratorAlias: + def __init__(self, yield_type, send_type=None, return_type=None): + self.yield_type = yield_type + self.send_type = send_type or type(None) + self.return_type = return_type or type(None) + + def __repr__(self): + return f"Generator[{self.yield_type}, {self.send_type}, {self.return_type}]" + + def __instancecheck__(self, instance): + import types + return isinstance(instance, types.GeneratorType) + + # ===== Protocol ===== + class ProtocolMeta(type): + def __instancecheck__(cls, instance): + # Simple structural typing check + required_methods = [name for name, value in cls.__dict__.items() + if callable(value) and not name.startswith('_')] + return all(hasattr(instance, method) for method in required_methods) + + class Protocol(metaclass=ProtocolMeta): + pass + +try: + from collections import OrderedDict +except ImportError: + # Fallback minimal OrderedDict untuk Python < 2.7 + class OrderedDict(dict): + """Dictionary that remembers insertion order""" + def __init__(self, *args, **kwargs): + super(OrderedDict, self).__init__(*args, **kwargs) + self._keys = [] + for key in self: + self._keys.append(key) + + def __setitem__(self, key, value): + if key not in self: + self._keys.append(key) + super(OrderedDict, self).__setitem__(key, value) + + def __delitem__(self, key): + super(OrderedDict, self).__delitem__(key) + self._keys.remove(key) + + def keys(self): + return self._keys[:] + + def items(self): + return [(k, self[k]) for k in self._keys] + + def values(self): + return [self[k] for k in self._keys] + + def iteritems(self): + for k in self._keys: + yield (k, self[k]) + + def itervalues(self): + for k in self._keys: + yield self[k] + + def iterkeys(self): + for k in self._keys: + yield k + + def __iter__(self): + return iter(self._keys) + + def clear(self): + super(OrderedDict, self).clear() + self._keys = [] + + def popitem(self, last=True): + if not self._keys: + raise KeyError('dictionary is empty') + key = self._keys.pop(-1 if last else 0) + value = self.pop(key) + return key, value + + def update(self, *args, **kwargs): + for k, v in dict(*args, **kwargs).items(): + self[k] = v + + def copy(self): + return OrderedDict(self.items()) + + def __repr__(self): + items = ", ".join(f"{k!r}: {v!r}" for k, v in self.items()) + return f"OrderedDict({{{items}}})" + +# ===== Example Usage ===== +if __name__ == "__main__": + try: + # Test basic types + test_dict = {} # Should be compatible with any Dict type + test_list = [] # Should be compatible with any List type + + # Test Union + union_type = Union[int, float] + print("Union test:", isinstance("42", union_type)) + + # Test Optional + optional_type = Optional[str] + print("Optional test:", isinstance(None, optional_type)) + + # Test Literal + literal_type = Literal["active", "inactive"] + print("Literal test:", isinstance("active", literal_type)) + + print("All basic tests passed!") + + except Exception as e: + print("Error during testing: {}".format(e)) \ No newline at end of file From 4ad073e07632695d5368adc095892d0190d30761 Mon Sep 17 00:00:00 2001 From: LcferShell <71859305+LcfherShell@users.noreply.github.com> Date: Tue, 3 Jun 2025 17:27:13 +0700 Subject: [PATCH 2/2] Add files via upload --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 pyproject.toml diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..00666c1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" \ No newline at end of file