Python Requests & BeautifulSoup 튜토리얼: 웹 스크래핑 완벽 가이드
웹 스크래핑 소개
웹 스크래핑(Web Scraping)은 웹사이트에서 데이터를 자동으로 추출하는 기술입니다. Python의 Requests와 BeautifulSoup 라이브러리는 웹 스크래핑을 위한 가장 강력하고 사용하기 쉬운 도구입니다. Requests는 HTTP 요청을 간편하게 보낼 수 있게 해주며, BeautifulSoup는 HTML과 XML 문서를 파싱하고 원하는 데이터를 추출하는 데 사용됩니다.
설치 방법
Requests와 BeautifulSoup를 설치하려면 pip를 사용합니다:
pip install requests beautifulsoup4 lxml
추가로 유용한 라이브러리들:
pip install pandas selenium fake-useragent
1. Requests 기초: HTTP 요청 보내기
기본적인 GET 요청
Requests 라이브러리를 사용하면 웹 페이지의 HTML을 쉽게 가져올 수 있습니다. GET 요청은 가장 기본적인 HTTP 메서드입니다.
import requests
from bs4 import BeautifulSoup
import time
import json
# 기본 GET 요청
url = 'https://example.com'
response = requests.get(url)
# 응답 상태 확인
print(f"상태 코드: {response.status_code}")
print(f"응답 헤더: {response.headers['content-type']}")
print(f"인코딩: {response.encoding}")
print(f"텍스트 길이: {len(response.text)} 문자")
# HTML 내용 일부 출력
print("\nHTML 처음 500자:")
print(response.text[:500])
# 상태 코드 체크
if response.status_code == 200:
print("\n✅ 요청 성공!")
else:
print(f"\n❌ 요청 실패: {response.status_code}")
다양한 HTTP 메서드
GET 외에도 POST, PUT, DELETE 등 다양한 HTTP 메서드를 사용할 수 있습니다.
# POST 요청 예제
post_url = 'https://httpbin.org/post'
data = {
'username': 'testuser',
'email': 'test@example.com',
'age': 25
}
# POST 요청 보내기
post_response = requests.post(post_url, data=data)
print("POST 응답:")
print(json.dumps(post_response.json(), indent=2, ensure_ascii=False))
# JSON 데이터로 POST 요청
json_data = {
'products': [
{'id': 1, 'name': '노트북', 'price': 1500000},
{'id': 2, 'name': '마우스', 'price': 30000}
]
}
json_response = requests.post(post_url, json=json_data)
print("\nJSON POST 응답:")
print(json.dumps(json_response.json()['json'], indent=2, ensure_ascii=False))
# 파라미터가 있는 GET 요청
params_url = 'https://httpbin.org/get'
params = {
'search': 'python',
'category': 'programming',
'page': 1
}
params_response = requests.get(params_url, params=params)
print("\n파라미터가 있는 GET 요청:")
print(f"최종 URL: {params_response.url}")
print(f"전송된 파라미터: {params_response.json()['args']}")
2. BeautifulSoup 기초: HTML 파싱
BeautifulSoup는 HTML과 XML 문서를 파싱하여 파이썬 객체로 변환합니다. 이를 통해 웹 페이지의 구조를 쉽게 탐색하고 원하는 데이터를 추출할 수 있습니다.
# BeautifulSoup 객체 생성
html_doc = """
<html>
<head>
<title>샘플 웹페이지</title>
</head>
<body>
<div class="container">
<h1 id="main-title">웹 스크래핑 튜토리얼</h1>
<p class="description">BeautifulSoup를 사용한 HTML 파싱 예제입니다.</p>
<div class="content">
<h2>프로그래밍 언어</h2>
<ul class="languages">
<li data-level="easy">Python</li>
<li data-level="medium">JavaScript</li>
<li data-level="hard">C++</li>
</ul>
</div>
<div class="products">
<div class="product" data-id="1">
<h3>노트북</h3>
<span class="price">1,500,000원</span>
<p class="stock">재고: 10개</p>
</div>
<div class="product" data-id="2">
<h3>마우스</h3>
<span class="price">30,000원</span>
<p class="stock">재고: 50개</p>
</div>
</div>
</div>
</body>
</html>
"""
# BeautifulSoup 객체 생성
soup = BeautifulSoup(html_doc, 'html.parser')
# HTML 구조 예쁘게 출력
print("HTML 구조 (prettify):")
print(soup.prettify()[:500])
# 기본 탐색 메서드
print("\n=== 기본 탐색 ===")
print(f"제목 태그: {soup.title}")
print(f"제목 텍스트: {soup.title.string}")
print(f"제목 태그 이름: {soup.title.name}")
# 첫 번째 태그 찾기
print(f"\n첫 번째 h1 태그: {soup.h1}")
print(f"첫 번째 p 태그: {soup.p}")
print(f"첫 번째 div 태그의 class: {soup.div.get('class')}")
3. CSS 선택자와 태그 찾기
BeautifulSoup는 다양한 방법으로 HTML 요소를 찾을 수 있습니다. find(), find_all(), select() 메서드를 활용하여 원하는 데이터를 정확하게 추출할 수 있습니다.
# find()와 find_all() 메서드
print("=== find() 메서드 ===")
# ID로 찾기
main_title = soup.find('h1', id='main-title')
print(f"메인 타이틀: {main_title.text}")
# 클래스로 찾기
description = soup.find('p', class_='description')
print(f"설명: {description.text}")
# 속성으로 찾기
easy_lang = soup.find('li', attrs={'data-level': 'easy'})
print(f"쉬운 언어: {easy_lang.text}")
print("\n=== find_all() 메서드 ===")
# 모든 li 태그 찾기
all_languages = soup.find_all('li')
print("프로그래밍 언어 목록:")
for lang in all_languages:
level = lang.get('data-level')
print(f" - {lang.text} (난이도: {level})")
# 여러 태그 찾기
headers = soup.find_all(['h1', 'h2', 'h3'])
print("\n모든 헤더 태그:")
for header in headers:
print(f" {header.name}: {header.text}")
# 람다 함수 사용
products_with_id = soup.find_all(lambda tag: tag.name == 'div' and tag.get('data-id'))
print(f"\ndata-id 속성이 있는 div 개수: {len(products_with_id)}")
print("\n=== CSS 선택자 (select) ===")
# CSS 선택자 사용
# ID 선택자
title_css = soup.select_one('#main-title')
print(f"ID 선택자로 찾은 제목: {title_css.text}")
# 클래스 선택자
products = soup.select('.product')
print(f"\n제품 개수: {len(products)}")
# 복합 선택자
product_prices = soup.select('.product .price')
print("제품 가격:")
for price in product_prices:
print(f" {price.text}")
# 속성 선택자
hard_level = soup.select('li[data-level="hard"]')
print(f"\n어려운 언어: {hard_level[0].text if hard_level else '없음'}")
# 자식 선택자
ul_children = soup.select('.languages > li')
print(f"languages 클래스의 직접 자식 li 개수: {len(ul_children)}")
4. 데이터 추출 기법
웹 페이지에서 텍스트, 속성, 링크 등 다양한 형태의 데이터를 추출하는 방법을 알아봅니다.
# 실제 웹사이트 스크래핑 예제
def extract_data_example():
"""다양한 데이터 추출 기법 시연"""
# 샘플 HTML
html = """
<div class="article">
<h2>Python 웹 스크래핑 가이드</h2>
<div class="meta">
<span class="author">작성자: 김개발</span>
<span class="date">2024-01-15</span>
<span class="views">조회수: 1,234</span>
</div>
<div class="content">
<p>웹 스크래핑은 <strong>매우 유용한</strong> 기술입니다.</p>
<p>다양한 <a href="https://example.com/tools">도구들</a>을 활용할 수 있습니다.</p>
<img src="image1.jpg" alt="스크래핑 다이어그램" width="500">
</div>
<div class="tags">
<span class="tag">#Python</span>
<span class="tag">#WebScraping</span>
<span class="tag">#BeautifulSoup</span>
</div>
</div>
"""
soup = BeautifulSoup(html, 'html.parser')
# 1. 텍스트 추출
print("=== 텍스트 추출 ===")
# text vs string vs get_text()
title = soup.find('h2')
print(f"text 속성: {title.text}")
print(f"string 속성: {title.string}")
print(f"get_text() 메서드: {title.get_text()}")
# 중첩된 태그의 텍스트
content = soup.find('div', class_='content')
print(f"\n전체 텍스트: {content.get_text(strip=True)}")
print(f"구분자 포함: {content.get_text(' | ', strip=True)}")
# 2. 속성 추출
print("\n=== 속성 추출 ===")
img = soup.find('img')
if img:
print(f"이미지 src: {img.get('src')}")
print(f"이미지 alt: {img.get('alt')}")
print(f"이미지 width: {img.get('width')}")
print(f"모든 속성: {img.attrs}")
# 3. 링크 추출
print("\n=== 링크 추출 ===")
links = soup.find_all('a')
for link in links:
href = link.get('href')
text = link.text
print(f"링크: {text} -> {href}")
# 4. 정규식을 사용한 추출
print("\n=== 정규식 활용 ===")
import re
# 숫자 추출
views_text = soup.find('span', class_='views').text
views_number = re.findall(r'\d+', views_text.replace(',', ''))
print(f"조회수 (숫자만): {views_number[0] if views_number else '없음'}")
# 날짜 추출
date_text = soup.find('span', class_='date').text
date_pattern = re.findall(r'\d{4}-\d{2}-\d{2}', date_text)
print(f"날짜: {date_pattern[0] if date_pattern else '없음'}")
# 5. 태그 제거
print("\n=== 태그 제거 ===")
p_with_strong = soup.find('p')
print(f"원본: {p_with_strong}")
print(f"텍스트만: {p_with_strong.get_text()}")
# 6. 리스트로 데이터 수집
print("\n=== 리스트로 수집 ===")
tags = soup.select('.tag')
tag_list = [tag.text.strip() for tag in tags]
print(f"태그 목록: {tag_list}")
# 7. 딕셔너리로 구조화
article_data = {
'title': soup.find('h2').text.strip(),
'author': soup.find('span', class_='author').text.replace('작성자: ', ''),
'date': soup.find('span', class_='date').text,
'views': int(re.findall(r'\d+', views_text.replace(',', ''))[0]),
'tags': tag_list
}
print("\n=== 구조화된 데이터 ===")
print(json.dumps(article_data, indent=2, ensure_ascii=False))
# 함수 실행
extract_data_example()
5. 동적 콘텐츠와 세션 처리
많은 현대 웹사이트는 JavaScript로 동적으로 콘텐츠를 로드합니다. 또한 로그인이 필요한 페이지를 스크래핑하려면 세션을 유지해야 합니다.
# 세션 유지하기
def session_example():
"""세션을 사용한 로그인 상태 유지"""
# 세션 생성
session = requests.Session()
# 로그인 정보
login_url = 'https://httpbin.org/cookies/set'
cookies_url = 'https://httpbin.org/cookies'
# 쿠키 설정 (로그인 시뮬레이션)
session.get(f"{login_url}/sessionid/abc123xyz")
session.get(f"{login_url}/username/testuser")
# 쿠키 확인
response = session.get(cookies_url)
print("세션 쿠키:")
print(json.dumps(response.json(), indent=2))
# 세션 헤더 설정
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept-Language': 'ko-KR,ko;q=0.9,en;q=0.8'
})
return session
# AJAX 요청 처리
def handle_ajax_request():
"""AJAX 요청 처리 예제"""
# API 엔드포인트 직접 호출
api_url = 'https://jsonplaceholder.typicode.com/posts'
# AJAX 요청 헤더
headers = {
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json'
}
response = requests.get(api_url, headers=headers)
posts = response.json()
print("AJAX로 가져온 포스트 (처음 3개):")
for post in posts[:3]:
print(f"\nID: {post['id']}")
print(f"제목: {post['title']}")
print(f"내용: {post['body'][:50]}...")
# Selenium을 사용한 동적 콘텐츠 처리 (예제 코드)
def selenium_example_code():
"""Selenium 사용 예제 (실행하려면 드라이버 필요)"""
example_code = '''
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Chrome 드라이버 설정
options = webdriver.ChromeOptions()
options.add_argument('--headless') # 백그라운드 실행
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=options)
try:
# 페이지 로드
driver.get('https://example.com')
# JavaScript 실행 대기
wait = WebDriverWait(driver, 10)
element = wait.until(
EC.presence_of_element_located((By.CLASS_NAME, "dynamic-content"))
)
# 페이지 소스 가져오기
page_source = driver.page_source
soup = BeautifulSoup(page_source, 'html.parser')
# 데이터 추출
dynamic_data = soup.find_all('div', class_='dynamic-content')
finally:
driver.quit()
'''
print("Selenium 사용 예제 코드:")
print(example_code)
# 실행
print("=== 세션 예제 ===")
session = session_example()
print("\n=== AJAX 요청 예제 ===")
handle_ajax_request()
print("\n=== Selenium 예제 코드 ===")
selenium_example_code()
6. 에러 처리와 예외 상황
웹 스크래핑 중에는 다양한 에러가 발생할 수 있습니다. 안정적인 스크래퍼를 만들기 위해서는 적절한 에러 처리가 필수입니다.
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
import time
from urllib.parse import urljoin, urlparse
def safe_request(url, max_retries=3, timeout=10):
"""안전한 요청 처리 함수"""
for attempt in range(max_retries):
try:
print(f"시도 {attempt + 1}/{max_retries}: {url}")
response = requests.get(url, timeout=timeout)
# 상태 코드 확인
response.raise_for_status()
print(f"✅ 성공: {response.status_code}")
return response
except Timeout:
print(f"⏱️ 타임아웃 발생 (시도 {attempt + 1})")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 지수 백오프
except ConnectionError:
print(f"🔌 연결 오류 (시도 {attempt + 1})")
if attempt < max_retries - 1:
time.sleep(5)
except requests.HTTPError as e:
print(f"❌ HTTP 에러: {e}")
if response.status_code == 429: # Too Many Requests
print("⚠️ 요청 제한 초과. 대기 중...")
time.sleep(60)
elif response.status_code == 404:
print("📄 페이지를 찾을 수 없습니다.")
return None
else:
break
except RequestException as e:
print(f"🚫 요청 실패: {e}")
if attempt < max_retries - 1:
time.sleep(3)
print("❌ 모든 재시도 실패")
return None
def robust_scraper(url):
"""견고한 스크래퍼 예제"""
# 안전한 요청
response = safe_request(url)
if response is None:
return None
try:
# HTML 파싱
soup = BeautifulSoup(response.content, 'html.parser')
# 데이터 추출 (안전하게)
data = {}
# title 태그가 없을 수도 있음
title_tag = soup.find('title')
data['title'] = title_tag.text.strip() if title_tag else '제목 없음'
# meta description
meta_desc = soup.find('meta', attrs={'name': 'description'})
data['description'] = meta_desc.get('content', '') if meta_desc else ''
# 모든 링크 수집 (상대 경로 처리)
links = []
for link in soup.find_all('a', href=True):
href = link['href']
# 상대 경로를 절대 경로로 변환
absolute_url = urljoin(url, href)
links.append(absolute_url)
data['links'] = links[:10] # 처음 10개만
# 이미지 수집
images = []
for img in soup.find_all('img', src=True):
src = img['src']
absolute_src = urljoin(url, src)
images.append({
'src': absolute_src,
'alt': img.get('alt', '')
})
data['images'] = images[:5] # 처음 5개만
return data
except Exception as e:
print(f"파싱 중 에러 발생: {e}")
return None
# 테스트
test_urls = [
'https://httpbin.org/html',
'https://httpbin.org/status/404', # 404 에러
'https://httpbin.org/delay/15', # 타임아웃 테스트
]
for test_url in test_urls:
print(f"\n{'='*50}")
print(f"테스트 URL: {test_url}")
result = robust_scraper(test_url)
if result:
print("결과:")
print(json.dumps(result, indent=2, ensure_ascii=False)[:500] + "...")
7. 헤더와 쿠키 다루기
많은 웹사이트는 적절한 헤더와 쿠키가 없으면 접근을 차단합니다. User-Agent를 설정하고 쿠키를 관리하는 방법을 알아봅니다.
from fake_useragent import UserAgent
import random
def headers_and_cookies_example():
"""헤더와 쿠키 처리 예제"""
# 1. User-Agent 설정
print("=== User-Agent 설정 ===")
# 수동 User-Agent 리스트
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
]
random_ua = random.choice(user_agents)
print(f"선택된 User-Agent: {random_ua}")
# fake-useragent 라이브러리 사용 (설치 필요)
try:
ua = UserAgent()
print(f"Chrome UA: {ua.chrome}")
print(f"Firefox UA: {ua.firefox}")
print(f"Random UA: {ua.random}")
except:
print("fake-useragent 라이브러리가 설치되지 않았습니다.")
# 2. 전체 헤더 설정
print("\n=== 전체 헤더 설정 ===")
headers = {
'User-Agent': random_ua,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Referer': 'https://www.google.com/'
}
# 헤더 테스트
response = requests.get('https://httpbin.org/headers', headers=headers)
print("전송된 헤더:")
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
# 3. 쿠키 처리
print("\n=== 쿠키 처리 ===")
# 쿠키 설정
cookies = {
'session_id': 'abc123xyz',
'user_preference': 'dark_mode',
'language': 'ko'
}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
print("전송된 쿠키:")
print(response.json())
# 4. 쿠키 받기 및 저장
print("\n=== 쿠키 받기 ===")
# 쿠키 설정 요청
set_cookie_url = 'https://httpbin.org/cookies/set/test_cookie/test_value'
response = requests.get(set_cookie_url, allow_redirects=False)
print("받은 쿠키:")
for cookie in response.cookies:
print(f" {cookie.name} = {cookie.value}")
# 5. 쿠키 jar 사용
print("\n=== Cookie Jar 사용 ===")
from http.cookiejar import CookieJar
jar = CookieJar()
session = requests.Session()
session.cookies = jar
# 여러 요청에서 쿠키 유지
session.get('https://httpbin.org/cookies/set/persistent/true')
session.get('https://httpbin.org/cookies/set/session_data/important')
# 저장된 쿠키 확인
final_response = session.get('https://httpbin.org/cookies')
print("세션에 저장된 쿠키:")
print(final_response.json())
def browser_simulation():
"""브라우저 동작 시뮬레이션"""
print("\n=== 브라우저 시뮬레이션 ===")
session = requests.Session()
# 브라우저처럼 동작하는 헤더
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'max-age=0',
'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1'
})
# 첫 페이지 방문 (홈페이지)
print("1. 홈페이지 방문")
home_response = session.get('https://httpbin.org/')
print(f" 상태: {home_response.status_code}")
# Referer 헤더 추가 (이전 페이지에서 왔다고 표시)
session.headers['Referer'] = 'https://httpbin.org/'
# 다음 페이지 방문
print("2. 하위 페이지 방문")
sub_response = session.get('https://httpbin.org/headers')
print(f" 상태: {sub_response.status_code}")
return session
# 실행
headers_and_cookies_example()
browser_session = browser_simulation()
8. 속도 제한과 윤리적 스크래핑
웹 스크래핑을 할 때는 서버에 부담을 주지 않도록 적절한 속도 제한을 두어야 합니다. 또한 robots.txt를 확인하고 준수해야 합니다.
import time
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse
import random
class EthicalScraper:
"""윤리적인 웹 스크래퍼 클래스"""
def __init__(self, base_url, delay_range=(1, 3)):
self.base_url = base_url
self.delay_range = delay_range
self.session = requests.Session()
self.robot_parser = None
self.last_request_time = 0
# User-Agent 설정
self.session.headers.update({
'User-Agent': 'EthicalBot 1.0 (Contact: your-email@example.com)'
})
# robots.txt 확인
self._check_robots_txt()
def _check_robots_txt(self):
"""robots.txt 파일 확인"""
robots_url = f"{self.base_url}/robots.txt"
try:
self.robot_parser = RobotFileParser()
self.robot_parser.set_url(robots_url)
self.robot_parser.read()
print(f"✅ robots.txt 확인 완료: {robots_url}")
except:
print("⚠️ robots.txt를 찾을 수 없습니다.")
def can_fetch(self, url):
"""URL 접근 가능 여부 확인"""
if self.robot_parser:
return self.robot_parser.can_fetch('*', url)
return True
def _apply_delay(self):
"""요청 간 지연 적용"""
current_time = time.time()
time_since_last = current_time - self.last_request_time
# 랜덤 지연
min_delay, max_delay = self.delay_range
required_delay = random.uniform(min_delay, max_delay)
if time_since_last < required_delay:
sleep_time = required_delay - time_since_last
print(f"⏱️ {sleep_time:.2f}초 대기 중...")
time.sleep(sleep_time)
self.last_request_time = time.time()
def scrape(self, url):
"""URL 스크래핑"""
# robots.txt 확인
if not self.can_fetch(url):
print(f"🚫 robots.txt에 의해 차단됨: {url}")
return None
# 지연 적용
self._apply_delay()
try:
print(f"📥 스크래핑: {url}")
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response
except requests.RequestException as e:
print(f"❌ 에러: {e}")
return None
def scrape_multiple(self, urls, max_pages=10):
"""여러 URL 스크래핑"""
results = []
for i, url in enumerate(urls[:max_pages]):
print(f"\n진행률: {i+1}/{min(len(urls), max_pages)}")
response = self.scrape(url)
if response:
soup = BeautifulSoup(response.content, 'html.parser')
# 데이터 추출
data = {
'url': url,
'title': soup.title.text if soup.title else '',
'status_code': response.status_code,
'content_length': len(response.content)
}
results.append(data)
return results
def rate_limiter_decorator(calls_per_second=1):
"""속도 제한 데코레이터"""
min_interval = 1.0 / calls_per_second
last_called = [0.0]
def decorator(func):
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
# 사용 예제
@rate_limiter_decorator(calls_per_second=0.5) # 2초에 1번
def fetch_url(url):
"""속도 제한이 적용된 URL 가져오기"""
print(f"Fetching: {url}")
return requests.get(url)
# 테스트
print("=== 윤리적 스크래퍼 테스트 ===")
scraper = EthicalScraper('https://httpbin.org', delay_range=(0.5, 1.5))
test_urls = [
'https://httpbin.org/html',
'https://httpbin.org/json',
'https://httpbin.org/xml'
]
results = scraper.scrape_multiple(test_urls)
print("\n=== 스크래핑 결과 ===")
for result in results:
print(json.dumps(result, indent=2, ensure_ascii=False))
9. 스크래핑 데이터 저장
수집한 데이터를 다양한 형식으로 저장하는 방법을 알아봅니다. CSV, JSON, 데이터베이스 등 여러 저장 방식을 다룹니다.
import csv
import json
import sqlite3
import pandas as pd
from datetime import datetime
class DataStorage:
"""스크래핑 데이터 저장 클래스"""
def __init__(self, base_filename='scraped_data'):
self.base_filename = base_filename
self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
def save_to_json(self, data, pretty=True):
"""JSON 파일로 저장"""
filename = f"{self.base_filename}_{self.timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
if pretty:
json.dump(data, f, ensure_ascii=False, indent=2)
else:
json.dump(data, f, ensure_ascii=False)
print(f"✅ JSON 저장 완료: {filename}")
return filename
def save_to_csv(self, data, headers=None):
"""CSV 파일로 저장"""
filename = f"{self.base_filename}_{self.timestamp}.csv"
if not data:
print("⚠️ 저장할 데이터가 없습니다.")
return
# 헤더 자동 감지
if headers is None and isinstance(data[0], dict):
headers = list(data[0].keys())
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
if isinstance(data[0], dict):
writer = csv.DictWriter(f, fieldnames=headers)
writer.writeheader()
writer.writerows(data)
else:
writer = csv.writer(f)
if headers:
writer.writerow(headers)
writer.writerows(data)
print(f"✅ CSV 저장 완료: {filename}")
return filename
def save_to_excel(self, data, sheet_name='Sheet1'):
"""Excel 파일로 저장"""
filename = f"{self.base_filename}_{self.timestamp}.xlsx"
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
print(f"✅ Excel 저장 완료: {filename}")
return filename
def save_to_database(self, data, table_name='scraped_data'):
"""SQLite 데이터베이스에 저장"""
db_name = f"{self.base_filename}.db"
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
if not data:
print("⚠️ 저장할 데이터가 없습니다.")
return
# 테이블 생성 (첫 번째 데이터 기준)
first_item = data[0]
columns = list(first_item.keys())
# 테이블 생성 쿼리
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{', '.join([f'{col} TEXT' for col in columns])},
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_query)
# 데이터 삽입
for item in data:
placeholders = ','.join(['?'] * len(columns))
insert_query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
values = [item.get(col, '') for col in columns]
cursor.execute(insert_query, values)
conn.commit()
conn.close()
print(f"✅ 데이터베이스 저장 완료: {db_name} (테이블: {table_name})")
return db_name
# 실제 스크래핑 및 저장 예제
def scrape_and_save_example():
"""스크래핑 후 데이터 저장 예제"""
print("=== 데이터 수집 중 ===")
# 샘플 데이터 스크래핑 (JSONPlaceholder API 사용)
posts_url = 'https://jsonplaceholder.typicode.com/posts'
users_url = 'https://jsonplaceholder.typicode.com/users'
posts_response = requests.get(posts_url)
users_response = requests.get(users_url)
posts = posts_response.json()[:5] # 처음 5개만
users = users_response.json()[:5] # 처음 5개만
# 데이터 가공
processed_data = []
for post in posts:
user = next((u for u in users if u['id'] == post['userId']), None)
processed_data.append({
'post_id': post['id'],
'title': post['title'],
'body': post['body'][:100] + '...',
'author': user['name'] if user else 'Unknown',
'author_email': user['email'] if user else 'N/A',
'scraped_at': datetime.now().isoformat()
})
print(f"✅ {len(processed_data)}개 데이터 수집 완료")
# 데이터 저장
storage = DataStorage('blog_posts')
print("\n=== 다양한 형식으로 저장 ===")
# JSON으로 저장
storage.save_to_json(processed_data)
# CSV로 저장
storage.save_to_csv(processed_data)
# Excel로 저장
try:
storage.save_to_excel(processed_data, sheet_name='Posts')
except ImportError:
print("⚠️ openpyxl이 설치되지 않아 Excel 저장을 건너뜁니다.")
# 데이터베이스에 저장
storage.save_to_database(processed_data, table_name='blog_posts')
print("\n=== 저장된 데이터 샘플 ===")
print(json.dumps(processed_data[0], indent=2, ensure_ascii=False))
# 실행
scrape_and_save_example()
10. 고급 스크래핑 기법
복잡한 웹사이트를 스크래핑하기 위한 고급 기법들을 소개합니다. 멀티스레딩, 프록시 사용, 캡차 우회 등을 다룹니다.
import concurrent.futures
from threading import Lock
import hashlib
class AdvancedScraper:
"""고급 스크래핑 기능을 갖춘 클래스"""
def __init__(self):
self.session = requests.Session()
self.lock = Lock()
self.cache = {}
self.proxies = []
def setup_proxies(self, proxy_list):
"""프록시 설정"""
self.proxies = proxy_list
print(f"✅ {len(proxy_list)}개 프록시 설정 완료")
def get_with_proxy(self, url):
"""프록시를 사용한 요청"""
if not self.proxies:
return self.session.get(url)
proxy = random.choice(self.proxies)
proxies = {
'http': proxy,
'https': proxy
}
try:
return self.session.get(url, proxies=proxies, timeout=10)
except:
# 프록시 실패 시 직접 연결
return self.session.get(url)
def cached_request(self, url):
"""캐시된 요청"""
url_hash = hashlib.md5(url.encode()).hexdigest()
if url_hash in self.cache:
print(f"📦 캐시에서 로드: {url}")
return self.cache[url_hash]
response = self.get_with_proxy(url)
self.cache[url_hash] = response
return response
def parallel_scrape(self, urls, max_workers=5):
"""병렬 스크래핑"""
results = []
def scrape_url(url):
try:
response = self.cached_request(url)
soup = BeautifulSoup(response.content, 'html.parser')
return {
'url': url,
'title': soup.title.text if soup.title else '',
'status': response.status_code,
'success': True
}
except Exception as e:
return {
'url': url,
'error': str(e),
'success': False
}
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {executor.submit(scrape_url, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
result = future.result()
with self.lock:
results.append(result)
print(f"{'✅' if result['success'] else '❌'} {result['url']}")
return results
def handle_pagination(self, base_url, max_pages=5):
"""페이지네이션 처리"""
all_data = []
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
print(f"📄 페이지 {page} 스크래핑 중...")
try:
response = self.cached_request(url)
soup = BeautifulSoup(response.content, 'html.parser')
# 페이지별 데이터 추출 (예시)
items = soup.find_all('div', class_='item')
if not items:
print(f"페이지 {page}에 데이터가 없습니다. 중단.")
break
for item in items:
all_data.append({
'page': page,
'text': item.get_text(strip=True)
})
# 다음 페이지 링크 확인
next_link = soup.find('a', text='Next')
if not next_link:
print("마지막 페이지입니다.")
break
except Exception as e:
print(f"페이지 {page} 에러: {e}")
break
# 페이지 간 지연
time.sleep(random.uniform(1, 2))
return all_data
# 고급 기능 테스트
def test_advanced_features():
"""고급 기능 테스트"""
scraper = AdvancedScraper()
# 1. 병렬 스크래핑 테스트
print("=== 병렬 스크래핑 테스트 ===")
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/html',
'https://httpbin.org/json',
'https://httpbin.org/xml'
]
start_time = time.time()
results = scraper.parallel_scrape(urls, max_workers=3)
elapsed_time = time.time() - start_time
print(f"\n완료 시간: {elapsed_time:.2f}초")
print(f"성공: {sum(1 for r in results if r['success'])} / {len(results)}")
# 2. 폼 데이터 제출
print("\n=== 폼 데이터 제출 ===")
form_data = {
'username': 'test_user',
'password': 'secure_password',
'remember': 'on'
}
form_response = requests.post(
'https://httpbin.org/post',
data=form_data
)
print("제출된 폼 데이터:")
print(json.dumps(form_response.json()['form'], indent=2))
# 3. 파일 다운로드
print("\n=== 파일 다운로드 ===")
def download_file(url, filename):
"""파일 다운로드 함수"""
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
with open(filename, 'wb') as f:
downloaded = 0
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
progress = (downloaded / total_size) * 100 if total_size > 0 else 0
print(f"\r다운로드 진행률: {progress:.1f}%", end='')
print(f"\n✅ 다운로드 완료: {filename}")
# 작은 이미지 다운로드 예제
image_url = 'https://httpbin.org/image/png'
download_file(image_url, 'test_image.png')
# 실행
test_advanced_features()
11. 실전 예제: 뉴스 사이트 스크래핑
실제 뉴스 사이트를 스크래핑하는 예제를 통해 지금까지 배운 내용을 종합적으로 활용해봅니다.
class NewsScraperExample:
"""뉴스 스크래핑 예제 클래스 (실제 사이트가 아닌 시뮬레이션)"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.articles = []
def create_sample_html(self):
"""샘플 뉴스 HTML 생성"""
html = """
<html>
<head><title>Tech News Today</title></head>
<body>
<div class="news-container">
<article class="news-item">
<h2 class="title">
<a href="/news/1">Python 4.0 출시 예정: 새로운 기능 소개</a>
</h2>
<div class="meta">
<span class="author">김기자</span>
<span class="date">2024-01-15</span>
<span class="category">프로그래밍</span>
</div>
<p class="summary">Python 4.0이 곧 출시될 예정입니다.
새로운 버전에서는 성능이 크게 향상되고...</p>
<div class="tags">
<span class="tag">Python</span>
<span class="tag">Programming</span>
</div>
</article>
<article class="news-item">
<h2 class="title">
<a href="/news/2">AI가 변화시키는 웹 개발의 미래</a>
</h2>
<div class="meta">
<span class="author">이기자</span>
<span class="date">2024-01-14</span>
<span class="category">AI</span>
</div>
<p class="summary">인공지능 기술이 웹 개발 분야에 혁명을 일으키고 있습니다...</p>
<div class="tags">
<span class="tag">AI</span>
<span class="tag">WebDev</span>
<span class="tag">Future</span>
</div>
</article>
<article class="news-item">
<h2 class="title">
<a href="/news/3">클라우드 보안의 중요성과 베스트 프랙티스</a>
</h2>
<div class="meta">
<span class="author">박기자</span>
<span class="date">2024-01-13</span>
<span class="category">보안</span>
</div>
<p class="summary">클라우드 환경에서의 보안이 점점 중요해지고 있습니다...</p>
<div class="tags">
<span class="tag">Cloud</span>
<span class="tag">Security</span>
</div>
</article>
</div>
<div class="pagination">
<a href="?page=1" class="active">1</a>
<a href="?page=2">2</a>
<a href="?page=3">3</a>
<a href="?page=2" class="next">다음</a>
</div>
</body>
</html>
"""
return html
def parse_article(self, article_element):
"""기사 요소 파싱"""
article_data = {}
# 제목과 링크
title_elem = article_element.find('h2', class_='title')
if title_elem:
link = title_elem.find('a')
article_data['title'] = link.text.strip() if link else ''
article_data['url'] = link.get('href', '') if link else ''
# 메타 정보
meta = article_element.find('div', class_='meta')
if meta:
author = meta.find('span', class_='author')
date = meta.find('span', class_='date')
category = meta.find('span', class_='category')
article_data['author'] = author.text.strip() if author else ''
article_data['date'] = date.text.strip() if date else ''
article_data['category'] = category.text.strip() if category else ''
# 요약
summary = article_element.find('p', class_='summary')
article_data['summary'] = summary.text.strip() if summary else ''
# 태그
tags = article_element.find_all('span', class_='tag')
article_data['tags'] = [tag.text.strip() for tag in tags]
return article_data
def scrape_news_list(self):
"""뉴스 목록 스크래핑"""
print("📰 뉴스 목록 스크래핑 시작...")
# 샘플 HTML 사용 (실제로는 requests.get()을 사용)
html = self.create_sample_html()
soup = BeautifulSoup(html, 'html.parser')
# 모든 기사 찾기
articles = soup.find_all('article', class_='news-item')
for article in articles:
article_data = self.parse_article(article)
article_data['scraped_at'] = datetime.now().isoformat()
self.articles.append(article_data)
print(f" ✅ {article_data['title'][:30]}...")
print(f"\n총 {len(self.articles)}개 기사 수집 완료")
return self.articles
def analyze_data(self):
"""수집된 데이터 분석"""
if not self.articles:
print("분석할 데이터가 없습니다.")
return
print("\n📊 데이터 분석 결과:")
print("=" * 50)
# 카테고리별 분포
categories = {}
for article in self.articles:
cat = article.get('category', '기타')
categories[cat] = categories.get(cat, 0) + 1
print("\n카테고리별 기사 수:")
for cat, count in categories.items():
print(f" • {cat}: {count}개")
# 작성자별 분포
authors = {}
for article in self.articles:
author = article.get('author', 'Unknown')
authors[author] = authors.get(author, 0) + 1
print("\n작성자별 기사 수:")
for author, count in authors.items():
print(f" • {author}: {count}개")
# 인기 태그
all_tags = []
for article in self.articles:
all_tags.extend(article.get('tags', []))
tag_counts = {}
for tag in all_tags:
tag_counts[tag] = tag_counts.get(tag, 0) + 1
print("\n인기 태그:")
for tag, count in sorted(tag_counts.items(), key=lambda x: x[1], reverse=True):
print(f" • #{tag}: {count}회")
def export_results(self):
"""결과 내보내기"""
if not self.articles:
print("내보낼 데이터가 없습니다.")
return
# JSON으로 저장
filename = f"news_articles_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.articles, f, ensure_ascii=False, indent=2)
print(f"\n💾 데이터 저장 완료: {filename}")
# 요약 리포트 생성
report = f"""
뉴스 스크래핑 리포트
========================
스크래핑 일시: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
수집된 기사 수: {len(self.articles)}
기사 목록:
------------------------
"""
for i, article in enumerate(self.articles, 1):
report += f"""
{i}. {article['title']}
작성자: {article['author']}
날짜: {article['date']}
카테고리: {article['category']}
태그: {', '.join(article['tags'])}
"""
report_filename = f"news_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_filename, 'w', encoding='utf-8') as f:
f.write(report)
print(f"📄 리포트 저장 완료: {report_filename}")
# 실행
print("=== 뉴스 스크래핑 예제 ===")
news_scraper = NewsScraperExample()
news_scraper.scrape_news_list()
news_scraper.analyze_data()
news_scraper.export_results()
12. 종합 프로젝트: 전자상거래 사이트 모니터링
지금까지 배운 모든 내용을 종합하여 전자상거래 사이트의 제품 정보를 모니터링하는 완전한 스크래핑 시스템을 구축합니다.
# 종합 프로젝트: 전자상거래 제품 모니터링 시스템
import requests
from bs4 import BeautifulSoup
import json
import csv
import time
import sqlite3
from datetime import datetime, timedelta
import hashlib
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EcommercePriceMonitor:
"""전자상거래 가격 모니터링 시스템"""
def __init__(self, db_name='price_monitor.db'):
self.db_name = db_name
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept-Language': 'ko-KR,ko;q=0.9,en;q=0.8'
})
self.init_database()
self.products = []
self.price_changes = []
def init_database(self):
"""데이터베이스 초기화"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 제품 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT UNIQUE,
name TEXT,
url TEXT,
category TEXT,
brand TEXT,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 가격 히스토리 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
price REAL,
original_price REAL,
discount_percent REAL,
availability TEXT,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (product_id)
)
''')
# 알림 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
alert_type TEXT,
message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
print("✅ 데이터베이스 초기화 완료")
def create_sample_product_html(self, product_id, name, price, original_price=None):
"""샘플 제품 HTML 생성 (테스트용)"""
discount = 0
if original_price:
discount = int((1 - price / original_price) * 100)
html = f"""
<html>
<head><title>{name} - 온라인 쇼핑몰</title></head>
<body>
<div class="product-detail">
<h1 class="product-name">{name}</h1>
<div class="product-id">상품코드: {product_id}</div>
<div class="price-info">
{'<span class="original-price">' + f'{original_price:,}원' + '</span>' if original_price else ''}
<span class="current-price">{price:,}원</span>
{f'<span class="discount">{discount}% 할인</span>' if discount > 0 else ''}
</div>
<div class="product-meta">
<span class="brand">브랜드: TechCorp</span>
<span class="category">카테고리: 전자제품</span>
<span class="availability">재고: 있음</span>
</div>
<div class="product-specs">
<h3>제품 사양</h3>
<ul>
<li>모델명: TC-2024</li>
<li>제조사: TechCorp</li>
<li>보증기간: 1년</li>
</ul>
</div>
<div class="reviews">
<div class="rating">평점: 4.5/5.0</div>
<div class="review-count">리뷰: 234개</div>
</div>
</div>
</body>
</html>
"""
return html
def scrape_product(self, url, product_id=None):
"""제품 정보 스크래핑"""
try:
# 실제 환경에서는 requests.get(url) 사용
# 여기서는 샘플 HTML 사용
if 'product1' in url:
html = self.create_sample_product_html('PROD001', '스마트 워치 프로', 299000, 399000)
elif 'product2' in url:
html = self.create_sample_product_html('PROD002', '무선 이어폰 X', 159000, 199000)
else:
html = self.create_sample_product_html('PROD003', '태블릿 PC 10인치', 549000)
soup = BeautifulSoup(html, 'html.parser')
# 제품 정보 추출
product_data = {}
# 제품명
name_elem = soup.find('h1', class_='product-name')
product_data['name'] = name_elem.text.strip() if name_elem else 'Unknown'
# 제품 ID
id_elem = soup.find('div', class_='product-id')
if id_elem:
product_data['product_id'] = id_elem.text.replace('상품코드:', '').strip()
elif product_id:
product_data['product_id'] = product_id
else:
product_data['product_id'] = hashlib.md5(url.encode()).hexdigest()[:8]
# 가격 정보
current_price_elem = soup.find('span', class_='current-price')
if current_price_elem:
price_text = current_price_elem.text.replace(',', '').replace('원', '')
product_data['price'] = float(price_text)
original_price_elem = soup.find('span', class_='original-price')
if original_price_elem:
original_text = original_price_elem.text.replace(',', '').replace('원', '')
product_data['original_price'] = float(original_text)
else:
product_data['original_price'] = product_data.get('price', 0)
# 할인율 계산
if product_data.get('original_price') > product_data.get('price', 0):
discount = (1 - product_data['price'] / product_data['original_price']) * 100
product_data['discount_percent'] = round(discount, 1)
else:
product_data['discount_percent'] = 0
# 기타 정보
brand_elem = soup.find('span', class_='brand')
product_data['brand'] = brand_elem.text.replace('브랜드:', '').strip() if brand_elem else ''
category_elem = soup.find('span', class_='category')
product_data['category'] = category_elem.text.replace('카테고리:', '').strip() if category_elem else ''
availability_elem = soup.find('span', class_='availability')
product_data['availability'] = availability_elem.text.replace('재고:', '').strip() if availability_elem else 'Unknown'
product_data['url'] = url
product_data['scraped_at'] = datetime.now().isoformat()
return product_data
except Exception as e:
print(f"❌ 스크래핑 실패 ({url}): {e}")
return None
def save_product_data(self, product_data):
"""제품 데이터 저장"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 제품 정보 저장/업데이트
cursor.execute('''
INSERT OR REPLACE INTO products (product_id, name, url, category, brand)
VALUES (?, ?, ?, ?, ?)
''', (
product_data['product_id'],
product_data['name'],
product_data['url'],
product_data.get('category', ''),
product_data.get('brand', '')
))
# 가격 히스토리 저장
cursor.execute('''
INSERT INTO price_history (product_id, price, original_price, discount_percent, availability)
VALUES (?, ?, ?, ?, ?)
''', (
product_data['product_id'],
product_data['price'],
product_data.get('original_price', product_data['price']),
product_data.get('discount_percent', 0),
product_data.get('availability', 'Unknown')
))
conn.commit()
conn.close()
def check_price_changes(self, product_id):
"""가격 변동 확인"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 최근 2개 가격 정보 가져오기
cursor.execute('''
SELECT price, checked_at FROM price_history
WHERE product_id = ?
ORDER BY checked_at DESC
LIMIT 2
''', (product_id,))
prices = cursor.fetchall()
conn.close()
if len(prices) >= 2:
current_price = prices[0][0]
previous_price = prices[1][0]
if current_price < previous_price:
change_percent = ((previous_price - current_price) / previous_price) * 100
return {
'type': 'price_drop',
'previous': previous_price,
'current': current_price,
'change_percent': round(change_percent, 1)
}
elif current_price > previous_price:
change_percent = ((current_price - previous_price) / previous_price) * 100
return {
'type': 'price_increase',
'previous': previous_price,
'current': current_price,
'change_percent': round(change_percent, 1)
}
return None
def monitor_products(self, product_urls):
"""제품 목록 모니터링"""
print("🔍 제품 모니터링 시작...")
print("=" * 50)
for url in product_urls:
print(f"\n📦 스크래핑: {url}")
# 제품 정보 스크래핑
product_data = self.scrape_product(url)
if product_data:
self.products.append(product_data)
# 데이터 저장
self.save_product_data(product_data)
# 가격 변동 확인
price_change = self.check_price_changes(product_data['product_id'])
if price_change:
self.price_changes.append({
'product_id': product_data['product_id'],
'product_name': product_data['name'],
'change': price_change
})
if price_change['type'] == 'price_drop':
print(f" 💰 가격 하락! {price_change['change_percent']}% ↓")
print(f" {price_change['previous']:,}원 → {price_change['current']:,}원")
else:
print(f" 📈 가격 상승: {price_change['change_percent']}% ↑")
print(f" ✅ {product_data['name']}")
print(f" 현재가: {product_data['price']:,}원")
if product_data.get('discount_percent', 0) > 0:
print(f" 할인율: {product_data['discount_percent']}%")
# 요청 간 지연
time.sleep(1)
print(f"\n✅ 모니터링 완료: {len(self.products)}개 제품")
def generate_report(self):
"""리포트 생성"""
print("\n📊 모니터링 리포트")
print("=" * 50)
if self.price_changes:
print("\n💵 가격 변동 알림:")
for change in self.price_changes:
print(f"\n• {change['product_name']}")
change_info = change['change']
if change_info['type'] == 'price_drop':
print(f" 🔻 {change_info['change_percent']}% 하락")
else:
print(f" 🔺 {change_info['change_percent']}% 상승")
print(f" {change_info['previous']:,}원 → {change_info['current']:,}원")
# 통계 정보
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 전체 제품 수
cursor.execute('SELECT COUNT(DISTINCT product_id) FROM products')
total_products = cursor.fetchone()[0]
# 오늘 체크한 제품 수
today = datetime.now().strftime('%Y-%m-%d')
cursor.execute('''
SELECT COUNT(DISTINCT product_id) FROM price_history
WHERE DATE(checked_at) = ?
''', (today,))
today_checked = cursor.fetchone()[0]
# 평균 할인율
cursor.execute('''
SELECT AVG(discount_percent) FROM price_history
WHERE DATE(checked_at) = ?
''', (today,))
avg_discount = cursor.fetchone()[0] or 0
conn.close()
print(f"\n📈 통계:")
print(f"• 전체 모니터링 제품: {total_products}개")
print(f"• 오늘 체크한 제품: {today_checked}개")
print(f"• 평균 할인율: {avg_discount:.1f}%")
# CSV 파일로 내보내기
if self.products:
csv_filename = f"price_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(csv_filename, 'w', newline='', encoding='utf-8-sig') as f:
fieldnames = ['product_id', 'name', 'price', 'original_price',
'discount_percent', 'brand', 'category', 'availability']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for product in self.products:
writer.writerow({k: product.get(k, '') for k in fieldnames})
print(f"\n💾 데이터 저장: {csv_filename}")
# 메인 실행 함수
def main():
"""메인 실행 함수"""
print("🚀 전자상거래 가격 모니터링 시스템 시작")
print("=" * 60)
# 모니터링할 제품 URL 목록 (예제)
product_urls = [
'https://example-shop.com/product1',
'https://example-shop.com/product2',
'https://example-shop.com/product3'
]
# 모니터 초기화
monitor = EcommercePriceMonitor()
# 제품 모니터링 실행
monitor.monitor_products(product_urls)
# 리포트 생성
monitor.generate_report()
print("\n✨ 모니터링 시스템 종료")
# 스케줄링 예제 (실제 사용 시)
def scheduled_monitoring():
"""정기적인 모니터링 실행 (예: 매일 오전 9시)"""
import schedule
def job():
print(f"\n⏰ 정기 모니터링 시작: {datetime.now()}")
main()
# 매일 오전 9시에 실행
schedule.every().day.at("09:00").do(job)
print("📅 스케줄러 시작 (매일 오전 9시 실행)")
while True:
schedule.run_pending()
time.sleep(60)
# 프로그램 실행
if __name__ == "__main__":
main()
# 스케줄링을 원하는 경우 아래 주석 해제
# scheduled_monitoring()
마무리
이 튜토리얼에서는 Python의 Requests와 BeautifulSoup를 사용한 웹 스크래핑의 기초부터 고급 기법까지 포괄적으로 다루었습니다. HTTP 요청 처리, HTML 파싱, 데이터 추출, 에러 처리, 그리고 실전 프로젝트까지 웹 스크래핑에 필요한 모든 핵심 개념을 학습했습니다.
- 항상 robots.txt를 확인하고 준수하세요
- 적절한 지연 시간을 두어 서버에 부담을 주지 마세요
- User-Agent를 설정하여 봇임을 명시하세요
- 에러 처리와 재시도 로직을 구현하세요
- 수집한 데이터를 체계적으로 저장하고 관리하세요
- 가능하면 공식 API를 우선적으로 사용하세요
- 개인정보와 저작권을 존중하세요