Module pycollector.user
Expand source code Browse git
import os
import boto3
import botocore
from botocore.config import Config
import requests
from datetime import datetime, timedelta
import decimal
import pandas as pd
import numpy as np
import json
# import logging
import pycollector
import getpass
from pycollector.globals import GLOBALS # print
from pycollector.util import is_email_address
class User(object):
    """User class for collector's user management.

    A successful login exports the temporary AWS credentials returned by the
    pycollector login API to ``VIPY_AWS_*`` environment variables; the SSM, S3
    and Lambda clients are then built from those variables.
    """

    def __init__(self, username=None, password=None):
        """Create a user and attach AWS clients when credentials are available.

        Args:
            username (str, optional): Collector email. When omitted, the
                ``VISYM_COLLECTOR_EMAIL`` environment variable is used if set.
            password (str, optional): Password for the user. When given, a
                login is performed immediately. Defaults to None.
        """
        # TODO - Apply parameter store
        self._is_login = False
        self._token_expiration_time = None
        self._program_name = None

        # Define every lazily populated attribute up front so the read-only
        # properties return None instead of raising AttributeError on a user
        # that has not logged in yet.
        self._aws_credentials = None
        self._cognito_username = None
        self._ssm_client = None
        self._s3_client = None
        self._s3_resource = None
        self._lambda_client = None

        if username is None and "VISYM_COLLECTOR_EMAIL" in os.environ:
            username = os.environ["VISYM_COLLECTOR_EMAIL"]

        # Set user properties
        self._username = username
        self._password = password
        if password is not None:
            self.login(password)

        # NOTE(review): this assignment runs after login() and therefore
        # replaces the region returned by the login API - confirm intended.
        self.region_name = 'us-east-1'
        self.refresh()

    def refresh(self):
        """Rebuild the S3 and Lambda clients from the exported credentials.

        Does nothing unless ``VIPY_AWS_SESSION_TOKEN`` is present in the
        environment.

        Returns:
            User: This instance, to allow chaining.
        """
        if "VIPY_AWS_SESSION_TOKEN" in os.environ:
            self._set_S3_clients()
            self._set_lambda_clients()
            self._is_login = True
            self._cognito_username = os.environ["VIPY_AWS_COGNITO_USERNAME"]
        return self

    def login(self, password=None):
        """Log in through the pycollector login API and set up AWS clients.

        The email is taken from the instance, or prompted for when the
        instance has none. Any exception raised by the request or by the
        client setup propagates unchanged to the caller.

        Args:
            password (str, optional): Password for the user. Prompted for
                with ``getpass`` when omitted. Defaults to None.

        Returns:
            User: This instance, to allow chaining.
        """
        username = self._username if self._username is not None else input("Collector email: ")
        assert is_email_address(username), 'Invalid collector email address "%s"' % username
        password = password if password is not None else getpass.getpass()

        # Set up API gateway request for login
        request_body = {"username": username, "password": password}
        self._aws_credentials = requests.post(
            GLOBALS["API_GATEWAY_HTTP"]["pycollector_login"],
            data=json.dumps(request_body),
            headers={"Content-type": "application/json", "Accept": "text/plain"},
        ).json()

        self._cognito_username = self._aws_credentials["cognito_username"]
        self._token_expiration_time = datetime.now() + timedelta(0, self._aws_credentials["token_expires_in_secs"])
        self.region_name = self._aws_credentials["region_name"]

        # Set up AWS services
        self._set_os_environ()
        self._set_parameter_store()
        self._set_S3_clients()
        self._set_lambda_clients()
        self._is_login = True
        return self

    @staticmethod
    def _aws_session_kwargs():
        """Return the boto3 keyword arguments read from ``VIPY_AWS_*`` variables.

        Raises:
            KeyError: If any of the four variables is not set.
        """
        return {
            "aws_access_key_id": os.environ["VIPY_AWS_ACCESS_KEY_ID"],
            "aws_secret_access_key": os.environ["VIPY_AWS_SECRET_ACCESS_KEY"],
            "aws_session_token": os.environ["VIPY_AWS_SESSION_TOKEN"],
            "region_name": os.environ["VIPY_AWS_REGION"],
        }

    def _set_parameter_store(self):
        """Create the SSM client from the exported credentials."""
        self._ssm_client = boto3.client("ssm", **self._aws_session_kwargs())

    def get_ssm_param(self, param_name: str = None, WithDecryption: bool = False) -> str:
        """Return the value of an SSM Parameter Store parameter.

        Args:
            param_name (str): Name passed to ``get_parameter`` as ``Name``.
            WithDecryption (bool): Passed through to ``get_parameter``.

        Returns:
            str: The ``Value`` field of the returned ``Parameter``.
        """
        if self.is_token_expired():
            # login() rebuilds the SSM client from the fresh credentials, so
            # there is no need to build one from the expired ones first.
            self.login()
        else:
            self._set_parameter_store()
        return self._ssm_client.get_parameter(Name=param_name, WithDecryption=WithDecryption).get("Parameter").get("Value")

    def _set_S3_clients(self):
        """Create the S3 client and S3 resource from the exported credentials."""
        assert "VIPY_AWS_SESSION_TOKEN" in os.environ
        self._s3_client = boto3.client("s3", **self._aws_session_kwargs())
        self._s3_resource = boto3.resource("s3", **self._aws_session_kwargs())

    def _set_lambda_clients(self):
        """Create the Lambda client from the exported credentials."""
        assert "VIPY_AWS_SESSION_TOKEN" in os.environ
        self._lambda_client = boto3.client("lambda", **self._aws_session_kwargs())

    def _set_os_environ(self):
        """Export the login credentials to ``VIPY_AWS_*`` environment variables."""
        credentials = self._aws_credentials
        expires_at = datetime.now() + timedelta(0, credentials["token_expires_in_secs"])

        os.environ["VIPY_AWS_ACCESS_KEY_ID"] = credentials["access_key_id"]
        os.environ["VIPY_AWS_SECRET_ACCESS_KEY"] = credentials["secret_key"]
        os.environ["VIPY_AWS_SESSION_TOKEN"] = credentials["session_token"]
        os.environ["VIPY_AWS_SESSION_TOKEN_EXPIRATION"] = expires_at.strftime("%Y-%m-%dT%H:%M:%S")
        os.environ["VIPY_AWS_COGNITO_USERNAME"] = self._cognito_username
        os.environ["VIPY_AWS_REGION"] = credentials["region_name"]

    def is_token_expired(self):
        """Report whether the exported session token has passed its expiration.

        Returns:
            bool: True when ``VIPY_AWS_SESSION_TOKEN_EXPIRATION`` is set and
            the current time is later than it; False otherwise.
        """
        return "VIPY_AWS_SESSION_TOKEN_EXPIRATION" in os.environ and datetime.now() > pycollector.util.fromclockstamp(os.environ["VIPY_AWS_SESSION_TOKEN_EXPIRATION"])

    def token_expired_by(self):
        """Return the expiration time recorded by the last login, or None."""
        return self._token_expiration_time

    def is_authenticated(self):
        """Return True once a login or refresh has set up the AWS clients."""
        return self._is_login

    def add_user_to_group(self):
        """Check if the current user is already in the pycollector user group; if not, add the user to the group.

        NOTE: not implemented - this method currently has no body and
        returns None.
        """

    @property
    def username(self):
        """Collector email supplied at construction, or None."""
        return self._username

    @property
    def cognito_username(self):
        """Cognito username set by login or refresh, or None before either."""
        return self._cognito_username

    @property
    def lambda_client(self):
        """Lambda client built from the exported credentials, or None."""
        return self._lambda_client
Classes
class User (username=None, password=None)-
User class for collector's user management
Args
username:[str]- username of pycollector.
password:[str], optional- password for the user. Defaults to None.
Expand source code Browse git
class User(object): """User class for collector's user management""" def __init__(self, username=None, password=None): """ Args: username ([str]): username of pycollector. password ([str], optional): password for the user. Defaults to None. """ # TODO - Apply paramter store # self.app_client_id = GLOBALS["COGNITO"]["app_client_id"] # self.identity_pool_id = GLOBALS["COGNITO"]["identity_pool_id"] # self.provider_name = GLOBALS["COGNITO"]["provider_name"] # self.region_name = GLOBALS["COGNITO"]["region_name"] # Initialize the base user properties and create the logger used by the function self._is_login = False # self._token_initialized_time = None self._token_expiration_time = None self._program_name = None # # Initialize cognito clients # # Ensure the system AWS credentials are not being used # config = Config(signature_version=botocore.UNSIGNED) # self._cognito_idp_client = boto3.client("cognito-idp", config=config, region_name=self.region_name) # self._cognito_id_client = boto3.client("cognito-identity", config=config, region_name=self.region_name) # Login if username is None and "VISYM_COLLECTOR_EMAIL" in os.environ: username = os.environ["VISYM_COLLECTOR_EMAIL"] # Set user properties self._username = username self._password = password if password is not None: self.login(password) self.region_name = 'us-east-1' self.refresh() def refresh(self): if "VIPY_AWS_SESSION_TOKEN" in os.environ: self._set_S3_clients() self._set_lambda_clients() self._is_login = True self._cognito_username = os.environ["VIPY_AWS_COGNITO_USERNAME"] return self def login(self, password=None): """[summary] Args: username ([str]): username of pycollector. password ([str], optional): password for the user. Defaults to None. 
""" username = self._username if self._username is not None else input("Collector email: ") assert is_email_address(username), 'Invalid collector email address "%s"' % username password = password if password is not None else getpass.getpass() try: # Set up API gateway request for login request_body = {"username": username, "password": password} self._aws_credentials = requests.post( GLOBALS["API_GATEWAY_HTTP"]["pycollector_login"], data=json.dumps(request_body), headers={"Content-type": "application/json", "Accept": "text/plain"}, ).json() self._cognito_username = self._aws_credentials["cognito_username"] self._token_expiration_time = datetime.now() + timedelta(0, self._aws_credentials["token_expires_in_secs"]) self.region_name = self._aws_credentials["region_name"] # Set up AWS services self._set_os_environ() self._set_parameter_store() self._set_S3_clients() self._set_lambda_clients() self._is_login = True except Exception as e: raise custom_error = "Failed to sign in due to exception: {0}".format(e) raise Exception(custom_error) return self def _set_parameter_store(self): """[summary]""" self._ssm_client = boto3.client( "ssm", aws_access_key_id=os.environ["VIPY_AWS_ACCESS_KEY_ID"], aws_secret_access_key=os.environ["VIPY_AWS_SECRET_ACCESS_KEY"], aws_session_token=os.environ["VIPY_AWS_SESSION_TOKEN"], region_name=os.environ["VIPY_AWS_REGION"], ) def get_ssm_param(self, param_name: str = None, WithDecryption: bool = False) -> str: """[summary]""" self._set_parameter_store() if self.is_token_expired(): self.login() return self._ssm_client.get_parameter(Name=param_name, WithDecryption=WithDecryption).get("Parameter").get("Value") def _set_S3_clients(self): """[summary]""" assert "VIPY_AWS_SESSION_TOKEN" in os.environ self._s3_client = boto3.client( "s3", aws_access_key_id=os.environ["VIPY_AWS_ACCESS_KEY_ID"], aws_secret_access_key=os.environ["VIPY_AWS_SECRET_ACCESS_KEY"], aws_session_token=os.environ["VIPY_AWS_SESSION_TOKEN"], 
region_name=os.environ["VIPY_AWS_REGION"], ) self._s3_resource = boto3.resource( "s3", aws_access_key_id=os.environ["VIPY_AWS_ACCESS_KEY_ID"], aws_secret_access_key=os.environ["VIPY_AWS_SECRET_ACCESS_KEY"], aws_session_token=os.environ["VIPY_AWS_SESSION_TOKEN"], region_name=os.environ["VIPY_AWS_REGION"], ) def _set_lambda_clients(self): """[summary]""" assert "VIPY_AWS_SESSION_TOKEN" in os.environ self._lambda_client = boto3.client( "lambda", aws_access_key_id=os.environ["VIPY_AWS_ACCESS_KEY_ID"], aws_secret_access_key=os.environ["VIPY_AWS_SECRET_ACCESS_KEY"], aws_session_token=os.environ["VIPY_AWS_SESSION_TOKEN"], region_name=os.environ["VIPY_AWS_REGION"], ) def _set_os_environ(self): """[summary]""" os.environ["VIPY_AWS_ACCESS_KEY_ID"] = self._aws_credentials["access_key_id"] os.environ["VIPY_AWS_SECRET_ACCESS_KEY"] = self._aws_credentials["secret_key"] os.environ["VIPY_AWS_SESSION_TOKEN"] = self._aws_credentials["session_token"] os.environ["VIPY_AWS_SESSION_TOKEN_EXPIRATION"] = str((datetime.now() + timedelta(0, self._aws_credentials["token_expires_in_secs"])).strftime("%Y-%m-%dT%H:%M:%S")) os.environ["VIPY_AWS_COGNITO_USERNAME"] = self._cognito_username os.environ["VIPY_AWS_REGION"] = self._aws_credentials["region_name"] def is_token_expired(self): """[summary] Returns: [type]: [description] """ return "VIPY_AWS_SESSION_TOKEN_EXPIRATION" in os.environ and datetime.now() > pycollector.util.fromclockstamp(os.environ["VIPY_AWS_SESSION_TOKEN_EXPIRATION"]) def token_expired_by(self): return self._token_expiration_time def is_authenticated(self): return self._is_login def add_user_to_group(self): """Check if the current user is already in the pycollector user group, if not add the user to group Returns: [type]: [description] """ @property def username(self): """""" return self._username @property def cognito_username(self): """""" return self._cognito_username @property def lambda_client(self): """""" return self._lambda_clientSubclasses
Instance variables
var cognito_username-
Expand source code Browse git
@property def cognito_username(self): """""" return self._cognito_username var lambda_client-
Expand source code Browse git
@property def lambda_client(self): """""" return self._lambda_client var username-
Expand source code Browse git
@property def username(self): """""" return self._username
Methods
def add_user_to_group(self)-
Check whether the current user is already in the pycollector user group; if not, add the user to the group.
Returns
[type]- [description]
Expand source code Browse git
def add_user_to_group(self): """Check if the current user is already in the pycollector user group, if not add the user to group Returns: [type]: [description] """ def get_ssm_param(self, param_name: str = None, WithDecryption: bool = False) ‑> str-
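Return the value of an AWS SSM Parameter Store parameter, logging in again first if the session token has expired.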
Expand source code Browse git
def get_ssm_param(self, param_name: str = None, WithDecryption: bool = False) -> str: """[summary]""" self._set_parameter_store() if self.is_token_expired(): self.login() return self._ssm_client.get_parameter(Name=param_name, WithDecryption=WithDecryption).get("Parameter").get("Value") def is_authenticated(self)-
Expand source code Browse git
def is_authenticated(self): return self._is_login def is_token_expired(self)-
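Report whether the stored AWS session token has passed its expiration time.
Returns
bool- True when VIPY_AWS_SESSION_TOKEN_EXPIRATION is set and the current time is later than it; otherwise False.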
Expand source code Browse git
def is_token_expired(self): """[summary] Returns: [type]: [description] """ return "VIPY_AWS_SESSION_TOKEN_EXPIRATION" in os.environ and datetime.now() > pycollector.util.fromclockstamp(os.environ["VIPY_AWS_SESSION_TOKEN_EXPIRATION"]) def login(self, password=None)-
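Log in through the pycollector login API and set up the AWS clients (SSM, S3, Lambda) from the returned credentials.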
Args
username:[str]- username of pycollector.
password:[str], optional- password for the user. Defaults to None.
Expand source code Browse git
def login(self, password=None): """[summary] Args: username ([str]): username of pycollector. password ([str], optional): password for the user. Defaults to None. """ username = self._username if self._username is not None else input("Collector email: ") assert is_email_address(username), 'Invalid collector email address "%s"' % username password = password if password is not None else getpass.getpass() try: # Set up API gateway request for login request_body = {"username": username, "password": password} self._aws_credentials = requests.post( GLOBALS["API_GATEWAY_HTTP"]["pycollector_login"], data=json.dumps(request_body), headers={"Content-type": "application/json", "Accept": "text/plain"}, ).json() self._cognito_username = self._aws_credentials["cognito_username"] self._token_expiration_time = datetime.now() + timedelta(0, self._aws_credentials["token_expires_in_secs"]) self.region_name = self._aws_credentials["region_name"] # Set up AWS services self._set_os_environ() self._set_parameter_store() self._set_S3_clients() self._set_lambda_clients() self._is_login = True except Exception as e: raise custom_error = "Failed to sign in due to exception: {0}".format(e) raise Exception(custom_error) return self def refresh(self)-
Expand source code Browse git
def refresh(self): if "VIPY_AWS_SESSION_TOKEN" in os.environ: self._set_S3_clients() self._set_lambda_clients() self._is_login = True self._cognito_username = os.environ["VIPY_AWS_COGNITO_USERNAME"] return self def token_expired_by(self)-
Expand source code Browse git
def token_expired_by(self): return self._token_expiration_time