● 환경
(OS) Rocky Linux release 8.10 (Green Obsidian)
(서버 1) 192.168.112.222, 로컬 서버 - 사용자
(서버 2) 192.168.112.218, 중앙 서버 - 접근 제어
[참고] 중앙 서버 방화벽 기본 세팅
URL : https://uyijune15.tistory.com/234
● 파일
(b) 중앙 서버 (프록시)
(파일) app.py
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
import subprocess
app = Flask(__name__)
app.secret_key = 'supersecretkey'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///iptables_rules.db'
db = SQLAlchemy(app)
class Rule(db.Model):
id = db.Column(db.Integer, primary_key=True)
ip = db.Column(db.String(15), nullable=False)
port = db.Column(db.String(30), nullable=False)
with app.app_context():
db.create_all()
# 특정 IP 방화벽 정책 추가
def allow_iptables_rule(ip, port):
try:
command1 = f"sudo iptables -I INPUT -s {ip} -d 192.168.112.218 -j ACCEPT"
command2 = f"sudo iptables -I OUTPUT -s 192.168.112.218 -d {ip} -p tcp --dport {port} -j ACCEPT"
# - subprocess.check_call() : 시스템 셸을 통해 명령어 실행
subprocess.check_call(command1, shell=True)
subprocess.check_call(command2, shell=True)
# Save to database
# - new_rule : 새로운 인스턴스 생성
new_rule = Rule(ip=ip, port=port)
# - session.add() : 인스턴스를 데이터베이스 세션에 추가
# - session.commit() : 세션에 추가된 모든 변경사항을 데이터베이스에 영구적으로 반영
db.session.add(new_rule)
db.session.commit()
# 성공 메시지 반환
return f"Rules added for IP/PORT: {ip}/{port}"
# 실패 메시지 반환
except subprocess.CalledProcessError as e:
return f"Failed to add rules: {e}"
# 특정 IP 방화벽 정책 삭제
def delete_iptables_rule(ip, port):
try:
command1 = f"sudo iptables -D INPUT -s {ip} -d 192.168.112.218 -j ACCEPT"
command2 = f"sudo iptables -D OUTPUT -s 192.168.112.218 -d {ip} -p tcp --dport {port} -j ACCEPT"
subprocess.check_call(command1, shell=True)
subprocess.check_call(command2, shell=True)
# - Rule.query.filter_by() : 데이터베이스 필터링
# - .first() : 첫번째 값
# - rule : 인스턴스
rule = Rule.query.filter_by(ip=ip, port=port).first()
if rule:
# db.session.delete() : 인스턴스를 데이터베이스 세션에서 삭제
# - session.commit() : 세션에 추가된 모든 변경사항을 데이터베이스에 영구적으로 반영
db.session.delete(rule)
db.session.commit()
# 성공 메시지 반환
return f"Rules deleted for IP/PORT: {ip}/{port}"
# 실패 메시지 반환
except subprocess.CalledProcessError as e:
return f"Failed to delete rules: {e}"
@app.route('/')
def home():
return render_template('home.html')
@app.route('/list')
def rule_list():
rules = Rule.query.all()
return render_template('rule_list.html', rules=rules)
@app.route('/add', methods=['POST'])
def add_rule():
ip = request.form['ip']
port = request.form['port']
if Rule.query.filter_by(ip=ip, port=port).first():
message = f"Rule already exists for IP/PORT: {ip}/{port}"
else:
message = allow_iptables_rule(ip, port)
flash(message)
return redirect(url_for('home'))
@app.route('/delete', methods=['POST'])
def delete_rule():
rule_id = request.form['rule_id']
rule = Rule.query.get(rule_id)
message = delete_iptables_rule(rule.ip, rule.port)
flash(message)
return redirect(url_for('rule_list'))
# 어플리케이션 실행
# - if __name__ == '__main__': 해당 스크립트가 직접 실행도리 때만 아래의 코드 실행
# - run() : 어플리케이션을 로컬 개발 서버에서 실행
if __name__ == '__main__':
app.run(host='0.0.0.0', port=2024)
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
'여러가지 > 테스트' 카테고리의 다른 글
[참고] TUI - dialog (0) | 2024.06.21 |
---|---|
[참고] 서버 로그 기록 (0) | 2024.06.18 |
[테스트] 프록시 서버 방화벽 및 ssh (0) | 2024.06.18 |
[프로젝트] 접근 제어 - 리스트 분리, 포트 추가 및 css 파일 적용 (0) | 2024.06.17 |
[프로젝트] 접근 제어 - 웹 브라우저 (0) | 2024.06.17 |