본문 바로가기

여러가지/테스트

[프로젝트] 접근 제어 - 전체 차단 / 대상 서버 허용 방식

● 환경

(OS) Rocky Linux release 8.10 (Green Obsidian)

(서버 1) 192.168.112.222, 로컬 서버 - 사용자

(서버 2) 192.168.112.218, 중앙 서버 - 접근 제어

 

[참고] 중앙 서버 방화벽 기본 세팅

URL : https://uyijune15.tistory.com/234

 

● 파일

(b) 중앙 서버 (프록시)

(파일) app.py

-----------------------------------------------------------------------------------------------------------------------------------------------------------------

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
import subprocess

app = Flask(__name__)
app.secret_key = 'supersecretkey'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///iptables_rules.db'
db = SQLAlchemy(app)

class Rule(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    ip = db.Column(db.String(15), nullable=False)
    port = db.Column(db.String(30), nullable=False)

with app.app_context():
    db.create_all()

 

# 특정 IP 방화벽 정책 추가
def allow_iptables_rule(ip, port):
    try:
        command1 = f"sudo iptables -I INPUT -s {ip} -d 192.168.112.218 -j ACCEPT"
        command2 = f"sudo iptables -I OUTPUT -s 192.168.112.218 -d {ip} -p tcp --dport {port} -j ACCEPT"

        # - subprocess.check_call() : 시스템 셸을 통해 명령어 실행
        subprocess.check_call(command1, shell=True)
        subprocess.check_call(command2, shell=True)

        # Save to database

        # - new_rule : 새로운 인스턴스 생성

        new_rule = Rule(ip=ip, port=port)

        # - session.add() : 인스턴스를 데이터베이스 세션에 추가

        # - session.commit() : 세션에 추가된 모든 변경사항을 데이터베이스에 영구적으로 반영        

        db.session.add(new_rule)
        db.session.commit()

        # 성공 메시지 반환
        return f"Rules added for IP/PORT: {ip}/{port}"

    # 실패 메시지 반환
    except subprocess.CalledProcessError as e:
        return f"Failed to add rules: {e}"

 

# 특정 IP 방화벽 정책 삭제
def delete_iptables_rule(ip, port):
    try:
        command1 = f"sudo iptables -D INPUT -s {ip} -d 192.168.112.218 -j ACCEPT"
        command2 = f"sudo iptables -D OUTPUT -s 192.168.112.218 -d {ip} -p tcp --dport {port} -j ACCEPT"
        subprocess.check_call(command1, shell=True)
        subprocess.check_call(command2, shell=True)

        # - Rule.query.filter_by() : 데이터베이스 필터링

        # - .first() : 첫번째 값

        # - rule : 인스턴스

        rule = Rule.query.filter_by(ip=ip, port=port).first()
        if rule:

            # db.session.delete() : 인스턴스를 데이터베이스 세션에서 삭제

            # - session.commit() : 세션에 추가된 모든 변경사항을 데이터베이스에 영구적으로 반영
            db.session.delete(rule)
            db.session.commit()

        # 성공 메시지 반환  
        return f"Rules deleted for IP/PORT: {ip}/{port}"

    # 실패 메시지 반환
    except subprocess.CalledProcessError as e:
        return f"Failed to delete rules: {e}"

 

@app.route('/')
def home():
    return render_template('home.html')

@app.route('/list')
def rule_list():
    rules = Rule.query.all()
    return render_template('rule_list.html', rules=rules)

@app.route('/add', methods=['POST'])
def add_rule():
    ip = request.form['ip']
    port = request.form['port']
    if Rule.query.filter_by(ip=ip, port=port).first():
        message = f"Rule already exists for IP/PORT: {ip}/{port}"
    else:
        message = allow_iptables_rule(ip, port)
    flash(message)
    return redirect(url_for('home'))

@app.route('/delete', methods=['POST'])
def delete_rule():
    rule_id = request.form['rule_id']
    rule = Rule.query.get(rule_id)
    message = delete_iptables_rule(rule.ip, rule.port)
    flash(message)
    return redirect(url_for('rule_list'))

 

# 어플리케이션 실행

# - if __name__ == '__main__': 해당 스크립트가 직접 실행도리 때만 아래의 코드 실행

# - run() :  어플리케이션을 로컬 개발 서버에서 실행

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=2024)

-----------------------------------------------------------------------------------------------------------------------------------------------------------------